Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
13 : use crate::walrecord::NeonWalRecord;
14 : use crate::{aux_file, repository::*};
15 : use anyhow::{bail, ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
20 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
21 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
22 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
23 : };
24 : use pageserver_api::keyspace::SparseKeySpace;
25 : use pageserver_api::models::AuxFilePolicy;
26 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
27 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
28 : use postgres_ffi::BLCKSZ;
29 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
30 : use serde::{Deserialize, Serialize};
31 : use std::collections::{hash_map, HashMap, HashSet};
32 : use std::ops::ControlFlow;
33 : use std::ops::Range;
34 : use strum::IntoEnumIterator;
35 : use tokio_util::sync::CancellationToken;
36 : use tracing::{debug, info, trace, warn};
37 : use utils::bin_ser::DeserializeError;
38 : use utils::pausable_failpoint;
39 : use utils::{bin_ser::BeSer, lsn::Lsn};
40 :
/// Maximum number of delta records appended to `AUX_FILES_KEY` (aux file v1).
/// Once this threshold is reached, the write path writes a full image instead.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Maximum number of aux-file-related delta layers (aux file v2).
/// Once this threshold is reached, compaction creates a new image layer.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
46 :
/// Outcome of searching for the LSN that corresponds to a timestamp
/// (see `Timeline::find_lsn_for_timestamp`).
#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp.
    Present(Lsn),

    /// Found no commits after the given timestamp. This means that the
    /// newest data in the branch is older than the given timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp.
    Future(Lsn),

    /// The queried timestamp is past the horizon we look back at (PITR).
    ///
    /// All commits > LSN happened after the given timestamp, but any
    /// commits < LSN might have happened before or after the given
    /// timestamp. We don't know, because no data before the given LSN is
    /// available.
    Past(Lsn),

    /// We have found no commit with a timestamp, so we can't return
    /// anything meaningful.
    ///
    /// The associated LSN is the lower bound value we can safely create
    /// branches on, but no statement is made about whether it is older or
    /// newer than the timestamp.
    ///
    /// This variant can, for example, be returned right after a cluster
    /// import.
    NoData(Lsn),
}
78 :
/// Errors returned by `Timeline::get_current_logical_size_non_incremental`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
    /// The calculation was cancelled before it finished.
    #[error("cancelled")]
    Cancelled,

    /// Something went wrong while reading the metadata we use to calculate logical size.
    /// Note that cancellation variants of `PageReconstructError` are transformed to
    /// [`Self::Cancelled`] in the `From` implementation for this variant.
    #[error(transparent)]
    PageRead(PageReconstructError),

    /// Something went wrong deserializing metadata that we read to calculate logical size.
    #[error("decode error: {0}")]
    Decode(#[from] DeserializeError),
}
94 :
/// Errors returned by `Timeline::collect_keyspace`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    /// A directory or metadata value that was read could not be deserialized.
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    /// Reading a page failed for a reason other than cancellation.
    /// `PageReconstructError::Cancelled` is mapped to [`Self::Cancelled`] by the
    /// `From<PageReconstructError>` implementation instead of ending up here.
    #[error(transparent)]
    PageRead(PageReconstructError),
    /// The operation was cancelled.
    #[error("cancelled")]
    Cancelled,
}
104 :
105 : impl From<PageReconstructError> for CollectKeySpaceError {
106 0 : fn from(err: PageReconstructError) -> Self {
107 0 : match err {
108 0 : PageReconstructError::Cancelled => Self::Cancelled,
109 0 : err => Self::PageRead(err),
110 : }
111 0 : }
112 : }
113 :
114 : impl From<PageReconstructError> for CalculateLogicalSizeError {
115 0 : fn from(pre: PageReconstructError) -> Self {
116 0 : match pre {
117 0 : PageReconstructError::Cancelled => Self::Cancelled,
118 0 : _ => Self::PageRead(pre),
119 : }
120 0 : }
121 : }
122 :
/// Errors related to relation handling.
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    /// The relation already exists.
    #[error("Relation Already Exists")]
    AlreadyExists,
    /// The relation tag carries an invalid relnode. The GET functions in this
    /// file report this when `relnode == 0`.
    #[error("invalid relnode")]
    InvalidRelnode,
    /// Any other error, carried as an `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
132 :
133 : ///
134 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
135 : /// and other special kinds of files, in a versioned key-value store. The
136 : /// Timeline struct provides the key-value store.
137 : ///
138 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
139 : /// implementation, and might be moved into a separate struct later.
140 : impl Timeline {
141 : /// Start ingesting a WAL record, or other atomic modification of
142 : /// the timeline.
143 : ///
144 : /// This provides a transaction-like interface to perform a bunch
145 : /// of modifications atomically.
146 : ///
147 : /// To ingest a WAL record, call begin_modification(lsn) to get a
148 : /// DatadirModification object. Use the functions in the object to
149 : /// modify the repository state, updating all the pages and metadata
150 : /// that the WAL record affects. When you're done, call commit() to
151 : /// commit the changes.
152 : ///
153 : /// Lsn stored in modification is advanced by `ingest_record` and
154 : /// is used by `commit()` to update `last_record_lsn`.
155 : ///
156 : /// Calling commit() will flush all the changes and reset the state,
157 : /// so the `DatadirModification` struct can be reused to perform the next modification.
158 : ///
159 : /// Note that any pending modifications you make through the
160 : /// modification object won't be visible to calls to the 'get' and list
161 : /// functions of the timeline until you finish! And if you update the
162 : /// same page twice, the last update wins.
163 : ///
164 805176 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
165 805176 : where
166 805176 : Self: Sized,
167 805176 : {
168 805176 : DatadirModification {
169 805176 : tline: self,
170 805176 : pending_lsns: Vec::new(),
171 805176 : pending_updates: HashMap::new(),
172 805176 : pending_deletions: Vec::new(),
173 805176 : pending_nblocks: 0,
174 805176 : pending_directory_entries: Vec::new(),
175 805176 : pending_bytes: 0,
176 805176 : lsn,
177 805176 : }
178 805176 : }
179 :
180 : //------------------------------------------------------------------------------
181 : // Public GET functions
182 : //------------------------------------------------------------------------------
183 :
184 : /// Look up given page version.
185 55152 : pub(crate) async fn get_rel_page_at_lsn(
186 55152 : &self,
187 55152 : tag: RelTag,
188 55152 : blknum: BlockNumber,
189 55152 : version: Version<'_>,
190 55152 : ctx: &RequestContext,
191 55152 : ) -> Result<Bytes, PageReconstructError> {
192 55152 : if tag.relnode == 0 {
193 0 : return Err(PageReconstructError::Other(
194 0 : RelationError::InvalidRelnode.into(),
195 0 : ));
196 55152 : }
197 :
198 55152 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
199 55152 : if blknum >= nblocks {
200 0 : debug!(
201 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
202 0 : tag,
203 0 : blknum,
204 0 : version.get_lsn(),
205 : nblocks
206 : );
207 0 : return Ok(ZERO_PAGE.clone());
208 55152 : }
209 55152 :
210 55152 : let key = rel_block_to_key(tag, blknum);
211 55152 : version.get(self, key, ctx).await
212 55152 : }
213 :
214 : // Get size of a database in blocks
215 0 : pub(crate) async fn get_db_size(
216 0 : &self,
217 0 : spcnode: Oid,
218 0 : dbnode: Oid,
219 0 : version: Version<'_>,
220 0 : ctx: &RequestContext,
221 0 : ) -> Result<usize, PageReconstructError> {
222 0 : let mut total_blocks = 0;
223 :
224 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
225 :
226 0 : for rel in rels {
227 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
228 0 : total_blocks += n_blocks as usize;
229 : }
230 0 : Ok(total_blocks)
231 0 : }
232 :
233 : /// Get size of a relation file
234 73302 : pub(crate) async fn get_rel_size(
235 73302 : &self,
236 73302 : tag: RelTag,
237 73302 : version: Version<'_>,
238 73302 : ctx: &RequestContext,
239 73302 : ) -> Result<BlockNumber, PageReconstructError> {
240 73302 : if tag.relnode == 0 {
241 0 : return Err(PageReconstructError::Other(
242 0 : RelationError::InvalidRelnode.into(),
243 0 : ));
244 73302 : }
245 :
246 73302 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
247 57882 : return Ok(nblocks);
248 15420 : }
249 15420 :
250 15420 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
251 0 : && !self.get_rel_exists(tag, version, ctx).await?
252 : {
253 : // FIXME: Postgres sometimes calls smgrcreate() to create
254 : // FSM, and smgrnblocks() on it immediately afterwards,
255 : // without extending it. Tolerate that by claiming that
256 : // any non-existent FSM fork has size 0.
257 0 : return Ok(0);
258 15420 : }
259 15420 :
260 15420 : let key = rel_size_to_key(tag);
261 15420 : let mut buf = version.get(self, key, ctx).await?;
262 15408 : let nblocks = buf.get_u32_le();
263 15408 :
264 15408 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
265 15408 :
266 15408 : Ok(nblocks)
267 73302 : }
268 :
269 : /// Does relation exist?
270 18150 : pub(crate) async fn get_rel_exists(
271 18150 : &self,
272 18150 : tag: RelTag,
273 18150 : version: Version<'_>,
274 18150 : ctx: &RequestContext,
275 18150 : ) -> Result<bool, PageReconstructError> {
276 18150 : if tag.relnode == 0 {
277 0 : return Err(PageReconstructError::Other(
278 0 : RelationError::InvalidRelnode.into(),
279 0 : ));
280 18150 : }
281 :
282 : // first try to lookup relation in cache
283 18150 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
284 18096 : return Ok(true);
285 54 : }
286 : // then check if the database was already initialized.
287 : // get_rel_exists can be called before dbdir is created.
288 54 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
289 54 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
290 54 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
291 0 : return Ok(false);
292 54 : }
293 54 : // fetch directory listing
294 54 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
295 54 : let buf = version.get(self, key, ctx).await?;
296 :
297 54 : let dir = RelDirectory::des(&buf)?;
298 54 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
299 18150 : }
300 :
301 : /// Get a list of all existing relations in given tablespace and database.
302 : ///
303 : /// # Cancel-Safety
304 : ///
305 : /// This method is cancellation-safe.
306 0 : pub(crate) async fn list_rels(
307 0 : &self,
308 0 : spcnode: Oid,
309 0 : dbnode: Oid,
310 0 : version: Version<'_>,
311 0 : ctx: &RequestContext,
312 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
313 0 : // fetch directory listing
314 0 : let key = rel_dir_to_key(spcnode, dbnode);
315 0 : let buf = version.get(self, key, ctx).await?;
316 :
317 0 : let dir = RelDirectory::des(&buf)?;
318 0 : let rels: HashSet<RelTag> =
319 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
320 0 : spcnode,
321 0 : dbnode,
322 0 : relnode: *relnode,
323 0 : forknum: *forknum,
324 0 : }));
325 0 :
326 0 : Ok(rels)
327 0 : }
328 :
329 : /// Get the whole SLRU segment
330 0 : pub(crate) async fn get_slru_segment(
331 0 : &self,
332 0 : kind: SlruKind,
333 0 : segno: u32,
334 0 : lsn: Lsn,
335 0 : ctx: &RequestContext,
336 0 : ) -> Result<Bytes, PageReconstructError> {
337 0 : let n_blocks = self
338 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
339 0 : .await?;
340 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
341 0 : for blkno in 0..n_blocks {
342 0 : let block = self
343 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
344 0 : .await?;
345 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
346 : }
347 0 : Ok(segment.freeze())
348 0 : }
349 :
350 : /// Look up given SLRU page version.
351 0 : pub(crate) async fn get_slru_page_at_lsn(
352 0 : &self,
353 0 : kind: SlruKind,
354 0 : segno: u32,
355 0 : blknum: BlockNumber,
356 0 : lsn: Lsn,
357 0 : ctx: &RequestContext,
358 0 : ) -> Result<Bytes, PageReconstructError> {
359 0 : let key = slru_block_to_key(kind, segno, blknum);
360 0 : self.get(key, lsn, ctx).await
361 0 : }
362 :
363 : /// Get size of an SLRU segment
364 0 : pub(crate) async fn get_slru_segment_size(
365 0 : &self,
366 0 : kind: SlruKind,
367 0 : segno: u32,
368 0 : version: Version<'_>,
369 0 : ctx: &RequestContext,
370 0 : ) -> Result<BlockNumber, PageReconstructError> {
371 0 : let key = slru_segment_size_to_key(kind, segno);
372 0 : let mut buf = version.get(self, key, ctx).await?;
373 0 : Ok(buf.get_u32_le())
374 0 : }
375 :
376 : /// Get size of an SLRU segment
377 0 : pub(crate) async fn get_slru_segment_exists(
378 0 : &self,
379 0 : kind: SlruKind,
380 0 : segno: u32,
381 0 : version: Version<'_>,
382 0 : ctx: &RequestContext,
383 0 : ) -> Result<bool, PageReconstructError> {
384 0 : // fetch directory listing
385 0 : let key = slru_dir_to_key(kind);
386 0 : let buf = version.get(self, key, ctx).await?;
387 :
388 0 : let dir = SlruSegmentDirectory::des(&buf)?;
389 0 : Ok(dir.segments.contains(&segno))
390 0 : }
391 :
392 : /// Locate LSN, such that all transactions that committed before
393 : /// 'search_timestamp' are visible, but nothing newer is.
394 : ///
395 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
396 : /// so it's not well defined which LSN you get if there were multiple commits
397 : /// "in flight" at that point in time.
398 : ///
399 0 : pub(crate) async fn find_lsn_for_timestamp(
400 0 : &self,
401 0 : search_timestamp: TimestampTz,
402 0 : cancel: &CancellationToken,
403 0 : ctx: &RequestContext,
404 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
405 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
406 :
407 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
408 0 : // We use this method to figure out the branching LSN for the new branch, but the
409 0 : // GC cutoff could be before the branching point and we cannot create a new branch
410 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
411 0 : // on the safe side.
412 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
413 0 : let max_lsn = self.get_last_record_lsn();
414 0 :
415 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
416 0 : // LSN divided by 8.
417 0 : let mut low = min_lsn.0 / 8;
418 0 : let mut high = max_lsn.0 / 8 + 1;
419 0 :
420 0 : let mut found_smaller = false;
421 0 : let mut found_larger = false;
422 :
423 0 : while low < high {
424 0 : if cancel.is_cancelled() {
425 0 : return Err(PageReconstructError::Cancelled);
426 0 : }
427 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
428 0 : let mid = (high + low) / 2;
429 :
430 0 : let cmp = self
431 0 : .is_latest_commit_timestamp_ge_than(
432 0 : search_timestamp,
433 0 : Lsn(mid * 8),
434 0 : &mut found_smaller,
435 0 : &mut found_larger,
436 0 : ctx,
437 0 : )
438 0 : .await?;
439 :
440 0 : if cmp {
441 0 : high = mid;
442 0 : } else {
443 0 : low = mid + 1;
444 0 : }
445 : }
446 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
447 : // so the LSN of the last commit record before or at `search_timestamp`.
448 : // Remove one from `low` to get `t`.
449 : //
450 : // FIXME: it would be better to get the LSN of the previous commit.
451 : // Otherwise, if you restore to the returned LSN, the database will
452 : // include physical changes from later commits that will be marked
453 : // as aborted, and will need to be vacuumed away.
454 0 : let commit_lsn = Lsn((low - 1) * 8);
455 0 : match (found_smaller, found_larger) {
456 : (false, false) => {
457 : // This can happen if no commit records have been processed yet, e.g.
458 : // just after importing a cluster.
459 0 : Ok(LsnForTimestamp::NoData(min_lsn))
460 : }
461 : (false, true) => {
462 : // Didn't find any commit timestamps smaller than the request
463 0 : Ok(LsnForTimestamp::Past(min_lsn))
464 : }
465 0 : (true, _) if commit_lsn < min_lsn => {
466 0 : // the search above did set found_smaller to true but it never increased the lsn.
467 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
468 0 : // below the min_lsn. We should never do that.
469 0 : Ok(LsnForTimestamp::Past(min_lsn))
470 : }
471 : (true, false) => {
472 : // Only found commits with timestamps smaller than the request.
473 : // It's still a valid case for branch creation, return it.
474 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
475 : // case, anyway.
476 0 : Ok(LsnForTimestamp::Future(commit_lsn))
477 : }
478 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
479 : }
480 0 : }
481 :
482 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
483 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
484 : ///
485 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
486 : /// with a smaller/larger timestamp.
487 : ///
488 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
489 0 : &self,
490 0 : search_timestamp: TimestampTz,
491 0 : probe_lsn: Lsn,
492 0 : found_smaller: &mut bool,
493 0 : found_larger: &mut bool,
494 0 : ctx: &RequestContext,
495 0 : ) -> Result<bool, PageReconstructError> {
496 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
497 0 : if timestamp >= search_timestamp {
498 0 : *found_larger = true;
499 0 : return ControlFlow::Break(true);
500 0 : } else {
501 0 : *found_smaller = true;
502 0 : }
503 0 : ControlFlow::Continue(())
504 0 : })
505 0 : .await
506 0 : }
507 :
508 : /// Obtain the possible timestamp range for the given lsn.
509 : ///
510 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
511 0 : pub(crate) async fn get_timestamp_for_lsn(
512 0 : &self,
513 0 : probe_lsn: Lsn,
514 0 : ctx: &RequestContext,
515 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
516 0 : let mut max: Option<TimestampTz> = None;
517 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
518 0 : if let Some(max_prev) = max {
519 0 : max = Some(max_prev.max(timestamp));
520 0 : } else {
521 0 : max = Some(timestamp);
522 0 : }
523 0 : ControlFlow::Continue(())
524 0 : })
525 0 : .await?;
526 :
527 0 : Ok(max)
528 0 : }
529 :
530 : /// Runs the given function on all the timestamps for a given lsn
531 : ///
532 : /// The return value is either given by the closure, or set to the `Default`
533 : /// impl's output.
534 0 : async fn map_all_timestamps<T: Default>(
535 0 : &self,
536 0 : probe_lsn: Lsn,
537 0 : ctx: &RequestContext,
538 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
539 0 : ) -> Result<T, PageReconstructError> {
540 0 : for segno in self
541 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
542 0 : .await?
543 : {
544 0 : let nblocks = self
545 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
546 0 : .await?;
547 0 : for blknum in (0..nblocks).rev() {
548 0 : let clog_page = self
549 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
550 0 : .await?;
551 :
552 0 : if clog_page.len() == BLCKSZ as usize + 8 {
553 0 : let mut timestamp_bytes = [0u8; 8];
554 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
555 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
556 0 :
557 0 : match f(timestamp) {
558 0 : ControlFlow::Break(b) => return Ok(b),
559 0 : ControlFlow::Continue(()) => (),
560 : }
561 0 : }
562 : }
563 : }
564 0 : Ok(Default::default())
565 0 : }
566 :
567 0 : pub(crate) async fn get_slru_keyspace(
568 0 : &self,
569 0 : version: Version<'_>,
570 0 : ctx: &RequestContext,
571 0 : ) -> Result<KeySpace, PageReconstructError> {
572 0 : let mut accum = KeySpaceAccum::new();
573 :
574 0 : for kind in SlruKind::iter() {
575 0 : let mut segments: Vec<u32> = self
576 0 : .list_slru_segments(kind, version, ctx)
577 0 : .await?
578 0 : .into_iter()
579 0 : .collect();
580 0 : segments.sort_unstable();
581 :
582 0 : for seg in segments {
583 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
584 :
585 0 : accum.add_range(
586 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
587 0 : );
588 : }
589 : }
590 :
591 0 : Ok(accum.to_keyspace())
592 0 : }
593 :
594 : /// Get a list of SLRU segments
595 0 : pub(crate) async fn list_slru_segments(
596 0 : &self,
597 0 : kind: SlruKind,
598 0 : version: Version<'_>,
599 0 : ctx: &RequestContext,
600 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
601 0 : // fetch directory entry
602 0 : let key = slru_dir_to_key(kind);
603 :
604 0 : let buf = version.get(self, key, ctx).await?;
605 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
606 0 : }
607 :
608 0 : pub(crate) async fn get_relmap_file(
609 0 : &self,
610 0 : spcnode: Oid,
611 0 : dbnode: Oid,
612 0 : version: Version<'_>,
613 0 : ctx: &RequestContext,
614 0 : ) -> Result<Bytes, PageReconstructError> {
615 0 : let key = relmap_file_key(spcnode, dbnode);
616 :
617 0 : let buf = version.get(self, key, ctx).await?;
618 0 : Ok(buf)
619 0 : }
620 :
621 864 : pub(crate) async fn list_dbdirs(
622 864 : &self,
623 864 : lsn: Lsn,
624 864 : ctx: &RequestContext,
625 864 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
626 : // fetch directory entry
627 9382 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
628 :
629 864 : Ok(DbDirectory::des(&buf)?.dbdirs)
630 864 : }
631 :
632 0 : pub(crate) async fn get_twophase_file(
633 0 : &self,
634 0 : xid: TransactionId,
635 0 : lsn: Lsn,
636 0 : ctx: &RequestContext,
637 0 : ) -> Result<Bytes, PageReconstructError> {
638 0 : let key = twophase_file_key(xid);
639 0 : let buf = self.get(key, lsn, ctx).await?;
640 0 : Ok(buf)
641 0 : }
642 :
643 6 : pub(crate) async fn list_twophase_files(
644 6 : &self,
645 6 : lsn: Lsn,
646 6 : ctx: &RequestContext,
647 6 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
648 : // fetch directory entry
649 6 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
650 :
651 6 : Ok(TwoPhaseDirectory::des(&buf)?.xids)
652 6 : }
653 :
654 0 : pub(crate) async fn get_control_file(
655 0 : &self,
656 0 : lsn: Lsn,
657 0 : ctx: &RequestContext,
658 0 : ) -> Result<Bytes, PageReconstructError> {
659 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
660 0 : }
661 :
662 36 : pub(crate) async fn get_checkpoint(
663 36 : &self,
664 36 : lsn: Lsn,
665 36 : ctx: &RequestContext,
666 36 : ) -> Result<Bytes, PageReconstructError> {
667 36 : self.get(CHECKPOINT_KEY, lsn, ctx).await
668 36 : }
669 :
670 48 : async fn list_aux_files_v1(
671 48 : &self,
672 48 : lsn: Lsn,
673 48 : ctx: &RequestContext,
674 48 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
675 48 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
676 30 : Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
677 18 : Err(e) => {
678 18 : // This is expected: historical databases do not have the key.
679 18 : debug!("Failed to get info about AUX files: {}", e);
680 18 : Ok(HashMap::new())
681 : }
682 : }
683 48 : }
684 :
685 72 : async fn list_aux_files_v2(
686 72 : &self,
687 72 : lsn: Lsn,
688 72 : ctx: &RequestContext,
689 72 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
690 72 : let kv = self
691 72 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
692 0 : .await?;
693 72 : let mut result = HashMap::new();
694 72 : let mut sz = 0;
695 180 : for (_, v) in kv {
696 108 : let v = v?;
697 108 : let v = aux_file::decode_file_value_bytes(&v)
698 108 : .context("value decode")
699 108 : .map_err(PageReconstructError::Other)?;
700 210 : for (fname, content) in v {
701 102 : sz += fname.len();
702 102 : sz += content.len();
703 102 : result.insert(fname, content);
704 102 : }
705 : }
706 72 : self.aux_file_size_estimator.on_initial(sz);
707 72 : Ok(result)
708 72 : }
709 :
710 0 : pub(crate) async fn trigger_aux_file_size_computation(
711 0 : &self,
712 0 : lsn: Lsn,
713 0 : ctx: &RequestContext,
714 0 : ) -> Result<(), PageReconstructError> {
715 0 : let current_policy = self.last_aux_file_policy.load();
716 0 : if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
717 0 : self.list_aux_files_v2(lsn, ctx).await?;
718 0 : }
719 0 : Ok(())
720 0 : }
721 :
722 78 : pub(crate) async fn list_aux_files(
723 78 : &self,
724 78 : lsn: Lsn,
725 78 : ctx: &RequestContext,
726 78 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
727 78 : let current_policy = self.last_aux_file_policy.load();
728 78 : match current_policy {
729 : Some(AuxFilePolicy::V1) => {
730 6 : warn!("this timeline is using deprecated aux file policy V1 (policy=V1)");
731 6 : self.list_aux_files_v1(lsn, ctx).await
732 : }
733 : None => {
734 0 : let res = self.list_aux_files_v1(lsn, ctx).await?;
735 0 : if !res.is_empty() {
736 0 : warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
737 0 : }
738 0 : Ok(res)
739 : }
740 66 : Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
741 : Some(AuxFilePolicy::CrossValidation) => {
742 6 : let v1_result = self.list_aux_files_v1(lsn, ctx).await;
743 6 : let v2_result = self.list_aux_files_v2(lsn, ctx).await;
744 6 : match (v1_result, v2_result) {
745 6 : (Ok(v1), Ok(v2)) => {
746 6 : if v1 != v2 {
747 0 : tracing::error!(
748 0 : "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
749 : );
750 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
751 0 : "unmatched aux file v1 v2 result"
752 0 : )));
753 6 : }
754 6 : Ok(v1)
755 : }
756 0 : (Ok(_), Err(v2)) => {
757 0 : tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
758 0 : Err(v2)
759 : }
760 0 : (Err(v1), Ok(_)) => {
761 0 : tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
762 0 : Err(v1)
763 : }
764 0 : (Err(_), Err(v2)) => Err(v2),
765 : }
766 : }
767 : }
768 78 : }
769 :
770 0 : pub(crate) async fn get_replorigins(
771 0 : &self,
772 0 : lsn: Lsn,
773 0 : ctx: &RequestContext,
774 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
775 0 : let kv = self
776 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
777 0 : .await?;
778 0 : let mut result = HashMap::new();
779 0 : for (k, v) in kv {
780 0 : let v = v?;
781 0 : let origin_id = k.field6 as RepOriginId;
782 0 : let origin_lsn = Lsn::des(&v).unwrap();
783 0 : if origin_lsn != Lsn::INVALID {
784 0 : result.insert(origin_id, origin_lsn);
785 0 : }
786 : }
787 0 : Ok(result)
788 0 : }
789 :
790 : /// Does the same as get_current_logical_size but counted on demand.
791 : /// Used to initialize the logical size tracking on startup.
792 : ///
793 : /// Only relation blocks are counted currently. That excludes metadata,
794 : /// SLRUs, twophase files etc.
795 : ///
796 : /// # Cancel-Safety
797 : ///
798 : /// This method is cancellation-safe.
799 0 : pub(crate) async fn get_current_logical_size_non_incremental(
800 0 : &self,
801 0 : lsn: Lsn,
802 0 : ctx: &RequestContext,
803 0 : ) -> Result<u64, CalculateLogicalSizeError> {
804 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
805 :
806 : // Fetch list of database dirs and iterate them
807 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
808 0 : let dbdir = DbDirectory::des(&buf)?;
809 :
810 0 : let mut total_size: u64 = 0;
811 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
812 0 : for rel in self
813 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
814 0 : .await?
815 : {
816 0 : if self.cancel.is_cancelled() {
817 0 : return Err(CalculateLogicalSizeError::Cancelled);
818 0 : }
819 0 : let relsize_key = rel_size_to_key(rel);
820 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
821 0 : let relsize = buf.get_u32_le();
822 0 :
823 0 : total_size += relsize as u64;
824 : }
825 : }
826 0 : Ok(total_size * BLCKSZ as u64)
827 0 : }
828 :
829 : ///
830 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
831 : /// Anything that's not listed maybe removed from the underlying storage (from
832 : /// that LSN forwards).
833 : ///
834 : /// The return value is (dense keyspace, sparse keyspace).
835 864 : pub(crate) async fn collect_keyspace(
836 864 : &self,
837 864 : lsn: Lsn,
838 864 : ctx: &RequestContext,
839 864 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
840 864 : // Iterate through key ranges, greedily packing them into partitions
841 864 : let mut result = KeySpaceAccum::new();
842 864 :
843 864 : // The dbdir metadata always exists
844 864 : result.add_key(DBDIR_KEY);
845 :
846 : // Fetch list of database dirs and iterate them
847 9382 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
848 864 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
849 864 :
850 864 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
851 864 : for ((spcnode, dbnode), has_relmap_file) in dbs {
852 0 : if has_relmap_file {
853 0 : result.add_key(relmap_file_key(spcnode, dbnode));
854 0 : }
855 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
856 :
857 0 : let mut rels: Vec<RelTag> = self
858 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
859 0 : .await?
860 0 : .into_iter()
861 0 : .collect();
862 0 : rels.sort_unstable();
863 0 : for rel in rels {
864 0 : let relsize_key = rel_size_to_key(rel);
865 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
866 0 : let relsize = buf.get_u32_le();
867 0 :
868 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
869 0 : result.add_key(relsize_key);
870 : }
871 : }
872 :
873 : // Iterate SLRUs next
874 2592 : for kind in [
875 864 : SlruKind::Clog,
876 864 : SlruKind::MultiXactMembers,
877 864 : SlruKind::MultiXactOffsets,
878 : ] {
879 2592 : let slrudir_key = slru_dir_to_key(kind);
880 2592 : result.add_key(slrudir_key);
881 28707 : let buf = self.get(slrudir_key, lsn, ctx).await?;
882 2592 : let dir = SlruSegmentDirectory::des(&buf)?;
883 2592 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
884 2592 : segments.sort_unstable();
885 2592 : for segno in segments {
886 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
887 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
888 0 : let segsize = buf.get_u32_le();
889 0 :
890 0 : result.add_range(
891 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
892 0 : );
893 0 : result.add_key(segsize_key);
894 : }
895 : }
896 :
897 : // Then pg_twophase
898 864 : result.add_key(TWOPHASEDIR_KEY);
899 9401 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
900 864 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
901 864 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
902 864 : xids.sort_unstable();
903 864 : for xid in xids {
904 0 : result.add_key(twophase_file_key(xid));
905 0 : }
906 :
907 864 : result.add_key(CONTROLFILE_KEY);
908 864 : result.add_key(CHECKPOINT_KEY);
909 864 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
910 12 : result.add_key(AUX_FILES_KEY);
911 852 : }
912 :
913 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
914 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
915 : // and the keys will not be garbage-collected.
916 : #[cfg(test)]
917 : {
918 864 : let guard = self.extra_test_dense_keyspace.load();
919 864 : for kr in &guard.ranges {
920 0 : result.add_range(kr.clone());
921 0 : }
922 : }
923 :
924 864 : let dense_keyspace = result.to_keyspace();
925 864 : let sparse_keyspace = SparseKeySpace(KeySpace {
926 864 : ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
927 864 : });
928 864 :
929 864 : if cfg!(debug_assertions) {
930 : // Verify if the sparse keyspaces are ordered and non-overlapping.
931 :
932 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
933 : // category of sparse keys are split into their own image/delta files. If there
934 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
935 : // and we want the developer to keep the keyspaces separated.
936 :
937 864 : let ranges = &sparse_keyspace.0.ranges;
938 :
939 : // TODO: use a single overlaps_with across the codebase
940 864 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
941 864 : !(a.end <= b.start || b.end <= a.start)
942 864 : }
943 1728 : for i in 0..ranges.len() {
944 1728 : for j in 0..i {
945 864 : if overlaps_with(&ranges[i], &ranges[j]) {
946 0 : panic!(
947 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
948 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
949 0 : );
950 864 : }
951 : }
952 : }
953 864 : for i in 1..ranges.len() {
954 864 : assert!(
955 864 : ranges[i - 1].end <= ranges[i].start,
956 0 : "unordered sparse keyspace: {}..{} and {}..{}",
957 0 : ranges[i - 1].start,
958 0 : ranges[i - 1].end,
959 0 : ranges[i].start,
960 0 : ranges[i].end
961 : );
962 : }
963 0 : }
964 :
965 864 : Ok((dense_keyspace, sparse_keyspace))
966 864 : }
967 :
968 : /// Get cached size of relation if it not updated after specified LSN
969 1345620 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
970 1345620 : let rel_size_cache = self.rel_size_cache.read().unwrap();
971 1345620 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
972 1345554 : if lsn >= *cached_lsn {
973 1330116 : return Some(*nblocks);
974 15438 : }
975 66 : }
976 15504 : None
977 1345620 : }
978 :
979 : /// Update cached relation size if there is no more recent update
980 15408 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
981 15408 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
982 15408 :
983 15408 : if lsn < rel_size_cache.complete_as_of {
984 : // Do not cache old values. It's safe to cache the size on read, as long as
985 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
986 : // never evict values from the cache, so if the relation size changed after
987 : // 'lsn', the new value is already in the cache.
988 0 : return;
989 15408 : }
990 15408 :
991 15408 : match rel_size_cache.map.entry(tag) {
992 15408 : hash_map::Entry::Occupied(mut entry) => {
993 15408 : let cached_lsn = entry.get_mut();
994 15408 : if lsn >= cached_lsn.0 {
995 0 : *cached_lsn = (lsn, nblocks);
996 15408 : }
997 : }
998 0 : hash_map::Entry::Vacant(entry) => {
999 0 : entry.insert((lsn, nblocks));
1000 0 : }
1001 : }
1002 15408 : }
1003 :
1004 : /// Store cached relation size
1005 866196 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1006 866196 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1007 866196 : rel_size_cache.map.insert(tag, (lsn, nblocks));
1008 866196 : }
1009 :
1010 : /// Remove cached relation size
1011 6 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1012 6 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1013 6 : rel_size_cache.map.remove(tag);
1014 6 : }
1015 : }
1016 :
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// made by one WAL record appear atomic.
pub struct DatadirModification<'a> {
    /// The timeline this modification applies to. You can access this to
    /// read the state, but note that any pending updates are *not* reflected
    /// in the state in 'tline' yet.
    pub tline: &'a Timeline,

    /// Current LSN of the modification
    lsn: Lsn,

    // The modifications are not applied directly to the underlying key-value store.
    // The put-functions add the modifications here, and they are flushed to the
    // underlying key-value store by the 'finish' function.
    //
    // LSNs this modification has already moved past: `set_lsn` pushes the
    // previous LSN here whenever it advances `lsn`.
    pending_lsns: Vec<Lsn>,
    // Buffered writes, per key.
    // NOTE(review): the `usize` in the tuple is presumably the serialized size
    // of the value -- confirm against `put`.
    pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
    // Buffered range deletions, each paired with an LSN.
    pending_deletions: Vec<(Range<Key>, Lsn)>,
    // Net change in the number of relation blocks accumulated by the
    // put_rel_* / drop functions (used to update the logical database size).
    pending_nblocks: i64,

    /// For special "directory" keys that store key-value maps, track the size of the map
    /// if it was updated in this modification.
    pending_directory_entries: Vec<(DirectoryKind, usize)>,

    /// An **approximation** of how large our EphemeralFile write will be when committed.
    pending_bytes: usize,
}
1045 :
1046 : impl<'a> DatadirModification<'a> {
/// Upper bound (8 MiB) on the payload a DatadirModification may contain before it should be committed.
///
/// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
/// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
/// additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1051 :
1052 : /// Get the current lsn
1053 1254168 : pub(crate) fn get_lsn(&self) -> Lsn {
1054 1254168 : self.lsn
1055 1254168 : }
1056 :
1057 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1058 0 : self.pending_bytes
1059 0 : }
1060 :
1061 : /// Set the current lsn
1062 437574 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1063 437574 : ensure!(
1064 437574 : lsn >= self.lsn,
1065 0 : "setting an older lsn {} than {} is not allowed",
1066 : lsn,
1067 : self.lsn
1068 : );
1069 437574 : if lsn > self.lsn {
1070 437574 : self.pending_lsns.push(self.lsn);
1071 437574 : self.lsn = lsn;
1072 437574 : }
1073 437574 : Ok(())
1074 437574 : }
1075 :
1076 : /// Initialize a completely new repository.
1077 : ///
1078 : /// This inserts the directory metadata entries that are assumed to
1079 : /// always exist.
1080 522 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1081 522 : let buf = DbDirectory::ser(&DbDirectory {
1082 522 : dbdirs: HashMap::new(),
1083 522 : })?;
1084 522 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1085 522 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1086 522 :
1087 522 : // Create AuxFilesDirectory
1088 522 : self.init_aux_dir()?;
1089 :
1090 522 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1091 522 : xids: HashSet::new(),
1092 522 : })?;
1093 522 : self.pending_directory_entries
1094 522 : .push((DirectoryKind::TwoPhase, 0));
1095 522 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1096 :
1097 522 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1098 522 : let empty_dir = Value::Image(buf);
1099 522 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1100 522 : self.pending_directory_entries
1101 522 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1102 522 : self.put(
1103 522 : slru_dir_to_key(SlruKind::MultiXactMembers),
1104 522 : empty_dir.clone(),
1105 522 : );
1106 522 : self.pending_directory_entries
1107 522 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1108 522 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1109 522 : self.pending_directory_entries
1110 522 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1111 522 :
1112 522 : Ok(())
1113 522 : }
1114 :
/// Test-only variant of `init_empty` that additionally stores placeholder
/// control-file and checkpoint contents.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    self.init_empty()?;

    let control_file = Bytes::from_static(b"control_file contents do not matter");
    self.put_control_file(control_file)
        .context("put_control_file")?;

    let checkpoint = Bytes::from_static(b"checkpoint_file contents do not matter");
    self.put_checkpoint(checkpoint)
        .context("put_checkpoint_file")?;

    Ok(())
}
1128 :
1129 : /// Put a new page version that can be constructed from a WAL record
1130 : ///
1131 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1132 : /// current end-of-file. It's up to the caller to check that the relation size
1133 : /// matches the blocks inserted!
1134 436890 : pub fn put_rel_wal_record(
1135 436890 : &mut self,
1136 436890 : rel: RelTag,
1137 436890 : blknum: BlockNumber,
1138 436890 : rec: NeonWalRecord,
1139 436890 : ) -> anyhow::Result<()> {
1140 436890 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1141 436890 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1142 436890 : Ok(())
1143 436890 : }
1144 :
1145 : // Same, but for an SLRU.
1146 24 : pub fn put_slru_wal_record(
1147 24 : &mut self,
1148 24 : kind: SlruKind,
1149 24 : segno: u32,
1150 24 : blknum: BlockNumber,
1151 24 : rec: NeonWalRecord,
1152 24 : ) -> anyhow::Result<()> {
1153 24 : self.put(
1154 24 : slru_block_to_key(kind, segno, blknum),
1155 24 : Value::WalRecord(rec),
1156 24 : );
1157 24 : Ok(())
1158 24 : }
1159 :
1160 : /// Like put_wal_record, but with ready-made image of the page.
1161 842592 : pub fn put_rel_page_image(
1162 842592 : &mut self,
1163 842592 : rel: RelTag,
1164 842592 : blknum: BlockNumber,
1165 842592 : img: Bytes,
1166 842592 : ) -> anyhow::Result<()> {
1167 842592 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1168 842592 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1169 842592 : Ok(())
1170 842592 : }
1171 :
1172 18 : pub fn put_slru_page_image(
1173 18 : &mut self,
1174 18 : kind: SlruKind,
1175 18 : segno: u32,
1176 18 : blknum: BlockNumber,
1177 18 : img: Bytes,
1178 18 : ) -> anyhow::Result<()> {
1179 18 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
1180 18 : Ok(())
1181 18 : }
1182 :
1183 : /// Store a relmapper file (pg_filenode.map) in the repository
1184 48 : pub async fn put_relmap_file(
1185 48 : &mut self,
1186 48 : spcnode: Oid,
1187 48 : dbnode: Oid,
1188 48 : img: Bytes,
1189 48 : ctx: &RequestContext,
1190 48 : ) -> anyhow::Result<()> {
1191 : // Add it to the directory (if it doesn't exist already)
1192 48 : let buf = self.get(DBDIR_KEY, ctx).await?;
1193 48 : let mut dbdir = DbDirectory::des(&buf)?;
1194 :
1195 48 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1196 48 : if r.is_none() || r == Some(false) {
1197 : // The dbdir entry didn't exist, or it contained a
1198 : // 'false'. The 'insert' call already updated it with
1199 : // 'true', now write the updated 'dbdirs' map back.
1200 48 : let buf = DbDirectory::ser(&dbdir)?;
1201 48 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1202 48 :
1203 48 : // Create AuxFilesDirectory as well
1204 48 : self.init_aux_dir()?;
1205 0 : }
1206 48 : if r.is_none() {
1207 24 : // Create RelDirectory
1208 24 : let buf = RelDirectory::ser(&RelDirectory {
1209 24 : rels: HashSet::new(),
1210 24 : })?;
1211 24 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1212 24 : self.put(
1213 24 : rel_dir_to_key(spcnode, dbnode),
1214 24 : Value::Image(Bytes::from(buf)),
1215 24 : );
1216 24 : }
1217 :
1218 48 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1219 48 : Ok(())
1220 48 : }
1221 :
1222 0 : pub async fn put_twophase_file(
1223 0 : &mut self,
1224 0 : xid: TransactionId,
1225 0 : img: Bytes,
1226 0 : ctx: &RequestContext,
1227 0 : ) -> anyhow::Result<()> {
1228 : // Add it to the directory entry
1229 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1230 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1231 0 : if !dir.xids.insert(xid) {
1232 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1233 0 : }
1234 0 : self.pending_directory_entries
1235 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1236 0 : self.put(
1237 0 : TWOPHASEDIR_KEY,
1238 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1239 : );
1240 :
1241 0 : self.put(twophase_file_key(xid), Value::Image(img));
1242 0 : Ok(())
1243 0 : }
1244 :
1245 0 : pub async fn set_replorigin(
1246 0 : &mut self,
1247 0 : origin_id: RepOriginId,
1248 0 : origin_lsn: Lsn,
1249 0 : ) -> anyhow::Result<()> {
1250 0 : let key = repl_origin_key(origin_id);
1251 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1252 0 : Ok(())
1253 0 : }
1254 :
1255 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1256 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1257 0 : }
1258 :
1259 522 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1260 522 : self.put(CONTROLFILE_KEY, Value::Image(img));
1261 522 : Ok(())
1262 522 : }
1263 :
1264 564 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1265 564 : self.put(CHECKPOINT_KEY, Value::Image(img));
1266 564 : Ok(())
1267 564 : }
1268 :
1269 0 : pub async fn drop_dbdir(
1270 0 : &mut self,
1271 0 : spcnode: Oid,
1272 0 : dbnode: Oid,
1273 0 : ctx: &RequestContext,
1274 0 : ) -> anyhow::Result<()> {
1275 0 : let total_blocks = self
1276 0 : .tline
1277 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1278 0 : .await?;
1279 :
1280 : // Remove entry from dbdir
1281 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1282 0 : let mut dir = DbDirectory::des(&buf)?;
1283 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1284 0 : let buf = DbDirectory::ser(&dir)?;
1285 0 : self.pending_directory_entries
1286 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1287 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1288 : } else {
1289 0 : warn!(
1290 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1291 : spcnode, dbnode
1292 : );
1293 : }
1294 :
1295 : // Update logical database size.
1296 0 : self.pending_nblocks -= total_blocks as i64;
1297 0 :
1298 0 : // Delete all relations and metadata files for the spcnode/dnode
1299 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1300 0 : Ok(())
1301 0 : }
1302 :
1303 : /// Create a relation fork.
1304 : ///
1305 : /// 'nblocks' is the initial size.
1306 5760 : pub async fn put_rel_creation(
1307 5760 : &mut self,
1308 5760 : rel: RelTag,
1309 5760 : nblocks: BlockNumber,
1310 5760 : ctx: &RequestContext,
1311 5760 : ) -> Result<(), RelationError> {
1312 5760 : if rel.relnode == 0 {
1313 0 : return Err(RelationError::InvalidRelnode);
1314 5760 : }
1315 : // It's possible that this is the first rel for this db in this
1316 : // tablespace. Create the reldir entry for it if so.
1317 5760 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1318 5760 : .context("deserialize db")?;
1319 5760 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1320 5760 : let mut rel_dir =
1321 5760 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1322 : // Didn't exist. Update dbdir
1323 24 : e.insert(false);
1324 24 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1325 24 : self.pending_directory_entries
1326 24 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1327 24 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1328 24 :
1329 24 : // and create the RelDirectory
1330 24 : RelDirectory::default()
1331 : } else {
1332 : // reldir already exists, fetch it
1333 5736 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1334 5736 : .context("deserialize db")?
1335 : };
1336 :
1337 : // Add the new relation to the rel directory entry, and write it back
1338 5760 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1339 0 : return Err(RelationError::AlreadyExists);
1340 5760 : }
1341 5760 :
1342 5760 : self.pending_directory_entries
1343 5760 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1344 5760 :
1345 5760 : self.put(
1346 5760 : rel_dir_key,
1347 5760 : Value::Image(Bytes::from(
1348 5760 : RelDirectory::ser(&rel_dir).context("serialize")?,
1349 : )),
1350 : );
1351 :
1352 : // Put size
1353 5760 : let size_key = rel_size_to_key(rel);
1354 5760 : let buf = nblocks.to_le_bytes();
1355 5760 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1356 5760 :
1357 5760 : self.pending_nblocks += nblocks as i64;
1358 5760 :
1359 5760 : // Update relation size cache
1360 5760 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1361 5760 :
1362 5760 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1363 5760 : // caller.
1364 5760 : Ok(())
1365 5760 : }
1366 :
1367 : /// Truncate relation
1368 18036 : pub async fn put_rel_truncation(
1369 18036 : &mut self,
1370 18036 : rel: RelTag,
1371 18036 : nblocks: BlockNumber,
1372 18036 : ctx: &RequestContext,
1373 18036 : ) -> anyhow::Result<()> {
1374 18036 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1375 18036 : if self
1376 18036 : .tline
1377 18036 : .get_rel_exists(rel, Version::Modified(self), ctx)
1378 0 : .await?
1379 : {
1380 18036 : let size_key = rel_size_to_key(rel);
1381 : // Fetch the old size first
1382 18036 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1383 18036 :
1384 18036 : // Update the entry with the new size.
1385 18036 : let buf = nblocks.to_le_bytes();
1386 18036 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1387 18036 :
1388 18036 : // Update relation size cache
1389 18036 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1390 18036 :
1391 18036 : // Update relation size cache
1392 18036 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1393 18036 :
1394 18036 : // Update logical database size.
1395 18036 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1396 0 : }
1397 18036 : Ok(())
1398 18036 : }
1399 :
1400 : /// Extend relation
1401 : /// If new size is smaller, do nothing.
1402 830040 : pub async fn put_rel_extend(
1403 830040 : &mut self,
1404 830040 : rel: RelTag,
1405 830040 : nblocks: BlockNumber,
1406 830040 : ctx: &RequestContext,
1407 830040 : ) -> anyhow::Result<()> {
1408 830040 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1409 :
1410 : // Put size
1411 830040 : let size_key = rel_size_to_key(rel);
1412 830040 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1413 830040 :
1414 830040 : // only extend relation here. never decrease the size
1415 830040 : if nblocks > old_size {
1416 824364 : let buf = nblocks.to_le_bytes();
1417 824364 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1418 824364 :
1419 824364 : // Update relation size cache
1420 824364 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1421 824364 :
1422 824364 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1423 824364 : }
1424 830040 : Ok(())
1425 830040 : }
1426 :
1427 : /// Drop a relation.
1428 6 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1429 6 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1430 :
1431 : // Remove it from the directory entry
1432 6 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1433 6 : let buf = self.get(dir_key, ctx).await?;
1434 6 : let mut dir = RelDirectory::des(&buf)?;
1435 :
1436 6 : self.pending_directory_entries
1437 6 : .push((DirectoryKind::Rel, dir.rels.len()));
1438 6 :
1439 6 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1440 6 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1441 : } else {
1442 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1443 : }
1444 :
1445 : // update logical size
1446 6 : let size_key = rel_size_to_key(rel);
1447 6 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1448 6 : self.pending_nblocks -= old_size as i64;
1449 6 :
1450 6 : // Remove enty from relation size cache
1451 6 : self.tline.remove_cached_rel_size(&rel);
1452 6 :
1453 6 : // Delete size entry, as well as all blocks
1454 6 : self.delete(rel_key_range(rel));
1455 6 :
1456 6 : Ok(())
1457 6 : }
1458 :
1459 18 : pub async fn put_slru_segment_creation(
1460 18 : &mut self,
1461 18 : kind: SlruKind,
1462 18 : segno: u32,
1463 18 : nblocks: BlockNumber,
1464 18 : ctx: &RequestContext,
1465 18 : ) -> anyhow::Result<()> {
1466 18 : // Add it to the directory entry
1467 18 : let dir_key = slru_dir_to_key(kind);
1468 18 : let buf = self.get(dir_key, ctx).await?;
1469 18 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1470 :
1471 18 : if !dir.segments.insert(segno) {
1472 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1473 18 : }
1474 18 : self.pending_directory_entries
1475 18 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1476 18 : self.put(
1477 18 : dir_key,
1478 18 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1479 : );
1480 :
1481 : // Put size
1482 18 : let size_key = slru_segment_size_to_key(kind, segno);
1483 18 : let buf = nblocks.to_le_bytes();
1484 18 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1485 18 :
1486 18 : // even if nblocks > 0, we don't insert any actual blocks here
1487 18 :
1488 18 : Ok(())
1489 18 : }
1490 :
1491 : /// Extend SLRU segment
1492 0 : pub fn put_slru_extend(
1493 0 : &mut self,
1494 0 : kind: SlruKind,
1495 0 : segno: u32,
1496 0 : nblocks: BlockNumber,
1497 0 : ) -> anyhow::Result<()> {
1498 0 : // Put size
1499 0 : let size_key = slru_segment_size_to_key(kind, segno);
1500 0 : let buf = nblocks.to_le_bytes();
1501 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1502 0 : Ok(())
1503 0 : }
1504 :
1505 : /// This method is used for marking truncated SLRU files
1506 0 : pub async fn drop_slru_segment(
1507 0 : &mut self,
1508 0 : kind: SlruKind,
1509 0 : segno: u32,
1510 0 : ctx: &RequestContext,
1511 0 : ) -> anyhow::Result<()> {
1512 0 : // Remove it from the directory entry
1513 0 : let dir_key = slru_dir_to_key(kind);
1514 0 : let buf = self.get(dir_key, ctx).await?;
1515 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1516 :
1517 0 : if !dir.segments.remove(&segno) {
1518 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1519 0 : }
1520 0 : self.pending_directory_entries
1521 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1522 0 : self.put(
1523 0 : dir_key,
1524 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1525 : );
1526 :
1527 : // Delete size entry, as well as all blocks
1528 0 : self.delete(slru_segment_key_range(kind, segno));
1529 0 :
1530 0 : Ok(())
1531 0 : }
1532 :
1533 : /// Drop a relmapper file (pg_filenode.map)
1534 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1535 0 : // TODO
1536 0 : Ok(())
1537 0 : }
1538 :
1539 : /// This method is used for marking truncated SLRU files
1540 0 : pub async fn drop_twophase_file(
1541 0 : &mut self,
1542 0 : xid: TransactionId,
1543 0 : ctx: &RequestContext,
1544 0 : ) -> anyhow::Result<()> {
1545 : // Remove it from the directory entry
1546 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1547 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1548 :
1549 0 : if !dir.xids.remove(&xid) {
1550 0 : warn!("twophase file for xid {} does not exist", xid);
1551 0 : }
1552 0 : self.pending_directory_entries
1553 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1554 0 : self.put(
1555 0 : TWOPHASEDIR_KEY,
1556 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1557 : );
1558 :
1559 : // Delete it
1560 0 : self.delete(twophase_key_range(xid));
1561 0 :
1562 0 : Ok(())
1563 0 : }
1564 :
1565 570 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1566 570 : if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
1567 558 : return Ok(());
1568 12 : }
1569 12 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1570 12 : files: HashMap::new(),
1571 12 : })?;
1572 12 : self.pending_directory_entries
1573 12 : .push((DirectoryKind::AuxFiles, 0));
1574 12 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1575 12 : Ok(())
1576 570 : }
1577 :
/// Store (or remove) an aux file at `path`.
///
/// The function first settles which aux file policy applies to this timeline
/// (possibly switching it), then writes the file using the V2 storage format,
/// the V1 storage format, or both (cross-validation).
///
/// An empty `content` means removal: on the V2 path the file is left out of
/// the re-encoded value, and on the V1 path `None` is passed to `upsert`
/// (presumably deleting the entry -- confirm against `AuxFilesDirectory::upsert`).
///
/// # Errors
///
/// Fails if listing existing V1 aux files, switching the policy, reading or
/// decoding the existing value, or encoding the new value fails.
///
/// # Panics
///
/// Panics if the existing V2 value contains `path` more than once.
pub async fn put_file(
    &mut self,
    path: &str,
    content: &[u8],
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // The policy we would like to switch to.
    let switch_policy = self.tline.get_switch_aux_file_policy();

    // Decide the policy that is effective for this write.
    let policy = {
        let current_policy = self.tline.last_aux_file_policy.load();
        // Allowed switch path:
        // * no aux files -> v1/v2/cross-validation
        // * cross-validation->v2

        let current_policy = if current_policy.is_none() {
            // This path will only be hit once per tenant: we will decide the final policy in this code block.
            // The next call to `put_file` will always have `last_aux_file_policy != None`.
            let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
            let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
            if aux_files_key_v1.is_empty() {
                None
            } else {
                // V1 aux files already exist, so the timeline is pinned to V1.
                warn!("this timeline is using deprecated aux file policy V1");
                self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
                Some(AuxFilePolicy::V1)
            }
        } else {
            current_policy
        };

        if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
            self.tline.do_switch_aux_policy(switch_policy)?;
            info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
            switch_policy
        } else {
            // This branch handles non-valid migration path, and the case that switch_policy == current_policy.
            // And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
            current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
        }
    };

    // V2 storage format: `path` is encoded into a key, and the value under
    // that key holds a list of (path, content) pairs.
    if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
        let key = aux_file::encode_aux_file_key(path);
        // retrieve the key from the engine
        let old_val = match self.get(key, ctx).await {
            Ok(val) => Some(val),
            // A missing key just means nothing is stored under it yet.
            Err(PageReconstructError::MissingKey(_)) => None,
            Err(e) => return Err(e.into()),
        };
        let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
            aux_file::decode_file_value(old_val)?
        } else {
            Vec::new()
        };
        // Split the existing entries into the one being modified (if any)
        // and all the others, which are carried over unchanged.
        let mut other_files = Vec::with_capacity(files.len());
        let mut modifying_file = None;
        // Note: `content` in this loop pattern shadows the function
        // parameter; inside the loop it is the *old* content of the entry.
        for file @ (p, content) in files {
            if path == p {
                assert!(
                    modifying_file.is_none(),
                    "duplicated entries found for {}",
                    path
                );
                modifying_file = Some(content);
            } else {
                other_files.push(file);
            }
        }
        let mut new_files = other_files;
        // (old entry present?, is this a removal?) -> update the size
        // estimator and decide whether the file goes into the new value.
        match (modifying_file, content.is_empty()) {
            (Some(old_content), false) => {
                self.tline
                    .aux_file_size_estimator
                    .on_update(old_content.len(), content.len());
                new_files.push((path, content));
            }
            (Some(old_content), true) => {
                self.tline
                    .aux_file_size_estimator
                    .on_remove(old_content.len());
                // not adding the file key to the final `new_files` vec.
            }
            (None, false) => {
                self.tline.aux_file_size_estimator.on_add(content.len());
                new_files.push((path, content));
            }
            (None, true) => warn!("removing non-existing aux file: {}", path),
        }
        let new_val = aux_file::encode_file_value(&new_files)?;
        self.put(key, Value::Image(new_val.into()));
    }

    // V1 storage format: all aux files live in one directory under
    // AUX_FILES_KEY, updated through delta records and periodic full images.
    if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
        let file_path = path.to_string();
        // Empty content is represented as `None`.
        let content = if content.is_empty() {
            None
        } else {
            Some(Bytes::copy_from_slice(content))
        };

        let n_files;
        let mut aux_files = self.tline.aux_files.lock().await;
        // NOTE(review): `take()` leaves `aux_files.dir` as `None` if one of the
        // `?` below returns early -- confirm that this is intended.
        if let Some(mut dir) = aux_files.dir.take() {
            // We already updated aux files in `self`: emit a delta and update our latest value.
            dir.upsert(file_path.clone(), content.clone());
            n_files = dir.files.len();
            if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
                // Delta limit reached: write a full image and restart the count.
                self.put(
                    AUX_FILES_KEY,
                    Value::Image(Bytes::from(
                        AuxFilesDirectory::ser(&dir).context("serialize")?,
                    )),
                );
                aux_files.n_deltas = 0;
            } else {
                self.put(
                    AUX_FILES_KEY,
                    Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
                );
                aux_files.n_deltas += 1;
            }
            aux_files.dir = Some(dir);
        } else {
            // Check if the AUX_FILES_KEY is initialized
            match self.get(AUX_FILES_KEY, ctx).await {
                Ok(dir_bytes) => {
                    let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
                    // Key is already set, we may append a delta
                    self.put(
                        AUX_FILES_KEY,
                        Value::WalRecord(NeonWalRecord::AuxFile {
                            file_path: file_path.clone(),
                            content: content.clone(),
                        }),
                    );
                    dir.upsert(file_path, content);
                    n_files = dir.files.len();
                    aux_files.dir = Some(dir);
                }
                Err(
                    e @ (PageReconstructError::Cancelled
                    | PageReconstructError::AncestorLsnTimeout(_)),
                ) => {
                    // Important that we do not interpret a shutdown error as "not found" and thereby
                    // reset the map.
                    return Err(e.into());
                }
                // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
                // the original code assumes all other errors are missing keys. Therefore, we keep the code path
                // the same for now, though in theory, we should only match the `MissingKey` variant.
                Err(
                    e @ (PageReconstructError::Other(_)
                    | PageReconstructError::WalRedo(_)
                    | PageReconstructError::MissingKey(_)),
                ) => {
                    // Key is missing, we must insert an image as the basis for subsequent deltas.

                    if !matches!(e, PageReconstructError::MissingKey(_)) {
                        let e = utils::error::report_compact_sources(&e);
                        tracing::warn!("treating error as if it was a missing key: {}", e);
                    }

                    let mut dir = AuxFilesDirectory {
                        files: HashMap::new(),
                    };
                    dir.upsert(file_path, content);
                    self.put(
                        AUX_FILES_KEY,
                        Value::Image(Bytes::from(
                            AuxFilesDirectory::ser(&dir).context("serialize")?,
                        )),
                    );
                    n_files = 1;
                    aux_files.dir = Some(dir);
                }
            }
        }

        // Track the new size of the aux files directory.
        self.pending_directory_entries
            .push((DirectoryKind::AuxFiles, n_files));
    }

    Ok(())
}
1762 :
1763 : ///
1764 : /// Flush changes accumulated so far to the underlying repository.
1765 : ///
1766 : /// Usually, changes made in DatadirModification are atomic, but this allows
1767 : /// you to flush them to the underlying repository before the final `commit`.
1768 : /// That allows to free up the memory used to hold the pending changes.
1769 : ///
1770 : /// Currently only used during bulk import of a data directory. In that
1771 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1772 : /// whole import fails and the timeline will be deleted anyway.
1773 : /// (Or to be precise, it will be left behind for debugging purposes and
1774 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1775 : ///
1776 : /// Note: A consequence of flushing the pending operations is that they
1777 : /// won't be visible to subsequent operations until `commit`. The function
1778 : /// retains all the metadata, but data pages are flushed. That's again OK
1779 : /// for bulk import, where you are just loading data pages and won't try to
1780 : /// modify the same pages twice.
1781 5790 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1782 5790 : // Unless we have accumulated a decent amount of changes, it's not worth it
1783 5790 : // to scan through the pending_updates list.
1784 5790 : let pending_nblocks = self.pending_nblocks;
1785 5790 : if pending_nblocks < 10000 {
1786 5790 : return Ok(());
1787 0 : }
1788 :
1789 0 : let mut writer = self.tline.writer().await;
1790 :
1791 : // Flush relation and SLRU data blocks, keep metadata.
1792 0 : let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
1793 0 : for (key, values) in self.pending_updates.drain() {
1794 0 : if !key.is_valid_key_on_write_path() {
1795 0 : bail!(
1796 0 : "the request contains data not supported by pageserver at TimelineWriter::put: {}", key
1797 0 : );
1798 0 : }
1799 0 : let mut write_batch = Vec::new();
1800 0 : for (lsn, value_ser_size, value) in values {
1801 0 : if key.is_rel_block_key() || key.is_slru_block_key() {
1802 0 : // This bails out on first error without modifying pending_updates.
1803 0 : // That's Ok, cf this function's doc comment.
1804 0 : write_batch.push((key.to_compact(), lsn, value_ser_size, value));
1805 0 : } else {
1806 0 : retained_pending_updates.entry(key).or_default().push((
1807 0 : lsn,
1808 0 : value_ser_size,
1809 0 : value,
1810 0 : ));
1811 0 : }
1812 : }
1813 0 : writer.put_batch(write_batch, ctx).await?;
1814 : }
1815 :
1816 0 : self.pending_updates = retained_pending_updates;
1817 0 : self.pending_bytes = 0;
1818 0 :
1819 0 : if pending_nblocks != 0 {
1820 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1821 0 : self.pending_nblocks = 0;
1822 0 : }
1823 :
1824 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1825 0 : writer.update_directory_entries_count(kind, count as u64);
1826 0 : }
1827 :
1828 0 : Ok(())
1829 5790 : }
1830 :
1831 : ///
1832 : /// Finish this atomic update, writing all the updated keys to the
1833 : /// underlying timeline.
1834 : /// All the modifications in this atomic update are stamped by the specified LSN.
1835 : ///
1836 2229228 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1837 2229228 : let mut writer = self.tline.writer().await;
1838 :
1839 2229228 : let pending_nblocks = self.pending_nblocks;
1840 2229228 : self.pending_nblocks = 0;
1841 2229228 :
1842 2229228 : if !self.pending_updates.is_empty() {
1843 : // Ordering: the items in this batch do not need to be in any global order, but values for
1844 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
1845 : // this to do efficient updates to its index.
1846 1242180 : let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
1847 1242180 : .pending_updates
1848 1242180 : .drain()
1849 2100984 : .flat_map(|(key, values)| {
1850 2100984 : values.into_iter().map(move |(lsn, val_ser_size, value)| {
1851 2100984 : if !key.is_valid_key_on_write_path() {
1852 0 : bail!("the request contains data not supported by pageserver at TimelineWriter::put: {}", key);
1853 2100984 : }
1854 2100984 : Ok((key.to_compact(), lsn, val_ser_size, value))
1855 2100984 : })
1856 2100984 : })
1857 1242180 : .collect::<anyhow::Result<Vec<_>>>()?;
1858 :
1859 1242180 : writer.put_batch(batch, ctx).await?;
1860 987048 : }
1861 :
1862 2229228 : if !self.pending_deletions.is_empty() {
1863 6 : writer.delete_batch(&self.pending_deletions, ctx).await?;
1864 6 : self.pending_deletions.clear();
1865 2229222 : }
1866 :
1867 2229228 : self.pending_lsns.push(self.lsn);
1868 2666802 : for pending_lsn in self.pending_lsns.drain(..) {
1869 2666802 : // Ideally, we should be able to call writer.finish_write() only once
1870 2666802 : // with the highest LSN. However, the last_record_lsn variable in the
1871 2666802 : // timeline keeps track of the latest LSN and the immediate previous LSN
1872 2666802 : // so we need to record every LSN to not leave a gap between them.
1873 2666802 : writer.finish_write(pending_lsn);
1874 2666802 : }
1875 :
1876 2229228 : if pending_nblocks != 0 {
1877 811710 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1878 1417518 : }
1879 :
1880 2229228 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1881 8472 : writer.update_directory_entries_count(kind, count as u64);
1882 8472 : }
1883 :
1884 2229228 : self.pending_bytes = 0;
1885 2229228 :
1886 2229228 : Ok(())
1887 2229228 : }
1888 :
1889 875112 : pub(crate) fn len(&self) -> usize {
1890 875112 : self.pending_updates.len() + self.pending_deletions.len()
1891 875112 : }
1892 :
1893 : // Internal helper functions to batch the modifications
1894 :
1895 859806 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1896 : // Have we already updated the same key? Read the latest pending updated
1897 : // version in that case.
1898 : //
1899 : // Note: we don't check pending_deletions. It is an error to request a
1900 : // value that has been removed, deletion only avoids leaking storage.
1901 859806 : if let Some(values) = self.pending_updates.get(&key) {
1902 47784 : if let Some((_, _, value)) = values.last() {
1903 47784 : return if let Value::Image(img) = value {
1904 47784 : Ok(img.clone())
1905 : } else {
1906 : // Currently, we never need to read back a WAL record that we
1907 : // inserted in the same "transaction". All the metadata updates
1908 : // work directly with Images, and we never need to read actual
1909 : // data pages. We could handle this if we had to, by calling
1910 : // the walredo manager, but let's keep it simple for now.
1911 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1912 0 : "unexpected pending WAL record"
1913 0 : )))
1914 : };
1915 0 : }
1916 812022 : }
1917 812022 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1918 812022 : self.tline.get(key, lsn, ctx).await
1919 859806 : }
1920 :
/// Test-only entry point that forwards directly to the private `put`, so unit
/// tests can force an arbitrary key/value into the modification.
#[cfg(test)]
pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
    Self::put(self, key, val)
}
1926 :
1927 2137440 : fn put(&mut self, key: Key, val: Value) {
1928 2137440 : let values = self.pending_updates.entry(key).or_default();
1929 : // Replace the previous value if it exists at the same lsn
1930 2137440 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
1931 36456 : if *last_lsn == self.lsn {
1932 36456 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
1933 36456 : *last_value = val;
1934 36456 : return;
1935 0 : }
1936 2100984 : }
1937 :
1938 2100984 : let val_serialized_size = val.serialized_size().unwrap() as usize;
1939 2100984 : self.pending_bytes += val_serialized_size;
1940 2100984 : values.push((self.lsn, val_serialized_size, val));
1941 2137440 : }
1942 :
1943 6 : fn delete(&mut self, key_range: Range<Key>) {
1944 6 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1945 6 : self.pending_deletions.push((key_range, self.lsn));
1946 6 : }
1947 : }
1948 :
1949 : /// This enum facilitates accessing either a committed key from the timeline at a
1950 : /// specific LSN, or the latest uncommitted key from a pending modification.
1951 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
1952 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1953 : /// need to look up the keys in the modification first before looking them up in the
1954 : /// timeline to not miss the latest updates.
1955 : #[derive(Clone, Copy)]
1956 : pub enum Version<'a> {
/// Read from the timeline at exactly this LSN.
1957 : Lsn(Lsn),
/// Read through the given modification, which consults its pending updates
/// before falling back to the timeline.
1958 : Modified(&'a DatadirModification<'a>),
1959 : }
1960 :
1961 : impl<'a> Version<'a> {
1962 70680 : async fn get(
1963 70680 : &self,
1964 70680 : timeline: &Timeline,
1965 70680 : key: Key,
1966 70680 : ctx: &RequestContext,
1967 70680 : ) -> Result<Bytes, PageReconstructError> {
1968 70680 : match self {
1969 70620 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1970 60 : Version::Modified(modification) => modification.get(key, ctx).await,
1971 : }
1972 70680 : }
1973 :
1974 106860 : fn get_lsn(&self) -> Lsn {
1975 106860 : match self {
1976 88722 : Version::Lsn(lsn) => *lsn,
1977 18138 : Version::Modified(modification) => modification.lsn,
1978 : }
1979 106860 : }
1980 : }
1981 :
1982 : //--- Metadata structs stored in key-value pairs in the repository.
1983 :
/// Serializable directory of databases, keyed by `(spcnode, dbnode)`.
1984 6726 : #[derive(Debug, Serialize, Deserialize)]
1985 : struct DbDirectory {
1986 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1987 : dbdirs: HashMap<(Oid, Oid), bool>,
1988 : }
1989 :
/// Serializable set of transaction ids.
// NOTE(review): presumably the xids that currently have a two-phase file; the
// code maintaining this set is outside this part of the file — confirm.
1990 870 : #[derive(Debug, Serialize, Deserialize)]
1991 : struct TwoPhaseDirectory {
1992 : xids: HashSet<TransactionId>,
1993 : }
1994 :
/// Serializable set of the relations that exist; empty by default.
1995 5796 : #[derive(Debug, Serialize, Deserialize, Default)]
1996 : struct RelDirectory {
1997 : // Set of relations that exist. (relfilenode, forknum)
1998 : //
1999 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2000 : // key-value pairs, if you have a lot of relations
2001 : rels: HashSet<(Oid, u8)>,
2002 : }
2003 :
/// Serializable collection of aux files; empty by default.
2004 84 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
2005 : pub(crate) struct AuxFilesDirectory {
// Maps a file path to the file's content.
2006 : pub(crate) files: HashMap<String, Bytes>,
2007 : }
2008 :
2009 : impl AuxFilesDirectory {
2010 48 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
2011 48 : if let Some(value) = value {
2012 42 : self.files.insert(key, value);
2013 42 : } else {
2014 6 : self.files.remove(&key);
2015 6 : }
2016 48 : }
2017 : }
2018 :
/// Serializable record holding a block count.
// NOTE(review): presumably the size of one relation in blocks; its users are
// outside this part of the file — confirm.
2019 0 : #[derive(Debug, Serialize, Deserialize)]
2020 : struct RelSizeEntry {
2021 : nblocks: u32,
2022 : }
2023 :
/// Serializable set of existing SLRU segment numbers; empty by default.
2024 2610 : #[derive(Debug, Serialize, Deserialize, Default)]
2025 : struct SlruSegmentDirectory {
2026 : // Set of SLRU segments that exist.
2027 : segments: HashSet<u32>,
2028 : }
2029 :
/// The kinds of directory whose entry counts are tracked. A kind is queued
/// together with an entry count in `pending_directory_entries` and handed to
/// the timeline writer's `update_directory_entries_count` on flush/commit.
2030 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2031 : #[repr(u8)]
2032 : pub(crate) enum DirectoryKind {
2033 : Db,
2034 : TwoPhase,
2035 : Rel,
2036 : AuxFiles,
// One kind per SLRU kind.
2037 : SlruSegment(SlruKind),
2038 : }
2039 :
2040 : impl DirectoryKind {
2041 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2042 16944 : pub(crate) fn offset(&self) -> usize {
2043 16944 : self.into_usize()
2044 16944 : }
2045 : }
2046 :
/// A page-sized (`BLCKSZ` bytes) buffer filled with zeroes.
2046 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2048 :
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
    use hex_literal::hex;
    use utils::id::TimelineId;

    use super::*;

    use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};

    /// Aux file updates written through a DatadirModification must be readable
    /// back from the Timeline, both at the latest LSN and at an older one.
    #[tokio::test]
    async fn aux_files_round_trip() -> anyhow::Result<()> {
        const TIMELINE_ID: TimelineId =
            TimelineId::from_array(hex!("11223344556677881122334455667788"));

        let harness = TenantHarness::create("aux_files_round_trip").await?;
        let (tenant, ctx) = harness.load().await;
        let uninit_tline = tenant
            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let tline = uninit_tline.raw_timeline().unwrap();

        // First modification: insert two keys
        let mut first = tline.begin_modification(Lsn(0x1000));
        first.put_file("foo/bar1", b"content1", &ctx).await?;
        first.set_lsn(Lsn(0x1008))?;
        first.put_file("foo/bar2", b"content2", &ctx).await?;
        first.commit(&ctx).await?;

        let mut expect_1008 = HashMap::new();
        expect_1008.insert("foo/bar1".to_string(), Bytes::from_static(b"content1"));
        expect_1008.insert("foo/bar2".to_string(), Bytes::from_static(b"content2"));

        let files_at_1008 = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
        assert_eq!(files_at_1008, expect_1008);

        // Second modification: update one key, remove the other
        let mut second = tline.begin_modification(Lsn(0x2000));
        second.put_file("foo/bar1", b"content3", &ctx).await?;
        second.set_lsn(Lsn(0x2008))?;
        second.put_file("foo/bar2", b"", &ctx).await?;
        second.commit(&ctx).await?;

        let mut expect_2008 = HashMap::new();
        expect_2008.insert("foo/bar1".to_string(), Bytes::from_static(b"content3"));

        let files_at_2008 = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
        assert_eq!(files_at_2008, expect_2008);

        // Reading back in time works
        let files_at_1008_again = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
        assert_eq!(files_at_1008_again, expect_1008);

        Ok(())
    }

    /*
        fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
            let incremental = timeline.get_current_logical_size();
            let non_incremental = timeline
                .get_current_logical_size_non_incremental(lsn)
                .unwrap();
            assert_eq!(incremental, non_incremental);
        }
    */

    /*
    ///
    /// Test list_rels() function, with branches and dropped relations
    ///
    #[test]
    fn test_list_rels_drop() -> Result<()> {
        let repo = RepoHarness::create("test_list_rels_drop")?.load();
        let tline = create_empty_timeline(repo, TIMELINE_ID)?;
        const TESTDB: u32 = 111;

        // Import initial dummy checkpoint record, otherwise the get_timeline() call
        // after branching fails below
        let mut writer = tline.begin_record(Lsn(0x10));
        writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
        writer.finish()?;

        // Create a relation on the timeline
        let mut writer = tline.begin_record(Lsn(0x20));
        writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
        writer.finish()?;

        let writer = tline.begin_record(Lsn(0x00));
        writer.finish()?;

        // Check that list_rels() lists it after LSN 2, but not before it
        assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
        assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
        assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));

        // Create a branch, check that the relation is visible there
        repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
        let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
            Some(timeline) => timeline,
            None => panic!("Should have a local timeline"),
        };
        let newtline = DatadirTimelineImpl::new(newtline);
        assert!(newtline
            .list_rels(0, TESTDB, Lsn(0x30))?
            .contains(&TESTREL_A));

        // Drop it on the branch
        let mut new_writer = newtline.begin_record(Lsn(0x40));
        new_writer.drop_relation(TESTREL_A)?;
        new_writer.finish()?;

        // Check that it's no longer listed on the branch after the point where it was dropped
        assert!(newtline
            .list_rels(0, TESTDB, Lsn(0x30))?
            .contains(&TESTREL_A));
        assert!(!newtline
            .list_rels(0, TESTDB, Lsn(0x40))?
            .contains(&TESTREL_A));

        // Run checkpoint and garbage collection and check that it's still not visible
        newtline.checkpoint(CheckpointConfig::Forced)?;
        repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;

        assert!(!newtline
            .list_rels(0, TESTDB, Lsn(0x40))?
            .contains(&TESTREL_A));

        Ok(())
    }
    */

    /*
    #[test]
    fn test_read_beyond_eof() -> Result<()> {
        let repo = RepoHarness::create("test_read_beyond_eof")?.load();
        let tline = create_test_timeline(repo, TIMELINE_ID)?;

        make_some_layers(&tline, Lsn(0x20))?;
        let mut writer = tline.begin_record(Lsn(0x60));
        walingest.put_rel_page_image(
            &mut writer,
            TESTREL_A,
            0,
            TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
        )?;
        writer.finish()?;

        // Test read before rel creation. Should error out.
        assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());

        // Read block beyond end of relation at different points in time.
        // These reads should fall into different delta, image, and in-memory layers.
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);

        // Test on an in-memory layer with no preceding layer
        let mut writer = tline.begin_record(Lsn(0x70));
        walingest.put_rel_page_image(
            &mut writer,
            TESTREL_B,
            0,
            TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
        )?;
        writer.finish()?;

        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);

        Ok(())
    }
    */
}
|