Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::aux_file;
11 : use crate::context::RequestContext;
12 : use crate::keyspace::{KeySpace, KeySpaceAccum};
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use anyhow::{ensure, Context};
15 : use bytes::{Buf, Bytes, BytesMut};
16 : use enum_map::Enum;
17 : use pageserver_api::key::Key;
18 : use pageserver_api::key::{
19 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
20 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
21 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
22 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
23 : };
24 : use pageserver_api::keyspace::SparseKeySpace;
25 : use pageserver_api::record::NeonWalRecord;
26 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
27 : use pageserver_api::shard::ShardIdentity;
28 : use pageserver_api::value::Value;
29 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
30 : use postgres_ffi::BLCKSZ;
31 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
32 : use serde::{Deserialize, Serialize};
33 : use std::collections::{hash_map, HashMap, HashSet};
34 : use std::ops::ControlFlow;
35 : use std::ops::Range;
36 : use strum::IntoEnumIterator;
37 : use tokio_util::sync::CancellationToken;
38 : use tracing::{debug, trace, warn};
39 : use utils::bin_ser::DeserializeError;
40 : use utils::pausable_failpoint;
41 : use utils::{bin_ser::BeSer, lsn::Lsn};
42 : use wal_decoder::serialized_batch::SerializedValueBatch;
43 :
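  : // Illustrative sketch (not part of the original source): the shape of the
  : // mapping this module implements. Each relation page is addressed by a
  : // (RelTag, block number) pair and stored under a single Key, with separate
  : // keys for per-relation metadata such as the size.
  : #[allow(dead_code)]
  : fn example_relation_keys(rel: RelTag, blkno: BlockNumber) -> (Key, Key) {
  : // One key per page image/WAL record, plus one size key per relation.
  : (rel_block_to_key(rel, blkno), rel_size_to_key(rel))
  : }
  :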
44 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
45 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
46 :
47 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
48 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
49 :
50 : #[derive(Debug)]
51 : pub enum LsnForTimestamp {
52 : /// Found commits both before and after the given timestamp
53 : Present(Lsn),
54 :
55 : /// Found no commits after the given timestamp; this means
56 : /// that the newest data in the branch is older than the given
57 : /// timestamp.
58 : ///
59 : /// All commits <= LSN happened before the given timestamp
60 : Future(Lsn),
61 :
62 : /// The queried timestamp is older than the horizon we look back to (PITR)
63 : ///
64 : /// All commits > LSN happened after the given timestamp,
65 : /// but any commits < LSN might have happened before or after
66 : /// the given timestamp. We don't know because no data before
67 : /// the given lsn is available.
68 : Past(Lsn),
69 :
70 : /// We have found no commit with a timestamp,
71 : /// so we can't return anything meaningful.
72 : ///
73 : /// The associated LSN is the lower bound value we can safely
74 : /// create branches on, but no statement is made about whether it
75 : /// is older or newer than the timestamp.
76 : ///
77 : /// This variant can e.g. be returned right after a
78 : /// cluster import.
79 : NoData(Lsn),
80 : }
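  :
  : // Illustrative sketch (not part of the original source): one way a caller
  : // could reduce a `LsnForTimestamp` answer to a branch-point LSN. The policy
  : // shown here (accepting the lower bounds carried by `Future` and `NoData`,
  : // rejecting `Past`) is an assumption for the example, not the actual
  : // branching logic.
  : #[allow(dead_code)]
  : fn example_branch_point(result: LsnForTimestamp) -> Result<Lsn, &'static str> {
  : match result {
  : LsnForTimestamp::Present(lsn) => Ok(lsn),
  : // The newest commits are older than the timestamp; branching at the
  : // returned LSN still includes everything up to the timestamp.
  : LsnForTimestamp::Future(lsn) => Ok(lsn),
  : // No commit metadata was found at all; the LSN is a safe lower bound.
  : LsnForTimestamp::NoData(lsn) => Ok(lsn),
  : // The timestamp predates the PITR horizon; there is no safe LSN to return.
  : LsnForTimestamp::Past(_) => Err("timestamp is beyond the PITR horizon"),
  : }
  : }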
81 :
82 0 : #[derive(Debug, thiserror::Error)]
83 : pub(crate) enum CalculateLogicalSizeError {
84 : #[error("cancelled")]
85 : Cancelled,
86 :
87 : /// Something went wrong while reading the metadata we use to calculate logical size
88 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
89 : /// in the `From` implementation for this variant.
90 : #[error(transparent)]
91 : PageRead(PageReconstructError),
92 :
93 : /// Something went wrong deserializing metadata that we read to calculate logical size
94 : #[error("decode error: {0}")]
95 : Decode(#[from] DeserializeError),
96 : }
97 :
98 0 : #[derive(Debug, thiserror::Error)]
99 : pub(crate) enum CollectKeySpaceError {
100 : #[error(transparent)]
101 : Decode(#[from] DeserializeError),
102 : #[error(transparent)]
103 : PageRead(PageReconstructError),
104 : #[error("cancelled")]
105 : Cancelled,
106 : }
107 :
108 : impl From<PageReconstructError> for CollectKeySpaceError {
109 0 : fn from(err: PageReconstructError) -> Self {
110 0 : match err {
111 0 : PageReconstructError::Cancelled => Self::Cancelled,
112 0 : err => Self::PageRead(err),
113 : }
114 0 : }
115 : }
116 :
117 : impl From<PageReconstructError> for CalculateLogicalSizeError {
118 0 : fn from(pre: PageReconstructError) -> Self {
119 0 : match pre {
120 0 : PageReconstructError::Cancelled => Self::Cancelled,
121 0 : _ => Self::PageRead(pre),
122 : }
123 0 : }
124 : }
125 :
126 0 : #[derive(Debug, thiserror::Error)]
127 : pub enum RelationError {
128 : #[error("Relation Already Exists")]
129 : AlreadyExists,
130 : #[error("invalid relnode")]
131 : InvalidRelnode,
132 : #[error(transparent)]
133 : Other(#[from] anyhow::Error),
134 : }
135 :
136 : ///
137 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
138 : /// and other special kinds of files, in a versioned key-value store. The
139 : /// Timeline struct provides the key-value store.
140 : ///
141 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
142 : /// implementation, and so that it can be moved into a separate struct later.
143 : impl Timeline {
144 : /// Start ingesting a WAL record, or other atomic modification of
145 : /// the timeline.
146 : ///
147 : /// This provides a transaction-like interface to perform a bunch
148 : /// of modifications atomically.
149 : ///
150 : /// To ingest a WAL record, call begin_modification(lsn) to get a
151 : /// DatadirModification object. Use the functions in the object to
152 : /// modify the repository state, updating all the pages and metadata
153 : /// that the WAL record affects. When you're done, call commit() to
154 : /// commit the changes.
155 : ///
156 : /// The LSN stored in the modification is advanced by `ingest_record` and
157 : /// is used by `commit()` to update `last_record_lsn`.
158 : ///
159 : /// Calling commit() will flush all the changes and reset the state,
160 : /// so the `DatadirModification` struct can be reused to perform the next modification.
161 : ///
162 : /// Note that any pending modifications you make through the
163 : /// modification object won't be visible to calls to the 'get' and list
164 : /// functions of the timeline until you commit! And if you update the
165 : /// same page twice, the last update wins.
166 : ///
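  : /// # Example (illustrative sketch, not from the original source)
  : ///
  : /// ```ignore
  : /// let mut modification = timeline.begin_modification(lsn);
  : /// modification.put_rel_page_image(rel, blknum, img)?;
  : /// modification.commit(ctx).await?;
  : /// ```
  : ///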
167 268378 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
168 268378 : where
169 268378 : Self: Sized,
170 268378 : {
171 268378 : DatadirModification {
172 268378 : tline: self,
173 268378 : pending_lsns: Vec::new(),
174 268378 : pending_metadata_pages: HashMap::new(),
175 268378 : pending_data_batch: None,
176 268378 : pending_deletions: Vec::new(),
177 268378 : pending_nblocks: 0,
178 268378 : pending_directory_entries: Vec::new(),
179 268378 : pending_metadata_bytes: 0,
180 268378 : lsn,
181 268378 : }
182 268378 : }
183 :
184 : //------------------------------------------------------------------------------
185 : // Public GET functions
186 : //------------------------------------------------------------------------------
187 :
188 : /// Look up given page version.
189 18384 : pub(crate) async fn get_rel_page_at_lsn(
190 18384 : &self,
191 18384 : tag: RelTag,
192 18384 : blknum: BlockNumber,
193 18384 : version: Version<'_>,
194 18384 : ctx: &RequestContext,
195 18384 : ) -> Result<Bytes, PageReconstructError> {
196 18384 : if tag.relnode == 0 {
197 0 : return Err(PageReconstructError::Other(
198 0 : RelationError::InvalidRelnode.into(),
199 0 : ));
200 18384 : }
201 :
202 18384 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
203 18384 : if blknum >= nblocks {
204 0 : debug!(
205 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
206 0 : tag,
207 0 : blknum,
208 0 : version.get_lsn(),
209 : nblocks
210 : );
211 0 : return Ok(ZERO_PAGE.clone());
212 18384 : }
213 18384 :
214 18384 : let key = rel_block_to_key(tag, blknum);
215 18384 : version.get(self, key, ctx).await
216 18384 : }
217 :
218 : // Get size of a database in blocks
219 0 : pub(crate) async fn get_db_size(
220 0 : &self,
221 0 : spcnode: Oid,
222 0 : dbnode: Oid,
223 0 : version: Version<'_>,
224 0 : ctx: &RequestContext,
225 0 : ) -> Result<usize, PageReconstructError> {
226 0 : let mut total_blocks = 0;
227 :
228 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
229 :
230 0 : for rel in rels {
231 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
232 0 : total_blocks += n_blocks as usize;
233 : }
234 0 : Ok(total_blocks)
235 0 : }
236 :
237 : /// Get size of a relation file
238 24434 : pub(crate) async fn get_rel_size(
239 24434 : &self,
240 24434 : tag: RelTag,
241 24434 : version: Version<'_>,
242 24434 : ctx: &RequestContext,
243 24434 : ) -> Result<BlockNumber, PageReconstructError> {
244 24434 : if tag.relnode == 0 {
245 0 : return Err(PageReconstructError::Other(
246 0 : RelationError::InvalidRelnode.into(),
247 0 : ));
248 24434 : }
249 :
250 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
251 19294 : return Ok(nblocks);
252 5140 : }
253 5140 :
254 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
255 0 : && !self.get_rel_exists(tag, version, ctx).await?
256 : {
257 : // FIXME: Postgres sometimes calls smgrcreate() to create
258 : // FSM, and smgrnblocks() on it immediately afterwards,
259 : // without extending it. Tolerate that by claiming that
260 : // any non-existent FSM fork has size 0.
261 0 : return Ok(0);
262 5140 : }
263 5140 :
264 5140 : let key = rel_size_to_key(tag);
265 5140 : let mut buf = version.get(self, key, ctx).await?;
266 5136 : let nblocks = buf.get_u32_le();
267 5136 :
268 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
269 5136 :
270 5136 : Ok(nblocks)
271 24434 : }
272 :
273 : /// Does relation exist?
274 6050 : pub(crate) async fn get_rel_exists(
275 6050 : &self,
276 6050 : tag: RelTag,
277 6050 : version: Version<'_>,
278 6050 : ctx: &RequestContext,
279 6050 : ) -> Result<bool, PageReconstructError> {
280 6050 : if tag.relnode == 0 {
281 0 : return Err(PageReconstructError::Other(
282 0 : RelationError::InvalidRelnode.into(),
283 0 : ));
284 6050 : }
285 :
286 : // first try to lookup relation in cache
287 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
288 6032 : return Ok(true);
289 18 : }
290 : // then check if the database was already initialized.
291 : // get_rel_exists can be called before dbdir is created.
292 18 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
293 18 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
294 18 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
295 0 : return Ok(false);
296 18 : }
297 18 : // fetch directory listing
298 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
299 18 : let buf = version.get(self, key, ctx).await?;
300 :
301 18 : let dir = RelDirectory::des(&buf)?;
302 18 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
303 6050 : }
304 :
305 : /// Get a list of all existing relations in the given tablespace and database.
306 : ///
307 : /// # Cancel-Safety
308 : ///
309 : /// This method is cancellation-safe.
310 0 : pub(crate) async fn list_rels(
311 0 : &self,
312 0 : spcnode: Oid,
313 0 : dbnode: Oid,
314 0 : version: Version<'_>,
315 0 : ctx: &RequestContext,
316 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
317 0 : // fetch directory listing
318 0 : let key = rel_dir_to_key(spcnode, dbnode);
319 0 : let buf = version.get(self, key, ctx).await?;
320 :
321 0 : let dir = RelDirectory::des(&buf)?;
322 0 : let rels: HashSet<RelTag> =
323 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
324 0 : spcnode,
325 0 : dbnode,
326 0 : relnode: *relnode,
327 0 : forknum: *forknum,
328 0 : }));
329 0 :
330 0 : Ok(rels)
331 0 : }
332 :
333 : /// Get the whole SLRU segment
334 0 : pub(crate) async fn get_slru_segment(
335 0 : &self,
336 0 : kind: SlruKind,
337 0 : segno: u32,
338 0 : lsn: Lsn,
339 0 : ctx: &RequestContext,
340 0 : ) -> Result<Bytes, PageReconstructError> {
341 0 : let n_blocks = self
342 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
343 0 : .await?;
344 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
345 0 : for blkno in 0..n_blocks {
346 0 : let block = self
347 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
348 0 : .await?;
349 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
350 : }
351 0 : Ok(segment.freeze())
352 0 : }
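  :
  : // Illustrative sketch (not part of the original source): the buffer-assembly
  : // pattern used by `get_slru_segment` above, shown over a plain slice of
  : // already-fetched blocks.
  : #[allow(dead_code)]
  : fn example_concat_blocks(blocks: &[Bytes]) -> Bytes {
  : let mut out = BytesMut::with_capacity(blocks.len() * BLCKSZ as usize);
  : for block in blocks {
  : // Keep exactly one page body per block.
  : out.extend_from_slice(&block[..BLCKSZ as usize]);
  : }
  : out.freeze()
  : }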
353 :
354 : /// Look up given SLRU page version.
355 0 : pub(crate) async fn get_slru_page_at_lsn(
356 0 : &self,
357 0 : kind: SlruKind,
358 0 : segno: u32,
359 0 : blknum: BlockNumber,
360 0 : lsn: Lsn,
361 0 : ctx: &RequestContext,
362 0 : ) -> Result<Bytes, PageReconstructError> {
363 0 : let key = slru_block_to_key(kind, segno, blknum);
364 0 : self.get(key, lsn, ctx).await
365 0 : }
366 :
367 : /// Get size of an SLRU segment
368 0 : pub(crate) async fn get_slru_segment_size(
369 0 : &self,
370 0 : kind: SlruKind,
371 0 : segno: u32,
372 0 : version: Version<'_>,
373 0 : ctx: &RequestContext,
374 0 : ) -> Result<BlockNumber, PageReconstructError> {
375 0 : let key = slru_segment_size_to_key(kind, segno);
376 0 : let mut buf = version.get(self, key, ctx).await?;
377 0 : Ok(buf.get_u32_le())
378 0 : }
379 :
380 : /// Does the given SLRU segment exist?
381 0 : pub(crate) async fn get_slru_segment_exists(
382 0 : &self,
383 0 : kind: SlruKind,
384 0 : segno: u32,
385 0 : version: Version<'_>,
386 0 : ctx: &RequestContext,
387 0 : ) -> Result<bool, PageReconstructError> {
388 0 : // fetch directory listing
389 0 : let key = slru_dir_to_key(kind);
390 0 : let buf = version.get(self, key, ctx).await?;
391 :
392 0 : let dir = SlruSegmentDirectory::des(&buf)?;
393 0 : Ok(dir.segments.contains(&segno))
394 0 : }
395 :
396 : /// Locate the LSN such that all transactions that committed before
397 : /// 'search_timestamp' are visible, but nothing newer is.
398 : ///
399 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
400 : /// so it's not well defined which LSN you get if there were multiple commits
401 : /// "in flight" at that point in time.
402 : ///
403 0 : pub(crate) async fn find_lsn_for_timestamp(
404 0 : &self,
405 0 : search_timestamp: TimestampTz,
406 0 : cancel: &CancellationToken,
407 0 : ctx: &RequestContext,
408 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
409 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
410 :
411 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
412 0 : // We use this method to figure out the branching LSN for the new branch, but the
413 0 : // GC cutoff could be before the branching point and we cannot create a new branch
414 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
415 0 : // on the safe side.
416 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
417 0 : let max_lsn = self.get_last_record_lsn();
418 0 :
419 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
420 0 : // LSN divided by 8.
421 0 : let mut low = min_lsn.0 / 8;
422 0 : let mut high = max_lsn.0 / 8 + 1;
423 0 :
424 0 : let mut found_smaller = false;
425 0 : let mut found_larger = false;
426 :
427 0 : while low < high {
428 0 : if cancel.is_cancelled() {
429 0 : return Err(PageReconstructError::Cancelled);
430 0 : }
431 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
432 0 : let mid = (high + low) / 2;
433 :
434 0 : let cmp = self
435 0 : .is_latest_commit_timestamp_ge_than(
436 0 : search_timestamp,
437 0 : Lsn(mid * 8),
438 0 : &mut found_smaller,
439 0 : &mut found_larger,
440 0 : ctx,
441 0 : )
442 0 : .await?;
443 :
444 0 : if cmp {
445 0 : high = mid;
446 0 : } else {
447 0 : low = mid + 1;
448 0 : }
449 : }
450 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
451 : // so the LSN of the last commit record before or at `search_timestamp`.
452 : // Remove one from `low` to get `t`.
453 : //
454 : // FIXME: it would be better to get the LSN of the previous commit.
455 : // Otherwise, if you restore to the returned LSN, the database will
456 : // include physical changes from later commits that will be marked
457 : // as aborted, and will need to be vacuumed away.
458 0 : let commit_lsn = Lsn((low - 1) * 8);
459 0 : match (found_smaller, found_larger) {
460 : (false, false) => {
461 : // This can happen if no commit records have been processed yet, e.g.
462 : // just after importing a cluster.
463 0 : Ok(LsnForTimestamp::NoData(min_lsn))
464 : }
465 : (false, true) => {
466 : // Didn't find any commit timestamps smaller than the request
467 0 : Ok(LsnForTimestamp::Past(min_lsn))
468 : }
469 0 : (true, _) if commit_lsn < min_lsn => {
470 0 : // the search above did set found_smaller to true but it never increased the lsn.
471 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
472 0 : // below the min_lsn. We should never do that.
473 0 : Ok(LsnForTimestamp::Past(min_lsn))
474 : }
475 : (true, false) => {
476 : // Only found commits with timestamps smaller than the request.
477 : // It's still a valid case for branch creation, so return it.
478 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
479 : // case, anyway.
480 0 : Ok(LsnForTimestamp::Future(commit_lsn))
481 : }
482 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
483 : }
484 0 : }
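  :
  : // Minimal sketch (not part of the original source) of the bisection invariant
  : // used above: the probed predicate is monotone in the LSN (false, ..., false,
  : // true, ..., true), and the loop converges on the first position where it
  : // flips. As in `find_lsn_for_timestamp`, LSNs are bisected in units of 8.
  : #[allow(dead_code)]
  : fn example_first_lsn_where(mut low: u64, mut high: u64, pred: impl Fn(Lsn) -> bool) -> Lsn {
  : // `low` and `high` are LSNs divided by 8; `high` is exclusive.
  : while low < high {
  : let mid = (high + low) / 2;
  : if pred(Lsn(mid * 8)) {
  : high = mid; // predicate holds at mid: the flip is at or before mid
  : } else {
  : low = mid + 1; // predicate fails at mid: the flip is strictly after mid
  : }
  : }
  : Lsn(low * 8)
  : }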
485 :
486 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
487 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
488 : ///
489 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
490 : /// with a smaller/larger timestamp.
491 : ///
492 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
493 0 : &self,
494 0 : search_timestamp: TimestampTz,
495 0 : probe_lsn: Lsn,
496 0 : found_smaller: &mut bool,
497 0 : found_larger: &mut bool,
498 0 : ctx: &RequestContext,
499 0 : ) -> Result<bool, PageReconstructError> {
500 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
501 0 : if timestamp >= search_timestamp {
502 0 : *found_larger = true;
503 0 : return ControlFlow::Break(true);
504 0 : } else {
505 0 : *found_smaller = true;
506 0 : }
507 0 : ControlFlow::Continue(())
508 0 : })
509 0 : .await
510 0 : }
511 :
512 : /// Obtain the latest commit timestamp for the given lsn.
513 : ///
514 : /// If the lsn has no timestamps, returns None; otherwise returns the largest timestamp observed at it.
515 0 : pub(crate) async fn get_timestamp_for_lsn(
516 0 : &self,
517 0 : probe_lsn: Lsn,
518 0 : ctx: &RequestContext,
519 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
520 0 : let mut max: Option<TimestampTz> = None;
521 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
522 0 : if let Some(max_prev) = max {
523 0 : max = Some(max_prev.max(timestamp));
524 0 : } else {
525 0 : max = Some(timestamp);
526 0 : }
527 0 : ControlFlow::Continue(())
528 0 : })
529 0 : .await?;
530 :
531 0 : Ok(max)
532 0 : }
533 :
534 : /// Runs the given function on all the timestamps for a given lsn
535 : ///
536 : /// The return value is either given by the closure (via `ControlFlow::Break`),
537 : /// or, if the closure never breaks, the `Default` impl's output.
538 0 : async fn map_all_timestamps<T: Default>(
539 0 : &self,
540 0 : probe_lsn: Lsn,
541 0 : ctx: &RequestContext,
542 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
543 0 : ) -> Result<T, PageReconstructError> {
544 0 : for segno in self
545 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
546 0 : .await?
547 : {
548 0 : let nblocks = self
549 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
550 0 : .await?;
551 0 : for blknum in (0..nblocks).rev() {
552 0 : let clog_page = self
553 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
554 0 : .await?;
555 :
556 0 : if clog_page.len() == BLCKSZ as usize + 8 {
557 0 : let mut timestamp_bytes = [0u8; 8];
558 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
559 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
560 0 :
561 0 : match f(timestamp) {
562 0 : ControlFlow::Break(b) => return Ok(b),
563 0 : ControlFlow::Continue(()) => (),
564 : }
565 0 : }
566 : }
567 : }
568 0 : Ok(Default::default())
569 0 : }
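  :
  : // Illustrative sketch (not part of the original source): the ControlFlow
  : // contract used by `map_all_timestamps` above, demonstrated over a plain
  : // slice instead of clog pages.
  : #[allow(dead_code)]
  : fn example_visit_timestamps<T: Default>(
  : timestamps: &[TimestampTz],
  : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
  : ) -> T {
  : for &ts in timestamps {
  : if let ControlFlow::Break(b) = f(ts) {
  : return b; // the closure chose the final value
  : }
  : }
  : T::default() // the closure never broke: fall back to the Default impl
  : }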
570 :
571 0 : pub(crate) async fn get_slru_keyspace(
572 0 : &self,
573 0 : version: Version<'_>,
574 0 : ctx: &RequestContext,
575 0 : ) -> Result<KeySpace, PageReconstructError> {
576 0 : let mut accum = KeySpaceAccum::new();
577 :
578 0 : for kind in SlruKind::iter() {
579 0 : let mut segments: Vec<u32> = self
580 0 : .list_slru_segments(kind, version, ctx)
581 0 : .await?
582 0 : .into_iter()
583 0 : .collect();
584 0 : segments.sort_unstable();
585 :
586 0 : for seg in segments {
587 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
588 :
589 0 : accum.add_range(
590 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
591 0 : );
592 : }
593 : }
594 :
595 0 : Ok(accum.to_keyspace())
596 0 : }
597 :
598 : /// Get a list of SLRU segments
599 0 : pub(crate) async fn list_slru_segments(
600 0 : &self,
601 0 : kind: SlruKind,
602 0 : version: Version<'_>,
603 0 : ctx: &RequestContext,
604 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
605 0 : // fetch directory entry
606 0 : let key = slru_dir_to_key(kind);
607 :
608 0 : let buf = version.get(self, key, ctx).await?;
609 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
610 0 : }
611 :
612 0 : pub(crate) async fn get_relmap_file(
613 0 : &self,
614 0 : spcnode: Oid,
615 0 : dbnode: Oid,
616 0 : version: Version<'_>,
617 0 : ctx: &RequestContext,
618 0 : ) -> Result<Bytes, PageReconstructError> {
619 0 : let key = relmap_file_key(spcnode, dbnode);
620 :
621 0 : let buf = version.get(self, key, ctx).await?;
622 0 : Ok(buf)
623 0 : }
624 :
625 290 : pub(crate) async fn list_dbdirs(
626 290 : &self,
627 290 : lsn: Lsn,
628 290 : ctx: &RequestContext,
629 290 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
630 : // fetch directory entry
631 3095 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
632 :
633 290 : Ok(DbDirectory::des(&buf)?.dbdirs)
634 290 : }
635 :
636 0 : pub(crate) async fn get_twophase_file(
637 0 : &self,
638 0 : xid: u64,
639 0 : lsn: Lsn,
640 0 : ctx: &RequestContext,
641 0 : ) -> Result<Bytes, PageReconstructError> {
642 0 : let key = twophase_file_key(xid);
643 0 : let buf = self.get(key, lsn, ctx).await?;
644 0 : Ok(buf)
645 0 : }
646 :
647 292 : pub(crate) async fn list_twophase_files(
648 292 : &self,
649 292 : lsn: Lsn,
650 292 : ctx: &RequestContext,
651 292 : ) -> Result<HashSet<u64>, PageReconstructError> {
652 : // fetch directory entry
653 3130 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
654 :
655 292 : if self.pg_version >= 17 {
656 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
657 : } else {
658 292 : Ok(TwoPhaseDirectory::des(&buf)?
659 : .xids
660 292 : .iter()
661 292 : .map(|x| u64::from(*x))
662 292 : .collect())
663 : }
664 292 : }
665 :
666 0 : pub(crate) async fn get_control_file(
667 0 : &self,
668 0 : lsn: Lsn,
669 0 : ctx: &RequestContext,
670 0 : ) -> Result<Bytes, PageReconstructError> {
671 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
672 0 : }
673 :
674 12 : pub(crate) async fn get_checkpoint(
675 12 : &self,
676 12 : lsn: Lsn,
677 12 : ctx: &RequestContext,
678 12 : ) -> Result<Bytes, PageReconstructError> {
679 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
680 12 : }
681 :
682 12 : async fn list_aux_files_v2(
683 12 : &self,
684 12 : lsn: Lsn,
685 12 : ctx: &RequestContext,
686 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
687 12 : let kv = self
688 12 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
689 0 : .await?;
690 12 : let mut result = HashMap::new();
691 12 : let mut sz = 0;
692 30 : for (_, v) in kv {
693 18 : let v = v?;
694 18 : let v = aux_file::decode_file_value_bytes(&v)
695 18 : .context("value decode")
696 18 : .map_err(PageReconstructError::Other)?;
697 34 : for (fname, content) in v {
698 16 : sz += fname.len();
699 16 : sz += content.len();
700 16 : result.insert(fname, content);
701 16 : }
702 : }
703 12 : self.aux_file_size_estimator.on_initial(sz);
704 12 : Ok(result)
705 12 : }
706 :
707 0 : pub(crate) async fn trigger_aux_file_size_computation(
708 0 : &self,
709 0 : lsn: Lsn,
710 0 : ctx: &RequestContext,
711 0 : ) -> Result<(), PageReconstructError> {
712 0 : self.list_aux_files_v2(lsn, ctx).await?;
713 0 : Ok(())
714 0 : }
715 :
716 12 : pub(crate) async fn list_aux_files(
717 12 : &self,
718 12 : lsn: Lsn,
719 12 : ctx: &RequestContext,
720 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
721 12 : self.list_aux_files_v2(lsn, ctx).await
722 12 : }
723 :
724 0 : pub(crate) async fn get_replorigins(
725 0 : &self,
726 0 : lsn: Lsn,
727 0 : ctx: &RequestContext,
728 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
729 0 : let kv = self
730 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
731 0 : .await?;
732 0 : let mut result = HashMap::new();
733 0 : for (k, v) in kv {
734 0 : let v = v?;
735 0 : let origin_id = k.field6 as RepOriginId;
736 0 : let origin_lsn = Lsn::des(&v).unwrap();
737 0 : if origin_lsn != Lsn::INVALID {
738 0 : result.insert(origin_id, origin_lsn);
739 0 : }
740 : }
741 0 : Ok(result)
742 0 : }
743 :
744 : /// Does the same as get_current_logical_size, but computes the size on demand.
745 : /// Used to initialize the logical size tracking on startup.
746 : ///
747 : /// Only relation blocks are counted currently. That excludes metadata,
748 : /// SLRUs, twophase files etc.
749 : ///
750 : /// # Cancel-Safety
751 : ///
752 : /// This method is cancellation-safe.
753 0 : pub(crate) async fn get_current_logical_size_non_incremental(
754 0 : &self,
755 0 : lsn: Lsn,
756 0 : ctx: &RequestContext,
757 0 : ) -> Result<u64, CalculateLogicalSizeError> {
758 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
759 :
760 : // Fetch list of database dirs and iterate them
761 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
762 0 : let dbdir = DbDirectory::des(&buf)?;
763 :
764 0 : let mut total_size: u64 = 0;
765 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
766 0 : for rel in self
767 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
768 0 : .await?
769 : {
770 0 : if self.cancel.is_cancelled() {
771 0 : return Err(CalculateLogicalSizeError::Cancelled);
772 0 : }
773 0 : let relsize_key = rel_size_to_key(rel);
774 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
775 0 : let relsize = buf.get_u32_le();
776 0 :
777 0 : total_size += relsize as u64;
778 : }
779 : }
780 0 : Ok(total_size * BLCKSZ as u64)
781 0 : }
782 :
783 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
784 : /// for gc-compaction.
785 : ///
786 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
787 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
788 : /// be kept only for a specific range of LSN.
789 : ///
790 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
791 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
792 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
793 : /// determine which keys to retain/drop for gc-compaction.
794 : ///
795 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
796 : /// to be retained for each of the branch LSNs.
797 : ///
798 : /// The return value is (dense keyspace, sparse keyspace).
799 40 : pub(crate) async fn collect_gc_compaction_keyspace(
800 40 : &self,
801 40 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
802 40 : let metadata_key_begin = Key::metadata_key_range().start;
803 40 : let aux_v1_key = AUX_FILES_KEY;
804 40 : let dense_keyspace = KeySpace {
805 40 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
806 40 : };
807 40 : Ok((
808 40 : dense_keyspace,
809 40 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
810 40 : ))
811 40 : }
812 :
813 : ///
814 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
815 : /// Anything that's not listed may be removed from the underlying storage (from
816 : /// that LSN forwards).
817 : ///
818 : /// The return value is (dense keyspace, sparse keyspace).
819 290 : pub(crate) async fn collect_keyspace(
820 290 : &self,
821 290 : lsn: Lsn,
822 290 : ctx: &RequestContext,
823 290 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
824 290 : // Iterate through key ranges, greedily packing them into partitions
825 290 : let mut result = KeySpaceAccum::new();
826 290 :
827 290 : // The dbdir metadata always exists
828 290 : result.add_key(DBDIR_KEY);
829 :
830 : // Fetch list of database dirs and iterate them
831 3095 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
832 290 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
833 290 :
834 290 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
835 290 : for ((spcnode, dbnode), has_relmap_file) in dbs {
836 0 : if has_relmap_file {
837 0 : result.add_key(relmap_file_key(spcnode, dbnode));
838 0 : }
839 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
840 :
841 0 : let mut rels: Vec<RelTag> = self
842 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
843 0 : .await?
844 0 : .into_iter()
845 0 : .collect();
846 0 : rels.sort_unstable();
847 0 : for rel in rels {
848 0 : let relsize_key = rel_size_to_key(rel);
849 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
850 0 : let relsize = buf.get_u32_le();
851 0 :
852 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
853 0 : result.add_key(relsize_key);
854 : }
855 : }
856 :
857 : // Iterate SLRUs next
858 870 : for kind in [
859 290 : SlruKind::Clog,
860 290 : SlruKind::MultiXactMembers,
861 290 : SlruKind::MultiXactOffsets,
862 : ] {
863 870 : let slrudir_key = slru_dir_to_key(kind);
864 870 : result.add_key(slrudir_key);
865 9497 : let buf = self.get(slrudir_key, lsn, ctx).await?;
866 870 : let dir = SlruSegmentDirectory::des(&buf)?;
867 870 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
868 870 : segments.sort_unstable();
869 870 : for segno in segments {
870 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
871 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
872 0 : let segsize = buf.get_u32_le();
873 0 :
874 0 : result.add_range(
875 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
876 0 : );
877 0 : result.add_key(segsize_key);
878 : }
879 : }
880 :
881 : // Then pg_twophase
882 290 : result.add_key(TWOPHASEDIR_KEY);
883 :
884 290 : let mut xids: Vec<u64> = self
885 290 : .list_twophase_files(lsn, ctx)
886 3129 : .await?
887 290 : .iter()
888 290 : .cloned()
889 290 : .collect();
890 290 : xids.sort_unstable();
891 290 : for xid in xids {
892 0 : result.add_key(twophase_file_key(xid));
893 0 : }
894 :
895 290 : result.add_key(CONTROLFILE_KEY);
896 290 : result.add_key(CHECKPOINT_KEY);
897 290 :
898 290 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
899 290 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
900 290 : // and the keys will not be garbage-colllected.
901 290 : #[cfg(test)]
902 290 : {
903 290 : let guard = self.extra_test_dense_keyspace.load();
904 290 : for kr in &guard.ranges {
905 0 : result.add_range(kr.clone());
906 0 : }
907 0 : }
908 0 :
909 290 : let dense_keyspace = result.to_keyspace();
910 290 : let sparse_keyspace = SparseKeySpace(KeySpace {
911 290 : ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
912 290 : });
913 290 :
914 290 : if cfg!(debug_assertions) {
915 : // Verify if the sparse keyspaces are ordered and non-overlapping.
916 :
917 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
918 : // category of sparse keys is split into its own image/delta files. If there
919 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
920 : // and we want the developer to keep the keyspaces separated.
921 :
922 290 : let ranges = &sparse_keyspace.0.ranges;
923 :
924 : // TODO: use a single overlaps_with across the codebase
925 290 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
926 290 : !(a.end <= b.start || b.end <= a.start)
927 290 : }
928 580 : for i in 0..ranges.len() {
929 580 : for j in 0..i {
930 290 : if overlaps_with(&ranges[i], &ranges[j]) {
931 0 : panic!(
932 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
933 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
934 0 : );
935 290 : }
936 : }
937 : }
938 290 : for i in 1..ranges.len() {
939 290 : assert!(
940 290 : ranges[i - 1].end <= ranges[i].start,
941 0 : "unordered sparse keyspace: {}..{} and {}..{}",
942 0 : ranges[i - 1].start,
943 0 : ranges[i - 1].end,
944 0 : ranges[i].start,
945 0 : ranges[i].end
946 : );
947 : }
948 0 : }
949 :
950 290 : Ok((dense_keyspace, sparse_keyspace))
951 290 : }
952 :
953 : /// Get the cached size of a relation, if it was not updated after the specified LSN
954 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
955 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
956 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
957 448518 : if lsn >= *cached_lsn {
958 443372 : return Some(*nblocks);
959 5146 : }
960 22 : }
961 5168 : None
962 448540 : }
963 :
964 : /// Update cached relation size if there is no more recent update
965 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
966 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
967 5136 :
968 5136 : if lsn < rel_size_cache.complete_as_of {
969 : // Do not cache old values. It's safe to cache the size on read, as long as
970 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
971 : // never evict values from the cache, so if the relation size changed after
972 : // 'lsn', the new value is already in the cache.
973 0 : return;
974 5136 : }
975 5136 :
976 5136 : match rel_size_cache.map.entry(tag) {
977 5136 : hash_map::Entry::Occupied(mut entry) => {
978 5136 : let cached_lsn = entry.get_mut();
979 5136 : if lsn >= cached_lsn.0 {
980 0 : *cached_lsn = (lsn, nblocks);
981 5136 : }
982 : }
983 0 : hash_map::Entry::Vacant(entry) => {
984 0 : entry.insert((lsn, nblocks));
985 0 : }
986 : }
987 5136 : }
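  :
  : // Illustrative sketch (not part of the original source): the invariant the
  : // cache above relies on. Entries are never evicted and only move forward in
  : // LSN, so a hit at `lsn >= cached_lsn` is guaranteed to be current.
  : #[allow(dead_code)]
  : fn example_cache_lookup(
  : map: &HashMap<RelTag, (Lsn, BlockNumber)>,
  : tag: &RelTag,
  : lsn: Lsn,
  : ) -> Option<BlockNumber> {
  : match map.get(tag) {
  : Some((cached_lsn, nblocks)) if lsn >= *cached_lsn => Some(*nblocks),
  : _ => None, // the read LSN predates the cached value: do a real lookup
  : }
  : }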
988 :
989 : /// Store cached relation size
990 282720 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
991 282720 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
992 282720 : rel_size_cache.map.insert(tag, (lsn, nblocks));
993 282720 : }
994 :
995 : /// Remove cached relation size
996 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
997 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
998 2 : rel_size_cache.map.remove(tag);
999 2 : }
1000 : }
1001 :
1002 : /// DatadirModification represents an operation to ingest an atomic set of
1003 : /// updates to the repository.
1004 : ///
1005 : /// It is created by the 'begin_modification' function. It is called for each WAL
1006 : /// record, so that all the modifications by one WAL record appear atomic.
1007 : pub struct DatadirModification<'a> {
1008 : /// The timeline this modification applies to. You can access this to
1009 : /// read the state, but note that any pending updates are *not* reflected
1010 : /// in the state in 'tline' yet.
1011 : pub tline: &'a Timeline,
1012 :
1013 : /// Current LSN of the modification
1014 : lsn: Lsn,
1015 :
1016 : // The modifications are not applied directly to the underlying key-value store.
1017 : // The put-functions add the modifications here, and they are flushed to the
1018 : // underlying key-value store by the 'finish' function.
1019 : pending_lsns: Vec<Lsn>,
1020 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1021 : pending_nblocks: i64,
1022 :
1023 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1024 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1025 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1026 :
1027 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1028 : /// which keys are stored here.
1029 : pending_data_batch: Option<SerializedValueBatch>,
1030 :
1031 : /// For special "directory" keys that store key-value maps, track the size of the map
1032 : /// if it was updated in this modification.
1033 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
1034 :
1035 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1036 : pending_metadata_bytes: usize,
1037 : }
1038 :
1039 : impl<'a> DatadirModification<'a> {
1040 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1041 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1042 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1043 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
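  : //
  : // Illustrative sketch (not part of the original source) of the intended
  : // commit cadence; the surrounding ingest loop is an assumption:
  : //
  : // while let Some(record) = records.next().await {
  : // ingest(&mut modification, record)?;
  : // if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
  : // modification.commit(ctx).await?;
  : // }
  : // }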
1044 :
1045 : /// Get the current lsn
1046 418058 : pub(crate) fn get_lsn(&self) -> Lsn {
1047 418058 : self.lsn
1048 418058 : }
1049 :
1050 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1051 0 : self.pending_data_batch
1052 0 : .as_ref()
1053 0 : .map_or(0, |b| b.buffer_size())
1054 0 : + self.pending_metadata_bytes
1055 0 : }
1056 :
1057 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1058 0 : !self
1059 0 : .pending_data_batch
1060 0 : .as_ref()
1061 0 : .map_or(true, |b| b.is_empty())
1062 0 : }
1063 :
1064 : /// Set the current lsn
1065 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1066 145858 : ensure!(
1067 145858 : lsn >= self.lsn,
1068 0 : "setting an older lsn {} than {} is not allowed",
1069 : lsn,
1070 : self.lsn
1071 : );
1072 :
1073 145858 : if lsn > self.lsn {
1074 145858 : self.pending_lsns.push(self.lsn);
1075 145858 : self.lsn = lsn;
1076 145858 : }
1077 145858 : Ok(())
1078 145858 : }
1079 :
1080 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1081 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1082 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1083 : ///
1084 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1085 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1086 : /// via [`Self::get`] as soon as their record has been ingested.
1087 850418 : fn is_data_key(key: &Key) -> bool {
1088 850418 : key.is_rel_block_key() || key.is_slru_block_key()
1089 850418 : }
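  :
  : // Illustrative sketch (not part of the original source): how the key class
  : // determines where a write is buffered, mirroring the dispatch described in
  : // the doc comment above.
  : #[allow(dead_code)]
  : fn example_classify(key: &Key) -> &'static str {
  : if Self::is_data_key(key) {
  : "data: buffered in pending_data_batch, readable only after commit"
  : } else {
  : "metadata: buffered in pending_metadata_pages, readable via get()"
  : }
  : }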
1090 :
1091 : /// Initialize a completely new repository.
1092 : ///
1093 : /// This inserts the directory metadata entries that are assumed to
1094 : /// always exist.
1095 176 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1096 176 : let buf = DbDirectory::ser(&DbDirectory {
1097 176 : dbdirs: HashMap::new(),
1098 176 : })?;
1099 176 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1100 176 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1101 :
1102 176 : let buf = if self.tline.pg_version >= 17 {
1103 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1104 0 : xids: HashSet::new(),
1105 0 : })
1106 : } else {
1107 176 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1108 176 : xids: HashSet::new(),
1109 176 : })
1110 0 : }?;
1111 176 : self.pending_directory_entries
1112 176 : .push((DirectoryKind::TwoPhase, 0));
1113 176 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1114 :
1115 176 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1116 176 : let empty_dir = Value::Image(buf);
1117 176 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1118 176 : self.pending_directory_entries
1119 176 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1120 176 : self.put(
1121 176 : slru_dir_to_key(SlruKind::MultiXactMembers),
1122 176 : empty_dir.clone(),
1123 176 : );
1124 176 : self.pending_directory_entries
1125 176 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
1126 176 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1127 176 : self.pending_directory_entries
1128 176 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1129 176 :
1130 176 : Ok(())
1131 176 : }
1132 :
1133 : #[cfg(test)]
1134 174 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1135 174 : self.init_empty()?;
1136 174 : self.put_control_file(bytes::Bytes::from_static(
1137 174 : b"control_file contents do not matter",
1138 174 : ))
1139 174 : .context("put_control_file")?;
1140 174 : self.put_checkpoint(bytes::Bytes::from_static(
1141 174 : b"checkpoint_file contents do not matter",
1142 174 : ))
1143 174 : .context("put_checkpoint_file")?;
1144 174 : Ok(())
1145 174 : }
1146 :
1147 : /// Creates a relation if it is not already present.
1148 : /// Returns the current size of the relation
1149 418056 : pub(crate) async fn create_relation_if_required(
1150 418056 : &mut self,
1151 418056 : rel: RelTag,
1152 418056 : ctx: &RequestContext,
1153 418056 : ) -> Result<u32, PageReconstructError> {
1154 : // Get current size and put rel creation if rel doesn't exist
1155 : //
1156 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1157 : // check the cache too. This is because eagerly checking the cache results in
1158 : // less work overall and 10% better performance. It's more work on a cache miss,
1159 : // but cache misses are rare.
1160 418056 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1161 418046 : Ok(nblocks)
1162 10 : } else if !self
1163 10 : .tline
1164 10 : .get_rel_exists(rel, Version::Modified(self), ctx)
1165 2 : .await?
1166 : {
1167 : // create it with 0 size initially, the logic below will extend it
1168 10 : self.put_rel_creation(rel, 0, ctx)
1169 3 : .await
1170 10 : .context("Relation Error")?;
1171 10 : Ok(0)
1172 : } else {
1173 0 : self.tline
1174 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1175 0 : .await
1176 : }
1177 418056 : }
1178 :
1179 : /// Given a block number for a relation (which represents a newly written block),
1180 : /// the previous block count of the relation, and the shard info, find the gaps
1181 : /// that were created by the newly written block, if any.
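  : ///
  : /// For example (illustrative): if `previous_nblocks` is 3 and `blkno` is 5,
  : /// blocks 3 and 4 were skipped over and are returned as gaps (minus any
  : /// blocks owned by other shards), so the caller can zero-fill them.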
1182 145670 : fn find_gaps(
1183 145670 : rel: RelTag,
1184 145670 : blkno: u32,
1185 145670 : previous_nblocks: u32,
1186 145670 : shard: &ShardIdentity,
1187 145670 : ) -> Option<KeySpace> {
1188 145670 : let mut key = rel_block_to_key(rel, blkno);
1189 145670 : let mut gap_accum = None;
1190 :
1191 145670 : for gap_blkno in previous_nblocks..blkno {
1192 32 : key.field6 = gap_blkno;
1193 32 :
1194 32 : if shard.get_shard_number(&key) != shard.number {
1195 8 : continue;
1196 24 : }
1197 24 :
1198 24 : gap_accum
1199 24 : .get_or_insert_with(KeySpaceAccum::new)
1200 24 : .add_key(key);
1201 : }
1202 :
1203 145670 : gap_accum.map(|accum| accum.to_keyspace())
1204 145670 : }
1205 :
1206 145852 : pub async fn ingest_batch(
1207 145852 : &mut self,
1208 145852 : mut batch: SerializedValueBatch,
1209 145852 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1210 145852 : shard: &ShardIdentity,
1211 145852 : ctx: &RequestContext,
1212 145852 : ) -> anyhow::Result<()> {
1213 145852 : let mut gaps_at_lsns = Vec::default();
1214 :
1215 145852 : for meta in batch.metadata.iter() {
1216 145642 : let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
1217 145642 : let new_nblocks = blkno + 1;
1218 :
1219 145642 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1220 145642 : if new_nblocks > old_nblocks {
1221 2390 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1222 143252 : }
1223 :
1224 145642 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1225 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1226 145642 : }
1227 : }
1228 :
1229 145852 : if !gaps_at_lsns.is_empty() {
1230 0 : batch.zero_gaps(gaps_at_lsns);
1231 145852 : }
1232 :
1233 145852 : match self.pending_data_batch.as_mut() {
1234 20 : Some(pending_batch) => {
1235 20 : pending_batch.extend(batch);
1236 20 : }
1237 145832 : None if !batch.is_empty() => {
1238 145630 : self.pending_data_batch = Some(batch);
1239 145630 : }
1240 202 : None => {
1241 202 : // Nothing to initialize the batch with
1242 202 : }
1243 : }
1244 :
1245 145852 : Ok(())
1246 145852 : }
1247 :
1248 : /// Put a new page version that can be constructed from a WAL record
1249 : ///
1250 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1251 : /// current end-of-file. It's up to the caller to check that the relation size
1252 : /// matches the blocks inserted!
1253 12 : pub fn put_rel_wal_record(
1254 12 : &mut self,
1255 12 : rel: RelTag,
1256 12 : blknum: BlockNumber,
1257 12 : rec: NeonWalRecord,
1258 12 : ) -> anyhow::Result<()> {
1259 12 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1260 12 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1261 12 : Ok(())
1262 12 : }
1263 :
1264 : // Same, but for an SLRU.
1265 8 : pub fn put_slru_wal_record(
1266 8 : &mut self,
1267 8 : kind: SlruKind,
1268 8 : segno: u32,
1269 8 : blknum: BlockNumber,
1270 8 : rec: NeonWalRecord,
1271 8 : ) -> anyhow::Result<()> {
1272 8 : self.put(
1273 8 : slru_block_to_key(kind, segno, blknum),
1274 8 : Value::WalRecord(rec),
1275 8 : );
1276 8 : Ok(())
1277 8 : }
1278 :
1279 : /// Like put_rel_wal_record, but with a ready-made image of the page.
1280 277842 : pub fn put_rel_page_image(
1281 277842 : &mut self,
1282 277842 : rel: RelTag,
1283 277842 : blknum: BlockNumber,
1284 277842 : img: Bytes,
1285 277842 : ) -> anyhow::Result<()> {
1286 277842 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1287 277842 : let key = rel_block_to_key(rel, blknum);
1288 277842 : if !key.is_valid_key_on_write_path() {
1289 0 : anyhow::bail!(
1290 0 : "the request contains data not supported by pageserver at {}",
1291 0 : key
1292 0 : );
1293 277842 : }
1294 277842 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1295 277842 : Ok(())
1296 277842 : }
1297 :
1298 6 : pub fn put_slru_page_image(
1299 6 : &mut self,
1300 6 : kind: SlruKind,
1301 6 : segno: u32,
1302 6 : blknum: BlockNumber,
1303 6 : img: Bytes,
1304 6 : ) -> anyhow::Result<()> {
1305 6 : let key = slru_block_to_key(kind, segno, blknum);
1306 6 : if !key.is_valid_key_on_write_path() {
1307 0 : anyhow::bail!(
1308 0 : "the request contains data not supported by pageserver at {}",
1309 0 : key
1310 0 : );
1311 6 : }
1312 6 : self.put(key, Value::Image(img));
1313 6 : Ok(())
1314 6 : }
1315 :
1316 2998 : pub(crate) fn put_rel_page_image_zero(
1317 2998 : &mut self,
1318 2998 : rel: RelTag,
1319 2998 : blknum: BlockNumber,
1320 2998 : ) -> anyhow::Result<()> {
1321 2998 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1322 2998 : let key = rel_block_to_key(rel, blknum);
1323 2998 : if !key.is_valid_key_on_write_path() {
1324 0 : anyhow::bail!(
1325 0 : "the request contains data not supported by pageserver: {} @ {}",
1326 0 : key,
1327 0 : self.lsn
1328 0 : );
1329 2998 : }
1330 2998 :
1331 2998 : let batch = self
1332 2998 : .pending_data_batch
1333 2998 : .get_or_insert_with(SerializedValueBatch::default);
1334 2998 :
1335 2998 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1336 2998 :
1337 2998 : Ok(())
1338 2998 : }
1339 :
1340 0 : pub(crate) fn put_slru_page_image_zero(
1341 0 : &mut self,
1342 0 : kind: SlruKind,
1343 0 : segno: u32,
1344 0 : blknum: BlockNumber,
1345 0 : ) -> anyhow::Result<()> {
1346 0 : let key = slru_block_to_key(kind, segno, blknum);
1347 0 : if !key.is_valid_key_on_write_path() {
1348 0 : anyhow::bail!(
1349 0 : "the request contains data not supported by pageserver: {} @ {}",
1350 0 : key,
1351 0 : self.lsn
1352 0 : );
1353 0 : }
1354 0 :
1355 0 : let batch = self
1356 0 : .pending_data_batch
1357 0 : .get_or_insert_with(SerializedValueBatch::default);
1358 0 :
1359 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1360 0 :
1361 0 : Ok(())
1362 0 : }
1363 :
1364 : /// Store a relmapper file (pg_filenode.map) in the repository
1365 16 : pub async fn put_relmap_file(
1366 16 : &mut self,
1367 16 : spcnode: Oid,
1368 16 : dbnode: Oid,
1369 16 : img: Bytes,
1370 16 : ctx: &RequestContext,
1371 16 : ) -> anyhow::Result<()> {
1372 : // Add it to the directory (if it doesn't exist already)
1373 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1374 16 : let mut dbdir = DbDirectory::des(&buf)?;
1375 :
1376 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1377 16 : if r.is_none() || r == Some(false) {
1378 : // The dbdir entry didn't exist, or it contained a
1379 : // 'false'. The 'insert' call already updated it with
1380 : // 'true'; now write the updated 'dbdirs' map back.
1381 16 : let buf = DbDirectory::ser(&dbdir)?;
1382 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1383 0 : }
1384 16 : if r.is_none() {
1385 : // Create RelDirectory
1386 8 : let buf = RelDirectory::ser(&RelDirectory {
1387 8 : rels: HashSet::new(),
1388 8 : })?;
1389 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1390 8 : self.put(
1391 8 : rel_dir_to_key(spcnode, dbnode),
1392 8 : Value::Image(Bytes::from(buf)),
1393 8 : );
1394 8 : }
1395 :
1396 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1397 16 : Ok(())
1398 16 : }
1399 :
1400 0 : pub async fn put_twophase_file(
1401 0 : &mut self,
1402 0 : xid: u64,
1403 0 : img: Bytes,
1404 0 : ctx: &RequestContext,
1405 0 : ) -> anyhow::Result<()> {
1406 : // Add it to the directory entry
1407 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1408 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1409 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1410 0 : if !dir.xids.insert(xid) {
1411 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1412 0 : }
1413 0 : self.pending_directory_entries
1414 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1415 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1416 : } else {
1417 0 : let xid = xid as u32;
1418 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1419 0 : if !dir.xids.insert(xid) {
1420 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1421 0 : }
1422 0 : self.pending_directory_entries
1423 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1424 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1425 : };
1426 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1427 0 :
1428 0 : self.put(twophase_file_key(xid), Value::Image(img));
1429 0 : Ok(())
1430 0 : }
1431 :
1432 0 : pub async fn set_replorigin(
1433 0 : &mut self,
1434 0 : origin_id: RepOriginId,
1435 0 : origin_lsn: Lsn,
1436 0 : ) -> anyhow::Result<()> {
1437 0 : let key = repl_origin_key(origin_id);
1438 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1439 0 : Ok(())
1440 0 : }
1441 :
1442 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1443 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1444 0 : }
1445 :
1446 176 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1447 176 : self.put(CONTROLFILE_KEY, Value::Image(img));
1448 176 : Ok(())
1449 176 : }
1450 :
1451 190 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1452 190 : self.put(CHECKPOINT_KEY, Value::Image(img));
1453 190 : Ok(())
1454 190 : }
1455 :
1456 0 : pub async fn drop_dbdir(
1457 0 : &mut self,
1458 0 : spcnode: Oid,
1459 0 : dbnode: Oid,
1460 0 : ctx: &RequestContext,
1461 0 : ) -> anyhow::Result<()> {
1462 0 : let total_blocks = self
1463 0 : .tline
1464 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1465 0 : .await?;
1466 :
1467 : // Remove entry from dbdir
1468 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1469 0 : let mut dir = DbDirectory::des(&buf)?;
1470 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1471 0 : let buf = DbDirectory::ser(&dir)?;
1472 0 : self.pending_directory_entries
1473 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1474 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1475 : } else {
1476 0 : warn!(
1477 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1478 : spcnode, dbnode
1479 : );
1480 : }
1481 :
1482 : // Update logical database size.
1483 0 : self.pending_nblocks -= total_blocks as i64;
1484 0 :
1485 0 : // Delete all relations and metadata files for the spcnode/dnode
1486 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1487 0 : Ok(())
1488 0 : }
1489 :
1490 : /// Create a relation fork.
1491 : ///
1492 : /// 'nblocks' is the initial size.
1493 1920 : pub async fn put_rel_creation(
1494 1920 : &mut self,
1495 1920 : rel: RelTag,
1496 1920 : nblocks: BlockNumber,
1497 1920 : ctx: &RequestContext,
1498 1920 : ) -> Result<(), RelationError> {
1499 1920 : if rel.relnode == 0 {
1500 0 : return Err(RelationError::InvalidRelnode);
1501 1920 : }
1502 : // It's possible that this is the first rel for this db in this
1503 : // tablespace. Create the reldir entry for it if so.
1504 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1505 1920 : .context("deserialize db")?;
1506 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1507 1920 : let mut rel_dir =
1508 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1509 : // Didn't exist. Update dbdir
1510 8 : e.insert(false);
1511 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1512 8 : self.pending_directory_entries
1513 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1514 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1515 8 :
1516 8 : // and create the RelDirectory
1517 8 : RelDirectory::default()
1518 : } else {
1519 : // reldir already exists, fetch it
1520 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1521 1912 : .context("deserialize db")?
1522 : };
1523 :
1524 : // Add the new relation to the rel directory entry, and write it back
1525 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1526 0 : return Err(RelationError::AlreadyExists);
1527 1920 : }
1528 1920 :
1529 1920 : self.pending_directory_entries
1530 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1531 1920 :
1532 1920 : self.put(
1533 1920 : rel_dir_key,
1534 1920 : Value::Image(Bytes::from(
1535 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1536 : )),
1537 : );
1538 :
1539 : // Put size
1540 1920 : let size_key = rel_size_to_key(rel);
1541 1920 : let buf = nblocks.to_le_bytes();
1542 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1543 1920 :
1544 1920 : self.pending_nblocks += nblocks as i64;
1545 1920 :
1546 1920 : // Update relation size cache
1547 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1548 1920 :
1549 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1550 1920 : // caller.
1551 1920 : Ok(())
1552 1920 : }
1553 :
1554 : /// Truncate relation
1555 6012 : pub async fn put_rel_truncation(
1556 6012 : &mut self,
1557 6012 : rel: RelTag,
1558 6012 : nblocks: BlockNumber,
1559 6012 : ctx: &RequestContext,
1560 6012 : ) -> anyhow::Result<()> {
1561 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1562 6012 : if self
1563 6012 : .tline
1564 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1565 0 : .await?
1566 : {
1567 6012 : let size_key = rel_size_to_key(rel);
1568 : // Fetch the old size first
1569 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1570 6012 :
1571 6012 : // Update the entry with the new size.
1572 6012 : let buf = nblocks.to_le_bytes();
1573 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1574 6012 :
1575 6012 : // Update relation size cache
1576 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1577 6012 :
1578 6012 : // Update logical database size.
1579 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1580 0 : }
1581 6012 : Ok(())
1582 6012 : }
1583 :
1584 : /// Extend relation
1585 : /// If new size is smaller, do nothing.
1586 276680 : pub async fn put_rel_extend(
1587 276680 : &mut self,
1588 276680 : rel: RelTag,
1589 276680 : nblocks: BlockNumber,
1590 276680 : ctx: &RequestContext,
1591 276680 : ) -> anyhow::Result<()> {
1592 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1593 :
1594 : // Put size
1595 276680 : let size_key = rel_size_to_key(rel);
1596 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1597 276680 :
1598 276680 : // Only extend the relation here; never decrease the size.
1599 276680 : if nblocks > old_size {
1600 274788 : let buf = nblocks.to_le_bytes();
1601 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1602 274788 :
1603 274788 : // Update relation size cache
1604 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1605 274788 :
1606 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1607 274788 : }
1608 276680 : Ok(())
1609 276680 : }
1610 :
1611 : /// Drop some relations
1612 10 : pub(crate) async fn put_rel_drops(
1613 10 : &mut self,
1614 10 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
1615 10 : ctx: &RequestContext,
1616 10 : ) -> anyhow::Result<()> {
1617 12 : for ((spc_node, db_node), rel_tags) in drop_relations {
1618 2 : let dir_key = rel_dir_to_key(spc_node, db_node);
1619 2 : let buf = self.get(dir_key, ctx).await?;
1620 2 : let mut dir = RelDirectory::des(&buf)?;
1621 :
1622 2 : let mut dirty = false;
1623 4 : for rel_tag in rel_tags {
1624 2 : if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
1625 2 : dirty = true;
1626 2 :
1627 2 : // update logical size
1628 2 : let size_key = rel_size_to_key(rel_tag);
1629 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1630 2 : self.pending_nblocks -= old_size as i64;
1631 2 :
1632 2 : // Remove entry from relation size cache
1633 2 : self.tline.remove_cached_rel_size(&rel_tag);
1634 2 :
1635 2 : // Delete size entry, as well as all blocks
1636 2 : self.delete(rel_key_range(rel_tag));
1637 0 : }
1638 : }
1639 :
1640 2 : if dirty {
1641 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1642 2 : self.pending_directory_entries
1643 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1644 0 : }
1645 : }
1646 :
1647 10 : Ok(())
1648 10 : }
1649 :
1650 6 : pub async fn put_slru_segment_creation(
1651 6 : &mut self,
1652 6 : kind: SlruKind,
1653 6 : segno: u32,
1654 6 : nblocks: BlockNumber,
1655 6 : ctx: &RequestContext,
1656 6 : ) -> anyhow::Result<()> {
1657 6 : // Add it to the directory entry
1658 6 : let dir_key = slru_dir_to_key(kind);
1659 6 : let buf = self.get(dir_key, ctx).await?;
1660 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1661 :
1662 6 : if !dir.segments.insert(segno) {
1663 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1664 6 : }
1665 6 : self.pending_directory_entries
1666 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1667 6 : self.put(
1668 6 : dir_key,
1669 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1670 : );
1671 :
1672 : // Put size
1673 6 : let size_key = slru_segment_size_to_key(kind, segno);
1674 6 : let buf = nblocks.to_le_bytes();
1675 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1676 6 :
1677 6 : // even if nblocks > 0, we don't insert any actual blocks here
1678 6 :
1679 6 : Ok(())
1680 6 : }
1681 :
1682 : /// Extend SLRU segment
1683 0 : pub fn put_slru_extend(
1684 0 : &mut self,
1685 0 : kind: SlruKind,
1686 0 : segno: u32,
1687 0 : nblocks: BlockNumber,
1688 0 : ) -> anyhow::Result<()> {
1689 0 : // Put size
1690 0 : let size_key = slru_segment_size_to_key(kind, segno);
1691 0 : let buf = nblocks.to_le_bytes();
1692 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1693 0 : Ok(())
1694 0 : }
1695 :
1696 : /// This method is used for marking truncated SLRU files
1697 0 : pub async fn drop_slru_segment(
1698 0 : &mut self,
1699 0 : kind: SlruKind,
1700 0 : segno: u32,
1701 0 : ctx: &RequestContext,
1702 0 : ) -> anyhow::Result<()> {
1703 0 : // Remove it from the directory entry
1704 0 : let dir_key = slru_dir_to_key(kind);
1705 0 : let buf = self.get(dir_key, ctx).await?;
1706 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1707 :
1708 0 : if !dir.segments.remove(&segno) {
1709 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1710 0 : }
1711 0 : self.pending_directory_entries
1712 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1713 0 : self.put(
1714 0 : dir_key,
1715 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1716 : );
1717 :
1718 : // Delete size entry, as well as all blocks
1719 0 : self.delete(slru_segment_key_range(kind, segno));
1720 0 :
1721 0 : Ok(())
1722 0 : }
1723 :
1724 : /// Drop a relmapper file (pg_filenode.map)
1725 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1726 0 : // TODO
1727 0 : Ok(())
1728 0 : }
1729 :
1730 : /// This method is used for dropping the twophase file of a prepared transaction
1731 0 : pub async fn drop_twophase_file(
1732 0 : &mut self,
1733 0 : xid: u64,
1734 0 : ctx: &RequestContext,
1735 0 : ) -> anyhow::Result<()> {
1736 : // Remove it from the directory entry
1737 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1738 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1739 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
1740 :
1741 0 : if !dir.xids.remove(&xid) {
1742 0 : warn!("twophase file for xid {} does not exist", xid);
1743 0 : }
1744 0 : self.pending_directory_entries
1745 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1746 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1747 : } else {
1748 0 : let xid: u32 = u32::try_from(xid)?;
1749 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1750 :
1751 0 : if !dir.xids.remove(&xid) {
1752 0 : warn!("twophase file for xid {} does not exist", xid);
1753 0 : }
1754 0 : self.pending_directory_entries
1755 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1756 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1757 : };
1758 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1759 0 :
1760 0 : // Delete it
1761 0 : self.delete(twophase_key_range(xid));
1762 0 :
1763 0 : Ok(())
1764 0 : }
1765 :
1766 16 : pub async fn put_file(
1767 16 : &mut self,
1768 16 : path: &str,
1769 16 : content: &[u8],
1770 16 : ctx: &RequestContext,
1771 16 : ) -> anyhow::Result<()> {
1772 16 : let key = aux_file::encode_aux_file_key(path);
1773 : // retrieve the current value of the key from the storage engine
1774 16 : let old_val = match self.get(key, ctx).await {
1775 4 : Ok(val) => Some(val),
1776 12 : Err(PageReconstructError::MissingKey(_)) => None,
1777 0 : Err(e) => return Err(e.into()),
1778 : };
1779 16 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
1780 4 : aux_file::decode_file_value(old_val)?
1781 : } else {
1782 12 : Vec::new()
1783 : };
1784 16 : let mut other_files = Vec::with_capacity(files.len());
1785 16 : let mut modifying_file = None;
1786 20 : for file @ (p, content) in files {
1787 4 : if path == p {
1788 4 : assert!(
1789 4 : modifying_file.is_none(),
1790 0 : "duplicated entries found for {}",
1791 : path
1792 : );
1793 4 : modifying_file = Some(content);
1794 0 : } else {
1795 0 : other_files.push(file);
1796 0 : }
1797 : }
1798 16 : let mut new_files = other_files;
1799 16 : match (modifying_file, content.is_empty()) {
1800 2 : (Some(old_content), false) => {
1801 2 : self.tline
1802 2 : .aux_file_size_estimator
1803 2 : .on_update(old_content.len(), content.len());
1804 2 : new_files.push((path, content));
1805 2 : }
1806 2 : (Some(old_content), true) => {
1807 2 : self.tline
1808 2 : .aux_file_size_estimator
1809 2 : .on_remove(old_content.len());
1810 2 : // not adding the file key to the final `new_files` vec.
1811 2 : }
1812 12 : (None, false) => {
1813 12 : self.tline.aux_file_size_estimator.on_add(content.len());
1814 12 : new_files.push((path, content));
1815 12 : }
1816 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
1817 : }
1818 16 : let new_val = aux_file::encode_file_value(&new_files)?;
1819 16 : self.put(key, Value::Image(new_val.into()));
1820 16 :
1821 16 : Ok(())
1822 16 : }
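     :
     : // Semantics sketch (editor's illustration, mirroring the aux-file round-trip
     : // test at the bottom of this file): empty content deletes the aux file,
     : // non-empty content creates or replaces it:
     : //
     : //     modification.put_file("pg_logical/f", b"data", &ctx).await?; // create/update
     : //     modification.put_file("pg_logical/f", b"", &ctx).await?;     // delete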
1823 :
1824 : ///
1825 : /// Flush changes accumulated so far to the underlying repository.
1826 : ///
1827 : /// Usually, changes made in DatadirModification are atomic, but this allows
1828 : /// you to flush them to the underlying repository before the final `commit`.
1829 : /// That allows freeing up the memory used to hold the pending changes.
1830 : ///
1831 : /// Currently only used during bulk import of a data directory. In that
1832 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1833 : /// whole import fails and the timeline will be deleted anyway.
1834 : /// (Or to be precise, it will be left behind for debugging purposes and
1835 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1836 : ///
1837 : /// Note: A consequence of flushing the pending operations is that they
1838 : /// won't be visible to subsequent operations until `commit`. The function
1839 : /// retains all the metadata, but data pages are flushed. That's again OK
1840 : /// for bulk import, where you are just loading data pages and won't try to
1841 : /// modify the same pages twice.
1842 1930 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1843 1930 : // Unless we have accumulated a decent amount of changes, it's not worth it
1844 1930 : // to scan through the pending_updates list.
1845 1930 : let pending_nblocks = self.pending_nblocks;
1846 1930 : if pending_nblocks < 10000 {
1847 1930 : return Ok(());
1848 0 : }
1849 :
1850 0 : let mut writer = self.tline.writer().await;
1851 :
1852 : // Flush relation and SLRU data blocks, keep metadata.
1853 0 : if let Some(batch) = self.pending_data_batch.take() {
1854 0 : tracing::debug!(
1855 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
1856 0 : batch.max_lsn,
1857 0 : self.tline.get_last_record_lsn()
1858 : );
1859 :
1860 : // This bails out on first error without modifying pending_updates.
1861 : // That's Ok, cf this function's doc comment.
1862 0 : writer.put_batch(batch, ctx).await?;
1863 0 : }
1864 :
1865 0 : if pending_nblocks != 0 {
1866 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1867 0 : self.pending_nblocks = 0;
1868 0 : }
1869 :
1870 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1871 0 : writer.update_directory_entries_count(kind, count as u64);
1872 0 : }
1873 :
1874 0 : Ok(())
1875 1930 : }
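     :
     : // Hedged sketch of the bulk-import pattern described in the doc comment above
     : // (`import_lsn`, `pages`, and `put_rel_page_image` are assumed placeholders,
     : // not code from this file):
     : //
     : //     let mut modification = tline.begin_modification(import_lsn);
     : //     for page in pages {
     : //         modification.put_rel_page_image(rel, page.blkno, page.image)?;
     : //         modification.flush(ctx).await?; // no-op until enough changes pend
     : //     }
     : //     modification.commit(ctx).await?;    // remaining metadata becomes visible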
1876 :
1877 : ///
1878 : /// Finish this atomic update, writing all the updated keys to the
1879 : /// underlying timeline.
1880 : /// All the modifications in this atomic update are stamped by the specified LSN.
1881 : ///
1882 743062 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1883 743062 : let mut writer = self.tline.writer().await;
1884 :
1885 743062 : let pending_nblocks = self.pending_nblocks;
1886 743062 : self.pending_nblocks = 0;
1887 :
1888 : // Ordering: the items in this batch do not need to be in any global order, but values for
1889 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
1890 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
1891 : // more details.
1892 :
1893 743062 : let metadata_batch = {
1894 743062 : let pending_meta = self
1895 743062 : .pending_metadata_pages
1896 743062 : .drain()
1897 743062 : .flat_map(|(key, values)| {
1898 273812 : values
1899 273812 : .into_iter()
1900 273812 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
1901 743062 : })
1902 743062 : .collect::<Vec<_>>();
1903 743062 :
1904 743062 : if pending_meta.is_empty() {
1905 472278 : None
1906 : } else {
1907 270784 : Some(SerializedValueBatch::from_values(pending_meta))
1908 : }
1909 : };
1910 :
1911 743062 : let data_batch = self.pending_data_batch.take();
1912 :
1913 743062 : let maybe_batch = match (data_batch, metadata_batch) {
1914 264556 : (Some(mut data), Some(metadata)) => {
1915 264556 : data.extend(metadata);
1916 264556 : Some(data)
1917 : }
1918 143262 : (Some(data), None) => Some(data),
1919 6228 : (None, Some(metadata)) => Some(metadata),
1920 329016 : (None, None) => None,
1921 : };
1922 :
1923 743062 : if let Some(batch) = maybe_batch {
1924 414046 : tracing::debug!(
1925 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
1926 0 : batch.max_lsn,
1927 0 : self.tline.get_last_record_lsn()
1928 : );
1929 :
1930 : // This bails out on first error without modifying pending_updates.
1931 : // That's Ok, cf this function's doc comment.
1932 414046 : writer.put_batch(batch, ctx).await?;
1933 329016 : }
1934 :
1935 743062 : if !self.pending_deletions.is_empty() {
1936 2 : writer.delete_batch(&self.pending_deletions, ctx).await?;
1937 2 : self.pending_deletions.clear();
1938 743060 : }
1939 :
1940 743062 : self.pending_lsns.push(self.lsn);
1941 888920 : for pending_lsn in self.pending_lsns.drain(..) {
1942 888920 : // TODO(vlad): pretty sure the comment below is not valid anymore
1943 888920 : // and we can call finish write with the latest LSN
1944 888920 : //
1945 888920 : // Ideally, we should be able to call writer.finish_write() only once
1946 888920 : // with the highest LSN. However, the last_record_lsn variable in the
1947 888920 : // timeline keeps track of the latest LSN and the immediate previous LSN
1948 888920 : // so we need to record every LSN to not leave a gap between them.
1949 888920 : writer.finish_write(pending_lsn);
1950 888920 : }
1951 :
1952 743062 : if pending_nblocks != 0 {
1953 270570 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1954 472492 : }
1955 :
1956 743062 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1957 2824 : writer.update_directory_entries_count(kind, count as u64);
1958 2824 : }
1959 :
1960 743062 : self.pending_metadata_bytes = 0;
1961 743062 :
1962 743062 : Ok(())
1963 743062 : }
1964 :
1965 291704 : pub(crate) fn len(&self) -> usize {
1966 291704 : self.pending_metadata_pages.len()
1967 291704 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
1968 291704 : + self.pending_deletions.len()
1969 291704 : }
1970 :
1971 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
1972 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
1973 : /// in the modification.
1974 : ///
1975 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
1976 : /// page must ensure that the pages they read are already committed in Timeline, for example
1977 : /// DB create operations are always preceded by a call to commit(). This is special cased because
1978 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
1979 : /// and not data pages.
1980 286586 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1981 286586 : if !Self::is_data_key(&key) {
1982 : // Have we already updated the same key? Read the latest pending updated
1983 : // version in that case.
1984 : //
1985 : // Note: we don't check pending_deletions. It is an error to request a
1986 : // value that has been removed, deletion only avoids leaking storage.
1987 286586 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
1988 15928 : if let Some((_, _, value)) = values.last() {
1989 15928 : return if let Value::Image(img) = value {
1990 15928 : Ok(img.clone())
1991 : } else {
1992 : // Currently, we never need to read back a WAL record that we
1993 : // inserted in the same "transaction". All the metadata updates
1994 : // work directly with Images, and we never need to read actual
1995 : // data pages. We could handle this if we had to, by calling
1996 : // the walredo manager, but let's keep it simple for now.
1997 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1998 0 : "unexpected pending WAL record"
1999 0 : )))
2000 : };
2001 0 : }
2002 270658 : }
2003 : } else {
2004 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2005 : // this key should never be present in pending_data_batch. We ensure this by committing
2006 : // modifications before ingesting DB create operations, which are the only kind that reads
2007 : // data pages during ingest.
2008 0 : if cfg!(debug_assertions) {
2009 0 : assert!(!self
2010 0 : .pending_data_batch
2011 0 : .as_ref()
2012 0 : .map_or(false, |b| b.updates_key(&key)));
2013 0 : }
2014 : }
2015 :
2016 : // Metadata page cache miss, or we're reading a data page.
2017 270658 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2018 270658 : self.tline.get(key, lsn, ctx).await
2019 286586 : }
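     :
     : // Hedged illustration of the read-through behavior documented above (keys and
     : // values are placeholders): within one modification, a metadata read returns
     : // the pending image rather than the committed timeline value:
     : //
     : //     modification.put(size_key, Value::Image(img.clone())); // pending write
     : //     let read = modification.get(size_key, ctx).await?;     // returns `img`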
2020 :
2021 563832 : fn put(&mut self, key: Key, val: Value) {
2022 563832 : if Self::is_data_key(&key) {
2023 277868 : self.put_data(key.to_compact(), val)
2024 : } else {
2025 285964 : self.put_metadata(key.to_compact(), val)
2026 : }
2027 563832 : }
2028 :
2029 277868 : fn put_data(&mut self, key: CompactKey, val: Value) {
2030 277868 : let batch = self
2031 277868 : .pending_data_batch
2032 277868 : .get_or_insert_with(SerializedValueBatch::default);
2033 277868 : batch.put(key, val, self.lsn);
2034 277868 : }
2035 :
2036 285964 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2037 285964 : let values = self.pending_metadata_pages.entry(key).or_default();
2038 : // Replace the previous value if it exists at the same lsn
2039 285964 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2040 12152 : if *last_lsn == self.lsn {
2041 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2042 12152 : self.pending_metadata_bytes -= *last_value_ser_size;
2043 12152 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2044 12152 : self.pending_metadata_bytes += *last_value_ser_size;
2045 12152 :
2046 12152 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as might
2047 12152 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2048 12152 : *last_value = val;
2049 12152 : return;
2050 0 : }
2051 273812 : }
2052 :
2053 273812 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2054 273812 : self.pending_metadata_bytes += val_serialized_size;
2055 273812 : values.push((self.lsn, val_serialized_size, val));
2056 273812 :
2057 273812 : if key == CHECKPOINT_KEY.to_compact() {
2058 190 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2059 273622 : }
2060 285964 : }
2061 :
2062 2 : fn delete(&mut self, key_range: Range<Key>) {
2063 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2064 2 : self.pending_deletions.push((key_range, self.lsn));
2065 2 : }
2066 : }
2067 :
2068 : /// This struct facilitates accessing either a committed key from the timeline at a
2069 : /// specific LSN, or the latest uncommitted key from a pending modification.
2070 : ///
2071 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2072 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2073 : /// need to look up the keys in the modification first before looking them up in the
2074 : /// timeline to not miss the latest updates.
2075 : #[derive(Clone, Copy)]
2076 : pub enum Version<'a> {
2077 : Lsn(Lsn),
2078 : Modified(&'a DatadirModification<'a>),
2079 : }
2080 :
2081 : impl<'a> Version<'a> {
2082 23560 : async fn get(
2083 23560 : &self,
2084 23560 : timeline: &Timeline,
2085 23560 : key: Key,
2086 23560 : ctx: &RequestContext,
2087 23560 : ) -> Result<Bytes, PageReconstructError> {
2088 23560 : match self {
2089 23540 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2090 20 : Version::Modified(modification) => modification.get(key, ctx).await,
2091 : }
2092 23560 : }
2093 :
2094 35620 : fn get_lsn(&self) -> Lsn {
2095 35620 : match self {
2096 29574 : Version::Lsn(lsn) => *lsn,
2097 6046 : Version::Modified(modification) => modification.lsn,
2098 : }
2099 35620 : }
2100 : }
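     :
     : // Hedged illustration (pattern as used by `put_rel_truncation` above): ingest
     : // code passes `Version::Modified` so that size updates earlier in the same
     : // batch stay visible, while point-in-time readers pass `Version::Lsn`:
     : //
     : //     let now = tline.get_rel_exists(rel, Version::Modified(&modification), ctx).await?;
     : //     let then = tline.get_rel_exists(rel, Version::Lsn(lsn), ctx).await?;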
2101 :
2102 : //--- Metadata structs stored in key-value pairs in the repository.
2103 :
2104 2244 : #[derive(Debug, Serialize, Deserialize)]
2105 : struct DbDirectory {
2106 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2107 : dbdirs: HashMap<(Oid, Oid), bool>,
2108 : }
2109 :
2110 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2111 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2112 : // were named like "pg_twophase/000002E5", now they're like
2113 : // "pg_twophase/0000000A000002E4".
2114 :
2115 292 : #[derive(Debug, Serialize, Deserialize)]
2116 : struct TwoPhaseDirectory {
2117 : xids: HashSet<TransactionId>,
2118 : }
2119 :
2120 0 : #[derive(Debug, Serialize, Deserialize)]
2121 : struct TwoPhaseDirectoryV17 {
2122 : xids: HashSet<u64>,
2123 : }
2124 :
2125 1932 : #[derive(Debug, Serialize, Deserialize, Default)]
2126 : struct RelDirectory {
2127 : // Set of relations that exist. (relfilenode, forknum)
2128 : //
2129 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2130 : // key-value pairs, if you have a lot of relations
2131 : rels: HashSet<(Oid, u8)>,
2132 : }
2133 :
2134 0 : #[derive(Debug, Serialize, Deserialize)]
2135 : struct RelSizeEntry {
2136 : nblocks: u32,
2137 : }
2138 :
2139 876 : #[derive(Debug, Serialize, Deserialize, Default)]
2140 : struct SlruSegmentDirectory {
2141 : // Set of SLRU segments that exist.
2142 : segments: HashSet<u32>,
2143 : }
2144 :
2145 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2146 : #[repr(u8)]
2147 : pub(crate) enum DirectoryKind {
2148 : Db,
2149 : TwoPhase,
2150 : Rel,
2151 : AuxFiles,
2152 : SlruSegment(SlruKind),
2153 : }
2154 :
2155 : impl DirectoryKind {
2156 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2157 5648 : pub(crate) fn offset(&self) -> usize {
2158 5648 : self.into_usize()
2159 5648 : }
2160 : }
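     :
     : // Hedged sketch: `offset` yields a stable per-kind index, e.g. for a fixed-size
     : // array of per-directory counters (`entry_counts` is an assumed name):
     : //
     : //     let mut entry_counts = [0u64; DirectoryKind::KINDS_NUM];
     : //     entry_counts[DirectoryKind::Rel.offset()] += 1;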
2161 :
2162 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2163 :
2164 : #[allow(clippy::bool_assert_comparison)]
2165 : #[cfg(test)]
2166 : mod tests {
2167 : use hex_literal::hex;
2168 : use pageserver_api::{models::ShardParameters, shard::ShardStripeSize};
2169 : use utils::{
2170 : id::TimelineId,
2171 : shard::{ShardCount, ShardNumber},
2172 : };
2173 :
2174 : use super::*;
2175 :
2176 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2177 :
2178 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2179 : #[tokio::test]
2180 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2181 2 : let name = "aux_files_round_trip";
2182 2 : let harness = TenantHarness::create(name).await?;
2183 2 :
2184 2 : pub const TIMELINE_ID: TimelineId =
2185 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2186 2 :
2187 20 : let (tenant, ctx) = harness.load().await;
2188 2 : let tline = tenant
2189 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2190 2 : .await?;
2191 2 : let tline = tline.raw_timeline().unwrap();
2192 2 :
2193 2 : // First modification: insert two keys
2194 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
2195 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2196 2 : modification.set_lsn(Lsn(0x1008))?;
2197 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2198 2 : modification.commit(&ctx).await?;
2199 2 : let expect_1008 = HashMap::from([
2200 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2201 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2202 2 : ]);
2203 2 :
2204 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2205 2 : assert_eq!(readback, expect_1008);
2206 2 :
2207 2 : // Second modification: update one key, remove the other
2208 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
2209 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2210 2 : modification.set_lsn(Lsn(0x2008))?;
2211 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
2212 2 : modification.commit(&ctx).await?;
2213 2 : let expect_2008 =
2214 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2215 2 :
2216 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
2217 2 : assert_eq!(readback, expect_2008);
2218 2 :
2219 2 : // Reading back in time works
2220 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2221 2 : assert_eq!(readback, expect_1008);
2222 2 :
2223 2 : Ok(())
2224 2 : }
2225 :
2226 : #[test]
2227 2 : fn gap_finding() {
2228 2 : let rel = RelTag {
2229 2 : spcnode: 1663,
2230 2 : dbnode: 208101,
2231 2 : relnode: 2620,
2232 2 : forknum: 0,
2233 2 : };
2234 2 : let base_blkno = 1;
2235 2 :
2236 2 : let base_key = rel_block_to_key(rel, base_blkno);
2237 2 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2238 2 :
2239 2 : let shard = ShardIdentity::unsharded();
2240 2 :
2241 2 : let mut previous_nblocks = 0;
2242 22 : for i in 0..10 {
2243 20 : let crnt_blkno = base_blkno + i;
2244 20 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2245 20 :
2246 20 : previous_nblocks = crnt_blkno + 1;
2247 20 :
2248 20 : if i == 0 {
2249 : // The first block we write is 1, so we should find the gap.
2250 2 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2251 : } else {
2252 18 : assert!(gaps.is_none());
2253 : }
2254 : }
2255 :
2256 : // This is an update to an already existing block. No gaps here.
2257 2 : let update_blkno = 5;
2258 2 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2259 2 : assert!(gaps.is_none());
2260 :
2261 : // This is an update past the current end block.
2262 2 : let after_gap_blkno = 20;
2263 2 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2264 2 :
2265 2 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2266 2 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2267 2 : assert_eq!(
2268 2 : gaps.unwrap(),
2269 2 : KeySpace::single(gap_start_key..after_gap_key)
2270 2 : );
2271 2 : }
2272 :
2273 : #[test]
2274 2 : fn sharded_gap_finding() {
2275 2 : let rel = RelTag {
2276 2 : spcnode: 1663,
2277 2 : dbnode: 208101,
2278 2 : relnode: 2620,
2279 2 : forknum: 0,
2280 2 : };
2281 2 :
2282 2 : let first_blkno = 6;
2283 2 :
2284 2 : // This shard will get the even blocks
2285 2 : let shard = ShardIdentity::from_params(
2286 2 : ShardNumber(0),
2287 2 : &ShardParameters {
2288 2 : count: ShardCount(2),
2289 2 : stripe_size: ShardStripeSize(1),
2290 2 : },
2291 2 : );
2292 2 :
2293 2 : // Only keys belonging to this shard are considered as gaps.
2294 2 : let mut previous_nblocks = 0;
2295 2 : let gaps =
2296 2 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2297 2 : assert!(!gaps.ranges.is_empty());
2298 6 : for gap_range in gaps.ranges {
2299 4 : let mut k = gap_range.start;
2300 8 : while k != gap_range.end {
2301 4 : assert_eq!(shard.get_shard_number(&k), shard.number);
2302 4 : k = k.next();
2303 : }
2304 : }
2305 :
2306 2 : previous_nblocks = first_blkno;
2307 2 :
2308 2 : let update_blkno = 2;
2309 2 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2310 2 : assert!(gaps.is_none());
2311 2 : }
2312 :
2313 : /*
2314 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2315 : let incremental = timeline.get_current_logical_size();
2316 : let non_incremental = timeline
2317 : .get_current_logical_size_non_incremental(lsn)
2318 : .unwrap();
2319 : assert_eq!(incremental, non_incremental);
2320 : }
2321 : */
2322 :
2323 : /*
2324 : ///
2325 : /// Test list_rels() function, with branches and dropped relations
2326 : ///
2327 : #[test]
2328 : fn test_list_rels_drop() -> Result<()> {
2329 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2330 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2331 : const TESTDB: u32 = 111;
2332 :
2333 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2334 : // after branching fails below
2335 : let mut writer = tline.begin_record(Lsn(0x10));
2336 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2337 : writer.finish()?;
2338 :
2339 : // Create a relation on the timeline
2340 : let mut writer = tline.begin_record(Lsn(0x20));
2341 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2342 : writer.finish()?;
2343 :
2344 : let writer = tline.begin_record(Lsn(0x00));
2345 : writer.finish()?;
2346 :
2347 : // Check that list_rels() lists it after LSN 2, but no before it
2348 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2349 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2350 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2351 :
2352 : // Create a branch, check that the relation is visible there
2353 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2354 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2355 : Some(timeline) => timeline,
2356 : None => panic!("Should have a local timeline"),
2357 : };
2358 : let newtline = DatadirTimelineImpl::new(newtline);
2359 : assert!(newtline
2360 : .list_rels(0, TESTDB, Lsn(0x30))?
2361 : .contains(&TESTREL_A));
2362 :
2363 : // Drop it on the branch
2364 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2365 : new_writer.drop_relation(TESTREL_A)?;
2366 : new_writer.finish()?;
2367 :
2368 : // Check that it's no longer listed on the branch after the point where it was dropped
2369 : assert!(newtline
2370 : .list_rels(0, TESTDB, Lsn(0x30))?
2371 : .contains(&TESTREL_A));
2372 : assert!(!newtline
2373 : .list_rels(0, TESTDB, Lsn(0x40))?
2374 : .contains(&TESTREL_A));
2375 :
2376 : // Run checkpoint and garbage collection and check that it's still not visible
2377 : newtline.checkpoint(CheckpointConfig::Forced)?;
2378 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2379 :
2380 : assert!(!newtline
2381 : .list_rels(0, TESTDB, Lsn(0x40))?
2382 : .contains(&TESTREL_A));
2383 :
2384 : Ok(())
2385 : }
2386 : */
2387 :
2388 : /*
2389 : #[test]
2390 : fn test_read_beyond_eof() -> Result<()> {
2391 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2392 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2393 :
2394 : make_some_layers(&tline, Lsn(0x20))?;
2395 : let mut writer = tline.begin_record(Lsn(0x60));
2396 : walingest.put_rel_page_image(
2397 : &mut writer,
2398 : TESTREL_A,
2399 : 0,
2400 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2401 : )?;
2402 : writer.finish()?;
2403 :
2404 : // Test read before rel creation. Should error out.
2405 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2406 :
2407 : // Read block beyond end of relation at different points in time.
2408 : // These reads should fall into different delta, image, and in-memory layers.
2409 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2410 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2411 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2412 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2413 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2414 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2415 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2416 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2417 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2418 :
2419 : // Test on an in-memory layer with no preceding layer
2420 : let mut writer = tline.begin_record(Lsn(0x70));
2421 : walingest.put_rel_page_image(
2422 : &mut writer,
2423 : TESTREL_B,
2424 : 0,
2425 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2426 : )?;
2427 : writer.finish()?;
2428 :
2429 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
2430 :
2431 : Ok(())
2432 : }
2433 : */
2434 : }