Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::aux_file;
11 : use crate::context::RequestContext;
12 : use crate::keyspace::{KeySpace, KeySpaceAccum};
13 : use crate::metrics::{
14 : RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
15 : };
16 : use crate::span::{
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
19 : };
20 : use crate::tenant::storage_layer::IoConcurrency;
21 : use crate::tenant::timeline::GetVectoredError;
22 : use anyhow::{ensure, Context};
23 : use bytes::{Buf, Bytes, BytesMut};
24 : use enum_map::Enum;
25 : use itertools::Itertools;
26 : use pageserver_api::key::Key;
27 : use pageserver_api::key::{
28 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
29 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
30 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
31 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
32 : };
33 : use pageserver_api::keyspace::SparseKeySpace;
34 : use pageserver_api::record::NeonWalRecord;
35 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
36 : use pageserver_api::shard::ShardIdentity;
37 : use pageserver_api::value::Value;
38 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
39 : use postgres_ffi::BLCKSZ;
40 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
41 : use serde::{Deserialize, Serialize};
42 : use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
43 : use std::ops::ControlFlow;
44 : use std::ops::Range;
45 : use strum::IntoEnumIterator;
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::{debug, trace, warn};
48 : use utils::bin_ser::DeserializeError;
49 : use utils::pausable_failpoint;
50 : use utils::{bin_ser::BeSer, lsn::Lsn};
51 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
52 :
53 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
54 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
55 :
56 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
57 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
58 :
59 : #[derive(Debug)]
60 : pub enum LsnForTimestamp {
61 : /// Found commits both before and after the given timestamp
62 : Present(Lsn),
63 :
64 : /// Found no commits after the given timestamp, this means
65 : /// that the newest data in the branch is older than the given
66 : /// timestamp.
67 : ///
68 : /// All commits <= LSN happened before the given timestamp
69 : Future(Lsn),
70 :
71 : /// The queried timestamp is past our horizon we look back at (PITR)
72 : ///
73 : /// All commits > LSN happened after the given timestamp,
74 : /// but any commits < LSN might have happened before or after
75 : /// the given timestamp. We don't know because no data before
76 : /// the given lsn is available.
77 : Past(Lsn),
78 :
79 : /// We have found no commit with a timestamp,
80 : /// so we can't return anything meaningful.
81 : ///
82 : /// The associated LSN is the lower bound value we can safely
83 : /// create branches on, but no statement is made if it is
84 : /// older or newer than the timestamp.
85 : ///
86 : /// This variant can e.g. be returned right after a
87 : /// cluster import.
88 : NoData(Lsn),
89 : }
90 :
91 : #[derive(Debug, thiserror::Error)]
92 : pub(crate) enum CalculateLogicalSizeError {
93 : #[error("cancelled")]
94 : Cancelled,
95 :
96 : /// Something went wrong while reading the metadata we use to calculate logical size
97 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
98 : /// in the `From` implementation for this variant.
99 : #[error(transparent)]
100 : PageRead(PageReconstructError),
101 :
102 : /// Something went wrong deserializing metadata that we read to calculate logical size
103 : #[error("decode error: {0}")]
104 : Decode(#[from] DeserializeError),
105 : }
106 :
107 : #[derive(Debug, thiserror::Error)]
108 : pub(crate) enum CollectKeySpaceError {
109 : #[error(transparent)]
110 : Decode(#[from] DeserializeError),
111 : #[error(transparent)]
112 : PageRead(PageReconstructError),
113 : #[error("cancelled")]
114 : Cancelled,
115 : }
116 :
117 : impl From<PageReconstructError> for CollectKeySpaceError {
118 0 : fn from(err: PageReconstructError) -> Self {
119 0 : match err {
120 0 : PageReconstructError::Cancelled => Self::Cancelled,
121 0 : err => Self::PageRead(err),
122 : }
123 0 : }
124 : }
125 :
126 : impl From<PageReconstructError> for CalculateLogicalSizeError {
127 0 : fn from(pre: PageReconstructError) -> Self {
128 0 : match pre {
129 0 : PageReconstructError::Cancelled => Self::Cancelled,
130 0 : _ => Self::PageRead(pre),
131 : }
132 0 : }
133 : }
134 :
135 : #[derive(Debug, thiserror::Error)]
136 : pub enum RelationError {
137 : #[error("Relation Already Exists")]
138 : AlreadyExists,
139 : #[error("invalid relnode")]
140 : InvalidRelnode,
141 : #[error(transparent)]
142 : Other(#[from] anyhow::Error),
143 : }
144 :
145 : ///
146 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
147 : /// and other special kinds of files, in a versioned key-value store. The
148 : /// Timeline struct provides the key-value store.
149 : ///
150 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
151 : /// implementation, and might be moved into a separate struct later.
152 : impl Timeline {
153 : /// Start ingesting a WAL record, or other atomic modification of
154 : /// the timeline.
155 : ///
156 : /// This provides a transaction-like interface to perform a bunch
157 : /// of modifications atomically.
158 : ///
159 : /// To ingest a WAL record, call begin_modification(lsn) to get a
160 : /// DatadirModification object. Use the functions in the object to
161 : /// modify the repository state, updating all the pages and metadata
162 : /// that the WAL record affects. When you're done, call commit() to
163 : /// commit the changes.
164 : ///
165 : /// Lsn stored in modification is advanced by `ingest_record` and
166 : /// is used by `commit()` to update `last_record_lsn`.
167 : ///
168 : /// Calling commit() will flush all the changes and reset the state,
169 : /// so the `DatadirModification` struct can be reused to perform the next modification.
170 : ///
171 : /// Note that any pending modifications you make through the
172 : /// modification object won't be visible to calls to the 'get' and list
173 : /// functions of the timeline until you finish! And if you update the
174 : /// same page twice, the last update wins.
175 : ///
176 536820 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
177 536820 : where
178 536820 : Self: Sized,
179 536820 : {
180 536820 : DatadirModification {
181 536820 : tline: self,
182 536820 : pending_lsns: Vec::new(),
183 536820 : pending_metadata_pages: HashMap::new(),
184 536820 : pending_data_batch: None,
185 536820 : pending_deletions: Vec::new(),
186 536820 : pending_nblocks: 0,
187 536820 : pending_directory_entries: Vec::new(),
188 536820 : pending_metadata_bytes: 0,
189 536820 : lsn,
190 536820 : }
191 536820 : }
192 :
193 : //------------------------------------------------------------------------------
194 : // Public GET functions
195 : //------------------------------------------------------------------------------
196 :
197 : /// Look up given page version.
198 36768 : pub(crate) async fn get_rel_page_at_lsn(
199 36768 : &self,
200 36768 : tag: RelTag,
201 36768 : blknum: BlockNumber,
202 36768 : version: Version<'_>,
203 36768 : ctx: &RequestContext,
204 36768 : io_concurrency: IoConcurrency,
205 36768 : ) -> Result<Bytes, PageReconstructError> {
206 36768 : match version {
207 36768 : Version::Lsn(effective_lsn) => {
208 36768 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
209 36768 : let res = self
210 36768 : .get_rel_page_at_lsn_batched(
211 36768 : pages.iter().map(|(tag, blknum)| (tag, blknum)),
212 36768 : effective_lsn,
213 36768 : io_concurrency.clone(),
214 36768 : ctx,
215 36768 : )
216 36768 : .await;
217 36768 : assert_eq!(res.len(), 1);
218 36768 : res.into_iter().next().unwrap()
219 : }
220 0 : Version::Modified(modification) => {
221 0 : if tag.relnode == 0 {
222 0 : return Err(PageReconstructError::Other(
223 0 : RelationError::InvalidRelnode.into(),
224 0 : ));
225 0 : }
226 :
227 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
228 0 : if blknum >= nblocks {
229 0 : debug!(
230 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
231 0 : tag,
232 0 : blknum,
233 0 : version.get_lsn(),
234 : nblocks
235 : );
236 0 : return Ok(ZERO_PAGE.clone());
237 0 : }
238 0 :
239 0 : let key = rel_block_to_key(tag, blknum);
240 0 : modification.get(key, ctx).await
241 : }
242 : }
243 36768 : }
244 :
/// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
///
/// The ordering of the returned vec corresponds to the ordering of `pages`.
///
/// Every request yields exactly one result. Requests with `relnode == 0`,
/// failed relation-size lookups, and reads at or beyond the relation's size
/// are answered directly. The remaining blocks are fetched with a single
/// `get_vectored` call. When several requests map to the same key, the first
/// one receives the original result. The others receive a clone of the page,
/// or, for errors, a re-wrapped error (`Cancelled` stays `Cancelled`).
///
/// # Panics
/// Panics if `get_vectored` returns a key that was not requested (or returns
/// a key twice), or if any slot is left unfilled.
pub(crate) async fn get_rel_page_at_lsn_batched(
    &self,
    pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
    effective_lsn: Lsn,
    io_concurrency: IoConcurrency,
    ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    // Number of `result_slots` entries initialized so far. It is compared
    // against `page_count` before `set_len` below.
    let mut slots_filled = 0;
    let page_count = pages.len();

    // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
    let mut result = Vec::with_capacity(pages.len());
    let result_slots = result.spare_capacity_mut();

    // Maps each key still to be fetched to the response slots waiting for it.
    let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
    for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
        if tag.relnode == 0 {
            result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            )));

            slots_filled += 1;
            continue;
        }

        let nblocks = match self
            .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
            .await
        {
            Ok(nblocks) => nblocks,
            Err(err) => {
                result_slots[response_slot_idx].write(Err(err));
                slots_filled += 1;
                continue;
            }
        };

        if *blknum >= nblocks {
            debug!(
                "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                tag, blknum, effective_lsn, nblocks
            );
            result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
            slots_filled += 1;
            continue;
        }

        let key = rel_block_to_key(*tag, *blknum);

        let key_slots = keys_slots.entry(key).or_default();
        key_slots.push(response_slot_idx);
    }

    let keyspace = {
        // add_key requires monotonicity
        let mut acc = KeySpaceAccum::new();
        for key in keys_slots
            .keys()
            // in fact it requires strong monotonicity
            .dedup()
        {
            acc.add_key(*key);
        }
        acc.to_keyspace()
    };

    match self
        .get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
        .await
    {
        Ok(results) => {
            for (key, res) in results {
                // Each returned key must be one we requested, and returned only once:
                // `remove` consumes the entry, so a second occurrence panics here.
                let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
                let first_slot = key_slots.next().unwrap();

                // Additional requests for the same key get a copy of the result.
                for slot in key_slots {
                    let clone = match &res {
                        Ok(buf) => Ok(buf.clone()),
                        Err(err) => Err(match err {
                            PageReconstructError::Cancelled => {
                                PageReconstructError::Cancelled
                            }

                            x @ PageReconstructError::Other(_) |
                            x @ PageReconstructError::AncestorLsnTimeout(_) |
                            x @ PageReconstructError::WalRedo(_) |
                            x @ PageReconstructError::MissingKey(_) => {
                                PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
                            },
                        }),
                    };

                    result_slots[slot].write(clone);
                    slots_filled += 1;
                }

                result_slots[first_slot].write(res);
                slots_filled += 1;
            }
        }
        Err(err) => {
            // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
            // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
            for slot in keys_slots.values().flatten() {
                // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
                // but without taking ownership of the GetVectoredError
                let err = match &err {
                    GetVectoredError::Cancelled => {
                        Err(PageReconstructError::Cancelled)
                    }
                    // TODO: restructure get_vectored API to make this error per-key
                    GetVectoredError::MissingKey(err) => {
                        Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}")))
                    }
                    // TODO: restructure get_vectored API to make this error per-key
                    GetVectoredError::GetReadyAncestorError(err) => {
                        Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}")))
                    }
                    // TODO: restructure get_vectored API to make this error per-key
                    GetVectoredError::Other(err) => {
                        Err(PageReconstructError::Other(
                            anyhow::anyhow!("whole vectored get request failed: {err:?}"),
                        ))
                    }
                    // TODO: we can prevent this error class by moving this check into the type system
                    GetVectoredError::InvalidLsn(e) => {
                        Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
                    }
                    // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
                    // TODO: we can prevent this error class by moving this check into the type system
                    GetVectoredError::Oversized(err) => {
                        Err(anyhow::anyhow!(
                            "batching oversized: {err:?}"
                        )
                        .into())
                    }
                };

                result_slots[*slot].write(err);
            }

            slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
        }
    };

    // Fires if, e.g., `get_vectored` omitted a requested key, leaving its slots unwritten.
    assert_eq!(slots_filled, page_count);
    // SAFETY:
    // 1. `result` and any of its uninitialized members are not read from until this point
    // 2. The length below is tracked at run-time and matches the number of requested pages.
    //    Each slot index is written at most once (it is either answered directly in the
    //    first loop or queued under exactly one key), so a matching count means every
    //    slot in `0..page_count` has been initialized.
    unsafe {
        result.set_len(page_count);
    }

    result
}
405 :
406 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
407 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
408 : /// for pages up to the highest page number it has stored.
409 0 : pub(crate) async fn get_db_size(
410 0 : &self,
411 0 : spcnode: Oid,
412 0 : dbnode: Oid,
413 0 : version: Version<'_>,
414 0 : ctx: &RequestContext,
415 0 : ) -> Result<usize, PageReconstructError> {
416 0 : let mut total_blocks = 0;
417 :
418 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
419 :
420 0 : for rel in rels {
421 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
422 0 : total_blocks += n_blocks as usize;
423 : }
424 0 : Ok(total_blocks)
425 0 : }
426 :
427 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
428 : ///
429 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
430 : /// page number stored in the shard.
431 48868 : pub(crate) async fn get_rel_size(
432 48868 : &self,
433 48868 : tag: RelTag,
434 48868 : version: Version<'_>,
435 48868 : ctx: &RequestContext,
436 48868 : ) -> Result<BlockNumber, PageReconstructError> {
437 48868 : if tag.relnode == 0 {
438 0 : return Err(PageReconstructError::Other(
439 0 : RelationError::InvalidRelnode.into(),
440 0 : ));
441 48868 : }
442 :
443 48868 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
444 38588 : return Ok(nblocks);
445 10280 : }
446 10280 :
447 10280 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
448 0 : && !self.get_rel_exists(tag, version, ctx).await?
449 : {
450 : // FIXME: Postgres sometimes calls smgrcreate() to create
451 : // FSM, and smgrnblocks() on it immediately afterwards,
452 : // without extending it. Tolerate that by claiming that
453 : // any non-existent FSM fork has size 0.
454 0 : return Ok(0);
455 10280 : }
456 10280 :
457 10280 : let key = rel_size_to_key(tag);
458 10280 : let mut buf = version.get(self, key, ctx).await?;
459 10272 : let nblocks = buf.get_u32_le();
460 10272 :
461 10272 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
462 10272 :
463 10272 : Ok(nblocks)
464 48868 : }
465 :
466 : /// Does the relation exist?
467 : ///
468 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
469 : /// the shard stores pages for.
470 12100 : pub(crate) async fn get_rel_exists(
471 12100 : &self,
472 12100 : tag: RelTag,
473 12100 : version: Version<'_>,
474 12100 : ctx: &RequestContext,
475 12100 : ) -> Result<bool, PageReconstructError> {
476 12100 : if tag.relnode == 0 {
477 0 : return Err(PageReconstructError::Other(
478 0 : RelationError::InvalidRelnode.into(),
479 0 : ));
480 12100 : }
481 :
482 : // first try to lookup relation in cache
483 12100 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
484 12064 : return Ok(true);
485 36 : }
486 : // then check if the database was already initialized.
487 : // get_rel_exists can be called before dbdir is created.
488 36 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
489 36 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
490 36 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
491 0 : return Ok(false);
492 36 : }
493 36 : // fetch directory listing
494 36 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
495 36 : let buf = version.get(self, key, ctx).await?;
496 :
497 36 : let dir = RelDirectory::des(&buf)?;
498 36 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
499 12100 : }
500 :
501 : /// Get a list of all existing relations in given tablespace and database.
502 : ///
503 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
504 : /// the shard stores pages for.
505 : ///
506 : /// # Cancel-Safety
507 : ///
508 : /// This method is cancellation-safe.
509 0 : pub(crate) async fn list_rels(
510 0 : &self,
511 0 : spcnode: Oid,
512 0 : dbnode: Oid,
513 0 : version: Version<'_>,
514 0 : ctx: &RequestContext,
515 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
516 0 : // fetch directory listing
517 0 : let key = rel_dir_to_key(spcnode, dbnode);
518 0 : let buf = version.get(self, key, ctx).await?;
519 :
520 0 : let dir = RelDirectory::des(&buf)?;
521 0 : let rels: HashSet<RelTag> =
522 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
523 0 : spcnode,
524 0 : dbnode,
525 0 : relnode: *relnode,
526 0 : forknum: *forknum,
527 0 : }));
528 0 :
529 0 : Ok(rels)
530 0 : }
531 :
532 : /// Get the whole SLRU segment
533 0 : pub(crate) async fn get_slru_segment(
534 0 : &self,
535 0 : kind: SlruKind,
536 0 : segno: u32,
537 0 : lsn: Lsn,
538 0 : ctx: &RequestContext,
539 0 : ) -> Result<Bytes, PageReconstructError> {
540 0 : assert!(self.tenant_shard_id.is_shard_zero());
541 0 : let n_blocks = self
542 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
543 0 : .await?;
544 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
545 0 : for blkno in 0..n_blocks {
546 0 : let block = self
547 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
548 0 : .await?;
549 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
550 : }
551 0 : Ok(segment.freeze())
552 0 : }
553 :
554 : /// Look up given SLRU page version.
555 0 : pub(crate) async fn get_slru_page_at_lsn(
556 0 : &self,
557 0 : kind: SlruKind,
558 0 : segno: u32,
559 0 : blknum: BlockNumber,
560 0 : lsn: Lsn,
561 0 : ctx: &RequestContext,
562 0 : ) -> Result<Bytes, PageReconstructError> {
563 0 : assert!(self.tenant_shard_id.is_shard_zero());
564 0 : let key = slru_block_to_key(kind, segno, blknum);
565 0 : self.get(key, lsn, ctx).await
566 0 : }
567 :
568 : /// Get size of an SLRU segment
569 0 : pub(crate) async fn get_slru_segment_size(
570 0 : &self,
571 0 : kind: SlruKind,
572 0 : segno: u32,
573 0 : version: Version<'_>,
574 0 : ctx: &RequestContext,
575 0 : ) -> Result<BlockNumber, PageReconstructError> {
576 0 : assert!(self.tenant_shard_id.is_shard_zero());
577 0 : let key = slru_segment_size_to_key(kind, segno);
578 0 : let mut buf = version.get(self, key, ctx).await?;
579 0 : Ok(buf.get_u32_le())
580 0 : }
581 :
582 : /// Get size of an SLRU segment
583 0 : pub(crate) async fn get_slru_segment_exists(
584 0 : &self,
585 0 : kind: SlruKind,
586 0 : segno: u32,
587 0 : version: Version<'_>,
588 0 : ctx: &RequestContext,
589 0 : ) -> Result<bool, PageReconstructError> {
590 0 : assert!(self.tenant_shard_id.is_shard_zero());
591 : // fetch directory listing
592 0 : let key = slru_dir_to_key(kind);
593 0 : let buf = version.get(self, key, ctx).await?;
594 :
595 0 : let dir = SlruSegmentDirectory::des(&buf)?;
596 0 : Ok(dir.segments.contains(&segno))
597 0 : }
598 :
599 : /// Locate LSN, such that all transactions that committed before
600 : /// 'search_timestamp' are visible, but nothing newer is.
601 : ///
602 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
603 : /// so it's not well defined which LSN you get if there were multiple commits
604 : /// "in flight" at that point in time.
605 : ///
606 0 : pub(crate) async fn find_lsn_for_timestamp(
607 0 : &self,
608 0 : search_timestamp: TimestampTz,
609 0 : cancel: &CancellationToken,
610 0 : ctx: &RequestContext,
611 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
612 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
613 :
614 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
615 0 : let gc_cutoff_planned = {
616 0 : let gc_info = self.gc_info.read().unwrap();
617 0 : gc_info.min_cutoff()
618 0 : };
619 0 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
620 0 : // but let's be defensive.
621 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
622 0 : // We use this method to figure out the branching LSN for the new branch, but the
623 0 : // GC cutoff could be before the branching point and we cannot create a new branch
624 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
625 0 : // on the safe side.
626 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
627 0 : let max_lsn = self.get_last_record_lsn();
628 0 :
629 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
630 0 : // LSN divided by 8.
631 0 : let mut low = min_lsn.0 / 8;
632 0 : let mut high = max_lsn.0 / 8 + 1;
633 0 :
634 0 : let mut found_smaller = false;
635 0 : let mut found_larger = false;
636 :
637 0 : while low < high {
638 0 : if cancel.is_cancelled() {
639 0 : return Err(PageReconstructError::Cancelled);
640 0 : }
641 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
642 0 : let mid = (high + low) / 2;
643 :
644 0 : let cmp = match self
645 0 : .is_latest_commit_timestamp_ge_than(
646 0 : search_timestamp,
647 0 : Lsn(mid * 8),
648 0 : &mut found_smaller,
649 0 : &mut found_larger,
650 0 : ctx,
651 0 : )
652 0 : .await
653 : {
654 0 : Ok(res) => res,
655 0 : Err(PageReconstructError::MissingKey(e)) => {
656 0 : warn!("Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}", e);
657 : // Return that we didn't find any requests smaller than the LSN, and logging the error.
658 0 : return Ok(LsnForTimestamp::Past(min_lsn));
659 : }
660 0 : Err(e) => return Err(e),
661 : };
662 :
663 0 : if cmp {
664 0 : high = mid;
665 0 : } else {
666 0 : low = mid + 1;
667 0 : }
668 : }
669 :
670 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
671 : // so the LSN of the last commit record before or at `search_timestamp`.
672 : // Remove one from `low` to get `t`.
673 : //
674 : // FIXME: it would be better to get the LSN of the previous commit.
675 : // Otherwise, if you restore to the returned LSN, the database will
676 : // include physical changes from later commits that will be marked
677 : // as aborted, and will need to be vacuumed away.
678 0 : let commit_lsn = Lsn((low - 1) * 8);
679 0 : match (found_smaller, found_larger) {
680 : (false, false) => {
681 : // This can happen if no commit records have been processed yet, e.g.
682 : // just after importing a cluster.
683 0 : Ok(LsnForTimestamp::NoData(min_lsn))
684 : }
685 : (false, true) => {
686 : // Didn't find any commit timestamps smaller than the request
687 0 : Ok(LsnForTimestamp::Past(min_lsn))
688 : }
689 0 : (true, _) if commit_lsn < min_lsn => {
690 0 : // the search above did set found_smaller to true but it never increased the lsn.
691 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
692 0 : // below the min_lsn. We should never do that.
693 0 : Ok(LsnForTimestamp::Past(min_lsn))
694 : }
695 : (true, false) => {
696 : // Only found commits with timestamps smaller than the request.
697 : // It's still a valid case for branch creation, return it.
698 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
699 : // case, anyway.
700 0 : Ok(LsnForTimestamp::Future(commit_lsn))
701 : }
702 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
703 : }
704 0 : }
705 :
706 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
707 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
708 : ///
709 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
710 : /// with a smaller/larger timestamp.
711 : ///
712 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
713 0 : &self,
714 0 : search_timestamp: TimestampTz,
715 0 : probe_lsn: Lsn,
716 0 : found_smaller: &mut bool,
717 0 : found_larger: &mut bool,
718 0 : ctx: &RequestContext,
719 0 : ) -> Result<bool, PageReconstructError> {
720 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
721 0 : if timestamp >= search_timestamp {
722 0 : *found_larger = true;
723 0 : return ControlFlow::Break(true);
724 0 : } else {
725 0 : *found_smaller = true;
726 0 : }
727 0 : ControlFlow::Continue(())
728 0 : })
729 0 : .await
730 0 : }
731 :
732 : /// Obtain the possible timestamp range for the given lsn.
733 : ///
734 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
735 0 : pub(crate) async fn get_timestamp_for_lsn(
736 0 : &self,
737 0 : probe_lsn: Lsn,
738 0 : ctx: &RequestContext,
739 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
740 0 : let mut max: Option<TimestampTz> = None;
741 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
742 0 : if let Some(max_prev) = max {
743 0 : max = Some(max_prev.max(timestamp));
744 0 : } else {
745 0 : max = Some(timestamp);
746 0 : }
747 0 : ControlFlow::Continue(())
748 0 : })
749 0 : .await?;
750 :
751 0 : Ok(max)
752 0 : }
753 :
754 : /// Runs the given function on all the timestamps for a given lsn
755 : ///
756 : /// The return value is either given by the closure, or set to the `Default`
757 : /// impl's output.
758 0 : async fn map_all_timestamps<T: Default>(
759 0 : &self,
760 0 : probe_lsn: Lsn,
761 0 : ctx: &RequestContext,
762 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
763 0 : ) -> Result<T, PageReconstructError> {
764 0 : for segno in self
765 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
766 0 : .await?
767 : {
768 0 : let nblocks = self
769 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
770 0 : .await?;
771 0 : for blknum in (0..nblocks).rev() {
772 0 : let clog_page = self
773 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
774 0 : .await?;
775 :
776 0 : if clog_page.len() == BLCKSZ as usize + 8 {
777 0 : let mut timestamp_bytes = [0u8; 8];
778 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
779 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
780 0 :
781 0 : match f(timestamp) {
782 0 : ControlFlow::Break(b) => return Ok(b),
783 0 : ControlFlow::Continue(()) => (),
784 : }
785 0 : }
786 : }
787 : }
788 0 : Ok(Default::default())
789 0 : }
790 :
791 0 : pub(crate) async fn get_slru_keyspace(
792 0 : &self,
793 0 : version: Version<'_>,
794 0 : ctx: &RequestContext,
795 0 : ) -> Result<KeySpace, PageReconstructError> {
796 0 : let mut accum = KeySpaceAccum::new();
797 :
798 0 : for kind in SlruKind::iter() {
799 0 : let mut segments: Vec<u32> = self
800 0 : .list_slru_segments(kind, version, ctx)
801 0 : .await?
802 0 : .into_iter()
803 0 : .collect();
804 0 : segments.sort_unstable();
805 :
806 0 : for seg in segments {
807 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
808 :
809 0 : accum.add_range(
810 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
811 0 : );
812 : }
813 : }
814 :
815 0 : Ok(accum.to_keyspace())
816 0 : }
817 :
818 : /// Get a list of SLRU segments
819 0 : pub(crate) async fn list_slru_segments(
820 0 : &self,
821 0 : kind: SlruKind,
822 0 : version: Version<'_>,
823 0 : ctx: &RequestContext,
824 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
825 0 : // fetch directory entry
826 0 : let key = slru_dir_to_key(kind);
827 :
828 0 : let buf = version.get(self, key, ctx).await?;
829 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
830 0 : }
831 :
832 0 : pub(crate) async fn get_relmap_file(
833 0 : &self,
834 0 : spcnode: Oid,
835 0 : dbnode: Oid,
836 0 : version: Version<'_>,
837 0 : ctx: &RequestContext,
838 0 : ) -> Result<Bytes, PageReconstructError> {
839 0 : let key = relmap_file_key(spcnode, dbnode);
840 :
841 0 : let buf = version.get(self, key, ctx).await?;
842 0 : Ok(buf)
843 0 : }
844 :
845 644 : pub(crate) async fn list_dbdirs(
846 644 : &self,
847 644 : lsn: Lsn,
848 644 : ctx: &RequestContext,
849 644 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
850 : // fetch directory entry
851 644 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
852 :
853 644 : Ok(DbDirectory::des(&buf)?.dbdirs)
854 644 : }
855 :
856 0 : pub(crate) async fn get_twophase_file(
857 0 : &self,
858 0 : xid: u64,
859 0 : lsn: Lsn,
860 0 : ctx: &RequestContext,
861 0 : ) -> Result<Bytes, PageReconstructError> {
862 0 : let key = twophase_file_key(xid);
863 0 : let buf = self.get(key, lsn, ctx).await?;
864 0 : Ok(buf)
865 0 : }
866 :
867 648 : pub(crate) async fn list_twophase_files(
868 648 : &self,
869 648 : lsn: Lsn,
870 648 : ctx: &RequestContext,
871 648 : ) -> Result<HashSet<u64>, PageReconstructError> {
872 : // fetch directory entry
873 648 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
874 :
875 648 : if self.pg_version >= 17 {
876 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
877 : } else {
878 648 : Ok(TwoPhaseDirectory::des(&buf)?
879 : .xids
880 648 : .iter()
881 648 : .map(|x| u64::from(*x))
882 648 : .collect())
883 : }
884 648 : }
885 :
886 0 : pub(crate) async fn get_control_file(
887 0 : &self,
888 0 : lsn: Lsn,
889 0 : ctx: &RequestContext,
890 0 : ) -> Result<Bytes, PageReconstructError> {
891 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
892 0 : }
893 :
894 24 : pub(crate) async fn get_checkpoint(
895 24 : &self,
896 24 : lsn: Lsn,
897 24 : ctx: &RequestContext,
898 24 : ) -> Result<Bytes, PageReconstructError> {
899 24 : self.get(CHECKPOINT_KEY, lsn, ctx).await
900 24 : }
901 :
902 24 : async fn list_aux_files_v2(
903 24 : &self,
904 24 : lsn: Lsn,
905 24 : ctx: &RequestContext,
906 24 : io_concurrency: IoConcurrency,
907 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
908 24 : let kv = self
909 24 : .scan(
910 24 : KeySpace::single(Key::metadata_aux_key_range()),
911 24 : lsn,
912 24 : ctx,
913 24 : io_concurrency,
914 24 : )
915 24 : .await?;
916 24 : let mut result = HashMap::new();
917 24 : let mut sz = 0;
918 60 : for (_, v) in kv {
919 36 : let v = v?;
920 36 : let v = aux_file::decode_file_value_bytes(&v)
921 36 : .context("value decode")
922 36 : .map_err(PageReconstructError::Other)?;
923 68 : for (fname, content) in v {
924 32 : sz += fname.len();
925 32 : sz += content.len();
926 32 : result.insert(fname, content);
927 32 : }
928 : }
929 24 : self.aux_file_size_estimator.on_initial(sz);
930 24 : Ok(result)
931 24 : }
932 :
933 0 : pub(crate) async fn trigger_aux_file_size_computation(
934 0 : &self,
935 0 : lsn: Lsn,
936 0 : ctx: &RequestContext,
937 0 : io_concurrency: IoConcurrency,
938 0 : ) -> Result<(), PageReconstructError> {
939 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
940 0 : Ok(())
941 0 : }
942 :
943 24 : pub(crate) async fn list_aux_files(
944 24 : &self,
945 24 : lsn: Lsn,
946 24 : ctx: &RequestContext,
947 24 : io_concurrency: IoConcurrency,
948 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
949 24 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
950 24 : }
951 :
952 0 : pub(crate) async fn get_replorigins(
953 0 : &self,
954 0 : lsn: Lsn,
955 0 : ctx: &RequestContext,
956 0 : io_concurrency: IoConcurrency,
957 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
958 0 : let kv = self
959 0 : .scan(
960 0 : KeySpace::single(repl_origin_key_range()),
961 0 : lsn,
962 0 : ctx,
963 0 : io_concurrency,
964 0 : )
965 0 : .await?;
966 0 : let mut result = HashMap::new();
967 0 : for (k, v) in kv {
968 0 : let v = v?;
969 0 : let origin_id = k.field6 as RepOriginId;
970 0 : let origin_lsn = Lsn::des(&v).unwrap();
971 0 : if origin_lsn != Lsn::INVALID {
972 0 : result.insert(origin_id, origin_lsn);
973 0 : }
974 : }
975 0 : Ok(result)
976 0 : }
977 :
978 : /// Does the same as get_current_logical_size but counted on demand.
979 : /// Used to initialize the logical size tracking on startup.
980 : ///
981 : /// Only relation blocks are counted currently. That excludes metadata,
982 : /// SLRUs, twophase files etc.
983 : ///
984 : /// # Cancel-Safety
985 : ///
986 : /// This method is cancellation-safe.
987 0 : pub(crate) async fn get_current_logical_size_non_incremental(
988 0 : &self,
989 0 : lsn: Lsn,
990 0 : ctx: &RequestContext,
991 0 : ) -> Result<u64, CalculateLogicalSizeError> {
992 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
993 :
994 : // Fetch list of database dirs and iterate them
995 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
996 0 : let dbdir = DbDirectory::des(&buf)?;
997 :
998 0 : let mut total_size: u64 = 0;
999 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1000 0 : for rel in self
1001 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
1002 0 : .await?
1003 : {
1004 0 : if self.cancel.is_cancelled() {
1005 0 : return Err(CalculateLogicalSizeError::Cancelled);
1006 0 : }
1007 0 : let relsize_key = rel_size_to_key(rel);
1008 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1009 0 : let relsize = buf.get_u32_le();
1010 0 :
1011 0 : total_size += relsize as u64;
1012 : }
1013 : }
1014 0 : Ok(total_size * BLCKSZ as u64)
1015 0 : }
1016 :
1017 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1018 : /// for gc-compaction.
1019 : ///
1020 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1021 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1022 : /// be kept only for a specific range of LSN.
1023 : ///
1024 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1025 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1026 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1027 : /// determine which keys to retain/drop for gc-compaction.
1028 : ///
1029 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1030 : /// to be retained for each of the branch LSN.
1031 : ///
1032 : /// The return value is (dense keyspace, sparse keyspace).
1033 104 : pub(crate) async fn collect_gc_compaction_keyspace(
1034 104 : &self,
1035 104 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1036 104 : let metadata_key_begin = Key::metadata_key_range().start;
1037 104 : let aux_v1_key = AUX_FILES_KEY;
1038 104 : let dense_keyspace = KeySpace {
1039 104 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1040 104 : };
1041 104 : Ok((
1042 104 : dense_keyspace,
1043 104 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1044 104 : ))
1045 104 : }
1046 :
1047 : ///
1048 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1049 : /// Anything that's not listed maybe removed from the underlying storage (from
1050 : /// that LSN forwards).
1051 : ///
1052 : /// The return value is (dense keyspace, sparse keyspace).
1053 644 : pub(crate) async fn collect_keyspace(
1054 644 : &self,
1055 644 : lsn: Lsn,
1056 644 : ctx: &RequestContext,
1057 644 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1058 644 : // Iterate through key ranges, greedily packing them into partitions
1059 644 : let mut result = KeySpaceAccum::new();
1060 644 :
1061 644 : // The dbdir metadata always exists
1062 644 : result.add_key(DBDIR_KEY);
1063 :
1064 : // Fetch list of database dirs and iterate them
1065 644 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1066 644 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1067 644 :
1068 644 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1069 644 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1070 0 : if has_relmap_file {
1071 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1072 0 : }
1073 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1074 :
1075 0 : let mut rels: Vec<RelTag> = self
1076 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
1077 0 : .await?
1078 0 : .into_iter()
1079 0 : .collect();
1080 0 : rels.sort_unstable();
1081 0 : for rel in rels {
1082 0 : let relsize_key = rel_size_to_key(rel);
1083 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1084 0 : let relsize = buf.get_u32_le();
1085 0 :
1086 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1087 0 : result.add_key(relsize_key);
1088 : }
1089 : }
1090 :
1091 : // Iterate SLRUs next
1092 644 : if self.tenant_shard_id.is_shard_zero() {
1093 1896 : for kind in [
1094 632 : SlruKind::Clog,
1095 632 : SlruKind::MultiXactMembers,
1096 632 : SlruKind::MultiXactOffsets,
1097 : ] {
1098 1896 : let slrudir_key = slru_dir_to_key(kind);
1099 1896 : result.add_key(slrudir_key);
1100 1896 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1101 1896 : let dir = SlruSegmentDirectory::des(&buf)?;
1102 1896 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1103 1896 : segments.sort_unstable();
1104 1896 : for segno in segments {
1105 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1106 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1107 0 : let segsize = buf.get_u32_le();
1108 0 :
1109 0 : result.add_range(
1110 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1111 0 : );
1112 0 : result.add_key(segsize_key);
1113 : }
1114 : }
1115 12 : }
1116 :
1117 : // Then pg_twophase
1118 644 : result.add_key(TWOPHASEDIR_KEY);
1119 :
1120 644 : let mut xids: Vec<u64> = self
1121 644 : .list_twophase_files(lsn, ctx)
1122 644 : .await?
1123 644 : .iter()
1124 644 : .cloned()
1125 644 : .collect();
1126 644 : xids.sort_unstable();
1127 644 : for xid in xids {
1128 0 : result.add_key(twophase_file_key(xid));
1129 0 : }
1130 :
1131 644 : result.add_key(CONTROLFILE_KEY);
1132 644 : result.add_key(CHECKPOINT_KEY);
1133 644 :
1134 644 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1135 644 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1136 644 : // and the keys will not be garbage-colllected.
1137 644 : #[cfg(test)]
1138 644 : {
1139 644 : let guard = self.extra_test_dense_keyspace.load();
1140 644 : for kr in &guard.ranges {
1141 0 : result.add_range(kr.clone());
1142 0 : }
1143 0 : }
1144 0 :
1145 644 : let dense_keyspace = result.to_keyspace();
1146 644 : let sparse_keyspace = SparseKeySpace(KeySpace {
1147 644 : ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
1148 644 : });
1149 644 :
1150 644 : if cfg!(debug_assertions) {
1151 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1152 :
1153 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1154 : // category of sparse keys are split into their own image/delta files. If there
1155 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1156 : // and we want the developer to keep the keyspaces separated.
1157 :
1158 644 : let ranges = &sparse_keyspace.0.ranges;
1159 :
1160 : // TODO: use a single overlaps_with across the codebase
1161 644 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1162 644 : !(a.end <= b.start || b.end <= a.start)
1163 644 : }
1164 1288 : for i in 0..ranges.len() {
1165 1288 : for j in 0..i {
1166 644 : if overlaps_with(&ranges[i], &ranges[j]) {
1167 0 : panic!(
1168 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1169 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1170 0 : );
1171 644 : }
1172 : }
1173 : }
1174 644 : for i in 1..ranges.len() {
1175 644 : assert!(
1176 644 : ranges[i - 1].end <= ranges[i].start,
1177 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1178 0 : ranges[i - 1].start,
1179 0 : ranges[i - 1].end,
1180 0 : ranges[i].start,
1181 0 : ranges[i].end
1182 : );
1183 : }
1184 0 : }
1185 :
1186 644 : Ok((dense_keyspace, sparse_keyspace))
1187 644 : }
1188 :
1189 : /// Get cached size of relation if it not updated after specified LSN
1190 897080 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1191 897080 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1192 897080 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1193 897036 : if lsn >= *cached_lsn {
1194 886744 : RELSIZE_CACHE_HITS.inc();
1195 886744 : return Some(*nblocks);
1196 10292 : }
1197 10292 : RELSIZE_CACHE_MISSES_OLD.inc();
1198 44 : }
1199 10336 : RELSIZE_CACHE_MISSES.inc();
1200 10336 : None
1201 897080 : }
1202 :
1203 : /// Update cached relation size if there is no more recent update
1204 10272 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1205 10272 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1206 10272 :
1207 10272 : if lsn < rel_size_cache.complete_as_of {
1208 : // Do not cache old values. It's safe to cache the size on read, as long as
1209 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1210 : // never evict values from the cache, so if the relation size changed after
1211 : // 'lsn', the new value is already in the cache.
1212 0 : return;
1213 10272 : }
1214 10272 :
1215 10272 : match rel_size_cache.map.entry(tag) {
1216 10272 : hash_map::Entry::Occupied(mut entry) => {
1217 10272 : let cached_lsn = entry.get_mut();
1218 10272 : if lsn >= cached_lsn.0 {
1219 0 : *cached_lsn = (lsn, nblocks);
1220 10272 : }
1221 : }
1222 0 : hash_map::Entry::Vacant(entry) => {
1223 0 : entry.insert((lsn, nblocks));
1224 0 : RELSIZE_CACHE_ENTRIES.inc();
1225 0 : }
1226 : }
1227 10272 : }
1228 :
1229 : /// Store cached relation size
1230 565440 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1231 565440 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1232 565440 : if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
1233 3840 : RELSIZE_CACHE_ENTRIES.inc();
1234 561600 : }
1235 565440 : }
1236 :
1237 : /// Remove cached relation size
1238 4 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1239 4 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1240 4 : if rel_size_cache.map.remove(tag).is_some() {
1241 4 : RELSIZE_CACHE_ENTRIES.dec();
1242 4 : }
1243 4 : }
1244 : }
1245 :
1246 : /// DatadirModification represents an operation to ingest an atomic set of
1247 : /// updates to the repository.
1248 : ///
1249 : /// It is created by the 'begin_record' function. It is called for each WAL
1250 : /// record, so that all the modifications by a one WAL record appear atomic.
1251 : pub struct DatadirModification<'a> {
1252 : /// The timeline this modification applies to. You can access this to
1253 : /// read the state, but note that any pending updates are *not* reflected
1254 : /// in the state in 'tline' yet.
1255 : pub tline: &'a Timeline,
1256 :
1257 : /// Current LSN of the modification
1258 : lsn: Lsn,
1259 :
1260 : // The modifications are not applied directly to the underlying key-value store.
1261 : // The put-functions add the modifications here, and they are flushed to the
1262 : // underlying key-value store by the 'finish' function.
1263 : pending_lsns: Vec<Lsn>,
1264 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1265 : pending_nblocks: i64,
1266 :
1267 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1268 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1269 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1270 :
1271 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1272 : /// which keys are stored here.
1273 : pending_data_batch: Option<SerializedValueBatch>,
1274 :
1275 : /// For special "directory" keys that store key-value maps, track the size of the map
1276 : /// if it was updated in this modification.
1277 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
1278 :
1279 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1280 : pending_metadata_bytes: usize,
1281 : }
1282 :
1283 : impl DatadirModification<'_> {
/// Upper bound (8 MiB) on the payload a modification should accumulate before
/// it is committed.
///
/// Committing serializes the whole modification in one go. A single WAL record
/// may carry several pages, so a per-record batch size alone cannot bound that
/// allocation.
pub(crate) const MAX_PENDING_BYTES: usize = 8 << 20;
1288 :
1289 : /// Get the current lsn
1290 836116 : pub(crate) fn get_lsn(&self) -> Lsn {
1291 836116 : self.lsn
1292 836116 : }
1293 :
1294 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1295 0 : self.pending_data_batch
1296 0 : .as_ref()
1297 0 : .map_or(0, |b| b.buffer_size())
1298 0 : + self.pending_metadata_bytes
1299 0 : }
1300 :
1301 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1302 0 : self.pending_data_batch
1303 0 : .as_ref()
1304 0 : .is_some_and(|b| b.has_data())
1305 0 : }
1306 :
1307 : /// Returns statistics about the currently pending modifications.
1308 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1309 0 : let mut stats = DatadirModificationStats::default();
1310 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1311 0 : match value {
1312 0 : Value::Image(_) => stats.metadata_images += 1,
1313 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1314 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1315 : }
1316 : }
1317 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1318 0 : match valuemeta {
1319 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1320 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1321 0 : ValueMeta::Observed(_) => {}
1322 : }
1323 : }
1324 0 : stats
1325 0 : }
1326 :
1327 : /// Set the current lsn
1328 291716 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1329 291716 : ensure!(
1330 291716 : lsn >= self.lsn,
1331 0 : "setting an older lsn {} than {} is not allowed",
1332 : lsn,
1333 : self.lsn
1334 : );
1335 :
1336 291716 : if lsn > self.lsn {
1337 291716 : self.pending_lsns.push(self.lsn);
1338 291716 : self.lsn = lsn;
1339 291716 : }
1340 291716 : Ok(())
1341 291716 : }
1342 :
1343 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1344 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1345 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1346 : ///
1347 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1348 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1349 : /// via [`Self::get`] as soon as their record has been ingested.
1350 1701248 : fn is_data_key(key: &Key) -> bool {
1351 1701248 : key.is_rel_block_key() || key.is_slru_block_key()
1352 1701248 : }
1353 :
1354 : /// Initialize a completely new repository.
1355 : ///
1356 : /// This inserts the directory metadata entries that are assumed to
1357 : /// always exist.
1358 416 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1359 416 : let buf = DbDirectory::ser(&DbDirectory {
1360 416 : dbdirs: HashMap::new(),
1361 416 : })?;
1362 416 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1363 416 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1364 :
1365 416 : let buf = if self.tline.pg_version >= 17 {
1366 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1367 0 : xids: HashSet::new(),
1368 0 : })
1369 : } else {
1370 416 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1371 416 : xids: HashSet::new(),
1372 416 : })
1373 0 : }?;
1374 416 : self.pending_directory_entries
1375 416 : .push((DirectoryKind::TwoPhase, 0));
1376 416 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1377 :
1378 416 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1379 416 : let empty_dir = Value::Image(buf);
1380 416 :
1381 416 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1382 416 : // harmless but they'd just be dropped on later compaction.
1383 416 : if self.tline.tenant_shard_id.is_shard_zero() {
1384 404 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1385 404 : self.pending_directory_entries
1386 404 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1387 404 : self.put(
1388 404 : slru_dir_to_key(SlruKind::MultiXactMembers),
1389 404 : empty_dir.clone(),
1390 404 : );
1391 404 : self.pending_directory_entries
1392 404 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1393 404 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1394 404 : self.pending_directory_entries
1395 404 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1396 404 : }
1397 :
1398 416 : Ok(())
1399 416 : }
1400 :
/// Test helper: runs [`Self::init_empty`], then stores placeholder control and
/// checkpoint files whose contents are irrelevant to tests.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    self.init_empty()?;

    let control = bytes::Bytes::from_static(b"control_file contents do not matter");
    self.put_control_file(control)
        .context("put_control_file")?;

    let checkpoint = bytes::Bytes::from_static(b"checkpoint_file contents do not matter");
    self.put_checkpoint(checkpoint)
        .context("put_checkpoint_file")?;

    Ok(())
}
1414 :
1415 : /// Creates a relation if it is not already present.
1416 : /// Returns the current size of the relation
1417 836112 : pub(crate) async fn create_relation_if_required(
1418 836112 : &mut self,
1419 836112 : rel: RelTag,
1420 836112 : ctx: &RequestContext,
1421 836112 : ) -> Result<u32, PageReconstructError> {
1422 : // Get current size and put rel creation if rel doesn't exist
1423 : //
1424 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1425 : // check the cache too. This is because eagerly checking the cache results in
1426 : // less work overall and 10% better performance. It's more work on cache miss
1427 : // but cache miss is rare.
1428 836112 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1429 836092 : Ok(nblocks)
1430 20 : } else if !self
1431 20 : .tline
1432 20 : .get_rel_exists(rel, Version::Modified(self), ctx)
1433 20 : .await?
1434 : {
1435 : // create it with 0 size initially, the logic below will extend it
1436 20 : self.put_rel_creation(rel, 0, ctx)
1437 20 : .await
1438 20 : .context("Relation Error")?;
1439 20 : Ok(0)
1440 : } else {
1441 0 : self.tline
1442 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1443 0 : .await
1444 : }
1445 836112 : }
1446 :
1447 : /// Given a block number for a relation (which represents a newly written block),
1448 : /// the previous block count of the relation, and the shard info, find the gaps
1449 : /// that were created by the newly written block if any.
1450 291340 : fn find_gaps(
1451 291340 : rel: RelTag,
1452 291340 : blkno: u32,
1453 291340 : previous_nblocks: u32,
1454 291340 : shard: &ShardIdentity,
1455 291340 : ) -> Option<KeySpace> {
1456 291340 : let mut key = rel_block_to_key(rel, blkno);
1457 291340 : let mut gap_accum = None;
1458 :
1459 291340 : for gap_blkno in previous_nblocks..blkno {
1460 64 : key.field6 = gap_blkno;
1461 64 :
1462 64 : if shard.get_shard_number(&key) != shard.number {
1463 16 : continue;
1464 48 : }
1465 48 :
1466 48 : gap_accum
1467 48 : .get_or_insert_with(KeySpaceAccum::new)
1468 48 : .add_key(key);
1469 : }
1470 :
1471 291340 : gap_accum.map(|accum| accum.to_keyspace())
1472 291340 : }
1473 :
1474 291704 : pub async fn ingest_batch(
1475 291704 : &mut self,
1476 291704 : mut batch: SerializedValueBatch,
1477 291704 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1478 291704 : shard: &ShardIdentity,
1479 291704 : ctx: &RequestContext,
1480 291704 : ) -> anyhow::Result<()> {
1481 291704 : let mut gaps_at_lsns = Vec::default();
1482 :
1483 291704 : for meta in batch.metadata.iter() {
1484 291284 : let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
1485 291284 : let new_nblocks = blkno + 1;
1486 :
1487 291284 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1488 291284 : if new_nblocks > old_nblocks {
1489 4780 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1490 286504 : }
1491 :
1492 291284 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1493 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1494 291284 : }
1495 : }
1496 :
1497 291704 : if !gaps_at_lsns.is_empty() {
1498 0 : batch.zero_gaps(gaps_at_lsns);
1499 291704 : }
1500 :
1501 291704 : match self.pending_data_batch.as_mut() {
1502 40 : Some(pending_batch) => {
1503 40 : pending_batch.extend(batch);
1504 40 : }
1505 291664 : None if batch.has_data() => {
1506 291260 : self.pending_data_batch = Some(batch);
1507 291260 : }
1508 404 : None => {
1509 404 : // Nothing to initialize the batch with
1510 404 : }
1511 : }
1512 :
1513 291704 : Ok(())
1514 291704 : }
1515 :
1516 : /// Put a new page version that can be constructed from a WAL record
1517 : ///
1518 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1519 : /// current end-of-file. It's up to the caller to check that the relation size
1520 : /// matches the blocks inserted!
1521 24 : pub fn put_rel_wal_record(
1522 24 : &mut self,
1523 24 : rel: RelTag,
1524 24 : blknum: BlockNumber,
1525 24 : rec: NeonWalRecord,
1526 24 : ) -> anyhow::Result<()> {
1527 24 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1528 24 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1529 24 : Ok(())
1530 24 : }
1531 :
1532 : // Same, but for an SLRU.
1533 16 : pub fn put_slru_wal_record(
1534 16 : &mut self,
1535 16 : kind: SlruKind,
1536 16 : segno: u32,
1537 16 : blknum: BlockNumber,
1538 16 : rec: NeonWalRecord,
1539 16 : ) -> anyhow::Result<()> {
1540 16 : if !self.tline.tenant_shard_id.is_shard_zero() {
1541 0 : return Ok(());
1542 16 : }
1543 16 :
1544 16 : self.put(
1545 16 : slru_block_to_key(kind, segno, blknum),
1546 16 : Value::WalRecord(rec),
1547 16 : );
1548 16 : Ok(())
1549 16 : }
1550 :
1551 : /// Like put_wal_record, but with ready-made image of the page.
1552 555684 : pub fn put_rel_page_image(
1553 555684 : &mut self,
1554 555684 : rel: RelTag,
1555 555684 : blknum: BlockNumber,
1556 555684 : img: Bytes,
1557 555684 : ) -> anyhow::Result<()> {
1558 555684 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1559 555684 : let key = rel_block_to_key(rel, blknum);
1560 555684 : if !key.is_valid_key_on_write_path() {
1561 0 : anyhow::bail!(
1562 0 : "the request contains data not supported by pageserver at {}",
1563 0 : key
1564 0 : );
1565 555684 : }
1566 555684 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1567 555684 : Ok(())
1568 555684 : }
1569 :
1570 12 : pub fn put_slru_page_image(
1571 12 : &mut self,
1572 12 : kind: SlruKind,
1573 12 : segno: u32,
1574 12 : blknum: BlockNumber,
1575 12 : img: Bytes,
1576 12 : ) -> anyhow::Result<()> {
1577 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1578 :
1579 12 : let key = slru_block_to_key(kind, segno, blknum);
1580 12 : if !key.is_valid_key_on_write_path() {
1581 0 : anyhow::bail!(
1582 0 : "the request contains data not supported by pageserver at {}",
1583 0 : key
1584 0 : );
1585 12 : }
1586 12 : self.put(key, Value::Image(img));
1587 12 : Ok(())
1588 12 : }
1589 :
1590 5996 : pub(crate) fn put_rel_page_image_zero(
1591 5996 : &mut self,
1592 5996 : rel: RelTag,
1593 5996 : blknum: BlockNumber,
1594 5996 : ) -> anyhow::Result<()> {
1595 5996 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1596 5996 : let key = rel_block_to_key(rel, blknum);
1597 5996 : if !key.is_valid_key_on_write_path() {
1598 0 : anyhow::bail!(
1599 0 : "the request contains data not supported by pageserver: {} @ {}",
1600 0 : key,
1601 0 : self.lsn
1602 0 : );
1603 5996 : }
1604 5996 :
1605 5996 : let batch = self
1606 5996 : .pending_data_batch
1607 5996 : .get_or_insert_with(SerializedValueBatch::default);
1608 5996 :
1609 5996 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1610 5996 :
1611 5996 : Ok(())
1612 5996 : }
1613 :
1614 0 : pub(crate) fn put_slru_page_image_zero(
1615 0 : &mut self,
1616 0 : kind: SlruKind,
1617 0 : segno: u32,
1618 0 : blknum: BlockNumber,
1619 0 : ) -> anyhow::Result<()> {
1620 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1621 0 : let key = slru_block_to_key(kind, segno, blknum);
1622 0 : if !key.is_valid_key_on_write_path() {
1623 0 : anyhow::bail!(
1624 0 : "the request contains data not supported by pageserver: {} @ {}",
1625 0 : key,
1626 0 : self.lsn
1627 0 : );
1628 0 : }
1629 0 :
1630 0 : let batch = self
1631 0 : .pending_data_batch
1632 0 : .get_or_insert_with(SerializedValueBatch::default);
1633 0 :
1634 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1635 0 :
1636 0 : Ok(())
1637 0 : }
1638 :
1639 : /// Store a relmapper file (pg_filenode.map) in the repository
1640 32 : pub async fn put_relmap_file(
1641 32 : &mut self,
1642 32 : spcnode: Oid,
1643 32 : dbnode: Oid,
1644 32 : img: Bytes,
1645 32 : ctx: &RequestContext,
1646 32 : ) -> anyhow::Result<()> {
1647 : // Add it to the directory (if it doesn't exist already)
1648 32 : let buf = self.get(DBDIR_KEY, ctx).await?;
1649 32 : let mut dbdir = DbDirectory::des(&buf)?;
1650 :
1651 32 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1652 32 : if r.is_none() || r == Some(false) {
1653 : // The dbdir entry didn't exist, or it contained a
1654 : // 'false'. The 'insert' call already updated it with
1655 : // 'true', now write the updated 'dbdirs' map back.
1656 32 : let buf = DbDirectory::ser(&dbdir)?;
1657 32 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1658 0 : }
1659 32 : if r.is_none() {
1660 : // Create RelDirectory
1661 16 : let buf = RelDirectory::ser(&RelDirectory {
1662 16 : rels: HashSet::new(),
1663 16 : })?;
1664 16 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1665 16 : self.put(
1666 16 : rel_dir_to_key(spcnode, dbnode),
1667 16 : Value::Image(Bytes::from(buf)),
1668 16 : );
1669 16 : }
1670 :
1671 32 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1672 32 : Ok(())
1673 32 : }
1674 :
1675 0 : pub async fn put_twophase_file(
1676 0 : &mut self,
1677 0 : xid: u64,
1678 0 : img: Bytes,
1679 0 : ctx: &RequestContext,
1680 0 : ) -> anyhow::Result<()> {
1681 : // Add it to the directory entry
1682 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1683 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1684 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1685 0 : if !dir.xids.insert(xid) {
1686 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1687 0 : }
1688 0 : self.pending_directory_entries
1689 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1690 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1691 : } else {
1692 0 : let xid = xid as u32;
1693 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1694 0 : if !dir.xids.insert(xid) {
1695 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1696 0 : }
1697 0 : self.pending_directory_entries
1698 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1699 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1700 : };
1701 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1702 0 :
1703 0 : self.put(twophase_file_key(xid), Value::Image(img));
1704 0 : Ok(())
1705 0 : }
1706 :
1707 0 : pub async fn set_replorigin(
1708 0 : &mut self,
1709 0 : origin_id: RepOriginId,
1710 0 : origin_lsn: Lsn,
1711 0 : ) -> anyhow::Result<()> {
1712 0 : let key = repl_origin_key(origin_id);
1713 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1714 0 : Ok(())
1715 0 : }
1716 :
1717 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1718 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1719 0 : }
1720 :
1721 416 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1722 416 : self.put(CONTROLFILE_KEY, Value::Image(img));
1723 416 : Ok(())
1724 416 : }
1725 :
1726 444 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1727 444 : self.put(CHECKPOINT_KEY, Value::Image(img));
1728 444 : Ok(())
1729 444 : }
1730 :
1731 0 : pub async fn drop_dbdir(
1732 0 : &mut self,
1733 0 : spcnode: Oid,
1734 0 : dbnode: Oid,
1735 0 : ctx: &RequestContext,
1736 0 : ) -> anyhow::Result<()> {
1737 0 : let total_blocks = self
1738 0 : .tline
1739 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1740 0 : .await?;
1741 :
1742 : // Remove entry from dbdir
1743 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1744 0 : let mut dir = DbDirectory::des(&buf)?;
1745 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1746 0 : let buf = DbDirectory::ser(&dir)?;
1747 0 : self.pending_directory_entries
1748 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1749 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1750 : } else {
1751 0 : warn!(
1752 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1753 : spcnode, dbnode
1754 : );
1755 : }
1756 :
1757 : // Update logical database size.
1758 0 : self.pending_nblocks -= total_blocks as i64;
1759 0 :
1760 0 : // Delete all relations and metadata files for the spcnode/dnode
1761 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1762 0 : Ok(())
1763 0 : }
1764 :
1765 : /// Create a relation fork.
1766 : ///
1767 : /// 'nblocks' is the initial size.
1768 3840 : pub async fn put_rel_creation(
1769 3840 : &mut self,
1770 3840 : rel: RelTag,
1771 3840 : nblocks: BlockNumber,
1772 3840 : ctx: &RequestContext,
1773 3840 : ) -> Result<(), RelationError> {
1774 3840 : if rel.relnode == 0 {
1775 0 : return Err(RelationError::InvalidRelnode);
1776 3840 : }
1777 : // It's possible that this is the first rel for this db in this
1778 : // tablespace. Create the reldir entry for it if so.
1779 3840 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1780 3840 : .context("deserialize db")?;
1781 3840 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1782 3840 : let mut rel_dir =
1783 3840 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1784 : // Didn't exist. Update dbdir
1785 16 : e.insert(false);
1786 16 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1787 16 : self.pending_directory_entries
1788 16 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1789 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1790 16 :
1791 16 : // and create the RelDirectory
1792 16 : RelDirectory::default()
1793 : } else {
1794 : // reldir already exists, fetch it
1795 3824 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1796 3824 : .context("deserialize db")?
1797 : };
1798 :
1799 : // Add the new relation to the rel directory entry, and write it back
1800 3840 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1801 0 : return Err(RelationError::AlreadyExists);
1802 3840 : }
1803 3840 :
1804 3840 : self.pending_directory_entries
1805 3840 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1806 3840 :
1807 3840 : self.put(
1808 3840 : rel_dir_key,
1809 3840 : Value::Image(Bytes::from(
1810 3840 : RelDirectory::ser(&rel_dir).context("serialize")?,
1811 : )),
1812 : );
1813 :
1814 : // Put size
1815 3840 : let size_key = rel_size_to_key(rel);
1816 3840 : let buf = nblocks.to_le_bytes();
1817 3840 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1818 3840 :
1819 3840 : self.pending_nblocks += nblocks as i64;
1820 3840 :
1821 3840 : // Update relation size cache
1822 3840 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1823 3840 :
1824 3840 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1825 3840 : // caller.
1826 3840 : Ok(())
1827 3840 : }
1828 :
1829 : /// Truncate relation
1830 12024 : pub async fn put_rel_truncation(
1831 12024 : &mut self,
1832 12024 : rel: RelTag,
1833 12024 : nblocks: BlockNumber,
1834 12024 : ctx: &RequestContext,
1835 12024 : ) -> anyhow::Result<()> {
1836 12024 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1837 12024 : if self
1838 12024 : .tline
1839 12024 : .get_rel_exists(rel, Version::Modified(self), ctx)
1840 12024 : .await?
1841 : {
1842 12024 : let size_key = rel_size_to_key(rel);
1843 : // Fetch the old size first
1844 12024 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1845 12024 :
1846 12024 : // Update the entry with the new size.
1847 12024 : let buf = nblocks.to_le_bytes();
1848 12024 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1849 12024 :
1850 12024 : // Update relation size cache
1851 12024 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1852 12024 :
1853 12024 : // Update logical database size.
1854 12024 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1855 0 : }
1856 12024 : Ok(())
1857 12024 : }
1858 :
1859 : /// Extend relation
1860 : /// If new size is smaller, do nothing.
1861 553360 : pub async fn put_rel_extend(
1862 553360 : &mut self,
1863 553360 : rel: RelTag,
1864 553360 : nblocks: BlockNumber,
1865 553360 : ctx: &RequestContext,
1866 553360 : ) -> anyhow::Result<()> {
1867 553360 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1868 :
1869 : // Put size
1870 553360 : let size_key = rel_size_to_key(rel);
1871 553360 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1872 553360 :
1873 553360 : // only extend relation here. never decrease the size
1874 553360 : if nblocks > old_size {
1875 549576 : let buf = nblocks.to_le_bytes();
1876 549576 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1877 549576 :
1878 549576 : // Update relation size cache
1879 549576 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1880 549576 :
1881 549576 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1882 549576 : }
1883 553360 : Ok(())
1884 553360 : }
1885 :
1886 : /// Drop some relations
1887 20 : pub(crate) async fn put_rel_drops(
1888 20 : &mut self,
1889 20 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
1890 20 : ctx: &RequestContext,
1891 20 : ) -> anyhow::Result<()> {
1892 24 : for ((spc_node, db_node), rel_tags) in drop_relations {
1893 4 : let dir_key = rel_dir_to_key(spc_node, db_node);
1894 4 : let buf = self.get(dir_key, ctx).await?;
1895 4 : let mut dir = RelDirectory::des(&buf)?;
1896 :
1897 4 : let mut dirty = false;
1898 8 : for rel_tag in rel_tags {
1899 4 : if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
1900 4 : dirty = true;
1901 4 :
1902 4 : // update logical size
1903 4 : let size_key = rel_size_to_key(rel_tag);
1904 4 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1905 4 : self.pending_nblocks -= old_size as i64;
1906 4 :
1907 4 : // Remove entry from relation size cache
1908 4 : self.tline.remove_cached_rel_size(&rel_tag);
1909 4 :
1910 4 : // Delete size entry, as well as all blocks
1911 4 : self.delete(rel_key_range(rel_tag));
1912 0 : }
1913 : }
1914 :
1915 4 : if dirty {
1916 4 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1917 4 : self.pending_directory_entries
1918 4 : .push((DirectoryKind::Rel, dir.rels.len()));
1919 0 : }
1920 : }
1921 :
1922 20 : Ok(())
1923 20 : }
1924 :
1925 12 : pub async fn put_slru_segment_creation(
1926 12 : &mut self,
1927 12 : kind: SlruKind,
1928 12 : segno: u32,
1929 12 : nblocks: BlockNumber,
1930 12 : ctx: &RequestContext,
1931 12 : ) -> anyhow::Result<()> {
1932 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1933 :
1934 : // Add it to the directory entry
1935 12 : let dir_key = slru_dir_to_key(kind);
1936 12 : let buf = self.get(dir_key, ctx).await?;
1937 12 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1938 :
1939 12 : if !dir.segments.insert(segno) {
1940 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1941 12 : }
1942 12 : self.pending_directory_entries
1943 12 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1944 12 : self.put(
1945 12 : dir_key,
1946 12 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1947 : );
1948 :
1949 : // Put size
1950 12 : let size_key = slru_segment_size_to_key(kind, segno);
1951 12 : let buf = nblocks.to_le_bytes();
1952 12 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1953 12 :
1954 12 : // even if nblocks > 0, we don't insert any actual blocks here
1955 12 :
1956 12 : Ok(())
1957 12 : }
1958 :
1959 : /// Extend SLRU segment
1960 0 : pub fn put_slru_extend(
1961 0 : &mut self,
1962 0 : kind: SlruKind,
1963 0 : segno: u32,
1964 0 : nblocks: BlockNumber,
1965 0 : ) -> anyhow::Result<()> {
1966 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1967 :
1968 : // Put size
1969 0 : let size_key = slru_segment_size_to_key(kind, segno);
1970 0 : let buf = nblocks.to_le_bytes();
1971 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1972 0 : Ok(())
1973 0 : }
1974 :
1975 : /// This method is used for marking truncated SLRU files
1976 0 : pub async fn drop_slru_segment(
1977 0 : &mut self,
1978 0 : kind: SlruKind,
1979 0 : segno: u32,
1980 0 : ctx: &RequestContext,
1981 0 : ) -> anyhow::Result<()> {
1982 0 : // Remove it from the directory entry
1983 0 : let dir_key = slru_dir_to_key(kind);
1984 0 : let buf = self.get(dir_key, ctx).await?;
1985 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1986 :
1987 0 : if !dir.segments.remove(&segno) {
1988 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1989 0 : }
1990 0 : self.pending_directory_entries
1991 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1992 0 : self.put(
1993 0 : dir_key,
1994 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1995 : );
1996 :
1997 : // Delete size entry, as well as all blocks
1998 0 : self.delete(slru_segment_key_range(kind, segno));
1999 0 :
2000 0 : Ok(())
2001 0 : }
2002 :
2003 : /// Drop a relmapper file (pg_filenode.map)
2004 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
2005 0 : // TODO
2006 0 : Ok(())
2007 0 : }
2008 :
2009 : /// This method is used for marking truncated SLRU files
2010 0 : pub async fn drop_twophase_file(
2011 0 : &mut self,
2012 0 : xid: u64,
2013 0 : ctx: &RequestContext,
2014 0 : ) -> anyhow::Result<()> {
2015 : // Remove it from the directory entry
2016 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2017 0 : let newdirbuf = if self.tline.pg_version >= 17 {
2018 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2019 :
2020 0 : if !dir.xids.remove(&xid) {
2021 0 : warn!("twophase file for xid {} does not exist", xid);
2022 0 : }
2023 0 : self.pending_directory_entries
2024 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
2025 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2026 : } else {
2027 0 : let xid: u32 = u32::try_from(xid)?;
2028 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2029 :
2030 0 : if !dir.xids.remove(&xid) {
2031 0 : warn!("twophase file for xid {} does not exist", xid);
2032 0 : }
2033 0 : self.pending_directory_entries
2034 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
2035 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2036 : };
2037 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2038 0 :
2039 0 : // Delete it
2040 0 : self.delete(twophase_key_range(xid));
2041 0 :
2042 0 : Ok(())
2043 0 : }
2044 :
2045 32 : pub async fn put_file(
2046 32 : &mut self,
2047 32 : path: &str,
2048 32 : content: &[u8],
2049 32 : ctx: &RequestContext,
2050 32 : ) -> anyhow::Result<()> {
2051 32 : let key = aux_file::encode_aux_file_key(path);
2052 : // retrieve the key from the engine
2053 32 : let old_val = match self.get(key, ctx).await {
2054 8 : Ok(val) => Some(val),
2055 24 : Err(PageReconstructError::MissingKey(_)) => None,
2056 0 : Err(e) => return Err(e.into()),
2057 : };
2058 32 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2059 8 : aux_file::decode_file_value(old_val)?
2060 : } else {
2061 24 : Vec::new()
2062 : };
2063 32 : let mut other_files = Vec::with_capacity(files.len());
2064 32 : let mut modifying_file = None;
2065 40 : for file @ (p, content) in files {
2066 8 : if path == p {
2067 8 : assert!(
2068 8 : modifying_file.is_none(),
2069 0 : "duplicated entries found for {}",
2070 : path
2071 : );
2072 8 : modifying_file = Some(content);
2073 0 : } else {
2074 0 : other_files.push(file);
2075 0 : }
2076 : }
2077 32 : let mut new_files = other_files;
2078 32 : match (modifying_file, content.is_empty()) {
2079 4 : (Some(old_content), false) => {
2080 4 : self.tline
2081 4 : .aux_file_size_estimator
2082 4 : .on_update(old_content.len(), content.len());
2083 4 : new_files.push((path, content));
2084 4 : }
2085 4 : (Some(old_content), true) => {
2086 4 : self.tline
2087 4 : .aux_file_size_estimator
2088 4 : .on_remove(old_content.len());
2089 4 : // not adding the file key to the final `new_files` vec.
2090 4 : }
2091 24 : (None, false) => {
2092 24 : self.tline.aux_file_size_estimator.on_add(content.len());
2093 24 : new_files.push((path, content));
2094 24 : }
2095 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2096 : }
2097 32 : let new_val = aux_file::encode_file_value(&new_files)?;
2098 32 : self.put(key, Value::Image(new_val.into()));
2099 32 :
2100 32 : Ok(())
2101 32 : }
2102 :
2103 : ///
2104 : /// Flush changes accumulated so far to the underlying repository.
2105 : ///
2106 : /// Usually, changes made in DatadirModification are atomic, but this allows
2107 : /// you to flush them to the underlying repository before the final `commit`.
2108 : /// That allows to free up the memory used to hold the pending changes.
2109 : ///
2110 : /// Currently only used during bulk import of a data directory. In that
2111 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2112 : /// whole import fails and the timeline will be deleted anyway.
2113 : /// (Or to be precise, it will be left behind for debugging purposes and
2114 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2115 : ///
2116 : /// Note: A consequence of flushing the pending operations is that they
2117 : /// won't be visible to subsequent operations until `commit`. The function
2118 : /// retains all the metadata, but data pages are flushed. That's again OK
2119 : /// for bulk import, where you are just loading data pages and won't try to
2120 : /// modify the same pages twice.
2121 3860 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2122 3860 : // Unless we have accumulated a decent amount of changes, it's not worth it
2123 3860 : // to scan through the pending_updates list.
2124 3860 : let pending_nblocks = self.pending_nblocks;
2125 3860 : if pending_nblocks < 10000 {
2126 3860 : return Ok(());
2127 0 : }
2128 :
2129 0 : let mut writer = self.tline.writer().await;
2130 :
2131 : // Flush relation and SLRU data blocks, keep metadata.
2132 0 : if let Some(batch) = self.pending_data_batch.take() {
2133 0 : tracing::debug!(
2134 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2135 0 : batch.max_lsn,
2136 0 : self.tline.get_last_record_lsn()
2137 : );
2138 :
2139 : // This bails out on first error without modifying pending_updates.
2140 : // That's Ok, cf this function's doc comment.
2141 0 : writer.put_batch(batch, ctx).await?;
2142 0 : }
2143 :
2144 0 : if pending_nblocks != 0 {
2145 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2146 0 : self.pending_nblocks = 0;
2147 0 : }
2148 :
2149 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2150 0 : writer.update_directory_entries_count(kind, count as u64);
2151 0 : }
2152 :
2153 0 : Ok(())
2154 3860 : }
2155 :
    ///
    /// Finish this atomic update, writing all the updated keys to the
    /// underlying timeline.
    /// All the modifications in this atomic update are stamped by the specified LSN.
    ///
    /// Order of operations:
    /// 1. Pending metadata values and the pending data batch are merged and written as
    ///    one batch.
    /// 2. Pending deletions are applied.
    /// 3. `finish_write` is called for every pending LSN, ending with `self.lsn`.
    /// 4. The logical size and the directory entry counts are updated.
    ///
    /// NOTE(review): `pending_nblocks` is reset and the metadata pages are drained
    /// *before* the writes. If `put_batch` or `delete_batch` fails, this modification is
    /// left partially consumed. Callers presumably discard the modification on error;
    /// confirm.
    pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
        let mut writer = self.tline.writer().await;

        let pending_nblocks = self.pending_nblocks;
        self.pending_nblocks = 0;

        // Ordering: the items in this batch do not need to be in any global order, but values for
        // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
        // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
        // more details.

        // Each key's values are pushed in LSN order by `put_metadata`; flatten them into
        // (key, lsn, size, value) tuples without reordering within a key.
        let metadata_batch = {
            let pending_meta = self
                .pending_metadata_pages
                .drain()
                .flat_map(|(key, values)| {
                    values
                        .into_iter()
                        .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
                })
                .collect::<Vec<_>>();

            if pending_meta.is_empty() {
                None
            } else {
                Some(SerializedValueBatch::from_values(pending_meta))
            }
        };

        let data_batch = self.pending_data_batch.take();

        // Merge the data and metadata batches so only one `put_batch` call is needed.
        let maybe_batch = match (data_batch, metadata_batch) {
            (Some(mut data), Some(metadata)) => {
                data.extend(metadata);
                Some(data)
            }
            (Some(data), None) => Some(data),
            (None, Some(metadata)) => Some(metadata),
            (None, None) => None,
        };

        if let Some(batch) = maybe_batch {
            tracing::debug!(
                "Flushing batch with max_lsn={}. Last record LSN is {}",
                batch.max_lsn,
                self.tline.get_last_record_lsn()
            );

            // This bails out on the first error. By this point `pending_nblocks` has been
            // reset and the pending metadata pages drained (see the doc comment above).
            writer.put_batch(batch, ctx).await?;
        }

        if !self.pending_deletions.is_empty() {
            writer.delete_batch(&self.pending_deletions, ctx).await?;
            self.pending_deletions.clear();
        }

        self.pending_lsns.push(self.lsn);
        for pending_lsn in self.pending_lsns.drain(..) {
            // TODO(vlad): pretty sure the comment below is not valid anymore
            // and we can call finish write with the latest LSN
            //
            // Ideally, we should be able to call writer.finish_write() only once
            // with the highest LSN. However, the last_record_lsn variable in the
            // timeline keeps track of the latest LSN and the immediate previous LSN
            // so we need to record every LSN to not leave a gap between them.
            writer.finish_write(pending_lsn);
        }

        // `pending_nblocks` counts blocks; the writer expects a size in bytes.
        if pending_nblocks != 0 {
            writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
        }

        for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
            writer.update_directory_entries_count(kind, count as u64);
        }

        self.pending_metadata_bytes = 0;

        Ok(())
    }
2243 :
2244 583408 : pub(crate) fn len(&self) -> usize {
2245 583408 : self.pending_metadata_pages.len()
2246 583408 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2247 583408 : + self.pending_deletions.len()
2248 583408 : }
2249 :
2250 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2251 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2252 : /// in the modification.
2253 : ///
2254 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2255 : /// page must ensure that the pages they read are already committed in Timeline, for example
2256 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2257 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2258 : /// and not data pages.
2259 573172 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2260 573172 : if !Self::is_data_key(&key) {
2261 : // Have we already updated the same key? Read the latest pending updated
2262 : // version in that case.
2263 : //
2264 : // Note: we don't check pending_deletions. It is an error to request a
2265 : // value that has been removed, deletion only avoids leaking storage.
2266 573172 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2267 31856 : if let Some((_, _, value)) = values.last() {
2268 31856 : return if let Value::Image(img) = value {
2269 31856 : Ok(img.clone())
2270 : } else {
2271 : // Currently, we never need to read back a WAL record that we
2272 : // inserted in the same "transaction". All the metadata updates
2273 : // work directly with Images, and we never need to read actual
2274 : // data pages. We could handle this if we had to, by calling
2275 : // the walredo manager, but let's keep it simple for now.
2276 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2277 0 : "unexpected pending WAL record"
2278 0 : )))
2279 : };
2280 0 : }
2281 541316 : }
2282 : } else {
2283 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2284 : // this key should never be present in pending_data_pages. We ensure this by committing
2285 : // modifications before ingesting DB create operations, which are the only kind that reads
2286 : // data pages during ingest.
2287 0 : if cfg!(debug_assertions) {
2288 0 : assert!(!self
2289 0 : .pending_data_batch
2290 0 : .as_ref()
2291 0 : .is_some_and(|b| b.updates_key(&key)));
2292 0 : }
2293 : }
2294 :
2295 : // Metadata page cache miss, or we're reading a data page.
2296 541316 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2297 541316 : self.tline.get(key, lsn, ctx).await
2298 573172 : }
2299 :
2300 1128076 : fn put(&mut self, key: Key, val: Value) {
2301 1128076 : if Self::is_data_key(&key) {
2302 555736 : self.put_data(key.to_compact(), val)
2303 : } else {
2304 572340 : self.put_metadata(key.to_compact(), val)
2305 : }
2306 1128076 : }
2307 :
2308 555736 : fn put_data(&mut self, key: CompactKey, val: Value) {
2309 555736 : let batch = self
2310 555736 : .pending_data_batch
2311 555736 : .get_or_insert_with(SerializedValueBatch::default);
2312 555736 : batch.put(key, val, self.lsn);
2313 555736 : }
2314 :
2315 572340 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2316 572340 : let values = self.pending_metadata_pages.entry(key).or_default();
2317 : // Replace the previous value if it exists at the same lsn
2318 572340 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2319 24304 : if *last_lsn == self.lsn {
2320 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2321 24304 : self.pending_metadata_bytes -= *last_value_ser_size;
2322 24304 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2323 24304 : self.pending_metadata_bytes += *last_value_ser_size;
2324 24304 :
2325 24304 : // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
2326 24304 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2327 24304 : *last_value = val;
2328 24304 : return;
2329 0 : }
2330 548036 : }
2331 :
2332 548036 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2333 548036 : self.pending_metadata_bytes += val_serialized_size;
2334 548036 : values.push((self.lsn, val_serialized_size, val));
2335 548036 :
2336 548036 : if key == CHECKPOINT_KEY.to_compact() {
2337 444 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2338 547592 : }
2339 572340 : }
2340 :
2341 4 : fn delete(&mut self, key_range: Range<Key>) {
2342 4 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2343 4 : self.pending_deletions.push((key_range, self.lsn));
2344 4 : }
2345 : }
2346 :
/// Statistics for a DatadirModification.
///
/// The counters are split by key class (metadata vs. data) and by value kind (image vs.
/// delta). NOTE(review): nothing in this chunk fills these in. "delta" presumably means
/// a WAL-record value; confirm at the producer.
#[derive(Default)]
pub struct DatadirModificationStats {
    pub metadata_images: u64,
    pub metadata_deltas: u64,
    pub data_images: u64,
    pub data_deltas: u64,
}
2355 :
/// This enum facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
    /// Read committed data from the timeline at exactly this LSN.
    Lsn(Lsn),
    /// Read through the modification. Pending metadata values win; otherwise the
    /// timeline is read at the newer of its last record LSN and the modification's LSN.
    Modified(&'a DatadirModification<'a>),
}
2368 :
2369 : impl Version<'_> {
2370 10352 : async fn get(
2371 10352 : &self,
2372 10352 : timeline: &Timeline,
2373 10352 : key: Key,
2374 10352 : ctx: &RequestContext,
2375 10352 : ) -> Result<Bytes, PageReconstructError> {
2376 10352 : match self {
2377 10312 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2378 40 : Version::Modified(modification) => modification.get(key, ctx).await,
2379 : }
2380 10352 : }
2381 :
2382 71240 : fn get_lsn(&self) -> Lsn {
2383 71240 : match self {
2384 59148 : Version::Lsn(lsn) => *lsn,
2385 12092 : Version::Modified(modification) => modification.lsn,
2386 : }
2387 71240 : }
2388 : }
2389 :
2390 : //--- Metadata structs stored in key-value pairs in the repository.
2391 :
/// Directory of all databases, stored under `DBDIR_KEY` and read/written with the
/// `des`/`ser` helpers. The field layout is part of the stored format; do not reorder
/// or retype fields without a format migration.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DbDirectory {
    // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
}
2397 :
// The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
// pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
// were named like "pg_twophase/000002E5"; now they're like
// "pg_twophase/0000000A000002E4".
2402 :
/// Two-phase directory used before PostgreSQL v17: the set of 32-bit xids that have a
/// stored two-phase state file. Stored under `TWOPHASEDIR_KEY`; the field layout is
/// part of the stored format.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TwoPhaseDirectory {
    pub(crate) xids: HashSet<TransactionId>,
}
2407 :
/// Two-phase directory used from PostgreSQL v17 on: same role as [`TwoPhaseDirectory`],
/// but with 64-bit xids. Stored under `TWOPHASEDIR_KEY`; the field layout is part of the
/// stored format.
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectoryV17 {
    xids: HashSet<u64>,
}
2412 :
/// Relation directory of one `(spcnode, dbnode)` database, stored under
/// `rel_dir_to_key(spcnode, dbnode)`. The field layout is part of the stored format.
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct RelDirectory {
    // Set of relations that exist. (relfilenode, forknum)
    //
    // TODO: Store it as a btree or radix tree or something else that spans multiple
    // key-value pairs, if you have a lot of relations
    pub(crate) rels: HashSet<(Oid, u8)>,
}
2421 :
/// Size of a relation fork in blocks.
///
/// NOTE(review): not referenced in this chunk. The code here stores relation sizes as
/// raw little-endian `u32` bytes (`nblocks.to_le_bytes()`), not through this struct;
/// check whether it is still used.
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
    nblocks: u32,
}
2426 :
/// Segment directory for one `SlruKind`, stored under `slru_dir_to_key(kind)`. The
/// field layout is part of the stored format.
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct SlruSegmentDirectory {
    // Set of SLRU segments that exist.
    pub(crate) segments: HashSet<u32>,
}
2432 :
/// Kinds of directory objects whose entry counts are reported to the timeline writer
/// through `update_directory_entries_count`.
///
/// The variant order determines [`DirectoryKind::offset`], which comes from the
/// `enum_map::Enum` derive. Reordering or inserting variants shifts the offsets.
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
#[repr(u8)]
pub(crate) enum DirectoryKind {
    Db,
    TwoPhase,
    Rel,
    AuxFiles,
    // Presumably each SlruKind gets its own offset via the nested Enum impl — confirm.
    SlruSegment(SlruKind),
}
2442 :
2443 : impl DirectoryKind {
2444 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2445 11864 : pub(crate) fn offset(&self) -> usize {
2446 11864 : self.into_usize()
2447 11864 : }
2448 : }
2449 :
/// A `BLCKSZ`-byte page of zeros, backed by static memory so clones don't allocate.
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2451 :
2452 : #[allow(clippy::bool_assert_comparison)]
2453 : #[cfg(test)]
2454 : mod tests {
2455 : use hex_literal::hex;
2456 : use pageserver_api::{models::ShardParameters, shard::ShardStripeSize};
2457 : use utils::{
2458 : id::TimelineId,
2459 : shard::{ShardCount, ShardNumber},
2460 : };
2461 :
2462 : use super::*;
2463 :
2464 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2465 :
    /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    ///
    /// The first modification adds two files at two LSNs. The second updates one file
    /// and deletes the other by writing empty content. Each state is read back at its
    /// LSN, including the older state after the newer one has been committed.
    #[tokio::test]
    async fn aux_files_round_trip() -> anyhow::Result<()> {
        let name = "aux_files_round_trip";
        let harness = TenantHarness::create(name).await?;

        pub const TIMELINE_ID: TimelineId =
            TimelineId::from_array(hex!("11223344556677881122334455667788"));

        let (tenant, ctx) = harness.load().await;
        let tline = tenant
            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let tline = tline.raw_timeline().unwrap();

        // First modification: insert two keys
        let mut modification = tline.begin_modification(Lsn(0x1000));
        modification.put_file("foo/bar1", b"content1", &ctx).await?;
        modification.set_lsn(Lsn(0x1008))?;
        modification.put_file("foo/bar2", b"content2", &ctx).await?;
        modification.commit(&ctx).await?;
        let expect_1008 = HashMap::from([
            ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
            ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
        ]);

        let io_concurrency = IoConcurrency::spawn_for_test();

        let readback = tline
            .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
            .await?;
        assert_eq!(readback, expect_1008);

        // Second modification: update one key, remove the other
        let mut modification = tline.begin_modification(Lsn(0x2000));
        modification.put_file("foo/bar1", b"content3", &ctx).await?;
        modification.set_lsn(Lsn(0x2008))?;
        // Empty content deletes the file.
        modification.put_file("foo/bar2", b"", &ctx).await?;
        modification.commit(&ctx).await?;
        let expect_2008 =
            HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);

        let readback = tline
            .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
            .await?;
        assert_eq!(readback, expect_2008);

        // Reading back in time works
        let readback = tline
            .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
            .await?;
        assert_eq!(readback, expect_1008);

        Ok(())
    }
2521 :
    /// `DatadirModification::find_gaps` on an unsharded identity. A write past the
    /// relation's current size reports the skipped blocks as a single gap. Writes at the
    /// current end, or inside the current size, report no gap.
    #[test]
    fn gap_finding() {
        let rel = RelTag {
            spcnode: 1663,
            dbnode: 208101,
            relnode: 2620,
            forknum: 0,
        };
        let base_blkno = 1;

        let base_key = rel_block_to_key(rel, base_blkno);
        let before_base_key = rel_block_to_key(rel, base_blkno - 1);

        let shard = ShardIdentity::unsharded();

        // Write blocks 1..=10 in order, tracking the relation size after each write.
        let mut previous_nblocks = 0;
        for i in 0..10 {
            let crnt_blkno = base_blkno + i;
            let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);

            previous_nblocks = crnt_blkno + 1;

            if i == 0 {
                // The first block we write is 1, so we should find the gap.
                assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
            } else {
                assert!(gaps.is_none());
            }
        }

        // This is an update to an already existing block. No gaps here.
        let update_blkno = 5;
        let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
        assert!(gaps.is_none());

        // This is an update past the current end block.
        let after_gap_blkno = 20;
        let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);

        let gap_start_key = rel_block_to_key(rel, previous_nblocks);
        let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
        assert_eq!(
            gaps.unwrap(),
            KeySpace::single(gap_start_key..after_gap_key)
        );
    }
2568 :
2569 : #[test]
2570 4 : fn sharded_gap_finding() {
2571 4 : let rel = RelTag {
2572 4 : spcnode: 1663,
2573 4 : dbnode: 208101,
2574 4 : relnode: 2620,
2575 4 : forknum: 0,
2576 4 : };
2577 4 :
2578 4 : let first_blkno = 6;
2579 4 :
2580 4 : // This shard will get the even blocks
2581 4 : let shard = ShardIdentity::from_params(
2582 4 : ShardNumber(0),
2583 4 : &ShardParameters {
2584 4 : count: ShardCount(2),
2585 4 : stripe_size: ShardStripeSize(1),
2586 4 : },
2587 4 : );
2588 4 :
2589 4 : // Only keys belonging to this shard are considered as gaps.
2590 4 : let mut previous_nblocks = 0;
2591 4 : let gaps =
2592 4 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2593 4 : assert!(!gaps.ranges.is_empty());
2594 12 : for gap_range in gaps.ranges {
2595 8 : let mut k = gap_range.start;
2596 16 : while k != gap_range.end {
2597 8 : assert_eq!(shard.get_shard_number(&k), shard.number);
2598 8 : k = k.next();
2599 : }
2600 : }
2601 :
2602 4 : previous_nblocks = first_blkno;
2603 4 :
2604 4 : let update_blkno = 2;
2605 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2606 4 : assert!(gaps.is_none());
2607 4 : }
2608 :
2609 : /*
2610 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2611 : let incremental = timeline.get_current_logical_size();
2612 : let non_incremental = timeline
2613 : .get_current_logical_size_non_incremental(lsn)
2614 : .unwrap();
2615 : assert_eq!(incremental, non_incremental);
2616 : }
2617 : */
2618 :
2619 : /*
2620 : ///
2621 : /// Test list_rels() function, with branches and dropped relations
2622 : ///
2623 : #[test]
2624 : fn test_list_rels_drop() -> Result<()> {
2625 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2626 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2627 : const TESTDB: u32 = 111;
2628 :
2629 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2630 : // after branching fails below
2631 : let mut writer = tline.begin_record(Lsn(0x10));
2632 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2633 : writer.finish()?;
2634 :
2635 : // Create a relation on the timeline
2636 : let mut writer = tline.begin_record(Lsn(0x20));
2637 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2638 : writer.finish()?;
2639 :
2640 : let writer = tline.begin_record(Lsn(0x00));
2641 : writer.finish()?;
2642 :
2643 : // Check that list_rels() lists it after LSN 2, but no before it
2644 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2645 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2646 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2647 :
2648 : // Create a branch, check that the relation is visible there
2649 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2650 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2651 : Some(timeline) => timeline,
2652 : None => panic!("Should have a local timeline"),
2653 : };
2654 : let newtline = DatadirTimelineImpl::new(newtline);
2655 : assert!(newtline
2656 : .list_rels(0, TESTDB, Lsn(0x30))?
2657 : .contains(&TESTREL_A));
2658 :
2659 : // Drop it on the branch
2660 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2661 : new_writer.drop_relation(TESTREL_A)?;
2662 : new_writer.finish()?;
2663 :
2664 : // Check that it's no longer listed on the branch after the point where it was dropped
2665 : assert!(newtline
2666 : .list_rels(0, TESTDB, Lsn(0x30))?
2667 : .contains(&TESTREL_A));
2668 : assert!(!newtline
2669 : .list_rels(0, TESTDB, Lsn(0x40))?
2670 : .contains(&TESTREL_A));
2671 :
2672 : // Run checkpoint and garbage collection and check that it's still not visible
2673 : newtline.checkpoint(CheckpointConfig::Forced)?;
2674 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2675 :
2676 : assert!(!newtline
2677 : .list_rels(0, TESTDB, Lsn(0x40))?
2678 : .contains(&TESTREL_A));
2679 :
2680 : Ok(())
2681 : }
2682 : */
2683 :
2684 : /*
2685 : #[test]
2686 : fn test_read_beyond_eof() -> Result<()> {
2687 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2688 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2689 :
2690 : make_some_layers(&tline, Lsn(0x20))?;
2691 : let mut writer = tline.begin_record(Lsn(0x60));
2692 : walingest.put_rel_page_image(
2693 : &mut writer,
2694 : TESTREL_A,
2695 : 0,
2696 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2697 : )?;
2698 : writer.finish()?;
2699 :
2700 : // Test read before rel creation. Should error out.
2701 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2702 :
2703 : // Read block beyond end of relation at different points in time.
2704 : // These reads should fall into different delta, image, and in-memory layers.
2705 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2706 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2707 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2708 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2709 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2710 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2711 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2712 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2713 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2714 :
2715 : // Test on an in-memory layer with no preceding layer
2716 : let mut writer = tline.begin_record(Lsn(0x70));
2717 : walingest.put_rel_page_image(
2718 : &mut writer,
2719 : TESTREL_B,
2720 : 0,
2721 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2722 : )?;
2723 : writer.finish()?;
2724 :
2725 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
2726 :
2727 : Ok(())
2728 : }
2729 : */
2730 : }
|