Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use crate::walrecord::NeonWalRecord;
15 : use anyhow::{ensure, Context};
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use itertools::Itertools;
19 : use pageserver_api::key::{
20 : dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
21 : rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
24 : };
25 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
26 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
27 : use postgres_ffi::BLCKSZ;
28 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
29 : use serde::{Deserialize, Serialize};
30 : use std::collections::{hash_map, HashMap, HashSet};
31 : use std::ops::ControlFlow;
32 : use std::ops::Range;
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, trace, warn};
36 : use utils::bin_ser::DeserializeError;
37 : use utils::vec_map::{VecMap, VecMapOrdering};
38 : use utils::{bin_ser::BeSer, lsn::Lsn};
39 :
40 : const MAX_AUX_FILE_DELTAS: usize = 1024;
41 :
/// Outcome of `Timeline::find_lsn_for_timestamp`: an LSN, tagged with how the
/// requested timestamp relates to the commit timestamps that were inspected.
42 : #[derive(Debug)]
43 : pub enum LsnForTimestamp {
44 : /// Found commits both before and after the given timestamp
45 : Present(Lsn),
46 :
47 : /// Found no commits after the given timestamp, this means
48 : /// that the newest data in the branch is older than the given
49 : /// timestamp.
50 : ///
51 : /// All commits <= LSN happened before the given timestamp
52 : Future(Lsn),
53 :
54 : /// The queried timestamp is older than the horizon we look back at (PITR)
55 : ///
56 : /// All commits > LSN happened after the given timestamp,
57 : /// but any commits < LSN might have happened before or after
58 : /// the given timestamp. We don't know because no data before
59 : /// the given lsn is available.
60 : Past(Lsn),
61 :
62 : /// We have found no commit with a timestamp,
63 : /// so we can't return anything meaningful.
64 : ///
65 : /// The associated LSN is the lower bound value we can safely
66 : /// create branches on, but no statement is made if it is
67 : /// older or newer than the timestamp.
68 : ///
69 : /// This variant can e.g. be returned right after a
70 : /// cluster import.
71 : NoData(Lsn),
72 : }
73 :
/// Error type of `Timeline::get_current_logical_size_non_incremental`.
74 0 : #[derive(Debug, thiserror::Error)]
75 : pub enum CalculateLogicalSizeError {
/// The calculation was abandoned: either the timeline's cancellation token
/// fired, or a page read reported `Cancelled` / `AncestorStopping`.
76 : #[error("cancelled")]
77 : Cancelled,
/// Any other failure, carried as an opaque `anyhow::Error`.
78 : #[error(transparent)]
79 : Other(#[from] anyhow::Error),
80 : }
81 :
/// Error type of `Timeline::collect_keyspace`.
82 0 : #[derive(Debug, thiserror::Error)]
83 : pub(crate) enum CollectKeySpaceError {
/// A directory value could not be deserialized.
84 : #[error(transparent)]
85 : Decode(#[from] DeserializeError),
/// A page read failed for a reason other than cancellation.
86 : #[error(transparent)]
87 : PageRead(PageReconstructError),
/// A page read reported `PageReconstructError::Cancelled`.
88 : #[error("cancelled")]
89 : Cancelled,
90 : }
91 :
92 : impl From<PageReconstructError> for CollectKeySpaceError {
93 0 : fn from(err: PageReconstructError) -> Self {
94 0 : match err {
95 0 : PageReconstructError::Cancelled => Self::Cancelled,
96 0 : err => Self::PageRead(err),
97 : }
98 0 : }
99 : }
100 :
101 : impl From<PageReconstructError> for CalculateLogicalSizeError {
102 0 : fn from(pre: PageReconstructError) -> Self {
103 0 : match pre {
104 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
105 0 : Self::Cancelled
106 : }
107 0 : _ => Self::Other(pre.into()),
108 : }
109 0 : }
110 : }
111 :
/// Errors concerning individual relations.
112 0 : #[derive(Debug, thiserror::Error)]
113 : pub enum RelationError {
114 : #[error("Relation Already Exists")]
115 : AlreadyExists,
/// Returned by the relation getters when `RelTag::relnode` is 0.
116 : #[error("invalid relnode")]
117 : InvalidRelnode,
/// Any other failure, carried as an opaque `anyhow::Error`.
118 : #[error(transparent)]
119 : Other(#[from] anyhow::Error),
120 : }
121 :
122 : ///
123 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
124 : /// and other special kinds of files, in a versioned key-value store. The
125 : /// Timeline struct provides the key-value store.
126 : ///
127 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
128 : /// implementation, and might be moved into a separate struct later.
129 : impl Timeline {
130 : /// Start ingesting a WAL record, or other atomic modification of
131 : /// the timeline.
132 : ///
133 : /// This provides a transaction-like interface to perform a bunch
134 : /// of modifications atomically.
135 : ///
136 : /// To ingest a WAL record, call begin_modification(lsn) to get a
137 : /// DatadirModification object. Use the functions in the object to
138 : /// modify the repository state, updating all the pages and metadata
139 : /// that the WAL record affects. When you're done, call commit() to
140 : /// commit the changes.
141 : ///
142 : /// Lsn stored in modification is advanced by `ingest_record` and
143 : /// is used by `commit()` to update `last_record_lsn`.
144 : ///
145 : /// Calling commit() will flush all the changes and reset the state,
146 : /// so the `DatadirModification` struct can be reused to perform the next modification.
147 : ///
148 : /// Note that any pending modifications you make through the
149 : /// modification object won't be visible to calls to the 'get' and list
150 : /// functions of the timeline until you finish! And if you update the
151 : /// same page twice, the last update wins.
152 : ///
153 268292 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
154 268292 : where
155 268292 : Self: Sized,
156 268292 : {
157 268292 : DatadirModification {
158 268292 : tline: self,
159 268292 : pending_lsns: Vec::new(),
160 268292 : pending_updates: HashMap::new(),
161 268292 : pending_deletions: Vec::new(),
162 268292 : pending_nblocks: 0,
163 268292 : pending_directory_entries: Vec::new(),
164 268292 : lsn,
165 268292 : }
166 268292 : }
167 :
168 : //------------------------------------------------------------------------------
169 : // Public GET functions
170 : //------------------------------------------------------------------------------
171 :
172 : /// Look up given page version.
173 18384 : pub(crate) async fn get_rel_page_at_lsn(
174 18384 : &self,
175 18384 : tag: RelTag,
176 18384 : blknum: BlockNumber,
177 18384 : version: Version<'_>,
178 18384 : latest: bool,
179 18384 : ctx: &RequestContext,
180 18384 : ) -> Result<Bytes, PageReconstructError> {
181 18384 : if tag.relnode == 0 {
182 0 : return Err(PageReconstructError::Other(
183 0 : RelationError::InvalidRelnode.into(),
184 0 : ));
185 18384 : }
186 :
187 18384 : let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
188 18384 : if blknum >= nblocks {
189 0 : debug!(
190 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
191 0 : tag,
192 0 : blknum,
193 0 : version.get_lsn(),
194 0 : nblocks
195 0 : );
196 0 : return Ok(ZERO_PAGE.clone());
197 18384 : }
198 18384 :
199 18384 : let key = rel_block_to_key(tag, blknum);
200 18384 : version.get(self, key, ctx).await
201 18384 : }
202 :
203 : // Get size of a database in blocks
204 0 : pub(crate) async fn get_db_size(
205 0 : &self,
206 0 : spcnode: Oid,
207 0 : dbnode: Oid,
208 0 : version: Version<'_>,
209 0 : latest: bool,
210 0 : ctx: &RequestContext,
211 0 : ) -> Result<usize, PageReconstructError> {
212 0 : let mut total_blocks = 0;
213 :
214 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
215 :
216 0 : for rel in rels {
217 0 : let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
218 0 : total_blocks += n_blocks as usize;
219 : }
220 0 : Ok(total_blocks)
221 0 : }
222 :
223 : /// Get size of a relation file
224 24434 : pub(crate) async fn get_rel_size(
225 24434 : &self,
226 24434 : tag: RelTag,
227 24434 : version: Version<'_>,
228 24434 : latest: bool,
229 24434 : ctx: &RequestContext,
230 24434 : ) -> Result<BlockNumber, PageReconstructError> {
231 24434 : if tag.relnode == 0 {
232 0 : return Err(PageReconstructError::Other(
233 0 : RelationError::InvalidRelnode.into(),
234 0 : ));
235 24434 : }
236 :
237 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
238 19294 : return Ok(nblocks);
239 5140 : }
240 5140 :
241 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
242 0 : && !self.get_rel_exists(tag, version, latest, ctx).await?
243 : {
244 : // FIXME: Postgres sometimes calls smgrcreate() to create
245 : // FSM, and smgrnblocks() on it immediately afterwards,
246 : // without extending it. Tolerate that by claiming that
247 : // any non-existent FSM fork has size 0.
248 0 : return Ok(0);
249 5140 : }
250 5140 :
251 5140 : let key = rel_size_to_key(tag);
252 5140 : let mut buf = version.get(self, key, ctx).await?;
253 5136 : let nblocks = buf.get_u32_le();
254 5136 :
255 5136 : if latest {
256 0 : // Update relation size cache only if "latest" flag is set.
257 0 : // This flag is set by compute when it is working with most recent version of relation.
258 0 : // Typically master compute node always set latest=true.
259 0 : // Please notice, that even if compute node "by mistake" specifies old LSN but set
260 0 : // latest=true, then it can not cause cache corruption, because with latest=true
261 0 : // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
262 0 : // associated with most recent value of LSN.
263 0 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
264 5136 : }
265 5136 : Ok(nblocks)
266 24434 : }
267 :
268 : /// Does relation exist?
269 6050 : pub(crate) async fn get_rel_exists(
270 6050 : &self,
271 6050 : tag: RelTag,
272 6050 : version: Version<'_>,
273 6050 : _latest: bool,
274 6050 : ctx: &RequestContext,
275 6050 : ) -> Result<bool, PageReconstructError> {
276 6050 : if tag.relnode == 0 {
277 0 : return Err(PageReconstructError::Other(
278 0 : RelationError::InvalidRelnode.into(),
279 0 : ));
280 6050 : }
281 :
282 : // first try to lookup relation in cache
283 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
284 6032 : return Ok(true);
285 18 : }
286 18 : // fetch directory listing
287 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
288 18 : let buf = version.get(self, key, ctx).await?;
289 :
290 18 : match RelDirectory::des(&buf).context("deserialization failure") {
291 18 : Ok(dir) => {
292 18 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
293 18 : Ok(exists)
294 : }
295 0 : Err(e) => Err(PageReconstructError::from(e)),
296 : }
297 6050 : }
298 :
299 : /// Get a list of all existing relations in given tablespace and database.
300 : ///
301 : /// # Cancel-Safety
302 : ///
303 : /// This method is cancellation-safe.
304 0 : pub(crate) async fn list_rels(
305 0 : &self,
306 0 : spcnode: Oid,
307 0 : dbnode: Oid,
308 0 : version: Version<'_>,
309 0 : ctx: &RequestContext,
310 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
311 0 : // fetch directory listing
312 0 : let key = rel_dir_to_key(spcnode, dbnode);
313 0 : let buf = version.get(self, key, ctx).await?;
314 :
315 0 : match RelDirectory::des(&buf).context("deserialization failure") {
316 0 : Ok(dir) => {
317 0 : let rels: HashSet<RelTag> =
318 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
319 0 : spcnode,
320 0 : dbnode,
321 0 : relnode: *relnode,
322 0 : forknum: *forknum,
323 0 : }));
324 0 :
325 0 : Ok(rels)
326 : }
327 0 : Err(e) => Err(PageReconstructError::from(e)),
328 : }
329 0 : }
330 :
331 : /// Get the whole SLRU segment
332 0 : pub(crate) async fn get_slru_segment(
333 0 : &self,
334 0 : kind: SlruKind,
335 0 : segno: u32,
336 0 : lsn: Lsn,
337 0 : ctx: &RequestContext,
338 0 : ) -> Result<Bytes, PageReconstructError> {
339 0 : let n_blocks = self
340 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
341 0 : .await?;
342 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
343 0 : for blkno in 0..n_blocks {
344 0 : let block = self
345 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
346 0 : .await?;
347 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
348 : }
349 0 : Ok(segment.freeze())
350 0 : }
351 :
352 : /// Look up given SLRU page version.
353 0 : pub(crate) async fn get_slru_page_at_lsn(
354 0 : &self,
355 0 : kind: SlruKind,
356 0 : segno: u32,
357 0 : blknum: BlockNumber,
358 0 : lsn: Lsn,
359 0 : ctx: &RequestContext,
360 0 : ) -> Result<Bytes, PageReconstructError> {
361 0 : let key = slru_block_to_key(kind, segno, blknum);
362 0 : self.get(key, lsn, ctx).await
363 0 : }
364 :
365 : /// Get size of an SLRU segment
366 0 : pub(crate) async fn get_slru_segment_size(
367 0 : &self,
368 0 : kind: SlruKind,
369 0 : segno: u32,
370 0 : version: Version<'_>,
371 0 : ctx: &RequestContext,
372 0 : ) -> Result<BlockNumber, PageReconstructError> {
373 0 : let key = slru_segment_size_to_key(kind, segno);
374 0 : let mut buf = version.get(self, key, ctx).await?;
375 0 : Ok(buf.get_u32_le())
376 0 : }
377 :
378 : /// Get size of an SLRU segment
379 0 : pub(crate) async fn get_slru_segment_exists(
380 0 : &self,
381 0 : kind: SlruKind,
382 0 : segno: u32,
383 0 : version: Version<'_>,
384 0 : ctx: &RequestContext,
385 0 : ) -> Result<bool, PageReconstructError> {
386 0 : // fetch directory listing
387 0 : let key = slru_dir_to_key(kind);
388 0 : let buf = version.get(self, key, ctx).await?;
389 :
390 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
391 0 : Ok(dir) => {
392 0 : let exists = dir.segments.get(&segno).is_some();
393 0 : Ok(exists)
394 : }
395 0 : Err(e) => Err(PageReconstructError::from(e)),
396 : }
397 0 : }
398 :
399 : /// Locate LSN, such that all transactions that committed before
400 : /// 'search_timestamp' are visible, but nothing newer is.
401 : ///
402 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
403 : /// so it's not well defined which LSN you get if there were multiple commits
404 : /// "in flight" at that point in time.
405 : ///
406 0 : pub(crate) async fn find_lsn_for_timestamp(
407 0 : &self,
408 0 : search_timestamp: TimestampTz,
409 0 : cancel: &CancellationToken,
410 0 : ctx: &RequestContext,
411 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
412 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
413 0 : // We use this method to figure out the branching LSN for the new branch, but the
414 0 : // GC cutoff could be before the branching point and we cannot create a new branch
415 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
416 0 : // on the safe side.
417 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
418 0 : let max_lsn = self.get_last_record_lsn();
419 0 :
420 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
421 0 : // LSN divided by 8.
422 0 : let mut low = min_lsn.0 / 8;
423 0 : let mut high = max_lsn.0 / 8 + 1;
424 0 :
425 0 : let mut found_smaller = false;
426 0 : let mut found_larger = false;
427 0 : while low < high {
428 0 : if cancel.is_cancelled() {
429 0 : return Err(PageReconstructError::Cancelled);
430 0 : }
431 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
432 0 : let mid = (high + low) / 2;
433 :
434 0 : let cmp = self
435 0 : .is_latest_commit_timestamp_ge_than(
436 0 : search_timestamp,
437 0 : Lsn(mid * 8),
438 0 : &mut found_smaller,
439 0 : &mut found_larger,
440 0 : ctx,
441 0 : )
442 0 : .await?;
443 :
444 0 : if cmp {
445 0 : high = mid;
446 0 : } else {
447 0 : low = mid + 1;
448 0 : }
449 : }
450 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
451 : // so the LSN of the last commit record before or at `search_timestamp`.
452 : // Remove one from `low` to get `t`.
453 : //
454 : // FIXME: it would be better to get the LSN of the previous commit.
455 : // Otherwise, if you restore to the returned LSN, the database will
456 : // include physical changes from later commits that will be marked
457 : // as aborted, and will need to be vacuumed away.
458 0 : let commit_lsn = Lsn((low - 1) * 8);
459 0 : match (found_smaller, found_larger) {
460 : (false, false) => {
461 : // This can happen if no commit records have been processed yet, e.g.
462 : // just after importing a cluster.
463 0 : Ok(LsnForTimestamp::NoData(min_lsn))
464 : }
465 : (false, true) => {
466 : // Didn't find any commit timestamps smaller than the request
467 0 : Ok(LsnForTimestamp::Past(min_lsn))
468 : }
469 : (true, false) => {
470 : // Only found commits with timestamps smaller than the request.
471 : // It's still a valid case for branch creation, return it.
472 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
473 : // case, anyway.
474 0 : Ok(LsnForTimestamp::Future(commit_lsn))
475 : }
476 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
477 : }
478 0 : }
479 :
480 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
481 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
482 : ///
483 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
484 : /// with a smaller/larger timestamp.
485 : ///
486 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
487 0 : &self,
488 0 : search_timestamp: TimestampTz,
489 0 : probe_lsn: Lsn,
490 0 : found_smaller: &mut bool,
491 0 : found_larger: &mut bool,
492 0 : ctx: &RequestContext,
493 0 : ) -> Result<bool, PageReconstructError> {
494 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
495 0 : if timestamp >= search_timestamp {
496 0 : *found_larger = true;
497 0 : return ControlFlow::Break(true);
498 0 : } else {
499 0 : *found_smaller = true;
500 0 : }
501 0 : ControlFlow::Continue(())
502 0 : })
503 0 : .await
504 0 : }
505 :
506 : /// Obtain the possible timestamp range for the given lsn.
507 : ///
508 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
509 0 : pub(crate) async fn get_timestamp_for_lsn(
510 0 : &self,
511 0 : probe_lsn: Lsn,
512 0 : ctx: &RequestContext,
513 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
514 0 : let mut max: Option<TimestampTz> = None;
515 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
516 0 : if let Some(max_prev) = max {
517 0 : max = Some(max_prev.max(timestamp));
518 0 : } else {
519 0 : max = Some(timestamp);
520 0 : }
521 0 : ControlFlow::Continue(())
522 0 : })
523 0 : .await?;
524 :
525 0 : Ok(max)
526 0 : }
527 :
528 : /// Runs the given function on all the timestamps for a given lsn
529 : ///
530 : /// The return value is either given by the closure, or set to the `Default`
531 : /// impl's output.
532 0 : async fn map_all_timestamps<T: Default>(
533 0 : &self,
534 0 : probe_lsn: Lsn,
535 0 : ctx: &RequestContext,
536 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
537 0 : ) -> Result<T, PageReconstructError> {
538 0 : for segno in self
539 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
540 0 : .await?
541 : {
542 0 : let nblocks = self
543 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
544 0 : .await?;
545 0 : for blknum in (0..nblocks).rev() {
546 0 : let clog_page = self
547 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
548 0 : .await?;
549 :
550 0 : if clog_page.len() == BLCKSZ as usize + 8 {
551 0 : let mut timestamp_bytes = [0u8; 8];
552 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
553 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
554 0 :
555 0 : match f(timestamp) {
556 0 : ControlFlow::Break(b) => return Ok(b),
557 0 : ControlFlow::Continue(()) => (),
558 : }
559 0 : }
560 : }
561 : }
562 0 : Ok(Default::default())
563 0 : }
564 :
565 0 : pub(crate) async fn get_slru_keyspace(
566 0 : &self,
567 0 : version: Version<'_>,
568 0 : ctx: &RequestContext,
569 0 : ) -> Result<KeySpace, PageReconstructError> {
570 0 : let mut accum = KeySpaceAccum::new();
571 :
572 0 : for kind in SlruKind::iter() {
573 0 : let mut segments: Vec<u32> = self
574 0 : .list_slru_segments(kind, version, ctx)
575 0 : .await?
576 0 : .into_iter()
577 0 : .collect();
578 0 : segments.sort_unstable();
579 :
580 0 : for seg in segments {
581 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
582 :
583 0 : accum.add_range(
584 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
585 0 : );
586 : }
587 : }
588 :
589 0 : Ok(accum.to_keyspace())
590 0 : }
591 :
592 : /// Get a list of SLRU segments
593 0 : pub(crate) async fn list_slru_segments(
594 0 : &self,
595 0 : kind: SlruKind,
596 0 : version: Version<'_>,
597 0 : ctx: &RequestContext,
598 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
599 0 : // fetch directory entry
600 0 : let key = slru_dir_to_key(kind);
601 :
602 0 : let buf = version.get(self, key, ctx).await?;
603 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
604 0 : Ok(dir) => Ok(dir.segments),
605 0 : Err(e) => Err(PageReconstructError::from(e)),
606 : }
607 0 : }
608 :
609 0 : pub(crate) async fn get_relmap_file(
610 0 : &self,
611 0 : spcnode: Oid,
612 0 : dbnode: Oid,
613 0 : version: Version<'_>,
614 0 : ctx: &RequestContext,
615 0 : ) -> Result<Bytes, PageReconstructError> {
616 0 : let key = relmap_file_key(spcnode, dbnode);
617 :
618 0 : let buf = version.get(self, key, ctx).await?;
619 0 : Ok(buf)
620 0 : }
621 :
622 0 : pub(crate) async fn list_dbdirs(
623 0 : &self,
624 0 : lsn: Lsn,
625 0 : ctx: &RequestContext,
626 0 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
627 : // fetch directory entry
628 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
629 :
630 0 : match DbDirectory::des(&buf).context("deserialization failure") {
631 0 : Ok(dir) => Ok(dir.dbdirs),
632 0 : Err(e) => Err(PageReconstructError::from(e)),
633 : }
634 0 : }
635 :
636 0 : pub(crate) async fn get_twophase_file(
637 0 : &self,
638 0 : xid: TransactionId,
639 0 : lsn: Lsn,
640 0 : ctx: &RequestContext,
641 0 : ) -> Result<Bytes, PageReconstructError> {
642 0 : let key = twophase_file_key(xid);
643 0 : let buf = self.get(key, lsn, ctx).await?;
644 0 : Ok(buf)
645 0 : }
646 :
647 0 : pub(crate) async fn list_twophase_files(
648 0 : &self,
649 0 : lsn: Lsn,
650 0 : ctx: &RequestContext,
651 0 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
652 : // fetch directory entry
653 0 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
654 :
655 0 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
656 0 : Ok(dir) => Ok(dir.xids),
657 0 : Err(e) => Err(PageReconstructError::from(e)),
658 : }
659 0 : }
660 :
661 0 : pub(crate) async fn get_control_file(
662 0 : &self,
663 0 : lsn: Lsn,
664 0 : ctx: &RequestContext,
665 0 : ) -> Result<Bytes, PageReconstructError> {
666 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
667 0 : }
668 :
669 12 : pub(crate) async fn get_checkpoint(
670 12 : &self,
671 12 : lsn: Lsn,
672 12 : ctx: &RequestContext,
673 12 : ) -> Result<Bytes, PageReconstructError> {
674 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
675 12 : }
676 :
677 6 : pub(crate) async fn list_aux_files(
678 6 : &self,
679 6 : lsn: Lsn,
680 6 : ctx: &RequestContext,
681 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
682 6 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
683 6 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
684 6 : Ok(dir) => Ok(dir.files),
685 0 : Err(e) => Err(PageReconstructError::from(e)),
686 : },
687 0 : Err(e) => {
688 0 : // This is expected: historical databases do not have the key.
689 0 : debug!("Failed to get info about AUX files: {}", e);
690 0 : Ok(HashMap::new())
691 : }
692 : }
693 6 : }
694 :
695 : /// Does the same as get_current_logical_size but counted on demand.
696 : /// Used to initialize the logical size tracking on startup.
697 : ///
698 : /// Only relation blocks are counted currently. That excludes metadata,
699 : /// SLRUs, twophase files etc.
700 : ///
701 : /// # Cancel-Safety
702 : ///
703 : /// This method is cancellation-safe.
704 0 : pub async fn get_current_logical_size_non_incremental(
705 0 : &self,
706 0 : lsn: Lsn,
707 0 : ctx: &RequestContext,
708 0 : ) -> Result<u64, CalculateLogicalSizeError> {
709 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
710 :
711 : // Fetch list of database dirs and iterate them
712 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
713 0 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
714 :
715 0 : let mut total_size: u64 = 0;
716 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
717 0 : for rel in self
718 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
719 0 : .await?
720 : {
721 0 : if self.cancel.is_cancelled() {
722 0 : return Err(CalculateLogicalSizeError::Cancelled);
723 0 : }
724 0 : let relsize_key = rel_size_to_key(rel);
725 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
726 0 : let relsize = buf.get_u32_le();
727 0 :
728 0 : total_size += relsize as u64;
729 : }
730 : }
731 0 : Ok(total_size * BLCKSZ as u64)
732 0 : }
733 :
734 : ///
735 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
736 : /// Anything that's not listed maybe removed from the underlying storage (from
737 : /// that LSN forwards).
738 196 : pub(crate) async fn collect_keyspace(
739 196 : &self,
740 196 : lsn: Lsn,
741 196 : ctx: &RequestContext,
742 196 : ) -> Result<KeySpace, CollectKeySpaceError> {
743 196 : // Iterate through key ranges, greedily packing them into partitions
744 196 : let mut result = KeySpaceAccum::new();
745 196 :
746 196 : // The dbdir metadata always exists
747 196 : result.add_key(DBDIR_KEY);
748 :
749 : // Fetch list of database dirs and iterate them
750 2253 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
751 196 : let dbdir = DbDirectory::des(&buf)?;
752 :
753 196 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
754 196 : dbs.sort_unstable();
755 196 : for (spcnode, dbnode) in dbs {
756 0 : result.add_key(relmap_file_key(spcnode, dbnode));
757 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
758 :
759 0 : let mut rels: Vec<RelTag> = self
760 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
761 0 : .await?
762 0 : .into_iter()
763 0 : .collect();
764 0 : rels.sort_unstable();
765 0 : for rel in rels {
766 0 : let relsize_key = rel_size_to_key(rel);
767 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
768 0 : let relsize = buf.get_u32_le();
769 0 :
770 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
771 0 : result.add_key(relsize_key);
772 : }
773 : }
774 :
775 : // Iterate SLRUs next
776 588 : for kind in [
777 196 : SlruKind::Clog,
778 196 : SlruKind::MultiXactMembers,
779 196 : SlruKind::MultiXactOffsets,
780 : ] {
781 588 : let slrudir_key = slru_dir_to_key(kind);
782 588 : result.add_key(slrudir_key);
783 7928 : let buf = self.get(slrudir_key, lsn, ctx).await?;
784 588 : let dir = SlruSegmentDirectory::des(&buf)?;
785 588 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
786 588 : segments.sort_unstable();
787 588 : for segno in segments {
788 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
789 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
790 0 : let segsize = buf.get_u32_le();
791 0 :
792 0 : result.add_range(
793 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
794 0 : );
795 0 : result.add_key(segsize_key);
796 : }
797 : }
798 :
799 : // Then pg_twophase
800 196 : result.add_key(TWOPHASEDIR_KEY);
801 3039 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
802 196 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
803 196 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
804 196 : xids.sort_unstable();
805 196 : for xid in xids {
806 0 : result.add_key(twophase_file_key(xid));
807 0 : }
808 :
809 196 : result.add_key(CONTROLFILE_KEY);
810 196 : result.add_key(CHECKPOINT_KEY);
811 196 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
812 94 : result.add_key(AUX_FILES_KEY);
813 102 : }
814 196 : Ok(result.to_keyspace())
815 196 : }
816 :
817 : /// Get cached size of relation if it not updated after specified LSN
818 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
819 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
820 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
821 448518 : if lsn >= *cached_lsn {
822 443372 : return Some(*nblocks);
823 5146 : }
824 22 : }
825 5168 : None
826 448540 : }
827 :
828 : /// Update cached relation size if there is no more recent update
829 0 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
830 0 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
831 0 : match rel_size_cache.entry(tag) {
832 0 : hash_map::Entry::Occupied(mut entry) => {
833 0 : let cached_lsn = entry.get_mut();
834 0 : if lsn >= cached_lsn.0 {
835 0 : *cached_lsn = (lsn, nblocks);
836 0 : }
837 : }
838 0 : hash_map::Entry::Vacant(entry) => {
839 0 : entry.insert((lsn, nblocks));
840 0 : }
841 : }
842 0 : }
843 :
844 : /// Store cached relation size
845 288732 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
846 288732 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
847 288732 : rel_size_cache.insert(tag, (lsn, nblocks));
848 288732 : }
849 :
850 : /// Remove cached relation size
851 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
852 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
853 2 : rel_size_cache.remove(tag);
854 2 : }
855 : }
856 :
857 : /// DatadirModification represents an operation to ingest an atomic set of
858 : /// updates to the repository. It is created by the 'begin_modification'
859 : /// function. It is used for each WAL record, so that all the modifications
860 : /// made by one WAL record appear atomic.
861 : pub struct DatadirModification<'a> {
862 : /// The timeline this modification applies to. You can access this to
863 : /// read the state, but note that any pending updates are *not* reflected
864 : /// in the state in 'tline' yet.
865 : pub tline: &'a Timeline,
866 :
867 : /// Current LSN of the modification
868 : lsn: Lsn,
869 :
870 : // The modifications are not applied directly to the underlying key-value store.
871 : // The put-functions add the modifications here, and they are flushed to the
872 : // underlying key-value store by the 'commit' function.
873 : pending_lsns: Vec<Lsn>,
874 : pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
875 : pending_deletions: Vec<(Range<Key>, Lsn)>,
876 : pending_nblocks: i64,
877 :
878 : /// For special "directory" keys that store key-value maps, track the size of the map
879 : /// if it was updated in this modification.
880 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
881 : }
882 :
883 : impl<'a> DatadirModification<'a> {
884 : /// Get the current lsn
885 418056 : pub(crate) fn get_lsn(&self) -> Lsn {
886 418056 : self.lsn
887 418056 : }
888 :
889 : /// Set the current lsn
890 145856 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
891 145856 : ensure!(
892 145856 : lsn >= self.lsn,
893 0 : "setting an older lsn {} than {} is not allowed",
894 : lsn,
895 : self.lsn
896 : );
897 145856 : if lsn > self.lsn {
898 145856 : self.pending_lsns.push(self.lsn);
899 145856 : self.lsn = lsn;
900 145856 : }
901 145856 : Ok(())
902 145856 : }
903 :
904 : /// Initialize a completely new repository.
905 : ///
906 : /// This inserts the directory metadata entries that are assumed to
907 : /// always exist.
908 96 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
909 96 : let buf = DbDirectory::ser(&DbDirectory {
910 96 : dbdirs: HashMap::new(),
911 96 : })?;
912 96 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
913 96 : self.put(DBDIR_KEY, Value::Image(buf.into()));
914 96 :
915 96 : // Create AuxFilesDirectory
916 96 : self.init_aux_dir()?;
917 :
918 96 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
919 96 : xids: HashSet::new(),
920 96 : })?;
921 96 : self.pending_directory_entries
922 96 : .push((DirectoryKind::TwoPhase, 0));
923 96 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
924 :
925 96 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
926 96 : let empty_dir = Value::Image(buf);
927 96 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
928 96 : self.pending_directory_entries
929 96 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
930 96 : self.put(
931 96 : slru_dir_to_key(SlruKind::MultiXactMembers),
932 96 : empty_dir.clone(),
933 96 : );
934 96 : self.pending_directory_entries
935 96 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
936 96 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
937 96 : self.pending_directory_entries
938 96 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
939 96 :
940 96 : Ok(())
941 96 : }
942 :
/// Test-only variant of `init_empty` that additionally queues placeholder
/// control-file and checkpoint images.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    self.init_empty()?;

    let control_file = bytes::Bytes::from_static(b"control_file contents do not matter");
    self.put_control_file(control_file)
        .context("put_control_file")?;

    let checkpoint = bytes::Bytes::from_static(b"checkpoint_file contents do not matter");
    self.put_checkpoint(checkpoint)
        .context("put_checkpoint_file")
}
956 :
957 : /// Put a new page version that can be constructed from a WAL record
958 : ///
959 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
960 : /// current end-of-file. It's up to the caller to check that the relation size
961 : /// matches the blocks inserted!
962 145630 : pub fn put_rel_wal_record(
963 145630 : &mut self,
964 145630 : rel: RelTag,
965 145630 : blknum: BlockNumber,
966 145630 : rec: NeonWalRecord,
967 145630 : ) -> anyhow::Result<()> {
968 145630 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
969 145630 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
970 145630 : Ok(())
971 145630 : }
972 :
973 : // Same, but for an SLRU.
974 8 : pub fn put_slru_wal_record(
975 8 : &mut self,
976 8 : kind: SlruKind,
977 8 : segno: u32,
978 8 : blknum: BlockNumber,
979 8 : rec: NeonWalRecord,
980 8 : ) -> anyhow::Result<()> {
981 8 : self.put(
982 8 : slru_block_to_key(kind, segno, blknum),
983 8 : Value::WalRecord(rec),
984 8 : );
985 8 : Ok(())
986 8 : }
987 :
988 : /// Like put_wal_record, but with ready-made image of the page.
989 280864 : pub fn put_rel_page_image(
990 280864 : &mut self,
991 280864 : rel: RelTag,
992 280864 : blknum: BlockNumber,
993 280864 : img: Bytes,
994 280864 : ) -> anyhow::Result<()> {
995 280864 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
996 280864 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
997 280864 : Ok(())
998 280864 : }
999 :
1000 6 : pub fn put_slru_page_image(
1001 6 : &mut self,
1002 6 : kind: SlruKind,
1003 6 : segno: u32,
1004 6 : blknum: BlockNumber,
1005 6 : img: Bytes,
1006 6 : ) -> anyhow::Result<()> {
1007 6 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
1008 6 : Ok(())
1009 6 : }
1010 :
1011 : /// Store a relmapper file (pg_filenode.map) in the repository
1012 16 : pub async fn put_relmap_file(
1013 16 : &mut self,
1014 16 : spcnode: Oid,
1015 16 : dbnode: Oid,
1016 16 : img: Bytes,
1017 16 : ctx: &RequestContext,
1018 16 : ) -> anyhow::Result<()> {
1019 : // Add it to the directory (if it doesn't exist already)
1020 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1021 16 : let mut dbdir = DbDirectory::des(&buf)?;
1022 :
1023 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1024 16 : if r.is_none() || r == Some(false) {
1025 : // The dbdir entry didn't exist, or it contained a
1026 : // 'false'. The 'insert' call already updated it with
1027 : // 'true', now write the updated 'dbdirs' map back.
1028 16 : let buf = DbDirectory::ser(&dbdir)?;
1029 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1030 16 :
1031 16 : // Create AuxFilesDirectory as well
1032 16 : self.init_aux_dir()?;
1033 0 : }
1034 16 : if r.is_none() {
1035 8 : // Create RelDirectory
1036 8 : let buf = RelDirectory::ser(&RelDirectory {
1037 8 : rels: HashSet::new(),
1038 8 : })?;
1039 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1040 8 : self.put(
1041 8 : rel_dir_to_key(spcnode, dbnode),
1042 8 : Value::Image(Bytes::from(buf)),
1043 8 : );
1044 8 : }
1045 :
1046 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1047 16 : Ok(())
1048 16 : }
1049 :
1050 0 : pub async fn put_twophase_file(
1051 0 : &mut self,
1052 0 : xid: TransactionId,
1053 0 : img: Bytes,
1054 0 : ctx: &RequestContext,
1055 0 : ) -> anyhow::Result<()> {
1056 : // Add it to the directory entry
1057 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1058 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1059 0 : if !dir.xids.insert(xid) {
1060 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1061 0 : }
1062 0 : self.pending_directory_entries
1063 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1064 0 : self.put(
1065 0 : TWOPHASEDIR_KEY,
1066 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1067 : );
1068 :
1069 0 : self.put(twophase_file_key(xid), Value::Image(img));
1070 0 : Ok(())
1071 0 : }
1072 :
1073 96 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1074 96 : self.put(CONTROLFILE_KEY, Value::Image(img));
1075 96 : Ok(())
1076 96 : }
1077 :
1078 110 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1079 110 : self.put(CHECKPOINT_KEY, Value::Image(img));
1080 110 : Ok(())
1081 110 : }
1082 :
1083 0 : pub async fn drop_dbdir(
1084 0 : &mut self,
1085 0 : spcnode: Oid,
1086 0 : dbnode: Oid,
1087 0 : ctx: &RequestContext,
1088 0 : ) -> anyhow::Result<()> {
1089 0 : let total_blocks = self
1090 0 : .tline
1091 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
1092 0 : .await?;
1093 :
1094 : // Remove entry from dbdir
1095 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1096 0 : let mut dir = DbDirectory::des(&buf)?;
1097 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1098 0 : let buf = DbDirectory::ser(&dir)?;
1099 0 : self.pending_directory_entries
1100 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1101 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1102 : } else {
1103 0 : warn!(
1104 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1105 0 : spcnode, dbnode
1106 0 : );
1107 : }
1108 :
1109 : // Update logical database size.
1110 0 : self.pending_nblocks -= total_blocks as i64;
1111 0 :
1112 0 : // Delete all relations and metadata files for the spcnode/dnode
1113 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1114 0 : Ok(())
1115 0 : }
1116 :
1117 : /// Create a relation fork.
1118 : ///
1119 : /// 'nblocks' is the initial size.
1120 1920 : pub async fn put_rel_creation(
1121 1920 : &mut self,
1122 1920 : rel: RelTag,
1123 1920 : nblocks: BlockNumber,
1124 1920 : ctx: &RequestContext,
1125 1920 : ) -> Result<(), RelationError> {
1126 1920 : if rel.relnode == 0 {
1127 0 : return Err(RelationError::InvalidRelnode);
1128 1920 : }
1129 : // It's possible that this is the first rel for this db in this
1130 : // tablespace. Create the reldir entry for it if so.
1131 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1132 1920 : .context("deserialize db")?;
1133 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1134 1920 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
1135 : // Didn't exist. Update dbdir
1136 8 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
1137 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1138 8 : self.pending_directory_entries
1139 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1140 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1141 8 :
1142 8 : // and create the RelDirectory
1143 8 : RelDirectory::default()
1144 : } else {
1145 : // reldir already exists, fetch it
1146 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1147 1912 : .context("deserialize db")?
1148 : };
1149 :
1150 : // Add the new relation to the rel directory entry, and write it back
1151 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1152 0 : return Err(RelationError::AlreadyExists);
1153 1920 : }
1154 1920 :
1155 1920 : self.pending_directory_entries
1156 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1157 1920 :
1158 1920 : self.put(
1159 1920 : rel_dir_key,
1160 1920 : Value::Image(Bytes::from(
1161 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1162 : )),
1163 : );
1164 :
1165 : // Put size
1166 1920 : let size_key = rel_size_to_key(rel);
1167 1920 : let buf = nblocks.to_le_bytes();
1168 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1169 1920 :
1170 1920 : self.pending_nblocks += nblocks as i64;
1171 1920 :
1172 1920 : // Update relation size cache
1173 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1174 1920 :
1175 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1176 1920 : // caller.
1177 1920 : Ok(())
1178 1920 : }
1179 :
1180 : /// Truncate relation
1181 6012 : pub async fn put_rel_truncation(
1182 6012 : &mut self,
1183 6012 : rel: RelTag,
1184 6012 : nblocks: BlockNumber,
1185 6012 : ctx: &RequestContext,
1186 6012 : ) -> anyhow::Result<()> {
1187 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1188 6012 : if self
1189 6012 : .tline
1190 6012 : .get_rel_exists(rel, Version::Modified(self), true, ctx)
1191 0 : .await?
1192 : {
1193 6012 : let size_key = rel_size_to_key(rel);
1194 : // Fetch the old size first
1195 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1196 6012 :
1197 6012 : // Update the entry with the new size.
1198 6012 : let buf = nblocks.to_le_bytes();
1199 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1200 6012 :
1201 6012 : // Update relation size cache
1202 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1203 6012 :
1204 6012 : // Update relation size cache
1205 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1206 6012 :
1207 6012 : // Update logical database size.
1208 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1209 0 : }
1210 6012 : Ok(())
1211 6012 : }
1212 :
1213 : /// Extend relation
1214 : /// If new size is smaller, do nothing.
1215 276680 : pub async fn put_rel_extend(
1216 276680 : &mut self,
1217 276680 : rel: RelTag,
1218 276680 : nblocks: BlockNumber,
1219 276680 : ctx: &RequestContext,
1220 276680 : ) -> anyhow::Result<()> {
1221 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1222 :
1223 : // Put size
1224 276680 : let size_key = rel_size_to_key(rel);
1225 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1226 276680 :
1227 276680 : // only extend relation here. never decrease the size
1228 276680 : if nblocks > old_size {
1229 274788 : let buf = nblocks.to_le_bytes();
1230 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1231 274788 :
1232 274788 : // Update relation size cache
1233 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1234 274788 :
1235 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1236 274788 : }
1237 276680 : Ok(())
1238 276680 : }
1239 :
1240 : /// Drop a relation.
1241 2 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1242 2 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1243 :
1244 : // Remove it from the directory entry
1245 2 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1246 2 : let buf = self.get(dir_key, ctx).await?;
1247 2 : let mut dir = RelDirectory::des(&buf)?;
1248 :
1249 2 : self.pending_directory_entries
1250 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1251 2 :
1252 2 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1253 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1254 : } else {
1255 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1256 : }
1257 :
1258 : // update logical size
1259 2 : let size_key = rel_size_to_key(rel);
1260 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1261 2 : self.pending_nblocks -= old_size as i64;
1262 2 :
1263 2 : // Remove enty from relation size cache
1264 2 : self.tline.remove_cached_rel_size(&rel);
1265 2 :
1266 2 : // Delete size entry, as well as all blocks
1267 2 : self.delete(rel_key_range(rel));
1268 2 :
1269 2 : Ok(())
1270 2 : }
1271 :
1272 6 : pub async fn put_slru_segment_creation(
1273 6 : &mut self,
1274 6 : kind: SlruKind,
1275 6 : segno: u32,
1276 6 : nblocks: BlockNumber,
1277 6 : ctx: &RequestContext,
1278 6 : ) -> anyhow::Result<()> {
1279 6 : // Add it to the directory entry
1280 6 : let dir_key = slru_dir_to_key(kind);
1281 6 : let buf = self.get(dir_key, ctx).await?;
1282 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1283 :
1284 6 : if !dir.segments.insert(segno) {
1285 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1286 6 : }
1287 6 : self.pending_directory_entries
1288 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1289 6 : self.put(
1290 6 : dir_key,
1291 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1292 : );
1293 :
1294 : // Put size
1295 6 : let size_key = slru_segment_size_to_key(kind, segno);
1296 6 : let buf = nblocks.to_le_bytes();
1297 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1298 6 :
1299 6 : // even if nblocks > 0, we don't insert any actual blocks here
1300 6 :
1301 6 : Ok(())
1302 6 : }
1303 :
1304 : /// Extend SLRU segment
1305 0 : pub fn put_slru_extend(
1306 0 : &mut self,
1307 0 : kind: SlruKind,
1308 0 : segno: u32,
1309 0 : nblocks: BlockNumber,
1310 0 : ) -> anyhow::Result<()> {
1311 0 : // Put size
1312 0 : let size_key = slru_segment_size_to_key(kind, segno);
1313 0 : let buf = nblocks.to_le_bytes();
1314 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1315 0 : Ok(())
1316 0 : }
1317 :
1318 : /// This method is used for marking truncated SLRU files
1319 0 : pub async fn drop_slru_segment(
1320 0 : &mut self,
1321 0 : kind: SlruKind,
1322 0 : segno: u32,
1323 0 : ctx: &RequestContext,
1324 0 : ) -> anyhow::Result<()> {
1325 0 : // Remove it from the directory entry
1326 0 : let dir_key = slru_dir_to_key(kind);
1327 0 : let buf = self.get(dir_key, ctx).await?;
1328 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1329 :
1330 0 : if !dir.segments.remove(&segno) {
1331 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1332 0 : }
1333 0 : self.pending_directory_entries
1334 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1335 0 : self.put(
1336 0 : dir_key,
1337 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1338 : );
1339 :
1340 : // Delete size entry, as well as all blocks
1341 0 : self.delete(slru_segment_key_range(kind, segno));
1342 0 :
1343 0 : Ok(())
1344 0 : }
1345 :
/// Drop a relmapper file (pg_filenode.map)
///
/// Not implemented yet: this is currently a no-op that ignores both
/// arguments and always returns `Ok(())`.
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    // TODO
    Ok(())
}
1351 :
1352 : /// This method is used for marking truncated SLRU files
1353 0 : pub async fn drop_twophase_file(
1354 0 : &mut self,
1355 0 : xid: TransactionId,
1356 0 : ctx: &RequestContext,
1357 0 : ) -> anyhow::Result<()> {
1358 : // Remove it from the directory entry
1359 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1360 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1361 :
1362 0 : if !dir.xids.remove(&xid) {
1363 0 : warn!("twophase file for xid {} does not exist", xid);
1364 0 : }
1365 0 : self.pending_directory_entries
1366 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1367 0 : self.put(
1368 0 : TWOPHASEDIR_KEY,
1369 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1370 : );
1371 :
1372 : // Delete it
1373 0 : self.delete(twophase_key_range(xid));
1374 0 :
1375 0 : Ok(())
1376 0 : }
1377 :
1378 112 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1379 112 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1380 112 : files: HashMap::new(),
1381 112 : })?;
1382 112 : self.pending_directory_entries
1383 112 : .push((DirectoryKind::AuxFiles, 0));
1384 112 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1385 112 : Ok(())
1386 112 : }
1387 :
1388 8 : pub async fn put_file(
1389 8 : &mut self,
1390 8 : path: &str,
1391 8 : content: &[u8],
1392 8 : ctx: &RequestContext,
1393 8 : ) -> anyhow::Result<()> {
1394 8 : let file_path = path.to_string();
1395 8 : let content = if content.is_empty() {
1396 2 : None
1397 : } else {
1398 6 : Some(Bytes::copy_from_slice(content))
1399 : };
1400 :
1401 : let n_files;
1402 8 : let mut aux_files = self.tline.aux_files.lock().await;
1403 8 : if let Some(mut dir) = aux_files.dir.take() {
1404 : // We already updated aux files in `self`: emit a delta and update our latest value
1405 6 : dir.upsert(file_path.clone(), content.clone());
1406 6 : n_files = dir.files.len();
1407 6 : if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
1408 0 : self.put(
1409 0 : AUX_FILES_KEY,
1410 0 : Value::Image(Bytes::from(
1411 0 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1412 : )),
1413 : );
1414 0 : aux_files.n_deltas = 0;
1415 6 : } else {
1416 6 : self.put(
1417 6 : AUX_FILES_KEY,
1418 6 : Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
1419 6 : );
1420 6 : aux_files.n_deltas += 1;
1421 6 : }
1422 6 : aux_files.dir = Some(dir);
1423 : } else {
1424 : // Check if the AUX_FILES_KEY is initialized
1425 2 : match self.get(AUX_FILES_KEY, ctx).await {
1426 0 : Ok(dir_bytes) => {
1427 0 : let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
1428 : // Key is already set, we may append a delta
1429 0 : self.put(
1430 0 : AUX_FILES_KEY,
1431 0 : Value::WalRecord(NeonWalRecord::AuxFile {
1432 0 : file_path: file_path.clone(),
1433 0 : content: content.clone(),
1434 0 : }),
1435 0 : );
1436 0 : dir.upsert(file_path, content);
1437 0 : n_files = dir.files.len();
1438 0 : aux_files.dir = Some(dir);
1439 : }
1440 : Err(
1441 0 : e @ (PageReconstructError::AncestorStopping(_)
1442 : | PageReconstructError::Cancelled
1443 : | PageReconstructError::AncestorLsnTimeout(_)),
1444 : ) => {
1445 : // Important that we do not interpret a shutdown error as "not found" and thereby
1446 : // reset the map.
1447 0 : return Err(e.into());
1448 : }
1449 : // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
1450 : // we are assuming that all _other_ possible errors represents a missing key. If some
1451 : // other error occurs, we may incorrectly reset the map of aux files.
1452 : Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
1453 : // Key is missing, we must insert an image as the basis for subsequent deltas.
1454 :
1455 2 : let mut dir = AuxFilesDirectory {
1456 2 : files: HashMap::new(),
1457 2 : };
1458 2 : dir.upsert(file_path, content);
1459 2 : self.put(
1460 2 : AUX_FILES_KEY,
1461 2 : Value::Image(Bytes::from(
1462 2 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1463 : )),
1464 : );
1465 2 : n_files = 1;
1466 2 : aux_files.dir = Some(dir);
1467 : }
1468 : }
1469 : }
1470 :
1471 8 : self.pending_directory_entries
1472 8 : .push((DirectoryKind::AuxFiles, n_files));
1473 8 :
1474 8 : Ok(())
1475 8 : }
1476 :
///
/// Flush changes accumulated so far to the underlying repository.
///
/// Usually, changes made in DatadirModification are atomic, but this allows
/// you to flush them to the underlying repository before the final `commit`.
/// That allows to free up the memory used to hold the pending changes.
///
/// Currently only used during bulk import of a data directory. In that
/// context, breaking the atomicity is OK. If the import is interrupted, the
/// whole import fails and the timeline will be deleted anyway.
/// (Or to be precise, it will be left behind for debugging purposes and
/// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
///
/// Note: A consequence of flushing the pending operations is that they
/// won't be visible to subsequent operations until `commit`. The function
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
///
/// Nothing is done unless at least 10000 blocks are pending. When the
/// flush does run, the pending logical-size delta and the pending directory
/// entry counts are handed to the writer as well. `pending_deletions` and
/// `pending_lsns` are not touched here.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    // Unless we have accumulated a decent amount of changes, it's not worth it
    // to scan through the pending_updates list.
    let pending_nblocks = self.pending_nblocks;
    if pending_nblocks < 10000 {
        return Ok(());
    }

    let mut writer = self.tline.writer().await;

    // Flush relation and SLRU data blocks, keep metadata.
    let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    for (key, values) in self.pending_updates.drain() {
        for (lsn, value) in values {
            if is_rel_block_key(&key) || is_slru_block_key(key) {
                // This bails out on the first error, before the retained
                // metadata is stored back into pending_updates.
                // NOTE(review): dropping the `drain()` iterator on that path
                // discards the remaining entries, so pending_updates ends up
                // empty rather than unmodified. Presumably acceptable for the
                // reasons given in this function's doc comment -- confirm.
                writer.put(key, lsn, &value, ctx).await?;
            } else {
                retained_pending_updates
                    .entry(key)
                    .or_default()
                    .push((lsn, value));
            }
        }
    }

    self.pending_updates = retained_pending_updates;

    // Always true here because of the threshold check at the top.
    if pending_nblocks != 0 {
        writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
        self.pending_nblocks = 0;
    }

    for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
        writer.update_directory_entries_count(kind, count as u64);
    }

    Ok(())
}
1535 :
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// Each modification is written with the LSN it was queued at (the value of
/// `self.lsn` at the time of the put/delete).
///
/// Steps, in order: write the pending updates as one LSN-ordered batch,
/// apply the pending deletions, report every LSN this modification went
/// through to the writer, then hand over the logical-size delta and the
/// directory entry counts. The pending state is cleared along the way, so
/// the modification can be reused afterwards.
///
/// # Errors
/// Fails if writing the update batch or the deletion batch fails.
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    let mut writer = self.tline.writer().await;

    // The counter is reset up front, before any fallible write below.
    let pending_nblocks = self.pending_nblocks;
    self.pending_nblocks = 0;

    if !self.pending_updates.is_empty() {
        // The put_batch call below expects the inputs to be sorted by Lsn,
        // so we do that first. Each key's values are already in LSN order
        // (see `put`), so a k-way merge of the per-key lists is sufficient.
        let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
            self.pending_updates
                .drain()
                .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
                .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
            VecMapOrdering::GreaterOrEqual,
        );

        writer.put_batch(lsn_ordered_batch, ctx).await?;
    }

    if !self.pending_deletions.is_empty() {
        writer.delete_batch(&self.pending_deletions).await?;
        self.pending_deletions.clear();
    }

    self.pending_lsns.push(self.lsn);
    for pending_lsn in self.pending_lsns.drain(..) {
        // Ideally, we should be able to call writer.finish_write() only once
        // with the highest LSN. However, the last_record_lsn variable in the
        // timeline keeps track of the latest LSN and the immediate previous LSN
        // so we need to record every LSN to not leave a gap between them.
        writer.finish_write(pending_lsn);
    }

    if pending_nblocks != 0 {
        writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    }

    for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
        writer.update_directory_entries_count(kind, count as u64);
    }

    Ok(())
}
1585 :
1586 291704 : pub(crate) fn len(&self) -> usize {
1587 291704 : self.pending_updates.len() + self.pending_deletions.len()
1588 291704 : }
1589 :
1590 : // Internal helper functions to batch the modifications
1591 :
1592 286562 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1593 : // Have we already updated the same key? Read the latest pending updated
1594 : // version in that case.
1595 : //
1596 : // Note: we don't check pending_deletions. It is an error to request a
1597 : // value that has been removed, deletion only avoids leaking storage.
1598 286562 : if let Some(values) = self.pending_updates.get(&key) {
1599 15928 : if let Some((_, value)) = values.last() {
1600 15928 : return if let Value::Image(img) = value {
1601 15928 : Ok(img.clone())
1602 : } else {
1603 : // Currently, we never need to read back a WAL record that we
1604 : // inserted in the same "transaction". All the metadata updates
1605 : // work directly with Images, and we never need to read actual
1606 : // data pages. We could handle this if we had to, by calling
1607 : // the walredo manager, but let's keep it simple for now.
1608 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1609 0 : "unexpected pending WAL record"
1610 0 : )))
1611 : };
1612 0 : }
1613 270634 : }
1614 270634 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1615 270634 : self.tline.get(key, lsn, ctx).await
1616 286562 : }
1617 :
1618 712016 : fn put(&mut self, key: Key, val: Value) {
1619 712016 : let values = self.pending_updates.entry(key).or_default();
1620 : // Replace the previous value if it exists at the same lsn
1621 712016 : if let Some((last_lsn, last_value)) = values.last_mut() {
1622 12164 : if *last_lsn == self.lsn {
1623 12160 : *last_value = val;
1624 12160 : return;
1625 4 : }
1626 699852 : }
1627 699856 : values.push((self.lsn, val));
1628 712016 : }
1629 :
1630 2 : fn delete(&mut self, key_range: Range<Key>) {
1631 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1632 2 : self.pending_deletions.push((key_range, self.lsn));
1633 2 : }
1634 : }
1635 :
/// This enum facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
    /// Read committed state from the timeline at this LSN.
    Lsn(Lsn),
    /// Read through a pending modification, so that its queued updates are
    /// visible (see `DatadirModification::get`).
    Modified(&'a DatadirModification<'a>),
}
1647 :
1648 : impl<'a> Version<'a> {
1649 23542 : async fn get(
1650 23542 : &self,
1651 23542 : timeline: &Timeline,
1652 23542 : key: Key,
1653 23542 : ctx: &RequestContext,
1654 23542 : ) -> Result<Bytes, PageReconstructError> {
1655 23542 : match self {
1656 23532 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1657 10 : Version::Modified(modification) => modification.get(key, ctx).await,
1658 : }
1659 23542 : }
1660 :
1661 30484 : fn get_lsn(&self) -> Lsn {
1662 30484 : match self {
1663 24438 : Version::Lsn(lsn) => *lsn,
1664 6046 : Version::Modified(modification) => modification.lsn,
1665 : }
1666 30484 : }
1667 : }
1668 :
1669 : //--- Metadata structs stored in key-value pairs in the repository.
1670 :
/// Directory of databases, stored serialized under `DBDIR_KEY`.
#[derive(Debug, Serialize, Deserialize)]
struct DbDirectory {
    // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    //
    // In this file the flag is inserted as 'false' when a database is first
    // seen through a relation creation, and set to 'true' when its relmapper
    // file is stored.
    dbdirs: HashMap<(Oid, Oid), bool>,
}
1676 :
/// Directory of two-phase state files, stored serialized under
/// `TWOPHASEDIR_KEY`.
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
    // Transaction ids for which a two-phase file is currently stored.
    xids: HashSet<TransactionId>,
}
1681 :
/// Directory of the relation forks of one database, stored serialized under
/// the key returned by `rel_dir_to_key(spcnode, dbnode)`.
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
    // Set of relations that exist. (relfilenode, forknum)
    //
    // TODO: Store it as a btree or radix tree or something else that spans multiple
    // key-value pairs, if you have a lot of relations
    rels: HashSet<(Oid, u8)>,
}
1690 :
/// Directory of aux files, stored under `AUX_FILES_KEY` either as a full
/// serialized image or as `NeonWalRecord::AuxFile` deltas on top of one
/// (see `DatadirModification::put_file`).
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
pub(crate) struct AuxFilesDirectory {
    // File path -> file contents.
    pub(crate) files: HashMap<String, Bytes>,
}
1695 :
1696 : impl AuxFilesDirectory {
1697 24 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
1698 24 : if let Some(value) = value {
1699 18 : self.files.insert(key, value);
1700 18 : } else {
1701 6 : self.files.remove(&key);
1702 6 : }
1703 24 : }
1704 : }
1705 :
/// Size of a relation in blocks.
///
/// NOTE(review): the size keys written in this part of the file are encoded
/// directly with `to_le_bytes()` rather than through this struct; it looks
/// unused here -- confirm before relying on it.
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
    nblocks: u32,
}
1710 :
/// Directory of the segments of one SLRU, stored serialized under the key
/// returned by `slru_dir_to_key(kind)`.
#[derive(Debug, Serialize, Deserialize, Default)]
struct SlruSegmentDirectory {
    // Set of SLRU segments that exist.
    segments: HashSet<u32>,
}
1716 :
/// The kinds of "directory" keys whose entry counts are tracked in
/// `DatadirModification::pending_directory_entries`.
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
#[repr(u8)]
pub(crate) enum DirectoryKind {
    /// The database directory (`DbDirectory`).
    Db,
    /// The two-phase file directory (`TwoPhaseDirectory`).
    TwoPhase,
    /// A per-database relation directory (`RelDirectory`).
    Rel,
    /// The aux-files directory (`AuxFilesDirectory`).
    AuxFiles,
    /// The segment directory of one SLRU kind (`SlruSegmentDirectory`).
    SlruSegment(SlruKind),
}
1726 :
1727 : impl DirectoryKind {
1728 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
1729 5088 : pub(crate) fn offset(&self) -> usize {
1730 5088 : self.into_usize()
1731 5088 : }
1732 : }
1733 :
/// A page-sized (`BLCKSZ` bytes) buffer filled with zeros.
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1735 :
1736 : #[allow(clippy::bool_assert_comparison)]
1737 : #[cfg(test)]
1738 : mod tests {
1739 : use hex_literal::hex;
1740 : use utils::id::TimelineId;
1741 :
1742 : use super::*;
1743 :
1744 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
1745 :
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
///
/// Two modifications are committed, each spanning two LSNs. After each one
/// the aux file listing is read back at the modification's final LSN, and at
/// the end the first listing is read again to check that older LSNs still
/// return the older state.
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
    let name = "aux_files_round_trip";
    let harness = TenantHarness::create(name)?;

    pub const TIMELINE_ID: TimelineId =
        TimelineId::from_array(hex!("11223344556677881122334455667788"));

    let (tenant, ctx) = harness.load().await;
    let tline = tenant
        .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
        .await?;
    let tline = tline.raw_timeline().unwrap();

    // First modification: insert two keys
    let mut modification = tline.begin_modification(Lsn(0x1000));
    modification.put_file("foo/bar1", b"content1", &ctx).await?;
    modification.set_lsn(Lsn(0x1008))?;
    modification.put_file("foo/bar2", b"content2", &ctx).await?;
    modification.commit(&ctx).await?;
    let expect_1008 = HashMap::from([
        ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
        ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    ]);

    let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    assert_eq!(readback, expect_1008);

    // Second modification: update one key, remove the other
    // (empty content is how `put_file` expresses a removal)
    let mut modification = tline.begin_modification(Lsn(0x2000));
    modification.put_file("foo/bar1", b"content3", &ctx).await?;
    modification.set_lsn(Lsn(0x2008))?;
    modification.put_file("foo/bar2", b"", &ctx).await?;
    modification.commit(&ctx).await?;
    let expect_2008 =
        HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);

    let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    assert_eq!(readback, expect_2008);

    // Reading back in time works
    let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    assert_eq!(readback, expect_1008);

    Ok(())
}
1793 :
1794 : /*
1795 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1796 : let incremental = timeline.get_current_logical_size();
1797 : let non_incremental = timeline
1798 : .get_current_logical_size_non_incremental(lsn)
1799 : .unwrap();
1800 : assert_eq!(incremental, non_incremental);
1801 : }
1802 : */
1803 :
1804 : /*
1805 : ///
1806 : /// Test list_rels() function, with branches and dropped relations
1807 : ///
1808 : #[test]
1809 : fn test_list_rels_drop() -> Result<()> {
1810 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1811 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1812 : const TESTDB: u32 = 111;
1813 :
1814 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1815 : // after branching fails below
1816 : let mut writer = tline.begin_record(Lsn(0x10));
1817 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1818 : writer.finish()?;
1819 :
1820 : // Create a relation on the timeline
1821 : let mut writer = tline.begin_record(Lsn(0x20));
1822 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1823 : writer.finish()?;
1824 :
1825 : let writer = tline.begin_record(Lsn(0x00));
1826 : writer.finish()?;
1827 :
1828 : // Check that list_rels() lists it from LSN 0x20 onwards, but not before it
1829 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1830 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
1831 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
1832 :
1833 : // Create a branch, check that the relation is visible there
1834 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
1835 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
1836 : Some(timeline) => timeline,
1837 : None => panic!("Should have a local timeline"),
1838 : };
1839 : let newtline = DatadirTimelineImpl::new(newtline);
1840 : assert!(newtline
1841 : .list_rels(0, TESTDB, Lsn(0x30))?
1842 : .contains(&TESTREL_A));
1843 :
1844 : // Drop it on the branch
1845 : let mut new_writer = newtline.begin_record(Lsn(0x40));
1846 : new_writer.drop_relation(TESTREL_A)?;
1847 : new_writer.finish()?;
1848 :
1849 : // Check that it's no longer listed on the branch after the point where it was dropped
1850 : assert!(newtline
1851 : .list_rels(0, TESTDB, Lsn(0x30))?
1852 : .contains(&TESTREL_A));
1853 : assert!(!newtline
1854 : .list_rels(0, TESTDB, Lsn(0x40))?
1855 : .contains(&TESTREL_A));
1856 :
1857 : // Run checkpoint and garbage collection and check that it's still not visible
1858 : newtline.checkpoint(CheckpointConfig::Forced)?;
1859 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
1860 :
1861 : assert!(!newtline
1862 : .list_rels(0, TESTDB, Lsn(0x40))?
1863 : .contains(&TESTREL_A));
1864 :
1865 : Ok(())
1866 : }
1867 : */
1868 :
1869 : /*
1870 : #[test]
1871 : fn test_read_beyond_eof() -> Result<()> {
1872 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
1873 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
1874 :
1875 : make_some_layers(&tline, Lsn(0x20))?;
1876 : let mut writer = tline.begin_record(Lsn(0x60));
1877 : walingest.put_rel_page_image(
1878 : &mut writer,
1879 : TESTREL_A,
1880 : 0,
1881 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
1882 : )?;
1883 : writer.finish()?;
1884 :
1885 : // Test read before rel creation. Should error out.
1886 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
1887 :
1888 : // Read block beyond end of relation at different points in time.
1889 : // These reads should fall into different delta, image, and in-memory layers.
1890 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
1891 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
1892 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
1893 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
1894 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
1895 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
1896 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
1897 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
1898 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
1899 :
1900 : // Test on an in-memory layer with no preceding layer
1901 : let mut writer = tline.begin_record(Lsn(0x70));
1902 : walingest.put_rel_page_image(
1903 : &mut writer,
1904 : TESTREL_B,
1905 : 0,
1906 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
1907 : )?;
1908 : writer.finish()?;
1909 :
1910 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
1911 :
1912 : Ok(())
1913 : }
1914 : */
1915 : }
|