Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
13 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes, BytesMut};
16 : use enum_map::Enum;
17 : use pageserver_api::key::{
18 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
19 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
20 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
21 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : };
24 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
25 : use pageserver_api::models::RelSizeMigration;
26 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
27 : use pageserver_api::shard::ShardIdentity;
28 : use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
29 : use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
30 : use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
31 : use serde::{Deserialize, Serialize};
32 : use strum::IntoEnumIterator;
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::{debug, info, info_span, trace, warn};
35 : use utils::bin_ser::{BeSer, DeserializeError};
36 : use utils::lsn::Lsn;
37 : use utils::pausable_failpoint;
38 : use wal_decoder::models::record::NeonWalRecord;
39 : use wal_decoder::models::value::Value;
40 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
41 :
42 : use super::tenant::{PageReconstructError, Timeline};
43 : use crate::aux_file;
44 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
45 : use crate::keyspace::{KeySpace, KeySpaceAccum};
46 : use crate::metrics::{
47 : RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
48 : RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
49 : RELSIZE_SNAPSHOT_CACHE_MISSES,
50 : };
51 : use crate::span::{
52 : debug_assert_current_span_has_tenant_and_timeline_id,
53 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
54 : };
55 : use crate::tenant::storage_layer::IoConcurrency;
56 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
57 :
58 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
59 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
60 :
61 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
62 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
63 :
64 : #[derive(Debug)]
65 : pub enum LsnForTimestamp {
66 : /// Found commits both before and after the given timestamp
67 : Present(Lsn),
68 :
69 : /// Found no commits after the given timestamp. This means
70 : /// that the newest data in the branch is older than the given
71 : /// timestamp.
72 : ///
73 : /// All commits <= LSN happened before the given timestamp
74 : Future(Lsn),
75 :
76 : /// The queried timestamp is past the horizon we look back to (PITR)
77 : ///
78 : /// All commits > LSN happened after the given timestamp,
79 : /// but any commits < LSN might have happened before or after
80 : /// the given timestamp. We don't know because no data before
81 : /// the given lsn is available.
82 : Past(Lsn),
83 :
84 : /// We have found no commit with a timestamp,
85 : /// so we can't return anything meaningful.
86 : ///
87 : /// The associated LSN is the lower bound value we can safely
88 : /// create branches on, but no statement is made about whether it is
89 : /// older or newer than the timestamp.
90 : ///
91 : /// This variant can e.g. be returned right after a
92 : /// cluster import.
93 : NoData(Lsn),
94 : }
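// A minimal sketch of how a caller might map these variants to a branch-point LSN.
// This is illustrative only; `requested_ts`, `cancel`, `ctx` and the fallback policy
// are assumptions, not code from this file.
//
//     let branch_at = match timeline.find_lsn_for_timestamp(requested_ts, &cancel, &ctx).await? {
//         LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn,
//         // Past/NoData only provide a lower bound; a caller may fall back to it or reject the request.
//         LsnForTimestamp::Past(lsn) | LsnForTimestamp::NoData(lsn) => lsn,
//     };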
95 :
96 : /// Each request to the page server contains an LSN range: `not_modified_since..request_lsn`.
97 : /// See the comments in libs/pageserver_api/src/models.rs.
98 : /// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
99 : /// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
100 : #[derive(Debug, Clone, Copy, Default)]
101 : pub struct LsnRange {
102 : pub effective_lsn: Lsn,
103 : pub request_lsn: Lsn,
104 : }
105 :
106 : impl LsnRange {
107 0 : pub fn at(lsn: Lsn) -> LsnRange {
108 0 : LsnRange {
109 0 : effective_lsn: lsn,
110 0 : request_lsn: lsn,
111 0 : }
112 0 : }
113 16 : pub fn is_latest(&self) -> bool {
114 16 : self.request_lsn == Lsn::MAX
115 16 : }
116 : }
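// An illustration of the two typical shapes of an LsnRange (the LSN values are made up):
//
//     // Request pinned to a specific LSN, e.g. from a replica: both fields are equal.
//     let pinned = LsnRange::at(Lsn(0x1000));
//     assert!(!pinned.is_latest());
//
//     // Request for the latest state from the primary: request_lsn is Lsn::MAX.
//     let latest = LsnRange { effective_lsn: Lsn(0x2000), request_lsn: Lsn::MAX };
//     assert!(latest.is_latest());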
117 :
118 : #[derive(Debug, thiserror::Error)]
119 : pub(crate) enum CalculateLogicalSizeError {
120 : #[error("cancelled")]
121 : Cancelled,
122 :
123 : /// Something went wrong while reading the metadata we use to calculate logical size
124 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
125 : /// in the `From` implementation for this variant.
126 : #[error(transparent)]
127 : PageRead(PageReconstructError),
128 :
129 : /// Something went wrong deserializing metadata that we read to calculate logical size
130 : #[error("decode error: {0}")]
131 : Decode(#[from] DeserializeError),
132 : }
133 :
134 : #[derive(Debug, thiserror::Error)]
135 : pub(crate) enum CollectKeySpaceError {
136 : #[error(transparent)]
137 : Decode(#[from] DeserializeError),
138 : #[error(transparent)]
139 : PageRead(PageReconstructError),
140 : #[error("cancelled")]
141 : Cancelled,
142 : }
143 :
144 : impl CollectKeySpaceError {
145 0 : pub(crate) fn is_cancel(&self) -> bool {
146 0 : match self {
147 0 : CollectKeySpaceError::Decode(_) => false,
148 0 : CollectKeySpaceError::PageRead(e) => e.is_cancel(),
149 0 : CollectKeySpaceError::Cancelled => true,
150 : }
151 0 : }
152 0 : pub(crate) fn into_anyhow(self) -> anyhow::Error {
153 0 : match self {
154 0 : CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
155 0 : CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
156 0 : CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
157 : }
158 0 : }
159 : }
160 :
161 : impl From<PageReconstructError> for CollectKeySpaceError {
162 0 : fn from(err: PageReconstructError) -> Self {
163 0 : match err {
164 0 : PageReconstructError::Cancelled => Self::Cancelled,
165 0 : err => Self::PageRead(err),
166 : }
167 0 : }
168 : }
169 :
170 : impl From<PageReconstructError> for CalculateLogicalSizeError {
171 0 : fn from(pre: PageReconstructError) -> Self {
172 0 : match pre {
173 0 : PageReconstructError::Cancelled => Self::Cancelled,
174 0 : _ => Self::PageRead(pre),
175 : }
176 0 : }
177 : }
178 :
179 : #[derive(Debug, thiserror::Error)]
180 : pub enum RelationError {
181 : #[error("invalid relnode")]
182 : InvalidRelnode,
183 : }
184 :
185 : ///
186 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
187 : /// and other special kinds of files, in a versioned key-value store. The
188 : /// Timeline struct provides the key-value store.
189 : ///
190 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
191 : /// implementation, and might be moved into a separate struct later.
192 : impl Timeline {
193 : /// Start ingesting a WAL record, or other atomic modification of
194 : /// the timeline.
195 : ///
196 : /// This provides a transaction-like interface to perform a bunch
197 : /// of modifications atomically.
198 : ///
199 : /// To ingest a WAL record, call begin_modification(lsn) to get a
200 : /// DatadirModification object. Use the functions in the object to
201 : /// modify the repository state, updating all the pages and metadata
202 : /// that the WAL record affects. When you're done, call commit() to
203 : /// commit the changes.
204 : ///
205 : /// The LSN stored in the modification is advanced by `ingest_record` and
206 : /// is used by `commit()` to update `last_record_lsn`.
207 : ///
208 : /// Calling commit() will flush all the changes and reset the state,
209 : /// so the `DatadirModification` struct can be reused to perform the next modification.
210 : ///
211 : /// Note that any pending modifications you make through the
212 : /// modification object won't be visible to calls to the 'get' and list
213 : /// functions of the timeline until you commit! And if you update the
214 : /// same page twice, the last update wins.
215 : ///
216 134215 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
217 134215 : where
218 134215 : Self: Sized,
219 : {
220 134215 : DatadirModification {
221 134215 : tline: self,
222 134215 : pending_lsns: Vec::new(),
223 134215 : pending_metadata_pages: HashMap::new(),
224 134215 : pending_data_batch: None,
225 134215 : pending_deletions: Vec::new(),
226 134215 : pending_nblocks: 0,
227 134215 : pending_directory_entries: Vec::new(),
228 134215 : pending_metadata_bytes: 0,
229 134215 : lsn,
230 134215 : }
231 134215 : }
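// A sketch of the modification lifecycle described above (hypothetical caller; the
// put-functions and the exact commit() signature are defined further down in this file
// and are elided here):
//
//     let mut modification = timeline.begin_modification(lsn);
//     // ... apply a WAL record via the put-functions on `modification` ...
//     modification.commit(/* ctx */).await?; // flushes pending changes, advances last_record_lsn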
232 :
233 : //------------------------------------------------------------------------------
234 : // Public GET functions
235 : //------------------------------------------------------------------------------
236 :
237 : /// Look up given page version.
238 9192 : pub(crate) async fn get_rel_page_at_lsn(
239 9192 : &self,
240 9192 : tag: RelTag,
241 9192 : blknum: BlockNumber,
242 9192 : version: Version<'_>,
243 9192 : ctx: &RequestContext,
244 9192 : io_concurrency: IoConcurrency,
245 9192 : ) -> Result<Bytes, PageReconstructError> {
246 9192 : match version {
247 9192 : Version::LsnRange(lsns) => {
248 9192 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
249 9192 : let res = self
250 9192 : .get_rel_page_at_lsn_batched(
251 9192 : pages
252 9192 : .iter()
253 9192 : .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
254 9192 : io_concurrency.clone(),
255 9192 : ctx,
256 : )
257 9192 : .await;
258 9192 : assert_eq!(res.len(), 1);
259 9192 : res.into_iter().next().unwrap()
260 : }
261 0 : Version::Modified(modification) => {
262 0 : if tag.relnode == 0 {
263 0 : return Err(PageReconstructError::Other(
264 0 : RelationError::InvalidRelnode.into(),
265 0 : ));
266 0 : }
267 :
268 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
269 0 : if blknum >= nblocks {
270 0 : debug!(
271 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
272 : tag,
273 : blknum,
274 0 : version.get_lsn(),
275 : nblocks
276 : );
277 0 : return Ok(ZERO_PAGE.clone());
278 0 : }
279 :
280 0 : let key = rel_block_to_key(tag, blknum);
281 0 : modification.get(key, ctx).await
282 : }
283 : }
284 9192 : }
285 :
286 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
287 : ///
288 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
289 : ///
290 : /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
291 : /// if the client goes away (e.g. due to timeout or cancellation).
292 : /// TODO: verify that it actually is cancellation-safe.
293 9192 : pub(crate) async fn get_rel_page_at_lsn_batched(
294 9192 : &self,
295 9192 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
296 9192 : io_concurrency: IoConcurrency,
297 9192 : ctx: &RequestContext,
298 9192 : ) -> Vec<Result<Bytes, PageReconstructError>> {
299 9192 : debug_assert_current_span_has_tenant_and_timeline_id();
300 :
301 9192 : let mut slots_filled = 0;
302 9192 : let page_count = pages.len();
303 :
304 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
305 9192 : let mut result = Vec::with_capacity(pages.len());
306 9192 : let result_slots = result.spare_capacity_mut();
307 :
308 9192 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
309 9192 : HashMap::with_capacity(pages.len());
310 :
311 9192 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
312 9192 : HashMap::with_capacity(pages.len());
313 :
314 9192 : for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
315 9192 : if tag.relnode == 0 {
316 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
317 0 : RelationError::InvalidRelnode.into(),
318 0 : )));
319 :
320 0 : slots_filled += 1;
321 0 : continue;
322 9192 : }
323 9192 : let lsn = lsns.effective_lsn;
324 9192 : let nblocks = {
325 9192 : let ctx = RequestContextBuilder::from(&ctx)
326 9192 : .perf_span(|crnt_perf_span| {
327 0 : info_span!(
328 : target: PERF_TRACE_TARGET,
329 0 : parent: crnt_perf_span,
330 : "GET_REL_SIZE",
331 : reltag=%tag,
332 : lsn=%lsn,
333 : )
334 0 : })
335 9192 : .attached_child();
336 :
337 9192 : match self
338 9192 : .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
339 9192 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
340 9192 : .await
341 : {
342 9192 : Ok(nblocks) => nblocks,
343 0 : Err(err) => {
344 0 : result_slots[response_slot_idx].write(Err(err));
345 0 : slots_filled += 1;
346 0 : continue;
347 : }
348 : }
349 : };
350 :
351 9192 : if *blknum >= nblocks {
352 0 : debug!(
353 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
354 : tag, blknum, lsn, nblocks
355 : );
356 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
357 0 : slots_filled += 1;
358 0 : continue;
359 9192 : }
360 :
361 9192 : let key = rel_block_to_key(*tag, *blknum);
362 :
363 9192 : let ctx = RequestContextBuilder::from(&ctx)
364 9192 : .perf_span(|crnt_perf_span| {
365 0 : info_span!(
366 : target: PERF_TRACE_TARGET,
367 0 : parent: crnt_perf_span,
368 : "GET_BATCH",
369 : batch_size = %page_count,
370 : )
371 0 : })
372 9192 : .attached_child();
373 :
374 9192 : let key_slots = keys_slots.entry(key).or_default();
375 9192 : key_slots.push((response_slot_idx, ctx));
376 :
377 9192 : let acc = req_keyspaces.entry(lsn).or_default();
378 9192 : acc.add_key(key);
379 : }
380 :
381 9192 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
382 9192 : .into_iter()
383 9192 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
384 9192 : .collect();
385 :
386 9192 : let query = VersionedKeySpaceQuery::scattered(query);
387 9192 : let res = self
388 9192 : .get_vectored(query, io_concurrency, ctx)
389 9192 : .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
390 9192 : .await;
391 :
392 9192 : match res {
393 9192 : Ok(results) => {
394 18384 : for (key, res) in results {
395 9192 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
396 9192 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
397 :
398 9192 : for (slot, req_ctx) in key_slots {
399 0 : let clone = match &res {
400 0 : Ok(buf) => Ok(buf.clone()),
401 0 : Err(err) => Err(match err {
402 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
403 :
404 0 : x @ PageReconstructError::Other(_)
405 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
406 0 : | x @ PageReconstructError::WalRedo(_)
407 0 : | x @ PageReconstructError::MissingKey(_) => {
408 0 : PageReconstructError::Other(anyhow::anyhow!(
409 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
410 0 : ))
411 : }
412 : }),
413 : };
414 :
415 0 : result_slots[slot].write(clone);
416 : // There is no standardized way to express that the batched span followed from N request spans.
417 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
418 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
419 0 : req_ctx.perf_follows_from(ctx);
420 0 : slots_filled += 1;
421 : }
422 :
423 9192 : result_slots[first_slot].write(res);
424 9192 : first_req_ctx.perf_follows_from(ctx);
425 9192 : slots_filled += 1;
426 : }
427 : }
428 0 : Err(err) => {
429 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
430 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
431 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
432 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
433 : // but without taking ownership of the GetVectoredError
434 0 : let err = match &err {
435 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
436 : // TODO: restructure get_vectored API to make this error per-key
437 0 : GetVectoredError::MissingKey(err) => {
438 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
439 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
440 0 : )))
441 : }
442 : // TODO: restructure get_vectored API to make this error per-key
443 0 : GetVectoredError::GetReadyAncestorError(err) => {
444 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
445 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
446 0 : )))
447 : }
448 : // TODO: restructure get_vectored API to make this error per-key
449 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
450 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
451 0 : )),
452 : // TODO: we can prevent this error class by moving this check into the type system
453 0 : GetVectoredError::InvalidLsn(e) => {
454 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
455 : }
456 : // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
457 : // TODO: we can prevent this error class by moving this check into the type system
458 0 : GetVectoredError::Oversized(err, max) => {
459 0 : Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
460 : }
461 : };
462 :
463 0 : req_ctx.perf_follows_from(ctx);
464 0 : result_slots[*slot].write(err);
465 : }
466 :
467 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
468 : }
469 : };
470 :
471 9192 : assert_eq!(slots_filled, page_count);
472 : // SAFETY:
473 : // 1. `result` and any of its uninit members are not read from until this point
474 : // 2. The length below is tracked at run-time and matches the number of requested pages.
475 9192 : unsafe {
476 9192 : result.set_len(page_count);
477 9192 : }
478 :
479 9192 : result
480 9192 : }
481 :
482 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
483 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
484 : /// for pages up to the highest page number it has stored.
485 0 : pub(crate) async fn get_db_size(
486 0 : &self,
487 0 : spcnode: Oid,
488 0 : dbnode: Oid,
489 0 : version: Version<'_>,
490 0 : ctx: &RequestContext,
491 0 : ) -> Result<usize, PageReconstructError> {
492 0 : let mut total_blocks = 0;
493 :
494 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
495 :
496 0 : if rels.is_empty() {
497 0 : return Ok(0);
498 0 : }
499 :
500 : // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
501 0 : let reldir_key = rel_dir_to_key(spcnode, dbnode);
502 0 : let buf = version.get(self, reldir_key, ctx).await?;
503 0 : let reldir = RelDirectory::des(&buf)?;
504 :
505 0 : for rel in rels {
506 0 : let n_blocks = self
507 0 : .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
508 0 : .await?
509 0 : .expect("allow_missing=false");
510 0 : total_blocks += n_blocks as usize;
511 : }
512 0 : Ok(total_blocks)
513 0 : }
514 :
515 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
516 : ///
517 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
518 : /// page number stored in the shard.
519 12217 : pub(crate) async fn get_rel_size(
520 12217 : &self,
521 12217 : tag: RelTag,
522 12217 : version: Version<'_>,
523 12217 : ctx: &RequestContext,
524 12217 : ) -> Result<BlockNumber, PageReconstructError> {
525 12217 : Ok(self
526 12217 : .get_rel_size_in_reldir(tag, version, None, false, ctx)
527 12217 : .await?
528 12215 : .expect("allow_missing=false"))
529 12217 : }
530 :
531 : /// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
532 : /// otherwise errors.
533 : ///
534 : /// INVARIANT: never returns None if `allow_missing=false`.
535 : ///
536 : /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
537 12217 : pub(crate) async fn get_rel_size_in_reldir(
538 12217 : &self,
539 12217 : tag: RelTag,
540 12217 : version: Version<'_>,
541 12217 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
542 12217 : allow_missing: bool,
543 12217 : ctx: &RequestContext,
544 12217 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
545 12217 : if tag.relnode == 0 {
546 0 : return Err(PageReconstructError::Other(
547 0 : RelationError::InvalidRelnode.into(),
548 0 : ));
549 12217 : }
550 :
551 12217 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
552 12210 : return Ok(Some(nblocks));
553 7 : }
554 :
555 7 : if allow_missing
556 0 : && !self
557 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
558 0 : .await?
559 : {
560 0 : return Ok(None);
561 7 : }
562 :
563 7 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
564 0 : && !self
565 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
566 0 : .await?
567 : {
568 : // FIXME: Postgres sometimes calls smgrcreate() to create
569 : // FSM, and smgrnblocks() on it immediately afterwards,
570 : // without extending it. Tolerate that by claiming that
571 : // any non-existent FSM fork has size 0.
572 0 : return Ok(Some(0));
573 7 : }
574 :
575 7 : let key = rel_size_to_key(tag);
576 7 : let mut buf = version.get(self, key, ctx).await?;
577 5 : let nblocks = buf.get_u32_le();
578 :
579 5 : self.update_cached_rel_size(tag, version, nblocks);
580 :
581 5 : Ok(Some(nblocks))
582 12217 : }
583 :
584 : /// Does the relation exist?
585 : ///
586 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
587 : /// the shard stores pages for.
588 : ///
589 3025 : pub(crate) async fn get_rel_exists(
590 3025 : &self,
591 3025 : tag: RelTag,
592 3025 : version: Version<'_>,
593 3025 : ctx: &RequestContext,
594 3025 : ) -> Result<bool, PageReconstructError> {
595 3025 : self.get_rel_exists_in_reldir(tag, version, None, ctx).await
596 3025 : }
597 :
598 : /// Does the relation exist? With a cached deserialized `RelDirectory`.
599 : ///
600 : /// There are some cases where the caller loops across all relations. In that specific case,
601 : /// the caller should obtain the deserialized `RelDirectory` first and then call this function
602 : /// to avoid duplicated work of deserialization. This is a hack and should be removed by introducing
603 : /// a new API (e.g., `get_rel_exists_batched`).
604 3025 : pub(crate) async fn get_rel_exists_in_reldir(
605 3025 : &self,
606 3025 : tag: RelTag,
607 3025 : version: Version<'_>,
608 3025 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
609 3025 : ctx: &RequestContext,
610 3025 : ) -> Result<bool, PageReconstructError> {
611 3025 : if tag.relnode == 0 {
612 0 : return Err(PageReconstructError::Other(
613 0 : RelationError::InvalidRelnode.into(),
614 0 : ));
615 3025 : }
616 :
617 : // first try to lookup relation in cache
618 3025 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
619 3016 : return Ok(true);
620 9 : }
621 : // then check if the database was already initialized.
622 : // get_rel_exists can be called before dbdir is created.
623 9 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
624 9 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
625 9 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
626 0 : return Ok(false);
627 9 : }
628 :
629 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
630 : // Otherwise, read the old reldir keyspace.
631 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
632 :
633 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
634 9 : self.get_rel_size_v2_status()
635 : {
636 : // fetch directory listing (new)
637 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
638 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
639 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
640 0 : let exists_v2 = buf == RelDirExists::Exists;
641 : // Fast path: if the relation exists in the new format, return true.
642 : // TODO: we should have a verification mode that checks both keyspaces
643 : // to ensure the relation only exists in one of them.
644 0 : if exists_v2 {
645 0 : return Ok(true);
646 0 : }
647 9 : }
648 :
649 : // fetch directory listing (old)
650 :
651 9 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
652 :
653 9 : if let Some((cached_key, dir)) = deserialized_reldir_v1 {
654 0 : if cached_key == key {
655 0 : return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
656 0 : } else if cfg!(test) || cfg!(feature = "testing") {
657 0 : panic!("cached reldir key mismatch: {cached_key} != {key}");
658 : } else {
659 0 : warn!("cached reldir key mismatch: {cached_key} != {key}");
660 : }
661 : // Fallback to reading the directory from the datadir.
662 9 : }
663 9 : let buf = version.get(self, key, ctx).await?;
664 :
665 9 : let dir = RelDirectory::des(&buf)?;
666 9 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
667 9 : Ok(exists_v1)
668 3025 : }
669 :
670 : /// Get a list of all existing relations in given tablespace and database.
671 : ///
672 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
673 : /// the shard stores pages for.
674 : ///
675 : /// # Cancel-Safety
676 : ///
677 : /// This method is cancellation-safe.
678 0 : pub(crate) async fn list_rels(
679 0 : &self,
680 0 : spcnode: Oid,
681 0 : dbnode: Oid,
682 0 : version: Version<'_>,
683 0 : ctx: &RequestContext,
684 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
685 : // fetch directory listing (old)
686 0 : let key = rel_dir_to_key(spcnode, dbnode);
687 0 : let buf = version.get(self, key, ctx).await?;
688 :
689 0 : let dir = RelDirectory::des(&buf)?;
690 0 : let rels_v1: HashSet<RelTag> =
691 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
692 0 : spcnode,
693 0 : dbnode,
694 0 : relnode: *relnode,
695 0 : forknum: *forknum,
696 0 : }));
697 :
698 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
699 0 : return Ok(rels_v1);
700 0 : }
701 :
702 : // scan directory listing (new), merge with the old results
703 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
704 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
705 0 : self.conf.get_vectored_concurrent_io,
706 0 : self.gate
707 0 : .enter()
708 0 : .map_err(|_| PageReconstructError::Cancelled)?,
709 : );
710 0 : let results = self
711 0 : .scan(
712 0 : KeySpace::single(key_range),
713 0 : version.get_lsn(),
714 0 : ctx,
715 0 : io_concurrency,
716 0 : )
717 0 : .await?;
718 0 : let mut rels = rels_v1;
719 0 : for (key, val) in results {
720 0 : let val = RelDirExists::decode(&val?)
721 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
722 0 : assert_eq!(key.field6, 1);
723 0 : assert_eq!(key.field2, spcnode);
724 0 : assert_eq!(key.field3, dbnode);
725 0 : let tag = RelTag {
726 0 : spcnode,
727 0 : dbnode,
728 0 : relnode: key.field4,
729 0 : forknum: key.field5,
730 0 : };
731 0 : if val == RelDirExists::Removed {
732 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
733 0 : continue;
734 0 : }
735 0 : let did_not_contain = rels.insert(tag);
736 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
737 : }
738 0 : Ok(rels)
739 0 : }
740 :
741 : /// Get the whole SLRU segment
742 0 : pub(crate) async fn get_slru_segment(
743 0 : &self,
744 0 : kind: SlruKind,
745 0 : segno: u32,
746 0 : lsn: Lsn,
747 0 : ctx: &RequestContext,
748 0 : ) -> Result<Bytes, PageReconstructError> {
749 0 : assert!(self.tenant_shard_id.is_shard_zero());
750 0 : let n_blocks = self
751 0 : .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
752 0 : .await?;
753 :
754 0 : let keyspace = KeySpace::single(
755 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
756 : );
757 :
758 0 : let batches = keyspace.partition(
759 0 : self.get_shard_identity(),
760 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
761 0 : BLCKSZ as u64,
762 : );
763 :
764 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
765 0 : self.conf.get_vectored_concurrent_io,
766 0 : self.gate
767 0 : .enter()
768 0 : .map_err(|_| PageReconstructError::Cancelled)?,
769 : );
770 :
771 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
772 0 : for batch in batches.parts {
773 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
774 0 : let blocks = self
775 0 : .get_vectored(query, io_concurrency.clone(), ctx)
776 0 : .await?;
777 :
778 0 : for (_key, block) in blocks {
779 0 : let block = block?;
780 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
781 : }
782 : }
783 :
784 0 : Ok(segment.freeze())
785 0 : }
786 :
787 : /// Get size of an SLRU segment
788 0 : pub(crate) async fn get_slru_segment_size(
789 0 : &self,
790 0 : kind: SlruKind,
791 0 : segno: u32,
792 0 : version: Version<'_>,
793 0 : ctx: &RequestContext,
794 0 : ) -> Result<BlockNumber, PageReconstructError> {
795 0 : assert!(self.tenant_shard_id.is_shard_zero());
796 0 : let key = slru_segment_size_to_key(kind, segno);
797 0 : let mut buf = version.get(self, key, ctx).await?;
798 0 : Ok(buf.get_u32_le())
799 0 : }
800 :
801 : /// Does the slru segment exist?
802 0 : pub(crate) async fn get_slru_segment_exists(
803 0 : &self,
804 0 : kind: SlruKind,
805 0 : segno: u32,
806 0 : version: Version<'_>,
807 0 : ctx: &RequestContext,
808 0 : ) -> Result<bool, PageReconstructError> {
809 0 : assert!(self.tenant_shard_id.is_shard_zero());
810 : // fetch directory listing
811 0 : let key = slru_dir_to_key(kind);
812 0 : let buf = version.get(self, key, ctx).await?;
813 :
814 0 : let dir = SlruSegmentDirectory::des(&buf)?;
815 0 : Ok(dir.segments.contains(&segno))
816 0 : }
817 :
818 : /// Locate an LSN such that all transactions that committed before
819 : /// 'search_timestamp' are visible, but nothing newer is.
820 : ///
821 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
822 : /// so it's not well defined which LSN you get if there were multiple commits
823 : /// "in flight" at that point in time.
824 : ///
825 0 : pub(crate) async fn find_lsn_for_timestamp(
826 0 : &self,
827 0 : search_timestamp: TimestampTz,
828 0 : cancel: &CancellationToken,
829 0 : ctx: &RequestContext,
830 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
831 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
832 :
833 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
834 0 : let gc_cutoff_planned = {
835 0 : let gc_info = self.gc_info.read().unwrap();
836 0 : info!(cutoffs=?gc_info.cutoffs, applied_cutoff=%*gc_cutoff_lsn_guard, "starting find_lsn_for_timestamp");
837 0 : gc_info.min_cutoff()
838 : };
839 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
840 : // but let's be defensive.
841 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
842 : // We use this method to figure out the branching LSN for the new branch, but the
843 : // GC cutoff could be before the branching point and we cannot create a new branch
844 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
845 : // on the safe side.
846 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
847 0 : let max_lsn = self.get_last_record_lsn();
848 :
849 : // LSNs are always 8-byte aligned. low/mid/high represent the
850 : // LSN divided by 8.
851 0 : let mut low = min_lsn.0 / 8;
852 0 : let mut high = max_lsn.0 / 8 + 1;
853 :
854 0 : let mut found_smaller = false;
855 0 : let mut found_larger = false;
856 :
857 0 : while low < high {
858 0 : if cancel.is_cancelled() {
859 0 : return Err(PageReconstructError::Cancelled);
860 0 : }
861 : // cannot overflow, high and low are both smaller than u64::MAX / 2
862 0 : let mid = (high + low) / 2;
863 :
864 0 : let cmp = match self
865 0 : .is_latest_commit_timestamp_ge_than(
866 0 : search_timestamp,
867 0 : Lsn(mid * 8),
868 0 : &mut found_smaller,
869 0 : &mut found_larger,
870 0 : ctx,
871 : )
872 0 : .await
873 : {
874 0 : Ok(res) => res,
875 0 : Err(PageReconstructError::MissingKey(e)) => {
876 0 : warn!(
877 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
878 : e
879 : );
880 : // Return that we didn't find any requests smaller than the LSN, having logged the error above.
881 0 : return Ok(LsnForTimestamp::Past(min_lsn));
882 : }
883 0 : Err(e) => return Err(e),
884 : };
885 :
886 0 : if cmp {
887 0 : high = mid;
888 0 : } else {
889 0 : low = mid + 1;
890 0 : }
891 : }
892 :
893 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
894 : // so the LSN of the last commit record before or at `search_timestamp`.
895 : // Remove one from `low` to get `t`.
896 : //
897 : // FIXME: it would be better to get the LSN of the previous commit.
898 : // Otherwise, if you restore to the returned LSN, the database will
899 : // include physical changes from later commits that will be marked
900 : // as aborted, and will need to be vacuumed away.
901 0 : let commit_lsn = Lsn((low - 1) * 8);
902 0 : match (found_smaller, found_larger) {
903 : (false, false) => {
904 : // This can happen if no commit records have been processed yet, e.g.
905 : // just after importing a cluster.
906 0 : Ok(LsnForTimestamp::NoData(min_lsn))
907 : }
908 : (false, true) => {
909 : // Didn't find any commit timestamps smaller than the request
910 0 : Ok(LsnForTimestamp::Past(min_lsn))
911 : }
912 0 : (true, _) if commit_lsn < min_lsn => {
913 : // the search above did set found_smaller to true but it never increased the lsn.
914 : // Then, low is still the old min_lsn, and the subtraction above gave a value
915 : // below the min_lsn. We should never do that.
916 0 : Ok(LsnForTimestamp::Past(min_lsn))
917 : }
918 : (true, false) => {
919 : // Only found commits with timestamps smaller than the request.
920 : // It's still a valid case for branch creation, return it.
921 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
922 : // case, anyway.
923 0 : Ok(LsnForTimestamp::Future(commit_lsn))
924 : }
925 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
926 : }
927 0 : }
928 :
929 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
930 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
931 : ///
932 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
933 : /// with a smaller/larger timestamp.
934 : ///
935 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
936 0 : &self,
937 0 : search_timestamp: TimestampTz,
938 0 : probe_lsn: Lsn,
939 0 : found_smaller: &mut bool,
940 0 : found_larger: &mut bool,
941 0 : ctx: &RequestContext,
942 0 : ) -> Result<bool, PageReconstructError> {
943 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
944 0 : if timestamp >= search_timestamp {
945 0 : *found_larger = true;
946 0 : return ControlFlow::Break(true);
947 0 : } else {
948 0 : *found_smaller = true;
949 0 : }
950 0 : ControlFlow::Continue(())
951 0 : })
952 0 : .await
953 0 : }
954 :
955 : /// Obtain the timestamp for the given lsn.
956 : ///
957 : /// If the lsn has no timestamps (e.g. no commits), returns None.
958 0 : pub(crate) async fn get_timestamp_for_lsn(
959 0 : &self,
960 0 : probe_lsn: Lsn,
961 0 : ctx: &RequestContext,
962 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
963 0 : let mut max: Option<TimestampTz> = None;
964 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
965 0 : if let Some(max_prev) = max {
966 0 : max = Some(max_prev.max(timestamp));
967 0 : } else {
968 0 : max = Some(timestamp);
969 0 : }
970 0 : ControlFlow::Continue(())
971 0 : })
972 0 : .await?;
973 :
974 0 : Ok(max)
975 0 : }
976 :
977 : /// Runs the given function on all the timestamps for a given lsn
978 : ///
979 : /// The return value is either given by the closure, or set to the `Default`
980 : /// impl's output.
981 0 : async fn map_all_timestamps<T: Default>(
982 0 : &self,
983 0 : probe_lsn: Lsn,
984 0 : ctx: &RequestContext,
985 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
986 0 : ) -> Result<T, PageReconstructError> {
987 0 : for segno in self
988 0 : .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
989 0 : .await?
990 : {
991 0 : let nblocks = self
992 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
993 0 : .await?;
994 :
995 0 : let keyspace = KeySpace::single(
996 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
997 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
998 : );
999 :
1000 0 : let batches = keyspace.partition(
1001 0 : self.get_shard_identity(),
1002 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
1003 0 : BLCKSZ as u64,
1004 : );
1005 :
1006 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
1007 0 : self.conf.get_vectored_concurrent_io,
1008 0 : self.gate
1009 0 : .enter()
1010 0 : .map_err(|_| PageReconstructError::Cancelled)?,
1011 : );
1012 :
1013 0 : for batch in batches.parts.into_iter().rev() {
1014 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
1015 0 : let blocks = self
1016 0 : .get_vectored(query, io_concurrency.clone(), ctx)
1017 0 : .await?;
1018 :
1019 0 : for (_key, clog_page) in blocks.into_iter().rev() {
1020 0 : let clog_page = clog_page?;
1021 :
1022 0 : if clog_page.len() == BLCKSZ as usize + 8 {
1023 0 : let mut timestamp_bytes = [0u8; 8];
1024 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
1025 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
1026 :
1027 0 : match f(timestamp) {
1028 0 : ControlFlow::Break(b) => return Ok(b),
1029 0 : ControlFlow::Continue(()) => (),
1030 : }
1031 0 : }
1032 : }
1033 : }
1034 : }
1035 0 : Ok(Default::default())
1036 0 : }
1037 :
1038 0 : pub(crate) async fn get_slru_keyspace(
1039 0 : &self,
1040 0 : version: Version<'_>,
1041 0 : ctx: &RequestContext,
1042 0 : ) -> Result<KeySpace, PageReconstructError> {
1043 0 : let mut accum = KeySpaceAccum::new();
1044 :
1045 0 : for kind in SlruKind::iter() {
1046 0 : let mut segments: Vec<u32> = self
1047 0 : .list_slru_segments(kind, version, ctx)
1048 0 : .await?
1049 0 : .into_iter()
1050 0 : .collect();
1051 0 : segments.sort_unstable();
1052 :
1053 0 : for seg in segments {
1054 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
1055 :
1056 0 : accum.add_range(
1057 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
1058 : );
1059 : }
1060 : }
1061 :
1062 0 : Ok(accum.to_keyspace())
1063 0 : }
1064 :
1065 : /// Get a list of SLRU segments
1066 0 : pub(crate) async fn list_slru_segments(
1067 0 : &self,
1068 0 : kind: SlruKind,
1069 0 : version: Version<'_>,
1070 0 : ctx: &RequestContext,
1071 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
1072 : // fetch directory entry
1073 0 : let key = slru_dir_to_key(kind);
1074 :
1075 0 : let buf = version.get(self, key, ctx).await?;
1076 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
1077 0 : }
1078 :
1079 0 : pub(crate) async fn get_relmap_file(
1080 0 : &self,
1081 0 : spcnode: Oid,
1082 0 : dbnode: Oid,
1083 0 : version: Version<'_>,
1084 0 : ctx: &RequestContext,
1085 0 : ) -> Result<Bytes, PageReconstructError> {
1086 0 : let key = relmap_file_key(spcnode, dbnode);
1087 :
1088 0 : let buf = version.get(self, key, ctx).await?;
1089 0 : Ok(buf)
1090 0 : }
1091 :
1092 169 : pub(crate) async fn list_dbdirs(
1093 169 : &self,
1094 169 : lsn: Lsn,
1095 169 : ctx: &RequestContext,
1096 169 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
1097 : // fetch directory entry
1098 169 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1099 :
1100 169 : Ok(DbDirectory::des(&buf)?.dbdirs)
1101 169 : }
1102 :
1103 0 : pub(crate) async fn get_twophase_file(
1104 0 : &self,
1105 0 : xid: u64,
1106 0 : lsn: Lsn,
1107 0 : ctx: &RequestContext,
1108 0 : ) -> Result<Bytes, PageReconstructError> {
1109 0 : let key = twophase_file_key(xid);
1110 0 : let buf = self.get(key, lsn, ctx).await?;
1111 0 : Ok(buf)
1112 0 : }
1113 :
1114 170 : pub(crate) async fn list_twophase_files(
1115 170 : &self,
1116 170 : lsn: Lsn,
1117 170 : ctx: &RequestContext,
1118 170 : ) -> Result<HashSet<u64>, PageReconstructError> {
1119 : // fetch directory entry
1120 170 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1121 :
1122 170 : if self.pg_version >= PgMajorVersion::PG17 {
1123 157 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1124 : } else {
1125 13 : Ok(TwoPhaseDirectory::des(&buf)?
1126 : .xids
1127 13 : .iter()
1128 13 : .map(|x| u64::from(*x))
1129 13 : .collect())
1130 : }
1131 170 : }
1132 :
1133 0 : pub(crate) async fn get_control_file(
1134 0 : &self,
1135 0 : lsn: Lsn,
1136 0 : ctx: &RequestContext,
1137 0 : ) -> Result<Bytes, PageReconstructError> {
1138 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1139 0 : }
1140 :
1141 6 : pub(crate) async fn get_checkpoint(
1142 6 : &self,
1143 6 : lsn: Lsn,
1144 6 : ctx: &RequestContext,
1145 6 : ) -> Result<Bytes, PageReconstructError> {
1146 6 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1147 6 : }
1148 :
1149 6 : async fn list_aux_files_v2(
1150 6 : &self,
1151 6 : lsn: Lsn,
1152 6 : ctx: &RequestContext,
1153 6 : io_concurrency: IoConcurrency,
1154 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1155 6 : let kv = self
1156 6 : .scan(
1157 6 : KeySpace::single(Key::metadata_aux_key_range()),
1158 6 : lsn,
1159 6 : ctx,
1160 6 : io_concurrency,
1161 6 : )
1162 6 : .await?;
1163 6 : let mut result = HashMap::new();
1164 6 : let mut sz = 0;
1165 15 : for (_, v) in kv {
1166 9 : let v = v?;
1167 9 : let v = aux_file::decode_file_value_bytes(&v)
1168 9 : .context("value decode")
1169 9 : .map_err(PageReconstructError::Other)?;
1170 17 : for (fname, content) in v {
1171 8 : sz += fname.len();
1172 8 : sz += content.len();
1173 8 : result.insert(fname, content);
1174 8 : }
1175 : }
1176 6 : self.aux_file_size_estimator.on_initial(sz);
1177 6 : Ok(result)
1178 6 : }
1179 :
1180 0 : pub(crate) async fn trigger_aux_file_size_computation(
1181 0 : &self,
1182 0 : lsn: Lsn,
1183 0 : ctx: &RequestContext,
1184 0 : io_concurrency: IoConcurrency,
1185 0 : ) -> Result<(), PageReconstructError> {
1186 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1187 0 : Ok(())
1188 0 : }
1189 :
1190 6 : pub(crate) async fn list_aux_files(
1191 6 : &self,
1192 6 : lsn: Lsn,
1193 6 : ctx: &RequestContext,
1194 6 : io_concurrency: IoConcurrency,
1195 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1196 6 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1197 6 : }
1198 :
1199 2 : pub(crate) async fn get_replorigins(
1200 2 : &self,
1201 2 : lsn: Lsn,
1202 2 : ctx: &RequestContext,
1203 2 : io_concurrency: IoConcurrency,
1204 2 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1205 2 : let kv = self
1206 2 : .scan(
1207 2 : KeySpace::single(repl_origin_key_range()),
1208 2 : lsn,
1209 2 : ctx,
1210 2 : io_concurrency,
1211 2 : )
1212 2 : .await?;
1213 2 : let mut result = HashMap::new();
1214 6 : for (k, v) in kv {
1215 5 : let v = v?;
1216 5 : if v.is_empty() {
1217 : // This is a tombstone -- we can skip it.
1218 : // Originally, the replorigin code used `Lsn::INVALID` to represent a tombstone. However, as it is part of
1219 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1220 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1221 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1222 2 : continue;
1223 3 : }
1224 3 : let origin_id = k.field6 as RepOriginId;
1225 3 : let origin_lsn = Lsn::des(&v)
1226 3 : .with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
1227 2 : if origin_lsn != Lsn::INVALID {
1228 2 : result.insert(origin_id, origin_lsn);
1229 2 : }
1230 : }
1231 1 : Ok(result)
1232 2 : }
1233 :
1234 : /// Does the same as get_current_logical_size but counted on demand.
1235 : /// Used to initialize the logical size tracking on startup.
1236 : ///
1237 : /// Only relation blocks are counted currently. That excludes metadata,
1238 : /// SLRUs, twophase files etc.
1239 : ///
1240 : /// # Cancel-Safety
1241 : ///
1242 : /// This method is cancellation-safe.
1243 7 : pub(crate) async fn get_current_logical_size_non_incremental(
1244 7 : &self,
1245 7 : lsn: Lsn,
1246 7 : ctx: &RequestContext,
1247 7 : ) -> Result<u64, CalculateLogicalSizeError> {
1248 7 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1249 :
1250 7 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1251 :
1252 : // Fetch list of database dirs and iterate them
1253 7 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1254 7 : let dbdir = DbDirectory::des(&buf)?;
1255 :
1256 7 : let mut total_size: u64 = 0;
1257 7 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1258 0 : for rel in self
1259 0 : .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
1260 0 : .await?
1261 : {
1262 0 : if self.cancel.is_cancelled() {
1263 0 : return Err(CalculateLogicalSizeError::Cancelled);
1264 0 : }
1265 0 : let relsize_key = rel_size_to_key(rel);
1266 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1267 0 : let relsize = buf.get_u32_le();
1268 :
1269 0 : total_size += relsize as u64;
1270 : }
1271 : }
1272 7 : Ok(total_size * BLCKSZ as u64)
1273 7 : }
1274 :
1275 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1276 : /// for gc-compaction.
1277 : ///
1278 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1279 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1280 : /// be kept only for a specific range of LSN.
1281 : ///
1282 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1283 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1284 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1285 : /// determine which keys to retain/drop for gc-compaction.
1286 : ///
1287 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1288 : /// to be retained for each of the branch LSN.
1289 : ///
1290 : /// The return value is (dense keyspace, sparse keyspace).
1291 27 : pub(crate) async fn collect_gc_compaction_keyspace(
1292 27 : &self,
1293 27 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1294 27 : let metadata_key_begin = Key::metadata_key_range().start;
1295 27 : let aux_v1_key = AUX_FILES_KEY;
1296 27 : let dense_keyspace = KeySpace {
1297 27 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1298 27 : };
1299 27 : Ok((
1300 27 : dense_keyspace,
1301 27 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1302 27 : ))
1303 27 : }
1304 :
1305 : ///
1306 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1307 : /// Anything that's not listed may be removed from the underlying storage (from
1308 : /// that LSN forwards).
1309 : ///
1310 : /// The return value is (dense keyspace, sparse keyspace).
1311 169 : pub(crate) async fn collect_keyspace(
1312 169 : &self,
1313 169 : lsn: Lsn,
1314 169 : ctx: &RequestContext,
1315 169 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1316 : // Iterate through key ranges, greedily packing them into partitions
1317 169 : let mut result = KeySpaceAccum::new();
1318 :
1319 : // The dbdir metadata always exists
1320 169 : result.add_key(DBDIR_KEY);
1321 :
1322 : // Fetch list of database dirs and iterate them
1323 169 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1324 169 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1325 :
1326 169 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1327 169 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1328 0 : if has_relmap_file {
1329 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1330 0 : }
1331 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1332 :
1333 0 : let mut rels: Vec<RelTag> = self
1334 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1335 0 : .await?
1336 0 : .into_iter()
1337 0 : .collect();
1338 0 : rels.sort_unstable();
1339 0 : for rel in rels {
1340 0 : let relsize_key = rel_size_to_key(rel);
1341 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1342 0 : let relsize = buf.get_u32_le();
1343 :
1344 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1345 0 : result.add_key(relsize_key);
1346 : }
1347 : }
1348 :
1349 : // Iterate SLRUs next
1350 169 : if self.tenant_shard_id.is_shard_zero() {
1351 498 : for kind in [
1352 166 : SlruKind::Clog,
1353 166 : SlruKind::MultiXactMembers,
1354 166 : SlruKind::MultiXactOffsets,
1355 : ] {
1356 498 : let slrudir_key = slru_dir_to_key(kind);
1357 498 : result.add_key(slrudir_key);
1358 498 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1359 498 : let dir = SlruSegmentDirectory::des(&buf)?;
1360 498 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1361 498 : segments.sort_unstable();
1362 498 : for segno in segments {
1363 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1364 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1365 0 : let segsize = buf.get_u32_le();
1366 :
1367 0 : result.add_range(
1368 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1369 : );
1370 0 : result.add_key(segsize_key);
1371 : }
1372 : }
1373 3 : }
1374 :
1375 : // Then pg_twophase
1376 169 : result.add_key(TWOPHASEDIR_KEY);
1377 :
1378 169 : let mut xids: Vec<u64> = self
1379 169 : .list_twophase_files(lsn, ctx)
1380 169 : .await?
1381 169 : .iter()
1382 169 : .cloned()
1383 169 : .collect();
1384 169 : xids.sort_unstable();
1385 169 : for xid in xids {
1386 0 : result.add_key(twophase_file_key(xid));
1387 0 : }
1388 :
1389 169 : result.add_key(CONTROLFILE_KEY);
1390 169 : result.add_key(CHECKPOINT_KEY);
1391 :
1392 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1393 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1394 : // and the keys will not be garbage-collected.
1395 : #[cfg(test)]
1396 : {
1397 169 : let guard = self.extra_test_dense_keyspace.load();
1398 169 : for kr in &guard.ranges {
1399 0 : result.add_range(kr.clone());
1400 0 : }
1401 : }
1402 :
1403 169 : let dense_keyspace = result.to_keyspace();
1404 169 : let sparse_keyspace = SparseKeySpace(KeySpace {
1405 169 : ranges: vec![
1406 169 : Key::metadata_aux_key_range(),
1407 169 : repl_origin_key_range(),
1408 169 : Key::rel_dir_sparse_key_range(),
1409 169 : ],
1410 169 : });
1411 :
1412 169 : if cfg!(debug_assertions) {
1413 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1414 :
1415 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1416 : // category of sparse keys are split into their own image/delta files. If there
1417 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1418 : // and we want the developer to keep the keyspaces separated.
1419 :
1420 169 : let ranges = &sparse_keyspace.0.ranges;
1421 :
1422 : // TODO: use a single overlaps_with across the codebase
1423 507 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1424 507 : !(a.end <= b.start || b.end <= a.start)
1425 507 : }
1426 507 : for i in 0..ranges.len() {
1427 507 : for j in 0..i {
1428 507 : if overlaps_with(&ranges[i], &ranges[j]) {
1429 0 : panic!(
1430 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1431 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1432 : );
1433 507 : }
1434 : }
1435 : }
1436 338 : for i in 1..ranges.len() {
1437 338 : assert!(
1438 338 : ranges[i - 1].end <= ranges[i].start,
1439 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1440 0 : ranges[i - 1].start,
1441 0 : ranges[i - 1].end,
1442 0 : ranges[i].start,
1443 0 : ranges[i].end
1444 : );
1445 : }
1446 0 : }
1447 :
1448 169 : Ok((dense_keyspace, sparse_keyspace))
1449 169 : }
1450 :
1451 : /// Get the cached size of a relation. There are two caches: one for primary updates, which captures the latest
1452 : /// state of the timeline, and a snapshot cache, whose key includes the LSN and so can be used by replicas to get
1453 : /// the relation size at a particular LSN (snapshot).
1454 224270 : pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
1455 224270 : let lsn = version.get_lsn();
1456 : {
1457 224270 : let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
1458 224270 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
1459 224259 : if lsn >= *cached_lsn {
1460 221686 : RELSIZE_LATEST_CACHE_HITS.inc();
1461 221686 : return Some(*nblocks);
1462 2573 : }
1463 2573 : RELSIZE_CACHE_MISSES_OLD.inc();
1464 11 : }
1465 : }
1466 : {
1467 2584 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1468 2584 : if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
1469 2563 : RELSIZE_SNAPSHOT_CACHE_HITS.inc();
1470 2563 : return Some(*nblock);
1471 21 : }
1472 : }
1473 21 : if version.is_latest() {
1474 10 : RELSIZE_LATEST_CACHE_MISSES.inc();
1475 11 : } else {
1476 11 : RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
1477 11 : }
1478 21 : None
1479 224270 : }
1480 :
1481 : /// Update cached relation size if there is no more recent update
1482 5 : pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
1483 5 : let lsn = version.get_lsn();
1484 5 : if version.is_latest() {
1485 0 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1486 0 : match rel_size_cache.entry(tag) {
1487 0 : hash_map::Entry::Occupied(mut entry) => {
1488 0 : let cached_lsn = entry.get_mut();
1489 0 : if lsn >= cached_lsn.0 {
1490 0 : *cached_lsn = (lsn, nblocks);
1491 0 : }
1492 : }
1493 0 : hash_map::Entry::Vacant(entry) => {
1494 0 : entry.insert((lsn, nblocks));
1495 0 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1496 0 : }
1497 : }
1498 : } else {
1499 5 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1500 5 : if rel_size_cache.capacity() != 0 {
1501 5 : rel_size_cache.insert((lsn, tag), nblocks);
1502 5 : RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
1503 5 : }
1504 : }
1505 5 : }
1506 :
1507 : /// Store cached relation size
1508 141360 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1509 141360 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1510 141360 : if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
1511 960 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1512 140400 : }
1513 141360 : }
1514 :
1515 : /// Remove cached relation size
1516 1 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1517 1 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1518 1 : if rel_size_cache.remove(tag).is_some() {
1519 1 : RELSIZE_LATEST_CACHE_ENTRIES.dec();
1520 1 : }
1521 1 : }
1522 : }
1523 :
1524 : /// DatadirModification represents an operation to ingest an atomic set of
1525 : /// updates to the repository.
1526 : ///
1527 : /// It is created by the 'begin_modification' function. It is called for each WAL
1528 : /// record, so that all the modifications by one WAL record appear atomic.
1529 : pub struct DatadirModification<'a> {
1530 : /// The timeline this modification applies to. You can access this to
1531 : /// read the state, but note that any pending updates are *not* reflected
1532 : /// in the state in 'tline' yet.
1533 : pub tline: &'a Timeline,
1534 :
1535 : /// Current LSN of the modification
1536 : lsn: Lsn,
1537 :
1538 : // The modifications are not applied directly to the underlying key-value store.
1539 : // The put-functions add the modifications here, and they are flushed to the
1540 : // underlying key-value store by the 'commit' function.
1541 : pending_lsns: Vec<Lsn>,
1542 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1543 : pending_nblocks: i64,
1544 :
1545 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1546 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1547 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1548 :
1549 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1550 : /// which keys are stored here.
1551 : pending_data_batch: Option<SerializedValueBatch>,
1552 :
1553 : /// For special "directory" keys that store key-value maps, track the size of the map
1554 : /// if it was updated in this modification.
1555 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1556 :
1557 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1558 : pending_metadata_bytes: usize,
1559 : }
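// Illustration (not part of the original source): a hedged sketch of the typical lifecycle of a
// DatadirModification, using only methods defined on it below. `tline`, `rel` and `ctx` are
// assumed to come from the caller, and the timeline is assumed to already be initialized
// (e.g. via init_empty), so the dbdir/reldir metadata exists.
#[cfg(test)]
#[allow(dead_code)]
async fn example_modification_lifecycle(
    tline: &Timeline,
    rel: RelTag,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // All puts are buffered inside the modification; nothing is visible in the timeline yet.
    let mut modification = tline.begin_modification(Lsn(0x10));
    modification.put_rel_creation(rel, 0, ctx).await?;
    // Advance the LSN before ingesting the next "record" of the same batch.
    modification.set_lsn(Lsn(0x18))?;
    modification.put_rel_extend(rel, 1, ctx).await?;
    // Commit stamps all pending updates with their LSNs and flushes them atomically.
    modification.commit(ctx).await?;
    Ok(())
}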
1560 :
1561 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1562 : pub enum MetricsUpdate {
1563 : /// Set the metrics to this value
1564 : Set(u64),
1565 : /// Increment the metrics by this value
1566 : Add(u64),
1567 : /// Decrement the metrics by this value
1568 : Sub(u64),
1569 : }
1570 :
1571 : impl DatadirModification<'_> {
1572 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1573 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1574 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1575 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
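    // Illustration (not part of the original source): a hedged sketch of how a caller can use
    // MAX_PENDING_BYTES together with approx_pending_bytes() to bound the memory held by a
    // single modification. The surrounding ingest loop and its error handling are assumptions.
    #[cfg(test)]
    #[allow(dead_code)]
    async fn example_commit_if_large(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
        // Once the approximate pending payload crosses the threshold, commit and start a new
        // batch rather than letting the monolithic serialization grow without bound.
        if self.approx_pending_bytes() >= Self::MAX_PENDING_BYTES {
            self.commit(ctx).await?;
        }
        Ok(())
    }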
1576 :
1577 : /// Get the current lsn
1578 1 : pub(crate) fn get_lsn(&self) -> Lsn {
1579 1 : self.lsn
1580 1 : }
1581 :
1582 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1583 0 : self.pending_data_batch
1584 0 : .as_ref()
1585 0 : .map_or(0, |b| b.buffer_size())
1586 0 : + self.pending_metadata_bytes
1587 0 : }
1588 :
1589 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1590 0 : self.pending_data_batch
1591 0 : .as_ref()
1592 0 : .is_some_and(|b| b.has_data())
1593 0 : }
1594 :
1595 : /// Returns statistics about the currently pending modifications.
1596 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1597 0 : let mut stats = DatadirModificationStats::default();
1598 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1599 0 : match value {
1600 0 : Value::Image(_) => stats.metadata_images += 1,
1601 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1602 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1603 : }
1604 : }
1605 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1606 0 : match valuemeta {
1607 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1608 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1609 0 : ValueMeta::Observed(_) => {}
1610 : }
1611 : }
1612 0 : stats
1613 0 : }
1614 :
1615 : /// Set the current lsn
1616 72929 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1617 72929 : ensure_walingest!(
1618 : lsn >= self.lsn,
1619 : "setting an older lsn {} than {} is not allowed",
1620 : lsn,
1621 : self.lsn
1622 : );
1623 :
1624 72929 : if lsn > self.lsn {
1625 72929 : self.pending_lsns.push(self.lsn);
1626 72929 : self.lsn = lsn;
1627 72929 : }
1628 72929 : Ok(())
1629 72929 : }
1630 :
1631 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1632 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1633 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1634 : ///
1635 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1636 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1637 : /// via [`Self::get`] as soon as their record has been ingested.
1638 425371 : fn is_data_key(key: &Key) -> bool {
1639 425371 : key.is_rel_block_key() || key.is_slru_block_key()
1640 425371 : }
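    // Illustration (not part of the original source): a minimal sketch of the data/metadata
    // split described above. Relation block contents are "data" keys (fast path, not readable
    // back until commit), while the per-relation size entry is a "metadata" key (readable via
    // `Self::get` as soon as it has been put). The RelTag values are arbitrary.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_key_classification() {
        let rel = RelTag {
            spcnode: 1663,
            dbnode: 16384,
            relnode: 16385,
            forknum: 0,
        };
        assert!(Self::is_data_key(&rel_block_to_key(rel, 0)));
        assert!(!Self::is_data_key(&rel_size_to_key(rel)));
    }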
1641 :
1642 : /// Initialize a completely new repository.
1643 : ///
1644 : /// This inserts the directory metadata entries that are assumed to
1645 : /// always exist.
1646 112 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1647 112 : let buf = DbDirectory::ser(&DbDirectory {
1648 112 : dbdirs: HashMap::new(),
1649 112 : })?;
1650 112 : self.pending_directory_entries
1651 112 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1652 112 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1653 :
1654 112 : let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
1655 99 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1656 99 : xids: HashSet::new(),
1657 99 : })
1658 : } else {
1659 13 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1660 13 : xids: HashSet::new(),
1661 13 : })
1662 0 : }?;
1663 112 : self.pending_directory_entries
1664 112 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1665 112 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1666 :
1667 112 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1668 112 : let empty_dir = Value::Image(buf);
1669 :
1670 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1671 : // harmless but they'd just be dropped on later compaction.
1672 112 : if self.tline.tenant_shard_id.is_shard_zero() {
1673 109 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1674 109 : self.pending_directory_entries.push((
1675 109 : DirectoryKind::SlruSegment(SlruKind::Clog),
1676 109 : MetricsUpdate::Set(0),
1677 109 : ));
1678 109 : self.put(
1679 109 : slru_dir_to_key(SlruKind::MultiXactMembers),
1680 109 : empty_dir.clone(),
1681 109 : );
1682 109 : self.pending_directory_entries.push((
1683 109 : DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1684 109 : MetricsUpdate::Set(0),
1685 109 : ));
1686 109 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1687 109 : self.pending_directory_entries.push((
1688 109 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1689 109 : MetricsUpdate::Set(0),
1690 109 : ));
1691 109 : }
1692 :
1693 112 : Ok(())
1694 112 : }
1695 :
1696 : #[cfg(test)]
1697 111 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1698 111 : self.init_empty()?;
1699 111 : self.put_control_file(bytes::Bytes::from_static(
1700 111 : b"control_file contents do not matter",
1701 : ))
1702 111 : .context("put_control_file")?;
1703 111 : self.put_checkpoint(bytes::Bytes::from_static(
1704 111 : b"checkpoint_file contents do not matter",
1705 : ))
1706 111 : .context("put_checkpoint_file")?;
1707 111 : Ok(())
1708 111 : }
1709 :
1710 : /// Creates a relation if it is not already present.
1711 : /// Returns the current size of the relation
1712 209028 : pub(crate) async fn create_relation_if_required(
1713 209028 : &mut self,
1714 209028 : rel: RelTag,
1715 209028 : ctx: &RequestContext,
1716 209028 : ) -> Result<u32, WalIngestError> {
1717 : // Get current size and put rel creation if rel doesn't exist
1718 : //
1719 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1720 : // check the cache too. This is because eagerly checking the cache results in
1721 : // less work overall and 10% better performance. It's more work on cache miss
1722 : // but cache miss is rare.
1723 209028 : if let Some(nblocks) = self
1724 209028 : .tline
1725 209028 : .get_cached_rel_size(&rel, Version::Modified(self))
1726 : {
1727 209023 : Ok(nblocks)
1728 5 : } else if !self
1729 5 : .tline
1730 5 : .get_rel_exists(rel, Version::Modified(self), ctx)
1731 5 : .await?
1732 : {
1733 : // create it with 0 size initially, the logic below will extend it
1734 5 : self.put_rel_creation(rel, 0, ctx).await?;
1735 5 : Ok(0)
1736 : } else {
1737 0 : Ok(self
1738 0 : .tline
1739 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1740 0 : .await?)
1741 : }
1742 209028 : }
1743 :
1744 : /// Given a block number for a relation (which represents a newly written block),
1745 : /// the previous block count of the relation, and the shard info, find the gaps
1746 : /// that were created by the newly written block if any.
1747 72835 : fn find_gaps(
1748 72835 : rel: RelTag,
1749 72835 : blkno: u32,
1750 72835 : previous_nblocks: u32,
1751 72835 : shard: &ShardIdentity,
1752 72835 : ) -> Option<KeySpace> {
1753 72835 : let mut key = rel_block_to_key(rel, blkno);
1754 72835 : let mut gap_accum = None;
1755 :
1756 72835 : for gap_blkno in previous_nblocks..blkno {
1757 16 : key.field6 = gap_blkno;
1758 :
1759 16 : if shard.get_shard_number(&key) != shard.number {
1760 4 : continue;
1761 12 : }
1762 :
1763 12 : gap_accum
1764 12 : .get_or_insert_with(KeySpaceAccum::new)
1765 12 : .add_key(key);
1766 : }
1767 :
1768 72835 : gap_accum.map(|accum| accum.to_keyspace())
1769 72835 : }
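    // Illustration (not part of the original source): writing block 5 into a relation whose
    // previous size was 3 leaves blocks 3 and 4 unwritten. Those blocks (restricted to the keys
    // owned by this shard) are the "gaps" returned here, and ingest_batch later zero-fills them.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_gap_semantics(rel: RelTag, shard: &ShardIdentity) -> Option<KeySpace> {
        // previous_nblocks = 3, newly written blkno = 5 -> candidate gap blocks are 3 and 4.
        Self::find_gaps(rel, 5, 3, shard)
    }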
1770 :
1771 72926 : pub async fn ingest_batch(
1772 72926 : &mut self,
1773 72926 : mut batch: SerializedValueBatch,
1774 72926 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1775 72926 : shard: &ShardIdentity,
1776 72926 : ctx: &RequestContext,
1777 72926 : ) -> Result<(), WalIngestError> {
1778 72926 : let mut gaps_at_lsns = Vec::default();
1779 :
1780 72926 : for meta in batch.metadata.iter() {
1781 72821 : let key = Key::from_compact(meta.key());
1782 72821 : let (rel, blkno) = key
1783 72821 : .to_rel_block()
1784 72821 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1785 72821 : let new_nblocks = blkno + 1;
1786 :
1787 72821 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1788 72821 : if new_nblocks > old_nblocks {
1789 1195 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1790 71626 : }
1791 :
1792 72821 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1793 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1794 72821 : }
1795 : }
1796 :
1797 72926 : if !gaps_at_lsns.is_empty() {
1798 0 : batch.zero_gaps(gaps_at_lsns);
1799 72926 : }
1800 :
1801 72926 : match self.pending_data_batch.as_mut() {
1802 10 : Some(pending_batch) => {
1803 10 : pending_batch.extend(batch);
1804 10 : }
1805 72916 : None if batch.has_data() => {
1806 72815 : self.pending_data_batch = Some(batch);
1807 72815 : }
1808 101 : None => {
1809 101 : // Nothing to initialize the batch with
1810 101 : }
1811 : }
1812 :
1813 72926 : Ok(())
1814 72926 : }
1815 :
1816 : /// Put a new page version that can be constructed from a WAL record
1817 : ///
1818 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1819 : /// current end-of-file. It's up to the caller to check that the relation size
1820 : /// matches the blocks inserted!
1821 6 : pub fn put_rel_wal_record(
1822 6 : &mut self,
1823 6 : rel: RelTag,
1824 6 : blknum: BlockNumber,
1825 6 : rec: NeonWalRecord,
1826 6 : ) -> Result<(), WalIngestError> {
1827 6 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1828 6 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1829 6 : Ok(())
1830 6 : }
1831 :
1832 : // Same, but for an SLRU.
1833 4 : pub fn put_slru_wal_record(
1834 4 : &mut self,
1835 4 : kind: SlruKind,
1836 4 : segno: u32,
1837 4 : blknum: BlockNumber,
1838 4 : rec: NeonWalRecord,
1839 4 : ) -> Result<(), WalIngestError> {
1840 4 : if !self.tline.tenant_shard_id.is_shard_zero() {
1841 0 : return Ok(());
1842 4 : }
1843 :
1844 4 : self.put(
1845 4 : slru_block_to_key(kind, segno, blknum),
1846 4 : Value::WalRecord(rec),
1847 : );
1848 4 : Ok(())
1849 4 : }
1850 :
1851 : /// Like put_wal_record, but with ready-made image of the page.
1852 138921 : pub fn put_rel_page_image(
1853 138921 : &mut self,
1854 138921 : rel: RelTag,
1855 138921 : blknum: BlockNumber,
1856 138921 : img: Bytes,
1857 138921 : ) -> Result<(), WalIngestError> {
1858 138921 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1859 138921 : let key = rel_block_to_key(rel, blknum);
1860 138921 : if !key.is_valid_key_on_write_path() {
1861 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1862 138921 : }
1863 138921 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1864 138921 : Ok(())
1865 138921 : }
1866 :
1867 3 : pub fn put_slru_page_image(
1868 3 : &mut self,
1869 3 : kind: SlruKind,
1870 3 : segno: u32,
1871 3 : blknum: BlockNumber,
1872 3 : img: Bytes,
1873 3 : ) -> Result<(), WalIngestError> {
1874 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1875 :
1876 3 : let key = slru_block_to_key(kind, segno, blknum);
1877 3 : if !key.is_valid_key_on_write_path() {
1878 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1879 3 : }
1880 3 : self.put(key, Value::Image(img));
1881 3 : Ok(())
1882 3 : }
1883 :
1884 1499 : pub(crate) fn put_rel_page_image_zero(
1885 1499 : &mut self,
1886 1499 : rel: RelTag,
1887 1499 : blknum: BlockNumber,
1888 1499 : ) -> Result<(), WalIngestError> {
1889 1499 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1890 1499 : let key = rel_block_to_key(rel, blknum);
1891 1499 : if !key.is_valid_key_on_write_path() {
1892 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1893 1499 : }
1894 :
1895 1499 : let batch = self
1896 1499 : .pending_data_batch
1897 1499 : .get_or_insert_with(SerializedValueBatch::default);
1898 :
1899 1499 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1900 :
1901 1499 : Ok(())
1902 1499 : }
1903 :
1904 0 : pub(crate) fn put_slru_page_image_zero(
1905 0 : &mut self,
1906 0 : kind: SlruKind,
1907 0 : segno: u32,
1908 0 : blknum: BlockNumber,
1909 0 : ) -> Result<(), WalIngestError> {
1910 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1911 0 : let key = slru_block_to_key(kind, segno, blknum);
1912 0 : if !key.is_valid_key_on_write_path() {
1913 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1914 0 : }
1915 :
1916 0 : let batch = self
1917 0 : .pending_data_batch
1918 0 : .get_or_insert_with(SerializedValueBatch::default);
1919 :
1920 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1921 :
1922 0 : Ok(())
1923 0 : }
1924 :
1925 : /// Returns `true` if the rel_size_v2 write path is enabled. The first time it is enabled,
1926 : /// we also persist that fact in `index_part.json`.
1927 973 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1928 973 : let status = self.tline.get_rel_size_v2_status();
1929 973 : let config = self.tline.get_rel_size_v2_enabled();
1930 973 : match (config, status) {
1931 : (false, RelSizeMigration::Legacy) => {
1932 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1933 973 : Ok(false)
1934 : }
1935 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1936 : // index_part already persisted that the timeline has enabled rel_size_v2
1937 0 : Ok(true)
1938 : }
1939 : (true, RelSizeMigration::Legacy) => {
1940 : // The first time we enable it, we need to persist it in `index_part.json`
1941 0 : self.tline
1942 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1943 0 : tracing::info!("enabled rel_size_v2");
1944 0 : Ok(true)
1945 : }
1946 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1947 : // index_part already persisted that the timeline has enabled rel_size_v2
1948 : // and we don't need to do anything
1949 0 : Ok(true)
1950 : }
1951 : }
1952 973 : }
1953 :
1954 : /// Store a relmapper file (pg_filenode.map) in the repository
1955 8 : pub async fn put_relmap_file(
1956 8 : &mut self,
1957 8 : spcnode: Oid,
1958 8 : dbnode: Oid,
1959 8 : img: Bytes,
1960 8 : ctx: &RequestContext,
1961 8 : ) -> Result<(), WalIngestError> {
1962 8 : let v2_enabled = self
1963 8 : .maybe_enable_rel_size_v2()
1964 8 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1965 :
1966 : // Add it to the directory (if it doesn't exist already)
1967 8 : let buf = self.get(DBDIR_KEY, ctx).await?;
1968 8 : let mut dbdir = DbDirectory::des(&buf)?;
1969 :
1970 8 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1971 8 : if r.is_none() || r == Some(false) {
1972 : // The dbdir entry didn't exist, or it contained a
1973 : // 'false'. The 'insert' call already updated it with
1974 : // 'true', now write the updated 'dbdirs' map back.
1975 8 : let buf = DbDirectory::ser(&dbdir)?;
1976 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1977 0 : }
1978 8 : if r.is_none() {
1979 : // Create RelDirectory
1980 : // TODO: if we have fully migrated to v2, no need to create this directory
1981 4 : let buf = RelDirectory::ser(&RelDirectory {
1982 4 : rels: HashSet::new(),
1983 4 : })?;
1984 4 : self.pending_directory_entries
1985 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1986 4 : if v2_enabled {
1987 0 : self.pending_directory_entries
1988 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1989 4 : }
1990 4 : self.put(
1991 4 : rel_dir_to_key(spcnode, dbnode),
1992 4 : Value::Image(Bytes::from(buf)),
1993 : );
1994 4 : }
1995 :
1996 8 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1997 8 : Ok(())
1998 8 : }
1999 :
2000 0 : pub async fn put_twophase_file(
2001 0 : &mut self,
2002 0 : xid: u64,
2003 0 : img: Bytes,
2004 0 : ctx: &RequestContext,
2005 0 : ) -> Result<(), WalIngestError> {
2006 : // Add it to the directory entry
2007 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2008 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2009 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
2010 0 : if !dir.xids.insert(xid) {
2011 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
2012 0 : }
2013 0 : self.pending_directory_entries.push((
2014 0 : DirectoryKind::TwoPhase,
2015 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2016 0 : ));
2017 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2018 : } else {
2019 0 : let xid = xid as u32;
2020 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
2021 0 : if !dir.xids.insert(xid) {
2022 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
2023 0 : }
2024 0 : self.pending_directory_entries.push((
2025 0 : DirectoryKind::TwoPhase,
2026 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2027 0 : ));
2028 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2029 : };
2030 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2031 :
2032 0 : self.put(twophase_file_key(xid), Value::Image(img));
2033 0 : Ok(())
2034 0 : }
2035 :
2036 1 : pub async fn set_replorigin(
2037 1 : &mut self,
2038 1 : origin_id: RepOriginId,
2039 1 : origin_lsn: Lsn,
2040 1 : ) -> Result<(), WalIngestError> {
2041 1 : let key = repl_origin_key(origin_id);
2042 1 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
2043 1 : Ok(())
2044 1 : }
2045 :
2046 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
2047 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
2048 0 : }
2049 :
2050 112 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2051 112 : self.put(CONTROLFILE_KEY, Value::Image(img));
2052 112 : Ok(())
2053 112 : }
2054 :
2055 119 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2056 119 : self.put(CHECKPOINT_KEY, Value::Image(img));
2057 119 : Ok(())
2058 119 : }
2059 :
2060 0 : pub async fn drop_dbdir(
2061 0 : &mut self,
2062 0 : spcnode: Oid,
2063 0 : dbnode: Oid,
2064 0 : ctx: &RequestContext,
2065 0 : ) -> Result<(), WalIngestError> {
2066 0 : let total_blocks = self
2067 0 : .tline
2068 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
2069 0 : .await?;
2070 :
2071 : // Remove entry from dbdir
2072 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
2073 0 : let mut dir = DbDirectory::des(&buf)?;
2074 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
2075 0 : let buf = DbDirectory::ser(&dir)?;
2076 0 : self.pending_directory_entries.push((
2077 0 : DirectoryKind::Db,
2078 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
2079 0 : ));
2080 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2081 : } else {
2082 0 : warn!(
2083 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
2084 : spcnode, dbnode
2085 : );
2086 : }
2087 :
2088 : // Update logical database size.
2089 0 : self.pending_nblocks -= total_blocks as i64;
2090 :
2091 : // Delete all relations and metadata files for the spcnode/dnode
2092 0 : self.delete(dbdir_key_range(spcnode, dbnode));
2093 0 : Ok(())
2094 0 : }
2095 :
2096 : /// Create a relation fork.
2097 : ///
2098 : /// 'nblocks' is the initial size.
2099 960 : pub async fn put_rel_creation(
2100 960 : &mut self,
2101 960 : rel: RelTag,
2102 960 : nblocks: BlockNumber,
2103 960 : ctx: &RequestContext,
2104 960 : ) -> Result<(), WalIngestError> {
2105 960 : if rel.relnode == 0 {
2106 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
2107 0 : "invalid relnode"
2108 0 : )))?;
2109 960 : }
2110 : // It's possible that this is the first rel for this db in this
2111 : // tablespace. Create the reldir entry for it if so.
2112 960 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
2113 :
2114 960 : let dbdir_exists =
2115 960 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
2116 : // Didn't exist. Update dbdir
2117 4 : e.insert(false);
2118 4 : let buf = DbDirectory::ser(&dbdir)?;
2119 4 : self.pending_directory_entries.push((
2120 4 : DirectoryKind::Db,
2121 4 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
2122 4 : ));
2123 4 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2124 4 : false
2125 : } else {
2126 956 : true
2127 : };
2128 :
2129 960 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2130 960 : let mut rel_dir = if !dbdir_exists {
2131 : // Create the RelDirectory
2132 4 : RelDirectory::default()
2133 : } else {
2134 : // reldir already exists, fetch it
2135 956 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
2136 : };
2137 :
2138 960 : let v2_enabled = self
2139 960 : .maybe_enable_rel_size_v2()
2140 960 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2141 :
2142 960 : if v2_enabled {
2143 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
2144 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2145 0 : }
2146 0 : let sparse_rel_dir_key =
2147 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2148 : // check if the rel_dir_key exists in v2
2149 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2150 0 : let val = RelDirExists::decode_option(val)
2151 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2152 0 : if val == RelDirExists::Exists {
2153 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2154 0 : }
2155 0 : self.put(
2156 0 : sparse_rel_dir_key,
2157 0 : Value::Image(RelDirExists::Exists.encode()),
2158 : );
2159 0 : if !dbdir_exists {
2160 0 : self.pending_directory_entries
2161 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2162 0 : self.pending_directory_entries
2163 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2164 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2165 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2166 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2167 0 : self.put(
2168 0 : rel_dir_key,
2169 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2170 : );
2171 0 : }
2172 0 : self.pending_directory_entries
2173 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2174 : } else {
2175 : // Add the new relation to the rel directory entry, and write it back
2176 960 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2177 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2178 960 : }
2179 960 : if !dbdir_exists {
2180 4 : self.pending_directory_entries
2181 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2182 956 : }
2183 960 : self.pending_directory_entries
2184 960 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2185 960 : self.put(
2186 960 : rel_dir_key,
2187 960 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2188 : );
2189 : }
2190 :
2191 : // Put size
2192 960 : let size_key = rel_size_to_key(rel);
2193 960 : let buf = nblocks.to_le_bytes();
2194 960 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2195 :
2196 960 : self.pending_nblocks += nblocks as i64;
2197 :
2198 : // Update relation size cache
2199 960 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2200 :
2201 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2202 : // caller.
2203 960 : Ok(())
2204 960 : }
2205 :
2206 : /// Truncate relation
2207 3006 : pub async fn put_rel_truncation(
2208 3006 : &mut self,
2209 3006 : rel: RelTag,
2210 3006 : nblocks: BlockNumber,
2211 3006 : ctx: &RequestContext,
2212 3006 : ) -> Result<(), WalIngestError> {
2213 3006 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2214 3006 : if self
2215 3006 : .tline
2216 3006 : .get_rel_exists(rel, Version::Modified(self), ctx)
2217 3006 : .await?
2218 : {
2219 3006 : let size_key = rel_size_to_key(rel);
2220 : // Fetch the old size first
2221 3006 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2222 :
2223 : // Update the entry with the new size.
2224 3006 : let buf = nblocks.to_le_bytes();
2225 3006 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2226 :
2227 : // Update relation size cache
2228 3006 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2229 :
2230 : // Update logical database size.
2231 3006 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2232 0 : }
2233 3006 : Ok(())
2234 3006 : }
2235 :
2236 : /// Extend relation
2237 : /// If new size is smaller, do nothing.
2238 138340 : pub async fn put_rel_extend(
2239 138340 : &mut self,
2240 138340 : rel: RelTag,
2241 138340 : nblocks: BlockNumber,
2242 138340 : ctx: &RequestContext,
2243 138340 : ) -> Result<(), WalIngestError> {
2244 138340 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2245 :
2246 : // Put size
2247 138340 : let size_key = rel_size_to_key(rel);
2248 138340 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2249 :
2250 : // only extend relation here. never decrease the size
2251 138340 : if nblocks > old_size {
2252 137394 : let buf = nblocks.to_le_bytes();
2253 137394 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2254 137394 :
2255 137394 : // Update relation size cache
2256 137394 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2257 137394 :
2258 137394 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2259 137394 : }
2260 138340 : Ok(())
2261 138340 : }
2262 :
2263 : /// Drop some relations
2264 5 : pub(crate) async fn put_rel_drops(
2265 5 : &mut self,
2266 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2267 5 : ctx: &RequestContext,
2268 5 : ) -> Result<(), WalIngestError> {
2269 5 : let v2_enabled = self
2270 5 : .maybe_enable_rel_size_v2()
2271 5 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2272 6 : for ((spc_node, db_node), rel_tags) in drop_relations {
2273 1 : let dir_key = rel_dir_to_key(spc_node, db_node);
2274 1 : let buf = self.get(dir_key, ctx).await?;
2275 1 : let mut dir = RelDirectory::des(&buf)?;
2276 :
2277 1 : let mut dirty = false;
2278 2 : for rel_tag in rel_tags {
2279 1 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2280 1 : self.pending_directory_entries
2281 1 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2282 1 : dirty = true;
2283 1 : true
2284 0 : } else if v2_enabled {
2285 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2286 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2287 : // logic).
2288 0 : let key =
2289 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2290 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2291 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2292 0 : if val == RelDirExists::Exists {
2293 0 : self.pending_directory_entries
2294 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2295 : // put tombstone
2296 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2297 : // no need to set dirty to true
2298 0 : true
2299 : } else {
2300 0 : false
2301 : }
2302 : } else {
2303 0 : false
2304 : };
2305 :
2306 1 : if found {
2307 : // update logical size
2308 1 : let size_key = rel_size_to_key(rel_tag);
2309 1 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2310 1 : self.pending_nblocks -= old_size as i64;
2311 :
2312 : // Remove entry from relation size cache
2313 1 : self.tline.remove_cached_rel_size(&rel_tag);
2314 :
2315 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2316 1 : self.delete(rel_key_range(rel_tag));
2317 0 : }
2318 : }
2319 :
2320 1 : if dirty {
2321 1 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2322 0 : }
2323 : }
2324 :
2325 5 : Ok(())
2326 5 : }
2327 :
2328 3 : pub async fn put_slru_segment_creation(
2329 3 : &mut self,
2330 3 : kind: SlruKind,
2331 3 : segno: u32,
2332 3 : nblocks: BlockNumber,
2333 3 : ctx: &RequestContext,
2334 3 : ) -> Result<(), WalIngestError> {
2335 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2336 :
2337 : // Add it to the directory entry
2338 3 : let dir_key = slru_dir_to_key(kind);
2339 3 : let buf = self.get(dir_key, ctx).await?;
2340 3 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2341 :
2342 3 : if !dir.segments.insert(segno) {
2343 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2344 3 : }
2345 3 : self.pending_directory_entries.push((
2346 3 : DirectoryKind::SlruSegment(kind),
2347 3 : MetricsUpdate::Set(dir.segments.len() as u64),
2348 3 : ));
2349 3 : self.put(
2350 3 : dir_key,
2351 3 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2352 : );
2353 :
2354 : // Put size
2355 3 : let size_key = slru_segment_size_to_key(kind, segno);
2356 3 : let buf = nblocks.to_le_bytes();
2357 3 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2358 :
2359 : // even if nblocks > 0, we don't insert any actual blocks here
2360 :
2361 3 : Ok(())
2362 3 : }
2363 :
2364 : /// Extend SLRU segment
2365 0 : pub fn put_slru_extend(
2366 0 : &mut self,
2367 0 : kind: SlruKind,
2368 0 : segno: u32,
2369 0 : nblocks: BlockNumber,
2370 0 : ) -> Result<(), WalIngestError> {
2371 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2372 :
2373 : // Put size
2374 0 : let size_key = slru_segment_size_to_key(kind, segno);
2375 0 : let buf = nblocks.to_le_bytes();
2376 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2377 0 : Ok(())
2378 0 : }
2379 :
2380 : /// This method is used for marking truncated SLRU files
2381 0 : pub async fn drop_slru_segment(
2382 0 : &mut self,
2383 0 : kind: SlruKind,
2384 0 : segno: u32,
2385 0 : ctx: &RequestContext,
2386 0 : ) -> Result<(), WalIngestError> {
2387 : // Remove it from the directory entry
2388 0 : let dir_key = slru_dir_to_key(kind);
2389 0 : let buf = self.get(dir_key, ctx).await?;
2390 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2391 :
2392 0 : if !dir.segments.remove(&segno) {
2393 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2394 0 : }
2395 0 : self.pending_directory_entries.push((
2396 0 : DirectoryKind::SlruSegment(kind),
2397 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2398 0 : ));
2399 0 : self.put(
2400 0 : dir_key,
2401 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2402 : );
2403 :
2404 : // Delete size entry, as well as all blocks
2405 0 : self.delete(slru_segment_key_range(kind, segno));
2406 :
2407 0 : Ok(())
2408 0 : }
2409 :
2410 : /// Drop a relmapper file (pg_filenode.map)
2411 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2412 : // TODO
2413 0 : Ok(())
2414 0 : }
2415 :
2416 : /// Drop a two-phase state file (pg_twophase)
2417 0 : pub async fn drop_twophase_file(
2418 0 : &mut self,
2419 0 : xid: u64,
2420 0 : ctx: &RequestContext,
2421 0 : ) -> Result<(), WalIngestError> {
2422 : // Remove it from the directory entry
2423 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2424 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2425 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2426 :
2427 0 : if !dir.xids.remove(&xid) {
2428 0 : warn!("twophase file for xid {} does not exist", xid);
2429 0 : }
2430 0 : self.pending_directory_entries.push((
2431 0 : DirectoryKind::TwoPhase,
2432 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2433 0 : ));
2434 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2435 : } else {
2436 0 : let xid: u32 = u32::try_from(xid)
2437 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2438 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2439 :
2440 0 : if !dir.xids.remove(&xid) {
2441 0 : warn!("twophase file for xid {} does not exist", xid);
2442 0 : }
2443 0 : self.pending_directory_entries.push((
2444 0 : DirectoryKind::TwoPhase,
2445 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2446 0 : ));
2447 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2448 : };
2449 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2450 :
2451 : // Delete it
2452 0 : self.delete(twophase_key_range(xid));
2453 :
2454 0 : Ok(())
2455 0 : }
2456 :
2457 8 : pub async fn put_file(
2458 8 : &mut self,
2459 8 : path: &str,
2460 8 : content: &[u8],
2461 8 : ctx: &RequestContext,
2462 8 : ) -> Result<(), WalIngestError> {
2463 8 : let key = aux_file::encode_aux_file_key(path);
2464 : // retrieve the key from the engine
2465 8 : let old_val = match self.get(key, ctx).await {
2466 2 : Ok(val) => Some(val),
2467 6 : Err(PageReconstructError::MissingKey(_)) => None,
2468 0 : Err(e) => return Err(e.into()),
2469 : };
2470 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2471 2 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2472 : } else {
2473 6 : Vec::new()
2474 : };
2475 8 : let mut other_files = Vec::with_capacity(files.len());
2476 8 : let mut modifying_file = None;
2477 10 : for file @ (p, content) in files {
2478 2 : if path == p {
2479 2 : assert!(
2480 2 : modifying_file.is_none(),
2481 0 : "duplicated entries found for {path}"
2482 : );
2483 2 : modifying_file = Some(content);
2484 0 : } else {
2485 0 : other_files.push(file);
2486 0 : }
2487 : }
2488 8 : let mut new_files = other_files;
2489 8 : match (modifying_file, content.is_empty()) {
2490 1 : (Some(old_content), false) => {
2491 1 : self.tline
2492 1 : .aux_file_size_estimator
2493 1 : .on_update(old_content.len(), content.len());
2494 1 : new_files.push((path, content));
2495 1 : }
2496 1 : (Some(old_content), true) => {
2497 1 : self.tline
2498 1 : .aux_file_size_estimator
2499 1 : .on_remove(old_content.len());
2500 1 : // not adding the file key to the final `new_files` vec.
2501 1 : }
2502 6 : (None, false) => {
2503 6 : self.tline.aux_file_size_estimator.on_add(content.len());
2504 6 : new_files.push((path, content));
2505 6 : }
2506 : // Compute may request deletion of the old version of a pgstat AUX file if the new one exceeds the size limit.
2507 : // Compute doesn't know whether a previous version of this file exists, so an
2508 : // attempt to delete a non-existent file can trigger this message.
2509 : // To avoid false alarms, log it at info rather than warning level.
2510 0 : (None, true) if path.starts_with("pg_stat/") => {
2511 0 : info!("removing non-existing pg_stat file: {}", path)
2512 : }
2513 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2514 : }
2515 8 : let new_val = aux_file::encode_file_value(&new_files)
2516 8 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2517 8 : self.put(key, Value::Image(new_val.into()));
2518 :
2519 8 : Ok(())
2520 8 : }
2521 :
2522 : ///
2523 : /// Flush changes accumulated so far to the underlying repository.
2524 : ///
2525 : /// Usually, changes made in DatadirModification are atomic, but this allows
2526 : /// you to flush them to the underlying repository before the final `commit`.
2527 : /// That frees up the memory used to hold the pending changes.
2528 : ///
2529 : /// Currently only used during bulk import of a data directory. In that
2530 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2531 : /// whole import fails and the timeline will be deleted anyway.
2532 : /// (Or to be precise, it will be left behind for debugging purposes and
2533 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2534 : ///
2535 : /// Note: A consequence of flushing the pending operations is that they
2536 : /// won't be visible to subsequent operations until `commit`. The function
2537 : /// retains all the metadata, but data pages are flushed. That's again OK
2538 : /// for bulk import, where you are just loading data pages and won't try to
2539 : /// modify the same pages twice.
2540 965 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2541 : // Unless we have accumulated a decent amount of changes, it's not worth it
2542 : // to scan through the pending_updates list.
2543 965 : let pending_nblocks = self.pending_nblocks;
2544 965 : if pending_nblocks < 10000 {
2545 965 : return Ok(());
2546 0 : }
2547 :
2548 0 : let mut writer = self.tline.writer().await;
2549 :
2550 : // Flush relation and SLRU data blocks, keep metadata.
2551 0 : if let Some(batch) = self.pending_data_batch.take() {
2552 0 : tracing::debug!(
2553 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2554 : batch.max_lsn,
2555 0 : self.tline.get_last_record_lsn()
2556 : );
2557 :
2558 : // This bails out on first error without modifying pending_updates.
2559 : // That's Ok, cf this function's doc comment.
2560 0 : writer.put_batch(batch, ctx).await?;
2561 0 : }
2562 :
2563 0 : if pending_nblocks != 0 {
2564 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2565 0 : self.pending_nblocks = 0;
2566 0 : }
2567 :
2568 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2569 0 : writer.update_directory_entries_count(kind, count);
2570 0 : }
2571 :
2572 0 : Ok(())
2573 965 : }
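    // Illustration (not part of the original source): a hedged sketch of the bulk-import pattern
    // the comment above describes. The puts that would happen between the calls are elided; only
    // `flush` and `commit` are real methods of this type.
    #[cfg(test)]
    #[allow(dead_code)]
    async fn example_bulk_import_pattern(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
        // ... a chunk of imported data pages is put here ...
        // Periodically flush data pages to keep memory bounded; metadata stays pending, and the
        // flushed pages are not readable back through this modification until commit.
        self.flush(ctx).await?;
        // ... more pages ...
        // The final commit makes everything, including the retained metadata, visible.
        self.commit(ctx).await?;
        Ok(())
    }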
2574 :
2575 : ///
2576 : /// Finish this atomic update, writing all the updated keys to the
2577 : /// underlying timeline.
2578 : /// All the modifications in this atomic update are stamped by the specified LSN.
2579 : ///
2580 371557 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2581 371557 : let mut writer = self.tline.writer().await;
2582 :
2583 371557 : let pending_nblocks = self.pending_nblocks;
2584 371557 : self.pending_nblocks = 0;
2585 :
2586 : // Ordering: the items in this batch do not need to be in any global order, but values for
2587 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2588 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2589 : // more details.
2590 :
2591 371557 : let metadata_batch = {
2592 371557 : let pending_meta = self
2593 371557 : .pending_metadata_pages
2594 371557 : .drain()
2595 371557 : .flat_map(|(key, values)| {
2596 137068 : values
2597 137068 : .into_iter()
2598 137068 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2599 137068 : })
2600 371557 : .collect::<Vec<_>>();
2601 :
2602 371557 : if pending_meta.is_empty() {
2603 236139 : None
2604 : } else {
2605 135418 : Some(SerializedValueBatch::from_values(pending_meta))
2606 : }
2607 : };
2608 :
2609 371557 : let data_batch = self.pending_data_batch.take();
2610 :
2611 371557 : let maybe_batch = match (data_batch, metadata_batch) {
2612 132278 : (Some(mut data), Some(metadata)) => {
2613 132278 : data.extend(metadata);
2614 132278 : Some(data)
2615 : }
2616 71631 : (Some(data), None) => Some(data),
2617 3140 : (None, Some(metadata)) => Some(metadata),
2618 164508 : (None, None) => None,
2619 : };
2620 :
2621 371557 : if let Some(batch) = maybe_batch {
2622 207049 : tracing::debug!(
2623 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2624 : batch.max_lsn,
2625 0 : self.tline.get_last_record_lsn()
2626 : );
2627 :
2628 : // This bails out on first error without modifying pending_updates.
2629 : // That's Ok, cf this function's doc comment.
2630 207049 : writer.put_batch(batch, ctx).await?;
2631 164508 : }
2632 :
2633 371557 : if !self.pending_deletions.is_empty() {
2634 1 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2635 1 : self.pending_deletions.clear();
2636 371556 : }
2637 :
2638 371557 : self.pending_lsns.push(self.lsn);
2639 444486 : for pending_lsn in self.pending_lsns.drain(..) {
2640 444486 : // TODO(vlad): pretty sure the comment below is not valid anymore
2641 444486 : // and we can call finish write with the latest LSN
2642 444486 : //
2643 444486 : // Ideally, we should be able to call writer.finish_write() only once
2644 444486 : // with the highest LSN. However, the last_record_lsn variable in the
2645 444486 : // timeline keeps track of the latest LSN and the immediate previous LSN
2646 444486 : // so we need to record every LSN to not leave a gap between them.
2647 444486 : writer.finish_write(pending_lsn);
2648 444486 : }
2649 :
2650 371557 : if pending_nblocks != 0 {
2651 135285 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2652 236272 : }
2653 :
2654 371557 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2655 1527 : writer.update_directory_entries_count(kind, count);
2656 1527 : }
2657 :
2658 371557 : self.pending_metadata_bytes = 0;
2659 :
2660 371557 : Ok(())
2661 371557 : }
2662 :
2663 145852 : pub(crate) fn len(&self) -> usize {
2664 145852 : self.pending_metadata_pages.len()
2665 145852 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2666 145852 : + self.pending_deletions.len()
2667 145852 : }
2668 :
2669 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2670 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2671 : /// in the modification.
2672 : ///
2673 : /// For data pages, reads pass directly to the owning Timeline: any ingest code that reads a data
2674 : /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2675 : /// DB create operations are always preceded by a call to commit(). This is special-cased because
2676 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2677 : /// and not data pages.
2678 143293 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2679 143293 : if !Self::is_data_key(&key) {
2680 : // Have we already updated the same key? Read the latest pending updated
2681 : // version in that case.
2682 : //
2683 : // Note: we don't check pending_deletions. It is an error to request a
2684 : // value that has been removed, deletion only avoids leaking storage.
2685 143293 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2686 7964 : if let Some((_, _, value)) = values.last() {
2687 7964 : return if let Value::Image(img) = value {
2688 7964 : Ok(img.clone())
2689 : } else {
2690 : // Currently, we never need to read back a WAL record that we
2691 : // inserted in the same "transaction". All the metadata updates
2692 : // work directly with Images, and we never need to read actual
2693 : // data pages. We could handle this if we had to, by calling
2694 : // the walredo manager, but let's keep it simple for now.
2695 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2696 0 : "unexpected pending WAL record"
2697 0 : )))
2698 : };
2699 0 : }
2700 135329 : }
2701 : } else {
2702 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2703 : // this key should never be present in pending_data_pages. We ensure this by committing
2704 : // modifications before ingesting DB create operations, which are the only kind that reads
2705 : // data pages during ingest.
2706 0 : if cfg!(debug_assertions) {
2707 0 : assert!(
2708 0 : !self
2709 0 : .pending_data_batch
2710 0 : .as_ref()
2711 0 : .is_some_and(|b| b.updates_key(&key))
2712 : );
2713 0 : }
2714 : }
2715 :
2716 : // Metadata page cache miss, or we're reading a data page.
2717 135329 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2718 135329 : self.tline.get(key, lsn, ctx).await
2719 143293 : }
2720 :
2721 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2722 : /// and the empty value into None.
2723 0 : async fn sparse_get(
2724 0 : &self,
2725 0 : key: Key,
2726 0 : ctx: &RequestContext,
2727 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2728 0 : let val = self.get(key, ctx).await;
2729 0 : match val {
2730 0 : Ok(val) if val.is_empty() => Ok(None),
2731 0 : Ok(val) => Ok(Some(val)),
2732 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2733 0 : Err(e) => Err(e),
2734 : }
2735 0 : }
2736 :
2737 : #[cfg(test)]
2738 2 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
2739 2 : self.put(key, val);
2740 2 : }
2741 :
2742 282078 : fn put(&mut self, key: Key, val: Value) {
2743 282078 : if Self::is_data_key(&key) {
2744 138934 : self.put_data(key.to_compact(), val)
2745 : } else {
2746 143144 : self.put_metadata(key.to_compact(), val)
2747 : }
2748 282078 : }
2749 :
2750 138934 : fn put_data(&mut self, key: CompactKey, val: Value) {
2751 138934 : let batch = self
2752 138934 : .pending_data_batch
2753 138934 : .get_or_insert_with(SerializedValueBatch::default);
2754 138934 : batch.put(key, val, self.lsn);
2755 138934 : }
2756 :
2757 143144 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2758 143144 : let values = self.pending_metadata_pages.entry(key).or_default();
2759 : // Replace the previous value if it exists at the same lsn
2760 143144 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2761 6076 : if *last_lsn == self.lsn {
2762 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2763 6076 : self.pending_metadata_bytes -= *last_value_ser_size;
2764 6076 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2765 6076 : self.pending_metadata_bytes += *last_value_ser_size;
2766 :
2767 : // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
2768 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2769 6076 : *last_value = val;
2770 6076 : return;
2771 0 : }
2772 137068 : }
2773 :
2774 137068 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2775 137068 : self.pending_metadata_bytes += val_serialized_size;
2776 137068 : values.push((self.lsn, val_serialized_size, val));
2777 :
2778 137068 : if key == CHECKPOINT_KEY.to_compact() {
2779 119 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2780 136949 : }
2781 143144 : }
2782 :
2783 1 : fn delete(&mut self, key_range: Range<Key>) {
2784 1 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2785 1 : self.pending_deletions.push((key_range, self.lsn));
2786 1 : }
2787 : }
2788 :
2789 : /// Statistics for a DatadirModification.
2790 : #[derive(Default)]
2791 : pub struct DatadirModificationStats {
2792 : pub metadata_images: u64,
2793 : pub metadata_deltas: u64,
2794 : pub data_images: u64,
2795 : pub data_deltas: u64,
2796 : }
2797 :
2798 : /// This struct facilitates accessing either a committed key from the timeline at a
2799 : /// specific LSN, or the latest uncommitted key from a pending modification.
2800 : ///
2801 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2802 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2803 : /// need to look up the keys in the modification first before looking them up in the
2804 : /// timeline to not miss the latest updates.
2805 : #[derive(Clone, Copy)]
2806 : pub enum Version<'a> {
2807 : LsnRange(LsnRange),
2808 : Modified(&'a DatadirModification<'a>),
2809 : }
2810 :
2811 : impl Version<'_> {
2812 25 : async fn get(
2813 25 : &self,
2814 25 : timeline: &Timeline,
2815 25 : key: Key,
2816 25 : ctx: &RequestContext,
2817 25 : ) -> Result<Bytes, PageReconstructError> {
2818 25 : match self {
2819 15 : Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
2820 10 : Version::Modified(modification) => modification.get(key, ctx).await,
2821 : }
2822 25 : }
2823 :
2824 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2825 : /// and the empty value into None.
2826 0 : async fn sparse_get(
2827 0 : &self,
2828 0 : timeline: &Timeline,
2829 0 : key: Key,
2830 0 : ctx: &RequestContext,
2831 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2832 0 : let val = self.get(timeline, key, ctx).await;
2833 0 : match val {
2834 0 : Ok(val) if val.is_empty() => Ok(None),
2835 0 : Ok(val) => Ok(Some(val)),
2836 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2837 0 : Err(e) => Err(e),
2838 : }
2839 0 : }
2840 :
2841 26 : pub fn is_latest(&self) -> bool {
2842 26 : match self {
2843 16 : Version::LsnRange(lsns) => lsns.is_latest(),
2844 10 : Version::Modified(_) => true,
2845 : }
2846 26 : }
2847 :
2848 224275 : pub fn get_lsn(&self) -> Lsn {
2849 224275 : match self {
2850 12224 : Version::LsnRange(lsns) => lsns.effective_lsn,
2851 212051 : Version::Modified(modification) => modification.lsn,
2852 : }
2853 224275 : }
2854 :
2855 12219 : pub fn at(lsn: Lsn) -> Self {
2856 12219 : Version::LsnRange(LsnRange {
2857 12219 : effective_lsn: lsn,
2858 12219 : request_lsn: lsn,
2859 12219 : })
2860 12219 : }
2861 : }
2862 :
2863 : //--- Metadata structs stored in key-value pairs in the repository.
2864 :
2865 0 : #[derive(Debug, Serialize, Deserialize)]
2866 : pub(crate) struct DbDirectory {
2867 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2868 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2869 : }
2870 :
2871 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2872 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2873 : // were named like "pg_twophase/000002E5"; now they're like
2874 : // "pg_twophase/0000000A000002E4".
2875 :
2876 0 : #[derive(Debug, Serialize, Deserialize)]
2877 : pub(crate) struct TwoPhaseDirectory {
2878 : pub(crate) xids: HashSet<TransactionId>,
2879 : }
2880 :
2881 0 : #[derive(Debug, Serialize, Deserialize)]
2882 : struct TwoPhaseDirectoryV17 {
2883 : xids: HashSet<u64>,
2884 : }
2885 :
2886 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2887 : pub(crate) struct RelDirectory {
2888 : // Set of relations that exist. (relfilenode, forknum)
2889 : //
2890 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2891 : // key-value pairs, if you have a lot of relations
2892 : pub(crate) rels: HashSet<(Oid, u8)>,
2893 : }
2894 :
2895 0 : #[derive(Debug, Serialize, Deserialize)]
2896 : struct RelSizeEntry {
2897 : nblocks: u32,
2898 : }
2899 :
2900 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2901 : pub(crate) struct SlruSegmentDirectory {
2902 : // Set of SLRU segments that exist.
2903 : pub(crate) segments: HashSet<u32>,
2904 : }
2905 :
2906 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2907 : #[repr(u8)]
2908 : pub(crate) enum DirectoryKind {
2909 : Db,
2910 : TwoPhase,
2911 : Rel,
2912 : AuxFiles,
2913 : SlruSegment(SlruKind),
2914 : RelV2,
2915 : }
2916 :
2917 : impl DirectoryKind {
2918 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2919 4582 : pub(crate) fn offset(&self) -> usize {
2920 4582 : self.into_usize()
2921 4582 : }
2922 : }
2923 :
2924 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2925 :
2926 : #[allow(clippy::bool_assert_comparison)]
2927 : #[cfg(test)]
2928 : mod tests {
2929 : use hex_literal::hex;
2930 : use pageserver_api::models::ShardParameters;
2931 : use utils::id::TimelineId;
2932 : use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
2933 :
2934 : use super::*;
2935 : use crate::DEFAULT_PG_VERSION;
2936 : use crate::tenant::harness::TenantHarness;
2937 :
2938 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2939 : #[tokio::test]
2940 1 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2941 1 : let name = "aux_files_round_trip";
2942 1 : let harness = TenantHarness::create(name).await?;
2943 :
2944 : pub const TIMELINE_ID: TimelineId =
2945 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2946 :
2947 1 : let (tenant, ctx) = harness.load().await;
2948 1 : let (tline, ctx) = tenant
2949 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2950 1 : .await?;
2951 1 : let tline = tline.raw_timeline().unwrap();
2952 :
2953 : // First modification: insert two keys
2954 1 : let mut modification = tline.begin_modification(Lsn(0x1000));
2955 1 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2956 1 : modification.set_lsn(Lsn(0x1008))?;
2957 1 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2958 1 : modification.commit(&ctx).await?;
2959 1 : let expect_1008 = HashMap::from([
2960 1 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2961 1 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2962 1 : ]);
2963 :
2964 1 : let io_concurrency = IoConcurrency::spawn_for_test();
2965 :
2966 1 : let readback = tline
2967 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2968 1 : .await?;
2969 1 : assert_eq!(readback, expect_1008);
2970 :
2971 : // Second modification: update one key, remove the other
2972 1 : let mut modification = tline.begin_modification(Lsn(0x2000));
2973 1 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2974 1 : modification.set_lsn(Lsn(0x2008))?;
2975 1 : modification.put_file("foo/bar2", b"", &ctx).await?;
2976 1 : modification.commit(&ctx).await?;
2977 1 : let expect_2008 =
2978 1 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2979 :
2980 1 : let readback = tline
2981 1 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2982 1 : .await?;
2983 1 : assert_eq!(readback, expect_2008);
2984 :
2985 : // Reading back in time works
2986 1 : let readback = tline
2987 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2988 1 : .await?;
2989 1 : assert_eq!(readback, expect_1008);
2990 :
2991 2 : Ok(())
2992 1 : }
2993 :
2994 : #[test]
2995 1 : fn gap_finding() {
2996 1 : let rel = RelTag {
2997 1 : spcnode: 1663,
2998 1 : dbnode: 208101,
2999 1 : relnode: 2620,
3000 1 : forknum: 0,
3001 1 : };
3002 1 : let base_blkno = 1;
3003 :
3004 1 : let base_key = rel_block_to_key(rel, base_blkno);
3005 1 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
3006 :
3007 1 : let shard = ShardIdentity::unsharded();
3008 :
3009 1 : let mut previous_nblocks = 0;
3010 11 : for i in 0..10 {
3011 10 : let crnt_blkno = base_blkno + i;
3012 10 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
3013 :
3014 10 : previous_nblocks = crnt_blkno + 1;
3015 :
3016 10 : if i == 0 {
3017 : // The first block we write is 1, so we should find the gap.
3018 1 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
3019 : } else {
3020 9 : assert!(gaps.is_none());
3021 : }
3022 : }
3023 :
3024 : // This is an update to an already existing block. No gaps here.
3025 1 : let update_blkno = 5;
3026 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3027 1 : assert!(gaps.is_none());
3028 :
3029 : // This is an update past the current end block.
3030 1 : let after_gap_blkno = 20;
3031 1 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
3032 :
3033 1 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
3034 1 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
3035 1 : assert_eq!(
3036 1 : gaps.unwrap(),
3037 1 : KeySpace::single(gap_start_key..after_gap_key)
3038 : );
3039 1 : }
3040 :
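/// Like `gap_finding`, but for a sharded tenant: only blocks that map to this shard
/// are considered, so the returned gap ranges must not contain keys owned by other
/// shards.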
3041 : #[test]
3042 1 : fn sharded_gap_finding() {
3043 1 : let rel = RelTag {
3044 1 : spcnode: 1663,
3045 1 : dbnode: 208101,
3046 1 : relnode: 2620,
3047 1 : forknum: 0,
3048 1 : };
3049 :
3050 1 : let first_blkno = 6;
3051 :
3052 : // This shard will get the even blocks
3053 1 : let shard = ShardIdentity::from_params(
3054 1 : ShardNumber(0),
3055 1 : ShardParameters {
3056 1 : count: ShardCount(2),
3057 1 : stripe_size: ShardStripeSize(1),
3058 1 : },
3059 : );
3060 :
3061 : // Only keys belonging to this shard are considered as gaps.
3062 1 : let mut previous_nblocks = 0;
3063 1 : let gaps =
3064 1 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
3065 1 : assert!(!gaps.ranges.is_empty());
3066 3 : for gap_range in gaps.ranges {
3067 2 : let mut k = gap_range.start;
3068 4 : while k != gap_range.end {
3069 2 : assert_eq!(shard.get_shard_number(&k), shard.number);
3070 2 : k = k.next();
3071 : }
3072 : }
3073 :
3074 1 : previous_nblocks = first_blkno;
3075 :
3076 1 : let update_blkno = 2;
3077 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3078 1 : assert!(gaps.is_none());
3079 1 : }
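
// A small cross-check sketch, not part of the original suite: under the same
// sharding parameters as above, the number of keys inside the returned gap ranges
// should equal the number of blocks below `first_blkno` that this shard owns,
// because only shard-local blocks are reported as gaps. The test name is ours; it
// reuses only helpers already exercised in `sharded_gap_finding`.
#[test]
fn sharded_gap_key_count_sketch() {
    let rel = RelTag {
        spcnode: 1663,
        dbnode: 208101,
        relnode: 2620,
        forknum: 0,
    };
    let first_blkno = 6;
    let shard = ShardIdentity::from_params(
        ShardNumber(0),
        ShardParameters {
            count: ShardCount(2),
            stripe_size: ShardStripeSize(1),
        },
    );

    // Blocks below `first_blkno` that map to this shard.
    let owned = (0..first_blkno)
        .filter(|blkno| shard.get_shard_number(&rel_block_to_key(rel, *blkno)) == shard.number)
        .count();

    // Count the keys covered by the gap ranges.
    let gaps = DatadirModification::find_gaps(rel, first_blkno, 0, &shard).unwrap();
    let mut gap_keys = 0;
    for range in gaps.ranges {
        let mut k = range.start;
        while k != range.end {
            gap_keys += 1;
            k = k.next();
        }
    }

    assert_eq!(gap_keys, owned);
}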
3080 :
3081 : /*
3082 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
3083 : let incremental = timeline.get_current_logical_size();
3084 : let non_incremental = timeline
3085 : .get_current_logical_size_non_incremental(lsn)
3086 : .unwrap();
3087 : assert_eq!(incremental, non_incremental);
3088 : }
3089 : */
3090 :
3091 : /*
3092 : ///
3093 : /// Test list_rels() function, with branches and dropped relations
3094 : ///
3095 : #[test]
3096 : fn test_list_rels_drop() -> Result<()> {
3097 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
3098 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
3099 : const TESTDB: u32 = 111;
3100 :
3101 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
3102 : // after branching fails below
3103 : let mut writer = tline.begin_record(Lsn(0x10));
3104 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
3105 : writer.finish()?;
3106 :
3107 : // Create a relation on the timeline
3108 : let mut writer = tline.begin_record(Lsn(0x20));
3109 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
3110 : writer.finish()?;
3111 :
3112 : let writer = tline.begin_record(Lsn(0x00));
3113 : writer.finish()?;
3114 :
3115 : // Check that list_rels() lists it at LSN 0x20 and later, but not before it
3116 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
3117 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
3118 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
3119 :
3120 : // Create a branch, check that the relation is visible there
3121 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
3122 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
3123 : Some(timeline) => timeline,
3124 : None => panic!("Should have a local timeline"),
3125 : };
3126 : let newtline = DatadirTimelineImpl::new(newtline);
3127 : assert!(newtline
3128 : .list_rels(0, TESTDB, Lsn(0x30))?
3129 : .contains(&TESTREL_A));
3130 :
3131 : // Drop it on the branch
3132 : let mut new_writer = newtline.begin_record(Lsn(0x40));
3133 : new_writer.drop_relation(TESTREL_A)?;
3134 : new_writer.finish()?;
3135 :
3136 : // Check that it's no longer listed on the branch after the point where it was dropped
3137 : assert!(newtline
3138 : .list_rels(0, TESTDB, Lsn(0x30))?
3139 : .contains(&TESTREL_A));
3140 : assert!(!newtline
3141 : .list_rels(0, TESTDB, Lsn(0x40))?
3142 : .contains(&TESTREL_A));
3143 :
3144 : // Run checkpoint and garbage collection and check that it's still not visible
3145 : newtline.checkpoint(CheckpointConfig::Forced)?;
3146 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3147 :
3148 : assert!(!newtline
3149 : .list_rels(0, TESTDB, Lsn(0x40))?
3150 : .contains(&TESTREL_A));
3151 :
3152 : Ok(())
3153 : }
3154 : */
3155 :
3156 : /*
3157 : #[test]
3158 : fn test_read_beyond_eof() -> Result<()> {
3159 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3160 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3161 :
3162 : make_some_layers(&tline, Lsn(0x20))?;
3163 : let mut writer = tline.begin_record(Lsn(0x60));
3164 : walingest.put_rel_page_image(
3165 : &mut writer,
3166 : TESTREL_A,
3167 : 0,
3168 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3169 : )?;
3170 : writer.finish()?;
3171 :
3172 : // Test read before rel creation. Should error out.
3173 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3174 :
3175 : // Read block beyond end of relation at different points in time.
3176 : // These reads should fall into different delta, image, and in-memory layers.
3177 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3178 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3179 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3180 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3181 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3182 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3183 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3184 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3185 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3186 :
3187 : // Test on an in-memory layer with no preceding layer
3188 : let mut writer = tline.begin_record(Lsn(0x70));
3189 : walingest.put_rel_page_image(
3190 : &mut writer,
3191 : TESTREL_B,
3192 : 0,
3193 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3194 : )?;
3195 : writer.finish()?;
3196 :
3197 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3198 :
3199 : Ok(())
3200 : }
3201 : */
3202 : }