Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{BTreeSet, HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 : use std::sync::Arc;
12 :
13 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
14 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
15 : use anyhow::Context;
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
20 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
21 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
22 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
23 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
24 : };
25 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
26 : use pageserver_api::models::RelSizeMigration;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use pageserver_api::shard::ShardIdentity;
29 : use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
30 : use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
31 : use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
32 : use serde::{Deserialize, Serialize};
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, info, info_span, trace, warn};
36 : use utils::bin_ser::{BeSer, DeserializeError};
37 : use utils::lsn::Lsn;
38 : use utils::pausable_failpoint;
39 : use wal_decoder::models::record::NeonWalRecord;
40 : use wal_decoder::models::value::Value;
41 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
42 :
43 : use super::tenant::{PageReconstructError, Timeline};
44 : use crate::aux_file;
45 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
46 : use crate::keyspace::{KeySpace, KeySpaceAccum};
47 : use crate::metrics::{
48 : RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
49 : RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
50 : RELSIZE_SNAPSHOT_CACHE_MISSES,
51 : };
52 : use crate::span::{
53 : debug_assert_current_span_has_tenant_and_timeline_id,
54 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
55 : };
56 : use crate::tenant::storage_layer::IoConcurrency;
57 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
58 :
59 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
60 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
61 :
62 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
63 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
64 :
65 : #[derive(Debug)]
66 : pub enum LsnForTimestamp {
67 : /// Found commits both before and after the given timestamp
68 : Present(Lsn),
69 :
70 : /// Found no commits after the given timestamp. This means
71 : /// that the newest data in the branch is older than the given
72 : /// timestamp.
73 : ///
74 : /// All commits <= LSN happened before the given timestamp
75 : Future(Lsn),
76 :
77 : /// The queried timestamp is older than the horizon we can look back to (the PITR cutoff)
78 : ///
79 : /// All commits > LSN happened after the given timestamp,
80 : /// but any commits < LSN might have happened before or after
81 : /// the given timestamp. We don't know because no data before
82 : /// the given lsn is available.
83 : Past(Lsn),
84 :
85 : /// We have found no commit with a timestamp,
86 : /// so we can't return anything meaningful.
87 : ///
88 : /// The associated LSN is the lower bound value we can safely
89 : /// create branches on, but no statement is made if it is
90 : /// older or newer than the timestamp.
91 : ///
92 : /// This variant can e.g. be returned right after a
93 : /// cluster import.
94 : NoData(Lsn),
95 : }
96 :
97 : /// Each request to the pageserver contains an LSN range: `not_modified_since..request_lsn`.
98 : /// See the comments in libs/pageserver_api/src/models.rs.
99 : /// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
100 : /// But to distinguish requests from the primary from those of replicas, we also need to pass `request_lsn`.
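///
/// A primary request uses `request_lsn == Lsn::MAX` ("read the latest"), which is what
/// [`LsnRange::is_latest`] checks. Illustrative sketch, not part of the original source;
/// `last_record_lsn` stands in for whatever LSN the caller resolved:
///
/// ```ignore
/// // Primary: read the latest version, but no older than `not_modified_since`.
/// let primary = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
/// assert!(primary.is_latest());
///
/// // Standby/replica: read at a specific request LSN.
/// let replica = LsnRange::at(Lsn(0x1_0000_0000));
/// assert!(!replica.is_latest());
/// ```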
101 : #[derive(Debug, Clone, Copy, Default)]
102 : pub struct LsnRange {
103 : pub effective_lsn: Lsn,
104 : pub request_lsn: Lsn,
105 : }
106 :
107 : impl LsnRange {
108 0 : pub fn at(lsn: Lsn) -> LsnRange {
109 0 : LsnRange {
110 0 : effective_lsn: lsn,
111 0 : request_lsn: lsn,
112 0 : }
113 0 : }
114 16 : pub fn is_latest(&self) -> bool {
115 16 : self.request_lsn == Lsn::MAX
116 16 : }
117 : }
118 :
119 : #[derive(Debug, thiserror::Error)]
120 : pub(crate) enum CalculateLogicalSizeError {
121 : #[error("cancelled")]
122 : Cancelled,
123 :
124 : /// Something went wrong while reading the metadata we use to calculate logical size.
125 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
126 : /// in the `From` implementation for this variant.
127 : #[error(transparent)]
128 : PageRead(PageReconstructError),
129 :
130 : /// Something went wrong deserializing metadata that we read to calculate logical size
131 : #[error("decode error: {0}")]
132 : Decode(#[from] DeserializeError),
133 : }
134 :
135 : #[derive(Debug, thiserror::Error)]
136 : pub(crate) enum CollectKeySpaceError {
137 : #[error(transparent)]
138 : Decode(#[from] DeserializeError),
139 : #[error(transparent)]
140 : PageRead(PageReconstructError),
141 : #[error("cancelled")]
142 : Cancelled,
143 : }
144 :
145 : impl CollectKeySpaceError {
146 0 : pub(crate) fn is_cancel(&self) -> bool {
147 0 : match self {
148 0 : CollectKeySpaceError::Decode(_) => false,
149 0 : CollectKeySpaceError::PageRead(e) => e.is_cancel(),
150 0 : CollectKeySpaceError::Cancelled => true,
151 : }
152 0 : }
153 0 : pub(crate) fn into_anyhow(self) -> anyhow::Error {
154 0 : match self {
155 0 : CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
156 0 : CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
157 0 : CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
158 : }
159 0 : }
160 : }
161 :
162 : impl From<PageReconstructError> for CollectKeySpaceError {
163 0 : fn from(err: PageReconstructError) -> Self {
164 0 : match err {
165 0 : PageReconstructError::Cancelled => Self::Cancelled,
166 0 : err => Self::PageRead(err),
167 : }
168 0 : }
169 : }
170 :
171 : impl From<PageReconstructError> for CalculateLogicalSizeError {
172 0 : fn from(pre: PageReconstructError) -> Self {
173 0 : match pre {
174 0 : PageReconstructError::Cancelled => Self::Cancelled,
175 0 : _ => Self::PageRead(pre),
176 : }
177 0 : }
178 : }
179 :
180 : #[derive(Debug, thiserror::Error)]
181 : pub enum RelationError {
182 : #[error("invalid relnode")]
183 : InvalidRelnode,
184 : }
185 :
186 : ///
187 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
188 : /// and other special kinds of files, in a versioned key-value store. The
189 : /// Timeline struct provides the key-value store.
190 : ///
191 : /// This is a separate impl so that we can easily include all these functions in a Timeline
192 : /// implementation; it might be moved into a separate struct later.
193 : impl Timeline {
194 : /// Start ingesting a WAL record, or other atomic modification of
195 : /// the timeline.
196 : ///
197 : /// This provides a transaction-like interface to perform a bunch
198 : /// of modifications atomically.
199 : ///
200 : /// To ingest a WAL record, call begin_modification(lsn) to get a
201 : /// DatadirModification object. Use the functions in the object to
202 : /// modify the repository state, updating all the pages and metadata
203 : /// that the WAL record affects. When you're done, call commit() to
204 : /// commit the changes.
205 : ///
206 : /// The LSN stored in the modification is advanced by `ingest_record` and
207 : /// is used by `commit()` to update `last_record_lsn`.
208 : ///
209 : /// Calling commit() will flush all the changes and reset the state,
210 : /// so the `DatadirModification` struct can be reused to perform the next modification.
211 : ///
212 : /// Note that any pending modifications you make through the
213 : /// modification object won't be visible to calls to the 'get' and list
214 : /// functions of the timeline until you commit! And if you update the
215 : /// same page twice, the last update wins.
216 : ///
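/// # Example
///
/// A minimal sketch of the lifecycle described above (not part of the original source;
/// assumes `timeline`, `lsn`, `ctx`, and a decoded record providing `rel`, `blknum` and
/// `img` are in scope, and uses `put_rel_page_image` as a representative put-function):
///
/// ```ignore
/// let mut modification = timeline.begin_modification(lsn);
/// // Apply whatever the WAL record changes, e.g. replace one page image.
/// modification.put_rel_page_image(rel, blknum, img)?;
/// // Flush the pending changes and advance `last_record_lsn`.
/// modification.commit(ctx).await?;
/// ```
///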
217 134213 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
218 134213 : where
219 134213 : Self: Sized,
220 : {
221 134213 : DatadirModification {
222 134213 : tline: self,
223 134213 : pending_lsns: Vec::new(),
224 134213 : pending_metadata_pages: HashMap::new(),
225 134213 : pending_data_batch: None,
226 134213 : pending_deletions: Vec::new(),
227 134213 : pending_nblocks: 0,
228 134213 : pending_directory_entries: Vec::new(),
229 134213 : pending_metadata_bytes: 0,
230 134213 : is_importing_pgdata: false,
231 134213 : lsn,
232 134213 : }
233 134213 : }
234 :
235 2 : pub fn begin_modification_for_import(&self, lsn: Lsn) -> DatadirModification
236 2 : where
237 2 : Self: Sized,
238 : {
239 2 : DatadirModification {
240 2 : tline: self,
241 2 : pending_lsns: Vec::new(),
242 2 : pending_metadata_pages: HashMap::new(),
243 2 : pending_data_batch: None,
244 2 : pending_deletions: Vec::new(),
245 2 : pending_nblocks: 0,
246 2 : pending_directory_entries: Vec::new(),
247 2 : pending_metadata_bytes: 0,
248 2 : is_importing_pgdata: true,
249 2 : lsn,
250 2 : }
251 2 : }
252 :
253 : //------------------------------------------------------------------------------
254 : // Public GET functions
255 : //------------------------------------------------------------------------------
256 :
257 : /// Look up given page version.
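///
/// Illustrative sketch of a single-page read (not part of the original source; `tag`,
/// `blknum`, `lsn`, `ctx` and `io_concurrency` are assumed to be in scope):
///
/// ```ignore
/// let page: Bytes = timeline
///     .get_rel_page_at_lsn(tag, blknum, Version::LsnRange(LsnRange::at(lsn)), &ctx, io_concurrency)
///     .await?;
/// ```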
258 9192 : pub(crate) async fn get_rel_page_at_lsn(
259 9192 : &self,
260 9192 : tag: RelTag,
261 9192 : blknum: BlockNumber,
262 9192 : version: Version<'_>,
263 9192 : ctx: &RequestContext,
264 9192 : io_concurrency: IoConcurrency,
265 9192 : ) -> Result<Bytes, PageReconstructError> {
266 9192 : match version {
267 9192 : Version::LsnRange(lsns) => {
268 9192 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
269 9192 : let res = self
270 9192 : .get_rel_page_at_lsn_batched(
271 9192 : pages
272 9192 : .iter()
273 9192 : .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
274 9192 : io_concurrency.clone(),
275 9192 : ctx,
276 : )
277 9192 : .await;
278 9192 : assert_eq!(res.len(), 1);
279 9192 : res.into_iter().next().unwrap()
280 : }
281 0 : Version::Modified(modification) => {
282 0 : if tag.relnode == 0 {
283 0 : return Err(PageReconstructError::Other(
284 0 : RelationError::InvalidRelnode.into(),
285 0 : ));
286 0 : }
287 :
288 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
289 0 : if blknum >= nblocks {
290 0 : debug!(
291 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
292 : tag,
293 : blknum,
294 0 : version.get_lsn(),
295 : nblocks
296 : );
297 0 : return Ok(ZERO_PAGE.clone());
298 0 : }
299 :
300 0 : let key = rel_block_to_key(tag, blknum);
301 0 : modification.get(key, ctx).await
302 : }
303 : }
304 9192 : }
305 :
306 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
307 : ///
308 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
309 : ///
310 : /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
311 : /// if the client goes away (e.g. due to timeout or cancellation).
312 : /// TODO: verify that it actually is cancellation-safe.
313 9192 : pub(crate) async fn get_rel_page_at_lsn_batched(
314 9192 : &self,
315 9192 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
316 9192 : io_concurrency: IoConcurrency,
317 9192 : ctx: &RequestContext,
318 9192 : ) -> Vec<Result<Bytes, PageReconstructError>> {
319 9192 : debug_assert_current_span_has_tenant_and_timeline_id();
320 :
321 9192 : let mut slots_filled = 0;
322 9192 : let page_count = pages.len();
323 :
324 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
325 9192 : let mut result = Vec::with_capacity(pages.len());
326 9192 : let result_slots = result.spare_capacity_mut();
327 :
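// `keys_slots` maps each requested key to the response slot(s) that asked for it: the
// same page may appear more than once in a batch, in which case it is read only once
// and the result is cloned into every slot that requested it (see the `key_slots` loop
// below). `req_keyspaces` groups the requested keys by effective LSN so that they can
// be issued as a single scattered vectored get.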
328 9192 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
329 9192 : HashMap::with_capacity(pages.len());
330 :
331 9192 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
332 9192 : HashMap::with_capacity(pages.len());
333 :
334 9192 : for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
335 9192 : if tag.relnode == 0 {
336 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
337 0 : RelationError::InvalidRelnode.into(),
338 0 : )));
339 :
340 0 : slots_filled += 1;
341 0 : continue;
342 9192 : }
343 9192 : let lsn = lsns.effective_lsn;
344 9192 : let nblocks = {
345 9192 : let ctx = RequestContextBuilder::from(&ctx)
346 9192 : .perf_span(|crnt_perf_span| {
347 0 : info_span!(
348 : target: PERF_TRACE_TARGET,
349 0 : parent: crnt_perf_span,
350 : "GET_REL_SIZE",
351 : reltag=%tag,
352 : lsn=%lsn,
353 : )
354 0 : })
355 9192 : .attached_child();
356 :
357 9192 : match self
358 9192 : .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
359 9192 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
360 9192 : .await
361 : {
362 9192 : Ok(nblocks) => nblocks,
363 0 : Err(err) => {
364 0 : result_slots[response_slot_idx].write(Err(err));
365 0 : slots_filled += 1;
366 0 : continue;
367 : }
368 : }
369 : };
370 :
371 9192 : if *blknum >= nblocks {
372 0 : debug!(
373 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
374 : tag, blknum, lsn, nblocks
375 : );
376 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
377 0 : slots_filled += 1;
378 0 : continue;
379 9192 : }
380 :
381 9192 : let key = rel_block_to_key(*tag, *blknum);
382 :
383 9192 : let ctx = RequestContextBuilder::from(&ctx)
384 9192 : .perf_span(|crnt_perf_span| {
385 0 : info_span!(
386 : target: PERF_TRACE_TARGET,
387 0 : parent: crnt_perf_span,
388 : "GET_BATCH",
389 : batch_size = %page_count,
390 : )
391 0 : })
392 9192 : .attached_child();
393 :
394 9192 : let key_slots = keys_slots.entry(key).or_default();
395 9192 : key_slots.push((response_slot_idx, ctx));
396 :
397 9192 : let acc = req_keyspaces.entry(lsn).or_default();
398 9192 : acc.add_key(key);
399 : }
400 :
401 9192 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
402 9192 : .into_iter()
403 9192 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
404 9192 : .collect();
405 :
406 9192 : let query = VersionedKeySpaceQuery::scattered(query);
407 9192 : let res = self
408 9192 : .get_vectored(query, io_concurrency, ctx)
409 9192 : .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
410 9192 : .await;
411 :
412 9192 : match res {
413 9192 : Ok(results) => {
414 18384 : for (key, res) in results {
415 9192 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
416 9192 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
417 :
418 9192 : for (slot, req_ctx) in key_slots {
419 0 : let clone = match &res {
420 0 : Ok(buf) => Ok(buf.clone()),
421 0 : Err(err) => Err(match err {
422 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
423 :
424 0 : x @ PageReconstructError::Other(_)
425 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
426 0 : | x @ PageReconstructError::WalRedo(_)
427 0 : | x @ PageReconstructError::MissingKey(_) => {
428 0 : PageReconstructError::Other(anyhow::anyhow!(
429 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
430 0 : ))
431 : }
432 : }),
433 : };
434 :
435 0 : result_slots[slot].write(clone);
436 : // There is no standardized way to express that the batched span followed from N request spans.
437 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
438 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
439 0 : req_ctx.perf_follows_from(ctx);
440 0 : slots_filled += 1;
441 : }
442 :
443 9192 : result_slots[first_slot].write(res);
444 9192 : first_req_ctx.perf_follows_from(ctx);
445 9192 : slots_filled += 1;
446 : }
447 : }
448 0 : Err(err) => {
449 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
450 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
451 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
452 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
453 : // but without taking ownership of the GetVectoredError
454 0 : let err = match &err {
455 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
456 : // TODO: restructure get_vectored API to make this error per-key
457 0 : GetVectoredError::MissingKey(err) => {
458 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
459 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
460 0 : )))
461 : }
462 : // TODO: restructure get_vectored API to make this error per-key
463 0 : GetVectoredError::GetReadyAncestorError(err) => {
464 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
465 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
466 0 : )))
467 : }
468 : // TODO: restructure get_vectored API to make this error per-key
469 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
470 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
471 0 : )),
472 : // TODO: we can prevent this error class by moving this check into the type system
473 0 : GetVectoredError::InvalidLsn(e) => {
474 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
475 : }
476 : // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
477 : // TODO: we can prevent this error class by moving this check into the type system
478 0 : GetVectoredError::Oversized(err, max) => {
479 0 : Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
480 : }
481 : };
482 :
483 0 : req_ctx.perf_follows_from(ctx);
484 0 : result_slots[*slot].write(err);
485 : }
486 :
487 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
488 : }
489 : };
490 :
491 9192 : assert_eq!(slots_filled, page_count);
492 : // SAFETY:
493 : // 1. `result` and any of its uninit members are not read from until this point
494 : // 2. The length below is tracked at run-time and matches the number of requested pages.
495 9192 : unsafe {
496 9192 : result.set_len(page_count);
497 9192 : }
498 :
499 9192 : result
500 9192 : }
501 :
502 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
503 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
504 : /// for pages up to the highest page number it has stored.
505 0 : pub(crate) async fn get_db_size(
506 0 : &self,
507 0 : spcnode: Oid,
508 0 : dbnode: Oid,
509 0 : version: Version<'_>,
510 0 : ctx: &RequestContext,
511 0 : ) -> Result<usize, PageReconstructError> {
512 0 : let mut total_blocks = 0;
513 :
514 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
515 :
516 0 : if rels.is_empty() {
517 0 : return Ok(0);
518 0 : }
519 :
520 : // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
521 : // TODO: refactor this read path to use reldir v2.
522 0 : let reldir_key = rel_dir_to_key(spcnode, dbnode);
523 0 : let buf = version.get(self, reldir_key, ctx).await?;
524 0 : let reldir = RelDirectory::des(&buf)?;
525 :
526 0 : for rel in rels {
527 0 : let n_blocks = self
528 0 : .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
529 0 : .await?
530 0 : .expect("allow_missing=false");
531 0 : total_blocks += n_blocks as usize;
532 : }
533 0 : Ok(total_blocks)
534 0 : }
535 :
536 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
537 : ///
538 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
539 : /// page number stored in the shard.
540 12217 : pub(crate) async fn get_rel_size(
541 12217 : &self,
542 12217 : tag: RelTag,
543 12217 : version: Version<'_>,
544 12217 : ctx: &RequestContext,
545 12217 : ) -> Result<BlockNumber, PageReconstructError> {
546 12217 : Ok(self
547 12217 : .get_rel_size_in_reldir(tag, version, None, false, ctx)
548 12217 : .await?
549 12215 : .expect("allow_missing=false"))
550 12217 : }
551 :
552 : /// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
553 : /// otherwise errors.
554 : ///
555 : /// INVARIANT: never returns None if `allow_missing=false`.
556 : ///
557 : /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
558 12217 : pub(crate) async fn get_rel_size_in_reldir(
559 12217 : &self,
560 12217 : tag: RelTag,
561 12217 : version: Version<'_>,
562 12217 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
563 12217 : allow_missing: bool,
564 12217 : ctx: &RequestContext,
565 12217 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
566 12217 : if tag.relnode == 0 {
567 0 : return Err(PageReconstructError::Other(
568 0 : RelationError::InvalidRelnode.into(),
569 0 : ));
570 12217 : }
571 :
572 12217 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
573 12210 : return Ok(Some(nblocks));
574 7 : }
575 :
576 7 : if allow_missing
577 0 : && !self
578 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
579 0 : .await?
580 : {
581 0 : return Ok(None);
582 7 : }
583 :
584 7 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
585 0 : && !self
586 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
587 0 : .await?
588 : {
589 : // FIXME: Postgres sometimes calls smgrcreate() to create
590 : // FSM, and smgrnblocks() on it immediately afterwards,
591 : // without extending it. Tolerate that by claiming that
592 : // any non-existent FSM fork has size 0.
593 0 : return Ok(Some(0));
594 7 : }
595 :
596 7 : let key = rel_size_to_key(tag);
597 7 : let mut buf = version.get(self, key, ctx).await?;
598 5 : let nblocks = buf.get_u32_le();
599 :
600 5 : self.update_cached_rel_size(tag, version, nblocks);
601 :
602 5 : Ok(Some(nblocks))
603 12217 : }
604 :
605 : /// Does the relation exist?
606 : ///
607 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
608 : /// the shard stores pages for.
609 : ///
610 3025 : pub(crate) async fn get_rel_exists(
611 3025 : &self,
612 3025 : tag: RelTag,
613 3025 : version: Version<'_>,
614 3025 : ctx: &RequestContext,
615 3025 : ) -> Result<bool, PageReconstructError> {
616 3025 : self.get_rel_exists_in_reldir(tag, version, None, ctx).await
617 3025 : }
618 :
619 9 : async fn get_rel_exists_in_reldir_v1(
620 9 : &self,
621 9 : tag: RelTag,
622 9 : version: Version<'_>,
623 9 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
624 9 : ctx: &RequestContext,
625 9 : ) -> Result<bool, PageReconstructError> {
626 9 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
627 9 : if let Some((cached_key, dir)) = deserialized_reldir_v1 {
628 0 : if cached_key == key {
629 0 : return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
630 0 : } else if cfg!(test) || cfg!(feature = "testing") {
631 0 : panic!("cached reldir key mismatch: {cached_key} != {key}");
632 : } else {
633 0 : warn!("cached reldir key mismatch: {cached_key} != {key}");
634 : }
635 : // Fallback to reading the directory from the datadir.
636 9 : }
637 :
638 9 : let buf = version.get(self, key, ctx).await?;
639 :
640 9 : let dir = RelDirectory::des(&buf)?;
641 9 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
642 9 : }
643 :
644 0 : async fn get_rel_exists_in_reldir_v2(
645 0 : &self,
646 0 : tag: RelTag,
647 0 : version: Version<'_>,
648 0 : ctx: &RequestContext,
649 0 : ) -> Result<bool, PageReconstructError> {
650 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
651 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?).map_err(
652 0 : |_| {
653 0 : PageReconstructError::Other(anyhow::anyhow!(
654 0 : "invalid reldir key: decode failed, {}",
655 0 : key
656 0 : ))
657 0 : },
658 0 : )?;
659 0 : let exists_v2 = buf == RelDirExists::Exists;
660 0 : Ok(exists_v2)
661 0 : }
662 :
663 : /// Does the relation exist? With a cached deserialized `RelDirectory`.
664 : ///
665 : /// There are some cases where the caller loops across all relations. In that specific case,
666 : /// the caller should obtain the deserialized `RelDirectory` first and then call this function
667 : /// to avoid duplicating the deserialization work. This is a hack and should be removed by introducing
668 : /// a new API (e.g., `get_rel_exists_batched`).
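///
/// Illustrative sketch of the intended pattern, mirroring [`Self::get_db_size`] (not part
/// of the original source; written as it might appear inside another `Timeline` method,
/// with `spcnode`, `dbnode`, `rels`, `version` and `ctx` assumed to be in scope):
///
/// ```ignore
/// let reldir_key = rel_dir_to_key(spcnode, dbnode);
/// let buf = version.get(self, reldir_key, ctx).await?;
/// let reldir = RelDirectory::des(&buf)?;
/// for tag in rels {
///     let exists = self
///         .get_rel_exists_in_reldir(tag, version, Some((reldir_key, &reldir)), ctx)
///         .await?;
///     // ...
/// }
/// ```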
669 3025 : pub(crate) async fn get_rel_exists_in_reldir(
670 3025 : &self,
671 3025 : tag: RelTag,
672 3025 : version: Version<'_>,
673 3025 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
674 3025 : ctx: &RequestContext,
675 3025 : ) -> Result<bool, PageReconstructError> {
676 3025 : if tag.relnode == 0 {
677 0 : return Err(PageReconstructError::Other(
678 0 : RelationError::InvalidRelnode.into(),
679 0 : ));
680 3025 : }
681 :
682 : // first try to lookup relation in cache
683 3025 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
684 3016 : return Ok(true);
685 9 : }
686 : // then check if the database was already initialized.
687 : // get_rel_exists can be called before dbdir is created.
688 9 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
689 9 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
690 9 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
691 0 : return Ok(false);
692 9 : }
693 :
694 9 : let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
695 :
696 0 : match v2_status {
697 : RelSizeMigration::Legacy => {
698 9 : let v1_exists = self
699 9 : .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
700 9 : .await?;
701 9 : Ok(v1_exists)
702 : }
703 : RelSizeMigration::Migrating | RelSizeMigration::Migrated
704 0 : if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
705 : {
706 : // For requests below the migrated LSN, we still use the v1 read path.
707 0 : let v1_exists = self
708 0 : .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
709 0 : .await?;
710 0 : Ok(v1_exists)
711 : }
712 : RelSizeMigration::Migrating => {
713 0 : let v1_exists = self
714 0 : .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
715 0 : .await?;
716 0 : let v2_exists_res = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await;
717 0 : match v2_exists_res {
718 0 : Ok(v2_exists) if v1_exists == v2_exists => {}
719 0 : Ok(v2_exists) => {
720 0 : tracing::warn!(
721 0 : "inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
722 : tag,
723 : v1_exists,
724 : v2_exists,
725 : );
726 : }
727 0 : Err(e) if e.is_cancel() => {
728 0 : // Cancellation errors are fine to ignore, do not log.
729 0 : }
730 0 : Err(e) => {
731 0 : tracing::warn!("failed to get rel exists in v2: {e}");
732 : }
733 : }
734 0 : Ok(v1_exists)
735 : }
736 : RelSizeMigration::Migrated => {
737 0 : let v2_exists = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await?;
738 0 : Ok(v2_exists)
739 : }
740 : }
741 3025 : }
742 :
743 0 : async fn list_rels_v1(
744 0 : &self,
745 0 : spcnode: Oid,
746 0 : dbnode: Oid,
747 0 : version: Version<'_>,
748 0 : ctx: &RequestContext,
749 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
750 0 : let key = rel_dir_to_key(spcnode, dbnode);
751 0 : let buf = version.get(self, key, ctx).await?;
752 0 : let dir = RelDirectory::des(&buf)?;
753 0 : let rels_v1: HashSet<RelTag> =
754 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
755 0 : spcnode,
756 0 : dbnode,
757 0 : relnode: *relnode,
758 0 : forknum: *forknum,
759 0 : }));
760 0 : Ok(rels_v1)
761 0 : }
762 :
763 0 : async fn list_rels_v2(
764 0 : &self,
765 0 : spcnode: Oid,
766 0 : dbnode: Oid,
767 0 : version: Version<'_>,
768 0 : ctx: &RequestContext,
769 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
770 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
771 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
772 0 : self.conf.get_vectored_concurrent_io,
773 0 : self.gate
774 0 : .enter()
775 0 : .map_err(|_| PageReconstructError::Cancelled)?,
776 : );
777 0 : let results = self
778 0 : .scan(
779 0 : KeySpace::single(key_range),
780 0 : version.get_lsn(),
781 0 : ctx,
782 0 : io_concurrency,
783 0 : )
784 0 : .await?;
785 0 : let mut rels = HashSet::new();
786 0 : for (key, val) in results {
787 0 : let val = RelDirExists::decode(&val?).map_err(|_| {
788 0 : PageReconstructError::Other(anyhow::anyhow!(
789 0 : "invalid reldir key: decode failed, {}",
790 0 : key
791 0 : ))
792 0 : })?;
793 0 : if key.field6 != 1 {
794 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
795 0 : "invalid reldir key: field6 != 1, {}",
796 0 : key
797 0 : )));
798 0 : }
799 0 : if key.field2 != spcnode {
800 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
801 0 : "invalid reldir key: field2 != spcnode, {}",
802 0 : key
803 0 : )));
804 0 : }
805 0 : if key.field3 != dbnode {
806 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
807 0 : "invalid reldir key: field3 != dbnode, {}",
808 0 : key
809 0 : )));
810 0 : }
811 0 : let tag = RelTag {
812 0 : spcnode,
813 0 : dbnode,
814 0 : relnode: key.field4,
815 0 : forknum: key.field5,
816 0 : };
817 0 : if val == RelDirExists::Removed {
818 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2: {tag}");
819 0 : continue;
820 0 : }
821 0 : let did_not_contain = rels.insert(tag);
822 0 : debug_assert!(did_not_contain, "duplicate reltag in v2: {tag}");
823 : }
824 0 : Ok(rels)
825 0 : }
826 :
827 : /// Get a list of all existing relations in given tablespace and database.
828 : ///
829 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
830 : /// the shard stores pages for.
831 : ///
832 : /// # Cancel-Safety
833 : ///
834 : /// This method is cancellation-safe.
835 0 : pub(crate) async fn list_rels(
836 0 : &self,
837 0 : spcnode: Oid,
838 0 : dbnode: Oid,
839 0 : version: Version<'_>,
840 0 : ctx: &RequestContext,
841 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
842 0 : let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
843 :
844 0 : match v2_status {
845 : RelSizeMigration::Legacy => {
846 0 : let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
847 0 : Ok(rels_v1)
848 : }
849 : RelSizeMigration::Migrating | RelSizeMigration::Migrated
850 0 : if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
851 : {
852 : // For requests below the migrated LSN, we still use the v1 read path.
853 0 : let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
854 0 : Ok(rels_v1)
855 : }
856 : RelSizeMigration::Migrating => {
857 0 : let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
858 0 : let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
859 0 : match rels_v2_res {
860 0 : Ok(rels_v2) if rels_v1 == rels_v2 => {}
861 0 : Ok(rels_v2) => {
862 0 : tracing::warn!(
863 0 : "inconsistent v1/v2 reldir keyspace for db {} {}: v1_rels.len()={}, v2_rels.len()={}",
864 : spcnode,
865 : dbnode,
866 0 : rels_v1.len(),
867 0 : rels_v2.len()
868 : );
869 : }
870 0 : Err(e) if e.is_cancel() => {
871 0 : // Cancellation errors are fine to ignore, do not log.
872 0 : }
873 0 : Err(e) => {
874 0 : tracing::warn!("failed to list rels in v2: {e}");
875 : }
876 : }
877 0 : Ok(rels_v1)
878 : }
879 : RelSizeMigration::Migrated => {
880 0 : let rels_v2 = self.list_rels_v2(spcnode, dbnode, version, ctx).await?;
881 0 : Ok(rels_v2)
882 : }
883 : }
884 0 : }
885 :
886 : /// Get the whole SLRU segment
887 0 : pub(crate) async fn get_slru_segment(
888 0 : &self,
889 0 : kind: SlruKind,
890 0 : segno: u32,
891 0 : lsn: Lsn,
892 0 : ctx: &RequestContext,
893 0 : ) -> Result<Bytes, PageReconstructError> {
894 0 : assert!(self.tenant_shard_id.is_shard_zero());
895 0 : let n_blocks = self
896 0 : .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
897 0 : .await?;
898 :
899 0 : let keyspace = KeySpace::single(
900 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
901 : );
902 :
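// Split the segment's key range into vectored-get batches of at most
// `max_get_vectored_keys` blocks each.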
903 0 : let batches = keyspace.partition(
904 0 : self.get_shard_identity(),
905 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
906 0 : BLCKSZ as u64,
907 : );
908 :
909 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
910 0 : self.conf.get_vectored_concurrent_io,
911 0 : self.gate
912 0 : .enter()
913 0 : .map_err(|_| PageReconstructError::Cancelled)?,
914 : );
915 :
916 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
917 0 : for batch in batches.parts {
918 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
919 0 : let blocks = self
920 0 : .get_vectored(query, io_concurrency.clone(), ctx)
921 0 : .await?;
922 :
923 0 : for (_key, block) in blocks {
924 0 : let block = block?;
925 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
926 : }
927 : }
928 :
929 0 : Ok(segment.freeze())
930 0 : }
931 :
932 : /// Get size of an SLRU segment
933 0 : pub(crate) async fn get_slru_segment_size(
934 0 : &self,
935 0 : kind: SlruKind,
936 0 : segno: u32,
937 0 : version: Version<'_>,
938 0 : ctx: &RequestContext,
939 0 : ) -> Result<BlockNumber, PageReconstructError> {
940 0 : assert!(self.tenant_shard_id.is_shard_zero());
941 0 : let key = slru_segment_size_to_key(kind, segno);
942 0 : let mut buf = version.get(self, key, ctx).await?;
943 0 : Ok(buf.get_u32_le())
944 0 : }
945 :
946 : /// Does the slru segment exist?
947 0 : pub(crate) async fn get_slru_segment_exists(
948 0 : &self,
949 0 : kind: SlruKind,
950 0 : segno: u32,
951 0 : version: Version<'_>,
952 0 : ctx: &RequestContext,
953 0 : ) -> Result<bool, PageReconstructError> {
954 0 : assert!(self.tenant_shard_id.is_shard_zero());
955 : // fetch directory listing
956 0 : let key = slru_dir_to_key(kind);
957 0 : let buf = version.get(self, key, ctx).await?;
958 :
959 0 : let dir = SlruSegmentDirectory::des(&buf)?;
960 0 : Ok(dir.segments.contains(&segno))
961 0 : }
962 :
963 : /// Locate the LSN such that all transactions that committed before
964 : /// 'search_timestamp' are visible, but nothing newer is.
965 : ///
966 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
967 : /// so it's not well defined which LSN you get if there were multiple commits
968 : /// "in flight" at that point in time.
969 : ///
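/// How a caller might interpret the result (illustrative sketch, not part of the
/// original source; `ts`, `cancel` and `ctx` are assumed to be in scope):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
///     LsnForTimestamp::Present(lsn) => { /* commits on both sides; `lsn` is the branch point */ }
///     LsnForTimestamp::Future(lsn) => { /* everything committed before `ts`; `lsn` is an upper bound */ }
///     LsnForTimestamp::Past(lsn) => { /* `ts` is older than the PITR horizon; `lsn` is the lower bound */ }
///     LsnForTimestamp::NoData(lsn) => { /* no commit timestamps found at all */ }
/// }
/// ```
///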
970 0 : pub(crate) async fn find_lsn_for_timestamp(
971 0 : &self,
972 0 : search_timestamp: TimestampTz,
973 0 : cancel: &CancellationToken,
974 0 : ctx: &RequestContext,
975 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
976 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
977 :
978 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
979 0 : let gc_cutoff_planned = {
980 0 : let gc_info = self.gc_info.read().unwrap();
981 0 : info!(cutoffs=?gc_info.cutoffs, applied_cutoff=%*gc_cutoff_lsn_guard, "starting find_lsn_for_timestamp");
982 0 : gc_info.min_cutoff()
983 : };
984 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
985 : // but let's be defensive.
986 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
987 : // We use this method to figure out the branching LSN for the new branch, but the
988 : // GC cutoff could be before the branching point and we cannot create a new branch
989 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
990 : // on the safe side.
991 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
992 0 : let max_lsn = self.get_last_record_lsn();
993 :
994 : // LSNs are always 8-byte aligned. low/mid/high represent the
995 : // LSN divided by 8.
996 0 : let mut low = min_lsn.0 / 8;
997 0 : let mut high = max_lsn.0 / 8 + 1;
998 :
999 0 : let mut found_smaller = false;
1000 0 : let mut found_larger = false;
1001 :
1002 0 : while low < high {
1003 0 : if cancel.is_cancelled() {
1004 0 : return Err(PageReconstructError::Cancelled);
1005 0 : }
1006 : // cannot overflow, high and low are both smaller than u64::MAX / 2
1007 0 : let mid = (high + low) / 2;
1008 :
1009 0 : let cmp = match self
1010 0 : .is_latest_commit_timestamp_ge_than(
1011 0 : search_timestamp,
1012 0 : Lsn(mid * 8),
1013 0 : &mut found_smaller,
1014 0 : &mut found_larger,
1015 0 : ctx,
1016 : )
1017 0 : .await
1018 : {
1019 0 : Ok(res) => res,
1020 0 : Err(PageReconstructError::MissingKey(e)) => {
1021 0 : warn!(
1022 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
1023 : e
1024 : );
1025 : // Return that we didn't find any commits smaller than the LSN; the error has already been logged.
1026 0 : return Ok(LsnForTimestamp::Past(min_lsn));
1027 : }
1028 0 : Err(e) => return Err(e),
1029 : };
1030 :
1031 0 : if cmp {
1032 0 : high = mid;
1033 0 : } else {
1034 0 : low = mid + 1;
1035 0 : }
1036 : }
1037 :
1038 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
1039 : // i.e. the LSN of the last commit record before or at `search_timestamp`.
1040 : // Remove one from `low` to get `t`.
1041 : //
1042 : // FIXME: it would be better to get the LSN of the previous commit.
1043 : // Otherwise, if you restore to the returned LSN, the database will
1044 : // include physical changes from later commits that will be marked
1045 : // as aborted, and will need to be vacuumed away.
1046 0 : let commit_lsn = Lsn((low - 1) * 8);
1047 0 : match (found_smaller, found_larger) {
1048 : (false, false) => {
1049 : // This can happen if no commit records have been processed yet, e.g.
1050 : // just after importing a cluster.
1051 0 : Ok(LsnForTimestamp::NoData(min_lsn))
1052 : }
1053 : (false, true) => {
1054 : // Didn't find any commit timestamps smaller than the request
1055 0 : Ok(LsnForTimestamp::Past(min_lsn))
1056 : }
1057 0 : (true, _) if commit_lsn < min_lsn => {
1058 : // the search above did set found_smaller to true but it never increased the lsn.
1059 : // Then, low is still the old min_lsn, and the subtraction above gave a value
1060 : // below the min_lsn. We should never do that.
1061 0 : Ok(LsnForTimestamp::Past(min_lsn))
1062 : }
1063 : (true, false) => {
1064 : // Only found commits with timestamps smaller than the request.
1065 : // It's still a valid case for branch creation, return it.
1066 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
1067 : // case, anyway.
1068 0 : Ok(LsnForTimestamp::Future(commit_lsn))
1069 : }
1070 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
1071 : }
1072 0 : }
1073 :
1074 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
1075 : /// transactions that committed after 'search_timestamp', as of LSN 'probe_lsn'.
1076 : ///
1077 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
1078 : /// with a smaller/larger timestamp.
1079 : ///
1080 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
1081 0 : &self,
1082 0 : search_timestamp: TimestampTz,
1083 0 : probe_lsn: Lsn,
1084 0 : found_smaller: &mut bool,
1085 0 : found_larger: &mut bool,
1086 0 : ctx: &RequestContext,
1087 0 : ) -> Result<bool, PageReconstructError> {
1088 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
1089 0 : if timestamp >= search_timestamp {
1090 0 : *found_larger = true;
1091 0 : return ControlFlow::Break(true);
1092 0 : } else {
1093 0 : *found_smaller = true;
1094 0 : }
1095 0 : ControlFlow::Continue(())
1096 0 : })
1097 0 : .await
1098 0 : }
1099 :
1100 : /// Obtain the timestamp for the given lsn.
1101 : ///
1102 : /// If the lsn has no timestamps (e.g. no commits), returns None.
1103 0 : pub(crate) async fn get_timestamp_for_lsn(
1104 0 : &self,
1105 0 : probe_lsn: Lsn,
1106 0 : ctx: &RequestContext,
1107 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
1108 0 : let mut max: Option<TimestampTz> = None;
1109 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
1110 0 : if let Some(max_prev) = max {
1111 0 : max = Some(max_prev.max(timestamp));
1112 0 : } else {
1113 0 : max = Some(timestamp);
1114 0 : }
1115 0 : ControlFlow::Continue(())
1116 0 : })
1117 0 : .await?;
1118 :
1119 0 : Ok(max)
1120 0 : }
1121 :
1122 : /// Runs the given function on all the timestamps for a given lsn
1123 : ///
1124 : /// The return value is either given by the closure, or set to the `Default`
1125 : /// impl's output.
1126 0 : async fn map_all_timestamps<T: Default>(
1127 0 : &self,
1128 0 : probe_lsn: Lsn,
1129 0 : ctx: &RequestContext,
1130 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
1131 0 : ) -> Result<T, PageReconstructError> {
1132 0 : for segno in self
1133 0 : .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
1134 0 : .await?
1135 : {
1136 0 : let nblocks = self
1137 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
1138 0 : .await?;
1139 :
1140 0 : let keyspace = KeySpace::single(
1141 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
1142 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
1143 : );
1144 :
1145 0 : let batches = keyspace.partition(
1146 0 : self.get_shard_identity(),
1147 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
1148 0 : BLCKSZ as u64,
1149 : );
1150 :
1151 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
1152 0 : self.conf.get_vectored_concurrent_io,
1153 0 : self.gate
1154 0 : .enter()
1155 0 : .map_err(|_| PageReconstructError::Cancelled)?,
1156 : );
1157 :
1158 0 : for batch in batches.parts.into_iter().rev() {
1159 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
1160 0 : let blocks = self
1161 0 : .get_vectored(query, io_concurrency.clone(), ctx)
1162 0 : .await?;
1163 :
1164 0 : for (_key, clog_page) in blocks.into_iter().rev() {
1165 0 : let clog_page = clog_page?;
1166 :
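// Only CLOG pages stored with an 8-byte suffix carry a commit timestamp: the
// suffix is a big-endian TimestampTz appended after the BLCKSZ page image.
// Pages without the suffix contribute no timestamp and are skipped.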
1167 0 : if clog_page.len() == BLCKSZ as usize + 8 {
1168 0 : let mut timestamp_bytes = [0u8; 8];
1169 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
1170 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
1171 :
1172 0 : match f(timestamp) {
1173 0 : ControlFlow::Break(b) => return Ok(b),
1174 0 : ControlFlow::Continue(()) => (),
1175 : }
1176 0 : }
1177 : }
1178 : }
1179 : }
1180 0 : Ok(Default::default())
1181 0 : }
1182 :
1183 0 : pub(crate) async fn get_slru_keyspace(
1184 0 : &self,
1185 0 : version: Version<'_>,
1186 0 : ctx: &RequestContext,
1187 0 : ) -> Result<KeySpace, PageReconstructError> {
1188 0 : let mut accum = KeySpaceAccum::new();
1189 :
1190 0 : for kind in SlruKind::iter() {
1191 0 : let mut segments: Vec<u32> = self
1192 0 : .list_slru_segments(kind, version, ctx)
1193 0 : .await?
1194 0 : .into_iter()
1195 0 : .collect();
1196 0 : segments.sort_unstable();
1197 :
1198 0 : for seg in segments {
1199 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
1200 :
1201 0 : accum.add_range(
1202 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
1203 : );
1204 : }
1205 : }
1206 :
1207 0 : Ok(accum.to_keyspace())
1208 0 : }
1209 :
1210 : /// Get a list of SLRU segments
1211 0 : pub(crate) async fn list_slru_segments(
1212 0 : &self,
1213 0 : kind: SlruKind,
1214 0 : version: Version<'_>,
1215 0 : ctx: &RequestContext,
1216 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
1217 : // fetch directory entry
1218 0 : let key = slru_dir_to_key(kind);
1219 :
1220 0 : let buf = version.get(self, key, ctx).await?;
1221 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
1222 0 : }
1223 :
1224 0 : pub(crate) async fn get_relmap_file(
1225 0 : &self,
1226 0 : spcnode: Oid,
1227 0 : dbnode: Oid,
1228 0 : version: Version<'_>,
1229 0 : ctx: &RequestContext,
1230 0 : ) -> Result<Bytes, PageReconstructError> {
1231 0 : let key = relmap_file_key(spcnode, dbnode);
1232 :
1233 0 : let buf = version.get(self, key, ctx).await?;
1234 0 : Ok(buf)
1235 0 : }
1236 :
1237 169 : pub(crate) async fn list_dbdirs(
1238 169 : &self,
1239 169 : lsn: Lsn,
1240 169 : ctx: &RequestContext,
1241 169 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
1242 : // fetch directory entry
1243 169 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1244 :
1245 169 : Ok(DbDirectory::des(&buf)?.dbdirs)
1246 169 : }
1247 :
1248 0 : pub(crate) async fn get_twophase_file(
1249 0 : &self,
1250 0 : xid: u64,
1251 0 : lsn: Lsn,
1252 0 : ctx: &RequestContext,
1253 0 : ) -> Result<Bytes, PageReconstructError> {
1254 0 : let key = twophase_file_key(xid);
1255 0 : let buf = self.get(key, lsn, ctx).await?;
1256 0 : Ok(buf)
1257 0 : }
1258 :
1259 170 : pub(crate) async fn list_twophase_files(
1260 170 : &self,
1261 170 : lsn: Lsn,
1262 170 : ctx: &RequestContext,
1263 170 : ) -> Result<HashSet<u64>, PageReconstructError> {
1264 : // fetch directory entry
1265 170 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1266 :
1267 170 : if self.pg_version >= PgMajorVersion::PG17 {
1268 157 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1269 : } else {
1270 13 : Ok(TwoPhaseDirectory::des(&buf)?
1271 : .xids
1272 13 : .iter()
1273 13 : .map(|x| u64::from(*x))
1274 13 : .collect())
1275 : }
1276 170 : }
1277 :
1278 0 : pub(crate) async fn get_control_file(
1279 0 : &self,
1280 0 : lsn: Lsn,
1281 0 : ctx: &RequestContext,
1282 0 : ) -> Result<Bytes, PageReconstructError> {
1283 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1284 0 : }
1285 :
1286 6 : pub(crate) async fn get_checkpoint(
1287 6 : &self,
1288 6 : lsn: Lsn,
1289 6 : ctx: &RequestContext,
1290 6 : ) -> Result<Bytes, PageReconstructError> {
1291 6 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1292 6 : }
1293 :
1294 6 : async fn list_aux_files_v2(
1295 6 : &self,
1296 6 : lsn: Lsn,
1297 6 : ctx: &RequestContext,
1298 6 : io_concurrency: IoConcurrency,
1299 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1300 6 : let kv = self
1301 6 : .scan(
1302 6 : KeySpace::single(Key::metadata_aux_key_range()),
1303 6 : lsn,
1304 6 : ctx,
1305 6 : io_concurrency,
1306 6 : )
1307 6 : .await?;
1308 6 : let mut result = HashMap::new();
1309 6 : let mut sz = 0;
1310 15 : for (_, v) in kv {
1311 9 : let v = v?;
1312 9 : let v = aux_file::decode_file_value_bytes(&v)
1313 9 : .context("value decode")
1314 9 : .map_err(PageReconstructError::Other)?;
1315 17 : for (fname, content) in v {
1316 8 : sz += fname.len();
1317 8 : sz += content.len();
1318 8 : result.insert(fname, content);
1319 8 : }
1320 : }
1321 6 : self.aux_file_size_estimator.on_initial(sz);
1322 6 : Ok(result)
1323 6 : }
1324 :
1325 0 : pub(crate) async fn trigger_aux_file_size_computation(
1326 0 : &self,
1327 0 : lsn: Lsn,
1328 0 : ctx: &RequestContext,
1329 0 : io_concurrency: IoConcurrency,
1330 0 : ) -> Result<(), PageReconstructError> {
1331 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1332 0 : Ok(())
1333 0 : }
1334 :
1335 6 : pub(crate) async fn list_aux_files(
1336 6 : &self,
1337 6 : lsn: Lsn,
1338 6 : ctx: &RequestContext,
1339 6 : io_concurrency: IoConcurrency,
1340 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1341 6 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1342 6 : }
1343 :
1344 2 : pub(crate) async fn get_replorigins(
1345 2 : &self,
1346 2 : lsn: Lsn,
1347 2 : ctx: &RequestContext,
1348 2 : io_concurrency: IoConcurrency,
1349 2 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1350 2 : let kv = self
1351 2 : .scan(
1352 2 : KeySpace::single(repl_origin_key_range()),
1353 2 : lsn,
1354 2 : ctx,
1355 2 : io_concurrency,
1356 2 : )
1357 2 : .await?;
1358 2 : let mut result = HashMap::new();
1359 6 : for (k, v) in kv {
1360 5 : let v = v?;
1361 5 : if v.is_empty() {
1362 : // This is a tombstone -- we can skip it.
1363 : // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it is part of
1364 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1365 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1366 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1367 2 : continue;
1368 3 : }
1369 3 : let origin_id = k.field6 as RepOriginId;
1370 3 : let origin_lsn = Lsn::des(&v)
1371 3 : .with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
1372 2 : if origin_lsn != Lsn::INVALID {
1373 2 : result.insert(origin_id, origin_lsn);
1374 2 : }
1375 : }
1376 1 : Ok(result)
1377 2 : }
1378 :
1379 : /// Does the same as get_current_logical_size but counted on demand.
1380 : /// Used to initialize the logical size tracking on startup.
1381 : ///
1382 : /// Only relation blocks are counted currently. That excludes metadata,
1383 : /// SLRUs, twophase files etc.
1384 : ///
1385 : /// # Cancel-Safety
1386 : ///
1387 : /// This method is cancellation-safe.
1388 7 : pub(crate) async fn get_current_logical_size_non_incremental(
1389 7 : &self,
1390 7 : lsn: Lsn,
1391 7 : ctx: &RequestContext,
1392 7 : ) -> Result<u64, CalculateLogicalSizeError> {
1393 7 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1394 :
1395 7 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1396 :
1397 : // Fetch list of database dirs and iterate them
1398 7 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1399 7 : let dbdir = DbDirectory::des(&buf)?;
1400 :
1401 7 : let mut total_size: u64 = 0;
1402 7 : let mut dbdir_cnt = 0;
1403 7 : let mut rel_cnt = 0;
1404 :
1405 7 : for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
1406 0 : dbdir_cnt += 1;
1407 0 : for rel in self
1408 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1409 0 : .await?
1410 : {
1411 0 : rel_cnt += 1;
1412 0 : if self.cancel.is_cancelled() {
1413 0 : return Err(CalculateLogicalSizeError::Cancelled);
1414 0 : }
1415 0 : let relsize_key = rel_size_to_key(rel);
1416 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1417 0 : let relsize = buf.get_u32_le();
1418 :
1419 0 : total_size += relsize as u64;
1420 : }
1421 : }
1422 :
1423 7 : self.db_rel_count
1424 7 : .store(Some(Arc::new((dbdir_cnt, rel_cnt))));
1425 :
1426 7 : Ok(total_size * BLCKSZ as u64)
1427 7 : }
1428 :
1429 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1430 : /// for gc-compaction.
1431 : ///
1432 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1433 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1434 : /// be kept only for a specific range of LSN.
1435 : ///
1436 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1437 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1438 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1439 : /// determine which keys to retain/drop for gc-compaction.
1440 : ///
1441 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1442 : /// to be retained for each of the branch LSN.
1443 : ///
1444 : /// The return value is (dense keyspace, sparse keyspace).
1445 27 : pub(crate) async fn collect_gc_compaction_keyspace(
1446 27 : &self,
1447 27 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1448 27 : let metadata_key_begin = Key::metadata_key_range().start;
1449 27 : let aux_v1_key = AUX_FILES_KEY;
1450 27 : let dense_keyspace = KeySpace {
1451 27 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1452 27 : };
1453 27 : Ok((
1454 27 : dense_keyspace,
1455 27 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1456 27 : ))
1457 27 : }
1458 :
1459 : ///
1460 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1461 : /// Anything that's not listed may be removed from the underlying storage (from
1462 : /// that LSN forwards).
1463 : ///
1464 : /// The return value is (dense keyspace, sparse keyspace).
1465 169 : pub(crate) async fn collect_keyspace(
1466 169 : &self,
1467 169 : lsn: Lsn,
1468 169 : ctx: &RequestContext,
1469 169 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1470 : // Iterate through key ranges, greedily packing them into partitions
1471 169 : let mut result = KeySpaceAccum::new();
1472 :
1473 : // The dbdir metadata always exists
1474 169 : result.add_key(DBDIR_KEY);
1475 :
1476 : // Fetch list of database dirs and iterate them
1477 169 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1478 169 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1479 :
1480 169 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1481 169 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1482 0 : if has_relmap_file {
1483 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1484 0 : }
1485 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1486 :
1487 0 : let mut rels: Vec<RelTag> = self
1488 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1489 0 : .await?
1490 0 : .into_iter()
1491 0 : .collect();
1492 0 : rels.sort_unstable();
1493 0 : for rel in rels {
1494 0 : let relsize_key = rel_size_to_key(rel);
1495 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1496 0 : let relsize = buf.get_u32_le();
1497 :
1498 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1499 0 : result.add_key(relsize_key);
1500 : }
1501 : }
1502 :
1503 : // Iterate SLRUs next
1504 169 : if self.tenant_shard_id.is_shard_zero() {
1505 498 : for kind in [
1506 166 : SlruKind::Clog,
1507 166 : SlruKind::MultiXactMembers,
1508 166 : SlruKind::MultiXactOffsets,
1509 : ] {
1510 498 : let slrudir_key = slru_dir_to_key(kind);
1511 498 : result.add_key(slrudir_key);
1512 498 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1513 498 : let dir = SlruSegmentDirectory::des(&buf)?;
1514 498 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1515 498 : segments.sort_unstable();
1516 498 : for segno in segments {
1517 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1518 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1519 0 : let segsize = buf.get_u32_le();
1520 :
1521 0 : result.add_range(
1522 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1523 : );
1524 0 : result.add_key(segsize_key);
1525 : }
1526 : }
1527 3 : }
1528 :
1529 : // Then pg_twophase
1530 169 : result.add_key(TWOPHASEDIR_KEY);
1531 :
1532 169 : let mut xids: Vec<u64> = self
1533 169 : .list_twophase_files(lsn, ctx)
1534 169 : .await?
1535 169 : .iter()
1536 169 : .cloned()
1537 169 : .collect();
1538 169 : xids.sort_unstable();
1539 169 : for xid in xids {
1540 0 : result.add_key(twophase_file_key(xid));
1541 0 : }
1542 :
1543 169 : result.add_key(CONTROLFILE_KEY);
1544 169 : result.add_key(CHECKPOINT_KEY);
1545 :
1546 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1547 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1548 : // and the keys will not be garbage-collected.
1549 : #[cfg(test)]
1550 : {
1551 169 : let guard = self.extra_test_dense_keyspace.load();
1552 169 : for kr in &guard.ranges {
1553 0 : result.add_range(kr.clone());
1554 0 : }
1555 : }
1556 :
1557 169 : let dense_keyspace = result.to_keyspace();
1558 169 : let sparse_keyspace = SparseKeySpace(KeySpace {
1559 169 : ranges: vec![
1560 169 : Key::metadata_aux_key_range(),
1561 169 : repl_origin_key_range(),
1562 169 : Key::rel_dir_sparse_key_range(),
1563 169 : ],
1564 169 : });
1565 :
1566 169 : if cfg!(debug_assertions) {
1567              :              // Verify that the sparse keyspaces are ordered and non-overlapping.
1568 :
1569              :              // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure that each
1570              :              // category of sparse keys is split into its own image/delta files. If there
1571              :              // were overlapping keyspaces, they would be automatically merged by the keyspace accumulator,
1572              :              // and we want the developer to keep the keyspaces separated.
1573 :
1574 169 : let ranges = &sparse_keyspace.0.ranges;
1575 :
1576 : // TODO: use a single overlaps_with across the codebase
1577 507 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1578 507 : !(a.end <= b.start || b.end <= a.start)
1579 507 : }
1580 507 : for i in 0..ranges.len() {
1581 507 : for j in 0..i {
1582 507 : if overlaps_with(&ranges[i], &ranges[j]) {
1583 0 : panic!(
1584 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1585 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1586 : );
1587 507 : }
1588 : }
1589 : }
1590 338 : for i in 1..ranges.len() {
1591 338 : assert!(
1592 338 : ranges[i - 1].end <= ranges[i].start,
1593 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1594 0 : ranges[i - 1].start,
1595 0 : ranges[i - 1].end,
1596 0 : ranges[i].start,
1597 0 : ranges[i].end
1598 : );
1599 : }
1600 0 : }
1601 :
1602 169 : Ok((dense_keyspace, sparse_keyspace))
1603 169 : }
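// A minimal caller sketch (hypothetical; names are illustrative, not the actual call site):
//
//     let (dense, sparse) = timeline.collect_keyspace(cutoff_lsn, &ctx).await?;
//     // Keys not covered by `dense` or `sparse` at `cutoff_lsn` may be garbage-collected.
//     // `sparse.0.ranges` always holds the three sparse categories (aux files, replication
//     // origins, sparse reldir), in that order.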
1604 :
1605              :     /// Get the cached size of a relation. There are two caches: the 'latest' cache, which is updated on the
1606              :     /// primary write path and captures the latest state of the timeline, and the 'snapshot' cache, whose key
1607              :     /// includes the LSN and can therefore be used by replicas to get the relation size at a particular LSN (snapshot).
1608 224270 : pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
1609 224270 : let lsn = version.get_lsn();
1610 : {
1611 224270 : let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
1612 224270 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
1613 224259 : if lsn >= *cached_lsn {
1614 221686 : RELSIZE_LATEST_CACHE_HITS.inc();
1615 221686 : return Some(*nblocks);
1616 2573 : }
1617 2573 : RELSIZE_CACHE_MISSES_OLD.inc();
1618 11 : }
1619 : }
1620 : {
1621 2584 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1622 2584 : if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
1623 2563 : RELSIZE_SNAPSHOT_CACHE_HITS.inc();
1624 2563 : return Some(*nblock);
1625 21 : }
1626 : }
1627 21 : if version.is_latest() {
1628 10 : RELSIZE_LATEST_CACHE_MISSES.inc();
1629 11 : } else {
1630 11 : RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
1631 11 : }
1632 21 : None
1633 224270 : }
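// Lookup order in the method above, summarized:
//   1. `rel_size_latest_cache` (per-RelTag): usable whenever the requested LSN is at or past the
//      cached LSN, because the entry tracks the newest known size of the relation.
//   2. `rel_size_snapshot_cache` (keyed by (LSN, RelTag)): serves exact-LSN lookups, which is
//      what read replicas issue.
//   3. Otherwise, a miss is recorded against the cache matching `version.is_latest()`.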
1634 :
1635 : /// Update cached relation size if there is no more recent update
1636 5 : pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
1637 5 : let lsn = version.get_lsn();
1638 5 : if version.is_latest() {
1639 0 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1640 0 : match rel_size_cache.entry(tag) {
1641 0 : hash_map::Entry::Occupied(mut entry) => {
1642 0 : let cached_lsn = entry.get_mut();
1643 0 : if lsn >= cached_lsn.0 {
1644 0 : *cached_lsn = (lsn, nblocks);
1645 0 : }
1646 : }
1647 0 : hash_map::Entry::Vacant(entry) => {
1648 0 : entry.insert((lsn, nblocks));
1649 0 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1650 0 : }
1651 : }
1652 : } else {
1653 5 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1654 5 : if rel_size_cache.capacity() != 0 {
1655 5 : rel_size_cache.insert((lsn, tag), nblocks);
1656 5 : RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
1657 5 : }
1658 : }
1659 5 : }
1660 :
1661 : /// Store cached relation size
1662 141360 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1663 141360 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1664 141360 : if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
1665 960 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1666 140400 : }
1667 141360 : }
1668 :
1669 : /// Remove cached relation size
1670 1 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1671 1 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1672 1 : if rel_size_cache.remove(tag).is_some() {
1673 1 : RELSIZE_LATEST_CACHE_ENTRIES.dec();
1674 1 : }
1675 1 : }
1676 : }
1677 :
1678 : /// DatadirModification represents an operation to ingest an atomic set of
1679 : /// updates to the repository.
1680 : ///
1681 : /// It is created by the 'begin_record' function. It is called for each WAL
1682              : /// record, so that all the modifications made by one WAL record appear atomic.
1683 : pub struct DatadirModification<'a> {
1684 : /// The timeline this modification applies to. You can access this to
1685 : /// read the state, but note that any pending updates are *not* reflected
1686 : /// in the state in 'tline' yet.
1687 : pub tline: &'a Timeline,
1688 :
1689 : /// Current LSN of the modification
1690 : lsn: Lsn,
1691 :
1692 : // The modifications are not applied directly to the underlying key-value store.
1693 : // The put-functions add the modifications here, and they are flushed to the
1694 : // underlying key-value store by the 'finish' function.
1695 : pending_lsns: Vec<Lsn>,
1696 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1697 : pending_nblocks: i64,
1698 :
1699 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1700 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1701 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1702 :
1703 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1704 : /// which keys are stored here.
1705 : pending_data_batch: Option<SerializedValueBatch>,
1706 :
1707 : /// For special "directory" keys that store key-value maps, track the size of the map
1708 : /// if it was updated in this modification.
1709 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1710 :
1711 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1712 : pending_metadata_bytes: usize,
1713 :
1714 : /// Whether we are importing a pgdata directory.
1715 : is_importing_pgdata: bool,
1716 : }
1717 :
1718 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1719 : pub enum MetricsUpdate {
1720 : /// Set the metrics to this value
1721 : Set(u64),
1722 : /// Increment the metrics by this value
1723 : Add(u64),
1724 : /// Decrement the metrics by this value
1725 : Sub(u64),
1726 : }
1727 :
1728 : /// Controls the behavior of the reldir keyspace.
1729 : pub struct RelDirMode {
1730 : // Whether we can read the v2 keyspace or not.
1731 : current_status: RelSizeMigration,
1732 : // Whether we should initialize the v2 keyspace or not.
1733 : initialize: bool,
1734              :     // Whether we should disable v1 access starting at this LSN or not.
1735 : disable_v1: bool,
1736 : }
1737 :
1738 : impl DatadirModification<'_> {
1739 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1740 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1741 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1742 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
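// A hypothetical ingest-loop sketch (illustrative only; not the actual call site): callers are
// expected to commit the modification once its pending payload approaches this limit, e.g.:
//
//     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//         modification.commit(&ctx).await?;
//     }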
1743 :
1744 : /// Get the current lsn
1745 1 : pub(crate) fn get_lsn(&self) -> Lsn {
1746 1 : self.lsn
1747 1 : }
1748 :
1749 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1750 0 : self.pending_data_batch
1751 0 : .as_ref()
1752 0 : .map_or(0, |b| b.buffer_size())
1753 0 : + self.pending_metadata_bytes
1754 0 : }
1755 :
1756 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1757 0 : self.pending_data_batch
1758 0 : .as_ref()
1759 0 : .is_some_and(|b| b.has_data())
1760 0 : }
1761 :
1762 : /// Returns statistics about the currently pending modifications.
1763 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1764 0 : let mut stats = DatadirModificationStats::default();
1765 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1766 0 : match value {
1767 0 : Value::Image(_) => stats.metadata_images += 1,
1768 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1769 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1770 : }
1771 : }
1772 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1773 0 : match valuemeta {
1774 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1775 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1776 0 : ValueMeta::Observed(_) => {}
1777 : }
1778 : }
1779 0 : stats
1780 0 : }
1781 :
1782 : /// Set the current lsn
1783 72929 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1784 72929 : ensure_walingest!(
1785 : lsn >= self.lsn,
1786 : "setting an older lsn {} than {} is not allowed",
1787 : lsn,
1788 : self.lsn
1789 : );
1790 :
1791 72929 : if lsn > self.lsn {
1792 72929 : self.pending_lsns.push(self.lsn);
1793 72929 : self.lsn = lsn;
1794 72929 : }
1795 72929 : Ok(())
1796 72929 : }
1797 :
1798 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1799 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1800 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1801 : ///
1802 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1803 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1804 : /// via [`Self::get`] as soon as their record has been ingested.
1805 425371 : fn is_data_key(key: &Key) -> bool {
1806 425371 : key.is_rel_block_key() || key.is_slru_block_key()
1807 425371 : }
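// For example, in terms of the key helpers used in this file: `rel_block_to_key(rel, blkno)` and
// `slru_block_to_key(kind, segno, blkno)` produce data keys, whereas `rel_size_to_key(rel)`,
// `DBDIR_KEY` and the various directory keys are metadata keys.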
1808 :
1809 : /// Initialize a completely new repository.
1810 : ///
1811 : /// This inserts the directory metadata entries that are assumed to
1812 : /// always exist.
1813 112 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1814 112 : let buf = DbDirectory::ser(&DbDirectory {
1815 112 : dbdirs: HashMap::new(),
1816 112 : })?;
1817 112 : self.pending_directory_entries
1818 112 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1819 112 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1820 :
1821 112 : let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
1822 99 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1823 99 : xids: HashSet::new(),
1824 99 : })
1825 : } else {
1826 13 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1827 13 : xids: HashSet::new(),
1828 13 : })
1829 0 : }?;
1830 112 : self.pending_directory_entries
1831 112 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1832 112 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1833 :
1834 112 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1835 112 : let empty_dir = Value::Image(buf);
1836 :
1837 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1838 : // harmless but they'd just be dropped on later compaction.
1839 112 : if self.tline.tenant_shard_id.is_shard_zero() {
1840 109 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1841 109 : self.pending_directory_entries.push((
1842 109 : DirectoryKind::SlruSegment(SlruKind::Clog),
1843 109 : MetricsUpdate::Set(0),
1844 109 : ));
1845 109 : self.put(
1846 109 : slru_dir_to_key(SlruKind::MultiXactMembers),
1847 109 : empty_dir.clone(),
1848 109 : );
1849 109 : self.pending_directory_entries.push((
1850          109 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1851 109 : MetricsUpdate::Set(0),
1852 109 : ));
1853 109 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1854 109 : self.pending_directory_entries.push((
1855 109 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1856 109 : MetricsUpdate::Set(0),
1857 109 : ));
1858 109 : }
1859 :
1860 112 : Ok(())
1861 112 : }
1862 :
1863 : #[cfg(test)]
1864 111 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1865 111 : self.init_empty()?;
1866 111 : self.put_control_file(bytes::Bytes::from_static(
1867 111 : b"control_file contents do not matter",
1868 : ))
1869 111 : .context("put_control_file")?;
1870 111 : self.put_checkpoint(bytes::Bytes::from_static(
1871 111 : b"checkpoint_file contents do not matter",
1872 : ))
1873 111 : .context("put_checkpoint_file")?;
1874 111 : Ok(())
1875 111 : }
1876 :
1877 : /// Creates a relation if it is not already present.
1878 : /// Returns the current size of the relation
1879 209028 : pub(crate) async fn create_relation_if_required(
1880 209028 : &mut self,
1881 209028 : rel: RelTag,
1882 209028 : ctx: &RequestContext,
1883 209028 : ) -> Result<u32, WalIngestError> {
1884 : // Get current size and put rel creation if rel doesn't exist
1885 : //
1886 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1887 : // check the cache too. This is because eagerly checking the cache results in
1888 : // less work overall and 10% better performance. It's more work on cache miss
1889 : // but cache miss is rare.
1890 209028 : if let Some(nblocks) = self
1891 209028 : .tline
1892 209028 : .get_cached_rel_size(&rel, Version::Modified(self))
1893 : {
1894 209023 : Ok(nblocks)
1895 5 : } else if !self
1896 5 : .tline
1897 5 : .get_rel_exists(rel, Version::Modified(self), ctx)
1898 5 : .await?
1899 : {
1900 : // create it with 0 size initially, the logic below will extend it
1901 5 : self.put_rel_creation(rel, 0, ctx).await?;
1902 5 : Ok(0)
1903 : } else {
1904 0 : Ok(self
1905 0 : .tline
1906 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1907 0 : .await?)
1908 : }
1909 209028 : }
1910 :
1911 : /// Given a block number for a relation (which represents a newly written block),
1912 : /// the previous block count of the relation, and the shard info, find the gaps
1913              :     /// that were created by the newly written block, if any.
1914 72835 : fn find_gaps(
1915 72835 : rel: RelTag,
1916 72835 : blkno: u32,
1917 72835 : previous_nblocks: u32,
1918 72835 : shard: &ShardIdentity,
1919 72835 : ) -> Option<KeySpace> {
1920 72835 : let mut key = rel_block_to_key(rel, blkno);
1921 72835 : let mut gap_accum = None;
1922 :
1923 72835 : for gap_blkno in previous_nblocks..blkno {
1924 16 : key.field6 = gap_blkno;
1925 :
1926 16 : if shard.get_shard_number(&key) != shard.number {
1927 4 : continue;
1928 12 : }
1929 :
1930 12 : gap_accum
1931 12 : .get_or_insert_with(KeySpaceAccum::new)
1932 12 : .add_key(key);
1933 : }
1934 :
1935 72835 : gap_accum.map(|accum| accum.to_keyspace())
1936 72835 : }
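// Worked example (illustrative): if the relation previously had 3 blocks and block 6 is written,
// `previous_nblocks..blkno` is 3..6, so blocks 3, 4 and 5 are gaps. Only the gap blocks owned by
// this shard are returned; `ingest_batch` then zero-fills them via `SerializedValueBatch::zero_gaps`.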
1937 :
1938 72926 : pub async fn ingest_batch(
1939 72926 : &mut self,
1940 72926 : mut batch: SerializedValueBatch,
1941 72926 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1942 72926 : shard: &ShardIdentity,
1943 72926 : ctx: &RequestContext,
1944 72926 : ) -> Result<(), WalIngestError> {
1945 72926 : let mut gaps_at_lsns = Vec::default();
1946 :
1947 72926 : for meta in batch.metadata.iter() {
1948 72821 : let key = Key::from_compact(meta.key());
1949 72821 : let (rel, blkno) = key
1950 72821 : .to_rel_block()
1951 72821 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1952 72821 : let new_nblocks = blkno + 1;
1953 :
1954 72821 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1955 72821 : if new_nblocks > old_nblocks {
1956 1195 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1957 71626 : }
1958 :
1959 72821 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1960 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1961 72821 : }
1962 : }
1963 :
1964 72926 : if !gaps_at_lsns.is_empty() {
1965 0 : batch.zero_gaps(gaps_at_lsns);
1966 72926 : }
1967 :
1968 72926 : match self.pending_data_batch.as_mut() {
1969 10 : Some(pending_batch) => {
1970 10 : pending_batch.extend(batch);
1971 10 : }
1972 72916 : None if batch.has_data() => {
1973 72815 : self.pending_data_batch = Some(batch);
1974 72815 : }
1975 101 : None => {
1976 101 : // Nothing to initialize the batch with
1977 101 : }
1978 : }
1979 :
1980 72926 : Ok(())
1981 72926 : }
1982 :
1983 : /// Put a new page version that can be constructed from a WAL record
1984 : ///
1985 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1986 : /// current end-of-file. It's up to the caller to check that the relation size
1987 : /// matches the blocks inserted!
1988 6 : pub fn put_rel_wal_record(
1989 6 : &mut self,
1990 6 : rel: RelTag,
1991 6 : blknum: BlockNumber,
1992 6 : rec: NeonWalRecord,
1993 6 : ) -> Result<(), WalIngestError> {
1994 6 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1995 6 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1996 6 : Ok(())
1997 6 : }
1998 :
1999 : // Same, but for an SLRU.
2000 4 : pub fn put_slru_wal_record(
2001 4 : &mut self,
2002 4 : kind: SlruKind,
2003 4 : segno: u32,
2004 4 : blknum: BlockNumber,
2005 4 : rec: NeonWalRecord,
2006 4 : ) -> Result<(), WalIngestError> {
2007 4 : if !self.tline.tenant_shard_id.is_shard_zero() {
2008 0 : return Ok(());
2009 4 : }
2010 :
2011 4 : self.put(
2012 4 : slru_block_to_key(kind, segno, blknum),
2013 4 : Value::WalRecord(rec),
2014 : );
2015 4 : Ok(())
2016 4 : }
2017 :
2018              :     /// Like put_wal_record, but with a ready-made image of the page.
2019 138921 : pub fn put_rel_page_image(
2020 138921 : &mut self,
2021 138921 : rel: RelTag,
2022 138921 : blknum: BlockNumber,
2023 138921 : img: Bytes,
2024 138921 : ) -> Result<(), WalIngestError> {
2025 138921 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2026 138921 : let key = rel_block_to_key(rel, blknum);
2027 138921 : if !key.is_valid_key_on_write_path() {
2028 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2029 138921 : }
2030 138921 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
2031 138921 : Ok(())
2032 138921 : }
2033 :
2034 3 : pub fn put_slru_page_image(
2035 3 : &mut self,
2036 3 : kind: SlruKind,
2037 3 : segno: u32,
2038 3 : blknum: BlockNumber,
2039 3 : img: Bytes,
2040 3 : ) -> Result<(), WalIngestError> {
2041 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2042 :
2043 3 : let key = slru_block_to_key(kind, segno, blknum);
2044 3 : if !key.is_valid_key_on_write_path() {
2045 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2046 3 : }
2047 3 : self.put(key, Value::Image(img));
2048 3 : Ok(())
2049 3 : }
2050 :
2051 1499 : pub(crate) fn put_rel_page_image_zero(
2052 1499 : &mut self,
2053 1499 : rel: RelTag,
2054 1499 : blknum: BlockNumber,
2055 1499 : ) -> Result<(), WalIngestError> {
2056 1499 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2057 1499 : let key = rel_block_to_key(rel, blknum);
2058 1499 : if !key.is_valid_key_on_write_path() {
2059 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2060 1499 : }
2061 :
2062 1499 : let batch = self
2063 1499 : .pending_data_batch
2064 1499 : .get_or_insert_with(SerializedValueBatch::default);
2065 :
2066 1499 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
2067 :
2068 1499 : Ok(())
2069 1499 : }
2070 :
2071 0 : pub(crate) fn put_slru_page_image_zero(
2072 0 : &mut self,
2073 0 : kind: SlruKind,
2074 0 : segno: u32,
2075 0 : blknum: BlockNumber,
2076 0 : ) -> Result<(), WalIngestError> {
2077 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2078 0 : let key = slru_block_to_key(kind, segno, blknum);
2079 0 : if !key.is_valid_key_on_write_path() {
2080 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2081 0 : }
2082 :
2083 0 : let batch = self
2084 0 : .pending_data_batch
2085 0 : .get_or_insert_with(SerializedValueBatch::default);
2086 :
2087 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
2088 :
2089 0 : Ok(())
2090 0 : }
2091 :
2092              :     /// Determines the reldir keyspace mode for the rel_size_v2 write path. If this is the first time that
2093              :     /// we enable it, we also need to persist that in `index_part.json` (`initialize` is true).
2094 : ///
2095 : /// As this function is only used on the write path, we do not need to read the migrated_at
2096 : /// field.
2097 973 : pub(crate) fn maybe_enable_rel_size_v2(
2098 973 : &mut self,
2099 973 : lsn: Lsn,
2100 973 : is_create: bool,
2101 973 : ) -> anyhow::Result<RelDirMode> {
2102 : // TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
2103 973 : let expected_status = self.tline.get_rel_size_v2_expected_state();
2104 973 : let (persistent_status, migrated_at) = self.tline.get_rel_size_v2_status();
2105 :
2106              :         // The migrated_at LSN is the LSN at which we "enable_v2" (but not "disable_v1").
2107 973 : if let Some(migrated_at) = migrated_at
2108 0 : && lsn <= migrated_at
2109 0 : && persistent_status == RelSizeMigration::Migrating
2110 : {
2111 : // Revert the status to legacy if the write head LSN is before the migrating LSN.
2112 0 : self.tline
2113 0 : .update_rel_size_v2_status(RelSizeMigration::Legacy, None)?;
2114 973 : }
2115 : // TODO: what if the migration is at "migrated" state but we need to revert?
2116 :
2117 : // Only initialize the v2 keyspace on new relation creation. No initialization
2118              :         // during `timeline_create` (TODO: fix this; we should allow it, but currently it
2119 : // hits expected consistency issues; need to ignore warnings).
2120 973 : let can_update = is_create && !self.is_importing_pgdata;
2121 :
2122 973 : match (expected_status, persistent_status) {
2123 : (RelSizeMigration::Legacy, RelSizeMigration::Legacy) => {
2124 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
2125 973 : Ok(RelDirMode {
2126 973 : current_status: RelSizeMigration::Legacy,
2127 973 : initialize: false,
2128 973 : disable_v1: false,
2129 973 : })
2130 : }
2131 : (
2132 : RelSizeMigration::Legacy,
2133 0 : current_status @ RelSizeMigration::Migrating
2134 0 : | current_status @ RelSizeMigration::Migrated,
2135 : ) => {
2136 : // already persisted that the timeline has enabled rel_size_v2, cannot rollback
2137 0 : Ok(RelDirMode {
2138 0 : current_status,
2139 0 : initialize: false,
2140 0 : disable_v1: false,
2141 0 : })
2142 : }
2143 : (
2144 0 : expected_status @ RelSizeMigration::Migrating
2145 0 : | expected_status @ RelSizeMigration::Migrated,
2146 : RelSizeMigration::Legacy,
2147 : ) => {
2148 : // The first time we enable it, we need to persist it in `index_part.json`
2149 : // The caller should update the reldir status once the initialization is done.
2150 :
2151 : Ok(RelDirMode {
2152 0 : current_status: RelSizeMigration::Legacy,
2153 0 : initialize: can_update,
2154 0 : disable_v1: can_update && expected_status == RelSizeMigration::Migrated,
2155 : })
2156 : }
2157 0 : (RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrating)
2158 0 : | (RelSizeMigration::Migrated, current_status @ RelSizeMigration::Migrated)
2159 0 : | (RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrated) => {
2160 : // Keep the current state
2161 0 : Ok(RelDirMode {
2162 0 : current_status,
2163 0 : initialize: false,
2164 0 : disable_v1: false,
2165 0 : })
2166 : }
2167 : (RelSizeMigration::Migrated, RelSizeMigration::Migrating) => {
2168 : // Switch to v2-only mode
2169 0 : Ok(RelDirMode {
2170 0 : current_status: RelSizeMigration::Migrating,
2171 0 : initialize: false,
2172 0 : disable_v1: can_update,
2173 0 : })
2174 : }
2175 : }
2176 973 : }
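// Summary of the (expected_status, persistent_status) match above, for readability:
//   (Legacy, Legacy)                 -> stay on v1; nothing to do
//   (Legacy, Migrating | Migrated)   -> keep the persisted v2 state; no rollback
//   (Migrating | Migrated, Legacy)   -> initialize v2 on relation creation; also disable v1
//                                       if the expected state is Migrated
//   (Migrating, Migrating | Migrated),
//   (Migrated, Migrated)             -> keep the current state
//   (Migrated, Migrating)            -> switch to v2-only mode (disable v1 on creation)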
2177 :
2178 : /// Store a relmapper file (pg_filenode.map) in the repository
2179 8 : pub async fn put_relmap_file(
2180 8 : &mut self,
2181 8 : spcnode: Oid,
2182 8 : dbnode: Oid,
2183 8 : img: Bytes,
2184 8 : ctx: &RequestContext,
2185 8 : ) -> Result<(), WalIngestError> {
2186 8 : let v2_mode = self
2187 8 : .maybe_enable_rel_size_v2(self.lsn, false)
2188 8 : .map_err(WalIngestErrorKind::RelSizeV2Error)?;
2189 :
2190 : // Add it to the directory (if it doesn't exist already)
2191 8 : let buf = self.get(DBDIR_KEY, ctx).await?;
2192 8 : let mut dbdir = DbDirectory::des(&buf)?;
2193 :
2194 8 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
2195 8 : if r.is_none() || r == Some(false) {
2196 : // The dbdir entry didn't exist, or it contained a
2197 : // 'false'. The 'insert' call already updated it with
2198 : // 'true', now write the updated 'dbdirs' map back.
2199 8 : let buf = DbDirectory::ser(&dbdir)?;
2200 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2201 0 : }
2202 8 : if r.is_none() {
2203 4 : if v2_mode.current_status != RelSizeMigration::Legacy {
2204 0 : self.pending_directory_entries
2205 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2206 4 : }
2207 :
2208 : // Create RelDirectory in v1 keyspace. TODO: if we have fully migrated to v2, no need to create this directory.
2209              :             // Some code paths rely on this directory being present. We should remove it once we start to set tenants to
2210              :             // the `RelSizeMigration::Migrated` state (currently we don't; all tenants will have `RelSizeMigration::Migrating`).
2211 4 : let buf = RelDirectory::ser(&RelDirectory {
2212 4 : rels: HashSet::new(),
2213 4 : })?;
2214 4 : self.pending_directory_entries
2215 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2216 4 : self.put(
2217 4 : rel_dir_to_key(spcnode, dbnode),
2218 4 : Value::Image(Bytes::from(buf)),
2219 : );
2220 4 : }
2221 :
2222 8 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
2223 8 : Ok(())
2224 8 : }
2225 :
2226 0 : pub async fn put_twophase_file(
2227 0 : &mut self,
2228 0 : xid: u64,
2229 0 : img: Bytes,
2230 0 : ctx: &RequestContext,
2231 0 : ) -> Result<(), WalIngestError> {
2232 : // Add it to the directory entry
2233 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2234 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2235 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
2236 0 : if !dir.xids.insert(xid) {
2237 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
2238 0 : }
2239 0 : self.pending_directory_entries.push((
2240 0 : DirectoryKind::TwoPhase,
2241 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2242 0 : ));
2243 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2244 : } else {
2245 0 : let xid = xid as u32;
2246 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
2247 0 : if !dir.xids.insert(xid) {
2248 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
2249 0 : }
2250 0 : self.pending_directory_entries.push((
2251 0 : DirectoryKind::TwoPhase,
2252 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2253 0 : ));
2254 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2255 : };
2256 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2257 :
2258 0 : self.put(twophase_file_key(xid), Value::Image(img));
2259 0 : Ok(())
2260 0 : }
2261 :
2262 1 : pub async fn set_replorigin(
2263 1 : &mut self,
2264 1 : origin_id: RepOriginId,
2265 1 : origin_lsn: Lsn,
2266 1 : ) -> Result<(), WalIngestError> {
2267 1 : let key = repl_origin_key(origin_id);
2268 1 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
2269 1 : Ok(())
2270 1 : }
2271 :
2272 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
2273 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
2274 0 : }
2275 :
2276 112 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2277 112 : self.put(CONTROLFILE_KEY, Value::Image(img));
2278 112 : Ok(())
2279 112 : }
2280 :
2281 119 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2282 119 : self.put(CHECKPOINT_KEY, Value::Image(img));
2283 119 : Ok(())
2284 119 : }
2285 :
2286 0 : pub async fn drop_dbdir(
2287 0 : &mut self,
2288 0 : spcnode: Oid,
2289 0 : dbnode: Oid,
2290 0 : ctx: &RequestContext,
2291 0 : ) -> Result<(), WalIngestError> {
2292 0 : let total_blocks = self
2293 0 : .tline
2294 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
2295 0 : .await?;
2296 :
2297 : // Remove entry from dbdir
2298 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
2299 0 : let mut dir = DbDirectory::des(&buf)?;
2300 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
2301 0 : let buf = DbDirectory::ser(&dir)?;
2302 0 : self.pending_directory_entries.push((
2303 0 : DirectoryKind::Db,
2304 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
2305 0 : ));
2306 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2307 : } else {
2308 0 : warn!(
2309 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
2310 : spcnode, dbnode
2311 : );
2312 : }
2313 :
2314 : // Update logical database size.
2315 0 : self.pending_nblocks -= total_blocks as i64;
2316 :
2317 : // Delete all relations and metadata files for the spcnode/dnode
2318 0 : self.delete(dbdir_key_range(spcnode, dbnode));
2319 0 : Ok(())
2320 0 : }
2321 :
2322 0 : async fn initialize_rel_size_v2_keyspace(
2323 0 : &mut self,
2324 0 : ctx: &RequestContext,
2325 0 : dbdir: &DbDirectory,
2326 0 : ) -> Result<(), WalIngestError> {
2327 : // Copy everything from relv1 to relv2; TODO: check if there's any key in the v2 keyspace, if so, abort.
2328 0 : tracing::info!("initializing rel_size_v2 keyspace");
2329 0 : let mut rel_cnt = 0;
2330              :         // relmap_exists (the value of the dbdirs hashmap) does not affect the migration: we need to copy things over anyway.
2331 0 : for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
2332 0 : let rel_dir_key = rel_dir_to_key(spcnode, dbnode);
2333 0 : let rel_dir = RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?;
2334 0 : for (relnode, forknum) in rel_dir.rels {
2335 0 : let sparse_rel_dir_key = rel_tag_sparse_key(spcnode, dbnode, relnode, forknum);
2336 0 : self.put(
2337 0 : sparse_rel_dir_key,
2338 0 : Value::Image(RelDirExists::Exists.encode()),
2339 0 : );
2340 0 : rel_cnt += 1;
2341 0 : }
2342 : }
2343 0 : tracing::info!(
2344 0 : "initialized rel_size_v2 keyspace at lsn {}: migrated {} relations",
2345 : self.lsn,
2346 : rel_cnt
2347 : );
2348 0 : Ok::<_, WalIngestError>(())
2349 0 : }
2350 :
2351 0 : async fn put_rel_creation_v1_placeholder(
2352 0 : &mut self,
2353 0 : rel: RelTag,
2354 0 : dbdir_exists: bool,
2355 0 : _ctx: &RequestContext,
2356 0 : ) -> Result<(), WalIngestError> {
2357 0 : if !dbdir_exists {
2358 0 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2359 0 : self.put(
2360 0 : rel_dir_key,
2361 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2362 : );
2363 0 : }
2364 0 : Ok(())
2365 0 : }
2366 :
2367 960 : async fn put_rel_creation_v1(
2368 960 : &mut self,
2369 960 : rel: RelTag,
2370 960 : dbdir_exists: bool,
2371 960 : ctx: &RequestContext,
2372 960 : ) -> Result<(), WalIngestError> {
2373 : // Reldir v1 write path
2374 960 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2375 960 : let mut rel_dir = if !dbdir_exists {
2376 : // Create the RelDirectory
2377 4 : RelDirectory::default()
2378 : } else {
2379 : // reldir already exists, fetch it
2380 956 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
2381 : };
2382 :
2383 : // Add the new relation to the rel directory entry, and write it back
2384 960 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2385 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2386 960 : }
2387 960 : if !dbdir_exists {
2388 4 : self.pending_directory_entries
2389 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2390 956 : }
2391 960 : self.pending_directory_entries
2392 960 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2393 960 : self.put(
2394 960 : rel_dir_key,
2395 960 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2396 : );
2397 960 : Ok(())
2398 960 : }
2399 :
2400 0 : async fn put_rel_creation_v2(
2401 0 : &mut self,
2402 0 : rel: RelTag,
2403 0 : dbdir_exists: bool,
2404 0 : ctx: &RequestContext,
2405 0 : ) -> Result<(), WalIngestError> {
2406 : // Reldir v2 write path
2407 0 : let sparse_rel_dir_key =
2408 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2409 : // check if the rel_dir_key exists in v2
2410 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2411 0 : let val = RelDirExists::decode_option(val)
2412 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2413 0 : if val == RelDirExists::Exists {
2414 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2415 0 : }
2416 0 : self.put(
2417 0 : sparse_rel_dir_key,
2418 0 : Value::Image(RelDirExists::Exists.encode()),
2419 : );
2420 0 : if !dbdir_exists {
2421 0 : self.pending_directory_entries
2422 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2423 0 : }
2424 0 : self.pending_directory_entries
2425 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2426 0 : Ok(())
2427 0 : }
2428 :
2429 : /// Create a relation fork.
2430 : ///
2431 : /// 'nblocks' is the initial size.
2432 960 : pub async fn put_rel_creation(
2433 960 : &mut self,
2434 960 : rel: RelTag,
2435 960 : nblocks: BlockNumber,
2436 960 : ctx: &RequestContext,
2437 960 : ) -> Result<(), WalIngestError> {
2438 960 : if rel.relnode == 0 {
2439 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
2440 0 : "invalid relnode"
2441 0 : )))?;
2442 960 : }
2443 : // It's possible that this is the first rel for this db in this
2444 : // tablespace. Create the reldir entry for it if so.
2445 960 : let mut dbdir: DbDirectory = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
2446 960 : let mut is_dbdir_dirty = false;
2447 :
2448 960 : let dbdir_exists =
2449 960 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
2450 : // Didn't exist. Update dbdir
2451 4 : e.insert(false);
2452 4 : self.pending_directory_entries.push((
2453 4 : DirectoryKind::Db,
2454 4 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
2455 4 : ));
2456 4 : is_dbdir_dirty = true;
2457 4 : false
2458 : } else {
2459 956 : true
2460 : };
2461 :
2462 960 : let mut v2_mode = self
2463 960 : .maybe_enable_rel_size_v2(self.lsn, true)
2464 960 : .map_err(WalIngestErrorKind::RelSizeV2Error)?;
2465 :
2466 960 : if v2_mode.initialize {
2467 0 : if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
2468 0 : tracing::warn!("error initializing rel_size_v2 keyspace: {}", e);
2469 : // TODO: circuit breaker so that it won't retry forever
2470 : } else {
2471 0 : v2_mode.current_status = RelSizeMigration::Migrating;
2472 0 : self.tline
2473 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
2474 0 : .map_err(WalIngestErrorKind::RelSizeV2Error)?;
2475 : }
2476 960 : }
2477 :
2478 960 : if v2_mode.disable_v1 {
2479 0 : v2_mode.current_status = RelSizeMigration::Migrated;
2480 0 : self.tline
2481 0 : .update_rel_size_v2_status(RelSizeMigration::Migrated, Some(self.lsn))
2482 0 : .map_err(WalIngestErrorKind::RelSizeV2Error)?;
2483 960 : }
2484 :
2485 960 : if is_dbdir_dirty {
2486 4 : let buf = DbDirectory::ser(&dbdir)?;
2487 4 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2488 956 : }
2489 :
2490 960 : if v2_mode.current_status != RelSizeMigration::Migrated {
2491 960 : self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
2492 : } else {
2493 : // collect_keyspace depends on the rel_dir_to_key to be present, so we need to create a placeholder
2494 0 : self.put_rel_creation_v1_placeholder(rel, dbdir_exists, ctx)
2495 0 : .await?;
2496 : }
2497 :
2498 960 : if v2_mode.current_status != RelSizeMigration::Legacy {
2499 0 : let write_v2_res = self.put_rel_creation_v2(rel, dbdir_exists, ctx).await;
2500 0 : if let Err(e) = write_v2_res {
2501 0 : if v2_mode.current_status == RelSizeMigration::Migrated {
2502 0 : return Err(e);
2503 0 : }
2504 0 : tracing::warn!("error writing rel_size_v2 keyspace: {}", e);
2505 0 : }
2506 960 : }
2507 :
2508 : // Put size
2509 960 : let size_key = rel_size_to_key(rel);
2510 960 : let buf = nblocks.to_le_bytes();
2511 960 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2512 :
2513 960 : self.pending_nblocks += nblocks as i64;
2514 :
2515 : // Update relation size cache
2516 960 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2517 :
2518 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2519 : // caller.
2520 960 : Ok(())
2521 960 : }
2522 :
2523 : /// Truncate relation
2524 3006 : pub async fn put_rel_truncation(
2525 3006 : &mut self,
2526 3006 : rel: RelTag,
2527 3006 : nblocks: BlockNumber,
2528 3006 : ctx: &RequestContext,
2529 3006 : ) -> Result<(), WalIngestError> {
2530 3006 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2531 3006 : if self
2532 3006 : .tline
2533 3006 : .get_rel_exists(rel, Version::Modified(self), ctx)
2534 3006 : .await?
2535 : {
2536 3006 : let size_key = rel_size_to_key(rel);
2537 : // Fetch the old size first
2538 3006 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2539 :
2540 : // Update the entry with the new size.
2541 3006 : let buf = nblocks.to_le_bytes();
2542 3006 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2543 :
2544 : // Update relation size cache
2545 3006 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2546 :
2547 : // Update logical database size.
2548 3006 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2549 0 : }
2550 3006 : Ok(())
2551 3006 : }
2552 :
2553 : /// Extend relation
2554 : /// If new size is smaller, do nothing.
2555 138340 : pub async fn put_rel_extend(
2556 138340 : &mut self,
2557 138340 : rel: RelTag,
2558 138340 : nblocks: BlockNumber,
2559 138340 : ctx: &RequestContext,
2560 138340 : ) -> Result<(), WalIngestError> {
2561 138340 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2562 :
2563 : // Put size
2564 138340 : let size_key = rel_size_to_key(rel);
2565 138340 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2566 :
2567 : // only extend relation here. never decrease the size
2568 138340 : if nblocks > old_size {
2569 137394 : let buf = nblocks.to_le_bytes();
2570 137394 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2571 137394 :
2572 137394 : // Update relation size cache
2573 137394 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2574 137394 :
2575 137394 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2576 137394 : }
2577 138340 : Ok(())
2578 138340 : }
2579 :
2580 5 : async fn put_rel_drop_v1(
2581 5 : &mut self,
2582 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2583 5 : ctx: &RequestContext,
2584 5 : ) -> Result<BTreeSet<RelTag>, WalIngestError> {
2585 5 : let mut dropped_rels = BTreeSet::new();
2586 6 : for ((spc_node, db_node), rel_tags) in drop_relations {
2587 1 : let dir_key = rel_dir_to_key(spc_node, db_node);
2588 1 : let buf = self.get(dir_key, ctx).await?;
2589 1 : let mut dir = RelDirectory::des(&buf)?;
2590 :
2591 1 : let mut dirty = false;
2592 2 : for rel_tag in rel_tags {
2593 1 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2594 1 : self.pending_directory_entries
2595 1 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2596 1 : dirty = true;
2597 1 : dropped_rels.insert(rel_tag);
2598 1 : true
2599 : } else {
2600 0 : false
2601 : };
2602 :
2603 1 : if found {
2604 : // update logical size
2605 1 : let size_key = rel_size_to_key(rel_tag);
2606 1 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2607 1 : self.pending_nblocks -= old_size as i64;
2608 :
2609 : // Remove entry from relation size cache
2610 1 : self.tline.remove_cached_rel_size(&rel_tag);
2611 :
2612 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2613 1 : self.delete(rel_key_range(rel_tag));
2614 0 : }
2615 : }
2616 :
2617 1 : if dirty {
2618 1 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2619 0 : }
2620 : }
2621 5 : Ok(dropped_rels)
2622 5 : }
2623 :
2624 0 : async fn put_rel_drop_v2(
2625 0 : &mut self,
2626 0 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2627 0 : ctx: &RequestContext,
2628 0 : ) -> Result<BTreeSet<RelTag>, WalIngestError> {
2629 0 : let mut dropped_rels = BTreeSet::new();
2630 0 : for ((spc_node, db_node), rel_tags) in drop_relations {
2631 0 : for rel_tag in rel_tags {
2632 0 : let key = rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2633 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2634 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2635 0 : if val == RelDirExists::Exists {
2636 0 : dropped_rels.insert(rel_tag);
2637 0 : self.pending_directory_entries
2638 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2639 0 : // put tombstone
2640 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2641 0 : }
2642 : }
2643 : }
2644 0 : Ok(dropped_rels)
2645 0 : }
2646 :
2647 : /// Drop some relations
2648 5 : pub(crate) async fn put_rel_drops(
2649 5 : &mut self,
2650 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2651 5 : ctx: &RequestContext,
2652 5 : ) -> Result<(), WalIngestError> {
2653 5 : let v2_mode = self
2654 5 : .maybe_enable_rel_size_v2(self.lsn, false)
2655 5 : .map_err(WalIngestErrorKind::RelSizeV2Error)?;
2656 5 : match v2_mode.current_status {
2657 : RelSizeMigration::Legacy => {
2658 5 : self.put_rel_drop_v1(drop_relations, ctx).await?;
2659 : }
2660 : RelSizeMigration::Migrating => {
2661 0 : let dropped_rels_v1 = self.put_rel_drop_v1(drop_relations.clone(), ctx).await?;
2662 0 : let dropped_rels_v2_res = self.put_rel_drop_v2(drop_relations, ctx).await;
2663 0 : match dropped_rels_v2_res {
2664 0 : Ok(dropped_rels_v2) => {
2665 0 : if dropped_rels_v1 != dropped_rels_v2 {
2666 0 : tracing::warn!(
2667 0 : "inconsistent v1/v2 rel drop: dropped_rels_v1.len()={}, dropped_rels_v2.len()={}",
2668 0 : dropped_rels_v1.len(),
2669 0 : dropped_rels_v2.len()
2670 : );
2671 0 : }
2672 : }
2673 : Err(WalIngestError {
2674 : kind: WalIngestErrorKind::Cancelled,
2675 : ..
2676 0 : }) => {
2677 0 : // Cancellation errors are fine to ignore, do not log.
2678 0 : }
2679 0 : Err(e) => {
2680 0 : tracing::warn!("error dropping rels: {}", e);
2681 : }
2682 : }
2683 : }
2684 : RelSizeMigration::Migrated => {
2685 0 : self.put_rel_drop_v2(drop_relations, ctx).await?;
2686 : }
2687 : }
2688 5 : Ok(())
2689 5 : }
2690 :
2691 3 : pub async fn put_slru_segment_creation(
2692 3 : &mut self,
2693 3 : kind: SlruKind,
2694 3 : segno: u32,
2695 3 : nblocks: BlockNumber,
2696 3 : ctx: &RequestContext,
2697 3 : ) -> Result<(), WalIngestError> {
2698 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2699 :
2700 : // Add it to the directory entry
2701 3 : let dir_key = slru_dir_to_key(kind);
2702 3 : let buf = self.get(dir_key, ctx).await?;
2703 3 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2704 :
2705 3 : if !dir.segments.insert(segno) {
2706 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2707 3 : }
2708 3 : self.pending_directory_entries.push((
2709 3 : DirectoryKind::SlruSegment(kind),
2710 3 : MetricsUpdate::Set(dir.segments.len() as u64),
2711 3 : ));
2712 3 : self.put(
2713 3 : dir_key,
2714 3 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2715 : );
2716 :
2717 : // Put size
2718 3 : let size_key = slru_segment_size_to_key(kind, segno);
2719 3 : let buf = nblocks.to_le_bytes();
2720 3 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2721 :
2722 : // even if nblocks > 0, we don't insert any actual blocks here
2723 :
2724 3 : Ok(())
2725 3 : }
2726 :
2727 : /// Extend SLRU segment
2728 0 : pub fn put_slru_extend(
2729 0 : &mut self,
2730 0 : kind: SlruKind,
2731 0 : segno: u32,
2732 0 : nblocks: BlockNumber,
2733 0 : ) -> Result<(), WalIngestError> {
2734 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2735 :
2736 : // Put size
2737 0 : let size_key = slru_segment_size_to_key(kind, segno);
2738 0 : let buf = nblocks.to_le_bytes();
2739 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2740 0 : Ok(())
2741 0 : }
2742 :
2743 : /// This method is used for marking truncated SLRU files
2744 0 : pub async fn drop_slru_segment(
2745 0 : &mut self,
2746 0 : kind: SlruKind,
2747 0 : segno: u32,
2748 0 : ctx: &RequestContext,
2749 0 : ) -> Result<(), WalIngestError> {
2750 : // Remove it from the directory entry
2751 0 : let dir_key = slru_dir_to_key(kind);
2752 0 : let buf = self.get(dir_key, ctx).await?;
2753 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2754 :
2755 0 : if !dir.segments.remove(&segno) {
2756 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2757 0 : }
2758 0 : self.pending_directory_entries.push((
2759 0 : DirectoryKind::SlruSegment(kind),
2760 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2761 0 : ));
2762 0 : self.put(
2763 0 : dir_key,
2764 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2765 : );
2766 :
2767 : // Delete size entry, as well as all blocks
2768 0 : self.delete(slru_segment_key_range(kind, segno));
2769 :
2770 0 : Ok(())
2771 0 : }
2772 :
2773 : /// Drop a relmapper file (pg_filenode.map)
2774 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2775 : // TODO
2776 0 : Ok(())
2777 0 : }
2778 :
2779              :     /// This method is used for dropping twophase files
2780 0 : pub async fn drop_twophase_file(
2781 0 : &mut self,
2782 0 : xid: u64,
2783 0 : ctx: &RequestContext,
2784 0 : ) -> Result<(), WalIngestError> {
2785 : // Remove it from the directory entry
2786 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2787 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2788 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2789 :
2790 0 : if !dir.xids.remove(&xid) {
2791 0 : warn!("twophase file for xid {} does not exist", xid);
2792 0 : }
2793 0 : self.pending_directory_entries.push((
2794 0 : DirectoryKind::TwoPhase,
2795 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2796 0 : ));
2797 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2798 : } else {
2799 0 : let xid: u32 = u32::try_from(xid)
2800 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2801 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2802 :
2803 0 : if !dir.xids.remove(&xid) {
2804 0 : warn!("twophase file for xid {} does not exist", xid);
2805 0 : }
2806 0 : self.pending_directory_entries.push((
2807 0 : DirectoryKind::TwoPhase,
2808 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2809 0 : ));
2810 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2811 : };
2812 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2813 :
2814 : // Delete it
2815 0 : self.delete(twophase_key_range(xid));
2816 :
2817 0 : Ok(())
2818 0 : }
2819 :
2820 8 : pub async fn put_file(
2821 8 : &mut self,
2822 8 : path: &str,
2823 8 : content: &[u8],
2824 8 : ctx: &RequestContext,
2825 8 : ) -> Result<(), WalIngestError> {
2826 8 : let key = aux_file::encode_aux_file_key(path);
2827 : // retrieve the key from the engine
2828 8 : let old_val = match self.get(key, ctx).await {
2829 2 : Ok(val) => Some(val),
2830 6 : Err(PageReconstructError::MissingKey(_)) => None,
2831 0 : Err(e) => return Err(e.into()),
2832 : };
2833 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2834 2 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2835 : } else {
2836 6 : Vec::new()
2837 : };
2838 8 : let mut other_files = Vec::with_capacity(files.len());
2839 8 : let mut modifying_file = None;
2840 10 : for file @ (p, content) in files {
2841 2 : if path == p {
2842 2 : assert!(
2843 2 : modifying_file.is_none(),
2844 0 : "duplicated entries found for {path}"
2845 : );
2846 2 : modifying_file = Some(content);
2847 0 : } else {
2848 0 : other_files.push(file);
2849 0 : }
2850 : }
2851 8 : let mut new_files = other_files;
2852 8 : match (modifying_file, content.is_empty()) {
2853 1 : (Some(old_content), false) => {
2854 1 : self.tline
2855 1 : .aux_file_size_estimator
2856 1 : .on_update(old_content.len(), content.len());
2857 1 : new_files.push((path, content));
2858 1 : }
2859 1 : (Some(old_content), true) => {
2860 1 : self.tline
2861 1 : .aux_file_size_estimator
2862 1 : .on_remove(old_content.len());
2863 1 : // not adding the file key to the final `new_files` vec.
2864 1 : }
2865 6 : (None, false) => {
2866 6 : self.tline.aux_file_size_estimator.on_add(content.len());
2867 6 : new_files.push((path, content));
2868 6 : }
2869              :             // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2870              :             // Compute doesn't know whether a previous version of this file exists, so an
2871              :             // attempt to delete a non-existent file can cause this message.
2872              :             // To avoid false alarms, log it at info rather than warning level.
2873 0 : (None, true) if path.starts_with("pg_stat/") => {
2874 0 : info!("removing non-existing pg_stat file: {}", path)
2875 : }
2876 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2877 : }
2878 8 : let new_val = aux_file::encode_file_value(&new_files)
2879 8 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2880 8 : self.put(key, Value::Image(new_val.into()));
2881 :
2882 8 : Ok(())
2883 8 : }
2884 :
2885 : ///
2886 : /// Flush changes accumulated so far to the underlying repository.
2887 : ///
2888 : /// Usually, changes made in DatadirModification are atomic, but this allows
2889 : /// you to flush them to the underlying repository before the final `commit`.
2890 : /// That allows to free up the memory used to hold the pending changes.
2891 : ///
2892 : /// Currently only used during bulk import of a data directory. In that
2893 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2894 : /// whole import fails and the timeline will be deleted anyway.
2895 : /// (Or to be precise, it will be left behind for debugging purposes and
2896 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2897 : ///
2898 : /// Note: A consequence of flushing the pending operations is that they
2899 : /// won't be visible to subsequent operations until `commit`. The function
2900 : /// retains all the metadata, but data pages are flushed. That's again OK
2901 : /// for bulk import, where you are just loading data pages and won't try to
2902 : /// modify the same pages twice.
2903 965 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2904 : // Unless we have accumulated a decent amount of changes, it's not worth it
2905 : // to scan through the pending_updates list.
2906 965 : let pending_nblocks = self.pending_nblocks;
2907 965 : if pending_nblocks < 10000 {
2908 965 : return Ok(());
2909 0 : }
2910 :
2911 0 : let mut writer = self.tline.writer().await;
2912 :
2913 : // Flush relation and SLRU data blocks, keep metadata.
2914 0 : if let Some(batch) = self.pending_data_batch.take() {
2915 0 : tracing::debug!(
2916 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2917 : batch.max_lsn,
2918 0 : self.tline.get_last_record_lsn()
2919 : );
2920 :
2921 : // This bails out on first error without modifying pending_updates.
2922 : // That's Ok, cf this function's doc comment.
2923 0 : writer.put_batch(batch, ctx).await?;
2924 0 : }
2925 :
2926 0 : if pending_nblocks != 0 {
2927 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2928 0 : self.pending_nblocks = 0;
2929 0 : }
2930 :
2931 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2932 0 : writer.update_directory_entries_count(kind, count);
2933 0 : }
2934 :
2935 0 : Ok(())
2936 965 : }
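// A minimal bulk-import sketch (hypothetical caller, illustrative names only):
//
//     for chunk in datadir_chunks {
//         ingest_chunk(&mut modification, chunk, &ctx).await?;
//         modification.flush(&ctx).await?; // bound memory; atomicity is not needed during import
//     }
//     modification.commit(&ctx).await?;    // metadata and remaining data become visible here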
2937 :
2938 : ///
2939 : /// Finish this atomic update, writing all the updated keys to the
2940 : /// underlying timeline.
2941 : /// All the modifications in this atomic update are stamped by the specified LSN.
2942 : ///
2943 371557 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2944 371557 : let mut writer = self.tline.writer().await;
2945 :
2946 371557 : let pending_nblocks = self.pending_nblocks;
2947 371557 : self.pending_nblocks = 0;
2948 :
2949 : // Ordering: the items in this batch do not need to be in any global order, but values for
2950 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2951 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2952 : // more details.
2953 :
2954 371557 : let metadata_batch = {
2955 371557 : let pending_meta = self
2956 371557 : .pending_metadata_pages
2957 371557 : .drain()
2958 371557 : .flat_map(|(key, values)| {
2959 137068 : values
2960 137068 : .into_iter()
2961 137068 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2962 137068 : })
2963 371557 : .collect::<Vec<_>>();
2964 :
2965 371557 : if pending_meta.is_empty() {
2966 236139 : None
2967 : } else {
2968 135418 : Some(SerializedValueBatch::from_values(pending_meta))
2969 : }
2970 : };
2971 :
2972 371557 : let data_batch = self.pending_data_batch.take();
2973 :
2974 371557 : let maybe_batch = match (data_batch, metadata_batch) {
2975 132278 : (Some(mut data), Some(metadata)) => {
2976 132278 : data.extend(metadata);
2977 132278 : Some(data)
2978 : }
2979 71631 : (Some(data), None) => Some(data),
2980 3140 : (None, Some(metadata)) => Some(metadata),
2981 164508 : (None, None) => None,
2982 : };
2983 :
2984 371557 : if let Some(batch) = maybe_batch {
2985 207049 : tracing::debug!(
2986 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2987 : batch.max_lsn,
2988 0 : self.tline.get_last_record_lsn()
2989 : );
2990 :
2991 : // This bails out on first error without modifying pending_updates.
2992 : // That's Ok, cf this function's doc comment.
2993 207049 : writer.put_batch(batch, ctx).await?;
2994 164508 : }
2995 :
2996 371557 : if !self.pending_deletions.is_empty() {
2997 1 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2998 1 : self.pending_deletions.clear();
2999 371556 : }
3000 :
3001 371557 : self.pending_lsns.push(self.lsn);
3002 444486 : for pending_lsn in self.pending_lsns.drain(..) {
3003 444486 : // TODO(vlad): pretty sure the comment below is not valid anymore
3004 444486 : // and we can call finish write with the latest LSN
3005 444486 : //
3006 444486 : // Ideally, we should be able to call writer.finish_write() only once
3007 444486 : // with the highest LSN. However, the last_record_lsn variable in the
3008 444486 : // timeline keeps track of the latest LSN and the immediate previous LSN
3009 444486 : // so we need to record every LSN to not leave a gap between them.
3010 444486 : writer.finish_write(pending_lsn);
3011 444486 : }
3012 :
3013 371557 : if pending_nblocks != 0 {
3014 135285 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
3015 236272 : }
3016 :
3017 371557 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
3018 1527 : writer.update_directory_entries_count(kind, count);
3019 1527 : }
3020 :
3021 371557 : self.pending_metadata_bytes = 0;
3022 :
3023 371557 : Ok(())
3024 371557 : }
3025 :
3026 145852 : pub(crate) fn len(&self) -> usize {
3027 145852 : self.pending_metadata_pages.len()
3028 145852 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
3029 145852 : + self.pending_deletions.len()
3030 145852 : }
3031 :
3032 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
3033 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
3034 : /// in the modification.
3035 : ///
3036 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
3037              :     /// page must ensure that the pages it reads are already committed in the Timeline; for example,
3038 : /// DB create operations are always preceded by a call to commit(). This is special cased because
3039 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
3040 : /// and not data pages.
3041 143293 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
3042 143293 : if !Self::is_data_key(&key) {
3043 : // Have we already updated the same key? Read the latest pending updated
3044 : // version in that case.
3045 : //
3046 : // Note: we don't check pending_deletions. It is an error to request a
3047 : // value that has been removed; deletion only avoids leaking storage.
3048 143293 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
3049 7964 : if let Some((_, _, value)) = values.last() {
3050 7964 : return if let Value::Image(img) = value {
3051 7964 : Ok(img.clone())
3052 : } else {
3053 : // Currently, we never need to read back a WAL record that we
3054 : // inserted in the same "transaction". All the metadata updates
3055 : // work directly with Images, and we never need to read actual
3056 : // data pages. We could handle this if we had to, by calling
3057 : // the walredo manager, but let's keep it simple for now.
3058 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
3059 0 : "unexpected pending WAL record"
3060 0 : )))
3061 : };
3062 0 : }
3063 135329 : }
3064 : } else {
3065 : // This is an expensive check, so we only do it in debug mode. When reading a data key,
3066 : // that key should never be present in pending_data_batch. We ensure this by committing
3067 : // modifications before ingesting DB create operations, which are the only kind that reads
3068 : // data pages during ingest.
3069 0 : if cfg!(debug_assertions) {
3070 0 : assert!(
3071 0 : !self
3072 0 : .pending_data_batch
3073 0 : .as_ref()
3074 0 : .is_some_and(|b| b.updates_key(&key))
3075 : );
3076 0 : }
3077 : }
3078 :
3079 : // Metadata page cache miss, or we're reading a data page.
3080 135329 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
3081 135329 : self.tline.get(key, lsn, ctx).await
3082 143293 : }
3083 :
3084 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
3085 : /// and the empty value into None.
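 : ///
 : /// In the sparse keyspace, removing an entry is expressed by writing an empty value
 : /// (see the aux file round-trip test below, where `put_file` with `b""` deletes the
 : /// file), so callers can treat `Ok(None)` uniformly as "not present".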
3086 0 : async fn sparse_get(
3087 0 : &self,
3088 0 : key: Key,
3089 0 : ctx: &RequestContext,
3090 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
3091 0 : let val = self.get(key, ctx).await;
3092 0 : match val {
3093 0 : Ok(val) if val.is_empty() => Ok(None),
3094 0 : Ok(val) => Ok(Some(val)),
3095 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
3096 0 : Err(e) => Err(e),
3097 : }
3098 0 : }
3099 :
3100 : #[cfg(test)]
3101 2 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
3102 2 : self.put(key, val);
3103 2 : }
3104 :
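 : /// Stage a single value: data keys are appended to the pending serialized batch,
 : /// while metadata keys go through the in-memory metadata page cache so that later
 : /// reads within this modification observe them (see `get` above).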
3105 282078 : fn put(&mut self, key: Key, val: Value) {
3106 282078 : if Self::is_data_key(&key) {
3107 138934 : self.put_data(key.to_compact(), val)
3108 : } else {
3109 143144 : self.put_metadata(key.to_compact(), val)
3110 : }
3111 282078 : }
3112 :
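 : /// Append a data value to the pending batch at the modification's current LSN,
 : /// creating the batch lazily on first use.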
3113 138934 : fn put_data(&mut self, key: CompactKey, val: Value) {
3114 138934 : let batch = self
3115 138934 : .pending_data_batch
3116 138934 : .get_or_insert_with(SerializedValueBatch::default);
3117 138934 : batch.put(key, val, self.lsn);
3118 138934 : }
3119 :
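 : /// Stage a metadata value at the current LSN. A value already staged for the same
 : /// key and LSN is replaced in place, and `pending_metadata_bytes` is kept in sync
 : /// with the serialized size of the staged values.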
3120 143144 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
3121 143144 : let values = self.pending_metadata_pages.entry(key).or_default();
3122 : // Replace the previous value if it exists at the same lsn
3123 143144 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
3124 6076 : if *last_lsn == self.lsn {
3125 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
3126 6076 : self.pending_metadata_bytes -= *last_value_ser_size;
3127 6076 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
3128 6076 : self.pending_metadata_bytes += *last_value_ser_size;
3129 :
3130 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as might
3131 : // have been generated by synthesized zero page writes prior to the first real write to a page.
3132 6076 : *last_value = val;
3133 6076 : return;
3134 0 : }
3135 137068 : }
3136 :
3137 137068 : let val_serialized_size = val.serialized_size().unwrap() as usize;
3138 137068 : self.pending_metadata_bytes += val_serialized_size;
3139 137068 : values.push((self.lsn, val_serialized_size, val));
3140 :
3141 137068 : if key == CHECKPOINT_KEY.to_compact() {
3142 119 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
3143 136949 : }
3144 143144 : }
3145 :
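 : /// Record a pending deletion of a key range at the current LSN; it is applied to
 : /// the timeline via `delete_batch` when the pending changes are flushed.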
3146 1 : fn delete(&mut self, key_range: Range<Key>) {
3147 1 : trace!("DELETE {}-{}", key_range.start, key_range.end);
3148 1 : self.pending_deletions.push((key_range, self.lsn));
3149 1 : }
3150 : }
3151 :
3152 : /// Statistics for a DatadirModification.
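 : ///
 : /// Values are counted separately as full images vs. WAL-record deltas, for
 : /// metadata keys and data keys respectively.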
3153 : #[derive(Default)]
3154 : pub struct DatadirModificationStats {
3155 : pub metadata_images: u64,
3156 : pub metadata_deltas: u64,
3157 : pub data_images: u64,
3158 : pub data_deltas: u64,
3159 : }
3160 :
3161 : /// This struct facilitates accessing either a committed key from the timeline at a
3162 : /// This enum facilitates accessing either a committed key from the timeline at a
3163 : ///
3164 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
3165 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
3166 : /// need to look up the keys in the modification first before looking them up in the
3167 : /// timeline to not miss the latest updates.
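 : ///
 : /// Illustrative sketch (not from the codebase; `timeline`, `key`, `lsn`, `ctx`, and
 : /// `modification` are placeholders):
 : ///
 : /// ```ignore
 : /// // Read committed data at exactly `lsn`:
 : /// let committed = Version::at(lsn).get(&timeline, key, &ctx).await?;
 : /// // Read through a pending modification, so values staged earlier in the
 : /// // same batch take precedence over committed data:
 : /// let latest = Version::Modified(&modification).get(&timeline, key, &ctx).await?;
 : /// ```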
3168 : #[derive(Clone, Copy)]
3169 : pub enum Version<'a> {
3170 : LsnRange(LsnRange),
3171 : Modified(&'a DatadirModification<'a>),
3172 : }
3173 :
3174 : impl Version<'_> {
3175 25 : async fn get(
3176 25 : &self,
3177 25 : timeline: &Timeline,
3178 25 : key: Key,
3179 25 : ctx: &RequestContext,
3180 25 : ) -> Result<Bytes, PageReconstructError> {
3181 25 : match self {
3182 15 : Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
3183 10 : Version::Modified(modification) => modification.get(key, ctx).await,
3184 : }
3185 25 : }
3186 :
3187 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
3188 : /// and the empty value into None.
3189 0 : async fn sparse_get(
3190 0 : &self,
3191 0 : timeline: &Timeline,
3192 0 : key: Key,
3193 0 : ctx: &RequestContext,
3194 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
3195 0 : let val = self.get(timeline, key, ctx).await;
3196 0 : match val {
3197 0 : Ok(val) if val.is_empty() => Ok(None),
3198 0 : Ok(val) => Ok(Some(val)),
3199 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
3200 0 : Err(e) => Err(e),
3201 : }
3202 0 : }
3203 :
3204 26 : pub fn is_latest(&self) -> bool {
3205 26 : match self {
3206 16 : Version::LsnRange(lsns) => lsns.is_latest(),
3207 10 : Version::Modified(_) => true,
3208 : }
3209 26 : }
3210 :
3211 224275 : pub fn get_lsn(&self) -> Lsn {
3212 224275 : match self {
3213 12224 : Version::LsnRange(lsns) => lsns.effective_lsn,
3214 212051 : Version::Modified(modification) => modification.lsn,
3215 : }
3216 224275 : }
3217 :
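 : /// Construct a `Version` that reads committed data at exactly `lsn` (both the
 : /// effective and the request LSN of the range are set to `lsn`).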
3218 12219 : pub fn at(lsn: Lsn) -> Self {
3219 12219 : Version::LsnRange(LsnRange {
3220 12219 : effective_lsn: lsn,
3221 12219 : request_lsn: lsn,
3222 12219 : })
3223 12219 : }
3224 : }
3225 :
3226 : //--- Metadata structs stored in key-value pairs in the repository.
3227 :
3228 0 : #[derive(Debug, Serialize, Deserialize)]
3229 : pub(crate) struct DbDirectory {
3230 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
3231 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
3232 : }
3233 :
3234 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
3235 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
3236 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
3237 : // were named like "pg_twophase/000002E5"; now they're like
3238 : // "pg_twophase/0000000A000002E4".
3239 0 : #[derive(Debug, Serialize, Deserialize)]
3240 : pub(crate) struct TwoPhaseDirectory {
3241 : pub(crate) xids: HashSet<TransactionId>,
3242 : }
3243 :
3244 0 : #[derive(Debug, Serialize, Deserialize)]
3245 : struct TwoPhaseDirectoryV17 {
3246 : xids: HashSet<u64>,
3247 : }
3248 :
3249 0 : #[derive(Debug, Serialize, Deserialize, Default)]
3250 : pub(crate) struct RelDirectory {
3251 : // Set of relations that exist. (relfilenode, forknum)
3252 : //
3253 : // TODO: Store it as a btree or radix tree or something else that spans multiple
3254 : // key-value pairs, if you have a lot of relations
3255 : pub(crate) rels: HashSet<(Oid, u8)>,
3256 : }
3257 :
3258 0 : #[derive(Debug, Serialize, Deserialize)]
3259 : struct RelSizeEntry {
3260 : nblocks: u32,
3261 : }
3262 :
3263 0 : #[derive(Debug, Serialize, Deserialize, Default)]
3264 : pub(crate) struct SlruSegmentDirectory {
3265 : // Set of SLRU segments that exist.
3266 : pub(crate) segments: HashSet<u32>,
3267 : }
3268 :
3269 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
3270 : #[repr(u8)]
3271 : pub(crate) enum DirectoryKind {
3272 : Db,
3273 : TwoPhase,
3274 : Rel,
3275 : AuxFiles,
3276 : SlruSegment(SlruKind),
3277 : RelV2,
3278 : }
3279 :
3280 : impl DirectoryKind {
3281 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
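 : /// Stable index of this directory kind within `KINDS_NUM`, derived from its
 : /// `enum_map::Enum` position.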
3282 4582 : pub(crate) fn offset(&self) -> usize {
3283 4582 : self.into_usize()
3284 4582 : }
3285 : }
3286 :
3287 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
3288 :
3289 : #[allow(clippy::bool_assert_comparison)]
3290 : #[cfg(test)]
3291 : mod tests {
3292 : use hex_literal::hex;
3293 : use pageserver_api::models::ShardParameters;
3294 : use utils::id::TimelineId;
3295 : use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
3296 :
3297 : use super::*;
3298 : use crate::DEFAULT_PG_VERSION;
3299 : use crate::tenant::harness::TenantHarness;
3300 :
3301 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
3302 : #[tokio::test]
3303 1 : async fn aux_files_round_trip() -> anyhow::Result<()> {
3304 1 : let name = "aux_files_round_trip";
3305 1 : let harness = TenantHarness::create(name).await?;
3306 :
3307 : pub const TIMELINE_ID: TimelineId =
3308 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
3309 :
3310 1 : let (tenant, ctx) = harness.load().await;
3311 1 : let (tline, ctx) = tenant
3312 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3313 1 : .await?;
3314 1 : let tline = tline.raw_timeline().unwrap();
3315 :
3316 : // First modification: insert two keys
3317 1 : let mut modification = tline.begin_modification(Lsn(0x1000));
3318 1 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
3319 1 : modification.set_lsn(Lsn(0x1008))?;
3320 1 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
3321 1 : modification.commit(&ctx).await?;
3322 1 : let expect_1008 = HashMap::from([
3323 1 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
3324 1 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
3325 1 : ]);
3326 :
3327 1 : let io_concurrency = IoConcurrency::spawn_for_test();
3328 :
3329 1 : let readback = tline
3330 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
3331 1 : .await?;
3332 1 : assert_eq!(readback, expect_1008);
3333 :
3334 : // Second modification: update one key, remove the other
3335 1 : let mut modification = tline.begin_modification(Lsn(0x2000));
3336 1 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
3337 1 : modification.set_lsn(Lsn(0x2008))?;
3338 1 : modification.put_file("foo/bar2", b"", &ctx).await?;
3339 1 : modification.commit(&ctx).await?;
3340 1 : let expect_2008 =
3341 1 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
3342 :
3343 1 : let readback = tline
3344 1 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
3345 1 : .await?;
3346 1 : assert_eq!(readback, expect_2008);
3347 :
3348 : // Reading back in time works
3349 1 : let readback = tline
3350 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
3351 1 : .await?;
3352 1 : assert_eq!(readback, expect_1008);
3353 :
3354 2 : Ok(())
3355 1 : }
3356 :
3357 : #[test]
3358 1 : fn gap_finding() {
3359 1 : let rel = RelTag {
3360 1 : spcnode: 1663,
3361 1 : dbnode: 208101,
3362 1 : relnode: 2620,
3363 1 : forknum: 0,
3364 1 : };
3365 1 : let base_blkno = 1;
3366 :
3367 1 : let base_key = rel_block_to_key(rel, base_blkno);
3368 1 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
3369 :
3370 1 : let shard = ShardIdentity::unsharded();
3371 :
3372 1 : let mut previous_nblocks = 0;
3373 11 : for i in 0..10 {
3374 10 : let crnt_blkno = base_blkno + i;
3375 10 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
3376 :
3377 10 : previous_nblocks = crnt_blkno + 1;
3378 :
3379 10 : if i == 0 {
3380 : // The first block we write is 1, so we should find the gap.
3381 1 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
3382 : } else {
3383 9 : assert!(gaps.is_none());
3384 : }
3385 : }
3386 :
3387 : // This is an update to an already existing block. No gaps here.
3388 1 : let update_blkno = 5;
3389 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3390 1 : assert!(gaps.is_none());
3391 :
3392 : // This is an update past the current end block.
3393 1 : let after_gap_blkno = 20;
3394 1 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
3395 :
3396 1 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
3397 1 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
3398 1 : assert_eq!(
3399 1 : gaps.unwrap(),
3400 1 : KeySpace::single(gap_start_key..after_gap_key)
3401 : );
3402 1 : }
3403 :
3404 : #[test]
3405 1 : fn sharded_gap_finding() {
3406 1 : let rel = RelTag {
3407 1 : spcnode: 1663,
3408 1 : dbnode: 208101,
3409 1 : relnode: 2620,
3410 1 : forknum: 0,
3411 1 : };
3412 :
3413 1 : let first_blkno = 6;
3414 :
3415 : // This shard will get the even blocks
3416 1 : let shard = ShardIdentity::from_params(
3417 1 : ShardNumber(0),
3418 1 : ShardParameters {
3419 1 : count: ShardCount(2),
3420 1 : stripe_size: ShardStripeSize(1),
3421 1 : },
3422 : );
3423 :
3424 : // Only keys belonging to this shard are considered as gaps.
3425 1 : let mut previous_nblocks = 0;
3426 1 : let gaps =
3427 1 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
3428 1 : assert!(!gaps.ranges.is_empty());
3429 3 : for gap_range in gaps.ranges {
3430 2 : let mut k = gap_range.start;
3431 4 : while k != gap_range.end {
3432 2 : assert_eq!(shard.get_shard_number(&k), shard.number);
3433 2 : k = k.next();
3434 : }
3435 : }
3436 :
3437 1 : previous_nblocks = first_blkno;
3438 :
3439 1 : let update_blkno = 2;
3440 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3441 1 : assert!(gaps.is_none());
3442 1 : }
3443 :
3444 : /*
3445 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
3446 : let incremental = timeline.get_current_logical_size();
3447 : let non_incremental = timeline
3448 : .get_current_logical_size_non_incremental(lsn)
3449 : .unwrap();
3450 : assert_eq!(incremental, non_incremental);
3451 : }
3452 : */
3453 :
3454 : /*
3455 : ///
3456 : /// Test list_rels() function, with branches and dropped relations
3457 : ///
3458 : #[test]
3459 : fn test_list_rels_drop() -> Result<()> {
3460 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
3461 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
3462 : const TESTDB: u32 = 111;
3463 :
3464 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
3465 : // after branching fails below
3466 : let mut writer = tline.begin_record(Lsn(0x10));
3467 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
3468 : writer.finish()?;
3469 :
3470 : // Create a relation on the timeline
3471 : let mut writer = tline.begin_record(Lsn(0x20));
3472 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
3473 : writer.finish()?;
3474 :
3475 : let writer = tline.begin_record(Lsn(0x00));
3476 : writer.finish()?;
3477 :
3478 : // Check that list_rels() lists it after LSN 2, but no before it
3479 : // Check that list_rels() lists it after LSN 2, but not before it
3480 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
3481 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
3482 :
3483 : // Create a branch, check that the relation is visible there
3484 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
3485 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
3486 : Some(timeline) => timeline,
3487 : None => panic!("Should have a local timeline"),
3488 : };
3489 : let newtline = DatadirTimelineImpl::new(newtline);
3490 : assert!(newtline
3491 : .list_rels(0, TESTDB, Lsn(0x30))?
3492 : .contains(&TESTREL_A));
3493 :
3494 : // Drop it on the branch
3495 : let mut new_writer = newtline.begin_record(Lsn(0x40));
3496 : new_writer.drop_relation(TESTREL_A)?;
3497 : new_writer.finish()?;
3498 :
3499 : // Check that it's no longer listed on the branch after the point where it was dropped
3500 : assert!(newtline
3501 : .list_rels(0, TESTDB, Lsn(0x30))?
3502 : .contains(&TESTREL_A));
3503 : assert!(!newtline
3504 : .list_rels(0, TESTDB, Lsn(0x40))?
3505 : .contains(&TESTREL_A));
3506 :
3507 : // Run checkpoint and garbage collection and check that it's still not visible
3508 : newtline.checkpoint(CheckpointConfig::Forced)?;
3509 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3510 :
3511 : assert!(!newtline
3512 : .list_rels(0, TESTDB, Lsn(0x40))?
3513 : .contains(&TESTREL_A));
3514 :
3515 : Ok(())
3516 : }
3517 : */
3518 :
3519 : /*
3520 : #[test]
3521 : fn test_read_beyond_eof() -> Result<()> {
3522 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3523 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3524 :
3525 : make_some_layers(&tline, Lsn(0x20))?;
3526 : let mut writer = tline.begin_record(Lsn(0x60));
3527 : walingest.put_rel_page_image(
3528 : &mut writer,
3529 : TESTREL_A,
3530 : 0,
3531 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3532 : )?;
3533 : writer.finish()?;
3534 :
3535 : // Test read before rel creation. Should error out.
3536 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3537 :
3538 : // Read block beyond end of relation at different points in time.
3539 : // These reads should fall into different delta, image, and in-memory layers.
3540 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3541 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3542 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3543 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3544 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3545 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3546 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3547 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3548 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3549 :
3550 : // Test on an in-memory layer with no preceding layer
3551 : let mut writer = tline.begin_record(Lsn(0x70));
3552 : walingest.put_rel_page_image(
3553 : &mut writer,
3554 : TESTREL_B,
3555 : 0,
3556 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3557 : )?;
3558 : writer.finish()?;
3559 :
3560 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3561 :
3562 : Ok(())
3563 : }
3564 : */
3565 : }
|