TLA Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::walrecord::NeonWalRecord;
14 : use anyhow::{ensure, Context};
15 : use bytes::{Buf, Bytes};
16 : use pageserver_api::key::is_rel_block_key;
17 : use pageserver_api::reltag::{RelTag, SlruKind};
18 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
19 : use postgres_ffi::BLCKSZ;
20 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
21 : use serde::{Deserialize, Serialize};
22 : use std::collections::{hash_map, HashMap, HashSet};
23 : use std::ops::ControlFlow;
24 : use std::ops::Range;
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::{debug, trace, warn};
27 : use utils::bin_ser::DeserializeError;
28 : use utils::{bin_ser::BeSer, lsn::Lsn};
29 :
30 : /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
31 : pub type BlockNumber = u32;
32 :
33 UBC 0 : #[derive(Debug)]
34 : pub enum LsnForTimestamp {
35 : /// Found commits both before and after the given timestamp
36 : Present(Lsn),
37 :
38 : /// Found no commits after the given timestamp, this means
39 : /// that the newest data in the branch is older than the given
40 : /// timestamp.
41 : ///
42 : /// All commits <= LSN happened before the given timestamp
43 : Future(Lsn),
44 :
45 : /// The queried timestamp is past our horizon we look back at (PITR)
46 : ///
47 : /// All commits > LSN happened after the given timestamp,
48 : /// but any commits < LSN might have happened before or after
49 : /// the given timestamp. We don't know because no data before
50 : /// the given lsn is available.
51 : Past(Lsn),
52 :
53 : /// We have found no commit with a timestamp,
54 : /// so we can't return anything meaningful.
55 : ///
56 : /// The associated LSN is the lower bound value we can safely
57 : /// create branches on, but no statement is made if it is
58 : /// older or newer than the timestamp.
59 : ///
60 : /// This variant can e.g. be returned right after a
61 : /// cluster import.
62 : NoData(Lsn),
63 : }
64 :
65 0 : #[derive(Debug, thiserror::Error)]
66 : pub enum CalculateLogicalSizeError {
67 : #[error("cancelled")]
68 : Cancelled,
69 : #[error(transparent)]
70 : Other(#[from] anyhow::Error),
71 : }
72 :
73 LBC (4) : #[derive(Debug, thiserror::Error)]
74 : pub(crate) enum CollectKeySpaceError {
75 : #[error(transparent)]
76 : Decode(#[from] DeserializeError),
77 : #[error(transparent)]
78 : PageRead(PageReconstructError),
79 : #[error("cancelled")]
80 : Cancelled,
81 : }
82 :
83 : impl From<PageReconstructError> for CollectKeySpaceError {
84 CBC 1 : fn from(err: PageReconstructError) -> Self {
85 1 : match err {
86 1 : PageReconstructError::Cancelled => Self::Cancelled,
87 LBC (2) : err => Self::PageRead(err),
88 : }
89 CBC 1 : }
90 : }
91 :
92 : impl From<PageReconstructError> for CalculateLogicalSizeError {
93 18 : fn from(pre: PageReconstructError) -> Self {
94 18 : match pre {
95 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
96 16 : Self::Cancelled
97 : }
98 2 : _ => Self::Other(pre.into()),
99 : }
100 18 : }
101 : }
102 :
103 UBC 0 : #[derive(Debug, thiserror::Error)]
104 : pub enum RelationError {
105 : #[error("Relation Already Exists")]
106 : AlreadyExists,
107 : #[error("invalid relnode")]
108 : InvalidRelnode,
109 : #[error(transparent)]
110 : Other(#[from] anyhow::Error),
111 : }
112 :
113 : ///
114 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
115 : /// and other special kinds of files, in a versioned key-value store. The
116 : /// Timeline struct provides the key-value store.
117 : ///
118 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
119 : /// implementation, and might be moved into a separate struct later.
120 : impl Timeline {
121 : /// Start ingesting a WAL record, or other atomic modification of
122 : /// the timeline.
123 : ///
124 : /// This provides a transaction-like interface to perform a bunch
125 : /// of modifications atomically.
126 : ///
127 : /// To ingest a WAL record, call begin_modification(lsn) to get a
128 : /// DatadirModification object. Use the functions in the object to
129 : /// modify the repository state, updating all the pages and metadata
130 : /// that the WAL record affects. When you're done, call commit() to
131 : /// commit the changes.
132 : ///
133 : /// Lsn stored in modification is advanced by `ingest_record` and
134 : /// is used by `commit()` to update `last_record_lsn`.
135 : ///
136 : /// Calling commit() will flush all the changes and reset the state,
137 : /// so the `DatadirModification` struct can be reused to perform the next modification.
138 : ///
139 : /// Note that any pending modifications you make through the
140 : /// modification object won't be visible to calls to the 'get' and list
141 : /// functions of the timeline until you finish! And if you update the
142 : /// same page twice, the last update wins.
143 : ///
144 CBC 697658 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
145 697658 : where
146 697658 : Self: Sized,
147 697658 : {
148 697658 : DatadirModification {
149 697658 : tline: self,
150 697658 : pending_lsns: Vec::new(),
151 697658 : pending_updates: HashMap::new(),
152 697658 : pending_deletions: Vec::new(),
153 697658 : pending_nblocks: 0,
154 697658 : lsn,
155 697658 : }
156 697658 : }
157 :
158 : //------------------------------------------------------------------------------
159 : // Public GET functions
160 : //------------------------------------------------------------------------------
161 :
162 : /// Look up given page version.
163 3648033 : pub(crate) async fn get_rel_page_at_lsn(
164 3648033 : &self,
165 3648033 : tag: RelTag,
166 3648033 : blknum: BlockNumber,
167 3648033 : version: Version<'_>,
168 3648033 : latest: bool,
169 3648033 : ctx: &RequestContext,
170 3648033 : ) -> Result<Bytes, PageReconstructError> {
171 3648033 : if tag.relnode == 0 {
172 UBC 0 : return Err(PageReconstructError::Other(
173 0 : RelationError::InvalidRelnode.into(),
174 0 : ));
175 CBC 3648033 : }
176 :
177 3648033 : let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
178 3648033 : if blknum >= nblocks {
179 UBC 0 : debug!(
180 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
181 0 : tag,
182 0 : blknum,
183 0 : version.get_lsn(),
184 0 : nblocks
185 0 : );
186 CBC 118268 : return Ok(ZERO_PAGE.clone());
187 3529765 : }
188 3529765 :
189 3529765 : let key = rel_block_to_key(tag, blknum);
190 3529765 : version.get(self, key, ctx).await
191 3648007 : }
192 :
193 : // Get size of a database in blocks
194 8 : pub(crate) async fn get_db_size(
195 8 : &self,
196 8 : spcnode: Oid,
197 8 : dbnode: Oid,
198 8 : version: Version<'_>,
199 8 : latest: bool,
200 8 : ctx: &RequestContext,
201 8 : ) -> Result<usize, PageReconstructError> {
202 8 : let mut total_blocks = 0;
203 :
204 8 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
205 :
206 2351 : for rel in rels {
207 2343 : let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
208 2343 : total_blocks += n_blocks as usize;
209 : }
210 8 : Ok(total_blocks)
211 8 : }
212 :
213 : /// Get size of a relation file
214 3777105 : pub(crate) async fn get_rel_size(
215 3777105 : &self,
216 3777105 : tag: RelTag,
217 3777105 : version: Version<'_>,
218 3777105 : latest: bool,
219 3777105 : ctx: &RequestContext,
220 3777105 : ) -> Result<BlockNumber, PageReconstructError> {
221 3777105 : if tag.relnode == 0 {
222 UBC 0 : return Err(PageReconstructError::Other(
223 0 : RelationError::InvalidRelnode.into(),
224 0 : ));
225 CBC 3777105 : }
226 :
227 3777105 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
228 3476775 : return Ok(nblocks);
229 300330 : }
230 300330 :
231 300330 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
232 7323 : && !self.get_rel_exists(tag, version, latest, ctx).await?
233 : {
234 : // FIXME: Postgres sometimes calls smgrcreate() to create
235 : // FSM, and smgrnblocks() on it immediately afterwards,
236 : // without extending it. Tolerate that by claiming that
237 : // any non-existent FSM fork has size 0.
238 11 : return Ok(0);
239 300319 : }
240 300319 :
241 300319 : let key = rel_size_to_key(tag);
242 300319 : let mut buf = version.get(self, key, ctx).await?;
243 300317 : let nblocks = buf.get_u32_le();
244 300317 :
245 300317 : if latest {
246 151988 : // Update the relation size cache only if the "latest" flag is set.
247 151988 : // This flag is set by compute when it is working with the most recent version of the relation.
248 151988 : // Typically the master compute node always sets latest=true.
249 151988 : // Note that even if a compute node "by mistake" specifies an old LSN but sets
250 151988 : // latest=true, it cannot cause cache corruption, because with latest=true the
251 151988 : // pageserver chooses max(request_lsn, last_written_lsn), so the cached value will be
252 151988 : // associated with the most recent LSN.
253 151988 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
254 151988 : }
255 300317 : Ok(nblocks)
256 3777105 : }
257 :
258 : /// Does relation exist?
259 212940 : pub(crate) async fn get_rel_exists(
260 212940 : &self,
261 212940 : tag: RelTag,
262 212940 : version: Version<'_>,
263 212940 : _latest: bool,
264 212940 : ctx: &RequestContext,
265 212940 : ) -> Result<bool, PageReconstructError> {
266 212940 : if tag.relnode == 0 {
267 UBC 0 : return Err(PageReconstructError::Other(
268 0 : RelationError::InvalidRelnode.into(),
269 0 : ));
270 CBC 212940 : }
271 :
272 : // first try to lookup relation in cache
273 212940 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
274 113638 : return Ok(true);
275 99302 : }
276 99302 : // fetch directory listing
277 99302 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
278 99302 : let buf = version.get(self, key, ctx).await?;
279 :
280 99302 : match RelDirectory::des(&buf).context("deserialization failure") {
281 99302 : Ok(dir) => {
282 99302 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
283 99302 : Ok(exists)
284 : }
285 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
286 : }
287 CBC 212940 : }
288 :
289 : /// Get a list of all existing relations in given tablespace and database.
290 : ///
291 : /// # Cancel-Safety
292 : ///
293 : /// This method is cancellation-safe.
294 7375 : pub(crate) async fn list_rels(
295 7375 : &self,
296 7375 : spcnode: Oid,
297 7375 : dbnode: Oid,
298 7375 : version: Version<'_>,
299 7375 : ctx: &RequestContext,
300 7375 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
301 7375 : // fetch directory listing
302 7375 : let key = rel_dir_to_key(spcnode, dbnode);
303 7375 : let buf = version.get(self, key, ctx).await?;
304 :
305 7375 : match RelDirectory::des(&buf).context("deserialization failure") {
306 7375 : Ok(dir) => {
307 7375 : let rels: HashSet<RelTag> =
308 1735987 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
309 1735987 : spcnode,
310 1735987 : dbnode,
311 1735987 : relnode: *relnode,
312 1735987 : forknum: *forknum,
313 1735987 : }));
314 7375 :
315 7375 : Ok(rels)
316 : }
317 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
318 : }
319 CBC 7375 : }
320 :
321 : /// Look up given SLRU page version.
322 5607 : pub(crate) async fn get_slru_page_at_lsn(
323 5607 : &self,
324 5607 : kind: SlruKind,
325 5607 : segno: u32,
326 5607 : blknum: BlockNumber,
327 5607 : lsn: Lsn,
328 5607 : ctx: &RequestContext,
329 5607 : ) -> Result<Bytes, PageReconstructError> {
330 5607 : let key = slru_block_to_key(kind, segno, blknum);
331 255502 : self.get(key, lsn, ctx).await
332 5606 : }
333 :
334 : /// Get size of an SLRU segment
335 5622 : pub(crate) async fn get_slru_segment_size(
336 5622 : &self,
337 5622 : kind: SlruKind,
338 5622 : segno: u32,
339 5622 : version: Version<'_>,
340 5622 : ctx: &RequestContext,
341 5622 : ) -> Result<BlockNumber, PageReconstructError> {
342 5622 : let key = slru_segment_size_to_key(kind, segno);
343 5622 : let mut buf = version.get(self, key, ctx).await?;
344 5622 : Ok(buf.get_u32_le())
345 5622 : }
346 :
347 : /// Does the given SLRU segment exist?
348 664 : pub(crate) async fn get_slru_segment_exists(
349 664 : &self,
350 664 : kind: SlruKind,
351 664 : segno: u32,
352 664 : version: Version<'_>,
353 664 : ctx: &RequestContext,
354 664 : ) -> Result<bool, PageReconstructError> {
355 664 : // fetch directory listing
356 664 : let key = slru_dir_to_key(kind);
357 664 : let buf = version.get(self, key, ctx).await?;
358 :
359 664 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
360 664 : Ok(dir) => {
361 664 : let exists = dir.segments.get(&segno).is_some();
362 664 : Ok(exists)
363 : }
364 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
365 : }
366 CBC 664 : }
367 :
368 : /// Locate LSN, such that all transactions that committed before
369 : /// 'search_timestamp' are visible, but nothing newer is.
370 : ///
371 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
372 : /// so it's not well defined which LSN you get if there were multiple commits
373 : /// "in flight" at that point in time.
374 : ///
375 201 : pub(crate) async fn find_lsn_for_timestamp(
376 201 : &self,
377 201 : search_timestamp: TimestampTz,
378 201 : cancel: &CancellationToken,
379 201 : ctx: &RequestContext,
380 201 : ) -> Result<LsnForTimestamp, PageReconstructError> {
381 201 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
382 201 : // We use this method to figure out the branching LSN for the new branch, but the
383 201 : // GC cutoff could be before the branching point and we cannot create a new branch
384 201 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
385 201 : // on the safe side.
386 201 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
387 201 : let max_lsn = self.get_last_record_lsn();
388 201 :
389 201 : // LSNs are always 8-byte aligned. low/mid/high represent the
390 201 : // LSN divided by 8.
391 201 : let mut low = min_lsn.0 / 8;
392 201 : let mut high = max_lsn.0 / 8 + 1;
393 201 :
394 201 : let mut found_smaller = false;
395 201 : let mut found_larger = false;
396 3474 : while low < high {
397 3274 : if cancel.is_cancelled() {
398 UBC 0 : return Err(PageReconstructError::Cancelled);
399 CBC 3274 : }
400 3274 : // cannot overflow, high and low are both smaller than u64::MAX / 2
401 3274 : let mid = (high + low) / 2;
402 :
403 3274 : let cmp = self
404 3274 : .is_latest_commit_timestamp_ge_than(
405 3274 : search_timestamp,
406 3274 : Lsn(mid * 8),
407 3274 : &mut found_smaller,
408 3274 : &mut found_larger,
409 3274 : ctx,
410 3274 : )
411 237224 : .await?;
412 :
413 3273 : if cmp {
414 794 : high = mid;
415 2479 : } else {
416 2479 : low = mid + 1;
417 2479 : }
418 : }
419 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
420 : // so the LSN of the last commit record before or at `search_timestamp`.
421 : // Remove one from `low` to get `t`.
422 : //
423 : // FIXME: it would be better to get the LSN of the previous commit.
424 : // Otherwise, if you restore to the returned LSN, the database will
425 : // include physical changes from later commits that will be marked
426 : // as aborted, and will need to be vacuumed away.
427 200 : let commit_lsn = Lsn((low - 1) * 8);
428 200 : match (found_smaller, found_larger) {
429 : (false, false) => {
430 : // This can happen if no commit records have been processed yet, e.g.
431 : // just after importing a cluster.
432 23 : Ok(LsnForTimestamp::NoData(min_lsn))
433 : }
434 : (false, true) => {
435 : // Didn't find any commit timestamps smaller than the request
436 19 : Ok(LsnForTimestamp::Past(min_lsn))
437 : }
438 : (true, false) => {
439 : // Only found commits with timestamps smaller than the request.
440 : // It's still a valid case for branch creation, return it.
441 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
442 : // case, anyway.
443 90 : Ok(LsnForTimestamp::Future(commit_lsn))
444 : }
445 68 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
446 : }
447 200 : }
448 :
449 : /// Subroutine of find_lsn_for_timestamp(). Returns true if, as of LSN 'probe_lsn',
450 : /// there is any commit with a timestamp at or after 'search_timestamp'.
451 : ///
452 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
453 : /// with a smaller/larger-or-equal timestamp.
454 : ///
455 3274 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
456 3274 : &self,
457 3274 : search_timestamp: TimestampTz,
458 3274 : probe_lsn: Lsn,
459 3274 : found_smaller: &mut bool,
460 3274 : found_larger: &mut bool,
461 3274 : ctx: &RequestContext,
462 3274 : ) -> Result<bool, PageReconstructError> {
463 3274 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
464 3026 : if timestamp >= search_timestamp {
465 794 : *found_larger = true;
466 794 : return ControlFlow::Break(true);
467 2232 : } else {
468 2232 : *found_smaller = true;
469 2232 : }
470 2232 : ControlFlow::Continue(())
471 3274 : })
472 237224 : .await
473 3273 : }
474 :
475 : /// Obtain the largest timestamp recorded for the given lsn.
476 : ///
477 : /// Returns None if the lsn has no timestamps, and `Some(max)` otherwise.
478 12 : pub(crate) async fn get_timestamp_for_lsn(
479 12 : &self,
480 12 : probe_lsn: Lsn,
481 12 : ctx: &RequestContext,
482 12 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
483 12 : let mut max: Option<TimestampTz> = None;
484 12 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
485 10 : if let Some(max_prev) = max {
486 UBC 0 : max = Some(max_prev.max(timestamp));
487 CBC 10 : } else {
488 10 : max = Some(timestamp);
489 10 : }
490 10 : ControlFlow::Continue(())
491 12 : })
492 80 : .await?;
493 :
494 10 : Ok(max)
495 12 : }
496 :
497 : /// Runs the given function on all the timestamps for a given lsn
498 : ///
499 : /// The return value is either given by the closure, or set to the `Default`
500 : /// impl's output.
501 3286 : async fn map_all_timestamps<T: Default>(
502 3286 : &self,
503 3286 : probe_lsn: Lsn,
504 3286 : ctx: &RequestContext,
505 3286 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
506 3286 : ) -> Result<T, PageReconstructError> {
507 3286 : for segno in self
508 3286 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
509 1027 : .await?
510 : {
511 3284 : let nblocks = self
512 3284 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
513 949 : .await?;
514 3284 : for blknum in (0..nblocks).rev() {
515 3284 : let clog_page = self
516 3284 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
517 235328 : .await?;
518 :
519 3283 : if clog_page.len() == BLCKSZ as usize + 8 {
520 3036 : let mut timestamp_bytes = [0u8; 8];
521 3036 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
522 3036 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
523 3036 :
524 3036 : match f(timestamp) {
525 794 : ControlFlow::Break(b) => return Ok(b),
526 2242 : ControlFlow::Continue(()) => (),
527 : }
528 247 : }
529 : }
530 : }
531 2489 : Ok(Default::default())
532 3285 : }
533 :
534 : /// Get a list of SLRU segments
535 4962 : pub(crate) async fn list_slru_segments(
536 4962 : &self,
537 4962 : kind: SlruKind,
538 4962 : version: Version<'_>,
539 4962 : ctx: &RequestContext,
540 4962 : ) -> Result<HashSet<u32>, PageReconstructError> {
541 4962 : // fetch directory entry
542 4962 : let key = slru_dir_to_key(kind);
543 :
544 4962 : let buf = version.get(self, key, ctx).await?;
545 4959 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
546 4959 : Ok(dir) => Ok(dir.segments),
547 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
548 : }
549 CBC 4962 : }
550 :
551 2249 : pub(crate) async fn get_relmap_file(
552 2249 : &self,
553 2249 : spcnode: Oid,
554 2249 : dbnode: Oid,
555 2249 : version: Version<'_>,
556 2249 : ctx: &RequestContext,
557 2249 : ) -> Result<Bytes, PageReconstructError> {
558 2249 : let key = relmap_file_key(spcnode, dbnode);
559 :
560 2249 : let buf = version.get(self, key, ctx).await?;
561 2249 : Ok(buf)
562 2249 : }
563 :
564 558 : pub(crate) async fn list_dbdirs(
565 558 : &self,
566 558 : lsn: Lsn,
567 558 : ctx: &RequestContext,
568 558 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
569 : // fetch directory entry
570 558 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
571 :
572 558 : match DbDirectory::des(&buf).context("deserialization failure") {
573 558 : Ok(dir) => Ok(dir.dbdirs),
574 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
575 : }
576 CBC 558 : }
577 :
578 2 : pub(crate) async fn get_twophase_file(
579 2 : &self,
580 2 : xid: TransactionId,
581 2 : lsn: Lsn,
582 2 : ctx: &RequestContext,
583 2 : ) -> Result<Bytes, PageReconstructError> {
584 2 : let key = twophase_file_key(xid);
585 2 : let buf = self.get(key, lsn, ctx).await?;
586 2 : Ok(buf)
587 2 : }
588 :
589 558 : pub(crate) async fn list_twophase_files(
590 558 : &self,
591 558 : lsn: Lsn,
592 558 : ctx: &RequestContext,
593 558 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
594 : // fetch directory entry
595 558 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
596 :
597 558 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
598 558 : Ok(dir) => Ok(dir.xids),
599 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
600 : }
601 CBC 558 : }
602 :
603 557 : pub(crate) async fn get_control_file(
604 557 : &self,
605 557 : lsn: Lsn,
606 557 : ctx: &RequestContext,
607 557 : ) -> Result<Bytes, PageReconstructError> {
608 557 : self.get(CONTROLFILE_KEY, lsn, ctx).await
609 557 : }
610 :
611 1798 : pub(crate) async fn get_checkpoint(
612 1798 : &self,
613 1798 : lsn: Lsn,
614 1798 : ctx: &RequestContext,
615 1798 : ) -> Result<Bytes, PageReconstructError> {
616 1798 : self.get(CHECKPOINT_KEY, lsn, ctx).await
617 1798 : }
618 :
619 2241 : pub(crate) async fn list_aux_files(
620 2241 : &self,
621 2241 : lsn: Lsn,
622 2241 : ctx: &RequestContext,
623 2241 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
624 2241 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
625 1305 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
626 1305 : Ok(dir) => Ok(dir.files),
627 UBC 0 : Err(e) => Err(PageReconstructError::from(e)),
628 : },
629 CBC 936 : Err(e) => {
630 : // This is expected: historical databases do not have the key.
631 UBC 0 : debug!("Failed to get info about AUX files: {}", e);
632 CBC 936 : Ok(HashMap::new())
633 : }
634 : }
635 2241 : }
636 :
637 : /// Does the same as get_current_logical_size but counted on demand.
638 : /// Used to initialize the logical size tracking on startup.
639 : ///
640 : /// Only relation blocks are counted currently. That excludes metadata,
641 : /// SLRUs, twophase files etc.
642 : ///
643 : /// # Cancel-Safety
644 : ///
645 : /// This method is cancellation-safe.
646 651 : pub async fn get_current_logical_size_non_incremental(
647 651 : &self,
648 651 : lsn: Lsn,
649 651 : ctx: &RequestContext,
650 651 : ) -> Result<u64, CalculateLogicalSizeError> {
651 651 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
652 :
653 : // Fetch list of database dirs and iterate them
654 985 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
655 639 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
656 :
657 639 : let mut total_size: u64 = 0;
658 2537 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
659 589751 : for rel in self
660 2537 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
661 350 : .await?
662 : {
663 589751 : if self.cancel.is_cancelled() {
664 GBC 1 : return Err(CalculateLogicalSizeError::Cancelled);
665 CBC 589750 : }
666 589750 : let relsize_key = rel_size_to_key(rel);
667 589750 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
668 589734 : let relsize = buf.get_u32_le();
669 589734 :
670 589734 : total_size += relsize as u64;
671 : }
672 : }
673 622 : Ok(total_size * BLCKSZ as u64)
674 641 : }
675 :
676 : ///
677 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
678 : /// Anything that's not listed may be removed from the underlying storage (from
679 : /// that LSN forwards).
680 726 : pub(crate) async fn collect_keyspace(
681 726 : &self,
682 726 : lsn: Lsn,
683 726 : ctx: &RequestContext,
684 726 : ) -> Result<KeySpace, CollectKeySpaceError> {
685 726 : // Iterate through key ranges, greedily packing them into partitions
686 726 : let mut result = KeySpaceAccum::new();
687 726 :
688 726 : // The dbdir metadata always exists
689 726 : result.add_key(DBDIR_KEY);
690 :
691 : // Fetch list of database dirs and iterate them
692 726 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
693 726 : let dbdir = DbDirectory::des(&buf)?;
694 :
695 726 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
696 726 : dbs.sort_unstable();
697 3302 : for (spcnode, dbnode) in dbs {
698 2579 : result.add_key(relmap_file_key(spcnode, dbnode));
699 2579 : result.add_key(rel_dir_to_key(spcnode, dbnode));
700 :
701 2579 : let mut rels: Vec<RelTag> = self
702 2579 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
703 434 : .await?
704 2579 : .into_iter()
705 2579 : .collect();
706 2579 : rels.sort_unstable();
707 614555 : for rel in rels {
708 611979 : let relsize_key = rel_size_to_key(rel);
709 611979 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
710 611976 : let relsize = buf.get_u32_le();
711 611976 :
712 611976 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
713 611976 : result.add_key(relsize_key);
714 : }
715 : }
716 :
717 : // Iterate SLRUs next
718 2169 : for kind in [
719 723 : SlruKind::Clog,
720 723 : SlruKind::MultiXactMembers,
721 723 : SlruKind::MultiXactOffsets,
722 : ] {
723 2169 : let slrudir_key = slru_dir_to_key(kind);
724 2169 : result.add_key(slrudir_key);
725 2169 : let buf = self.get(slrudir_key, lsn, ctx).await?;
726 2169 : let dir = SlruSegmentDirectory::des(&buf)?;
727 2169 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
728 2169 : segments.sort_unstable();
729 4097 : for segno in segments {
730 1928 : let segsize_key = slru_segment_size_to_key(kind, segno);
731 1928 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
732 1928 : let segsize = buf.get_u32_le();
733 1928 :
734 1928 : result.add_range(
735 1928 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
736 1928 : );
737 1928 : result.add_key(segsize_key);
738 : }
739 : }
740 :
741 : // Then pg_twophase
742 723 : result.add_key(TWOPHASEDIR_KEY);
743 723 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
744 723 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
745 723 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
746 723 : xids.sort_unstable();
747 723 : for xid in xids {
748 UBC 0 : result.add_key(twophase_file_key(xid));
749 0 : }
750 :
751 CBC 723 : result.add_key(CONTROLFILE_KEY);
752 723 : result.add_key(CHECKPOINT_KEY);
753 723 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
754 515 : result.add_key(AUX_FILES_KEY);
755 515 : }
756 723 : Ok(result.to_keyspace())
757 724 : }
758 :
759 : /// Get the cached size of a relation if it was not updated after the specified LSN
760 51218414 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
761 51218414 : let rel_size_cache = self.rel_size_cache.read().unwrap();
762 51218414 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
763 51028779 : if lsn >= *cached_lsn {
764 50816009 : return Some(*nblocks);
765 212770 : }
766 189635 : }
767 402405 : None
768 51218414 : }
769 :
770 : /// Update cached relation size if there is no more recent update
771 151988 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
772 151988 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
773 151988 : match rel_size_cache.entry(tag) {
774 128440 : hash_map::Entry::Occupied(mut entry) => {
775 128440 : let cached_lsn = entry.get_mut();
776 128440 : if lsn >= cached_lsn.0 {
777 8 : *cached_lsn = (lsn, nblocks);
778 128432 : }
779 : }
780 23548 : hash_map::Entry::Vacant(entry) => {
781 23548 : entry.insert((lsn, nblocks));
782 23548 : }
783 : }
784 151988 : }
785 :
786 : /// Store cached relation size
787 1492536 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
788 1492536 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
789 1492536 : rel_size_cache.insert(tag, (lsn, nblocks));
790 1492536 : }
791 :
792 : /// Remove cached relation size
793 17594 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
794 17594 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
795 17594 : rel_size_cache.remove(tag);
796 17594 : }
797 : }
798 :
799 : /// DatadirModification represents an operation to ingest an atomic set of
800 : /// updates to the repository. It is created by the 'begin_modification'
801 : /// function. It is called for each WAL record, so that all the modifications
802 : /// by one WAL record appear atomic.
803 : pub struct DatadirModification<'a> {
804 : /// The timeline this modification applies to. You can access this to
805 : /// read the state, but note that any pending updates are *not* reflected
806 : /// in the state in 'tline' yet.
807 : pub tline: &'a Timeline,
808 :
809 : /// Current LSN of the modification
810 : lsn: Lsn,
811 :
812 : // The modifications are not applied directly to the underlying key-value store.
813 : // The put-functions add the modifications here, and they are flushed to the
814 : // underlying key-value store by the 'commit' function.
815 : pending_lsns: Vec<Lsn>,
816 : pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
817 : pending_deletions: Vec<(Range<Key>, Lsn)>,
818 : pending_nblocks: i64,
819 : }
820 :
821 : impl<'a> DatadirModification<'a> {
822 : /// Get the current lsn
823 47228369 : pub(crate) fn get_lsn(&self) -> Lsn {
824 47228369 : self.lsn
825 47228369 : }
826 :
827 : /// Set the current lsn
828 47422616 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
829 47422616 : ensure!(
830 47422616 : lsn >= self.lsn,
831 UBC 0 : "setting an older lsn {} than {} is not allowed",
832 : lsn,
833 : self.lsn
834 : );
835 CBC 47422616 : if lsn > self.lsn {
836 47422616 : self.pending_lsns.push(self.lsn);
837 47422616 : self.lsn = lsn;
838 47422616 : }
839 47422616 : Ok(())
840 47422616 : }
841 :
842 : /// Initialize a completely new repository.
843 : ///
844 : /// This inserts the directory metadata entries that are assumed to
845 : /// always exist.
846 570 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
847 570 : let buf = DbDirectory::ser(&DbDirectory {
848 570 : dbdirs: HashMap::new(),
849 570 : })?;
850 570 : self.put(DBDIR_KEY, Value::Image(buf.into()));
851 570 :
852 570 : // Create AuxFilesDirectory
853 570 : self.init_aux_dir()?;
854 :
855 570 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
856 570 : xids: HashSet::new(),
857 570 : })?;
858 570 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
859 :
860 570 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
861 570 : let empty_dir = Value::Image(buf);
862 570 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
863 570 : self.put(
864 570 : slru_dir_to_key(SlruKind::MultiXactMembers),
865 570 : empty_dir.clone(),
866 570 : );
867 570 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
868 570 :
869 570 : Ok(())
870 570 : }
871 :
/// Test-only variant of [`Self::init_empty`] that additionally stores
/// placeholder control file and checkpoint images.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    self.init_empty()?;

    let control_file = bytes::Bytes::from_static(b"control_file contents do not matter");
    self.put_control_file(control_file)
        .context("put_control_file")?;

    let checkpoint = bytes::Bytes::from_static(b"checkpoint_file contents do not matter");
    self.put_checkpoint(checkpoint)
        .context("put_checkpoint_file")?;

    Ok(())
}
885 :
886 : /// Put a new page version that can be constructed from a WAL record
887 : ///
888 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
889 : /// current end-of-file. It's up to the caller to check that the relation size
890 : /// matches the blocks inserted!
891 46962169 : pub fn put_rel_wal_record(
892 46962169 : &mut self,
893 46962169 : rel: RelTag,
894 46962169 : blknum: BlockNumber,
895 46962169 : rec: NeonWalRecord,
896 46962169 : ) -> anyhow::Result<()> {
897 46962169 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
898 46962169 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
899 46962169 : Ok(())
900 46962169 : }
901 :
902 : // Same, but for an SLRU.
903 1849938 : pub fn put_slru_wal_record(
904 1849938 : &mut self,
905 1849938 : kind: SlruKind,
906 1849938 : segno: u32,
907 1849938 : blknum: BlockNumber,
908 1849938 : rec: NeonWalRecord,
909 1849938 : ) -> anyhow::Result<()> {
910 1849938 : self.put(
911 1849938 : slru_block_to_key(kind, segno, blknum),
912 1849938 : Value::WalRecord(rec),
913 1849938 : );
914 1849938 : Ok(())
915 1849938 : }
916 :
917 : /// Like put_wal_record, but with ready-made image of the page.
918 2165845 : pub fn put_rel_page_image(
919 2165845 : &mut self,
920 2165845 : rel: RelTag,
921 2165845 : blknum: BlockNumber,
922 2165845 : img: Bytes,
923 2165845 : ) -> anyhow::Result<()> {
924 2165845 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
925 2165845 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
926 2165845 : Ok(())
927 2165845 : }
928 :
929 2266 : pub fn put_slru_page_image(
930 2266 : &mut self,
931 2266 : kind: SlruKind,
932 2266 : segno: u32,
933 2266 : blknum: BlockNumber,
934 2266 : img: Bytes,
935 2266 : ) -> anyhow::Result<()> {
936 2266 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
937 2266 : Ok(())
938 2266 : }
939 :
940 : /// Store a relmapper file (pg_filenode.map) in the repository
941 2203 : pub async fn put_relmap_file(
942 2203 : &mut self,
943 2203 : spcnode: Oid,
944 2203 : dbnode: Oid,
945 2203 : img: Bytes,
946 2203 : ctx: &RequestContext,
947 2203 : ) -> anyhow::Result<()> {
948 : // Add it to the directory (if it doesn't exist already)
949 2203 : let buf = self.get(DBDIR_KEY, ctx).await?;
950 2203 : let mut dbdir = DbDirectory::des(&buf)?;
951 :
952 2203 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
953 2203 : if r.is_none() || r == Some(false) {
954 : // The dbdir entry didn't exist, or it contained a
955 : // 'false'. The 'insert' call already updated it with
956 : // 'true', now write the updated 'dbdirs' map back.
957 2149 : let buf = DbDirectory::ser(&dbdir)?;
958 2149 : self.put(DBDIR_KEY, Value::Image(buf.into()));
959 2149 :
960 2149 : // Create AuxFilesDirectory as well
961 2149 : self.init_aux_dir()?;
962 54 : }
963 2203 : if r.is_none() {
964 17 : // Create RelDirectory
965 17 : let buf = RelDirectory::ser(&RelDirectory {
966 17 : rels: HashSet::new(),
967 17 : })?;
968 17 : self.put(
969 17 : rel_dir_to_key(spcnode, dbnode),
970 17 : Value::Image(Bytes::from(buf)),
971 17 : );
972 2186 : }
973 :
974 2203 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
975 2203 : Ok(())
976 2203 : }
977 :
978 4 : pub async fn put_twophase_file(
979 4 : &mut self,
980 4 : xid: TransactionId,
981 4 : img: Bytes,
982 4 : ctx: &RequestContext,
983 4 : ) -> anyhow::Result<()> {
984 : // Add it to the directory entry
985 4 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
986 4 : let mut dir = TwoPhaseDirectory::des(&buf)?;
987 4 : if !dir.xids.insert(xid) {
988 UBC 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
989 CBC 4 : }
990 4 : self.put(
991 4 : TWOPHASEDIR_KEY,
992 4 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
993 : );
994 :
995 4 : self.put(twophase_file_key(xid), Value::Image(img));
996 4 : Ok(())
997 4 : }
998 :
999 568 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1000 568 : self.put(CONTROLFILE_KEY, Value::Image(img));
1001 568 : Ok(())
1002 568 : }
1003 :
1004 28367 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1005 28367 : self.put(CHECKPOINT_KEY, Value::Image(img));
1006 28367 : Ok(())
1007 28367 : }
1008 :
1009 3 : pub async fn drop_dbdir(
1010 3 : &mut self,
1011 3 : spcnode: Oid,
1012 3 : dbnode: Oid,
1013 3 : ctx: &RequestContext,
1014 3 : ) -> anyhow::Result<()> {
1015 3 : let total_blocks = self
1016 3 : .tline
1017 3 : .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
1018 UBC 0 : .await?;
1019 :
1020 : // Remove entry from dbdir
1021 CBC 3 : let buf = self.get(DBDIR_KEY, ctx).await?;
1022 3 : let mut dir = DbDirectory::des(&buf)?;
1023 3 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1024 3 : let buf = DbDirectory::ser(&dir)?;
1025 3 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1026 : } else {
1027 UBC 0 : warn!(
1028 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1029 0 : spcnode, dbnode
1030 0 : );
1031 : }
1032 :
1033 : // Update logical database size.
1034 CBC 3 : self.pending_nblocks -= total_blocks as i64;
1035 3 :
1036 3 : // Delete all relations and metadata files for the spcnode/dnode
1037 3 : self.delete(dbdir_key_range(spcnode, dbnode));
1038 3 : Ok(())
1039 3 : }
1040 :
1041 : /// Create a relation fork.
1042 : ///
1043 : /// 'nblocks' is the initial size.
1044 523217 : pub async fn put_rel_creation(
1045 523217 : &mut self,
1046 523217 : rel: RelTag,
1047 523217 : nblocks: BlockNumber,
1048 523217 : ctx: &RequestContext,
1049 523217 : ) -> Result<(), RelationError> {
1050 523217 : if rel.relnode == 0 {
1051 UBC 0 : return Err(RelationError::InvalidRelnode);
1052 CBC 523217 : }
1053 : // It's possible that this is the first rel for this db in this
1054 : // tablespace. Create the reldir entry for it if so.
1055 523217 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1056 523216 : .context("deserialize db")?;
1057 523216 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1058 523216 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
1059 : // Didn't exist. Update dbdir
1060 2133 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
1061 2133 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1062 2133 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1063 2133 :
1064 2133 : // and create the RelDirectory
1065 2133 : RelDirectory::default()
1066 : } else {
1067 : // reldir already exists, fetch it
1068 521083 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1069 521083 : .context("deserialize db")?
1070 : };
1071 :
1072 : // Add the new relation to the rel directory entry, and write it back
1073 523216 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1074 UBC 0 : return Err(RelationError::AlreadyExists);
1075 CBC 523216 : }
1076 523216 : self.put(
1077 523216 : rel_dir_key,
1078 523216 : Value::Image(Bytes::from(
1079 523216 : RelDirectory::ser(&rel_dir).context("serialize")?,
1080 : )),
1081 : );
1082 :
1083 : // Put size
1084 523216 : let size_key = rel_size_to_key(rel);
1085 523216 : let buf = nblocks.to_le_bytes();
1086 523216 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1087 523216 :
1088 523216 : self.pending_nblocks += nblocks as i64;
1089 523216 :
1090 523216 : // Update relation size cache
1091 523216 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1092 523216 :
1093 523216 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1094 523216 : // caller.
1095 523216 : Ok(())
1096 523217 : }
1097 :
1098 : /// Truncate relation
1099 3092 : pub async fn put_rel_truncation(
1100 3092 : &mut self,
1101 3092 : rel: RelTag,
1102 3092 : nblocks: BlockNumber,
1103 3092 : ctx: &RequestContext,
1104 3092 : ) -> anyhow::Result<()> {
1105 3092 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1106 3092 : if self
1107 3092 : .tline
1108 3092 : .get_rel_exists(rel, Version::Modified(self), true, ctx)
1109 UBC 0 : .await?
1110 : {
1111 CBC 3092 : let size_key = rel_size_to_key(rel);
1112 : // Fetch the old size first
1113 3092 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1114 3092 :
1115 3092 : // Update the entry with the new size.
1116 3092 : let buf = nblocks.to_le_bytes();
1117 3092 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1118 3092 :
1119 3092 : // Update relation size cache
1120 3092 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1121 3092 :
1122 3092 : // Update relation size cache
1123 3092 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1124 3092 :
1125 3092 : // Update logical database size.
1126 3092 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1127 UBC 0 : }
1128 CBC 3092 : Ok(())
1129 3092 : }
1130 :
1131 : /// Extend relation
1132 : /// If new size is smaller, do nothing.
1133 1460341 : pub async fn put_rel_extend(
1134 1460341 : &mut self,
1135 1460341 : rel: RelTag,
1136 1460341 : nblocks: BlockNumber,
1137 1460341 : ctx: &RequestContext,
1138 1460341 : ) -> anyhow::Result<()> {
1139 1460341 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1140 :
1141 : // Put size
1142 1460341 : let size_key = rel_size_to_key(rel);
1143 1460341 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1144 1460337 :
1145 1460337 : // only extend relation here. never decrease the size
1146 1460337 : if nblocks > old_size {
1147 963136 : let buf = nblocks.to_le_bytes();
1148 963136 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1149 963136 :
1150 963136 : // Update relation size cache
1151 963136 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1152 963136 :
1153 963136 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1154 963136 : }
1155 1460337 : Ok(())
1156 1460341 : }
1157 :
1158 : /// Drop a relation.
1159 17594 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1160 17594 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1161 :
1162 : // Remove it from the directory entry
1163 17594 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1164 17594 : let buf = self.get(dir_key, ctx).await?;
1165 17594 : let mut dir = RelDirectory::des(&buf)?;
1166 :
1167 17594 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1168 17594 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1169 : } else {
1170 UBC 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1171 : }
1172 :
1173 : // update logical size
1174 CBC 17594 : let size_key = rel_size_to_key(rel);
1175 17594 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1176 17594 : self.pending_nblocks -= old_size as i64;
1177 17594 :
1178 17594 : // Remove enty from relation size cache
1179 17594 : self.tline.remove_cached_rel_size(&rel);
1180 17594 :
1181 17594 : // Delete size entry, as well as all blocks
1182 17594 : self.delete(rel_key_range(rel));
1183 17594 :
1184 17594 : Ok(())
1185 17594 : }
1186 :
1187 1620 : pub async fn put_slru_segment_creation(
1188 1620 : &mut self,
1189 1620 : kind: SlruKind,
1190 1620 : segno: u32,
1191 1620 : nblocks: BlockNumber,
1192 1620 : ctx: &RequestContext,
1193 1620 : ) -> anyhow::Result<()> {
1194 1620 : // Add it to the directory entry
1195 1620 : let dir_key = slru_dir_to_key(kind);
1196 1620 : let buf = self.get(dir_key, ctx).await?;
1197 1620 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1198 :
1199 1620 : if !dir.segments.insert(segno) {
1200 UBC 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1201 CBC 1620 : }
1202 1620 : self.put(
1203 1620 : dir_key,
1204 1620 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1205 : );
1206 :
1207 : // Put size
1208 1620 : let size_key = slru_segment_size_to_key(kind, segno);
1209 1620 : let buf = nblocks.to_le_bytes();
1210 1620 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1211 1620 :
1212 1620 : // even if nblocks > 0, we don't insert any actual blocks here
1213 1620 :
1214 1620 : Ok(())
1215 1620 : }
1216 :
1217 : /// Extend SLRU segment
1218 658 : pub fn put_slru_extend(
1219 658 : &mut self,
1220 658 : kind: SlruKind,
1221 658 : segno: u32,
1222 658 : nblocks: BlockNumber,
1223 658 : ) -> anyhow::Result<()> {
1224 658 : // Put size
1225 658 : let size_key = slru_segment_size_to_key(kind, segno);
1226 658 : let buf = nblocks.to_le_bytes();
1227 658 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1228 658 : Ok(())
1229 658 : }
1230 :
1231 : /// This method is used for marking truncated SLRU files
1232 9 : pub async fn drop_slru_segment(
1233 9 : &mut self,
1234 9 : kind: SlruKind,
1235 9 : segno: u32,
1236 9 : ctx: &RequestContext,
1237 9 : ) -> anyhow::Result<()> {
1238 9 : // Remove it from the directory entry
1239 9 : let dir_key = slru_dir_to_key(kind);
1240 9 : let buf = self.get(dir_key, ctx).await?;
1241 9 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1242 :
1243 9 : if !dir.segments.remove(&segno) {
1244 UBC 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1245 CBC 9 : }
1246 9 : self.put(
1247 9 : dir_key,
1248 9 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1249 : );
1250 :
1251 : // Delete size entry, as well as all blocks
1252 9 : self.delete(slru_segment_key_range(kind, segno));
1253 9 :
1254 9 : Ok(())
1255 9 : }
1256 :
1257 : /// Drop a relmapper file (pg_filenode.map)
1258 UBC 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1259 0 : // TODO
1260 0 : Ok(())
1261 0 : }
1262 :
1263 : /// This method is used for marking truncated SLRU files
1264 CBC 2 : pub async fn drop_twophase_file(
1265 2 : &mut self,
1266 2 : xid: TransactionId,
1267 2 : ctx: &RequestContext,
1268 2 : ) -> anyhow::Result<()> {
1269 : // Remove it from the directory entry
1270 2 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1271 2 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1272 :
1273 2 : if !dir.xids.remove(&xid) {
1274 UBC 0 : warn!("twophase file for xid {} does not exist", xid);
1275 CBC 2 : }
1276 2 : self.put(
1277 2 : TWOPHASEDIR_KEY,
1278 2 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1279 : );
1280 :
1281 : // Delete it
1282 2 : self.delete(twophase_key_range(xid));
1283 2 :
1284 2 : Ok(())
1285 2 : }
1286 :
1287 2719 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1288 2719 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1289 2719 : files: HashMap::new(),
1290 2719 : })?;
1291 2719 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1292 2719 : Ok(())
1293 2719 : }
1294 :
1295 105 : pub async fn put_file(
1296 105 : &mut self,
1297 105 : path: &str,
1298 105 : content: &[u8],
1299 105 : ctx: &RequestContext,
1300 105 : ) -> anyhow::Result<()> {
1301 105 : let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
1302 103 : Ok(buf) => AuxFilesDirectory::des(&buf)?,
1303 2 : Err(e) => {
1304 : // This is expected: historical databases do not have the key.
1305 UBC 0 : debug!("Failed to get info about AUX files: {}", e);
1306 CBC 2 : AuxFilesDirectory {
1307 2 : files: HashMap::new(),
1308 2 : }
1309 : }
1310 : };
1311 105 : let path = path.to_string();
1312 105 : if content.is_empty() {
1313 4 : dir.files.remove(&path);
1314 101 : } else {
1315 101 : dir.files.insert(path, Bytes::copy_from_slice(content));
1316 101 : }
1317 105 : self.put(
1318 105 : AUX_FILES_KEY,
1319 105 : Value::Image(Bytes::from(
1320 105 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1321 : )),
1322 : );
1323 105 : Ok(())
1324 105 : }
1325 :
1326 : ///
1327 : /// Flush changes accumulated so far to the underlying repository.
1328 : ///
1329 : /// Usually, changes made in DatadirModification are atomic, but this allows
1330 : /// you to flush them to the underlying repository before the final `commit`.
1331 : /// That allows to free up the memory used to hold the pending changes.
1332 : ///
1333 : /// Currently only used during bulk import of a data directory. In that
1334 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1335 : /// whole import fails and the timeline will be deleted anyway.
1336 : /// (Or to be precise, it will be left behind for debugging purposes and
1337 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1338 : ///
1339 : /// Note: A consequence of flushing the pending operations is that they
1340 : /// won't be visible to subsequent operations until `commit`. The function
1341 : /// retains all the metadata, but data pages are flushed. That's again OK
1342 : /// for bulk import, where you are just loading data pages and won't try to
1343 : /// modify the same pages twice.
1344 506812 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1345 506812 : // Unless we have accumulated a decent amount of changes, it's not worth it
1346 506812 : // to scan through the pending_updates list.
1347 506812 : let pending_nblocks = self.pending_nblocks;
1348 506812 : if pending_nblocks < 10000 {
1349 506812 : return Ok(());
1350 UBC 0 : }
1351 :
1352 0 : let writer = self.tline.writer().await;
1353 :
1354 : // Flush relation and SLRU data blocks, keep metadata.
1355 0 : let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
1356 0 : for (key, values) in self.pending_updates.drain() {
1357 0 : for (lsn, value) in values {
1358 0 : if is_rel_block_key(&key) || is_slru_block_key(key) {
1359 : // This bails out on first error without modifying pending_updates.
1360 : // That's Ok, cf this function's doc comment.
1361 0 : writer.put(key, lsn, &value, ctx).await?;
1362 0 : } else {
1363 0 : retained_pending_updates
1364 0 : .entry(key)
1365 0 : .or_default()
1366 0 : .push((lsn, value));
1367 0 : }
1368 : }
1369 : }
1370 :
1371 0 : self.pending_updates = retained_pending_updates;
1372 0 :
1373 0 : if pending_nblocks != 0 {
1374 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1375 0 : self.pending_nblocks = 0;
1376 0 : }
1377 :
1378 0 : Ok(())
1379 CBC 506812 : }
1380 :
1381 : ///
1382 : /// Finish this atomic update, writing all the updated keys to the
1383 : /// underlying timeline.
1384 : /// All the modifications in this atomic update are stamped by the specified LSN.
1385 : ///
1386 1333218 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1387 1333217 : let writer = self.tline.writer().await;
1388 :
1389 1333217 : let pending_nblocks = self.pending_nblocks;
1390 1333217 : self.pending_nblocks = 0;
1391 1333217 :
1392 1333217 : if !self.pending_updates.is_empty() {
1393 1168369 : writer.put_batch(&self.pending_updates, ctx).await?;
1394 1168369 : self.pending_updates.clear();
1395 164848 : }
1396 :
1397 1333217 : if !self.pending_deletions.is_empty() {
1398 6569 : writer.delete_batch(&self.pending_deletions).await?;
1399 6569 : self.pending_deletions.clear();
1400 1326648 : }
1401 :
1402 1333217 : self.pending_lsns.push(self.lsn);
1403 48755534 : for pending_lsn in self.pending_lsns.drain(..) {
1404 48755534 : // Ideally, we should be able to call writer.finish_write() only once
1405 48755534 : // with the highest LSN. However, the last_record_lsn variable in the
1406 48755534 : // timeline keeps track of the latest LSN and the immediate previous LSN
1407 48755534 : // so we need to record every LSN to not leave a gap between them.
1408 48755534 : writer.finish_write(pending_lsn);
1409 48755534 : }
1410 :
1411 1333217 : if pending_nblocks != 0 {
1412 427894 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1413 905323 : }
1414 :
1415 1333217 : Ok(())
1416 1333217 : }
1417 :
1418 94845227 : pub(crate) fn len(&self) -> usize {
1419 94845227 : self.pending_updates.len() + self.pending_deletions.len()
1420 94845227 : }
1421 :
1422 : // Internal helper functions to batch the modifications
1423 :
1424 2614147 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1425 : // Have we already updated the same key? Read the latest pending updated
1426 : // version in that case.
1427 : //
1428 : // Note: we don't check pending_deletions. It is an error to request a
1429 : // value that has been removed, deletion only avoids leaking storage.
1430 2614147 : if let Some(values) = self.pending_updates.get(&key) {
1431 2107182 : if let Some((_, value)) = values.last() {
1432 2107182 : return if let Value::Image(img) = value {
1433 2107182 : Ok(img.clone())
1434 : } else {
1435 : // Currently, we never need to read back a WAL record that we
1436 : // inserted in the same "transaction". All the metadata updates
1437 : // work directly with Images, and we never need to read actual
1438 : // data pages. We could handle this if we had to, by calling
1439 : // the walredo manager, but let's keep it simple for now.
1440 UBC 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1441 0 : "unexpected pending WAL record"
1442 0 : )))
1443 : };
1444 0 : }
1445 CBC 506965 : }
1446 506965 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1447 506965 : self.tline.get(key, lsn, ctx).await
1448 2614147 : }
1449 :
1450 53055503 : fn put(&mut self, key: Key, val: Value) {
1451 53055503 : let values = self.pending_updates.entry(key).or_default();
1452 : // Replace the previous value if it exists at the same lsn
1453 53055503 : if let Some((last_lsn, last_value)) = values.last_mut() {
1454 44848600 : if *last_lsn == self.lsn {
1455 526141 : *last_value = val;
1456 526141 : return;
1457 44322459 : }
1458 8206903 : }
1459 52529362 : values.push((self.lsn, val));
1460 53055503 : }
1461 :
1462 17608 : fn delete(&mut self, key_range: Range<Key>) {
1463 17608 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1464 17608 : self.pending_deletions.push((key_range, self.lsn));
1465 17608 : }
1466 : }
1467 :
1468 : /// This struct facilitates accessing either a committed key from the timeline at a
1469 : /// specific LSN, or the latest uncommitted key from a pending modification.
1470 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
1471 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1472 : /// need to look up the keys in the modification first before looking them up in the
1473 : /// timeline to not miss the latest updates.
1474 UBC 0 : #[derive(Clone, Copy)]
1475 : pub enum Version<'a> {
1476 : Lsn(Lsn),
1477 : Modified(&'a DatadirModification<'a>),
1478 : }
1479 :
1480 : impl<'a> Version<'a> {
1481 CBC 3950258 : async fn get(
1482 3950258 : &self,
1483 3950258 : timeline: &Timeline,
1484 3950258 : key: Key,
1485 3950258 : ctx: &RequestContext,
1486 3950258 : ) -> Result<Bytes, PageReconstructError> {
1487 3950258 : match self {
1488 3882978 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1489 67280 : Version::Modified(modification) => modification.get(key, ctx).await,
1490 : }
1491 3950232 : }
1492 :
1493 4142033 : fn get_lsn(&self) -> Lsn {
1494 4142033 : match self {
1495 3871042 : Version::Lsn(lsn) => *lsn,
1496 270991 : Version::Modified(modification) => modification.lsn,
1497 : }
1498 4142033 : }
1499 : }
1500 :
1501 : //--- Metadata structs stored in key-value pairs in the repository.
1502 :
1503 527345 : #[derive(Debug, Serialize, Deserialize)]
1504 : struct DbDirectory {
1505 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1506 : dbdirs: HashMap<(Oid, Oid), bool>,
1507 : }
1508 :
1509 1287 : #[derive(Debug, Serialize, Deserialize)]
1510 : struct TwoPhaseDirectory {
1511 : xids: HashSet<TransactionId>,
1512 : }
1513 :
1514 1081654 : #[derive(Debug, Serialize, Deserialize, Default)]
1515 : struct RelDirectory {
1516 : // Set of relations that exist. (relfilenode, forknum)
1517 : //
1518 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1519 : // key-value pairs, if you have a lot of relations
1520 : rels: HashSet<(Oid, u8)>,
1521 : }
1522 :
1523 5648 : #[derive(Debug, Serialize, Deserialize, Default)]
1524 : struct AuxFilesDirectory {
1525 : files: HashMap<String, Bytes>,
1526 : }
1527 :
1528 UBC 0 : #[derive(Debug, Serialize, Deserialize)]
1529 : struct RelSizeEntry {
1530 : nblocks: u32,
1531 : }
1532 :
1533 CBC 9421 : #[derive(Debug, Serialize, Deserialize, Default)]
1534 : struct SlruSegmentDirectory {
1535 : // Set of SLRU segments that exist.
1536 : segments: HashSet<u32>,
1537 : }
1538 :
1539 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1540 :
1541 : // Layout of the Key address space
1542 : //
1543 : // The Key struct, used to address the underlying key-value store, consists of
1544 : // 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
1545 : // all the data and metadata keys into those 18 bytes.
1546 : //
1547 : // Principles for the mapping:
1548 : //
1549 : // - Things that are often accessed or modified together, should be close to
1550 : // each other in the key space. For example, if a relation is extended by one
1551 : // block, we create a new key-value pair for the block data, and update the
1552 : // relation size entry. Because of that, the RelSize key comes after all the
1553 : // RelBlocks of a relation: the RelSize and the last RelBlock are always next
1554 : // to each other.
1555 : //
1556 : // The key space is divided into four major sections, identified by the first
1557 : // byte, and they form a hierarchy:
1558 : //
1559 : // 00 Relation data and metadata
1560 : //
1561 : // DbDir () -> (dbnode, spcnode)
1562 : // Filenodemap
1563 : // RelDir -> relnode forknum
1564 : // RelBlocks
1565 : // RelSize
1566 : //
1567 : // 01 SLRUs
1568 : //
1569 : // SlruDir kind
1570 : // SlruSegBlocks segno
1571 : // SlruSegSize
1572 : //
1573 : // 02 pg_twophase
1574 : //
1575 : // 03 misc
1576 : // Controlfile
1577 : // checkpoint
1578 : // pg_version
1579 : //
1580 : // (aux files are stored in section 03 as well; see AuxFiles in the list below)
1581 : //
1582 : // Below is a full list of the keyspace allocation:
1583 : //
1584 : // DbDir:
1585 : // 00 00000000 00000000 00000000 00 00000000
1586 : //
1587 : // Filenodemap:
1588 : // 00 SPCNODE DBNODE 00000000 00 00000000
1589 : //
1590 : // RelDir:
1591 : // 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
1592 : //
1593 : // RelBlock:
1594 : // 00 SPCNODE DBNODE RELNODE FORK BLKNUM
1595 : //
1596 : // RelSize:
1597 : // 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
1598 : //
1599 : // SlruDir:
1600 : // 01 kind 00000000 00000000 00 00000000
1601 : //
1602 : // SlruSegBlock:
1603 : // 01 kind 00000001 SEGNO 00 BLKNUM
1604 : //
1605 : // SlruSegSize:
1606 : // 01 kind 00000001 SEGNO 00 FFFFFFFF
1607 : //
1608 : // TwoPhaseDir:
1609 : // 02 00000000 00000000 00000000 00 00000000
1610 : //
1611 : // TwoPhaseFile:
1612 : // 02 00000000 00000000 00000000 00 XID
1613 : //
1614 : // ControlFile:
1615 : // 03 00000000 00000000 00000000 00 00000000
1616 : //
1617 : // Checkpoint:
1618 : // 03 00000000 00000000 00000000 00 00000001
1619 : //
1620 : // AuxFiles:
1621 : // 03 00000000 00000000 00000000 00 00000002
1622 : //
1623 :
1624 : //-- Section 00: relation data and metadata
1625 :
1626 : const DBDIR_KEY: Key = Key {
1627 : field1: 0x00,
1628 : field2: 0,
1629 : field3: 0,
1630 : field4: 0,
1631 : field5: 0,
1632 : field6: 0,
1633 : };
1634 :
1635 3 : fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
1636 3 : Key {
1637 3 : field1: 0x00,
1638 3 : field2: spcnode,
1639 3 : field3: dbnode,
1640 3 : field4: 0,
1641 3 : field5: 0,
1642 3 : field6: 0,
1643 3 : }..Key {
1644 3 : field1: 0x00,
1645 3 : field2: spcnode,
1646 3 : field3: dbnode,
1647 3 : field4: 0xffffffff,
1648 3 : field5: 0xff,
1649 3 : field6: 0xffffffff,
1650 3 : }
1651 3 : }
1652 :
1653 7031 : fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
1654 7031 : Key {
1655 7031 : field1: 0x00,
1656 7031 : field2: spcnode,
1657 7031 : field3: dbnode,
1658 7031 : field4: 0,
1659 7031 : field5: 0,
1660 7031 : field6: 0,
1661 7031 : }
1662 7031 : }
1663 :
1664 650083 : fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
1665 650083 : Key {
1666 650083 : field1: 0x00,
1667 650083 : field2: spcnode,
1668 650083 : field3: dbnode,
1669 650083 : field4: 0,
1670 650083 : field5: 0,
1671 650083 : field6: 1,
1672 650083 : }
1673 650083 : }
1674 :
1675 105429802 : pub(crate) fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
1676 105429802 : Key {
1677 105429802 : field1: 0x00,
1678 105429802 : field2: rel.spcnode,
1679 105429802 : field3: rel.dbnode,
1680 105429802 : field4: rel.relnode,
1681 105429802 : field5: rel.forknum,
1682 105429802 : field6: blknum,
1683 105429802 : }
1684 105429802 : }
1685 :
1686 3506303 : fn rel_size_to_key(rel: RelTag) -> Key {
1687 3506303 : Key {
1688 3506303 : field1: 0x00,
1689 3506303 : field2: rel.spcnode,
1690 3506303 : field3: rel.dbnode,
1691 3506303 : field4: rel.relnode,
1692 3506303 : field5: rel.forknum,
1693 3506303 : field6: 0xffffffff,
1694 3506303 : }
1695 3506303 : }
1696 :
1697 17594 : fn rel_key_range(rel: RelTag) -> Range<Key> {
1698 17594 : Key {
1699 17594 : field1: 0x00,
1700 17594 : field2: rel.spcnode,
1701 17594 : field3: rel.dbnode,
1702 17594 : field4: rel.relnode,
1703 17594 : field5: rel.forknum,
1704 17594 : field6: 0,
1705 17594 : }..Key {
1706 17594 : field1: 0x00,
1707 17594 : field2: rel.spcnode,
1708 17594 : field3: rel.dbnode,
1709 17594 : field4: rel.relnode,
1710 17594 : field5: rel.forknum + 1,
1711 17594 : field6: 0,
1712 17594 : }
1713 17594 : }
1714 :
1715 : //-- Section 01: SLRUs
1716 :
1717 11134 : fn slru_dir_to_key(kind: SlruKind) -> Key {
1718 11134 : Key {
1719 11134 : field1: 0x01,
1720 11134 : field2: match kind {
1721 6050 : SlruKind::Clog => 0x00,
1722 2685 : SlruKind::MultiXactMembers => 0x01,
1723 2399 : SlruKind::MultiXactOffsets => 0x02,
1724 : },
1725 : field3: 0,
1726 : field4: 0,
1727 : field5: 0,
1728 : field6: 0,
1729 : }
1730 11134 : }
1731 :
1732 1861667 : fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
1733 1861667 : Key {
1734 1861667 : field1: 0x01,
1735 1861667 : field2: match kind {
1736 1807672 : SlruKind::Clog => 0x00,
1737 27564 : SlruKind::MultiXactMembers => 0x01,
1738 26431 : SlruKind::MultiXactOffsets => 0x02,
1739 : },
1740 : field3: 1,
1741 1861667 : field4: segno,
1742 1861667 : field5: 0,
1743 1861667 : field6: blknum,
1744 1861667 : }
1745 1861667 : }
1746 :
1747 9828 : fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
1748 9828 : Key {
1749 9828 : field1: 0x01,
1750 9828 : field2: match kind {
1751 5732 : SlruKind::Clog => 0x00,
1752 2341 : SlruKind::MultiXactMembers => 0x01,
1753 1755 : SlruKind::MultiXactOffsets => 0x02,
1754 : },
1755 : field3: 1,
1756 9828 : field4: segno,
1757 9828 : field5: 0,
1758 9828 : field6: 0xffffffff,
1759 9828 : }
1760 9828 : }
1761 :
1762 9 : fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
1763 9 : let field2 = match kind {
1764 9 : SlruKind::Clog => 0x00,
1765 UBC 0 : SlruKind::MultiXactMembers => 0x01,
1766 0 : SlruKind::MultiXactOffsets => 0x02,
1767 : };
1768 :
1769 CBC 9 : Key {
1770 9 : field1: 0x01,
1771 9 : field2,
1772 9 : field3: 1,
1773 9 : field4: segno,
1774 9 : field5: 0,
1775 9 : field6: 0,
1776 9 : }..Key {
1777 9 : field1: 0x01,
1778 9 : field2,
1779 9 : field3: 1,
1780 9 : field4: segno,
1781 9 : field5: 1,
1782 9 : field6: 0,
1783 9 : }
1784 9 : }
1785 :
1786 : //-- Section 03: pg_twophase
1787 :
1788 : const TWOPHASEDIR_KEY: Key = Key {
1789 : field1: 0x02,
1790 : field2: 0,
1791 : field3: 0,
1792 : field4: 0,
1793 : field5: 0,
1794 : field6: 0,
1795 : };
1796 :
1797 6 : fn twophase_file_key(xid: TransactionId) -> Key {
1798 6 : Key {
1799 6 : field1: 0x02,
1800 6 : field2: 0,
1801 6 : field3: 0,
1802 6 : field4: 0,
1803 6 : field5: 0,
1804 6 : field6: xid,
1805 6 : }
1806 6 : }
1807 :
1808 2 : fn twophase_key_range(xid: TransactionId) -> Range<Key> {
1809 2 : let (next_xid, overflowed) = xid.overflowing_add(1);
1810 2 :
1811 2 : Key {
1812 2 : field1: 0x02,
1813 2 : field2: 0,
1814 2 : field3: 0,
1815 2 : field4: 0,
1816 2 : field5: 0,
1817 2 : field6: xid,
1818 2 : }..Key {
1819 2 : field1: 0x02,
1820 2 : field2: 0,
1821 2 : field3: 0,
1822 2 : field4: 0,
1823 2 : field5: u8::from(overflowed),
1824 2 : field6: next_xid,
1825 2 : }
1826 2 : }
1827 :
1828 : //-- Section 04: Control file
1829 : const CONTROLFILE_KEY: Key = Key {
1830 : field1: 0x03,
1831 : field2: 0,
1832 : field3: 0,
1833 : field4: 0,
1834 : field5: 0,
1835 : field6: 0,
1836 : };
1837 :
1838 : const CHECKPOINT_KEY: Key = Key {
1839 : field1: 0x03,
1840 : field2: 0,
1841 : field3: 0,
1842 : field4: 0,
1843 : field5: 0,
1844 : field6: 1,
1845 : };
1846 :
1847 : const AUX_FILES_KEY: Key = Key {
1848 : field1: 0x03,
1849 : field2: 0,
1850 : field3: 0,
1851 : field4: 0,
1852 : field5: 0,
1853 : field6: 2,
1854 : };
1855 :
1856 : // Reverse mappings for a few Keys.
1857 : // These are needed by WAL redo manager.
1858 :
1859 : // AUX_FILES currently stores only data for logical replication (slots etc), and
1860 : // we don't preserve these on a branch because safekeepers can't follow timeline
1861 : // switch (and generally it likely should be optional), so ignore these.
1862 21465031 : pub fn is_inherited_key(key: Key) -> bool {
1863 21465031 : key != AUX_FILES_KEY
1864 21465031 : }
1865 :
1866 : /// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
1867 2057120 : pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
1868 2057120 : Ok(match key.field1 {
1869 2057120 : 0x00 => (
1870 2057120 : RelTag {
1871 2057120 : spcnode: key.field2,
1872 2057120 : dbnode: key.field3,
1873 2057120 : relnode: key.field4,
1874 2057120 : forknum: key.field5,
1875 2057120 : },
1876 2057120 : key.field6,
1877 2057120 : ),
1878 UBC 0 : _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
1879 : })
1880 CBC 2057120 : }
1881 UBC 0 : pub fn is_rel_fsm_block_key(key: Key) -> bool {
1882 0 : key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
1883 0 : }
1884 :
1885 0 : pub fn is_rel_vm_block_key(key: Key) -> bool {
1886 0 : key.field1 == 0x00
1887 0 : && key.field4 != 0
1888 0 : && key.field5 == VISIBILITYMAP_FORKNUM
1889 0 : && key.field6 != 0xffffffff
1890 0 : }
1891 :
1892 CBC 16158944 : pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
1893 16158944 : Ok(match key.field1 {
1894 : 0x01 => {
1895 16158944 : let kind = match key.field2 {
1896 16063072 : 0x00 => SlruKind::Clog,
1897 48210 : 0x01 => SlruKind::MultiXactMembers,
1898 47662 : 0x02 => SlruKind::MultiXactOffsets,
1899 UBC 0 : _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
1900 : };
1901 CBC 16158944 : let segno = key.field4;
1902 16158944 : let blknum = key.field6;
1903 16158944 :
1904 16158944 : (kind, segno, blknum)
1905 : }
1906 UBC 0 : _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
1907 : })
1908 CBC 16158944 : }
1909 :
1910 UBC 0 : fn is_slru_block_key(key: Key) -> bool {
1911 0 : key.field1 == 0x01 // SLRU-related
1912 0 : && key.field3 == 0x00000001 // but not SlruDir
1913 0 : && key.field6 != 0xffffffff // and not SlruSegSize
1914 0 : }
1915 :
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
    // NOTE: this module currently contains no live tests. Everything below is
    // commented out, including the imports the snippets rely on, and is kept
    // for reference only.
    // NOTE(review): the snippets presumably target an older API and need
    // porting before they can be re-enabled — confirm.

    //use super::repo_harness::*;
    //use super::*;

    /*
        fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
            let incremental = timeline.get_current_logical_size();
            let non_incremental = timeline
                .get_current_logical_size_non_incremental(lsn)
                .unwrap();
            assert_eq!(incremental, non_incremental);
        }
    */

    /*
    ///
    /// Test list_rels() function, with branches and dropped relations
    ///
    #[test]
    fn test_list_rels_drop() -> Result<()> {
        let repo = RepoHarness::create("test_list_rels_drop")?.load();
        let tline = create_empty_timeline(repo, TIMELINE_ID)?;
        const TESTDB: u32 = 111;

        // Import initial dummy checkpoint record, otherwise the get_timeline() call
        // after branching fails below
        let mut writer = tline.begin_record(Lsn(0x10));
        writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
        writer.finish()?;

        // Create a relation on the timeline
        let mut writer = tline.begin_record(Lsn(0x20));
        writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
        writer.finish()?;

        let writer = tline.begin_record(Lsn(0x00));
        writer.finish()?;

        // Check that list_rels() lists it after LSN 2, but not before it
        assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
        assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
        assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));

        // Create a branch, check that the relation is visible there
        repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
        let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
            Some(timeline) => timeline,
            None => panic!("Should have a local timeline"),
        };
        let newtline = DatadirTimelineImpl::new(newtline);
        assert!(newtline
            .list_rels(0, TESTDB, Lsn(0x30))?
            .contains(&TESTREL_A));

        // Drop it on the branch
        let mut new_writer = newtline.begin_record(Lsn(0x40));
        new_writer.drop_relation(TESTREL_A)?;
        new_writer.finish()?;

        // Check that it's no longer listed on the branch after the point where it was dropped
        assert!(newtline
            .list_rels(0, TESTDB, Lsn(0x30))?
            .contains(&TESTREL_A));
        assert!(!newtline
            .list_rels(0, TESTDB, Lsn(0x40))?
            .contains(&TESTREL_A));

        // Run checkpoint and garbage collection and check that it's still not visible
        newtline.checkpoint(CheckpointConfig::Forced)?;
        repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;

        assert!(!newtline
            .list_rels(0, TESTDB, Lsn(0x40))?
            .contains(&TESTREL_A));

        Ok(())
    }
    */

    /*
    #[test]
    fn test_read_beyond_eof() -> Result<()> {
        let repo = RepoHarness::create("test_read_beyond_eof")?.load();
        let tline = create_test_timeline(repo, TIMELINE_ID)?;

        make_some_layers(&tline, Lsn(0x20))?;
        let mut writer = tline.begin_record(Lsn(0x60));
        walingest.put_rel_page_image(
            &mut writer,
            TESTREL_A,
            0,
            TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
        )?;
        writer.finish()?;

        // Test read before rel creation. Should error out.
        assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());

        // Read block beyond end of relation at different points in time.
        // These reads should fall into different delta, image, and in-memory layers.
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);

        // Test on an in-memory layer with no preceding layer
        let mut writer = tline.begin_record(Lsn(0x70));
        walingest.put_rel_page_image(
            &mut writer,
            TESTREL_B,
            0,
            TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
        )?;
        writer.finish()?;

        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);

        Ok(())
    }
    */
}
|