1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use crate::walrecord::NeonWalRecord;
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use pageserver_api::key::{
18 : dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
19 : rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
20 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
21 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
22 : };
23 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
24 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
25 : use postgres_ffi::BLCKSZ;
26 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
27 : use serde::{Deserialize, Serialize};
28 : use std::collections::{hash_map, HashMap, HashSet};
29 : use std::ops::ControlFlow;
30 : use std::ops::Range;
31 : use strum::IntoEnumIterator;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::{debug, trace, warn};
34 : use utils::bin_ser::DeserializeError;
35 : use utils::{bin_ser::BeSer, lsn::Lsn};
36 :
37 0 : #[derive(Debug)]
38 : pub enum LsnForTimestamp {
39 : /// Found commits both before and after the given timestamp
40 : Present(Lsn),
41 :
42 : /// Found no commits after the given timestamp. This means
43 : /// that the newest data in the branch is older than the given
44 : /// timestamp.
45 : ///
46 : /// All commits <= LSN happened before the given timestamp
47 : Future(Lsn),
48 :
49 : /// The queried timestamp is past the horizon we can look back to (the PITR window)
50 : ///
51 : /// All commits > LSN happened after the given timestamp,
52 : /// but any commits < LSN might have happened before or after
53 : /// the given timestamp. We don't know, because no data before
54 : /// the given LSN is available.
55 : Past(Lsn),
56 :
57 : /// We have found no commit with a timestamp,
58 : /// so we can't return anything meaningful.
59 : ///
60 : /// The associated LSN is the lower bound value on which we can safely
61 : /// create branches, but no statement is made about whether it is
62 : /// older or newer than the timestamp.
63 : ///
64 : /// This variant can e.g. be returned right after a
65 : /// cluster import.
66 : NoData(Lsn),
67 : }
68 :
69 0 : #[derive(Debug, thiserror::Error)]
70 : pub enum CalculateLogicalSizeError {
71 : #[error("cancelled")]
72 : Cancelled,
73 : #[error(transparent)]
74 : Other(#[from] anyhow::Error),
75 : }
76 :
77 2 : #[derive(Debug, thiserror::Error)]
78 : pub(crate) enum CollectKeySpaceError {
79 : #[error(transparent)]
80 : Decode(#[from] DeserializeError),
81 : #[error(transparent)]
82 : PageRead(PageReconstructError),
83 : #[error("cancelled")]
84 : Cancelled,
85 : }
86 :
87 : impl From<PageReconstructError> for CollectKeySpaceError {
88 2 : fn from(err: PageReconstructError) -> Self {
89 2 : match err {
90 1 : PageReconstructError::Cancelled => Self::Cancelled,
91 1 : err => Self::PageRead(err),
92 : }
93 2 : }
94 : }
95 :
96 : impl From<PageReconstructError> for CalculateLogicalSizeError {
97 28 : fn from(pre: PageReconstructError) -> Self {
98 28 : match pre {
99 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
100 26 : Self::Cancelled
101 : }
102 2 : _ => Self::Other(pre.into()),
103 : }
104 28 : }
105 : }
106 :
107 0 : #[derive(Debug, thiserror::Error)]
108 : pub enum RelationError {
109 : #[error("Relation Already Exists")]
110 : AlreadyExists,
111 : #[error("invalid relnode")]
112 : InvalidRelnode,
113 : #[error(transparent)]
114 : Other(#[from] anyhow::Error),
115 : }
116 :
117 : ///
118 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
119 : /// and other special kinds of files, in a versioned key-value store. The
120 : /// Timeline struct provides the key-value store.
121 : ///
122 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
123 : /// implementation; it might be moved into a separate struct later.
124 : impl Timeline {
125 : /// Start ingesting a WAL record, or other atomic modification of
126 : /// the timeline.
127 : ///
128 : /// This provides a transaction-like interface to perform a bunch
129 : /// of modifications atomically.
130 : ///
131 : /// To ingest a WAL record, call begin_modification(lsn) to get a
132 : /// DatadirModification object. Use the functions in the object to
133 : /// modify the repository state, updating all the pages and metadata
134 : /// that the WAL record affects. When you're done, call commit() to
135 : /// commit the changes.
136 : ///
137 : /// The LSN stored in the modification is advanced by `ingest_record` and
138 : /// is used by `commit()` to update `last_record_lsn`.
139 : ///
140 : /// Calling commit() will flush all the changes and reset the state,
141 : /// so the `DatadirModification` struct can be reused to perform the next modification.
142 : ///
143 : /// Note that any pending modifications you make through the
144 : /// modification object won't be visible to calls to the 'get' and list
145 : /// functions of the timeline until you finish! And if you update the
146 : /// same page twice, the last update wins.
147 : ///
148 1059058 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
149 1059058 : where
150 1059058 : Self: Sized,
151 1059058 : {
152 1059058 : DatadirModification {
153 1059058 : tline: self,
154 1059058 : pending_lsns: Vec::new(),
155 1059058 : pending_updates: HashMap::new(),
156 1059058 : pending_deletions: Vec::new(),
157 1059058 : pending_nblocks: 0,
158 1059058 : lsn,
159 1059058 : }
160 1059058 : }
161 :
162 : //------------------------------------------------------------------------------
163 : // Public GET functions
164 : //------------------------------------------------------------------------------
165 :
166 : /// Look up given page version.
167 4512456 : pub(crate) async fn get_rel_page_at_lsn(
168 4512456 : &self,
169 4512456 : tag: RelTag,
170 4512456 : blknum: BlockNumber,
171 4512456 : version: Version<'_>,
172 4512456 : latest: bool,
173 4512456 : ctx: &RequestContext,
174 4512456 : ) -> Result<Bytes, PageReconstructError> {
175 4512456 : if tag.relnode == 0 {
176 0 : return Err(PageReconstructError::Other(
177 0 : RelationError::InvalidRelnode.into(),
178 0 : ));
179 4512456 : }
180 :
181 4512456 : let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
182 4512456 : if blknum >= nblocks {
183 0 : debug!(
184 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
185 0 : tag,
186 0 : blknum,
187 0 : version.get_lsn(),
188 0 : nblocks
189 0 : );
190 123469 : return Ok(ZERO_PAGE.clone());
191 4388987 : }
192 4388987 :
193 4388987 : let key = rel_block_to_key(tag, blknum);
194 4388987 : version.get(self, key, ctx).await
195 4512456 : }
196 :
197 : /// Get the size of a database, in blocks
198 8 : pub(crate) async fn get_db_size(
199 8 : &self,
200 8 : spcnode: Oid,
201 8 : dbnode: Oid,
202 8 : version: Version<'_>,
203 8 : latest: bool,
204 8 : ctx: &RequestContext,
205 8 : ) -> Result<usize, PageReconstructError> {
206 8 : let mut total_blocks = 0;
207 :
208 8 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
209 :
210 2351 : for rel in rels {
211 2343 : let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
212 2343 : total_blocks += n_blocks as usize;
213 : }
214 8 : Ok(total_blocks)
215 8 : }
216 :
217 : /// Get size of a relation file
218 4655845 : pub(crate) async fn get_rel_size(
219 4655845 : &self,
220 4655845 : tag: RelTag,
221 4655845 : version: Version<'_>,
222 4655845 : latest: bool,
223 4655845 : ctx: &RequestContext,
224 4655845 : ) -> Result<BlockNumber, PageReconstructError> {
225 4655845 : if tag.relnode == 0 {
226 0 : return Err(PageReconstructError::Other(
227 0 : RelationError::InvalidRelnode.into(),
228 0 : ));
229 4655845 : }
230 :
231 4655845 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
232 4347101 : return Ok(nblocks);
233 308744 : }
234 308744 :
235 308744 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
236 7608 : && !self.get_rel_exists(tag, version, latest, ctx).await?
237 : {
238 : // FIXME: Postgres sometimes calls smgrcreate() to create
239 : // FSM, and smgrnblocks() on it immediately afterwards,
240 : // without extending it. Tolerate that by claiming that
241 : // any non-existent FSM fork has size 0.
242 11 : return Ok(0);
243 308733 : }
244 308733 :
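// The relation size is stored as a little-endian u32 under a dedicated
// metadata key, separate from the data blocks (see rel_size_to_key).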
245 308733 : let key = rel_size_to_key(tag);
246 308733 : let mut buf = version.get(self, key, ctx).await?;
247 308729 : let nblocks = buf.get_u32_le();
248 308729 :
249 308729 : if latest {
250 157378 : // Update the relation size cache only if the "latest" flag is set.
251 157378 : // This flag is set by the compute node when it is working with the most
252 157378 : // recent version of the relation; the primary compute node always sets
253 157378 : // latest=true. Note that even if the compute node "by mistake" specifies
254 157378 : // an old LSN together with latest=true, it cannot corrupt the cache:
255 157378 : // with latest=true the pageserver chooses max(request_lsn, last_written_lsn),
256 157378 : // so the cached value is always associated with the most recent LSN.
257 157378 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
258 157378 : }
259 308729 : Ok(nblocks)
260 4655845 : }
261 :
262 : /// Does relation exist?
263 432499 : pub(crate) async fn get_rel_exists(
264 432499 : &self,
265 432499 : tag: RelTag,
266 432499 : version: Version<'_>,
267 432499 : _latest: bool,
268 432499 : ctx: &RequestContext,
269 432499 : ) -> Result<bool, PageReconstructError> {
270 432499 : if tag.relnode == 0 {
271 0 : return Err(PageReconstructError::Other(
272 0 : RelationError::InvalidRelnode.into(),
273 0 : ));
274 432499 : }
275 :
276 : // first try to lookup relation in cache
277 432499 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
278 169724 : return Ok(true);
279 262775 : }
280 262775 : // fetch directory listing
281 262775 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
282 262775 : let buf = version.get(self, key, ctx).await?;
283 :
284 262775 : match RelDirectory::des(&buf).context("deserialization failure") {
285 262775 : Ok(dir) => {
286 262775 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
287 262775 : Ok(exists)
288 : }
289 0 : Err(e) => Err(PageReconstructError::from(e)),
290 : }
291 432499 : }
292 :
293 : /// Get a list of all existing relations in given tablespace and database.
294 : ///
295 : /// # Cancel-Safety
296 : ///
297 : /// This method is cancellation-safe.
298 8125 : pub(crate) async fn list_rels(
299 8125 : &self,
300 8125 : spcnode: Oid,
301 8125 : dbnode: Oid,
302 8125 : version: Version<'_>,
303 8125 : ctx: &RequestContext,
304 8125 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
305 8125 : // fetch directory listing
306 8125 : let key = rel_dir_to_key(spcnode, dbnode);
307 8125 : let buf = version.get(self, key, ctx).await?;
308 :
309 8125 : match RelDirectory::des(&buf).context("deserialization failure") {
310 8125 : Ok(dir) => {
311 8125 : let rels: HashSet<RelTag> =
312 1923108 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
313 1923108 : spcnode,
314 1923108 : dbnode,
315 1923108 : relnode: *relnode,
316 1923108 : forknum: *forknum,
317 1923108 : }));
318 8125 :
319 8125 : Ok(rels)
320 : }
321 0 : Err(e) => Err(PageReconstructError::from(e)),
322 : }
323 8125 : }
324 :
325 : /// Get the whole SLRU segment
326 0 : pub(crate) async fn get_slru_segment(
327 0 : &self,
328 0 : kind: SlruKind,
329 0 : segno: u32,
330 0 : lsn: Lsn,
331 0 : ctx: &RequestContext,
332 0 : ) -> Result<Bytes, PageReconstructError> {
333 0 : let n_blocks = self
334 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
335 0 : .await?;
336 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
337 0 : for blkno in 0..n_blocks {
338 0 : let block = self
339 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
340 0 : .await?;
341 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
342 : }
343 0 : Ok(segment.freeze())
344 0 : }
345 :
346 : /// Look up given SLRU page version.
347 2550 : pub(crate) async fn get_slru_page_at_lsn(
348 2550 : &self,
349 2550 : kind: SlruKind,
350 2550 : segno: u32,
351 2550 : blknum: BlockNumber,
352 2550 : lsn: Lsn,
353 2550 : ctx: &RequestContext,
354 2550 : ) -> Result<Bytes, PageReconstructError> {
355 2550 : let key = slru_block_to_key(kind, segno, blknum);
356 264452 : self.get(key, lsn, ctx).await
357 2548 : }
358 :
359 : /// Get size of an SLRU segment
360 5991 : pub(crate) async fn get_slru_segment_size(
361 5991 : &self,
362 5991 : kind: SlruKind,
363 5991 : segno: u32,
364 5991 : version: Version<'_>,
365 5991 : ctx: &RequestContext,
366 5991 : ) -> Result<BlockNumber, PageReconstructError> {
367 5991 : let key = slru_segment_size_to_key(kind, segno);
368 5991 : let mut buf = version.get(self, key, ctx).await?;
369 5991 : Ok(buf.get_u32_le())
370 5991 : }
371 :
372 : /// Get size of an SLRU segment
373 1646 : pub(crate) async fn get_slru_segment_exists(
374 1646 : &self,
375 1646 : kind: SlruKind,
376 1646 : segno: u32,
377 1646 : version: Version<'_>,
378 1646 : ctx: &RequestContext,
379 1646 : ) -> Result<bool, PageReconstructError> {
380 1646 : // fetch directory listing
381 1646 : let key = slru_dir_to_key(kind);
382 1646 : let buf = version.get(self, key, ctx).await?;
383 :
384 1646 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
385 1646 : Ok(dir) => {
386 1646 : let exists = dir.segments.get(&segno).is_some();
387 1646 : Ok(exists)
388 : }
389 0 : Err(e) => Err(PageReconstructError::from(e)),
390 : }
391 1646 : }
392 :
393 : /// Locate LSN, such that all transactions that committed before
394 : /// 'search_timestamp' are visible, but nothing newer is.
395 : ///
396 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
397 : /// so it's not well defined which LSN you get if there were multiple commits
398 : /// "in flight" at that point in time.
399 : ///
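/// A sketch of how a caller might consume the result (illustrative only;
/// `handle` is a hypothetical callback):
///
/// ```ignore
/// match tline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
///     LsnForTimestamp::Present(lsn) => handle(lsn), // commits found on both sides
///     LsnForTimestamp::Future(lsn) => handle(lsn),  // newest data is older than ts
///     LsnForTimestamp::Past(lsn) => handle(lsn),    // ts predates the PITR horizon
///     LsnForTimestamp::NoData(lsn) => handle(lsn),  // no commit timestamps at all
/// }
/// ```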
400 159 : pub(crate) async fn find_lsn_for_timestamp(
401 159 : &self,
402 159 : search_timestamp: TimestampTz,
403 159 : cancel: &CancellationToken,
404 159 : ctx: &RequestContext,
405 159 : ) -> Result<LsnForTimestamp, PageReconstructError> {
406 159 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
407 159 : // We use this method to figure out the branching LSN for the new branch, but the
408 159 : // GC cutoff could be before the branching point and we cannot create a new branch
409 159 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
410 159 : // on the safe side.
411 159 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
412 159 : let max_lsn = self.get_last_record_lsn();
413 159 :
414 159 : // LSNs are always 8-byte aligned. low/mid/high represent the
415 159 : // LSN divided by 8.
416 159 : let mut low = min_lsn.0 / 8;
417 159 : let mut high = max_lsn.0 / 8 + 1;
418 159 :
419 159 : let mut found_smaller = false;
420 159 : let mut found_larger = false;
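// Binary search for the lowest LSN (counted in 8-byte units) at which a
// commit with timestamp >= search_timestamp is visible: if the probe at
// `mid` sees such a commit we lower `high`, otherwise everything up to
// `mid` only has older commits and we raise `low`.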
421 2697 : while low < high {
422 2540 : if cancel.is_cancelled() {
423 0 : return Err(PageReconstructError::Cancelled);
424 2540 : }
425 2540 : // cannot overflow, high and low are both smaller than u64::MAX / 2
426 2540 : let mid = (high + low) / 2;
427 :
428 2540 : let cmp = self
429 2540 : .is_latest_commit_timestamp_ge_than(
430 2540 : search_timestamp,
431 2540 : Lsn(mid * 8),
432 2540 : &mut found_smaller,
433 2540 : &mut found_larger,
434 2540 : ctx,
435 2540 : )
436 266255 : .await?;
437 :
438 2538 : if cmp {
439 708 : high = mid;
440 1830 : } else {
441 1830 : low = mid + 1;
442 1830 : }
443 : }
444 : // If `found_smaller == true`, the search ends with `low == t + 1`, where
445 : // `t * 8` is the LSN of the last commit record at or before `search_timestamp`.
446 : // Subtract one from `low` to get `t`.
447 : //
448 : // FIXME: it would be better to get the LSN of the previous commit.
449 : // Otherwise, if you restore to the returned LSN, the database will
450 : // include physical changes from later commits that will be marked
451 : // as aborted, and will need to be vacuumed away.
452 157 : let commit_lsn = Lsn((low - 1) * 8);
453 157 : match (found_smaller, found_larger) {
454 : (false, false) => {
455 : // This can happen if no commit records have been processed yet, e.g.
456 : // just after importing a cluster.
457 21 : Ok(LsnForTimestamp::NoData(min_lsn))
458 : }
459 : (false, true) => {
460 : // Didn't find any commit timestamps smaller than the request
461 18 : Ok(LsnForTimestamp::Past(min_lsn))
462 : }
463 : (true, false) => {
464 : // Only found commits with timestamps smaller than the request.
465 : // It's still a valid case for branch creation; return it.
466 : // (`update_gc_info()` ignores the LSN in the `LsnForTimestamp::Future`
467 : // case anyway.)
468 62 : Ok(LsnForTimestamp::Future(commit_lsn))
469 : }
470 56 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
471 : }
472 157 : }
473 :
474 : /// Subroutine of find_lsn_for_timestamp(). Returns true if, at LSN 'probe_lsn',
475 : /// there is any commit with a timestamp at or after 'search_timestamp'.
476 : ///
477 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
478 : /// with a smaller/larger timestamp.
479 : ///
480 2540 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
481 2540 : &self,
482 2540 : search_timestamp: TimestampTz,
483 2540 : probe_lsn: Lsn,
484 2540 : found_smaller: &mut bool,
485 2540 : found_larger: &mut bool,
486 2540 : ctx: &RequestContext,
487 2540 : ) -> Result<bool, PageReconstructError> {
488 2540 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
489 2331 : if timestamp >= search_timestamp {
490 708 : *found_larger = true;
491 708 : return ControlFlow::Break(true);
492 1623 : } else {
493 1623 : *found_smaller = true;
494 1623 : }
495 1623 : ControlFlow::Continue(())
496 2540 : })
497 266255 : .await
498 2538 : }
499 :
500 : /// Obtain the possible timestamp range for the given lsn.
501 : ///
502 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
503 12 : pub(crate) async fn get_timestamp_for_lsn(
504 12 : &self,
505 12 : probe_lsn: Lsn,
506 12 : ctx: &RequestContext,
507 12 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
508 12 : let mut max: Option<TimestampTz> = None;
509 12 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
510 10 : if let Some(max_prev) = max {
511 0 : max = Some(max_prev.max(timestamp));
512 10 : } else {
513 10 : max = Some(timestamp);
514 10 : }
515 10 : ControlFlow::Continue(())
516 12 : })
517 80 : .await?;
518 :
519 10 : Ok(max)
520 12 : }
521 :
522 : /// Runs the given function on all the timestamps for a given lsn
523 : ///
524 : /// The return value is either given by the closure, or set to the `Default`
525 : /// impl's output.
526 2552 : async fn map_all_timestamps<T: Default>(
527 2552 : &self,
528 2552 : probe_lsn: Lsn,
529 2552 : ctx: &RequestContext,
530 2552 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
531 2552 : ) -> Result<T, PageReconstructError> {
532 2552 : for segno in self
533 2552 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
534 923 : .await?
535 : {
536 2550 : let nblocks = self
537 2550 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
538 960 : .await?;
539 2550 : for blknum in (0..nblocks).rev() {
540 2550 : let clog_page = self
541 2550 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
542 264452 : .await?;
543 :
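// A CLOG page with 8 extra bytes past BLCKSZ carries the most recent
// commit timestamp for that page, appended in big-endian byte order.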
544 2548 : if clog_page.len() == BLCKSZ as usize + 8 {
545 2341 : let mut timestamp_bytes = [0u8; 8];
546 2341 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
547 2341 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
548 2341 :
549 2341 : match f(timestamp) {
550 708 : ControlFlow::Break(b) => return Ok(b),
551 1633 : ControlFlow::Continue(()) => (),
552 : }
553 207 : }
554 : }
555 : }
556 1840 : Ok(Default::default())
557 2550 : }
558 :
559 607 : pub(crate) async fn get_slru_keyspace(
560 607 : &self,
561 607 : version: Version<'_>,
562 607 : ctx: &RequestContext,
563 607 : ) -> Result<KeySpace, PageReconstructError> {
564 607 : let mut accum = KeySpaceAccum::new();
565 :
566 2410 : for kind in SlruKind::iter() {
567 1809 : let mut segments: Vec<u32> = self
568 1809 : .list_slru_segments(kind, version, ctx)
569 162 : .await?
570 1803 : .into_iter()
571 1803 : .collect();
572 1803 : segments.sort_unstable();
573 :
574 3642 : for seg in segments {
575 1839 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
576 :
577 1839 : accum.add_range(
578 1839 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
579 1839 : );
580 : }
581 : }
582 :
583 601 : Ok(accum.to_keyspace())
584 607 : }
585 :
586 : /// Get a list of SLRU segments
587 4364 : pub(crate) async fn list_slru_segments(
588 4364 : &self,
589 4364 : kind: SlruKind,
590 4364 : version: Version<'_>,
591 4364 : ctx: &RequestContext,
592 4364 : ) -> Result<HashSet<u32>, PageReconstructError> {
593 4364 : // fetch directory entry
594 4364 : let key = slru_dir_to_key(kind);
595 :
596 4364 : let buf = version.get(self, key, ctx).await?;
597 4356 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
598 4356 : Ok(dir) => Ok(dir.segments),
599 0 : Err(e) => Err(PageReconstructError::from(e)),
600 : }
601 4364 : }
602 :
603 2438 : pub(crate) async fn get_relmap_file(
604 2438 : &self,
605 2438 : spcnode: Oid,
606 2438 : dbnode: Oid,
607 2438 : version: Version<'_>,
608 2438 : ctx: &RequestContext,
609 2438 : ) -> Result<Bytes, PageReconstructError> {
610 2438 : let key = relmap_file_key(spcnode, dbnode);
611 :
612 2438 : let buf = version.get(self, key, ctx).await?;
613 2438 : Ok(buf)
614 2438 : }
615 :
616 601 : pub(crate) async fn list_dbdirs(
617 601 : &self,
618 601 : lsn: Lsn,
619 601 : ctx: &RequestContext,
620 601 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
621 : // fetch directory entry
622 601 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
623 :
624 601 : match DbDirectory::des(&buf).context("deserialization failure") {
625 601 : Ok(dir) => Ok(dir.dbdirs),
626 0 : Err(e) => Err(PageReconstructError::from(e)),
627 : }
628 601 : }
629 :
630 2 : pub(crate) async fn get_twophase_file(
631 2 : &self,
632 2 : xid: TransactionId,
633 2 : lsn: Lsn,
634 2 : ctx: &RequestContext,
635 2 : ) -> Result<Bytes, PageReconstructError> {
636 2 : let key = twophase_file_key(xid);
637 2 : let buf = self.get(key, lsn, ctx).await?;
638 2 : Ok(buf)
639 2 : }
640 :
641 601 : pub(crate) async fn list_twophase_files(
642 601 : &self,
643 601 : lsn: Lsn,
644 601 : ctx: &RequestContext,
645 601 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
646 : // fetch directory entry
647 601 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
648 :
649 601 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
650 601 : Ok(dir) => Ok(dir.xids),
651 0 : Err(e) => Err(PageReconstructError::from(e)),
652 : }
653 601 : }
654 :
655 595 : pub(crate) async fn get_control_file(
656 595 : &self,
657 595 : lsn: Lsn,
658 595 : ctx: &RequestContext,
659 595 : ) -> Result<Bytes, PageReconstructError> {
660 595 : self.get(CONTROLFILE_KEY, lsn, ctx).await
661 595 : }
662 :
663 1933 : pub(crate) async fn get_checkpoint(
664 1933 : &self,
665 1933 : lsn: Lsn,
666 1933 : ctx: &RequestContext,
667 1933 : ) -> Result<Bytes, PageReconstructError> {
668 1933 : self.get(CHECKPOINT_KEY, lsn, ctx).await
669 1933 : }
670 :
671 2416 : pub(crate) async fn list_aux_files(
672 2416 : &self,
673 2416 : lsn: Lsn,
674 2416 : ctx: &RequestContext,
675 2416 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
676 2416 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
677 1420 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
678 1420 : Ok(dir) => Ok(dir.files),
679 0 : Err(e) => Err(PageReconstructError::from(e)),
680 : },
681 996 : Err(e) => {
682 : // This is expected: historical databases do not have the key.
683 0 : debug!("Failed to get info about AUX files: {}", e);
684 996 : Ok(HashMap::new())
685 : }
686 : }
687 2416 : }
688 :
689 : /// Does the same as `get_current_logical_size`, but computes the size on demand.
690 : /// Used to initialize the logical size tracking on startup.
691 : ///
692 : /// Only relation blocks are counted currently. That excludes metadata,
693 : /// SLRUs, twophase files etc.
694 : ///
695 : /// # Cancel-Safety
696 : ///
697 : /// This method is cancellation-safe.
698 694 : pub async fn get_current_logical_size_non_incremental(
699 694 : &self,
700 694 : lsn: Lsn,
701 694 : ctx: &RequestContext,
702 694 : ) -> Result<u64, CalculateLogicalSizeError> {
703 694 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
704 :
705 : // Fetch list of database dirs and iterate them
706 959 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
707 689 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
708 :
709 689 : let mut total_size: u64 = 0;
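// Sum the block counts of every relation in every database; the logical
// size is the total block count times BLCKSZ.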
710 2725 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
711 631853 : for rel in self
712 2725 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
713 445 : .await?
714 : {
715 631853 : if self.cancel.is_cancelled() {
716 3 : return Err(CalculateLogicalSizeError::Cancelled);
717 631850 : }
718 631850 : let relsize_key = rel_size_to_key(rel);
719 631850 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
720 631823 : let relsize = buf.get_u32_le();
721 631823 :
722 631823 : total_size += relsize as u64;
723 : }
724 : }
725 659 : Ok(total_size * BLCKSZ as u64)
726 690 : }
727 :
728 : ///
729 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
730 : /// Anything that's not listed maybe removed from the underlying storage (from
731 : /// that LSN forwards).
732 891 : pub(crate) async fn collect_keyspace(
733 891 : &self,
734 891 : lsn: Lsn,
735 891 : ctx: &RequestContext,
736 891 : ) -> Result<KeySpace, CollectKeySpaceError> {
737 891 : // Iterate through key ranges, greedily packing them into partitions
738 891 : let mut result = KeySpaceAccum::new();
739 891 :
740 891 : // The dbdir metadata always exists
741 891 : result.add_key(DBDIR_KEY);
742 :
743 : // Fetch list of database dirs and iterate them
744 2274 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
745 890 : let dbdir = DbDirectory::des(&buf)?;
746 :
747 890 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
748 890 : dbs.sort_unstable();
749 3837 : for (spcnode, dbnode) in dbs {
750 2950 : result.add_key(relmap_file_key(spcnode, dbnode));
751 2950 : result.add_key(rel_dir_to_key(spcnode, dbnode));
752 :
753 2950 : let mut rels: Vec<RelTag> = self
754 2950 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
755 645 : .await?
756 2950 : .into_iter()
757 2950 : .collect();
758 2950 : rels.sort_unstable();
759 711838 : for rel in rels {
760 708891 : let relsize_key = rel_size_to_key(rel);
761 708891 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
762 708888 : let relsize = buf.get_u32_le();
763 708888 :
764 708888 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
765 708888 : result.add_key(relsize_key);
766 : }
767 : }
768 :
769 : // Iterate SLRUs next
770 2661 : for kind in [
771 887 : SlruKind::Clog,
772 887 : SlruKind::MultiXactMembers,
773 887 : SlruKind::MultiXactOffsets,
774 : ] {
775 2661 : let slrudir_key = slru_dir_to_key(kind);
776 2661 : result.add_key(slrudir_key);
777 8605 : let buf = self.get(slrudir_key, lsn, ctx).await?;
778 2661 : let dir = SlruSegmentDirectory::des(&buf)?;
779 2661 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
780 2661 : segments.sort_unstable();
781 4821 : for segno in segments {
782 2160 : let segsize_key = slru_segment_size_to_key(kind, segno);
783 2160 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
784 2160 : let segsize = buf.get_u32_le();
785 2160 :
786 2160 : result.add_range(
787 2160 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
788 2160 : );
789 2160 : result.add_key(segsize_key);
790 : }
791 : }
792 :
793 : // Then pg_twophase
794 887 : result.add_key(TWOPHASEDIR_KEY);
795 3183 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
796 887 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
797 887 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
798 887 : xids.sort_unstable();
799 887 : for xid in xids {
800 0 : result.add_key(twophase_file_key(xid));
801 0 : }
802 :
803 887 : result.add_key(CONTROLFILE_KEY);
804 887 : result.add_key(CHECKPOINT_KEY);
805 887 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
806 622 : result.add_key(AUX_FILES_KEY);
807 622 : }
808 887 : Ok(result.to_keyspace())
809 889 : }
810 :
811 : /// Get the cached size of a relation, if it was not updated after the specified LSN
812 62055854 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
813 62055854 : let rel_size_cache = self.rel_size_cache.read().unwrap();
814 62055854 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
815 61697050 : if lsn >= *cached_lsn {
816 61479836 : return Some(*nblocks);
817 217213 : }
818 358804 : }
819 576017 : None
820 62055853 : }
821 :
822 : /// Update cached relation size if there is no more recent update
823 157378 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
824 157378 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
825 157378 : match rel_size_cache.entry(tag) {
826 130459 : hash_map::Entry::Occupied(mut entry) => {
827 130459 : let cached_lsn = entry.get_mut();
828 130459 : if lsn >= cached_lsn.0 {
829 18 : *cached_lsn = (lsn, nblocks);
830 130441 : }
831 : }
832 26919 : hash_map::Entry::Vacant(entry) => {
833 26919 : entry.insert((lsn, nblocks));
834 26919 : }
835 : }
836 157378 : }
837 :
838 : /// Store cached relation size
839 1928423 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
840 1928423 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
841 1928423 : rel_size_cache.insert(tag, (lsn, nblocks));
842 1928423 : }
843 :
844 : /// Remove cached relation size
845 67300 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
846 67300 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
847 67300 : rel_size_cache.remove(tag);
848 67300 : }
849 : }
850 :
851 : /// DatadirModification represents an operation to ingest an atomic set of
852 : /// updates to the repository. It is created by the 'begin_modification'
853 : /// function. It is called for each WAL record, so that all the modifications
854 : /// by one WAL record appear atomic.
855 : pub struct DatadirModification<'a> {
856 : /// The timeline this modification applies to. You can access this to
857 : /// read the state, but note that any pending updates are *not* reflected
858 : /// in the state in 'tline' yet.
859 : pub tline: &'a Timeline,
860 :
861 : /// Current LSN of the modification
862 : lsn: Lsn,
863 :
864 : // The modifications are not applied directly to the underlying key-value store.
865 : // The put-functions add the modifications here, and they are flushed to the
866 : // underlying key-value store by the 'finish' function.
867 : pending_lsns: Vec<Lsn>,
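// For each key, the updates made at distinct LSNs within this batch,
// kept in increasing LSN order (see `put` and `set_lsn`).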
868 : pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
869 : pending_deletions: Vec<(Range<Key>, Lsn)>,
870 : pending_nblocks: i64,
871 : }
872 :
873 : impl<'a> DatadirModification<'a> {
874 : /// Get the current lsn
875 56967530 : pub(crate) fn get_lsn(&self) -> Lsn {
876 56967530 : self.lsn
877 56967530 : }
878 :
879 : /// Set the current lsn
880 73047349 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
881 73047349 : ensure!(
882 73047349 : lsn >= self.lsn,
883 0 : "setting an older lsn {} than {} is not allowed",
884 : lsn,
885 : self.lsn
886 : );
887 73047349 : if lsn > self.lsn {
888 73047349 : self.pending_lsns.push(self.lsn);
889 73047349 : self.lsn = lsn;
890 73047349 : }
891 73047349 : Ok(())
892 73047349 : }
893 :
894 : /// Initialize a completely new repository.
895 : ///
896 : /// This inserts the directory metadata entries that are assumed to
897 : /// always exist.
898 678 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
899 678 : let buf = DbDirectory::ser(&DbDirectory {
900 678 : dbdirs: HashMap::new(),
901 678 : })?;
902 678 : self.put(DBDIR_KEY, Value::Image(buf.into()));
903 678 :
904 678 : // Create AuxFilesDirectory
905 678 : self.init_aux_dir()?;
906 :
907 678 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
908 678 : xids: HashSet::new(),
909 678 : })?;
910 678 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
911 :
912 678 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
913 678 : let empty_dir = Value::Image(buf);
914 678 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
915 678 : self.put(
916 678 : slru_dir_to_key(SlruKind::MultiXactMembers),
917 678 : empty_dir.clone(),
918 678 : );
919 678 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
920 678 :
921 678 : Ok(())
922 678 : }
923 :
924 : #[cfg(test)]
925 70 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
926 70 : self.init_empty()?;
927 70 : self.put_control_file(bytes::Bytes::from_static(
928 70 : b"control_file contents do not matter",
929 70 : ))
930 70 : .context("put_control_file")?;
931 70 : self.put_checkpoint(bytes::Bytes::from_static(
932 70 : b"checkpoint_file contents do not matter",
933 70 : ))
934 70 : .context("put_checkpoint_file")?;
935 70 : Ok(())
936 70 : }
937 :
938 : /// Put a new page version that can be constructed from a WAL record
939 : ///
940 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
941 : /// current end-of-file. It's up to the caller to check that the relation size
942 : /// matches the blocks inserted!
943 53007413 : pub fn put_rel_wal_record(
944 53007413 : &mut self,
945 53007413 : rel: RelTag,
946 53007413 : blknum: BlockNumber,
947 53007413 : rec: NeonWalRecord,
948 53007413 : ) -> anyhow::Result<()> {
949 53007413 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
950 53007413 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
951 53007413 : Ok(())
952 53007413 : }
953 :
954 : // Same, but for an SLRU.
955 6182727 : pub fn put_slru_wal_record(
956 6182727 : &mut self,
957 6182727 : kind: SlruKind,
958 6182727 : segno: u32,
959 6182727 : blknum: BlockNumber,
960 6182727 : rec: NeonWalRecord,
961 6182727 : ) -> anyhow::Result<()> {
962 6182727 : self.put(
963 6182727 : slru_block_to_key(kind, segno, blknum),
964 6182727 : Value::WalRecord(rec),
965 6182727 : );
966 6182727 : Ok(())
967 6182727 : }
968 :
969 : /// Like put_rel_wal_record, but with a ready-made image of the page.
970 2579711 : pub fn put_rel_page_image(
971 2579711 : &mut self,
972 2579711 : rel: RelTag,
973 2579711 : blknum: BlockNumber,
974 2579711 : img: Bytes,
975 2579711 : ) -> anyhow::Result<()> {
976 2579711 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
977 2579711 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
978 2579711 : Ok(())
979 2579711 : }
980 :
981 3476 : pub fn put_slru_page_image(
982 3476 : &mut self,
983 3476 : kind: SlruKind,
984 3476 : segno: u32,
985 3476 : blknum: BlockNumber,
986 3476 : img: Bytes,
987 3476 : ) -> anyhow::Result<()> {
988 3476 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
989 3476 : Ok(())
990 3476 : }
991 :
992 : /// Store a relmapper file (pg_filenode.map) in the repository
993 2522 : pub async fn put_relmap_file(
994 2522 : &mut self,
995 2522 : spcnode: Oid,
996 2522 : dbnode: Oid,
997 2522 : img: Bytes,
998 2522 : ctx: &RequestContext,
999 2522 : ) -> anyhow::Result<()> {
1000 : // Add it to the directory (if it doesn't exist already)
1001 2522 : let buf = self.get(DBDIR_KEY, ctx).await?;
1002 2522 : let mut dbdir = DbDirectory::des(&buf)?;
1003 :
1004 2522 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1005 2522 : if r.is_none() || r == Some(false) {
1006 : // The dbdir entry didn't exist, or it contained a
1007 : // 'false'. The 'insert' call already updated it with
1008 : // 'true', now write the updated 'dbdirs' map back.
1009 2460 : let buf = DbDirectory::ser(&dbdir)?;
1010 2460 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1011 2460 :
1012 2460 : // Create AuxFilesDirectory as well
1013 2460 : self.init_aux_dir()?;
1014 62 : }
1015 2522 : if r.is_none() {
1016 36 : // Create RelDirectory
1017 36 : let buf = RelDirectory::ser(&RelDirectory {
1018 36 : rels: HashSet::new(),
1019 36 : })?;
1020 36 : self.put(
1021 36 : rel_dir_to_key(spcnode, dbnode),
1022 36 : Value::Image(Bytes::from(buf)),
1023 36 : );
1024 2486 : }
1025 :
1026 2522 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1027 2522 : Ok(())
1028 2522 : }
1029 :
1030 4 : pub async fn put_twophase_file(
1031 4 : &mut self,
1032 4 : xid: TransactionId,
1033 4 : img: Bytes,
1034 4 : ctx: &RequestContext,
1035 4 : ) -> anyhow::Result<()> {
1036 : // Add it to the directory entry
1037 4 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1038 4 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1039 4 : if !dir.xids.insert(xid) {
1040 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1041 4 : }
1042 4 : self.put(
1043 4 : TWOPHASEDIR_KEY,
1044 4 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1045 : );
1046 :
1047 4 : self.put(twophase_file_key(xid), Value::Image(img));
1048 4 : Ok(())
1049 4 : }
1050 :
1051 676 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1052 676 : self.put(CONTROLFILE_KEY, Value::Image(img));
1053 676 : Ok(())
1054 676 : }
1055 :
1056 35060 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1057 35060 : self.put(CHECKPOINT_KEY, Value::Image(img));
1058 35060 : Ok(())
1059 35060 : }
1060 :
1061 3 : pub async fn drop_dbdir(
1062 3 : &mut self,
1063 3 : spcnode: Oid,
1064 3 : dbnode: Oid,
1065 3 : ctx: &RequestContext,
1066 3 : ) -> anyhow::Result<()> {
1067 3 : let total_blocks = self
1068 3 : .tline
1069 3 : .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
1070 0 : .await?;
1071 :
1072 : // Remove entry from dbdir
1073 3 : let buf = self.get(DBDIR_KEY, ctx).await?;
1074 3 : let mut dir = DbDirectory::des(&buf)?;
1075 3 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1076 3 : let buf = DbDirectory::ser(&dir)?;
1077 3 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1078 : } else {
1079 0 : warn!(
1080 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1081 0 : spcnode, dbnode
1082 0 : );
1083 : }
1084 :
1085 : // Update logical database size.
1086 3 : self.pending_nblocks -= total_blocks as i64;
1087 3 :
1088 3 : // Delete all relations and metadata files for the spcnode/dnode
1089 3 : self.delete(dbdir_key_range(spcnode, dbnode));
1090 3 : Ok(())
1091 3 : }
1092 :
1093 : /// Create a relation fork.
1094 : ///
1095 : /// 'nblocks' is the initial size.
1096 651289 : pub async fn put_rel_creation(
1097 651289 : &mut self,
1098 651289 : rel: RelTag,
1099 651289 : nblocks: BlockNumber,
1100 651289 : ctx: &RequestContext,
1101 651289 : ) -> Result<(), RelationError> {
1102 651289 : if rel.relnode == 0 {
1103 0 : return Err(RelationError::InvalidRelnode);
1104 651289 : }
1105 : // It's possible that this is the first rel for this db in this
1106 : // tablespace. Create the reldir entry for it if so.
1107 651289 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1108 651289 : .context("deserialize db")?;
1109 651289 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1110 651289 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
1111 : // Didn't exist. Update dbdir
1112 2429 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
1113 2429 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1114 2429 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1115 2429 :
1116 2429 : // and create the RelDirectory
1117 2429 : RelDirectory::default()
1118 : } else {
1119 : // reldir already exists, fetch it
1120 648860 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1121 648860 : .context("deserialize db")?
1122 : };
1123 :
1124 : // Add the new relation to the rel directory entry, and write it back
1125 651289 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1126 0 : return Err(RelationError::AlreadyExists);
1127 651289 : }
1128 651289 : self.put(
1129 651289 : rel_dir_key,
1130 651289 : Value::Image(Bytes::from(
1131 651289 : RelDirectory::ser(&rel_dir).context("serialize")?,
1132 : )),
1133 : );
1134 :
1135 : // Put size
1136 651289 : let size_key = rel_size_to_key(rel);
1137 651289 : let buf = nblocks.to_le_bytes();
1138 651289 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1139 651289 :
1140 651289 : self.pending_nblocks += nblocks as i64;
1141 651289 :
1142 651289 : // Update relation size cache
1143 651289 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1144 651289 :
1145 651289 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1146 651289 : // caller.
1147 651289 : Ok(())
1148 651289 : }
1149 :
1150 : /// Truncate relation
1151 6359 : pub async fn put_rel_truncation(
1152 6359 : &mut self,
1153 6359 : rel: RelTag,
1154 6359 : nblocks: BlockNumber,
1155 6359 : ctx: &RequestContext,
1156 6359 : ) -> anyhow::Result<()> {
1157 6359 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1158 6359 : if self
1159 6359 : .tline
1160 6359 : .get_rel_exists(rel, Version::Modified(self), true, ctx)
1161 0 : .await?
1162 : {
1163 6359 : let size_key = rel_size_to_key(rel);
1164 : // Fetch the old size first
1165 6359 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1166 6359 :
1167 6359 : // Update the entry with the new size.
1168 6359 : let buf = nblocks.to_le_bytes();
1169 6359 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1170 6359 :
1171 6359 : // Update relation size cache
1172 6359 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1173 6359 :
1174 6359 :
1178 6359 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1179 0 : }
1180 6359 : Ok(())
1181 6359 : }
1182 :
1183 : /// Extend a relation.
1184 : /// If the new size is smaller, do nothing.
1185 1829664 : pub async fn put_rel_extend(
1186 1829664 : &mut self,
1187 1829664 : rel: RelTag,
1188 1829664 : nblocks: BlockNumber,
1189 1829664 : ctx: &RequestContext,
1190 1829664 : ) -> anyhow::Result<()> {
1191 1829664 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1192 :
1193 : // Put size
1194 1829664 : let size_key = rel_size_to_key(rel);
1195 1829664 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1196 1829661 :
1197 1829661 : // only extend relation here. never decrease the size
1198 1829661 : if nblocks > old_size {
1199 1264416 : let buf = nblocks.to_le_bytes();
1200 1264416 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1201 1264416 :
1202 1264416 : // Update relation size cache
1203 1264416 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1204 1264416 :
1205 1264416 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1206 1264416 : }
1207 1829661 : Ok(())
1208 1829664 : }
1209 :
1210 : /// Drop a relation.
1211 67300 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1212 67300 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1213 :
1214 : // Remove it from the directory entry
1215 67300 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1216 67300 : let buf = self.get(dir_key, ctx).await?;
1217 67300 : let mut dir = RelDirectory::des(&buf)?;
1218 :
1219 67300 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1220 67300 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1221 : } else {
1222 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1223 : }
1224 :
1225 : // update logical size
1226 67300 : let size_key = rel_size_to_key(rel);
1227 67300 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1228 67300 : self.pending_nblocks -= old_size as i64;
1229 67300 :
1230 67300 : // Remove the entry from the relation size cache
1231 67300 : self.tline.remove_cached_rel_size(&rel);
1232 67300 :
1233 67300 : // Delete size entry, as well as all blocks
1234 67300 : self.delete(rel_key_range(rel));
1235 67300 :
1236 67300 : Ok(())
1237 67300 : }
1238 :
1239 1866 : pub async fn put_slru_segment_creation(
1240 1866 : &mut self,
1241 1866 : kind: SlruKind,
1242 1866 : segno: u32,
1243 1866 : nblocks: BlockNumber,
1244 1866 : ctx: &RequestContext,
1245 1866 : ) -> anyhow::Result<()> {
1246 1866 : // Add it to the directory entry
1247 1866 : let dir_key = slru_dir_to_key(kind);
1248 1866 : let buf = self.get(dir_key, ctx).await?;
1249 1866 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1250 :
1251 1866 : if !dir.segments.insert(segno) {
1252 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1253 1866 : }
1254 1866 : self.put(
1255 1866 : dir_key,
1256 1866 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1257 : );
1258 :
1259 : // Put size
1260 1866 : let size_key = slru_segment_size_to_key(kind, segno);
1261 1866 : let buf = nblocks.to_le_bytes();
1262 1866 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1263 1866 :
1264 1866 : // even if nblocks > 0, we don't insert any actual blocks here
1265 1866 :
1266 1866 : Ok(())
1267 1866 : }
1268 :
1269 : /// Extend SLRU segment
1270 1622 : pub fn put_slru_extend(
1271 1622 : &mut self,
1272 1622 : kind: SlruKind,
1273 1622 : segno: u32,
1274 1622 : nblocks: BlockNumber,
1275 1622 : ) -> anyhow::Result<()> {
1276 1622 : // Put size
1277 1622 : let size_key = slru_segment_size_to_key(kind, segno);
1278 1622 : let buf = nblocks.to_le_bytes();
1279 1622 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1280 1622 : Ok(())
1281 1622 : }
1282 :
1283 : /// Drop an SLRU segment. This is used when SLRU segments are truncated away.
1284 18 : pub async fn drop_slru_segment(
1285 18 : &mut self,
1286 18 : kind: SlruKind,
1287 18 : segno: u32,
1288 18 : ctx: &RequestContext,
1289 18 : ) -> anyhow::Result<()> {
1290 18 : // Remove it from the directory entry
1291 18 : let dir_key = slru_dir_to_key(kind);
1292 18 : let buf = self.get(dir_key, ctx).await?;
1293 18 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1294 :
1295 18 : if !dir.segments.remove(&segno) {
1296 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1297 18 : }
1298 18 : self.put(
1299 18 : dir_key,
1300 18 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1301 : );
1302 :
1303 : // Delete size entry, as well as all blocks
1304 18 : self.delete(slru_segment_key_range(kind, segno));
1305 18 :
1306 18 : Ok(())
1307 18 : }
1308 :
1309 : /// Drop a relmapper file (pg_filenode.map)
1310 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1311 0 : // TODO
1312 0 : Ok(())
1313 0 : }
1314 :
1315 : /// Drop the twophase file for the given transaction ID.
1316 2 : pub async fn drop_twophase_file(
1317 2 : &mut self,
1318 2 : xid: TransactionId,
1319 2 : ctx: &RequestContext,
1320 2 : ) -> anyhow::Result<()> {
1321 : // Remove it from the directory entry
1322 2 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1323 2 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1324 :
1325 2 : if !dir.xids.remove(&xid) {
1326 0 : warn!("twophase file for xid {} does not exist", xid);
1327 2 : }
1328 2 : self.put(
1329 2 : TWOPHASEDIR_KEY,
1330 2 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1331 : );
1332 :
1333 : // Delete it
1334 2 : self.delete(twophase_key_range(xid));
1335 2 :
1336 2 : Ok(())
1337 2 : }
1338 :
1339 3138 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1340 3138 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1341 3138 : files: HashMap::new(),
1342 3138 : })?;
1343 3138 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1344 3138 : Ok(())
1345 3138 : }
1346 :
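/// Store or delete an auxiliary file under the given path.
///
/// An empty `content` removes the file from the directory; otherwise the
/// content is stored, replacing any previous version.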
1347 118 : pub async fn put_file(
1348 118 : &mut self,
1349 118 : path: &str,
1350 118 : content: &[u8],
1351 118 : ctx: &RequestContext,
1352 118 : ) -> anyhow::Result<()> {
1353 118 : let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
1354 115 : Ok(buf) => AuxFilesDirectory::des(&buf)?,
1355 3 : Err(e) => {
1356 : // This is expected: historical databases do not have the key.
1357 0 : debug!("Failed to get info about AUX files: {}", e);
1358 3 : AuxFilesDirectory {
1359 3 : files: HashMap::new(),
1360 3 : }
1361 : }
1362 : };
1363 118 : let path = path.to_string();
1364 118 : if content.is_empty() {
1365 5 : dir.files.remove(&path);
1366 113 : } else {
1367 113 : dir.files.insert(path, Bytes::copy_from_slice(content));
1368 113 : }
1369 118 : self.put(
1370 118 : AUX_FILES_KEY,
1371 118 : Value::Image(Bytes::from(
1372 118 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1373 : )),
1374 : );
1375 118 : Ok(())
1376 118 : }
1377 :
1378 : ///
1379 : /// Flush changes accumulated so far to the underlying repository.
1380 : ///
1381 : /// Usually, changes made in DatadirModification are atomic, but this allows
1382 : /// you to flush them to the underlying repository before the final `commit`.
1383 : /// That frees up the memory used to hold the pending changes.
1384 : ///
1385 : /// Currently only used during bulk import of a data directory. In that
1386 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1387 : /// whole import fails and the timeline will be deleted anyway.
1388 : /// (Or to be precise, it will be left behind for debugging purposes and
1389 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1390 : ///
1391 : /// Note: A consequence of flushing the pending operations is that they
1392 : /// won't be visible to subsequent operations until `commit`. The function
1393 : /// retains all the metadata, but data pages are flushed. That's again OK
1394 : /// for bulk import, where you are just loading data pages and won't try to
1395 : /// modify the same pages twice.
1396 576178 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1397 576178 : // Unless we have accumulated a decent amount of changes, it's not worth it
1398 576178 : // to scan through the pending_updates list.
1399 576178 : let pending_nblocks = self.pending_nblocks;
1400 576178 : if pending_nblocks < 10000 {
1401 576178 : return Ok(());
1402 0 : }
1403 :
1404 0 : let writer = self.tline.writer().await;
1405 :
1406 : // Flush relation and SLRU data blocks, keep metadata.
1407 0 : let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
1408 0 : for (key, values) in self.pending_updates.drain() {
1409 0 : for (lsn, value) in values {
1410 0 : if is_rel_block_key(&key) || is_slru_block_key(key) {
1411 : // This bails out on first error without modifying pending_updates.
1412 : // That's Ok, cf this function's doc comment.
1413 0 : writer.put(key, lsn, &value, ctx).await?;
1414 0 : } else {
1415 0 : retained_pending_updates
1416 0 : .entry(key)
1417 0 : .or_default()
1418 0 : .push((lsn, value));
1419 0 : }
1420 : }
1421 : }
1422 :
1423 0 : self.pending_updates = retained_pending_updates;
1424 0 :
1425 0 : if pending_nblocks != 0 {
1426 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1427 0 : self.pending_nblocks = 0;
1428 0 : }
1429 :
1430 0 : Ok(())
1431 576178 : }
1432 :
1433 : ///
1434 : /// Finish this atomic update, writing all the updated keys to the
1435 : /// underlying timeline.
1436 : /// All the modifications in this atomic update are stamped by the specified LSN.
1437 : ///
1438 2136802 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1439 2136801 : let writer = self.tline.writer().await;
1440 :
1441 2136801 : let pending_nblocks = self.pending_nblocks;
1442 2136801 : self.pending_nblocks = 0;
1443 2136801 :
1444 2136801 : if !self.pending_updates.is_empty() {
1445 1713238 : writer.put_batch(&self.pending_updates, ctx).await?;
1446 1713238 : self.pending_updates.clear();
1447 423563 : }
1448 :
1449 2136801 : if !self.pending_deletions.is_empty() {
1450 19251 : writer.delete_batch(&self.pending_deletions).await?;
1451 19251 : self.pending_deletions.clear();
1452 2117550 : }
1453 :
1454 2136801 : self.pending_lsns.push(self.lsn);
1455 75183994 : for pending_lsn in self.pending_lsns.drain(..) {
1456 75183994 : // Ideally, we should be able to call writer.finish_write() only once
1457 75183994 : // with the highest LSN. However, the last_record_lsn variable in the
1458 75183994 : // timeline keeps track of the latest LSN and the immediate previous LSN
1459 75183994 : // so we need to record every LSN to not leave a gap between them.
1460 75183994 : writer.finish_write(pending_lsn);
1461 75183994 : }
1462 :
1463 2136801 : if pending_nblocks != 0 {
1464 638850 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1465 1497951 : }
1466 :
1467 2136801 : Ok(())
1468 2136801 : }
1469 :
1470 146094695 : pub(crate) fn len(&self) -> usize {
1471 146094695 : self.pending_updates.len() + self.pending_deletions.len()
1472 146094695 : }
1473 :
1474 : // Internal helper functions to batch the modifications
1475 :
1476 3500003 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1477 : // Have we already updated the same key? Read the latest pending updated
1478 : // version in that case.
1479 : //
1480 : // Note: we don't check pending_deletions. It is an error to request a
1481 : // value that has been removed, deletion only avoids leaking storage.
1482 3500003 : if let Some(values) = self.pending_updates.get(&key) {
1483 2626972 : if let Some((_, value)) = values.last() {
1484 2626972 : return if let Value::Image(img) = value {
1485 2626972 : Ok(img.clone())
1486 : } else {
1487 : // Currently, we never need to read back a WAL record that we
1488 : // inserted in the same "transaction". All the metadata updates
1489 : // work directly with Images, and we never need to read actual
1490 : // data pages. We could handle this if we had to, by calling
1491 : // the walredo manager, but let's keep it simple for now.
1492 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1493 0 : "unexpected pending WAL record"
1494 0 : )))
1495 : };
1496 0 : }
1497 873031 : }
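// Read from the timeline at the newer of its last committed record LSN
// and this modification's LSN, so the read sees everything that has
// already been committed to the timeline.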
1498 873031 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1499 873031 : self.tline.get(key, lsn, ctx).await
1500 3500003 : }
1501 :
1502 64469194 : fn put(&mut self, key: Key, val: Value) {
1503 64469194 : let values = self.pending_updates.entry(key).or_default();
1504 : // Replace the previous value if it exists at the same lsn
1505 64469194 : if let Some((last_lsn, last_value)) = values.last_mut() {
1506 53943461 : if *last_lsn == self.lsn {
1507 647242 : *last_value = val;
1508 647242 : return;
1509 53296219 : }
1510 10525733 : }
1511 63821952 : values.push((self.lsn, val));
1512 64469194 : }
1513 :
1514 67323 : fn delete(&mut self, key_range: Range<Key>) {
1515 67323 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1516 67323 : self.pending_deletions.push((key_range, self.lsn));
1517 67323 : }
1518 : }
1519 :
1520 : /// This struct provides access to either a committed key in the timeline at a
1521 : /// specific LSN, or the latest uncommitted value of a key in a pending modification.
1522 : /// During WAL ingestion, records from multiple LSNs may be batched in the same
1523 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1524 : /// must look up keys in the modification first, and only then in the timeline,
1525 : /// so they don't miss the latest updates.
1526 0 : #[derive(Clone, Copy)]
1527 : pub enum Version<'a> {
1528 : Lsn(Lsn),
1529 : Modified(&'a DatadirModification<'a>),
1530 : }
1531 :
1532 : impl<'a> Version<'a> {
1533 4983060 : async fn get(
1534 4983060 : &self,
1535 4983060 : timeline: &Timeline,
1536 4983060 : key: Key,
1537 4983060 : ctx: &RequestContext,
1538 4983060 : ) -> Result<Bytes, PageReconstructError> {
1539 4983059 : match self {
1540 4758361 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1541 224698 : Version::Modified(modification) => modification.get(key, ctx).await,
1542 : }
1543 4983059 : }
1544 :
1545 5245722 : fn get_lsn(&self) -> Lsn {
1546 5245722 : match self {
1547 4756830 : Version::Lsn(lsn) => *lsn,
1548 488892 : Version::Modified(modification) => modification.lsn,
1549 : }
1550 5245722 : }
1551 : }
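/*
   Illustrative sketch of the Version dispatch above, with hypothetical toy
   types: a read is served either from the committed store at a fixed LSN, or
   from a pending modification, which is consulted before the committed data.

   enum ToyVersion<'a> {
       Lsn(u64),
       Modified(&'a ToyModification),
   }

   struct ToyModification {
       lsn: u64,
       pending: std::collections::HashMap<u32, String>,
   }

   impl<'a> ToyVersion<'a> {
       fn get_lsn(&self) -> u64 {
           match self {
               ToyVersion::Lsn(lsn) => *lsn,
               ToyVersion::Modified(m) => m.lsn,
           }
       }

       fn get(&self, committed: &std::collections::HashMap<u32, String>, key: u32) -> Option<String> {
           match self {
               // Fixed LSN: read the committed data directly.
               ToyVersion::Lsn(_lsn) => committed.get(&key).cloned(),
               // Pending modification: check it first, then fall back.
               ToyVersion::Modified(m) => m
                   .pending
                   .get(&key)
                   .cloned()
                   .or_else(|| committed.get(&key).cloned()),
           }
       }
   }
*/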
1552 :
1553 : //--- Metadata structs stored in key-value pairs in the repository.
1554 :
1555 655994 : #[derive(Debug, Serialize, Deserialize)]
1556 : struct DbDirectory {
1557 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1558 : dbdirs: HashMap<(Oid, Oid), bool>,
1559 : }
1560 :
1561 1494 : #[derive(Debug, Serialize, Deserialize)]
1562 : struct TwoPhaseDirectory {
1563 : xids: HashSet<TransactionId>,
1564 : }
1565 :
1566 1437250 : #[derive(Debug, Serialize, Deserialize, Default)]
1567 : struct RelDirectory {
1568 : // Set of relations that exist. (relfilenode, forknum)
1569 : //
1570 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1571 : // key-value pairs, if you have a lot of relations
1572 : rels: HashSet<(Oid, u8)>,
1573 : }
1574 :
1575 6512 : #[derive(Debug, Serialize, Deserialize, Default)]
1576 : struct AuxFilesDirectory {
1577 : files: HashMap<String, Bytes>,
1578 : }
1579 :
1580 0 : #[derive(Debug, Serialize, Deserialize)]
1581 : struct RelSizeEntry {
1582 : nblocks: u32,
1583 : }
1584 :
1585 10547 : #[derive(Debug, Serialize, Deserialize, Default)]
1586 : struct SlruSegmentDirectory {
1587 : // Set of SLRU segments that exist.
1588 : segments: HashSet<u32>,
1589 : }
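/*
   The directory structs above are plain serde types, so any serde format can
   round-trip them. An illustrative sketch using serde_json (for clarity only;
   the pageserver's actual on-disk encoding goes through the BeSer helper):

   use std::collections::HashSet;
   use serde::{Deserialize, Serialize};

   #[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
   struct ToySlruSegmentDirectory {
       segments: HashSet<u32>,
   }

   fn round_trip_example() {
       let dir = ToySlruSegmentDirectory {
           segments: [1u32, 2, 3].into_iter().collect(),
       };
       let bytes = serde_json::to_vec(&dir).unwrap();
       let back: ToySlruSegmentDirectory = serde_json::from_slice(&bytes).unwrap();
       assert_eq!(dir, back);
   }
*/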
1590 :
1591 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1592 :
1593 : #[allow(clippy::bool_assert_comparison)]
1594 : #[cfg(test)]
1595 : mod tests {
1596 : //use super::repo_harness::*;
1597 : //use super::*;
1598 :
1599 : /*
1600 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1601 : let incremental = timeline.get_current_logical_size();
1602 : let non_incremental = timeline
1603 : .get_current_logical_size_non_incremental(lsn)
1604 : .unwrap();
1605 : assert_eq!(incremental, non_incremental);
1606 : }
1607 : */
1608 :
1609 : /*
1610 : ///
1611 : /// Test list_rels() function, with branches and dropped relations
1612 : ///
1613 : #[test]
1614 : fn test_list_rels_drop() -> Result<()> {
1615 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1616 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1617 : const TESTDB: u32 = 111;
1618 :
1619 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1620 : // after branching fails below
1621 : let mut writer = tline.begin_record(Lsn(0x10));
1622 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1623 : writer.finish()?;
1624 :
1625 : // Create a relation on the timeline
1626 : let mut writer = tline.begin_record(Lsn(0x20));
1627 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1628 : writer.finish()?;
1629 :
1630 : let writer = tline.begin_record(Lsn(0x00));
1631 : writer.finish()?;
1632 :
1633 : // Check that list_rels() lists it at Lsn(0x20) and later, but not before it
1634 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1635 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
1636 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
1637 :
1638 : // Create a branch, check that the relation is visible there
1639 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
1640 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
1641 : Some(timeline) => timeline,
1642 : None => panic!("Should have a local timeline"),
1643 : };
1644 : let newtline = DatadirTimelineImpl::new(newtline);
1645 : assert!(newtline
1646 : .list_rels(0, TESTDB, Lsn(0x30))?
1647 : .contains(&TESTREL_A));
1648 :
1649 : // Drop it on the branch
1650 : let mut new_writer = newtline.begin_record(Lsn(0x40));
1651 : new_writer.drop_relation(TESTREL_A)?;
1652 : new_writer.finish()?;
1653 :
1654 : // Check that it's no longer listed on the branch after the point where it was dropped
1655 : assert!(newtline
1656 : .list_rels(0, TESTDB, Lsn(0x30))?
1657 : .contains(&TESTREL_A));
1658 : assert!(!newtline
1659 : .list_rels(0, TESTDB, Lsn(0x40))?
1660 : .contains(&TESTREL_A));
1661 :
1662 : // Run checkpoint and garbage collection and check that it's still not visible
1663 : newtline.checkpoint(CheckpointConfig::Forced)?;
1664 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
1665 :
1666 : assert!(!newtline
1667 : .list_rels(0, TESTDB, Lsn(0x40))?
1668 : .contains(&TESTREL_A));
1669 :
1670 : Ok(())
1671 : }
1672 : */
1673 :
1674 : /*
1675 : #[test]
1676 : fn test_read_beyond_eof() -> Result<()> {
1677 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
1678 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
1679 :
1680 : make_some_layers(&tline, Lsn(0x20))?;
1681 : let mut writer = tline.begin_record(Lsn(0x60));
1682 : walingest.put_rel_page_image(
1683 : &mut writer,
1684 : TESTREL_A,
1685 : 0,
1686 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
1687 : )?;
1688 : writer.finish()?;
1689 :
1690 : // Test read before rel creation. Should error out.
1691 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
1692 :
1693 : // Read block beyond end of relation at different points in time.
1694 : // These reads should fall into different delta, image, and in-memory layers.
1695 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
1696 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
1697 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
1698 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
1699 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
1700 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
1701 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
1702 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
1703 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
1704 :
1705 : // Test on an in-memory layer with no preceding layer
1706 : let mut writer = tline.begin_record(Lsn(0x70));
1707 : walingest.put_rel_page_image(
1708 : &mut writer,
1709 : TESTREL_B,
1710 : 0,
1711 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
1712 : )?;
1713 : writer.finish()?;
1714 :
1715 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
1716 :
1717 : Ok(())
1718 : }
1719 : */
1720 : }