Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use crate::walrecord::NeonWalRecord;
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
20 : rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
21 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
22 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
23 : };
24 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
25 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
26 : use postgres_ffi::BLCKSZ;
27 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
28 : use serde::{Deserialize, Serialize};
29 : use std::collections::{hash_map, HashMap, HashSet};
30 : use std::ops::ControlFlow;
31 : use std::ops::Range;
32 : use strum::IntoEnumIterator;
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::{debug, trace, warn};
35 : use utils::bin_ser::DeserializeError;
36 : use utils::{bin_ser::BeSer, lsn::Lsn};
37 :
38 0 : #[derive(Debug)]
39 : pub enum LsnForTimestamp {
40 : /// Found commits both before and after the given timestamp
41 : Present(Lsn),
42 :
43 : /// Found no commits after the given timestamp, this means
44 : /// that the newest data in the branch is older than the given
45 : /// timestamp.
46 : ///
47 : /// All commits <= LSN happened before the given timestamp
48 : Future(Lsn),
49 :
50 : /// The queried timestamp is past our horizon we look back at (PITR)
51 : ///
52 : /// All commits > LSN happened after the given timestamp,
53 : /// but any commits < LSN might have happened before or after
54 : /// the given timestamp. We don't know because no data before
55 : /// the given lsn is available.
56 : Past(Lsn),
57 :
58 : /// We have found no commit with a timestamp,
59 : /// so we can't return anything meaningful.
60 : ///
61 : /// The associated LSN is the lower bound value we can safely
62 : /// create branches on, but no statement is made if it is
63 : /// older or newer than the timestamp.
64 : ///
65 : /// This variant can e.g. be returned right after a
66 : /// cluster import.
67 : NoData(Lsn),
68 : }
69 :
70 0 : #[derive(Debug, thiserror::Error)]
71 : pub enum CalculateLogicalSizeError {
72 : #[error("cancelled")]
73 : Cancelled,
74 : #[error(transparent)]
75 : Other(#[from] anyhow::Error),
76 : }
77 :
78 2 : #[derive(Debug, thiserror::Error)]
79 : pub(crate) enum CollectKeySpaceError {
80 : #[error(transparent)]
81 : Decode(#[from] DeserializeError),
82 : #[error(transparent)]
83 : PageRead(PageReconstructError),
84 : #[error("cancelled")]
85 : Cancelled,
86 : }
87 :
88 : impl From<PageReconstructError> for CollectKeySpaceError {
89 1 : fn from(err: PageReconstructError) -> Self {
90 1 : match err {
91 0 : PageReconstructError::Cancelled => Self::Cancelled,
92 1 : err => Self::PageRead(err),
93 : }
94 1 : }
95 : }
96 :
97 : impl From<PageReconstructError> for CalculateLogicalSizeError {
98 29 : fn from(pre: PageReconstructError) -> Self {
99 29 : match pre {
100 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
101 27 : Self::Cancelled
102 : }
103 2 : _ => Self::Other(pre.into()),
104 : }
105 29 : }
106 : }
107 :
108 0 : #[derive(Debug, thiserror::Error)]
109 : pub enum RelationError {
110 : #[error("Relation Already Exists")]
111 : AlreadyExists,
112 : #[error("invalid relnode")]
113 : InvalidRelnode,
114 : #[error(transparent)]
115 : Other(#[from] anyhow::Error),
116 : }
117 :
118 : ///
119 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
120 : /// and other special kinds of files, in a versioned key-value store. The
121 : /// Timeline struct provides the key-value store.
122 : ///
123 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
124 : /// implementation, and might be moved into a separate struct later.
125 : impl Timeline {
126 : /// Start ingesting a WAL record, or other atomic modification of
127 : /// the timeline.
128 : ///
129 : /// This provides a transaction-like interface to perform a bunch
130 : /// of modifications atomically.
131 : ///
132 : /// To ingest a WAL record, call begin_modification(lsn) to get a
133 : /// DatadirModification object. Use the functions in the object to
134 : /// modify the repository state, updating all the pages and metadata
135 : /// that the WAL record affects. When you're done, call commit() to
136 : /// commit the changes.
137 : ///
138 : /// Lsn stored in modification is advanced by `ingest_record` and
139 : /// is used by `commit()` to update `last_record_lsn`.
140 : ///
141 : /// Calling commit() will flush all the changes and reset the state,
142 : /// so the `DatadirModification` struct can be reused to perform the next modification.
143 : ///
144 : /// Note that any pending modifications you make through the
145 : /// modification object won't be visible to calls to the 'get' and list
146 : /// functions of the timeline until you finish! And if you update the
147 : /// same page twice, the last update wins.
148 : ///
149 1018095 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
150 1018095 : where
151 1018095 : Self: Sized,
152 1018095 : {
153 1018095 : DatadirModification {
154 1018095 : tline: self,
155 1018095 : pending_lsns: Vec::new(),
156 1018095 : pending_updates: HashMap::new(),
157 1018095 : pending_deletions: Vec::new(),
158 1018095 : pending_nblocks: 0,
159 1018095 : pending_aux_files: None,
160 1018095 : pending_directory_entries: Vec::new(),
161 1018095 : lsn,
162 1018095 : }
163 1018095 : }
164 :
165 : //------------------------------------------------------------------------------
166 : // Public GET functions
167 : //------------------------------------------------------------------------------
168 :
169 : /// Look up given page version.
170 4470425 : pub(crate) async fn get_rel_page_at_lsn(
171 4470425 : &self,
172 4470425 : tag: RelTag,
173 4470425 : blknum: BlockNumber,
174 4470425 : version: Version<'_>,
175 4470425 : latest: bool,
176 4470425 : ctx: &RequestContext,
177 4470425 : ) -> Result<Bytes, PageReconstructError> {
178 4470425 : if tag.relnode == 0 {
179 0 : return Err(PageReconstructError::Other(
180 0 : RelationError::InvalidRelnode.into(),
181 0 : ));
182 4470425 : }
183 :
184 4470425 : let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
185 4470425 : if blknum >= nblocks {
186 0 : debug!(
187 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
188 0 : tag,
189 0 : blknum,
190 0 : version.get_lsn(),
191 0 : nblocks
192 0 : );
193 124652 : return Ok(ZERO_PAGE.clone());
194 4345773 : }
195 4345773 :
196 4345773 : let key = rel_block_to_key(tag, blknum);
197 4345773 : version.get(self, key, ctx).await
198 4470424 : }
199 :
200 : // Get size of a database in blocks
201 8 : pub(crate) async fn get_db_size(
202 8 : &self,
203 8 : spcnode: Oid,
204 8 : dbnode: Oid,
205 8 : version: Version<'_>,
206 8 : latest: bool,
207 8 : ctx: &RequestContext,
208 8 : ) -> Result<usize, PageReconstructError> {
209 8 : let mut total_blocks = 0;
210 :
211 8 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
212 :
213 2351 : for rel in rels {
214 2343 : let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
215 2343 : total_blocks += n_blocks as usize;
216 : }
217 8 : Ok(total_blocks)
218 8 : }
219 :
220 : /// Get size of a relation file
221 4612051 : pub(crate) async fn get_rel_size(
222 4612051 : &self,
223 4612051 : tag: RelTag,
224 4612051 : version: Version<'_>,
225 4612051 : latest: bool,
226 4612051 : ctx: &RequestContext,
227 4612051 : ) -> Result<BlockNumber, PageReconstructError> {
228 4612051 : if tag.relnode == 0 {
229 0 : return Err(PageReconstructError::Other(
230 0 : RelationError::InvalidRelnode.into(),
231 0 : ));
232 4612051 : }
233 :
234 4612051 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
235 4304878 : return Ok(nblocks);
236 307173 : }
237 307173 :
238 307173 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
239 7667 : && !self.get_rel_exists(tag, version, latest, ctx).await?
240 : {
241 : // FIXME: Postgres sometimes calls smgrcreate() to create
242 : // FSM, and smgrnblocks() on it immediately afterwards,
243 : // without extending it. Tolerate that by claiming that
244 : // any non-existent FSM fork has size 0.
245 11 : return Ok(0);
246 307162 : }
247 307162 :
248 307162 : let key = rel_size_to_key(tag);
249 307162 : let mut buf = version.get(self, key, ctx).await?;
250 307158 : let nblocks = buf.get_u32_le();
251 307158 :
252 307158 : if latest {
253 155937 : // Update relation size cache only if "latest" flag is set.
254 155937 : // This flag is set by compute when it is working with most recent version of relation.
255 155937 : // Typically master compute node always set latest=true.
256 155937 : // Please notice, that even if compute node "by mistake" specifies old LSN but set
257 155937 : // latest=true, then it can not cause cache corruption, because with latest=true
258 155937 : // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
259 155937 : // associated with most recent value of LSN.
260 155937 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
261 155937 : }
262 307158 : Ok(nblocks)
263 4612051 : }
264 :
265 : /// Does relation exist?
266 430565 : pub(crate) async fn get_rel_exists(
267 430565 : &self,
268 430565 : tag: RelTag,
269 430565 : version: Version<'_>,
270 430565 : _latest: bool,
271 430565 : ctx: &RequestContext,
272 430565 : ) -> Result<bool, PageReconstructError> {
273 430565 : if tag.relnode == 0 {
274 0 : return Err(PageReconstructError::Other(
275 0 : RelationError::InvalidRelnode.into(),
276 0 : ));
277 430565 : }
278 :
279 : // first try to lookup relation in cache
280 430565 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
281 167901 : return Ok(true);
282 262664 : }
283 262664 : // fetch directory listing
284 262664 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
285 262664 : let buf = version.get(self, key, ctx).await?;
286 :
287 262664 : match RelDirectory::des(&buf).context("deserialization failure") {
288 262664 : Ok(dir) => {
289 262664 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
290 262664 : Ok(exists)
291 : }
292 0 : Err(e) => Err(PageReconstructError::from(e)),
293 : }
294 430565 : }
295 :
296 : /// Get a list of all existing relations in given tablespace and database.
297 : ///
298 : /// # Cancel-Safety
299 : ///
300 : /// This method is cancellation-safe.
301 8170 : pub(crate) async fn list_rels(
302 8170 : &self,
303 8170 : spcnode: Oid,
304 8170 : dbnode: Oid,
305 8170 : version: Version<'_>,
306 8170 : ctx: &RequestContext,
307 8170 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
308 8170 : // fetch directory listing
309 8170 : let key = rel_dir_to_key(spcnode, dbnode);
310 8170 : let buf = version.get(self, key, ctx).await?;
311 :
312 8170 : match RelDirectory::des(&buf).context("deserialization failure") {
313 8170 : Ok(dir) => {
314 8170 : let rels: HashSet<RelTag> =
315 1937069 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
316 1937069 : spcnode,
317 1937069 : dbnode,
318 1937069 : relnode: *relnode,
319 1937069 : forknum: *forknum,
320 1937069 : }));
321 8170 :
322 8170 : Ok(rels)
323 : }
324 0 : Err(e) => Err(PageReconstructError::from(e)),
325 : }
326 8170 : }
327 :
328 : /// Get the whole SLRU segment
329 0 : pub(crate) async fn get_slru_segment(
330 0 : &self,
331 0 : kind: SlruKind,
332 0 : segno: u32,
333 0 : lsn: Lsn,
334 0 : ctx: &RequestContext,
335 0 : ) -> Result<Bytes, PageReconstructError> {
336 0 : let n_blocks = self
337 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
338 0 : .await?;
339 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
340 0 : for blkno in 0..n_blocks {
341 0 : let block = self
342 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
343 0 : .await?;
344 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
345 : }
346 0 : Ok(segment.freeze())
347 0 : }
348 :
349 : /// Look up given SLRU page version.
350 2918 : pub(crate) async fn get_slru_page_at_lsn(
351 2918 : &self,
352 2918 : kind: SlruKind,
353 2918 : segno: u32,
354 2918 : blknum: BlockNumber,
355 2918 : lsn: Lsn,
356 2918 : ctx: &RequestContext,
357 2918 : ) -> Result<Bytes, PageReconstructError> {
358 2918 : let key = slru_block_to_key(kind, segno, blknum);
359 274951 : self.get(key, lsn, ctx).await
360 2917 : }
361 :
362 : /// Get size of an SLRU segment
363 6355 : pub(crate) async fn get_slru_segment_size(
364 6355 : &self,
365 6355 : kind: SlruKind,
366 6355 : segno: u32,
367 6355 : version: Version<'_>,
368 6355 : ctx: &RequestContext,
369 6355 : ) -> Result<BlockNumber, PageReconstructError> {
370 6355 : let key = slru_segment_size_to_key(kind, segno);
371 6355 : let mut buf = version.get(self, key, ctx).await?;
372 6355 : Ok(buf.get_u32_le())
373 6355 : }
374 :
375 : /// Get size of an SLRU segment
376 1642 : pub(crate) async fn get_slru_segment_exists(
377 1642 : &self,
378 1642 : kind: SlruKind,
379 1642 : segno: u32,
380 1642 : version: Version<'_>,
381 1642 : ctx: &RequestContext,
382 1642 : ) -> Result<bool, PageReconstructError> {
383 1642 : // fetch directory listing
384 1642 : let key = slru_dir_to_key(kind);
385 1642 : let buf = version.get(self, key, ctx).await?;
386 :
387 1642 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
388 1642 : Ok(dir) => {
389 1642 : let exists = dir.segments.get(&segno).is_some();
390 1642 : Ok(exists)
391 : }
392 0 : Err(e) => Err(PageReconstructError::from(e)),
393 : }
394 1642 : }
395 :
396 : /// Locate LSN, such that all transactions that committed before
397 : /// 'search_timestamp' are visible, but nothing newer is.
398 : ///
399 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
400 : /// so it's not well defined which LSN you get if there were multiple commits
401 : /// "in flight" at that point in time.
402 : ///
403 181 : pub(crate) async fn find_lsn_for_timestamp(
404 181 : &self,
405 181 : search_timestamp: TimestampTz,
406 181 : cancel: &CancellationToken,
407 181 : ctx: &RequestContext,
408 181 : ) -> Result<LsnForTimestamp, PageReconstructError> {
409 181 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
410 181 : // We use this method to figure out the branching LSN for the new branch, but the
411 181 : // GC cutoff could be before the branching point and we cannot create a new branch
412 181 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
413 181 : // on the safe side.
414 181 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
415 181 : let max_lsn = self.get_last_record_lsn();
416 181 :
417 181 : // LSNs are always 8-byte aligned. low/mid/high represent the
418 181 : // LSN divided by 8.
419 181 : let mut low = min_lsn.0 / 8;
420 181 : let mut high = max_lsn.0 / 8 + 1;
421 181 :
422 181 : let mut found_smaller = false;
423 181 : let mut found_larger = false;
424 3088 : while low < high {
425 2908 : if cancel.is_cancelled() {
426 0 : return Err(PageReconstructError::Cancelled);
427 2908 : }
428 2908 : // cannot overflow, high and low are both smaller than u64::MAX / 2
429 2908 : let mid = (high + low) / 2;
430 :
431 2908 : let cmp = self
432 2908 : .is_latest_commit_timestamp_ge_than(
433 2908 : search_timestamp,
434 2908 : Lsn(mid * 8),
435 2908 : &mut found_smaller,
436 2908 : &mut found_larger,
437 2908 : ctx,
438 2908 : )
439 276993 : .await?;
440 :
441 2907 : if cmp {
442 765 : high = mid;
443 2142 : } else {
444 2142 : low = mid + 1;
445 2142 : }
446 : }
447 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
448 : // so the LSN of the last commit record before or at `search_timestamp`.
449 : // Remove one from `low` to get `t`.
450 : //
451 : // FIXME: it would be better to get the LSN of the previous commit.
452 : // Otherwise, if you restore to the returned LSN, the database will
453 : // include physical changes from later commits that will be marked
454 : // as aborted, and will need to be vacuumed away.
455 180 : let commit_lsn = Lsn((low - 1) * 8);
456 180 : match (found_smaller, found_larger) {
457 : (false, false) => {
458 : // This can happen if no commit records have been processed yet, e.g.
459 : // just after importing a cluster.
460 24 : Ok(LsnForTimestamp::NoData(min_lsn))
461 : }
462 : (false, true) => {
463 : // Didn't find any commit timestamps smaller than the request
464 19 : Ok(LsnForTimestamp::Past(min_lsn))
465 : }
466 : (true, false) => {
467 : // Only found commits with timestamps smaller than the request.
468 : // It's still a valid case for branch creation, return it.
469 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
470 : // case, anyway.
471 80 : Ok(LsnForTimestamp::Future(commit_lsn))
472 : }
473 57 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
474 : }
475 180 : }
476 :
477 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
478 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
479 : ///
480 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
481 : /// with a smaller/larger timestamp.
482 : ///
483 2908 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
484 2908 : &self,
485 2908 : search_timestamp: TimestampTz,
486 2908 : probe_lsn: Lsn,
487 2908 : found_smaller: &mut bool,
488 2908 : found_larger: &mut bool,
489 2908 : ctx: &RequestContext,
490 2908 : ) -> Result<bool, PageReconstructError> {
491 2908 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
492 2695 : if timestamp >= search_timestamp {
493 765 : *found_larger = true;
494 765 : return ControlFlow::Break(true);
495 1930 : } else {
496 1930 : *found_smaller = true;
497 1930 : }
498 1930 : ControlFlow::Continue(())
499 2908 : })
500 276993 : .await
501 2907 : }
502 :
503 : /// Obtain the possible timestamp range for the given lsn.
504 : ///
505 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
506 12 : pub(crate) async fn get_timestamp_for_lsn(
507 12 : &self,
508 12 : probe_lsn: Lsn,
509 12 : ctx: &RequestContext,
510 12 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
511 12 : let mut max: Option<TimestampTz> = None;
512 12 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
513 10 : if let Some(max_prev) = max {
514 0 : max = Some(max_prev.max(timestamp));
515 10 : } else {
516 10 : max = Some(timestamp);
517 10 : }
518 10 : ControlFlow::Continue(())
519 12 : })
520 82 : .await?;
521 :
522 10 : Ok(max)
523 12 : }
524 :
525 : /// Runs the given function on all the timestamps for a given lsn
526 : ///
527 : /// The return value is either given by the closure, or set to the `Default`
528 : /// impl's output.
529 2920 : async fn map_all_timestamps<T: Default>(
530 2920 : &self,
531 2920 : probe_lsn: Lsn,
532 2920 : ctx: &RequestContext,
533 2920 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
534 2920 : ) -> Result<T, PageReconstructError> {
535 2920 : for segno in self
536 2920 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
537 1084 : .await?
538 : {
539 2918 : let nblocks = self
540 2918 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
541 1040 : .await?;
542 2918 : for blknum in (0..nblocks).rev() {
543 2918 : let clog_page = self
544 2918 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
545 274951 : .await?;
546 :
547 2917 : if clog_page.len() == BLCKSZ as usize + 8 {
548 2705 : let mut timestamp_bytes = [0u8; 8];
549 2705 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
550 2705 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
551 2705 :
552 2705 : match f(timestamp) {
553 765 : ControlFlow::Break(b) => return Ok(b),
554 1940 : ControlFlow::Continue(()) => (),
555 : }
556 212 : }
557 : }
558 : }
559 2152 : Ok(Default::default())
560 2919 : }
561 :
562 607 : pub(crate) async fn get_slru_keyspace(
563 607 : &self,
564 607 : version: Version<'_>,
565 607 : ctx: &RequestContext,
566 607 : ) -> Result<KeySpace, PageReconstructError> {
567 607 : let mut accum = KeySpaceAccum::new();
568 :
569 2410 : for kind in SlruKind::iter() {
570 1809 : let mut segments: Vec<u32> = self
571 1809 : .list_slru_segments(kind, version, ctx)
572 160 : .await?
573 1803 : .into_iter()
574 1803 : .collect();
575 1803 : segments.sort_unstable();
576 :
577 3642 : for seg in segments {
578 1839 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
579 :
580 1839 : accum.add_range(
581 1839 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
582 1839 : );
583 : }
584 : }
585 :
586 601 : Ok(accum.to_keyspace())
587 607 : }
588 :
589 : /// Get a list of SLRU segments
590 4732 : pub(crate) async fn list_slru_segments(
591 4732 : &self,
592 4732 : kind: SlruKind,
593 4732 : version: Version<'_>,
594 4732 : ctx: &RequestContext,
595 4732 : ) -> Result<HashSet<u32>, PageReconstructError> {
596 4732 : // fetch directory entry
597 4732 : let key = slru_dir_to_key(kind);
598 :
599 4732 : let buf = version.get(self, key, ctx).await?;
600 4724 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
601 4724 : Ok(dir) => Ok(dir.segments),
602 0 : Err(e) => Err(PageReconstructError::from(e)),
603 : }
604 4732 : }
605 :
606 2438 : pub(crate) async fn get_relmap_file(
607 2438 : &self,
608 2438 : spcnode: Oid,
609 2438 : dbnode: Oid,
610 2438 : version: Version<'_>,
611 2438 : ctx: &RequestContext,
612 2438 : ) -> Result<Bytes, PageReconstructError> {
613 2438 : let key = relmap_file_key(spcnode, dbnode);
614 :
615 2438 : let buf = version.get(self, key, ctx).await?;
616 2438 : Ok(buf)
617 2438 : }
618 :
619 601 : pub(crate) async fn list_dbdirs(
620 601 : &self,
621 601 : lsn: Lsn,
622 601 : ctx: &RequestContext,
623 601 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
624 : // fetch directory entry
625 601 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
626 :
627 601 : match DbDirectory::des(&buf).context("deserialization failure") {
628 601 : Ok(dir) => Ok(dir.dbdirs),
629 0 : Err(e) => Err(PageReconstructError::from(e)),
630 : }
631 601 : }
632 :
633 2 : pub(crate) async fn get_twophase_file(
634 2 : &self,
635 2 : xid: TransactionId,
636 2 : lsn: Lsn,
637 2 : ctx: &RequestContext,
638 2 : ) -> Result<Bytes, PageReconstructError> {
639 2 : let key = twophase_file_key(xid);
640 2 : let buf = self.get(key, lsn, ctx).await?;
641 2 : Ok(buf)
642 2 : }
643 :
644 601 : pub(crate) async fn list_twophase_files(
645 601 : &self,
646 601 : lsn: Lsn,
647 601 : ctx: &RequestContext,
648 601 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
649 : // fetch directory entry
650 601 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
651 :
652 601 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
653 601 : Ok(dir) => Ok(dir.xids),
654 0 : Err(e) => Err(PageReconstructError::from(e)),
655 : }
656 601 : }
657 :
658 595 : pub(crate) async fn get_control_file(
659 595 : &self,
660 595 : lsn: Lsn,
661 595 : ctx: &RequestContext,
662 595 : ) -> Result<Bytes, PageReconstructError> {
663 595 : self.get(CONTROLFILE_KEY, lsn, ctx).await
664 595 : }
665 :
666 1965 : pub(crate) async fn get_checkpoint(
667 1965 : &self,
668 1965 : lsn: Lsn,
669 1965 : ctx: &RequestContext,
670 1965 : ) -> Result<Bytes, PageReconstructError> {
671 1965 : self.get(CHECKPOINT_KEY, lsn, ctx).await
672 1965 : }
673 :
674 2422 : pub(crate) async fn list_aux_files(
675 2422 : &self,
676 2422 : lsn: Lsn,
677 2422 : ctx: &RequestContext,
678 2422 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
679 2422 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
680 1430 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
681 1430 : Ok(dir) => Ok(dir.files),
682 0 : Err(e) => Err(PageReconstructError::from(e)),
683 : },
684 992 : Err(e) => {
685 : // This is expected: historical databases do not have the key.
686 0 : debug!("Failed to get info about AUX files: {}", e);
687 992 : Ok(HashMap::new())
688 : }
689 : }
690 2422 : }
691 :
692 : /// Does the same as get_current_logical_size but counted on demand.
693 : /// Used to initialize the logical size tracking on startup.
694 : ///
695 : /// Only relation blocks are counted currently. That excludes metadata,
696 : /// SLRUs, twophase files etc.
697 : ///
698 : /// # Cancel-Safety
699 : ///
700 : /// This method is cancellation-safe.
701 689 : pub async fn get_current_logical_size_non_incremental(
702 689 : &self,
703 689 : lsn: Lsn,
704 689 : ctx: &RequestContext,
705 689 : ) -> Result<u64, CalculateLogicalSizeError> {
706 689 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
707 :
708 : // Fetch list of database dirs and iterate them
709 932 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
710 682 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
711 :
712 682 : let mut total_size: u64 = 0;
713 2698 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
714 625253 : for rel in self
715 2698 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
716 363 : .await?
717 : {
718 625253 : if self.cancel.is_cancelled() {
719 1 : return Err(CalculateLogicalSizeError::Cancelled);
720 625252 : }
721 625252 : let relsize_key = rel_size_to_key(rel);
722 625252 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
723 625219 : let relsize = buf.get_u32_le();
724 625219 :
725 625219 : total_size += relsize as u64;
726 : }
727 : }
728 648 : Ok(total_size * BLCKSZ as u64)
729 678 : }
730 :
731 : ///
732 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
733 : /// Anything that's not listed maybe removed from the underlying storage (from
734 : /// that LSN forwards).
735 907 : pub(crate) async fn collect_keyspace(
736 907 : &self,
737 907 : lsn: Lsn,
738 907 : ctx: &RequestContext,
739 907 : ) -> Result<KeySpace, CollectKeySpaceError> {
740 907 : // Iterate through key ranges, greedily packing them into partitions
741 907 : let mut result = KeySpaceAccum::new();
742 907 :
743 907 : // The dbdir metadata always exists
744 907 : result.add_key(DBDIR_KEY);
745 :
746 : // Fetch list of database dirs and iterate them
747 2297 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
748 906 : let dbdir = DbDirectory::des(&buf)?;
749 :
750 906 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
751 906 : dbs.sort_unstable();
752 3927 : for (spcnode, dbnode) in dbs {
753 3022 : result.add_key(relmap_file_key(spcnode, dbnode));
754 3022 : result.add_key(rel_dir_to_key(spcnode, dbnode));
755 :
756 3022 : let mut rels: Vec<RelTag> = self
757 3022 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
758 654 : .await?
759 3022 : .into_iter()
760 3022 : .collect();
761 3022 : rels.sort_unstable();
762 731883 : for rel in rels {
763 728862 : let relsize_key = rel_size_to_key(rel);
764 728862 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
765 728861 : let relsize = buf.get_u32_le();
766 728861 :
767 728861 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
768 728861 : result.add_key(relsize_key);
769 : }
770 : }
771 :
772 : // Iterate SLRUs next
773 2715 : for kind in [
774 905 : SlruKind::Clog,
775 905 : SlruKind::MultiXactMembers,
776 905 : SlruKind::MultiXactOffsets,
777 : ] {
778 2715 : let slrudir_key = slru_dir_to_key(kind);
779 2715 : result.add_key(slrudir_key);
780 8465 : let buf = self.get(slrudir_key, lsn, ctx).await?;
781 2715 : let dir = SlruSegmentDirectory::des(&buf)?;
782 2715 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
783 2715 : segments.sort_unstable();
784 4932 : for segno in segments {
785 2217 : let segsize_key = slru_segment_size_to_key(kind, segno);
786 2217 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
787 2217 : let segsize = buf.get_u32_le();
788 2217 :
789 2217 : result.add_range(
790 2217 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
791 2217 : );
792 2217 : result.add_key(segsize_key);
793 : }
794 : }
795 :
796 : // Then pg_twophase
797 905 : result.add_key(TWOPHASEDIR_KEY);
798 3170 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
799 905 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
800 905 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
801 905 : xids.sort_unstable();
802 905 : for xid in xids {
803 0 : result.add_key(twophase_file_key(xid));
804 0 : }
805 :
806 905 : result.add_key(CONTROLFILE_KEY);
807 905 : result.add_key(CHECKPOINT_KEY);
808 905 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
809 632 : result.add_key(AUX_FILES_KEY);
810 632 : }
811 905 : Ok(result.to_keyspace())
812 906 : }
813 :
814 : /// Get cached size of relation if it not updated after specified LSN
815 61931885 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
816 61931885 : let rel_size_cache = self.rel_size_cache.read().unwrap();
817 61931885 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
818 61572835 : if lsn >= *cached_lsn {
819 61357462 : return Some(*nblocks);
820 215373 : }
821 359050 : }
822 574423 : None
823 61931885 : }
824 :
825 : /// Update cached relation size if there is no more recent update
826 155937 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
827 155937 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
828 155937 : match rel_size_cache.entry(tag) {
829 128967 : hash_map::Entry::Occupied(mut entry) => {
830 128967 : let cached_lsn = entry.get_mut();
831 128967 : if lsn >= cached_lsn.0 {
832 12 : *cached_lsn = (lsn, nblocks);
833 128955 : }
834 : }
835 26970 : hash_map::Entry::Vacant(entry) => {
836 26970 : entry.insert((lsn, nblocks));
837 26970 : }
838 : }
839 155937 : }
840 :
841 : /// Store cached relation size
842 1928349 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
843 1928349 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
844 1928349 : rel_size_cache.insert(tag, (lsn, nblocks));
845 1928349 : }
846 :
847 : /// Remove cached relation size
848 67312 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
849 67312 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
850 67312 : rel_size_cache.remove(tag);
851 67312 : }
852 : }
853 :
854 : /// DatadirModification represents an operation to ingest an atomic set of
855 : /// updates to the repository. It is created by the 'begin_record'
856 : /// function. It is called for each WAL record, so that all the modifications
857 : /// by a one WAL record appear atomic.
858 : pub struct DatadirModification<'a> {
859 : /// The timeline this modification applies to. You can access this to
860 : /// read the state, but note that any pending updates are *not* reflected
861 : /// in the state in 'tline' yet.
862 : pub tline: &'a Timeline,
863 :
864 : /// Current LSN of the modification
865 : lsn: Lsn,
866 :
867 : // The modifications are not applied directly to the underlying key-value store.
868 : // The put-functions add the modifications here, and they are flushed to the
869 : // underlying key-value store by the 'finish' function.
870 : pending_lsns: Vec<Lsn>,
871 : pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
872 : pending_deletions: Vec<(Range<Key>, Lsn)>,
873 : pending_nblocks: i64,
874 :
875 : // If we already wrote any aux file changes in this modification, stash the latest dir. If set,
876 : // [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
877 : // if AUX_FILES_KEY is already set.
878 : pending_aux_files: Option<AuxFilesDirectory>,
879 :
880 : /// For special "directory" keys that store key-value maps, track the size of the map
881 : /// if it was updated in this modification.
882 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
883 : }
884 :
885 : impl<'a> DatadirModification<'a> {
886 : /// Get the current lsn
887 56889269 : pub(crate) fn get_lsn(&self) -> Lsn {
888 56889269 : self.lsn
889 56889269 : }
890 :
891 : /// Set the current lsn
892 72693847 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
893 72693847 : ensure!(
894 72693847 : lsn >= self.lsn,
895 0 : "setting an older lsn {} than {} is not allowed",
896 : lsn,
897 : self.lsn
898 : );
899 72693847 : if lsn > self.lsn {
900 72693847 : self.pending_lsns.push(self.lsn);
901 72693847 : self.lsn = lsn;
902 72693847 : }
903 72693847 : Ok(())
904 72693847 : }
905 :
906 : /// Initialize a completely new repository.
907 : ///
908 : /// This inserts the directory metadata entries that are assumed to
909 : /// always exist.
910 680 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
911 680 : let buf = DbDirectory::ser(&DbDirectory {
912 680 : dbdirs: HashMap::new(),
913 680 : })?;
914 680 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
915 680 : self.put(DBDIR_KEY, Value::Image(buf.into()));
916 680 :
917 680 : // Create AuxFilesDirectory
918 680 : self.init_aux_dir()?;
919 :
920 680 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
921 680 : xids: HashSet::new(),
922 680 : })?;
923 680 : self.pending_directory_entries
924 680 : .push((DirectoryKind::TwoPhase, 0));
925 680 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
926 :
927 680 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
928 680 : let empty_dir = Value::Image(buf);
929 680 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
930 680 : self.pending_directory_entries
931 680 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
932 680 : self.put(
933 680 : slru_dir_to_key(SlruKind::MultiXactMembers),
934 680 : empty_dir.clone(),
935 680 : );
936 680 : self.pending_directory_entries
937 680 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
938 680 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
939 680 : self.pending_directory_entries
940 680 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
941 680 :
942 680 : Ok(())
943 680 : }
944 :
945 : #[cfg(test)]
946 70 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
947 70 : self.init_empty()?;
948 70 : self.put_control_file(bytes::Bytes::from_static(
949 70 : b"control_file contents do not matter",
950 70 : ))
951 70 : .context("put_control_file")?;
952 70 : self.put_checkpoint(bytes::Bytes::from_static(
953 70 : b"checkpoint_file contents do not matter",
954 70 : ))
955 70 : .context("put_checkpoint_file")?;
956 70 : Ok(())
957 70 : }
958 :
959 : /// Put a new page version that can be constructed from a WAL record
960 : ///
961 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
962 : /// current end-of-file. It's up to the caller to check that the relation size
963 : /// matches the blocks inserted!
964 52739517 : pub fn put_rel_wal_record(
965 52739517 : &mut self,
966 52739517 : rel: RelTag,
967 52739517 : blknum: BlockNumber,
968 52739517 : rec: NeonWalRecord,
969 52739517 : ) -> anyhow::Result<()> {
970 52739517 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
971 52739517 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
972 52739517 : Ok(())
973 52739517 : }
974 :
975 : // Same, but for an SLRU.
976 6091255 : pub fn put_slru_wal_record(
977 6091255 : &mut self,
978 6091255 : kind: SlruKind,
979 6091255 : segno: u32,
980 6091255 : blknum: BlockNumber,
981 6091255 : rec: NeonWalRecord,
982 6091255 : ) -> anyhow::Result<()> {
983 6091255 : self.put(
984 6091255 : slru_block_to_key(kind, segno, blknum),
985 6091255 : Value::WalRecord(rec),
986 6091255 : );
987 6091255 : Ok(())
988 6091255 : }
989 :
990 : /// Like put_wal_record, but with ready-made image of the page.
991 2591084 : pub fn put_rel_page_image(
992 2591084 : &mut self,
993 2591084 : rel: RelTag,
994 2591084 : blknum: BlockNumber,
995 2591084 : img: Bytes,
996 2591084 : ) -> anyhow::Result<()> {
997 2591084 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
998 2591084 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
999 2591084 : Ok(())
1000 2591084 : }
1001 :
1002 3478 : pub fn put_slru_page_image(
1003 3478 : &mut self,
1004 3478 : kind: SlruKind,
1005 3478 : segno: u32,
1006 3478 : blknum: BlockNumber,
1007 3478 : img: Bytes,
1008 3478 : ) -> anyhow::Result<()> {
1009 3478 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
1010 3478 : Ok(())
1011 3478 : }
1012 :
1013 : /// Store a relmapper file (pg_filenode.map) in the repository
1014 2530 : pub async fn put_relmap_file(
1015 2530 : &mut self,
1016 2530 : spcnode: Oid,
1017 2530 : dbnode: Oid,
1018 2530 : img: Bytes,
1019 2530 : ctx: &RequestContext,
1020 2530 : ) -> anyhow::Result<()> {
1021 : // Add it to the directory (if it doesn't exist already)
1022 2530 : let buf = self.get(DBDIR_KEY, ctx).await?;
1023 2530 : let mut dbdir = DbDirectory::des(&buf)?;
1024 :
1025 2530 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1026 2530 : if r.is_none() || r == Some(false) {
1027 : // The dbdir entry didn't exist, or it contained a
1028 : // 'false'. The 'insert' call already updated it with
1029 : // 'true', now write the updated 'dbdirs' map back.
1030 2468 : let buf = DbDirectory::ser(&dbdir)?;
1031 2468 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1032 2468 :
1033 2468 : // Create AuxFilesDirectory as well
1034 2468 : self.init_aux_dir()?;
1035 62 : }
1036 2530 : if r.is_none() {
1037 36 : // Create RelDirectory
1038 36 : let buf = RelDirectory::ser(&RelDirectory {
1039 36 : rels: HashSet::new(),
1040 36 : })?;
1041 36 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1042 36 : self.put(
1043 36 : rel_dir_to_key(spcnode, dbnode),
1044 36 : Value::Image(Bytes::from(buf)),
1045 36 : );
1046 2494 : }
1047 :
1048 2530 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1049 2530 : Ok(())
1050 2530 : }
1051 :
1052 4 : pub async fn put_twophase_file(
1053 4 : &mut self,
1054 4 : xid: TransactionId,
1055 4 : img: Bytes,
1056 4 : ctx: &RequestContext,
1057 4 : ) -> anyhow::Result<()> {
1058 : // Add it to the directory entry
1059 4 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1060 4 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1061 4 : if !dir.xids.insert(xid) {
1062 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1063 4 : }
1064 4 : self.pending_directory_entries
1065 4 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1066 4 : self.put(
1067 4 : TWOPHASEDIR_KEY,
1068 4 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1069 : );
1070 :
1071 4 : self.put(twophase_file_key(xid), Value::Image(img));
1072 4 : Ok(())
1073 4 : }
1074 :
1075 678 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1076 678 : self.put(CONTROLFILE_KEY, Value::Image(img));
1077 678 : Ok(())
1078 678 : }
1079 :
1080 34992 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1081 34992 : self.put(CHECKPOINT_KEY, Value::Image(img));
1082 34992 : Ok(())
1083 34992 : }
1084 :
1085 3 : pub async fn drop_dbdir(
1086 3 : &mut self,
1087 3 : spcnode: Oid,
1088 3 : dbnode: Oid,
1089 3 : ctx: &RequestContext,
1090 3 : ) -> anyhow::Result<()> {
1091 3 : let total_blocks = self
1092 3 : .tline
1093 3 : .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
1094 0 : .await?;
1095 :
1096 : // Remove entry from dbdir
1097 3 : let buf = self.get(DBDIR_KEY, ctx).await?;
1098 3 : let mut dir = DbDirectory::des(&buf)?;
1099 3 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1100 3 : let buf = DbDirectory::ser(&dir)?;
1101 3 : self.pending_directory_entries
1102 3 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1103 3 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1104 : } else {
1105 0 : warn!(
1106 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1107 0 : spcnode, dbnode
1108 0 : );
1109 : }
1110 :
1111 : // Update logical database size.
1112 3 : self.pending_nblocks -= total_blocks as i64;
1113 3 :
1114 3 : // Delete all relations and metadata files for the spcnode/dnode
1115 3 : self.delete(dbdir_key_range(spcnode, dbnode));
1116 3 : Ok(())
1117 3 : }
1118 :
1119 : /// Create a relation fork.
1120 : ///
1121 : /// 'nblocks' is the initial size.
1122 653184 : pub async fn put_rel_creation(
1123 653184 : &mut self,
1124 653184 : rel: RelTag,
1125 653184 : nblocks: BlockNumber,
1126 653184 : ctx: &RequestContext,
1127 653184 : ) -> Result<(), RelationError> {
1128 653184 : if rel.relnode == 0 {
1129 0 : return Err(RelationError::InvalidRelnode);
1130 653184 : }
1131 : // It's possible that this is the first rel for this db in this
1132 : // tablespace. Create the reldir entry for it if so.
1133 653184 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1134 653184 : .context("deserialize db")?;
1135 653184 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1136 653184 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
1137 : // Didn't exist. Update dbdir
1138 2437 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
1139 2437 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1140 2437 : self.pending_directory_entries
1141 2437 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1142 2437 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1143 2437 :
1144 2437 : // and create the RelDirectory
1145 2437 : RelDirectory::default()
1146 : } else {
1147 : // reldir already exists, fetch it
1148 650747 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1149 650747 : .context("deserialize db")?
1150 : };
1151 :
1152 : // Add the new relation to the rel directory entry, and write it back
1153 653184 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1154 0 : return Err(RelationError::AlreadyExists);
1155 653184 : }
1156 653184 :
1157 653184 : self.pending_directory_entries
1158 653184 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1159 653184 :
1160 653184 : self.put(
1161 653184 : rel_dir_key,
1162 653184 : Value::Image(Bytes::from(
1163 653184 : RelDirectory::ser(&rel_dir).context("serialize")?,
1164 : )),
1165 : );
1166 :
1167 : // Put size
1168 653184 : let size_key = rel_size_to_key(rel);
1169 653184 : let buf = nblocks.to_le_bytes();
1170 653184 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1171 653184 :
1172 653184 : self.pending_nblocks += nblocks as i64;
1173 653184 :
1174 653184 : // Update relation size cache
1175 653184 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1176 653184 :
1177 653184 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1178 653184 : // caller.
1179 653184 : Ok(())
1180 653184 : }
1181 :
1182 : /// Truncate relation
1183 6344 : pub async fn put_rel_truncation(
1184 6344 : &mut self,
1185 6344 : rel: RelTag,
1186 6344 : nblocks: BlockNumber,
1187 6344 : ctx: &RequestContext,
1188 6344 : ) -> anyhow::Result<()> {
1189 6344 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1190 6344 : if self
1191 6344 : .tline
1192 6344 : .get_rel_exists(rel, Version::Modified(self), true, ctx)
1193 0 : .await?
1194 : {
1195 6344 : let size_key = rel_size_to_key(rel);
1196 : // Fetch the old size first
1197 6344 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1198 6344 :
1199 6344 : // Update the entry with the new size.
1200 6344 : let buf = nblocks.to_le_bytes();
1201 6344 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1202 6344 :
1203 6344 : // Update relation size cache
1204 6344 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1205 6344 :
1206 6344 : // Update relation size cache
1207 6344 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1208 6344 :
1209 6344 : // Update logical database size.
1210 6344 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1211 0 : }
1212 6344 : Ok(())
1213 6344 : }
1214 :
1215 : /// Extend relation
1216 : /// If new size is smaller, do nothing.
1217 1829588 : pub async fn put_rel_extend(
1218 1829588 : &mut self,
1219 1829588 : rel: RelTag,
1220 1829588 : nblocks: BlockNumber,
1221 1829588 : ctx: &RequestContext,
1222 1829588 : ) -> anyhow::Result<()> {
1223 1829588 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1224 :
1225 : // Put size
1226 1829588 : let size_key = rel_size_to_key(rel);
1227 1829588 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1228 1829584 :
1229 1829584 : // only extend relation here. never decrease the size
1230 1829584 : if nblocks > old_size {
1231 1262477 : let buf = nblocks.to_le_bytes();
1232 1262477 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1233 1262477 :
1234 1262477 : // Update relation size cache
1235 1262477 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1236 1262477 :
1237 1262477 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1238 1262477 : }
1239 1829584 : Ok(())
1240 1829588 : }
1241 :
1242 : /// Drop a relation.
1243 67312 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1244 67312 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1245 :
1246 : // Remove it from the directory entry
1247 67312 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1248 67312 : let buf = self.get(dir_key, ctx).await?;
1249 67312 : let mut dir = RelDirectory::des(&buf)?;
1250 :
1251 67312 : self.pending_directory_entries
1252 67312 : .push((DirectoryKind::Rel, dir.rels.len()));
1253 67312 :
1254 67312 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1255 67312 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1256 : } else {
1257 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1258 : }
1259 :
1260 : // update logical size
1261 67312 : let size_key = rel_size_to_key(rel);
1262 67312 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1263 67312 : self.pending_nblocks -= old_size as i64;
1264 67312 :
1265 67312 : // Remove enty from relation size cache
1266 67312 : self.tline.remove_cached_rel_size(&rel);
1267 67312 :
1268 67312 : // Delete size entry, as well as all blocks
1269 67312 : self.delete(rel_key_range(rel));
1270 67312 :
1271 67312 : Ok(())
1272 67312 : }
1273 :
1274 1872 : pub async fn put_slru_segment_creation(
1275 1872 : &mut self,
1276 1872 : kind: SlruKind,
1277 1872 : segno: u32,
1278 1872 : nblocks: BlockNumber,
1279 1872 : ctx: &RequestContext,
1280 1872 : ) -> anyhow::Result<()> {
1281 1872 : // Add it to the directory entry
1282 1872 : let dir_key = slru_dir_to_key(kind);
1283 1872 : let buf = self.get(dir_key, ctx).await?;
1284 1872 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1285 :
1286 1872 : if !dir.segments.insert(segno) {
1287 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1288 1872 : }
1289 1872 : self.pending_directory_entries
1290 1872 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1291 1872 : self.put(
1292 1872 : dir_key,
1293 1872 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1294 : );
1295 :
1296 : // Put size
1297 1872 : let size_key = slru_segment_size_to_key(kind, segno);
1298 1872 : let buf = nblocks.to_le_bytes();
1299 1872 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1300 1872 :
1301 1872 : // even if nblocks > 0, we don't insert any actual blocks here
1302 1872 :
1303 1872 : Ok(())
1304 1872 : }
1305 :
1306 : /// Extend SLRU segment
1307 1618 : pub fn put_slru_extend(
1308 1618 : &mut self,
1309 1618 : kind: SlruKind,
1310 1618 : segno: u32,
1311 1618 : nblocks: BlockNumber,
1312 1618 : ) -> anyhow::Result<()> {
1313 1618 : // Put size
1314 1618 : let size_key = slru_segment_size_to_key(kind, segno);
1315 1618 : let buf = nblocks.to_le_bytes();
1316 1618 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1317 1618 : Ok(())
1318 1618 : }
1319 :
1320 : /// This method is used for marking truncated SLRU files
1321 18 : pub async fn drop_slru_segment(
1322 18 : &mut self,
1323 18 : kind: SlruKind,
1324 18 : segno: u32,
1325 18 : ctx: &RequestContext,
1326 18 : ) -> anyhow::Result<()> {
1327 18 : // Remove it from the directory entry
1328 18 : let dir_key = slru_dir_to_key(kind);
1329 18 : let buf = self.get(dir_key, ctx).await?;
1330 18 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1331 :
1332 18 : if !dir.segments.remove(&segno) {
1333 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1334 18 : }
1335 18 : self.pending_directory_entries
1336 18 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1337 18 : self.put(
1338 18 : dir_key,
1339 18 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1340 : );
1341 :
1342 : // Delete size entry, as well as all blocks
1343 18 : self.delete(slru_segment_key_range(kind, segno));
1344 18 :
1345 18 : Ok(())
1346 18 : }
1347 :
1348 : /// Drop a relmapper file (pg_filenode.map)
1349 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1350 0 : // TODO
1351 0 : Ok(())
1352 0 : }
1353 :
1354 : /// This method is used for marking truncated SLRU files
1355 2 : pub async fn drop_twophase_file(
1356 2 : &mut self,
1357 2 : xid: TransactionId,
1358 2 : ctx: &RequestContext,
1359 2 : ) -> anyhow::Result<()> {
1360 : // Remove it from the directory entry
1361 2 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1362 2 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1363 :
1364 2 : if !dir.xids.remove(&xid) {
1365 0 : warn!("twophase file for xid {} does not exist", xid);
1366 2 : }
1367 2 : self.pending_directory_entries
1368 2 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1369 2 : self.put(
1370 2 : TWOPHASEDIR_KEY,
1371 2 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1372 : );
1373 :
1374 : // Delete it
1375 2 : self.delete(twophase_key_range(xid));
1376 2 :
1377 2 : Ok(())
1378 2 : }
1379 :
1380 3148 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1381 3148 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1382 3148 : files: HashMap::new(),
1383 3148 : })?;
1384 3148 : self.pending_directory_entries
1385 3148 : .push((DirectoryKind::AuxFiles, 0));
1386 3148 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1387 3148 : Ok(())
1388 3148 : }
1389 :
1390 124 : pub async fn put_file(
1391 124 : &mut self,
1392 124 : path: &str,
1393 124 : content: &[u8],
1394 124 : ctx: &RequestContext,
1395 124 : ) -> anyhow::Result<()> {
1396 124 : let file_path = path.to_string();
1397 124 : let content = if content.is_empty() {
1398 7 : None
1399 : } else {
1400 117 : Some(Bytes::copy_from_slice(content))
1401 : };
1402 :
1403 124 : let dir = if let Some(mut dir) = self.pending_aux_files.take() {
1404 : // We already updated aux files in `self`: emit a delta and update our latest value
1405 :
1406 95 : self.put(
1407 95 : AUX_FILES_KEY,
1408 95 : Value::WalRecord(NeonWalRecord::AuxFile {
1409 95 : file_path: file_path.clone(),
1410 95 : content: content.clone(),
1411 95 : }),
1412 95 : );
1413 95 :
1414 95 : dir.upsert(file_path, content);
1415 95 : dir
1416 : } else {
1417 : // Check if the AUX_FILES_KEY is initialized
1418 29 : match self.get(AUX_FILES_KEY, ctx).await {
1419 24 : Ok(dir_bytes) => {
1420 24 : let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
1421 : // Key is already set, we may append a delta
1422 24 : self.put(
1423 24 : AUX_FILES_KEY,
1424 24 : Value::WalRecord(NeonWalRecord::AuxFile {
1425 24 : file_path: file_path.clone(),
1426 24 : content: content.clone(),
1427 24 : }),
1428 24 : );
1429 24 : dir.upsert(file_path, content);
1430 24 : dir
1431 : }
1432 : Err(
1433 0 : e @ (PageReconstructError::AncestorStopping(_)
1434 : | PageReconstructError::Cancelled
1435 : | PageReconstructError::AncestorLsnTimeout(_)),
1436 : ) => {
1437 : // Important that we do not interpret a shutdown error as "not found" and thereby
1438 : // reset the map.
1439 0 : return Err(e.into());
1440 : }
1441 : // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
1442 : // we are assuming that all _other_ possible errors represents a missing key. If some
1443 : // other error occurs, we may incorrectly reset the map of aux files.
1444 : Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
1445 : // Key is missing, we must insert an image as the basis for subsequent deltas.
1446 :
1447 5 : let mut dir = AuxFilesDirectory {
1448 5 : files: HashMap::new(),
1449 5 : };
1450 5 : dir.upsert(file_path, content);
1451 5 : self.put(
1452 5 : AUX_FILES_KEY,
1453 5 : Value::Image(Bytes::from(
1454 5 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1455 : )),
1456 : );
1457 5 : dir
1458 : }
1459 : }
1460 : };
1461 :
1462 124 : self.pending_directory_entries
1463 124 : .push((DirectoryKind::AuxFiles, dir.files.len()));
1464 124 : self.pending_aux_files = Some(dir);
1465 124 :
1466 124 : Ok(())
1467 124 : }
1468 :
1469 : ///
1470 : /// Flush changes accumulated so far to the underlying repository.
1471 : ///
1472 : /// Usually, changes made in DatadirModification are atomic, but this allows
1473 : /// you to flush them to the underlying repository before the final `commit`.
1474 : /// That allows to free up the memory used to hold the pending changes.
1475 : ///
1476 : /// Currently only used during bulk import of a data directory. In that
1477 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1478 : /// whole import fails and the timeline will be deleted anyway.
1479 : /// (Or to be precise, it will be left behind for debugging purposes and
1480 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1481 : ///
1482 : /// Note: A consequence of flushing the pending operations is that they
1483 : /// won't be visible to subsequent operations until `commit`. The function
1484 : /// retains all the metadata, but data pages are flushed. That's again OK
1485 : /// for bulk import, where you are just loading data pages and won't try to
1486 : /// modify the same pages twice.
1487 578076 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1488 578076 : // Unless we have accumulated a decent amount of changes, it's not worth it
1489 578076 : // to scan through the pending_updates list.
1490 578076 : let pending_nblocks = self.pending_nblocks;
1491 578076 : if pending_nblocks < 10000 {
1492 578076 : return Ok(());
1493 0 : }
1494 :
1495 0 : let writer = self.tline.writer().await;
1496 :
1497 : // Flush relation and SLRU data blocks, keep metadata.
1498 0 : let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
1499 0 : for (key, values) in self.pending_updates.drain() {
1500 0 : for (lsn, value) in values {
1501 0 : if is_rel_block_key(&key) || is_slru_block_key(key) {
1502 : // This bails out on first error without modifying pending_updates.
1503 : // That's Ok, cf this function's doc comment.
1504 0 : writer.put(key, lsn, &value, ctx).await?;
1505 0 : } else {
1506 0 : retained_pending_updates
1507 0 : .entry(key)
1508 0 : .or_default()
1509 0 : .push((lsn, value));
1510 0 : }
1511 : }
1512 : }
1513 :
1514 0 : self.pending_updates = retained_pending_updates;
1515 0 :
1516 0 : if pending_nblocks != 0 {
1517 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1518 0 : self.pending_nblocks = 0;
1519 0 : }
1520 :
1521 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1522 0 : writer.update_directory_entries_count(kind, count as u64);
1523 0 : }
1524 :
1525 0 : Ok(())
1526 578076 : }
1527 :
1528 : ///
1529 : /// Finish this atomic update, writing all the updated keys to the
1530 : /// underlying timeline.
1531 : /// All the modifications in this atomic update are stamped by the specified LSN.
1532 : ///
1533 2096508 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1534 2096508 : let writer = self.tline.writer().await;
1535 :
1536 2096508 : let pending_nblocks = self.pending_nblocks;
1537 2096508 : self.pending_nblocks = 0;
1538 2096508 :
1539 2096508 : if !self.pending_updates.is_empty() {
1540 1674304 : writer.put_batch(&self.pending_updates, ctx).await?;
1541 1674303 : self.pending_updates.clear();
1542 422204 : }
1543 :
1544 2096507 : if !self.pending_deletions.is_empty() {
1545 19261 : writer.delete_batch(&self.pending_deletions).await?;
1546 19261 : self.pending_deletions.clear();
1547 2077246 : }
1548 :
1549 2096507 : self.pending_lsns.push(self.lsn);
1550 74789978 : for pending_lsn in self.pending_lsns.drain(..) {
1551 74789978 : // Ideally, we should be able to call writer.finish_write() only once
1552 74789978 : // with the highest LSN. However, the last_record_lsn variable in the
1553 74789978 : // timeline keeps track of the latest LSN and the immediate previous LSN
1554 74789978 : // so we need to record every LSN to not leave a gap between them.
1555 74789978 : writer.finish_write(pending_lsn);
1556 74789978 : }
1557 :
1558 2096507 : if pending_nblocks != 0 {
1559 640015 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1560 1456492 : }
1561 :
1562 2096507 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1563 730583 : writer.update_directory_entries_count(kind, count as u64);
1564 730583 : }
1565 :
1566 2096507 : Ok(())
1567 2096507 : }
1568 :
1569 145387682 : pub(crate) fn len(&self) -> usize {
1570 145387682 : self.pending_updates.len() + self.pending_deletions.len()
1571 145387682 : }
1572 :
1573 : // Internal helper functions to batch the modifications
1574 :
1575 3503511 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1576 : // Have we already updated the same key? Read the latest pending updated
1577 : // version in that case.
1578 : //
1579 : // Note: we don't check pending_deletions. It is an error to request a
1580 : // value that has been removed, deletion only avoids leaking storage.
1581 3503511 : if let Some(values) = self.pending_updates.get(&key) {
1582 2629426 : if let Some((_, value)) = values.last() {
1583 2629426 : return if let Value::Image(img) = value {
1584 2629426 : Ok(img.clone())
1585 : } else {
1586 : // Currently, we never need to read back a WAL record that we
1587 : // inserted in the same "transaction". All the metadata updates
1588 : // work directly with Images, and we never need to read actual
1589 : // data pages. We could handle this if we had to, by calling
1590 : // the walredo manager, but let's keep it simple for now.
1591 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1592 0 : "unexpected pending WAL record"
1593 0 : )))
1594 : };
1595 0 : }
1596 874085 : }
1597 874085 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1598 874085 : self.tline.get(key, lsn, ctx).await
1599 3503511 : }
1600 :
1601 64123041 : fn put(&mut self, key: Key, val: Value) {
1602 64123041 : let values = self.pending_updates.entry(key).or_default();
1603 : // Replace the previous value if it exists at the same lsn
1604 64123041 : if let Some((last_lsn, last_value)) = values.last_mut() {
1605 53878203 : if *last_lsn == self.lsn {
1606 649168 : *last_value = val;
1607 649168 : return;
1608 53229035 : }
1609 10244838 : }
1610 63473873 : values.push((self.lsn, val));
1611 64123041 : }
1612 :
1613 67335 : fn delete(&mut self, key_range: Range<Key>) {
1614 67335 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1615 67335 : self.pending_deletions.push((key_range, self.lsn));
1616 67335 : }
1617 : }
1618 :
1619 : /// This struct facilitates accessing either a committed key from the timeline at a
1620 : /// specific LSN, or the latest uncommitted key from a pending modification.
1621 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
1622 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1623 : /// need to look up the keys in the modification first before looking them up in the
1624 : /// timeline to not miss the latest updates.
1625 0 : #[derive(Clone, Copy)]
1626 : pub enum Version<'a> {
1627 : Lsn(Lsn),
1628 : Modified(&'a DatadirModification<'a>),
1629 : }
1630 :
1631 : impl<'a> Version<'a> {
1632 4938936 : async fn get(
1633 4938936 : &self,
1634 4938936 : timeline: &Timeline,
1635 4938936 : key: Key,
1636 4938936 : ctx: &RequestContext,
1637 4938936 : ) -> Result<Bytes, PageReconstructError> {
1638 4938936 : match self {
1639 4714370 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1640 224566 : Version::Modified(modification) => modification.get(key, ctx).await,
1641 : }
1642 4938935 : }
1643 :
1644 5198553 : fn get_lsn(&self) -> Lsn {
1645 5198553 : match self {
1646 4713389 : Version::Lsn(lsn) => *lsn,
1647 485164 : Version::Modified(modification) => modification.lsn,
1648 : }
1649 5198553 : }
1650 : }
1651 :
1652 : //--- Metadata structs stored in key-value pairs in the repository.
1653 :
1654 657906 : #[derive(Debug, Serialize, Deserialize)]
1655 : struct DbDirectory {
1656 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1657 : dbdirs: HashMap<(Oid, Oid), bool>,
1658 : }
1659 :
1660 1512 : #[derive(Debug, Serialize, Deserialize)]
1661 : struct TwoPhaseDirectory {
1662 : xids: HashSet<TransactionId>,
1663 : }
1664 :
1665 1441064 : #[derive(Debug, Serialize, Deserialize, Default)]
1666 : struct RelDirectory {
1667 : // Set of relations that exist. (relfilenode, forknum)
1668 : //
1669 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1670 : // key-value pairs, if you have a lot of relations
1671 : rels: HashSet<(Oid, u8)>,
1672 : }
1673 :
1674 7679 : #[derive(Debug, Serialize, Deserialize, Default)]
1675 : pub(crate) struct AuxFilesDirectory {
1676 : pub(crate) files: HashMap<String, Bytes>,
1677 : }
1678 :
1679 : impl AuxFilesDirectory {
1680 1493 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
1681 1493 : if let Some(value) = value {
1682 1421 : self.files.insert(key, value);
1683 1421 : } else {
1684 72 : self.files.remove(&key);
1685 72 : }
1686 1493 : }
1687 : }
1688 :
1689 0 : #[derive(Debug, Serialize, Deserialize)]
1690 : struct RelSizeEntry {
1691 : nblocks: u32,
1692 : }
1693 :
1694 10971 : #[derive(Debug, Serialize, Deserialize, Default)]
1695 : struct SlruSegmentDirectory {
1696 : // Set of SLRU segments that exist.
1697 : segments: HashSet<u32>,
1698 : }
1699 :
1700 1461166 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
1701 : #[repr(u8)]
1702 : pub(crate) enum DirectoryKind {
1703 : Db,
1704 : TwoPhase,
1705 : Rel,
1706 : AuxFiles,
1707 : SlruSegment(SlruKind),
1708 : }
1709 :
1710 : impl DirectoryKind {
1711 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
1712 1461166 : pub(crate) fn offset(&self) -> usize {
1713 1461166 : self.into_usize()
1714 1461166 : }
1715 : }
1716 :
/// A page of `BLCKSZ` zero bytes, backed by static storage so that cloning it
/// does not allocate.
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1718 :
#[cfg(test)]
mod tests {
    use hex_literal::hex;
    use utils::id::TimelineId;

    use super::*;

    use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};

    /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    #[tokio::test]
    async fn aux_files_round_trip() -> anyhow::Result<()> {
        const TIMELINE_ID: TimelineId =
            TimelineId::from_array(hex!("11223344556677881122334455667788"));

        let harness = TenantHarness::create("aux_files_round_trip")?;

        let (tenant, ctx) = harness.load().await;
        let tline = tenant
            .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let tline = tline.raw_timeline().unwrap();

        // First modification: insert two keys
        let mut modification = tline.begin_modification(Lsn(0x1000));
        modification.put_file("foo/bar1", b"content1", &ctx).await?;
        modification.set_lsn(Lsn(0x1008))?;
        modification.put_file("foo/bar2", b"content2", &ctx).await?;
        modification.commit(&ctx).await?;
        let expect_1008 = HashMap::from([
            ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
            ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
        ]);

        let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
        assert_eq!(readback, expect_1008);

        // Second modification: update one key, remove the other.
        // Writing empty content is how a file is removed: `foo/bar2` must be
        // absent from the expected map below.
        let mut modification = tline.begin_modification(Lsn(0x2000));
        modification.put_file("foo/bar1", b"content3", &ctx).await?;
        modification.set_lsn(Lsn(0x2008))?;
        modification.put_file("foo/bar2", b"", &ctx).await?;
        modification.commit(&ctx).await?;
        let expect_2008 =
            HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);

        let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
        assert_eq!(readback, expect_2008);

        // Reading back in time works
        let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
        assert_eq!(readback, expect_1008);

        Ok(())
    }

    // NOTE(review): the commented-out tests below appear to be written against
    // an older API (`RepoHarness`, `begin_record`, `DatadirTimeline`) than the
    // live test above (`TenantHarness`, `begin_modification`). They are kept
    // only as a reference for porting and do not compile as-is.

    /*
    fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
        let incremental = timeline.get_current_logical_size();
        let non_incremental = timeline
            .get_current_logical_size_non_incremental(lsn)
            .unwrap();
        assert_eq!(incremental, non_incremental);
    }
    */

    /*
    ///
    /// Test list_rels() function, with branches and dropped relations
    ///
    #[test]
    fn test_list_rels_drop() -> Result<()> {
        let repo = RepoHarness::create("test_list_rels_drop")?.load();
        let tline = create_empty_timeline(repo, TIMELINE_ID)?;
        const TESTDB: u32 = 111;

        // Import initial dummy checkpoint record, otherwise the get_timeline() call
        // after branching fails below
        let mut writer = tline.begin_record(Lsn(0x10));
        writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
        writer.finish()?;

        // Create a relation on the timeline
        let mut writer = tline.begin_record(Lsn(0x20));
        writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
        writer.finish()?;

        let writer = tline.begin_record(Lsn(0x00));
        writer.finish()?;

        // Check that list_rels() lists it after LSN 2, but no before it
        assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
        assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
        assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));

        // Create a branch, check that the relation is visible there
        repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
        let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
            Some(timeline) => timeline,
            None => panic!("Should have a local timeline"),
        };
        let newtline = DatadirTimelineImpl::new(newtline);
        assert!(newtline
            .list_rels(0, TESTDB, Lsn(0x30))?
            .contains(&TESTREL_A));

        // Drop it on the branch
        let mut new_writer = newtline.begin_record(Lsn(0x40));
        new_writer.drop_relation(TESTREL_A)?;
        new_writer.finish()?;

        // Check that it's no longer listed on the branch after the point where it was dropped
        assert!(newtline
            .list_rels(0, TESTDB, Lsn(0x30))?
            .contains(&TESTREL_A));
        assert!(!newtline
            .list_rels(0, TESTDB, Lsn(0x40))?
            .contains(&TESTREL_A));

        // Run checkpoint and garbage collection and check that it's still not visible
        newtline.checkpoint(CheckpointConfig::Forced)?;
        repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;

        assert!(!newtline
            .list_rels(0, TESTDB, Lsn(0x40))?
            .contains(&TESTREL_A));

        Ok(())
    }
    */

    /*
    #[test]
    fn test_read_beyond_eof() -> Result<()> {
        let repo = RepoHarness::create("test_read_beyond_eof")?.load();
        let tline = create_test_timeline(repo, TIMELINE_ID)?;

        make_some_layers(&tline, Lsn(0x20))?;
        let mut writer = tline.begin_record(Lsn(0x60));
        walingest.put_rel_page_image(
            &mut writer,
            TESTREL_A,
            0,
            TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
        )?;
        writer.finish()?;

        // Test read before rel creation. Should error out.
        assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());

        // Read block beyond end of relation at different points in time.
        // These reads should fall into different delta, image, and in-memory layers.
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);

        // Test on an in-memory layer with no preceding layer
        let mut writer = tline.begin_record(Lsn(0x70));
        walingest.put_rel_page_image(
            &mut writer,
            TESTREL_B,
            0,
            TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
        )?;
        writer.finish()?;

        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);

        Ok(())
    }
    */
}
|