Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 : use std::sync::Arc;
12 :
13 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
14 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
15 : use anyhow::Context;
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
20 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
21 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
22 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
23 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
24 : };
25 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
26 : use pageserver_api::models::RelSizeMigration;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use pageserver_api::shard::ShardIdentity;
29 : use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
30 : use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
31 : use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
32 : use serde::{Deserialize, Serialize};
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, info, info_span, trace, warn};
36 : use utils::bin_ser::{BeSer, DeserializeError};
37 : use utils::lsn::Lsn;
38 : use utils::pausable_failpoint;
39 : use wal_decoder::models::record::NeonWalRecord;
40 : use wal_decoder::models::value::Value;
41 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
42 :
43 : use super::tenant::{PageReconstructError, Timeline};
44 : use crate::aux_file;
45 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
46 : use crate::keyspace::{KeySpace, KeySpaceAccum};
47 : use crate::metrics::{
48 : RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
49 : RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
50 : RELSIZE_SNAPSHOT_CACHE_MISSES,
51 : };
52 : use crate::span::{
53 : debug_assert_current_span_has_tenant_and_timeline_id,
54 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
55 : };
56 : use crate::tenant::storage_layer::IoConcurrency;
57 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
58 :
59 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
60 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
61 :
62 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
63 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
64 :
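/// Result of a timestamp -> LSN lookup; see `Timeline::find_lsn_for_timestamp` below.
///
/// A sketch of how a caller might interpret the variants (illustrative only, not a
/// doc-test; `timeline`, `search_timestamp`, `cancel` and `ctx` are assumed to be in scope):
///
/// ```ignore
/// match timeline
///     .find_lsn_for_timestamp(search_timestamp, cancel, ctx)
///     .await?
/// {
///     // Commits were found both before and after the timestamp.
///     LsnForTimestamp::Present(lsn) => { /* use `lsn` */ }
///     // No commits after the timestamp; everything in the branch is older.
///     LsnForTimestamp::Future(lsn) => { /* ... */ }
///     // The timestamp is before the horizon we can look back to (PITR).
///     LsnForTimestamp::Past(lsn) => { /* ... */ }
///     // No commit timestamps found at all, e.g. right after a cluster import.
///     LsnForTimestamp::NoData(lsn) => { /* ... */ }
/// }
/// ```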
65 : #[derive(Debug)]
66 : pub enum LsnForTimestamp {
67 : /// Found commits both before and after the given timestamp
68 : Present(Lsn),
69 :
70 : /// Found no commits after the given timestamp. This means
71 : /// that the newest data in the branch is older than the given
72 : /// timestamp.
73 : ///
74 : /// All commits <= LSN happened before the given timestamp
75 : Future(Lsn),
76 :
77 : /// The queried timestamp is past the horizon we can look back to (the PITR cutoff)
78 : ///
79 : /// All commits > LSN happened after the given timestamp,
80 : /// but any commits < LSN might have happened before or after
81 : /// the given timestamp. We don't know because no data before
82 : /// the given lsn is available.
83 : Past(Lsn),
84 :
85 : /// We have found no commit with a timestamp,
86 : /// so we can't return anything meaningful.
87 : ///
88 : /// The associated LSN is the lower bound value we can safely
89 : /// create branches on, but no statement is made about whether it is
90 : /// older or newer than the timestamp.
91 : ///
92 : /// This variant can e.g. be returned right after a
93 : /// cluster import.
94 : NoData(Lsn),
95 : }
96 :
97 : /// Each request to the pageserver contains an LSN range: `not_modified_since..request_lsn`.
98 : /// See the comments in libs/pageserver_api/src/models.rs.
99 : /// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
100 : /// But to distinguish requests from the primary and from replicas we also need to pass `request_lsn`.
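///
/// A sketch of the two shapes this struct takes (illustrative only; `last_record_lsn`
/// and `replica_lsn` are just assumed-in-scope names):
///
/// ```ignore
/// // "Latest" request (request_lsn == Lsn::MAX), which is what `is_latest()` checks.
/// let latest = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
/// assert!(latest.is_latest());
///
/// // Request pinned at a specific LSN, e.g. from a replica.
/// let pinned = LsnRange::at(replica_lsn);
/// assert!(!pinned.is_latest());
/// ```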
101 : #[derive(Debug, Clone, Copy, Default)]
102 : pub struct LsnRange {
103 : pub effective_lsn: Lsn,
104 : pub request_lsn: Lsn,
105 : }
106 :
107 : impl LsnRange {
108 0 : pub fn at(lsn: Lsn) -> LsnRange {
109 0 : LsnRange {
110 0 : effective_lsn: lsn,
111 0 : request_lsn: lsn,
112 0 : }
113 0 : }
114 16 : pub fn is_latest(&self) -> bool {
115 16 : self.request_lsn == Lsn::MAX
116 16 : }
117 : }
118 :
119 : #[derive(Debug, thiserror::Error)]
120 : pub(crate) enum CalculateLogicalSizeError {
121 : #[error("cancelled")]
122 : Cancelled,
123 :
124 : /// Something went wrong while reading the metadata we use to calculate logical size
125 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
126 : /// in the `From` implementation for this variant.
127 : #[error(transparent)]
128 : PageRead(PageReconstructError),
129 :
130 : /// Something went wrong deserializing metadata that we read to calculate logical size
131 : #[error("decode error: {0}")]
132 : Decode(#[from] DeserializeError),
133 : }
134 :
135 : #[derive(Debug, thiserror::Error)]
136 : pub(crate) enum CollectKeySpaceError {
137 : #[error(transparent)]
138 : Decode(#[from] DeserializeError),
139 : #[error(transparent)]
140 : PageRead(PageReconstructError),
141 : #[error("cancelled")]
142 : Cancelled,
143 : }
144 :
145 : impl CollectKeySpaceError {
146 0 : pub(crate) fn is_cancel(&self) -> bool {
147 0 : match self {
148 0 : CollectKeySpaceError::Decode(_) => false,
149 0 : CollectKeySpaceError::PageRead(e) => e.is_cancel(),
150 0 : CollectKeySpaceError::Cancelled => true,
151 : }
152 0 : }
153 0 : pub(crate) fn into_anyhow(self) -> anyhow::Error {
154 0 : match self {
155 0 : CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
156 0 : CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
157 0 : CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
158 : }
159 0 : }
160 : }
161 :
162 : impl From<PageReconstructError> for CollectKeySpaceError {
163 0 : fn from(err: PageReconstructError) -> Self {
164 0 : match err {
165 0 : PageReconstructError::Cancelled => Self::Cancelled,
166 0 : err => Self::PageRead(err),
167 : }
168 0 : }
169 : }
170 :
171 : impl From<PageReconstructError> for CalculateLogicalSizeError {
172 0 : fn from(pre: PageReconstructError) -> Self {
173 0 : match pre {
174 0 : PageReconstructError::Cancelled => Self::Cancelled,
175 0 : _ => Self::PageRead(pre),
176 : }
177 0 : }
178 : }
179 :
180 : #[derive(Debug, thiserror::Error)]
181 : pub enum RelationError {
182 : #[error("invalid relnode")]
183 : InvalidRelnode,
184 : }
185 :
186 : ///
187 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
188 : /// and other special kinds of files, in a versioned key-value store. The
189 : /// Timeline struct provides the key-value store.
190 : ///
191 : /// This is a separate impl block so that we can easily include all these functions in the Timeline
192 : /// implementation; they might be moved into a separate struct later.
193 : impl Timeline {
194 : /// Start ingesting a WAL record, or other atomic modification of
195 : /// the timeline.
196 : ///
197 : /// This provides a transaction-like interface to perform a bunch
198 : /// of modifications atomically.
199 : ///
200 : /// To ingest a WAL record, call begin_modification(lsn) to get a
201 : /// DatadirModification object. Use the functions in the object to
202 : /// modify the repository state, updating all the pages and metadata
203 : /// that the WAL record affects. When you're done, call commit() to
204 : /// commit the changes.
205 : ///
206 : /// The LSN stored in the modification is advanced by `ingest_record` and
207 : /// is used by `commit()` to update `last_record_lsn`.
208 : ///
209 : /// Calling commit() will flush all the changes and reset the state,
210 : /// so the `DatadirModification` struct can be reused to perform the next modification.
211 : ///
212 : /// Note that any pending modifications you make through the
213 : /// modification object won't be visible to calls to the 'get' and list
214 : /// functions of the timeline until you finish! And if you update the
215 : /// same page twice, the last update wins.
216 : ///
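/// A rough usage sketch (illustrative only, not a doc-test; `timeline`, `lsn` and `ctx`
/// are assumed to be in scope, the put-functions are elided, and the `commit()` signature
/// is sketched rather than copied from this module):
///
/// ```ignore
/// let mut modification = timeline.begin_modification(lsn);
/// // ... apply the changes of one WAL record via the put-functions ...
/// modification.commit(ctx).await?;
/// // The DatadirModification can now be reused for the next record.
/// ```
///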
217 134215 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
218 134215 : where
219 134215 : Self: Sized,
220 : {
221 134215 : DatadirModification {
222 134215 : tline: self,
223 134215 : pending_lsns: Vec::new(),
224 134215 : pending_metadata_pages: HashMap::new(),
225 134215 : pending_data_batch: None,
226 134215 : pending_deletions: Vec::new(),
227 134215 : pending_nblocks: 0,
228 134215 : pending_directory_entries: Vec::new(),
229 134215 : pending_metadata_bytes: 0,
230 134215 : lsn,
231 134215 : }
232 134215 : }
233 :
234 : //------------------------------------------------------------------------------
235 : // Public GET functions
236 : //------------------------------------------------------------------------------
237 :
238 : /// Look up given page version.
239 9192 : pub(crate) async fn get_rel_page_at_lsn(
240 9192 : &self,
241 9192 : tag: RelTag,
242 9192 : blknum: BlockNumber,
243 9192 : version: Version<'_>,
244 9192 : ctx: &RequestContext,
245 9192 : io_concurrency: IoConcurrency,
246 9192 : ) -> Result<Bytes, PageReconstructError> {
247 9192 : match version {
248 9192 : Version::LsnRange(lsns) => {
249 9192 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
250 9192 : let res = self
251 9192 : .get_rel_page_at_lsn_batched(
252 9192 : pages
253 9192 : .iter()
254 9192 : .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
255 9192 : io_concurrency.clone(),
256 9192 : ctx,
257 : )
258 9192 : .await;
259 9192 : assert_eq!(res.len(), 1);
260 9192 : res.into_iter().next().unwrap()
261 : }
262 0 : Version::Modified(modification) => {
263 0 : if tag.relnode == 0 {
264 0 : return Err(PageReconstructError::Other(
265 0 : RelationError::InvalidRelnode.into(),
266 0 : ));
267 0 : }
268 :
269 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
270 0 : if blknum >= nblocks {
271 0 : debug!(
272 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
273 : tag,
274 : blknum,
275 0 : version.get_lsn(),
276 : nblocks
277 : );
278 0 : return Ok(ZERO_PAGE.clone());
279 0 : }
280 :
281 0 : let key = rel_block_to_key(tag, blknum);
282 0 : modification.get(key, ctx).await
283 : }
284 : }
285 9192 : }
286 :
287 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
288 : ///
289 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
290 : ///
291 : /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
292 : /// if the client goes away (e.g. due to timeout or cancellation).
293 : /// TODO: verify that it actually is cancellation-safe.
294 9192 : pub(crate) async fn get_rel_page_at_lsn_batched(
295 9192 : &self,
296 9192 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
297 9192 : io_concurrency: IoConcurrency,
298 9192 : ctx: &RequestContext,
299 9192 : ) -> Vec<Result<Bytes, PageReconstructError>> {
300 9192 : debug_assert_current_span_has_tenant_and_timeline_id();
301 :
302 9192 : let mut slots_filled = 0;
303 9192 : let page_count = pages.len();
304 :
305 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
306 9192 : let mut result = Vec::with_capacity(pages.len());
307 9192 : let result_slots = result.spare_capacity_mut();
308 :
309 9192 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
310 9192 : HashMap::with_capacity(pages.len());
311 :
312 9192 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
313 9192 : HashMap::with_capacity(pages.len());
314 :
315 9192 : for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
316 9192 : if tag.relnode == 0 {
317 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
318 0 : RelationError::InvalidRelnode.into(),
319 0 : )));
320 :
321 0 : slots_filled += 1;
322 0 : continue;
323 9192 : }
324 9192 : let lsn = lsns.effective_lsn;
325 9192 : let nblocks = {
326 9192 : let ctx = RequestContextBuilder::from(&ctx)
327 9192 : .perf_span(|crnt_perf_span| {
328 0 : info_span!(
329 : target: PERF_TRACE_TARGET,
330 0 : parent: crnt_perf_span,
331 : "GET_REL_SIZE",
332 : reltag=%tag,
333 : lsn=%lsn,
334 : )
335 0 : })
336 9192 : .attached_child();
337 :
338 9192 : match self
339 9192 : .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
340 9192 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
341 9192 : .await
342 : {
343 9192 : Ok(nblocks) => nblocks,
344 0 : Err(err) => {
345 0 : result_slots[response_slot_idx].write(Err(err));
346 0 : slots_filled += 1;
347 0 : continue;
348 : }
349 : }
350 : };
351 :
352 9192 : if *blknum >= nblocks {
353 0 : debug!(
354 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
355 : tag, blknum, lsn, nblocks
356 : );
357 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
358 0 : slots_filled += 1;
359 0 : continue;
360 9192 : }
361 :
362 9192 : let key = rel_block_to_key(*tag, *blknum);
363 :
364 9192 : let ctx = RequestContextBuilder::from(&ctx)
365 9192 : .perf_span(|crnt_perf_span| {
366 0 : info_span!(
367 : target: PERF_TRACE_TARGET,
368 0 : parent: crnt_perf_span,
369 : "GET_BATCH",
370 : batch_size = %page_count,
371 : )
372 0 : })
373 9192 : .attached_child();
374 :
375 9192 : let key_slots = keys_slots.entry(key).or_default();
376 9192 : key_slots.push((response_slot_idx, ctx));
377 :
378 9192 : let acc = req_keyspaces.entry(lsn).or_default();
379 9192 : acc.add_key(key);
380 : }
381 :
382 9192 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
383 9192 : .into_iter()
384 9192 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
385 9192 : .collect();
386 :
387 9192 : let query = VersionedKeySpaceQuery::scattered(query);
388 9192 : let res = self
389 9192 : .get_vectored(query, io_concurrency, ctx)
390 9192 : .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
391 9192 : .await;
392 :
393 9192 : match res {
394 9192 : Ok(results) => {
395 18384 : for (key, res) in results {
396 9192 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
397 9192 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
398 :
399 9192 : for (slot, req_ctx) in key_slots {
400 0 : let clone = match &res {
401 0 : Ok(buf) => Ok(buf.clone()),
402 0 : Err(err) => Err(match err {
403 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
404 :
405 0 : x @ PageReconstructError::Other(_)
406 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
407 0 : | x @ PageReconstructError::WalRedo(_)
408 0 : | x @ PageReconstructError::MissingKey(_) => {
409 0 : PageReconstructError::Other(anyhow::anyhow!(
410 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
411 0 : ))
412 : }
413 : }),
414 : };
415 :
416 0 : result_slots[slot].write(clone);
417 : // There is no standardized way to express that the batched span followed from N request spans.
418 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
419 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
420 0 : req_ctx.perf_follows_from(ctx);
421 0 : slots_filled += 1;
422 : }
423 :
424 9192 : result_slots[first_slot].write(res);
425 9192 : first_req_ctx.perf_follows_from(ctx);
426 9192 : slots_filled += 1;
427 : }
428 : }
429 0 : Err(err) => {
430 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
431 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
432 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
433 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
434 : // but without taking ownership of the GetVectoredError
435 0 : let err = match &err {
436 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
437 : // TODO: restructure get_vectored API to make this error per-key
438 0 : GetVectoredError::MissingKey(err) => {
439 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
440 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
441 0 : )))
442 : }
443 : // TODO: restructure get_vectored API to make this error per-key
444 0 : GetVectoredError::GetReadyAncestorError(err) => {
445 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
446 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
447 0 : )))
448 : }
449 : // TODO: restructure get_vectored API to make this error per-key
450 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
451 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
452 0 : )),
453 : // TODO: we can prevent this error class by moving this check into the type system
454 0 : GetVectoredError::InvalidLsn(e) => {
455 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
456 : }
457 : // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
458 : // TODO: we can prevent this error class by moving this check into the type system
459 0 : GetVectoredError::Oversized(err, max) => {
460 0 : Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
461 : }
462 : };
463 :
464 0 : req_ctx.perf_follows_from(ctx);
465 0 : result_slots[*slot].write(err);
466 : }
467 :
468 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
469 : }
470 : };
471 :
472 9192 : assert_eq!(slots_filled, page_count);
473 : // SAFETY:
474 : // 1. `result` and any of its uninit members are not read from until this point
475 : // 2. The length below is tracked at run-time and matches the number of requested pages.
476 9192 : unsafe {
477 9192 : result.set_len(page_count);
478 9192 : }
479 :
480 9192 : result
481 9192 : }
482 :
483 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
484 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
485 : /// for pages up to the highest page number it has stored.
486 0 : pub(crate) async fn get_db_size(
487 0 : &self,
488 0 : spcnode: Oid,
489 0 : dbnode: Oid,
490 0 : version: Version<'_>,
491 0 : ctx: &RequestContext,
492 0 : ) -> Result<usize, PageReconstructError> {
493 0 : let mut total_blocks = 0;
494 :
495 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
496 :
497 0 : if rels.is_empty() {
498 0 : return Ok(0);
499 0 : }
500 :
501 : // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
502 0 : let reldir_key = rel_dir_to_key(spcnode, dbnode);
503 0 : let buf = version.get(self, reldir_key, ctx).await?;
504 0 : let reldir = RelDirectory::des(&buf)?;
505 :
506 0 : for rel in rels {
507 0 : let n_blocks = self
508 0 : .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
509 0 : .await?
510 0 : .expect("allow_missing=false");
511 0 : total_blocks += n_blocks as usize;
512 : }
513 0 : Ok(total_blocks)
514 0 : }
515 :
516 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
517 : ///
518 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
519 : /// page number stored in the shard.
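///
/// A minimal call sketch (illustrative only, not a doc-test; `timeline`, `tag`, `lsn`
/// and `ctx` are assumed to be in scope):
///
/// ```ignore
/// let nblocks: BlockNumber = timeline.get_rel_size(tag, Version::at(lsn), ctx).await?;
/// ```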
520 12217 : pub(crate) async fn get_rel_size(
521 12217 : &self,
522 12217 : tag: RelTag,
523 12217 : version: Version<'_>,
524 12217 : ctx: &RequestContext,
525 12217 : ) -> Result<BlockNumber, PageReconstructError> {
526 12217 : Ok(self
527 12217 : .get_rel_size_in_reldir(tag, version, None, false, ctx)
528 12217 : .await?
529 12215 : .expect("allow_missing=false"))
530 12217 : }
531 :
532 : /// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
533 : /// otherwise errors.
534 : ///
535 : /// INVARIANT: never returns None if `allow_missing=false`.
536 : ///
537 : /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
538 12217 : pub(crate) async fn get_rel_size_in_reldir(
539 12217 : &self,
540 12217 : tag: RelTag,
541 12217 : version: Version<'_>,
542 12217 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
543 12217 : allow_missing: bool,
544 12217 : ctx: &RequestContext,
545 12217 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
546 12217 : if tag.relnode == 0 {
547 0 : return Err(PageReconstructError::Other(
548 0 : RelationError::InvalidRelnode.into(),
549 0 : ));
550 12217 : }
551 :
552 12217 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
553 12210 : return Ok(Some(nblocks));
554 7 : }
555 :
556 7 : if allow_missing
557 0 : && !self
558 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
559 0 : .await?
560 : {
561 0 : return Ok(None);
562 7 : }
563 :
564 7 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
565 0 : && !self
566 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
567 0 : .await?
568 : {
569 : // FIXME: Postgres sometimes calls smgrcreate() to create
570 : // FSM, and smgrnblocks() on it immediately afterwards,
571 : // without extending it. Tolerate that by claiming that
572 : // any non-existent FSM fork has size 0.
573 0 : return Ok(Some(0));
574 7 : }
575 :
576 7 : let key = rel_size_to_key(tag);
577 7 : let mut buf = version.get(self, key, ctx).await?;
578 5 : let nblocks = buf.get_u32_le();
579 :
580 5 : self.update_cached_rel_size(tag, version, nblocks);
581 :
582 5 : Ok(Some(nblocks))
583 12217 : }
584 :
585 : /// Does the relation exist?
586 : ///
587 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
588 : /// the shard stores pages for.
589 : ///
590 3025 : pub(crate) async fn get_rel_exists(
591 3025 : &self,
592 3025 : tag: RelTag,
593 3025 : version: Version<'_>,
594 3025 : ctx: &RequestContext,
595 3025 : ) -> Result<bool, PageReconstructError> {
596 3025 : self.get_rel_exists_in_reldir(tag, version, None, ctx).await
597 3025 : }
598 :
599 : /// Does the relation exist? With a cached deserialized `RelDirectory`.
600 : ///
601 : /// There are some cases where the caller loops across all relations. In that specific case,
602 : /// the caller should obtain the deserialized `RelDirectory` first and then call this function
603 : /// to avoid the duplicated deserialization work. This is a hack and should be removed by introducing
604 : /// a new API (e.g., `get_rel_exists_batched`).
605 3025 : pub(crate) async fn get_rel_exists_in_reldir(
606 3025 : &self,
607 3025 : tag: RelTag,
608 3025 : version: Version<'_>,
609 3025 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
610 3025 : ctx: &RequestContext,
611 3025 : ) -> Result<bool, PageReconstructError> {
612 3025 : if tag.relnode == 0 {
613 0 : return Err(PageReconstructError::Other(
614 0 : RelationError::InvalidRelnode.into(),
615 0 : ));
616 3025 : }
617 :
618 : // first try to lookup relation in cache
619 3025 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
620 3016 : return Ok(true);
621 9 : }
622 : // then check if the database was already initialized.
623 : // get_rel_exists can be called before dbdir is created.
624 9 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
625 9 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
626 9 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
627 0 : return Ok(false);
628 9 : }
629 :
630 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
631 : // Otherwise, read the old reldir keyspace.
632 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
633 :
634 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
635 9 : self.get_rel_size_v2_status()
636 : {
637 : // fetch directory listing (new)
638 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
639 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
640 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
641 0 : let exists_v2 = buf == RelDirExists::Exists;
642 : // Fast path: if the relation exists in the new format, return true.
643 : // TODO: we should have a verification mode that checks both keyspaces
644 : // to ensure the relation only exists in one of them.
645 0 : if exists_v2 {
646 0 : return Ok(true);
647 0 : }
648 9 : }
649 :
650 : // fetch directory listing (old)
651 :
652 9 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
653 :
654 9 : if let Some((cached_key, dir)) = deserialized_reldir_v1 {
655 0 : if cached_key == key {
656 0 : return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
657 0 : } else if cfg!(test) || cfg!(feature = "testing") {
658 0 : panic!("cached reldir key mismatch: {cached_key} != {key}");
659 : } else {
660 0 : warn!("cached reldir key mismatch: {cached_key} != {key}");
661 : }
662 : // Fallback to reading the directory from the datadir.
663 9 : }
664 9 : let buf = version.get(self, key, ctx).await?;
665 :
666 9 : let dir = RelDirectory::des(&buf)?;
667 9 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
668 9 : Ok(exists_v1)
669 3025 : }
670 :
671 : /// Get a list of all existing relations in given tablespace and database.
672 : ///
673 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
674 : /// the shard stores pages for.
675 : ///
676 : /// # Cancel-Safety
677 : ///
678 : /// This method is cancellation-safe.
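///
/// A minimal call sketch (illustrative only, not a doc-test; `timeline`, `spcnode`,
/// `dbnode`, `lsn` and `ctx` are assumed to be in scope):
///
/// ```ignore
/// let rels: HashSet<RelTag> = timeline
///     .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
///     .await?;
/// ```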
679 0 : pub(crate) async fn list_rels(
680 0 : &self,
681 0 : spcnode: Oid,
682 0 : dbnode: Oid,
683 0 : version: Version<'_>,
684 0 : ctx: &RequestContext,
685 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
686 : // fetch directory listing (old)
687 0 : let key = rel_dir_to_key(spcnode, dbnode);
688 0 : let buf = version.get(self, key, ctx).await?;
689 :
690 0 : let dir = RelDirectory::des(&buf)?;
691 0 : let rels_v1: HashSet<RelTag> =
692 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
693 0 : spcnode,
694 0 : dbnode,
695 0 : relnode: *relnode,
696 0 : forknum: *forknum,
697 0 : }));
698 :
699 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
700 0 : return Ok(rels_v1);
701 0 : }
702 :
703 : // scan directory listing (new), merge with the old results
704 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
705 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
706 0 : self.conf.get_vectored_concurrent_io,
707 0 : self.gate
708 0 : .enter()
709 0 : .map_err(|_| PageReconstructError::Cancelled)?,
710 : );
711 0 : let results = self
712 0 : .scan(
713 0 : KeySpace::single(key_range),
714 0 : version.get_lsn(),
715 0 : ctx,
716 0 : io_concurrency,
717 0 : )
718 0 : .await?;
719 0 : let mut rels = rels_v1;
720 0 : for (key, val) in results {
721 0 : let val = RelDirExists::decode(&val?)
722 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
723 0 : assert_eq!(key.field6, 1);
724 0 : assert_eq!(key.field2, spcnode);
725 0 : assert_eq!(key.field3, dbnode);
726 0 : let tag = RelTag {
727 0 : spcnode,
728 0 : dbnode,
729 0 : relnode: key.field4,
730 0 : forknum: key.field5,
731 0 : };
732 0 : if val == RelDirExists::Removed {
733 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
734 0 : continue;
735 0 : }
736 0 : let did_not_contain = rels.insert(tag);
737 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
738 : }
739 0 : Ok(rels)
740 0 : }
741 :
742 : /// Get the whole SLRU segment
743 0 : pub(crate) async fn get_slru_segment(
744 0 : &self,
745 0 : kind: SlruKind,
746 0 : segno: u32,
747 0 : lsn: Lsn,
748 0 : ctx: &RequestContext,
749 0 : ) -> Result<Bytes, PageReconstructError> {
750 0 : assert!(self.tenant_shard_id.is_shard_zero());
751 0 : let n_blocks = self
752 0 : .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
753 0 : .await?;
754 :
755 0 : let keyspace = KeySpace::single(
756 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
757 : );
758 :
759 0 : let batches = keyspace.partition(
760 0 : self.get_shard_identity(),
761 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
762 0 : BLCKSZ as u64,
763 : );
764 :
765 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
766 0 : self.conf.get_vectored_concurrent_io,
767 0 : self.gate
768 0 : .enter()
769 0 : .map_err(|_| PageReconstructError::Cancelled)?,
770 : );
771 :
772 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
773 0 : for batch in batches.parts {
774 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
775 0 : let blocks = self
776 0 : .get_vectored(query, io_concurrency.clone(), ctx)
777 0 : .await?;
778 :
779 0 : for (_key, block) in blocks {
780 0 : let block = block?;
781 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
782 : }
783 : }
784 :
785 0 : Ok(segment.freeze())
786 0 : }
787 :
788 : /// Get size of an SLRU segment
789 0 : pub(crate) async fn get_slru_segment_size(
790 0 : &self,
791 0 : kind: SlruKind,
792 0 : segno: u32,
793 0 : version: Version<'_>,
794 0 : ctx: &RequestContext,
795 0 : ) -> Result<BlockNumber, PageReconstructError> {
796 0 : assert!(self.tenant_shard_id.is_shard_zero());
797 0 : let key = slru_segment_size_to_key(kind, segno);
798 0 : let mut buf = version.get(self, key, ctx).await?;
799 0 : Ok(buf.get_u32_le())
800 0 : }
801 :
802 : /// Does the slru segment exist?
803 0 : pub(crate) async fn get_slru_segment_exists(
804 0 : &self,
805 0 : kind: SlruKind,
806 0 : segno: u32,
807 0 : version: Version<'_>,
808 0 : ctx: &RequestContext,
809 0 : ) -> Result<bool, PageReconstructError> {
810 0 : assert!(self.tenant_shard_id.is_shard_zero());
811 : // fetch directory listing
812 0 : let key = slru_dir_to_key(kind);
813 0 : let buf = version.get(self, key, ctx).await?;
814 :
815 0 : let dir = SlruSegmentDirectory::des(&buf)?;
816 0 : Ok(dir.segments.contains(&segno))
817 0 : }
818 :
819 : /// Locate LSN, such that all transactions that committed before
820 : /// 'search_timestamp' are visible, but nothing newer is.
821 : ///
822 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
823 : /// so it's not well defined which LSN you get if there were multiple commits
824 : /// "in flight" at that point in time.
825 : ///
826 0 : pub(crate) async fn find_lsn_for_timestamp(
827 0 : &self,
828 0 : search_timestamp: TimestampTz,
829 0 : cancel: &CancellationToken,
830 0 : ctx: &RequestContext,
831 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
832 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
833 :
834 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
835 0 : let gc_cutoff_planned = {
836 0 : let gc_info = self.gc_info.read().unwrap();
837 0 : info!(cutoffs=?gc_info.cutoffs, applied_cutoff=%*gc_cutoff_lsn_guard, "starting find_lsn_for_timestamp");
838 0 : gc_info.min_cutoff()
839 : };
840 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
841 : // but let's be defensive.
842 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
843 : // We use this method to figure out the branching LSN for the new branch, but the
844 : // GC cutoff could be before the branching point and we cannot create a new branch
845 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
846 : // on the safe side.
847 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
848 0 : let max_lsn = self.get_last_record_lsn();
849 :
850 : // LSNs are always 8-byte aligned. low/mid/high represent the
851 : // LSN divided by 8.
852 0 : let mut low = min_lsn.0 / 8;
853 0 : let mut high = max_lsn.0 / 8 + 1;
854 :
855 0 : let mut found_smaller = false;
856 0 : let mut found_larger = false;
857 :
858 0 : while low < high {
859 0 : if cancel.is_cancelled() {
860 0 : return Err(PageReconstructError::Cancelled);
861 0 : }
862 : // cannot overflow, high and low are both smaller than u64::MAX / 2
863 0 : let mid = (high + low) / 2;
864 :
865 0 : let cmp = match self
866 0 : .is_latest_commit_timestamp_ge_than(
867 0 : search_timestamp,
868 0 : Lsn(mid * 8),
869 0 : &mut found_smaller,
870 0 : &mut found_larger,
871 0 : ctx,
872 : )
873 0 : .await
874 : {
875 0 : Ok(res) => res,
876 0 : Err(PageReconstructError::MissingKey(e)) => {
877 0 : warn!(
878 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
879 : e
880 : );
881 : // Return that we didn't find any requests smaller than the LSN, having logged the error above.
882 0 : return Ok(LsnForTimestamp::Past(min_lsn));
883 : }
884 0 : Err(e) => return Err(e),
885 : };
886 :
887 0 : if cmp {
888 0 : high = mid;
889 0 : } else {
890 0 : low = mid + 1;
891 0 : }
892 : }
893 :
894 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
895 : // so the LSN of the last commit record before or at `search_timestamp`.
896 : // Remove one from `low` to get `t`.
897 : //
898 : // FIXME: it would be better to get the LSN of the previous commit.
899 : // Otherwise, if you restore to the returned LSN, the database will
900 : // include physical changes from later commits that will be marked
901 : // as aborted, and will need to be vacuumed away.
902 0 : let commit_lsn = Lsn((low - 1) * 8);
903 0 : match (found_smaller, found_larger) {
904 : (false, false) => {
905 : // This can happen if no commit records have been processed yet, e.g.
906 : // just after importing a cluster.
907 0 : Ok(LsnForTimestamp::NoData(min_lsn))
908 : }
909 : (false, true) => {
910 : // Didn't find any commit timestamps smaller than the request
911 0 : Ok(LsnForTimestamp::Past(min_lsn))
912 : }
913 0 : (true, _) if commit_lsn < min_lsn => {
914 : // the search above did set found_smaller to true but it never increased the lsn.
915 : // Then, low is still the old min_lsn, and the subtraction above gave a value
916 : // below the min_lsn. We should never do that.
917 0 : Ok(LsnForTimestamp::Past(min_lsn))
918 : }
919 : (true, false) => {
920 : // Only found commits with timestamps smaller than the request.
921 : // It's still a valid case for branch creation, return it.
922 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
923 : // case, anyway.
924 0 : Ok(LsnForTimestamp::Future(commit_lsn))
925 : }
926 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
927 : }
928 0 : }
929 :
930 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
931 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
932 : ///
933 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
934 : /// with a smaller/larger timestamp.
935 : ///
936 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
937 0 : &self,
938 0 : search_timestamp: TimestampTz,
939 0 : probe_lsn: Lsn,
940 0 : found_smaller: &mut bool,
941 0 : found_larger: &mut bool,
942 0 : ctx: &RequestContext,
943 0 : ) -> Result<bool, PageReconstructError> {
944 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
945 0 : if timestamp >= search_timestamp {
946 0 : *found_larger = true;
947 0 : return ControlFlow::Break(true);
948 0 : } else {
949 0 : *found_smaller = true;
950 0 : }
951 0 : ControlFlow::Continue(())
952 0 : })
953 0 : .await
954 0 : }
955 :
956 : /// Obtain the timestamp for the given lsn.
957 : ///
958 : /// If the lsn has no timestamps (e.g. no commits), returns None.
959 0 : pub(crate) async fn get_timestamp_for_lsn(
960 0 : &self,
961 0 : probe_lsn: Lsn,
962 0 : ctx: &RequestContext,
963 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
964 0 : let mut max: Option<TimestampTz> = None;
965 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
966 0 : if let Some(max_prev) = max {
967 0 : max = Some(max_prev.max(timestamp));
968 0 : } else {
969 0 : max = Some(timestamp);
970 0 : }
971 0 : ControlFlow::Continue(())
972 0 : })
973 0 : .await?;
974 :
975 0 : Ok(max)
976 0 : }
977 :
978 : /// Runs the given function on all the timestamps for a given lsn
979 : ///
980 : /// The return value is either given by the closure, or set to the `Default`
981 : /// impl's output.
982 0 : async fn map_all_timestamps<T: Default>(
983 0 : &self,
984 0 : probe_lsn: Lsn,
985 0 : ctx: &RequestContext,
986 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
987 0 : ) -> Result<T, PageReconstructError> {
988 0 : for segno in self
989 0 : .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
990 0 : .await?
991 : {
992 0 : let nblocks = self
993 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
994 0 : .await?;
995 :
996 0 : let keyspace = KeySpace::single(
997 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
998 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
999 : );
1000 :
1001 0 : let batches = keyspace.partition(
1002 0 : self.get_shard_identity(),
1003 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
1004 0 : BLCKSZ as u64,
1005 : );
1006 :
1007 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
1008 0 : self.conf.get_vectored_concurrent_io,
1009 0 : self.gate
1010 0 : .enter()
1011 0 : .map_err(|_| PageReconstructError::Cancelled)?,
1012 : );
1013 :
1014 0 : for batch in batches.parts.into_iter().rev() {
1015 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
1016 0 : let blocks = self
1017 0 : .get_vectored(query, io_concurrency.clone(), ctx)
1018 0 : .await?;
1019 :
1020 0 : for (_key, clog_page) in blocks.into_iter().rev() {
1021 0 : let clog_page = clog_page?;
1022 :
1023 0 : if clog_page.len() == BLCKSZ as usize + 8 {
1024 0 : let mut timestamp_bytes = [0u8; 8];
1025 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
1026 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
1027 :
1028 0 : match f(timestamp) {
1029 0 : ControlFlow::Break(b) => return Ok(b),
1030 0 : ControlFlow::Continue(()) => (),
1031 : }
1032 0 : }
1033 : }
1034 : }
1035 : }
1036 0 : Ok(Default::default())
1037 0 : }
1038 :
1039 0 : pub(crate) async fn get_slru_keyspace(
1040 0 : &self,
1041 0 : version: Version<'_>,
1042 0 : ctx: &RequestContext,
1043 0 : ) -> Result<KeySpace, PageReconstructError> {
1044 0 : let mut accum = KeySpaceAccum::new();
1045 :
1046 0 : for kind in SlruKind::iter() {
1047 0 : let mut segments: Vec<u32> = self
1048 0 : .list_slru_segments(kind, version, ctx)
1049 0 : .await?
1050 0 : .into_iter()
1051 0 : .collect();
1052 0 : segments.sort_unstable();
1053 :
1054 0 : for seg in segments {
1055 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
1056 :
1057 0 : accum.add_range(
1058 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
1059 : );
1060 : }
1061 : }
1062 :
1063 0 : Ok(accum.to_keyspace())
1064 0 : }
1065 :
1066 : /// Get a list of SLRU segments
1067 0 : pub(crate) async fn list_slru_segments(
1068 0 : &self,
1069 0 : kind: SlruKind,
1070 0 : version: Version<'_>,
1071 0 : ctx: &RequestContext,
1072 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
1073 : // fetch directory entry
1074 0 : let key = slru_dir_to_key(kind);
1075 :
1076 0 : let buf = version.get(self, key, ctx).await?;
1077 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
1078 0 : }
1079 :
1080 0 : pub(crate) async fn get_relmap_file(
1081 0 : &self,
1082 0 : spcnode: Oid,
1083 0 : dbnode: Oid,
1084 0 : version: Version<'_>,
1085 0 : ctx: &RequestContext,
1086 0 : ) -> Result<Bytes, PageReconstructError> {
1087 0 : let key = relmap_file_key(spcnode, dbnode);
1088 :
1089 0 : let buf = version.get(self, key, ctx).await?;
1090 0 : Ok(buf)
1091 0 : }
1092 :
1093 169 : pub(crate) async fn list_dbdirs(
1094 169 : &self,
1095 169 : lsn: Lsn,
1096 169 : ctx: &RequestContext,
1097 169 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
1098 : // fetch directory entry
1099 169 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1100 :
1101 169 : Ok(DbDirectory::des(&buf)?.dbdirs)
1102 169 : }
1103 :
1104 0 : pub(crate) async fn get_twophase_file(
1105 0 : &self,
1106 0 : xid: u64,
1107 0 : lsn: Lsn,
1108 0 : ctx: &RequestContext,
1109 0 : ) -> Result<Bytes, PageReconstructError> {
1110 0 : let key = twophase_file_key(xid);
1111 0 : let buf = self.get(key, lsn, ctx).await?;
1112 0 : Ok(buf)
1113 0 : }
1114 :
1115 170 : pub(crate) async fn list_twophase_files(
1116 170 : &self,
1117 170 : lsn: Lsn,
1118 170 : ctx: &RequestContext,
1119 170 : ) -> Result<HashSet<u64>, PageReconstructError> {
1120 : // fetch directory entry
1121 170 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1122 :
1123 170 : if self.pg_version >= PgMajorVersion::PG17 {
1124 157 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1125 : } else {
1126 13 : Ok(TwoPhaseDirectory::des(&buf)?
1127 : .xids
1128 13 : .iter()
1129 13 : .map(|x| u64::from(*x))
1130 13 : .collect())
1131 : }
1132 170 : }
1133 :
1134 0 : pub(crate) async fn get_control_file(
1135 0 : &self,
1136 0 : lsn: Lsn,
1137 0 : ctx: &RequestContext,
1138 0 : ) -> Result<Bytes, PageReconstructError> {
1139 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1140 0 : }
1141 :
1142 6 : pub(crate) async fn get_checkpoint(
1143 6 : &self,
1144 6 : lsn: Lsn,
1145 6 : ctx: &RequestContext,
1146 6 : ) -> Result<Bytes, PageReconstructError> {
1147 6 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1148 6 : }
1149 :
1150 6 : async fn list_aux_files_v2(
1151 6 : &self,
1152 6 : lsn: Lsn,
1153 6 : ctx: &RequestContext,
1154 6 : io_concurrency: IoConcurrency,
1155 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1156 6 : let kv = self
1157 6 : .scan(
1158 6 : KeySpace::single(Key::metadata_aux_key_range()),
1159 6 : lsn,
1160 6 : ctx,
1161 6 : io_concurrency,
1162 6 : )
1163 6 : .await?;
1164 6 : let mut result = HashMap::new();
1165 6 : let mut sz = 0;
1166 15 : for (_, v) in kv {
1167 9 : let v = v?;
1168 9 : let v = aux_file::decode_file_value_bytes(&v)
1169 9 : .context("value decode")
1170 9 : .map_err(PageReconstructError::Other)?;
1171 17 : for (fname, content) in v {
1172 8 : sz += fname.len();
1173 8 : sz += content.len();
1174 8 : result.insert(fname, content);
1175 8 : }
1176 : }
1177 6 : self.aux_file_size_estimator.on_initial(sz);
1178 6 : Ok(result)
1179 6 : }
1180 :
1181 0 : pub(crate) async fn trigger_aux_file_size_computation(
1182 0 : &self,
1183 0 : lsn: Lsn,
1184 0 : ctx: &RequestContext,
1185 0 : io_concurrency: IoConcurrency,
1186 0 : ) -> Result<(), PageReconstructError> {
1187 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1188 0 : Ok(())
1189 0 : }
1190 :
1191 6 : pub(crate) async fn list_aux_files(
1192 6 : &self,
1193 6 : lsn: Lsn,
1194 6 : ctx: &RequestContext,
1195 6 : io_concurrency: IoConcurrency,
1196 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1197 6 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1198 6 : }
1199 :
1200 2 : pub(crate) async fn get_replorigins(
1201 2 : &self,
1202 2 : lsn: Lsn,
1203 2 : ctx: &RequestContext,
1204 2 : io_concurrency: IoConcurrency,
1205 2 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1206 2 : let kv = self
1207 2 : .scan(
1208 2 : KeySpace::single(repl_origin_key_range()),
1209 2 : lsn,
1210 2 : ctx,
1211 2 : io_concurrency,
1212 2 : )
1213 2 : .await?;
1214 2 : let mut result = HashMap::new();
1215 6 : for (k, v) in kv {
1216 5 : let v = v?;
1217 5 : if v.is_empty() {
1218 : // This is a tombstone -- we can skip it.
1219 : // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it is part of
1220 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1221 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1222 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1223 2 : continue;
1224 3 : }
1225 3 : let origin_id = k.field6 as RepOriginId;
1226 3 : let origin_lsn = Lsn::des(&v)
1227 3 : .with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
1228 2 : if origin_lsn != Lsn::INVALID {
1229 2 : result.insert(origin_id, origin_lsn);
1230 2 : }
1231 : }
1232 1 : Ok(result)
1233 2 : }
1234 :
1235 : /// Does the same as get_current_logical_size but counted on demand.
1236 : /// Used to initialize the logical size tracking on startup.
1237 : ///
1238 : /// Only relation blocks are counted currently. That excludes metadata,
1239 : /// SLRUs, twophase files etc.
1240 : ///
1241 : /// # Cancel-Safety
1242 : ///
1243 : /// This method is cancellation-safe.
1244 7 : pub(crate) async fn get_current_logical_size_non_incremental(
1245 7 : &self,
1246 7 : lsn: Lsn,
1247 7 : ctx: &RequestContext,
1248 7 : ) -> Result<u64, CalculateLogicalSizeError> {
1249 7 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1250 :
1251 7 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1252 :
1253 : // Fetch list of database dirs and iterate them
1254 7 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1255 7 : let dbdir = DbDirectory::des(&buf)?;
1256 :
1257 7 : let mut total_size: u64 = 0;
1258 7 : let mut dbdir_cnt = 0;
1259 7 : let mut rel_cnt = 0;
1260 :
1261 7 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1262 0 : dbdir_cnt += 1;
1263 0 : for rel in self
1264 0 : .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
1265 0 : .await?
1266 : {
1267 0 : rel_cnt += 1;
1268 0 : if self.cancel.is_cancelled() {
1269 0 : return Err(CalculateLogicalSizeError::Cancelled);
1270 0 : }
1271 0 : let relsize_key = rel_size_to_key(rel);
1272 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1273 0 : let relsize = buf.get_u32_le();
1274 :
1275 0 : total_size += relsize as u64;
1276 : }
1277 : }
1278 :
1279 7 : self.db_rel_count
1280 7 : .store(Some(Arc::new((dbdir_cnt, rel_cnt))));
1281 :
1282 7 : Ok(total_size * BLCKSZ as u64)
1283 7 : }
1284 :
1285 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1286 : /// for gc-compaction.
1287 : ///
1288 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1289 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1290 : /// be kept only for a specific range of LSN.
1291 : ///
1292 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1293 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1294 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1295 : /// determine which keys to retain/drop for gc-compaction.
1296 : ///
1297 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1298 : /// to be retained for each of the branch LSN.
1299 : ///
1300 : /// The return value is (dense keyspace, sparse keyspace).
1301 27 : pub(crate) async fn collect_gc_compaction_keyspace(
1302 27 : &self,
1303 27 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1304 27 : let metadata_key_begin = Key::metadata_key_range().start;
1305 27 : let aux_v1_key = AUX_FILES_KEY;
1306 27 : let dense_keyspace = KeySpace {
1307 27 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1308 27 : };
1309 27 : Ok((
1310 27 : dense_keyspace,
1311 27 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1312 27 : ))
1313 27 : }
1314 :
1315 : ///
1316 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1317 : /// Anything that's not listed may be removed from the underlying storage (from
1318 : /// that LSN forwards).
1319 : ///
1320 : /// The return value is (dense keyspace, sparse keyspace).
1321 169 : pub(crate) async fn collect_keyspace(
1322 169 : &self,
1323 169 : lsn: Lsn,
1324 169 : ctx: &RequestContext,
1325 169 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1326 : // Iterate through key ranges, greedily packing them into partitions
1327 169 : let mut result = KeySpaceAccum::new();
1328 :
1329 : // The dbdir metadata always exists
1330 169 : result.add_key(DBDIR_KEY);
1331 :
1332 : // Fetch list of database dirs and iterate them
1333 169 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1334 169 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1335 :
1336 169 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1337 169 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1338 0 : if has_relmap_file {
1339 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1340 0 : }
1341 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1342 :
1343 0 : let mut rels: Vec<RelTag> = self
1344 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1345 0 : .await?
1346 0 : .into_iter()
1347 0 : .collect();
1348 0 : rels.sort_unstable();
1349 0 : for rel in rels {
1350 0 : let relsize_key = rel_size_to_key(rel);
1351 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1352 0 : let relsize = buf.get_u32_le();
1353 :
1354 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1355 0 : result.add_key(relsize_key);
1356 : }
1357 : }
1358 :
1359 : // Iterate SLRUs next
1360 169 : if self.tenant_shard_id.is_shard_zero() {
1361 498 : for kind in [
1362 166 : SlruKind::Clog,
1363 166 : SlruKind::MultiXactMembers,
1364 166 : SlruKind::MultiXactOffsets,
1365 : ] {
1366 498 : let slrudir_key = slru_dir_to_key(kind);
1367 498 : result.add_key(slrudir_key);
1368 498 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1369 498 : let dir = SlruSegmentDirectory::des(&buf)?;
1370 498 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1371 498 : segments.sort_unstable();
1372 498 : for segno in segments {
1373 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1374 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1375 0 : let segsize = buf.get_u32_le();
1376 :
1377 0 : result.add_range(
1378 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1379 : );
1380 0 : result.add_key(segsize_key);
1381 : }
1382 : }
1383 3 : }
1384 :
1385 : // Then pg_twophase
1386 169 : result.add_key(TWOPHASEDIR_KEY);
1387 :
1388 169 : let mut xids: Vec<u64> = self
1389 169 : .list_twophase_files(lsn, ctx)
1390 169 : .await?
1391 169 : .iter()
1392 169 : .cloned()
1393 169 : .collect();
1394 169 : xids.sort_unstable();
1395 169 : for xid in xids {
1396 0 : result.add_key(twophase_file_key(xid));
1397 0 : }
1398 :
1399 169 : result.add_key(CONTROLFILE_KEY);
1400 169 : result.add_key(CHECKPOINT_KEY);
1401 :
1402 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1403 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1404 : // and the keys will not be garbage-collected.
1405 : #[cfg(test)]
1406 : {
1407 169 : let guard = self.extra_test_dense_keyspace.load();
1408 169 : for kr in &guard.ranges {
1409 0 : result.add_range(kr.clone());
1410 0 : }
1411 : }
1412 :
1413 169 : let dense_keyspace = result.to_keyspace();
1414 169 : let sparse_keyspace = SparseKeySpace(KeySpace {
1415 169 : ranges: vec![
1416 169 : Key::metadata_aux_key_range(),
1417 169 : repl_origin_key_range(),
1418 169 : Key::rel_dir_sparse_key_range(),
1419 169 : ],
1420 169 : });
1421 :
1422 169 : if cfg!(debug_assertions) {
1423 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1424 :
1425 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1426 : // category of sparse keys is split into its own image/delta files. If there
1427 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1428 : // and we want the developer to keep the keyspaces separated.
1429 :
1430 169 : let ranges = &sparse_keyspace.0.ranges;
1431 :
1432 : // TODO: use a single overlaps_with across the codebase
1433 507 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1434 507 : !(a.end <= b.start || b.end <= a.start)
1435 507 : }
1436 507 : for i in 0..ranges.len() {
1437 507 : for j in 0..i {
1438 507 : if overlaps_with(&ranges[i], &ranges[j]) {
1439 0 : panic!(
1440 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1441 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1442 : );
1443 507 : }
1444 : }
1445 : }
1446 338 : for i in 1..ranges.len() {
1447 338 : assert!(
1448 338 : ranges[i - 1].end <= ranges[i].start,
1449 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1450 0 : ranges[i - 1].start,
1451 0 : ranges[i - 1].end,
1452 0 : ranges[i].start,
1453 0 : ranges[i].end
1454 : );
1455 : }
1456 0 : }
1457 :
1458 169 : Ok((dense_keyspace, sparse_keyspace))
1459 169 : }
1460 :
1461 : /// Get the cached size of a relation. There are two caches: the "latest" cache, updated by the primary, which
1462 : /// captures the latest state of the timeline; and the snapshot cache, whose key includes the LSN and so can be
1463 : /// used by replicas to get the relation size at a particular LSN (snapshot).
1464 224270 : pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
1465 224270 : let lsn = version.get_lsn();
1466 : {
1467 224270 : let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
1468 224270 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
1469 224259 : if lsn >= *cached_lsn {
1470 221686 : RELSIZE_LATEST_CACHE_HITS.inc();
1471 221686 : return Some(*nblocks);
1472 2573 : }
1473 2573 : RELSIZE_CACHE_MISSES_OLD.inc();
1474 11 : }
1475 : }
1476 : {
1477 2584 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1478 2584 : if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
1479 2563 : RELSIZE_SNAPSHOT_CACHE_HITS.inc();
1480 2563 : return Some(*nblock);
1481 21 : }
1482 : }
1483 21 : if version.is_latest() {
1484 10 : RELSIZE_LATEST_CACHE_MISSES.inc();
1485 11 : } else {
1486 11 : RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
1487 11 : }
1488 21 : None
1489 224270 : }
1490 :
1491 : /// Update cached relation size if there is no more recent update
1492 5 : pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
1493 5 : let lsn = version.get_lsn();
1494 5 : if version.is_latest() {
1495 0 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1496 0 : match rel_size_cache.entry(tag) {
1497 0 : hash_map::Entry::Occupied(mut entry) => {
1498 0 : let cached_lsn = entry.get_mut();
1499 0 : if lsn >= cached_lsn.0 {
1500 0 : *cached_lsn = (lsn, nblocks);
1501 0 : }
1502 : }
1503 0 : hash_map::Entry::Vacant(entry) => {
1504 0 : entry.insert((lsn, nblocks));
1505 0 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1506 0 : }
1507 : }
1508 : } else {
1509 5 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1510 5 : if rel_size_cache.capacity() != 0 {
1511 5 : rel_size_cache.insert((lsn, tag), nblocks);
1512 5 : RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
1513 5 : }
1514 : }
1515 5 : }
1516 :
1517 : /// Store cached relation size
1518 141360 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1519 141360 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1520 141360 : if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
1521 960 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1522 140400 : }
1523 141360 : }
1524 :
1525 : /// Remove cached relation size
1526 1 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1527 1 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1528 1 : if rel_size_cache.remove(tag).is_some() {
1529 1 : RELSIZE_LATEST_CACHE_ENTRIES.dec();
1530 1 : }
1531 1 : }
1532 : }
1533 :
1534 : /// DatadirModification represents an operation to ingest an atomic set of
1535 : /// updates to the repository.
1536 : ///
1537 : /// It is created by the 'begin_modification' function. It is called for each WAL
1538 : /// record, so that all the modifications made by a single WAL record appear atomic.
1539 : pub struct DatadirModification<'a> {
1540 : /// The timeline this modification applies to. You can access this to
1541 : /// read the state, but note that any pending updates are *not* reflected
1542 : /// in the state in 'tline' yet.
1543 : pub tline: &'a Timeline,
1544 :
1545 : /// Current LSN of the modification
1546 : lsn: Lsn,
1547 :
1548 : // The modifications are not applied directly to the underlying key-value store.
1549 : // The put-functions add the modifications here, and they are flushed to the
1550 : // underlying key-value store by the 'commit' function.
1551 : pending_lsns: Vec<Lsn>,
1552 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1553 : pending_nblocks: i64,
1554 :
1555 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1556 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1557 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1558 :
1559 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1560 : /// which keys are stored here.
1561 : pending_data_batch: Option<SerializedValueBatch>,
1562 :
1563 : /// For special "directory" keys that store key-value maps, track the size of the map
1564 : /// if it was updated in this modification.
1565 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1566 :
1567 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1568 : pending_metadata_bytes: usize,
1569 : }
1570 :
1571 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1572 : pub enum MetricsUpdate {
1573 : /// Set the metric to this value
1574 : Set(u64),
1575 : /// Increment the metric by this value
1576 : Add(u64),
1577 : /// Decrement the metric by this value
1578 : Sub(u64),
1579 : }
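// A minimal sketch (not part of the original code) of how a `MetricsUpdate` could be folded
// into a plain counter; the real application happens in the timeline writer when
// `update_directory_entries_count` processes `pending_directory_entries`. `Sub` is assumed
// to saturate at zero here.
#[allow(dead_code)]
fn apply_metrics_update_sketch(current: u64, update: MetricsUpdate) -> u64 {
    match update {
        MetricsUpdate::Set(v) => v,
        MetricsUpdate::Add(v) => current + v,
        MetricsUpdate::Sub(v) => current.saturating_sub(v),
    }
}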
1580 :
1581 : impl DatadirModification<'_> {
1582 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1583 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1584 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1585 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1586 :
1587 : /// Get the current lsn
1588 1 : pub(crate) fn get_lsn(&self) -> Lsn {
1589 1 : self.lsn
1590 1 : }
1591 :
1592 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1593 0 : self.pending_data_batch
1594 0 : .as_ref()
1595 0 : .map_or(0, |b| b.buffer_size())
1596 0 : + self.pending_metadata_bytes
1597 0 : }
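// Illustrative sketch (hypothetical caller code) of how the limit and the estimate above are
// meant to be used together by an ingest loop: once the approximate pending payload exceeds
// the limit, the modification should be committed before more records are ingested.
//
//     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//         modification.commit(ctx).await?;
//     }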
1598 :
1599 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1600 0 : self.pending_data_batch
1601 0 : .as_ref()
1602 0 : .is_some_and(|b| b.has_data())
1603 0 : }
1604 :
1605 : /// Returns statistics about the currently pending modifications.
1606 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1607 0 : let mut stats = DatadirModificationStats::default();
1608 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1609 0 : match value {
1610 0 : Value::Image(_) => stats.metadata_images += 1,
1611 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1612 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1613 : }
1614 : }
1615 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1616 0 : match valuemeta {
1617 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1618 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1619 0 : ValueMeta::Observed(_) => {}
1620 : }
1621 : }
1622 0 : stats
1623 0 : }
1624 :
1625 : /// Set the current lsn
1626 72929 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1627 72929 : ensure_walingest!(
1628 : lsn >= self.lsn,
1629 : "setting an older lsn {} than {} is not allowed",
1630 : lsn,
1631 : self.lsn
1632 : );
1633 :
1634 72929 : if lsn > self.lsn {
1635 72929 : self.pending_lsns.push(self.lsn);
1636 72929 : self.lsn = lsn;
1637 72929 : }
1638 72929 : Ok(())
1639 72929 : }
1640 :
1641 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1642 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1643 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1644 : ///
1645 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1646 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1647 : /// via [`Self::get`] as soon as their record has been ingested.
1648 425371 : fn is_data_key(key: &Key) -> bool {
1649 425371 : key.is_rel_block_key() || key.is_slru_block_key()
1650 425371 : }
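// For example (illustrative, derived from the key helpers used in this file): a key built with
// `rel_block_to_key` or `slru_block_to_key` is "data", while `DBDIR_KEY`, relation size keys
// (`rel_size_to_key`) and the directory keys are "metadata".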
1651 :
1652 : /// Initialize a completely new repository.
1653 : ///
1654 : /// This inserts the directory metadata entries that are assumed to
1655 : /// always exist.
1656 112 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1657 112 : let buf = DbDirectory::ser(&DbDirectory {
1658 112 : dbdirs: HashMap::new(),
1659 112 : })?;
1660 112 : self.pending_directory_entries
1661 112 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1662 112 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1663 :
1664 112 : let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
1665 99 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1666 99 : xids: HashSet::new(),
1667 99 : })
1668 : } else {
1669 13 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1670 13 : xids: HashSet::new(),
1671 13 : })
1672 0 : }?;
1673 112 : self.pending_directory_entries
1674 112 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1675 112 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1676 :
1677 112 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1678 112 : let empty_dir = Value::Image(buf);
1679 :
1680 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1681 : // harmless but they'd just be dropped on later compaction.
1682 112 : if self.tline.tenant_shard_id.is_shard_zero() {
1683 109 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1684 109 : self.pending_directory_entries.push((
1685 109 : DirectoryKind::SlruSegment(SlruKind::Clog),
1686 109 : MetricsUpdate::Set(0),
1687 109 : ));
1688 109 : self.put(
1689 109 : slru_dir_to_key(SlruKind::MultiXactMembers),
1690 109 : empty_dir.clone(),
1691 109 : );
1692 109 : self.pending_directory_entries.push((
1693 109 : DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1694 109 : MetricsUpdate::Set(0),
1695 109 : ));
1696 109 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1697 109 : self.pending_directory_entries.push((
1698 109 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1699 109 : MetricsUpdate::Set(0),
1700 109 : ));
1701 109 : }
1702 :
1703 112 : Ok(())
1704 112 : }
1705 :
1706 : #[cfg(test)]
1707 111 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1708 111 : self.init_empty()?;
1709 111 : self.put_control_file(bytes::Bytes::from_static(
1710 111 : b"control_file contents do not matter",
1711 : ))
1712 111 : .context("put_control_file")?;
1713 111 : self.put_checkpoint(bytes::Bytes::from_static(
1714 111 : b"checkpoint_file contents do not matter",
1715 : ))
1716 111 : .context("put_checkpoint_file")?;
1717 111 : Ok(())
1718 111 : }
1719 :
1720 : /// Creates a relation if it is not already present.
1721 : /// Returns the current size of the relation
1722 209028 : pub(crate) async fn create_relation_if_required(
1723 209028 : &mut self,
1724 209028 : rel: RelTag,
1725 209028 : ctx: &RequestContext,
1726 209028 : ) -> Result<u32, WalIngestError> {
1727 : // Get current size and put rel creation if rel doesn't exist
1728 : //
1729 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1730 : // check the cache too. This is because eagerly checking the cache results in
1731 : // less work overall and 10% better performance. It's more work on cache miss
1732 : // but cache miss is rare.
1733 209028 : if let Some(nblocks) = self
1734 209028 : .tline
1735 209028 : .get_cached_rel_size(&rel, Version::Modified(self))
1736 : {
1737 209023 : Ok(nblocks)
1738 5 : } else if !self
1739 5 : .tline
1740 5 : .get_rel_exists(rel, Version::Modified(self), ctx)
1741 5 : .await?
1742 : {
1743 : // create it with 0 size initially, the logic below will extend it
1744 5 : self.put_rel_creation(rel, 0, ctx).await?;
1745 5 : Ok(0)
1746 : } else {
1747 0 : Ok(self
1748 0 : .tline
1749 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1750 0 : .await?)
1751 : }
1752 209028 : }
1753 :
1754 : /// Given a block number for a relation (which represents a newly written block),
1755 : /// the previous block count of the relation, and the shard info, find the gaps
1756 : /// that were created by the newly written block, if any.
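/// For example (illustrative, matching the loop below): with `previous_nblocks == 3` and
/// `blkno == 5`, blocks 3 and 4 form the gap, and only those owned by `shard` are returned.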
1757 72835 : fn find_gaps(
1758 72835 : rel: RelTag,
1759 72835 : blkno: u32,
1760 72835 : previous_nblocks: u32,
1761 72835 : shard: &ShardIdentity,
1762 72835 : ) -> Option<KeySpace> {
1763 72835 : let mut key = rel_block_to_key(rel, blkno);
1764 72835 : let mut gap_accum = None;
1765 :
1766 72835 : for gap_blkno in previous_nblocks..blkno {
1767 16 : key.field6 = gap_blkno;
1768 :
1769 16 : if shard.get_shard_number(&key) != shard.number {
1770 4 : continue;
1771 12 : }
1772 :
1773 12 : gap_accum
1774 12 : .get_or_insert_with(KeySpaceAccum::new)
1775 12 : .add_key(key);
1776 : }
1777 :
1778 72835 : gap_accum.map(|accum| accum.to_keyspace())
1779 72835 : }
1780 :
1781 72926 : pub async fn ingest_batch(
1782 72926 : &mut self,
1783 72926 : mut batch: SerializedValueBatch,
1784 72926 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1785 72926 : shard: &ShardIdentity,
1786 72926 : ctx: &RequestContext,
1787 72926 : ) -> Result<(), WalIngestError> {
1788 72926 : let mut gaps_at_lsns = Vec::default();
1789 :
1790 72926 : for meta in batch.metadata.iter() {
1791 72821 : let key = Key::from_compact(meta.key());
1792 72821 : let (rel, blkno) = key
1793 72821 : .to_rel_block()
1794 72821 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1795 72821 : let new_nblocks = blkno + 1;
1796 :
1797 72821 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1798 72821 : if new_nblocks > old_nblocks {
1799 1195 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1800 71626 : }
1801 :
1802 72821 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1803 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1804 72821 : }
1805 : }
1806 :
1807 72926 : if !gaps_at_lsns.is_empty() {
1808 0 : batch.zero_gaps(gaps_at_lsns);
1809 72926 : }
1810 :
1811 72926 : match self.pending_data_batch.as_mut() {
1812 10 : Some(pending_batch) => {
1813 10 : pending_batch.extend(batch);
1814 10 : }
1815 72916 : None if batch.has_data() => {
1816 72815 : self.pending_data_batch = Some(batch);
1817 72815 : }
1818 101 : None => {
1819 101 : // Nothing to initialize the batch with
1820 101 : }
1821 : }
1822 :
1823 72926 : Ok(())
1824 72926 : }
1825 :
1826 : /// Put a new page version that can be constructed from a WAL record
1827 : ///
1828 : /// NOTE: this will *not* implicitly extend the relation if the page is beyond the
1829 : /// current end-of-file. It's up to the caller to check that the relation size
1830 : /// matches the blocks inserted!
1831 6 : pub fn put_rel_wal_record(
1832 6 : &mut self,
1833 6 : rel: RelTag,
1834 6 : blknum: BlockNumber,
1835 6 : rec: NeonWalRecord,
1836 6 : ) -> Result<(), WalIngestError> {
1837 6 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1838 6 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1839 6 : Ok(())
1840 6 : }
1841 :
1842 : // Same, but for an SLRU.
1843 4 : pub fn put_slru_wal_record(
1844 4 : &mut self,
1845 4 : kind: SlruKind,
1846 4 : segno: u32,
1847 4 : blknum: BlockNumber,
1848 4 : rec: NeonWalRecord,
1849 4 : ) -> Result<(), WalIngestError> {
1850 4 : if !self.tline.tenant_shard_id.is_shard_zero() {
1851 0 : return Ok(());
1852 4 : }
1853 :
1854 4 : self.put(
1855 4 : slru_block_to_key(kind, segno, blknum),
1856 4 : Value::WalRecord(rec),
1857 : );
1858 4 : Ok(())
1859 4 : }
1860 :
1861 : /// Like put_rel_wal_record, but with a ready-made image of the page.
1862 138921 : pub fn put_rel_page_image(
1863 138921 : &mut self,
1864 138921 : rel: RelTag,
1865 138921 : blknum: BlockNumber,
1866 138921 : img: Bytes,
1867 138921 : ) -> Result<(), WalIngestError> {
1868 138921 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1869 138921 : let key = rel_block_to_key(rel, blknum);
1870 138921 : if !key.is_valid_key_on_write_path() {
1871 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1872 138921 : }
1873 138921 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1874 138921 : Ok(())
1875 138921 : }
1876 :
1877 3 : pub fn put_slru_page_image(
1878 3 : &mut self,
1879 3 : kind: SlruKind,
1880 3 : segno: u32,
1881 3 : blknum: BlockNumber,
1882 3 : img: Bytes,
1883 3 : ) -> Result<(), WalIngestError> {
1884 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1885 :
1886 3 : let key = slru_block_to_key(kind, segno, blknum);
1887 3 : if !key.is_valid_key_on_write_path() {
1888 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1889 3 : }
1890 3 : self.put(key, Value::Image(img));
1891 3 : Ok(())
1892 3 : }
1893 :
1894 1499 : pub(crate) fn put_rel_page_image_zero(
1895 1499 : &mut self,
1896 1499 : rel: RelTag,
1897 1499 : blknum: BlockNumber,
1898 1499 : ) -> Result<(), WalIngestError> {
1899 1499 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1900 1499 : let key = rel_block_to_key(rel, blknum);
1901 1499 : if !key.is_valid_key_on_write_path() {
1902 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1903 1499 : }
1904 :
1905 1499 : let batch = self
1906 1499 : .pending_data_batch
1907 1499 : .get_or_insert_with(SerializedValueBatch::default);
1908 :
1909 1499 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1910 :
1911 1499 : Ok(())
1912 1499 : }
1913 :
1914 0 : pub(crate) fn put_slru_page_image_zero(
1915 0 : &mut self,
1916 0 : kind: SlruKind,
1917 0 : segno: u32,
1918 0 : blknum: BlockNumber,
1919 0 : ) -> Result<(), WalIngestError> {
1920 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1921 0 : let key = slru_block_to_key(kind, segno, blknum);
1922 0 : if !key.is_valid_key_on_write_path() {
1923 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1924 0 : }
1925 :
1926 0 : let batch = self
1927 0 : .pending_data_batch
1928 0 : .get_or_insert_with(SerializedValueBatch::default);
1929 :
1930 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1931 :
1932 0 : Ok(())
1933 0 : }
1934 :
1935 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1936 : /// we enable it, we also need to persist it in `index_part.json`.
1937 973 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1938 973 : let status = self.tline.get_rel_size_v2_status();
1939 973 : let config = self.tline.get_rel_size_v2_enabled();
1940 973 : match (config, status) {
1941 : (false, RelSizeMigration::Legacy) => {
1942 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1943 973 : Ok(false)
1944 : }
1945 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1946 : // index_part already persisted that the timeline has enabled rel_size_v2
1947 0 : Ok(true)
1948 : }
1949 : (true, RelSizeMigration::Legacy) => {
1950 : // The first time we enable it, we need to persist it in `index_part.json`
1951 0 : self.tline
1952 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1953 0 : tracing::info!("enabled rel_size_v2");
1954 0 : Ok(true)
1955 : }
1956 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1957 : // index_part already persisted that the timeline has enabled rel_size_v2
1958 : // and we don't need to do anything
1959 0 : Ok(true)
1960 : }
1961 : }
1962 973 : }
1963 :
1964 : /// Store a relmapper file (pg_filenode.map) in the repository
1965 8 : pub async fn put_relmap_file(
1966 8 : &mut self,
1967 8 : spcnode: Oid,
1968 8 : dbnode: Oid,
1969 8 : img: Bytes,
1970 8 : ctx: &RequestContext,
1971 8 : ) -> Result<(), WalIngestError> {
1972 8 : let v2_enabled = self
1973 8 : .maybe_enable_rel_size_v2()
1974 8 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1975 :
1976 : // Add it to the directory (if it doesn't exist already)
1977 8 : let buf = self.get(DBDIR_KEY, ctx).await?;
1978 8 : let mut dbdir = DbDirectory::des(&buf)?;
1979 :
1980 8 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1981 8 : if r.is_none() || r == Some(false) {
1982 : // The dbdir entry didn't exist, or it contained a
1983 : // 'false'. The 'insert' call already updated it with
1984 : // 'true', now write the updated 'dbdirs' map back.
1985 8 : let buf = DbDirectory::ser(&dbdir)?;
1986 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1987 0 : }
1988 8 : if r.is_none() {
1989 : // Create RelDirectory
1990 : // TODO: if we have fully migrated to v2, no need to create this directory
1991 4 : let buf = RelDirectory::ser(&RelDirectory {
1992 4 : rels: HashSet::new(),
1993 4 : })?;
1994 4 : self.pending_directory_entries
1995 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1996 4 : if v2_enabled {
1997 0 : self.pending_directory_entries
1998 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1999 4 : }
2000 4 : self.put(
2001 4 : rel_dir_to_key(spcnode, dbnode),
2002 4 : Value::Image(Bytes::from(buf)),
2003 : );
2004 4 : }
2005 :
2006 8 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
2007 8 : Ok(())
2008 8 : }
2009 :
2010 0 : pub async fn put_twophase_file(
2011 0 : &mut self,
2012 0 : xid: u64,
2013 0 : img: Bytes,
2014 0 : ctx: &RequestContext,
2015 0 : ) -> Result<(), WalIngestError> {
2016 : // Add it to the directory entry
2017 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2018 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2019 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
2020 0 : if !dir.xids.insert(xid) {
2021 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
2022 0 : }
2023 0 : self.pending_directory_entries.push((
2024 0 : DirectoryKind::TwoPhase,
2025 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2026 0 : ));
2027 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2028 : } else {
2029 0 : let xid = xid as u32;
2030 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
2031 0 : if !dir.xids.insert(xid) {
2032 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
2033 0 : }
2034 0 : self.pending_directory_entries.push((
2035 0 : DirectoryKind::TwoPhase,
2036 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2037 0 : ));
2038 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2039 : };
2040 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2041 :
2042 0 : self.put(twophase_file_key(xid), Value::Image(img));
2043 0 : Ok(())
2044 0 : }
2045 :
2046 1 : pub async fn set_replorigin(
2047 1 : &mut self,
2048 1 : origin_id: RepOriginId,
2049 1 : origin_lsn: Lsn,
2050 1 : ) -> Result<(), WalIngestError> {
2051 1 : let key = repl_origin_key(origin_id);
2052 1 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
2053 1 : Ok(())
2054 1 : }
2055 :
2056 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
2057 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
2058 0 : }
2059 :
2060 112 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2061 112 : self.put(CONTROLFILE_KEY, Value::Image(img));
2062 112 : Ok(())
2063 112 : }
2064 :
2065 119 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2066 119 : self.put(CHECKPOINT_KEY, Value::Image(img));
2067 119 : Ok(())
2068 119 : }
2069 :
2070 0 : pub async fn drop_dbdir(
2071 0 : &mut self,
2072 0 : spcnode: Oid,
2073 0 : dbnode: Oid,
2074 0 : ctx: &RequestContext,
2075 0 : ) -> Result<(), WalIngestError> {
2076 0 : let total_blocks = self
2077 0 : .tline
2078 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
2079 0 : .await?;
2080 :
2081 : // Remove entry from dbdir
2082 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
2083 0 : let mut dir = DbDirectory::des(&buf)?;
2084 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
2085 0 : let buf = DbDirectory::ser(&dir)?;
2086 0 : self.pending_directory_entries.push((
2087 0 : DirectoryKind::Db,
2088 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
2089 0 : ));
2090 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2091 : } else {
2092 0 : warn!(
2093 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
2094 : spcnode, dbnode
2095 : );
2096 : }
2097 :
2098 : // Update logical database size.
2099 0 : self.pending_nblocks -= total_blocks as i64;
2100 :
2101 : // Delete all relations and metadata files for the spcnode/dnode
2102 0 : self.delete(dbdir_key_range(spcnode, dbnode));
2103 0 : Ok(())
2104 0 : }
2105 :
2106 : /// Create a relation fork.
2107 : ///
2108 : /// 'nblocks' is the initial size.
2109 960 : pub async fn put_rel_creation(
2110 960 : &mut self,
2111 960 : rel: RelTag,
2112 960 : nblocks: BlockNumber,
2113 960 : ctx: &RequestContext,
2114 960 : ) -> Result<(), WalIngestError> {
2115 960 : if rel.relnode == 0 {
2116 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
2117 0 : "invalid relnode"
2118 0 : )))?;
2119 960 : }
2120 : // It's possible that this is the first rel for this db in this
2121 : // tablespace. Create the reldir entry for it if so.
2122 960 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
2123 :
2124 960 : let dbdir_exists =
2125 960 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
2126 : // Didn't exist. Update dbdir
2127 4 : e.insert(false);
2128 4 : let buf = DbDirectory::ser(&dbdir)?;
2129 4 : self.pending_directory_entries.push((
2130 4 : DirectoryKind::Db,
2131 4 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
2132 4 : ));
2133 4 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2134 4 : false
2135 : } else {
2136 956 : true
2137 : };
2138 :
2139 960 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2140 960 : let mut rel_dir = if !dbdir_exists {
2141 : // Create the RelDirectory
2142 4 : RelDirectory::default()
2143 : } else {
2144 : // reldir already exists, fetch it
2145 956 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
2146 : };
2147 :
2148 960 : let v2_enabled = self
2149 960 : .maybe_enable_rel_size_v2()
2150 960 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2151 :
2152 960 : if v2_enabled {
2153 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
2154 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2155 0 : }
2156 0 : let sparse_rel_dir_key =
2157 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2158 : // check if the rel_dir_key exists in v2
2159 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2160 0 : let val = RelDirExists::decode_option(val)
2161 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2162 0 : if val == RelDirExists::Exists {
2163 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2164 0 : }
2165 0 : self.put(
2166 0 : sparse_rel_dir_key,
2167 0 : Value::Image(RelDirExists::Exists.encode()),
2168 : );
2169 0 : if !dbdir_exists {
2170 0 : self.pending_directory_entries
2171 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2172 0 : self.pending_directory_entries
2173 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2174 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2175 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2176 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2177 0 : self.put(
2178 0 : rel_dir_key,
2179 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2180 : );
2181 0 : }
2182 0 : self.pending_directory_entries
2183 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2184 : } else {
2185 : // Add the new relation to the rel directory entry, and write it back
2186 960 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2187 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2188 960 : }
2189 960 : if !dbdir_exists {
2190 4 : self.pending_directory_entries
2191 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2192 956 : }
2193 960 : self.pending_directory_entries
2194 960 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2195 960 : self.put(
2196 960 : rel_dir_key,
2197 960 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2198 : );
2199 : }
2200 :
2201 : // Put size
2202 960 : let size_key = rel_size_to_key(rel);
2203 960 : let buf = nblocks.to_le_bytes();
2204 960 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2205 :
2206 960 : self.pending_nblocks += nblocks as i64;
2207 :
2208 : // Update relation size cache
2209 960 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2210 :
2211 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2212 : // caller.
2213 960 : Ok(())
2214 960 : }
2215 :
2216 : /// Truncate relation
2217 3006 : pub async fn put_rel_truncation(
2218 3006 : &mut self,
2219 3006 : rel: RelTag,
2220 3006 : nblocks: BlockNumber,
2221 3006 : ctx: &RequestContext,
2222 3006 : ) -> Result<(), WalIngestError> {
2223 3006 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2224 3006 : if self
2225 3006 : .tline
2226 3006 : .get_rel_exists(rel, Version::Modified(self), ctx)
2227 3006 : .await?
2228 : {
2229 3006 : let size_key = rel_size_to_key(rel);
2230 : // Fetch the old size first
2231 3006 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2232 :
2233 : // Update the entry with the new size.
2234 3006 : let buf = nblocks.to_le_bytes();
2235 3006 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2236 :
2237 : // Update relation size cache
2238 3006 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2239 :
2240 : // Update logical database size.
2241 3006 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2242 0 : }
2243 3006 : Ok(())
2244 3006 : }
2245 :
2246 : /// Extend relation
2247 : /// If new size is smaller, do nothing.
2248 138340 : pub async fn put_rel_extend(
2249 138340 : &mut self,
2250 138340 : rel: RelTag,
2251 138340 : nblocks: BlockNumber,
2252 138340 : ctx: &RequestContext,
2253 138340 : ) -> Result<(), WalIngestError> {
2254 138340 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2255 :
2256 : // Put size
2257 138340 : let size_key = rel_size_to_key(rel);
2258 138340 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2259 :
2260 : // only extend relation here. never decrease the size
2261 138340 : if nblocks > old_size {
2262 137394 : let buf = nblocks.to_le_bytes();
2263 137394 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2264 137394 :
2265 137394 : // Update relation size cache
2266 137394 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2267 137394 :
2268 137394 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2269 137394 : }
2270 138340 : Ok(())
2271 138340 : }
2272 :
2273 : /// Drop some relations
2274 5 : pub(crate) async fn put_rel_drops(
2275 5 : &mut self,
2276 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2277 5 : ctx: &RequestContext,
2278 5 : ) -> Result<(), WalIngestError> {
2279 5 : let v2_enabled = self
2280 5 : .maybe_enable_rel_size_v2()
2281 5 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2282 6 : for ((spc_node, db_node), rel_tags) in drop_relations {
2283 1 : let dir_key = rel_dir_to_key(spc_node, db_node);
2284 1 : let buf = self.get(dir_key, ctx).await?;
2285 1 : let mut dir = RelDirectory::des(&buf)?;
2286 :
2287 1 : let mut dirty = false;
2288 2 : for rel_tag in rel_tags {
2289 1 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2290 1 : self.pending_directory_entries
2291 1 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2292 1 : dirty = true;
2293 1 : true
2294 0 : } else if v2_enabled {
2295 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2296 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2297 : // logic).
2298 0 : let key =
2299 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2300 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2301 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2302 0 : if val == RelDirExists::Exists {
2303 0 : self.pending_directory_entries
2304 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2305 : // put tombstone
2306 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2307 : // no need to set dirty to true
2308 0 : true
2309 : } else {
2310 0 : false
2311 : }
2312 : } else {
2313 0 : false
2314 : };
2315 :
2316 1 : if found {
2317 : // update logical size
2318 1 : let size_key = rel_size_to_key(rel_tag);
2319 1 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2320 1 : self.pending_nblocks -= old_size as i64;
2321 :
2322 : // Remove entry from relation size cache
2323 1 : self.tline.remove_cached_rel_size(&rel_tag);
2324 :
2325 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2326 1 : self.delete(rel_key_range(rel_tag));
2327 0 : }
2328 : }
2329 :
2330 1 : if dirty {
2331 1 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2332 0 : }
2333 : }
2334 :
2335 5 : Ok(())
2336 5 : }
2337 :
2338 3 : pub async fn put_slru_segment_creation(
2339 3 : &mut self,
2340 3 : kind: SlruKind,
2341 3 : segno: u32,
2342 3 : nblocks: BlockNumber,
2343 3 : ctx: &RequestContext,
2344 3 : ) -> Result<(), WalIngestError> {
2345 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2346 :
2347 : // Add it to the directory entry
2348 3 : let dir_key = slru_dir_to_key(kind);
2349 3 : let buf = self.get(dir_key, ctx).await?;
2350 3 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2351 :
2352 3 : if !dir.segments.insert(segno) {
2353 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2354 3 : }
2355 3 : self.pending_directory_entries.push((
2356 3 : DirectoryKind::SlruSegment(kind),
2357 3 : MetricsUpdate::Set(dir.segments.len() as u64),
2358 3 : ));
2359 3 : self.put(
2360 3 : dir_key,
2361 3 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2362 : );
2363 :
2364 : // Put size
2365 3 : let size_key = slru_segment_size_to_key(kind, segno);
2366 3 : let buf = nblocks.to_le_bytes();
2367 3 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2368 :
2369 : // even if nblocks > 0, we don't insert any actual blocks here
2370 :
2371 3 : Ok(())
2372 3 : }
2373 :
2374 : /// Extend SLRU segment
2375 0 : pub fn put_slru_extend(
2376 0 : &mut self,
2377 0 : kind: SlruKind,
2378 0 : segno: u32,
2379 0 : nblocks: BlockNumber,
2380 0 : ) -> Result<(), WalIngestError> {
2381 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2382 :
2383 : // Put size
2384 0 : let size_key = slru_segment_size_to_key(kind, segno);
2385 0 : let buf = nblocks.to_le_bytes();
2386 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2387 0 : Ok(())
2388 0 : }
2389 :
2390 : /// This method is used for marking truncated SLRU files
2391 0 : pub async fn drop_slru_segment(
2392 0 : &mut self,
2393 0 : kind: SlruKind,
2394 0 : segno: u32,
2395 0 : ctx: &RequestContext,
2396 0 : ) -> Result<(), WalIngestError> {
2397 : // Remove it from the directory entry
2398 0 : let dir_key = slru_dir_to_key(kind);
2399 0 : let buf = self.get(dir_key, ctx).await?;
2400 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2401 :
2402 0 : if !dir.segments.remove(&segno) {
2403 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2404 0 : }
2405 0 : self.pending_directory_entries.push((
2406 0 : DirectoryKind::SlruSegment(kind),
2407 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2408 0 : ));
2409 0 : self.put(
2410 0 : dir_key,
2411 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2412 : );
2413 :
2414 : // Delete size entry, as well as all blocks
2415 0 : self.delete(slru_segment_key_range(kind, segno));
2416 :
2417 0 : Ok(())
2418 0 : }
2419 :
2420 : /// Drop a relmapper file (pg_filenode.map)
2421 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2422 : // TODO
2423 0 : Ok(())
2424 0 : }
2425 :
2426 : /// Remove a twophase state file and its directory entry
2427 0 : pub async fn drop_twophase_file(
2428 0 : &mut self,
2429 0 : xid: u64,
2430 0 : ctx: &RequestContext,
2431 0 : ) -> Result<(), WalIngestError> {
2432 : // Remove it from the directory entry
2433 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2434 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2435 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2436 :
2437 0 : if !dir.xids.remove(&xid) {
2438 0 : warn!("twophase file for xid {} does not exist", xid);
2439 0 : }
2440 0 : self.pending_directory_entries.push((
2441 0 : DirectoryKind::TwoPhase,
2442 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2443 0 : ));
2444 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2445 : } else {
2446 0 : let xid: u32 = u32::try_from(xid)
2447 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2448 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2449 :
2450 0 : if !dir.xids.remove(&xid) {
2451 0 : warn!("twophase file for xid {} does not exist", xid);
2452 0 : }
2453 0 : self.pending_directory_entries.push((
2454 0 : DirectoryKind::TwoPhase,
2455 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2456 0 : ));
2457 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2458 : };
2459 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2460 :
2461 : // Delete it
2462 0 : self.delete(twophase_key_range(xid));
2463 :
2464 0 : Ok(())
2465 0 : }
2466 :
2467 8 : pub async fn put_file(
2468 8 : &mut self,
2469 8 : path: &str,
2470 8 : content: &[u8],
2471 8 : ctx: &RequestContext,
2472 8 : ) -> Result<(), WalIngestError> {
2473 8 : let key = aux_file::encode_aux_file_key(path);
2474 : // retrieve the key from the engine
2475 8 : let old_val = match self.get(key, ctx).await {
2476 2 : Ok(val) => Some(val),
2477 6 : Err(PageReconstructError::MissingKey(_)) => None,
2478 0 : Err(e) => return Err(e.into()),
2479 : };
2480 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2481 2 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2482 : } else {
2483 6 : Vec::new()
2484 : };
2485 8 : let mut other_files = Vec::with_capacity(files.len());
2486 8 : let mut modifying_file = None;
2487 10 : for file @ (p, content) in files {
2488 2 : if path == p {
2489 2 : assert!(
2490 2 : modifying_file.is_none(),
2491 0 : "duplicated entries found for {path}"
2492 : );
2493 2 : modifying_file = Some(content);
2494 0 : } else {
2495 0 : other_files.push(file);
2496 0 : }
2497 : }
2498 8 : let mut new_files = other_files;
2499 8 : match (modifying_file, content.is_empty()) {
2500 1 : (Some(old_content), false) => {
2501 1 : self.tline
2502 1 : .aux_file_size_estimator
2503 1 : .on_update(old_content.len(), content.len());
2504 1 : new_files.push((path, content));
2505 1 : }
2506 1 : (Some(old_content), true) => {
2507 1 : self.tline
2508 1 : .aux_file_size_estimator
2509 1 : .on_remove(old_content.len());
2510 1 : // not adding the file key to the final `new_files` vec.
2511 1 : }
2512 6 : (None, false) => {
2513 6 : self.tline.aux_file_size_estimator.on_add(content.len());
2514 6 : new_files.push((path, content));
2515 6 : }
2516 : // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2517 : // Compute doesn't know whether a previous version of this file exists, so an
2518 : // attempt to delete a non-existing file can trigger this message.
2519 : // To avoid false alarms, log it as info rather than a warning.
2520 0 : (None, true) if path.starts_with("pg_stat/") => {
2521 0 : info!("removing non-existing pg_stat file: {}", path)
2522 : }
2523 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2524 : }
2525 8 : let new_val = aux_file::encode_file_value(&new_files)
2526 8 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2527 8 : self.put(key, Value::Image(new_val.into()));
2528 :
2529 8 : Ok(())
2530 8 : }
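// Illustrative semantics of `put_file` (hypothetical caller, mirroring the unit test below):
// a non-empty `content` inserts or replaces the aux file stored under `path`, while an empty
// `content` removes it from the encoded aux-file value.
//
//     modification.put_file("foo/bar1", b"content1", ctx).await?; // insert or replace
//     modification.put_file("foo/bar1", b"", ctx).await?;         // delete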
2531 :
2532 : ///
2533 : /// Flush changes accumulated so far to the underlying repository.
2534 : ///
2535 : /// Usually, changes made in DatadirModification are atomic, but this allows
2536 : /// you to flush them to the underlying repository before the final `commit`.
2537 : /// That allows freeing up the memory used to hold the pending changes.
2538 : ///
2539 : /// Currently only used during bulk import of a data directory. In that
2540 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2541 : /// whole import fails and the timeline will be deleted anyway.
2542 : /// (Or to be precise, it will be left behind for debugging purposes and
2543 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2544 : ///
2545 : /// Note: A consequence of flushing the pending operations is that they
2546 : /// won't be visible to subsequent operations until `commit`. The function
2547 : /// retains all the metadata, but data pages are flushed. That's again OK
2548 : /// for bulk import, where you are just loading data pages and won't try to
2549 : /// modify the same pages twice.
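///
/// Illustrative bulk-import shape (hypothetical caller; `data_chunks`, `ingest_chunk` and
/// `ctx` are assumptions, not part of this module):
///
/// ```ignore
/// for chunk in data_chunks {
///     ingest_chunk(&mut modification, chunk)?;
///     modification.flush(ctx).await?;  // free memory; atomicity is not needed here
/// }
/// modification.commit(ctx).await?;     // final commit stamps the remaining changes
/// ```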
2550 965 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2551 : // Unless we have accumulated a decent amount of changes, it's not worth it
2552 : // to scan through the pending_updates list.
2553 965 : let pending_nblocks = self.pending_nblocks;
2554 965 : if pending_nblocks < 10000 {
2555 965 : return Ok(());
2556 0 : }
2557 :
2558 0 : let mut writer = self.tline.writer().await;
2559 :
2560 : // Flush relation and SLRU data blocks, keep metadata.
2561 0 : if let Some(batch) = self.pending_data_batch.take() {
2562 0 : tracing::debug!(
2563 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2564 : batch.max_lsn,
2565 0 : self.tline.get_last_record_lsn()
2566 : );
2567 :
2568 : // This bails out on first error without modifying pending_updates.
2569 : // That's Ok, cf this function's doc comment.
2570 0 : writer.put_batch(batch, ctx).await?;
2571 0 : }
2572 :
2573 0 : if pending_nblocks != 0 {
2574 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2575 0 : self.pending_nblocks = 0;
2576 0 : }
2577 :
2578 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2579 0 : writer.update_directory_entries_count(kind, count);
2580 0 : }
2581 :
2582 0 : Ok(())
2583 965 : }
2584 :
2585 : ///
2586 : /// Finish this atomic update, writing all the updated keys to the
2587 : /// underlying timeline.
2588 : /// All the modifications in this atomic update are stamped by the specified LSN.
2589 : ///
2590 371557 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2591 371557 : let mut writer = self.tline.writer().await;
2592 :
2593 371557 : let pending_nblocks = self.pending_nblocks;
2594 371557 : self.pending_nblocks = 0;
2595 :
2596 : // Ordering: the items in this batch do not need to be in any global order, but values for
2597 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2598 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2599 : // more details.
2600 :
2601 371557 : let metadata_batch = {
2602 371557 : let pending_meta = self
2603 371557 : .pending_metadata_pages
2604 371557 : .drain()
2605 371557 : .flat_map(|(key, values)| {
2606 137068 : values
2607 137068 : .into_iter()
2608 137068 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2609 137068 : })
2610 371557 : .collect::<Vec<_>>();
2611 :
2612 371557 : if pending_meta.is_empty() {
2613 236139 : None
2614 : } else {
2615 135418 : Some(SerializedValueBatch::from_values(pending_meta))
2616 : }
2617 : };
2618 :
2619 371557 : let data_batch = self.pending_data_batch.take();
2620 :
2621 371557 : let maybe_batch = match (data_batch, metadata_batch) {
2622 132278 : (Some(mut data), Some(metadata)) => {
2623 132278 : data.extend(metadata);
2624 132278 : Some(data)
2625 : }
2626 71631 : (Some(data), None) => Some(data),
2627 3140 : (None, Some(metadata)) => Some(metadata),
2628 164508 : (None, None) => None,
2629 : };
2630 :
2631 371557 : if let Some(batch) = maybe_batch {
2632 207049 : tracing::debug!(
2633 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2634 : batch.max_lsn,
2635 0 : self.tline.get_last_record_lsn()
2636 : );
2637 :
2638 : // This bails out on first error without modifying pending_updates.
2639 : // That's Ok, cf this function's doc comment.
2640 207049 : writer.put_batch(batch, ctx).await?;
2641 164508 : }
2642 :
2643 371557 : if !self.pending_deletions.is_empty() {
2644 1 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2645 1 : self.pending_deletions.clear();
2646 371556 : }
2647 :
2648 371557 : self.pending_lsns.push(self.lsn);
2649 444486 : for pending_lsn in self.pending_lsns.drain(..) {
2650 444486 : // TODO(vlad): pretty sure the comment below is not valid anymore
2651 444486 : // and we can call finish write with the latest LSN
2652 444486 : //
2653 444486 : // Ideally, we should be able to call writer.finish_write() only once
2654 444486 : // with the highest LSN. However, the last_record_lsn variable in the
2655 444486 : // timeline keeps track of the latest LSN and the immediate previous LSN
2656 444486 : // so we need to record every LSN to not leave a gap between them.
2657 444486 : writer.finish_write(pending_lsn);
2658 444486 : }
2659 :
2660 371557 : if pending_nblocks != 0 {
2661 135285 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2662 236272 : }
2663 :
2664 371557 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2665 1527 : writer.update_directory_entries_count(kind, count);
2666 1527 : }
2667 :
2668 371557 : self.pending_metadata_bytes = 0;
2669 :
2670 371557 : Ok(())
2671 371557 : }
2672 :
2673 145852 : pub(crate) fn len(&self) -> usize {
2674 145852 : self.pending_metadata_pages.len()
2675 145852 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2676 145852 : + self.pending_deletions.len()
2677 145852 : }
2678 :
2679 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2680 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2681 : /// in the modification.
2682 : ///
2683 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2684 : /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2685 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2686 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2687 : /// and not data pages.
2688 143293 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2689 143293 : if !Self::is_data_key(&key) {
2690 : // Have we already updated the same key? Read the latest pending updated
2691 : // version in that case.
2692 : //
2693 : // Note: we don't check pending_deletions. It is an error to request a
2694 : // value that has been removed, deletion only avoids leaking storage.
2695 143293 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2696 7964 : if let Some((_, _, value)) = values.last() {
2697 7964 : return if let Value::Image(img) = value {
2698 7964 : Ok(img.clone())
2699 : } else {
2700 : // Currently, we never need to read back a WAL record that we
2701 : // inserted in the same "transaction". All the metadata updates
2702 : // work directly with Images, and we never need to read actual
2703 : // data pages. We could handle this if we had to, by calling
2704 : // the walredo manager, but let's keep it simple for now.
2705 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2706 0 : "unexpected pending WAL record"
2707 0 : )))
2708 : };
2709 0 : }
2710 135329 : }
2711 : } else {
2712 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2713 : // this key should never be present in the pending data batch. We ensure this by committing
2714 : // modifications before ingesting DB create operations, which are the only kind that reads
2715 : // data pages during ingest.
2716 0 : if cfg!(debug_assertions) {
2717 0 : assert!(
2718 0 : !self
2719 0 : .pending_data_batch
2720 0 : .as_ref()
2721 0 : .is_some_and(|b| b.updates_key(&key))
2722 : );
2723 0 : }
2724 : }
2725 :
2726 : // Metadata page cache miss, or we're reading a data page.
2727 135329 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2728 135329 : self.tline.get(key, lsn, ctx).await
2729 143293 : }
2730 :
2731 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2732 : /// and the empty value into None.
2733 0 : async fn sparse_get(
2734 0 : &self,
2735 0 : key: Key,
2736 0 : ctx: &RequestContext,
2737 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2738 0 : let val = self.get(key, ctx).await;
2739 0 : match val {
2740 0 : Ok(val) if val.is_empty() => Ok(None),
2741 0 : Ok(val) => Ok(Some(val)),
2742 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2743 0 : Err(e) => Err(e),
2744 : }
2745 0 : }
2746 :
2747 : #[cfg(test)]
2748 2 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
2749 2 : self.put(key, val);
2750 2 : }
2751 :
2752 282078 : fn put(&mut self, key: Key, val: Value) {
2753 282078 : if Self::is_data_key(&key) {
2754 138934 : self.put_data(key.to_compact(), val)
2755 : } else {
2756 143144 : self.put_metadata(key.to_compact(), val)
2757 : }
2758 282078 : }
2759 :
2760 138934 : fn put_data(&mut self, key: CompactKey, val: Value) {
2761 138934 : let batch = self
2762 138934 : .pending_data_batch
2763 138934 : .get_or_insert_with(SerializedValueBatch::default);
2764 138934 : batch.put(key, val, self.lsn);
2765 138934 : }
2766 :
2767 143144 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2768 143144 : let values = self.pending_metadata_pages.entry(key).or_default();
2769 : // Replace the previous value if it exists at the same lsn
2770 143144 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2771 6076 : if *last_lsn == self.lsn {
2772 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2773 6076 : self.pending_metadata_bytes -= *last_value_ser_size;
2774 6076 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2775 6076 : self.pending_metadata_bytes += *last_value_ser_size;
2776 :
2777 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
2778 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2779 6076 : *last_value = val;
2780 6076 : return;
2781 0 : }
2782 137068 : }
2783 :
2784 137068 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2785 137068 : self.pending_metadata_bytes += val_serialized_size;
2786 137068 : values.push((self.lsn, val_serialized_size, val));
2787 :
2788 137068 : if key == CHECKPOINT_KEY.to_compact() {
2789 119 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2790 136949 : }
2791 143144 : }
2792 :
2793 1 : fn delete(&mut self, key_range: Range<Key>) {
2794 1 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2795 1 : self.pending_deletions.push((key_range, self.lsn));
2796 1 : }
2797 : }
2798 :
2799 : /// Statistics for a DatadirModification.
2800 : #[derive(Default)]
2801 : pub struct DatadirModificationStats {
2802 : pub metadata_images: u64,
2803 : pub metadata_deltas: u64,
2804 : pub data_images: u64,
2805 : pub data_deltas: u64,
2806 : }
2807 :
2808 : /// This struct facilitates accessing either a committed key from the timeline at a
2809 : /// specific LSN, or the latest uncommitted key from a pending modification.
2810 : ///
2811 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2812 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2813 : /// need to look up the keys in the modification first before looking them up in the
2814 : /// timeline to not miss the latest updates.
2815 : #[derive(Clone, Copy)]
2816 : pub enum Version<'a> {
2817 : LsnRange(LsnRange),
2818 : Modified(&'a DatadirModification<'a>),
2819 : }
2820 :
2821 : impl Version<'_> {
2822 25 : async fn get(
2823 25 : &self,
2824 25 : timeline: &Timeline,
2825 25 : key: Key,
2826 25 : ctx: &RequestContext,
2827 25 : ) -> Result<Bytes, PageReconstructError> {
2828 25 : match self {
2829 15 : Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
2830 10 : Version::Modified(modification) => modification.get(key, ctx).await,
2831 : }
2832 25 : }
2833 :
2834 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2835 : /// and the empty value into None.
2836 0 : async fn sparse_get(
2837 0 : &self,
2838 0 : timeline: &Timeline,
2839 0 : key: Key,
2840 0 : ctx: &RequestContext,
2841 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2842 0 : let val = self.get(timeline, key, ctx).await;
2843 0 : match val {
2844 0 : Ok(val) if val.is_empty() => Ok(None),
2845 0 : Ok(val) => Ok(Some(val)),
2846 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2847 0 : Err(e) => Err(e),
2848 : }
2849 0 : }
2850 :
2851 26 : pub fn is_latest(&self) -> bool {
2852 26 : match self {
2853 16 : Version::LsnRange(lsns) => lsns.is_latest(),
2854 10 : Version::Modified(_) => true,
2855 : }
2856 26 : }
2857 :
2858 224275 : pub fn get_lsn(&self) -> Lsn {
2859 224275 : match self {
2860 12224 : Version::LsnRange(lsns) => lsns.effective_lsn,
2861 212051 : Version::Modified(modification) => modification.lsn,
2862 : }
2863 224275 : }
2864 :
2865 12219 : pub fn at(lsn: Lsn) -> Self {
2866 12219 : Version::LsnRange(LsnRange {
2867 12219 : effective_lsn: lsn,
2868 12219 : request_lsn: lsn,
2869 12219 : })
2870 12219 : }
2871 : }
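// Illustrative, module-internal usage sketch of `Version` (not part of the original code):
// ingest routines read through the pending modification so uncommitted metadata writes stay
// visible, while read-only paths pin a fixed LSN.
//
//     let committed = Version::at(lsn).get(timeline, key, ctx).await?;              // fixed LSN
//     let pending = Version::Modified(&modification).get(timeline, key, ctx).await?;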
2872 :
2873 : //--- Metadata structs stored in key-value pairs in the repository.
2874 :
2875 0 : #[derive(Debug, Serialize, Deserialize)]
2876 : pub(crate) struct DbDirectory {
2877 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2878 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2879 : }
2880 :
2881 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2882 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2883 : // were named like "pg_twophase/000002E5", now they're like
2884 : // "pg_twophsae/0000000A000002E4".
2885 :
2886 0 : #[derive(Debug, Serialize, Deserialize)]
2887 : pub(crate) struct TwoPhaseDirectory {
2888 : pub(crate) xids: HashSet<TransactionId>,
2889 : }
2890 :
2891 0 : #[derive(Debug, Serialize, Deserialize)]
2892 : struct TwoPhaseDirectoryV17 {
2893 : xids: HashSet<u64>,
2894 : }
2895 :
2896 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2897 : pub(crate) struct RelDirectory {
2898 : // Set of relations that exist. (relfilenode, forknum)
2899 : //
2900 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2901 : // key-value pairs, if you have a lot of relations
2902 : pub(crate) rels: HashSet<(Oid, u8)>,
2903 : }
2904 :
2905 0 : #[derive(Debug, Serialize, Deserialize)]
2906 : struct RelSizeEntry {
2907 : nblocks: u32,
2908 : }
2909 :
2910 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2911 : pub(crate) struct SlruSegmentDirectory {
2912 : // Set of SLRU segments that exist.
2913 : pub(crate) segments: HashSet<u32>,
2914 : }
2915 :
2916 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2917 : #[repr(u8)]
2918 : pub(crate) enum DirectoryKind {
2919 : Db,
2920 : TwoPhase,
2921 : Rel,
2922 : AuxFiles,
2923 : SlruSegment(SlruKind),
2924 : RelV2,
2925 : }
2926 :
2927 : impl DirectoryKind {
2928 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2929 4582 : pub(crate) fn offset(&self) -> usize {
2930 4582 : self.into_usize()
2931 4582 : }
2932 : }
2933 :
2934 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2935 :
2936 : #[allow(clippy::bool_assert_comparison)]
2937 : #[cfg(test)]
2938 : mod tests {
2939 : use hex_literal::hex;
2940 : use pageserver_api::models::ShardParameters;
2941 : use utils::id::TimelineId;
2942 : use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
2943 :
2944 : use super::*;
2945 : use crate::DEFAULT_PG_VERSION;
2946 : use crate::tenant::harness::TenantHarness;
2947 :
2948 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2949 : #[tokio::test]
2950 1 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2951 1 : let name = "aux_files_round_trip";
2952 1 : let harness = TenantHarness::create(name).await?;
2953 :
2954 : pub const TIMELINE_ID: TimelineId =
2955 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2956 :
2957 1 : let (tenant, ctx) = harness.load().await;
2958 1 : let (tline, ctx) = tenant
2959 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2960 1 : .await?;
2961 1 : let tline = tline.raw_timeline().unwrap();
2962 :
2963 : // First modification: insert two keys
2964 1 : let mut modification = tline.begin_modification(Lsn(0x1000));
2965 1 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2966 1 : modification.set_lsn(Lsn(0x1008))?;
2967 1 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2968 1 : modification.commit(&ctx).await?;
2969 1 : let expect_1008 = HashMap::from([
2970 1 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2971 1 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2972 1 : ]);
2973 :
2974 1 : let io_concurrency = IoConcurrency::spawn_for_test();
2975 :
2976 1 : let readback = tline
2977 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2978 1 : .await?;
2979 1 : assert_eq!(readback, expect_1008);
2980 :
2981 : // Second modification: update one key, remove the other
2982 1 : let mut modification = tline.begin_modification(Lsn(0x2000));
2983 1 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2984 1 : modification.set_lsn(Lsn(0x2008))?;
2985 1 : modification.put_file("foo/bar2", b"", &ctx).await?;
2986 1 : modification.commit(&ctx).await?;
2987 1 : let expect_2008 =
2988 1 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2989 :
2990 1 : let readback = tline
2991 1 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2992 1 : .await?;
2993 1 : assert_eq!(readback, expect_2008);
2994 :
2995 : // Reading back in time works
2996 1 : let readback = tline
2997 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2998 1 : .await?;
2999 1 : assert_eq!(readback, expect_1008);
3000 :
3001 2 : Ok(())
3002 1 : }
3003 :
3004 : #[test]
3005 1 : fn gap_finding() {
3006 1 : let rel = RelTag {
3007 1 : spcnode: 1663,
3008 1 : dbnode: 208101,
3009 1 : relnode: 2620,
3010 1 : forknum: 0,
3011 1 : };
3012 1 : let base_blkno = 1;
3013 :
3014 1 : let base_key = rel_block_to_key(rel, base_blkno);
3015 1 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
3016 :
3017 1 : let shard = ShardIdentity::unsharded();
3018 :
3019 1 : let mut previous_nblocks = 0;
3020 11 : for i in 0..10 {
3021 10 : let crnt_blkno = base_blkno + i;
3022 10 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
3023 :
3024 10 : previous_nblocks = crnt_blkno + 1;
3025 :
3026 10 : if i == 0 {
3027 : // The first block we write is 1, so we should find the gap.
3028 1 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
3029 : } else {
3030 9 : assert!(gaps.is_none());
3031 : }
3032 : }
3033 :
3034 : // This is an update to an already existing block. No gaps here.
3035 1 : let update_blkno = 5;
3036 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3037 1 : assert!(gaps.is_none());
3038 :
3039 : // This is an update past the current end block.
3040 1 : let after_gap_blkno = 20;
3041 1 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
3042 :
3043 1 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
3044 1 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
3045 1 : assert_eq!(
3046 1 : gaps.unwrap(),
3047 1 : KeySpace::single(gap_start_key..after_gap_key)
3048 : );
3049 1 : }
3050 :
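/// Like `gap_finding`, but with a two-shard layout: only blocks owned by the local
/// shard should be reported as gaps.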
3051 : #[test]
3052 1 : fn sharded_gap_finding() {
3053 1 : let rel = RelTag {
3054 1 : spcnode: 1663,
3055 1 : dbnode: 208101,
3056 1 : relnode: 2620,
3057 1 : forknum: 0,
3058 1 : };
3059 :
3060 1 : let first_blkno = 6;
3061 :
3062 : // This shard will get the even blocks
3063 1 : let shard = ShardIdentity::from_params(
3064 1 : ShardNumber(0),
3065 1 : ShardParameters {
3066 1 : count: ShardCount(2),
3067 1 : stripe_size: ShardStripeSize(1),
3068 1 : },
3069 : );
3070 :
3071 : // Only keys belonging to this shard are considered as gaps.
3072 1 : let mut previous_nblocks = 0;
3073 1 : let gaps =
3074 1 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
3075 1 : assert!(!gaps.ranges.is_empty());
3076 3 : for gap_range in gaps.ranges {
3077 2 : let mut k = gap_range.start;
3078 4 : while k != gap_range.end {
3079 2 : assert_eq!(shard.get_shard_number(&k), shard.number);
3080 2 : k = k.next();
3081 : }
3082 : }
3083 :
3084 1 : previous_nblocks = first_blkno;
3085 :
3086 1 : let update_blkno = 2;
3087 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3088 1 : assert!(gaps.is_none());
3089 1 : }
3090 :
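// NOTE: the commented-out tests below appear to predate the current Timeline API
// (they reference `Repository`, `RepoHarness` and `DatadirTimelineImpl`); they are kept
// as a sketch of intended list_rels()/read-beyond-EOF behavior rather than as compiling code.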
3091 : /*
3092 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
3093 : let incremental = timeline.get_current_logical_size();
3094 : let non_incremental = timeline
3095 : .get_current_logical_size_non_incremental(lsn)
3096 : .unwrap();
3097 : assert_eq!(incremental, non_incremental);
3098 : }
3099 : */
3100 :
3101 : /*
3102 : ///
3103 : /// Test list_rels() function, with branches and dropped relations
3104 : ///
3105 : #[test]
3106 : fn test_list_rels_drop() -> Result<()> {
3107 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
3108 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
3109 : const TESTDB: u32 = 111;
3110 :
3111 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
3112 : // after branching fails below
3113 : let mut writer = tline.begin_record(Lsn(0x10));
3114 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
3115 : writer.finish()?;
3116 :
3117 : // Create a relation on the timeline
3118 : let mut writer = tline.begin_record(Lsn(0x20));
3119 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
3120 : writer.finish()?;
3121 :
3122 : let writer = tline.begin_record(Lsn(0x00));
3123 : writer.finish()?;
3124 :
3125 : // Check that list_rels() lists it after LSN 2, but no before it
3126 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
3127 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
3128 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
3129 :
3130 : // Create a branch, check that the relation is visible there
3131 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
3132 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
3133 : Some(timeline) => timeline,
3134 : None => panic!("Should have a local timeline"),
3135 : };
3136 : let newtline = DatadirTimelineImpl::new(newtline);
3137 : assert!(newtline
3138 : .list_rels(0, TESTDB, Lsn(0x30))?
3139 : .contains(&TESTREL_A));
3140 :
3141 : // Drop it on the branch
3142 : let mut new_writer = newtline.begin_record(Lsn(0x40));
3143 : new_writer.drop_relation(TESTREL_A)?;
3144 : new_writer.finish()?;
3145 :
3146 : // Check that it's no longer listed on the branch after the point where it was dropped
3147 : assert!(newtline
3148 : .list_rels(0, TESTDB, Lsn(0x30))?
3149 : .contains(&TESTREL_A));
3150 : assert!(!newtline
3151 : .list_rels(0, TESTDB, Lsn(0x40))?
3152 : .contains(&TESTREL_A));
3153 :
3154 : // Run checkpoint and garbage collection and check that it's still not visible
3155 : newtline.checkpoint(CheckpointConfig::Forced)?;
3156 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3157 :
3158 : assert!(!newtline
3159 : .list_rels(0, TESTDB, Lsn(0x40))?
3160 : .contains(&TESTREL_A));
3161 :
3162 : Ok(())
3163 : }
3164 : */
3165 :
3166 : /*
3167 : #[test]
3168 : fn test_read_beyond_eof() -> Result<()> {
3169 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3170 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3171 :
3172 : make_some_layers(&tline, Lsn(0x20))?;
3173 : let mut writer = tline.begin_record(Lsn(0x60));
3174 : walingest.put_rel_page_image(
3175 : &mut writer,
3176 : TESTREL_A,
3177 : 0,
3178 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3179 : )?;
3180 : writer.finish()?;
3181 :
3182 : // Test read before rel creation. Should error out.
3183 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3184 :
3185 : // Read block beyond end of relation at different points in time.
3186 : // These reads should fall into different delta, image, and in-memory layers.
3187 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3188 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3189 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3190 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3191 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3192 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3193 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3194 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3195 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3196 :
3197 : // Test on an in-memory layer with no preceding layer
3198 : let mut writer = tline.begin_record(Lsn(0x70));
3199 : walingest.put_rel_page_image(
3200 : &mut writer,
3201 : TESTREL_B,
3202 : 0,
3203 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3204 : )?;
3205 : writer.finish()?;
3206 :
3207 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3208 :
3209 : Ok(())
3210 : }
3211 : */
3212 : }