Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::metrics::WAL_INGEST;
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use crate::walrecord::NeonWalRecord;
15 : use crate::{aux_file, repository::*};
16 : use anyhow::{ensure, Context};
17 : use bytes::{Buf, Bytes, BytesMut};
18 : use enum_map::Enum;
19 : use itertools::Itertools;
20 : use pageserver_api::key::{
21 : dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
22 : rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
23 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
24 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
25 : };
26 : use pageserver_api::keyspace::SparseKeySpace;
27 : use pageserver_api::models::AuxFilePolicy;
28 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
29 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
30 : use postgres_ffi::BLCKSZ;
31 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
32 : use serde::{Deserialize, Serialize};
33 : use std::collections::{hash_map, HashMap, HashSet};
34 : use std::ops::ControlFlow;
35 : use std::ops::Range;
36 : use strum::IntoEnumIterator;
37 : use tokio_util::sync::CancellationToken;
38 : use tracing::{debug, trace, warn};
39 : use utils::bin_ser::DeserializeError;
40 : use utils::vec_map::{VecMap, VecMapOrdering};
41 : use utils::{bin_ser::BeSer, lsn::Lsn};
42 :
// NOTE(review): not referenced in the part of the file visible here; presumably an
// upper bound on the number of aux-file deltas kept before they are compacted into
// a full image -- confirm against the code that uses it.
const MAX_AUX_FILE_DELTAS: usize = 1024;
44 :
/// Outcome of searching for the LSN that corresponds to a timestamp; this is the
/// success type returned by `Timeline::find_lsn_for_timestamp`.
#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp
    Present(Lsn),

    /// Found no commits after the given timestamp, this means
    /// that the newest data in the branch is older than the given
    /// timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp
    Future(Lsn),

    /// The queried timestamp lies beyond the horizon we can look back to (PITR)
    ///
    /// All commits > LSN happened after the given timestamp,
    /// but any commits < LSN might have happened before or after
    /// the given timestamp. We don't know because no data before
    /// the given lsn is available.
    Past(Lsn),

    /// We have found no commit with a timestamp,
    /// so we can't return anything meaningful.
    ///
    /// The associated LSN is the lower bound value we can safely
    /// create branches on, but no statement is made if it is
    /// older or newer than the timestamp.
    ///
    /// This variant can e.g. be returned right after a
    /// cluster import.
    NoData(Lsn),
}
76 :
/// Error type of `Timeline::get_current_logical_size_non_incremental`.
#[derive(Debug, thiserror::Error)]
pub enum CalculateLogicalSizeError {
    /// The calculation was interrupted: either the timeline's cancellation token
    /// fired, or a page read failed with `PageReconstructError::Cancelled` /
    /// `PageReconstructError::AncestorStopping` (see the `From` impl below).
    #[error("cancelled")]
    Cancelled,
    /// Any other failure, carried as an `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
84 :
/// Error type of `Timeline::collect_keyspace`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    /// A directory value could not be deserialized.
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    /// A page read failed for a reason other than cancellation.
    ///
    /// There is deliberately no `#[from]` here: the hand-written `From` impl below
    /// routes `PageReconstructError::Cancelled` to [`Self::Cancelled`] instead.
    #[error(transparent)]
    PageRead(PageReconstructError),
    /// A page read reported cancellation.
    #[error("cancelled")]
    Cancelled,
}
94 :
95 : impl From<PageReconstructError> for CollectKeySpaceError {
96 0 : fn from(err: PageReconstructError) -> Self {
97 0 : match err {
98 0 : PageReconstructError::Cancelled => Self::Cancelled,
99 0 : err => Self::PageRead(err),
100 : }
101 0 : }
102 : }
103 :
104 : impl From<PageReconstructError> for CalculateLogicalSizeError {
105 0 : fn from(pre: PageReconstructError) -> Self {
106 0 : match pre {
107 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
108 0 : Self::Cancelled
109 : }
110 0 : _ => Self::Other(pre.into()),
111 : }
112 0 : }
113 : }
114 :
/// Errors specific to relation handling.
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    #[error("Relation Already Exists")]
    AlreadyExists,
    /// The relation tag has `relnode == 0`. The relation getters on `Timeline`
    /// return this wrapped in `PageReconstructError::Other`.
    #[error("invalid relnode")]
    InvalidRelnode,
    /// Any other failure, carried as an `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
124 :
125 : ///
126 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
127 : /// and other special kinds of files, in a versioned key-value store. The
128 : /// Timeline struct provides the key-value store.
129 : ///
130 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
131 : /// implementation, and might be moved into a separate struct later.
132 : impl Timeline {
133 : /// Start ingesting a WAL record, or other atomic modification of
134 : /// the timeline.
135 : ///
136 : /// This provides a transaction-like interface to perform a bunch
137 : /// of modifications atomically.
138 : ///
139 : /// To ingest a WAL record, call begin_modification(lsn) to get a
140 : /// DatadirModification object. Use the functions in the object to
141 : /// modify the repository state, updating all the pages and metadata
142 : /// that the WAL record affects. When you're done, call commit() to
143 : /// commit the changes.
144 : ///
145 : /// Lsn stored in modification is advanced by `ingest_record` and
146 : /// is used by `commit()` to update `last_record_lsn`.
147 : ///
148 : /// Calling commit() will flush all the changes and reset the state,
149 : /// so the `DatadirModification` struct can be reused to perform the next modification.
150 : ///
151 : /// Note that any pending modifications you make through the
152 : /// modification object won't be visible to calls to the 'get' and list
153 : /// functions of the timeline until you finish! And if you update the
154 : /// same page twice, the last update wins.
155 : ///
156 268302 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
157 268302 : where
158 268302 : Self: Sized,
159 268302 : {
160 268302 : DatadirModification {
161 268302 : tline: self,
162 268302 : pending_lsns: Vec::new(),
163 268302 : pending_updates: HashMap::new(),
164 268302 : pending_deletions: Vec::new(),
165 268302 : pending_nblocks: 0,
166 268302 : pending_directory_entries: Vec::new(),
167 268302 : lsn,
168 268302 : }
169 268302 : }
170 :
171 : //------------------------------------------------------------------------------
172 : // Public GET functions
173 : //------------------------------------------------------------------------------
174 :
175 : /// Look up given page version.
176 18384 : pub(crate) async fn get_rel_page_at_lsn(
177 18384 : &self,
178 18384 : tag: RelTag,
179 18384 : blknum: BlockNumber,
180 18384 : version: Version<'_>,
181 18384 : ctx: &RequestContext,
182 18384 : ) -> Result<Bytes, PageReconstructError> {
183 18384 : if tag.relnode == 0 {
184 0 : return Err(PageReconstructError::Other(
185 0 : RelationError::InvalidRelnode.into(),
186 0 : ));
187 18384 : }
188 :
189 18384 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
190 18384 : if blknum >= nblocks {
191 0 : debug!(
192 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
193 0 : tag,
194 0 : blknum,
195 0 : version.get_lsn(),
196 : nblocks
197 : );
198 0 : return Ok(ZERO_PAGE.clone());
199 18384 : }
200 18384 :
201 18384 : let key = rel_block_to_key(tag, blknum);
202 18384 : version.get(self, key, ctx).await
203 18384 : }
204 :
205 : // Get size of a database in blocks
206 0 : pub(crate) async fn get_db_size(
207 0 : &self,
208 0 : spcnode: Oid,
209 0 : dbnode: Oid,
210 0 : version: Version<'_>,
211 0 : ctx: &RequestContext,
212 0 : ) -> Result<usize, PageReconstructError> {
213 0 : let mut total_blocks = 0;
214 :
215 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
216 :
217 0 : for rel in rels {
218 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
219 0 : total_blocks += n_blocks as usize;
220 : }
221 0 : Ok(total_blocks)
222 0 : }
223 :
224 : /// Get size of a relation file
225 24434 : pub(crate) async fn get_rel_size(
226 24434 : &self,
227 24434 : tag: RelTag,
228 24434 : version: Version<'_>,
229 24434 : ctx: &RequestContext,
230 24434 : ) -> Result<BlockNumber, PageReconstructError> {
231 24434 : if tag.relnode == 0 {
232 0 : return Err(PageReconstructError::Other(
233 0 : RelationError::InvalidRelnode.into(),
234 0 : ));
235 24434 : }
236 :
237 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
238 19294 : return Ok(nblocks);
239 5140 : }
240 5140 :
241 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
242 0 : && !self.get_rel_exists(tag, version, ctx).await?
243 : {
244 : // FIXME: Postgres sometimes calls smgrcreate() to create
245 : // FSM, and smgrnblocks() on it immediately afterwards,
246 : // without extending it. Tolerate that by claiming that
247 : // any non-existent FSM fork has size 0.
248 0 : return Ok(0);
249 5140 : }
250 5140 :
251 5140 : let key = rel_size_to_key(tag);
252 5140 : let mut buf = version.get(self, key, ctx).await?;
253 5136 : let nblocks = buf.get_u32_le();
254 5136 :
255 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
256 5136 :
257 5136 : Ok(nblocks)
258 24434 : }
259 :
260 : /// Does relation exist?
261 6050 : pub(crate) async fn get_rel_exists(
262 6050 : &self,
263 6050 : tag: RelTag,
264 6050 : version: Version<'_>,
265 6050 : ctx: &RequestContext,
266 6050 : ) -> Result<bool, PageReconstructError> {
267 6050 : if tag.relnode == 0 {
268 0 : return Err(PageReconstructError::Other(
269 0 : RelationError::InvalidRelnode.into(),
270 0 : ));
271 6050 : }
272 :
273 : // first try to lookup relation in cache
274 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
275 6032 : return Ok(true);
276 18 : }
277 18 : // fetch directory listing
278 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
279 18 : let buf = version.get(self, key, ctx).await?;
280 :
281 18 : match RelDirectory::des(&buf).context("deserialization failure") {
282 18 : Ok(dir) => {
283 18 : let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
284 18 : Ok(exists)
285 : }
286 0 : Err(e) => Err(PageReconstructError::from(e)),
287 : }
288 6050 : }
289 :
290 : /// Get a list of all existing relations in given tablespace and database.
291 : ///
292 : /// # Cancel-Safety
293 : ///
294 : /// This method is cancellation-safe.
295 0 : pub(crate) async fn list_rels(
296 0 : &self,
297 0 : spcnode: Oid,
298 0 : dbnode: Oid,
299 0 : version: Version<'_>,
300 0 : ctx: &RequestContext,
301 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
302 0 : // fetch directory listing
303 0 : let key = rel_dir_to_key(spcnode, dbnode);
304 0 : let buf = version.get(self, key, ctx).await?;
305 :
306 0 : match RelDirectory::des(&buf).context("deserialization failure") {
307 0 : Ok(dir) => {
308 0 : let rels: HashSet<RelTag> =
309 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
310 0 : spcnode,
311 0 : dbnode,
312 0 : relnode: *relnode,
313 0 : forknum: *forknum,
314 0 : }));
315 0 :
316 0 : Ok(rels)
317 : }
318 0 : Err(e) => Err(PageReconstructError::from(e)),
319 : }
320 0 : }
321 :
322 : /// Get the whole SLRU segment
323 0 : pub(crate) async fn get_slru_segment(
324 0 : &self,
325 0 : kind: SlruKind,
326 0 : segno: u32,
327 0 : lsn: Lsn,
328 0 : ctx: &RequestContext,
329 0 : ) -> Result<Bytes, PageReconstructError> {
330 0 : let n_blocks = self
331 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
332 0 : .await?;
333 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
334 0 : for blkno in 0..n_blocks {
335 0 : let block = self
336 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
337 0 : .await?;
338 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
339 : }
340 0 : Ok(segment.freeze())
341 0 : }
342 :
343 : /// Look up given SLRU page version.
344 0 : pub(crate) async fn get_slru_page_at_lsn(
345 0 : &self,
346 0 : kind: SlruKind,
347 0 : segno: u32,
348 0 : blknum: BlockNumber,
349 0 : lsn: Lsn,
350 0 : ctx: &RequestContext,
351 0 : ) -> Result<Bytes, PageReconstructError> {
352 0 : let key = slru_block_to_key(kind, segno, blknum);
353 0 : self.get(key, lsn, ctx).await
354 0 : }
355 :
356 : /// Get size of an SLRU segment
357 0 : pub(crate) async fn get_slru_segment_size(
358 0 : &self,
359 0 : kind: SlruKind,
360 0 : segno: u32,
361 0 : version: Version<'_>,
362 0 : ctx: &RequestContext,
363 0 : ) -> Result<BlockNumber, PageReconstructError> {
364 0 : let key = slru_segment_size_to_key(kind, segno);
365 0 : let mut buf = version.get(self, key, ctx).await?;
366 0 : Ok(buf.get_u32_le())
367 0 : }
368 :
369 : /// Get size of an SLRU segment
370 0 : pub(crate) async fn get_slru_segment_exists(
371 0 : &self,
372 0 : kind: SlruKind,
373 0 : segno: u32,
374 0 : version: Version<'_>,
375 0 : ctx: &RequestContext,
376 0 : ) -> Result<bool, PageReconstructError> {
377 0 : // fetch directory listing
378 0 : let key = slru_dir_to_key(kind);
379 0 : let buf = version.get(self, key, ctx).await?;
380 :
381 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
382 0 : Ok(dir) => {
383 0 : let exists = dir.segments.contains(&segno);
384 0 : Ok(exists)
385 : }
386 0 : Err(e) => Err(PageReconstructError::from(e)),
387 : }
388 0 : }
389 :
390 : /// Locate LSN, such that all transactions that committed before
391 : /// 'search_timestamp' are visible, but nothing newer is.
392 : ///
393 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
394 : /// so it's not well defined which LSN you get if there were multiple commits
395 : /// "in flight" at that point in time.
396 : ///
397 0 : pub(crate) async fn find_lsn_for_timestamp(
398 0 : &self,
399 0 : search_timestamp: TimestampTz,
400 0 : cancel: &CancellationToken,
401 0 : ctx: &RequestContext,
402 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
403 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
404 0 : // We use this method to figure out the branching LSN for the new branch, but the
405 0 : // GC cutoff could be before the branching point and we cannot create a new branch
406 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
407 0 : // on the safe side.
408 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
409 0 : let max_lsn = self.get_last_record_lsn();
410 0 :
411 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
412 0 : // LSN divided by 8.
413 0 : let mut low = min_lsn.0 / 8;
414 0 : let mut high = max_lsn.0 / 8 + 1;
415 0 :
416 0 : let mut found_smaller = false;
417 0 : let mut found_larger = false;
418 0 : while low < high {
419 0 : if cancel.is_cancelled() {
420 0 : return Err(PageReconstructError::Cancelled);
421 0 : }
422 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
423 0 : let mid = (high + low) / 2;
424 :
425 0 : let cmp = self
426 0 : .is_latest_commit_timestamp_ge_than(
427 0 : search_timestamp,
428 0 : Lsn(mid * 8),
429 0 : &mut found_smaller,
430 0 : &mut found_larger,
431 0 : ctx,
432 0 : )
433 0 : .await?;
434 :
435 0 : if cmp {
436 0 : high = mid;
437 0 : } else {
438 0 : low = mid + 1;
439 0 : }
440 : }
441 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
442 : // so the LSN of the last commit record before or at `search_timestamp`.
443 : // Remove one from `low` to get `t`.
444 : //
445 : // FIXME: it would be better to get the LSN of the previous commit.
446 : // Otherwise, if you restore to the returned LSN, the database will
447 : // include physical changes from later commits that will be marked
448 : // as aborted, and will need to be vacuumed away.
449 0 : let commit_lsn = Lsn((low - 1) * 8);
450 0 : match (found_smaller, found_larger) {
451 : (false, false) => {
452 : // This can happen if no commit records have been processed yet, e.g.
453 : // just after importing a cluster.
454 0 : Ok(LsnForTimestamp::NoData(min_lsn))
455 : }
456 : (false, true) => {
457 : // Didn't find any commit timestamps smaller than the request
458 0 : Ok(LsnForTimestamp::Past(min_lsn))
459 : }
460 0 : (true, _) if commit_lsn < min_lsn => {
461 0 : // the search above did set found_smaller to true but it never increased the lsn.
462 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
463 0 : // below the min_lsn. We should never do that.
464 0 : Ok(LsnForTimestamp::Past(min_lsn))
465 : }
466 : (true, false) => {
467 : // Only found commits with timestamps smaller than the request.
468 : // It's still a valid case for branch creation, return it.
469 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
470 : // case, anyway.
471 0 : Ok(LsnForTimestamp::Future(commit_lsn))
472 : }
473 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
474 : }
475 0 : }
476 :
477 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
478 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
479 : ///
480 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
481 : /// with a smaller/larger timestamp.
482 : ///
483 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
484 0 : &self,
485 0 : search_timestamp: TimestampTz,
486 0 : probe_lsn: Lsn,
487 0 : found_smaller: &mut bool,
488 0 : found_larger: &mut bool,
489 0 : ctx: &RequestContext,
490 0 : ) -> Result<bool, PageReconstructError> {
491 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
492 0 : if timestamp >= search_timestamp {
493 0 : *found_larger = true;
494 0 : return ControlFlow::Break(true);
495 0 : } else {
496 0 : *found_smaller = true;
497 0 : }
498 0 : ControlFlow::Continue(())
499 0 : })
500 0 : .await
501 0 : }
502 :
503 : /// Obtain the possible timestamp range for the given lsn.
504 : ///
505 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
506 0 : pub(crate) async fn get_timestamp_for_lsn(
507 0 : &self,
508 0 : probe_lsn: Lsn,
509 0 : ctx: &RequestContext,
510 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
511 0 : let mut max: Option<TimestampTz> = None;
512 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
513 0 : if let Some(max_prev) = max {
514 0 : max = Some(max_prev.max(timestamp));
515 0 : } else {
516 0 : max = Some(timestamp);
517 0 : }
518 0 : ControlFlow::Continue(())
519 0 : })
520 0 : .await?;
521 :
522 0 : Ok(max)
523 0 : }
524 :
525 : /// Runs the given function on all the timestamps for a given lsn
526 : ///
527 : /// The return value is either given by the closure, or set to the `Default`
528 : /// impl's output.
529 0 : async fn map_all_timestamps<T: Default>(
530 0 : &self,
531 0 : probe_lsn: Lsn,
532 0 : ctx: &RequestContext,
533 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
534 0 : ) -> Result<T, PageReconstructError> {
535 0 : for segno in self
536 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
537 0 : .await?
538 : {
539 0 : let nblocks = self
540 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
541 0 : .await?;
542 0 : for blknum in (0..nblocks).rev() {
543 0 : let clog_page = self
544 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
545 0 : .await?;
546 :
547 0 : if clog_page.len() == BLCKSZ as usize + 8 {
548 0 : let mut timestamp_bytes = [0u8; 8];
549 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
550 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
551 0 :
552 0 : match f(timestamp) {
553 0 : ControlFlow::Break(b) => return Ok(b),
554 0 : ControlFlow::Continue(()) => (),
555 : }
556 0 : }
557 : }
558 : }
559 0 : Ok(Default::default())
560 0 : }
561 :
562 0 : pub(crate) async fn get_slru_keyspace(
563 0 : &self,
564 0 : version: Version<'_>,
565 0 : ctx: &RequestContext,
566 0 : ) -> Result<KeySpace, PageReconstructError> {
567 0 : let mut accum = KeySpaceAccum::new();
568 :
569 0 : for kind in SlruKind::iter() {
570 0 : let mut segments: Vec<u32> = self
571 0 : .list_slru_segments(kind, version, ctx)
572 0 : .await?
573 0 : .into_iter()
574 0 : .collect();
575 0 : segments.sort_unstable();
576 :
577 0 : for seg in segments {
578 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
579 :
580 0 : accum.add_range(
581 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
582 0 : );
583 : }
584 : }
585 :
586 0 : Ok(accum.to_keyspace())
587 0 : }
588 :
589 : /// Get a list of SLRU segments
590 0 : pub(crate) async fn list_slru_segments(
591 0 : &self,
592 0 : kind: SlruKind,
593 0 : version: Version<'_>,
594 0 : ctx: &RequestContext,
595 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
596 0 : // fetch directory entry
597 0 : let key = slru_dir_to_key(kind);
598 :
599 0 : let buf = version.get(self, key, ctx).await?;
600 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
601 0 : Ok(dir) => Ok(dir.segments),
602 0 : Err(e) => Err(PageReconstructError::from(e)),
603 : }
604 0 : }
605 :
606 0 : pub(crate) async fn get_relmap_file(
607 0 : &self,
608 0 : spcnode: Oid,
609 0 : dbnode: Oid,
610 0 : version: Version<'_>,
611 0 : ctx: &RequestContext,
612 0 : ) -> Result<Bytes, PageReconstructError> {
613 0 : let key = relmap_file_key(spcnode, dbnode);
614 :
615 0 : let buf = version.get(self, key, ctx).await?;
616 0 : Ok(buf)
617 0 : }
618 :
619 0 : pub(crate) async fn list_dbdirs(
620 0 : &self,
621 0 : lsn: Lsn,
622 0 : ctx: &RequestContext,
623 0 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
624 : // fetch directory entry
625 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
626 :
627 0 : match DbDirectory::des(&buf).context("deserialization failure") {
628 0 : Ok(dir) => Ok(dir.dbdirs),
629 0 : Err(e) => Err(PageReconstructError::from(e)),
630 : }
631 0 : }
632 :
633 0 : pub(crate) async fn get_twophase_file(
634 0 : &self,
635 0 : xid: TransactionId,
636 0 : lsn: Lsn,
637 0 : ctx: &RequestContext,
638 0 : ) -> Result<Bytes, PageReconstructError> {
639 0 : let key = twophase_file_key(xid);
640 0 : let buf = self.get(key, lsn, ctx).await?;
641 0 : Ok(buf)
642 0 : }
643 :
644 0 : pub(crate) async fn list_twophase_files(
645 0 : &self,
646 0 : lsn: Lsn,
647 0 : ctx: &RequestContext,
648 0 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
649 : // fetch directory entry
650 0 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
651 :
652 0 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
653 0 : Ok(dir) => Ok(dir.xids),
654 0 : Err(e) => Err(PageReconstructError::from(e)),
655 : }
656 0 : }
657 :
658 0 : pub(crate) async fn get_control_file(
659 0 : &self,
660 0 : lsn: Lsn,
661 0 : ctx: &RequestContext,
662 0 : ) -> Result<Bytes, PageReconstructError> {
663 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
664 0 : }
665 :
666 12 : pub(crate) async fn get_checkpoint(
667 12 : &self,
668 12 : lsn: Lsn,
669 12 : ctx: &RequestContext,
670 12 : ) -> Result<Bytes, PageReconstructError> {
671 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
672 12 : }
673 :
674 6 : async fn list_aux_files_v1(
675 6 : &self,
676 6 : lsn: Lsn,
677 6 : ctx: &RequestContext,
678 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
679 6 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
680 6 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
681 6 : Ok(dir) => Ok(dir.files),
682 0 : Err(e) => Err(PageReconstructError::from(e)),
683 : },
684 0 : Err(e) => {
685 0 : // This is expected: historical databases do not have the key.
686 0 : debug!("Failed to get info about AUX files: {}", e);
687 0 : Ok(HashMap::new())
688 : }
689 : }
690 6 : }
691 :
692 0 : async fn list_aux_files_v2(
693 0 : &self,
694 0 : lsn: Lsn,
695 0 : ctx: &RequestContext,
696 0 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
697 0 : let kv = self
698 0 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
699 0 : .await
700 0 : .context("scan")?;
701 0 : let mut result = HashMap::new();
702 0 : for (_, v) in kv {
703 0 : let v = v.context("get value")?;
704 0 : let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
705 0 : for (fname, content) in v {
706 0 : result.insert(fname, content);
707 0 : }
708 : }
709 0 : Ok(result)
710 0 : }
711 :
712 6 : pub(crate) async fn list_aux_files(
713 6 : &self,
714 6 : lsn: Lsn,
715 6 : ctx: &RequestContext,
716 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
717 6 : match self.get_switch_aux_file_policy() {
718 6 : AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
719 0 : AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
720 : AuxFilePolicy::CrossValidation => {
721 0 : let v1_result = self.list_aux_files_v1(lsn, ctx).await;
722 0 : let v2_result = self.list_aux_files_v2(lsn, ctx).await;
723 0 : match (v1_result, v2_result) {
724 0 : (Ok(v1), Ok(v2)) => {
725 0 : if v1 != v2 {
726 0 : tracing::error!(
727 0 : "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
728 : );
729 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
730 0 : "unmatched aux file v1 v2 result"
731 0 : )));
732 0 : }
733 0 : Ok(v1)
734 : }
735 0 : (Ok(_), Err(v2)) => {
736 0 : tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
737 0 : Err(v2)
738 : }
739 0 : (Err(v1), Ok(_)) => {
740 0 : tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
741 0 : Err(v1)
742 : }
743 0 : (Err(_), Err(v2)) => Err(v2),
744 : }
745 : }
746 : }
747 6 : }
748 :
749 : /// Does the same as get_current_logical_size but counted on demand.
750 : /// Used to initialize the logical size tracking on startup.
751 : ///
752 : /// Only relation blocks are counted currently. That excludes metadata,
753 : /// SLRUs, twophase files etc.
754 : ///
755 : /// # Cancel-Safety
756 : ///
757 : /// This method is cancellation-safe.
758 0 : pub async fn get_current_logical_size_non_incremental(
759 0 : &self,
760 0 : lsn: Lsn,
761 0 : ctx: &RequestContext,
762 0 : ) -> Result<u64, CalculateLogicalSizeError> {
763 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
764 :
765 : // Fetch list of database dirs and iterate them
766 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
767 0 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
768 :
769 0 : let mut total_size: u64 = 0;
770 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
771 0 : for rel in self
772 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
773 0 : .await?
774 : {
775 0 : if self.cancel.is_cancelled() {
776 0 : return Err(CalculateLogicalSizeError::Cancelled);
777 0 : }
778 0 : let relsize_key = rel_size_to_key(rel);
779 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
780 0 : let relsize = buf.get_u32_le();
781 0 :
782 0 : total_size += relsize as u64;
783 : }
784 : }
785 0 : Ok(total_size * BLCKSZ as u64)
786 0 : }
787 :
788 : ///
789 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
790 : /// Anything that's not listed maybe removed from the underlying storage (from
791 : /// that LSN forwards).
792 : ///
793 : /// The return value is (dense keyspace, sparse keyspace).
794 204 : pub(crate) async fn collect_keyspace(
795 204 : &self,
796 204 : lsn: Lsn,
797 204 : ctx: &RequestContext,
798 204 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
799 204 : // Iterate through key ranges, greedily packing them into partitions
800 204 : let mut result = KeySpaceAccum::new();
801 204 :
802 204 : // The dbdir metadata always exists
803 204 : result.add_key(DBDIR_KEY);
804 :
805 : // Fetch list of database dirs and iterate them
806 2292 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
807 204 : let dbdir = DbDirectory::des(&buf)?;
808 :
809 204 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
810 204 : dbs.sort_unstable();
811 204 : for (spcnode, dbnode) in dbs {
812 0 : result.add_key(relmap_file_key(spcnode, dbnode));
813 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
814 :
815 0 : let mut rels: Vec<RelTag> = self
816 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
817 0 : .await?
818 0 : .into_iter()
819 0 : .collect();
820 0 : rels.sort_unstable();
821 0 : for rel in rels {
822 0 : let relsize_key = rel_size_to_key(rel);
823 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
824 0 : let relsize = buf.get_u32_le();
825 0 :
826 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
827 0 : result.add_key(relsize_key);
828 : }
829 : }
830 :
831 : // Iterate SLRUs next
832 612 : for kind in [
833 204 : SlruKind::Clog,
834 204 : SlruKind::MultiXactMembers,
835 204 : SlruKind::MultiXactOffsets,
836 : ] {
837 612 : let slrudir_key = slru_dir_to_key(kind);
838 612 : result.add_key(slrudir_key);
839 7882 : let buf = self.get(slrudir_key, lsn, ctx).await?;
840 612 : let dir = SlruSegmentDirectory::des(&buf)?;
841 612 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
842 612 : segments.sort_unstable();
843 612 : for segno in segments {
844 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
845 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
846 0 : let segsize = buf.get_u32_le();
847 0 :
848 0 : result.add_range(
849 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
850 0 : );
851 0 : result.add_key(segsize_key);
852 : }
853 : }
854 :
855 : // Then pg_twophase
856 204 : result.add_key(TWOPHASEDIR_KEY);
857 3019 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
858 204 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
859 204 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
860 204 : xids.sort_unstable();
861 204 : for xid in xids {
862 0 : result.add_key(twophase_file_key(xid));
863 0 : }
864 :
865 204 : result.add_key(CONTROLFILE_KEY);
866 204 : result.add_key(CHECKPOINT_KEY);
867 204 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
868 102 : result.add_key(AUX_FILES_KEY);
869 102 : }
870 :
871 204 : Ok((
872 204 : result.to_keyspace(),
873 204 : /* AUX sparse key space */
874 204 : SparseKeySpace(KeySpace::single(Key::metadata_aux_key_range())),
875 204 : ))
876 204 : }
877 :
878 : /// Get cached size of relation if it not updated after specified LSN
879 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
880 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
881 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
882 448518 : if lsn >= *cached_lsn {
883 443372 : return Some(*nblocks);
884 5146 : }
885 22 : }
886 5168 : None
887 448540 : }
888 :
889 : /// Update cached relation size if there is no more recent update
890 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
891 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
892 5136 :
893 5136 : if lsn < rel_size_cache.complete_as_of {
894 : // Do not cache old values. It's safe to cache the size on read, as long as
895 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
896 : // never evict values from the cache, so if the relation size changed after
897 : // 'lsn', the new value is already in the cache.
898 0 : return;
899 5136 : }
900 5136 :
901 5136 : match rel_size_cache.map.entry(tag) {
902 5136 : hash_map::Entry::Occupied(mut entry) => {
903 5136 : let cached_lsn = entry.get_mut();
904 5136 : if lsn >= cached_lsn.0 {
905 0 : *cached_lsn = (lsn, nblocks);
906 5136 : }
907 : }
908 0 : hash_map::Entry::Vacant(entry) => {
909 0 : entry.insert((lsn, nblocks));
910 0 : }
911 : }
912 5136 : }
913 :
914 : /// Store cached relation size
915 288732 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
916 288732 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
917 288732 : rel_size_cache.map.insert(tag, (lsn, nblocks));
918 288732 : }
919 :
920 : /// Remove cached relation size
921 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
922 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
923 2 : rel_size_cache.map.remove(tag);
924 2 : }
925 : }
926 :
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// made by a single WAL record appear atomic.
pub struct DatadirModification<'a> {
    /// The timeline this modification applies to. You can access this to
    /// read the state, but note that any pending updates are *not* reflected
    /// in the state in 'tline' yet.
    pub tline: &'a Timeline,

    /// Current LSN of the modification
    lsn: Lsn,

    // The modifications are not applied directly to the underlying key-value store.
    // The put-functions add the modifications here, and they are flushed to the
    // underlying key-value store by the 'commit' function.
    //
    // `pending_lsns` collects the earlier values of `lsn` (pushed by `set_lsn`
    // whenever the LSN advances); `commit` reports each of them to the writer.
    pending_lsns: Vec<Lsn>,
    // Per key, the values written so far together with the LSN they were written at.
    pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
    // Key ranges to delete, each with the LSN at which the deletion was requested.
    pending_deletions: Vec<(Range<Key>, Lsn)>,
    // Net change in the number of blocks; converted to bytes (times BLCKSZ)
    // when it is reported to the writer.
    pending_nblocks: i64,

    /// For special "directory" keys that store key-value maps, track the size of the map
    /// if it was updated in this modification.
    pending_directory_entries: Vec<(DirectoryKind, usize)>,
}
952 :
953 : impl<'a> DatadirModification<'a> {
/// Get the current LSN of this modification, i.e. the LSN that subsequent
/// put/delete calls will be stamped with.
pub(crate) fn get_lsn(&self) -> Lsn {
    self.lsn
}
958 :
959 : /// Set the current lsn
960 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
961 145858 : ensure!(
962 145858 : lsn >= self.lsn,
963 0 : "setting an older lsn {} than {} is not allowed",
964 : lsn,
965 : self.lsn
966 : );
967 145858 : if lsn > self.lsn {
968 145858 : self.pending_lsns.push(self.lsn);
969 145858 : self.lsn = lsn;
970 145858 : }
971 145858 : Ok(())
972 145858 : }
973 :
974 : /// Initialize a completely new repository.
975 : ///
976 : /// This inserts the directory metadata entries that are assumed to
977 : /// always exist.
978 104 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
979 104 : let buf = DbDirectory::ser(&DbDirectory {
980 104 : dbdirs: HashMap::new(),
981 104 : })?;
982 104 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
983 104 : self.put(DBDIR_KEY, Value::Image(buf.into()));
984 104 :
985 104 : // Create AuxFilesDirectory
986 104 : self.init_aux_dir()?;
987 :
988 104 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
989 104 : xids: HashSet::new(),
990 104 : })?;
991 104 : self.pending_directory_entries
992 104 : .push((DirectoryKind::TwoPhase, 0));
993 104 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
994 :
995 104 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
996 104 : let empty_dir = Value::Image(buf);
997 104 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
998 104 : self.pending_directory_entries
999 104 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1000 104 : self.put(
1001 104 : slru_dir_to_key(SlruKind::MultiXactMembers),
1002 104 : empty_dir.clone(),
1003 104 : );
1004 104 : self.pending_directory_entries
1005 104 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1006 104 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1007 104 : self.pending_directory_entries
1008 104 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1009 104 :
1010 104 : Ok(())
1011 104 : }
1012 :
/// Test helper: initialize an empty repository and add placeholder control
/// file and checkpoint images.
#[cfg(test)]
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    const CONTROL_FILE: &[u8] = b"control_file contents do not matter";
    const CHECKPOINT: &[u8] = b"checkpoint_file contents do not matter";

    self.init_empty()?;
    self.put_control_file(bytes::Bytes::from_static(CONTROL_FILE))
        .context("put_control_file")?;
    self.put_checkpoint(bytes::Bytes::from_static(CHECKPOINT))
        .context("put_checkpoint_file")?;
    Ok(())
}
1026 :
1027 : /// Put a new page version that can be constructed from a WAL record
1028 : ///
1029 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1030 : /// current end-of-file. It's up to the caller to check that the relation size
1031 : /// matches the blocks inserted!
1032 145630 : pub fn put_rel_wal_record(
1033 145630 : &mut self,
1034 145630 : rel: RelTag,
1035 145630 : blknum: BlockNumber,
1036 145630 : rec: NeonWalRecord,
1037 145630 : ) -> anyhow::Result<()> {
1038 145630 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1039 145630 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1040 145630 : Ok(())
1041 145630 : }
1042 :
1043 : // Same, but for an SLRU.
1044 8 : pub fn put_slru_wal_record(
1045 8 : &mut self,
1046 8 : kind: SlruKind,
1047 8 : segno: u32,
1048 8 : blknum: BlockNumber,
1049 8 : rec: NeonWalRecord,
1050 8 : ) -> anyhow::Result<()> {
1051 8 : self.put(
1052 8 : slru_block_to_key(kind, segno, blknum),
1053 8 : Value::WalRecord(rec),
1054 8 : );
1055 8 : Ok(())
1056 8 : }
1057 :
1058 : /// Like put_wal_record, but with ready-made image of the page.
1059 280864 : pub fn put_rel_page_image(
1060 280864 : &mut self,
1061 280864 : rel: RelTag,
1062 280864 : blknum: BlockNumber,
1063 280864 : img: Bytes,
1064 280864 : ) -> anyhow::Result<()> {
1065 280864 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1066 280864 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1067 280864 : Ok(())
1068 280864 : }
1069 :
1070 6 : pub fn put_slru_page_image(
1071 6 : &mut self,
1072 6 : kind: SlruKind,
1073 6 : segno: u32,
1074 6 : blknum: BlockNumber,
1075 6 : img: Bytes,
1076 6 : ) -> anyhow::Result<()> {
1077 6 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
1078 6 : Ok(())
1079 6 : }
1080 :
1081 : /// Store a relmapper file (pg_filenode.map) in the repository
1082 16 : pub async fn put_relmap_file(
1083 16 : &mut self,
1084 16 : spcnode: Oid,
1085 16 : dbnode: Oid,
1086 16 : img: Bytes,
1087 16 : ctx: &RequestContext,
1088 16 : ) -> anyhow::Result<()> {
1089 : // Add it to the directory (if it doesn't exist already)
1090 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1091 16 : let mut dbdir = DbDirectory::des(&buf)?;
1092 :
1093 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1094 16 : if r.is_none() || r == Some(false) {
1095 : // The dbdir entry didn't exist, or it contained a
1096 : // 'false'. The 'insert' call already updated it with
1097 : // 'true', now write the updated 'dbdirs' map back.
1098 16 : let buf = DbDirectory::ser(&dbdir)?;
1099 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1100 16 :
1101 16 : // Create AuxFilesDirectory as well
1102 16 : self.init_aux_dir()?;
1103 0 : }
1104 16 : if r.is_none() {
1105 8 : // Create RelDirectory
1106 8 : let buf = RelDirectory::ser(&RelDirectory {
1107 8 : rels: HashSet::new(),
1108 8 : })?;
1109 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1110 8 : self.put(
1111 8 : rel_dir_to_key(spcnode, dbnode),
1112 8 : Value::Image(Bytes::from(buf)),
1113 8 : );
1114 8 : }
1115 :
1116 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1117 16 : Ok(())
1118 16 : }
1119 :
1120 0 : pub async fn put_twophase_file(
1121 0 : &mut self,
1122 0 : xid: TransactionId,
1123 0 : img: Bytes,
1124 0 : ctx: &RequestContext,
1125 0 : ) -> anyhow::Result<()> {
1126 : // Add it to the directory entry
1127 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1128 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1129 0 : if !dir.xids.insert(xid) {
1130 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1131 0 : }
1132 0 : self.pending_directory_entries
1133 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1134 0 : self.put(
1135 0 : TWOPHASEDIR_KEY,
1136 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1137 : );
1138 :
1139 0 : self.put(twophase_file_key(xid), Value::Image(img));
1140 0 : Ok(())
1141 0 : }
1142 :
/// Queue `img` as the new image stored under `CONTROLFILE_KEY`.
///
/// The current implementation always returns `Ok(())`.
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    self.put(CONTROLFILE_KEY, Value::Image(img));
    Ok(())
}
1147 :
/// Queue `img` as the new image stored under `CHECKPOINT_KEY`.
///
/// The current implementation always returns `Ok(())`.
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    self.put(CHECKPOINT_KEY, Value::Image(img));
    Ok(())
}
1152 :
1153 0 : pub async fn drop_dbdir(
1154 0 : &mut self,
1155 0 : spcnode: Oid,
1156 0 : dbnode: Oid,
1157 0 : ctx: &RequestContext,
1158 0 : ) -> anyhow::Result<()> {
1159 0 : let total_blocks = self
1160 0 : .tline
1161 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1162 0 : .await?;
1163 :
1164 : // Remove entry from dbdir
1165 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1166 0 : let mut dir = DbDirectory::des(&buf)?;
1167 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1168 0 : let buf = DbDirectory::ser(&dir)?;
1169 0 : self.pending_directory_entries
1170 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1171 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1172 : } else {
1173 0 : warn!(
1174 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1175 : spcnode, dbnode
1176 : );
1177 : }
1178 :
1179 : // Update logical database size.
1180 0 : self.pending_nblocks -= total_blocks as i64;
1181 0 :
1182 0 : // Delete all relations and metadata files for the spcnode/dnode
1183 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1184 0 : Ok(())
1185 0 : }
1186 :
1187 : /// Create a relation fork.
1188 : ///
1189 : /// 'nblocks' is the initial size.
1190 1920 : pub async fn put_rel_creation(
1191 1920 : &mut self,
1192 1920 : rel: RelTag,
1193 1920 : nblocks: BlockNumber,
1194 1920 : ctx: &RequestContext,
1195 1920 : ) -> Result<(), RelationError> {
1196 1920 : if rel.relnode == 0 {
1197 0 : return Err(RelationError::InvalidRelnode);
1198 1920 : }
1199 : // It's possible that this is the first rel for this db in this
1200 : // tablespace. Create the reldir entry for it if so.
1201 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1202 1920 : .context("deserialize db")?;
1203 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1204 1920 : let mut rel_dir =
1205 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1206 : // Didn't exist. Update dbdir
1207 8 : e.insert(false);
1208 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1209 8 : self.pending_directory_entries
1210 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1211 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1212 8 :
1213 8 : // and create the RelDirectory
1214 8 : RelDirectory::default()
1215 : } else {
1216 : // reldir already exists, fetch it
1217 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1218 1912 : .context("deserialize db")?
1219 : };
1220 :
1221 : // Add the new relation to the rel directory entry, and write it back
1222 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1223 0 : return Err(RelationError::AlreadyExists);
1224 1920 : }
1225 1920 :
1226 1920 : self.pending_directory_entries
1227 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1228 1920 :
1229 1920 : self.put(
1230 1920 : rel_dir_key,
1231 1920 : Value::Image(Bytes::from(
1232 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1233 : )),
1234 : );
1235 :
1236 : // Put size
1237 1920 : let size_key = rel_size_to_key(rel);
1238 1920 : let buf = nblocks.to_le_bytes();
1239 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1240 1920 :
1241 1920 : self.pending_nblocks += nblocks as i64;
1242 1920 :
1243 1920 : // Update relation size cache
1244 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1245 1920 :
1246 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1247 1920 : // caller.
1248 1920 : Ok(())
1249 1920 : }
1250 :
1251 : /// Truncate relation
1252 6012 : pub async fn put_rel_truncation(
1253 6012 : &mut self,
1254 6012 : rel: RelTag,
1255 6012 : nblocks: BlockNumber,
1256 6012 : ctx: &RequestContext,
1257 6012 : ) -> anyhow::Result<()> {
1258 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1259 6012 : if self
1260 6012 : .tline
1261 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1262 0 : .await?
1263 : {
1264 6012 : let size_key = rel_size_to_key(rel);
1265 : // Fetch the old size first
1266 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1267 6012 :
1268 6012 : // Update the entry with the new size.
1269 6012 : let buf = nblocks.to_le_bytes();
1270 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1271 6012 :
1272 6012 : // Update relation size cache
1273 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1274 6012 :
1275 6012 : // Update relation size cache
1276 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1277 6012 :
1278 6012 : // Update logical database size.
1279 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1280 0 : }
1281 6012 : Ok(())
1282 6012 : }
1283 :
1284 : /// Extend relation
1285 : /// If new size is smaller, do nothing.
1286 276680 : pub async fn put_rel_extend(
1287 276680 : &mut self,
1288 276680 : rel: RelTag,
1289 276680 : nblocks: BlockNumber,
1290 276680 : ctx: &RequestContext,
1291 276680 : ) -> anyhow::Result<()> {
1292 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1293 :
1294 : // Put size
1295 276680 : let size_key = rel_size_to_key(rel);
1296 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1297 276680 :
1298 276680 : // only extend relation here. never decrease the size
1299 276680 : if nblocks > old_size {
1300 274788 : let buf = nblocks.to_le_bytes();
1301 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1302 274788 :
1303 274788 : // Update relation size cache
1304 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1305 274788 :
1306 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1307 274788 : }
1308 276680 : Ok(())
1309 276680 : }
1310 :
1311 : /// Drop a relation.
1312 2 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1313 2 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1314 :
1315 : // Remove it from the directory entry
1316 2 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1317 2 : let buf = self.get(dir_key, ctx).await?;
1318 2 : let mut dir = RelDirectory::des(&buf)?;
1319 :
1320 2 : self.pending_directory_entries
1321 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1322 2 :
1323 2 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1324 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1325 : } else {
1326 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1327 : }
1328 :
1329 : // update logical size
1330 2 : let size_key = rel_size_to_key(rel);
1331 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1332 2 : self.pending_nblocks -= old_size as i64;
1333 2 :
1334 2 : // Remove enty from relation size cache
1335 2 : self.tline.remove_cached_rel_size(&rel);
1336 2 :
1337 2 : // Delete size entry, as well as all blocks
1338 2 : self.delete(rel_key_range(rel));
1339 2 :
1340 2 : Ok(())
1341 2 : }
1342 :
1343 6 : pub async fn put_slru_segment_creation(
1344 6 : &mut self,
1345 6 : kind: SlruKind,
1346 6 : segno: u32,
1347 6 : nblocks: BlockNumber,
1348 6 : ctx: &RequestContext,
1349 6 : ) -> anyhow::Result<()> {
1350 6 : // Add it to the directory entry
1351 6 : let dir_key = slru_dir_to_key(kind);
1352 6 : let buf = self.get(dir_key, ctx).await?;
1353 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1354 :
1355 6 : if !dir.segments.insert(segno) {
1356 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1357 6 : }
1358 6 : self.pending_directory_entries
1359 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1360 6 : self.put(
1361 6 : dir_key,
1362 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1363 : );
1364 :
1365 : // Put size
1366 6 : let size_key = slru_segment_size_to_key(kind, segno);
1367 6 : let buf = nblocks.to_le_bytes();
1368 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1369 6 :
1370 6 : // even if nblocks > 0, we don't insert any actual blocks here
1371 6 :
1372 6 : Ok(())
1373 6 : }
1374 :
1375 : /// Extend SLRU segment
1376 0 : pub fn put_slru_extend(
1377 0 : &mut self,
1378 0 : kind: SlruKind,
1379 0 : segno: u32,
1380 0 : nblocks: BlockNumber,
1381 0 : ) -> anyhow::Result<()> {
1382 0 : // Put size
1383 0 : let size_key = slru_segment_size_to_key(kind, segno);
1384 0 : let buf = nblocks.to_le_bytes();
1385 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1386 0 : Ok(())
1387 0 : }
1388 :
1389 : /// This method is used for marking truncated SLRU files
1390 0 : pub async fn drop_slru_segment(
1391 0 : &mut self,
1392 0 : kind: SlruKind,
1393 0 : segno: u32,
1394 0 : ctx: &RequestContext,
1395 0 : ) -> anyhow::Result<()> {
1396 0 : // Remove it from the directory entry
1397 0 : let dir_key = slru_dir_to_key(kind);
1398 0 : let buf = self.get(dir_key, ctx).await?;
1399 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1400 :
1401 0 : if !dir.segments.remove(&segno) {
1402 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1403 0 : }
1404 0 : self.pending_directory_entries
1405 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1406 0 : self.put(
1407 0 : dir_key,
1408 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1409 : );
1410 :
1411 : // Delete size entry, as well as all blocks
1412 0 : self.delete(slru_segment_key_range(kind, segno));
1413 0 :
1414 0 : Ok(())
1415 0 : }
1416 :
/// Drop a relmapper file (pg_filenode.map)
///
/// Not implemented yet: the arguments are ignored and `Ok(())` is returned
/// without touching any state.
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    // TODO
    Ok(())
}
1422 :
1423 : /// This method is used for marking truncated SLRU files
1424 0 : pub async fn drop_twophase_file(
1425 0 : &mut self,
1426 0 : xid: TransactionId,
1427 0 : ctx: &RequestContext,
1428 0 : ) -> anyhow::Result<()> {
1429 : // Remove it from the directory entry
1430 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1431 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1432 :
1433 0 : if !dir.xids.remove(&xid) {
1434 0 : warn!("twophase file for xid {} does not exist", xid);
1435 0 : }
1436 0 : self.pending_directory_entries
1437 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1438 0 : self.put(
1439 0 : TWOPHASEDIR_KEY,
1440 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1441 : );
1442 :
1443 : // Delete it
1444 0 : self.delete(twophase_key_range(xid));
1445 0 :
1446 0 : Ok(())
1447 0 : }
1448 :
1449 120 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1450 120 : if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
1451 0 : return Ok(());
1452 120 : }
1453 120 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1454 120 : files: HashMap::new(),
1455 120 : })?;
1456 120 : self.pending_directory_entries
1457 120 : .push((DirectoryKind::AuxFiles, 0));
1458 120 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1459 120 : Ok(())
1460 120 : }
1461 :
/// Store, replace or remove the aux file at `path`.
///
/// Empty `content` means "remove the file"; anything else is stored as the
/// file's new content. What gets written depends on the timeline's aux file
/// policy:
///
/// * `V2` / `CrossValidation`: the list of files stored under the key derived
///   from `path` by `aux_file::encode_aux_file_key` is rewritten as an image,
///   with any previous entry for `path` dropped and, unless `content` is
///   empty, the new entry appended.
/// * `V1` / `CrossValidation`: the single `AUX_FILES_KEY` directory is
///   updated, either by emitting a `NeonWalRecord::AuxFile` delta or by
///   writing a full directory image, and the directory cached in
///   `tline.aux_files` is kept in sync.
///
/// # Errors
///
/// Fails on read errors other than those treated as "missing key" below, and
/// on (de)serialization errors.
pub async fn put_file(
    &mut self,
    path: &str,
    content: &[u8],
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let policy = self.tline.get_switch_aux_file_policy();
    if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
        let key = aux_file::encode_aux_file_key(path);
        // retrieve the key from the engine
        let old_val = match self.get(key, ctx).await {
            Ok(val) => Some(val),
            Err(PageReconstructError::MissingKey(_)) => None,
            Err(e) => return Err(e.into()),
        };
        // Decode the files currently stored under this key, if any.
        let files = if let Some(ref old_val) = old_val {
            aux_file::decode_file_value(old_val)?
        } else {
            Vec::new()
        };
        // Drop the previous entry for `path`; re-add it unless this is a removal.
        let new_files = if content.is_empty() {
            files
                .into_iter()
                .filter(|(p, _)| &path != p)
                .collect::<Vec<_>>()
        } else {
            files
                .into_iter()
                .filter(|(p, _)| &path != p)
                .chain(std::iter::once((path, content)))
                .collect::<Vec<_>>()
        };
        let new_val = aux_file::encode_file_value(&new_files)?;
        self.put(key, Value::Image(new_val.into()));
    }

    if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
        let file_path = path.to_string();
        // `None` encodes a removal for `AuxFilesDirectory::upsert` and the delta record.
        let content = if content.is_empty() {
            None
        } else {
            Some(Bytes::copy_from_slice(content))
        };

        // Assigned exactly once on every path below that does not return early.
        let n_files;
        let mut aux_files = self.tline.aux_files.lock().await;
        if let Some(mut dir) = aux_files.dir.take() {
            // We already updated aux files in `self`: emit a delta and update our latest value.
            dir.upsert(file_path.clone(), content.clone());
            n_files = dir.files.len();
            if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
                // Enough deltas have accumulated: write a full image and restart the count.
                self.put(
                    AUX_FILES_KEY,
                    Value::Image(Bytes::from(
                        AuxFilesDirectory::ser(&dir).context("serialize")?,
                    )),
                );
                aux_files.n_deltas = 0;
            } else {
                self.put(
                    AUX_FILES_KEY,
                    Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
                );
                aux_files.n_deltas += 1;
            }
            aux_files.dir = Some(dir);
        } else {
            // Check if the AUX_FILES_KEY is initialized
            match self.get(AUX_FILES_KEY, ctx).await {
                Ok(dir_bytes) => {
                    let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
                    // Key is already set, we may append a delta
                    self.put(
                        AUX_FILES_KEY,
                        Value::WalRecord(NeonWalRecord::AuxFile {
                            file_path: file_path.clone(),
                            content: content.clone(),
                        }),
                    );
                    dir.upsert(file_path, content);
                    n_files = dir.files.len();
                    aux_files.dir = Some(dir);
                }
                Err(
                    e @ (PageReconstructError::AncestorStopping(_)
                    | PageReconstructError::Cancelled
                    | PageReconstructError::AncestorLsnTimeout(_)),
                ) => {
                    // Important that we do not interpret a shutdown error as "not found" and thereby
                    // reset the map.
                    return Err(e.into());
                }
                // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
                // the original code assumes all other errors are missing keys. Therefore, we keep the code path
                // the same for now, though in theory, we should only match the `MissingKey` variant.
                Err(
                    PageReconstructError::Other(_)
                    | PageReconstructError::WalRedo(_)
                    | PageReconstructError::MissingKey { .. },
                ) => {
                    // Key is missing, we must insert an image as the basis for subsequent deltas.

                    let mut dir = AuxFilesDirectory {
                        files: HashMap::new(),
                    };
                    dir.upsert(file_path, content);
                    self.put(
                        AUX_FILES_KEY,
                        Value::Image(Bytes::from(
                            AuxFilesDirectory::ser(&dir).context("serialize")?,
                        )),
                    );
                    // NOTE(review): recorded as 1 even when `content` is `None`
                    // (a removal), in which case `dir` is actually empty.
                    n_files = 1;
                    aux_files.dir = Some(dir);
                }
            }
        }

        self.pending_directory_entries
            .push((DirectoryKind::AuxFiles, n_files));
    }

    Ok(())
}
1586 :
///
/// Flush changes accumulated so far to the underlying repository.
///
/// Usually, changes made in DatadirModification are atomic, but this allows
/// you to flush them to the underlying repository before the final `commit`.
/// That allows to free up the memory used to hold the pending changes.
///
/// Currently only used during bulk import of a data directory. In that
/// context, breaking the atomicity is OK. If the import is interrupted, the
/// whole import fails and the timeline will be deleted anyway.
/// (Or to be precise, it will be left behind for debugging purposes and
/// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
///
/// Note: A consequence of flushing the pending operations is that they
/// won't be visible to subsequent operations until `commit`. The function
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
///
/// The flush is skipped entirely (returning `Ok(())`) while fewer than
/// 10000 pending blocks have accumulated.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    // Unless we have accumulated a decent amount of changes, it's not worth it
    // to scan through the pending_updates list.
    let pending_nblocks = self.pending_nblocks;
    if pending_nblocks < 10000 {
        return Ok(());
    }

    let mut writer = self.tline.writer().await;

    // Flush relation and SLRU data blocks, keep metadata.
    let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    for (key, values) in self.pending_updates.drain() {
        for (lsn, value) in values {
            if is_rel_block_key(&key) || is_slru_block_key(key) {
                // This bails out on the first error. That's Ok, cf this
                // function's doc comment.
                // NOTE(review): `pending_updates` is being drained at this point,
                // so on an early return the entries not yet written (and the
                // metadata collected in `retained_pending_updates`) are dropped
                // rather than left in place.
                writer.put(key, lsn, &value, ctx).await?;
            } else {
                retained_pending_updates
                    .entry(key)
                    .or_default()
                    .push((lsn, value));
            }
        }
    }

    self.pending_updates = retained_pending_updates;

    // Report the accumulated size change in bytes and reset the counter.
    if pending_nblocks != 0 {
        writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
        self.pending_nblocks = 0;
    }

    // Hand over (and clear) the recorded directory entry counts.
    for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
        writer.update_directory_entries_count(kind, count as u64);
    }

    Ok(())
}
1645 :
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// Each pending update is written with the LSN it was recorded at; pending
/// deletions, the accumulated logical size change and the directory entry
/// counts are handed to the timeline writer as well.
///
/// The time spent here is recorded in `WAL_INGEST.time_spent_on_ingest`
/// (only when the function returns successfully, as the timer is observed
/// at the very end).
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    let mut writer = self.tline.writer().await;

    let timer = WAL_INGEST.time_spent_on_ingest.start_timer();

    // Take the size delta up front so that it is reset even if a step below fails.
    let pending_nblocks = self.pending_nblocks;
    self.pending_nblocks = 0;

    if !self.pending_updates.is_empty() {
        // The put_batch call below expects the inputs to be sorted by Lsn,
        // so we do that first. Each key's values are visited in insertion
        // order and the per-key sequences are merged by LSN.
        let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
            self.pending_updates
                .drain()
                .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
                .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
            VecMapOrdering::GreaterOrEqual,
        );

        writer.put_batch(lsn_ordered_batch, ctx).await?;
    }

    if !self.pending_deletions.is_empty() {
        writer.delete_batch(&self.pending_deletions).await?;
        self.pending_deletions.clear();
    }

    // The current LSN goes last, after the earlier LSNs collected by `set_lsn`.
    self.pending_lsns.push(self.lsn);
    for pending_lsn in self.pending_lsns.drain(..) {
        // Ideally, we should be able to call writer.finish_write() only once
        // with the highest LSN. However, the last_record_lsn variable in the
        // timeline keeps track of the latest LSN and the immediate previous LSN
        // so we need to record every LSN to not leave a gap between them.
        writer.finish_write(pending_lsn);
    }

    // Report the accumulated size change in bytes.
    if pending_nblocks != 0 {
        writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    }

    // Hand over (and clear) the recorded directory entry counts.
    for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
        writer.update_directory_entries_count(kind, count as u64);
    }

    timer.observe_duration();

    Ok(())
}
1699 :
/// Number of pending operations: distinct keys with pending updates plus
/// pending range deletions. Multiple pending values for one key count once.
pub(crate) fn len(&self) -> usize {
    self.pending_updates.len() + self.pending_deletions.len()
}
1703 :
1704 : // Internal helper functions to batch the modifications
1705 :
1706 286564 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1707 : // Have we already updated the same key? Read the latest pending updated
1708 : // version in that case.
1709 : //
1710 : // Note: we don't check pending_deletions. It is an error to request a
1711 : // value that has been removed, deletion only avoids leaking storage.
1712 286564 : if let Some(values) = self.pending_updates.get(&key) {
1713 15928 : if let Some((_, value)) = values.last() {
1714 15928 : return if let Value::Image(img) = value {
1715 15928 : Ok(img.clone())
1716 : } else {
1717 : // Currently, we never need to read back a WAL record that we
1718 : // inserted in the same "transaction". All the metadata updates
1719 : // work directly with Images, and we never need to read actual
1720 : // data pages. We could handle this if we had to, by calling
1721 : // the walredo manager, but let's keep it simple for now.
1722 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1723 0 : "unexpected pending WAL record"
1724 0 : )))
1725 : };
1726 0 : }
1727 270636 : }
1728 270636 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1729 270636 : self.tline.get(key, lsn, ctx).await
1730 286564 : }
1731 :
1732 712084 : fn put(&mut self, key: Key, val: Value) {
1733 712084 : let values = self.pending_updates.entry(key).or_default();
1734 : // Replace the previous value if it exists at the same lsn
1735 712084 : if let Some((last_lsn, last_value)) = values.last_mut() {
1736 12166 : if *last_lsn == self.lsn {
1737 12160 : *last_value = val;
1738 12160 : return;
1739 6 : }
1740 699918 : }
1741 699924 : values.push((self.lsn, val));
1742 712084 : }
1743 :
/// Queue the deletion of every key in `key_range`, stamped with the current
/// LSN. The deletion is applied by `commit`.
fn delete(&mut self, key_range: Range<Key>) {
    trace!("DELETE {}-{}", key_range.start, key_range.end);
    self.pending_deletions.push((key_range, self.lsn));
}
1748 : }
1749 :
/// This enum facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
    /// Read committed state from the timeline at this LSN.
    Lsn(Lsn),
    /// Read through a pending modification, so that its uncommitted updates
    /// are visible.
    Modified(&'a DatadirModification<'a>),
}
1761 :
1762 : impl<'a> Version<'a> {
1763 23542 : async fn get(
1764 23542 : &self,
1765 23542 : timeline: &Timeline,
1766 23542 : key: Key,
1767 23542 : ctx: &RequestContext,
1768 23542 : ) -> Result<Bytes, PageReconstructError> {
1769 23542 : match self {
1770 23532 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1771 10 : Version::Modified(modification) => modification.get(key, ctx).await,
1772 : }
1773 23542 : }
1774 :
1775 35620 : fn get_lsn(&self) -> Lsn {
1776 35620 : match self {
1777 29574 : Version::Lsn(lsn) => *lsn,
1778 6046 : Version::Modified(modification) => modification.lsn,
1779 : }
1780 35620 : }
1781 : }
1782 :
1783 : //--- Metadata structs stored in key-value pairs in the repository.
1784 :
/// Metadata struct describing the set of databases, keyed by
/// `(spcnode, dbnode)`. Derives `Serialize`/`Deserialize` so it can be
/// stored as a value in the repository.
#[derive(Debug, Serialize, Deserialize)]
struct DbDirectory {
    // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    dbdirs: HashMap<(Oid, Oid), bool>,
}
1790 :
/// Metadata struct holding a set of transaction IDs.
///
/// NOTE(review): presumably these are the prepared (two-phase) transactions
/// that currently have a two-phase file stored; confirm against the callers.
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
    // Transaction IDs present in this directory.
    xids: HashSet<TransactionId>,
}
1795 :
/// Metadata struct listing the relations that exist, as
/// `(relfilenode, forknum)` pairs. `Default` yields an empty directory.
///
/// NOTE(review): presumably one such directory is stored per database
/// (see `rel_dir_to_key`); confirm against the callers.
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
    // Set of relations that exist. (relfilenode, forknum)
    //
    // TODO: Store it as a btree or radix tree or something else that spans multiple
    // key-value pairs, if you have a lot of relations
    rels: HashSet<(Oid, u8)>,
}
1804 :
/// Metadata struct mapping a string key to the raw bytes stored under it.
/// `Default` yields an empty directory; `PartialEq` allows whole-directory
/// comparison.
///
/// NOTE(review): the keys look like aux file paths (e.g. "foo/bar1" in the
/// round-trip test); confirm against the callers.
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
pub(crate) struct AuxFilesDirectory {
    // File key -> file content.
    pub(crate) files: HashMap<String, Bytes>,
}
1809 :
1810 : impl AuxFilesDirectory {
1811 28 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
1812 28 : if let Some(value) = value {
1813 22 : self.files.insert(key, value);
1814 22 : } else {
1815 6 : self.files.remove(&key);
1816 6 : }
1817 28 : }
1818 : }
1819 :
/// Serializable wrapper around a block count.
///
/// NOTE(review): judging by the name, this is the size of one relation in
/// blocks; confirm against the callers.
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
    // Number of blocks.
    nblocks: u32,
}
1824 :
/// Metadata struct listing the SLRU segments that exist, by segment number.
/// `Default` yields an empty directory.
#[derive(Debug, Serialize, Deserialize, Default)]
struct SlruSegmentDirectory {
    // Set of SLRU segments that exist.
    segments: HashSet<u32>,
}
1830 :
/// The kinds of metadata directory defined in this file.
///
/// Derives `enum_map::Enum`, which gives every variant (including each
/// `SlruSegment(kind)` combination) a distinct `usize` index; see
/// `DirectoryKind::offset` and `DirectoryKind::KINDS_NUM`.
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
#[repr(u8)]
pub(crate) enum DirectoryKind {
    /// Corresponds to `DbDirectory`.
    Db,
    /// Corresponds to `TwoPhaseDirectory`.
    TwoPhase,
    /// Corresponds to `RelDirectory`.
    Rel,
    /// Corresponds to `AuxFilesDirectory`.
    AuxFiles,
    /// Corresponds to `SlruSegmentDirectory`, one entry per SLRU kind.
    SlruSegment(SlruKind),
}
1840 :
1841 : impl DirectoryKind {
1842 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
1843 5192 : pub(crate) fn offset(&self) -> usize {
1844 5192 : self.into_usize()
1845 5192 : }
1846 : }
1847 :
/// A statically allocated buffer of `BLCKSZ` zero bytes, i.e. one all-zeroes page.
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1849 :
1850 : #[allow(clippy::bool_assert_comparison)]
1851 : #[cfg(test)]
1852 : mod tests {
1853 : use hex_literal::hex;
1854 : use utils::id::TimelineId;
1855 :
1856 : use super::*;
1857 :
1858 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
1859 :
1860 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
1861 : #[tokio::test]
1862 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
1863 2 : let name = "aux_files_round_trip";
1864 2 : let harness = TenantHarness::create(name)?;
1865 2 :
1866 2 : pub const TIMELINE_ID: TimelineId =
1867 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
1868 2 :
1869 8 : let (tenant, ctx) = harness.load().await;
1870 2 : let tline = tenant
1871 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
1872 2 : .await?;
1873 2 : let tline = tline.raw_timeline().unwrap();
1874 2 :
1875 2 : // First modification: insert two keys
1876 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
1877 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
1878 2 : modification.set_lsn(Lsn(0x1008))?;
1879 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
1880 2 : modification.commit(&ctx).await?;
1881 2 : let expect_1008 = HashMap::from([
1882 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
1883 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
1884 2 : ]);
1885 2 :
1886 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
1887 2 : assert_eq!(readback, expect_1008);
1888 2 :
1889 2 : // Second modification: update one key, remove the other
1890 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
1891 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
1892 2 : modification.set_lsn(Lsn(0x2008))?;
1893 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
1894 2 : modification.commit(&ctx).await?;
1895 2 : let expect_2008 =
1896 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
1897 2 :
1898 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
1899 2 : assert_eq!(readback, expect_2008);
1900 2 :
1901 2 : // Reading back in time works
1902 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
1903 2 : assert_eq!(readback, expect_1008);
1904 2 :
1905 2 : Ok(())
1906 2 : }
1907 :
1908 : /*
1909 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1910 : let incremental = timeline.get_current_logical_size();
1911 : let non_incremental = timeline
1912 : .get_current_logical_size_non_incremental(lsn)
1913 : .unwrap();
1914 : assert_eq!(incremental, non_incremental);
1915 : }
1916 : */
1917 :
1918 : /*
1919 : ///
1920 : /// Test list_rels() function, with branches and dropped relations
1921 : ///
1922 : #[test]
1923 : fn test_list_rels_drop() -> Result<()> {
1924 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1925 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1926 : const TESTDB: u32 = 111;
1927 :
1928 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1929 : // after branching fails below
1930 : let mut writer = tline.begin_record(Lsn(0x10));
1931 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1932 : writer.finish()?;
1933 :
1934 : // Create a relation on the timeline
1935 : let mut writer = tline.begin_record(Lsn(0x20));
1936 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1937 : writer.finish()?;
1938 :
1939 : let writer = tline.begin_record(Lsn(0x00));
1940 : writer.finish()?;
1941 :
1942 : // Check that list_rels() lists it after LSN 2, but no before it
1943 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1944 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
1945 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
1946 :
1947 : // Create a branch, check that the relation is visible there
1948 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
1949 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
1950 : Some(timeline) => timeline,
1951 : None => panic!("Should have a local timeline"),
1952 : };
1953 : let newtline = DatadirTimelineImpl::new(newtline);
1954 : assert!(newtline
1955 : .list_rels(0, TESTDB, Lsn(0x30))?
1956 : .contains(&TESTREL_A));
1957 :
1958 : // Drop it on the branch
1959 : let mut new_writer = newtline.begin_record(Lsn(0x40));
1960 : new_writer.drop_relation(TESTREL_A)?;
1961 : new_writer.finish()?;
1962 :
1963 : // Check that it's no longer listed on the branch after the point where it was dropped
1964 : assert!(newtline
1965 : .list_rels(0, TESTDB, Lsn(0x30))?
1966 : .contains(&TESTREL_A));
1967 : assert!(!newtline
1968 : .list_rels(0, TESTDB, Lsn(0x40))?
1969 : .contains(&TESTREL_A));
1970 :
1971 : // Run checkpoint and garbage collection and check that it's still not visible
1972 : newtline.checkpoint(CheckpointConfig::Forced)?;
1973 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
1974 :
1975 : assert!(!newtline
1976 : .list_rels(0, TESTDB, Lsn(0x40))?
1977 : .contains(&TESTREL_A));
1978 :
1979 : Ok(())
1980 : }
1981 : */
1982 :
1983 : /*
1984 : #[test]
1985 : fn test_read_beyond_eof() -> Result<()> {
1986 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
1987 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
1988 :
1989 : make_some_layers(&tline, Lsn(0x20))?;
1990 : let mut writer = tline.begin_record(Lsn(0x60));
1991 : walingest.put_rel_page_image(
1992 : &mut writer,
1993 : TESTREL_A,
1994 : 0,
1995 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
1996 : )?;
1997 : writer.finish()?;
1998 :
1999 : // Test read before rel creation. Should error out.
2000 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2001 :
2002 : // Read block beyond end of relation at different points in time.
2003 : // These reads should fall into different delta, image, and in-memory layers.
2004 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2005 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2006 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2007 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2008 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2009 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2010 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2011 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2012 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2013 :
2014 : // Test on an in-memory layer with no preceding layer
2015 : let mut writer = tline.begin_record(Lsn(0x70));
2016 : walingest.put_rel_page_image(
2017 : &mut writer,
2018 : TESTREL_B,
2019 : 0,
2020 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2021 : )?;
2022 : writer.finish()?;
2023 :
        assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
2025 :
2026 : Ok(())
2027 : }
2028 : */
2029 : }
|