Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
13 : use crate::walrecord::NeonWalRecord;
14 : use crate::{aux_file, repository::*};
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
20 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
21 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
22 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
23 : };
24 : use pageserver_api::keyspace::SparseKeySpace;
25 : use pageserver_api::models::AuxFilePolicy;
26 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
27 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
28 : use postgres_ffi::BLCKSZ;
29 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
30 : use serde::{Deserialize, Serialize};
31 : use std::collections::{hash_map, HashMap, HashSet};
32 : use std::ops::ControlFlow;
33 : use std::ops::Range;
34 : use strum::IntoEnumIterator;
35 : use tokio_util::sync::CancellationToken;
36 : use tracing::{debug, info, trace, warn};
37 : use utils::bin_ser::DeserializeError;
38 : use utils::pausable_failpoint;
39 : use utils::{bin_ser::BeSer, lsn::Lsn};
40 :
/// Upper bound on the number of delta records appended to `AUX_FILES_KEY` (aux v1).
/// Once this many deltas have accumulated, the write path writes a full image instead.
pub const MAX_AUX_FILE_DELTAS: usize = 1 << 10;

/// Upper bound on the number of aux-file-related delta layers (aux v2).
/// Once this many exist, compaction creates a new image layer.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 1 << 6;
46 :
47 : #[derive(Debug)]
48 : pub enum LsnForTimestamp {
49 : /// Found commits both before and after the given timestamp
50 : Present(Lsn),
51 :
52 : /// Found no commits after the given timestamp, this means
53 : /// that the newest data in the branch is older than the given
54 : /// timestamp.
55 : ///
56 : /// All commits <= LSN happened before the given timestamp
57 : Future(Lsn),
58 :
59 : /// The queried timestamp is past our horizon we look back at (PITR)
60 : ///
61 : /// All commits > LSN happened after the given timestamp,
62 : /// but any commits < LSN might have happened before or after
63 : /// the given timestamp. We don't know because no data before
64 : /// the given lsn is available.
65 : Past(Lsn),
66 :
67 : /// We have found no commit with a timestamp,
68 : /// so we can't return anything meaningful.
69 : ///
70 : /// The associated LSN is the lower bound value we can safely
71 : /// create branches on, but no statement is made if it is
72 : /// older or newer than the timestamp.
73 : ///
74 : /// This variant can e.g. be returned right after a
75 : /// cluster import.
76 : NoData(Lsn),
77 : }
78 :
79 0 : #[derive(Debug, thiserror::Error)]
80 : pub(crate) enum CalculateLogicalSizeError {
81 : #[error("cancelled")]
82 : Cancelled,
83 :
84 : /// Something went wrong while reading the metadata we use to calculate logical size
85 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
86 : /// in the `From` implementation for this variant.
87 : #[error(transparent)]
88 : PageRead(PageReconstructError),
89 :
90 : /// Something went wrong deserializing metadata that we read to calculate logical size
91 : #[error("decode error: {0}")]
92 : Decode(#[from] DeserializeError),
93 : }
94 :
95 0 : #[derive(Debug, thiserror::Error)]
96 : pub(crate) enum CollectKeySpaceError {
97 : #[error(transparent)]
98 : Decode(#[from] DeserializeError),
99 : #[error(transparent)]
100 : PageRead(PageReconstructError),
101 : #[error("cancelled")]
102 : Cancelled,
103 : }
104 :
105 : impl From<PageReconstructError> for CollectKeySpaceError {
106 0 : fn from(err: PageReconstructError) -> Self {
107 0 : match err {
108 0 : PageReconstructError::Cancelled => Self::Cancelled,
109 0 : err => Self::PageRead(err),
110 : }
111 0 : }
112 : }
113 :
114 : impl From<PageReconstructError> for CalculateLogicalSizeError {
115 0 : fn from(pre: PageReconstructError) -> Self {
116 0 : match pre {
117 0 : PageReconstructError::Cancelled => Self::Cancelled,
118 0 : _ => Self::PageRead(pre),
119 : }
120 0 : }
121 : }
122 :
123 0 : #[derive(Debug, thiserror::Error)]
124 : pub enum RelationError {
125 : #[error("Relation Already Exists")]
126 : AlreadyExists,
127 : #[error("invalid relnode")]
128 : InvalidRelnode,
129 : #[error(transparent)]
130 : Other(#[from] anyhow::Error),
131 : }
132 :
133 : ///
134 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
135 : /// and other special kinds of files, in a versioned key-value store. The
136 : /// Timeline struct provides the key-value store.
137 : ///
138 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
139 : /// implementation, and might be moved into a separate struct later.
140 : impl Timeline {
141 : /// Start ingesting a WAL record, or other atomic modification of
142 : /// the timeline.
143 : ///
144 : /// This provides a transaction-like interface to perform a bunch
145 : /// of modifications atomically.
146 : ///
147 : /// To ingest a WAL record, call begin_modification(lsn) to get a
148 : /// DatadirModification object. Use the functions in the object to
149 : /// modify the repository state, updating all the pages and metadata
150 : /// that the WAL record affects. When you're done, call commit() to
151 : /// commit the changes.
152 : ///
153 : /// Lsn stored in modification is advanced by `ingest_record` and
154 : /// is used by `commit()` to update `last_record_lsn`.
155 : ///
156 : /// Calling commit() will flush all the changes and reset the state,
157 : /// so the `DatadirModification` struct can be reused to perform the next modification.
158 : ///
159 : /// Note that any pending modifications you make through the
160 : /// modification object won't be visible to calls to the 'get' and list
161 : /// functions of the timeline until you finish! And if you update the
162 : /// same page twice, the last update wins.
163 : ///
164 805188 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
165 805188 : where
166 805188 : Self: Sized,
167 805188 : {
168 805188 : DatadirModification {
169 805188 : tline: self,
170 805188 : pending_lsns: Vec::new(),
171 805188 : pending_metadata_pages: HashMap::new(),
172 805188 : pending_data_pages: Vec::new(),
173 805188 : pending_zero_data_pages: Default::default(),
174 805188 : pending_deletions: Vec::new(),
175 805188 : pending_nblocks: 0,
176 805188 : pending_directory_entries: Vec::new(),
177 805188 : pending_bytes: 0,
178 805188 : lsn,
179 805188 : }
180 805188 : }
181 :
182 : //------------------------------------------------------------------------------
183 : // Public GET functions
184 : //------------------------------------------------------------------------------
185 :
186 : /// Look up given page version.
187 55152 : pub(crate) async fn get_rel_page_at_lsn(
188 55152 : &self,
189 55152 : tag: RelTag,
190 55152 : blknum: BlockNumber,
191 55152 : version: Version<'_>,
192 55152 : ctx: &RequestContext,
193 55152 : ) -> Result<Bytes, PageReconstructError> {
194 55152 : if tag.relnode == 0 {
195 0 : return Err(PageReconstructError::Other(
196 0 : RelationError::InvalidRelnode.into(),
197 0 : ));
198 55152 : }
199 :
200 55152 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
201 55152 : if blknum >= nblocks {
202 0 : debug!(
203 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
204 0 : tag,
205 0 : blknum,
206 0 : version.get_lsn(),
207 : nblocks
208 : );
209 0 : return Ok(ZERO_PAGE.clone());
210 55152 : }
211 55152 :
212 55152 : let key = rel_block_to_key(tag, blknum);
213 55152 : version.get(self, key, ctx).await
214 55152 : }
215 :
216 : // Get size of a database in blocks
217 0 : pub(crate) async fn get_db_size(
218 0 : &self,
219 0 : spcnode: Oid,
220 0 : dbnode: Oid,
221 0 : version: Version<'_>,
222 0 : ctx: &RequestContext,
223 0 : ) -> Result<usize, PageReconstructError> {
224 0 : let mut total_blocks = 0;
225 :
226 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
227 :
228 0 : for rel in rels {
229 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
230 0 : total_blocks += n_blocks as usize;
231 : }
232 0 : Ok(total_blocks)
233 0 : }
234 :
235 : /// Get size of a relation file
236 73302 : pub(crate) async fn get_rel_size(
237 73302 : &self,
238 73302 : tag: RelTag,
239 73302 : version: Version<'_>,
240 73302 : ctx: &RequestContext,
241 73302 : ) -> Result<BlockNumber, PageReconstructError> {
242 73302 : if tag.relnode == 0 {
243 0 : return Err(PageReconstructError::Other(
244 0 : RelationError::InvalidRelnode.into(),
245 0 : ));
246 73302 : }
247 :
248 73302 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
249 57882 : return Ok(nblocks);
250 15420 : }
251 15420 :
252 15420 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
253 0 : && !self.get_rel_exists(tag, version, ctx).await?
254 : {
255 : // FIXME: Postgres sometimes calls smgrcreate() to create
256 : // FSM, and smgrnblocks() on it immediately afterwards,
257 : // without extending it. Tolerate that by claiming that
258 : // any non-existent FSM fork has size 0.
259 0 : return Ok(0);
260 15420 : }
261 15420 :
262 15420 : let key = rel_size_to_key(tag);
263 15420 : let mut buf = version.get(self, key, ctx).await?;
264 15408 : let nblocks = buf.get_u32_le();
265 15408 :
266 15408 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
267 15408 :
268 15408 : Ok(nblocks)
269 73302 : }
270 :
271 : /// Does relation exist?
272 18150 : pub(crate) async fn get_rel_exists(
273 18150 : &self,
274 18150 : tag: RelTag,
275 18150 : version: Version<'_>,
276 18150 : ctx: &RequestContext,
277 18150 : ) -> Result<bool, PageReconstructError> {
278 18150 : if tag.relnode == 0 {
279 0 : return Err(PageReconstructError::Other(
280 0 : RelationError::InvalidRelnode.into(),
281 0 : ));
282 18150 : }
283 :
284 : // first try to lookup relation in cache
285 18150 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
286 18096 : return Ok(true);
287 54 : }
288 : // then check if the database was already initialized.
289 : // get_rel_exists can be called before dbdir is created.
290 54 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
291 54 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
292 54 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
293 0 : return Ok(false);
294 54 : }
295 54 : // fetch directory listing
296 54 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
297 54 : let buf = version.get(self, key, ctx).await?;
298 :
299 54 : let dir = RelDirectory::des(&buf)?;
300 54 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
301 18150 : }
302 :
303 : /// Get a list of all existing relations in given tablespace and database.
304 : ///
305 : /// # Cancel-Safety
306 : ///
307 : /// This method is cancellation-safe.
308 0 : pub(crate) async fn list_rels(
309 0 : &self,
310 0 : spcnode: Oid,
311 0 : dbnode: Oid,
312 0 : version: Version<'_>,
313 0 : ctx: &RequestContext,
314 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
315 0 : // fetch directory listing
316 0 : let key = rel_dir_to_key(spcnode, dbnode);
317 0 : let buf = version.get(self, key, ctx).await?;
318 :
319 0 : let dir = RelDirectory::des(&buf)?;
320 0 : let rels: HashSet<RelTag> =
321 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
322 0 : spcnode,
323 0 : dbnode,
324 0 : relnode: *relnode,
325 0 : forknum: *forknum,
326 0 : }));
327 0 :
328 0 : Ok(rels)
329 0 : }
330 :
331 : /// Get the whole SLRU segment
332 0 : pub(crate) async fn get_slru_segment(
333 0 : &self,
334 0 : kind: SlruKind,
335 0 : segno: u32,
336 0 : lsn: Lsn,
337 0 : ctx: &RequestContext,
338 0 : ) -> Result<Bytes, PageReconstructError> {
339 0 : let n_blocks = self
340 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
341 0 : .await?;
342 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
343 0 : for blkno in 0..n_blocks {
344 0 : let block = self
345 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
346 0 : .await?;
347 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
348 : }
349 0 : Ok(segment.freeze())
350 0 : }
351 :
352 : /// Look up given SLRU page version.
353 0 : pub(crate) async fn get_slru_page_at_lsn(
354 0 : &self,
355 0 : kind: SlruKind,
356 0 : segno: u32,
357 0 : blknum: BlockNumber,
358 0 : lsn: Lsn,
359 0 : ctx: &RequestContext,
360 0 : ) -> Result<Bytes, PageReconstructError> {
361 0 : let key = slru_block_to_key(kind, segno, blknum);
362 0 : self.get(key, lsn, ctx).await
363 0 : }
364 :
365 : /// Get size of an SLRU segment
366 0 : pub(crate) async fn get_slru_segment_size(
367 0 : &self,
368 0 : kind: SlruKind,
369 0 : segno: u32,
370 0 : version: Version<'_>,
371 0 : ctx: &RequestContext,
372 0 : ) -> Result<BlockNumber, PageReconstructError> {
373 0 : let key = slru_segment_size_to_key(kind, segno);
374 0 : let mut buf = version.get(self, key, ctx).await?;
375 0 : Ok(buf.get_u32_le())
376 0 : }
377 :
378 : /// Get size of an SLRU segment
379 0 : pub(crate) async fn get_slru_segment_exists(
380 0 : &self,
381 0 : kind: SlruKind,
382 0 : segno: u32,
383 0 : version: Version<'_>,
384 0 : ctx: &RequestContext,
385 0 : ) -> Result<bool, PageReconstructError> {
386 0 : // fetch directory listing
387 0 : let key = slru_dir_to_key(kind);
388 0 : let buf = version.get(self, key, ctx).await?;
389 :
390 0 : let dir = SlruSegmentDirectory::des(&buf)?;
391 0 : Ok(dir.segments.contains(&segno))
392 0 : }
393 :
394 : /// Locate LSN, such that all transactions that committed before
395 : /// 'search_timestamp' are visible, but nothing newer is.
396 : ///
397 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
398 : /// so it's not well defined which LSN you get if there were multiple commits
399 : /// "in flight" at that point in time.
400 : ///
401 0 : pub(crate) async fn find_lsn_for_timestamp(
402 0 : &self,
403 0 : search_timestamp: TimestampTz,
404 0 : cancel: &CancellationToken,
405 0 : ctx: &RequestContext,
406 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
407 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
408 :
409 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
410 0 : // We use this method to figure out the branching LSN for the new branch, but the
411 0 : // GC cutoff could be before the branching point and we cannot create a new branch
412 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
413 0 : // on the safe side.
414 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
415 0 : let max_lsn = self.get_last_record_lsn();
416 0 :
417 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
418 0 : // LSN divided by 8.
419 0 : let mut low = min_lsn.0 / 8;
420 0 : let mut high = max_lsn.0 / 8 + 1;
421 0 :
422 0 : let mut found_smaller = false;
423 0 : let mut found_larger = false;
424 :
425 0 : while low < high {
426 0 : if cancel.is_cancelled() {
427 0 : return Err(PageReconstructError::Cancelled);
428 0 : }
429 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
430 0 : let mid = (high + low) / 2;
431 :
432 0 : let cmp = self
433 0 : .is_latest_commit_timestamp_ge_than(
434 0 : search_timestamp,
435 0 : Lsn(mid * 8),
436 0 : &mut found_smaller,
437 0 : &mut found_larger,
438 0 : ctx,
439 0 : )
440 0 : .await?;
441 :
442 0 : if cmp {
443 0 : high = mid;
444 0 : } else {
445 0 : low = mid + 1;
446 0 : }
447 : }
448 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
449 : // so the LSN of the last commit record before or at `search_timestamp`.
450 : // Remove one from `low` to get `t`.
451 : //
452 : // FIXME: it would be better to get the LSN of the previous commit.
453 : // Otherwise, if you restore to the returned LSN, the database will
454 : // include physical changes from later commits that will be marked
455 : // as aborted, and will need to be vacuumed away.
456 0 : let commit_lsn = Lsn((low - 1) * 8);
457 0 : match (found_smaller, found_larger) {
458 : (false, false) => {
459 : // This can happen if no commit records have been processed yet, e.g.
460 : // just after importing a cluster.
461 0 : Ok(LsnForTimestamp::NoData(min_lsn))
462 : }
463 : (false, true) => {
464 : // Didn't find any commit timestamps smaller than the request
465 0 : Ok(LsnForTimestamp::Past(min_lsn))
466 : }
467 0 : (true, _) if commit_lsn < min_lsn => {
468 0 : // the search above did set found_smaller to true but it never increased the lsn.
469 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
470 0 : // below the min_lsn. We should never do that.
471 0 : Ok(LsnForTimestamp::Past(min_lsn))
472 : }
473 : (true, false) => {
474 : // Only found commits with timestamps smaller than the request.
475 : // It's still a valid case for branch creation, return it.
476 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
477 : // case, anyway.
478 0 : Ok(LsnForTimestamp::Future(commit_lsn))
479 : }
480 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
481 : }
482 0 : }
483 :
484 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
485 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
486 : ///
487 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
488 : /// with a smaller/larger timestamp.
489 : ///
490 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
491 0 : &self,
492 0 : search_timestamp: TimestampTz,
493 0 : probe_lsn: Lsn,
494 0 : found_smaller: &mut bool,
495 0 : found_larger: &mut bool,
496 0 : ctx: &RequestContext,
497 0 : ) -> Result<bool, PageReconstructError> {
498 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
499 0 : if timestamp >= search_timestamp {
500 0 : *found_larger = true;
501 0 : return ControlFlow::Break(true);
502 0 : } else {
503 0 : *found_smaller = true;
504 0 : }
505 0 : ControlFlow::Continue(())
506 0 : })
507 0 : .await
508 0 : }
509 :
510 : /// Obtain the possible timestamp range for the given lsn.
511 : ///
512 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
513 0 : pub(crate) async fn get_timestamp_for_lsn(
514 0 : &self,
515 0 : probe_lsn: Lsn,
516 0 : ctx: &RequestContext,
517 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
518 0 : let mut max: Option<TimestampTz> = None;
519 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
520 0 : if let Some(max_prev) = max {
521 0 : max = Some(max_prev.max(timestamp));
522 0 : } else {
523 0 : max = Some(timestamp);
524 0 : }
525 0 : ControlFlow::Continue(())
526 0 : })
527 0 : .await?;
528 :
529 0 : Ok(max)
530 0 : }
531 :
532 : /// Runs the given function on all the timestamps for a given lsn
533 : ///
534 : /// The return value is either given by the closure, or set to the `Default`
535 : /// impl's output.
536 0 : async fn map_all_timestamps<T: Default>(
537 0 : &self,
538 0 : probe_lsn: Lsn,
539 0 : ctx: &RequestContext,
540 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
541 0 : ) -> Result<T, PageReconstructError> {
542 0 : for segno in self
543 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
544 0 : .await?
545 : {
546 0 : let nblocks = self
547 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
548 0 : .await?;
549 0 : for blknum in (0..nblocks).rev() {
550 0 : let clog_page = self
551 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
552 0 : .await?;
553 :
554 0 : if clog_page.len() == BLCKSZ as usize + 8 {
555 0 : let mut timestamp_bytes = [0u8; 8];
556 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
557 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
558 0 :
559 0 : match f(timestamp) {
560 0 : ControlFlow::Break(b) => return Ok(b),
561 0 : ControlFlow::Continue(()) => (),
562 : }
563 0 : }
564 : }
565 : }
566 0 : Ok(Default::default())
567 0 : }
568 :
569 0 : pub(crate) async fn get_slru_keyspace(
570 0 : &self,
571 0 : version: Version<'_>,
572 0 : ctx: &RequestContext,
573 0 : ) -> Result<KeySpace, PageReconstructError> {
574 0 : let mut accum = KeySpaceAccum::new();
575 :
576 0 : for kind in SlruKind::iter() {
577 0 : let mut segments: Vec<u32> = self
578 0 : .list_slru_segments(kind, version, ctx)
579 0 : .await?
580 0 : .into_iter()
581 0 : .collect();
582 0 : segments.sort_unstable();
583 :
584 0 : for seg in segments {
585 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
586 :
587 0 : accum.add_range(
588 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
589 0 : );
590 : }
591 : }
592 :
593 0 : Ok(accum.to_keyspace())
594 0 : }
595 :
596 : /// Get a list of SLRU segments
597 0 : pub(crate) async fn list_slru_segments(
598 0 : &self,
599 0 : kind: SlruKind,
600 0 : version: Version<'_>,
601 0 : ctx: &RequestContext,
602 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
603 0 : // fetch directory entry
604 0 : let key = slru_dir_to_key(kind);
605 :
606 0 : let buf = version.get(self, key, ctx).await?;
607 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
608 0 : }
609 :
610 0 : pub(crate) async fn get_relmap_file(
611 0 : &self,
612 0 : spcnode: Oid,
613 0 : dbnode: Oid,
614 0 : version: Version<'_>,
615 0 : ctx: &RequestContext,
616 0 : ) -> Result<Bytes, PageReconstructError> {
617 0 : let key = relmap_file_key(spcnode, dbnode);
618 :
619 0 : let buf = version.get(self, key, ctx).await?;
620 0 : Ok(buf)
621 0 : }
622 :
623 876 : pub(crate) async fn list_dbdirs(
624 876 : &self,
625 876 : lsn: Lsn,
626 876 : ctx: &RequestContext,
627 876 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
628 : // fetch directory entry
629 9307 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
630 :
631 876 : Ok(DbDirectory::des(&buf)?.dbdirs)
632 876 : }
633 :
634 0 : pub(crate) async fn get_twophase_file(
635 0 : &self,
636 0 : xid: u64,
637 0 : lsn: Lsn,
638 0 : ctx: &RequestContext,
639 0 : ) -> Result<Bytes, PageReconstructError> {
640 0 : let key = twophase_file_key(xid);
641 0 : let buf = self.get(key, lsn, ctx).await?;
642 0 : Ok(buf)
643 0 : }
644 :
645 882 : pub(crate) async fn list_twophase_files(
646 882 : &self,
647 882 : lsn: Lsn,
648 882 : ctx: &RequestContext,
649 882 : ) -> Result<HashSet<u64>, PageReconstructError> {
650 : // fetch directory entry
651 9450 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
652 :
653 882 : if self.pg_version >= 17 {
654 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
655 : } else {
656 882 : Ok(TwoPhaseDirectory::des(&buf)?
657 : .xids
658 882 : .iter()
659 882 : .map(|x| u64::from(*x))
660 882 : .collect())
661 : }
662 882 : }
663 :
664 0 : pub(crate) async fn get_control_file(
665 0 : &self,
666 0 : lsn: Lsn,
667 0 : ctx: &RequestContext,
668 0 : ) -> Result<Bytes, PageReconstructError> {
669 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
670 0 : }
671 :
672 36 : pub(crate) async fn get_checkpoint(
673 36 : &self,
674 36 : lsn: Lsn,
675 36 : ctx: &RequestContext,
676 36 : ) -> Result<Bytes, PageReconstructError> {
677 36 : self.get(CHECKPOINT_KEY, lsn, ctx).await
678 36 : }
679 :
680 48 : async fn list_aux_files_v1(
681 48 : &self,
682 48 : lsn: Lsn,
683 48 : ctx: &RequestContext,
684 48 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
685 48 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
686 30 : Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
687 18 : Err(e) => {
688 18 : // This is expected: historical databases do not have the key.
689 18 : debug!("Failed to get info about AUX files: {}", e);
690 18 : Ok(HashMap::new())
691 : }
692 : }
693 48 : }
694 :
695 72 : async fn list_aux_files_v2(
696 72 : &self,
697 72 : lsn: Lsn,
698 72 : ctx: &RequestContext,
699 72 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
700 72 : let kv = self
701 72 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
702 0 : .await?;
703 72 : let mut result = HashMap::new();
704 72 : let mut sz = 0;
705 180 : for (_, v) in kv {
706 108 : let v = v?;
707 108 : let v = aux_file::decode_file_value_bytes(&v)
708 108 : .context("value decode")
709 108 : .map_err(PageReconstructError::Other)?;
710 210 : for (fname, content) in v {
711 102 : sz += fname.len();
712 102 : sz += content.len();
713 102 : result.insert(fname, content);
714 102 : }
715 : }
716 72 : self.aux_file_size_estimator.on_initial(sz);
717 72 : Ok(result)
718 72 : }
719 :
720 0 : pub(crate) async fn trigger_aux_file_size_computation(
721 0 : &self,
722 0 : lsn: Lsn,
723 0 : ctx: &RequestContext,
724 0 : ) -> Result<(), PageReconstructError> {
725 0 : let current_policy = self.last_aux_file_policy.load();
726 0 : if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
727 0 : self.list_aux_files_v2(lsn, ctx).await?;
728 0 : }
729 0 : Ok(())
730 0 : }
731 :
732 78 : pub(crate) async fn list_aux_files(
733 78 : &self,
734 78 : lsn: Lsn,
735 78 : ctx: &RequestContext,
736 78 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
737 78 : let current_policy = self.last_aux_file_policy.load();
738 78 : match current_policy {
739 : Some(AuxFilePolicy::V1) => {
740 6 : let res = self.list_aux_files_v1(lsn, ctx).await?;
741 6 : let empty_str = if res.is_empty() { ", empty" } else { "" };
742 6 : warn!(
743 0 : "this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
744 : );
745 6 : Ok(res)
746 : }
747 : None => {
748 0 : let res = self.list_aux_files_v1(lsn, ctx).await?;
749 0 : if !res.is_empty() {
750 0 : warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
751 0 : }
752 0 : Ok(res)
753 : }
754 66 : Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
755 : Some(AuxFilePolicy::CrossValidation) => {
756 6 : let v1_result = self.list_aux_files_v1(lsn, ctx).await;
757 6 : let v2_result = self.list_aux_files_v2(lsn, ctx).await;
758 6 : match (v1_result, v2_result) {
759 6 : (Ok(v1), Ok(v2)) => {
760 6 : if v1 != v2 {
761 0 : tracing::error!(
762 0 : "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
763 : );
764 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
765 0 : "unmatched aux file v1 v2 result"
766 0 : )));
767 6 : }
768 6 : Ok(v1)
769 : }
770 0 : (Ok(_), Err(v2)) => {
771 0 : tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
772 0 : Err(v2)
773 : }
774 0 : (Err(v1), Ok(_)) => {
775 0 : tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
776 0 : Err(v1)
777 : }
778 0 : (Err(_), Err(v2)) => Err(v2),
779 : }
780 : }
781 : }
782 78 : }
783 :
784 0 : pub(crate) async fn get_replorigins(
785 0 : &self,
786 0 : lsn: Lsn,
787 0 : ctx: &RequestContext,
788 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
789 0 : let kv = self
790 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
791 0 : .await?;
792 0 : let mut result = HashMap::new();
793 0 : for (k, v) in kv {
794 0 : let v = v?;
795 0 : let origin_id = k.field6 as RepOriginId;
796 0 : let origin_lsn = Lsn::des(&v).unwrap();
797 0 : if origin_lsn != Lsn::INVALID {
798 0 : result.insert(origin_id, origin_lsn);
799 0 : }
800 : }
801 0 : Ok(result)
802 0 : }
803 :
804 : /// Does the same as get_current_logical_size but counted on demand.
805 : /// Used to initialize the logical size tracking on startup.
806 : ///
807 : /// Only relation blocks are counted currently. That excludes metadata,
808 : /// SLRUs, twophase files etc.
809 : ///
810 : /// # Cancel-Safety
811 : ///
812 : /// This method is cancellation-safe.
813 0 : pub(crate) async fn get_current_logical_size_non_incremental(
814 0 : &self,
815 0 : lsn: Lsn,
816 0 : ctx: &RequestContext,
817 0 : ) -> Result<u64, CalculateLogicalSizeError> {
818 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
819 :
820 : // Fetch list of database dirs and iterate them
821 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
822 0 : let dbdir = DbDirectory::des(&buf)?;
823 :
824 0 : let mut total_size: u64 = 0;
825 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
826 0 : for rel in self
827 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
828 0 : .await?
829 : {
830 0 : if self.cancel.is_cancelled() {
831 0 : return Err(CalculateLogicalSizeError::Cancelled);
832 0 : }
833 0 : let relsize_key = rel_size_to_key(rel);
834 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
835 0 : let relsize = buf.get_u32_le();
836 0 :
837 0 : total_size += relsize as u64;
838 : }
839 : }
840 0 : Ok(total_size * BLCKSZ as u64)
841 0 : }
842 :
843 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
844 : /// for gc-compaction.
845 : ///
846 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
847 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
848 : /// be kept only for a specific range of LSN.
849 : ///
850 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
851 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
852 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
853 : /// determine which keys to retain/drop for gc-compaction.
854 : ///
855 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
856 : /// to be retained for each of the branch LSN.
857 : ///
858 : /// The return value is (dense keyspace, sparse keyspace).
859 78 : pub(crate) async fn collect_gc_compaction_keyspace(
860 78 : &self,
861 78 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
862 78 : let metadata_key_begin = Key::metadata_key_range().start;
863 78 : let aux_v1_key = AUX_FILES_KEY;
864 78 : let dense_keyspace = KeySpace {
865 78 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
866 78 : };
867 78 : Ok((
868 78 : dense_keyspace,
869 78 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
870 78 : ))
871 78 : }
872 :
873 : ///
874 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
875 : /// Anything that's not listed maybe removed from the underlying storage (from
876 : /// that LSN forwards).
877 : ///
878 : /// The return value is (dense keyspace, sparse keyspace).
879 876 : pub(crate) async fn collect_keyspace(
880 876 : &self,
881 876 : lsn: Lsn,
882 876 : ctx: &RequestContext,
883 876 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
884 876 : // Iterate through key ranges, greedily packing them into partitions
885 876 : let mut result = KeySpaceAccum::new();
886 876 :
887 876 : // The dbdir metadata always exists
888 876 : result.add_key(DBDIR_KEY);
889 :
890 : // Fetch list of database dirs and iterate them
891 9307 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
892 876 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
893 876 :
894 876 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
895 876 : for ((spcnode, dbnode), has_relmap_file) in dbs {
896 0 : if has_relmap_file {
897 0 : result.add_key(relmap_file_key(spcnode, dbnode));
898 0 : }
899 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
900 :
901 0 : let mut rels: Vec<RelTag> = self
902 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
903 0 : .await?
904 0 : .into_iter()
905 0 : .collect();
906 0 : rels.sort_unstable();
907 0 : for rel in rels {
908 0 : let relsize_key = rel_size_to_key(rel);
909 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
910 0 : let relsize = buf.get_u32_le();
911 0 :
912 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
913 0 : result.add_key(relsize_key);
914 : }
915 : }
916 :
917 : // Iterate SLRUs next
918 2628 : for kind in [
919 876 : SlruKind::Clog,
920 876 : SlruKind::MultiXactMembers,
921 876 : SlruKind::MultiXactOffsets,
922 : ] {
923 2628 : let slrudir_key = slru_dir_to_key(kind);
924 2628 : result.add_key(slrudir_key);
925 28775 : let buf = self.get(slrudir_key, lsn, ctx).await?;
926 2628 : let dir = SlruSegmentDirectory::des(&buf)?;
927 2628 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
928 2628 : segments.sort_unstable();
929 2628 : for segno in segments {
930 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
931 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
932 0 : let segsize = buf.get_u32_le();
933 0 :
934 0 : result.add_range(
935 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
936 0 : );
937 0 : result.add_key(segsize_key);
938 : }
939 : }
940 :
941 : // Then pg_twophase
942 876 : result.add_key(TWOPHASEDIR_KEY);
943 :
944 876 : let mut xids: Vec<u64> = self
945 876 : .list_twophase_files(lsn, ctx)
946 9448 : .await?
947 876 : .iter()
948 876 : .cloned()
949 876 : .collect();
950 876 : xids.sort_unstable();
951 876 : for xid in xids {
952 0 : result.add_key(twophase_file_key(xid));
953 0 : }
954 :
955 876 : result.add_key(CONTROLFILE_KEY);
956 876 : result.add_key(CHECKPOINT_KEY);
957 876 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
958 12 : result.add_key(AUX_FILES_KEY);
959 864 : }
960 :
961 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
962 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
963 : // and the keys will not be garbage-colllected.
964 : #[cfg(test)]
965 : {
966 876 : let guard = self.extra_test_dense_keyspace.load();
967 876 : for kr in &guard.ranges {
968 0 : result.add_range(kr.clone());
969 0 : }
970 : }
971 :
972 876 : let dense_keyspace = result.to_keyspace();
973 876 : let sparse_keyspace = SparseKeySpace(KeySpace {
974 876 : ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
975 876 : });
976 876 :
977 876 : if cfg!(debug_assertions) {
978 : // Verify if the sparse keyspaces are ordered and non-overlapping.
979 :
980 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
981 : // category of sparse keys are split into their own image/delta files. If there
982 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
983 : // and we want the developer to keep the keyspaces separated.
984 :
985 876 : let ranges = &sparse_keyspace.0.ranges;
986 :
987 : // TODO: use a single overlaps_with across the codebase
988 876 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
989 876 : !(a.end <= b.start || b.end <= a.start)
990 876 : }
991 1752 : for i in 0..ranges.len() {
992 1752 : for j in 0..i {
993 876 : if overlaps_with(&ranges[i], &ranges[j]) {
994 0 : panic!(
995 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
996 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
997 0 : );
998 876 : }
999 : }
1000 : }
1001 876 : for i in 1..ranges.len() {
1002 876 : assert!(
1003 876 : ranges[i - 1].end <= ranges[i].start,
1004 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1005 0 : ranges[i - 1].start,
1006 0 : ranges[i - 1].end,
1007 0 : ranges[i].start,
1008 0 : ranges[i].end
1009 : );
1010 : }
1011 0 : }
1012 :
1013 876 : Ok((dense_keyspace, sparse_keyspace))
1014 876 : }
1015 :
1016 : /// Get cached size of relation if it not updated after specified LSN
1017 1345620 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1018 1345620 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1019 1345620 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1020 1345554 : if lsn >= *cached_lsn {
1021 1330116 : return Some(*nblocks);
1022 15438 : }
1023 66 : }
1024 15504 : None
1025 1345620 : }
1026 :
1027 : /// Update cached relation size if there is no more recent update
1028 15408 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1029 15408 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1030 15408 :
1031 15408 : if lsn < rel_size_cache.complete_as_of {
1032 : // Do not cache old values. It's safe to cache the size on read, as long as
1033 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1034 : // never evict values from the cache, so if the relation size changed after
1035 : // 'lsn', the new value is already in the cache.
1036 0 : return;
1037 15408 : }
1038 15408 :
1039 15408 : match rel_size_cache.map.entry(tag) {
1040 15408 : hash_map::Entry::Occupied(mut entry) => {
1041 15408 : let cached_lsn = entry.get_mut();
1042 15408 : if lsn >= cached_lsn.0 {
1043 0 : *cached_lsn = (lsn, nblocks);
1044 15408 : }
1045 : }
1046 0 : hash_map::Entry::Vacant(entry) => {
1047 0 : entry.insert((lsn, nblocks));
1048 0 : }
1049 : }
1050 15408 : }
1051 :
1052 : /// Store cached relation size
1053 866196 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1054 866196 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1055 866196 : rel_size_cache.map.insert(tag, (lsn, nblocks));
1056 866196 : }
1057 :
1058 : /// Remove cached relation size
1059 6 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1060 6 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1061 6 : rel_size_cache.map.remove(tag);
1062 6 : }
1063 : }
1064 :
1065 : /// DatadirModification represents an operation to ingest an atomic set of
1066 : /// updates to the repository.
1067 : ///
1068 : /// It is created by the 'begin_record' function. It is called for each WAL
1069 : /// record, so that all the modifications by a one WAL record appear atomic.
1070 : pub struct DatadirModification<'a> {
1071 : /// The timeline this modification applies to. You can access this to
1072 : /// read the state, but note that any pending updates are *not* reflected
1073 : /// in the state in 'tline' yet.
1074 : pub tline: &'a Timeline,
1075 :
1076 : /// Current LSN of the modification
1077 : lsn: Lsn,
1078 :
1079 : // The modifications are not applied directly to the underlying key-value store.
1080 : // The put-functions add the modifications here, and they are flushed to the
1081 : // underlying key-value store by the 'finish' function.
1082 : pending_lsns: Vec<Lsn>,
1083 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1084 : pending_nblocks: i64,
1085 :
1086 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1087 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1088 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1089 :
1090 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1091 : /// which keys are stored here.
1092 : pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
1093 :
1094 : // Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
1095 : // if we encounter a write from postgres in the same wal record, we will drop this entry.
1096 : //
1097 : // Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
1098 : // at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
1099 : pending_zero_data_pages: HashSet<CompactKey>,
1100 :
1101 : /// For special "directory" keys that store key-value maps, track the size of the map
1102 : /// if it was updated in this modification.
1103 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
1104 :
1105 : /// An **approximation** of how large our EphemeralFile write will be when committed.
1106 : pending_bytes: usize,
1107 : }
1108 :
1109 : impl<'a> DatadirModification<'a> {
1110 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1111 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1112 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1113 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1114 :
1115 : /// Get the current lsn
1116 1254168 : pub(crate) fn get_lsn(&self) -> Lsn {
1117 1254168 : self.lsn
1118 1254168 : }
1119 :
1120 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1121 0 : self.pending_bytes
1122 0 : }
1123 :
1124 0 : pub(crate) fn has_dirty_data_pages(&self) -> bool {
1125 0 : (!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
1126 0 : }
1127 :
1128 : /// Set the current lsn
1129 437574 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1130 437574 : ensure!(
1131 437574 : lsn >= self.lsn,
1132 0 : "setting an older lsn {} than {} is not allowed",
1133 : lsn,
1134 : self.lsn
1135 : );
1136 :
1137 : // If we are advancing LSN, then state from previous wal record should have been flushed.
1138 437574 : assert!(self.pending_zero_data_pages.is_empty());
1139 :
1140 437574 : if lsn > self.lsn {
1141 437574 : self.pending_lsns.push(self.lsn);
1142 437574 : self.lsn = lsn;
1143 437574 : }
1144 437574 : Ok(())
1145 437574 : }
1146 :
1147 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1148 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1149 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1150 : ///
1151 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1152 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1153 : /// via [`Self::get`] as soon as their record has been ingested.
1154 2988336 : fn is_data_key(key: &Key) -> bool {
1155 2988336 : key.is_rel_block_key() || key.is_slru_block_key()
1156 2988336 : }
1157 :
1158 : /// Initialize a completely new repository.
1159 : ///
1160 : /// This inserts the directory metadata entries that are assumed to
1161 : /// always exist.
1162 534 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1163 534 : let buf = DbDirectory::ser(&DbDirectory {
1164 534 : dbdirs: HashMap::new(),
1165 534 : })?;
1166 534 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1167 534 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1168 534 :
1169 534 : // Create AuxFilesDirectory
1170 534 : self.init_aux_dir()?;
1171 :
1172 534 : let buf = if self.tline.pg_version >= 17 {
1173 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1174 0 : xids: HashSet::new(),
1175 0 : })
1176 : } else {
1177 534 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1178 534 : xids: HashSet::new(),
1179 534 : })
1180 0 : }?;
1181 534 : self.pending_directory_entries
1182 534 : .push((DirectoryKind::TwoPhase, 0));
1183 534 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1184 :
1185 534 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1186 534 : let empty_dir = Value::Image(buf);
1187 534 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1188 534 : self.pending_directory_entries
1189 534 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1190 534 : self.put(
1191 534 : slru_dir_to_key(SlruKind::MultiXactMembers),
1192 534 : empty_dir.clone(),
1193 534 : );
1194 534 : self.pending_directory_entries
1195 534 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1196 534 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1197 534 : self.pending_directory_entries
1198 534 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1199 534 :
1200 534 : Ok(())
1201 534 : }
1202 :
1203 : #[cfg(test)]
1204 528 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1205 528 : self.init_empty()?;
1206 528 : self.put_control_file(bytes::Bytes::from_static(
1207 528 : b"control_file contents do not matter",
1208 528 : ))
1209 528 : .context("put_control_file")?;
1210 528 : self.put_checkpoint(bytes::Bytes::from_static(
1211 528 : b"checkpoint_file contents do not matter",
1212 528 : ))
1213 528 : .context("put_checkpoint_file")?;
1214 528 : Ok(())
1215 528 : }
1216 :
1217 : /// Put a new page version that can be constructed from a WAL record
1218 : ///
1219 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1220 : /// current end-of-file. It's up to the caller to check that the relation size
1221 : /// matches the blocks inserted!
1222 436890 : pub fn put_rel_wal_record(
1223 436890 : &mut self,
1224 436890 : rel: RelTag,
1225 436890 : blknum: BlockNumber,
1226 436890 : rec: NeonWalRecord,
1227 436890 : ) -> anyhow::Result<()> {
1228 436890 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1229 436890 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1230 436890 : Ok(())
1231 436890 : }
1232 :
1233 : // Same, but for an SLRU.
1234 24 : pub fn put_slru_wal_record(
1235 24 : &mut self,
1236 24 : kind: SlruKind,
1237 24 : segno: u32,
1238 24 : blknum: BlockNumber,
1239 24 : rec: NeonWalRecord,
1240 24 : ) -> anyhow::Result<()> {
1241 24 : self.put(
1242 24 : slru_block_to_key(kind, segno, blknum),
1243 24 : Value::WalRecord(rec),
1244 24 : );
1245 24 : Ok(())
1246 24 : }
1247 :
1248 : /// Like put_wal_record, but with ready-made image of the page.
1249 833598 : pub fn put_rel_page_image(
1250 833598 : &mut self,
1251 833598 : rel: RelTag,
1252 833598 : blknum: BlockNumber,
1253 833598 : img: Bytes,
1254 833598 : ) -> anyhow::Result<()> {
1255 833598 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1256 833598 : let key = rel_block_to_key(rel, blknum);
1257 833598 : if !key.is_valid_key_on_write_path() {
1258 0 : anyhow::bail!(
1259 0 : "the request contains data not supported by pageserver at {}",
1260 0 : key
1261 0 : );
1262 833598 : }
1263 833598 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1264 833598 : Ok(())
1265 833598 : }
1266 :
1267 18 : pub fn put_slru_page_image(
1268 18 : &mut self,
1269 18 : kind: SlruKind,
1270 18 : segno: u32,
1271 18 : blknum: BlockNumber,
1272 18 : img: Bytes,
1273 18 : ) -> anyhow::Result<()> {
1274 18 : let key = slru_block_to_key(kind, segno, blknum);
1275 18 : if !key.is_valid_key_on_write_path() {
1276 0 : anyhow::bail!(
1277 0 : "the request contains data not supported by pageserver at {}",
1278 0 : key
1279 0 : );
1280 18 : }
1281 18 : self.put(key, Value::Image(img));
1282 18 : Ok(())
1283 18 : }
1284 :
1285 8994 : pub(crate) fn put_rel_page_image_zero(
1286 8994 : &mut self,
1287 8994 : rel: RelTag,
1288 8994 : blknum: BlockNumber,
1289 8994 : ) -> anyhow::Result<()> {
1290 8994 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1291 8994 : let key = rel_block_to_key(rel, blknum);
1292 8994 : if !key.is_valid_key_on_write_path() {
1293 0 : anyhow::bail!(
1294 0 : "the request contains data not supported by pageserver: {} @ {}",
1295 0 : key,
1296 0 : self.lsn
1297 0 : );
1298 8994 : }
1299 8994 : self.pending_zero_data_pages.insert(key.to_compact());
1300 8994 : self.pending_bytes += ZERO_PAGE.len();
1301 8994 : Ok(())
1302 8994 : }
1303 :
1304 0 : pub(crate) fn put_slru_page_image_zero(
1305 0 : &mut self,
1306 0 : kind: SlruKind,
1307 0 : segno: u32,
1308 0 : blknum: BlockNumber,
1309 0 : ) -> anyhow::Result<()> {
1310 0 : let key = slru_block_to_key(kind, segno, blknum);
1311 0 : if !key.is_valid_key_on_write_path() {
1312 0 : anyhow::bail!(
1313 0 : "the request contains data not supported by pageserver: {} @ {}",
1314 0 : key,
1315 0 : self.lsn
1316 0 : );
1317 0 : }
1318 0 : self.pending_zero_data_pages.insert(key.to_compact());
1319 0 : self.pending_bytes += ZERO_PAGE.len();
1320 0 : Ok(())
1321 0 : }
1322 :
1323 : /// Call this at the end of each WAL record.
1324 437592 : pub(crate) fn on_record_end(&mut self) {
1325 437592 : let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
1326 446586 : for key in pending_zero_data_pages {
1327 8994 : self.put_data(key, Value::Image(ZERO_PAGE.clone()));
1328 8994 : }
1329 437592 : }
1330 :
1331 : /// Store a relmapper file (pg_filenode.map) in the repository
1332 48 : pub async fn put_relmap_file(
1333 48 : &mut self,
1334 48 : spcnode: Oid,
1335 48 : dbnode: Oid,
1336 48 : img: Bytes,
1337 48 : ctx: &RequestContext,
1338 48 : ) -> anyhow::Result<()> {
1339 : // Add it to the directory (if it doesn't exist already)
1340 48 : let buf = self.get(DBDIR_KEY, ctx).await?;
1341 48 : let mut dbdir = DbDirectory::des(&buf)?;
1342 :
1343 48 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1344 48 : if r.is_none() || r == Some(false) {
1345 : // The dbdir entry didn't exist, or it contained a
1346 : // 'false'. The 'insert' call already updated it with
1347 : // 'true', now write the updated 'dbdirs' map back.
1348 48 : let buf = DbDirectory::ser(&dbdir)?;
1349 48 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1350 48 :
1351 48 : // Create AuxFilesDirectory as well
1352 48 : self.init_aux_dir()?;
1353 0 : }
1354 48 : if r.is_none() {
1355 : // Create RelDirectory
1356 24 : let buf = RelDirectory::ser(&RelDirectory {
1357 24 : rels: HashSet::new(),
1358 24 : })?;
1359 24 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1360 24 : self.put(
1361 24 : rel_dir_to_key(spcnode, dbnode),
1362 24 : Value::Image(Bytes::from(buf)),
1363 24 : );
1364 24 : }
1365 :
1366 48 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1367 48 : Ok(())
1368 48 : }
1369 :
1370 0 : pub async fn put_twophase_file(
1371 0 : &mut self,
1372 0 : xid: u64,
1373 0 : img: Bytes,
1374 0 : ctx: &RequestContext,
1375 0 : ) -> anyhow::Result<()> {
1376 : // Add it to the directory entry
1377 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1378 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1379 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1380 0 : if !dir.xids.insert(xid) {
1381 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1382 0 : }
1383 0 : self.pending_directory_entries
1384 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1385 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1386 : } else {
1387 0 : let xid = xid as u32;
1388 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1389 0 : if !dir.xids.insert(xid) {
1390 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1391 0 : }
1392 0 : self.pending_directory_entries
1393 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1394 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1395 : };
1396 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1397 0 :
1398 0 : self.put(twophase_file_key(xid), Value::Image(img));
1399 0 : Ok(())
1400 0 : }
1401 :
1402 0 : pub async fn set_replorigin(
1403 0 : &mut self,
1404 0 : origin_id: RepOriginId,
1405 0 : origin_lsn: Lsn,
1406 0 : ) -> anyhow::Result<()> {
1407 0 : let key = repl_origin_key(origin_id);
1408 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1409 0 : Ok(())
1410 0 : }
1411 :
1412 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1413 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1414 0 : }
1415 :
1416 534 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1417 534 : self.put(CONTROLFILE_KEY, Value::Image(img));
1418 534 : Ok(())
1419 534 : }
1420 :
1421 576 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1422 576 : self.put(CHECKPOINT_KEY, Value::Image(img));
1423 576 : Ok(())
1424 576 : }
1425 :
1426 0 : pub async fn drop_dbdir(
1427 0 : &mut self,
1428 0 : spcnode: Oid,
1429 0 : dbnode: Oid,
1430 0 : ctx: &RequestContext,
1431 0 : ) -> anyhow::Result<()> {
1432 0 : let total_blocks = self
1433 0 : .tline
1434 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1435 0 : .await?;
1436 :
1437 : // Remove entry from dbdir
1438 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1439 0 : let mut dir = DbDirectory::des(&buf)?;
1440 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1441 0 : let buf = DbDirectory::ser(&dir)?;
1442 0 : self.pending_directory_entries
1443 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1444 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1445 : } else {
1446 0 : warn!(
1447 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1448 : spcnode, dbnode
1449 : );
1450 : }
1451 :
1452 : // Update logical database size.
1453 0 : self.pending_nblocks -= total_blocks as i64;
1454 0 :
1455 0 : // Delete all relations and metadata files for the spcnode/dnode
1456 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1457 0 : Ok(())
1458 0 : }
1459 :
1460 : /// Create a relation fork.
1461 : ///
1462 : /// 'nblocks' is the initial size.
1463 5760 : pub async fn put_rel_creation(
1464 5760 : &mut self,
1465 5760 : rel: RelTag,
1466 5760 : nblocks: BlockNumber,
1467 5760 : ctx: &RequestContext,
1468 5760 : ) -> Result<(), RelationError> {
1469 5760 : if rel.relnode == 0 {
1470 0 : return Err(RelationError::InvalidRelnode);
1471 5760 : }
1472 : // It's possible that this is the first rel for this db in this
1473 : // tablespace. Create the reldir entry for it if so.
1474 5760 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1475 5760 : .context("deserialize db")?;
1476 5760 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1477 5760 : let mut rel_dir =
1478 5760 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1479 : // Didn't exist. Update dbdir
1480 24 : e.insert(false);
1481 24 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1482 24 : self.pending_directory_entries
1483 24 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1484 24 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1485 24 :
1486 24 : // and create the RelDirectory
1487 24 : RelDirectory::default()
1488 : } else {
1489 : // reldir already exists, fetch it
1490 5736 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1491 5736 : .context("deserialize db")?
1492 : };
1493 :
1494 : // Add the new relation to the rel directory entry, and write it back
1495 5760 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1496 0 : return Err(RelationError::AlreadyExists);
1497 5760 : }
1498 5760 :
1499 5760 : self.pending_directory_entries
1500 5760 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1501 5760 :
1502 5760 : self.put(
1503 5760 : rel_dir_key,
1504 5760 : Value::Image(Bytes::from(
1505 5760 : RelDirectory::ser(&rel_dir).context("serialize")?,
1506 : )),
1507 : );
1508 :
1509 : // Put size
1510 5760 : let size_key = rel_size_to_key(rel);
1511 5760 : let buf = nblocks.to_le_bytes();
1512 5760 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1513 5760 :
1514 5760 : self.pending_nblocks += nblocks as i64;
1515 5760 :
1516 5760 : // Update relation size cache
1517 5760 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1518 5760 :
1519 5760 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1520 5760 : // caller.
1521 5760 : Ok(())
1522 5760 : }
1523 :
1524 : /// Truncate relation
1525 18036 : pub async fn put_rel_truncation(
1526 18036 : &mut self,
1527 18036 : rel: RelTag,
1528 18036 : nblocks: BlockNumber,
1529 18036 : ctx: &RequestContext,
1530 18036 : ) -> anyhow::Result<()> {
1531 18036 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1532 18036 : if self
1533 18036 : .tline
1534 18036 : .get_rel_exists(rel, Version::Modified(self), ctx)
1535 0 : .await?
1536 : {
1537 18036 : let size_key = rel_size_to_key(rel);
1538 : // Fetch the old size first
1539 18036 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1540 18036 :
1541 18036 : // Update the entry with the new size.
1542 18036 : let buf = nblocks.to_le_bytes();
1543 18036 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1544 18036 :
1545 18036 : // Update relation size cache
1546 18036 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1547 18036 :
1548 18036 : // Update relation size cache
1549 18036 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1550 18036 :
1551 18036 : // Update logical database size.
1552 18036 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1553 0 : }
1554 18036 : Ok(())
1555 18036 : }
1556 :
1557 : /// Extend relation
1558 : /// If new size is smaller, do nothing.
1559 830040 : pub async fn put_rel_extend(
1560 830040 : &mut self,
1561 830040 : rel: RelTag,
1562 830040 : nblocks: BlockNumber,
1563 830040 : ctx: &RequestContext,
1564 830040 : ) -> anyhow::Result<()> {
1565 830040 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1566 :
1567 : // Put size
1568 830040 : let size_key = rel_size_to_key(rel);
1569 830040 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1570 830040 :
1571 830040 : // only extend relation here. never decrease the size
1572 830040 : if nblocks > old_size {
1573 824364 : let buf = nblocks.to_le_bytes();
1574 824364 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1575 824364 :
1576 824364 : // Update relation size cache
1577 824364 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1578 824364 :
1579 824364 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1580 824364 : }
1581 830040 : Ok(())
1582 830040 : }
1583 :
1584 : /// Drop a relation.
1585 6 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1586 6 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1587 :
1588 : // Remove it from the directory entry
1589 6 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1590 6 : let buf = self.get(dir_key, ctx).await?;
1591 6 : let mut dir = RelDirectory::des(&buf)?;
1592 :
1593 6 : self.pending_directory_entries
1594 6 : .push((DirectoryKind::Rel, dir.rels.len()));
1595 6 :
1596 6 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1597 6 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1598 : } else {
1599 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1600 : }
1601 :
1602 : // update logical size
1603 6 : let size_key = rel_size_to_key(rel);
1604 6 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1605 6 : self.pending_nblocks -= old_size as i64;
1606 6 :
1607 6 : // Remove enty from relation size cache
1608 6 : self.tline.remove_cached_rel_size(&rel);
1609 6 :
1610 6 : // Delete size entry, as well as all blocks
1611 6 : self.delete(rel_key_range(rel));
1612 6 :
1613 6 : Ok(())
1614 6 : }
1615 :
1616 18 : pub async fn put_slru_segment_creation(
1617 18 : &mut self,
1618 18 : kind: SlruKind,
1619 18 : segno: u32,
1620 18 : nblocks: BlockNumber,
1621 18 : ctx: &RequestContext,
1622 18 : ) -> anyhow::Result<()> {
1623 18 : // Add it to the directory entry
1624 18 : let dir_key = slru_dir_to_key(kind);
1625 18 : let buf = self.get(dir_key, ctx).await?;
1626 18 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1627 :
1628 18 : if !dir.segments.insert(segno) {
1629 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1630 18 : }
1631 18 : self.pending_directory_entries
1632 18 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1633 18 : self.put(
1634 18 : dir_key,
1635 18 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1636 : );
1637 :
1638 : // Put size
1639 18 : let size_key = slru_segment_size_to_key(kind, segno);
1640 18 : let buf = nblocks.to_le_bytes();
1641 18 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1642 18 :
1643 18 : // even if nblocks > 0, we don't insert any actual blocks here
1644 18 :
1645 18 : Ok(())
1646 18 : }
1647 :
1648 : /// Extend SLRU segment
1649 0 : pub fn put_slru_extend(
1650 0 : &mut self,
1651 0 : kind: SlruKind,
1652 0 : segno: u32,
1653 0 : nblocks: BlockNumber,
1654 0 : ) -> anyhow::Result<()> {
1655 0 : // Put size
1656 0 : let size_key = slru_segment_size_to_key(kind, segno);
1657 0 : let buf = nblocks.to_le_bytes();
1658 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1659 0 : Ok(())
1660 0 : }
1661 :
1662 : /// This method is used for marking truncated SLRU files
1663 0 : pub async fn drop_slru_segment(
1664 0 : &mut self,
1665 0 : kind: SlruKind,
1666 0 : segno: u32,
1667 0 : ctx: &RequestContext,
1668 0 : ) -> anyhow::Result<()> {
1669 0 : // Remove it from the directory entry
1670 0 : let dir_key = slru_dir_to_key(kind);
1671 0 : let buf = self.get(dir_key, ctx).await?;
1672 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1673 :
1674 0 : if !dir.segments.remove(&segno) {
1675 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1676 0 : }
1677 0 : self.pending_directory_entries
1678 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1679 0 : self.put(
1680 0 : dir_key,
1681 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1682 : );
1683 :
1684 : // Delete size entry, as well as all blocks
1685 0 : self.delete(slru_segment_key_range(kind, segno));
1686 0 :
1687 0 : Ok(())
1688 0 : }
1689 :
1690 : /// Drop a relmapper file (pg_filenode.map)
1691 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1692 0 : // TODO
1693 0 : Ok(())
1694 0 : }
1695 :
1696 : /// This method is used for marking truncated SLRU files
1697 0 : pub async fn drop_twophase_file(
1698 0 : &mut self,
1699 0 : xid: u64,
1700 0 : ctx: &RequestContext,
1701 0 : ) -> anyhow::Result<()> {
1702 : // Remove it from the directory entry
1703 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1704 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1705 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
1706 :
1707 0 : if !dir.xids.remove(&xid) {
1708 0 : warn!("twophase file for xid {} does not exist", xid);
1709 0 : }
1710 0 : self.pending_directory_entries
1711 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1712 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1713 : } else {
1714 0 : let xid: u32 = u32::try_from(xid)?;
1715 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1716 :
1717 0 : if !dir.xids.remove(&xid) {
1718 0 : warn!("twophase file for xid {} does not exist", xid);
1719 0 : }
1720 0 : self.pending_directory_entries
1721 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1722 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1723 : };
1724 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1725 0 :
1726 0 : // Delete it
1727 0 : self.delete(twophase_key_range(xid));
1728 0 :
1729 0 : Ok(())
1730 0 : }
1731 :
1732 582 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1733 582 : if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
1734 570 : return Ok(());
1735 12 : }
1736 12 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1737 12 : files: HashMap::new(),
1738 12 : })?;
1739 12 : self.pending_directory_entries
1740 12 : .push((DirectoryKind::AuxFiles, 0));
1741 12 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1742 12 : Ok(())
1743 582 : }
1744 :
1745 90 : pub async fn put_file(
1746 90 : &mut self,
1747 90 : path: &str,
1748 90 : content: &[u8],
1749 90 : ctx: &RequestContext,
1750 90 : ) -> anyhow::Result<()> {
1751 90 : let switch_policy = self.tline.get_switch_aux_file_policy();
1752 :
1753 90 : let policy = {
1754 90 : let current_policy = self.tline.last_aux_file_policy.load();
1755 : // Allowed switch path:
1756 : // * no aux files -> v1/v2/cross-validation
1757 : // * cross-validation->v2
1758 :
1759 90 : let current_policy = if current_policy.is_none() {
1760 : // This path will only be hit once per tenant: we will decide the final policy in this code block.
1761 : // The next call to `put_file` will always have `last_aux_file_policy != None`.
1762 36 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1763 36 : let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
1764 36 : if aux_files_key_v1.is_empty() {
1765 30 : None
1766 : } else {
1767 6 : warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
1768 6 : self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
1769 6 : Some(AuxFilePolicy::V1)
1770 : }
1771 : } else {
1772 54 : current_policy
1773 : };
1774 :
1775 90 : if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
1776 36 : self.tline.do_switch_aux_policy(switch_policy)?;
1777 36 : info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
1778 36 : switch_policy
1779 : } else {
1780 : // This branch handles non-valid migration path, and the case that switch_policy == current_policy.
1781 : // And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
1782 54 : current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
1783 : }
1784 : };
1785 :
1786 90 : if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
1787 78 : let key = aux_file::encode_aux_file_key(path);
1788 : // retrieve the key from the engine
1789 78 : let old_val = match self.get(key, ctx).await {
1790 18 : Ok(val) => Some(val),
1791 60 : Err(PageReconstructError::MissingKey(_)) => None,
1792 0 : Err(e) => return Err(e.into()),
1793 : };
1794 78 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
1795 18 : aux_file::decode_file_value(old_val)?
1796 : } else {
1797 60 : Vec::new()
1798 : };
1799 78 : let mut other_files = Vec::with_capacity(files.len());
1800 78 : let mut modifying_file = None;
1801 96 : for file @ (p, content) in files {
1802 18 : if path == p {
1803 18 : assert!(
1804 18 : modifying_file.is_none(),
1805 0 : "duplicated entries found for {}",
1806 : path
1807 : );
1808 18 : modifying_file = Some(content);
1809 0 : } else {
1810 0 : other_files.push(file);
1811 0 : }
1812 : }
1813 78 : let mut new_files = other_files;
1814 78 : match (modifying_file, content.is_empty()) {
1815 12 : (Some(old_content), false) => {
1816 12 : self.tline
1817 12 : .aux_file_size_estimator
1818 12 : .on_update(old_content.len(), content.len());
1819 12 : new_files.push((path, content));
1820 12 : }
1821 6 : (Some(old_content), true) => {
1822 6 : self.tline
1823 6 : .aux_file_size_estimator
1824 6 : .on_remove(old_content.len());
1825 6 : // not adding the file key to the final `new_files` vec.
1826 6 : }
1827 60 : (None, false) => {
1828 60 : self.tline.aux_file_size_estimator.on_add(content.len());
1829 60 : new_files.push((path, content));
1830 60 : }
1831 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
1832 : }
1833 78 : let new_val = aux_file::encode_file_value(&new_files)?;
1834 78 : self.put(key, Value::Image(new_val.into()));
1835 12 : }
1836 :
1837 90 : if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
1838 18 : let file_path = path.to_string();
1839 18 : let content = if content.is_empty() {
1840 0 : None
1841 : } else {
1842 18 : Some(Bytes::copy_from_slice(content))
1843 : };
1844 :
1845 : let n_files;
1846 18 : let mut aux_files = self.tline.aux_files.lock().await;
1847 18 : if let Some(mut dir) = aux_files.dir.take() {
1848 : // We already updated aux files in `self`: emit a delta and update our latest value.
1849 0 : dir.upsert(file_path.clone(), content.clone());
1850 0 : n_files = dir.files.len();
1851 0 : if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
1852 0 : self.put(
1853 0 : AUX_FILES_KEY,
1854 0 : Value::Image(Bytes::from(
1855 0 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1856 : )),
1857 : );
1858 0 : aux_files.n_deltas = 0;
1859 0 : } else {
1860 0 : self.put(
1861 0 : AUX_FILES_KEY,
1862 0 : Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
1863 0 : );
1864 0 : aux_files.n_deltas += 1;
1865 0 : }
1866 0 : aux_files.dir = Some(dir);
1867 : } else {
1868 : // Check if the AUX_FILES_KEY is initialized
1869 18 : match self.get(AUX_FILES_KEY, ctx).await {
1870 18 : Ok(dir_bytes) => {
1871 18 : let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
1872 : // Key is already set, we may append a delta
1873 18 : self.put(
1874 18 : AUX_FILES_KEY,
1875 18 : Value::WalRecord(NeonWalRecord::AuxFile {
1876 18 : file_path: file_path.clone(),
1877 18 : content: content.clone(),
1878 18 : }),
1879 18 : );
1880 18 : dir.upsert(file_path, content);
1881 18 : n_files = dir.files.len();
1882 18 : aux_files.dir = Some(dir);
1883 : }
1884 : Err(
1885 0 : e @ (PageReconstructError::Cancelled
1886 0 : | PageReconstructError::AncestorLsnTimeout(_)),
1887 0 : ) => {
1888 0 : // Important that we do not interpret a shutdown error as "not found" and thereby
1889 0 : // reset the map.
1890 0 : return Err(e.into());
1891 : }
1892 : // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
1893 : // the original code assumes all other errors are missing keys. Therefore, we keep the code path
1894 : // the same for now, though in theory, we should only match the `MissingKey` variant.
1895 : Err(
1896 0 : e @ (PageReconstructError::Other(_)
1897 : | PageReconstructError::WalRedo(_)
1898 : | PageReconstructError::MissingKey(_)),
1899 : ) => {
1900 : // Key is missing, we must insert an image as the basis for subsequent deltas.
1901 :
1902 0 : if !matches!(e, PageReconstructError::MissingKey(_)) {
1903 0 : let e = utils::error::report_compact_sources(&e);
1904 0 : tracing::warn!("treating error as if it was a missing key: {}", e);
1905 0 : }
1906 :
1907 0 : let mut dir = AuxFilesDirectory {
1908 0 : files: HashMap::new(),
1909 0 : };
1910 0 : dir.upsert(file_path, content);
1911 0 : self.put(
1912 0 : AUX_FILES_KEY,
1913 0 : Value::Image(Bytes::from(
1914 0 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1915 : )),
1916 : );
1917 0 : n_files = 1;
1918 0 : aux_files.dir = Some(dir);
1919 : }
1920 : }
1921 : }
1922 :
1923 18 : self.pending_directory_entries
1924 18 : .push((DirectoryKind::AuxFiles, n_files));
1925 72 : }
1926 :
1927 90 : Ok(())
1928 90 : }
1929 :
1930 : ///
1931 : /// Flush changes accumulated so far to the underlying repository.
1932 : ///
1933 : /// Usually, changes made in DatadirModification are atomic, but this allows
1934 : /// you to flush them to the underlying repository before the final `commit`.
1935 : /// That allows to free up the memory used to hold the pending changes.
1936 : ///
1937 : /// Currently only used during bulk import of a data directory. In that
1938 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1939 : /// whole import fails and the timeline will be deleted anyway.
1940 : /// (Or to be precise, it will be left behind for debugging purposes and
1941 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1942 : ///
1943 : /// Note: A consequence of flushing the pending operations is that they
1944 : /// won't be visible to subsequent operations until `commit`. The function
1945 : /// retains all the metadata, but data pages are flushed. That's again OK
1946 : /// for bulk import, where you are just loading data pages and won't try to
1947 : /// modify the same pages twice.
1948 5790 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1949 5790 : // Unless we have accumulated a decent amount of changes, it's not worth it
1950 5790 : // to scan through the pending_updates list.
1951 5790 : let pending_nblocks = self.pending_nblocks;
1952 5790 : if pending_nblocks < 10000 {
1953 5790 : return Ok(());
1954 0 : }
1955 :
1956 0 : let mut writer = self.tline.writer().await;
1957 :
1958 : // Flush relation and SLRU data blocks, keep metadata.
1959 0 : let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
1960 0 :
1961 0 : // This bails out on first error without modifying pending_updates.
1962 0 : // That's Ok, cf this function's doc comment.
1963 0 : writer.put_batch(pending_data_pages, ctx).await?;
1964 0 : self.pending_bytes = 0;
1965 0 :
1966 0 : if pending_nblocks != 0 {
1967 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1968 0 : self.pending_nblocks = 0;
1969 0 : }
1970 :
1971 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1972 0 : writer.update_directory_entries_count(kind, count as u64);
1973 0 : }
1974 :
1975 0 : Ok(())
1976 5790 : }
1977 :
1978 : ///
1979 : /// Finish this atomic update, writing all the updated keys to the
1980 : /// underlying timeline.
1981 : /// All the modifications in this atomic update are stamped by the specified LSN.
1982 : ///
1983 2229240 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1984 2229240 : // Commit should never be called mid-wal-record
1985 2229240 : assert!(self.pending_zero_data_pages.is_empty());
1986 :
1987 2229240 : let mut writer = self.tline.writer().await;
1988 :
1989 2229240 : let pending_nblocks = self.pending_nblocks;
1990 2229240 : self.pending_nblocks = 0;
1991 2229240 :
1992 2229240 : // Ordering: the items in this batch do not need to be in any global order, but values for
1993 2229240 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
1994 2229240 : // this to do efficient updates to its index.
1995 2229240 : let mut write_batch = std::mem::take(&mut self.pending_data_pages);
1996 2229240 :
1997 2229240 : write_batch.extend(
1998 2229240 : self.pending_metadata_pages
1999 2229240 : .drain()
2000 2229240 : .flat_map(|(key, values)| {
2001 821544 : values
2002 821544 : .into_iter()
2003 821544 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2004 2229240 : }),
2005 2229240 : );
2006 2229240 :
2007 2229240 : if !write_batch.is_empty() {
2008 1242192 : writer.put_batch(write_batch, ctx).await?;
2009 987048 : }
2010 :
2011 2229240 : if !self.pending_deletions.is_empty() {
2012 6 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2013 6 : self.pending_deletions.clear();
2014 2229234 : }
2015 :
2016 2229240 : self.pending_lsns.push(self.lsn);
2017 2666814 : for pending_lsn in self.pending_lsns.drain(..) {
2018 2666814 : // Ideally, we should be able to call writer.finish_write() only once
2019 2666814 : // with the highest LSN. However, the last_record_lsn variable in the
2020 2666814 : // timeline keeps track of the latest LSN and the immediate previous LSN
2021 2666814 : // so we need to record every LSN to not leave a gap between them.
2022 2666814 : writer.finish_write(pending_lsn);
2023 2666814 : }
2024 :
2025 2229240 : if pending_nblocks != 0 {
2026 811710 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2027 1417530 : }
2028 :
2029 2229240 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2030 8532 : writer.update_directory_entries_count(kind, count as u64);
2031 8532 : }
2032 :
2033 2229240 : self.pending_bytes = 0;
2034 2229240 :
2035 2229240 : Ok(())
2036 2229240 : }
2037 :
2038 875112 : pub(crate) fn len(&self) -> usize {
2039 875112 : self.pending_metadata_pages.len()
2040 875112 : + self.pending_data_pages.len()
2041 875112 : + self.pending_deletions.len()
2042 875112 : }
2043 :
2044 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2045 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2046 : /// in the modification.
2047 : ///
2048 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2049 : /// page must ensure that the pages they read are already committed in Timeline, for example
2050 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2051 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2052 : /// and not data pages.
2053 859806 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2054 859806 : if !Self::is_data_key(&key) {
2055 : // Have we already updated the same key? Read the latest pending updated
2056 : // version in that case.
2057 : //
2058 : // Note: we don't check pending_deletions. It is an error to request a
2059 : // value that has been removed, deletion only avoids leaking storage.
2060 859806 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2061 47784 : if let Some((_, _, value)) = values.last() {
2062 47784 : return if let Value::Image(img) = value {
2063 47784 : Ok(img.clone())
2064 : } else {
2065 : // Currently, we never need to read back a WAL record that we
2066 : // inserted in the same "transaction". All the metadata updates
2067 : // work directly with Images, and we never need to read actual
2068 : // data pages. We could handle this if we had to, by calling
2069 : // the walredo manager, but let's keep it simple for now.
2070 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2071 0 : "unexpected pending WAL record"
2072 0 : )))
2073 : };
2074 0 : }
2075 812022 : }
2076 : } else {
2077 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2078 : // this key should never be present in pending_data_pages. We ensure this by committing
2079 : // modifications before ingesting DB create operations, which are the only kind that reads
2080 : // data pages during ingest.
2081 0 : if cfg!(debug_assertions) {
2082 0 : for (dirty_key, _, _, _) in &self.pending_data_pages {
2083 0 : debug_assert!(&key.to_compact() != dirty_key);
2084 : }
2085 :
2086 0 : debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
2087 0 : }
2088 : }
2089 :
2090 : // Metadata page cache miss, or we're reading a data page.
2091 812022 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2092 812022 : self.tline.get(key, lsn, ctx).await
2093 859806 : }
2094 :
2095 : /// Only used during unit tests, force putting a key into the modification.
2096 : #[cfg(test)]
2097 6 : pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
2098 6 : self.put(key, val);
2099 6 : }
2100 :
2101 2128530 : fn put(&mut self, key: Key, val: Value) {
2102 2128530 : if Self::is_data_key(&key) {
2103 1270530 : self.put_data(key.to_compact(), val)
2104 : } else {
2105 858000 : self.put_metadata(key.to_compact(), val)
2106 : }
2107 2128530 : }
2108 :
2109 1279524 : fn put_data(&mut self, key: CompactKey, val: Value) {
2110 1279524 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2111 1279524 :
2112 1279524 : // If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
2113 1279524 : // is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
2114 1279524 : // and the subsequent postgres-originating write
2115 1279524 : if self.pending_zero_data_pages.remove(&key) {
2116 0 : self.pending_bytes -= ZERO_PAGE.len();
2117 1279524 : }
2118 :
2119 1279524 : self.pending_bytes += val_serialized_size;
2120 1279524 : self.pending_data_pages
2121 1279524 : .push((key, self.lsn, val_serialized_size, val))
2122 1279524 : }
2123 :
2124 858000 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2125 858000 : let values = self.pending_metadata_pages.entry(key).or_default();
2126 : // Replace the previous value if it exists at the same lsn
2127 858000 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2128 36456 : if *last_lsn == self.lsn {
2129 : // Update the pending_bytes contribution from this entry, and update the serialized size in place
2130 36456 : self.pending_bytes -= *last_value_ser_size;
2131 36456 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2132 36456 : self.pending_bytes += *last_value_ser_size;
2133 36456 :
2134 36456 : // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
2135 36456 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2136 36456 : *last_value = val;
2137 36456 : return;
2138 0 : }
2139 821544 : }
2140 :
2141 821544 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2142 821544 : self.pending_bytes += val_serialized_size;
2143 821544 : values.push((self.lsn, val_serialized_size, val));
2144 858000 : }
2145 :
2146 6 : fn delete(&mut self, key_range: Range<Key>) {
2147 6 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2148 6 : self.pending_deletions.push((key_range, self.lsn));
2149 6 : }
2150 : }
2151 :
2152 : /// This struct facilitates accessing either a committed key from the timeline at a
2153 : /// specific LSN, or the latest uncommitted key from a pending modification.
2154 : ///
2155 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2156 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2157 : /// need to look up the keys in the modification first before looking them up in the
2158 : /// timeline to not miss the latest updates.
2159 : #[derive(Clone, Copy)]
2160 : pub enum Version<'a> {
2161 : Lsn(Lsn),
2162 : Modified(&'a DatadirModification<'a>),
2163 : }
2164 :
2165 : impl<'a> Version<'a> {
2166 70680 : async fn get(
2167 70680 : &self,
2168 70680 : timeline: &Timeline,
2169 70680 : key: Key,
2170 70680 : ctx: &RequestContext,
2171 70680 : ) -> Result<Bytes, PageReconstructError> {
2172 70680 : match self {
2173 70620 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2174 60 : Version::Modified(modification) => modification.get(key, ctx).await,
2175 : }
2176 70680 : }
2177 :
2178 106860 : fn get_lsn(&self) -> Lsn {
2179 106860 : match self {
2180 88722 : Version::Lsn(lsn) => *lsn,
2181 18138 : Version::Modified(modification) => modification.lsn,
2182 : }
2183 106860 : }
2184 : }
2185 :
2186 : //--- Metadata structs stored in key-value pairs in the repository.
2187 :
2188 6738 : #[derive(Debug, Serialize, Deserialize)]
2189 : struct DbDirectory {
2190 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2191 : dbdirs: HashMap<(Oid, Oid), bool>,
2192 : }
2193 :
2194 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2195 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2196 : // were named like "pg_twophase/000002E5", now they're like
2197 : // "pg_twophsae/0000000A000002E4".
2198 :
2199 882 : #[derive(Debug, Serialize, Deserialize)]
2200 : struct TwoPhaseDirectory {
2201 : xids: HashSet<TransactionId>,
2202 : }
2203 :
2204 0 : #[derive(Debug, Serialize, Deserialize)]
2205 : struct TwoPhaseDirectoryV17 {
2206 : xids: HashSet<u64>,
2207 : }
2208 :
2209 5796 : #[derive(Debug, Serialize, Deserialize, Default)]
2210 : struct RelDirectory {
2211 : // Set of relations that exist. (relfilenode, forknum)
2212 : //
2213 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2214 : // key-value pairs, if you have a lot of relations
2215 : rels: HashSet<(Oid, u8)>,
2216 : }
2217 :
2218 84 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
2219 : pub(crate) struct AuxFilesDirectory {
2220 : pub(crate) files: HashMap<String, Bytes>,
2221 : }
2222 :
2223 : impl AuxFilesDirectory {
2224 48 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
2225 48 : if let Some(value) = value {
2226 42 : self.files.insert(key, value);
2227 42 : } else {
2228 6 : self.files.remove(&key);
2229 6 : }
2230 48 : }
2231 : }
2232 :
2233 0 : #[derive(Debug, Serialize, Deserialize)]
2234 : struct RelSizeEntry {
2235 : nblocks: u32,
2236 : }
2237 :
2238 2646 : #[derive(Debug, Serialize, Deserialize, Default)]
2239 : struct SlruSegmentDirectory {
2240 : // Set of SLRU segments that exist.
2241 : segments: HashSet<u32>,
2242 : }
2243 :
2244 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2245 : #[repr(u8)]
2246 : pub(crate) enum DirectoryKind {
2247 : Db,
2248 : TwoPhase,
2249 : Rel,
2250 : AuxFiles,
2251 : SlruSegment(SlruKind),
2252 : }
2253 :
2254 : impl DirectoryKind {
2255 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2256 17064 : pub(crate) fn offset(&self) -> usize {
2257 17064 : self.into_usize()
2258 17064 : }
2259 : }
2260 :
2261 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2262 :
2263 : #[allow(clippy::bool_assert_comparison)]
2264 : #[cfg(test)]
2265 : mod tests {
2266 : use hex_literal::hex;
2267 : use utils::id::TimelineId;
2268 :
2269 : use super::*;
2270 :
2271 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2272 :
2273 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2274 : #[tokio::test]
2275 6 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2276 6 : let name = "aux_files_round_trip";
2277 6 : let harness = TenantHarness::create(name).await?;
2278 6 :
2279 6 : pub const TIMELINE_ID: TimelineId =
2280 6 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2281 6 :
2282 24 : let (tenant, ctx) = harness.load().await;
2283 6 : let tline = tenant
2284 6 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2285 6 : .await?;
2286 6 : let tline = tline.raw_timeline().unwrap();
2287 6 :
2288 6 : // First modification: insert two keys
2289 6 : let mut modification = tline.begin_modification(Lsn(0x1000));
2290 6 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2291 6 : modification.set_lsn(Lsn(0x1008))?;
2292 6 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2293 6 : modification.commit(&ctx).await?;
2294 6 : let expect_1008 = HashMap::from([
2295 6 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2296 6 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2297 6 : ]);
2298 6 :
2299 6 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2300 6 : assert_eq!(readback, expect_1008);
2301 6 :
2302 6 : // Second modification: update one key, remove the other
2303 6 : let mut modification = tline.begin_modification(Lsn(0x2000));
2304 6 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2305 6 : modification.set_lsn(Lsn(0x2008))?;
2306 6 : modification.put_file("foo/bar2", b"", &ctx).await?;
2307 6 : modification.commit(&ctx).await?;
2308 6 : let expect_2008 =
2309 6 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2310 6 :
2311 6 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
2312 6 : assert_eq!(readback, expect_2008);
2313 6 :
2314 6 : // Reading back in time works
2315 6 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2316 6 : assert_eq!(readback, expect_1008);
2317 6 :
2318 6 : Ok(())
2319 6 : }
2320 :
2321 : /*
2322 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2323 : let incremental = timeline.get_current_logical_size();
2324 : let non_incremental = timeline
2325 : .get_current_logical_size_non_incremental(lsn)
2326 : .unwrap();
2327 : assert_eq!(incremental, non_incremental);
2328 : }
2329 : */
2330 :
2331 : /*
2332 : ///
2333 : /// Test list_rels() function, with branches and dropped relations
2334 : ///
2335 : #[test]
2336 : fn test_list_rels_drop() -> Result<()> {
2337 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2338 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2339 : const TESTDB: u32 = 111;
2340 :
2341 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2342 : // after branching fails below
2343 : let mut writer = tline.begin_record(Lsn(0x10));
2344 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2345 : writer.finish()?;
2346 :
2347 : // Create a relation on the timeline
2348 : let mut writer = tline.begin_record(Lsn(0x20));
2349 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2350 : writer.finish()?;
2351 :
2352 : let writer = tline.begin_record(Lsn(0x00));
2353 : writer.finish()?;
2354 :
2355 : // Check that list_rels() lists it after LSN 2, but no before it
2356 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2357 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2358 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2359 :
2360 : // Create a branch, check that the relation is visible there
2361 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2362 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2363 : Some(timeline) => timeline,
2364 : None => panic!("Should have a local timeline"),
2365 : };
2366 : let newtline = DatadirTimelineImpl::new(newtline);
2367 : assert!(newtline
2368 : .list_rels(0, TESTDB, Lsn(0x30))?
2369 : .contains(&TESTREL_A));
2370 :
2371 : // Drop it on the branch
2372 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2373 : new_writer.drop_relation(TESTREL_A)?;
2374 : new_writer.finish()?;
2375 :
2376 : // Check that it's no longer listed on the branch after the point where it was dropped
2377 : assert!(newtline
2378 : .list_rels(0, TESTDB, Lsn(0x30))?
2379 : .contains(&TESTREL_A));
2380 : assert!(!newtline
2381 : .list_rels(0, TESTDB, Lsn(0x40))?
2382 : .contains(&TESTREL_A));
2383 :
2384 : // Run checkpoint and garbage collection and check that it's still not visible
2385 : newtline.checkpoint(CheckpointConfig::Forced)?;
2386 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2387 :
2388 : assert!(!newtline
2389 : .list_rels(0, TESTDB, Lsn(0x40))?
2390 : .contains(&TESTREL_A));
2391 :
2392 : Ok(())
2393 : }
2394 : */
2395 :
2396 : /*
2397 : #[test]
2398 : fn test_read_beyond_eof() -> Result<()> {
2399 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2400 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2401 :
2402 : make_some_layers(&tline, Lsn(0x20))?;
2403 : let mut writer = tline.begin_record(Lsn(0x60));
2404 : walingest.put_rel_page_image(
2405 : &mut writer,
2406 : TESTREL_A,
2407 : 0,
2408 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2409 : )?;
2410 : writer.finish()?;
2411 :
2412 : // Test read before rel creation. Should error out.
2413 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2414 :
2415 : // Read block beyond end of relation at different points in time.
2416 : // These reads should fall into different delta, image, and in-memory layers.
2417 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2418 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2419 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2420 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2421 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2422 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2423 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2424 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2425 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2426 :
2427 : // Test on an in-memory layer with no preceding layer
2428 : let mut writer = tline.begin_record(Lsn(0x70));
2429 : walingest.put_rel_page_image(
2430 : &mut writer,
2431 : TESTREL_B,
2432 : 0,
2433 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2434 : )?;
2435 : writer.finish()?;
2436 :
2437 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
2438 :
2439 : Ok(())
2440 : }
2441 : */
2442 : }
|