Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
13 : use crate::walrecord::NeonWalRecord;
14 : use crate::{aux_file, repository::*};
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use itertools::Itertools;
19 : use pageserver_api::key::{
20 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
21 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
24 : };
25 : use pageserver_api::keyspace::SparseKeySpace;
26 : use pageserver_api::models::AuxFilePolicy;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
29 : use postgres_ffi::BLCKSZ;
30 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
31 : use serde::{Deserialize, Serialize};
32 : use std::collections::{hash_map, HashMap, HashSet};
33 : use std::ops::ControlFlow;
34 : use std::ops::Range;
35 : use strum::IntoEnumIterator;
36 : use tokio_util::sync::CancellationToken;
37 : use tracing::{debug, info, trace, warn};
38 : use utils::bin_ser::DeserializeError;
39 : use utils::pausable_failpoint;
40 : use utils::vec_map::{VecMap, VecMapOrdering};
41 : use utils::{bin_ser::BeSer, lsn::Lsn};
42 :
/// Max delta records appended to the AUX_FILES_KEY (for aux v1).
///
/// The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
45 :
/// Max number of aux-file-related delta layers (for aux v2).
///
/// The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
48 :
/// Outcome of a timestamp-to-LSN lookup, as produced by `Timeline::find_lsn_for_timestamp`.
///
/// Every variant carries an LSN; what that LSN means depends on which commits the
/// search was able to observe around the requested timestamp.
#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp.
    Present(Lsn),

    /// Found no commits after the given timestamp, this means
    /// that the newest data in the branch is older than the given
    /// timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp.
    Future(Lsn),

    /// The queried timestamp is past the horizon we look back at (PITR).
    ///
    /// All commits > LSN happened after the given timestamp,
    /// but any commits < LSN might have happened before or after
    /// the given timestamp. We don't know because no data before
    /// the given LSN is available.
    Past(Lsn),

    /// We have found no commit with a timestamp,
    /// so we can't return anything meaningful.
    ///
    /// The associated LSN is the lower bound value we can safely
    /// create branches on, but no statement is made if it is
    /// older or newer than the timestamp.
    ///
    /// This variant can e.g. be returned right after a
    /// cluster import.
    NoData(Lsn),
}
80 :
/// Failure modes of the on-demand logical size calculation
/// (`Timeline::get_current_logical_size_non_incremental`).
#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
    /// The calculation was interrupted by cancellation.
    #[error("cancelled")]
    Cancelled,

    /// Something went wrong while reading the metadata we use to calculate logical size.
    ///
    /// Note that cancellation variants of `PageReconstructError` are transformed to
    /// [`Self::Cancelled`] in the `From` implementation for this variant, which is why
    /// this variant carries no `#[from]` attribute.
    #[error(transparent)]
    PageRead(PageReconstructError),

    /// Something went wrong deserializing metadata that we read to calculate logical size.
    #[error("decode error: {0}")]
    Decode(#[from] DeserializeError),
}
96 :
/// Failure modes of `Timeline::collect_keyspace`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    /// A directory value that was read could not be deserialized.
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    /// Reading a page failed for a reason other than cancellation.
    ///
    /// Built through the hand-written `From<PageReconstructError>` impl, which routes
    /// `PageReconstructError::Cancelled` to [`Self::Cancelled`] instead.
    #[error(transparent)]
    PageRead(PageReconstructError),
    /// The operation was interrupted by cancellation.
    #[error("cancelled")]
    Cancelled,
}
106 :
107 : impl From<PageReconstructError> for CollectKeySpaceError {
108 0 : fn from(err: PageReconstructError) -> Self {
109 0 : match err {
110 0 : PageReconstructError::Cancelled => Self::Cancelled,
111 0 : err => Self::PageRead(err),
112 : }
113 0 : }
114 : }
115 :
116 : impl From<PageReconstructError> for CalculateLogicalSizeError {
117 0 : fn from(pre: PageReconstructError) -> Self {
118 0 : match pre {
119 0 : PageReconstructError::Cancelled => Self::Cancelled,
120 0 : _ => Self::PageRead(pre),
121 : }
122 0 : }
123 : }
124 :
/// Errors raised by relation-level operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    /// The relation already exists.
    #[error("Relation Already Exists")]
    AlreadyExists,
    /// The relation tag carries an invalid relnode; the getters in this module report this
    /// when `relnode == 0`.
    #[error("invalid relnode")]
    InvalidRelnode,
    /// Any other failure, carried as an opaque `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
134 :
135 : ///
136 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
137 : /// and other special kinds of files, in a versioned key-value store. The
138 : /// Timeline struct provides the key-value store.
139 : ///
140 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
141 : /// implementation, and might be moved into a separate struct later.
142 : impl Timeline {
143 : /// Start ingesting a WAL record, or other atomic modification of
144 : /// the timeline.
145 : ///
146 : /// This provides a transaction-like interface to perform a bunch
147 : /// of modifications atomically.
148 : ///
149 : /// To ingest a WAL record, call begin_modification(lsn) to get a
150 : /// DatadirModification object. Use the functions in the object to
151 : /// modify the repository state, updating all the pages and metadata
152 : /// that the WAL record affects. When you're done, call commit() to
153 : /// commit the changes.
154 : ///
155 : /// Lsn stored in modification is advanced by `ingest_record` and
156 : /// is used by `commit()` to update `last_record_lsn`.
157 : ///
158 : /// Calling commit() will flush all the changes and reset the state,
159 : /// so the `DatadirModification` struct can be reused to perform the next modification.
160 : ///
161 : /// Note that any pending modifications you make through the
162 : /// modification object won't be visible to calls to the 'get' and list
163 : /// functions of the timeline until you finish! And if you update the
164 : /// same page twice, the last update wins.
165 : ///
166 268378 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
167 268378 : where
168 268378 : Self: Sized,
169 268378 : {
170 268378 : DatadirModification {
171 268378 : tline: self,
172 268378 : pending_lsns: Vec::new(),
173 268378 : pending_updates: HashMap::new(),
174 268378 : pending_deletions: Vec::new(),
175 268378 : pending_nblocks: 0,
176 268378 : pending_directory_entries: Vec::new(),
177 268378 : lsn,
178 268378 : }
179 268378 : }
180 :
181 : //------------------------------------------------------------------------------
182 : // Public GET functions
183 : //------------------------------------------------------------------------------
184 :
185 : /// Look up given page version.
186 18384 : pub(crate) async fn get_rel_page_at_lsn(
187 18384 : &self,
188 18384 : tag: RelTag,
189 18384 : blknum: BlockNumber,
190 18384 : version: Version<'_>,
191 18384 : ctx: &RequestContext,
192 18384 : ) -> Result<Bytes, PageReconstructError> {
193 18384 : if tag.relnode == 0 {
194 0 : return Err(PageReconstructError::Other(
195 0 : RelationError::InvalidRelnode.into(),
196 0 : ));
197 18384 : }
198 :
199 18384 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
200 18384 : if blknum >= nblocks {
201 0 : debug!(
202 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
203 0 : tag,
204 0 : blknum,
205 0 : version.get_lsn(),
206 : nblocks
207 : );
208 0 : return Ok(ZERO_PAGE.clone());
209 18384 : }
210 18384 :
211 18384 : let key = rel_block_to_key(tag, blknum);
212 18384 : version.get(self, key, ctx).await
213 18384 : }
214 :
215 : // Get size of a database in blocks
216 0 : pub(crate) async fn get_db_size(
217 0 : &self,
218 0 : spcnode: Oid,
219 0 : dbnode: Oid,
220 0 : version: Version<'_>,
221 0 : ctx: &RequestContext,
222 0 : ) -> Result<usize, PageReconstructError> {
223 0 : let mut total_blocks = 0;
224 :
225 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
226 :
227 0 : for rel in rels {
228 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
229 0 : total_blocks += n_blocks as usize;
230 : }
231 0 : Ok(total_blocks)
232 0 : }
233 :
234 : /// Get size of a relation file
235 24434 : pub(crate) async fn get_rel_size(
236 24434 : &self,
237 24434 : tag: RelTag,
238 24434 : version: Version<'_>,
239 24434 : ctx: &RequestContext,
240 24434 : ) -> Result<BlockNumber, PageReconstructError> {
241 24434 : if tag.relnode == 0 {
242 0 : return Err(PageReconstructError::Other(
243 0 : RelationError::InvalidRelnode.into(),
244 0 : ));
245 24434 : }
246 :
247 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
248 19294 : return Ok(nblocks);
249 5140 : }
250 5140 :
251 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
252 0 : && !self.get_rel_exists(tag, version, ctx).await?
253 : {
254 : // FIXME: Postgres sometimes calls smgrcreate() to create
255 : // FSM, and smgrnblocks() on it immediately afterwards,
256 : // without extending it. Tolerate that by claiming that
257 : // any non-existent FSM fork has size 0.
258 0 : return Ok(0);
259 5140 : }
260 5140 :
261 5140 : let key = rel_size_to_key(tag);
262 5140 : let mut buf = version.get(self, key, ctx).await?;
263 5136 : let nblocks = buf.get_u32_le();
264 5136 :
265 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
266 5136 :
267 5136 : Ok(nblocks)
268 24434 : }
269 :
270 : /// Does relation exist?
271 6050 : pub(crate) async fn get_rel_exists(
272 6050 : &self,
273 6050 : tag: RelTag,
274 6050 : version: Version<'_>,
275 6050 : ctx: &RequestContext,
276 6050 : ) -> Result<bool, PageReconstructError> {
277 6050 : if tag.relnode == 0 {
278 0 : return Err(PageReconstructError::Other(
279 0 : RelationError::InvalidRelnode.into(),
280 0 : ));
281 6050 : }
282 :
283 : // first try to lookup relation in cache
284 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
285 6032 : return Ok(true);
286 18 : }
287 : // then check if the database was already initialized.
288 : // get_rel_exists can be called before dbdir is created.
289 18 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
290 18 : let dbdirs = match DbDirectory::des(&buf).context("deserialization failure") {
291 18 : Ok(dir) => Ok(dir.dbdirs),
292 0 : Err(e) => Err(PageReconstructError::from(e)),
293 0 : }?;
294 18 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
295 0 : return Ok(false);
296 18 : }
297 18 : // fetch directory listing
298 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
299 18 : let buf = version.get(self, key, ctx).await?;
300 :
301 18 : match RelDirectory::des(&buf).context("deserialization failure") {
302 18 : Ok(dir) => {
303 18 : let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
304 18 : Ok(exists)
305 : }
306 0 : Err(e) => Err(PageReconstructError::from(e)),
307 : }
308 6050 : }
309 :
310 : /// Get a list of all existing relations in given tablespace and database.
311 : ///
312 : /// # Cancel-Safety
313 : ///
314 : /// This method is cancellation-safe.
315 0 : pub(crate) async fn list_rels(
316 0 : &self,
317 0 : spcnode: Oid,
318 0 : dbnode: Oid,
319 0 : version: Version<'_>,
320 0 : ctx: &RequestContext,
321 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
322 0 : // fetch directory listing
323 0 : let key = rel_dir_to_key(spcnode, dbnode);
324 0 : let buf = version.get(self, key, ctx).await?;
325 :
326 0 : match RelDirectory::des(&buf).context("deserialization failure") {
327 0 : Ok(dir) => {
328 0 : let rels: HashSet<RelTag> =
329 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
330 0 : spcnode,
331 0 : dbnode,
332 0 : relnode: *relnode,
333 0 : forknum: *forknum,
334 0 : }));
335 0 :
336 0 : Ok(rels)
337 : }
338 0 : Err(e) => Err(PageReconstructError::from(e)),
339 : }
340 0 : }
341 :
342 : /// Get the whole SLRU segment
343 0 : pub(crate) async fn get_slru_segment(
344 0 : &self,
345 0 : kind: SlruKind,
346 0 : segno: u32,
347 0 : lsn: Lsn,
348 0 : ctx: &RequestContext,
349 0 : ) -> Result<Bytes, PageReconstructError> {
350 0 : let n_blocks = self
351 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
352 0 : .await?;
353 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
354 0 : for blkno in 0..n_blocks {
355 0 : let block = self
356 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
357 0 : .await?;
358 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
359 : }
360 0 : Ok(segment.freeze())
361 0 : }
362 :
363 : /// Look up given SLRU page version.
364 0 : pub(crate) async fn get_slru_page_at_lsn(
365 0 : &self,
366 0 : kind: SlruKind,
367 0 : segno: u32,
368 0 : blknum: BlockNumber,
369 0 : lsn: Lsn,
370 0 : ctx: &RequestContext,
371 0 : ) -> Result<Bytes, PageReconstructError> {
372 0 : let key = slru_block_to_key(kind, segno, blknum);
373 0 : self.get(key, lsn, ctx).await
374 0 : }
375 :
376 : /// Get size of an SLRU segment
377 0 : pub(crate) async fn get_slru_segment_size(
378 0 : &self,
379 0 : kind: SlruKind,
380 0 : segno: u32,
381 0 : version: Version<'_>,
382 0 : ctx: &RequestContext,
383 0 : ) -> Result<BlockNumber, PageReconstructError> {
384 0 : let key = slru_segment_size_to_key(kind, segno);
385 0 : let mut buf = version.get(self, key, ctx).await?;
386 0 : Ok(buf.get_u32_le())
387 0 : }
388 :
389 : /// Get size of an SLRU segment
390 0 : pub(crate) async fn get_slru_segment_exists(
391 0 : &self,
392 0 : kind: SlruKind,
393 0 : segno: u32,
394 0 : version: Version<'_>,
395 0 : ctx: &RequestContext,
396 0 : ) -> Result<bool, PageReconstructError> {
397 0 : // fetch directory listing
398 0 : let key = slru_dir_to_key(kind);
399 0 : let buf = version.get(self, key, ctx).await?;
400 :
401 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
402 0 : Ok(dir) => {
403 0 : let exists = dir.segments.contains(&segno);
404 0 : Ok(exists)
405 : }
406 0 : Err(e) => Err(PageReconstructError::from(e)),
407 : }
408 0 : }
409 :
410 : /// Locate LSN, such that all transactions that committed before
411 : /// 'search_timestamp' are visible, but nothing newer is.
412 : ///
413 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
414 : /// so it's not well defined which LSN you get if there were multiple commits
415 : /// "in flight" at that point in time.
416 : ///
417 0 : pub(crate) async fn find_lsn_for_timestamp(
418 0 : &self,
419 0 : search_timestamp: TimestampTz,
420 0 : cancel: &CancellationToken,
421 0 : ctx: &RequestContext,
422 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
423 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
424 :
425 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
426 0 : // We use this method to figure out the branching LSN for the new branch, but the
427 0 : // GC cutoff could be before the branching point and we cannot create a new branch
428 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
429 0 : // on the safe side.
430 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
431 0 : let max_lsn = self.get_last_record_lsn();
432 0 :
433 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
434 0 : // LSN divided by 8.
435 0 : let mut low = min_lsn.0 / 8;
436 0 : let mut high = max_lsn.0 / 8 + 1;
437 0 :
438 0 : let mut found_smaller = false;
439 0 : let mut found_larger = false;
440 :
441 0 : while low < high {
442 0 : if cancel.is_cancelled() {
443 0 : return Err(PageReconstructError::Cancelled);
444 0 : }
445 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
446 0 : let mid = (high + low) / 2;
447 :
448 0 : let cmp = self
449 0 : .is_latest_commit_timestamp_ge_than(
450 0 : search_timestamp,
451 0 : Lsn(mid * 8),
452 0 : &mut found_smaller,
453 0 : &mut found_larger,
454 0 : ctx,
455 0 : )
456 0 : .await?;
457 :
458 0 : if cmp {
459 0 : high = mid;
460 0 : } else {
461 0 : low = mid + 1;
462 0 : }
463 : }
464 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
465 : // so the LSN of the last commit record before or at `search_timestamp`.
466 : // Remove one from `low` to get `t`.
467 : //
468 : // FIXME: it would be better to get the LSN of the previous commit.
469 : // Otherwise, if you restore to the returned LSN, the database will
470 : // include physical changes from later commits that will be marked
471 : // as aborted, and will need to be vacuumed away.
472 0 : let commit_lsn = Lsn((low - 1) * 8);
473 0 : match (found_smaller, found_larger) {
474 : (false, false) => {
475 : // This can happen if no commit records have been processed yet, e.g.
476 : // just after importing a cluster.
477 0 : Ok(LsnForTimestamp::NoData(min_lsn))
478 : }
479 : (false, true) => {
480 : // Didn't find any commit timestamps smaller than the request
481 0 : Ok(LsnForTimestamp::Past(min_lsn))
482 : }
483 0 : (true, _) if commit_lsn < min_lsn => {
484 0 : // the search above did set found_smaller to true but it never increased the lsn.
485 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
486 0 : // below the min_lsn. We should never do that.
487 0 : Ok(LsnForTimestamp::Past(min_lsn))
488 : }
489 : (true, false) => {
490 : // Only found commits with timestamps smaller than the request.
491 : // It's still a valid case for branch creation, return it.
492 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
493 : // case, anyway.
494 0 : Ok(LsnForTimestamp::Future(commit_lsn))
495 : }
496 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
497 : }
498 0 : }
499 :
500 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
501 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
502 : ///
503 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
504 : /// with a smaller/larger timestamp.
505 : ///
506 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
507 0 : &self,
508 0 : search_timestamp: TimestampTz,
509 0 : probe_lsn: Lsn,
510 0 : found_smaller: &mut bool,
511 0 : found_larger: &mut bool,
512 0 : ctx: &RequestContext,
513 0 : ) -> Result<bool, PageReconstructError> {
514 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
515 0 : if timestamp >= search_timestamp {
516 0 : *found_larger = true;
517 0 : return ControlFlow::Break(true);
518 0 : } else {
519 0 : *found_smaller = true;
520 0 : }
521 0 : ControlFlow::Continue(())
522 0 : })
523 0 : .await
524 0 : }
525 :
526 : /// Obtain the possible timestamp range for the given lsn.
527 : ///
528 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
529 0 : pub(crate) async fn get_timestamp_for_lsn(
530 0 : &self,
531 0 : probe_lsn: Lsn,
532 0 : ctx: &RequestContext,
533 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
534 0 : let mut max: Option<TimestampTz> = None;
535 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
536 0 : if let Some(max_prev) = max {
537 0 : max = Some(max_prev.max(timestamp));
538 0 : } else {
539 0 : max = Some(timestamp);
540 0 : }
541 0 : ControlFlow::Continue(())
542 0 : })
543 0 : .await?;
544 :
545 0 : Ok(max)
546 0 : }
547 :
548 : /// Runs the given function on all the timestamps for a given lsn
549 : ///
550 : /// The return value is either given by the closure, or set to the `Default`
551 : /// impl's output.
552 0 : async fn map_all_timestamps<T: Default>(
553 0 : &self,
554 0 : probe_lsn: Lsn,
555 0 : ctx: &RequestContext,
556 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
557 0 : ) -> Result<T, PageReconstructError> {
558 0 : for segno in self
559 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
560 0 : .await?
561 : {
562 0 : let nblocks = self
563 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
564 0 : .await?;
565 0 : for blknum in (0..nblocks).rev() {
566 0 : let clog_page = self
567 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
568 0 : .await?;
569 :
570 0 : if clog_page.len() == BLCKSZ as usize + 8 {
571 0 : let mut timestamp_bytes = [0u8; 8];
572 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
573 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
574 0 :
575 0 : match f(timestamp) {
576 0 : ControlFlow::Break(b) => return Ok(b),
577 0 : ControlFlow::Continue(()) => (),
578 : }
579 0 : }
580 : }
581 : }
582 0 : Ok(Default::default())
583 0 : }
584 :
585 0 : pub(crate) async fn get_slru_keyspace(
586 0 : &self,
587 0 : version: Version<'_>,
588 0 : ctx: &RequestContext,
589 0 : ) -> Result<KeySpace, PageReconstructError> {
590 0 : let mut accum = KeySpaceAccum::new();
591 :
592 0 : for kind in SlruKind::iter() {
593 0 : let mut segments: Vec<u32> = self
594 0 : .list_slru_segments(kind, version, ctx)
595 0 : .await?
596 0 : .into_iter()
597 0 : .collect();
598 0 : segments.sort_unstable();
599 :
600 0 : for seg in segments {
601 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
602 :
603 0 : accum.add_range(
604 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
605 0 : );
606 : }
607 : }
608 :
609 0 : Ok(accum.to_keyspace())
610 0 : }
611 :
612 : /// Get a list of SLRU segments
613 0 : pub(crate) async fn list_slru_segments(
614 0 : &self,
615 0 : kind: SlruKind,
616 0 : version: Version<'_>,
617 0 : ctx: &RequestContext,
618 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
619 0 : // fetch directory entry
620 0 : let key = slru_dir_to_key(kind);
621 :
622 0 : let buf = version.get(self, key, ctx).await?;
623 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
624 0 : Ok(dir) => Ok(dir.segments),
625 0 : Err(e) => Err(PageReconstructError::from(e)),
626 : }
627 0 : }
628 :
629 0 : pub(crate) async fn get_relmap_file(
630 0 : &self,
631 0 : spcnode: Oid,
632 0 : dbnode: Oid,
633 0 : version: Version<'_>,
634 0 : ctx: &RequestContext,
635 0 : ) -> Result<Bytes, PageReconstructError> {
636 0 : let key = relmap_file_key(spcnode, dbnode);
637 :
638 0 : let buf = version.get(self, key, ctx).await?;
639 0 : Ok(buf)
640 0 : }
641 :
642 274 : pub(crate) async fn list_dbdirs(
643 274 : &self,
644 274 : lsn: Lsn,
645 274 : ctx: &RequestContext,
646 274 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
647 : // fetch directory entry
648 3280 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
649 :
650 274 : match DbDirectory::des(&buf).context("deserialization failure") {
651 274 : Ok(dir) => Ok(dir.dbdirs),
652 0 : Err(e) => Err(PageReconstructError::from(e)),
653 : }
654 274 : }
655 :
656 0 : pub(crate) async fn get_twophase_file(
657 0 : &self,
658 0 : xid: TransactionId,
659 0 : lsn: Lsn,
660 0 : ctx: &RequestContext,
661 0 : ) -> Result<Bytes, PageReconstructError> {
662 0 : let key = twophase_file_key(xid);
663 0 : let buf = self.get(key, lsn, ctx).await?;
664 0 : Ok(buf)
665 0 : }
666 :
667 2 : pub(crate) async fn list_twophase_files(
668 2 : &self,
669 2 : lsn: Lsn,
670 2 : ctx: &RequestContext,
671 2 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
672 : // fetch directory entry
673 2 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
674 :
675 2 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
676 2 : Ok(dir) => Ok(dir.xids),
677 0 : Err(e) => Err(PageReconstructError::from(e)),
678 : }
679 2 : }
680 :
681 0 : pub(crate) async fn get_control_file(
682 0 : &self,
683 0 : lsn: Lsn,
684 0 : ctx: &RequestContext,
685 0 : ) -> Result<Bytes, PageReconstructError> {
686 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
687 0 : }
688 :
689 12 : pub(crate) async fn get_checkpoint(
690 12 : &self,
691 12 : lsn: Lsn,
692 12 : ctx: &RequestContext,
693 12 : ) -> Result<Bytes, PageReconstructError> {
694 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
695 12 : }
696 :
697 28 : async fn list_aux_files_v1(
698 28 : &self,
699 28 : lsn: Lsn,
700 28 : ctx: &RequestContext,
701 28 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
702 28 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
703 22 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
704 22 : Ok(dir) => Ok(dir.files),
705 0 : Err(e) => Err(PageReconstructError::from(e)),
706 : },
707 6 : Err(e) => {
708 6 : // This is expected: historical databases do not have the key.
709 6 : debug!("Failed to get info about AUX files: {}", e);
710 6 : Ok(HashMap::new())
711 : }
712 : }
713 28 : }
714 :
715 12 : async fn list_aux_files_v2(
716 12 : &self,
717 12 : lsn: Lsn,
718 12 : ctx: &RequestContext,
719 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
720 12 : let kv = self
721 12 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
722 0 : .await
723 12 : .context("scan")?;
724 12 : let mut result = HashMap::new();
725 12 : let mut sz = 0;
726 30 : for (_, v) in kv {
727 18 : let v = v.context("get value")?;
728 18 : let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
729 36 : for (fname, content) in v {
730 18 : sz += fname.len();
731 18 : sz += content.len();
732 18 : result.insert(fname, content);
733 18 : }
734 : }
735 12 : self.aux_file_size_estimator.on_initial(sz);
736 12 : Ok(result)
737 12 : }
738 :
739 0 : pub(crate) async fn trigger_aux_file_size_computation(
740 0 : &self,
741 0 : lsn: Lsn,
742 0 : ctx: &RequestContext,
743 0 : ) -> Result<(), PageReconstructError> {
744 0 : let current_policy = self.last_aux_file_policy.load();
745 0 : if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
746 0 : self.list_aux_files_v2(lsn, ctx).await?;
747 0 : }
748 0 : Ok(())
749 0 : }
750 :
751 26 : pub(crate) async fn list_aux_files(
752 26 : &self,
753 26 : lsn: Lsn,
754 26 : ctx: &RequestContext,
755 26 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
756 26 : let current_policy = self.last_aux_file_policy.load();
757 26 : match current_policy {
758 14 : Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await,
759 10 : Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
760 : Some(AuxFilePolicy::CrossValidation) => {
761 2 : let v1_result = self.list_aux_files_v1(lsn, ctx).await;
762 2 : let v2_result = self.list_aux_files_v2(lsn, ctx).await;
763 2 : match (v1_result, v2_result) {
764 2 : (Ok(v1), Ok(v2)) => {
765 2 : if v1 != v2 {
766 0 : tracing::error!(
767 0 : "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
768 : );
769 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
770 0 : "unmatched aux file v1 v2 result"
771 0 : )));
772 2 : }
773 2 : Ok(v1)
774 : }
775 0 : (Ok(_), Err(v2)) => {
776 0 : tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
777 0 : Err(v2)
778 : }
779 0 : (Err(v1), Ok(_)) => {
780 0 : tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
781 0 : Err(v1)
782 : }
783 0 : (Err(_), Err(v2)) => Err(v2),
784 : }
785 : }
786 : }
787 26 : }
788 :
789 0 : pub(crate) async fn get_replorigins(
790 0 : &self,
791 0 : lsn: Lsn,
792 0 : ctx: &RequestContext,
793 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
794 0 : let kv = self
795 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
796 0 : .await
797 0 : .context("scan")?;
798 0 : let mut result = HashMap::new();
799 0 : for (k, v) in kv {
800 0 : let v = v.context("get value")?;
801 0 : let origin_id = k.field6 as RepOriginId;
802 0 : let origin_lsn = Lsn::des(&v).unwrap();
803 0 : if origin_lsn != Lsn::INVALID {
804 0 : result.insert(origin_id, origin_lsn);
805 0 : }
806 : }
807 0 : Ok(result)
808 0 : }
809 :
810 : /// Does the same as get_current_logical_size but counted on demand.
811 : /// Used to initialize the logical size tracking on startup.
812 : ///
813 : /// Only relation blocks are counted currently. That excludes metadata,
814 : /// SLRUs, twophase files etc.
815 : ///
816 : /// # Cancel-Safety
817 : ///
818 : /// This method is cancellation-safe.
819 0 : pub(crate) async fn get_current_logical_size_non_incremental(
820 0 : &self,
821 0 : lsn: Lsn,
822 0 : ctx: &RequestContext,
823 0 : ) -> Result<u64, CalculateLogicalSizeError> {
824 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
825 :
826 : // Fetch list of database dirs and iterate them
827 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
828 0 : let dbdir = DbDirectory::des(&buf)?;
829 :
830 0 : let mut total_size: u64 = 0;
831 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
832 0 : for rel in self
833 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
834 0 : .await?
835 : {
836 0 : if self.cancel.is_cancelled() {
837 0 : return Err(CalculateLogicalSizeError::Cancelled);
838 0 : }
839 0 : let relsize_key = rel_size_to_key(rel);
840 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
841 0 : let relsize = buf.get_u32_le();
842 0 :
843 0 : total_size += relsize as u64;
844 : }
845 : }
846 0 : Ok(total_size * BLCKSZ as u64)
847 0 : }
848 :
///
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed may be removed from the underlying storage (from
/// that LSN forwards).
///
/// The return value is (dense keyspace, sparse keyspace).
///
/// The dense keyspace is assembled by walking the directory keys (database
/// directory, relation directories, SLRU segment directories, two-phase
/// directory) and adding every key they refer to. The sparse keyspace is a
/// fixed list of two ranges (aux-file metadata keys and replication origin
/// keys).
///
/// # Errors
///
/// Any failure to list or read a directory or size key is propagated with `?`,
/// except for the read of `AUX_FILES_KEY`, see the note in the body.
///
/// # Panics
///
/// In builds with debug assertions, panics if the sparse key ranges overlap
/// or are not in ascending order.
pub(crate) async fn collect_keyspace(
    &self,
    lsn: Lsn,
    ctx: &RequestContext,
) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    // Iterate through key ranges, greedily packing them into partitions
    let mut result = KeySpaceAccum::new();

    // The dbdir metadata always exists
    result.add_key(DBDIR_KEY);

    // Fetch list of database dirs and iterate them
    let dbdir = self.list_dbdirs(lsn, ctx).await?;
    let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();

    // Sort by (spcnode, dbnode) so keys are visited in a deterministic order.
    dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
    for ((spcnode, dbnode), has_relmap_file) in dbs {
        if has_relmap_file {
            result.add_key(relmap_file_key(spcnode, dbnode));
        }
        result.add_key(rel_dir_to_key(spcnode, dbnode));

        let mut rels: Vec<RelTag> = self
            .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
            .await?
            .into_iter()
            .collect();
        rels.sort_unstable();
        for rel in rels {
            // The size key holds the relation's block count as a little-endian u32.
            let relsize_key = rel_size_to_key(rel);
            let mut buf = self.get(relsize_key, lsn, ctx).await?;
            let relsize = buf.get_u32_le();

            // Blocks 0..relsize of the relation, then the size key itself.
            result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
            result.add_key(relsize_key);
        }
    }

    // Iterate SLRUs next
    for kind in [
        SlruKind::Clog,
        SlruKind::MultiXactMembers,
        SlruKind::MultiXactOffsets,
    ] {
        let slrudir_key = slru_dir_to_key(kind);
        result.add_key(slrudir_key);
        let buf = self.get(slrudir_key, lsn, ctx).await?;
        let dir = SlruSegmentDirectory::des(&buf)?;
        let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
        segments.sort_unstable();
        for segno in segments {
            // Same layout as relations: a little-endian u32 size key per segment.
            let segsize_key = slru_segment_size_to_key(kind, segno);
            let mut buf = self.get(segsize_key, lsn, ctx).await?;
            let segsize = buf.get_u32_le();

            result.add_range(
                slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
            );
            result.add_key(segsize_key);
        }
    }

    // Then pg_twophase
    result.add_key(TWOPHASEDIR_KEY);
    let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
    let twophase_dir = TwoPhaseDirectory::des(&buf)?;
    let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
    xids.sort_unstable();
    for xid in xids {
        result.add_key(twophase_file_key(xid));
    }

    result.add_key(CONTROLFILE_KEY);
    result.add_key(CHECKPOINT_KEY);
    // NOTE(review): `is_ok()` treats *any* read error (not only a missing key) as
    // "the aux files key does not exist", so the key is silently left out of the
    // keyspace on e.g. a transient failure. Confirm this is intended.
    if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
        result.add_key(AUX_FILES_KEY);
    }

    // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
    // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
    // and the keys will not be garbage-collected.
    #[cfg(test)]
    {
        let guard = self.extra_test_dense_keyspace.load();
        for kr in &guard.ranges {
            result.add_range(kr.clone());
        }
    }

    let dense_keyspace = result.to_keyspace();
    let sparse_keyspace = SparseKeySpace(KeySpace {
        ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
    });

    if cfg!(debug_assertions) {
        // Verify if the sparse keyspaces are ordered and non-overlapping.

        // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
        // category of sparse keys are split into their own image/delta files. If there
        // are overlapping keyspaces, they will be automatically merged by keyspace accum,
        // and we want the developer to keep the keyspaces separated.

        let ranges = &sparse_keyspace.0.ranges;

        // TODO: use a single overlaps_with across the codebase
        // Two half-open ranges overlap unless one ends at or before the other starts.
        fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
            !(a.end <= b.start || b.end <= a.start)
        }
        // Pairwise overlap check over every (i, j) with j < i.
        for i in 0..ranges.len() {
            for j in 0..i {
                if overlaps_with(&ranges[i], &ranges[j]) {
                    panic!(
                        "overlapping sparse keyspace: {}..{} and {}..{}",
                        ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
                    );
                }
            }
        }
        // Adjacent ranges must be in ascending order.
        for i in 1..ranges.len() {
            assert!(
                ranges[i - 1].end <= ranges[i].start,
                "unordered sparse keyspace: {}..{} and {}..{}",
                ranges[i - 1].start,
                ranges[i - 1].end,
                ranges[i].start,
                ranges[i].end
            );
        }
    }

    Ok((dense_keyspace, sparse_keyspace))
}
987 :
988 : /// Get cached size of relation if it not updated after specified LSN
989 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
990 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
991 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
992 448518 : if lsn >= *cached_lsn {
993 443372 : return Some(*nblocks);
994 5146 : }
995 22 : }
996 5168 : None
997 448540 : }
998 :
999 : /// Update cached relation size if there is no more recent update
1000 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1001 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1002 5136 :
1003 5136 : if lsn < rel_size_cache.complete_as_of {
1004 : // Do not cache old values. It's safe to cache the size on read, as long as
1005 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1006 : // never evict values from the cache, so if the relation size changed after
1007 : // 'lsn', the new value is already in the cache.
1008 0 : return;
1009 5136 : }
1010 5136 :
1011 5136 : match rel_size_cache.map.entry(tag) {
1012 5136 : hash_map::Entry::Occupied(mut entry) => {
1013 5136 : let cached_lsn = entry.get_mut();
1014 5136 : if lsn >= cached_lsn.0 {
1015 0 : *cached_lsn = (lsn, nblocks);
1016 5136 : }
1017 : }
1018 0 : hash_map::Entry::Vacant(entry) => {
1019 0 : entry.insert((lsn, nblocks));
1020 0 : }
1021 : }
1022 5136 : }
1023 :
1024 : /// Store cached relation size
1025 288732 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1026 288732 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1027 288732 : rel_size_cache.map.insert(tag, (lsn, nblocks));
1028 288732 : }
1029 :
1030 : /// Remove cached relation size
1031 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1032 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1033 2 : rel_size_cache.map.remove(tag);
1034 2 : }
1035 : }
1036 :
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a single WAL record appear atomic.
pub struct DatadirModification<'a> {
    /// The timeline this modification applies to. You can access this to
    /// read the state, but note that any pending updates are *not* reflected
    /// in the state in 'tline' yet.
    pub tline: &'a Timeline,

    /// Current LSN of the modification
    lsn: Lsn,

    // The modifications are not applied directly to the underlying key-value store.
    // The put-functions add the modifications here, and they are flushed to the
    // underlying key-value store by the 'finish' function.
    //
    // `pending_lsns`: LSNs this modification held before `set_lsn` advanced it.
    pending_lsns: Vec<Lsn>,
    // `pending_updates`: per key, the list of (LSN, value) pairs queued for writing.
    pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
    // `pending_deletions`: key ranges queued for deletion, each paired with an LSN
    // (presumably the LSN at which `delete` was called; confirm against `delete`).
    pending_deletions: Vec<(Range<Key>, Lsn)>,
    // `pending_nblocks`: net change in the number of relation blocks made by this
    // modification; the put-functions add to it on creation/extension and
    // subtract from it on truncation/drop.
    pending_nblocks: i64,

    /// For special "directory" keys that store key-value maps, track the size of the map
    /// if it was updated in this modification.
    pending_directory_entries: Vec<(DirectoryKind, usize)>,
}
1062 :
1063 : impl<'a> DatadirModification<'a> {
1064 : /// Get the current lsn
1065 418056 : pub(crate) fn get_lsn(&self) -> Lsn {
1066 418056 : self.lsn
1067 418056 : }
1068 :
1069 : /// Set the current lsn
1070 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1071 145858 : ensure!(
1072 145858 : lsn >= self.lsn,
1073 0 : "setting an older lsn {} than {} is not allowed",
1074 : lsn,
1075 : self.lsn
1076 : );
1077 145858 : if lsn > self.lsn {
1078 145858 : self.pending_lsns.push(self.lsn);
1079 145858 : self.lsn = lsn;
1080 145858 : }
1081 145858 : Ok(())
1082 145858 : }
1083 :
1084 : /// Initialize a completely new repository.
1085 : ///
1086 : /// This inserts the directory metadata entries that are assumed to
1087 : /// always exist.
1088 160 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1089 160 : let buf = DbDirectory::ser(&DbDirectory {
1090 160 : dbdirs: HashMap::new(),
1091 160 : })?;
1092 160 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1093 160 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1094 160 :
1095 160 : // Create AuxFilesDirectory
1096 160 : self.init_aux_dir()?;
1097 :
1098 160 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1099 160 : xids: HashSet::new(),
1100 160 : })?;
1101 160 : self.pending_directory_entries
1102 160 : .push((DirectoryKind::TwoPhase, 0));
1103 160 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1104 :
1105 160 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1106 160 : let empty_dir = Value::Image(buf);
1107 160 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1108 160 : self.pending_directory_entries
1109 160 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1110 160 : self.put(
1111 160 : slru_dir_to_key(SlruKind::MultiXactMembers),
1112 160 : empty_dir.clone(),
1113 160 : );
1114 160 : self.pending_directory_entries
1115 160 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1116 160 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1117 160 : self.pending_directory_entries
1118 160 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1119 160 :
1120 160 : Ok(())
1121 160 : }
1122 :
/// Test-only helper: run `init_empty` and then store placeholder control file
/// and checkpoint images.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    self.init_empty()?;

    let control_file = bytes::Bytes::from_static(b"control_file contents do not matter");
    self.put_control_file(control_file)
        .context("put_control_file")?;

    let checkpoint = bytes::Bytes::from_static(b"checkpoint_file contents do not matter");
    self.put_checkpoint(checkpoint)
        .context("put_checkpoint_file")?;

    Ok(())
}
1136 :
1137 : /// Put a new page version that can be constructed from a WAL record
1138 : ///
1139 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1140 : /// current end-of-file. It's up to the caller to check that the relation size
1141 : /// matches the blocks inserted!
1142 145630 : pub fn put_rel_wal_record(
1143 145630 : &mut self,
1144 145630 : rel: RelTag,
1145 145630 : blknum: BlockNumber,
1146 145630 : rec: NeonWalRecord,
1147 145630 : ) -> anyhow::Result<()> {
1148 145630 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1149 145630 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1150 145630 : Ok(())
1151 145630 : }
1152 :
1153 : // Same, but for an SLRU.
1154 8 : pub fn put_slru_wal_record(
1155 8 : &mut self,
1156 8 : kind: SlruKind,
1157 8 : segno: u32,
1158 8 : blknum: BlockNumber,
1159 8 : rec: NeonWalRecord,
1160 8 : ) -> anyhow::Result<()> {
1161 8 : self.put(
1162 8 : slru_block_to_key(kind, segno, blknum),
1163 8 : Value::WalRecord(rec),
1164 8 : );
1165 8 : Ok(())
1166 8 : }
1167 :
1168 : /// Like put_wal_record, but with ready-made image of the page.
1169 280864 : pub fn put_rel_page_image(
1170 280864 : &mut self,
1171 280864 : rel: RelTag,
1172 280864 : blknum: BlockNumber,
1173 280864 : img: Bytes,
1174 280864 : ) -> anyhow::Result<()> {
1175 280864 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1176 280864 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1177 280864 : Ok(())
1178 280864 : }
1179 :
1180 6 : pub fn put_slru_page_image(
1181 6 : &mut self,
1182 6 : kind: SlruKind,
1183 6 : segno: u32,
1184 6 : blknum: BlockNumber,
1185 6 : img: Bytes,
1186 6 : ) -> anyhow::Result<()> {
1187 6 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
1188 6 : Ok(())
1189 6 : }
1190 :
1191 : /// Store a relmapper file (pg_filenode.map) in the repository
1192 16 : pub async fn put_relmap_file(
1193 16 : &mut self,
1194 16 : spcnode: Oid,
1195 16 : dbnode: Oid,
1196 16 : img: Bytes,
1197 16 : ctx: &RequestContext,
1198 16 : ) -> anyhow::Result<()> {
1199 : // Add it to the directory (if it doesn't exist already)
1200 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1201 16 : let mut dbdir = DbDirectory::des(&buf)?;
1202 :
1203 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1204 16 : if r.is_none() || r == Some(false) {
1205 : // The dbdir entry didn't exist, or it contained a
1206 : // 'false'. The 'insert' call already updated it with
1207 : // 'true', now write the updated 'dbdirs' map back.
1208 16 : let buf = DbDirectory::ser(&dbdir)?;
1209 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1210 16 :
1211 16 : // Create AuxFilesDirectory as well
1212 16 : self.init_aux_dir()?;
1213 0 : }
1214 16 : if r.is_none() {
1215 8 : // Create RelDirectory
1216 8 : let buf = RelDirectory::ser(&RelDirectory {
1217 8 : rels: HashSet::new(),
1218 8 : })?;
1219 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1220 8 : self.put(
1221 8 : rel_dir_to_key(spcnode, dbnode),
1222 8 : Value::Image(Bytes::from(buf)),
1223 8 : );
1224 8 : }
1225 :
1226 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1227 16 : Ok(())
1228 16 : }
1229 :
1230 0 : pub async fn put_twophase_file(
1231 0 : &mut self,
1232 0 : xid: TransactionId,
1233 0 : img: Bytes,
1234 0 : ctx: &RequestContext,
1235 0 : ) -> anyhow::Result<()> {
1236 : // Add it to the directory entry
1237 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1238 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1239 0 : if !dir.xids.insert(xid) {
1240 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1241 0 : }
1242 0 : self.pending_directory_entries
1243 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1244 0 : self.put(
1245 0 : TWOPHASEDIR_KEY,
1246 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1247 : );
1248 :
1249 0 : self.put(twophase_file_key(xid), Value::Image(img));
1250 0 : Ok(())
1251 0 : }
1252 :
1253 0 : pub async fn set_replorigin(
1254 0 : &mut self,
1255 0 : origin_id: RepOriginId,
1256 0 : origin_lsn: Lsn,
1257 0 : ) -> anyhow::Result<()> {
1258 0 : let key = repl_origin_key(origin_id);
1259 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1260 0 : Ok(())
1261 0 : }
1262 :
1263 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1264 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1265 0 : }
1266 :
1267 160 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1268 160 : self.put(CONTROLFILE_KEY, Value::Image(img));
1269 160 : Ok(())
1270 160 : }
1271 :
1272 174 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1273 174 : self.put(CHECKPOINT_KEY, Value::Image(img));
1274 174 : Ok(())
1275 174 : }
1276 :
1277 0 : pub async fn drop_dbdir(
1278 0 : &mut self,
1279 0 : spcnode: Oid,
1280 0 : dbnode: Oid,
1281 0 : ctx: &RequestContext,
1282 0 : ) -> anyhow::Result<()> {
1283 0 : let total_blocks = self
1284 0 : .tline
1285 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1286 0 : .await?;
1287 :
1288 : // Remove entry from dbdir
1289 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1290 0 : let mut dir = DbDirectory::des(&buf)?;
1291 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1292 0 : let buf = DbDirectory::ser(&dir)?;
1293 0 : self.pending_directory_entries
1294 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1295 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1296 : } else {
1297 0 : warn!(
1298 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1299 : spcnode, dbnode
1300 : );
1301 : }
1302 :
1303 : // Update logical database size.
1304 0 : self.pending_nblocks -= total_blocks as i64;
1305 0 :
1306 0 : // Delete all relations and metadata files for the spcnode/dnode
1307 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1308 0 : Ok(())
1309 0 : }
1310 :
1311 : /// Create a relation fork.
1312 : ///
1313 : /// 'nblocks' is the initial size.
1314 1920 : pub async fn put_rel_creation(
1315 1920 : &mut self,
1316 1920 : rel: RelTag,
1317 1920 : nblocks: BlockNumber,
1318 1920 : ctx: &RequestContext,
1319 1920 : ) -> Result<(), RelationError> {
1320 1920 : if rel.relnode == 0 {
1321 0 : return Err(RelationError::InvalidRelnode);
1322 1920 : }
1323 : // It's possible that this is the first rel for this db in this
1324 : // tablespace. Create the reldir entry for it if so.
1325 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1326 1920 : .context("deserialize db")?;
1327 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1328 1920 : let mut rel_dir =
1329 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1330 : // Didn't exist. Update dbdir
1331 8 : e.insert(false);
1332 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1333 8 : self.pending_directory_entries
1334 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1335 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1336 8 :
1337 8 : // and create the RelDirectory
1338 8 : RelDirectory::default()
1339 : } else {
1340 : // reldir already exists, fetch it
1341 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1342 1912 : .context("deserialize db")?
1343 : };
1344 :
1345 : // Add the new relation to the rel directory entry, and write it back
1346 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1347 0 : return Err(RelationError::AlreadyExists);
1348 1920 : }
1349 1920 :
1350 1920 : self.pending_directory_entries
1351 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1352 1920 :
1353 1920 : self.put(
1354 1920 : rel_dir_key,
1355 1920 : Value::Image(Bytes::from(
1356 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1357 : )),
1358 : );
1359 :
1360 : // Put size
1361 1920 : let size_key = rel_size_to_key(rel);
1362 1920 : let buf = nblocks.to_le_bytes();
1363 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1364 1920 :
1365 1920 : self.pending_nblocks += nblocks as i64;
1366 1920 :
1367 1920 : // Update relation size cache
1368 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1369 1920 :
1370 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1371 1920 : // caller.
1372 1920 : Ok(())
1373 1920 : }
1374 :
1375 : /// Truncate relation
1376 6012 : pub async fn put_rel_truncation(
1377 6012 : &mut self,
1378 6012 : rel: RelTag,
1379 6012 : nblocks: BlockNumber,
1380 6012 : ctx: &RequestContext,
1381 6012 : ) -> anyhow::Result<()> {
1382 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1383 6012 : if self
1384 6012 : .tline
1385 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1386 0 : .await?
1387 : {
1388 6012 : let size_key = rel_size_to_key(rel);
1389 : // Fetch the old size first
1390 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1391 6012 :
1392 6012 : // Update the entry with the new size.
1393 6012 : let buf = nblocks.to_le_bytes();
1394 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1395 6012 :
1396 6012 : // Update relation size cache
1397 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1398 6012 :
1399 6012 : // Update relation size cache
1400 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1401 6012 :
1402 6012 : // Update logical database size.
1403 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1404 0 : }
1405 6012 : Ok(())
1406 6012 : }
1407 :
1408 : /// Extend relation
1409 : /// If new size is smaller, do nothing.
1410 276680 : pub async fn put_rel_extend(
1411 276680 : &mut self,
1412 276680 : rel: RelTag,
1413 276680 : nblocks: BlockNumber,
1414 276680 : ctx: &RequestContext,
1415 276680 : ) -> anyhow::Result<()> {
1416 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1417 :
1418 : // Put size
1419 276680 : let size_key = rel_size_to_key(rel);
1420 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1421 276680 :
1422 276680 : // only extend relation here. never decrease the size
1423 276680 : if nblocks > old_size {
1424 274788 : let buf = nblocks.to_le_bytes();
1425 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1426 274788 :
1427 274788 : // Update relation size cache
1428 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1429 274788 :
1430 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1431 274788 : }
1432 276680 : Ok(())
1433 276680 : }
1434 :
1435 : /// Drop a relation.
1436 2 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1437 2 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1438 :
1439 : // Remove it from the directory entry
1440 2 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1441 2 : let buf = self.get(dir_key, ctx).await?;
1442 2 : let mut dir = RelDirectory::des(&buf)?;
1443 :
1444 2 : self.pending_directory_entries
1445 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1446 2 :
1447 2 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1448 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1449 : } else {
1450 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1451 : }
1452 :
1453 : // update logical size
1454 2 : let size_key = rel_size_to_key(rel);
1455 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1456 2 : self.pending_nblocks -= old_size as i64;
1457 2 :
1458 2 : // Remove enty from relation size cache
1459 2 : self.tline.remove_cached_rel_size(&rel);
1460 2 :
1461 2 : // Delete size entry, as well as all blocks
1462 2 : self.delete(rel_key_range(rel));
1463 2 :
1464 2 : Ok(())
1465 2 : }
1466 :
1467 6 : pub async fn put_slru_segment_creation(
1468 6 : &mut self,
1469 6 : kind: SlruKind,
1470 6 : segno: u32,
1471 6 : nblocks: BlockNumber,
1472 6 : ctx: &RequestContext,
1473 6 : ) -> anyhow::Result<()> {
1474 6 : // Add it to the directory entry
1475 6 : let dir_key = slru_dir_to_key(kind);
1476 6 : let buf = self.get(dir_key, ctx).await?;
1477 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1478 :
1479 6 : if !dir.segments.insert(segno) {
1480 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1481 6 : }
1482 6 : self.pending_directory_entries
1483 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1484 6 : self.put(
1485 6 : dir_key,
1486 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1487 : );
1488 :
1489 : // Put size
1490 6 : let size_key = slru_segment_size_to_key(kind, segno);
1491 6 : let buf = nblocks.to_le_bytes();
1492 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1493 6 :
1494 6 : // even if nblocks > 0, we don't insert any actual blocks here
1495 6 :
1496 6 : Ok(())
1497 6 : }
1498 :
1499 : /// Extend SLRU segment
1500 0 : pub fn put_slru_extend(
1501 0 : &mut self,
1502 0 : kind: SlruKind,
1503 0 : segno: u32,
1504 0 : nblocks: BlockNumber,
1505 0 : ) -> anyhow::Result<()> {
1506 0 : // Put size
1507 0 : let size_key = slru_segment_size_to_key(kind, segno);
1508 0 : let buf = nblocks.to_le_bytes();
1509 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1510 0 : Ok(())
1511 0 : }
1512 :
1513 : /// This method is used for marking truncated SLRU files
1514 0 : pub async fn drop_slru_segment(
1515 0 : &mut self,
1516 0 : kind: SlruKind,
1517 0 : segno: u32,
1518 0 : ctx: &RequestContext,
1519 0 : ) -> anyhow::Result<()> {
1520 0 : // Remove it from the directory entry
1521 0 : let dir_key = slru_dir_to_key(kind);
1522 0 : let buf = self.get(dir_key, ctx).await?;
1523 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1524 :
1525 0 : if !dir.segments.remove(&segno) {
1526 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1527 0 : }
1528 0 : self.pending_directory_entries
1529 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1530 0 : self.put(
1531 0 : dir_key,
1532 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1533 : );
1534 :
1535 : // Delete size entry, as well as all blocks
1536 0 : self.delete(slru_segment_key_range(kind, segno));
1537 0 :
1538 0 : Ok(())
1539 0 : }
1540 :
1541 : /// Drop a relmapper file (pg_filenode.map)
1542 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1543 0 : // TODO
1544 0 : Ok(())
1545 0 : }
1546 :
1547 : /// This method is used for marking truncated SLRU files
1548 0 : pub async fn drop_twophase_file(
1549 0 : &mut self,
1550 0 : xid: TransactionId,
1551 0 : ctx: &RequestContext,
1552 0 : ) -> anyhow::Result<()> {
1553 : // Remove it from the directory entry
1554 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1555 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1556 :
1557 0 : if !dir.xids.remove(&xid) {
1558 0 : warn!("twophase file for xid {} does not exist", xid);
1559 0 : }
1560 0 : self.pending_directory_entries
1561 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1562 0 : self.put(
1563 0 : TWOPHASEDIR_KEY,
1564 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1565 : );
1566 :
1567 : // Delete it
1568 0 : self.delete(twophase_key_range(xid));
1569 0 :
1570 0 : Ok(())
1571 0 : }
1572 :
1573 176 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1574 176 : if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
1575 2 : return Ok(());
1576 174 : }
1577 174 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1578 174 : files: HashMap::new(),
1579 174 : })?;
1580 174 : self.pending_directory_entries
1581 174 : .push((DirectoryKind::AuxFiles, 0));
1582 174 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1583 174 : Ok(())
1584 176 : }
1585 :
/// Store, replace or remove the aux file at `path`.
///
/// Empty `content` means "remove the file": in the V2 path the file is left
/// out of the re-encoded value, in the V1 path it is upserted as `None`.
///
/// The function first settles which aux file policy applies (possibly
/// switching the timeline's policy), then:
/// * for `V2` and `CrossValidation`, rewrites the value stored under the key
///   derived from `path` by `aux_file::encode_aux_file_key`;
/// * for `V1` and `CrossValidation`, updates the directory stored under
///   `AUX_FILES_KEY`, either as a WAL-record delta or as a full image.
///
/// # Errors
///
/// Propagates failures from listing existing aux files, switching the policy,
/// reading/decoding/encoding the stored values, and serialization.
///
/// # Panics
///
/// Panics if the stored V2 value contains more than one entry for `path`.
pub async fn put_file(
    &mut self,
    path: &str,
    content: &[u8],
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let switch_policy = self.tline.get_switch_aux_file_policy();

    let policy = {
        let current_policy = self.tline.last_aux_file_policy.load();
        // Allowed switch path:
        // * no aux files -> v1/v2/cross-validation
        // * cross-validation->v2

        let current_policy = if current_policy.is_none() {
            // This path will only be hit once per tenant: we will decide the final policy in this code block.
            // The next call to `put_file` will always have `last_aux_file_policy != None`.
            let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
            let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
            if aux_files_key_v1.is_empty() {
                None
            } else {
                // V1 aux files already exist, so the timeline is pinned to V1 first.
                self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
                Some(AuxFilePolicy::V1)
            }
        } else {
            current_policy
        };

        if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
            self.tline.do_switch_aux_policy(switch_policy)?;
            info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
            switch_policy
        } else {
            // This branch handles non-valid migration path, and the case that switch_policy == current_policy.
            // And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
            current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
        }
    };

    if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
        let key = aux_file::encode_aux_file_key(path);
        // retrieve the key from the engine
        let old_val = match self.get(key, ctx).await {
            Ok(val) => Some(val),
            Err(PageReconstructError::MissingKey(_)) => None,
            Err(e) => return Err(e.into()),
        };
        let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
            aux_file::decode_file_value(old_val)?
        } else {
            Vec::new()
        };
        // Split the decoded entries into the one for `path` (if present) and the rest.
        let mut other_files = Vec::with_capacity(files.len());
        let mut modifying_file = None;
        // Note: `content` in this loop shadows the function parameter and is the
        // *stored* content of each decoded entry.
        for file @ (p, content) in files {
            if path == p {
                assert!(
                    modifying_file.is_none(),
                    "duplicated entries found for {}",
                    path
                );
                modifying_file = Some(content);
            } else {
                other_files.push(file);
            }
        }
        let mut new_files = other_files;
        // (existing entry?, is the new content empty?) decides between
        // update / remove / add / no-op, and which size-estimator hook to call.
        match (modifying_file, content.is_empty()) {
            (Some(old_content), false) => {
                self.tline
                    .aux_file_size_estimator
                    .on_update(old_content.len(), content.len());
                new_files.push((path, content));
            }
            (Some(old_content), true) => {
                self.tline
                    .aux_file_size_estimator
                    .on_remove(old_content.len());
                // not adding the file key to the final `new_files` vec.
            }
            (None, false) => {
                self.tline.aux_file_size_estimator.on_add(content.len());
                new_files.push((path, content));
            }
            (None, true) => warn!("removing non-existing aux file: {}", path),
        }
        let new_val = aux_file::encode_file_value(&new_files)?;
        self.put(key, Value::Image(new_val.into()));
    }

    if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
        let file_path = path.to_string();
        // `None` content represents removal of the file.
        let content = if content.is_empty() {
            None
        } else {
            Some(Bytes::copy_from_slice(content))
        };

        let n_files;
        let mut aux_files = self.tline.aux_files.lock().await;
        // NOTE(review): `dir` is taken out of `aux_files` here and only put back
        // at the end of the branch; an early return via `?` in between leaves
        // `aux_files.dir` as `None`. Confirm that is acceptable.
        if let Some(mut dir) = aux_files.dir.take() {
            // We already updated aux files in `self`: emit a delta and update our latest value.
            dir.upsert(file_path.clone(), content.clone());
            n_files = dir.files.len();
            if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
                // Enough deltas have accumulated: write a full image and restart the count.
                self.put(
                    AUX_FILES_KEY,
                    Value::Image(Bytes::from(
                        AuxFilesDirectory::ser(&dir).context("serialize")?,
                    )),
                );
                aux_files.n_deltas = 0;
            } else {
                self.put(
                    AUX_FILES_KEY,
                    Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
                );
                aux_files.n_deltas += 1;
            }
            aux_files.dir = Some(dir);
        } else {
            // Check if the AUX_FILES_KEY is initialized
            match self.get(AUX_FILES_KEY, ctx).await {
                Ok(dir_bytes) => {
                    let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
                    // Key is already set, we may append a delta
                    self.put(
                        AUX_FILES_KEY,
                        Value::WalRecord(NeonWalRecord::AuxFile {
                            file_path: file_path.clone(),
                            content: content.clone(),
                        }),
                    );
                    dir.upsert(file_path, content);
                    n_files = dir.files.len();
                    aux_files.dir = Some(dir);
                }
                Err(
                    e @ (PageReconstructError::Cancelled
                    | PageReconstructError::AncestorLsnTimeout(_)),
                ) => {
                    // Important that we do not interpret a shutdown error as "not found" and thereby
                    // reset the map.
                    return Err(e.into());
                }
                // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
                // the original code assumes all other errors are missing keys. Therefore, we keep the code path
                // the same for now, though in theory, we should only match the `MissingKey` variant.
                Err(
                    PageReconstructError::Other(_)
                    | PageReconstructError::WalRedo(_)
                    | PageReconstructError::MissingKey { .. },
                ) => {
                    // Key is missing, we must insert an image as the basis for subsequent deltas.

                    let mut dir = AuxFilesDirectory {
                        files: HashMap::new(),
                    };
                    dir.upsert(file_path, content);
                    self.put(
                        AUX_FILES_KEY,
                        Value::Image(Bytes::from(
                            AuxFilesDirectory::ser(&dir).context("serialize")?,
                        )),
                    );
                    n_files = 1;
                    aux_files.dir = Some(dir);
                }
            }
        }

        // Record the resulting number of V1 aux files.
        self.pending_directory_entries
            .push((DirectoryKind::AuxFiles, n_files));
    }

    Ok(())
}
1764 :
1765 : ///
1766 : /// Flush changes accumulated so far to the underlying repository.
1767 : ///
1768 : /// Usually, changes made in DatadirModification are atomic, but this allows
1769 : /// you to flush them to the underlying repository before the final `commit`.
1770 : /// That allows to free up the memory used to hold the pending changes.
1771 : ///
1772 : /// Currently only used during bulk import of a data directory. In that
1773 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1774 : /// whole import fails and the timeline will be deleted anyway.
1775 : /// (Or to be precise, it will be left behind for debugging purposes and
1776 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1777 : ///
1778 : /// Note: A consequence of flushing the pending operations is that they
1779 : /// won't be visible to subsequent operations until `commit`. The function
1780 : /// retains all the metadata, but data pages are flushed. That's again OK
1781 : /// for bulk import, where you are just loading data pages and won't try to
1782 : /// modify the same pages twice.
1783 1930 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1784 1930 : // Unless we have accumulated a decent amount of changes, it's not worth it
1785 1930 : // to scan through the pending_updates list.
1786 1930 : let pending_nblocks = self.pending_nblocks;
1787 1930 : if pending_nblocks < 10000 {
1788 1930 : return Ok(());
1789 0 : }
1790 :
1791 0 : let mut writer = self.tline.writer().await;
1792 :
1793 : // Flush relation and SLRU data blocks, keep metadata.
1794 0 : let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
1795 0 : for (key, values) in self.pending_updates.drain() {
1796 0 : for (lsn, value) in values {
1797 0 : if key.is_rel_block_key() || key.is_slru_block_key() {
1798 : // This bails out on first error without modifying pending_updates.
1799 : // That's Ok, cf this function's doc comment.
1800 0 : writer.put(key, lsn, &value, ctx).await?;
1801 0 : } else {
1802 0 : retained_pending_updates
1803 0 : .entry(key)
1804 0 : .or_default()
1805 0 : .push((lsn, value));
1806 0 : }
1807 : }
1808 : }
1809 :
1810 0 : self.pending_updates = retained_pending_updates;
1811 0 :
1812 0 : if pending_nblocks != 0 {
1813 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1814 0 : self.pending_nblocks = 0;
1815 0 : }
1816 :
1817 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1818 0 : writer.update_directory_entries_count(kind, count as u64);
1819 0 : }
1820 :
1821 0 : Ok(())
1822 1930 : }
1823 :
1824 : ///
1825 : /// Finish this atomic update, writing all the updated keys to the
1826 : /// underlying timeline.
1827 : /// All the modifications in this atomic update are stamped by the specified LSN.
1828 : ///
1829 743062 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1830 743062 : let mut writer = self.tline.writer().await;
1831 :
1832 743062 : let pending_nblocks = self.pending_nblocks;
1833 743062 : self.pending_nblocks = 0;
1834 743062 :
1835 743062 : if !self.pending_updates.is_empty() {
1836 : // The put_batch call below expects expects the inputs to be sorted by Lsn,
1837 : // so we do that first.
1838 414046 : let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
1839 414046 : self.pending_updates
1840 414046 : .drain()
1841 700392 : .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
1842 414046 : .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
1843 414046 : VecMapOrdering::GreaterOrEqual,
1844 414046 : );
1845 414046 :
1846 414046 : writer.put_batch(lsn_ordered_batch, ctx).await?;
1847 329016 : }
1848 :
1849 743062 : if !self.pending_deletions.is_empty() {
1850 2 : writer.delete_batch(&self.pending_deletions, ctx).await?;
1851 2 : self.pending_deletions.clear();
1852 743060 : }
1853 :
1854 743062 : self.pending_lsns.push(self.lsn);
1855 888920 : for pending_lsn in self.pending_lsns.drain(..) {
1856 888920 : // Ideally, we should be able to call writer.finish_write() only once
1857 888920 : // with the highest LSN. However, the last_record_lsn variable in the
1858 888920 : // timeline keeps track of the latest LSN and the immediate previous LSN
1859 888920 : // so we need to record every LSN to not leave a gap between them.
1860 888920 : writer.finish_write(pending_lsn);
1861 888920 : }
1862 :
1863 743062 : if pending_nblocks != 0 {
1864 270570 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1865 472492 : }
1866 :
1867 743062 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1868 2940 : writer.update_directory_entries_count(kind, count as u64);
1869 2940 : }
1870 :
1871 743062 : Ok(())
1872 743062 : }
1873 :
1874 291704 : pub(crate) fn len(&self) -> usize {
1875 291704 : self.pending_updates.len() + self.pending_deletions.len()
1876 291704 : }
1877 :
1878 : // Internal helper functions to batch the modifications
1879 :
1880 286592 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1881 : // Have we already updated the same key? Read the latest pending updated
1882 : // version in that case.
1883 : //
1884 : // Note: we don't check pending_deletions. It is an error to request a
1885 : // value that has been removed, deletion only avoids leaking storage.
1886 286592 : if let Some(values) = self.pending_updates.get(&key) {
1887 15928 : if let Some((_, value)) = values.last() {
1888 15928 : return if let Value::Image(img) = value {
1889 15928 : Ok(img.clone())
1890 : } else {
1891 : // Currently, we never need to read back a WAL record that we
1892 : // inserted in the same "transaction". All the metadata updates
1893 : // work directly with Images, and we never need to read actual
1894 : // data pages. We could handle this if we had to, by calling
1895 : // the walredo manager, but let's keep it simple for now.
1896 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1897 0 : "unexpected pending WAL record"
1898 0 : )))
1899 : };
1900 0 : }
1901 270664 : }
1902 270664 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1903 270664 : self.tline.get(key, lsn, ctx).await
1904 286592 : }
1905 :
/// Test-only entry point that forces `val` into the modification under
/// `key`, going through the same path as the private `put`.
#[cfg(test)]
pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
    Self::put(self, key, val);
}
1911 :
1912 712552 : fn put(&mut self, key: Key, val: Value) {
1913 712552 : let values = self.pending_updates.entry(key).or_default();
1914 : // Replace the previous value if it exists at the same lsn
1915 712552 : if let Some((last_lsn, last_value)) = values.last_mut() {
1916 12166 : if *last_lsn == self.lsn {
1917 12160 : *last_value = val;
1918 12160 : return;
1919 6 : }
1920 700386 : }
1921 700392 : values.push((self.lsn, val));
1922 712552 : }
1923 :
1924 2 : fn delete(&mut self, key_range: Range<Key>) {
1925 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1926 2 : self.pending_deletions.push((key_range, self.lsn));
1927 2 : }
1928 : }
1929 :
1930 : /// This struct facilitates accessing either a committed key from the timeline at a
1931 : /// specific LSN, or the latest uncommitted key from a pending modification.
1932 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
1933 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1934 : /// need to look up the keys in the modification first before looking them up in the
1935 : /// timeline to not miss the latest updates.
1936 : #[derive(Clone, Copy)]
1937 : pub enum Version<'a> {
1938 : Lsn(Lsn),
1939 : Modified(&'a DatadirModification<'a>),
1940 : }
1941 :
1942 : impl<'a> Version<'a> {
1943 23560 : async fn get(
1944 23560 : &self,
1945 23560 : timeline: &Timeline,
1946 23560 : key: Key,
1947 23560 : ctx: &RequestContext,
1948 23560 : ) -> Result<Bytes, PageReconstructError> {
1949 23560 : match self {
1950 23540 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1951 20 : Version::Modified(modification) => modification.get(key, ctx).await,
1952 : }
1953 23560 : }
1954 :
1955 35620 : fn get_lsn(&self) -> Lsn {
1956 35620 : match self {
1957 29574 : Version::Lsn(lsn) => *lsn,
1958 6046 : Version::Modified(modification) => modification.lsn,
1959 : }
1960 35620 : }
1961 : }
1962 :
1963 : //--- Metadata structs stored in key-value pairs in the repository.
1964 :
1965 2228 : #[derive(Debug, Serialize, Deserialize)]
1966 : struct DbDirectory {
1967 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1968 : dbdirs: HashMap<(Oid, Oid), bool>,
1969 : }
1970 :
1971 276 : #[derive(Debug, Serialize, Deserialize)]
1972 : struct TwoPhaseDirectory {
1973 : xids: HashSet<TransactionId>,
1974 : }
1975 :
1976 1932 : #[derive(Debug, Serialize, Deserialize, Default)]
1977 : struct RelDirectory {
1978 : // Set of relations that exist. (relfilenode, forknum)
1979 : //
1980 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1981 : // key-value pairs, if you have a lot of relations
1982 : rels: HashSet<(Oid, u8)>,
1983 : }
1984 :
1985 58 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
1986 : pub(crate) struct AuxFilesDirectory {
1987 : pub(crate) files: HashMap<String, Bytes>,
1988 : }
1989 :
1990 : impl AuxFilesDirectory {
1991 48 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
1992 48 : if let Some(value) = value {
1993 42 : self.files.insert(key, value);
1994 42 : } else {
1995 6 : self.files.remove(&key);
1996 6 : }
1997 48 : }
1998 : }
1999 :
2000 0 : #[derive(Debug, Serialize, Deserialize)]
2001 : struct RelSizeEntry {
2002 : nblocks: u32,
2003 : }
2004 :
2005 828 : #[derive(Debug, Serialize, Deserialize, Default)]
2006 : struct SlruSegmentDirectory {
2007 : // Set of SLRU segments that exist.
2008 : segments: HashSet<u32>,
2009 : }
2010 :
2011 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2012 : #[repr(u8)]
2013 : pub(crate) enum DirectoryKind {
2014 : Db,
2015 : TwoPhase,
2016 : Rel,
2017 : AuxFiles,
2018 : SlruSegment(SlruKind),
2019 : }
2020 :
2021 : impl DirectoryKind {
2022 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2023 5880 : pub(crate) fn offset(&self) -> usize {
2024 5880 : self.into_usize()
2025 5880 : }
2026 : }
2027 :
2028 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2029 :
2030 : #[allow(clippy::bool_assert_comparison)]
2031 : #[cfg(test)]
2032 : mod tests {
2033 : use hex_literal::hex;
2034 : use utils::id::TimelineId;
2035 :
2036 : use super::*;
2037 :
2038 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2039 :
2040 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2041 : #[tokio::test]
2042 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2043 2 : let name = "aux_files_round_trip";
2044 2 : let harness = TenantHarness::create(name).await?;
2045 2 :
2046 2 : pub const TIMELINE_ID: TimelineId =
2047 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2048 2 :
2049 8 : let (tenant, ctx) = harness.load().await;
2050 2 : let tline = tenant
2051 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
2052 2 : .await?;
2053 2 : let tline = tline.raw_timeline().unwrap();
2054 2 :
2055 2 : // First modification: insert two keys
2056 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
2057 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2058 2 : modification.set_lsn(Lsn(0x1008))?;
2059 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2060 2 : modification.commit(&ctx).await?;
2061 2 : let expect_1008 = HashMap::from([
2062 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2063 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2064 2 : ]);
2065 2 :
2066 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2067 2 : assert_eq!(readback, expect_1008);
2068 2 :
2069 2 : // Second modification: update one key, remove the other
2070 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
2071 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2072 2 : modification.set_lsn(Lsn(0x2008))?;
2073 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
2074 2 : modification.commit(&ctx).await?;
2075 2 : let expect_2008 =
2076 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2077 2 :
2078 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
2079 2 : assert_eq!(readback, expect_2008);
2080 2 :
2081 2 : // Reading back in time works
2082 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2083 2 : assert_eq!(readback, expect_1008);
2084 2 :
2085 2 : Ok(())
2086 2 : }
2087 :
2088 : /*
2089 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2090 : let incremental = timeline.get_current_logical_size();
2091 : let non_incremental = timeline
2092 : .get_current_logical_size_non_incremental(lsn)
2093 : .unwrap();
2094 : assert_eq!(incremental, non_incremental);
2095 : }
2096 : */
2097 :
2098 : /*
2099 : ///
2100 : /// Test list_rels() function, with branches and dropped relations
2101 : ///
2102 : #[test]
2103 : fn test_list_rels_drop() -> Result<()> {
2104 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2105 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2106 : const TESTDB: u32 = 111;
2107 :
2108 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2109 : // after branching fails below
2110 : let mut writer = tline.begin_record(Lsn(0x10));
2111 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2112 : writer.finish()?;
2113 :
2114 : // Create a relation on the timeline
2115 : let mut writer = tline.begin_record(Lsn(0x20));
2116 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2117 : writer.finish()?;
2118 :
2119 : let writer = tline.begin_record(Lsn(0x00));
2120 : writer.finish()?;
2121 :
2122 : // Check that list_rels() lists it after LSN 2, but not before it
2123 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2124 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2125 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2126 :
2127 : // Create a branch, check that the relation is visible there
2128 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2129 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2130 : Some(timeline) => timeline,
2131 : None => panic!("Should have a local timeline"),
2132 : };
2133 : let newtline = DatadirTimelineImpl::new(newtline);
2134 : assert!(newtline
2135 : .list_rels(0, TESTDB, Lsn(0x30))?
2136 : .contains(&TESTREL_A));
2137 :
2138 : // Drop it on the branch
2139 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2140 : new_writer.drop_relation(TESTREL_A)?;
2141 : new_writer.finish()?;
2142 :
2143 : // Check that it's no longer listed on the branch after the point where it was dropped
2144 : assert!(newtline
2145 : .list_rels(0, TESTDB, Lsn(0x30))?
2146 : .contains(&TESTREL_A));
2147 : assert!(!newtline
2148 : .list_rels(0, TESTDB, Lsn(0x40))?
2149 : .contains(&TESTREL_A));
2150 :
2151 : // Run checkpoint and garbage collection and check that it's still not visible
2152 : newtline.checkpoint(CheckpointConfig::Forced)?;
2153 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2154 :
2155 : assert!(!newtline
2156 : .list_rels(0, TESTDB, Lsn(0x40))?
2157 : .contains(&TESTREL_A));
2158 :
2159 : Ok(())
2160 : }
2161 : */
2162 :
2163 : /*
2164 : #[test]
2165 : fn test_read_beyond_eof() -> Result<()> {
2166 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2167 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2168 :
2169 : make_some_layers(&tline, Lsn(0x20))?;
2170 : let mut writer = tline.begin_record(Lsn(0x60));
2171 : walingest.put_rel_page_image(
2172 : &mut writer,
2173 : TESTREL_A,
2174 : 0,
2175 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2176 : )?;
2177 : writer.finish()?;
2178 :
2179 : // Test read before rel creation. Should error out.
2180 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2181 :
2182 : // Read block beyond end of relation at different points in time.
2183 : // These reads should fall into different delta, image, and in-memory layers.
2184 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2185 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2186 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2187 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2188 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2189 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2190 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2191 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2192 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2193 :
2194 : // Test on an in-memory layer with no preceding layer
2195 : let mut writer = tline.begin_record(Lsn(0x70));
2196 : walingest.put_rel_page_image(
2197 : &mut writer,
2198 : TESTREL_B,
2199 : 0,
2200 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2201 : )?;
2202 : writer.finish()?;
2203 :
2204 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
2205 :
2206 : Ok(())
2207 : }
2208 : */
2209 : }
|