Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use anyhow::{Context, ensure};
13 : use bytes::{Buf, Bytes, BytesMut};
14 : use enum_map::Enum;
15 : use itertools::Itertools;
16 : use pageserver_api::key::{
17 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
18 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
19 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
20 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
21 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
22 : };
23 : use pageserver_api::keyspace::SparseKeySpace;
24 : use pageserver_api::models::RelSizeMigration;
25 : use pageserver_api::record::NeonWalRecord;
26 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
27 : use pageserver_api::shard::ShardIdentity;
28 : use pageserver_api::value::Value;
29 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
30 : use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
31 : use serde::{Deserialize, Serialize};
32 : use strum::IntoEnumIterator;
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::{debug, info, trace, warn};
35 : use utils::bin_ser::{BeSer, DeserializeError};
36 : use utils::lsn::Lsn;
37 : use utils::pausable_failpoint;
38 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
39 :
40 : use super::tenant::{PageReconstructError, Timeline};
41 : use crate::aux_file;
42 : use crate::context::RequestContext;
43 : use crate::keyspace::{KeySpace, KeySpaceAccum};
44 : use crate::metrics::{
45 : RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
46 : };
47 : use crate::span::{
48 : debug_assert_current_span_has_tenant_and_timeline_id,
49 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
50 : };
51 : use crate::tenant::storage_layer::IoConcurrency;
52 : use crate::tenant::timeline::GetVectoredError;
53 :
54 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
55 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
56 :
57 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
58 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
59 :
60 : #[derive(Debug)]
61 : pub enum LsnForTimestamp {
62 : /// Found commits both before and after the given timestamp
63 : Present(Lsn),
64 :
65 : /// Found no commits after the given timestamp; this means
66 : /// that the newest data in the branch is older than the given
67 : /// timestamp.
68 : ///
69 : /// All commits <= LSN happened before the given timestamp
70 : Future(Lsn),
71 :
72 : /// The queried timestamp is past the horizon we look back to (PITR)
73 : ///
74 : /// All commits > LSN happened after the given timestamp,
75 : /// but any commits < LSN might have happened before or after
76 : /// the given timestamp. We don't know because no data before
77 : /// the given lsn is available.
78 : Past(Lsn),
79 :
80 : /// We have found no commit with a timestamp,
81 : /// so we can't return anything meaningful.
82 : ///
83 : /// The associated LSN is the lower bound value we can safely
84 : /// create branches on, but no statement is made about whether it is
85 : /// older or newer than the timestamp.
86 : ///
87 : /// This variant can e.g. be returned right after a
88 : /// cluster import.
89 : NoData(Lsn),
90 : }
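// Illustrative sketch (not part of the original source): one way a caller could
// interpret these variants when picking a branch point. The helper name
// `pick_branch_point` and the fallback policy are assumptions for this example only.
#[cfg(test)]
fn pick_branch_point(result: LsnForTimestamp) -> Option<Lsn> {
    match result {
        // Commits exist on both sides of the timestamp: branch at the returned LSN.
        LsnForTimestamp::Present(lsn) => Some(lsn),
        // Everything in the branch is older than the timestamp; the newest
        // commit LSN is still a usable branch point.
        LsnForTimestamp::Future(lsn) => Some(lsn),
        // The timestamp is older than the PITR horizon, or there is no commit
        // data at all: refuse rather than guess.
        LsnForTimestamp::Past(_) | LsnForTimestamp::NoData(_) => None,
    }
}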
91 :
92 : #[derive(Debug, thiserror::Error)]
93 : pub(crate) enum CalculateLogicalSizeError {
94 : #[error("cancelled")]
95 : Cancelled,
96 :
97 : /// Something went wrong while reading the metadata we use to calculate logical size
98 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
99 : /// in the `From` implementation for this variant.
100 : #[error(transparent)]
101 : PageRead(PageReconstructError),
102 :
103 : /// Something went wrong deserializing metadata that we read to calculate logical size
104 : #[error("decode error: {0}")]
105 : Decode(#[from] DeserializeError),
106 : }
107 :
108 : #[derive(Debug, thiserror::Error)]
109 : pub(crate) enum CollectKeySpaceError {
110 : #[error(transparent)]
111 : Decode(#[from] DeserializeError),
112 : #[error(transparent)]
113 : PageRead(PageReconstructError),
114 : #[error("cancelled")]
115 : Cancelled,
116 : }
117 :
118 : impl From<PageReconstructError> for CollectKeySpaceError {
119 0 : fn from(err: PageReconstructError) -> Self {
120 0 : match err {
121 0 : PageReconstructError::Cancelled => Self::Cancelled,
122 0 : err => Self::PageRead(err),
123 : }
124 0 : }
125 : }
126 :
127 : impl From<PageReconstructError> for CalculateLogicalSizeError {
128 0 : fn from(pre: PageReconstructError) -> Self {
129 0 : match pre {
130 0 : PageReconstructError::Cancelled => Self::Cancelled,
131 0 : _ => Self::PageRead(pre),
132 : }
133 0 : }
134 : }
135 :
136 : #[derive(Debug, thiserror::Error)]
137 : pub enum RelationError {
138 : #[error("Relation Already Exists")]
139 : AlreadyExists,
140 : #[error("invalid relnode")]
141 : InvalidRelnode,
142 : #[error(transparent)]
143 : Other(#[from] anyhow::Error),
144 : }
145 :
146 : ///
147 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
148 : /// and other special kinds of files, in a versioned key-value store. The
149 : /// Timeline struct provides the key-value store.
150 : ///
151 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
152 : /// implementation, and might be moved into a separate struct later.
153 : impl Timeline {
154 : /// Start ingesting a WAL record, or other atomic modification of
155 : /// the timeline.
156 : ///
157 : /// This provides a transaction-like interface to perform a bunch
158 : /// of modifications atomically.
159 : ///
160 : /// To ingest a WAL record, call begin_modification(lsn) to get a
161 : /// DatadirModification object. Use the functions in the object to
162 : /// modify the repository state, updating all the pages and metadata
163 : /// that the WAL record affects. When you're done, call commit() to
164 : /// commit the changes.
165 : ///
166 : /// Lsn stored in modification is advanced by `ingest_record` and
167 : /// is used by `commit()` to update `last_record_lsn`.
168 : ///
169 : /// Calling commit() will flush all the changes and reset the state,
170 : /// so the `DatadirModification` struct can be reused to perform the next modification.
171 : ///
172 : /// Note that any pending modifications you make through the
173 : /// modification object won't be visible to calls to the 'get' and list
174 : /// functions of the timeline until you finish! And if you update the
175 : /// same page twice, the last update wins.
176 : ///
177 536828 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
178 536828 : where
179 536828 : Self: Sized,
180 536828 : {
181 536828 : DatadirModification {
182 536828 : tline: self,
183 536828 : pending_lsns: Vec::new(),
184 536828 : pending_metadata_pages: HashMap::new(),
185 536828 : pending_data_batch: None,
186 536828 : pending_deletions: Vec::new(),
187 536828 : pending_nblocks: 0,
188 536828 : pending_directory_entries: Vec::new(),
189 536828 : pending_metadata_bytes: 0,
190 536828 : lsn,
191 536828 : }
192 536828 : }
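    // Illustrative sketch (not part of the original source): the
    // begin_modification -> put_* -> commit flow described in the doc comment
    // above. The relation tag, the zero initial size, and the exact
    // `commit(ctx)` signature are assumptions made for this example only.
    #[cfg(test)]
    async fn example_modification_flow(
        &self,
        rel: RelTag,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // Start an atomic modification at `lsn`.
        let mut modification = self.begin_modification(lsn);
        // Create the relation with zero blocks; later puts may extend it.
        modification.put_rel_creation(rel, 0, ctx).await?;
        // Flush all pending changes; the modification object can then be reused.
        modification.commit(ctx).await?;
        Ok(())
    }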
193 :
194 : //------------------------------------------------------------------------------
195 : // Public GET functions
196 : //------------------------------------------------------------------------------
197 :
198 : /// Look up given page version.
199 36768 : pub(crate) async fn get_rel_page_at_lsn(
200 36768 : &self,
201 36768 : tag: RelTag,
202 36768 : blknum: BlockNumber,
203 36768 : version: Version<'_>,
204 36768 : ctx: &RequestContext,
205 36768 : io_concurrency: IoConcurrency,
206 36768 : ) -> Result<Bytes, PageReconstructError> {
207 36768 : match version {
208 36768 : Version::Lsn(effective_lsn) => {
209 36768 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
210 36768 : let res = self
211 36768 : .get_rel_page_at_lsn_batched(
212 36768 : pages.iter().map(|(tag, blknum)| (tag, blknum)),
213 36768 : effective_lsn,
214 36768 : io_concurrency.clone(),
215 36768 : ctx,
216 36768 : )
217 36768 : .await;
218 36768 : assert_eq!(res.len(), 1);
219 36768 : res.into_iter().next().unwrap()
220 : }
221 0 : Version::Modified(modification) => {
222 0 : if tag.relnode == 0 {
223 0 : return Err(PageReconstructError::Other(
224 0 : RelationError::InvalidRelnode.into(),
225 0 : ));
226 0 : }
227 :
228 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
229 0 : if blknum >= nblocks {
230 0 : debug!(
231 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
232 0 : tag,
233 0 : blknum,
234 0 : version.get_lsn(),
235 : nblocks
236 : );
237 0 : return Ok(ZERO_PAGE.clone());
238 0 : }
239 0 :
240 0 : let key = rel_block_to_key(tag, blknum);
241 0 : modification.get(key, ctx).await
242 : }
243 : }
244 36768 : }
245 :
246 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
247 : ///
248 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
249 36768 : pub(crate) async fn get_rel_page_at_lsn_batched(
250 36768 : &self,
251 36768 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
252 36768 : effective_lsn: Lsn,
253 36768 : io_concurrency: IoConcurrency,
254 36768 : ctx: &RequestContext,
255 36768 : ) -> Vec<Result<Bytes, PageReconstructError>> {
256 36768 : debug_assert_current_span_has_tenant_and_timeline_id();
257 36768 :
258 36768 : let mut slots_filled = 0;
259 36768 : let page_count = pages.len();
260 36768 :
261 36768 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
262 36768 : let mut result = Vec::with_capacity(pages.len());
263 36768 : let result_slots = result.spare_capacity_mut();
264 36768 :
265 36768 : let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
266 36768 : for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
267 36768 : if tag.relnode == 0 {
268 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
269 0 : RelationError::InvalidRelnode.into(),
270 0 : )));
271 0 :
272 0 : slots_filled += 1;
273 0 : continue;
274 36768 : }
275 :
276 36768 : let nblocks = match self
277 36768 : .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
278 36768 : .await
279 : {
280 36768 : Ok(nblocks) => nblocks,
281 0 : Err(err) => {
282 0 : result_slots[response_slot_idx].write(Err(err));
283 0 : slots_filled += 1;
284 0 : continue;
285 : }
286 : };
287 :
288 36768 : if *blknum >= nblocks {
289 0 : debug!(
290 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
291 : tag, blknum, effective_lsn, nblocks
292 : );
293 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
294 0 : slots_filled += 1;
295 0 : continue;
296 36768 : }
297 36768 :
298 36768 : let key = rel_block_to_key(*tag, *blknum);
299 36768 :
300 36768 : let key_slots = keys_slots.entry(key).or_default();
301 36768 : key_slots.push(response_slot_idx);
302 : }
303 :
304 36768 : let keyspace = {
305 : // add_key requires monotonicity
306 36768 : let mut acc = KeySpaceAccum::new();
307 36768 : for key in keys_slots
308 36768 : .keys()
309 36768 : // in fact it requires strong monotonicity
310 36768 : .dedup()
311 36768 : {
312 36768 : acc.add_key(*key);
313 36768 : }
314 36768 : acc.to_keyspace()
315 36768 : };
316 36768 :
317 36768 : match self
318 36768 : .get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
319 36768 : .await
320 : {
321 36768 : Ok(results) => {
322 73536 : for (key, res) in results {
323 36768 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
324 36768 : let first_slot = key_slots.next().unwrap();
325 :
326 36768 : for slot in key_slots {
327 0 : let clone = match &res {
328 0 : Ok(buf) => Ok(buf.clone()),
329 0 : Err(err) => Err(match err {
330 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
331 :
332 0 : x @ PageReconstructError::Other(_)
333 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
334 0 : | x @ PageReconstructError::WalRedo(_)
335 0 : | x @ PageReconstructError::MissingKey(_) => {
336 0 : PageReconstructError::Other(anyhow::anyhow!(
337 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
338 0 : ))
339 : }
340 : }),
341 : };
342 :
343 0 : result_slots[slot].write(clone);
344 0 : slots_filled += 1;
345 : }
346 :
347 36768 : result_slots[first_slot].write(res);
348 36768 : slots_filled += 1;
349 : }
350 : }
351 0 : Err(err) => {
352 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
353 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
354 0 : for slot in keys_slots.values().flatten() {
355 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
356 : // but without taking ownership of the GetVectoredError
357 0 : let err = match &err {
358 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
359 : // TODO: restructure get_vectored API to make this error per-key
360 0 : GetVectoredError::MissingKey(err) => {
361 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
362 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
363 0 : )))
364 : }
365 : // TODO: restructure get_vectored API to make this error per-key
366 0 : GetVectoredError::GetReadyAncestorError(err) => {
367 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
368 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
369 0 : )))
370 : }
371 : // TODO: restructure get_vectored API to make this error per-key
372 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
373 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
374 0 : )),
375 : // TODO: we can prevent this error class by moving this check into the type system
376 0 : GetVectoredError::InvalidLsn(e) => {
377 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
378 : }
379 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
380 : // TODO: we can prevent this error class by moving this check into the type system
381 0 : GetVectoredError::Oversized(err) => {
382 0 : Err(anyhow::anyhow!("batching oversized: {err:?}").into())
383 : }
384 : };
385 :
386 0 : result_slots[*slot].write(err);
387 : }
388 :
389 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
390 0 : }
391 : };
392 :
393 36768 : assert_eq!(slots_filled, page_count);
394 : // SAFETY:
395 : // 1. `result` and any of its uninit members are not read from until this point
396 : // 2. The length below is tracked at run-time and matches the number of requested pages.
397 36768 : unsafe {
398 36768 : result.set_len(page_count);
399 36768 : }
400 36768 :
401 36768 : result
402 36768 : }
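    // Illustrative sketch (not part of the original source): because the result
    // ordering mirrors the input ordering, a caller can simply zip requests with
    // responses. The method name and the slice-based input are assumptions for
    // this example only.
    #[cfg(test)]
    async fn example_zip_batched_reads(
        &self,
        requests: &[(RelTag, BlockNumber)],
        lsn: Lsn,
        io_concurrency: IoConcurrency,
        ctx: &RequestContext,
    ) -> Vec<((RelTag, BlockNumber), Result<Bytes, PageReconstructError>)> {
        let results = self
            .get_rel_page_at_lsn_batched(
                requests.iter().map(|(tag, blknum)| (tag, blknum)),
                lsn,
                io_concurrency,
                ctx,
            )
            .await;
        // Slot i of `results` corresponds to requests[i].
        requests.iter().copied().zip(results).collect()
    }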
403 :
404 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
405 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
406 : /// for pages up to the highest page number it has stored.
407 0 : pub(crate) async fn get_db_size(
408 0 : &self,
409 0 : spcnode: Oid,
410 0 : dbnode: Oid,
411 0 : version: Version<'_>,
412 0 : ctx: &RequestContext,
413 0 : ) -> Result<usize, PageReconstructError> {
414 0 : let mut total_blocks = 0;
415 :
416 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
417 :
418 0 : for rel in rels {
419 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
420 0 : total_blocks += n_blocks as usize;
421 : }
422 0 : Ok(total_blocks)
423 0 : }
424 :
425 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
426 : ///
427 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
428 : /// page number stored in the shard.
429 48868 : pub(crate) async fn get_rel_size(
430 48868 : &self,
431 48868 : tag: RelTag,
432 48868 : version: Version<'_>,
433 48868 : ctx: &RequestContext,
434 48868 : ) -> Result<BlockNumber, PageReconstructError> {
435 48868 : if tag.relnode == 0 {
436 0 : return Err(PageReconstructError::Other(
437 0 : RelationError::InvalidRelnode.into(),
438 0 : ));
439 48868 : }
440 :
441 48868 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
442 38588 : return Ok(nblocks);
443 10280 : }
444 10280 :
445 10280 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
446 0 : && !self.get_rel_exists(tag, version, ctx).await?
447 : {
448 : // FIXME: Postgres sometimes calls smgrcreate() to create
449 : // FSM, and smgrnblocks() on it immediately afterwards,
450 : // without extending it. Tolerate that by claiming that
451 : // any non-existent FSM fork has size 0.
452 0 : return Ok(0);
453 10280 : }
454 10280 :
455 10280 : let key = rel_size_to_key(tag);
456 10280 : let mut buf = version.get(self, key, ctx).await?;
457 10272 : let nblocks = buf.get_u32_le();
458 10272 :
459 10272 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
460 10272 :
461 10272 : Ok(nblocks)
462 48868 : }
463 :
464 : /// Does the relation exist?
465 : ///
466 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
467 : /// the shard stores pages for.
468 12100 : pub(crate) async fn get_rel_exists(
469 12100 : &self,
470 12100 : tag: RelTag,
471 12100 : version: Version<'_>,
472 12100 : ctx: &RequestContext,
473 12100 : ) -> Result<bool, PageReconstructError> {
474 12100 : if tag.relnode == 0 {
475 0 : return Err(PageReconstructError::Other(
476 0 : RelationError::InvalidRelnode.into(),
477 0 : ));
478 12100 : }
479 :
480 : // first try to lookup relation in cache
481 12100 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
482 12064 : return Ok(true);
483 36 : }
484 : // then check if the database was already initialized.
485 : // get_rel_exists can be called before dbdir is created.
486 36 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
487 36 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
488 36 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
489 0 : return Ok(false);
490 36 : }
491 36 :
492 36 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
493 36 : // Otherwise, read the old reldir keyspace.
494 36 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
495 36 :
496 36 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
497 36 : self.get_rel_size_v2_status()
498 : {
499 : // fetch directory listing (new)
500 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
501 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
502 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
503 0 : let exists_v2 = buf == RelDirExists::Exists;
504 0 : // Fast path: if the relation exists in the new format, return true.
505 0 : // TODO: we should have a verification mode that checks both keyspaces
506 0 : // to ensure the relation only exists in one of them.
507 0 : if exists_v2 {
508 0 : return Ok(true);
509 0 : }
510 36 : }
511 :
512 : // fetch directory listing (old)
513 :
514 36 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
515 36 : let buf = version.get(self, key, ctx).await?;
516 :
517 36 : let dir = RelDirectory::des(&buf)?;
518 36 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
519 36 : Ok(exists_v1)
520 12100 : }
521 :
522 : /// Get a list of all existing relations in given tablespace and database.
523 : ///
524 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
525 : /// the shard stores pages for.
526 : ///
527 : /// # Cancel-Safety
528 : ///
529 : /// This method is cancellation-safe.
530 0 : pub(crate) async fn list_rels(
531 0 : &self,
532 0 : spcnode: Oid,
533 0 : dbnode: Oid,
534 0 : version: Version<'_>,
535 0 : ctx: &RequestContext,
536 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
537 0 : // fetch directory listing (old)
538 0 : let key = rel_dir_to_key(spcnode, dbnode);
539 0 : let buf = version.get(self, key, ctx).await?;
540 :
541 0 : let dir = RelDirectory::des(&buf)?;
542 0 : let rels_v1: HashSet<RelTag> =
543 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
544 0 : spcnode,
545 0 : dbnode,
546 0 : relnode: *relnode,
547 0 : forknum: *forknum,
548 0 : }));
549 0 :
550 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
551 0 : return Ok(rels_v1);
552 0 : }
553 0 :
554 0 : // scan directory listing (new), merge with the old results
555 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
556 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
557 0 : self.conf,
558 0 : self.gate
559 0 : .enter()
560 0 : .map_err(|_| PageReconstructError::Cancelled)?,
561 : );
562 0 : let results = self
563 0 : .scan(
564 0 : KeySpace::single(key_range),
565 0 : version.get_lsn(),
566 0 : ctx,
567 0 : io_concurrency,
568 0 : )
569 0 : .await?;
570 0 : let mut rels = rels_v1;
571 0 : for (key, val) in results {
572 0 : let val = RelDirExists::decode(&val?)
573 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
574 0 : assert_eq!(key.field6, 1);
575 0 : assert_eq!(key.field2, spcnode);
576 0 : assert_eq!(key.field3, dbnode);
577 0 : let tag = RelTag {
578 0 : spcnode,
579 0 : dbnode,
580 0 : relnode: key.field4,
581 0 : forknum: key.field5,
582 0 : };
583 0 : if val == RelDirExists::Removed {
584 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
585 0 : continue;
586 0 : }
587 0 : let did_not_contain = rels.insert(tag);
588 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
589 : }
590 0 : Ok(rels)
591 0 : }
592 :
593 : /// Get the whole SLRU segment
594 0 : pub(crate) async fn get_slru_segment(
595 0 : &self,
596 0 : kind: SlruKind,
597 0 : segno: u32,
598 0 : lsn: Lsn,
599 0 : ctx: &RequestContext,
600 0 : ) -> Result<Bytes, PageReconstructError> {
601 0 : assert!(self.tenant_shard_id.is_shard_zero());
602 0 : let n_blocks = self
603 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
604 0 : .await?;
605 :
606 0 : let keyspace = KeySpace::single(
607 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
608 0 : );
609 0 :
610 0 : let batches = keyspace.partition(
611 0 : self.get_shard_identity(),
612 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
613 0 : );
614 :
615 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
616 0 : self.conf,
617 0 : self.gate
618 0 : .enter()
619 0 : .map_err(|_| PageReconstructError::Cancelled)?,
620 : );
621 :
622 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
623 0 : for batch in batches.parts {
624 0 : let blocks = self
625 0 : .get_vectored(batch, lsn, io_concurrency.clone(), ctx)
626 0 : .await?;
627 :
628 0 : for (_key, block) in blocks {
629 0 : let block = block?;
630 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
631 : }
632 : }
633 :
634 0 : Ok(segment.freeze())
635 0 : }
636 :
637 : /// Get size of an SLRU segment
638 0 : pub(crate) async fn get_slru_segment_size(
639 0 : &self,
640 0 : kind: SlruKind,
641 0 : segno: u32,
642 0 : version: Version<'_>,
643 0 : ctx: &RequestContext,
644 0 : ) -> Result<BlockNumber, PageReconstructError> {
645 0 : assert!(self.tenant_shard_id.is_shard_zero());
646 0 : let key = slru_segment_size_to_key(kind, segno);
647 0 : let mut buf = version.get(self, key, ctx).await?;
648 0 : Ok(buf.get_u32_le())
649 0 : }
650 :
651 : /// Does the SLRU segment exist?
652 0 : pub(crate) async fn get_slru_segment_exists(
653 0 : &self,
654 0 : kind: SlruKind,
655 0 : segno: u32,
656 0 : version: Version<'_>,
657 0 : ctx: &RequestContext,
658 0 : ) -> Result<bool, PageReconstructError> {
659 0 : assert!(self.tenant_shard_id.is_shard_zero());
660 : // fetch directory listing
661 0 : let key = slru_dir_to_key(kind);
662 0 : let buf = version.get(self, key, ctx).await?;
663 :
664 0 : let dir = SlruSegmentDirectory::des(&buf)?;
665 0 : Ok(dir.segments.contains(&segno))
666 0 : }
667 :
668 : /// Locate LSN, such that all transactions that committed before
669 : /// 'search_timestamp' are visible, but nothing newer is.
670 : ///
671 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
672 : /// so it's not well defined which LSN you get if there were multiple commits
673 : /// "in flight" at that point in time.
674 : ///
675 0 : pub(crate) async fn find_lsn_for_timestamp(
676 0 : &self,
677 0 : search_timestamp: TimestampTz,
678 0 : cancel: &CancellationToken,
679 0 : ctx: &RequestContext,
680 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
681 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
682 :
683 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
684 0 : let gc_cutoff_planned = {
685 0 : let gc_info = self.gc_info.read().unwrap();
686 0 : gc_info.min_cutoff()
687 0 : };
688 0 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
689 0 : // but let's be defensive.
690 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
691 0 : // We use this method to figure out the branching LSN for the new branch, but the
692 0 : // GC cutoff could be before the branching point and we cannot create a new branch
693 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
694 0 : // on the safe side.
695 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
696 0 : let max_lsn = self.get_last_record_lsn();
697 0 :
698 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
699 0 : // LSN divided by 8.
700 0 : let mut low = min_lsn.0 / 8;
701 0 : let mut high = max_lsn.0 / 8 + 1;
702 0 :
703 0 : let mut found_smaller = false;
704 0 : let mut found_larger = false;
705 :
706 0 : while low < high {
707 0 : if cancel.is_cancelled() {
708 0 : return Err(PageReconstructError::Cancelled);
709 0 : }
710 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
711 0 : let mid = (high + low) / 2;
712 :
713 0 : let cmp = match self
714 0 : .is_latest_commit_timestamp_ge_than(
715 0 : search_timestamp,
716 0 : Lsn(mid * 8),
717 0 : &mut found_smaller,
718 0 : &mut found_larger,
719 0 : ctx,
720 0 : )
721 0 : .await
722 : {
723 0 : Ok(res) => res,
724 0 : Err(PageReconstructError::MissingKey(e)) => {
725 0 : warn!(
726 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
727 : e
728 : );
729 : // Report that we didn't find anything smaller than the given LSN; the error has already been logged above.
730 0 : return Ok(LsnForTimestamp::Past(min_lsn));
731 : }
732 0 : Err(e) => return Err(e),
733 : };
734 :
735 0 : if cmp {
736 0 : high = mid;
737 0 : } else {
738 0 : low = mid + 1;
739 0 : }
740 : }
741 :
742 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
743 : // i.e. the LSN of the last commit record at or before `search_timestamp`.
744 : // Remove one from `low` to get `t`.
745 : //
746 : // FIXME: it would be better to get the LSN of the previous commit.
747 : // Otherwise, if you restore to the returned LSN, the database will
748 : // include physical changes from later commits that will be marked
749 : // as aborted, and will need to be vacuumed away.
750 0 : let commit_lsn = Lsn((low - 1) * 8);
751 0 : match (found_smaller, found_larger) {
752 : (false, false) => {
753 : // This can happen if no commit records have been processed yet, e.g.
754 : // just after importing a cluster.
755 0 : Ok(LsnForTimestamp::NoData(min_lsn))
756 : }
757 : (false, true) => {
758 : // Didn't find any commit timestamps smaller than the request
759 0 : Ok(LsnForTimestamp::Past(min_lsn))
760 : }
761 0 : (true, _) if commit_lsn < min_lsn => {
762 0 : // the search above did set found_smaller to true but it never increased the lsn.
763 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
764 0 : // below the min_lsn. We should never do that.
765 0 : Ok(LsnForTimestamp::Past(min_lsn))
766 : }
767 : (true, false) => {
768 : // Only found commits with timestamps smaller than the request.
769 : // It's still a valid case for branch creation, return it.
770 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
771 : // case, anyway.
772 0 : Ok(LsnForTimestamp::Future(commit_lsn))
773 : }
774 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
775 : }
776 0 : }
777 :
778 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
779 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
780 : ///
781 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
782 : /// with a smaller/larger timestamp.
783 : ///
784 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
785 0 : &self,
786 0 : search_timestamp: TimestampTz,
787 0 : probe_lsn: Lsn,
788 0 : found_smaller: &mut bool,
789 0 : found_larger: &mut bool,
790 0 : ctx: &RequestContext,
791 0 : ) -> Result<bool, PageReconstructError> {
792 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
793 0 : if timestamp >= search_timestamp {
794 0 : *found_larger = true;
795 0 : return ControlFlow::Break(true);
796 0 : } else {
797 0 : *found_smaller = true;
798 0 : }
799 0 : ControlFlow::Continue(())
800 0 : })
801 0 : .await
802 0 : }
803 :
804 : /// Obtain the latest commit timestamp known at the given lsn.
805 : ///
806 : /// Returns `None` if the lsn has no timestamps, otherwise the maximum timestamp found.
807 0 : pub(crate) async fn get_timestamp_for_lsn(
808 0 : &self,
809 0 : probe_lsn: Lsn,
810 0 : ctx: &RequestContext,
811 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
812 0 : let mut max: Option<TimestampTz> = None;
813 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
814 0 : if let Some(max_prev) = max {
815 0 : max = Some(max_prev.max(timestamp));
816 0 : } else {
817 0 : max = Some(timestamp);
818 0 : }
819 0 : ControlFlow::Continue(())
820 0 : })
821 0 : .await?;
822 :
823 0 : Ok(max)
824 0 : }
825 :
826 : /// Runs the given function on all the timestamps for a given lsn
827 : ///
828 : /// The return value is either given by the closure, or set to the `Default`
829 : /// impl's output.
830 0 : async fn map_all_timestamps<T: Default>(
831 0 : &self,
832 0 : probe_lsn: Lsn,
833 0 : ctx: &RequestContext,
834 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
835 0 : ) -> Result<T, PageReconstructError> {
836 0 : for segno in self
837 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
838 0 : .await?
839 : {
840 0 : let nblocks = self
841 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
842 0 : .await?;
843 :
844 0 : let keyspace = KeySpace::single(
845 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
846 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
847 0 : );
848 0 :
849 0 : let batches = keyspace.partition(
850 0 : self.get_shard_identity(),
851 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
852 0 : );
853 :
854 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
855 0 : self.conf,
856 0 : self.gate
857 0 : .enter()
858 0 : .map_err(|_| PageReconstructError::Cancelled)?,
859 : );
860 :
861 0 : for batch in batches.parts.into_iter().rev() {
862 0 : let blocks = self
863 0 : .get_vectored(batch, probe_lsn, io_concurrency.clone(), ctx)
864 0 : .await?;
865 :
866 0 : for (_key, clog_page) in blocks.into_iter().rev() {
867 0 : let clog_page = clog_page?;
868 :
869 0 : if clog_page.len() == BLCKSZ as usize + 8 {
870 0 : let mut timestamp_bytes = [0u8; 8];
871 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
872 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
873 0 :
874 0 : match f(timestamp) {
875 0 : ControlFlow::Break(b) => return Ok(b),
876 0 : ControlFlow::Continue(()) => (),
877 : }
878 0 : }
879 : }
880 : }
881 : }
882 0 : Ok(Default::default())
883 0 : }
884 :
885 0 : pub(crate) async fn get_slru_keyspace(
886 0 : &self,
887 0 : version: Version<'_>,
888 0 : ctx: &RequestContext,
889 0 : ) -> Result<KeySpace, PageReconstructError> {
890 0 : let mut accum = KeySpaceAccum::new();
891 :
892 0 : for kind in SlruKind::iter() {
893 0 : let mut segments: Vec<u32> = self
894 0 : .list_slru_segments(kind, version, ctx)
895 0 : .await?
896 0 : .into_iter()
897 0 : .collect();
898 0 : segments.sort_unstable();
899 :
900 0 : for seg in segments {
901 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
902 :
903 0 : accum.add_range(
904 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
905 0 : );
906 : }
907 : }
908 :
909 0 : Ok(accum.to_keyspace())
910 0 : }
911 :
912 : /// Get a list of SLRU segments
913 0 : pub(crate) async fn list_slru_segments(
914 0 : &self,
915 0 : kind: SlruKind,
916 0 : version: Version<'_>,
917 0 : ctx: &RequestContext,
918 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
919 0 : // fetch directory entry
920 0 : let key = slru_dir_to_key(kind);
921 :
922 0 : let buf = version.get(self, key, ctx).await?;
923 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
924 0 : }
925 :
926 0 : pub(crate) async fn get_relmap_file(
927 0 : &self,
928 0 : spcnode: Oid,
929 0 : dbnode: Oid,
930 0 : version: Version<'_>,
931 0 : ctx: &RequestContext,
932 0 : ) -> Result<Bytes, PageReconstructError> {
933 0 : let key = relmap_file_key(spcnode, dbnode);
934 :
935 0 : let buf = version.get(self, key, ctx).await?;
936 0 : Ok(buf)
937 0 : }
938 :
939 652 : pub(crate) async fn list_dbdirs(
940 652 : &self,
941 652 : lsn: Lsn,
942 652 : ctx: &RequestContext,
943 652 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
944 : // fetch directory entry
945 652 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
946 :
947 652 : Ok(DbDirectory::des(&buf)?.dbdirs)
948 652 : }
949 :
950 0 : pub(crate) async fn get_twophase_file(
951 0 : &self,
952 0 : xid: u64,
953 0 : lsn: Lsn,
954 0 : ctx: &RequestContext,
955 0 : ) -> Result<Bytes, PageReconstructError> {
956 0 : let key = twophase_file_key(xid);
957 0 : let buf = self.get(key, lsn, ctx).await?;
958 0 : Ok(buf)
959 0 : }
960 :
961 656 : pub(crate) async fn list_twophase_files(
962 656 : &self,
963 656 : lsn: Lsn,
964 656 : ctx: &RequestContext,
965 656 : ) -> Result<HashSet<u64>, PageReconstructError> {
966 : // fetch directory entry
967 656 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
968 :
969 656 : if self.pg_version >= 17 {
970 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
971 : } else {
972 656 : Ok(TwoPhaseDirectory::des(&buf)?
973 : .xids
974 656 : .iter()
975 656 : .map(|x| u64::from(*x))
976 656 : .collect())
977 : }
978 656 : }
979 :
980 0 : pub(crate) async fn get_control_file(
981 0 : &self,
982 0 : lsn: Lsn,
983 0 : ctx: &RequestContext,
984 0 : ) -> Result<Bytes, PageReconstructError> {
985 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
986 0 : }
987 :
988 24 : pub(crate) async fn get_checkpoint(
989 24 : &self,
990 24 : lsn: Lsn,
991 24 : ctx: &RequestContext,
992 24 : ) -> Result<Bytes, PageReconstructError> {
993 24 : self.get(CHECKPOINT_KEY, lsn, ctx).await
994 24 : }
995 :
996 24 : async fn list_aux_files_v2(
997 24 : &self,
998 24 : lsn: Lsn,
999 24 : ctx: &RequestContext,
1000 24 : io_concurrency: IoConcurrency,
1001 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1002 24 : let kv = self
1003 24 : .scan(
1004 24 : KeySpace::single(Key::metadata_aux_key_range()),
1005 24 : lsn,
1006 24 : ctx,
1007 24 : io_concurrency,
1008 24 : )
1009 24 : .await?;
1010 24 : let mut result = HashMap::new();
1011 24 : let mut sz = 0;
1012 60 : for (_, v) in kv {
1013 36 : let v = v?;
1014 36 : let v = aux_file::decode_file_value_bytes(&v)
1015 36 : .context("value decode")
1016 36 : .map_err(PageReconstructError::Other)?;
1017 68 : for (fname, content) in v {
1018 32 : sz += fname.len();
1019 32 : sz += content.len();
1020 32 : result.insert(fname, content);
1021 32 : }
1022 : }
1023 24 : self.aux_file_size_estimator.on_initial(sz);
1024 24 : Ok(result)
1025 24 : }
1026 :
1027 0 : pub(crate) async fn trigger_aux_file_size_computation(
1028 0 : &self,
1029 0 : lsn: Lsn,
1030 0 : ctx: &RequestContext,
1031 0 : io_concurrency: IoConcurrency,
1032 0 : ) -> Result<(), PageReconstructError> {
1033 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1034 0 : Ok(())
1035 0 : }
1036 :
1037 24 : pub(crate) async fn list_aux_files(
1038 24 : &self,
1039 24 : lsn: Lsn,
1040 24 : ctx: &RequestContext,
1041 24 : io_concurrency: IoConcurrency,
1042 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1043 24 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1044 24 : }
1045 :
1046 0 : pub(crate) async fn get_replorigins(
1047 0 : &self,
1048 0 : lsn: Lsn,
1049 0 : ctx: &RequestContext,
1050 0 : io_concurrency: IoConcurrency,
1051 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1052 0 : let kv = self
1053 0 : .scan(
1054 0 : KeySpace::single(repl_origin_key_range()),
1055 0 : lsn,
1056 0 : ctx,
1057 0 : io_concurrency,
1058 0 : )
1059 0 : .await?;
1060 0 : let mut result = HashMap::new();
1061 0 : for (k, v) in kv {
1062 0 : let v = v?;
1063 0 : let origin_id = k.field6 as RepOriginId;
1064 0 : let origin_lsn = Lsn::des(&v).unwrap();
1065 0 : if origin_lsn != Lsn::INVALID {
1066 0 : result.insert(origin_id, origin_lsn);
1067 0 : }
1068 : }
1069 0 : Ok(result)
1070 0 : }
1071 :
1072 : /// Does the same as get_current_logical_size but counted on demand.
1073 : /// Used to initialize the logical size tracking on startup.
1074 : ///
1075 : /// Only relation blocks are counted currently. That excludes metadata,
1076 : /// SLRUs, twophase files etc.
1077 : ///
1078 : /// # Cancel-Safety
1079 : ///
1080 : /// This method is cancellation-safe.
1081 0 : pub(crate) async fn get_current_logical_size_non_incremental(
1082 0 : &self,
1083 0 : lsn: Lsn,
1084 0 : ctx: &RequestContext,
1085 0 : ) -> Result<u64, CalculateLogicalSizeError> {
1086 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1087 0 :
1088 0 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1089 :
1090 : // Fetch list of database dirs and iterate them
1091 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1092 0 : let dbdir = DbDirectory::des(&buf)?;
1093 :
1094 0 : let mut total_size: u64 = 0;
1095 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1096 0 : for rel in self
1097 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
1098 0 : .await?
1099 : {
1100 0 : if self.cancel.is_cancelled() {
1101 0 : return Err(CalculateLogicalSizeError::Cancelled);
1102 0 : }
1103 0 : let relsize_key = rel_size_to_key(rel);
1104 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1105 0 : let relsize = buf.get_u32_le();
1106 0 :
1107 0 : total_size += relsize as u64;
1108 : }
1109 : }
1110 0 : Ok(total_size * BLCKSZ as u64)
1111 0 : }
1112 :
1113 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1114 : /// for gc-compaction.
1115 : ///
1116 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1117 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1118 : /// be kept only for a specific range of LSN.
1119 : ///
1120 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1121 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1122 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1123 : /// determine which keys to retain/drop for gc-compaction.
1124 : ///
1125 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1126 : /// to be retained for each of the branch LSN.
1127 : ///
1128 : /// The return value is (dense keyspace, sparse keyspace).
1129 104 : pub(crate) async fn collect_gc_compaction_keyspace(
1130 104 : &self,
1131 104 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1132 104 : let metadata_key_begin = Key::metadata_key_range().start;
1133 104 : let aux_v1_key = AUX_FILES_KEY;
1134 104 : let dense_keyspace = KeySpace {
1135 104 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1136 104 : };
1137 104 : Ok((
1138 104 : dense_keyspace,
1139 104 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1140 104 : ))
1141 104 : }
1142 :
1143 : ///
1144 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1145 : /// Anything that's not listed may be removed from the underlying storage (from
1146 : /// that LSN forwards).
1147 : ///
1148 : /// The return value is (dense keyspace, sparse keyspace).
1149 652 : pub(crate) async fn collect_keyspace(
1150 652 : &self,
1151 652 : lsn: Lsn,
1152 652 : ctx: &RequestContext,
1153 652 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1154 652 : // Iterate through key ranges, greedily packing them into partitions
1155 652 : let mut result = KeySpaceAccum::new();
1156 652 :
1157 652 : // The dbdir metadata always exists
1158 652 : result.add_key(DBDIR_KEY);
1159 :
1160 : // Fetch list of database dirs and iterate them
1161 652 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1162 652 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1163 652 :
1164 652 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1165 652 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1166 0 : if has_relmap_file {
1167 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1168 0 : }
1169 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1170 :
1171 0 : let mut rels: Vec<RelTag> = self
1172 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
1173 0 : .await?
1174 0 : .into_iter()
1175 0 : .collect();
1176 0 : rels.sort_unstable();
1177 0 : for rel in rels {
1178 0 : let relsize_key = rel_size_to_key(rel);
1179 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1180 0 : let relsize = buf.get_u32_le();
1181 0 :
1182 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1183 0 : result.add_key(relsize_key);
1184 : }
1185 : }
1186 :
1187 : // Iterate SLRUs next
1188 652 : if self.tenant_shard_id.is_shard_zero() {
1189 1920 : for kind in [
1190 640 : SlruKind::Clog,
1191 640 : SlruKind::MultiXactMembers,
1192 640 : SlruKind::MultiXactOffsets,
1193 : ] {
1194 1920 : let slrudir_key = slru_dir_to_key(kind);
1195 1920 : result.add_key(slrudir_key);
1196 1920 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1197 1920 : let dir = SlruSegmentDirectory::des(&buf)?;
1198 1920 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1199 1920 : segments.sort_unstable();
1200 1920 : for segno in segments {
1201 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1202 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1203 0 : let segsize = buf.get_u32_le();
1204 0 :
1205 0 : result.add_range(
1206 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1207 0 : );
1208 0 : result.add_key(segsize_key);
1209 : }
1210 : }
1211 12 : }
1212 :
1213 : // Then pg_twophase
1214 652 : result.add_key(TWOPHASEDIR_KEY);
1215 :
1216 652 : let mut xids: Vec<u64> = self
1217 652 : .list_twophase_files(lsn, ctx)
1218 652 : .await?
1219 652 : .iter()
1220 652 : .cloned()
1221 652 : .collect();
1222 652 : xids.sort_unstable();
1223 652 : for xid in xids {
1224 0 : result.add_key(twophase_file_key(xid));
1225 0 : }
1226 :
1227 652 : result.add_key(CONTROLFILE_KEY);
1228 652 : result.add_key(CHECKPOINT_KEY);
1229 652 :
1230 652 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1231 652 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1232 652 : // and the keys will not be garbage-collected.
1233 652 : #[cfg(test)]
1234 652 : {
1235 652 : let guard = self.extra_test_dense_keyspace.load();
1236 652 : for kr in &guard.ranges {
1237 0 : result.add_range(kr.clone());
1238 0 : }
1239 0 : }
1240 0 :
1241 652 : let dense_keyspace = result.to_keyspace();
1242 652 : let sparse_keyspace = SparseKeySpace(KeySpace {
1243 652 : ranges: vec![
1244 652 : Key::metadata_aux_key_range(),
1245 652 : repl_origin_key_range(),
1246 652 : Key::rel_dir_sparse_key_range(),
1247 652 : ],
1248 652 : });
1249 652 :
1250 652 : if cfg!(debug_assertions) {
1251 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1252 :
1253 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1254 : // category of sparse keys are split into their own image/delta files. If there
1255 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1256 : // and we want the developer to keep the keyspaces separated.
1257 :
1258 652 : let ranges = &sparse_keyspace.0.ranges;
1259 :
1260 : // TODO: use a single overlaps_with across the codebase
1261 1956 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1262 1956 : !(a.end <= b.start || b.end <= a.start)
1263 1956 : }
1264 1956 : for i in 0..ranges.len() {
1265 1956 : for j in 0..i {
1266 1956 : if overlaps_with(&ranges[i], &ranges[j]) {
1267 0 : panic!(
1268 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1269 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1270 0 : );
1271 1956 : }
1272 : }
1273 : }
1274 1304 : for i in 1..ranges.len() {
1275 1304 : assert!(
1276 1304 : ranges[i - 1].end <= ranges[i].start,
1277 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1278 0 : ranges[i - 1].start,
1279 0 : ranges[i - 1].end,
1280 0 : ranges[i].start,
1281 0 : ranges[i].end
1282 : );
1283 : }
1284 0 : }
1285 :
1286 652 : Ok((dense_keyspace, sparse_keyspace))
1287 652 : }
1288 :
1289 : /// Get the cached size of a relation, if it was not updated after the specified LSN
1290 897080 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1291 897080 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1292 897080 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1293 897036 : if lsn >= *cached_lsn {
1294 886744 : RELSIZE_CACHE_HITS.inc();
1295 886744 : return Some(*nblocks);
1296 10292 : }
1297 10292 : RELSIZE_CACHE_MISSES_OLD.inc();
1298 44 : }
1299 10336 : RELSIZE_CACHE_MISSES.inc();
1300 10336 : None
1301 897080 : }
1302 :
1303 : /// Update cached relation size if there is no more recent update
1304 10272 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1305 10272 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1306 10272 :
1307 10272 : if lsn < rel_size_cache.complete_as_of {
1308 : // Do not cache old values. It's safe to cache the size on read, as long as
1309 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1310 : // never evict values from the cache, so if the relation size changed after
1311 : // 'lsn', the new value is already in the cache.
1312 0 : return;
1313 10272 : }
1314 10272 :
1315 10272 : match rel_size_cache.map.entry(tag) {
1316 10272 : hash_map::Entry::Occupied(mut entry) => {
1317 10272 : let cached_lsn = entry.get_mut();
1318 10272 : if lsn >= cached_lsn.0 {
1319 0 : *cached_lsn = (lsn, nblocks);
1320 10272 : }
1321 : }
1322 0 : hash_map::Entry::Vacant(entry) => {
1323 0 : entry.insert((lsn, nblocks));
1324 0 : RELSIZE_CACHE_ENTRIES.inc();
1325 0 : }
1326 : }
1327 10272 : }
1328 :
1329 : /// Store cached relation size
1330 565440 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1331 565440 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1332 565440 : if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
1333 3840 : RELSIZE_CACHE_ENTRIES.inc();
1334 561600 : }
1335 565440 : }
1336 :
1337 : /// Remove cached relation size
1338 4 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1339 4 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1340 4 : if rel_size_cache.map.remove(tag).is_some() {
1341 4 : RELSIZE_CACHE_ENTRIES.dec();
1342 4 : }
1343 4 : }
1344 : }
1345 :
1346 : /// DatadirModification represents an operation to ingest an atomic set of
1347 : /// updates to the repository.
1348 : ///
1349 : /// It is created by the 'begin_modification' function. It is called for each WAL
1350 : /// record, so that all the modifications by one WAL record appear atomic.
1351 : pub struct DatadirModification<'a> {
1352 : /// The timeline this modification applies to. You can access this to
1353 : /// read the state, but note that any pending updates are *not* reflected
1354 : /// in the state in 'tline' yet.
1355 : pub tline: &'a Timeline,
1356 :
1357 : /// Current LSN of the modification
1358 : lsn: Lsn,
1359 :
1360 : // The modifications are not applied directly to the underlying key-value store.
1361 : // The put-functions add the modifications here, and they are flushed to the
1362 : // underlying key-value store by the 'finish' function.
1363 : pending_lsns: Vec<Lsn>,
1364 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1365 : pending_nblocks: i64,
1366 :
1367 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1368 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1369 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1370 :
1371 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1372 : /// which keys are stored here.
1373 : pending_data_batch: Option<SerializedValueBatch>,
1374 :
1375 : /// For special "directory" keys that store key-value maps, track the size of the map
1376 : /// if it was updated in this modification.
1377 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1378 :
1379 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1380 : pending_metadata_bytes: usize,
1381 : }
1382 :
1383 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1384 : pub enum MetricsUpdate {
1385 : /// Set the metrics to this value
1386 : Set(u64),
1387 : /// Increment the metrics by this value
1388 : Add(u64),
1389 : /// Decrement the metrics by this value
1390 : Sub(u64),
1391 : }
1392 :
1393 : impl DatadirModification<'_> {
1394 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1395 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1396 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1397 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1398 :
1399 : /// Get the current lsn
1400 836116 : pub(crate) fn get_lsn(&self) -> Lsn {
1401 836116 : self.lsn
1402 836116 : }
1403 :
1404 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1405 0 : self.pending_data_batch
1406 0 : .as_ref()
1407 0 : .map_or(0, |b| b.buffer_size())
1408 0 : + self.pending_metadata_bytes
1409 0 : }
1410 :
1411 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1412 0 : self.pending_data_batch
1413 0 : .as_ref()
1414 0 : .is_some_and(|b| b.has_data())
1415 0 : }
1416 :
1417 : /// Returns statistics about the currently pending modifications.
1418 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1419 0 : let mut stats = DatadirModificationStats::default();
1420 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1421 0 : match value {
1422 0 : Value::Image(_) => stats.metadata_images += 1,
1423 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1424 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1425 : }
1426 : }
1427 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1428 0 : match valuemeta {
1429 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1430 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1431 0 : ValueMeta::Observed(_) => {}
1432 : }
1433 : }
1434 0 : stats
1435 0 : }
1436 :
1437 : /// Set the current lsn
1438 291716 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1439 291716 : ensure!(
1440 291716 : lsn >= self.lsn,
1441 0 : "setting an older lsn {} than {} is not allowed",
1442 : lsn,
1443 : self.lsn
1444 : );
1445 :
1446 291716 : if lsn > self.lsn {
1447 291716 : self.pending_lsns.push(self.lsn);
1448 291716 : self.lsn = lsn;
1449 291716 : }
1450 291716 : Ok(())
1451 291716 : }
1452 :
1453 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1454 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1455 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1456 : ///
1457 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1458 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1459 : /// via [`Self::get`] as soon as their record has been ingested.
1460 1701304 : fn is_data_key(key: &Key) -> bool {
1461 1701304 : key.is_rel_block_key() || key.is_slru_block_key()
1462 1701304 : }
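    // Illustrative sketch (not part of the original source): which keys take the
    // data fast path versus the metadata path. The relation tag is an arbitrary
    // example value supplied by the caller.
    #[cfg(test)]
    fn example_key_classification(rel: RelTag) {
        // A relation block is data that postgres reads directly: fast path,
        // not readable through `get` until the modification is committed.
        debug_assert!(Self::is_data_key(&rel_block_to_key(rel, 0)));
        // The relation size entry and the database directory are pageserver-internal
        // metadata: visible via `get` as soon as the record is ingested.
        debug_assert!(!Self::is_data_key(&rel_size_to_key(rel)));
        debug_assert!(!Self::is_data_key(&DBDIR_KEY));
    }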
1463 :
1464 : /// Initialize a completely new repository.
1465 : ///
1466 : /// This inserts the directory metadata entries that are assumed to
1467 : /// always exist.
1468 424 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1469 424 : let buf = DbDirectory::ser(&DbDirectory {
1470 424 : dbdirs: HashMap::new(),
1471 424 : })?;
1472 424 : self.pending_directory_entries
1473 424 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1474 424 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1475 :
1476 424 : let buf = if self.tline.pg_version >= 17 {
1477 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1478 0 : xids: HashSet::new(),
1479 0 : })
1480 : } else {
1481 424 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1482 424 : xids: HashSet::new(),
1483 424 : })
1484 0 : }?;
1485 424 : self.pending_directory_entries
1486 424 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1487 424 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1488 :
1489 424 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1490 424 : let empty_dir = Value::Image(buf);
1491 424 :
1492 424 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1493 424 : // harmless but they'd just be dropped on later compaction.
1494 424 : if self.tline.tenant_shard_id.is_shard_zero() {
1495 412 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1496 412 : self.pending_directory_entries.push((
1497 412 : DirectoryKind::SlruSegment(SlruKind::Clog),
1498 412 : MetricsUpdate::Set(0),
1499 412 : ));
1500 412 : self.put(
1501 412 : slru_dir_to_key(SlruKind::MultiXactMembers),
1502 412 : empty_dir.clone(),
1503 412 : );
1504 412 : self.pending_directory_entries.push((
1505 412 : DirectoryKind::SlruSegment(SlruKind::Clog),
1506 412 : MetricsUpdate::Set(0),
1507 412 : ));
1508 412 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1509 412 : self.pending_directory_entries.push((
1510 412 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1511 412 : MetricsUpdate::Set(0),
1512 412 : ));
1513 412 : }
1514 :
1515 424 : Ok(())
1516 424 : }
1517 :
1518 : #[cfg(test)]
1519 420 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1520 420 : self.init_empty()?;
1521 420 : self.put_control_file(bytes::Bytes::from_static(
1522 420 : b"control_file contents do not matter",
1523 420 : ))
1524 420 : .context("put_control_file")?;
1525 420 : self.put_checkpoint(bytes::Bytes::from_static(
1526 420 : b"checkpoint_file contents do not matter",
1527 420 : ))
1528 420 : .context("put_checkpoint_file")?;
1529 420 : Ok(())
1530 420 : }
1531 :
1532 : /// Creates a relation if it is not already present.
1533 : /// Returns the current size of the relation
1534 836112 : pub(crate) async fn create_relation_if_required(
1535 836112 : &mut self,
1536 836112 : rel: RelTag,
1537 836112 : ctx: &RequestContext,
1538 836112 : ) -> Result<u32, PageReconstructError> {
1539 : // Get current size and put rel creation if rel doesn't exist
1540 : //
1541 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1542 : // check the cache too. This is because eagerly checking the cache results in
1543 :         // less work overall and 10% better performance. It's more work on a cache miss,
1544 :         // but cache misses are rare.
1545 836112 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1546 836092 : Ok(nblocks)
1547 20 : } else if !self
1548 20 : .tline
1549 20 : .get_rel_exists(rel, Version::Modified(self), ctx)
1550 20 : .await?
1551 : {
1552 : // create it with 0 size initially, the logic below will extend it
1553 20 : self.put_rel_creation(rel, 0, ctx)
1554 20 : .await
1555 20 : .context("Relation Error")?;
1556 20 : Ok(0)
1557 : } else {
1558 0 : self.tline
1559 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1560 0 : .await
1561 : }
1562 836112 : }
1563 :
1564 : /// Given a block number for a relation (which represents a newly written block),
1565 : /// the previous block count of the relation, and the shard info, find the gaps
1566 : /// that were created by the newly written block if any.
1567 291340 : fn find_gaps(
1568 291340 : rel: RelTag,
1569 291340 : blkno: u32,
1570 291340 : previous_nblocks: u32,
1571 291340 : shard: &ShardIdentity,
1572 291340 : ) -> Option<KeySpace> {
1573 291340 : let mut key = rel_block_to_key(rel, blkno);
1574 291340 : let mut gap_accum = None;
1575 :
1576 291340 : for gap_blkno in previous_nblocks..blkno {
1577 64 : key.field6 = gap_blkno;
1578 64 :
1579 64 : if shard.get_shard_number(&key) != shard.number {
1580 16 : continue;
1581 48 : }
1582 48 :
1583 48 : gap_accum
1584 48 : .get_or_insert_with(KeySpaceAccum::new)
1585 48 : .add_key(key);
1586 : }
1587 :
1588 291340 : gap_accum.map(|accum| accum.to_keyspace())
1589 291340 : }
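    // Illustrative sketch (assumption, not part of the original source): a relation with
    // previous_nblocks = 3 receives a write to blkno = 6 on an unsharded tenant. Blocks 3..6
    // were skipped over, so they are returned as the gap to be zero-filled by the caller.
    //
    //     let shard = ShardIdentity::unsharded();
    //     let gaps = DatadirModification::find_gaps(rel, 6, 3, &shard).unwrap();
    //     assert_eq!(
    //         gaps,
    //         KeySpace::single(rel_block_to_key(rel, 3)..rel_block_to_key(rel, 6))
    //     );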
1590 :
1591 291704 : pub async fn ingest_batch(
1592 291704 : &mut self,
1593 291704 : mut batch: SerializedValueBatch,
1594 291704 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1595 291704 : shard: &ShardIdentity,
1596 291704 : ctx: &RequestContext,
1597 291704 : ) -> anyhow::Result<()> {
1598 291704 : let mut gaps_at_lsns = Vec::default();
1599 :
1600 291704 : for meta in batch.metadata.iter() {
1601 291284 : let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
1602 291284 : let new_nblocks = blkno + 1;
1603 :
1604 291284 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1605 291284 : if new_nblocks > old_nblocks {
1606 4780 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1607 286504 : }
1608 :
1609 291284 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1610 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1611 291284 : }
1612 : }
1613 :
1614 291704 : if !gaps_at_lsns.is_empty() {
1615 0 : batch.zero_gaps(gaps_at_lsns);
1616 291704 : }
1617 :
1618 291704 : match self.pending_data_batch.as_mut() {
1619 40 : Some(pending_batch) => {
1620 40 : pending_batch.extend(batch);
1621 40 : }
1622 291664 : None if batch.has_data() => {
1623 291260 : self.pending_data_batch = Some(batch);
1624 291260 : }
1625 404 : None => {
1626 404 : // Nothing to initialize the batch with
1627 404 : }
1628 : }
1629 :
1630 291704 : Ok(())
1631 291704 : }
1632 :
1633 : /// Put a new page version that can be constructed from a WAL record
1634 : ///
1635 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1636 : /// current end-of-file. It's up to the caller to check that the relation size
1637 : /// matches the blocks inserted!
1638 24 : pub fn put_rel_wal_record(
1639 24 : &mut self,
1640 24 : rel: RelTag,
1641 24 : blknum: BlockNumber,
1642 24 : rec: NeonWalRecord,
1643 24 : ) -> anyhow::Result<()> {
1644 24 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1645 24 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1646 24 : Ok(())
1647 24 : }
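    // Illustrative sketch (assumption, not part of the original source): the caller is
    // responsible for extending the relation first, as `ingest_batch` above does, before
    // recording a WAL record for a block past the old end of file. `modification` and `ctx`
    // stand in for an in-progress modification and a request context.
    //
    //     let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
    //     if blknum + 1 > old_nblocks {
    //         modification.put_rel_extend(rel, blknum + 1, ctx).await?;
    //     }
    //     modification.put_rel_wal_record(rel, blknum, rec)?;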
1648 :
1649 : // Same, but for an SLRU.
1650 16 : pub fn put_slru_wal_record(
1651 16 : &mut self,
1652 16 : kind: SlruKind,
1653 16 : segno: u32,
1654 16 : blknum: BlockNumber,
1655 16 : rec: NeonWalRecord,
1656 16 : ) -> anyhow::Result<()> {
1657 16 : if !self.tline.tenant_shard_id.is_shard_zero() {
1658 0 : return Ok(());
1659 16 : }
1660 16 :
1661 16 : self.put(
1662 16 : slru_block_to_key(kind, segno, blknum),
1663 16 : Value::WalRecord(rec),
1664 16 : );
1665 16 : Ok(())
1666 16 : }
1667 :
1668 :     /// Like put_wal_record, but with a ready-made image of the page.
1669 555684 : pub fn put_rel_page_image(
1670 555684 : &mut self,
1671 555684 : rel: RelTag,
1672 555684 : blknum: BlockNumber,
1673 555684 : img: Bytes,
1674 555684 : ) -> anyhow::Result<()> {
1675 555684 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1676 555684 : let key = rel_block_to_key(rel, blknum);
1677 555684 : if !key.is_valid_key_on_write_path() {
1678 0 : anyhow::bail!(
1679 0 : "the request contains data not supported by pageserver at {}",
1680 0 : key
1681 0 : );
1682 555684 : }
1683 555684 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1684 555684 : Ok(())
1685 555684 : }
1686 :
1687 12 : pub fn put_slru_page_image(
1688 12 : &mut self,
1689 12 : kind: SlruKind,
1690 12 : segno: u32,
1691 12 : blknum: BlockNumber,
1692 12 : img: Bytes,
1693 12 : ) -> anyhow::Result<()> {
1694 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1695 :
1696 12 : let key = slru_block_to_key(kind, segno, blknum);
1697 12 : if !key.is_valid_key_on_write_path() {
1698 0 : anyhow::bail!(
1699 0 : "the request contains data not supported by pageserver at {}",
1700 0 : key
1701 0 : );
1702 12 : }
1703 12 : self.put(key, Value::Image(img));
1704 12 : Ok(())
1705 12 : }
1706 :
1707 5996 : pub(crate) fn put_rel_page_image_zero(
1708 5996 : &mut self,
1709 5996 : rel: RelTag,
1710 5996 : blknum: BlockNumber,
1711 5996 : ) -> anyhow::Result<()> {
1712 5996 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1713 5996 : let key = rel_block_to_key(rel, blknum);
1714 5996 : if !key.is_valid_key_on_write_path() {
1715 0 : anyhow::bail!(
1716 0 : "the request contains data not supported by pageserver: {} @ {}",
1717 0 : key,
1718 0 : self.lsn
1719 0 : );
1720 5996 : }
1721 5996 :
1722 5996 : let batch = self
1723 5996 : .pending_data_batch
1724 5996 : .get_or_insert_with(SerializedValueBatch::default);
1725 5996 :
1726 5996 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1727 5996 :
1728 5996 : Ok(())
1729 5996 : }
1730 :
1731 0 : pub(crate) fn put_slru_page_image_zero(
1732 0 : &mut self,
1733 0 : kind: SlruKind,
1734 0 : segno: u32,
1735 0 : blknum: BlockNumber,
1736 0 : ) -> anyhow::Result<()> {
1737 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1738 0 : let key = slru_block_to_key(kind, segno, blknum);
1739 0 : if !key.is_valid_key_on_write_path() {
1740 0 : anyhow::bail!(
1741 0 : "the request contains data not supported by pageserver: {} @ {}",
1742 0 : key,
1743 0 : self.lsn
1744 0 : );
1745 0 : }
1746 0 :
1747 0 : let batch = self
1748 0 : .pending_data_batch
1749 0 : .get_or_insert_with(SerializedValueBatch::default);
1750 0 :
1751 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1752 0 :
1753 0 : Ok(())
1754 0 : }
1755 :
1756 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1757 : /// we enable it, we also need to persist it in `index_part.json`.
1758 3892 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1759 3892 : let status = self.tline.get_rel_size_v2_status();
1760 3892 : let config = self.tline.get_rel_size_v2_enabled();
1761 3892 : match (config, status) {
1762 : (false, RelSizeMigration::Legacy) => {
1763 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1764 3892 : Ok(false)
1765 : }
1766 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1767 : // index_part already persisted that the timeline has enabled rel_size_v2
1768 0 : Ok(true)
1769 : }
1770 : (true, RelSizeMigration::Legacy) => {
1771 : // The first time we enable it, we need to persist it in `index_part.json`
1772 0 : self.tline
1773 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1774 0 : tracing::info!("enabled rel_size_v2");
1775 0 : Ok(true)
1776 : }
1777 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1778 : // index_part already persisted that the timeline has enabled rel_size_v2
1779 : // and we don't need to do anything
1780 0 : Ok(true)
1781 : }
1782 : }
1783 3892 : }
1784 :
1785 : /// Store a relmapper file (pg_filenode.map) in the repository
1786 32 : pub async fn put_relmap_file(
1787 32 : &mut self,
1788 32 : spcnode: Oid,
1789 32 : dbnode: Oid,
1790 32 : img: Bytes,
1791 32 : ctx: &RequestContext,
1792 32 : ) -> anyhow::Result<()> {
1793 32 : let v2_enabled = self.maybe_enable_rel_size_v2()?;
1794 :
1795 : // Add it to the directory (if it doesn't exist already)
1796 32 : let buf = self.get(DBDIR_KEY, ctx).await?;
1797 32 : let mut dbdir = DbDirectory::des(&buf)?;
1798 :
1799 32 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1800 32 : if r.is_none() || r == Some(false) {
1801 : // The dbdir entry didn't exist, or it contained a
1802 : // 'false'. The 'insert' call already updated it with
1803 : // 'true', now write the updated 'dbdirs' map back.
1804 32 : let buf = DbDirectory::ser(&dbdir)?;
1805 32 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1806 0 : }
1807 32 : if r.is_none() {
1808 : // Create RelDirectory
1809 : // TODO: if we have fully migrated to v2, no need to create this directory
1810 16 : let buf = RelDirectory::ser(&RelDirectory {
1811 16 : rels: HashSet::new(),
1812 16 : })?;
1813 16 : self.pending_directory_entries
1814 16 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1815 16 : if v2_enabled {
1816 0 : self.pending_directory_entries
1817 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1818 16 : }
1819 16 : self.put(
1820 16 : rel_dir_to_key(spcnode, dbnode),
1821 16 : Value::Image(Bytes::from(buf)),
1822 16 : );
1823 16 : }
1824 :
1825 32 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1826 32 : Ok(())
1827 32 : }
1828 :
1829 0 : pub async fn put_twophase_file(
1830 0 : &mut self,
1831 0 : xid: u64,
1832 0 : img: Bytes,
1833 0 : ctx: &RequestContext,
1834 0 : ) -> anyhow::Result<()> {
1835 : // Add it to the directory entry
1836 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1837 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1838 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1839 0 : if !dir.xids.insert(xid) {
1840 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1841 0 : }
1842 0 : self.pending_directory_entries.push((
1843 0 : DirectoryKind::TwoPhase,
1844 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1845 0 : ));
1846 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1847 : } else {
1848 0 : let xid = xid as u32;
1849 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1850 0 : if !dir.xids.insert(xid) {
1851 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1852 0 : }
1853 0 : self.pending_directory_entries.push((
1854 0 : DirectoryKind::TwoPhase,
1855 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1856 0 : ));
1857 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1858 : };
1859 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1860 0 :
1861 0 : self.put(twophase_file_key(xid), Value::Image(img));
1862 0 : Ok(())
1863 0 : }
1864 :
1865 0 : pub async fn set_replorigin(
1866 0 : &mut self,
1867 0 : origin_id: RepOriginId,
1868 0 : origin_lsn: Lsn,
1869 0 : ) -> anyhow::Result<()> {
1870 0 : let key = repl_origin_key(origin_id);
1871 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1872 0 : Ok(())
1873 0 : }
1874 :
1875 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1876 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1877 0 : }
1878 :
1879 424 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1880 424 : self.put(CONTROLFILE_KEY, Value::Image(img));
1881 424 : Ok(())
1882 424 : }
1883 :
1884 452 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1885 452 : self.put(CHECKPOINT_KEY, Value::Image(img));
1886 452 : Ok(())
1887 452 : }
1888 :
1889 0 : pub async fn drop_dbdir(
1890 0 : &mut self,
1891 0 : spcnode: Oid,
1892 0 : dbnode: Oid,
1893 0 : ctx: &RequestContext,
1894 0 : ) -> anyhow::Result<()> {
1895 0 : let total_blocks = self
1896 0 : .tline
1897 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1898 0 : .await?;
1899 :
1900 : // Remove entry from dbdir
1901 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1902 0 : let mut dir = DbDirectory::des(&buf)?;
1903 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1904 0 : let buf = DbDirectory::ser(&dir)?;
1905 0 : self.pending_directory_entries.push((
1906 0 : DirectoryKind::Db,
1907 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
1908 0 : ));
1909 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1910 : } else {
1911 0 : warn!(
1912 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1913 : spcnode, dbnode
1914 : );
1915 : }
1916 :
1917 : // Update logical database size.
1918 0 : self.pending_nblocks -= total_blocks as i64;
1919 0 :
1920 0 : // Delete all relations and metadata files for the spcnode/dnode
1921 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1922 0 : Ok(())
1923 0 : }
1924 :
1925 : /// Create a relation fork.
1926 : ///
1927 : /// 'nblocks' is the initial size.
1928 3840 : pub async fn put_rel_creation(
1929 3840 : &mut self,
1930 3840 : rel: RelTag,
1931 3840 : nblocks: BlockNumber,
1932 3840 : ctx: &RequestContext,
1933 3840 : ) -> Result<(), RelationError> {
1934 3840 : if rel.relnode == 0 {
1935 0 : return Err(RelationError::InvalidRelnode);
1936 3840 : }
1937 : // It's possible that this is the first rel for this db in this
1938 : // tablespace. Create the reldir entry for it if so.
1939 3840 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1940 3840 : .context("deserialize db")?;
1941 :
1942 3840 : let dbdir_exists =
1943 3840 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1944 : // Didn't exist. Update dbdir
1945 16 : e.insert(false);
1946 16 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1947 16 : self.pending_directory_entries.push((
1948 16 : DirectoryKind::Db,
1949 16 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
1950 16 : ));
1951 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1952 16 : false
1953 : } else {
1954 3824 : true
1955 : };
1956 :
1957 3840 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1958 3840 : let mut rel_dir = if !dbdir_exists {
1959 : // Create the RelDirectory
1960 16 : RelDirectory::default()
1961 : } else {
1962 : // reldir already exists, fetch it
1963 3824 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1964 3824 : .context("deserialize db")?
1965 : };
1966 :
1967 3840 : let v2_enabled = self.maybe_enable_rel_size_v2()?;
1968 :
1969 3840 : if v2_enabled {
1970 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
1971 0 : return Err(RelationError::AlreadyExists);
1972 0 : }
1973 0 : let sparse_rel_dir_key =
1974 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
1975 : // check if the rel_dir_key exists in v2
1976 0 : let val = self
1977 0 : .sparse_get(sparse_rel_dir_key, ctx)
1978 0 : .await
1979 0 : .map_err(|e| RelationError::Other(e.into()))?;
1980 0 : let val = RelDirExists::decode_option(val)
1981 0 : .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
1982 0 : if val == RelDirExists::Exists {
1983 0 : return Err(RelationError::AlreadyExists);
1984 0 : }
1985 0 : self.put(
1986 0 : sparse_rel_dir_key,
1987 0 : Value::Image(RelDirExists::Exists.encode()),
1988 0 : );
1989 0 : if !dbdir_exists {
1990 0 : self.pending_directory_entries
1991 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1992 0 : self.pending_directory_entries
1993 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1994 0 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
1995 0 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
1996 0 : // will be key not found errors if we don't create an empty one for rel_size_v2.
1997 0 : self.put(
1998 0 : rel_dir_key,
1999 0 : Value::Image(Bytes::from(
2000 0 : RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
2001 : )),
2002 : );
2003 0 : }
2004 0 : self.pending_directory_entries
2005 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2006 : } else {
2007 : // Add the new relation to the rel directory entry, and write it back
2008 3840 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2009 0 : return Err(RelationError::AlreadyExists);
2010 3840 : }
2011 3840 : if !dbdir_exists {
2012 16 : self.pending_directory_entries
2013 16 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2014 3824 : }
2015 3840 : self.pending_directory_entries
2016 3840 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2017 3840 : self.put(
2018 3840 : rel_dir_key,
2019 3840 : Value::Image(Bytes::from(
2020 3840 : RelDirectory::ser(&rel_dir).context("serialize")?,
2021 : )),
2022 : );
2023 : }
2024 :
2025 : // Put size
2026 3840 : let size_key = rel_size_to_key(rel);
2027 3840 : let buf = nblocks.to_le_bytes();
2028 3840 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2029 3840 :
2030 3840 : self.pending_nblocks += nblocks as i64;
2031 3840 :
2032 3840 : // Update relation size cache
2033 3840 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2034 3840 :
2035 3840 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2036 3840 : // caller.
2037 3840 : Ok(())
2038 3840 : }
2039 :
2040 : /// Truncate relation
2041 12024 : pub async fn put_rel_truncation(
2042 12024 : &mut self,
2043 12024 : rel: RelTag,
2044 12024 : nblocks: BlockNumber,
2045 12024 : ctx: &RequestContext,
2046 12024 : ) -> anyhow::Result<()> {
2047 12024 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
2048 12024 : if self
2049 12024 : .tline
2050 12024 : .get_rel_exists(rel, Version::Modified(self), ctx)
2051 12024 : .await?
2052 : {
2053 12024 : let size_key = rel_size_to_key(rel);
2054 : // Fetch the old size first
2055 12024 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2056 12024 :
2057 12024 : // Update the entry with the new size.
2058 12024 : let buf = nblocks.to_le_bytes();
2059 12024 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2060 12024 :
2061 12024 : // Update relation size cache
2062 12024 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2063 12024 :
2064 12024 : // Update logical database size.
2065 12024 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2066 0 : }
2067 12024 : Ok(())
2068 12024 : }
2069 :
2070 : /// Extend relation
2071 : /// If new size is smaller, do nothing.
2072 553360 : pub async fn put_rel_extend(
2073 553360 : &mut self,
2074 553360 : rel: RelTag,
2075 553360 : nblocks: BlockNumber,
2076 553360 : ctx: &RequestContext,
2077 553360 : ) -> anyhow::Result<()> {
2078 553360 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
2079 :
2080 : // Put size
2081 553360 : let size_key = rel_size_to_key(rel);
2082 553360 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2083 553360 :
2084 553360 : // only extend relation here. never decrease the size
2085 553360 : if nblocks > old_size {
2086 549576 : let buf = nblocks.to_le_bytes();
2087 549576 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2088 549576 :
2089 549576 : // Update relation size cache
2090 549576 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2091 549576 :
2092 549576 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2093 549576 : }
2094 553360 : Ok(())
2095 553360 : }
2096 :
2097 : /// Drop some relations
2098 20 : pub(crate) async fn put_rel_drops(
2099 20 : &mut self,
2100 20 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2101 20 : ctx: &RequestContext,
2102 20 : ) -> anyhow::Result<()> {
2103 20 : let v2_enabled = self.maybe_enable_rel_size_v2()?;
2104 24 : for ((spc_node, db_node), rel_tags) in drop_relations {
2105 4 : let dir_key = rel_dir_to_key(spc_node, db_node);
2106 4 : let buf = self.get(dir_key, ctx).await?;
2107 4 : let mut dir = RelDirectory::des(&buf)?;
2108 :
2109 4 : let mut dirty = false;
2110 8 : for rel_tag in rel_tags {
2111 4 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2112 4 : self.pending_directory_entries
2113 4 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2114 4 : dirty = true;
2115 4 : true
2116 0 : } else if v2_enabled {
2117 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2118 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2119 : // logic).
2120 0 : let key =
2121 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2122 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2123 0 : .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
2124 0 : if val == RelDirExists::Exists {
2125 0 : self.pending_directory_entries
2126 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2127 0 : // put tombstone
2128 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2129 0 : // no need to set dirty to true
2130 0 : true
2131 : } else {
2132 0 : false
2133 : }
2134 : } else {
2135 0 : false
2136 : };
2137 :
2138 4 : if found {
2139 : // update logical size
2140 4 : let size_key = rel_size_to_key(rel_tag);
2141 4 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2142 4 : self.pending_nblocks -= old_size as i64;
2143 4 :
2144 4 : // Remove entry from relation size cache
2145 4 : self.tline.remove_cached_rel_size(&rel_tag);
2146 4 :
2147 4 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2148 4 : self.delete(rel_key_range(rel_tag));
2149 0 : }
2150 : }
2151 :
2152 4 : if dirty {
2153 4 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2154 0 : }
2155 : }
2156 :
2157 20 : Ok(())
2158 20 : }
2159 :
2160 12 : pub async fn put_slru_segment_creation(
2161 12 : &mut self,
2162 12 : kind: SlruKind,
2163 12 : segno: u32,
2164 12 : nblocks: BlockNumber,
2165 12 : ctx: &RequestContext,
2166 12 : ) -> anyhow::Result<()> {
2167 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2168 :
2169 : // Add it to the directory entry
2170 12 : let dir_key = slru_dir_to_key(kind);
2171 12 : let buf = self.get(dir_key, ctx).await?;
2172 12 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2173 :
2174 12 : if !dir.segments.insert(segno) {
2175 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
2176 12 : }
2177 12 : self.pending_directory_entries.push((
2178 12 : DirectoryKind::SlruSegment(kind),
2179 12 : MetricsUpdate::Set(dir.segments.len() as u64),
2180 12 : ));
2181 12 : self.put(
2182 12 : dir_key,
2183 12 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2184 : );
2185 :
2186 : // Put size
2187 12 : let size_key = slru_segment_size_to_key(kind, segno);
2188 12 : let buf = nblocks.to_le_bytes();
2189 12 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2190 12 :
2191 12 : // even if nblocks > 0, we don't insert any actual blocks here
2192 12 :
2193 12 : Ok(())
2194 12 : }
2195 :
2196 : /// Extend SLRU segment
2197 0 : pub fn put_slru_extend(
2198 0 : &mut self,
2199 0 : kind: SlruKind,
2200 0 : segno: u32,
2201 0 : nblocks: BlockNumber,
2202 0 : ) -> anyhow::Result<()> {
2203 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2204 :
2205 : // Put size
2206 0 : let size_key = slru_segment_size_to_key(kind, segno);
2207 0 : let buf = nblocks.to_le_bytes();
2208 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2209 0 : Ok(())
2210 0 : }
2211 :
2212 : /// This method is used for marking truncated SLRU files
2213 0 : pub async fn drop_slru_segment(
2214 0 : &mut self,
2215 0 : kind: SlruKind,
2216 0 : segno: u32,
2217 0 : ctx: &RequestContext,
2218 0 : ) -> anyhow::Result<()> {
2219 0 : // Remove it from the directory entry
2220 0 : let dir_key = slru_dir_to_key(kind);
2221 0 : let buf = self.get(dir_key, ctx).await?;
2222 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2223 :
2224 0 : if !dir.segments.remove(&segno) {
2225 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2226 0 : }
2227 0 : self.pending_directory_entries.push((
2228 0 : DirectoryKind::SlruSegment(kind),
2229 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2230 0 : ));
2231 0 : self.put(
2232 0 : dir_key,
2233 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2234 : );
2235 :
2236 : // Delete size entry, as well as all blocks
2237 0 : self.delete(slru_segment_key_range(kind, segno));
2238 0 :
2239 0 : Ok(())
2240 0 : }
2241 :
2242 : /// Drop a relmapper file (pg_filenode.map)
2243 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
2244 0 : // TODO
2245 0 : Ok(())
2246 0 : }
2247 :
2248 :     /// Drop a twophase file, removing it from the directory entry
2249 0 : pub async fn drop_twophase_file(
2250 0 : &mut self,
2251 0 : xid: u64,
2252 0 : ctx: &RequestContext,
2253 0 : ) -> anyhow::Result<()> {
2254 : // Remove it from the directory entry
2255 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2256 0 : let newdirbuf = if self.tline.pg_version >= 17 {
2257 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2258 :
2259 0 : if !dir.xids.remove(&xid) {
2260 0 : warn!("twophase file for xid {} does not exist", xid);
2261 0 : }
2262 0 : self.pending_directory_entries.push((
2263 0 : DirectoryKind::TwoPhase,
2264 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2265 0 : ));
2266 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2267 : } else {
2268 0 : let xid: u32 = u32::try_from(xid)?;
2269 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2270 :
2271 0 : if !dir.xids.remove(&xid) {
2272 0 : warn!("twophase file for xid {} does not exist", xid);
2273 0 : }
2274 0 : self.pending_directory_entries.push((
2275 0 : DirectoryKind::TwoPhase,
2276 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2277 0 : ));
2278 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2279 : };
2280 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2281 0 :
2282 0 : // Delete it
2283 0 : self.delete(twophase_key_range(xid));
2284 0 :
2285 0 : Ok(())
2286 0 : }
2287 :
2288 32 : pub async fn put_file(
2289 32 : &mut self,
2290 32 : path: &str,
2291 32 : content: &[u8],
2292 32 : ctx: &RequestContext,
2293 32 : ) -> anyhow::Result<()> {
2294 32 : let key = aux_file::encode_aux_file_key(path);
2295 : // retrieve the key from the engine
2296 32 : let old_val = match self.get(key, ctx).await {
2297 8 : Ok(val) => Some(val),
2298 24 : Err(PageReconstructError::MissingKey(_)) => None,
2299 0 : Err(e) => return Err(e.into()),
2300 : };
2301 32 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2302 8 : aux_file::decode_file_value(old_val)?
2303 : } else {
2304 24 : Vec::new()
2305 : };
2306 32 : let mut other_files = Vec::with_capacity(files.len());
2307 32 : let mut modifying_file = None;
2308 40 : for file @ (p, content) in files {
2309 8 : if path == p {
2310 8 : assert!(
2311 8 : modifying_file.is_none(),
2312 0 : "duplicated entries found for {}",
2313 : path
2314 : );
2315 8 : modifying_file = Some(content);
2316 0 : } else {
2317 0 : other_files.push(file);
2318 0 : }
2319 : }
2320 32 : let mut new_files = other_files;
2321 32 : match (modifying_file, content.is_empty()) {
2322 4 : (Some(old_content), false) => {
2323 4 : self.tline
2324 4 : .aux_file_size_estimator
2325 4 : .on_update(old_content.len(), content.len());
2326 4 : new_files.push((path, content));
2327 4 : }
2328 4 : (Some(old_content), true) => {
2329 4 : self.tline
2330 4 : .aux_file_size_estimator
2331 4 : .on_remove(old_content.len());
2332 4 : // not adding the file key to the final `new_files` vec.
2333 4 : }
2334 24 : (None, false) => {
2335 24 : self.tline.aux_file_size_estimator.on_add(content.len());
2336 24 : new_files.push((path, content));
2337 24 : }
2338 :             // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2339 :             // Compute doesn't know whether a previous version of this file exists, so an
2340 :             // attempt to delete a non-existent file can trigger this message.
2341 :             // To avoid false alarms, log it at info rather than warning level.
2342 0 : (None, true) if path.starts_with("pg_stat/") => {
2343 0 : info!("removing non-existing pg_stat file: {}", path)
2344 : }
2345 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2346 : }
2347 32 : let new_val = aux_file::encode_file_value(&new_files)?;
2348 32 : self.put(key, Value::Image(new_val.into()));
2349 32 :
2350 32 : Ok(())
2351 32 : }
2352 :
2353 : ///
2354 : /// Flush changes accumulated so far to the underlying repository.
2355 : ///
2356 : /// Usually, changes made in DatadirModification are atomic, but this allows
2357 : /// you to flush them to the underlying repository before the final `commit`.
2358 :     /// That frees up the memory used to hold the pending changes.
2359 : ///
2360 : /// Currently only used during bulk import of a data directory. In that
2361 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2362 : /// whole import fails and the timeline will be deleted anyway.
2363 : /// (Or to be precise, it will be left behind for debugging purposes and
2364 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2365 : ///
2366 : /// Note: A consequence of flushing the pending operations is that they
2367 : /// won't be visible to subsequent operations until `commit`. The function
2368 : /// retains all the metadata, but data pages are flushed. That's again OK
2369 : /// for bulk import, where you are just loading data pages and won't try to
2370 : /// modify the same pages twice.
2371 3860 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2372 3860 : // Unless we have accumulated a decent amount of changes, it's not worth it
2373 3860 : // to scan through the pending_updates list.
2374 3860 : let pending_nblocks = self.pending_nblocks;
2375 3860 : if pending_nblocks < 10000 {
2376 3860 : return Ok(());
2377 0 : }
2378 :
2379 0 : let mut writer = self.tline.writer().await;
2380 :
2381 : // Flush relation and SLRU data blocks, keep metadata.
2382 0 : if let Some(batch) = self.pending_data_batch.take() {
2383 0 : tracing::debug!(
2384 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2385 0 : batch.max_lsn,
2386 0 : self.tline.get_last_record_lsn()
2387 : );
2388 :
2389 : // This bails out on first error without modifying pending_updates.
2390 : // That's Ok, cf this function's doc comment.
2391 0 : writer.put_batch(batch, ctx).await?;
2392 0 : }
2393 :
2394 0 : if pending_nblocks != 0 {
2395 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2396 0 : self.pending_nblocks = 0;
2397 0 : }
2398 :
2399 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2400 0 : writer.update_directory_entries_count(kind, count);
2401 0 : }
2402 :
2403 0 : Ok(())
2404 3860 : }
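    // Illustrative sketch (assumption, not part of the original source): the bulk-import
    // pattern this is intended for. `pages` is a hypothetical iterator of
    // (RelTag, BlockNumber, Bytes), the threshold on `len()` is made up for the example, and
    // relation creation/extension is omitted for brevity.
    //
    //     let mut modification = tline.begin_modification(lsn);
    //     for (rel, blkno, img) in pages {
    //         modification.put_rel_page_image(rel, blkno, img)?;
    //         if modification.len() >= 100_000 {
    //             modification.flush(ctx).await?;
    //         }
    //     }
    //     modification.commit(ctx).await?;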
2405 :
2406 : ///
2407 : /// Finish this atomic update, writing all the updated keys to the
2408 : /// underlying timeline.
2409 : /// All the modifications in this atomic update are stamped by the specified LSN.
2410 : ///
2411 1486196 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2412 1486196 : let mut writer = self.tline.writer().await;
2413 :
2414 1486196 : let pending_nblocks = self.pending_nblocks;
2415 1486196 : self.pending_nblocks = 0;
2416 :
2417 : // Ordering: the items in this batch do not need to be in any global order, but values for
2418 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2419 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2420 : // more details.
2421 :
2422 1486196 : let metadata_batch = {
2423 1486196 : let pending_meta = self
2424 1486196 : .pending_metadata_pages
2425 1486196 : .drain()
2426 1486196 : .flat_map(|(key, values)| {
2427 548092 : values
2428 548092 : .into_iter()
2429 548092 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2430 1486196 : })
2431 1486196 : .collect::<Vec<_>>();
2432 1486196 :
2433 1486196 : if pending_meta.is_empty() {
2434 944556 : None
2435 : } else {
2436 541640 : Some(SerializedValueBatch::from_values(pending_meta))
2437 : }
2438 : };
2439 :
2440 1486196 : let data_batch = self.pending_data_batch.take();
2441 :
2442 1486196 : let maybe_batch = match (data_batch, metadata_batch) {
2443 529112 : (Some(mut data), Some(metadata)) => {
2444 529112 : data.extend(metadata);
2445 529112 : Some(data)
2446 : }
2447 286524 : (Some(data), None) => Some(data),
2448 12528 : (None, Some(metadata)) => Some(metadata),
2449 658032 : (None, None) => None,
2450 : };
2451 :
2452 1486196 : if let Some(batch) = maybe_batch {
2453 828164 : tracing::debug!(
2454 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2455 0 : batch.max_lsn,
2456 0 : self.tline.get_last_record_lsn()
2457 : );
2458 :
2459 : // This bails out on first error without modifying pending_updates.
2460 : // That's Ok, cf this function's doc comment.
2461 828164 : writer.put_batch(batch, ctx).await?;
2462 658032 : }
2463 :
2464 1486196 : if !self.pending_deletions.is_empty() {
2465 4 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2466 4 : self.pending_deletions.clear();
2467 1486192 : }
2468 :
2469 1486196 : self.pending_lsns.push(self.lsn);
2470 1777912 : for pending_lsn in self.pending_lsns.drain(..) {
2471 1777912 : // TODO(vlad): pretty sure the comment below is not valid anymore
2472 1777912 : // and we can call finish write with the latest LSN
2473 1777912 : //
2474 1777912 : // Ideally, we should be able to call writer.finish_write() only once
2475 1777912 : // with the highest LSN. However, the last_record_lsn variable in the
2476 1777912 : // timeline keeps track of the latest LSN and the immediate previous LSN
2477 1777912 : // so we need to record every LSN to not leave a gap between them.
2478 1777912 : writer.finish_write(pending_lsn);
2479 1777912 : }
2480 :
2481 1486196 : if pending_nblocks != 0 {
2482 541140 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2483 945056 : }
2484 :
2485 1486196 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2486 5988 : writer.update_directory_entries_count(kind, count);
2487 5988 : }
2488 :
2489 1486196 : self.pending_metadata_bytes = 0;
2490 1486196 :
2491 1486196 : Ok(())
2492 1486196 : }
2493 :
2494 583408 : pub(crate) fn len(&self) -> usize {
2495 583408 : self.pending_metadata_pages.len()
2496 583408 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2497 583408 : + self.pending_deletions.len()
2498 583408 : }
2499 :
2500 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2501 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2502 : /// in the modification.
2503 : ///
2504 :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2505 :     /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2506 :     /// DB create operations are always preceded by a call to commit(). This is special-cased because
2507 :     /// it's rare: all the 'normal' WAL operations only read metadata pages such as relation sizes,
2508 : /// and not data pages.
2509 573172 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2510 573172 : if !Self::is_data_key(&key) {
2511 : // Have we already updated the same key? Read the latest pending updated
2512 : // version in that case.
2513 : //
2514 : // Note: we don't check pending_deletions. It is an error to request a
2515 : // value that has been removed, deletion only avoids leaking storage.
2516 573172 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2517 31856 : if let Some((_, _, value)) = values.last() {
2518 31856 : return if let Value::Image(img) = value {
2519 31856 : Ok(img.clone())
2520 : } else {
2521 : // Currently, we never need to read back a WAL record that we
2522 : // inserted in the same "transaction". All the metadata updates
2523 : // work directly with Images, and we never need to read actual
2524 : // data pages. We could handle this if we had to, by calling
2525 : // the walredo manager, but let's keep it simple for now.
2526 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2527 0 : "unexpected pending WAL record"
2528 0 : )))
2529 : };
2530 0 : }
2531 541316 : }
2532 : } else {
2533 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2534 : // this key should never be present in pending_data_pages. We ensure this by committing
2535 : // modifications before ingesting DB create operations, which are the only kind that reads
2536 : // data pages during ingest.
2537 0 : if cfg!(debug_assertions) {
2538 0 : assert!(
2539 0 : !self
2540 0 : .pending_data_batch
2541 0 : .as_ref()
2542 0 : .is_some_and(|b| b.updates_key(&key))
2543 0 : );
2544 0 : }
2545 : }
2546 :
2547 : // Metadata page cache miss, or we're reading a data page.
2548 541316 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2549 541316 : self.tline.get(key, lsn, ctx).await
2550 573172 : }
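    // Illustrative sketch (not part of the original source): within one modification, a
    // pending metadata write is immediately visible to `get`, without committing first.
    // `modification` and `ctx` stand in for an in-progress modification and a request context.
    //
    //     modification.put_control_file(Bytes::from_static(b"new contents"))?;
    //     let read_back = modification.get(CONTROLFILE_KEY, ctx).await?;
    //     assert_eq!(read_back, Bytes::from_static(b"new contents"));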
2551 :
2552 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2553 : /// and the empty value into None.
2554 0 : async fn sparse_get(
2555 0 : &self,
2556 0 : key: Key,
2557 0 : ctx: &RequestContext,
2558 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2559 0 : let val = self.get(key, ctx).await;
2560 0 : match val {
2561 0 : Ok(val) if val.is_empty() => Ok(None),
2562 0 : Ok(val) => Ok(Some(val)),
2563 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2564 0 : Err(e) => Err(e),
2565 : }
2566 0 : }
2567 :
2568 1128132 : fn put(&mut self, key: Key, val: Value) {
2569 1128132 : if Self::is_data_key(&key) {
2570 555736 : self.put_data(key.to_compact(), val)
2571 : } else {
2572 572396 : self.put_metadata(key.to_compact(), val)
2573 : }
2574 1128132 : }
2575 :
2576 555736 : fn put_data(&mut self, key: CompactKey, val: Value) {
2577 555736 : let batch = self
2578 555736 : .pending_data_batch
2579 555736 : .get_or_insert_with(SerializedValueBatch::default);
2580 555736 : batch.put(key, val, self.lsn);
2581 555736 : }
2582 :
2583 572396 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2584 572396 : let values = self.pending_metadata_pages.entry(key).or_default();
2585 : // Replace the previous value if it exists at the same lsn
2586 572396 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2587 24304 : if *last_lsn == self.lsn {
2588 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2589 24304 : self.pending_metadata_bytes -= *last_value_ser_size;
2590 24304 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2591 24304 : self.pending_metadata_bytes += *last_value_ser_size;
2592 24304 :
2593 24304 :                 // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
2594 24304 :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
2595 24304 : *last_value = val;
2596 24304 : return;
2597 0 : }
2598 548092 : }
2599 :
2600 548092 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2601 548092 : self.pending_metadata_bytes += val_serialized_size;
2602 548092 : values.push((self.lsn, val_serialized_size, val));
2603 548092 :
2604 548092 : if key == CHECKPOINT_KEY.to_compact() {
2605 452 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2606 547640 : }
2607 572396 : }
2608 :
2609 4 : fn delete(&mut self, key_range: Range<Key>) {
2610 4 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2611 4 : self.pending_deletions.push((key_range, self.lsn));
2612 4 : }
2613 : }
2614 :
2615 : /// Statistics for a DatadirModification.
2616 : #[derive(Default)]
2617 : pub struct DatadirModificationStats {
2618 : pub metadata_images: u64,
2619 : pub metadata_deltas: u64,
2620 : pub data_images: u64,
2621 : pub data_deltas: u64,
2622 : }
2623 :
2624 : /// This struct facilitates accessing either a committed key from the timeline at a
2625 : /// specific LSN, or the latest uncommitted key from a pending modification.
2626 : ///
2627 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2628 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2629 : /// need to look up the keys in the modification first before looking them up in the
2630 : /// timeline to not miss the latest updates.
2631 : #[derive(Clone, Copy)]
2632 : pub enum Version<'a> {
2633 : Lsn(Lsn),
2634 : Modified(&'a DatadirModification<'a>),
2635 : }
2636 :
2637 : impl Version<'_> {
2638 10352 : async fn get(
2639 10352 : &self,
2640 10352 : timeline: &Timeline,
2641 10352 : key: Key,
2642 10352 : ctx: &RequestContext,
2643 10352 : ) -> Result<Bytes, PageReconstructError> {
2644 10352 : match self {
2645 10312 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2646 40 : Version::Modified(modification) => modification.get(key, ctx).await,
2647 : }
2648 10352 : }
2649 :
2650 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2651 : /// and the empty value into None.
2652 0 : async fn sparse_get(
2653 0 : &self,
2654 0 : timeline: &Timeline,
2655 0 : key: Key,
2656 0 : ctx: &RequestContext,
2657 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2658 0 : let val = self.get(timeline, key, ctx).await;
2659 0 : match val {
2660 0 : Ok(val) if val.is_empty() => Ok(None),
2661 0 : Ok(val) => Ok(Some(val)),
2662 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2663 0 : Err(e) => Err(e),
2664 : }
2665 0 : }
2666 :
2667 71240 : fn get_lsn(&self) -> Lsn {
2668 71240 : match self {
2669 59148 : Version::Lsn(lsn) => *lsn,
2670 12092 : Version::Modified(modification) => modification.lsn,
2671 : }
2672 71240 : }
2673 : }
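// Illustrative sketch (assumption, not part of the original source): the two ways ingest code
// can resolve a key: against an already-committed LSN, or against the modification that is
// still being built up. `tline`, `key`, `ctx` and `modification` are hypothetical bindings.
//
//     let committed = Version::Lsn(Lsn(0x40)).get(tline, key, ctx).await?;
//     let pending = Version::Modified(&modification).get(tline, key, ctx).await?;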
2674 :
2675 : //--- Metadata structs stored in key-value pairs in the repository.
2676 :
2677 0 : #[derive(Debug, Serialize, Deserialize)]
2678 : pub(crate) struct DbDirectory {
2679 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2680 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2681 : }
2682 :
2683 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2684 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2685 : // were named like "pg_twophase/000002E5", now they're like
2686 : // "pg_twophase/0000000A000002E4".
2687 :
2688 0 : #[derive(Debug, Serialize, Deserialize)]
2689 : pub(crate) struct TwoPhaseDirectory {
2690 : pub(crate) xids: HashSet<TransactionId>,
2691 : }
2692 :
2693 0 : #[derive(Debug, Serialize, Deserialize)]
2694 : struct TwoPhaseDirectoryV17 {
2695 : xids: HashSet<u64>,
2696 : }
2697 :
2698 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2699 : pub(crate) struct RelDirectory {
2700 : // Set of relations that exist. (relfilenode, forknum)
2701 : //
2702 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2703 : // key-value pairs, if you have a lot of relations
2704 : pub(crate) rels: HashSet<(Oid, u8)>,
2705 : }
2706 :
2707 0 : #[derive(Debug, Serialize, Deserialize)]
2708 : struct RelSizeEntry {
2709 : nblocks: u32,
2710 : }
2711 :
2712 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2713 : pub(crate) struct SlruSegmentDirectory {
2714 : // Set of SLRU segments that exist.
2715 : pub(crate) segments: HashSet<u32>,
2716 : }
2717 :
2718 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2719 : #[repr(u8)]
2720 : pub(crate) enum DirectoryKind {
2721 : Db,
2722 : TwoPhase,
2723 : Rel,
2724 : AuxFiles,
2725 : SlruSegment(SlruKind),
2726 : RelV2,
2727 : }
2728 :
2729 : impl DirectoryKind {
2730 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2731 17968 : pub(crate) fn offset(&self) -> usize {
2732 17968 : self.into_usize()
2733 17968 : }
2734 : }
2735 :
2736 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2737 :
2738 : #[allow(clippy::bool_assert_comparison)]
2739 : #[cfg(test)]
2740 : mod tests {
2741 : use hex_literal::hex;
2742 : use pageserver_api::models::ShardParameters;
2743 : use pageserver_api::shard::ShardStripeSize;
2744 : use utils::id::TimelineId;
2745 : use utils::shard::{ShardCount, ShardNumber};
2746 :
2747 : use super::*;
2748 : use crate::DEFAULT_PG_VERSION;
2749 : use crate::tenant::harness::TenantHarness;
2750 :
2751 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2752 : #[tokio::test]
2753 4 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2754 4 : let name = "aux_files_round_trip";
2755 4 : let harness = TenantHarness::create(name).await?;
2756 4 :
2757 4 : pub const TIMELINE_ID: TimelineId =
2758 4 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2759 4 :
2760 4 : let (tenant, ctx) = harness.load().await;
2761 4 : let (tline, ctx) = tenant
2762 4 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2763 4 : .await?;
2764 4 : let tline = tline.raw_timeline().unwrap();
2765 4 :
2766 4 : // First modification: insert two keys
2767 4 : let mut modification = tline.begin_modification(Lsn(0x1000));
2768 4 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2769 4 : modification.set_lsn(Lsn(0x1008))?;
2770 4 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2771 4 : modification.commit(&ctx).await?;
2772 4 : let expect_1008 = HashMap::from([
2773 4 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2774 4 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2775 4 : ]);
2776 4 :
2777 4 : let io_concurrency = IoConcurrency::spawn_for_test();
2778 4 :
2779 4 : let readback = tline
2780 4 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2781 4 : .await?;
2782 4 : assert_eq!(readback, expect_1008);
2783 4 :
2784 4 : // Second modification: update one key, remove the other
2785 4 : let mut modification = tline.begin_modification(Lsn(0x2000));
2786 4 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2787 4 : modification.set_lsn(Lsn(0x2008))?;
2788 4 : modification.put_file("foo/bar2", b"", &ctx).await?;
2789 4 : modification.commit(&ctx).await?;
2790 4 : let expect_2008 =
2791 4 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2792 4 :
2793 4 : let readback = tline
2794 4 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2795 4 : .await?;
2796 4 : assert_eq!(readback, expect_2008);
2797 4 :
2798 4 : // Reading back in time works
2799 4 : let readback = tline
2800 4 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2801 4 : .await?;
2802 4 : assert_eq!(readback, expect_1008);
2803 4 :
2804 4 : Ok(())
2805 4 : }
2806 :
2807 : #[test]
2808 4 : fn gap_finding() {
2809 4 : let rel = RelTag {
2810 4 : spcnode: 1663,
2811 4 : dbnode: 208101,
2812 4 : relnode: 2620,
2813 4 : forknum: 0,
2814 4 : };
2815 4 : let base_blkno = 1;
2816 4 :
2817 4 : let base_key = rel_block_to_key(rel, base_blkno);
2818 4 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2819 4 :
2820 4 : let shard = ShardIdentity::unsharded();
2821 4 :
2822 4 : let mut previous_nblocks = 0;
2823 44 : for i in 0..10 {
2824 40 : let crnt_blkno = base_blkno + i;
2825 40 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2826 40 :
2827 40 : previous_nblocks = crnt_blkno + 1;
2828 40 :
2829 40 : if i == 0 {
2830 : // The first block we write is 1, so we should find the gap.
2831 4 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2832 : } else {
2833 36 : assert!(gaps.is_none());
2834 : }
2835 : }
2836 :
2837 : // This is an update to an already existing block. No gaps here.
2838 4 : let update_blkno = 5;
2839 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2840 4 : assert!(gaps.is_none());
2841 :
2842 : // This is an update past the current end block.
2843 4 : let after_gap_blkno = 20;
2844 4 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2845 4 :
2846 4 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2847 4 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2848 4 : assert_eq!(
2849 4 : gaps.unwrap(),
2850 4 : KeySpace::single(gap_start_key..after_gap_key)
2851 4 : );
2852 4 : }
2853 :
2854 : #[test]
2855 4 : fn sharded_gap_finding() {
2856 4 : let rel = RelTag {
2857 4 : spcnode: 1663,
2858 4 : dbnode: 208101,
2859 4 : relnode: 2620,
2860 4 : forknum: 0,
2861 4 : };
2862 4 :
2863 4 : let first_blkno = 6;
2864 4 :
2865 4 : // This shard will get the even blocks
2866 4 : let shard = ShardIdentity::from_params(
2867 4 : ShardNumber(0),
2868 4 : &ShardParameters {
2869 4 : count: ShardCount(2),
2870 4 : stripe_size: ShardStripeSize(1),
2871 4 : },
2872 4 : );
2873 4 :
2874 4 : // Only keys belonging to this shard are considered as gaps.
2875 4 : let mut previous_nblocks = 0;
2876 4 : let gaps =
2877 4 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2878 4 : assert!(!gaps.ranges.is_empty());
2879 12 : for gap_range in gaps.ranges {
2880 8 : let mut k = gap_range.start;
2881 16 : while k != gap_range.end {
2882 8 : assert_eq!(shard.get_shard_number(&k), shard.number);
2883 8 : k = k.next();
2884 : }
2885 : }
2886 :
2887 4 : previous_nblocks = first_blkno;
2888 4 :
2889 4 : let update_blkno = 2;
2890 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2891 4 : assert!(gaps.is_none());
2892 4 : }
2893 :
2894 : /*
2895 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2896 : let incremental = timeline.get_current_logical_size();
2897 : let non_incremental = timeline
2898 : .get_current_logical_size_non_incremental(lsn)
2899 : .unwrap();
2900 : assert_eq!(incremental, non_incremental);
2901 : }
2902 : */
2903 :
2904 : /*
2905 : ///
2906 : /// Test list_rels() function, with branches and dropped relations
2907 : ///
2908 : #[test]
2909 : fn test_list_rels_drop() -> Result<()> {
2910 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2911 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2912 : const TESTDB: u32 = 111;
2913 :
2914 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2915 : // after branching fails below
2916 : let mut writer = tline.begin_record(Lsn(0x10));
2917 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2918 : writer.finish()?;
2919 :
2920 : // Create a relation on the timeline
2921 : let mut writer = tline.begin_record(Lsn(0x20));
2922 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2923 : writer.finish()?;
2924 :
2925 : let writer = tline.begin_record(Lsn(0x00));
2926 : writer.finish()?;
2927 :
2928 : // Check that list_rels() lists it after LSN 2, but no before it
2929 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2930 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2931 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2932 :
2933 : // Create a branch, check that the relation is visible there
2934 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2935 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2936 : Some(timeline) => timeline,
2937 : None => panic!("Should have a local timeline"),
2938 : };
2939 : let newtline = DatadirTimelineImpl::new(newtline);
2940 : assert!(newtline
2941 : .list_rels(0, TESTDB, Lsn(0x30))?
2942 : .contains(&TESTREL_A));
2943 :
2944 : // Drop it on the branch
2945 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2946 : new_writer.drop_relation(TESTREL_A)?;
2947 : new_writer.finish()?;
2948 :
2949 : // Check that it's no longer listed on the branch after the point where it was dropped
2950 : assert!(newtline
2951 : .list_rels(0, TESTDB, Lsn(0x30))?
2952 : .contains(&TESTREL_A));
2953 : assert!(!newtline
2954 : .list_rels(0, TESTDB, Lsn(0x40))?
2955 : .contains(&TESTREL_A));
2956 :
2957 : // Run checkpoint and garbage collection and check that it's still not visible
2958 : newtline.checkpoint(CheckpointConfig::Forced)?;
2959 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2960 :
2961 : assert!(!newtline
2962 : .list_rels(0, TESTDB, Lsn(0x40))?
2963 : .contains(&TESTREL_A));
2964 :
2965 : Ok(())
2966 : }
2967 : */
2968 :
2969 : /*
2970 : #[test]
2971 : fn test_read_beyond_eof() -> Result<()> {
2972 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2973 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2974 :
2975 : make_some_layers(&tline, Lsn(0x20))?;
2976 : let mut writer = tline.begin_record(Lsn(0x60));
2977 : walingest.put_rel_page_image(
2978 : &mut writer,
2979 : TESTREL_A,
2980 : 0,
2981 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2982 : )?;
2983 : writer.finish()?;
2984 :
2985 : // Test read before rel creation. Should error out.
2986 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2987 :
2988 : // Read block beyond end of relation at different points in time.
2989 : // These reads should fall into different delta, image, and in-memory layers.
2990 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2991 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2992 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2993 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2994 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2995 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2996 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2997 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2998 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2999 :
3000 : // Test on an in-memory layer with no preceding layer
3001 : let mut writer = tline.begin_record(Lsn(0x70));
3002 : walingest.put_rel_page_image(
3003 : &mut writer,
3004 : TESTREL_B,
3005 : 0,
3006 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3007 : )?;
3008 : writer.finish()?;
3009 :
3010 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3011 :
3012 : Ok(())
3013 : }
3014 : */
3015 : }
|