Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
13 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes, BytesMut};
16 : use enum_map::Enum;
17 : use pageserver_api::key::{
18 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
19 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
20 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
21 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : };
24 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
25 : use pageserver_api::models::RelSizeMigration;
26 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
27 : use pageserver_api::shard::ShardIdentity;
28 : use postgres_ffi::{BLCKSZ, PgMajorVersion, TimestampTz, TransactionId};
29 : use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
30 : use postgres_ffi_types::{Oid, RepOriginId};
31 : use serde::{Deserialize, Serialize};
32 : use strum::IntoEnumIterator;
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::{debug, info, info_span, trace, warn};
35 : use utils::bin_ser::{BeSer, DeserializeError};
36 : use utils::lsn::Lsn;
37 : use utils::pausable_failpoint;
38 : use wal_decoder::models::record::NeonWalRecord;
39 : use wal_decoder::models::value::Value;
40 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
41 :
42 : use super::tenant::{PageReconstructError, Timeline};
43 : use crate::aux_file;
44 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
45 : use crate::keyspace::{KeySpace, KeySpaceAccum};
46 : use crate::metrics::{
47 : RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
48 : RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
49 : RELSIZE_SNAPSHOT_CACHE_MISSES,
50 : };
51 : use crate::span::{
52 : debug_assert_current_span_has_tenant_and_timeline_id,
53 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
54 : };
55 : use crate::tenant::storage_layer::IoConcurrency;
56 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
57 :
58 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
59 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
60 :
61 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
62 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
63 :
64 : #[derive(Debug)]
65 : pub enum LsnForTimestamp {
66 : /// Found commits both before and after the given timestamp
67 : Present(Lsn),
68 :
69 : /// Found no commits after the given timestamp; this means
70 : /// that the newest data in the branch is older than the given
71 : /// timestamp.
72 : ///
73 : /// All commits <= LSN happened before the given timestamp
74 : Future(Lsn),
75 :
76 : /// The queried timestamp is beyond the horizon we can look back to (the PITR horizon)
77 : ///
78 : /// All commits > LSN happened after the given timestamp,
79 : /// but any commits < LSN might have happened before or after
80 : /// the given timestamp. We don't know because no data before
81 : /// the given lsn is available.
82 : Past(Lsn),
83 :
84 : /// We have found no commit with a timestamp,
85 : /// so we can't return anything meaningful.
86 : ///
87 : /// The associated LSN is the lower bound value we can safely
88 : /// create branches on, but no statement is made about whether it is
89 : /// older or newer than the timestamp.
90 : ///
91 : /// This variant can e.g. be returned right after a
92 : /// cluster import.
93 : NoData(Lsn),
94 : }
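
// A hedged sketch of how a caller might interpret these variants when picking a
// branch point; `find_lsn_for_timestamp` below returns this enum, and
// `timestamp_too_old` is a hypothetical error helper used only for illustration:
//
//     match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
//         // A matching commit boundary was found, or everything is older than `ts`.
//         LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => Ok(lsn),
//         // No commit timestamps at all; `lsn` is still a safe lower bound to branch at.
//         LsnForTimestamp::NoData(lsn) => Ok(lsn),
//         // `ts` lies beyond the PITR horizon; the caller decides how to report this.
//         LsnForTimestamp::Past(lsn) => Err(timestamp_too_old(lsn)),
//     }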
95 :
96 : /// Each request to the pageserver contains an LSN range: `not_modified_since..request_lsn`.
97 : /// See the comments in libs/pageserver_api/src/models.rs.
98 : /// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
99 : /// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
100 : #[derive(Debug, Clone, Copy, Default)]
101 : pub struct LsnRange {
102 : pub effective_lsn: Lsn,
103 : pub request_lsn: Lsn,
104 : }
105 :
106 : impl LsnRange {
107 0 : pub fn at(lsn: Lsn) -> LsnRange {
108 0 : LsnRange {
109 0 : effective_lsn: lsn,
110 0 : request_lsn: lsn,
111 0 : }
112 0 : }
113 16 : pub fn is_latest(&self) -> bool {
114 16 : self.request_lsn == Lsn::MAX
115 16 : }
116 : }
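
// Illustrative sketch (the concrete values, and `last_record_lsn`, are assumptions,
// not taken from this file): a request from the primary carries `request_lsn == Lsn::MAX`
// and is therefore treated as "latest", while a standby pinned at an older LSN is not:
//
//     let primary = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
//     assert!(primary.is_latest());
//     let standby = LsnRange::at(Lsn(0x1234)); // effective_lsn == request_lsn
//     assert!(!standby.is_latest());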
117 :
118 : #[derive(Debug, thiserror::Error)]
119 : pub(crate) enum CalculateLogicalSizeError {
120 : #[error("cancelled")]
121 : Cancelled,
122 :
123 : /// Something went wrong while reading the metadata we use to calculate logical size
124 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
125 : /// in the `From` implementation for this variant.
126 : #[error(transparent)]
127 : PageRead(PageReconstructError),
128 :
129 : /// Something went wrong deserializing metadata that we read to calculate logical size
130 : #[error("decode error: {0}")]
131 : Decode(#[from] DeserializeError),
132 : }
133 :
134 : #[derive(Debug, thiserror::Error)]
135 : pub(crate) enum CollectKeySpaceError {
136 : #[error(transparent)]
137 : Decode(#[from] DeserializeError),
138 : #[error(transparent)]
139 : PageRead(PageReconstructError),
140 : #[error("cancelled")]
141 : Cancelled,
142 : }
143 :
144 : impl From<PageReconstructError> for CollectKeySpaceError {
145 0 : fn from(err: PageReconstructError) -> Self {
146 0 : match err {
147 0 : PageReconstructError::Cancelled => Self::Cancelled,
148 0 : err => Self::PageRead(err),
149 : }
150 0 : }
151 : }
152 :
153 : impl From<PageReconstructError> for CalculateLogicalSizeError {
154 0 : fn from(pre: PageReconstructError) -> Self {
155 0 : match pre {
156 0 : PageReconstructError::Cancelled => Self::Cancelled,
157 0 : _ => Self::PageRead(pre),
158 : }
159 0 : }
160 : }
161 :
162 : #[derive(Debug, thiserror::Error)]
163 : pub enum RelationError {
164 : #[error("invalid relnode")]
165 : InvalidRelnode,
166 : }
167 :
168 : ///
169 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
170 : /// and other special kinds of files, in a versioned key-value store. The
171 : /// Timeline struct provides the key-value store.
172 : ///
173 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
174 : /// implementation, and might be moved into a separate struct later.
175 : impl Timeline {
176 : /// Start ingesting a WAL record, or other atomic modification of
177 : /// the timeline.
178 : ///
179 : /// This provides a transaction-like interface to perform a bunch
180 : /// of modifications atomically.
181 : ///
182 : /// To ingest a WAL record, call begin_modification(lsn) to get a
183 : /// DatadirModification object. Use the functions in the object to
184 : /// modify the repository state, updating all the pages and metadata
185 : /// that the WAL record affects. When you're done, call commit() to
186 : /// commit the changes.
187 : ///
188 : /// The LSN stored in the modification is advanced by `ingest_record` and
189 : /// is used by `commit()` to update `last_record_lsn`.
190 : ///
191 : /// Calling commit() will flush all the changes and reset the state,
192 : /// so the `DatadirModification` struct can be reused to perform the next modification.
193 : ///
194 : /// Note that any pending modifications you make through the
195 : /// modification object won't be visible to calls to the 'get' and list
196 : /// functions of the timeline until you finish! And if you update the
197 : /// same page twice, the last update wins.
198 : ///
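/// # Example (sketch)
///
/// A minimal sketch of the intended call pattern. The exact put-function and
/// `commit` signatures are assumptions here, based on the description above:
///
/// ```ignore
/// let mut modification = timeline.begin_modification(lsn);
/// // ... use the put-functions to apply the changes of one WAL record ...
/// modification.commit(ctx).await?;
/// ```
///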
199 134214 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
200 134214 : where
201 134214 : Self: Sized,
202 : {
203 134214 : DatadirModification {
204 134214 : tline: self,
205 134214 : pending_lsns: Vec::new(),
206 134214 : pending_metadata_pages: HashMap::new(),
207 134214 : pending_data_batch: None,
208 134214 : pending_deletions: Vec::new(),
209 134214 : pending_nblocks: 0,
210 134214 : pending_directory_entries: Vec::new(),
211 134214 : pending_metadata_bytes: 0,
212 134214 : lsn,
213 134214 : }
214 134214 : }
215 :
216 : //------------------------------------------------------------------------------
217 : // Public GET functions
218 : //------------------------------------------------------------------------------
219 :
220 : /// Look up given page version.
221 9192 : pub(crate) async fn get_rel_page_at_lsn(
222 9192 : &self,
223 9192 : tag: RelTag,
224 9192 : blknum: BlockNumber,
225 9192 : version: Version<'_>,
226 9192 : ctx: &RequestContext,
227 9192 : io_concurrency: IoConcurrency,
228 9192 : ) -> Result<Bytes, PageReconstructError> {
229 9192 : match version {
230 9192 : Version::LsnRange(lsns) => {
231 9192 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
232 9192 : let res = self
233 9192 : .get_rel_page_at_lsn_batched(
234 9192 : pages
235 9192 : .iter()
236 9192 : .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
237 9192 : io_concurrency.clone(),
238 9192 : ctx,
239 : )
240 9192 : .await;
241 9192 : assert_eq!(res.len(), 1);
242 9192 : res.into_iter().next().unwrap()
243 : }
244 0 : Version::Modified(modification) => {
245 0 : if tag.relnode == 0 {
246 0 : return Err(PageReconstructError::Other(
247 0 : RelationError::InvalidRelnode.into(),
248 0 : ));
249 0 : }
250 :
251 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
252 0 : if blknum >= nblocks {
253 0 : debug!(
254 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
255 : tag,
256 : blknum,
257 0 : version.get_lsn(),
258 : nblocks
259 : );
260 0 : return Ok(ZERO_PAGE.clone());
261 0 : }
262 :
263 0 : let key = rel_block_to_key(tag, blknum);
264 0 : modification.get(key, ctx).await
265 : }
266 : }
267 9192 : }
268 :
269 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
270 : ///
271 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
272 9192 : pub(crate) async fn get_rel_page_at_lsn_batched(
273 9192 : &self,
274 9192 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
275 9192 : io_concurrency: IoConcurrency,
276 9192 : ctx: &RequestContext,
277 9192 : ) -> Vec<Result<Bytes, PageReconstructError>> {
278 9192 : debug_assert_current_span_has_tenant_and_timeline_id();
279 :
280 9192 : let mut slots_filled = 0;
281 9192 : let page_count = pages.len();
282 :
283 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
284 9192 : let mut result = Vec::with_capacity(pages.len());
285 9192 : let result_slots = result.spare_capacity_mut();
286 :
287 9192 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
288 9192 : HashMap::with_capacity(pages.len());
289 :
290 9192 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
291 9192 : HashMap::with_capacity(pages.len());
292 :
293 9192 : for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
294 9192 : if tag.relnode == 0 {
295 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
296 0 : RelationError::InvalidRelnode.into(),
297 0 : )));
298 :
299 0 : slots_filled += 1;
300 0 : continue;
301 9192 : }
302 9192 : let lsn = lsns.effective_lsn;
303 9192 : let nblocks = {
304 9192 : let ctx = RequestContextBuilder::from(&ctx)
305 9192 : .perf_span(|crnt_perf_span| {
306 0 : info_span!(
307 : target: PERF_TRACE_TARGET,
308 0 : parent: crnt_perf_span,
309 : "GET_REL_SIZE",
310 : reltag=%tag,
311 : lsn=%lsn,
312 : )
313 0 : })
314 9192 : .attached_child();
315 :
316 9192 : match self
317 9192 : .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
318 9192 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
319 9192 : .await
320 : {
321 9192 : Ok(nblocks) => nblocks,
322 0 : Err(err) => {
323 0 : result_slots[response_slot_idx].write(Err(err));
324 0 : slots_filled += 1;
325 0 : continue;
326 : }
327 : }
328 : };
329 :
330 9192 : if *blknum >= nblocks {
331 0 : debug!(
332 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
333 : tag, blknum, lsn, nblocks
334 : );
335 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
336 0 : slots_filled += 1;
337 0 : continue;
338 9192 : }
339 :
340 9192 : let key = rel_block_to_key(*tag, *blknum);
341 :
342 9192 : let ctx = RequestContextBuilder::from(&ctx)
343 9192 : .perf_span(|crnt_perf_span| {
344 0 : info_span!(
345 : target: PERF_TRACE_TARGET,
346 0 : parent: crnt_perf_span,
347 : "GET_BATCH",
348 : batch_size = %page_count,
349 : )
350 0 : })
351 9192 : .attached_child();
352 :
353 9192 : let key_slots = keys_slots.entry(key).or_default();
354 9192 : key_slots.push((response_slot_idx, ctx));
355 :
356 9192 : let acc = req_keyspaces.entry(lsn).or_default();
357 9192 : acc.add_key(key);
358 : }
359 :
360 9192 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
361 9192 : .into_iter()
362 9192 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
363 9192 : .collect();
364 :
365 9192 : let query = VersionedKeySpaceQuery::scattered(query);
366 9192 : let res = self
367 9192 : .get_vectored(query, io_concurrency, ctx)
368 9192 : .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
369 9192 : .await;
370 :
371 9192 : match res {
372 9192 : Ok(results) => {
373 18384 : for (key, res) in results {
374 9192 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
375 9192 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
376 :
377 9192 : for (slot, req_ctx) in key_slots {
378 0 : let clone = match &res {
379 0 : Ok(buf) => Ok(buf.clone()),
380 0 : Err(err) => Err(match err {
381 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
382 :
383 0 : x @ PageReconstructError::Other(_)
384 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
385 0 : | x @ PageReconstructError::WalRedo(_)
386 0 : | x @ PageReconstructError::MissingKey(_) => {
387 0 : PageReconstructError::Other(anyhow::anyhow!(
388 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
389 0 : ))
390 : }
391 : }),
392 : };
393 :
394 0 : result_slots[slot].write(clone);
395 : // There is no standardized way to express that the batched span followed from N request spans.
396 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
397 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
398 0 : req_ctx.perf_follows_from(ctx);
399 0 : slots_filled += 1;
400 : }
401 :
402 9192 : result_slots[first_slot].write(res);
403 9192 : first_req_ctx.perf_follows_from(ctx);
404 9192 : slots_filled += 1;
405 : }
406 : }
407 0 : Err(err) => {
408 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
409 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
410 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
411 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
412 : // but without taking ownership of the GetVectoredError
413 0 : let err = match &err {
414 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
415 : // TODO: restructure get_vectored API to make this error per-key
416 0 : GetVectoredError::MissingKey(err) => {
417 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
418 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
419 0 : )))
420 : }
421 : // TODO: restructure get_vectored API to make this error per-key
422 0 : GetVectoredError::GetReadyAncestorError(err) => {
423 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
424 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
425 0 : )))
426 : }
427 : // TODO: restructure get_vectored API to make this error per-key
428 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
429 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
430 0 : )),
431 : // TODO: we can prevent this error class by moving this check into the type system
432 0 : GetVectoredError::InvalidLsn(e) => {
433 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
434 : }
435 : // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
436 : // TODO: we can prevent this error class by moving this check into the type system
437 0 : GetVectoredError::Oversized(err, max) => {
438 0 : Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
439 : }
440 : };
441 :
442 0 : req_ctx.perf_follows_from(ctx);
443 0 : result_slots[*slot].write(err);
444 : }
445 :
446 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
447 : }
448 : };
449 :
450 9192 : assert_eq!(slots_filled, page_count);
451 : // SAFETY:
452 : // 1. `result` and any of its uninit members are not read from until this point
453 : // 2. The length below is tracked at run-time and matches the number of requested pages.
454 9192 : unsafe {
455 9192 : result.set_len(page_count);
456 9192 : }
457 :
458 9192 : result
459 9192 : }
460 :
461 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
462 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
463 : /// for pages up to the highest page number it has stored.
464 0 : pub(crate) async fn get_db_size(
465 0 : &self,
466 0 : spcnode: Oid,
467 0 : dbnode: Oid,
468 0 : version: Version<'_>,
469 0 : ctx: &RequestContext,
470 0 : ) -> Result<usize, PageReconstructError> {
471 0 : let mut total_blocks = 0;
472 :
473 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
474 :
475 0 : if rels.is_empty() {
476 0 : return Ok(0);
477 0 : }
478 :
479 : // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
480 0 : let reldir_key = rel_dir_to_key(spcnode, dbnode);
481 0 : let buf = version.get(self, reldir_key, ctx).await?;
482 0 : let reldir = RelDirectory::des(&buf)?;
483 :
484 0 : for rel in rels {
485 0 : let n_blocks = self
486 0 : .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx)
487 0 : .await?;
488 0 : total_blocks += n_blocks as usize;
489 : }
490 0 : Ok(total_blocks)
491 0 : }
492 :
493 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
494 : ///
495 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
496 : /// page number stored in the shard.
497 12217 : pub(crate) async fn get_rel_size(
498 12217 : &self,
499 12217 : tag: RelTag,
500 12217 : version: Version<'_>,
501 12217 : ctx: &RequestContext,
502 12217 : ) -> Result<BlockNumber, PageReconstructError> {
503 12217 : self.get_rel_size_in_reldir(tag, version, None, ctx).await
504 12217 : }
505 :
506 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
507 : ///
508 : /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
509 12217 : pub(crate) async fn get_rel_size_in_reldir(
510 12217 : &self,
511 12217 : tag: RelTag,
512 12217 : version: Version<'_>,
513 12217 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
514 12217 : ctx: &RequestContext,
515 12217 : ) -> Result<BlockNumber, PageReconstructError> {
516 12217 : if tag.relnode == 0 {
517 0 : return Err(PageReconstructError::Other(
518 0 : RelationError::InvalidRelnode.into(),
519 0 : ));
520 12217 : }
521 :
522 12217 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
523 12210 : return Ok(nblocks);
524 7 : }
525 :
526 7 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
527 0 : && !self
528 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
529 0 : .await?
530 : {
531 : // FIXME: Postgres sometimes calls smgrcreate() to create
532 : // FSM, and smgrnblocks() on it immediately afterwards,
533 : // without extending it. Tolerate that by claiming that
534 : // any non-existent FSM fork has size 0.
535 0 : return Ok(0);
536 7 : }
537 :
538 7 : let key = rel_size_to_key(tag);
539 7 : let mut buf = version.get(self, key, ctx).await?;
540 5 : let nblocks = buf.get_u32_le();
541 :
542 5 : self.update_cached_rel_size(tag, version, nblocks);
543 :
544 5 : Ok(nblocks)
545 12217 : }
546 :
547 : /// Does the relation exist?
548 : ///
549 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
550 : /// the shard stores pages for.
551 : ///
552 3025 : pub(crate) async fn get_rel_exists(
553 3025 : &self,
554 3025 : tag: RelTag,
555 3025 : version: Version<'_>,
556 3025 : ctx: &RequestContext,
557 3025 : ) -> Result<bool, PageReconstructError> {
558 3025 : self.get_rel_exists_in_reldir(tag, version, None, ctx).await
559 3025 : }
560 :
561 : /// Does the relation exist? With a cached deserialized `RelDirectory`.
562 : ///
563 : /// There are some cases where the caller loops across all relations. In that specific case,
564 : /// the caller should obtain the deserialized `RelDirectory` first and then call this function
565 : /// to avoid duplicated work of deserliazation. This is a hack and should be removed by introducing
566 : /// a new API (e.g., `get_rel_exists_batched`).
567 3025 : pub(crate) async fn get_rel_exists_in_reldir(
568 3025 : &self,
569 3025 : tag: RelTag,
570 3025 : version: Version<'_>,
571 3025 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
572 3025 : ctx: &RequestContext,
573 3025 : ) -> Result<bool, PageReconstructError> {
574 3025 : if tag.relnode == 0 {
575 0 : return Err(PageReconstructError::Other(
576 0 : RelationError::InvalidRelnode.into(),
577 0 : ));
578 3025 : }
579 :
580 : // first try to lookup relation in cache
581 3025 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
582 3016 : return Ok(true);
583 9 : }
584 : // then check if the database was already initialized.
585 : // get_rel_exists can be called before dbdir is created.
586 9 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
587 9 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
588 9 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
589 0 : return Ok(false);
590 9 : }
591 :
592 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
593 : // Otherwise, read the old reldir keyspace.
594 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
595 :
596 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
597 9 : self.get_rel_size_v2_status()
598 : {
599 : // fetch directory listing (new)
600 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
601 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
602 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
603 0 : let exists_v2 = buf == RelDirExists::Exists;
604 : // Fast path: if the relation exists in the new format, return true.
605 : // TODO: we should have a verification mode that checks both keyspaces
606 : // to ensure the relation only exists in one of them.
607 0 : if exists_v2 {
608 0 : return Ok(true);
609 0 : }
610 9 : }
611 :
612 : // fetch directory listing (old)
613 :
614 9 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
615 :
616 9 : if let Some((cached_key, dir)) = deserialized_reldir_v1 {
617 0 : if cached_key == key {
618 0 : return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
619 0 : } else if cfg!(test) || cfg!(feature = "testing") {
620 0 : panic!("cached reldir key mismatch: {cached_key} != {key}");
621 : } else {
622 0 : warn!("cached reldir key mismatch: {cached_key} != {key}");
623 : }
624 : // Fallback to reading the directory from the datadir.
625 9 : }
626 9 : let buf = version.get(self, key, ctx).await?;
627 :
628 9 : let dir = RelDirectory::des(&buf)?;
629 9 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
630 9 : Ok(exists_v1)
631 3025 : }
632 :
633 : /// Get a list of all existing relations in given tablespace and database.
634 : ///
635 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
636 : /// the shard stores pages for.
637 : ///
638 : /// # Cancel-Safety
639 : ///
640 : /// This method is cancellation-safe.
641 0 : pub(crate) async fn list_rels(
642 0 : &self,
643 0 : spcnode: Oid,
644 0 : dbnode: Oid,
645 0 : version: Version<'_>,
646 0 : ctx: &RequestContext,
647 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
648 : // fetch directory listing (old)
649 0 : let key = rel_dir_to_key(spcnode, dbnode);
650 0 : let buf = version.get(self, key, ctx).await?;
651 :
652 0 : let dir = RelDirectory::des(&buf)?;
653 0 : let rels_v1: HashSet<RelTag> =
654 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
655 0 : spcnode,
656 0 : dbnode,
657 0 : relnode: *relnode,
658 0 : forknum: *forknum,
659 0 : }));
660 :
661 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
662 0 : return Ok(rels_v1);
663 0 : }
664 :
665 : // scan directory listing (new), merge with the old results
666 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
667 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
668 0 : self.conf.get_vectored_concurrent_io,
669 0 : self.gate
670 0 : .enter()
671 0 : .map_err(|_| PageReconstructError::Cancelled)?,
672 : );
673 0 : let results = self
674 0 : .scan(
675 0 : KeySpace::single(key_range),
676 0 : version.get_lsn(),
677 0 : ctx,
678 0 : io_concurrency,
679 0 : )
680 0 : .await?;
681 0 : let mut rels = rels_v1;
682 0 : for (key, val) in results {
683 0 : let val = RelDirExists::decode(&val?)
684 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
685 0 : assert_eq!(key.field6, 1);
686 0 : assert_eq!(key.field2, spcnode);
687 0 : assert_eq!(key.field3, dbnode);
688 0 : let tag = RelTag {
689 0 : spcnode,
690 0 : dbnode,
691 0 : relnode: key.field4,
692 0 : forknum: key.field5,
693 0 : };
694 0 : if val == RelDirExists::Removed {
695 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
696 0 : continue;
697 0 : }
698 0 : let did_not_contain = rels.insert(tag);
699 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
700 : }
701 0 : Ok(rels)
702 0 : }
703 :
704 : /// Get the whole SLRU segment
705 0 : pub(crate) async fn get_slru_segment(
706 0 : &self,
707 0 : kind: SlruKind,
708 0 : segno: u32,
709 0 : lsn: Lsn,
710 0 : ctx: &RequestContext,
711 0 : ) -> Result<Bytes, PageReconstructError> {
712 0 : assert!(self.tenant_shard_id.is_shard_zero());
713 0 : let n_blocks = self
714 0 : .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
715 0 : .await?;
716 :
717 0 : let keyspace = KeySpace::single(
718 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
719 : );
720 :
721 0 : let batches = keyspace.partition(
722 0 : self.get_shard_identity(),
723 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
724 0 : BLCKSZ as u64,
725 : );
726 :
727 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
728 0 : self.conf.get_vectored_concurrent_io,
729 0 : self.gate
730 0 : .enter()
731 0 : .map_err(|_| PageReconstructError::Cancelled)?,
732 : );
733 :
734 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
735 0 : for batch in batches.parts {
736 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
737 0 : let blocks = self
738 0 : .get_vectored(query, io_concurrency.clone(), ctx)
739 0 : .await?;
740 :
741 0 : for (_key, block) in blocks {
742 0 : let block = block?;
743 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
744 : }
745 : }
746 :
747 0 : Ok(segment.freeze())
748 0 : }
749 :
750 : /// Get size of an SLRU segment
751 0 : pub(crate) async fn get_slru_segment_size(
752 0 : &self,
753 0 : kind: SlruKind,
754 0 : segno: u32,
755 0 : version: Version<'_>,
756 0 : ctx: &RequestContext,
757 0 : ) -> Result<BlockNumber, PageReconstructError> {
758 0 : assert!(self.tenant_shard_id.is_shard_zero());
759 0 : let key = slru_segment_size_to_key(kind, segno);
760 0 : let mut buf = version.get(self, key, ctx).await?;
761 0 : Ok(buf.get_u32_le())
762 0 : }
763 :
764 : /// Does the slru segment exist?
765 0 : pub(crate) async fn get_slru_segment_exists(
766 0 : &self,
767 0 : kind: SlruKind,
768 0 : segno: u32,
769 0 : version: Version<'_>,
770 0 : ctx: &RequestContext,
771 0 : ) -> Result<bool, PageReconstructError> {
772 0 : assert!(self.tenant_shard_id.is_shard_zero());
773 : // fetch directory listing
774 0 : let key = slru_dir_to_key(kind);
775 0 : let buf = version.get(self, key, ctx).await?;
776 :
777 0 : let dir = SlruSegmentDirectory::des(&buf)?;
778 0 : Ok(dir.segments.contains(&segno))
779 0 : }
780 :
781 : /// Locate the LSN such that all transactions that committed before
782 : /// 'search_timestamp' are visible, but nothing newer is.
783 : ///
784 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
785 : /// so it's not well defined which LSN you get if there were multiple commits
786 : /// "in flight" at that point in time.
787 : ///
788 0 : pub(crate) async fn find_lsn_for_timestamp(
789 0 : &self,
790 0 : search_timestamp: TimestampTz,
791 0 : cancel: &CancellationToken,
792 0 : ctx: &RequestContext,
793 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
794 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
795 :
796 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
797 0 : let gc_cutoff_planned = {
798 0 : let gc_info = self.gc_info.read().unwrap();
799 0 : gc_info.min_cutoff()
800 : };
801 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
802 : // but let's be defensive.
803 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
804 : // We use this method to figure out the branching LSN for the new branch, but the
805 : // GC cutoff could be before the branching point and we cannot create a new branch
806 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
807 : // on the safe side.
808 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
809 0 : let max_lsn = self.get_last_record_lsn();
810 :
811 : // LSNs are always 8-byte aligned. low/mid/high represent the
812 : // LSN divided by 8.
813 0 : let mut low = min_lsn.0 / 8;
814 0 : let mut high = max_lsn.0 / 8 + 1;
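// Worked example with illustrative numbers (an assumption, not real values): if
// min_lsn = 0/10 (0x10) and max_lsn = 0/50 (0x50), then low = 2 and high = 11,
// and the search below probes commit timestamps at Lsn(mid * 8) for mid in [2, 11).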
815 :
816 0 : let mut found_smaller = false;
817 0 : let mut found_larger = false;
818 :
819 0 : while low < high {
820 0 : if cancel.is_cancelled() {
821 0 : return Err(PageReconstructError::Cancelled);
822 0 : }
823 : // cannot overflow, high and low are both smaller than u64::MAX / 2
824 0 : let mid = (high + low) / 2;
825 :
826 0 : let cmp = match self
827 0 : .is_latest_commit_timestamp_ge_than(
828 0 : search_timestamp,
829 0 : Lsn(mid * 8),
830 0 : &mut found_smaller,
831 0 : &mut found_larger,
832 0 : ctx,
833 : )
834 0 : .await
835 : {
836 0 : Ok(res) => res,
837 0 : Err(PageReconstructError::MissingKey(e)) => {
838 0 : warn!(
839 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
840 : e
841 : );
842 : // Return that we didn't find any commits smaller than the LSN, having already logged the error.
843 0 : return Ok(LsnForTimestamp::Past(min_lsn));
844 : }
845 0 : Err(e) => return Err(e),
846 : };
847 :
848 0 : if cmp {
849 0 : high = mid;
850 0 : } else {
851 0 : low = mid + 1;
852 0 : }
853 : }
854 :
855 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
856 : // i.e. the LSN of the last commit record before or at `search_timestamp`.
857 : // Remove one from `low` to get `t`.
858 : //
859 : // FIXME: it would be better to get the LSN of the previous commit.
860 : // Otherwise, if you restore to the returned LSN, the database will
861 : // include physical changes from later commits that will be marked
862 : // as aborted, and will need to be vacuumed away.
863 0 : let commit_lsn = Lsn((low - 1) * 8);
864 0 : match (found_smaller, found_larger) {
865 : (false, false) => {
866 : // This can happen if no commit records have been processed yet, e.g.
867 : // just after importing a cluster.
868 0 : Ok(LsnForTimestamp::NoData(min_lsn))
869 : }
870 : (false, true) => {
871 : // Didn't find any commit timestamps smaller than the request
872 0 : Ok(LsnForTimestamp::Past(min_lsn))
873 : }
874 0 : (true, _) if commit_lsn < min_lsn => {
875 : // The search above did set found_smaller to true, but it never increased the lsn.
876 : // Then `low` still corresponds to the initial min_lsn, and the subtraction above
877 : // gave a value below min_lsn. We must never return an LSN below min_lsn.
878 0 : Ok(LsnForTimestamp::Past(min_lsn))
879 : }
880 : (true, false) => {
881 : // Only found commits with timestamps smaller than the request.
882 : // It's still a valid case for branch creation, return it.
883 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
884 : // case, anyway.
885 0 : Ok(LsnForTimestamp::Future(commit_lsn))
886 : }
887 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
888 : }
889 0 : }
890 :
891 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
892 : /// commits with a timestamp at or after 'search_timestamp', as seen at LSN 'probe_lsn'.
893 : ///
894 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
895 : /// with a smaller/larger timestamp.
896 : ///
897 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
898 0 : &self,
899 0 : search_timestamp: TimestampTz,
900 0 : probe_lsn: Lsn,
901 0 : found_smaller: &mut bool,
902 0 : found_larger: &mut bool,
903 0 : ctx: &RequestContext,
904 0 : ) -> Result<bool, PageReconstructError> {
905 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
906 0 : if timestamp >= search_timestamp {
907 0 : *found_larger = true;
908 0 : return ControlFlow::Break(true);
909 0 : } else {
910 0 : *found_smaller = true;
911 0 : }
912 0 : ControlFlow::Continue(())
913 0 : })
914 0 : .await
915 0 : }
916 :
917 : /// Obtain the timestamp for the given lsn.
918 : ///
919 : /// If the lsn has no timestamps (e.g. no commits), returns None.
920 0 : pub(crate) async fn get_timestamp_for_lsn(
921 0 : &self,
922 0 : probe_lsn: Lsn,
923 0 : ctx: &RequestContext,
924 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
925 0 : let mut max: Option<TimestampTz> = None;
926 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
927 0 : if let Some(max_prev) = max {
928 0 : max = Some(max_prev.max(timestamp));
929 0 : } else {
930 0 : max = Some(timestamp);
931 0 : }
932 0 : ControlFlow::Continue(())
933 0 : })
934 0 : .await?;
935 :
936 0 : Ok(max)
937 0 : }
938 :
939 : /// Runs the given function on all the timestamps for a given lsn
940 : ///
941 : /// The return value is either given by the closure, or set to the `Default`
942 : /// impl's output.
943 0 : async fn map_all_timestamps<T: Default>(
944 0 : &self,
945 0 : probe_lsn: Lsn,
946 0 : ctx: &RequestContext,
947 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
948 0 : ) -> Result<T, PageReconstructError> {
949 0 : for segno in self
950 0 : .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
951 0 : .await?
952 : {
953 0 : let nblocks = self
954 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
955 0 : .await?;
956 :
957 0 : let keyspace = KeySpace::single(
958 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
959 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
960 : );
961 :
962 0 : let batches = keyspace.partition(
963 0 : self.get_shard_identity(),
964 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
965 0 : BLCKSZ as u64,
966 : );
967 :
968 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
969 0 : self.conf.get_vectored_concurrent_io,
970 0 : self.gate
971 0 : .enter()
972 0 : .map_err(|_| PageReconstructError::Cancelled)?,
973 : );
974 :
975 0 : for batch in batches.parts.into_iter().rev() {
976 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
977 0 : let blocks = self
978 0 : .get_vectored(query, io_concurrency.clone(), ctx)
979 0 : .await?;
980 :
981 0 : for (_key, clog_page) in blocks.into_iter().rev() {
982 0 : let clog_page = clog_page?;
983 :
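// A CLOG page stored with 8 extra bytes after the BLCKSZ image carries a commit
// timestamp; the trailing bytes are interpreted as a big-endian TimestampTz and
// passed to `f` below.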
984 0 : if clog_page.len() == BLCKSZ as usize + 8 {
985 0 : let mut timestamp_bytes = [0u8; 8];
986 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
987 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
988 :
989 0 : match f(timestamp) {
990 0 : ControlFlow::Break(b) => return Ok(b),
991 0 : ControlFlow::Continue(()) => (),
992 : }
993 0 : }
994 : }
995 : }
996 : }
997 0 : Ok(Default::default())
998 0 : }
999 :
1000 0 : pub(crate) async fn get_slru_keyspace(
1001 0 : &self,
1002 0 : version: Version<'_>,
1003 0 : ctx: &RequestContext,
1004 0 : ) -> Result<KeySpace, PageReconstructError> {
1005 0 : let mut accum = KeySpaceAccum::new();
1006 :
1007 0 : for kind in SlruKind::iter() {
1008 0 : let mut segments: Vec<u32> = self
1009 0 : .list_slru_segments(kind, version, ctx)
1010 0 : .await?
1011 0 : .into_iter()
1012 0 : .collect();
1013 0 : segments.sort_unstable();
1014 :
1015 0 : for seg in segments {
1016 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
1017 :
1018 0 : accum.add_range(
1019 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
1020 : );
1021 : }
1022 : }
1023 :
1024 0 : Ok(accum.to_keyspace())
1025 0 : }
1026 :
1027 : /// Get a list of SLRU segments
1028 0 : pub(crate) async fn list_slru_segments(
1029 0 : &self,
1030 0 : kind: SlruKind,
1031 0 : version: Version<'_>,
1032 0 : ctx: &RequestContext,
1033 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
1034 : // fetch directory entry
1035 0 : let key = slru_dir_to_key(kind);
1036 :
1037 0 : let buf = version.get(self, key, ctx).await?;
1038 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
1039 0 : }
1040 :
1041 0 : pub(crate) async fn get_relmap_file(
1042 0 : &self,
1043 0 : spcnode: Oid,
1044 0 : dbnode: Oid,
1045 0 : version: Version<'_>,
1046 0 : ctx: &RequestContext,
1047 0 : ) -> Result<Bytes, PageReconstructError> {
1048 0 : let key = relmap_file_key(spcnode, dbnode);
1049 :
1050 0 : let buf = version.get(self, key, ctx).await?;
1051 0 : Ok(buf)
1052 0 : }
1053 :
1054 168 : pub(crate) async fn list_dbdirs(
1055 168 : &self,
1056 168 : lsn: Lsn,
1057 168 : ctx: &RequestContext,
1058 168 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
1059 : // fetch directory entry
1060 168 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1061 :
1062 168 : Ok(DbDirectory::des(&buf)?.dbdirs)
1063 168 : }
1064 :
1065 0 : pub(crate) async fn get_twophase_file(
1066 0 : &self,
1067 0 : xid: u64,
1068 0 : lsn: Lsn,
1069 0 : ctx: &RequestContext,
1070 0 : ) -> Result<Bytes, PageReconstructError> {
1071 0 : let key = twophase_file_key(xid);
1072 0 : let buf = self.get(key, lsn, ctx).await?;
1073 0 : Ok(buf)
1074 0 : }
1075 :
1076 169 : pub(crate) async fn list_twophase_files(
1077 169 : &self,
1078 169 : lsn: Lsn,
1079 169 : ctx: &RequestContext,
1080 169 : ) -> Result<HashSet<u64>, PageReconstructError> {
1081 : // fetch directory entry
1082 169 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1083 :
1084 169 : if self.pg_version >= PgMajorVersion::PG17 {
1085 156 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1086 : } else {
1087 13 : Ok(TwoPhaseDirectory::des(&buf)?
1088 : .xids
1089 13 : .iter()
1090 13 : .map(|x| u64::from(*x))
1091 13 : .collect())
1092 : }
1093 169 : }
1094 :
1095 0 : pub(crate) async fn get_control_file(
1096 0 : &self,
1097 0 : lsn: Lsn,
1098 0 : ctx: &RequestContext,
1099 0 : ) -> Result<Bytes, PageReconstructError> {
1100 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1101 0 : }
1102 :
1103 6 : pub(crate) async fn get_checkpoint(
1104 6 : &self,
1105 6 : lsn: Lsn,
1106 6 : ctx: &RequestContext,
1107 6 : ) -> Result<Bytes, PageReconstructError> {
1108 6 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1109 6 : }
1110 :
1111 6 : async fn list_aux_files_v2(
1112 6 : &self,
1113 6 : lsn: Lsn,
1114 6 : ctx: &RequestContext,
1115 6 : io_concurrency: IoConcurrency,
1116 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1117 6 : let kv = self
1118 6 : .scan(
1119 6 : KeySpace::single(Key::metadata_aux_key_range()),
1120 6 : lsn,
1121 6 : ctx,
1122 6 : io_concurrency,
1123 6 : )
1124 6 : .await?;
1125 6 : let mut result = HashMap::new();
1126 6 : let mut sz = 0;
1127 15 : for (_, v) in kv {
1128 9 : let v = v?;
1129 9 : let v = aux_file::decode_file_value_bytes(&v)
1130 9 : .context("value decode")
1131 9 : .map_err(PageReconstructError::Other)?;
1132 17 : for (fname, content) in v {
1133 8 : sz += fname.len();
1134 8 : sz += content.len();
1135 8 : result.insert(fname, content);
1136 8 : }
1137 : }
1138 6 : self.aux_file_size_estimator.on_initial(sz);
1139 6 : Ok(result)
1140 6 : }
1141 :
1142 0 : pub(crate) async fn trigger_aux_file_size_computation(
1143 0 : &self,
1144 0 : lsn: Lsn,
1145 0 : ctx: &RequestContext,
1146 0 : io_concurrency: IoConcurrency,
1147 0 : ) -> Result<(), PageReconstructError> {
1148 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1149 0 : Ok(())
1150 0 : }
1151 :
1152 6 : pub(crate) async fn list_aux_files(
1153 6 : &self,
1154 6 : lsn: Lsn,
1155 6 : ctx: &RequestContext,
1156 6 : io_concurrency: IoConcurrency,
1157 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1158 6 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1159 6 : }
1160 :
1161 2 : pub(crate) async fn get_replorigins(
1162 2 : &self,
1163 2 : lsn: Lsn,
1164 2 : ctx: &RequestContext,
1165 2 : io_concurrency: IoConcurrency,
1166 2 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1167 2 : let kv = self
1168 2 : .scan(
1169 2 : KeySpace::single(repl_origin_key_range()),
1170 2 : lsn,
1171 2 : ctx,
1172 2 : io_concurrency,
1173 2 : )
1174 2 : .await?;
1175 2 : let mut result = HashMap::new();
1176 6 : for (k, v) in kv {
1177 5 : let v = v?;
1178 5 : if v.is_empty() {
1179 : // This is a tombstone -- we can skip it.
1180 : // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it is part of
1181 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1182 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1183 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1184 2 : continue;
1185 3 : }
1186 3 : let origin_id = k.field6 as RepOriginId;
1187 3 : let origin_lsn = Lsn::des(&v)
1188 3 : .with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
1189 2 : if origin_lsn != Lsn::INVALID {
1190 2 : result.insert(origin_id, origin_lsn);
1191 2 : }
1192 : }
1193 1 : Ok(result)
1194 2 : }
1195 :
1196 : /// Does the same as get_current_logical_size but counted on demand.
1197 : /// Used to initialize the logical size tracking on startup.
1198 : ///
1199 : /// Only relation blocks are counted currently. That excludes metadata,
1200 : /// SLRUs, twophase files etc.
1201 : ///
1202 : /// # Cancel-Safety
1203 : ///
1204 : /// This method is cancellation-safe.
1205 7 : pub(crate) async fn get_current_logical_size_non_incremental(
1206 7 : &self,
1207 7 : lsn: Lsn,
1208 7 : ctx: &RequestContext,
1209 7 : ) -> Result<u64, CalculateLogicalSizeError> {
1210 7 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1211 :
1212 7 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1213 :
1214 : // Fetch list of database dirs and iterate them
1215 7 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1216 7 : let dbdir = DbDirectory::des(&buf)?;
1217 :
1218 7 : let mut total_size: u64 = 0;
1219 7 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1220 0 : for rel in self
1221 0 : .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
1222 0 : .await?
1223 : {
1224 0 : if self.cancel.is_cancelled() {
1225 0 : return Err(CalculateLogicalSizeError::Cancelled);
1226 0 : }
1227 0 : let relsize_key = rel_size_to_key(rel);
1228 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1229 0 : let relsize = buf.get_u32_le();
1230 :
1231 0 : total_size += relsize as u64;
1232 : }
1233 : }
1234 7 : Ok(total_size * BLCKSZ as u64)
1235 7 : }
1236 :
1237 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1238 : /// for gc-compaction.
1239 : ///
1240 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1241 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1242 : /// be kept only for a specific range of LSN.
1243 : ///
1244 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1245 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1246 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1247 : /// determine which keys to retain/drop for gc-compaction.
1248 : ///
1249 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1250 : /// to be retained for each of the branch LSN.
1251 : ///
1252 : /// The return value is (dense keyspace, sparse keyspace).
1253 27 : pub(crate) async fn collect_gc_compaction_keyspace(
1254 27 : &self,
1255 27 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1256 27 : let metadata_key_begin = Key::metadata_key_range().start;
1257 27 : let aux_v1_key = AUX_FILES_KEY;
1258 27 : let dense_keyspace = KeySpace {
1259 27 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1260 27 : };
1261 27 : Ok((
1262 27 : dense_keyspace,
1263 27 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1264 27 : ))
1265 27 : }
1266 :
1267 : ///
1268 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1269 : /// Anything that's not listed may be removed from the underlying storage (from
1270 : /// that LSN forwards).
1271 : ///
1272 : /// The return value is (dense keyspace, sparse keyspace).
1273 168 : pub(crate) async fn collect_keyspace(
1274 168 : &self,
1275 168 : lsn: Lsn,
1276 168 : ctx: &RequestContext,
1277 168 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1278 : // Iterate through key ranges, greedily packing them into partitions
1279 168 : let mut result = KeySpaceAccum::new();
1280 :
1281 : // The dbdir metadata always exists
1282 168 : result.add_key(DBDIR_KEY);
1283 :
1284 : // Fetch list of database dirs and iterate them
1285 168 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1286 168 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1287 :
1288 168 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1289 168 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1290 0 : if has_relmap_file {
1291 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1292 0 : }
1293 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1294 :
1295 0 : let mut rels: Vec<RelTag> = self
1296 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1297 0 : .await?
1298 0 : .into_iter()
1299 0 : .collect();
1300 0 : rels.sort_unstable();
1301 0 : for rel in rels {
1302 0 : let relsize_key = rel_size_to_key(rel);
1303 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1304 0 : let relsize = buf.get_u32_le();
1305 :
1306 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1307 0 : result.add_key(relsize_key);
1308 : }
1309 : }
1310 :
1311 : // Iterate SLRUs next
1312 168 : if self.tenant_shard_id.is_shard_zero() {
1313 495 : for kind in [
1314 165 : SlruKind::Clog,
1315 165 : SlruKind::MultiXactMembers,
1316 165 : SlruKind::MultiXactOffsets,
1317 : ] {
1318 495 : let slrudir_key = slru_dir_to_key(kind);
1319 495 : result.add_key(slrudir_key);
1320 495 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1321 495 : let dir = SlruSegmentDirectory::des(&buf)?;
1322 495 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1323 495 : segments.sort_unstable();
1324 495 : for segno in segments {
1325 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1326 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1327 0 : let segsize = buf.get_u32_le();
1328 :
1329 0 : result.add_range(
1330 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1331 : );
1332 0 : result.add_key(segsize_key);
1333 : }
1334 : }
1335 3 : }
1336 :
1337 : // Then pg_twophase
1338 168 : result.add_key(TWOPHASEDIR_KEY);
1339 :
1340 168 : let mut xids: Vec<u64> = self
1341 168 : .list_twophase_files(lsn, ctx)
1342 168 : .await?
1343 168 : .iter()
1344 168 : .cloned()
1345 168 : .collect();
1346 168 : xids.sort_unstable();
1347 168 : for xid in xids {
1348 0 : result.add_key(twophase_file_key(xid));
1349 0 : }
1350 :
1351 168 : result.add_key(CONTROLFILE_KEY);
1352 168 : result.add_key(CHECKPOINT_KEY);
1353 :
1354 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1355 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1356 : // and the keys will not be garbage-collected.
1357 : #[cfg(test)]
1358 : {
1359 168 : let guard = self.extra_test_dense_keyspace.load();
1360 168 : for kr in &guard.ranges {
1361 0 : result.add_range(kr.clone());
1362 0 : }
1363 : }
1364 :
1365 168 : let dense_keyspace = result.to_keyspace();
1366 168 : let sparse_keyspace = SparseKeySpace(KeySpace {
1367 168 : ranges: vec![
1368 168 : Key::metadata_aux_key_range(),
1369 168 : repl_origin_key_range(),
1370 168 : Key::rel_dir_sparse_key_range(),
1371 168 : ],
1372 168 : });
1373 :
1374 168 : if cfg!(debug_assertions) {
1375 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1376 :
1377 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1378 : // category of sparse keys is split into its own image/delta files. If there
1379 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1380 : // and we want the developer to keep the keyspaces separated.
1381 :
1382 168 : let ranges = &sparse_keyspace.0.ranges;
1383 :
1384 : // TODO: use a single overlaps_with across the codebase
1385 504 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1386 504 : !(a.end <= b.start || b.end <= a.start)
1387 504 : }
1388 504 : for i in 0..ranges.len() {
1389 504 : for j in 0..i {
1390 504 : if overlaps_with(&ranges[i], &ranges[j]) {
1391 0 : panic!(
1392 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1393 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1394 : );
1395 504 : }
1396 : }
1397 : }
1398 336 : for i in 1..ranges.len() {
1399 336 : assert!(
1400 336 : ranges[i - 1].end <= ranges[i].start,
1401 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1402 0 : ranges[i - 1].start,
1403 0 : ranges[i - 1].end,
1404 0 : ranges[i].start,
1405 0 : ranges[i].end
1406 : );
1407 : }
1408 0 : }
1409 :
1410 168 : Ok((dense_keyspace, sparse_keyspace))
1411 168 : }
1412 :
1413 : /// Get the cached size of a relation. There are two caches: the "latest" cache, updated by the primary, which
1414 : /// captures the latest state of the timeline; and the snapshot cache, whose key includes the LSN, so it can be
1415 : /// used by replicas to get the relation size at a particular LSN (snapshot).
1416 224270 : pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
1417 224270 : let lsn = version.get_lsn();
1418 : {
1419 224270 : let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
1420 224270 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
1421 224259 : if lsn >= *cached_lsn {
1422 221686 : RELSIZE_LATEST_CACHE_HITS.inc();
1423 221686 : return Some(*nblocks);
1424 2573 : }
1425 2573 : RELSIZE_CACHE_MISSES_OLD.inc();
1426 11 : }
1427 : }
1428 : {
1429 2584 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1430 2584 : if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
1431 2563 : RELSIZE_SNAPSHOT_CACHE_HITS.inc();
1432 2563 : return Some(*nblock);
1433 21 : }
1434 : }
1435 21 : if version.is_latest() {
1436 10 : RELSIZE_LATEST_CACHE_MISSES.inc();
1437 11 : } else {
1438 11 : RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
1439 11 : }
1440 21 : None
1441 224270 : }
1442 :
1443 : /// Update cached relation size if there is no more recent update
1444 5 : pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
1445 5 : let lsn = version.get_lsn();
1446 5 : if version.is_latest() {
1447 0 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1448 0 : match rel_size_cache.entry(tag) {
1449 0 : hash_map::Entry::Occupied(mut entry) => {
1450 0 : let cached_lsn = entry.get_mut();
1451 0 : if lsn >= cached_lsn.0 {
1452 0 : *cached_lsn = (lsn, nblocks);
1453 0 : }
1454 : }
1455 0 : hash_map::Entry::Vacant(entry) => {
1456 0 : entry.insert((lsn, nblocks));
1457 0 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1458 0 : }
1459 : }
1460 : } else {
1461 5 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1462 5 : if rel_size_cache.capacity() != 0 {
1463 5 : rel_size_cache.insert((lsn, tag), nblocks);
1464 5 : RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
1465 5 : }
1466 : }
1467 5 : }
1468 :
1469 : /// Store cached relation size
1470 141360 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1471 141360 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1472 141360 : if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
1473 960 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1474 140400 : }
1475 141360 : }
1476 :
1477 : /// Remove cached relation size
1478 1 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1479 1 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1480 1 : if rel_size_cache.remove(tag).is_some() {
1481 1 : RELSIZE_LATEST_CACHE_ENTRIES.dec();
1482 1 : }
1483 1 : }
1484 : }
1485 :
1486 : /// DatadirModification represents an operation to ingest an atomic set of
1487 : /// updates to the repository.
1488 : ///
1489 : /// It is created by the 'begin_modification' function. It is used for each WAL
1490 : /// record, so that all the modifications by one WAL record appear atomic.
1491 : pub struct DatadirModification<'a> {
1492 : /// The timeline this modification applies to. You can access this to
1493 : /// read the state, but note that any pending updates are *not* reflected
1494 : /// in the state in 'tline' yet.
1495 : pub tline: &'a Timeline,
1496 :
1497 : /// Current LSN of the modification
1498 : lsn: Lsn,
1499 :
1500 : // The modifications are not applied directly to the underlying key-value store.
1501 : // The put-functions add the modifications here, and they are flushed to the
1502 : // underlying key-value store by the 'commit' function.
1503 : pending_lsns: Vec<Lsn>,
1504 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1505 : pending_nblocks: i64,
1506 :
1507 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1508 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1509 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1510 :
1511 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1512 : /// which keys are stored here.
1513 : pending_data_batch: Option<SerializedValueBatch>,
1514 :
1515 : /// For special "directory" keys that store key-value maps, track the size of the map
1516 : /// if it was updated in this modification.
1517 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1518 :
1519 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1520 : pending_metadata_bytes: usize,
1521 : }
1522 :
1523 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1524 : pub enum MetricsUpdate {
1525 : /// Set the metric to this value
1526 : Set(u64),
1527 : /// Increment the metric by this value
1528 : Add(u64),
1529 : /// Decrement the metric by this value
1530 : Sub(u64),
1531 : }
1532 :
1533 : impl DatadirModification<'_> {
1534 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1535 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1536 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
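: //
: // Illustrative sketch of how a caller might apply this limit (the actual call site in
: // the ingest path may look different):
: //
: //     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
: //         modification.commit(ctx).await?;
: //     }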
1537 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1538 :
1539 : /// Get the current lsn
1540 1 : pub(crate) fn get_lsn(&self) -> Lsn {
1541 1 : self.lsn
1542 1 : }
1543 :
1544 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1545 0 : self.pending_data_batch
1546 0 : .as_ref()
1547 0 : .map_or(0, |b| b.buffer_size())
1548 0 : + self.pending_metadata_bytes
1549 0 : }
1550 :
1551 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1552 0 : self.pending_data_batch
1553 0 : .as_ref()
1554 0 : .is_some_and(|b| b.has_data())
1555 0 : }
1556 :
1557 : /// Returns statistics about the currently pending modifications.
1558 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1559 0 : let mut stats = DatadirModificationStats::default();
1560 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1561 0 : match value {
1562 0 : Value::Image(_) => stats.metadata_images += 1,
1563 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1564 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1565 : }
1566 : }
1567 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1568 0 : match valuemeta {
1569 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1570 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1571 0 : ValueMeta::Observed(_) => {}
1572 : }
1573 : }
1574 0 : stats
1575 0 : }
1576 :
1577 : /// Set the current lsn
1578 72929 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1579 72929 : ensure_walingest!(
1580 : lsn >= self.lsn,
1581 : "setting an older lsn {} than {} is not allowed",
1582 : lsn,
1583 : self.lsn
1584 : );
1585 :
1586 72929 : if lsn > self.lsn {
1587 72929 : self.pending_lsns.push(self.lsn);
1588 72929 : self.lsn = lsn;
1589 72929 : }
1590 72929 : Ok(())
1591 72929 : }
1592 :
1593 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1594 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1595 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1596 : ///
1597 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1598 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1599 : /// via [`Self::get`] as soon as their record has been ingested.
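: ///
: /// For example: a relation block key (`rel_block_to_key`) or an SLRU block key is a data
: /// key, while `DBDIR_KEY`, relation size keys, and the various directory keys are metadata.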
1600 425364 : fn is_data_key(key: &Key) -> bool {
1601 425364 : key.is_rel_block_key() || key.is_slru_block_key()
1602 425364 : }
1603 :
1604 : /// Initialize a completely new repository.
1605 : ///
1606 : /// This inserts the directory metadata entries that are assumed to
1607 : /// always exist.
1608 111 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1609 111 : let buf = DbDirectory::ser(&DbDirectory {
1610 111 : dbdirs: HashMap::new(),
1611 111 : })?;
1612 111 : self.pending_directory_entries
1613 111 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1614 111 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1615 :
1616 111 : let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
1617 98 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1618 98 : xids: HashSet::new(),
1619 98 : })
1620 : } else {
1621 13 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1622 13 : xids: HashSet::new(),
1623 13 : })
1624 0 : }?;
1625 111 : self.pending_directory_entries
1626 111 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1627 111 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1628 :
1629 111 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1630 111 : let empty_dir = Value::Image(buf);
1631 :
1632 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1633 : // harmless but they'd just be dropped on later compaction.
1634 111 : if self.tline.tenant_shard_id.is_shard_zero() {
1635 108 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1636 108 : self.pending_directory_entries.push((
1637 108 : DirectoryKind::SlruSegment(SlruKind::Clog),
1638 108 : MetricsUpdate::Set(0),
1639 108 : ));
1640 108 : self.put(
1641 108 : slru_dir_to_key(SlruKind::MultiXactMembers),
1642 108 : empty_dir.clone(),
1643 108 : );
1644 108 : self.pending_directory_entries.push((
1645 108 : DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1646 108 : MetricsUpdate::Set(0),
1647 108 : ));
1648 108 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1649 108 : self.pending_directory_entries.push((
1650 108 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1651 108 : MetricsUpdate::Set(0),
1652 108 : ));
1653 108 : }
1654 :
1655 111 : Ok(())
1656 111 : }
1657 :
1658 : #[cfg(test)]
1659 110 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1660 110 : self.init_empty()?;
1661 110 : self.put_control_file(bytes::Bytes::from_static(
1662 110 : b"control_file contents do not matter",
1663 : ))
1664 110 : .context("put_control_file")?;
1665 110 : self.put_checkpoint(bytes::Bytes::from_static(
1666 110 : b"checkpoint_file contents do not matter",
1667 : ))
1668 110 : .context("put_checkpoint_file")?;
1669 110 : Ok(())
1670 110 : }
1671 :
1672 : /// Creates a relation if it is not already present.
1673 : /// Returns the current size of the relation
1674 209028 : pub(crate) async fn create_relation_if_required(
1675 209028 : &mut self,
1676 209028 : rel: RelTag,
1677 209028 : ctx: &RequestContext,
1678 209028 : ) -> Result<u32, WalIngestError> {
1679 : // Get current size and put rel creation if rel doesn't exist
1680 : //
1681 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1682 : // check the cache too. This is because eagerly checking the cache results in
1683 : // less work overall and 10% better performance. It's more work on cache miss
1684 : // but cache miss is rare.
1685 209028 : if let Some(nblocks) = self
1686 209028 : .tline
1687 209028 : .get_cached_rel_size(&rel, Version::Modified(self))
1688 : {
1689 209023 : Ok(nblocks)
1690 5 : } else if !self
1691 5 : .tline
1692 5 : .get_rel_exists(rel, Version::Modified(self), ctx)
1693 5 : .await?
1694 : {
1695 : // create it with 0 size initially, the logic below will extend it
1696 5 : self.put_rel_creation(rel, 0, ctx).await?;
1697 5 : Ok(0)
1698 : } else {
1699 0 : Ok(self
1700 0 : .tline
1701 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1702 0 : .await?)
1703 : }
1704 209028 : }
1705 :
1706 : /// Given a block number for a relation (which represents a newly written block),
1707 : /// the previous block count of the relation, and the shard info, find the gaps
1708 : /// that were created by the newly written block if any.
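: ///
: /// For example (see the `gap_finding` test below): if `previous_nblocks` is 3 and block 5
: /// is written, blocks 3 and 4 form the gap; only the blocks that map to this shard are
: /// included in the returned keyspace.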
1709 72835 : fn find_gaps(
1710 72835 : rel: RelTag,
1711 72835 : blkno: u32,
1712 72835 : previous_nblocks: u32,
1713 72835 : shard: &ShardIdentity,
1714 72835 : ) -> Option<KeySpace> {
1715 72835 : let mut key = rel_block_to_key(rel, blkno);
1716 72835 : let mut gap_accum = None;
1717 :
1718 72835 : for gap_blkno in previous_nblocks..blkno {
1719 16 : key.field6 = gap_blkno;
1720 :
1721 16 : if shard.get_shard_number(&key) != shard.number {
1722 4 : continue;
1723 12 : }
1724 :
1725 12 : gap_accum
1726 12 : .get_or_insert_with(KeySpaceAccum::new)
1727 12 : .add_key(key);
1728 : }
1729 :
1730 72835 : gap_accum.map(|accum| accum.to_keyspace())
1731 72835 : }
1732 :
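: /// Ingest a decoded batch of data-page values: create or extend the affected relations as
: /// needed, zero-fill any gaps the new writes introduce (see `find_gaps`), and append the
: /// values to the pending data batch. (Descriptive summary of the body below.)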
1733 72926 : pub async fn ingest_batch(
1734 72926 : &mut self,
1735 72926 : mut batch: SerializedValueBatch,
1736 72926 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1737 72926 : shard: &ShardIdentity,
1738 72926 : ctx: &RequestContext,
1739 72926 : ) -> Result<(), WalIngestError> {
1740 72926 : let mut gaps_at_lsns = Vec::default();
1741 :
1742 72926 : for meta in batch.metadata.iter() {
1743 72821 : let key = Key::from_compact(meta.key());
1744 72821 : let (rel, blkno) = key
1745 72821 : .to_rel_block()
1746 72821 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1747 72821 : let new_nblocks = blkno + 1;
1748 :
1749 72821 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1750 72821 : if new_nblocks > old_nblocks {
1751 1195 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1752 71626 : }
1753 :
1754 72821 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1755 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1756 72821 : }
1757 : }
1758 :
1759 72926 : if !gaps_at_lsns.is_empty() {
1760 0 : batch.zero_gaps(gaps_at_lsns);
1761 72926 : }
1762 :
1763 72926 : match self.pending_data_batch.as_mut() {
1764 10 : Some(pending_batch) => {
1765 10 : pending_batch.extend(batch);
1766 10 : }
1767 72916 : None if batch.has_data() => {
1768 72815 : self.pending_data_batch = Some(batch);
1769 72815 : }
1770 101 : None => {
1771 101 : // Nothing to initialize the batch with
1772 101 : }
1773 : }
1774 :
1775 72926 : Ok(())
1776 72926 : }
1777 :
1778 : /// Put a new page version that can be constructed from a WAL record
1779 : ///
1780 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1781 : /// current end-of-file. It's up to the caller to check that the relation size
1782 : /// matches the blocks inserted!
1783 6 : pub fn put_rel_wal_record(
1784 6 : &mut self,
1785 6 : rel: RelTag,
1786 6 : blknum: BlockNumber,
1787 6 : rec: NeonWalRecord,
1788 6 : ) -> Result<(), WalIngestError> {
1789 6 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1790 6 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1791 6 : Ok(())
1792 6 : }
1793 :
1794 : // Same, but for an SLRU.
1795 4 : pub fn put_slru_wal_record(
1796 4 : &mut self,
1797 4 : kind: SlruKind,
1798 4 : segno: u32,
1799 4 : blknum: BlockNumber,
1800 4 : rec: NeonWalRecord,
1801 4 : ) -> Result<(), WalIngestError> {
1802 4 : if !self.tline.tenant_shard_id.is_shard_zero() {
1803 0 : return Ok(());
1804 4 : }
1805 :
1806 4 : self.put(
1807 4 : slru_block_to_key(kind, segno, blknum),
1808 4 : Value::WalRecord(rec),
1809 : );
1810 4 : Ok(())
1811 4 : }
1812 :
1813 : /// Like put_wal_record, but with ready-made image of the page.
1814 138921 : pub fn put_rel_page_image(
1815 138921 : &mut self,
1816 138921 : rel: RelTag,
1817 138921 : blknum: BlockNumber,
1818 138921 : img: Bytes,
1819 138921 : ) -> Result<(), WalIngestError> {
1820 138921 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1821 138921 : let key = rel_block_to_key(rel, blknum);
1822 138921 : if !key.is_valid_key_on_write_path() {
1823 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1824 138921 : }
1825 138921 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1826 138921 : Ok(())
1827 138921 : }
1828 :
1829 3 : pub fn put_slru_page_image(
1830 3 : &mut self,
1831 3 : kind: SlruKind,
1832 3 : segno: u32,
1833 3 : blknum: BlockNumber,
1834 3 : img: Bytes,
1835 3 : ) -> Result<(), WalIngestError> {
1836 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1837 :
1838 3 : let key = slru_block_to_key(kind, segno, blknum);
1839 3 : if !key.is_valid_key_on_write_path() {
1840 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1841 3 : }
1842 3 : self.put(key, Value::Image(img));
1843 3 : Ok(())
1844 3 : }
1845 :
1846 1499 : pub(crate) fn put_rel_page_image_zero(
1847 1499 : &mut self,
1848 1499 : rel: RelTag,
1849 1499 : blknum: BlockNumber,
1850 1499 : ) -> Result<(), WalIngestError> {
1851 1499 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1852 1499 : let key = rel_block_to_key(rel, blknum);
1853 1499 : if !key.is_valid_key_on_write_path() {
1854 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1855 1499 : }
1856 :
1857 1499 : let batch = self
1858 1499 : .pending_data_batch
1859 1499 : .get_or_insert_with(SerializedValueBatch::default);
1860 :
1861 1499 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1862 :
1863 1499 : Ok(())
1864 1499 : }
1865 :
1866 0 : pub(crate) fn put_slru_page_image_zero(
1867 0 : &mut self,
1868 0 : kind: SlruKind,
1869 0 : segno: u32,
1870 0 : blknum: BlockNumber,
1871 0 : ) -> Result<(), WalIngestError> {
1872 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1873 0 : let key = slru_block_to_key(kind, segno, blknum);
1874 0 : if !key.is_valid_key_on_write_path() {
1875 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1876 0 : }
1877 :
1878 0 : let batch = self
1879 0 : .pending_data_batch
1880 0 : .get_or_insert_with(SerializedValueBatch::default);
1881 :
1882 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1883 :
1884 0 : Ok(())
1885 0 : }
1886 :
1887 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1888 : /// we enable it, we also need to persist it in `index_part.json`.
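: ///
: /// Decision table (mirrors the match below):
: ///
: /// | tenant config | persisted status     | result                           |
: /// |---------------|----------------------|----------------------------------|
: /// | false         | Legacy               | false                            |
: /// | false         | Migrating / Migrated | true                             |
: /// | true          | Legacy               | true (persist `Migrating` first) |
: /// | true          | Migrating / Migrated | true                             |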
1889 973 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1890 973 : let status = self.tline.get_rel_size_v2_status();
1891 973 : let config = self.tline.get_rel_size_v2_enabled();
1892 973 : match (config, status) {
1893 : (false, RelSizeMigration::Legacy) => {
1894 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1895 973 : Ok(false)
1896 : }
1897 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1898 : // index_part already persisted that the timeline has enabled rel_size_v2
1899 0 : Ok(true)
1900 : }
1901 : (true, RelSizeMigration::Legacy) => {
1902 : // The first time we enable it, we need to persist it in `index_part.json`
1903 0 : self.tline
1904 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1905 0 : tracing::info!("enabled rel_size_v2");
1906 0 : Ok(true)
1907 : }
1908 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1909 : // index_part already persisted that the timeline has enabled rel_size_v2
1910 : // and we don't need to do anything
1911 0 : Ok(true)
1912 : }
1913 : }
1914 973 : }
1915 :
1916 : /// Store a relmapper file (pg_filenode.map) in the repository
1917 8 : pub async fn put_relmap_file(
1918 8 : &mut self,
1919 8 : spcnode: Oid,
1920 8 : dbnode: Oid,
1921 8 : img: Bytes,
1922 8 : ctx: &RequestContext,
1923 8 : ) -> Result<(), WalIngestError> {
1924 8 : let v2_enabled = self
1925 8 : .maybe_enable_rel_size_v2()
1926 8 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1927 :
1928 : // Add it to the directory (if it doesn't exist already)
1929 8 : let buf = self.get(DBDIR_KEY, ctx).await?;
1930 8 : let mut dbdir = DbDirectory::des(&buf)?;
1931 :
1932 8 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1933 8 : if r.is_none() || r == Some(false) {
1934 : // The dbdir entry didn't exist, or it contained a
1935 : // 'false'. The 'insert' call already updated it with
1936 : // 'true', now write the updated 'dbdirs' map back.
1937 8 : let buf = DbDirectory::ser(&dbdir)?;
1938 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1939 0 : }
1940 8 : if r.is_none() {
1941 : // Create RelDirectory
1942 : // TODO: if we have fully migrated to v2, no need to create this directory
1943 4 : let buf = RelDirectory::ser(&RelDirectory {
1944 4 : rels: HashSet::new(),
1945 4 : })?;
1946 4 : self.pending_directory_entries
1947 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1948 4 : if v2_enabled {
1949 0 : self.pending_directory_entries
1950 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1951 4 : }
1952 4 : self.put(
1953 4 : rel_dir_to_key(spcnode, dbnode),
1954 4 : Value::Image(Bytes::from(buf)),
1955 : );
1956 4 : }
1957 :
1958 8 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1959 8 : Ok(())
1960 8 : }
1961 :
1962 0 : pub async fn put_twophase_file(
1963 0 : &mut self,
1964 0 : xid: u64,
1965 0 : img: Bytes,
1966 0 : ctx: &RequestContext,
1967 0 : ) -> Result<(), WalIngestError> {
1968 : // Add it to the directory entry
1969 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1970 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
1971 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1972 0 : if !dir.xids.insert(xid) {
1973 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
1974 0 : }
1975 0 : self.pending_directory_entries.push((
1976 0 : DirectoryKind::TwoPhase,
1977 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1978 0 : ));
1979 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1980 : } else {
1981 0 : let xid = xid as u32;
1982 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1983 0 : if !dir.xids.insert(xid) {
1984 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
1985 0 : }
1986 0 : self.pending_directory_entries.push((
1987 0 : DirectoryKind::TwoPhase,
1988 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1989 0 : ));
1990 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1991 : };
1992 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1993 :
1994 0 : self.put(twophase_file_key(xid), Value::Image(img));
1995 0 : Ok(())
1996 0 : }
1997 :
1998 1 : pub async fn set_replorigin(
1999 1 : &mut self,
2000 1 : origin_id: RepOriginId,
2001 1 : origin_lsn: Lsn,
2002 1 : ) -> Result<(), WalIngestError> {
2003 1 : let key = repl_origin_key(origin_id);
2004 1 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
2005 1 : Ok(())
2006 1 : }
2007 :
2008 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
2009 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
2010 0 : }
2011 :
2012 111 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2013 111 : self.put(CONTROLFILE_KEY, Value::Image(img));
2014 111 : Ok(())
2015 111 : }
2016 :
2017 118 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2018 118 : self.put(CHECKPOINT_KEY, Value::Image(img));
2019 118 : Ok(())
2020 118 : }
2021 :
2022 0 : pub async fn drop_dbdir(
2023 0 : &mut self,
2024 0 : spcnode: Oid,
2025 0 : dbnode: Oid,
2026 0 : ctx: &RequestContext,
2027 0 : ) -> Result<(), WalIngestError> {
2028 0 : let total_blocks = self
2029 0 : .tline
2030 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
2031 0 : .await?;
2032 :
2033 : // Remove entry from dbdir
2034 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
2035 0 : let mut dir = DbDirectory::des(&buf)?;
2036 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
2037 0 : let buf = DbDirectory::ser(&dir)?;
2038 0 : self.pending_directory_entries.push((
2039 0 : DirectoryKind::Db,
2040 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
2041 0 : ));
2042 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2043 : } else {
2044 0 : warn!(
2045 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
2046 : spcnode, dbnode
2047 : );
2048 : }
2049 :
2050 : // Update logical database size.
2051 0 : self.pending_nblocks -= total_blocks as i64;
2052 :
2053 : // Delete all relations and metadata files for the spcnode/dnode
2054 0 : self.delete(dbdir_key_range(spcnode, dbnode));
2055 0 : Ok(())
2056 0 : }
2057 :
2058 : /// Create a relation fork.
2059 : ///
2060 : /// 'nblocks' is the initial size.
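: ///
: /// Depending on `maybe_enable_rel_size_v2`, the relation is registered either in the
: /// per-database `RelDirectory` blob (v1) or as a sparse `rel_tag_sparse_key` entry (v2);
: /// in both cases the relation size key and the relation size cache are updated.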
2061 960 : pub async fn put_rel_creation(
2062 960 : &mut self,
2063 960 : rel: RelTag,
2064 960 : nblocks: BlockNumber,
2065 960 : ctx: &RequestContext,
2066 960 : ) -> Result<(), WalIngestError> {
2067 960 : if rel.relnode == 0 {
2068 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
2069 0 : "invalid relnode"
2070 0 : )))?;
2071 960 : }
2072 : // It's possible that this is the first rel for this db in this
2073 : // tablespace. Create the reldir entry for it if so.
2074 960 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
2075 :
2076 960 : let dbdir_exists =
2077 960 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
2078 : // Didn't exist. Update dbdir
2079 4 : e.insert(false);
2080 4 : let buf = DbDirectory::ser(&dbdir)?;
2081 4 : self.pending_directory_entries.push((
2082 4 : DirectoryKind::Db,
2083 4 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
2084 4 : ));
2085 4 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2086 4 : false
2087 : } else {
2088 956 : true
2089 : };
2090 :
2091 960 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2092 960 : let mut rel_dir = if !dbdir_exists {
2093 : // Create the RelDirectory
2094 4 : RelDirectory::default()
2095 : } else {
2096 : // reldir already exists, fetch it
2097 956 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
2098 : };
2099 :
2100 960 : let v2_enabled = self
2101 960 : .maybe_enable_rel_size_v2()
2102 960 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2103 :
2104 960 : if v2_enabled {
2105 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
2106 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2107 0 : }
2108 0 : let sparse_rel_dir_key =
2109 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2110 : // check if the rel_dir_key exists in v2
2111 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2112 0 : let val = RelDirExists::decode_option(val)
2113 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2114 0 : if val == RelDirExists::Exists {
2115 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2116 0 : }
2117 0 : self.put(
2118 0 : sparse_rel_dir_key,
2119 0 : Value::Image(RelDirExists::Exists.encode()),
2120 : );
2121 0 : if !dbdir_exists {
2122 0 : self.pending_directory_entries
2123 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2124 0 : self.pending_directory_entries
2125 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2126 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2127 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2128 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2129 0 : self.put(
2130 0 : rel_dir_key,
2131 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2132 : );
2133 0 : }
2134 0 : self.pending_directory_entries
2135 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2136 : } else {
2137 : // Add the new relation to the rel directory entry, and write it back
2138 960 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2139 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2140 960 : }
2141 960 : if !dbdir_exists {
2142 4 : self.pending_directory_entries
2143 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2144 956 : }
2145 960 : self.pending_directory_entries
2146 960 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2147 960 : self.put(
2148 960 : rel_dir_key,
2149 960 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2150 : );
2151 : }
2152 :
2153 : // Put size
2154 960 : let size_key = rel_size_to_key(rel);
2155 960 : let buf = nblocks.to_le_bytes();
2156 960 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2157 :
2158 960 : self.pending_nblocks += nblocks as i64;
2159 :
2160 : // Update relation size cache
2161 960 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2162 :
2163 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2164 : // caller.
2165 960 : Ok(())
2166 960 : }
2167 :
2168 : /// Truncate relation
2169 3006 : pub async fn put_rel_truncation(
2170 3006 : &mut self,
2171 3006 : rel: RelTag,
2172 3006 : nblocks: BlockNumber,
2173 3006 : ctx: &RequestContext,
2174 3006 : ) -> Result<(), WalIngestError> {
2175 3006 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2176 3006 : if self
2177 3006 : .tline
2178 3006 : .get_rel_exists(rel, Version::Modified(self), ctx)
2179 3006 : .await?
2180 : {
2181 3006 : let size_key = rel_size_to_key(rel);
2182 : // Fetch the old size first
2183 3006 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2184 :
2185 : // Update the entry with the new size.
2186 3006 : let buf = nblocks.to_le_bytes();
2187 3006 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2188 :
2189 : // Update relation size cache
2190 3006 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2191 :
2192 : // Update logical database size.
2193 3006 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2194 0 : }
2195 3006 : Ok(())
2196 3006 : }
2197 :
2198 : /// Extend relation
2199 : /// If new size is smaller, do nothing.
2200 138340 : pub async fn put_rel_extend(
2201 138340 : &mut self,
2202 138340 : rel: RelTag,
2203 138340 : nblocks: BlockNumber,
2204 138340 : ctx: &RequestContext,
2205 138340 : ) -> Result<(), WalIngestError> {
2206 138340 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2207 :
2208 : // Put size
2209 138340 : let size_key = rel_size_to_key(rel);
2210 138340 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2211 :
2212 : // only extend relation here. never decrease the size
2213 138340 : if nblocks > old_size {
2214 137394 : let buf = nblocks.to_le_bytes();
2215 137394 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2216 137394 :
2217 137394 : // Update relation size cache
2218 137394 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2219 137394 :
2220 137394 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2221 137394 : }
2222 138340 : Ok(())
2223 138340 : }
2224 :
2225 : /// Drop some relations
2226 5 : pub(crate) async fn put_rel_drops(
2227 5 : &mut self,
2228 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2229 5 : ctx: &RequestContext,
2230 5 : ) -> Result<(), WalIngestError> {
2231 5 : let v2_enabled = self
2232 5 : .maybe_enable_rel_size_v2()
2233 5 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2234 6 : for ((spc_node, db_node), rel_tags) in drop_relations {
2235 1 : let dir_key = rel_dir_to_key(spc_node, db_node);
2236 1 : let buf = self.get(dir_key, ctx).await?;
2237 1 : let mut dir = RelDirectory::des(&buf)?;
2238 :
2239 1 : let mut dirty = false;
2240 2 : for rel_tag in rel_tags {
2241 1 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2242 1 : self.pending_directory_entries
2243 1 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2244 1 : dirty = true;
2245 1 : true
2246 0 : } else if v2_enabled {
2247 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2248 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2249 : // logic).
2250 0 : let key =
2251 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2252 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2253 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2254 0 : if val == RelDirExists::Exists {
2255 0 : self.pending_directory_entries
2256 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2257 : // put tombstone
2258 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2259 : // no need to set dirty to true
2260 0 : true
2261 : } else {
2262 0 : false
2263 : }
2264 : } else {
2265 0 : false
2266 : };
2267 :
2268 1 : if found {
2269 : // update logical size
2270 1 : let size_key = rel_size_to_key(rel_tag);
2271 1 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2272 1 : self.pending_nblocks -= old_size as i64;
2273 :
2274 : // Remove entry from relation size cache
2275 1 : self.tline.remove_cached_rel_size(&rel_tag);
2276 :
2277 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2278 1 : self.delete(rel_key_range(rel_tag));
2279 0 : }
2280 : }
2281 :
2282 1 : if dirty {
2283 1 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2284 0 : }
2285 : }
2286 :
2287 5 : Ok(())
2288 5 : }
2289 :
2290 3 : pub async fn put_slru_segment_creation(
2291 3 : &mut self,
2292 3 : kind: SlruKind,
2293 3 : segno: u32,
2294 3 : nblocks: BlockNumber,
2295 3 : ctx: &RequestContext,
2296 3 : ) -> Result<(), WalIngestError> {
2297 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2298 :
2299 : // Add it to the directory entry
2300 3 : let dir_key = slru_dir_to_key(kind);
2301 3 : let buf = self.get(dir_key, ctx).await?;
2302 3 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2303 :
2304 3 : if !dir.segments.insert(segno) {
2305 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2306 3 : }
2307 3 : self.pending_directory_entries.push((
2308 3 : DirectoryKind::SlruSegment(kind),
2309 3 : MetricsUpdate::Set(dir.segments.len() as u64),
2310 3 : ));
2311 3 : self.put(
2312 3 : dir_key,
2313 3 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2314 : );
2315 :
2316 : // Put size
2317 3 : let size_key = slru_segment_size_to_key(kind, segno);
2318 3 : let buf = nblocks.to_le_bytes();
2319 3 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2320 :
2321 : // even if nblocks > 0, we don't insert any actual blocks here
2322 :
2323 3 : Ok(())
2324 3 : }
2325 :
2326 : /// Extend SLRU segment
2327 0 : pub fn put_slru_extend(
2328 0 : &mut self,
2329 0 : kind: SlruKind,
2330 0 : segno: u32,
2331 0 : nblocks: BlockNumber,
2332 0 : ) -> Result<(), WalIngestError> {
2333 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2334 :
2335 : // Put size
2336 0 : let size_key = slru_segment_size_to_key(kind, segno);
2337 0 : let buf = nblocks.to_le_bytes();
2338 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2339 0 : Ok(())
2340 0 : }
2341 :
2342 : /// This method is used for marking truncated SLRU files
2343 0 : pub async fn drop_slru_segment(
2344 0 : &mut self,
2345 0 : kind: SlruKind,
2346 0 : segno: u32,
2347 0 : ctx: &RequestContext,
2348 0 : ) -> Result<(), WalIngestError> {
2349 : // Remove it from the directory entry
2350 0 : let dir_key = slru_dir_to_key(kind);
2351 0 : let buf = self.get(dir_key, ctx).await?;
2352 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2353 :
2354 0 : if !dir.segments.remove(&segno) {
2355 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2356 0 : }
2357 0 : self.pending_directory_entries.push((
2358 0 : DirectoryKind::SlruSegment(kind),
2359 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2360 0 : ));
2361 0 : self.put(
2362 0 : dir_key,
2363 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2364 : );
2365 :
2366 : // Delete size entry, as well as all blocks
2367 0 : self.delete(slru_segment_key_range(kind, segno));
2368 :
2369 0 : Ok(())
2370 0 : }
2371 :
2372 : /// Drop a relmapper file (pg_filenode.map)
2373 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2374 : // TODO
2375 0 : Ok(())
2376 0 : }
2377 :
2378 : /// This method is used for marking truncated SLRU files
2379 0 : pub async fn drop_twophase_file(
2380 0 : &mut self,
2381 0 : xid: u64,
2382 0 : ctx: &RequestContext,
2383 0 : ) -> Result<(), WalIngestError> {
2384 : // Remove it from the directory entry
2385 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2386 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2387 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2388 :
2389 0 : if !dir.xids.remove(&xid) {
2390 0 : warn!("twophase file for xid {} does not exist", xid);
2391 0 : }
2392 0 : self.pending_directory_entries.push((
2393 0 : DirectoryKind::TwoPhase,
2394 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2395 0 : ));
2396 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2397 : } else {
2398 0 : let xid: u32 = u32::try_from(xid)
2399 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2400 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2401 :
2402 0 : if !dir.xids.remove(&xid) {
2403 0 : warn!("twophase file for xid {} does not exist", xid);
2404 0 : }
2405 0 : self.pending_directory_entries.push((
2406 0 : DirectoryKind::TwoPhase,
2407 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2408 0 : ));
2409 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2410 : };
2411 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2412 :
2413 : // Delete it
2414 0 : self.delete(twophase_key_range(xid));
2415 :
2416 0 : Ok(())
2417 0 : }
2418 :
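: /// Store an auxiliary file under `path`. The file is merged into the aux-file key
: /// produced by `aux_file::encode_aux_file_key`, which may hold several files; an empty
: /// `content` removes the entry for `path`. (Descriptive summary of the body below.)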
2419 8 : pub async fn put_file(
2420 8 : &mut self,
2421 8 : path: &str,
2422 8 : content: &[u8],
2423 8 : ctx: &RequestContext,
2424 8 : ) -> Result<(), WalIngestError> {
2425 8 : let key = aux_file::encode_aux_file_key(path);
2426 : // retrieve the key from the engine
2427 8 : let old_val = match self.get(key, ctx).await {
2428 2 : Ok(val) => Some(val),
2429 6 : Err(PageReconstructError::MissingKey(_)) => None,
2430 0 : Err(e) => return Err(e.into()),
2431 : };
2432 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2433 2 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2434 : } else {
2435 6 : Vec::new()
2436 : };
2437 8 : let mut other_files = Vec::with_capacity(files.len());
2438 8 : let mut modifying_file = None;
2439 10 : for file @ (p, content) in files {
2440 2 : if path == p {
2441 2 : assert!(
2442 2 : modifying_file.is_none(),
2443 0 : "duplicated entries found for {path}"
2444 : );
2445 2 : modifying_file = Some(content);
2446 0 : } else {
2447 0 : other_files.push(file);
2448 0 : }
2449 : }
2450 8 : let mut new_files = other_files;
2451 8 : match (modifying_file, content.is_empty()) {
2452 1 : (Some(old_content), false) => {
2453 1 : self.tline
2454 1 : .aux_file_size_estimator
2455 1 : .on_update(old_content.len(), content.len());
2456 1 : new_files.push((path, content));
2457 1 : }
2458 1 : (Some(old_content), true) => {
2459 1 : self.tline
2460 1 : .aux_file_size_estimator
2461 1 : .on_remove(old_content.len());
2462 1 : // not adding the file key to the final `new_files` vec.
2463 1 : }
2464 6 : (None, false) => {
2465 6 : self.tline.aux_file_size_estimator.on_add(content.len());
2466 6 : new_files.push((path, content));
2467 6 : }
2468 : // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2469 : // Compute doesn't know whether a previous version of this file exists, so an
2470 : // attempt to delete a non-existing file can produce this message.
2471 : // To avoid false alarms, log it as info rather than warning.
2472 0 : (None, true) if path.starts_with("pg_stat/") => {
2473 0 : info!("removing non-existing pg_stat file: {}", path)
2474 : }
2475 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2476 : }
2477 8 : let new_val = aux_file::encode_file_value(&new_files)
2478 8 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2479 8 : self.put(key, Value::Image(new_val.into()));
2480 :
2481 8 : Ok(())
2482 8 : }
2483 :
2484 : ///
2485 : /// Flush changes accumulated so far to the underlying repository.
2486 : ///
2487 : /// Usually, changes made in DatadirModification are atomic, but this allows
2488 : /// you to flush them to the underlying repository before the final `commit`.
2489 : /// That allows to free up the memory used to hold the pending changes.
2490 : ///
2491 : /// Currently only used during bulk import of a data directory. In that
2492 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2493 : /// whole import fails and the timeline will be deleted anyway.
2494 : /// (Or to be precise, it will be left behind for debugging purposes and
2495 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2496 : ///
2497 : /// Note: A consequence of flushing the pending operations is that they
2498 : /// won't be visible to subsequent operations until `commit`. The function
2499 : /// retains all the metadata, but data pages are flushed. That's again OK
2500 : /// for bulk import, where you are just loading data pages and won't try to
2501 : /// modify the same pages twice.
2502 965 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2503 : // Unless we have accumulated a decent amount of changes, it's not worth it
2504 : // to scan through the pending_updates list.
2505 965 : let pending_nblocks = self.pending_nblocks;
2506 965 : if pending_nblocks < 10000 {
2507 965 : return Ok(());
2508 0 : }
2509 :
2510 0 : let mut writer = self.tline.writer().await;
2511 :
2512 : // Flush relation and SLRU data blocks, keep metadata.
2513 0 : if let Some(batch) = self.pending_data_batch.take() {
2514 0 : tracing::debug!(
2515 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2516 : batch.max_lsn,
2517 0 : self.tline.get_last_record_lsn()
2518 : );
2519 :
2520 : // This bails out on first error without modifying pending_updates.
2521 : // That's Ok, cf this function's doc comment.
2522 0 : writer.put_batch(batch, ctx).await?;
2523 0 : }
2524 :
2525 0 : if pending_nblocks != 0 {
2526 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2527 0 : self.pending_nblocks = 0;
2528 0 : }
2529 :
2530 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2531 0 : writer.update_directory_entries_count(kind, count);
2532 0 : }
2533 :
2534 0 : Ok(())
2535 965 : }
2536 :
2537 : ///
2538 : /// Finish this atomic update, writing all the updated keys to the
2539 : /// underlying timeline.
2540 : /// All the modifications in this atomic update are stamped by the specified LSN.
2541 : ///
2542 371556 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2543 371556 : let mut writer = self.tline.writer().await;
2544 :
2545 371556 : let pending_nblocks = self.pending_nblocks;
2546 371556 : self.pending_nblocks = 0;
2547 :
2548 : // Ordering: the items in this batch do not need to be in any global order, but values for
2549 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2550 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2551 : // more details.
2552 :
2553 371556 : let metadata_batch = {
2554 371556 : let pending_meta = self
2555 371556 : .pending_metadata_pages
2556 371556 : .drain()
2557 371556 : .flat_map(|(key, values)| {
2558 137061 : values
2559 137061 : .into_iter()
2560 137061 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2561 137061 : })
2562 371556 : .collect::<Vec<_>>();
2563 :
2564 371556 : if pending_meta.is_empty() {
2565 236139 : None
2566 : } else {
2567 135417 : Some(SerializedValueBatch::from_values(pending_meta))
2568 : }
2569 : };
2570 :
2571 371556 : let data_batch = self.pending_data_batch.take();
2572 :
2573 371556 : let maybe_batch = match (data_batch, metadata_batch) {
2574 132278 : (Some(mut data), Some(metadata)) => {
2575 132278 : data.extend(metadata);
2576 132278 : Some(data)
2577 : }
2578 71631 : (Some(data), None) => Some(data),
2579 3139 : (None, Some(metadata)) => Some(metadata),
2580 164508 : (None, None) => None,
2581 : };
2582 :
2583 371556 : if let Some(batch) = maybe_batch {
2584 207048 : tracing::debug!(
2585 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2586 : batch.max_lsn,
2587 0 : self.tline.get_last_record_lsn()
2588 : );
2589 :
2590 : // This bails out on first error without modifying pending_updates.
2591 : // That's Ok, cf this function's doc comment.
2592 207048 : writer.put_batch(batch, ctx).await?;
2593 164508 : }
2594 :
2595 371556 : if !self.pending_deletions.is_empty() {
2596 1 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2597 1 : self.pending_deletions.clear();
2598 371555 : }
2599 :
2600 371556 : self.pending_lsns.push(self.lsn);
2601 444485 : for pending_lsn in self.pending_lsns.drain(..) {
2602 444485 : // TODO(vlad): pretty sure the comment below is not valid anymore
2603 444485 : // and we can call finish write with the latest LSN
2604 444485 : //
2605 444485 : // Ideally, we should be able to call writer.finish_write() only once
2606 444485 : // with the highest LSN. However, the last_record_lsn variable in the
2607 444485 : // timeline keeps track of the latest LSN and the immediate previous LSN
2608 444485 : // so we need to record every LSN to not leave a gap between them.
2609 444485 : writer.finish_write(pending_lsn);
2610 444485 : }
2611 :
2612 371556 : if pending_nblocks != 0 {
2613 135285 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2614 236271 : }
2615 :
2616 371556 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2617 1522 : writer.update_directory_entries_count(kind, count);
2618 1522 : }
2619 :
2620 371556 : self.pending_metadata_bytes = 0;
2621 :
2622 371556 : Ok(())
2623 371556 : }
2624 :
2625 145852 : pub(crate) fn len(&self) -> usize {
2626 145852 : self.pending_metadata_pages.len()
2627 145852 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2628 145852 : + self.pending_deletions.len()
2629 145852 : }
2630 :
2631 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2632 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2633 : /// in the modification.
2634 : ///
2635 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2636 : /// page must ensure that the pages they read are already committed in Timeline, for example
2637 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2638 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2639 : /// and not data pages.
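: ///
: /// Illustrative consequence: a relation size key written earlier in this modification
: /// (e.g. by `put_rel_creation`) is visible to a later `get` before `commit`, whereas a
: /// relation block written via `put_rel_page_image` is not readable until committed.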
2640 143293 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2641 143293 : if !Self::is_data_key(&key) {
2642 : // Have we already updated the same key? Read the latest pending updated
2643 : // version in that case.
2644 : //
2645 : // Note: we don't check pending_deletions. It is an error to request a
2646 : // value that has been removed, deletion only avoids leaking storage.
2647 143293 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2648 7964 : if let Some((_, _, value)) = values.last() {
2649 7964 : return if let Value::Image(img) = value {
2650 7964 : Ok(img.clone())
2651 : } else {
2652 : // Currently, we never need to read back a WAL record that we
2653 : // inserted in the same "transaction". All the metadata updates
2654 : // work directly with Images, and we never need to read actual
2655 : // data pages. We could handle this if we had to, by calling
2656 : // the walredo manager, but let's keep it simple for now.
2657 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2658 0 : "unexpected pending WAL record"
2659 0 : )))
2660 : };
2661 0 : }
2662 135329 : }
2663 : } else {
2664 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2665 : // this key should never be present in pending_data_pages. We ensure this by committing
2666 : // modifications before ingesting DB create operations, which are the only kind that reads
2667 : // data pages during ingest.
2668 0 : if cfg!(debug_assertions) {
2669 0 : assert!(
2670 0 : !self
2671 0 : .pending_data_batch
2672 0 : .as_ref()
2673 0 : .is_some_and(|b| b.updates_key(&key))
2674 : );
2675 0 : }
2676 : }
2677 :
2678 : // Metadata page cache miss, or we're reading a data page.
2679 135329 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2680 135329 : self.tline.get(key, lsn, ctx).await
2681 143293 : }
2682 :
2683 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2684 : /// and the empty value into None.
2685 0 : async fn sparse_get(
2686 0 : &self,
2687 0 : key: Key,
2688 0 : ctx: &RequestContext,
2689 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2690 0 : let val = self.get(key, ctx).await;
2691 0 : match val {
2692 0 : Ok(val) if val.is_empty() => Ok(None),
2693 0 : Ok(val) => Ok(Some(val)),
2694 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2695 0 : Err(e) => Err(e),
2696 : }
2697 0 : }
2698 :
2699 : #[cfg(test)]
2700 2 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
2701 2 : self.put(key, val);
2702 2 : }
2703 :
2704 282071 : fn put(&mut self, key: Key, val: Value) {
2705 282071 : if Self::is_data_key(&key) {
2706 138934 : self.put_data(key.to_compact(), val)
2707 : } else {
2708 143137 : self.put_metadata(key.to_compact(), val)
2709 : }
2710 282071 : }
2711 :
2712 138934 : fn put_data(&mut self, key: CompactKey, val: Value) {
2713 138934 : let batch = self
2714 138934 : .pending_data_batch
2715 138934 : .get_or_insert_with(SerializedValueBatch::default);
2716 138934 : batch.put(key, val, self.lsn);
2717 138934 : }
2718 :
2719 143137 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2720 143137 : let values = self.pending_metadata_pages.entry(key).or_default();
2721 : // Replace the previous value if it exists at the same lsn
2722 143137 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2723 6076 : if *last_lsn == self.lsn {
2724 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2725 6076 : self.pending_metadata_bytes -= *last_value_ser_size;
2726 6076 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2727 6076 : self.pending_metadata_bytes += *last_value_ser_size;
2728 :
2729 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
2730 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2731 6076 : *last_value = val;
2732 6076 : return;
2733 0 : }
2734 137061 : }
2735 :
2736 137061 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2737 137061 : self.pending_metadata_bytes += val_serialized_size;
2738 137061 : values.push((self.lsn, val_serialized_size, val));
2739 :
2740 137061 : if key == CHECKPOINT_KEY.to_compact() {
2741 118 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2742 136943 : }
2743 143137 : }
2744 :
2745 1 : fn delete(&mut self, key_range: Range<Key>) {
2746 1 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2747 1 : self.pending_deletions.push((key_range, self.lsn));
2748 1 : }
2749 : }
2750 :
2751 : /// Statistics for a DatadirModification.
2752 : #[derive(Default)]
2753 : pub struct DatadirModificationStats {
2754 : pub metadata_images: u64,
2755 : pub metadata_deltas: u64,
2756 : pub data_images: u64,
2757 : pub data_deltas: u64,
2758 : }
2759 :
2760 : /// This struct facilitates accessing either a committed key from the timeline at a
2761 : /// specific LSN, or the latest uncommitted key from a pending modification.
2762 : ///
2763 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2764 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2765 : /// need to look up the keys in the modification first before looking them up in the
2766 : /// timeline to not miss the latest updates.
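: ///
: /// A minimal sketch of the two ways a `Version` is typically constructed (compare the
: /// uses of `Version::Modified(self)` earlier in this file):
: ///
: /// ```ignore
: /// // Read committed state at a fixed LSN:
: /// let v = Version::at(lsn);
: /// // Read through a pending modification, picking up uncommitted metadata writes:
: /// let v = Version::Modified(&modification);
: /// ```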
2767 : #[derive(Clone, Copy)]
2768 : pub enum Version<'a> {
2769 : LsnRange(LsnRange),
2770 : Modified(&'a DatadirModification<'a>),
2771 : }
2772 :
2773 : impl Version<'_> {
2774 25 : async fn get(
2775 25 : &self,
2776 25 : timeline: &Timeline,
2777 25 : key: Key,
2778 25 : ctx: &RequestContext,
2779 25 : ) -> Result<Bytes, PageReconstructError> {
2780 25 : match self {
2781 15 : Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
2782 10 : Version::Modified(modification) => modification.get(key, ctx).await,
2783 : }
2784 25 : }
2785 :
2786 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2787 : /// and the empty value into None.
2788 0 : async fn sparse_get(
2789 0 : &self,
2790 0 : timeline: &Timeline,
2791 0 : key: Key,
2792 0 : ctx: &RequestContext,
2793 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2794 0 : let val = self.get(timeline, key, ctx).await;
2795 0 : match val {
2796 0 : Ok(val) if val.is_empty() => Ok(None),
2797 0 : Ok(val) => Ok(Some(val)),
2798 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2799 0 : Err(e) => Err(e),
2800 : }
2801 0 : }
2802 :
2803 26 : pub fn is_latest(&self) -> bool {
2804 26 : match self {
2805 16 : Version::LsnRange(lsns) => lsns.is_latest(),
2806 10 : Version::Modified(_) => true,
2807 : }
2808 26 : }
2809 :
2810 224275 : pub fn get_lsn(&self) -> Lsn {
2811 224275 : match self {
2812 12224 : Version::LsnRange(lsns) => lsns.effective_lsn,
2813 212051 : Version::Modified(modification) => modification.lsn,
2814 : }
2815 224275 : }
2816 :
2817 12219 : pub fn at(lsn: Lsn) -> Self {
2818 12219 : Version::LsnRange(LsnRange {
2819 12219 : effective_lsn: lsn,
2820 12219 : request_lsn: lsn,
2821 12219 : })
2822 12219 : }
2823 : }
2824 :
2825 : //--- Metadata structs stored in key-value pairs in the repository.
2826 :
2827 0 : #[derive(Debug, Serialize, Deserialize)]
2828 : pub(crate) struct DbDirectory {
2829 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2830 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2831 : }
2832 :
2833 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2834 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2835 : // were named like "pg_twophase/000002E5", now they're like
2836 : // "pg_twophsae/0000000A000002E4".
2837 :
2838 0 : #[derive(Debug, Serialize, Deserialize)]
2839 : pub(crate) struct TwoPhaseDirectory {
2840 : pub(crate) xids: HashSet<TransactionId>,
2841 : }
2842 :
2843 0 : #[derive(Debug, Serialize, Deserialize)]
2844 : struct TwoPhaseDirectoryV17 {
2845 : xids: HashSet<u64>,
2846 : }
2847 :
2848 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2849 : pub(crate) struct RelDirectory {
2850 : // Set of relations that exist. (relfilenode, forknum)
2851 : //
2852 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2853 : // key-value pairs, if you have a lot of relations
2854 : pub(crate) rels: HashSet<(Oid, u8)>,
2855 : }
2856 :
2857 0 : #[derive(Debug, Serialize, Deserialize)]
2858 : struct RelSizeEntry {
2859 : nblocks: u32,
2860 : }
2861 :
2862 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2863 : pub(crate) struct SlruSegmentDirectory {
2864 : // Set of SLRU segments that exist.
2865 : pub(crate) segments: HashSet<u32>,
2866 : }
2867 :
2868 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2869 : #[repr(u8)]
2870 : pub(crate) enum DirectoryKind {
2871 : Db,
2872 : TwoPhase,
2873 : Rel,
2874 : AuxFiles,
2875 : SlruSegment(SlruKind),
2876 : RelV2,
2877 : }
2878 :
2879 : impl DirectoryKind {
2880 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2881 4567 : pub(crate) fn offset(&self) -> usize {
2882 4567 : self.into_usize()
2883 4567 : }
2884 : }
2885 :
2886 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2887 :
2888 : #[allow(clippy::bool_assert_comparison)]
2889 : #[cfg(test)]
2890 : mod tests {
2891 : use hex_literal::hex;
2892 : use pageserver_api::models::ShardParameters;
2893 : use pageserver_api::shard::ShardStripeSize;
2894 : use utils::id::TimelineId;
2895 : use utils::shard::{ShardCount, ShardNumber};
2896 :
2897 : use super::*;
2898 : use crate::DEFAULT_PG_VERSION;
2899 : use crate::tenant::harness::TenantHarness;
2900 :
2901 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2902 : #[tokio::test]
2903 1 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2904 1 : let name = "aux_files_round_trip";
2905 1 : let harness = TenantHarness::create(name).await?;
2906 :
2907 : pub const TIMELINE_ID: TimelineId =
2908 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2909 :
2910 1 : let (tenant, ctx) = harness.load().await;
2911 1 : let (tline, ctx) = tenant
2912 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2913 1 : .await?;
2914 1 : let tline = tline.raw_timeline().unwrap();
2915 :
2916 : // First modification: insert two keys
2917 1 : let mut modification = tline.begin_modification(Lsn(0x1000));
2918 1 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2919 1 : modification.set_lsn(Lsn(0x1008))?;
2920 1 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2921 1 : modification.commit(&ctx).await?;
2922 1 : let expect_1008 = HashMap::from([
2923 1 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2924 1 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2925 1 : ]);
2926 :
2927 1 : let io_concurrency = IoConcurrency::spawn_for_test();
2928 :
2929 1 : let readback = tline
2930 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2931 1 : .await?;
2932 1 : assert_eq!(readback, expect_1008);
2933 :
2934 : // Second modification: update one key, remove the other
2935 1 : let mut modification = tline.begin_modification(Lsn(0x2000));
2936 1 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2937 1 : modification.set_lsn(Lsn(0x2008))?;
2938 1 : modification.put_file("foo/bar2", b"", &ctx).await?;
2939 1 : modification.commit(&ctx).await?;
2940 1 : let expect_2008 =
2941 1 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2942 :
2943 1 : let readback = tline
2944 1 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2945 1 : .await?;
2946 1 : assert_eq!(readback, expect_2008);
2947 :
2948 : // Reading back in time works
2949 1 : let readback = tline
2950 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2951 1 : .await?;
2952 1 : assert_eq!(readback, expect_1008);
2953 :
2954 2 : Ok(())
2955 1 : }
2956 :
2957 : #[test]
2958 1 : fn gap_finding() {
2959 1 : let rel = RelTag {
2960 1 : spcnode: 1663,
2961 1 : dbnode: 208101,
2962 1 : relnode: 2620,
2963 1 : forknum: 0,
2964 1 : };
2965 1 : let base_blkno = 1;
2966 :
2967 1 : let base_key = rel_block_to_key(rel, base_blkno);
2968 1 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2969 :
2970 1 : let shard = ShardIdentity::unsharded();
2971 :
2972 1 : let mut previous_nblocks = 0;
2973 11 : for i in 0..10 {
2974 10 : let crnt_blkno = base_blkno + i;
2975 10 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2976 :
2977 10 : previous_nblocks = crnt_blkno + 1;
2978 :
2979 10 : if i == 0 {
2980 : // The first block we write is 1, so we should find the gap.
2981 1 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2982 : } else {
2983 9 : assert!(gaps.is_none());
2984 : }
2985 : }
2986 :
2987 : // This is an update to an already existing block. No gaps here.
2988 1 : let update_blkno = 5;
2989 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2990 1 : assert!(gaps.is_none());
2991 :
2992 : // This is an update past the current end block.
2993 1 : let after_gap_blkno = 20;
2994 1 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2995 :
2996 1 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2997 1 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2998 1 : assert_eq!(
2999 1 : gaps.unwrap(),
3000 1 : KeySpace::single(gap_start_key..after_gap_key)
3001 : );
3002 1 : }
3003 :
3004 : #[test]
3005 1 : fn sharded_gap_finding() {
3006 1 : let rel = RelTag {
3007 1 : spcnode: 1663,
3008 1 : dbnode: 208101,
3009 1 : relnode: 2620,
3010 1 : forknum: 0,
3011 1 : };
3012 :
3013 1 : let first_blkno = 6;
3014 :
3015 : // This shard will get the even blocks
3016 1 : let shard = ShardIdentity::from_params(
3017 1 : ShardNumber(0),
3018 1 : ShardParameters {
3019 1 : count: ShardCount(2),
3020 1 : stripe_size: ShardStripeSize(1),
3021 1 : },
3022 : );
3023 :
3024 : // Only keys belonging to this shard are considered as gaps.
3025 1 : let mut previous_nblocks = 0;
3026 1 : let gaps =
3027 1 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
3028 1 : assert!(!gaps.ranges.is_empty());
3029 3 : for gap_range in gaps.ranges {
3030 2 : let mut k = gap_range.start;
3031 4 : while k != gap_range.end {
3032 2 : assert_eq!(shard.get_shard_number(&k), shard.number);
3033 2 : k = k.next();
3034 : }
3035 : }
3036 :
3037 1 : previous_nblocks = first_blkno;
3038 :
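: // Writing a block below the current relation size is an in-place update, so no gaps are reported.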
3039 1 : let update_blkno = 2;
3040 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3041 1 : assert!(gaps.is_none());
3042 1 : }
3043 :
3044 : /*
3045 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
3046 : let incremental = timeline.get_current_logical_size();
3047 : let non_incremental = timeline
3048 : .get_current_logical_size_non_incremental(lsn)
3049 : .unwrap();
3050 : assert_eq!(incremental, non_incremental);
3051 : }
3052 : */
3053 :
3054 : /*
3055 : ///
3056 : /// Test list_rels() function, with branches and dropped relations
3057 : ///
3058 : #[test]
3059 : fn test_list_rels_drop() -> Result<()> {
3060 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
3061 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
3062 : const TESTDB: u32 = 111;
3063 :
3064 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
3065 : // after branching fails below
3066 : let mut writer = tline.begin_record(Lsn(0x10));
3067 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
3068 : writer.finish()?;
3069 :
3070 : // Create a relation on the timeline
3071 : let mut writer = tline.begin_record(Lsn(0x20));
3072 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
3073 : writer.finish()?;
3074 :
3075 : let writer = tline.begin_record(Lsn(0x00));
3076 : writer.finish()?;
3077 :
3078 : // Check that list_rels() lists it after LSN 2, but no before it
3079 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
3080 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
3081 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
3082 :
3083 : // Create a branch, check that the relation is visible there
3084 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
3085 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
3086 : Some(timeline) => timeline,
3087 : None => panic!("Should have a local timeline"),
3088 : };
3089 : let newtline = DatadirTimelineImpl::new(newtline);
3090 : assert!(newtline
3091 : .list_rels(0, TESTDB, Lsn(0x30))?
3092 : .contains(&TESTREL_A));
3093 :
3094 : // Drop it on the branch
3095 : let mut new_writer = newtline.begin_record(Lsn(0x40));
3096 : new_writer.drop_relation(TESTREL_A)?;
3097 : new_writer.finish()?;
3098 :
3099 : // Check that it's no longer listed on the branch after the point where it was dropped
3100 : assert!(newtline
3101 : .list_rels(0, TESTDB, Lsn(0x30))?
3102 : .contains(&TESTREL_A));
3103 : assert!(!newtline
3104 : .list_rels(0, TESTDB, Lsn(0x40))?
3105 : .contains(&TESTREL_A));
3106 :
3107 : // Run checkpoint and garbage collection and check that it's still not visible
3108 : newtline.checkpoint(CheckpointConfig::Forced)?;
3109 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3110 :
3111 : assert!(!newtline
3112 : .list_rels(0, TESTDB, Lsn(0x40))?
3113 : .contains(&TESTREL_A));
3114 :
3115 : Ok(())
3116 : }
3117 : */
3118 :
3119 : /*
3120 : #[test]
3121 : fn test_read_beyond_eof() -> Result<()> {
3122 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3123 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3124 :
3125 : make_some_layers(&tline, Lsn(0x20))?;
3126 : let mut writer = tline.begin_record(Lsn(0x60));
3127 : walingest.put_rel_page_image(
3128 : &mut writer,
3129 : TESTREL_A,
3130 : 0,
3131 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3132 : )?;
3133 : writer.finish()?;
3134 :
3135 : // Test read before rel creation. Should error out.
3136 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3137 :
3138 : // Read block beyond end of relation at different points in time.
3139 : // These reads should fall into different delta, image, and in-memory layers.
3140 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3141 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3142 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3143 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3144 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3145 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3146 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3147 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3148 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3149 :
3150 : // Test on an in-memory layer with no preceding layer
3151 : let mut writer = tline.begin_record(Lsn(0x70));
3152 : walingest.put_rel_page_image(
3153 : &mut writer,
3154 : TESTREL_B,
3155 : 0,
3156 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3157 : )?;
3158 : writer.finish()?;
3159 :
3160 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3161 :
3162 : Ok(())
3163 : }
3164 : */
3165 : }