Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
13 : use crate::walrecord::NeonWalRecord;
14 : use crate::{aux_file, repository::*};
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
20 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
21 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
22 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
23 : };
24 : use pageserver_api::keyspace::SparseKeySpace;
25 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
26 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
27 : use postgres_ffi::BLCKSZ;
28 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
29 : use serde::{Deserialize, Serialize};
30 : use std::collections::{hash_map, HashMap, HashSet};
31 : use std::ops::ControlFlow;
32 : use std::ops::Range;
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, trace, warn};
36 : use utils::bin_ser::DeserializeError;
37 : use utils::pausable_failpoint;
38 : use utils::{bin_ser::BeSer, lsn::Lsn};
39 :
40 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
41 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
42 :
43 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
44 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
45 :
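/// Result of a timestamp-to-LSN lookup (see `Timeline::find_lsn_for_timestamp`).
///
/// A minimal sketch of how a caller might consume the variants (illustrative
/// only; the actual handling is up to the caller):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
///     LsnForTimestamp::Present(lsn) => lsn, // commits found on both sides
///     LsnForTimestamp::Future(lsn) => lsn,  // newest data is older; clamp to it
///     LsnForTimestamp::Past(lsn) => lsn,    // beyond the PITR horizon; likely an error
///     LsnForTimestamp::NoData(lsn) => lsn,  // no commit records found at all
/// }
/// ```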
46 : #[derive(Debug)]
47 : pub enum LsnForTimestamp {
48 : /// Found commits both before and after the given timestamp
49 : Present(Lsn),
50 :
51 : /// Found no commits after the given timestamp; this means
52 : /// that the newest data in the branch is older than the given
53 : /// timestamp.
54 : ///
55 : /// All commits <= LSN happened before the given timestamp
56 : Future(Lsn),
57 :
58 : /// The queried timestamp is past the horizon we can look back to (PITR)
59 : ///
60 : /// All commits > LSN happened after the given timestamp,
61 : /// but any commits < LSN might have happened before or after
62 : /// the given timestamp. We don't know because no data before
63 : /// the given lsn is available.
64 : Past(Lsn),
65 :
66 : /// We have found no commit with a timestamp,
67 : /// so we can't return anything meaningful.
68 : ///
69 : /// The associated LSN is the lower bound value we can safely
70 : /// create branches on, but no statement is made about whether it is
71 : /// older or newer than the timestamp.
72 : ///
73 : /// This variant can e.g. be returned right after a
74 : /// cluster import.
75 : NoData(Lsn),
76 : }
77 :
78 0 : #[derive(Debug, thiserror::Error)]
79 : pub(crate) enum CalculateLogicalSizeError {
80 : #[error("cancelled")]
81 : Cancelled,
82 :
83 : /// Something went wrong while reading the metadata we use to calculate logical size
84 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
85 : /// in the `From` implementation for this variant.
86 : #[error(transparent)]
87 : PageRead(PageReconstructError),
88 :
89 : /// Something went wrong deserializing metadata that we read to calculate logical size
90 : #[error("decode error: {0}")]
91 : Decode(#[from] DeserializeError),
92 : }
93 :
94 0 : #[derive(Debug, thiserror::Error)]
95 : pub(crate) enum CollectKeySpaceError {
96 : #[error(transparent)]
97 : Decode(#[from] DeserializeError),
98 : #[error(transparent)]
99 : PageRead(PageReconstructError),
100 : #[error("cancelled")]
101 : Cancelled,
102 : }
103 :
104 : impl From<PageReconstructError> for CollectKeySpaceError {
105 0 : fn from(err: PageReconstructError) -> Self {
106 0 : match err {
107 0 : PageReconstructError::Cancelled => Self::Cancelled,
108 0 : err => Self::PageRead(err),
109 : }
110 0 : }
111 : }
112 :
113 : impl From<PageReconstructError> for CalculateLogicalSizeError {
114 0 : fn from(pre: PageReconstructError) -> Self {
115 0 : match pre {
116 0 : PageReconstructError::Cancelled => Self::Cancelled,
117 0 : _ => Self::PageRead(pre),
118 : }
119 0 : }
120 : }
121 :
122 0 : #[derive(Debug, thiserror::Error)]
123 : pub enum RelationError {
124 : #[error("Relation Already Exists")]
125 : AlreadyExists,
126 : #[error("invalid relnode")]
127 : InvalidRelnode,
128 : #[error(transparent)]
129 : Other(#[from] anyhow::Error),
130 : }
131 :
132 : ///
133 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
134 : /// and other special kinds of files, in a versioned key-value store. The
135 : /// Timeline struct provides the key-value store.
136 : ///
137 : /// This is a separate impl so that we can easily include all these functions in a Timeline
138 : /// implementation; it might be moved into a separate struct later.
139 : impl Timeline {
140 : /// Start ingesting a WAL record, or other atomic modification of
141 : /// the timeline.
142 : ///
143 : /// This provides a transaction-like interface to perform a bunch
144 : /// of modifications atomically.
145 : ///
146 : /// To ingest a WAL record, call begin_modification(lsn) to get a
147 : /// DatadirModification object. Use the functions in the object to
148 : /// modify the repository state, updating all the pages and metadata
149 : /// that the WAL record affects. When you're done, call commit() to
150 : /// commit the changes.
151 : ///
152 : /// The LSN stored in the modification is advanced by `ingest_record` and
153 : /// is used by `commit()` to update `last_record_lsn`.
154 : ///
155 : /// Calling commit() will flush all the changes and reset the state,
156 : /// so the `DatadirModification` struct can be reused to perform the next modification.
157 : ///
158 : /// Note that any pending modifications you make through the
159 : /// modification object won't be visible to calls to the 'get' and list
160 : /// functions of the timeline until you finish! And if you update the
161 : /// same page twice, the last update wins.
162 : ///
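/// # Example
///
/// A minimal sketch of the call pattern (assumes `timeline`, `lsn`, `rel`,
/// `blknum`, `rec` and `ctx` are in scope; error handling elided):
///
/// ```ignore
/// let mut modification = timeline.begin_modification(lsn);
/// modification.put_rel_wal_record(rel, blknum, rec)?;
/// modification.on_record_end();
/// modification.commit(ctx).await?;
/// ```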
163 268374 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
164 268374 : where
165 268374 : Self: Sized,
166 268374 : {
167 268374 : DatadirModification {
168 268374 : tline: self,
169 268374 : pending_lsns: Vec::new(),
170 268374 : pending_metadata_pages: HashMap::new(),
171 268374 : pending_data_pages: Vec::new(),
172 268374 : pending_zero_data_pages: Default::default(),
173 268374 : pending_deletions: Vec::new(),
174 268374 : pending_nblocks: 0,
175 268374 : pending_directory_entries: Vec::new(),
176 268374 : pending_bytes: 0,
177 268374 : lsn,
178 268374 : }
179 268374 : }
180 :
181 : //------------------------------------------------------------------------------
182 : // Public GET functions
183 : //------------------------------------------------------------------------------
184 :
185 : /// Look up given page version.
186 18384 : pub(crate) async fn get_rel_page_at_lsn(
187 18384 : &self,
188 18384 : tag: RelTag,
189 18384 : blknum: BlockNumber,
190 18384 : version: Version<'_>,
191 18384 : ctx: &RequestContext,
192 18384 : ) -> Result<Bytes, PageReconstructError> {
193 18384 : if tag.relnode == 0 {
194 0 : return Err(PageReconstructError::Other(
195 0 : RelationError::InvalidRelnode.into(),
196 0 : ));
197 18384 : }
198 :
199 18384 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
200 18384 : if blknum >= nblocks {
201 0 : debug!(
202 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
203 0 : tag,
204 0 : blknum,
205 0 : version.get_lsn(),
206 : nblocks
207 : );
208 0 : return Ok(ZERO_PAGE.clone());
209 18384 : }
210 18384 :
211 18384 : let key = rel_block_to_key(tag, blknum);
212 18384 : version.get(self, key, ctx).await
213 18384 : }
214 :
215 : // Get size of a database in blocks
216 0 : pub(crate) async fn get_db_size(
217 0 : &self,
218 0 : spcnode: Oid,
219 0 : dbnode: Oid,
220 0 : version: Version<'_>,
221 0 : ctx: &RequestContext,
222 0 : ) -> Result<usize, PageReconstructError> {
223 0 : let mut total_blocks = 0;
224 :
225 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
226 :
227 0 : for rel in rels {
228 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
229 0 : total_blocks += n_blocks as usize;
230 : }
231 0 : Ok(total_blocks)
232 0 : }
233 :
234 : /// Get size of a relation file
235 24434 : pub(crate) async fn get_rel_size(
236 24434 : &self,
237 24434 : tag: RelTag,
238 24434 : version: Version<'_>,
239 24434 : ctx: &RequestContext,
240 24434 : ) -> Result<BlockNumber, PageReconstructError> {
241 24434 : if tag.relnode == 0 {
242 0 : return Err(PageReconstructError::Other(
243 0 : RelationError::InvalidRelnode.into(),
244 0 : ));
245 24434 : }
246 :
247 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
248 19294 : return Ok(nblocks);
249 5140 : }
250 5140 :
251 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
252 0 : && !self.get_rel_exists(tag, version, ctx).await?
253 : {
254 : // FIXME: Postgres sometimes calls smgrcreate() to create
255 : // FSM, and smgrnblocks() on it immediately afterwards,
256 : // without extending it. Tolerate that by claiming that
257 : // any non-existent FSM fork has size 0.
258 0 : return Ok(0);
259 5140 : }
260 5140 :
261 5140 : let key = rel_size_to_key(tag);
262 5140 : let mut buf = version.get(self, key, ctx).await?;
263 5136 : let nblocks = buf.get_u32_le();
264 5136 :
265 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
266 5136 :
267 5136 : Ok(nblocks)
268 24434 : }
269 :
270 : /// Does relation exist?
271 6050 : pub(crate) async fn get_rel_exists(
272 6050 : &self,
273 6050 : tag: RelTag,
274 6050 : version: Version<'_>,
275 6050 : ctx: &RequestContext,
276 6050 : ) -> Result<bool, PageReconstructError> {
277 6050 : if tag.relnode == 0 {
278 0 : return Err(PageReconstructError::Other(
279 0 : RelationError::InvalidRelnode.into(),
280 0 : ));
281 6050 : }
282 :
283 : // first try to lookup relation in cache
284 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
285 6032 : return Ok(true);
286 18 : }
287 : // then check if the database was already initialized.
288 : // get_rel_exists can be called before dbdir is created.
289 18 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
290 18 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
291 18 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
292 0 : return Ok(false);
293 18 : }
294 18 : // fetch directory listing
295 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
296 18 : let buf = version.get(self, key, ctx).await?;
297 :
298 18 : let dir = RelDirectory::des(&buf)?;
299 18 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
300 6050 : }
301 :
302 : /// Get a list of all existing relations in given tablespace and database.
303 : ///
304 : /// # Cancel-Safety
305 : ///
306 : /// This method is cancellation-safe.
307 0 : pub(crate) async fn list_rels(
308 0 : &self,
309 0 : spcnode: Oid,
310 0 : dbnode: Oid,
311 0 : version: Version<'_>,
312 0 : ctx: &RequestContext,
313 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
314 0 : // fetch directory listing
315 0 : let key = rel_dir_to_key(spcnode, dbnode);
316 0 : let buf = version.get(self, key, ctx).await?;
317 :
318 0 : let dir = RelDirectory::des(&buf)?;
319 0 : let rels: HashSet<RelTag> =
320 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
321 0 : spcnode,
322 0 : dbnode,
323 0 : relnode: *relnode,
324 0 : forknum: *forknum,
325 0 : }));
326 0 :
327 0 : Ok(rels)
328 0 : }
329 :
330 : /// Get the whole SLRU segment
331 0 : pub(crate) async fn get_slru_segment(
332 0 : &self,
333 0 : kind: SlruKind,
334 0 : segno: u32,
335 0 : lsn: Lsn,
336 0 : ctx: &RequestContext,
337 0 : ) -> Result<Bytes, PageReconstructError> {
338 0 : let n_blocks = self
339 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
340 0 : .await?;
341 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
342 0 : for blkno in 0..n_blocks {
343 0 : let block = self
344 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
345 0 : .await?;
346 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
347 : }
348 0 : Ok(segment.freeze())
349 0 : }
350 :
351 : /// Look up given SLRU page version.
352 0 : pub(crate) async fn get_slru_page_at_lsn(
353 0 : &self,
354 0 : kind: SlruKind,
355 0 : segno: u32,
356 0 : blknum: BlockNumber,
357 0 : lsn: Lsn,
358 0 : ctx: &RequestContext,
359 0 : ) -> Result<Bytes, PageReconstructError> {
360 0 : let key = slru_block_to_key(kind, segno, blknum);
361 0 : self.get(key, lsn, ctx).await
362 0 : }
363 :
364 : /// Get size of an SLRU segment
365 0 : pub(crate) async fn get_slru_segment_size(
366 0 : &self,
367 0 : kind: SlruKind,
368 0 : segno: u32,
369 0 : version: Version<'_>,
370 0 : ctx: &RequestContext,
371 0 : ) -> Result<BlockNumber, PageReconstructError> {
372 0 : let key = slru_segment_size_to_key(kind, segno);
373 0 : let mut buf = version.get(self, key, ctx).await?;
374 0 : Ok(buf.get_u32_le())
375 0 : }
376 :
377 : /// Does the SLRU segment exist?
378 0 : pub(crate) async fn get_slru_segment_exists(
379 0 : &self,
380 0 : kind: SlruKind,
381 0 : segno: u32,
382 0 : version: Version<'_>,
383 0 : ctx: &RequestContext,
384 0 : ) -> Result<bool, PageReconstructError> {
385 0 : // fetch directory listing
386 0 : let key = slru_dir_to_key(kind);
387 0 : let buf = version.get(self, key, ctx).await?;
388 :
389 0 : let dir = SlruSegmentDirectory::des(&buf)?;
390 0 : Ok(dir.segments.contains(&segno))
391 0 : }
392 :
393 : /// Locate the LSN such that all transactions that committed before
394 : /// 'search_timestamp' are visible, but nothing newer is.
395 : ///
396 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
397 : /// so it's not well defined which LSN you get if there were multiple commits
398 : /// "in flight" at that point in time.
399 : ///
400 0 : pub(crate) async fn find_lsn_for_timestamp(
401 0 : &self,
402 0 : search_timestamp: TimestampTz,
403 0 : cancel: &CancellationToken,
404 0 : ctx: &RequestContext,
405 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
406 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
407 :
408 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
409 0 : // We use this method to figure out the branching LSN for the new branch, but the
410 0 : // GC cutoff could be before the branching point and we cannot create a new branch
411 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
412 0 : // on the safe side.
413 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
414 0 : let max_lsn = self.get_last_record_lsn();
415 0 :
416 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
417 0 : // LSN divided by 8.
418 0 : let mut low = min_lsn.0 / 8;
419 0 : let mut high = max_lsn.0 / 8 + 1;
420 0 :
421 0 : let mut found_smaller = false;
422 0 : let mut found_larger = false;
423 :
424 0 : while low < high {
425 0 : if cancel.is_cancelled() {
426 0 : return Err(PageReconstructError::Cancelled);
427 0 : }
428 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
429 0 : let mid = (high + low) / 2;
430 :
431 0 : let cmp = self
432 0 : .is_latest_commit_timestamp_ge_than(
433 0 : search_timestamp,
434 0 : Lsn(mid * 8),
435 0 : &mut found_smaller,
436 0 : &mut found_larger,
437 0 : ctx,
438 0 : )
439 0 : .await?;
440 :
441 0 : if cmp {
442 0 : high = mid;
443 0 : } else {
444 0 : low = mid + 1;
445 0 : }
446 : }
447 : // If `found_smaller == true`, then `low == t + 1`, where `t` is the target
448 : // LSN: the LSN of the last commit record at or before `search_timestamp`.
449 : // Subtract one from `low` to get `t`.
450 : //
451 : // FIXME: it would be better to get the LSN of the previous commit.
452 : // Otherwise, if you restore to the returned LSN, the database will
453 : // include physical changes from later commits that will be marked
454 : // as aborted, and will need to be vacuumed away.
455 0 : let commit_lsn = Lsn((low - 1) * 8);
456 0 : match (found_smaller, found_larger) {
457 : (false, false) => {
458 : // This can happen if no commit records have been processed yet, e.g.
459 : // just after importing a cluster.
460 0 : Ok(LsnForTimestamp::NoData(min_lsn))
461 : }
462 : (false, true) => {
463 : // Didn't find any commit timestamps smaller than the request
464 0 : Ok(LsnForTimestamp::Past(min_lsn))
465 : }
466 0 : (true, _) if commit_lsn < min_lsn => {
467 0 : // The search above did set found_smaller to true, but it never increased the lsn.
468 0 : // Then `low` is still the initial min_lsn, and the subtraction above gave a
469 0 : // value below min_lsn, which we must never return.
470 0 : Ok(LsnForTimestamp::Past(min_lsn))
471 : }
472 : (true, false) => {
473 : // Only found commits with timestamps smaller than the request.
474 : // It's still a valid case for branch creation, so return it.
475 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
476 : // case, anyway.
477 0 : Ok(LsnForTimestamp::Future(commit_lsn))
478 : }
479 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
480 : }
481 0 : }
482 :
483 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
484 : /// commits that committed at or after 'search_timestamp', at LSN 'probe_lsn'.
485 : ///
486 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
487 : /// with a smaller/larger timestamp.
488 : ///
489 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
490 0 : &self,
491 0 : search_timestamp: TimestampTz,
492 0 : probe_lsn: Lsn,
493 0 : found_smaller: &mut bool,
494 0 : found_larger: &mut bool,
495 0 : ctx: &RequestContext,
496 0 : ) -> Result<bool, PageReconstructError> {
497 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
498 0 : if timestamp >= search_timestamp {
499 0 : *found_larger = true;
500 0 : return ControlFlow::Break(true);
501 0 : } else {
502 0 : *found_smaller = true;
503 0 : }
504 0 : ControlFlow::Continue(())
505 0 : })
506 0 : .await
507 0 : }
508 :
509 : /// Obtain the latest commit timestamp known at the given lsn.
510 : ///
511 : /// If the lsn has no timestamps, returns None. Otherwise returns the largest timestamp found.
512 0 : pub(crate) async fn get_timestamp_for_lsn(
513 0 : &self,
514 0 : probe_lsn: Lsn,
515 0 : ctx: &RequestContext,
516 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
517 0 : let mut max: Option<TimestampTz> = None;
518 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
519 0 : if let Some(max_prev) = max {
520 0 : max = Some(max_prev.max(timestamp));
521 0 : } else {
522 0 : max = Some(timestamp);
523 0 : }
524 0 : ControlFlow::Continue(())
525 0 : })
526 0 : .await?;
527 :
528 0 : Ok(max)
529 0 : }
530 :
531 : /// Runs the given function on all the timestamps for a given lsn
532 : ///
533 : /// The return value is either given by the closure, or set to the `Default`
534 : /// impl's output.
535 0 : async fn map_all_timestamps<T: Default>(
536 0 : &self,
537 0 : probe_lsn: Lsn,
538 0 : ctx: &RequestContext,
539 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
540 0 : ) -> Result<T, PageReconstructError> {
541 0 : for segno in self
542 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
543 0 : .await?
544 : {
545 0 : let nblocks = self
546 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
547 0 : .await?;
548 0 : for blknum in (0..nblocks).rev() {
549 0 : let clog_page = self
550 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
551 0 : .await?;
552 :
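// A CLOG page with a commit timestamp attached is 8 bytes longer than
// BLCKSZ; the trailing bytes hold the timestamp in big-endian order.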
553 0 : if clog_page.len() == BLCKSZ as usize + 8 {
554 0 : let mut timestamp_bytes = [0u8; 8];
555 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
556 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
557 0 :
558 0 : match f(timestamp) {
559 0 : ControlFlow::Break(b) => return Ok(b),
560 0 : ControlFlow::Continue(()) => (),
561 : }
562 0 : }
563 : }
564 : }
565 0 : Ok(Default::default())
566 0 : }
567 :
568 0 : pub(crate) async fn get_slru_keyspace(
569 0 : &self,
570 0 : version: Version<'_>,
571 0 : ctx: &RequestContext,
572 0 : ) -> Result<KeySpace, PageReconstructError> {
573 0 : let mut accum = KeySpaceAccum::new();
574 :
575 0 : for kind in SlruKind::iter() {
576 0 : let mut segments: Vec<u32> = self
577 0 : .list_slru_segments(kind, version, ctx)
578 0 : .await?
579 0 : .into_iter()
580 0 : .collect();
581 0 : segments.sort_unstable();
582 :
583 0 : for seg in segments {
584 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
585 :
586 0 : accum.add_range(
587 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
588 0 : );
589 : }
590 : }
591 :
592 0 : Ok(accum.to_keyspace())
593 0 : }
594 :
595 : /// Get a list of SLRU segments
596 0 : pub(crate) async fn list_slru_segments(
597 0 : &self,
598 0 : kind: SlruKind,
599 0 : version: Version<'_>,
600 0 : ctx: &RequestContext,
601 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
602 0 : // fetch directory entry
603 0 : let key = slru_dir_to_key(kind);
604 :
605 0 : let buf = version.get(self, key, ctx).await?;
606 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
607 0 : }
608 :
609 0 : pub(crate) async fn get_relmap_file(
610 0 : &self,
611 0 : spcnode: Oid,
612 0 : dbnode: Oid,
613 0 : version: Version<'_>,
614 0 : ctx: &RequestContext,
615 0 : ) -> Result<Bytes, PageReconstructError> {
616 0 : let key = relmap_file_key(spcnode, dbnode);
617 :
618 0 : let buf = version.get(self, key, ctx).await?;
619 0 : Ok(buf)
620 0 : }
621 :
622 286 : pub(crate) async fn list_dbdirs(
623 286 : &self,
624 286 : lsn: Lsn,
625 286 : ctx: &RequestContext,
626 286 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
627 : // fetch directory entry
628 3087 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
629 :
630 286 : Ok(DbDirectory::des(&buf)?.dbdirs)
631 286 : }
632 :
633 0 : pub(crate) async fn get_twophase_file(
634 0 : &self,
635 0 : xid: u64,
636 0 : lsn: Lsn,
637 0 : ctx: &RequestContext,
638 0 : ) -> Result<Bytes, PageReconstructError> {
639 0 : let key = twophase_file_key(xid);
640 0 : let buf = self.get(key, lsn, ctx).await?;
641 0 : Ok(buf)
642 0 : }
643 :
644 288 : pub(crate) async fn list_twophase_files(
645 288 : &self,
646 288 : lsn: Lsn,
647 288 : ctx: &RequestContext,
648 288 : ) -> Result<HashSet<u64>, PageReconstructError> {
649 : // fetch directory entry
650 3120 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
651 :
652 288 : if self.pg_version >= 17 {
653 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
654 : } else {
655 288 : Ok(TwoPhaseDirectory::des(&buf)?
656 : .xids
657 288 : .iter()
658 288 : .map(|x| u64::from(*x))
659 288 : .collect())
660 : }
661 288 : }
662 :
663 0 : pub(crate) async fn get_control_file(
664 0 : &self,
665 0 : lsn: Lsn,
666 0 : ctx: &RequestContext,
667 0 : ) -> Result<Bytes, PageReconstructError> {
668 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
669 0 : }
670 :
671 12 : pub(crate) async fn get_checkpoint(
672 12 : &self,
673 12 : lsn: Lsn,
674 12 : ctx: &RequestContext,
675 12 : ) -> Result<Bytes, PageReconstructError> {
676 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
677 12 : }
678 :
679 12 : async fn list_aux_files_v2(
680 12 : &self,
681 12 : lsn: Lsn,
682 12 : ctx: &RequestContext,
683 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
684 12 : let kv = self
685 12 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
686 0 : .await?;
687 12 : let mut result = HashMap::new();
688 12 : let mut sz = 0;
689 30 : for (_, v) in kv {
690 18 : let v = v?;
691 18 : let v = aux_file::decode_file_value_bytes(&v)
692 18 : .context("value decode")
693 18 : .map_err(PageReconstructError::Other)?;
694 34 : for (fname, content) in v {
695 16 : sz += fname.len();
696 16 : sz += content.len();
697 16 : result.insert(fname, content);
698 16 : }
699 : }
700 12 : self.aux_file_size_estimator.on_initial(sz);
701 12 : Ok(result)
702 12 : }
703 :
704 0 : pub(crate) async fn trigger_aux_file_size_computation(
705 0 : &self,
706 0 : lsn: Lsn,
707 0 : ctx: &RequestContext,
708 0 : ) -> Result<(), PageReconstructError> {
709 0 : self.list_aux_files_v2(lsn, ctx).await?;
710 0 : Ok(())
711 0 : }
712 :
713 12 : pub(crate) async fn list_aux_files(
714 12 : &self,
715 12 : lsn: Lsn,
716 12 : ctx: &RequestContext,
717 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
718 12 : self.list_aux_files_v2(lsn, ctx).await
719 12 : }
720 :
721 0 : pub(crate) async fn get_replorigins(
722 0 : &self,
723 0 : lsn: Lsn,
724 0 : ctx: &RequestContext,
725 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
726 0 : let kv = self
727 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
728 0 : .await?;
729 0 : let mut result = HashMap::new();
730 0 : for (k, v) in kv {
731 0 : let v = v?;
732 0 : let origin_id = k.field6 as RepOriginId;
733 0 : let origin_lsn = Lsn::des(&v).unwrap();
734 0 : if origin_lsn != Lsn::INVALID {
735 0 : result.insert(origin_id, origin_lsn);
736 0 : }
737 : }
738 0 : Ok(result)
739 0 : }
740 :
741 : /// Does the same as get_current_logical_size, but computes the size on demand.
742 : /// Used to initialize the logical size tracking on startup.
743 : ///
744 : /// Only relation blocks are counted currently. That excludes metadata,
745 : /// SLRUs, twophase files etc.
746 : ///
747 : /// # Cancel-Safety
748 : ///
749 : /// This method is cancellation-safe.
750 0 : pub(crate) async fn get_current_logical_size_non_incremental(
751 0 : &self,
752 0 : lsn: Lsn,
753 0 : ctx: &RequestContext,
754 0 : ) -> Result<u64, CalculateLogicalSizeError> {
755 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
756 :
757 : // Fetch list of database dirs and iterate them
758 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
759 0 : let dbdir = DbDirectory::des(&buf)?;
760 :
761 0 : let mut total_size: u64 = 0;
762 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
763 0 : for rel in self
764 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
765 0 : .await?
766 : {
767 0 : if self.cancel.is_cancelled() {
768 0 : return Err(CalculateLogicalSizeError::Cancelled);
769 0 : }
770 0 : let relsize_key = rel_size_to_key(rel);
771 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
772 0 : let relsize = buf.get_u32_le();
773 0 :
774 0 : total_size += relsize as u64;
775 : }
776 : }
777 0 : Ok(total_size * BLCKSZ as u64)
778 0 : }
779 :
780 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
781 : /// for gc-compaction.
782 : ///
783 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
784 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
785 : /// be kept only for a specific range of LSN.
786 : ///
787 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
788 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
789 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
790 : /// determine which keys to retain/drop for gc-compaction.
791 : ///
792 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
793 : /// to be retained for each of the branch LSN.
794 : ///
795 : /// The return value is (dense keyspace, sparse keyspace).
796 26 : pub(crate) async fn collect_gc_compaction_keyspace(
797 26 : &self,
798 26 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
799 26 : let metadata_key_begin = Key::metadata_key_range().start;
800 26 : let aux_v1_key = AUX_FILES_KEY;
801 26 : let dense_keyspace = KeySpace {
802 26 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
803 26 : };
804 26 : Ok((
805 26 : dense_keyspace,
806 26 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
807 26 : ))
808 26 : }
809 :
810 : ///
811 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
812 : /// Anything that's not listed may be removed from the underlying storage (from
813 : /// that LSN forwards).
814 : ///
815 : /// The return value is (dense keyspace, sparse keyspace).
816 286 : pub(crate) async fn collect_keyspace(
817 286 : &self,
818 286 : lsn: Lsn,
819 286 : ctx: &RequestContext,
820 286 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
821 286 : // Iterate through key ranges, greedily packing them into partitions
822 286 : let mut result = KeySpaceAccum::new();
823 286 :
824 286 : // The dbdir metadata always exists
825 286 : result.add_key(DBDIR_KEY);
826 :
827 : // Fetch list of database dirs and iterate them
828 3087 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
829 286 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
830 286 :
831 286 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
832 286 : for ((spcnode, dbnode), has_relmap_file) in dbs {
833 0 : if has_relmap_file {
834 0 : result.add_key(relmap_file_key(spcnode, dbnode));
835 0 : }
836 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
837 :
838 0 : let mut rels: Vec<RelTag> = self
839 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
840 0 : .await?
841 0 : .into_iter()
842 0 : .collect();
843 0 : rels.sort_unstable();
844 0 : for rel in rels {
845 0 : let relsize_key = rel_size_to_key(rel);
846 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
847 0 : let relsize = buf.get_u32_le();
848 0 :
849 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
850 0 : result.add_key(relsize_key);
851 : }
852 : }
853 :
854 : // Iterate SLRUs next
855 858 : for kind in [
856 286 : SlruKind::Clog,
857 286 : SlruKind::MultiXactMembers,
858 286 : SlruKind::MultiXactOffsets,
859 : ] {
860 858 : let slrudir_key = slru_dir_to_key(kind);
861 858 : result.add_key(slrudir_key);
862 9543 : let buf = self.get(slrudir_key, lsn, ctx).await?;
863 858 : let dir = SlruSegmentDirectory::des(&buf)?;
864 858 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
865 858 : segments.sort_unstable();
866 858 : for segno in segments {
867 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
868 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
869 0 : let segsize = buf.get_u32_le();
870 0 :
871 0 : result.add_range(
872 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
873 0 : );
874 0 : result.add_key(segsize_key);
875 : }
876 : }
877 :
878 : // Then pg_twophase
879 286 : result.add_key(TWOPHASEDIR_KEY);
880 :
881 286 : let mut xids: Vec<u64> = self
882 286 : .list_twophase_files(lsn, ctx)
883 3120 : .await?
884 286 : .iter()
885 286 : .cloned()
886 286 : .collect();
887 286 : xids.sort_unstable();
888 286 : for xid in xids {
889 0 : result.add_key(twophase_file_key(xid));
890 0 : }
891 :
892 286 : result.add_key(CONTROLFILE_KEY);
893 286 : result.add_key(CHECKPOINT_KEY);
894 286 :
895 286 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
896 286 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
897 286 : // and the keys will not be garbage-collected.
898 286 : #[cfg(test)]
899 286 : {
900 286 : let guard = self.extra_test_dense_keyspace.load();
901 286 : for kr in &guard.ranges {
902 0 : result.add_range(kr.clone());
903 0 : }
904 0 : }
905 0 :
906 286 : let dense_keyspace = result.to_keyspace();
907 286 : let sparse_keyspace = SparseKeySpace(KeySpace {
908 286 : ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
909 286 : });
910 286 :
911 286 : if cfg!(debug_assertions) {
912 : // Verify if the sparse keyspaces are ordered and non-overlapping.
913 :
914 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
915 : // category of sparse keys are split into their own image/delta files. If there
916 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
917 : // and we want the developer to keep the keyspaces separated.
918 :
919 286 : let ranges = &sparse_keyspace.0.ranges;
920 :
921 : // TODO: use a single overlaps_with across the codebase
922 286 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
923 286 : !(a.end <= b.start || b.end <= a.start)
924 286 : }
925 572 : for i in 0..ranges.len() {
926 572 : for j in 0..i {
927 286 : if overlaps_with(&ranges[i], &ranges[j]) {
928 0 : panic!(
929 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
930 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
931 0 : );
932 286 : }
933 : }
934 : }
935 286 : for i in 1..ranges.len() {
936 286 : assert!(
937 286 : ranges[i - 1].end <= ranges[i].start,
938 0 : "unordered sparse keyspace: {}..{} and {}..{}",
939 0 : ranges[i - 1].start,
940 0 : ranges[i - 1].end,
941 0 : ranges[i].start,
942 0 : ranges[i].end
943 : );
944 : }
945 0 : }
946 :
947 286 : Ok((dense_keyspace, sparse_keyspace))
948 286 : }
949 :
950 : /// Get the cached size of a relation, if it was not updated after the specified LSN.
951 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
952 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
953 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
954 448518 : if lsn >= *cached_lsn {
955 443372 : return Some(*nblocks);
956 5146 : }
957 22 : }
958 5168 : None
959 448540 : }
960 :
961 : /// Update cached relation size if there is no more recent update
962 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
963 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
964 5136 :
965 5136 : if lsn < rel_size_cache.complete_as_of {
966 : // Do not cache old values. It's safe to cache the size on read, as long as
967 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
968 : // never evict values from the cache, so if the relation size changed after
969 : // 'lsn', the new value is already in the cache.
970 0 : return;
971 5136 : }
972 5136 :
973 5136 : match rel_size_cache.map.entry(tag) {
974 5136 : hash_map::Entry::Occupied(mut entry) => {
975 5136 : let cached_lsn = entry.get_mut();
976 5136 : if lsn >= cached_lsn.0 {
977 0 : *cached_lsn = (lsn, nblocks);
978 5136 : }
979 : }
980 0 : hash_map::Entry::Vacant(entry) => {
981 0 : entry.insert((lsn, nblocks));
982 0 : }
983 : }
984 5136 : }
985 :
986 : /// Store cached relation size
987 282720 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
988 282720 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
989 282720 : rel_size_cache.map.insert(tag, (lsn, nblocks));
990 282720 : }
991 :
992 : /// Remove cached relation size
993 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
994 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
995 2 : rel_size_cache.map.remove(tag);
996 2 : }
997 : }
998 :
999 : /// DatadirModification represents an operation to ingest an atomic set of
1000 : /// updates to the repository.
1001 : ///
1002 : /// It is created by the 'begin_modification' function. It is called for each WAL
1003 : /// record, so that all the modifications by one WAL record appear atomic.
1004 : pub struct DatadirModification<'a> {
1005 : /// The timeline this modification applies to. You can access this to
1006 : /// read the state, but note that any pending updates are *not* reflected
1007 : /// in the state in 'tline' yet.
1008 : pub tline: &'a Timeline,
1009 :
1010 : /// Current LSN of the modification
1011 : lsn: Lsn,
1012 :
1013 : // The modifications are not applied directly to the underlying key-value store.
1014 : // The put-functions add the modifications here, and they are flushed to the
1015 : // underlying key-value store by the 'commit' function.
1016 : pending_lsns: Vec<Lsn>,
1017 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1018 : pending_nblocks: i64,
1019 :
1020 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1021 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1022 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1023 :
1024 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1025 : /// which keys are stored here.
1026 : pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
1027 :
1028 : // Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
1029 : // if we encounter a write from postgres in the same wal record, we will drop this entry.
1030 : //
1031 : // Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
1032 : // at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
1033 : pending_zero_data_pages: HashSet<CompactKey>,
1034 :
1035 : /// For special "directory" keys that store key-value maps, track the size of the map
1036 : /// if it was updated in this modification.
1037 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
1038 :
1039 : /// An **approximation** of how large our EphemeralFile write will be when committed.
1040 : pending_bytes: usize,
1041 : }
1042 :
1043 : impl<'a> DatadirModification<'a> {
1044 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1045 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1046 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1047 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
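//
// A sketch of how an ingest loop might apply this bound (illustrative only):
//
//     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//         modification.commit(ctx).await?;
//     }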
1048 :
1049 : /// Get the current lsn
1050 418056 : pub(crate) fn get_lsn(&self) -> Lsn {
1051 418056 : self.lsn
1052 418056 : }
1053 :
1054 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1055 0 : self.pending_bytes
1056 0 : }
1057 :
1058 0 : pub(crate) fn has_dirty_data_pages(&self) -> bool {
1059 0 : (!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
1060 0 : }
1061 :
1062 : /// Set the current lsn
1063 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1064 145858 : ensure!(
1065 145858 : lsn >= self.lsn,
1066 0 : "setting an older lsn {} than {} is not allowed",
1067 : lsn,
1068 : self.lsn
1069 : );
1070 :
1071 : // If we are advancing LSN, then state from previous wal record should have been flushed.
1072 145858 : assert!(self.pending_zero_data_pages.is_empty());
1073 :
1074 145858 : if lsn > self.lsn {
1075 145858 : self.pending_lsns.push(self.lsn);
1076 145858 : self.lsn = lsn;
1077 145858 : }
1078 145858 : Ok(())
1079 145858 : }
1080 :
1081 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1082 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1083 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1084 : ///
1085 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1086 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1087 : /// via [`Self::get`] as soon as their record has been ingested.
1088 996032 : fn is_data_key(key: &Key) -> bool {
1089 996032 : key.is_rel_block_key() || key.is_slru_block_key()
1090 996032 : }
1091 :
1092 : /// Initialize a completely new repository.
1093 : ///
1094 : /// This inserts the directory metadata entries that are assumed to
1095 : /// always exist.
1096 172 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1097 172 : let buf = DbDirectory::ser(&DbDirectory {
1098 172 : dbdirs: HashMap::new(),
1099 172 : })?;
1100 172 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1101 172 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1102 :
1103 172 : let buf = if self.tline.pg_version >= 17 {
1104 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1105 0 : xids: HashSet::new(),
1106 0 : })
1107 : } else {
1108 172 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1109 172 : xids: HashSet::new(),
1110 172 : })
1111 0 : }?;
1112 172 : self.pending_directory_entries
1113 172 : .push((DirectoryKind::TwoPhase, 0));
1114 172 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1115 :
1116 172 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1117 172 : let empty_dir = Value::Image(buf);
1118 172 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1119 172 : self.pending_directory_entries
1120 172 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1121 172 : self.put(
1122 172 : slru_dir_to_key(SlruKind::MultiXactMembers),
1123 172 : empty_dir.clone(),
1124 172 : );
1125 172 : self.pending_directory_entries
1126 172 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1127 172 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1128 172 : self.pending_directory_entries
1129 172 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1130 172 :
1131 172 : Ok(())
1132 172 : }
1133 :
1134 : #[cfg(test)]
1135 170 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1136 170 : self.init_empty()?;
1137 170 : self.put_control_file(bytes::Bytes::from_static(
1138 170 : b"control_file contents do not matter",
1139 170 : ))
1140 170 : .context("put_control_file")?;
1141 170 : self.put_checkpoint(bytes::Bytes::from_static(
1142 170 : b"checkpoint_file contents do not matter",
1143 170 : ))
1144 170 : .context("put_checkpoint_file")?;
1145 170 : Ok(())
1146 170 : }
1147 :
1148 : /// Put a new page version that can be constructed from a WAL record
1149 : ///
1150 : /// NOTE: this will *not* implicitly extend the relation if the page is beyond the
1151 : /// current end-of-file. It's up to the caller to check that the relation size
1152 : /// matches the blocks inserted!
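///
/// A hedged sketch of the caller's side (the real bookkeeping lives in
/// walingest.rs; the names below are illustrative):
///
/// ```ignore
/// if blknum >= timeline.get_rel_size(rel, version, ctx).await? {
///     modification.put_rel_extend(rel, blknum + 1, ctx).await?;
/// }
/// modification.put_rel_wal_record(rel, blknum, rec)?;
/// ```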
1153 145630 : pub fn put_rel_wal_record(
1154 145630 : &mut self,
1155 145630 : rel: RelTag,
1156 145630 : blknum: BlockNumber,
1157 145630 : rec: NeonWalRecord,
1158 145630 : ) -> anyhow::Result<()> {
1159 145630 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1160 145630 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1161 145630 : Ok(())
1162 145630 : }
1163 :
1164 : // Same, but for an SLRU.
1165 8 : pub fn put_slru_wal_record(
1166 8 : &mut self,
1167 8 : kind: SlruKind,
1168 8 : segno: u32,
1169 8 : blknum: BlockNumber,
1170 8 : rec: NeonWalRecord,
1171 8 : ) -> anyhow::Result<()> {
1172 8 : self.put(
1173 8 : slru_block_to_key(kind, segno, blknum),
1174 8 : Value::WalRecord(rec),
1175 8 : );
1176 8 : Ok(())
1177 8 : }
1178 :
1179 : /// Like put_wal_record, but with ready-made image of the page.
1180 277866 : pub fn put_rel_page_image(
1181 277866 : &mut self,
1182 277866 : rel: RelTag,
1183 277866 : blknum: BlockNumber,
1184 277866 : img: Bytes,
1185 277866 : ) -> anyhow::Result<()> {
1186 277866 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1187 277866 : let key = rel_block_to_key(rel, blknum);
1188 277866 : if !key.is_valid_key_on_write_path() {
1189 0 : anyhow::bail!(
1190 0 : "the request contains data not supported by pageserver at {}",
1191 0 : key
1192 0 : );
1193 277866 : }
1194 277866 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1195 277866 : Ok(())
1196 277866 : }
1197 :
1198 6 : pub fn put_slru_page_image(
1199 6 : &mut self,
1200 6 : kind: SlruKind,
1201 6 : segno: u32,
1202 6 : blknum: BlockNumber,
1203 6 : img: Bytes,
1204 6 : ) -> anyhow::Result<()> {
1205 6 : let key = slru_block_to_key(kind, segno, blknum);
1206 6 : if !key.is_valid_key_on_write_path() {
1207 0 : anyhow::bail!(
1208 0 : "the request contains data not supported by pageserver at {}",
1209 0 : key
1210 0 : );
1211 6 : }
1212 6 : self.put(key, Value::Image(img));
1213 6 : Ok(())
1214 6 : }
1215 :
1216 2998 : pub(crate) fn put_rel_page_image_zero(
1217 2998 : &mut self,
1218 2998 : rel: RelTag,
1219 2998 : blknum: BlockNumber,
1220 2998 : ) -> anyhow::Result<()> {
1221 2998 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1222 2998 : let key = rel_block_to_key(rel, blknum);
1223 2998 : if !key.is_valid_key_on_write_path() {
1224 0 : anyhow::bail!(
1225 0 : "the request contains data not supported by pageserver: {} @ {}",
1226 0 : key,
1227 0 : self.lsn
1228 0 : );
1229 2998 : }
1230 2998 : self.pending_zero_data_pages.insert(key.to_compact());
1231 2998 : self.pending_bytes += ZERO_PAGE.len();
1232 2998 : Ok(())
1233 2998 : }
1234 :
1235 0 : pub(crate) fn put_slru_page_image_zero(
1236 0 : &mut self,
1237 0 : kind: SlruKind,
1238 0 : segno: u32,
1239 0 : blknum: BlockNumber,
1240 0 : ) -> anyhow::Result<()> {
1241 0 : let key = slru_block_to_key(kind, segno, blknum);
1242 0 : if !key.is_valid_key_on_write_path() {
1243 0 : anyhow::bail!(
1244 0 : "the request contains data not supported by pageserver: {} @ {}",
1245 0 : key,
1246 0 : self.lsn
1247 0 : );
1248 0 : }
1249 0 : self.pending_zero_data_pages.insert(key.to_compact());
1250 0 : self.pending_bytes += ZERO_PAGE.len();
1251 0 : Ok(())
1252 0 : }
1253 :
1254 : /// Call this at the end of each WAL record.
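///
/// This flushes any zero-page writes queued via `put_rel_page_image_zero` /
/// `put_slru_page_image_zero` while ingesting the record.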
1255 145864 : pub(crate) fn on_record_end(&mut self) {
1256 145864 : let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
1257 148862 : for key in pending_zero_data_pages {
1258 2998 : self.put_data(key, Value::Image(ZERO_PAGE.clone()));
1259 2998 : }
1260 145864 : }
1261 :
1262 : /// Store a relmapper file (pg_filenode.map) in the repository
1263 16 : pub async fn put_relmap_file(
1264 16 : &mut self,
1265 16 : spcnode: Oid,
1266 16 : dbnode: Oid,
1267 16 : img: Bytes,
1268 16 : ctx: &RequestContext,
1269 16 : ) -> anyhow::Result<()> {
1270 : // Add it to the directory (if it doesn't exist already)
1271 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1272 16 : let mut dbdir = DbDirectory::des(&buf)?;
1273 :
1274 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1275 16 : if r.is_none() || r == Some(false) {
1276 : // The dbdir entry didn't exist, or it contained a
1277 : // 'false'. The 'insert' call already updated it with
1278 : // 'true'; now write the updated 'dbdirs' map back.
1279 16 : let buf = DbDirectory::ser(&dbdir)?;
1280 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1281 0 : }
1282 16 : if r.is_none() {
1283 : // Create RelDirectory
1284 8 : let buf = RelDirectory::ser(&RelDirectory {
1285 8 : rels: HashSet::new(),
1286 8 : })?;
1287 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1288 8 : self.put(
1289 8 : rel_dir_to_key(spcnode, dbnode),
1290 8 : Value::Image(Bytes::from(buf)),
1291 8 : );
1292 8 : }
1293 :
1294 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1295 16 : Ok(())
1296 16 : }
1297 :
1298 0 : pub async fn put_twophase_file(
1299 0 : &mut self,
1300 0 : xid: u64,
1301 0 : img: Bytes,
1302 0 : ctx: &RequestContext,
1303 0 : ) -> anyhow::Result<()> {
1304 : // Add it to the directory entry
1305 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1306 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1307 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1308 0 : if !dir.xids.insert(xid) {
1309 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1310 0 : }
1311 0 : self.pending_directory_entries
1312 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1313 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1314 : } else {
1315 0 : let xid = xid as u32;
1316 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1317 0 : if !dir.xids.insert(xid) {
1318 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1319 0 : }
1320 0 : self.pending_directory_entries
1321 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1322 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1323 : };
1324 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1325 0 :
1326 0 : self.put(twophase_file_key(xid), Value::Image(img));
1327 0 : Ok(())
1328 0 : }
1329 :
1330 0 : pub async fn set_replorigin(
1331 0 : &mut self,
1332 0 : origin_id: RepOriginId,
1333 0 : origin_lsn: Lsn,
1334 0 : ) -> anyhow::Result<()> {
1335 0 : let key = repl_origin_key(origin_id);
1336 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1337 0 : Ok(())
1338 0 : }
1339 :
1340 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1341 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1342 0 : }
1343 :
1344 172 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1345 172 : self.put(CONTROLFILE_KEY, Value::Image(img));
1346 172 : Ok(())
1347 172 : }
1348 :
1349 186 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1350 186 : self.put(CHECKPOINT_KEY, Value::Image(img));
1351 186 : Ok(())
1352 186 : }
1353 :
1354 0 : pub async fn drop_dbdir(
1355 0 : &mut self,
1356 0 : spcnode: Oid,
1357 0 : dbnode: Oid,
1358 0 : ctx: &RequestContext,
1359 0 : ) -> anyhow::Result<()> {
1360 0 : let total_blocks = self
1361 0 : .tline
1362 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1363 0 : .await?;
1364 :
1365 : // Remove entry from dbdir
1366 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1367 0 : let mut dir = DbDirectory::des(&buf)?;
1368 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1369 0 : let buf = DbDirectory::ser(&dir)?;
1370 0 : self.pending_directory_entries
1371 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1372 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1373 : } else {
1374 0 : warn!(
1375 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1376 : spcnode, dbnode
1377 : );
1378 : }
1379 :
1380 : // Update logical database size.
1381 0 : self.pending_nblocks -= total_blocks as i64;
1382 0 :
1383 0 : // Delete all relations and metadata files for the spcnode/dnode
1384 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1385 0 : Ok(())
1386 0 : }
1387 :
1388 : /// Create a relation fork.
1389 : ///
1390 : /// 'nblocks' is the initial size.
1391 1920 : pub async fn put_rel_creation(
1392 1920 : &mut self,
1393 1920 : rel: RelTag,
1394 1920 : nblocks: BlockNumber,
1395 1920 : ctx: &RequestContext,
1396 1920 : ) -> Result<(), RelationError> {
1397 1920 : if rel.relnode == 0 {
1398 0 : return Err(RelationError::InvalidRelnode);
1399 1920 : }
1400 : // It's possible that this is the first rel for this db in this
1401 : // tablespace. Create the reldir entry for it if so.
1402 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1403 1920 : .context("deserialize db")?;
1404 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1405 1920 : let mut rel_dir =
1406 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1407 : // Didn't exist. Update dbdir
1408 8 : e.insert(false);
1409 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1410 8 : self.pending_directory_entries
1411 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1412 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1413 8 :
1414 8 : // and create the RelDirectory
1415 8 : RelDirectory::default()
1416 : } else {
1417 : // reldir already exists, fetch it
1418 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1419 1912 : .context("deserialize db")?
1420 : };
1421 :
1422 : // Add the new relation to the rel directory entry, and write it back
1423 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1424 0 : return Err(RelationError::AlreadyExists);
1425 1920 : }
1426 1920 :
1427 1920 : self.pending_directory_entries
1428 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1429 1920 :
1430 1920 : self.put(
1431 1920 : rel_dir_key,
1432 1920 : Value::Image(Bytes::from(
1433 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1434 : )),
1435 : );
1436 :
1437 : // Put size
1438 1920 : let size_key = rel_size_to_key(rel);
1439 1920 : let buf = nblocks.to_le_bytes();
1440 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1441 1920 :
1442 1920 : self.pending_nblocks += nblocks as i64;
1443 1920 :
1444 1920 : // Update relation size cache
1445 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1446 1920 :
1447 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1448 1920 : // caller.
1449 1920 : Ok(())
1450 1920 : }
1451 :
1452 : /// Truncate relation
1453 6012 : pub async fn put_rel_truncation(
1454 6012 : &mut self,
1455 6012 : rel: RelTag,
1456 6012 : nblocks: BlockNumber,
1457 6012 : ctx: &RequestContext,
1458 6012 : ) -> anyhow::Result<()> {
1459 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1460 6012 : if self
1461 6012 : .tline
1462 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1463 0 : .await?
1464 : {
1465 6012 : let size_key = rel_size_to_key(rel);
1466 : // Fetch the old size first
1467 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1468 6012 :
1469 6012 : // Update the entry with the new size.
1470 6012 : let buf = nblocks.to_le_bytes();
1471 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1472 6012 :
1473 6012 : // Update relation size cache
1474 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1475 6012 :
1476 6012 : // Update logical database size.
1477 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1478 0 : }
1479 6012 : Ok(())
1480 6012 : }
1481 :
1482 : /// Extend relation
1483 : /// If the new size is smaller, do nothing.
1484 276680 : pub async fn put_rel_extend(
1485 276680 : &mut self,
1486 276680 : rel: RelTag,
1487 276680 : nblocks: BlockNumber,
1488 276680 : ctx: &RequestContext,
1489 276680 : ) -> anyhow::Result<()> {
1490 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1491 :
1492 : // Put size
1493 276680 : let size_key = rel_size_to_key(rel);
1494 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1495 276680 :
1496 276680 : // only extend relation here. never decrease the size
1497 276680 : if nblocks > old_size {
1498 274788 : let buf = nblocks.to_le_bytes();
1499 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1500 274788 :
1501 274788 : // Update relation size cache
1502 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1503 274788 :
1504 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1505 274788 : }
1506 276680 : Ok(())
1507 276680 : }
1508 :
1509 : /// Drop a relation.
1510 2 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1511 2 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1512 :
1513 : // Remove it from the directory entry
1514 2 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1515 2 : let buf = self.get(dir_key, ctx).await?;
1516 2 : let mut dir = RelDirectory::des(&buf)?;
1517 :
1518 2 : self.pending_directory_entries
1519 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1520 2 :
1521 2 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1522 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1523 : } else {
1524 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1525 : }
1526 :
1527 : // update logical size
1528 2 : let size_key = rel_size_to_key(rel);
1529 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1530 2 : self.pending_nblocks -= old_size as i64;
1531 2 :
1532 2 : // Remove enty from relation size cache
1533 2 : self.tline.remove_cached_rel_size(&rel);
1534 2 :
1535 2 : // Delete size entry, as well as all blocks
1536 2 : self.delete(rel_key_range(rel));
1537 2 :
1538 2 : Ok(())
1539 2 : }
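
// Directory updates like the one above follow a read-modify-write pattern:
// deserialize the directory image, mutate the set, and put a full new image
// back. A minimal sketch with a stand-in directory type (hypothetical; the
// real code uses `RelDirectory` with the crate's `BeSer` serialization):
//
// ```
// use std::collections::HashSet;
//
// #[derive(Default)]
// struct Directory {
//     rels: HashSet<(u32, u8)>, // (relfilenode, forknum)
// }
//
// /// Returns false if the entry was already absent; `put_rel_drop` logs a
// /// warning in that case instead of failing hard.
// fn remove_rel(dir: &mut Directory, relnode: u32, forknum: u8) -> bool {
//     dir.rels.remove(&(relnode, forknum))
// }
// ```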
1540 :
1541 6 : pub async fn put_slru_segment_creation(
1542 6 : &mut self,
1543 6 : kind: SlruKind,
1544 6 : segno: u32,
1545 6 : nblocks: BlockNumber,
1546 6 : ctx: &RequestContext,
1547 6 : ) -> anyhow::Result<()> {
1548 6 : // Add it to the directory entry
1549 6 : let dir_key = slru_dir_to_key(kind);
1550 6 : let buf = self.get(dir_key, ctx).await?;
1551 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1552 :
1553 6 : if !dir.segments.insert(segno) {
1554 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1555 6 : }
1556 6 : self.pending_directory_entries
1557 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1558 6 : self.put(
1559 6 : dir_key,
1560 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1561 : );
1562 :
1563 : // Put size
1564 6 : let size_key = slru_segment_size_to_key(kind, segno);
1565 6 : let buf = nblocks.to_le_bytes();
1566 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1567 6 :
1568 6 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the caller.
1569 6 :
1570 6 : Ok(())
1571 6 : }
1572 :
1573 : /// Extend SLRU segment
1574 0 : pub fn put_slru_extend(
1575 0 : &mut self,
1576 0 : kind: SlruKind,
1577 0 : segno: u32,
1578 0 : nblocks: BlockNumber,
1579 0 : ) -> anyhow::Result<()> {
1580 0 : // Put size
1581 0 : let size_key = slru_segment_size_to_key(kind, segno);
1582 0 : let buf = nblocks.to_le_bytes();
1583 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1584 0 : Ok(())
1585 0 : }
1586 :
1587 : /// This method is used for marking truncated SLRU files
1588 0 : pub async fn drop_slru_segment(
1589 0 : &mut self,
1590 0 : kind: SlruKind,
1591 0 : segno: u32,
1592 0 : ctx: &RequestContext,
1593 0 : ) -> anyhow::Result<()> {
1594 0 : // Remove it from the directory entry
1595 0 : let dir_key = slru_dir_to_key(kind);
1596 0 : let buf = self.get(dir_key, ctx).await?;
1597 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1598 :
1599 0 : if !dir.segments.remove(&segno) {
1600 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1601 0 : }
1602 0 : self.pending_directory_entries
1603 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1604 0 : self.put(
1605 0 : dir_key,
1606 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1607 : );
1608 :
1609 : // Delete size entry, as well as all blocks
1610 0 : self.delete(slru_segment_key_range(kind, segno));
1611 0 :
1612 0 : Ok(())
1613 0 : }
1614 :
1615 : /// Drop a relmapper file (pg_filenode.map)
1616 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1617 0 : // TODO
1618 0 : Ok(())
1619 0 : }
1620 :
1621 : /// This method is used for removing the twophase file of a completed (committed or aborted) prepared transaction
1622 0 : pub async fn drop_twophase_file(
1623 0 : &mut self,
1624 0 : xid: u64,
1625 0 : ctx: &RequestContext,
1626 0 : ) -> anyhow::Result<()> {
1627 : // Remove it from the directory entry
1628 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1629 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1630 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
1631 :
1632 0 : if !dir.xids.remove(&xid) {
1633 0 : warn!("twophase file for xid {} does not exist", xid);
1634 0 : }
1635 0 : self.pending_directory_entries
1636 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1637 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1638 : } else {
1639 0 : let xid: u32 = u32::try_from(xid)?;
1640 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1641 :
1642 0 : if !dir.xids.remove(&xid) {
1643 0 : warn!("twophase file for xid {} does not exist", xid);
1644 0 : }
1645 0 : self.pending_directory_entries
1646 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1647 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1648 : };
1649 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1650 0 :
1651 0 : // Delete it
1652 0 : self.delete(twophase_key_range(xid));
1653 0 :
1654 0 : Ok(())
1655 0 : }
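
// The xid width depends on the PostgreSQL version: v17+ names pg_twophase
// files by full 64-bit xids, earlier versions by 32-bit xids. A sketch of
// that version gate (hypothetical helper; the error propagates if a wide xid
// reaches a pre-v17 timeline):
//
// ```
// enum Xid {
//     Wide(u64),   // PostgreSQL v17+
//     Narrow(u32), // earlier versions
// }
//
// fn xid_for_version(pg_version: u32, xid: u64) -> anyhow::Result<Xid> {
//     if pg_version >= 17 {
//         Ok(Xid::Wide(xid))
//     } else {
//         // Pre-v17 xids must fit in 32 bits.
//         Ok(Xid::Narrow(u32::try_from(xid)?))
//     }
// }
// ```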
1656 :
1657 16 : pub async fn put_file(
1658 16 : &mut self,
1659 16 : path: &str,
1660 16 : content: &[u8],
1661 16 : ctx: &RequestContext,
1662 16 : ) -> anyhow::Result<()> {
1663 16 : let key = aux_file::encode_aux_file_key(path);
1664 : // retrieve the current value of the key from the engine
1665 16 : let old_val = match self.get(key, ctx).await {
1666 4 : Ok(val) => Some(val),
1667 12 : Err(PageReconstructError::MissingKey(_)) => None,
1668 0 : Err(e) => return Err(e.into()),
1669 : };
1670 16 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
1671 4 : aux_file::decode_file_value(old_val)?
1672 : } else {
1673 12 : Vec::new()
1674 : };
1675 16 : let mut other_files = Vec::with_capacity(files.len());
1676 16 : let mut modifying_file = None;
1677 20 : for file @ (p, content) in files {
1678 4 : if path == p {
1679 4 : assert!(
1680 4 : modifying_file.is_none(),
1681 0 : "duplicated entries found for {}",
1682 : path
1683 : );
1684 4 : modifying_file = Some(content);
1685 0 : } else {
1686 0 : other_files.push(file);
1687 0 : }
1688 : }
1689 16 : let mut new_files = other_files;
1690 16 : match (modifying_file, content.is_empty()) {
1691 2 : (Some(old_content), false) => {
1692 2 : self.tline
1693 2 : .aux_file_size_estimator
1694 2 : .on_update(old_content.len(), content.len());
1695 2 : new_files.push((path, content));
1696 2 : }
1697 2 : (Some(old_content), true) => {
1698 2 : self.tline
1699 2 : .aux_file_size_estimator
1700 2 : .on_remove(old_content.len());
1701 2 : // not adding the file key to the final `new_files` vec.
1702 2 : }
1703 12 : (None, false) => {
1704 12 : self.tline.aux_file_size_estimator.on_add(content.len());
1705 12 : new_files.push((path, content));
1706 12 : }
1707 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
1708 : }
1709 16 : let new_val = aux_file::encode_file_value(&new_files)?;
1710 16 : self.put(key, Value::Image(new_val.into()));
1711 16 :
1712 16 : Ok(())
1713 16 : }
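
// `put_file` is effectively an upsert keyed by path, with empty content
// meaning deletion. A minimal model of that merge step (hypothetical; it
// operates on an owned map rather than the encoded aux-file value):
//
// ```
// use std::collections::HashMap;
//
// fn apply_aux_update(files: &mut HashMap<String, Vec<u8>>, path: &str, content: &[u8]) {
//     if content.is_empty() {
//         // Empty content marks the file as deleted.
//         files.remove(path);
//     } else {
//         files.insert(path.to_string(), content.to_vec());
//     }
// }
// ```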
1714 :
1715 : ///
1716 : /// Flush changes accumulated so far to the underlying repository.
1717 : ///
1718 : /// Usually, changes made in DatadirModification are atomic, but this allows
1719 : /// you to flush them to the underlying repository before the final `commit`.
1720 : /// That allows freeing up the memory used to hold the pending changes.
1721 : ///
1722 : /// Currently only used during bulk import of a data directory. In that
1723 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1724 : /// whole import fails and the timeline will be deleted anyway.
1725 : /// (Or to be precise, it will be left behind for debugging purposes and
1726 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1727 : ///
1728 : /// Note: A consequence of flushing the pending operations is that they
1729 : /// won't be visible to subsequent operations until `commit`. The function
1730 : /// retains all the metadata, but data pages are flushed. That's again OK
1731 : /// for bulk import, where you are just loading data pages and won't try to
1732 : /// modify the same pages twice.
1733 1930 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1734 1930 : // Unless we have accumulated a decent amount of changes, it's not worth it
1735 1930 : // to scan through the pending_updates list.
1736 1930 : let pending_nblocks = self.pending_nblocks;
1737 1930 : if pending_nblocks < 10000 {
1738 1930 : return Ok(());
1739 0 : }
1740 :
1741 0 : let mut writer = self.tline.writer().await;
1742 :
1743 : // Flush relation and SLRU data blocks, keep metadata.
1744 0 : let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
1745 0 :
1746 0 : // This bails out on first error without modifying pending_updates.
1747 0 : // That's Ok, cf this function's doc comment.
1748 0 : writer.put_batch(pending_data_pages, ctx).await?;
1749 0 : self.pending_bytes = 0;
1750 0 :
1751 0 : if pending_nblocks != 0 {
1752 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1753 0 : self.pending_nblocks = 0;
1754 0 : }
1755 :
1756 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1757 0 : writer.update_directory_entries_count(kind, count as u64);
1758 0 : }
1759 :
1760 0 : Ok(())
1761 1930 : }
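
// A sketch of the intended bulk-import pattern (illustrative; assumes a
// `put_rel_page_image`-style put method and an import loop, which are not
// shown here). Note that `flush()` early-returns until enough changes have
// accumulated, so calling it every iteration is cheap:
//
// ```ignore
// let mut modification = tline.begin_modification(lsn);
// for (rel, blknum, img) in pages_to_import {
//     modification.put_rel_page_image(rel, blknum, img)?;
//     // Periodically flush data pages to bound memory usage; metadata stays
//     // buffered until commit().
//     modification.flush(&ctx).await?;
// }
// modification.commit(&ctx).await?;
// ```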
1762 :
1763 : ///
1764 : /// Finish this atomic update, writing all the updated keys to the
1765 : /// underlying timeline.
1766 : /// All the modifications in this atomic update are stamped by the specified LSN.
1767 : ///
1768 743058 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1769 743058 : // Commit should never be called mid-wal-record
1770 743058 : assert!(self.pending_zero_data_pages.is_empty());
1771 :
1772 743058 : let mut writer = self.tline.writer().await;
1773 :
1774 743058 : let pending_nblocks = self.pending_nblocks;
1775 743058 : self.pending_nblocks = 0;
1776 743058 :
1777 743058 : // Ordering: the items in this batch do not need to be in any global order, but values for
1778 743058 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
1779 743058 : // this to do efficient updates to its index.
1780 743058 : let mut write_batch = std::mem::take(&mut self.pending_data_pages);
1781 743058 :
1782 743058 : write_batch.extend(
1783 743058 : self.pending_metadata_pages
1784 743058 : .drain()
1785 743058 : .flat_map(|(key, values)| {
1786 273784 : values
1787 273784 : .into_iter()
1788 273784 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
1789 743058 : }),
1790 743058 : );
1791 743058 :
1792 743058 : if !write_batch.is_empty() {
1793 414042 : writer.put_batch(write_batch, ctx).await?;
1794 329016 : }
1795 :
1796 743058 : if !self.pending_deletions.is_empty() {
1797 2 : writer.delete_batch(&self.pending_deletions, ctx).await?;
1798 2 : self.pending_deletions.clear();
1799 743056 : }
1800 :
1801 743058 : self.pending_lsns.push(self.lsn);
1802 888916 : for pending_lsn in self.pending_lsns.drain(..) {
1803 888916 : // Ideally, we should be able to call writer.finish_write() only once
1804 888916 : // with the highest LSN. However, the last_record_lsn variable in the
1805 888916 : // timeline keeps track of the latest LSN and the immediate previous LSN,
1806 888916 : // so we need to record every LSN to avoid leaving a gap between them.
1807 888916 : writer.finish_write(pending_lsn);
1808 888916 : }
1809 :
1810 743058 : if pending_nblocks != 0 {
1811 270570 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1812 472488 : }
1813 :
1814 743058 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1815 2804 : writer.update_directory_entries_count(kind, count as u64);
1816 2804 : }
1817 :
1818 743058 : self.pending_bytes = 0;
1819 743058 :
1820 743058 : Ok(())
1821 743058 : }
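
// The batch ordering invariant above, in miniature: per-key entries must be
// LSN-ascending, while distinct keys may appear in any order. A hedged check
// one could run over a batch (hypothetical, with u64 stand-ins for Key/Lsn):
//
// ```
// use std::collections::HashMap;
//
// /// `batch` holds (key, lsn) pairs in write order.
// fn per_key_lsn_ascending(batch: &[(u64, u64)]) -> bool {
//     let mut last: HashMap<u64, u64> = HashMap::new();
//     batch.iter().all(|&(key, lsn)| {
//         let ok = last.get(&key).map_or(true, |&prev| prev <= lsn);
//         last.insert(key, lsn);
//         ok
//     })
// }
// ```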
1822 :
1823 291704 : pub(crate) fn len(&self) -> usize {
1824 291704 : self.pending_metadata_pages.len()
1825 291704 : + self.pending_data_pages.len()
1826 291704 : + self.pending_deletions.len()
1827 291704 : }
1828 :
1829 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
1830 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
1831 : /// in the modification.
1832 : ///
1833 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
1834 : /// page must ensure that the pages it reads are already committed in Timeline; for example,
1835 : /// DB create operations are always preceded by a call to commit(). This is special cased because
1836 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
1837 : /// and not data pages.
1838 286586 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1839 286586 : if !Self::is_data_key(&key) {
1840 : // Have we already updated the same key? Read the latest pending updated
1841 : // version in that case.
1842 : //
1843 : // Note: we don't check pending_deletions. It is an error to request a
1844 : // value that has been removed; deletion only avoids leaking storage.
1845 286586 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
1846 15928 : if let Some((_, _, value)) = values.last() {
1847 15928 : return if let Value::Image(img) = value {
1848 15928 : Ok(img.clone())
1849 : } else {
1850 : // Currently, we never need to read back a WAL record that we
1851 : // inserted in the same "transaction". All the metadata updates
1852 : // work directly with Images, and we never need to read actual
1853 : // data pages. We could handle this if we had to, by calling
1854 : // the walredo manager, but let's keep it simple for now.
1855 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1856 0 : "unexpected pending WAL record"
1857 0 : )))
1858 : };
1859 0 : }
1860 270658 : }
1861 : } else {
1862 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
1863 : // this key should never be present in pending_data_pages. We ensure this by committing
1864 : // modifications before ingesting DB create operations, which are the only kind that reads
1865 : // data pages during ingest.
1866 0 : if cfg!(debug_assertions) {
1867 0 : for (dirty_key, _, _, _) in &self.pending_data_pages {
1868 0 : debug_assert!(&key.to_compact() != dirty_key);
1869 : }
1870 :
1871 0 : debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
1872 0 : }
1873 : }
1874 :
1875 : // Metadata page cache miss, or we're reading a data page.
1876 270658 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1877 270658 : self.tline.get(key, lsn, ctx).await
1878 286586 : }
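
// Read-your-writes for metadata keys, in miniature: consult the pending map
// first, then fall back to the committed store (hypothetical stand-in types;
// the real pending map holds (lsn, size, Value) tuples):
//
// ```
// use std::collections::HashMap;
//
// fn read_metadata<'a>(
//     pending: &'a HashMap<u64, Vec<&'a [u8]>>,
//     committed: &'a HashMap<u64, &'a [u8]>,
//     key: u64,
// ) -> Option<&'a [u8]> {
//     if let Some(values) = pending.get(&key) {
//         if let Some(&last) = values.last() {
//             // A write earlier in this modification wins.
//             return Some(last);
//         }
//     }
//     committed.get(&key).copied()
// }
// ```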
1879 :
1880 709446 : fn put(&mut self, key: Key, val: Value) {
1881 709446 : if Self::is_data_key(&key) {
1882 423510 : self.put_data(key.to_compact(), val)
1883 : } else {
1884 285936 : self.put_metadata(key.to_compact(), val)
1885 : }
1886 709446 : }
1887 :
1888 426508 : fn put_data(&mut self, key: CompactKey, val: Value) {
1889 426508 : let val_serialized_size = val.serialized_size().unwrap() as usize;
1890 426508 :
1891 426508 : // If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
1892 426508 : // is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
1893 426508 : // and the subsequent postgres-originating write
1894 426508 : if self.pending_zero_data_pages.remove(&key) {
1895 0 : self.pending_bytes -= ZERO_PAGE.len();
1896 426508 : }
1897 :
1898 426508 : self.pending_bytes += val_serialized_size;
1899 426508 : self.pending_data_pages
1900 426508 : .push((key, self.lsn, val_serialized_size, val))
1901 426508 : }
1902 :
1903 285936 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
1904 285936 : let values = self.pending_metadata_pages.entry(key).or_default();
1905 : // Replace the previous value if it exists at the same lsn
1906 285936 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
1907 12152 : if *last_lsn == self.lsn {
1908 : // Update the pending_bytes contribution from this entry, and update the serialized size in place
1909 12152 : self.pending_bytes -= *last_value_ser_size;
1910 12152 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
1911 12152 : self.pending_bytes += *last_value_ser_size;
1912 12152 :
1913 12152 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
1914 12152 : // have been generated by synthesized zero page writes prior to the first real write to a page.
1915 12152 : *last_value = val;
1916 12152 : return;
1917 0 : }
1918 273784 : }
1919 :
1920 273784 : let val_serialized_size = val.serialized_size().unwrap() as usize;
1921 273784 : self.pending_bytes += val_serialized_size;
1922 273784 : values.push((self.lsn, val_serialized_size, val));
1923 285936 : }
1924 :
1925 2 : fn delete(&mut self, key_range: Range<Key>) {
1926 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1927 2 : self.pending_deletions.push((key_range, self.lsn));
1928 2 : }
1929 : }
1930 :
1931 : /// This struct facilitates accessing either a committed key from the timeline at a
1932 : /// specific LSN, or the latest uncommitted key from a pending modification.
1933 : ///
1934 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
1935 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1936 : /// need to look up the keys in the modification first before looking them up in the
1937 : /// timeline to not miss the latest updates.
1938 : #[derive(Clone, Copy)]
1939 : pub enum Version<'a> {
1940 : Lsn(Lsn),
1941 : Modified(&'a DatadirModification<'a>),
1942 : }
1943 :
1944 : impl<'a> Version<'a> {
1945 23560 : async fn get(
1946 23560 : &self,
1947 23560 : timeline: &Timeline,
1948 23560 : key: Key,
1949 23560 : ctx: &RequestContext,
1950 23560 : ) -> Result<Bytes, PageReconstructError> {
1951 23560 : match self {
1952 23540 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1953 20 : Version::Modified(modification) => modification.get(key, ctx).await,
1954 : }
1955 23560 : }
1956 :
1957 35620 : fn get_lsn(&self) -> Lsn {
1958 35620 : match self {
1959 29574 : Version::Lsn(lsn) => *lsn,
1960 6046 : Version::Modified(modification) => modification.lsn,
1961 : }
1962 35620 : }
1963 : }
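
// For example, a caller can read the same key either at a committed LSN or
// relative to an open modification (illustrative snippet; `rel`, `lsn`, and
// `ctx` are assumed to be in scope):
//
// ```ignore
// // Against a committed LSN on the timeline:
// let exists = timeline.get_rel_exists(rel, Version::Lsn(lsn), ctx).await?;
// // Against pending changes, as put_rel_truncation does with Modified(self):
// let exists = timeline.get_rel_exists(rel, Version::Modified(&modification), ctx).await?;
// ```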
1964 :
1965 : //--- Metadata structs stored in key-value pairs in the repository.
1966 :
1967 2240 : #[derive(Debug, Serialize, Deserialize)]
1968 : struct DbDirectory {
1969 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1970 : dbdirs: HashMap<(Oid, Oid), bool>,
1971 : }
1972 :
1973 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
1974 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
1975 : // were named like "pg_twophase/000002E5"; now they're like
1976 : // "pg_twophase/0000000A000002E4".
1977 :
1978 288 : #[derive(Debug, Serialize, Deserialize)]
1979 : struct TwoPhaseDirectory {
1980 : xids: HashSet<TransactionId>,
1981 : }
1982 :
1983 0 : #[derive(Debug, Serialize, Deserialize)]
1984 : struct TwoPhaseDirectoryV17 {
1985 : xids: HashSet<u64>,
1986 : }
1987 :
1988 1932 : #[derive(Debug, Serialize, Deserialize, Default)]
1989 : struct RelDirectory {
1990 : // Set of relations that exist. (relfilenode, forknum)
1991 : //
1992 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1993 : // key-value pairs, if you have a lot of relations
1994 : rels: HashSet<(Oid, u8)>,
1995 : }
1996 :
1997 0 : #[derive(Debug, Serialize, Deserialize)]
1998 : struct RelSizeEntry {
1999 : nblocks: u32,
2000 : }
2001 :
2002 864 : #[derive(Debug, Serialize, Deserialize, Default)]
2003 : struct SlruSegmentDirectory {
2004 : // Set of SLRU segments that exist.
2005 : segments: HashSet<u32>,
2006 : }
2007 :
2008 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2009 : #[repr(u8)]
2010 : pub(crate) enum DirectoryKind {
2011 : Db,
2012 : TwoPhase,
2013 : Rel,
2014 : AuxFiles,
2015 : SlruSegment(SlruKind),
2016 : }
2017 :
2018 : impl DirectoryKind {
2019 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2020 5608 : pub(crate) fn offset(&self) -> usize {
2021 5608 : self.into_usize()
2022 5608 : }
2023 : }
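
// `enum_map` gives every DirectoryKind (including each SlruKind variant) a
// dense index, so per-kind entry counts can live in a flat array. A sketch of
// that use (illustrative; `offset` is crate-internal):
//
// ```ignore
// let mut counts = vec![0u64; DirectoryKind::KINDS_NUM];
// counts[DirectoryKind::Rel.offset()] += 1;
// ```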
2024 :
2025 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2026 :
2027 : #[allow(clippy::bool_assert_comparison)]
2028 : #[cfg(test)]
2029 : mod tests {
2030 : use hex_literal::hex;
2031 : use utils::id::TimelineId;
2032 :
2033 : use super::*;
2034 :
2035 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2036 :
2037 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2038 : #[tokio::test]
2039 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2040 2 : let name = "aux_files_round_trip";
2041 2 : let harness = TenantHarness::create(name).await?;
2042 2 :
2043 2 : pub const TIMELINE_ID: TimelineId =
2044 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2045 2 :
2046 8 : let (tenant, ctx) = harness.load().await;
2047 2 : let tline = tenant
2048 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2049 2 : .await?;
2050 2 : let tline = tline.raw_timeline().unwrap();
2051 2 :
2052 2 : // First modification: insert two keys
2053 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
2054 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2055 2 : modification.set_lsn(Lsn(0x1008))?;
2056 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2057 2 : modification.commit(&ctx).await?;
2058 2 : let expect_1008 = HashMap::from([
2059 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2060 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2061 2 : ]);
2062 2 :
2063 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2064 2 : assert_eq!(readback, expect_1008);
2065 2 :
2066 2 : // Second modification: update one key, remove the other
2067 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
2068 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2069 2 : modification.set_lsn(Lsn(0x2008))?;
2070 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
2071 2 : modification.commit(&ctx).await?;
2072 2 : let expect_2008 =
2073 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2074 2 :
2075 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
2076 2 : assert_eq!(readback, expect_2008);
2077 2 :
2078 2 : // Reading back in time works
2079 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2080 2 : assert_eq!(readback, expect_1008);
2081 2 :
2082 2 : Ok(())
2083 2 : }
2084 :
2085 : /*
2086 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2087 : let incremental = timeline.get_current_logical_size();
2088 : let non_incremental = timeline
2089 : .get_current_logical_size_non_incremental(lsn)
2090 : .unwrap();
2091 : assert_eq!(incremental, non_incremental);
2092 : }
2093 : */
2094 :
2095 : /*
2096 : ///
2097 : /// Test list_rels() function, with branches and dropped relations
2098 : ///
2099 : #[test]
2100 : fn test_list_rels_drop() -> Result<()> {
2101 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2102 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2103 : const TESTDB: u32 = 111;
2104 :
2105 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2106 : // after branching fails below
2107 : let mut writer = tline.begin_record(Lsn(0x10));
2108 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2109 : writer.finish()?;
2110 :
2111 : // Create a relation on the timeline
2112 : let mut writer = tline.begin_record(Lsn(0x20));
2113 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2114 : writer.finish()?;
2115 :
2116 : let writer = tline.begin_record(Lsn(0x00));
2117 : writer.finish()?;
2118 :
2119 : // Check that list_rels() lists it after LSN 2, but no before it
2120 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2121 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2122 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2123 :
2124 : // Create a branch, check that the relation is visible there
2125 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2126 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2127 : Some(timeline) => timeline,
2128 : None => panic!("Should have a local timeline"),
2129 : };
2130 : let newtline = DatadirTimelineImpl::new(newtline);
2131 : assert!(newtline
2132 : .list_rels(0, TESTDB, Lsn(0x30))?
2133 : .contains(&TESTREL_A));
2134 :
2135 : // Drop it on the branch
2136 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2137 : new_writer.drop_relation(TESTREL_A)?;
2138 : new_writer.finish()?;
2139 :
2140 : // Check that it's no longer listed on the branch after the point where it was dropped
2141 : assert!(newtline
2142 : .list_rels(0, TESTDB, Lsn(0x30))?
2143 : .contains(&TESTREL_A));
2144 : assert!(!newtline
2145 : .list_rels(0, TESTDB, Lsn(0x40))?
2146 : .contains(&TESTREL_A));
2147 :
2148 : // Run checkpoint and garbage collection and check that it's still not visible
2149 : newtline.checkpoint(CheckpointConfig::Forced)?;
2150 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2151 :
2152 : assert!(!newtline
2153 : .list_rels(0, TESTDB, Lsn(0x40))?
2154 : .contains(&TESTREL_A));
2155 :
2156 : Ok(())
2157 : }
2158 : */
2159 :
2160 : /*
2161 : #[test]
2162 : fn test_read_beyond_eof() -> Result<()> {
2163 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2164 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2165 :
2166 : make_some_layers(&tline, Lsn(0x20))?;
2167 : let mut writer = tline.begin_record(Lsn(0x60));
2168 : walingest.put_rel_page_image(
2169 : &mut writer,
2170 : TESTREL_A,
2171 : 0,
2172 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2173 : )?;
2174 : writer.finish()?;
2175 :
2176 : // Test read before rel creation. Should error out.
2177 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2178 :
2179 : // Read block beyond end of relation at different points in time.
2180 : // These reads should fall into different delta, image, and in-memory layers.
2181 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2182 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2183 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2184 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2185 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2186 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2187 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2188 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2189 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2190 :
2191 : // Test on an in-memory layer with no preceding layer
2192 : let mut writer = tline.begin_record(Lsn(0x70));
2193 : walingest.put_rel_page_image(
2194 : &mut writer,
2195 : TESTREL_B,
2196 : 0,
2197 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2198 : )?;
2199 : writer.finish()?;
2200 :
2201 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
2202 :
2203 : Ok(())
2204 : }
2205 : */
2206 : }
|