Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::walrecord::NeonWalRecord;
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes};
16 : use pageserver_api::reltag::{RelTag, SlruKind};
17 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
18 : use postgres_ffi::BLCKSZ;
19 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
20 : use serde::{Deserialize, Serialize};
21 : use std::collections::{hash_map, HashMap, HashSet};
22 : use std::ops::Range;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{debug, trace, warn};
25 : use utils::{bin_ser::BeSer, lsn::Lsn};
26 :
27 : /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
28 : pub type BlockNumber = u32;
29 :
30 0 : #[derive(Debug)]
31 : pub enum LsnForTimestamp {
32 : Present(Lsn),
33 : Future(Lsn),
34 : Past(Lsn),
35 : NoData(Lsn),
36 : }
37 :
38 3 : #[derive(Debug, thiserror::Error)]
39 : pub enum CalculateLogicalSizeError {
40 : #[error("cancelled")]
41 : Cancelled,
42 : #[error(transparent)]
43 : Other(#[from] anyhow::Error),
44 : }
45 :
46 0 : #[derive(Debug, thiserror::Error)]
47 : pub enum RelationError {
48 : #[error("Relation Already Exists")]
49 : AlreadyExists,
50 : #[error("invalid relnode")]
51 : InvalidRelnode,
52 : #[error(transparent)]
53 : Other(#[from] anyhow::Error),
54 : }
55 :
56 : ///
57 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
58 : /// and other special kinds of files, in a versioned key-value store. The
59 : /// Timeline struct provides the key-value store.
60 : ///
61 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
62 : /// implementation; it might be moved into a separate struct later.
63 : impl Timeline {
64 : /// Start ingesting a WAL record, or other atomic modification of
65 : /// the timeline.
66 : ///
67 : /// This provides a transaction-like interface to perform a bunch
68 : /// of modifications atomically.
69 : ///
70 : /// To ingest a WAL record, call begin_modification(lsn) to get a
71 : /// DatadirModification object. Use the functions in the object to
72 : /// modify the repository state, updating all the pages and metadata
73 : /// that the WAL record affects. When you're done, call commit() to
74 : /// commit the changes.
75 : ///
76 : /// The LSN stored in the modification is advanced by `ingest_record` and
77 : /// is used by `commit()` to update `last_record_lsn`.
78 : ///
79 : /// Calling commit() will flush all the changes and reset the state,
80 : /// so the `DatadirModification` struct can be reused to perform the next modification.
81 : ///
82 : /// Note that any pending modifications you make through the
83 : /// modification object won't be visible to calls to the 'get' and list
84 : /// functions of the timeline until you finish! And if you update the
85 : /// same page twice, the last update wins.
86 : ///
87 867789 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
88 867789 : where
89 867789 : Self: Sized,
90 867789 : {
91 867789 : DatadirModification {
92 867789 : tline: self,
93 867789 : pending_updates: HashMap::new(),
94 867789 : pending_deletions: Vec::new(),
95 867789 : pending_nblocks: 0,
96 867789 : lsn,
97 867789 : }
98 867789 : }
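// A minimal usage sketch of the modification API described above (not from the
// original source; `timeline`, `rel`, `img`, and `ctx` are assumed to be in
// scope, and the LSN value is hypothetical):
//
//     let mut modification = timeline.begin_modification(Lsn(0x20));
//     modification.put_rel_creation(rel, 1, ctx).await?;
//     modification.put_rel_page_image(rel, 0, img)?;
//     modification.commit().await?;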
99 :
100 : //------------------------------------------------------------------------------
101 : // Public GET functions
102 : //------------------------------------------------------------------------------
103 :
104 : /// Look up a given page version.
105 4582770 : pub async fn get_rel_page_at_lsn(
106 4582770 : &self,
107 4582770 : tag: RelTag,
108 4582770 : blknum: BlockNumber,
109 4582770 : lsn: Lsn,
110 4582770 : latest: bool,
111 4582770 : ctx: &RequestContext,
112 4582770 : ) -> Result<Bytes, PageReconstructError> {
113 4582770 : if tag.relnode == 0 {
114 0 : return Err(PageReconstructError::Other(
115 0 : RelationError::InvalidRelnode.into(),
116 0 : ));
117 4582770 : }
118 :
119 4582770 : let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
120 4582770 : if blknum >= nblocks {
121 0 : debug!(
122 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
123 0 : tag, blknum, lsn, nblocks
124 0 : );
125 159409 : return Ok(ZERO_PAGE.clone());
126 4423361 : }
127 4423361 :
128 4423361 : let key = rel_block_to_key(tag, blknum);
129 8429505 : self.get(key, lsn, ctx).await
130 4582739 : }
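// A sketch of the beyond-EOF behavior above (hypothetical: `rel` has 10 blocks,
// so block 42 is past the end and no page needs to be reconstructed):
//
//     let page = timeline.get_rel_page_at_lsn(rel, 42, lsn, true, ctx).await?;
//     assert_eq!(page, ZERO_PAGE);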
131 :
132 : // Get size of a database in blocks
133 7 : pub async fn get_db_size(
134 7 : &self,
135 7 : spcnode: Oid,
136 7 : dbnode: Oid,
137 7 : lsn: Lsn,
138 7 : latest: bool,
139 7 : ctx: &RequestContext,
140 7 : ) -> Result<usize, PageReconstructError> {
141 7 : let mut total_blocks = 0;
142 :
143 7 : let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
144 :
145 2058 : for rel in rels {
146 2051 : let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
147 2051 : total_blocks += n_blocks as usize;
148 : }
149 7 : Ok(total_blocks)
150 7 : }
151 :
152 : /// Get size of a relation file
153 80326639 : pub async fn get_rel_size(
154 80326639 : &self,
155 80326639 : tag: RelTag,
156 80326639 : lsn: Lsn,
157 80326639 : latest: bool,
158 80326639 : ctx: &RequestContext,
159 80326760 : ) -> Result<BlockNumber, PageReconstructError> {
160 80326760 : if tag.relnode == 0 {
161 0 : return Err(PageReconstructError::Other(
162 0 : RelationError::InvalidRelnode.into(),
163 0 : ));
164 80326760 : }
165 :
166 80326760 : if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
167 80035393 : return Ok(nblocks);
168 291367 : }
169 291367 :
170 291367 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
171 7343 : && !self.get_rel_exists(tag, lsn, latest, ctx).await?
172 : {
173 : // FIXME: Postgres sometimes calls smgrcreate() to create
174 : // FSM, and smgrnblocks() on it immediately afterwards,
175 : // without extending it. Tolerate that by claiming that
176 : // any non-existent FSM fork has size 0.
177 0 : return Ok(0);
178 291367 : }
179 291367 :
180 291367 : let key = rel_size_to_key(tag);
181 291367 : let mut buf = self.get(key, lsn, ctx).await?;
182 291365 : let nblocks = buf.get_u32_le();
183 291365 :
184 291365 : if latest {
185 168447 : // Update the relation size cache only if the "latest" flag is set.
186 168447 : // This flag is set by the compute node when it is working with the most recent
187 168447 : // version of the relation. Typically, the primary compute node always sets latest=true.
188 168447 : // Note that even if the compute node "by mistake" specifies an old LSN but sets
189 168447 : // latest=true, it cannot corrupt the cache: with latest=true the pageserver
190 168447 : // chooses max(request_lsn, last_written_lsn), so the cached value is always
191 168447 : // associated with the most recent LSN.
192 168447 : self.update_cached_rel_size(tag, lsn, nblocks);
193 168447 : }
194 291365 : Ok(nblocks)
195 80326760 : }
196 :
197 : /// Does relation exist?
198 75836851 : pub async fn get_rel_exists(
199 75836851 : &self,
200 75836851 : tag: RelTag,
201 75836851 : lsn: Lsn,
202 75836851 : _latest: bool,
203 75836851 : ctx: &RequestContext,
204 75836972 : ) -> Result<bool, PageReconstructError> {
205 75836972 : if tag.relnode == 0 {
206 0 : return Err(PageReconstructError::Other(
207 0 : RelationError::InvalidRelnode.into(),
208 0 : ));
209 75836972 : }
210 :
211 : // first try to look up the relation in the cache
212 75836972 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
213 75732261 : return Ok(true);
214 104711 : }
215 104711 : // fetch directory listing
216 104711 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
217 104711 : let buf = self.get(key, lsn, ctx).await?;
218 :
219 104711 : match RelDirectory::des(&buf).context("deserialization failure") {
220 104711 : Ok(dir) => {
221 104711 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
222 104711 : Ok(exists)
223 : }
224 0 : Err(e) => Err(PageReconstructError::from(e)),
225 : }
226 75836972 : }
227 :
228 : /// Get a list of all existing relations in given tablespace and database.
229 6669 : pub async fn list_rels(
230 6669 : &self,
231 6669 : spcnode: Oid,
232 6669 : dbnode: Oid,
233 6669 : lsn: Lsn,
234 6669 : ctx: &RequestContext,
235 6670 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
236 6670 : // fetch directory listing
237 6670 : let key = rel_dir_to_key(spcnode, dbnode);
238 6670 : let buf = self.get(key, lsn, ctx).await?;
239 :
240 6670 : match RelDirectory::des(&buf).context("deserialization failure") {
241 6670 : Ok(dir) => {
242 6670 : let rels: HashSet<RelTag> =
243 1567419 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
244 1567419 : spcnode,
245 1567419 : dbnode,
246 1567419 : relnode: *relnode,
247 1567419 : forknum: *forknum,
248 1567419 : }));
249 6670 :
250 6670 : Ok(rels)
251 : }
252 0 : Err(e) => Err(PageReconstructError::from(e)),
253 : }
254 6670 : }
255 :
256 : /// Look up given SLRU page version.
257 6189 : pub async fn get_slru_page_at_lsn(
258 6189 : &self,
259 6189 : kind: SlruKind,
260 6189 : segno: u32,
261 6189 : blknum: BlockNumber,
262 6189 : lsn: Lsn,
263 6189 : ctx: &RequestContext,
264 6189 : ) -> Result<Bytes, PageReconstructError> {
265 6189 : let key = slru_block_to_key(kind, segno, blknum);
266 223121 : self.get(key, lsn, ctx).await
267 6189 : }
268 :
269 : /// Get size of an SLRU segment
270 6036 : pub async fn get_slru_segment_size(
271 6036 : &self,
272 6036 : kind: SlruKind,
273 6036 : segno: u32,
274 6036 : lsn: Lsn,
275 6036 : ctx: &RequestContext,
276 6036 : ) -> Result<BlockNumber, PageReconstructError> {
277 6036 : let key = slru_segment_size_to_key(kind, segno);
278 6036 : let mut buf = self.get(key, lsn, ctx).await?;
279 6036 : Ok(buf.get_u32_le())
280 6036 : }
281 :
282 : /// Does the SLRU segment exist?
283 674 : pub async fn get_slru_segment_exists(
284 674 : &self,
285 674 : kind: SlruKind,
286 674 : segno: u32,
287 674 : lsn: Lsn,
288 674 : ctx: &RequestContext,
289 674 : ) -> Result<bool, PageReconstructError> {
290 674 : // fetch directory listing
291 674 : let key = slru_dir_to_key(kind);
292 674 : let buf = self.get(key, lsn, ctx).await?;
293 :
294 674 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
295 674 : Ok(dir) => {
296 674 : let exists = dir.segments.get(&segno).is_some();
297 674 : Ok(exists)
298 : }
299 0 : Err(e) => Err(PageReconstructError::from(e)),
300 : }
301 674 : }
302 :
303 : /// Locate the LSN such that all transactions that committed before
304 : /// 'search_timestamp' are visible, but nothing newer is.
305 : ///
306 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
307 : /// so it's not well defined which LSN you get if there were multiple commits
308 : /// "in flight" at that point in time.
309 : ///
310 203 : pub async fn find_lsn_for_timestamp(
311 203 : &self,
312 203 : search_timestamp: TimestampTz,
313 203 : ctx: &RequestContext,
314 203 : ) -> Result<LsnForTimestamp, PageReconstructError> {
315 203 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
316 203 : let min_lsn = *gc_cutoff_lsn_guard;
317 203 : let max_lsn = self.get_last_record_lsn();
318 203 :
319 203 : // LSNs are always 8-byte aligned. low/mid/high represent the
320 203 : // LSN divided by 8.
321 203 : let mut low = min_lsn.0 / 8;
322 203 : let mut high = max_lsn.0 / 8 + 1;
323 203 :
324 203 : let mut found_smaller = false;
325 203 : let mut found_larger = false;
326 3582 : while low < high {
327 : // cannot overflow, high and low are both smaller than u64::MAX / 2
328 3379 : let mid = (high + low) / 2;
329 :
330 3379 : let cmp = self
331 3379 : .is_latest_commit_timestamp_ge_than(
332 3379 : search_timestamp,
333 3379 : Lsn(mid * 8),
334 3379 : &mut found_smaller,
335 3379 : &mut found_larger,
336 3379 : ctx,
337 3379 : )
338 213673 : .await?;
339 :
340 3379 : if cmp {
341 904 : high = mid;
342 2475 : } else {
343 2475 : low = mid + 1;
344 2475 : }
345 : }
346 203 : match (found_smaller, found_larger) {
347 : (false, false) => {
348 : // This can happen if no commit records have been processed yet, e.g.
349 : // just after importing a cluster.
350 25 : Ok(LsnForTimestamp::NoData(max_lsn))
351 : }
352 : (true, false) => {
353 : // Didn't find any commit timestamps larger than the request
354 75 : Ok(LsnForTimestamp::Future(max_lsn))
355 : }
356 : (false, true) => {
357 : // Didn't find any commit timestamps smaller than the request
358 9 : Ok(LsnForTimestamp::Past(max_lsn))
359 : }
360 : (true, true) => {
361 : // low is the LSN of the first commit record *after* the search_timestamp,
362 : // Back off by one to get to the point just before the commit.
363 : //
364 : // FIXME: it would be better to get the LSN of the previous commit.
365 : // Otherwise, if you restore to the returned LSN, the database will
366 : // include physical changes from later commits that will be marked
367 : // as aborted, and will need to be vacuumed away.
368 94 : Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
369 : }
370 : }
371 203 : }
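// A usage sketch for the binary search above (hypothetical caller;
// `search_timestamp` would come from e.g. a point-in-time restore request):
//
//     match timeline.find_lsn_for_timestamp(search_timestamp, ctx).await? {
//         LsnForTimestamp::Present(lsn) => { /* restore to `lsn` */ }
//         LsnForTimestamp::Future(_) => { /* timestamp is newer than any commit */ }
//         LsnForTimestamp::Past(_) => { /* timestamp predates all commits */ }
//         LsnForTimestamp::NoData(_) => { /* no commit records ingested yet */ }
//     }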
372 :
373 : ///
374 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
375 : /// commits that committed after 'search_timestamp', as seen at LSN 'probe_lsn'.
376 : ///
377 : /// Additionally, sets 'found_smaller'/'found_larger', if it encounters any commits
378 : /// with a smaller/larger timestamp.
379 : ///
380 3379 : pub async fn is_latest_commit_timestamp_ge_than(
381 3379 : &self,
382 3379 : search_timestamp: TimestampTz,
383 3379 : probe_lsn: Lsn,
384 3379 : found_smaller: &mut bool,
385 3379 : found_larger: &mut bool,
386 3379 : ctx: &RequestContext,
387 3379 : ) -> Result<bool, PageReconstructError> {
388 3379 : for segno in self
389 3379 : .list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
390 1976 : .await?
391 : {
392 3379 : let nblocks = self
393 3379 : .get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
394 1520 : .await?;
395 3552 : for blknum in (0..nblocks).rev() {
396 3552 : let clog_page = self
397 3552 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
398 210177 : .await?;
399 :
400 3552 : if clog_page.len() == BLCKSZ as usize + 8 {
401 3323 : let mut timestamp_bytes = [0u8; 8];
402 3323 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
403 3323 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
404 3323 :
405 3323 : if timestamp >= search_timestamp {
406 904 : *found_larger = true;
407 904 : return Ok(true);
408 2419 : } else {
409 2419 : *found_smaller = true;
410 2419 : }
411 229 : }
412 : }
413 : }
414 2475 : Ok(false)
415 3379 : }
416 :
417 : /// Get a list of SLRU segments
418 5364 : pub async fn list_slru_segments(
419 5364 : &self,
420 5364 : kind: SlruKind,
421 5364 : lsn: Lsn,
422 5364 : ctx: &RequestContext,
423 5364 : ) -> Result<HashSet<u32>, PageReconstructError> {
424 5364 : // fetch directory entry
425 5364 : let key = slru_dir_to_key(kind);
426 :
427 5364 : let buf = self.get(key, lsn, ctx).await?;
428 5363 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
429 5363 : Ok(dir) => Ok(dir.segments),
430 0 : Err(e) => Err(PageReconstructError::from(e)),
431 : }
432 5364 : }
433 :
434 2666 : pub async fn get_relmap_file(
435 2666 : &self,
436 2666 : spcnode: Oid,
437 2666 : dbnode: Oid,
438 2666 : lsn: Lsn,
439 2666 : ctx: &RequestContext,
440 2666 : ) -> Result<Bytes, PageReconstructError> {
441 2666 : let key = relmap_file_key(spcnode, dbnode);
442 :
443 2666 : let buf = self.get(key, lsn, ctx).await?;
444 2666 : Ok(buf)
445 2666 : }
446 :
447 661 : pub async fn list_dbdirs(
448 661 : &self,
449 661 : lsn: Lsn,
450 661 : ctx: &RequestContext,
451 661 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
452 : // fetch directory entry
453 661 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
454 :
455 661 : match DbDirectory::des(&buf).context("deserialization failure") {
456 661 : Ok(dir) => Ok(dir.dbdirs),
457 0 : Err(e) => Err(PageReconstructError::from(e)),
458 : }
459 661 : }
460 :
461 2 : pub async fn get_twophase_file(
462 2 : &self,
463 2 : xid: TransactionId,
464 2 : lsn: Lsn,
465 2 : ctx: &RequestContext,
466 2 : ) -> Result<Bytes, PageReconstructError> {
467 2 : let key = twophase_file_key(xid);
468 2 : let buf = self.get(key, lsn, ctx).await?;
469 2 : Ok(buf)
470 2 : }
471 :
472 661 : pub async fn list_twophase_files(
473 661 : &self,
474 661 : lsn: Lsn,
475 661 : ctx: &RequestContext,
476 661 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
477 : // fetch directory entry
478 661 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
479 :
480 661 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
481 661 : Ok(dir) => Ok(dir.xids),
482 0 : Err(e) => Err(PageReconstructError::from(e)),
483 : }
484 661 : }
485 :
486 660 : pub async fn get_control_file(
487 660 : &self,
488 660 : lsn: Lsn,
489 660 : ctx: &RequestContext,
490 660 : ) -> Result<Bytes, PageReconstructError> {
491 660 : self.get(CONTROLFILE_KEY, lsn, ctx).await
492 660 : }
493 :
494 2059 : pub async fn get_checkpoint(
495 2059 : &self,
496 2059 : lsn: Lsn,
497 2059 : ctx: &RequestContext,
498 2059 : ) -> Result<Bytes, PageReconstructError> {
499 2059 : self.get(CHECKPOINT_KEY, lsn, ctx).await
500 2059 : }
501 :
502 : /// Does the same as get_current_logical_size but counted on demand.
503 : /// Does the same as get_current_logical_size but computes the size on demand.
504 : ///
505 : /// Only relation blocks are counted currently. That excludes metadata,
506 : /// SLRUs, twophase files etc.
507 421 : pub async fn get_current_logical_size_non_incremental(
508 421 : &self,
509 421 : lsn: Lsn,
510 421 : cancel: CancellationToken,
511 421 : ctx: &RequestContext,
512 421 : ) -> Result<u64, CalculateLogicalSizeError> {
513 421 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
514 :
515 : // Fetch list of database dirs and iterate them
516 421 : let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
517 414 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
518 :
519 414 : let mut total_size: u64 = 0;
520 1660 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
521 387109 : for rel in self
522 1660 : .list_rels(*spcnode, *dbnode, lsn, ctx)
523 891 : .await
524 1659 : .context("list rels")?
525 : {
526 387109 : if cancel.is_cancelled() {
527 1 : return Err(CalculateLogicalSizeError::Cancelled);
528 387108 : }
529 387108 : let relsize_key = rel_size_to_key(rel);
530 387108 : let mut buf = self
531 387108 : .get(relsize_key, lsn, ctx)
532 140355 : .await
533 387108 : .with_context(|| format!("read relation size of {rel:?}"))?;
534 387107 : let relsize = buf.get_u32_le();
535 387107 :
536 387107 : total_size += relsize as u64;
537 : }
538 : }
539 411 : Ok(total_size * BLCKSZ as u64)
540 417 : }
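// Worked example of the calculation above (hypothetical numbers): a timeline
// whose relations hold 1000 blocks in total has a logical size of
// 1000 * BLCKSZ = 1000 * 8192 = 8_192_000 bytes; SLRUs, twophase files and
// other metadata are not counted.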
541 :
542 : ///
543 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
544 : /// Anything that's not listed may be removed from the underlying storage (from
545 : /// that LSN forwards).
546 666 : pub async fn collect_keyspace(
547 666 : &self,
548 666 : lsn: Lsn,
549 666 : ctx: &RequestContext,
550 666 : ) -> anyhow::Result<KeySpace> {
551 666 : // Iterate through key ranges, greedily packing them into partitions
552 666 : let mut result = KeySpaceAccum::new();
553 666 :
554 666 : // The dbdir metadata always exists
555 666 : result.add_key(DBDIR_KEY);
556 :
557 : // Fetch list of database dirs and iterate them
558 989 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
559 666 : let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
560 :
561 666 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
562 666 : dbs.sort_unstable();
563 2999 : for (spcnode, dbnode) in dbs {
564 2335 : result.add_key(relmap_file_key(spcnode, dbnode));
565 2335 : result.add_key(rel_dir_to_key(spcnode, dbnode));
566 :
567 2335 : let mut rels: Vec<RelTag> = self
568 2335 : .list_rels(spcnode, dbnode, lsn, ctx)
569 2284 : .await?
570 2335 : .into_iter()
571 2335 : .collect();
572 2335 : rels.sort_unstable();
573 554188 : for rel in rels {
574 551855 : let relsize_key = rel_size_to_key(rel);
575 551855 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
576 551853 : let relsize = buf.get_u32_le();
577 551853 :
578 551853 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
579 551853 : result.add_key(relsize_key);
580 : }
581 : }
582 :
583 : // Iterate SLRUs next
584 1992 : for kind in [
585 664 : SlruKind::Clog,
586 664 : SlruKind::MultiXactMembers,
587 664 : SlruKind::MultiXactOffsets,
588 : ] {
589 1992 : let slrudir_key = slru_dir_to_key(kind);
590 1992 : result.add_key(slrudir_key);
591 1992 : let buf = self.get(slrudir_key, lsn, ctx).await?;
592 1992 : let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
593 1992 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
594 1992 : segments.sort_unstable();
595 3740 : for segno in segments {
596 1748 : let segsize_key = slru_segment_size_to_key(kind, segno);
597 1748 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
598 1748 : let segsize = buf.get_u32_le();
599 1748 :
600 1748 : result.add_range(
601 1748 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
602 1748 : );
603 1748 : result.add_key(segsize_key);
604 : }
605 : }
606 :
607 : // Then pg_twophase
608 664 : result.add_key(TWOPHASEDIR_KEY);
609 664 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
610 664 : let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
611 664 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
612 664 : xids.sort_unstable();
613 664 : for xid in xids {
614 0 : result.add_key(twophase_file_key(xid));
615 0 : }
616 :
617 664 : result.add_key(CONTROLFILE_KEY);
618 664 : result.add_key(CHECKPOINT_KEY);
619 664 :
620 664 : Ok(result.to_keyspace())
621 664 : }
622 :
623 : /// Get the cached size of a relation, if it was not updated after the specified LSN
624 156163490 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
625 156163490 : let rel_size_cache = self.rel_size_cache.read().unwrap();
626 156163490 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
627 155919845 : if lsn >= *cached_lsn {
628 155767412 : return Some(*nblocks);
629 152433 : }
630 243645 : }
631 396078 : None
632 156163490 : }
633 :
634 : /// Update cached relation size if there is no more recent update
635 168447 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
636 168447 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
637 168447 : match rel_size_cache.entry(tag) {
638 148122 : hash_map::Entry::Occupied(mut entry) => {
639 148122 : let cached_lsn = entry.get_mut();
640 148122 : if lsn >= cached_lsn.0 {
641 3 : *cached_lsn = (lsn, nblocks);
642 148119 : }
643 : }
644 20325 : hash_map::Entry::Vacant(entry) => {
645 20325 : entry.insert((lsn, nblocks));
646 20325 : }
647 : }
648 168447 : }
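// A sketch of the LSN-guarded update above (hypothetical LSNs and sizes;
// note that RelTag is Copy):
//
//     timeline.update_cached_rel_size(tag, Lsn(0x20), 10);
//     timeline.update_cached_rel_size(tag, Lsn(0x10), 5); // no-op: older LSN
//     assert_eq!(timeline.get_cached_rel_size(&tag, Lsn(0x20)), Some(10));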
649 :
650 : /// Store cached relation size
651 1728353 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
652 1728353 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
653 1728353 : rel_size_cache.insert(tag, (lsn, nblocks));
654 1728353 : }
655 :
656 : /// Remove cached relation size
657 17781 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
658 17781 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
659 17781 : rel_size_cache.remove(tag);
660 17781 : }
661 : }
662 :
663 : /// DatadirModification represents an operation to ingest an atomic set of
664 : /// updates to the repository. It is created by the 'begin_modification'
665 : /// function. It is called for each WAL record, so that all the modifications
666 : /// by one WAL record appear atomic.
667 : pub struct DatadirModification<'a> {
668 : /// The timeline this modification applies to. You can access this to
669 : /// read the state, but note that any pending updates are *not* reflected
670 : /// in the state in 'tline' yet.
671 : pub tline: &'a Timeline,
672 :
673 : /// Lsn assigned by begin_modification
674 : pub lsn: Lsn,
675 :
676 : // The modifications are not applied directly to the underlying key-value store.
677 : // The put-functions add the modifications here, and they are flushed to the
678 : // underlying key-value store by the 'finish' function.
679 : pending_updates: HashMap<Key, Value>,
680 : pending_deletions: Vec<Range<Key>>,
681 : pending_nblocks: i64,
682 : }
683 :
684 : impl<'a> DatadirModification<'a> {
685 : /// Initialize a completely new repository.
686 : ///
687 : /// This inserts the directory metadata entries that are assumed to
688 : /// always exist.
689 682 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
690 682 : let buf = DbDirectory::ser(&DbDirectory {
691 682 : dbdirs: HashMap::new(),
692 682 : })?;
693 682 : self.put(DBDIR_KEY, Value::Image(buf.into()));
694 :
695 682 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
696 682 : xids: HashSet::new(),
697 682 : })?;
698 682 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
699 :
700 682 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
701 682 : let empty_dir = Value::Image(buf);
702 682 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
703 682 : self.put(
704 682 : slru_dir_to_key(SlruKind::MultiXactMembers),
705 682 : empty_dir.clone(),
706 682 : );
707 682 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
708 682 :
709 682 : Ok(())
710 682 : }
711 :
712 : #[cfg(test)]
713 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
714 34 : self.init_empty()?;
715 34 : self.put_control_file(bytes::Bytes::from_static(
716 34 : b"control_file contents do not matter",
717 34 : ))
718 34 : .context("put_control_file")?;
719 34 : self.put_checkpoint(bytes::Bytes::from_static(
720 34 : b"checkpoint_file contents do not matter",
721 34 : ))
722 34 : .context("put_checkpoint_file")?;
723 34 : Ok(())
724 34 : }
725 :
726 : /// Put a new page version that can be constructed from a WAL record
727 : ///
728 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
729 : /// current end-of-file. It's up to the caller to check that the relation size
730 : /// matches the blocks inserted!
731 75311843 : pub fn put_rel_wal_record(
732 75311843 : &mut self,
733 75311843 : rel: RelTag,
734 75311843 : blknum: BlockNumber,
735 75311843 : rec: NeonWalRecord,
736 75311843 : ) -> anyhow::Result<()> {
737 75311843 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
738 75311843 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
739 75311843 : Ok(())
740 75311843 : }
741 :
742 : // Same, but for an SLRU.
743 2313430 : pub fn put_slru_wal_record(
744 2313430 : &mut self,
745 2313430 : kind: SlruKind,
746 2313430 : segno: u32,
747 2313430 : blknum: BlockNumber,
748 2313430 : rec: NeonWalRecord,
749 2313430 : ) -> anyhow::Result<()> {
750 2313430 : self.put(
751 2313430 : slru_block_to_key(kind, segno, blknum),
752 2313430 : Value::WalRecord(rec),
753 2313430 : );
754 2313430 : Ok(())
755 2313430 : }
756 :
757 : /// Like put_rel_wal_record, but with a ready-made image of the page.
758 2530223 : pub fn put_rel_page_image(
759 2530223 : &mut self,
760 2530223 : rel: RelTag,
761 2530223 : blknum: BlockNumber,
762 2530223 : img: Bytes,
763 2530223 : ) -> anyhow::Result<()> {
764 2530223 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
765 2530223 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
766 2530223 : Ok(())
767 2530223 : }
768 :
769 2615 : pub fn put_slru_page_image(
770 2615 : &mut self,
771 2615 : kind: SlruKind,
772 2615 : segno: u32,
773 2615 : blknum: BlockNumber,
774 2615 : img: Bytes,
775 2615 : ) -> anyhow::Result<()> {
776 2615 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
777 2615 : Ok(())
778 2615 : }
779 :
780 : /// Store a relmapper file (pg_filenode.map) in the repository
781 2649 : pub async fn put_relmap_file(
782 2649 : &mut self,
783 2649 : spcnode: Oid,
784 2649 : dbnode: Oid,
785 2649 : img: Bytes,
786 2649 : ctx: &RequestContext,
787 2649 : ) -> anyhow::Result<()> {
788 : // Add it to the directory (if it doesn't exist already)
789 2649 : let buf = self.get(DBDIR_KEY, ctx).await?;
790 2649 : let mut dbdir = DbDirectory::des(&buf)?;
791 :
792 2649 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
793 2649 : if r.is_none() || r == Some(false) {
794 2604 : // The dbdir entry didn't exist, or it contained a
795 2604 : // 'false'. The 'insert' call already updated it with
796 2604 : // 'true', now write the updated 'dbdirs' map back.
797 2604 : let buf = DbDirectory::ser(&dbdir)?;
798 2604 : self.put(DBDIR_KEY, Value::Image(buf.into()));
799 45 : }
800 2649 : if r.is_none() {
801 20 : // Create RelDirectory
802 20 : let buf = RelDirectory::ser(&RelDirectory {
803 20 : rels: HashSet::new(),
804 20 : })?;
805 20 : self.put(
806 20 : rel_dir_to_key(spcnode, dbnode),
807 20 : Value::Image(Bytes::from(buf)),
808 20 : );
809 2629 : }
810 :
811 2649 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
812 2649 : Ok(())
813 2649 : }
814 :
815 4 : pub async fn put_twophase_file(
816 4 : &mut self,
817 4 : xid: TransactionId,
818 4 : img: Bytes,
819 4 : ctx: &RequestContext,
820 4 : ) -> anyhow::Result<()> {
821 : // Add it to the directory entry
822 4 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
823 4 : let mut dir = TwoPhaseDirectory::des(&buf)?;
824 4 : if !dir.xids.insert(xid) {
825 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
826 4 : }
827 4 : self.put(
828 4 : TWOPHASEDIR_KEY,
829 4 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
830 : );
831 :
832 4 : self.put(twophase_file_key(xid), Value::Image(img));
833 4 : Ok(())
834 4 : }
835 :
836 680 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
837 680 : self.put(CONTROLFILE_KEY, Value::Image(img));
838 680 : Ok(())
839 680 : }
840 :
841 29078 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
842 29078 : self.put(CHECKPOINT_KEY, Value::Image(img));
843 29078 : Ok(())
844 29078 : }
845 :
846 2 : pub async fn drop_dbdir(
847 2 : &mut self,
848 2 : spcnode: Oid,
849 2 : dbnode: Oid,
850 2 : ctx: &RequestContext,
851 2 : ) -> anyhow::Result<()> {
852 2 : let req_lsn = self.tline.get_last_record_lsn();
853 :
854 2 : let total_blocks = self
855 2 : .tline
856 2 : .get_db_size(spcnode, dbnode, req_lsn, true, ctx)
857 0 : .await?;
858 :
859 : // Remove entry from dbdir
860 2 : let buf = self.get(DBDIR_KEY, ctx).await?;
861 2 : let mut dir = DbDirectory::des(&buf)?;
862 2 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
863 2 : let buf = DbDirectory::ser(&dir)?;
864 2 : self.put(DBDIR_KEY, Value::Image(buf.into()));
865 : } else {
866 0 : warn!(
867 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
868 0 : spcnode, dbnode
869 0 : );
870 : }
871 :
872 : // Update logical database size.
873 2 : self.pending_nblocks -= total_blocks as i64;
874 2 :
875 2 : // Delete all relations and metadata files for the spcnode/dnode
876 2 : self.delete(dbdir_key_range(spcnode, dbnode));
877 2 : Ok(())
878 2 : }
879 :
880 : /// Create a relation fork.
881 : ///
882 : /// 'nblocks' is the initial size.
883 630094 : pub async fn put_rel_creation(
884 630094 : &mut self,
885 630094 : rel: RelTag,
886 630094 : nblocks: BlockNumber,
887 630094 : ctx: &RequestContext,
888 630094 : ) -> Result<(), RelationError> {
889 630094 : if rel.relnode == 0 {
890 0 : return Err(RelationError::InvalidRelnode);
891 630094 : }
892 : // It's possible that this is the first rel for this db in this
893 : // tablespace. Create the reldir entry for it if so.
894 630094 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
895 630094 : .context("deserialize db")?;
896 630094 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
897 630094 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
898 : // Didn't exist. Update dbdir
899 2585 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
900 2585 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
901 2585 : self.put(DBDIR_KEY, Value::Image(buf.into()));
902 2585 :
903 2585 : // and create the RelDirectory
904 2585 : RelDirectory::default()
905 : } else {
906 : // reldir already exists, fetch it
907 627509 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
908 627509 : .context("deserialize db")?
909 : };
910 :
911 : // Add the new relation to the rel directory entry, and write it back
912 630094 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
913 0 : return Err(RelationError::AlreadyExists);
914 630094 : }
915 630094 : self.put(
916 630094 : rel_dir_key,
917 630094 : Value::Image(Bytes::from(
918 630094 : RelDirectory::ser(&rel_dir).context("serialize")?,
919 : )),
920 : );
921 :
922 : // Put size
923 630094 : let size_key = rel_size_to_key(rel);
924 630094 : let buf = nblocks.to_le_bytes();
925 630094 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
926 630094 :
927 630094 : self.pending_nblocks += nblocks as i64;
928 630094 :
929 630094 : // Update relation size cache
930 630094 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
931 630094 :
932 630094 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
933 630094 : // caller.
934 630094 : Ok(())
935 630094 : }
936 :
937 : /// Truncate relation
938 3095 : pub async fn put_rel_truncation(
939 3095 : &mut self,
940 3095 : rel: RelTag,
941 3095 : nblocks: BlockNumber,
942 3095 : ctx: &RequestContext,
943 3095 : ) -> anyhow::Result<()> {
944 3095 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
945 3095 : let last_lsn = self.tline.get_last_record_lsn();
946 3095 : if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
947 3095 : let size_key = rel_size_to_key(rel);
948 : // Fetch the old size first
949 3095 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
950 3095 :
951 3095 : // Update the entry with the new size.
952 3095 : let buf = nblocks.to_le_bytes();
953 3095 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
954 3095 :
955 3095 : // Update relation size cache
956 3095 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
957 3095 :
961 3095 : // Update logical database size.
962 3095 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
963 0 : }
964 3095 : Ok(())
965 3095 : }
966 :
967 : /// Extend relation
968 : /// If new size is smaller, do nothing.
969 1694438 : pub async fn put_rel_extend(
970 1694438 : &mut self,
971 1694438 : rel: RelTag,
972 1694438 : nblocks: BlockNumber,
973 1694438 : ctx: &RequestContext,
974 1694438 : ) -> anyhow::Result<()> {
975 1694438 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
976 :
977 : // Put size
978 1694438 : let size_key = rel_size_to_key(rel);
979 1694438 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
980 1694438 :
981 1694438 : // only extend relation here. never decrease the size
982 1694438 : if nblocks > old_size {
983 1092069 : let buf = nblocks.to_le_bytes();
984 1092069 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
985 1092069 :
986 1092069 : // Update relation size cache
987 1092069 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
988 1092069 :
989 1092069 : self.pending_nblocks += nblocks as i64 - old_size as i64;
990 1092069 : }
991 1694438 : Ok(())
992 1694438 : }
993 :
994 : /// Drop a relation.
995 17781 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
996 17781 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
997 :
998 : // Remove it from the directory entry
999 17781 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1000 17781 : let buf = self.get(dir_key, ctx).await?;
1001 17781 : let mut dir = RelDirectory::des(&buf)?;
1002 :
1003 17781 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1004 17781 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1005 : } else {
1006 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1007 : }
1008 :
1009 : // update logical size
1010 17781 : let size_key = rel_size_to_key(rel);
1011 17781 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1012 17781 : self.pending_nblocks -= old_size as i64;
1013 17781 :
1014 17781 : // Remove entry from relation size cache
1015 17781 : self.tline.remove_cached_rel_size(&rel);
1016 17781 :
1017 17781 : // Delete size entry, as well as all blocks
1018 17781 : self.delete(rel_key_range(rel));
1019 17781 :
1020 17781 : Ok(())
1021 17781 : }
1022 :
1023 1959 : pub async fn put_slru_segment_creation(
1024 1959 : &mut self,
1025 1959 : kind: SlruKind,
1026 1959 : segno: u32,
1027 1959 : nblocks: BlockNumber,
1028 1959 : ctx: &RequestContext,
1029 1959 : ) -> anyhow::Result<()> {
1030 1959 : // Add it to the directory entry
1031 1959 : let dir_key = slru_dir_to_key(kind);
1032 1959 : let buf = self.get(dir_key, ctx).await?;
1033 1959 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1034 :
1035 1959 : if !dir.segments.insert(segno) {
1036 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1037 1959 : }
1038 1959 : self.put(
1039 1959 : dir_key,
1040 1959 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1041 : );
1042 :
1043 : // Put size
1044 1959 : let size_key = slru_segment_size_to_key(kind, segno);
1045 1959 : let buf = nblocks.to_le_bytes();
1046 1959 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1047 1959 :
1048 1959 : // even if nblocks > 0, we don't insert any actual blocks here
1049 1959 :
1050 1959 : Ok(())
1051 1959 : }
1052 :
1053 : /// Extend SLRU segment
1054 668 : pub fn put_slru_extend(
1055 668 : &mut self,
1056 668 : kind: SlruKind,
1057 668 : segno: u32,
1058 668 : nblocks: BlockNumber,
1059 668 : ) -> anyhow::Result<()> {
1060 668 : // Put size
1061 668 : let size_key = slru_segment_size_to_key(kind, segno);
1062 668 : let buf = nblocks.to_le_bytes();
1063 668 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1064 668 : Ok(())
1065 668 : }
1066 :
1067 : /// This method is used for marking truncated SLRU files
1068 9 : pub async fn drop_slru_segment(
1069 9 : &mut self,
1070 9 : kind: SlruKind,
1071 9 : segno: u32,
1072 9 : ctx: &RequestContext,
1073 9 : ) -> anyhow::Result<()> {
1074 9 : // Remove it from the directory entry
1075 9 : let dir_key = slru_dir_to_key(kind);
1076 9 : let buf = self.get(dir_key, ctx).await?;
1077 9 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1078 :
1079 9 : if !dir.segments.remove(&segno) {
1080 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1081 9 : }
1082 : self.put(
1083 9 : dir_key,
1084 9 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1085 : );
1086 :
1087 : // Delete size entry, as well as all blocks
1088 9 : self.delete(slru_segment_key_range(kind, segno));
1089 9 :
1090 9 : Ok(())
1091 9 : }
1092 :
1093 : /// Drop a relmapper file (pg_filenode.map)
1094 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1095 0 : // TODO
1096 0 : Ok(())
1097 0 : }
1098 :
1099 : /// Drop a twophase file, removing it from the directory entry
1100 2 : pub async fn drop_twophase_file(
1101 2 : &mut self,
1102 2 : xid: TransactionId,
1103 2 : ctx: &RequestContext,
1104 2 : ) -> anyhow::Result<()> {
1105 : // Remove it from the directory entry
1106 2 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1107 2 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1108 :
1109 2 : if !dir.xids.remove(&xid) {
1110 0 : warn!("twophase file for xid {} does not exist", xid);
1111 2 : }
1112 : self.put(
1113 : TWOPHASEDIR_KEY,
1114 2 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1115 : );
1116 :
1117 : // Delete it
1118 2 : self.delete(twophase_key_range(xid));
1119 2 :
1120 2 : Ok(())
1121 2 : }
1122 :
1123 : ///
1124 : /// Flush changes accumulated so far to the underlying repository.
1125 : ///
1126 : /// Usually, changes made in DatadirModification are atomic, but this allows
1127 : /// you to flush them to the underlying repository before the final `commit`.
1128 : /// That frees up the memory used to hold the pending changes.
1129 : ///
1130 : /// Currently only used during bulk import of a data directory. In that
1131 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1132 : /// whole import fails and the timeline will be deleted anyway.
1133 : /// (Or to be precise, it will be left behind for debugging purposes and
1134 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1135 : ///
1136 : /// Note: A consequence of flushing the pending operations is that they
1137 : /// won't be visible to subsequent operations until `commit`. The function
1138 : /// retains all the metadata, but data pages are flushed. That's again OK
1139 : /// for bulk import, where you are just loading data pages and won't try to
1140 : /// modify the same pages twice.
1141 614019 : pub async fn flush(&mut self) -> anyhow::Result<()> {
1142 614019 : // Unless we have accumulated a decent amount of changes, it's not worth it
1143 614019 : // to scan through the pending_updates list.
1144 614019 : let pending_nblocks = self.pending_nblocks;
1145 614019 : if pending_nblocks < 10000 {
1146 614019 : return Ok(());
1147 0 : }
1148 :
1149 0 : let writer = self.tline.writer().await;
1150 :
1151 : // Flush relation and SLRU data blocks, keep metadata.
1152 0 : let mut retained_pending_updates = HashMap::new();
1153 0 : for (key, value) in self.pending_updates.drain() {
1154 0 : if is_rel_block_key(key) || is_slru_block_key(key) {
1155 : // This bails out on first error without modifying pending_updates.
1156 : // That's Ok, cf this function's doc comment.
1157 0 : writer.put(key, self.lsn, &value).await?;
1158 0 : } else {
1159 0 : retained_pending_updates.insert(key, value);
1160 0 : }
1161 : }
1162 0 : self.pending_updates.extend(retained_pending_updates);
1163 0 :
1164 0 : if pending_nblocks != 0 {
1165 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1166 0 : self.pending_nblocks = 0;
1167 0 : }
1168 :
1169 0 : Ok(())
1170 614019 : }
1171 :
1172 : ///
1173 : /// Finish this atomic update, writing all the updated keys to the
1174 : /// underlying timeline.
1175 : /// All the modifications in this atomic update are stamped by the specified LSN.
1176 : ///
1177 73661491 : pub async fn commit(&mut self) -> anyhow::Result<()> {
1178 73661518 : let writer = self.tline.writer().await;
1179 73661518 : let lsn = self.lsn;
1180 73661518 : let pending_nblocks = self.pending_nblocks;
1181 73661518 : self.pending_nblocks = 0;
1182 :
1183 81939061 : for (key, value) in self.pending_updates.drain() {
1184 81939061 : writer.put(key, lsn, &value).await?;
1185 : }
1186 73661508 : for key_range in self.pending_deletions.drain(..) {
1187 17794 : writer.delete(key_range, lsn).await?;
1188 : }
1189 :
1190 73661508 : writer.finish_write(lsn);
1191 73661508 :
1192 73661508 : if pending_nblocks != 0 {
1193 1094354 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1194 72567154 : }
1195 :
1196 73661508 : Ok(())
1197 73661508 : }
1198 :
1199 : // Internal helper functions to batch the modifications
1200 :
1201 2995323 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1202 : // Have we already updated the same key? Read the pending updated
1203 : // version in that case.
1204 : //
1205 : // Note: we don't check pending_deletions. It is an error to request a
1206 : // value that has been removed, deletion only avoids leaking storage.
1207 2995323 : if let Some(value) = self.pending_updates.get(&key) {
1208 1836929 : if let Value::Image(img) = value {
1209 1836929 : Ok(img.clone())
1210 : } else {
1211 : // Currently, we never need to read back a WAL record that we
1212 : // inserted in the same "transaction". All the metadata updates
1213 : // work directly with Images, and we never need to read actual
1214 : // data pages. We could handle this if we had to, by calling
1215 : // the walredo manager, but let's keep it simple for now.
1216 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1217 0 : "unexpected pending WAL record"
1218 0 : )))
1219 : }
1220 : } else {
1221 1158394 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1222 1158394 : self.tline.get(key, lsn, ctx).await
1223 : }
1224 2995323 : }
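// Read-your-own-writes, as implemented above (a sketch; `img` is a hypothetical
// page image held by an in-progress DatadirModification):
//
//     modification.put_control_file(img.clone())?;
//     // `get` sees the pending image even before `commit` flushes it:
//     assert_eq!(modification.get(CONTROLFILE_KEY, ctx).await?, img);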
1225 :
1226 82576877 : fn put(&mut self, key: Key, val: Value) {
1227 82576877 : self.pending_updates.insert(key, val);
1228 82576877 : }
1229 :
1230 17794 : fn delete(&mut self, key_range: Range<Key>) {
1231 17794 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1232 17794 : self.pending_deletions.push(key_range);
1233 17794 : }
1234 : }
1235 :
1236 : //--- Metadata structs stored in key-value pairs in the repository.
1237 :
1238 634486 : #[derive(Debug, Serialize, Deserialize)]
1239 : struct DbDirectory {
1240 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1241 : dbdirs: HashMap<(Oid, Oid), bool>,
1242 : }
1243 :
1244 1376 : #[derive(Debug, Serialize, Deserialize)]
1245 : struct TwoPhaseDirectory {
1246 : xids: HashSet<TransactionId>,
1247 : }
1248 :
1249 1295790 : #[derive(Debug, Serialize, Deserialize, Default)]
1250 : struct RelDirectory {
1251 : // Set of relations that exist. (relfilenode, forknum)
1252 : //
1253 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1254 : // key-value pairs, if you have a lot of relations
1255 : rels: HashSet<(Oid, u8)>,
1256 : }
1257 :
1258 0 : #[derive(Debug, Serialize, Deserialize)]
1259 : struct RelSizeEntry {
1260 : nblocks: u32,
1261 : }
1262 :
1263 9997 : #[derive(Debug, Serialize, Deserialize, Default)]
1264 : struct SlruSegmentDirectory {
1265 : // Set of SLRU segments that exist.
1266 : segments: HashSet<u32>,
1267 : }
1268 :
1269 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1270 :
1271 : // Layout of the Key address space
1272 : //
1273 : // The Key struct, used to address the underlying key-value store, consists of
1274 : // 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
1275 : // all the data and metadata keys into those 18 bytes.
1276 : //
1277 : // Principles for the mapping:
1278 : //
1279 : // - Things that are often accessed or modified together, should be close to
1280 : // each other in the key space. For example, if a relation is extended by one
1281 : // block, we create a new key-value pair for the block data, and update the
1282 : // relation size entry. Because of that, the RelSize key comes after all the
1283 : // RelBlocks of a relation: the RelSize and the last RelBlock are always next
1284 : // to each other.
1285 : //
1286 : // The key space is divided into four major sections, identified by the first
1287 : // byte, and they form a hierarchy:
1288 : //
1289 : // 00 Relation data and metadata
1290 : //
1291 : // DbDir () -> (spcnode, dbnode)
1292 : // Filenodemap
1293 : // RelDir -> relnode forknum
1294 : // RelBlocks
1295 : // RelSize
1296 : //
1297 : // 01 SLRUs
1298 : //
1299 : // SlruDir kind
1300 : // SlruSegBlocks segno
1301 : // SlruSegSize
1302 : //
1303 : // 02 pg_twophase
1304 : //
1305 : // 03 misc
1306 : // controlfile
1307 : // checkpoint
1308 : // pg_version
1309 : //
1310 : // Below is a full list of the keyspace allocation:
1311 : //
1312 : // DbDir:
1313 : // 00 00000000 00000000 00000000 00 00000000
1314 : //
1315 : // Filenodemap:
1316 : // 00 SPCNODE DBNODE 00000000 00 00000000
1317 : //
1318 : // RelDir:
1319 : // 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
1320 : //
1321 : // RelBlock:
1322 : // 00 SPCNODE DBNODE RELNODE FORK BLKNUM
1323 : //
1324 : // RelSize:
1325 : // 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
1326 : //
1327 : // SlruDir:
1328 : // 01 kind 00000000 00000000 00 00000000
1329 : //
1330 : // SlruSegBlock:
1331 : // 01 kind 00000001 SEGNO 00 BLKNUM
1332 : //
1333 : // SlruSegSize:
1334 : // 01 kind 00000001 SEGNO 00 FFFFFFFF
1335 : //
1336 : // TwoPhaseDir:
1337 : // 02 00000000 00000000 00000000 00 00000000
1338 : //
1339 : // TwoPhaseFile:
1340 : // 02 00000000 00000000 00000000 00 XID
1341 : //
1342 : // ControlFile:
1343 : // 03 00000000 00000000 00000000 00 00000000
1344 : //
1345 : // Checkpoint:
1346 : // 03 00000000 00000000 00000000 00 00000001
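//
// As an illustration of the adjacency principle (a sketch with hypothetical
// RelTag values, relying on Key's field-by-field lexicographic ordering):
//
//     let rel = RelTag { spcnode: 1663, dbnode: 13010, relnode: 16384, forknum: 0 };
//     // The RelSize key ends in FFFFFFFF, so it sorts right after the highest
//     // possible RelBlock key of the same fork:
//     assert!(rel_block_to_key(rel, u32::MAX - 1) < rel_size_to_key(rel));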
1347 : //-- Section 00: relation data and metadata
1348 :
1349 : const DBDIR_KEY: Key = Key {
1350 : field1: 0x00,
1351 : field2: 0,
1352 : field3: 0,
1353 : field4: 0,
1354 : field5: 0,
1355 : field6: 0,
1356 : };
1357 :
1358 2 : fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
1359 2 : Key {
1360 2 : field1: 0x00,
1361 2 : field2: spcnode,
1362 2 : field3: dbnode,
1363 2 : field4: 0,
1364 2 : field5: 0,
1365 2 : field6: 0,
1366 2 : }..Key {
1367 2 : field1: 0x00,
1368 2 : field2: spcnode,
1369 2 : field3: dbnode,
1370 2 : field4: 0xffffffff,
1371 2 : field5: 0xff,
1372 2 : field6: 0xffffffff,
1373 2 : }
1374 2 : }
1375 :
1376 7650 : fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
1377 7650 : Key {
1378 7650 : field1: 0x00,
1379 7650 : field2: spcnode,
1380 7650 : field3: dbnode,
1381 7650 : field4: 0,
1382 7650 : field5: 0,
1383 7650 : field6: 0,
1384 7650 : }
1385 7650 : }
1386 :
1387 761611 : fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
1388 761611 : Key {
1389 761611 : field1: 0x00,
1390 761611 : field2: spcnode,
1391 761611 : field3: dbnode,
1392 761611 : field4: 0,
1393 761611 : field5: 0,
1394 761611 : field6: 1,
1395 761611 : }
1396 761611 : }
1397 :
1398 83369133 : fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
1399 83369133 : Key {
1400 83369133 : field1: 0x00,
1401 83369133 : field2: rel.spcnode,
1402 83369133 : field3: rel.dbnode,
1403 83369133 : field4: rel.relnode,
1404 83369133 : field5: rel.forknum,
1405 83369133 : field6: blknum,
1406 83369133 : }
1407 83369133 : }
1408 :
1409 3575738 : fn rel_size_to_key(rel: RelTag) -> Key {
1410 3575738 : Key {
1411 3575738 : field1: 0x00,
1412 3575738 : field2: rel.spcnode,
1413 3575738 : field3: rel.dbnode,
1414 3575738 : field4: rel.relnode,
1415 3575738 : field5: rel.forknum,
1416 3575738 : field6: 0xffffffff,
1417 3575738 : }
1418 3575738 : }
1419 :
1420 17781 : fn rel_key_range(rel: RelTag) -> Range<Key> {
1421 17781 : Key {
1422 17781 : field1: 0x00,
1423 17781 : field2: rel.spcnode,
1424 17781 : field3: rel.dbnode,
1425 17781 : field4: rel.relnode,
1426 17781 : field5: rel.forknum,
1427 17781 : field6: 0,
1428 17781 : }..Key {
1429 17781 : field1: 0x00,
1430 17781 : field2: rel.spcnode,
1431 17781 : field3: rel.dbnode,
1432 17781 : field4: rel.relnode,
1433 17781 : field5: rel.forknum + 1,
1434 17781 : field6: 0,
1435 17781 : }
1436 17781 : }
1437 :
1438 : //-- Section 01: SLRUs
1439 :
1440 12044 : fn slru_dir_to_key(kind: SlruKind) -> Key {
1441 12044 : Key {
1442 12044 : field1: 0x01,
1443 12044 : field2: match kind {
1444 6422 : SlruKind::Clog => 0x00,
1445 2954 : SlruKind::MultiXactMembers => 0x01,
1446 2668 : SlruKind::MultiXactOffsets => 0x02,
1447 : },
1448 : field3: 0,
1449 : field4: 0,
1450 : field5: 0,
1451 : field6: 0,
1452 : }
1453 12044 : }
1454 :
1455 2325730 : fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
1456 2325730 : Key {
1457 2325730 : field1: 0x01,
1458 2325730 : field2: match kind {
1459 2271547 : SlruKind::Clog => 0x00,
1460 27652 : SlruKind::MultiXactMembers => 0x01,
1461 26531 : SlruKind::MultiXactOffsets => 0x02,
1462 : },
1463 : field3: 1,
1464 2325730 : field4: segno,
1465 2325730 : field5: 0,
1466 2325730 : field6: blknum,
1467 2325730 : }
1468 2325730 : }
1469 :
1470 10411 : fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
1471 10411 : Key {
1472 10411 : field1: 0x01,
1473 10411 : field2: match kind {
1474 6005 : SlruKind::Clog => 0x00,
1475 2493 : SlruKind::MultiXactMembers => 0x01,
1476 1913 : SlruKind::MultiXactOffsets => 0x02,
1477 : },
1478 : field3: 1,
1479 10411 : field4: segno,
1480 10411 : field5: 0,
1481 10411 : field6: 0xffffffff,
1482 10411 : }
1483 10411 : }
1484 :
1485 9 : fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
1486 9 : let field2 = match kind {
1487 9 : SlruKind::Clog => 0x00,
1488 0 : SlruKind::MultiXactMembers => 0x01,
1489 0 : SlruKind::MultiXactOffsets => 0x02,
1490 : };
1491 :
1492 9 : Key {
1493 9 : field1: 0x01,
1494 9 : field2,
1495 9 : field3: 1,
1496 9 : field4: segno,
1497 9 : field5: 0,
1498 9 : field6: 0,
1499 9 : }..Key {
1500 9 : field1: 0x01,
1501 9 : field2,
1502 9 : field3: 1,
1503 9 : field4: segno,
1504 9 : field5: 1,
1505 9 : field6: 0,
1506 9 : }
1507 9 : }
1508 :
1509 : //-- Section 02: pg_twophase
1510 :
1511 : const TWOPHASEDIR_KEY: Key = Key {
1512 : field1: 0x02,
1513 : field2: 0,
1514 : field3: 0,
1515 : field4: 0,
1516 : field5: 0,
1517 : field6: 0,
1518 : };
1519 :
1520 6 : fn twophase_file_key(xid: TransactionId) -> Key {
1521 6 : Key {
1522 6 : field1: 0x02,
1523 6 : field2: 0,
1524 6 : field3: 0,
1525 6 : field4: 0,
1526 6 : field5: 0,
1527 6 : field6: xid,
1528 6 : }
1529 6 : }
1530 :
1531 2 : fn twophase_key_range(xid: TransactionId) -> Range<Key> {
1532 2 : let (next_xid, overflowed) = xid.overflowing_add(1);
1533 2 :
1534 2 : Key {
1535 2 : field1: 0x02,
1536 2 : field2: 0,
1537 2 : field3: 0,
1538 2 : field4: 0,
1539 2 : field5: 0,
1540 2 : field6: xid,
1541 2 : }..Key {
1542 2 : field1: 0x02,
1543 2 : field2: 0,
1544 2 : field3: 0,
1545 2 : field4: 0,
1546 2 : field5: u8::from(overflowed),
1547 2 : field6: next_xid,
1548 2 : }
1549 2 : }
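// The overflowing_add above keeps the range end exclusive even for the largest
// xid (a sketch at the hypothetical boundary value):
//
//     // For xid == u32::MAX, next_xid wraps to 0 and `overflowed` is true, so
//     // the range ends at { field5: 1, field6: 0 }, just past all field5: 0 keys:
//     assert_eq!(twophase_key_range(u32::MAX).end.field5, 1);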
1550 :
1551 : //-- Section 03: misc (control file and checkpoint)
1552 : const CONTROLFILE_KEY: Key = Key {
1553 : field1: 0x03,
1554 : field2: 0,
1555 : field3: 0,
1556 : field4: 0,
1557 : field5: 0,
1558 : field6: 0,
1559 : };
1560 :
1561 : const CHECKPOINT_KEY: Key = Key {
1562 : field1: 0x03,
1563 : field2: 0,
1564 : field3: 0,
1565 : field4: 0,
1566 : field5: 0,
1567 : field6: 1,
1568 : };
1569 :
1570 : // Reverse mappings for a few Keys.
1571 : // These are needed by WAL redo manager.
1572 : // These are needed by the WAL redo manager.
1573 2761257 : pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
1574 2761257 : Ok(match key.field1 {
1575 2761257 : 0x00 => (
1576 2761257 : RelTag {
1577 2761257 : spcnode: key.field2,
1578 2761257 : dbnode: key.field3,
1579 2761257 : relnode: key.field4,
1580 2761257 : forknum: key.field5,
1581 2761257 : },
1582 2761257 : key.field6,
1583 2761257 : ),
1584 0 : _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
1585 : })
1586 2761257 : }
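// Round-trip property of the mappings (a sketch; `rel` and `blknum` are
// arbitrary, inside a function returning anyhow::Result):
//
//     let key = rel_block_to_key(rel, blknum);
//     assert_eq!(key_to_rel_block(key)?, (rel, blknum));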
1587 :
1588 0 : fn is_rel_block_key(key: Key) -> bool {
1589 0 : key.field1 == 0x00 && key.field4 != 0
1590 0 : }
1591 :
1592 0 : pub fn is_rel_fsm_block_key(key: Key) -> bool {
1593 0 : key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
1594 0 : }
1595 :
1596 0 : pub fn is_rel_vm_block_key(key: Key) -> bool {
1597 0 : key.field1 == 0x00
1598 0 : && key.field4 != 0
1599 0 : && key.field5 == VISIBILITYMAP_FORKNUM
1600 0 : && key.field6 != 0xffffffff
1601 0 : }
1602 :
1603 28080239 : pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
1604 28080239 : Ok(match key.field1 {
1605 : 0x01 => {
1606 28080239 : let kind = match key.field2 {
1607 27984367 : 0x00 => SlruKind::Clog,
1608 48210 : 0x01 => SlruKind::MultiXactMembers,
1609 47662 : 0x02 => SlruKind::MultiXactOffsets,
1610 0 : _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
1611 : };
1612 28080239 : let segno = key.field4;
1613 28080239 : let blknum = key.field6;
1614 28080239 :
1615 28080239 : (kind, segno, blknum)
1616 : }
1617 0 : _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
1618 : })
1619 28080239 : }
1620 :
1621 0 : fn is_slru_block_key(key: Key) -> bool {
1622 0 : key.field1 == 0x01 // SLRU-related
1623 0 : && key.field3 == 0x00000001 // but not SlruDir
1624 0 : && key.field6 != 0xffffffff // and not SlruSegSize
1625 0 : }
1626 :
1627 : #[allow(clippy::bool_assert_comparison)]
1628 : #[cfg(test)]
1629 : mod tests {
1630 : //use super::repo_harness::*;
1631 : //use super::*;
1632 :
1633 : /*
1634 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1635 : let incremental = timeline.get_current_logical_size();
1636 : let non_incremental = timeline
1637 : .get_current_logical_size_non_incremental(lsn)
1638 : .unwrap();
1639 : assert_eq!(incremental, non_incremental);
1640 : }
1641 : */
1642 :
1643 : /*
1644 : ///
1645 : /// Test list_rels() function, with branches and dropped relations
1646 : ///
1647 : #[test]
1648 : fn test_list_rels_drop() -> Result<()> {
1649 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1650 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1651 : const TESTDB: u32 = 111;
1652 :
1653 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1654 : // after branching fails below
1655 : let mut writer = tline.begin_record(Lsn(0x10));
1656 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1657 : writer.finish()?;
1658 :
1659 : // Create a relation on the timeline
1660 : let mut writer = tline.begin_record(Lsn(0x20));
1661 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1662 : writer.finish()?;
1663 :
1664 : let writer = tline.begin_record(Lsn(0x00));
1665 : writer.finish()?;
1666 :
1667 : // Check that list_rels() lists it after LSN 2, but not before it
1668 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1669 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
1670 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
1671 :
1672 : // Create a branch, check that the relation is visible there
1673 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
1674 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
1675 : Some(timeline) => timeline,
1676 : None => panic!("Should have a local timeline"),
1677 : };
1678 : let newtline = DatadirTimelineImpl::new(newtline);
1679 : assert!(newtline
1680 : .list_rels(0, TESTDB, Lsn(0x30))?
1681 : .contains(&TESTREL_A));
1682 :
1683 : // Drop it on the branch
1684 : let mut new_writer = newtline.begin_record(Lsn(0x40));
1685 : new_writer.drop_relation(TESTREL_A)?;
1686 : new_writer.finish()?;
1687 :
1688 : // Check that it's no longer listed on the branch after the point where it was dropped
1689 : assert!(newtline
1690 : .list_rels(0, TESTDB, Lsn(0x30))?
1691 : .contains(&TESTREL_A));
1692 : assert!(!newtline
1693 : .list_rels(0, TESTDB, Lsn(0x40))?
1694 : .contains(&TESTREL_A));
1695 :
1696 : // Run checkpoint and garbage collection and check that it's still not visible
1697 : newtline.checkpoint(CheckpointConfig::Forced)?;
1698 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
1699 :
1700 : assert!(!newtline
1701 : .list_rels(0, TESTDB, Lsn(0x40))?
1702 : .contains(&TESTREL_A));
1703 :
1704 : Ok(())
1705 : }
1706 : */
1707 :
1708 : /*
1709 : #[test]
1710 : fn test_read_beyond_eof() -> Result<()> {
1711 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
1712 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
1713 :
1714 : make_some_layers(&tline, Lsn(0x20))?;
1715 : let mut writer = tline.begin_record(Lsn(0x60));
1716 : walingest.put_rel_page_image(
1717 : &mut writer,
1718 : TESTREL_A,
1719 : 0,
1720 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
1721 : )?;
1722 : writer.finish()?;
1723 :
1724 : // Test read before rel creation. Should error out.
1725 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
1726 :
1727 : // Read block beyond end of relation at different points in time.
1728 : // These reads should fall into different delta, image, and in-memory layers.
1729 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
1730 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
1731 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
1732 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
1733 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
1734 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
1735 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
1736 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
1737 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
1738 :
1739 : // Test on an in-memory layer with no preceding layer
1740 : let mut writer = tline.begin_record(Lsn(0x70));
1741 : walingest.put_rel_page_image(
1742 : &mut writer,
1743 : TESTREL_B,
1744 : 0,
1745 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
1746 : )?;
1747 : writer.finish()?;
1748 :
1749 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
1750 :
1751 : Ok(())
1752 : }
1753 : */
1754 : }
|