Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use crate::walrecord::NeonWalRecord;
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use pageserver_api::key::{
18 : dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
19 : rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
20 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
21 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
22 : };
23 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
24 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
25 : use postgres_ffi::BLCKSZ;
26 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
27 : use serde::{Deserialize, Serialize};
28 : use std::collections::{hash_map, HashMap, HashSet};
29 : use std::ops::ControlFlow;
30 : use std::ops::Range;
31 : use strum::IntoEnumIterator;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::{debug, trace, warn};
34 : use utils::bin_ser::DeserializeError;
35 : use utils::{bin_ser::BeSer, lsn::Lsn};
36 :
/// Outcome of `Timeline::find_lsn_for_timestamp`.
///
/// Every variant carries an LSN; how that LSN relates to the searched
/// timestamp depends on which commit timestamps the search encountered.
#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp
    Present(Lsn),

    /// Found no commits after the given timestamp, this means
    /// that the newest data in the branch is older than the given
    /// timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp
    Future(Lsn),

    /// The queried timestamp is older than the horizon we can look back to (PITR)
    ///
    /// All commits > LSN happened after the given timestamp,
    /// but any commits < LSN might have happened before or after
    /// the given timestamp. We don't know because no data before
    /// the given lsn is available.
    Past(Lsn),

    /// We have found no commit with a timestamp,
    /// so we can't return anything meaningful.
    ///
    /// The associated LSN is the lower bound value we can safely
    /// create branches on, but no statement is made if it is
    /// older or newer than the timestamp.
    ///
    /// This variant can e.g. be returned right after a
    /// cluster import.
    NoData(Lsn),
}
68 :
/// Error returned by `Timeline::get_current_logical_size_non_incremental`.
#[derive(Debug, thiserror::Error)]
pub enum CalculateLogicalSizeError {
    /// The calculation was abandoned because cancellation was observed.
    #[error("cancelled")]
    Cancelled,
    /// Any other failure, carried through unchanged.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
76 :
/// Error returned by `Timeline::collect_keyspace`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    /// A directory value could not be deserialized.
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    /// Reading a value failed for a reason other than cancellation.
    /// Deliberately not `#[from]`: the hand-written `From` impl routes
    /// `PageReconstructError::Cancelled` to `Cancelled` instead.
    #[error(transparent)]
    PageRead(PageReconstructError),
    /// The underlying read reported cancellation.
    #[error("cancelled")]
    Cancelled,
}
86 :
87 : impl From<PageReconstructError> for CollectKeySpaceError {
88 3 : fn from(err: PageReconstructError) -> Self {
89 3 : match err {
90 1 : PageReconstructError::Cancelled => Self::Cancelled,
91 2 : err => Self::PageRead(err),
92 : }
93 3 : }
94 : }
95 :
96 : impl From<PageReconstructError> for CalculateLogicalSizeError {
97 31 : fn from(pre: PageReconstructError) -> Self {
98 31 : match pre {
99 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
100 29 : Self::Cancelled
101 : }
102 2 : _ => Self::Other(pre.into()),
103 : }
104 31 : }
105 : }
106 :
/// Errors concerning a single relation.
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    /// The relation already exists.
    #[error("Relation Already Exists")]
    AlreadyExists,
    /// The relation tag is invalid; the GET functions in this file report
    /// this when `RelTag::relnode` is 0.
    #[error("invalid relnode")]
    InvalidRelnode,
    /// Any other failure, carried through unchanged.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
116 :
117 : ///
118 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
119 : /// and other special kinds of files, in a versioned key-value store. The
120 : /// Timeline struct provides the key-value store.
121 : ///
122 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
123 : /// implementation, and might be moved into a separate struct later.
124 : impl Timeline {
125 : /// Start ingesting a WAL record, or other atomic modification of
126 : /// the timeline.
127 : ///
128 : /// This provides a transaction-like interface to perform a bunch
129 : /// of modifications atomically.
130 : ///
131 : /// To ingest a WAL record, call begin_modification(lsn) to get a
132 : /// DatadirModification object. Use the functions in the object to
133 : /// modify the repository state, updating all the pages and metadata
134 : /// that the WAL record affects. When you're done, call commit() to
135 : /// commit the changes.
136 : ///
137 : /// Lsn stored in modification is advanced by `ingest_record` and
138 : /// is used by `commit()` to update `last_record_lsn`.
139 : ///
140 : /// Calling commit() will flush all the changes and reset the state,
141 : /// so the `DatadirModification` struct can be reused to perform the next modification.
142 : ///
143 : /// Note that any pending modifications you make through the
144 : /// modification object won't be visible to calls to the 'get' and list
145 : /// functions of the timeline until you finish! And if you update the
146 : /// same page twice, the last update wins.
147 : ///
148 1061791 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
149 1061791 : where
150 1061791 : Self: Sized,
151 1061791 : {
152 1061791 : DatadirModification {
153 1061791 : tline: self,
154 1061791 : pending_lsns: Vec::new(),
155 1061791 : pending_updates: HashMap::new(),
156 1061791 : pending_deletions: Vec::new(),
157 1061791 : pending_nblocks: 0,
158 1061791 : lsn,
159 1061791 : }
160 1061791 : }
161 :
162 : //------------------------------------------------------------------------------
163 : // Public GET functions
164 : //------------------------------------------------------------------------------
165 :
166 : /// Look up given page version.
167 5622574 : pub(crate) async fn get_rel_page_at_lsn(
168 5622574 : &self,
169 5622574 : tag: RelTag,
170 5622574 : blknum: BlockNumber,
171 5622574 : version: Version<'_>,
172 5622574 : horizon: Lsn,
173 5622574 : ctx: &RequestContext,
174 5622574 : ) -> Result<Bytes, PageReconstructError> {
175 5622574 : if tag.relnode == 0 {
176 0 : return Err(PageReconstructError::Other(
177 0 : RelationError::InvalidRelnode.into(),
178 0 : ));
179 5622574 : }
180 :
181 5622574 : let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?;
182 5622574 : if blknum >= nblocks {
183 0 : debug!(
184 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
185 0 : tag,
186 0 : blknum,
187 0 : version.get_lsn(),
188 0 : nblocks
189 0 : );
190 129498 : return Ok(ZERO_PAGE.clone());
191 5493076 : }
192 5493076 :
193 5493076 : let key = rel_block_to_key(tag, blknum);
194 5493076 : version.get(self, key, ctx).await
195 5622574 : }
196 :
197 : // Get size of a database in blocks
198 8 : pub(crate) async fn get_db_size(
199 8 : &self,
200 8 : spcnode: Oid,
201 8 : dbnode: Oid,
202 8 : version: Version<'_>,
203 8 : horizon: Lsn,
204 8 : ctx: &RequestContext,
205 8 : ) -> Result<usize, PageReconstructError> {
206 8 : let mut total_blocks = 0;
207 :
208 8 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
209 :
210 2351 : for rel in rels {
211 2343 : let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?;
212 2343 : total_blocks += n_blocks as usize;
213 : }
214 8 : Ok(total_blocks)
215 8 : }
216 :
217 : /// Get size of a relation file
218 5775974 : pub(crate) async fn get_rel_size(
219 5775974 : &self,
220 5775974 : tag: RelTag,
221 5775974 : version: Version<'_>,
222 5775974 : horizon: Lsn,
223 5775974 : ctx: &RequestContext,
224 5775974 : ) -> Result<BlockNumber, PageReconstructError> {
225 5775974 : if tag.relnode == 0 {
226 0 : return Err(PageReconstructError::Other(
227 0 : RelationError::InvalidRelnode.into(),
228 0 : ));
229 5775974 : }
230 :
231 5775974 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
232 5380575 : return Ok(nblocks);
233 395399 : }
234 395399 :
235 395399 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
236 7387 : && !self.get_rel_exists(tag, version, horizon, ctx).await?
237 : {
238 : // FIXME: Postgres sometimes calls smgrcreate() to create
239 : // FSM, and smgrnblocks() on it immediately afterwards,
240 : // without extending it. Tolerate that by claiming that
241 : // any non-existent FSM fork has size 0.
242 27 : return Ok(0);
243 395372 : }
244 395372 :
245 395372 : let key = rel_size_to_key(tag);
246 395372 : let mut buf = version.get(self, key, ctx).await?;
247 395368 : let nblocks = buf.get_u32_le();
248 395368 :
249 395368 : if horizon == Lsn::MAX {
250 155970 : // Update relation size cache only if latest version is requested.
251 155970 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
252 239398 : }
253 395368 : Ok(nblocks)
254 5775974 : }
255 :
256 : /// Does relation exist?
257 441817 : pub(crate) async fn get_rel_exists(
258 441817 : &self,
259 441817 : tag: RelTag,
260 441817 : version: Version<'_>,
261 441817 : _horizon: Lsn,
262 441817 : ctx: &RequestContext,
263 441817 : ) -> Result<bool, PageReconstructError> {
264 441817 : if tag.relnode == 0 {
265 0 : return Err(PageReconstructError::Other(
266 0 : RelationError::InvalidRelnode.into(),
267 0 : ));
268 441817 : }
269 :
270 : // first try to lookup relation in cache
271 441817 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
272 180049 : return Ok(true);
273 261768 : }
274 261768 : // fetch directory listing
275 261768 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
276 261768 : let buf = version.get(self, key, ctx).await?;
277 :
278 261768 : match RelDirectory::des(&buf).context("deserialization failure") {
279 261768 : Ok(dir) => {
280 261768 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
281 261768 : Ok(exists)
282 : }
283 0 : Err(e) => Err(PageReconstructError::from(e)),
284 : }
285 441817 : }
286 :
287 : /// Get a list of all existing relations in given tablespace and database.
288 : ///
289 : /// # Cancel-Safety
290 : ///
291 : /// This method is cancellation-safe.
292 8142 : pub(crate) async fn list_rels(
293 8142 : &self,
294 8142 : spcnode: Oid,
295 8142 : dbnode: Oid,
296 8142 : version: Version<'_>,
297 8142 : ctx: &RequestContext,
298 8142 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
299 8142 : // fetch directory listing
300 8142 : let key = rel_dir_to_key(spcnode, dbnode);
301 8142 : let buf = version.get(self, key, ctx).await?;
302 :
303 8141 : match RelDirectory::des(&buf).context("deserialization failure") {
304 8141 : Ok(dir) => {
305 8141 : let rels: HashSet<RelTag> =
306 1927910 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
307 1927910 : spcnode,
308 1927910 : dbnode,
309 1927910 : relnode: *relnode,
310 1927910 : forknum: *forknum,
311 1927910 : }));
312 8141 :
313 8141 : Ok(rels)
314 : }
315 0 : Err(e) => Err(PageReconstructError::from(e)),
316 : }
317 8142 : }
318 :
319 : /// Get the whole SLRU segment
320 0 : pub(crate) async fn get_slru_segment(
321 0 : &self,
322 0 : kind: SlruKind,
323 0 : segno: u32,
324 0 : lsn: Lsn,
325 0 : ctx: &RequestContext,
326 0 : ) -> Result<Bytes, PageReconstructError> {
327 0 : let n_blocks = self
328 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
329 0 : .await?;
330 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
331 0 : for blkno in 0..n_blocks {
332 0 : let block = self
333 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
334 0 : .await?;
335 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
336 : }
337 0 : Ok(segment.freeze())
338 0 : }
339 :
340 : /// Look up given SLRU page version.
341 3174 : pub(crate) async fn get_slru_page_at_lsn(
342 3174 : &self,
343 3174 : kind: SlruKind,
344 3174 : segno: u32,
345 3174 : blknum: BlockNumber,
346 3174 : lsn: Lsn,
347 3174 : ctx: &RequestContext,
348 3174 : ) -> Result<Bytes, PageReconstructError> {
349 3174 : let key = slru_block_to_key(kind, segno, blknum);
350 262876 : self.get(key, lsn, ctx).await
351 3173 : }
352 :
353 : /// Get size of an SLRU segment
354 6323 : pub(crate) async fn get_slru_segment_size(
355 6323 : &self,
356 6323 : kind: SlruKind,
357 6323 : segno: u32,
358 6323 : version: Version<'_>,
359 6323 : ctx: &RequestContext,
360 6323 : ) -> Result<BlockNumber, PageReconstructError> {
361 6323 : let key = slru_segment_size_to_key(kind, segno);
362 6323 : let mut buf = version.get(self, key, ctx).await?;
363 6323 : Ok(buf.get_u32_le())
364 6323 : }
365 :
366 : /// Get size of an SLRU segment
367 1339 : pub(crate) async fn get_slru_segment_exists(
368 1339 : &self,
369 1339 : kind: SlruKind,
370 1339 : segno: u32,
371 1339 : version: Version<'_>,
372 1339 : ctx: &RequestContext,
373 1339 : ) -> Result<bool, PageReconstructError> {
374 1339 : // fetch directory listing
375 1339 : let key = slru_dir_to_key(kind);
376 1339 : let buf = version.get(self, key, ctx).await?;
377 :
378 1339 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
379 1339 : Ok(dir) => {
380 1339 : let exists = dir.segments.get(&segno).is_some();
381 1339 : Ok(exists)
382 : }
383 0 : Err(e) => Err(PageReconstructError::from(e)),
384 : }
385 1339 : }
386 :
387 : /// Locate LSN, such that all transactions that committed before
388 : /// 'search_timestamp' are visible, but nothing newer is.
389 : ///
390 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
391 : /// so it's not well defined which LSN you get if there were multiple commits
392 : /// "in flight" at that point in time.
393 : ///
394 190 : pub(crate) async fn find_lsn_for_timestamp(
395 190 : &self,
396 190 : search_timestamp: TimestampTz,
397 190 : cancel: &CancellationToken,
398 190 : ctx: &RequestContext,
399 190 : ) -> Result<LsnForTimestamp, PageReconstructError> {
400 190 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
401 190 : // We use this method to figure out the branching LSN for the new branch, but the
402 190 : // GC cutoff could be before the branching point and we cannot create a new branch
403 190 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
404 190 : // on the safe side.
405 190 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
406 190 : let max_lsn = self.get_last_record_lsn();
407 190 :
408 190 : // LSNs are always 8-byte aligned. low/mid/high represent the
409 190 : // LSN divided by 8.
410 190 : let mut low = min_lsn.0 / 8;
411 190 : let mut high = max_lsn.0 / 8 + 1;
412 190 :
413 190 : let mut found_smaller = false;
414 190 : let mut found_larger = false;
415 3353 : while low < high {
416 3164 : if cancel.is_cancelled() {
417 0 : return Err(PageReconstructError::Cancelled);
418 3164 : }
419 3164 : // cannot overflow, high and low are both smaller than u64::MAX / 2
420 3164 : let mid = (high + low) / 2;
421 :
422 3164 : let cmp = self
423 3164 : .is_latest_commit_timestamp_ge_than(
424 3164 : search_timestamp,
425 3164 : Lsn(mid * 8),
426 3164 : &mut found_smaller,
427 3164 : &mut found_larger,
428 3164 : ctx,
429 3164 : )
430 265761 : .await?;
431 :
432 3163 : if cmp {
433 889 : high = mid;
434 2274 : } else {
435 2274 : low = mid + 1;
436 2274 : }
437 : }
438 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
439 : // so the LSN of the last commit record before or at `search_timestamp`.
440 : // Remove one from `low` to get `t`.
441 : //
442 : // FIXME: it would be better to get the LSN of the previous commit.
443 : // Otherwise, if you restore to the returned LSN, the database will
444 : // include physical changes from later commits that will be marked
445 : // as aborted, and will need to be vacuumed away.
446 189 : let commit_lsn = Lsn((low - 1) * 8);
447 189 : match (found_smaller, found_larger) {
448 : (false, false) => {
449 : // This can happen if no commit records have been processed yet, e.g.
450 : // just after importing a cluster.
451 21 : Ok(LsnForTimestamp::NoData(min_lsn))
452 : }
453 : (false, true) => {
454 : // Didn't find any commit timestamps smaller than the request
455 23 : Ok(LsnForTimestamp::Past(min_lsn))
456 : }
457 : (true, false) => {
458 : // Only found commits with timestamps smaller than the request.
459 : // It's still a valid case for branch creation, return it.
460 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
461 : // case, anyway.
462 82 : Ok(LsnForTimestamp::Future(commit_lsn))
463 : }
464 63 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
465 : }
466 189 : }
467 :
468 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
469 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
470 : ///
471 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
472 : /// with a smaller/larger timestamp.
473 : ///
474 3164 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
475 3164 : &self,
476 3164 : search_timestamp: TimestampTz,
477 3164 : probe_lsn: Lsn,
478 3164 : found_smaller: &mut bool,
479 3164 : found_larger: &mut bool,
480 3164 : ctx: &RequestContext,
481 3164 : ) -> Result<bool, PageReconstructError> {
482 3164 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
483 2943 : if timestamp >= search_timestamp {
484 889 : *found_larger = true;
485 889 : return ControlFlow::Break(true);
486 2054 : } else {
487 2054 : *found_smaller = true;
488 2054 : }
489 2054 : ControlFlow::Continue(())
490 3164 : })
491 265761 : .await
492 3163 : }
493 :
494 : /// Obtain the possible timestamp range for the given lsn.
495 : ///
496 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
497 12 : pub(crate) async fn get_timestamp_for_lsn(
498 12 : &self,
499 12 : probe_lsn: Lsn,
500 12 : ctx: &RequestContext,
501 12 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
502 12 : let mut max: Option<TimestampTz> = None;
503 12 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
504 10 : if let Some(max_prev) = max {
505 0 : max = Some(max_prev.max(timestamp));
506 10 : } else {
507 10 : max = Some(timestamp);
508 10 : }
509 10 : ControlFlow::Continue(())
510 12 : })
511 81 : .await?;
512 :
513 10 : Ok(max)
514 12 : }
515 :
516 : /// Runs the given function on all the timestamps for a given lsn
517 : ///
518 : /// The return value is either given by the closure, or set to the `Default`
519 : /// impl's output.
520 3176 : async fn map_all_timestamps<T: Default>(
521 3176 : &self,
522 3176 : probe_lsn: Lsn,
523 3176 : ctx: &RequestContext,
524 3176 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
525 3176 : ) -> Result<T, PageReconstructError> {
526 3176 : for segno in self
527 3176 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
528 1509 : .await?
529 : {
530 3174 : let nblocks = self
531 3174 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
532 1457 : .await?;
533 3174 : for blknum in (0..nblocks).rev() {
534 3174 : let clog_page = self
535 3174 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
536 262876 : .await?;
537 :
538 3173 : if clog_page.len() == BLCKSZ as usize + 8 {
539 2953 : let mut timestamp_bytes = [0u8; 8];
540 2953 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
541 2953 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
542 2953 :
543 2953 : match f(timestamp) {
544 889 : ControlFlow::Break(b) => return Ok(b),
545 2064 : ControlFlow::Continue(()) => (),
546 : }
547 220 : }
548 : }
549 : }
550 2284 : Ok(Default::default())
551 3175 : }
552 :
553 609 : pub(crate) async fn get_slru_keyspace(
554 609 : &self,
555 609 : version: Version<'_>,
556 609 : ctx: &RequestContext,
557 609 : ) -> Result<KeySpace, PageReconstructError> {
558 609 : let mut accum = KeySpaceAccum::new();
559 :
560 2418 : for kind in SlruKind::iter() {
561 1815 : let mut segments: Vec<u32> = self
562 1815 : .list_slru_segments(kind, version, ctx)
563 185 : .await?
564 1809 : .into_iter()
565 1809 : .collect();
566 1809 : segments.sort_unstable();
567 :
568 3654 : for seg in segments {
569 1845 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
570 :
571 1845 : accum.add_range(
572 1845 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
573 1845 : );
574 : }
575 : }
576 :
577 603 : Ok(accum.to_keyspace())
578 609 : }
579 :
580 : /// Get a list of SLRU segments
581 4993 : pub(crate) async fn list_slru_segments(
582 4993 : &self,
583 4993 : kind: SlruKind,
584 4993 : version: Version<'_>,
585 4993 : ctx: &RequestContext,
586 4993 : ) -> Result<HashSet<u32>, PageReconstructError> {
587 4993 : // fetch directory entry
588 4993 : let key = slru_dir_to_key(kind);
589 :
590 4993 : let buf = version.get(self, key, ctx).await?;
591 4985 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
592 4985 : Ok(dir) => Ok(dir.segments),
593 0 : Err(e) => Err(PageReconstructError::from(e)),
594 : }
595 4993 : }
596 :
597 2446 : pub(crate) async fn get_relmap_file(
598 2446 : &self,
599 2446 : spcnode: Oid,
600 2446 : dbnode: Oid,
601 2446 : version: Version<'_>,
602 2446 : ctx: &RequestContext,
603 2446 : ) -> Result<Bytes, PageReconstructError> {
604 2446 : let key = relmap_file_key(spcnode, dbnode);
605 :
606 2446 : let buf = version.get(self, key, ctx).await?;
607 2446 : Ok(buf)
608 2446 : }
609 :
610 603 : pub(crate) async fn list_dbdirs(
611 603 : &self,
612 603 : lsn: Lsn,
613 603 : ctx: &RequestContext,
614 603 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
615 : // fetch directory entry
616 603 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
617 :
618 603 : match DbDirectory::des(&buf).context("deserialization failure") {
619 603 : Ok(dir) => Ok(dir.dbdirs),
620 0 : Err(e) => Err(PageReconstructError::from(e)),
621 : }
622 603 : }
623 :
624 2 : pub(crate) async fn get_twophase_file(
625 2 : &self,
626 2 : xid: TransactionId,
627 2 : lsn: Lsn,
628 2 : ctx: &RequestContext,
629 2 : ) -> Result<Bytes, PageReconstructError> {
630 2 : let key = twophase_file_key(xid);
631 2 : let buf = self.get(key, lsn, ctx).await?;
632 2 : Ok(buf)
633 2 : }
634 :
635 603 : pub(crate) async fn list_twophase_files(
636 603 : &self,
637 603 : lsn: Lsn,
638 603 : ctx: &RequestContext,
639 603 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
640 : // fetch directory entry
641 603 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
642 :
643 603 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
644 603 : Ok(dir) => Ok(dir.xids),
645 0 : Err(e) => Err(PageReconstructError::from(e)),
646 : }
647 603 : }
648 :
649 597 : pub(crate) async fn get_control_file(
650 597 : &self,
651 597 : lsn: Lsn,
652 597 : ctx: &RequestContext,
653 597 : ) -> Result<Bytes, PageReconstructError> {
654 597 : self.get(CONTROLFILE_KEY, lsn, ctx).await
655 597 : }
656 :
657 1943 : pub(crate) async fn get_checkpoint(
658 1943 : &self,
659 1943 : lsn: Lsn,
660 1943 : ctx: &RequestContext,
661 1943 : ) -> Result<Bytes, PageReconstructError> {
662 1943 : self.get(CHECKPOINT_KEY, lsn, ctx).await
663 1943 : }
664 :
665 2424 : pub(crate) async fn list_aux_files(
666 2424 : &self,
667 2424 : lsn: Lsn,
668 2424 : ctx: &RequestContext,
669 2424 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
670 2424 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
671 1436 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
672 1436 : Ok(dir) => Ok(dir.files),
673 0 : Err(e) => Err(PageReconstructError::from(e)),
674 : },
675 988 : Err(e) => {
676 : // This is expected: historical databases do not have the key.
677 0 : debug!("Failed to get info about AUX files: {}", e);
678 988 : Ok(HashMap::new())
679 : }
680 : }
681 2424 : }
682 :
683 : /// Does the same as get_current_logical_size but counted on demand.
684 : /// Used to initialize the logical size tracking on startup.
685 : ///
686 : /// Only relation blocks are counted currently. That excludes metadata,
687 : /// SLRUs, twophase files etc.
688 : ///
689 : /// # Cancel-Safety
690 : ///
691 : /// This method is cancellation-safe.
692 695 : pub async fn get_current_logical_size_non_incremental(
693 695 : &self,
694 695 : lsn: Lsn,
695 695 : ctx: &RequestContext,
696 695 : ) -> Result<u64, CalculateLogicalSizeError> {
697 695 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
698 :
699 : // Fetch list of database dirs and iterate them
700 943 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
701 687 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
702 :
703 687 : let mut total_size: u64 = 0;
704 2721 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
705 630374 : for rel in self
706 2721 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
707 600 : .await?
708 : {
709 630374 : if self.cancel.is_cancelled() {
710 2 : return Err(CalculateLogicalSizeError::Cancelled);
711 630372 : }
712 630372 : let relsize_key = rel_size_to_key(rel);
713 630372 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
714 630340 : let relsize = buf.get_u32_le();
715 630340 :
716 630340 : total_size += relsize as u64;
717 : }
718 : }
719 652 : Ok(total_size * BLCKSZ as u64)
720 685 : }
721 :
///
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed may be removed from the underlying storage (from
/// that LSN forwards).
///
/// The keys are accumulated in a fixed sequence: the database directory,
/// then per database the relmap file, relation directory and every
/// relation's blocks and size key, then the SLRUs, the two-phase files,
/// the control file, the checkpoint and finally the aux files key.
///
/// Read failures are returned as `CollectKeySpaceError` (a cancelled read
/// becomes `Cancelled`), and directory decode failures as `Decode`.
pub(crate) async fn collect_keyspace(
    &self,
    lsn: Lsn,
    ctx: &RequestContext,
) -> Result<KeySpace, CollectKeySpaceError> {
    // Accumulates the keys and key ranges found below.
    let mut result = KeySpaceAccum::new();

    // The dbdir metadata always exists
    result.add_key(DBDIR_KEY);

    // Fetch list of database dirs and iterate them
    let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    let dbdir = DbDirectory::des(&buf)?;

    // Hash-map iteration order is arbitrary, so sort before adding.
    // NOTE(review): presumably KeySpaceAccum expects keys in ascending
    // order -- confirm against its implementation.
    let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
    dbs.sort_unstable();
    for (spcnode, dbnode) in dbs {
        result.add_key(relmap_file_key(spcnode, dbnode));
        result.add_key(rel_dir_to_key(spcnode, dbnode));

        let mut rels: Vec<RelTag> = self
            .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
            .await?
            .into_iter()
            .collect();
        rels.sort_unstable();
        for rel in rels {
            let relsize_key = rel_size_to_key(rel);
            let mut buf = self.get(relsize_key, lsn, ctx).await?;
            let relsize = buf.get_u32_le();

            // Blocks 0..relsize of the relation, followed by its size key.
            result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
            result.add_key(relsize_key);
        }
    }

    // Iterate SLRUs next
    for kind in [
        SlruKind::Clog,
        SlruKind::MultiXactMembers,
        SlruKind::MultiXactOffsets,
    ] {
        let slrudir_key = slru_dir_to_key(kind);
        result.add_key(slrudir_key);
        let buf = self.get(slrudir_key, lsn, ctx).await?;
        let dir = SlruSegmentDirectory::des(&buf)?;
        let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
        segments.sort_unstable();
        for segno in segments {
            let segsize_key = slru_segment_size_to_key(kind, segno);
            let mut buf = self.get(segsize_key, lsn, ctx).await?;
            let segsize = buf.get_u32_le();

            // Blocks 0..segsize of the segment, followed by its size key.
            result.add_range(
                slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
            );
            result.add_key(segsize_key);
        }
    }

    // Then pg_twophase
    result.add_key(TWOPHASEDIR_KEY);
    let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
    let twophase_dir = TwoPhaseDirectory::des(&buf)?;
    let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
    xids.sort_unstable();
    for xid in xids {
        result.add_key(twophase_file_key(xid));
    }

    result.add_key(CONTROLFILE_KEY);
    result.add_key(CHECKPOINT_KEY);
    // The aux files key is optional: it is only listed when it can be read.
    // NOTE(review): `.is_ok()` treats every read error (including a
    // cancelled read) as "key absent" -- confirm that is intended.
    if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
        result.add_key(AUX_FILES_KEY);
    }
    Ok(result.to_keyspace())
}
804 :
805 : /// Get cached size of relation if it not updated after specified LSN
806 63432140 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
807 63432140 : let rel_size_cache = self.rel_size_cache.read().unwrap();
808 63432140 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
809 63075245 : if lsn >= *cached_lsn {
810 62770501 : return Some(*nblocks);
811 304744 : }
812 356895 : }
813 661639 : None
814 63432140 : }
815 :
816 : /// Update cached relation size if there is no more recent update
817 155970 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
818 155970 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
819 155970 : match rel_size_cache.entry(tag) {
820 130012 : hash_map::Entry::Occupied(mut entry) => {
821 130012 : let cached_lsn = entry.get_mut();
822 130012 : if lsn >= cached_lsn.0 {
823 19 : *cached_lsn = (lsn, nblocks);
824 129993 : }
825 : }
826 25958 : hash_map::Entry::Vacant(entry) => {
827 25958 : entry.insert((lsn, nblocks));
828 25958 : }
829 : }
830 155970 : }
831 :
832 : /// Store cached relation size
833 1962230 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
834 1962230 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
835 1962230 : rel_size_cache.insert(tag, (lsn, nblocks));
836 1962230 : }
837 :
838 : /// Remove cached relation size
839 67233 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
840 67233 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
841 67233 : rel_size_cache.remove(tag);
842 67233 : }
843 : }
844 :
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the `begin_modification`
/// function. It is called for each WAL record, so that all the modifications
/// by one WAL record appear atomic.
pub struct DatadirModification<'a> {
    /// The timeline this modification applies to. You can access this to
    /// read the state, but note that any pending updates are *not* reflected
    /// in the state in 'tline' yet.
    pub tline: &'a Timeline,

    /// Current LSN of the modification
    lsn: Lsn,

    // The modifications are not applied directly to the underlying key-value store.
    // The put-functions add the modifications here, and they are flushed to the
    // underlying key-value store when the modification is committed (see the
    // documentation of `begin_modification`).
    //
    // LSNs the modification was at before `set_lsn` advanced it.
    pending_lsns: Vec<Lsn>,
    // Buffered values per key, each tagged with an LSN.
    pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
    // Buffered key-range deletions, each tagged with an LSN.
    pending_deletions: Vec<(Range<Key>, Lsn)>,
    // NOTE(review): presumably the net change in relation blocks made by the
    // pending updates -- confirm against the put/commit functions.
    pending_nblocks: i64,
}
866 :
867 : impl<'a> DatadirModification<'a> {
/// Get the current lsn of this modification, i.e. the value passed to
/// `begin_modification` or the latest one accepted by `set_lsn`.
pub(crate) fn get_lsn(&self) -> Lsn {
    self.lsn
}
872 :
873 : /// Set the current lsn
874 73215430 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
875 73215430 : ensure!(
876 73215430 : lsn >= self.lsn,
877 0 : "setting an older lsn {} than {} is not allowed",
878 : lsn,
879 : self.lsn
880 : );
881 73215430 : if lsn > self.lsn {
882 73215430 : self.pending_lsns.push(self.lsn);
883 73215430 : self.lsn = lsn;
884 73215430 : }
885 73215430 : Ok(())
886 73215430 : }
887 :
888 : /// Initialize a completely new repository.
889 : ///
890 : /// This inserts the directory metadata entries that are assumed to
891 : /// always exist.
892 675 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
893 675 : let buf = DbDirectory::ser(&DbDirectory {
894 675 : dbdirs: HashMap::new(),
895 675 : })?;
896 675 : self.put(DBDIR_KEY, Value::Image(buf.into()));
897 675 :
898 675 : // Create AuxFilesDirectory
899 675 : self.init_aux_dir()?;
900 :
901 675 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
902 675 : xids: HashSet::new(),
903 675 : })?;
904 675 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
905 :
906 675 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
907 675 : let empty_dir = Value::Image(buf);
908 675 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
909 675 : self.put(
910 675 : slru_dir_to_key(SlruKind::MultiXactMembers),
911 675 : empty_dir.clone(),
912 675 : );
913 675 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
914 675 :
915 675 : Ok(())
916 675 : }
917 :
/// Test-only variant of `init_empty` that additionally stores placeholder
/// control file and checkpoint images.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    self.init_empty()?;

    let control_file = Bytes::from_static(b"control_file contents do not matter");
    self.put_control_file(control_file)
        .context("put_control_file")?;

    let checkpoint = Bytes::from_static(b"checkpoint_file contents do not matter");
    self.put_checkpoint(checkpoint)
        .context("put_checkpoint_file")?;

    Ok(())
}
931 :
932 : /// Put a new page version that can be constructed from a WAL record
933 : ///
934 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
935 : /// current end-of-file. It's up to the caller to check that the relation size
936 : /// matches the blocks inserted!
937 53273693 : pub fn put_rel_wal_record(
938 53273693 : &mut self,
939 53273693 : rel: RelTag,
940 53273693 : blknum: BlockNumber,
941 53273693 : rec: NeonWalRecord,
942 53273693 : ) -> anyhow::Result<()> {
943 53273693 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
944 53273693 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
945 53273693 : Ok(())
946 53273693 : }
947 :
948 : // Same, but for an SLRU.
949 6163782 : pub fn put_slru_wal_record(
950 6163782 : &mut self,
951 6163782 : kind: SlruKind,
952 6163782 : segno: u32,
953 6163782 : blknum: BlockNumber,
954 6163782 : rec: NeonWalRecord,
955 6163782 : ) -> anyhow::Result<()> {
956 6163782 : self.put(
957 6163782 : slru_block_to_key(kind, segno, blknum),
958 6163782 : Value::WalRecord(rec),
959 6163782 : );
960 6163782 : Ok(())
961 6163782 : }
962 :
963 : /// Like put_wal_record, but with ready-made image of the page.
964 2579137 : pub fn put_rel_page_image(
965 2579137 : &mut self,
966 2579137 : rel: RelTag,
967 2579137 : blknum: BlockNumber,
968 2579137 : img: Bytes,
969 2579137 : ) -> anyhow::Result<()> {
970 2579137 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
971 2579137 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
972 2579137 : Ok(())
973 2579137 : }
974 :
975 3160 : pub fn put_slru_page_image(
976 3160 : &mut self,
977 3160 : kind: SlruKind,
978 3160 : segno: u32,
979 3160 : blknum: BlockNumber,
980 3160 : img: Bytes,
981 3160 : ) -> anyhow::Result<()> {
982 3160 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
983 3160 : Ok(())
984 3160 : }
985 :
986 : /// Store a relmapper file (pg_filenode.map) in the repository
987 2510 : pub async fn put_relmap_file(
988 2510 : &mut self,
989 2510 : spcnode: Oid,
990 2510 : dbnode: Oid,
991 2510 : img: Bytes,
992 2510 : ctx: &RequestContext,
993 2510 : ) -> anyhow::Result<()> {
994 : // Add it to the directory (if it doesn't exist already)
995 2510 : let buf = self.get(DBDIR_KEY, ctx).await?;
996 2510 : let mut dbdir = DbDirectory::des(&buf)?;
997 :
998 2510 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
999 2510 : if r.is_none() || r == Some(false) {
1000 : // The dbdir entry didn't exist, or it contained a
1001 : // 'false'. The 'insert' call already updated it with
1002 : // 'true', now write the updated 'dbdirs' map back.
1003 2448 : let buf = DbDirectory::ser(&dbdir)?;
1004 2448 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1005 2448 :
1006 2448 : // Create AuxFilesDirectory as well
1007 2448 : self.init_aux_dir()?;
1008 62 : }
1009 2510 : if r.is_none() {
1010 36 : // Create RelDirectory
1011 36 : let buf = RelDirectory::ser(&RelDirectory {
1012 36 : rels: HashSet::new(),
1013 36 : })?;
1014 36 : self.put(
1015 36 : rel_dir_to_key(spcnode, dbnode),
1016 36 : Value::Image(Bytes::from(buf)),
1017 36 : );
1018 2474 : }
1019 :
1020 2510 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1021 2510 : Ok(())
1022 2510 : }
1023 :
1024 4 : pub async fn put_twophase_file(
1025 4 : &mut self,
1026 4 : xid: TransactionId,
1027 4 : img: Bytes,
1028 4 : ctx: &RequestContext,
1029 4 : ) -> anyhow::Result<()> {
1030 : // Add it to the directory entry
1031 4 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1032 4 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1033 4 : if !dir.xids.insert(xid) {
1034 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1035 4 : }
1036 4 : self.put(
1037 4 : TWOPHASEDIR_KEY,
1038 4 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1039 : );
1040 :
1041 4 : self.put(twophase_file_key(xid), Value::Image(img));
1042 4 : Ok(())
1043 4 : }
1044 :
/// Queue `img` as the new image stored under `CONTROLFILE_KEY`.
///
/// Always returns `Ok(())`.
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    self.put(CONTROLFILE_KEY, Value::Image(img));
    Ok(())
}
1049 :
/// Queue `img` as the new image stored under `CHECKPOINT_KEY`.
///
/// Always returns `Ok(())`.
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    self.put(CHECKPOINT_KEY, Value::Image(img));
    Ok(())
}
1054 :
1055 3 : pub async fn drop_dbdir(
1056 3 : &mut self,
1057 3 : spcnode: Oid,
1058 3 : dbnode: Oid,
1059 3 : ctx: &RequestContext,
1060 3 : ) -> anyhow::Result<()> {
1061 3 : let total_blocks = self
1062 3 : .tline
1063 3 : .get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx)
1064 0 : .await?;
1065 :
1066 : // Remove entry from dbdir
1067 3 : let buf = self.get(DBDIR_KEY, ctx).await?;
1068 3 : let mut dir = DbDirectory::des(&buf)?;
1069 3 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1070 3 : let buf = DbDirectory::ser(&dir)?;
1071 3 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1072 : } else {
1073 0 : warn!(
1074 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1075 0 : spcnode, dbnode
1076 0 : );
1077 : }
1078 :
1079 : // Update logical database size.
1080 3 : self.pending_nblocks -= total_blocks as i64;
1081 3 :
1082 3 : // Delete all relations and metadata files for the spcnode/dnode
1083 3 : self.delete(dbdir_key_range(spcnode, dbnode));
1084 3 : Ok(())
1085 3 : }
1086 :
1087 : /// Create a relation fork.
1088 : ///
1089 : /// 'nblocks' is the initial size.
1090 648391 : pub async fn put_rel_creation(
1091 648391 : &mut self,
1092 648391 : rel: RelTag,
1093 648391 : nblocks: BlockNumber,
1094 648391 : ctx: &RequestContext,
1095 648391 : ) -> Result<(), RelationError> {
1096 648391 : if rel.relnode == 0 {
1097 0 : return Err(RelationError::InvalidRelnode);
1098 648391 : }
1099 : // It's possible that this is the first rel for this db in this
1100 : // tablespace. Create the reldir entry for it if so.
1101 648391 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1102 648391 : .context("deserialize db")?;
1103 648391 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1104 648391 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
1105 : // Didn't exist. Update dbdir
1106 2417 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
1107 2417 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1108 2417 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1109 2417 :
1110 2417 : // and create the RelDirectory
1111 2417 : RelDirectory::default()
1112 : } else {
1113 : // reldir already exists, fetch it
1114 645974 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1115 645974 : .context("deserialize db")?
1116 : };
1117 :
1118 : // Add the new relation to the rel directory entry, and write it back
1119 648391 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1120 0 : return Err(RelationError::AlreadyExists);
1121 648391 : }
1122 648391 : self.put(
1123 648391 : rel_dir_key,
1124 648391 : Value::Image(Bytes::from(
1125 648391 : RelDirectory::ser(&rel_dir).context("serialize")?,
1126 : )),
1127 : );
1128 :
1129 : // Put size
1130 648391 : let size_key = rel_size_to_key(rel);
1131 648391 : let buf = nblocks.to_le_bytes();
1132 648391 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1133 648391 :
1134 648391 : self.pending_nblocks += nblocks as i64;
1135 648391 :
1136 648391 : // Update relation size cache
1137 648391 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1138 648391 :
1139 648391 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1140 648391 : // caller.
1141 648391 : Ok(())
1142 648391 : }
1143 :
1144 : /// Truncate relation
1145 6322 : pub async fn put_rel_truncation(
1146 6322 : &mut self,
1147 6322 : rel: RelTag,
1148 6322 : nblocks: BlockNumber,
1149 6322 : ctx: &RequestContext,
1150 6322 : ) -> anyhow::Result<()> {
1151 6322 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1152 6322 : if self
1153 6322 : .tline
1154 6322 : .get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx)
1155 0 : .await?
1156 : {
1157 6322 : let size_key = rel_size_to_key(rel);
1158 : // Fetch the old size first
1159 6322 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1160 6322 :
1161 6322 : // Update the entry with the new size.
1162 6322 : let buf = nblocks.to_le_bytes();
1163 6322 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1164 6322 :
1165 6322 : // Update relation size cache
1166 6322 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1167 6322 :
1168 6322 : // Update relation size cache
1169 6322 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1170 6322 :
1171 6322 : // Update logical database size.
1172 6322 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1173 0 : }
1174 6322 : Ok(())
1175 6322 : }
1176 :
1177 : /// Extend relation
1178 : /// If new size is smaller, do nothing.
1179 1863650 : pub async fn put_rel_extend(
1180 1863650 : &mut self,
1181 1863650 : rel: RelTag,
1182 1863650 : nblocks: BlockNumber,
1183 1863650 : ctx: &RequestContext,
1184 1863650 : ) -> anyhow::Result<()> {
1185 1863650 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1186 :
1187 : // Put size
1188 1863650 : let size_key = rel_size_to_key(rel);
1189 1863650 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1190 1863647 :
1191 1863647 : // only extend relation here. never decrease the size
1192 1863647 : if nblocks > old_size {
1193 1301195 : let buf = nblocks.to_le_bytes();
1194 1301195 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1195 1301195 :
1196 1301195 : // Update relation size cache
1197 1301195 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1198 1301195 :
1199 1301195 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1200 1301195 : }
1201 1863647 : Ok(())
1202 1863650 : }
1203 :
1204 : /// Drop a relation.
1205 67233 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1206 67233 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1207 :
1208 : // Remove it from the directory entry
1209 67233 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1210 67233 : let buf = self.get(dir_key, ctx).await?;
1211 67233 : let mut dir = RelDirectory::des(&buf)?;
1212 :
1213 67233 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1214 67233 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1215 : } else {
1216 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1217 : }
1218 :
1219 : // update logical size
1220 67233 : let size_key = rel_size_to_key(rel);
1221 67233 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1222 67233 : self.pending_nblocks -= old_size as i64;
1223 67233 :
1224 67233 : // Remove enty from relation size cache
1225 67233 : self.tline.remove_cached_rel_size(&rel);
1226 67233 :
1227 67233 : // Delete size entry, as well as all blocks
1228 67233 : self.delete(rel_key_range(rel));
1229 67233 :
1230 67233 : Ok(())
1231 67233 : }
1232 :
1233 1848 : pub async fn put_slru_segment_creation(
1234 1848 : &mut self,
1235 1848 : kind: SlruKind,
1236 1848 : segno: u32,
1237 1848 : nblocks: BlockNumber,
1238 1848 : ctx: &RequestContext,
1239 1848 : ) -> anyhow::Result<()> {
1240 1848 : // Add it to the directory entry
1241 1848 : let dir_key = slru_dir_to_key(kind);
1242 1848 : let buf = self.get(dir_key, ctx).await?;
1243 1848 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1244 :
1245 1848 : if !dir.segments.insert(segno) {
1246 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1247 1848 : }
1248 1848 : self.put(
1249 1848 : dir_key,
1250 1848 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1251 : );
1252 :
1253 : // Put size
1254 1848 : let size_key = slru_segment_size_to_key(kind, segno);
1255 1848 : let buf = nblocks.to_le_bytes();
1256 1848 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1257 1848 :
1258 1848 : // even if nblocks > 0, we don't insert any actual blocks here
1259 1848 :
1260 1848 : Ok(())
1261 1848 : }
1262 :
1263 : /// Extend SLRU segment
1264 1315 : pub fn put_slru_extend(
1265 1315 : &mut self,
1266 1315 : kind: SlruKind,
1267 1315 : segno: u32,
1268 1315 : nblocks: BlockNumber,
1269 1315 : ) -> anyhow::Result<()> {
1270 1315 : // Put size
1271 1315 : let size_key = slru_segment_size_to_key(kind, segno);
1272 1315 : let buf = nblocks.to_le_bytes();
1273 1315 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1274 1315 : Ok(())
1275 1315 : }
1276 :
1277 : /// This method is used for marking truncated SLRU files
1278 10 : pub async fn drop_slru_segment(
1279 10 : &mut self,
1280 10 : kind: SlruKind,
1281 10 : segno: u32,
1282 10 : ctx: &RequestContext,
1283 10 : ) -> anyhow::Result<()> {
1284 10 : // Remove it from the directory entry
1285 10 : let dir_key = slru_dir_to_key(kind);
1286 10 : let buf = self.get(dir_key, ctx).await?;
1287 10 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1288 :
1289 10 : if !dir.segments.remove(&segno) {
1290 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1291 10 : }
1292 10 : self.put(
1293 10 : dir_key,
1294 10 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1295 : );
1296 :
1297 : // Delete size entry, as well as all blocks
1298 10 : self.delete(slru_segment_key_range(kind, segno));
1299 10 :
1300 10 : Ok(())
1301 10 : }
1302 :
/// Drop a relmapper file (pg_filenode.map)
///
/// Not implemented yet: both arguments are ignored, nothing is queued, and
/// `Ok(())` is always returned.
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    // TODO
    Ok(())
}
1308 :
1309 : /// This method is used for marking truncated SLRU files
1310 2 : pub async fn drop_twophase_file(
1311 2 : &mut self,
1312 2 : xid: TransactionId,
1313 2 : ctx: &RequestContext,
1314 2 : ) -> anyhow::Result<()> {
1315 : // Remove it from the directory entry
1316 2 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1317 2 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1318 :
1319 2 : if !dir.xids.remove(&xid) {
1320 0 : warn!("twophase file for xid {} does not exist", xid);
1321 2 : }
1322 2 : self.put(
1323 2 : TWOPHASEDIR_KEY,
1324 2 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1325 : );
1326 :
1327 : // Delete it
1328 2 : self.delete(twophase_key_range(xid));
1329 2 :
1330 2 : Ok(())
1331 2 : }
1332 :
1333 3123 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1334 3123 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1335 3123 : files: HashMap::new(),
1336 3123 : })?;
1337 3123 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1338 3123 : Ok(())
1339 3123 : }
1340 :
1341 117 : pub async fn put_file(
1342 117 : &mut self,
1343 117 : path: &str,
1344 117 : content: &[u8],
1345 117 : ctx: &RequestContext,
1346 117 : ) -> anyhow::Result<()> {
1347 117 : let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
1348 114 : Ok(buf) => AuxFilesDirectory::des(&buf)?,
1349 3 : Err(e) => {
1350 : // This is expected: historical databases do not have the key.
1351 0 : debug!("Failed to get info about AUX files: {}", e);
1352 3 : AuxFilesDirectory {
1353 3 : files: HashMap::new(),
1354 3 : }
1355 : }
1356 : };
1357 117 : let path = path.to_string();
1358 117 : if content.is_empty() {
1359 5 : dir.files.remove(&path);
1360 112 : } else {
1361 112 : dir.files.insert(path, Bytes::copy_from_slice(content));
1362 112 : }
1363 117 : self.put(
1364 117 : AUX_FILES_KEY,
1365 117 : Value::Image(Bytes::from(
1366 117 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1367 : )),
1368 : );
1369 117 : Ok(())
1370 117 : }
1371 :
///
/// Flush changes accumulated so far to the underlying repository.
///
/// Usually, changes made in DatadirModification are atomic, but this allows
/// you to flush them to the underlying repository before the final `commit`.
/// That allows to free up the memory used to hold the pending changes.
///
/// Currently only used during bulk import of a data directory. In that
/// context, breaking the atomicity is OK. If the import is interrupted, the
/// whole import fails and the timeline will be deleted anyway.
/// (Or to be precise, it will be left behind for debugging purposes and
/// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
///
/// Note: A consequence of flushing the pending operations is that they
/// won't be visible to subsequent operations until `commit`. The function
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
///
/// The call is a no-op returning `Ok(())` while `pending_nblocks` is below
/// 10000. Otherwise every pending relation/SLRU block is written through the
/// timeline writer, all other pending updates are kept, and the accumulated
/// block count is reported as a logical size change and reset to zero.
///
/// # Errors
///
/// Returns the first error reported by the writer's `put`.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    // Unless we have accumulated a decent amount of changes, it's not worth it
    // to scan through the pending_updates list. Note that the threshold is
    // applied to the net block count, not to the number of pending updates.
    let pending_nblocks = self.pending_nblocks;
    if pending_nblocks < 10000 {
        return Ok(());
    }

    let writer = self.tline.writer().await;

    // Flush relation and SLRU data blocks, keep metadata.
    let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    for (key, values) in self.pending_updates.drain() {
        for (lsn, value) in values {
            if is_rel_block_key(&key) || is_slru_block_key(key) {
                // This bails out on the first error. NOTE(review): at that
                // point `drain()` is already in progress, so the entries not
                // yet visited are dropped with the iterator and the ones
                // collected in `retained_pending_updates` are lost as well;
                // `pending_updates` is NOT left untouched. That's OK in the
                // bulk-import context, cf. this function's doc comment.
                writer.put(key, lsn, &value, ctx).await?;
            } else {
                retained_pending_updates
                    .entry(key)
                    .or_default()
                    .push((lsn, value));
            }
        }
    }

    self.pending_updates = retained_pending_updates;

    // Always true here: the early return above guarantees
    // `pending_nblocks >= 10000`.
    if pending_nblocks != 0 {
        writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
        self.pending_nblocks = 0;
    }

    Ok(())
}
1426 :
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
///
/// Each pending update and deletion is written with the LSN that was current
/// when it was queued. After the batches are written, every LSN recorded by
/// `set_lsn`, followed by the current LSN, is reported to the writer, and
/// finally the accumulated block-count change is reported as a logical size
/// change in bytes.
///
/// # Errors
///
/// Returns the error of the writer's `put_batch` or `delete_batch`. The
/// pending block count has already been reset to zero at that point, and the
/// batch that failed is left in place.
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    let writer = self.tline.writer().await;

    // Take the block delta now; it is reported at the very end.
    let pending_nblocks = self.pending_nblocks;
    self.pending_nblocks = 0;

    if !self.pending_updates.is_empty() {
        writer.put_batch(&self.pending_updates, ctx).await?;
        self.pending_updates.clear();
    }

    if !self.pending_deletions.is_empty() {
        writer.delete_batch(&self.pending_deletions).await?;
        self.pending_deletions.clear();
    }

    // The current LSN goes last, after all the LSNs `set_lsn` moved past.
    self.pending_lsns.push(self.lsn);
    for pending_lsn in self.pending_lsns.drain(..) {
        // Ideally, we should be able to call writer.finish_write() only once
        // with the highest LSN. However, the last_record_lsn variable in the
        // timeline keeps track of the latest LSN and the immediate previous LSN
        // so we need to record every LSN to not leave a gap between them.
        writer.finish_write(pending_lsn);
    }

    if pending_nblocks != 0 {
        writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    }

    Ok(())
}
1463 :
/// Number of distinct keys with pending updates plus the number of pending
/// range deletions. Several values queued for the same key count once.
pub(crate) fn len(&self) -> usize {
    self.pending_updates.len() + self.pending_deletions.len()
}
1467 :
1468 : // Internal helper functions to batch the modifications
1469 :
1470 3527034 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1471 : // Have we already updated the same key? Read the latest pending updated
1472 : // version in that case.
1473 : //
1474 : // Note: we don't check pending_deletions. It is an error to request a
1475 : // value that has been removed, deletion only avoids leaking storage.
1476 3527034 : if let Some(values) = self.pending_updates.get(&key) {
1477 2651066 : if let Some((_, value)) = values.last() {
1478 2651066 : return if let Value::Image(img) = value {
1479 2651066 : Ok(img.clone())
1480 : } else {
1481 : // Currently, we never need to read back a WAL record that we
1482 : // inserted in the same "transaction". All the metadata updates
1483 : // work directly with Images, and we never need to read actual
1484 : // data pages. We could handle this if we had to, by calling
1485 : // the walredo manager, but let's keep it simple for now.
1486 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1487 0 : "unexpected pending WAL record"
1488 0 : )))
1489 : };
1490 0 : }
1491 875968 : }
1492 875968 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1493 875968 : self.tline.get(key, lsn, ctx).await
1494 3527034 : }
1495 :
1496 64745083 : fn put(&mut self, key: Key, val: Value) {
1497 64745083 : let values = self.pending_updates.entry(key).or_default();
1498 : // Replace the previous value if it exists at the same lsn
1499 64745083 : if let Some((last_lsn, last_value)) = values.last_mut() {
1500 54114977 : if *last_lsn == self.lsn {
1501 644317 : *last_value = val;
1502 644317 : return;
1503 53470660 : }
1504 10630106 : }
1505 64100766 : values.push((self.lsn, val));
1506 64745083 : }
1507 :
/// Queue the deletion of every key in `key_range`, stamped with the current
/// LSN. The range is only recorded here; `commit` hands it to the writer.
fn delete(&mut self, key_range: Range<Key>) {
    trace!("DELETE {}-{}", key_range.start, key_range.end);
    self.pending_deletions.push((key_range, self.lsn));
}
1512 : }
1513 :
/// This enum facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
    /// Read committed state from the timeline at this LSN.
    Lsn(Lsn),
    /// Read through a pending modification, so that its not yet committed
    /// updates are seen first.
    Modified(&'a DatadirModification<'a>),
}
1525 :
1526 : impl<'a> Version<'a> {
1527 6173459 : async fn get(
1528 6173459 : &self,
1529 6173459 : timeline: &Timeline,
1530 6173459 : key: Key,
1531 6173459 : ctx: &RequestContext,
1532 6173459 : ) -> Result<Bytes, PageReconstructError> {
1533 6173459 : match self {
1534 5949722 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1535 223737 : Version::Modified(modification) => modification.get(key, ctx).await,
1536 : }
1537 6173459 : }
1538 :
1539 6373761 : fn get_lsn(&self) -> Lsn {
1540 6373761 : match self {
1541 5864439 : Version::Lsn(lsn) => *lsn,
1542 509322 : Version::Modified(modification) => modification.lsn,
1543 : }
1544 6373761 : }
1545 : }
1546 :
1547 : //--- Metadata structs stored in key-value pairs in the repository.
1548 :
/// Directory of known databases, stored as a single value under `DBDIR_KEY`.
#[derive(Debug, Serialize, Deserialize)]
struct DbDirectory {
    // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    //
    // `put_rel_creation` adds new entries as `false`; `put_relmap_file` sets
    // the entry to `true`.
    dbdirs: HashMap<(Oid, Oid), bool>,
}
1554 :
/// Directory of two-phase state files, stored as a single value under
/// `TWOPHASEDIR_KEY`.
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
    // Transaction ids for which a two-phase file is stored.
    xids: HashSet<TransactionId>,
}
1559 :
/// Directory of the relations of one database, stored as a single value under
/// the key returned by `rel_dir_to_key(spcnode, dbnode)`.
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
    // Set of relations that exist. (relfilenode, forknum)
    //
    // TODO: Store it as a btree or radix tree or something else that spans multiple
    // key-value pairs, if you have a lot of relations
    rels: HashSet<(Oid, u8)>,
}
1568 :
/// Auxiliary files, stored together as a single value under `AUX_FILES_KEY`.
#[derive(Debug, Serialize, Deserialize, Default)]
struct AuxFilesDirectory {
    // File path -> file contents. `put_file` removes the entry when it is given
    // empty contents.
    files: HashMap<String, Bytes>,
}
1573 :
/// Size of a relation in blocks.
///
/// NOTE(review): nothing in the visible part of this file uses this struct;
/// relation sizes are written as raw little-endian `u32` values instead.
/// Confirm whether it is still needed.
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
    nblocks: u32,
}
1578 :
/// Directory of the segments of one SLRU, stored as a single value under the
/// key returned by `slru_dir_to_key(kind)`.
#[derive(Debug, Serialize, Deserialize, Default)]
struct SlruSegmentDirectory {
    // Set of SLRU segments that exist.
    segments: HashSet<u32>,
}
1584 :
/// A page-sized (`BLCKSZ` bytes) buffer of zeroes.
// NOTE(review): in the visible part of the file it is referenced only by the
// commented-out tests below; its users presumably live earlier in the file.
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1586 :
1587 : #[allow(clippy::bool_assert_comparison)]
1588 : #[cfg(test)]
1589 : mod tests {
1590 : //use super::repo_harness::*;
1591 : //use super::*;
1592 :
1593 : /*
1594 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1595 : let incremental = timeline.get_current_logical_size();
1596 : let non_incremental = timeline
1597 : .get_current_logical_size_non_incremental(lsn)
1598 : .unwrap();
1599 : assert_eq!(incremental, non_incremental);
1600 : }
1601 : */
1602 :
1603 : /*
1604 : ///
1605 : /// Test list_rels() function, with branches and dropped relations
1606 : ///
1607 : #[test]
1608 : fn test_list_rels_drop() -> Result<()> {
1609 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1610 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1611 : const TESTDB: u32 = 111;
1612 :
1613 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1614 : // after branching fails below
1615 : let mut writer = tline.begin_record(Lsn(0x10));
1616 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1617 : writer.finish()?;
1618 :
1619 : // Create a relation on the timeline
1620 : let mut writer = tline.begin_record(Lsn(0x20));
1621 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1622 : writer.finish()?;
1623 :
1624 : let writer = tline.begin_record(Lsn(0x00));
1625 : writer.finish()?;
1626 :
1627 : // Check that list_rels() lists it after LSN 2, but no before it
1628 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1629 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
1630 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
1631 :
1632 : // Create a branch, check that the relation is visible there
1633 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
1634 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
1635 : Some(timeline) => timeline,
1636 : None => panic!("Should have a local timeline"),
1637 : };
1638 : let newtline = DatadirTimelineImpl::new(newtline);
1639 : assert!(newtline
1640 : .list_rels(0, TESTDB, Lsn(0x30))?
1641 : .contains(&TESTREL_A));
1642 :
1643 : // Drop it on the branch
1644 : let mut new_writer = newtline.begin_record(Lsn(0x40));
1645 : new_writer.drop_relation(TESTREL_A)?;
1646 : new_writer.finish()?;
1647 :
1648 : // Check that it's no longer listed on the branch after the point where it was dropped
1649 : assert!(newtline
1650 : .list_rels(0, TESTDB, Lsn(0x30))?
1651 : .contains(&TESTREL_A));
1652 : assert!(!newtline
1653 : .list_rels(0, TESTDB, Lsn(0x40))?
1654 : .contains(&TESTREL_A));
1655 :
1656 : // Run checkpoint and garbage collection and check that it's still not visible
1657 : newtline.checkpoint(CheckpointConfig::Forced)?;
1658 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
1659 :
1660 : assert!(!newtline
1661 : .list_rels(0, TESTDB, Lsn(0x40))?
1662 : .contains(&TESTREL_A));
1663 :
1664 : Ok(())
1665 : }
1666 : */
1667 :
1668 : /*
1669 : #[test]
1670 : fn test_read_beyond_eof() -> Result<()> {
1671 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
1672 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
1673 :
1674 : make_some_layers(&tline, Lsn(0x20))?;
1675 : let mut writer = tline.begin_record(Lsn(0x60));
1676 : walingest.put_rel_page_image(
1677 : &mut writer,
1678 : TESTREL_A,
1679 : 0,
1680 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
1681 : )?;
1682 : writer.finish()?;
1683 :
1684 : // Test read before rel creation. Should error out.
1685 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
1686 :
1687 : // Read block beyond end of relation at different points in time.
1688 : // These reads should fall into different delta, image, and in-memory layers.
1689 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
1690 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
1691 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
1692 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
1693 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
1694 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
1695 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
1696 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
1697 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
1698 :
1699 : // Test on an in-memory layer with no preceding layer
1700 : let mut writer = tline.begin_record(Lsn(0x70));
1701 : walingest.put_rel_page_image(
1702 : &mut writer,
1703 : TESTREL_B,
1704 : 0,
1705 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
1706 : )?;
1707 : writer.finish()?;
1708 :
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
1710 :
1711 : Ok(())
1712 : }
1713 : */
1714 : }
|