Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::metrics::WAL_INGEST;
13 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
14 : use crate::walrecord::NeonWalRecord;
15 : use crate::{aux_file, repository::*};
16 : use anyhow::{ensure, Context};
17 : use bytes::{Buf, Bytes, BytesMut};
18 : use enum_map::Enum;
19 : use itertools::Itertools;
20 : use pageserver_api::key::{
21 : dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
22 : rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
23 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
24 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
25 : };
26 : use pageserver_api::keyspace::SparseKeySpace;
27 : use pageserver_api::models::AuxFilePolicy;
28 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
29 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
30 : use postgres_ffi::BLCKSZ;
31 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
32 : use serde::{Deserialize, Serialize};
33 : use std::collections::{hash_map, HashMap, HashSet};
34 : use std::ops::ControlFlow;
35 : use std::ops::Range;
36 : use strum::IntoEnumIterator;
37 : use tokio_util::sync::CancellationToken;
38 : use tracing::{debug, info, trace, warn};
39 : use utils::bin_ser::DeserializeError;
40 : use utils::vec_map::{VecMap, VecMapOrdering};
41 : use utils::{bin_ser::BeSer, lsn::Lsn};
42 :
/// Upper bound on the number of delta records stacked on `AUX_FILES_KEY` (aux file
/// storage v1). Once this many deltas have accumulated, the write path emits a full image.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Upper bound on the number of aux-file-related delta layers (aux file storage v2).
/// Once this many exist, compaction produces a fresh image layer.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
48 :
49 : #[derive(Debug)]
50 : pub enum LsnForTimestamp {
51 : /// Found commits both before and after the given timestamp
52 : Present(Lsn),
53 :
54 : /// Found no commits after the given timestamp, this means
55 : /// that the newest data in the branch is older than the given
56 : /// timestamp.
57 : ///
58 : /// All commits <= LSN happened before the given timestamp
59 : Future(Lsn),
60 :
61 : /// The queried timestamp is past our horizon we look back at (PITR)
62 : ///
63 : /// All commits > LSN happened after the given timestamp,
64 : /// but any commits < LSN might have happened before or after
65 : /// the given timestamp. We don't know because no data before
66 : /// the given lsn is available.
67 : Past(Lsn),
68 :
69 : /// We have found no commit with a timestamp,
70 : /// so we can't return anything meaningful.
71 : ///
72 : /// The associated LSN is the lower bound value we can safely
73 : /// create branches on, but no statement is made if it is
74 : /// older or newer than the timestamp.
75 : ///
76 : /// This variant can e.g. be returned right after a
77 : /// cluster import.
78 : NoData(Lsn),
79 : }
80 :
81 0 : #[derive(Debug, thiserror::Error)]
82 : pub enum CalculateLogicalSizeError {
83 : #[error("cancelled")]
84 : Cancelled,
85 : #[error(transparent)]
86 : Other(#[from] anyhow::Error),
87 : }
88 :
89 0 : #[derive(Debug, thiserror::Error)]
90 : pub(crate) enum CollectKeySpaceError {
91 : #[error(transparent)]
92 : Decode(#[from] DeserializeError),
93 : #[error(transparent)]
94 : PageRead(PageReconstructError),
95 : #[error("cancelled")]
96 : Cancelled,
97 : }
98 :
99 : impl From<PageReconstructError> for CollectKeySpaceError {
100 0 : fn from(err: PageReconstructError) -> Self {
101 0 : match err {
102 0 : PageReconstructError::Cancelled => Self::Cancelled,
103 0 : err => Self::PageRead(err),
104 : }
105 0 : }
106 : }
107 :
108 : impl From<PageReconstructError> for CalculateLogicalSizeError {
109 0 : fn from(pre: PageReconstructError) -> Self {
110 0 : match pre {
111 : PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
112 0 : Self::Cancelled
113 : }
114 0 : _ => Self::Other(pre.into()),
115 : }
116 0 : }
117 : }
118 :
119 0 : #[derive(Debug, thiserror::Error)]
120 : pub enum RelationError {
121 : #[error("Relation Already Exists")]
122 : AlreadyExists,
123 : #[error("invalid relnode")]
124 : InvalidRelnode,
125 : #[error(transparent)]
126 : Other(#[from] anyhow::Error),
127 : }
128 :
///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files in a versioned key-value store. The
/// Timeline struct provides the key-value store.
///
/// It is kept as a separate impl block so that all these functions can easily be
/// included in the Timeline implementation; it might be moved into a separate struct later.
136 : impl Timeline {
137 : /// Start ingesting a WAL record, or other atomic modification of
138 : /// the timeline.
139 : ///
140 : /// This provides a transaction-like interface to perform a bunch
141 : /// of modifications atomically.
142 : ///
143 : /// To ingest a WAL record, call begin_modification(lsn) to get a
144 : /// DatadirModification object. Use the functions in the object to
145 : /// modify the repository state, updating all the pages and metadata
146 : /// that the WAL record affects. When you're done, call commit() to
147 : /// commit the changes.
148 : ///
149 : /// Lsn stored in modification is advanced by `ingest_record` and
150 : /// is used by `commit()` to update `last_record_lsn`.
151 : ///
152 : /// Calling commit() will flush all the changes and reset the state,
153 : /// so the `DatadirModification` struct can be reused to perform the next modification.
154 : ///
155 : /// Note that any pending modifications you make through the
156 : /// modification object won't be visible to calls to the 'get' and list
157 : /// functions of the timeline until you finish! And if you update the
158 : /// same page twice, the last update wins.
159 : ///
160 268326 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
161 268326 : where
162 268326 : Self: Sized,
163 268326 : {
164 268326 : DatadirModification {
165 268326 : tline: self,
166 268326 : pending_lsns: Vec::new(),
167 268326 : pending_updates: HashMap::new(),
168 268326 : pending_deletions: Vec::new(),
169 268326 : pending_nblocks: 0,
170 268326 : pending_directory_entries: Vec::new(),
171 268326 : lsn,
172 268326 : }
173 268326 : }
174 :
175 : //------------------------------------------------------------------------------
176 : // Public GET functions
177 : //------------------------------------------------------------------------------
178 :
179 : /// Look up given page version.
180 18384 : pub(crate) async fn get_rel_page_at_lsn(
181 18384 : &self,
182 18384 : tag: RelTag,
183 18384 : blknum: BlockNumber,
184 18384 : version: Version<'_>,
185 18384 : ctx: &RequestContext,
186 18384 : ) -> Result<Bytes, PageReconstructError> {
187 18384 : if tag.relnode == 0 {
188 0 : return Err(PageReconstructError::Other(
189 0 : RelationError::InvalidRelnode.into(),
190 0 : ));
191 18384 : }
192 :
193 18384 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
194 18384 : if blknum >= nblocks {
195 0 : debug!(
196 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
197 0 : tag,
198 0 : blknum,
199 0 : version.get_lsn(),
200 : nblocks
201 : );
202 0 : return Ok(ZERO_PAGE.clone());
203 18384 : }
204 18384 :
205 18384 : let key = rel_block_to_key(tag, blknum);
206 18384 : version.get(self, key, ctx).await
207 18384 : }
208 :
209 : // Get size of a database in blocks
210 0 : pub(crate) async fn get_db_size(
211 0 : &self,
212 0 : spcnode: Oid,
213 0 : dbnode: Oid,
214 0 : version: Version<'_>,
215 0 : ctx: &RequestContext,
216 0 : ) -> Result<usize, PageReconstructError> {
217 0 : let mut total_blocks = 0;
218 :
219 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
220 :
221 0 : for rel in rels {
222 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
223 0 : total_blocks += n_blocks as usize;
224 : }
225 0 : Ok(total_blocks)
226 0 : }
227 :
228 : /// Get size of a relation file
229 24434 : pub(crate) async fn get_rel_size(
230 24434 : &self,
231 24434 : tag: RelTag,
232 24434 : version: Version<'_>,
233 24434 : ctx: &RequestContext,
234 24434 : ) -> Result<BlockNumber, PageReconstructError> {
235 24434 : if tag.relnode == 0 {
236 0 : return Err(PageReconstructError::Other(
237 0 : RelationError::InvalidRelnode.into(),
238 0 : ));
239 24434 : }
240 :
241 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
242 19294 : return Ok(nblocks);
243 5140 : }
244 5140 :
245 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
246 0 : && !self.get_rel_exists(tag, version, ctx).await?
247 : {
248 : // FIXME: Postgres sometimes calls smgrcreate() to create
249 : // FSM, and smgrnblocks() on it immediately afterwards,
250 : // without extending it. Tolerate that by claiming that
251 : // any non-existent FSM fork has size 0.
252 0 : return Ok(0);
253 5140 : }
254 5140 :
255 5140 : let key = rel_size_to_key(tag);
256 5140 : let mut buf = version.get(self, key, ctx).await?;
257 5136 : let nblocks = buf.get_u32_le();
258 5136 :
259 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
260 5136 :
261 5136 : Ok(nblocks)
262 24434 : }
263 :
264 : /// Does relation exist?
265 6050 : pub(crate) async fn get_rel_exists(
266 6050 : &self,
267 6050 : tag: RelTag,
268 6050 : version: Version<'_>,
269 6050 : ctx: &RequestContext,
270 6050 : ) -> Result<bool, PageReconstructError> {
271 6050 : if tag.relnode == 0 {
272 0 : return Err(PageReconstructError::Other(
273 0 : RelationError::InvalidRelnode.into(),
274 0 : ));
275 6050 : }
276 :
277 : // first try to lookup relation in cache
278 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
279 6032 : return Ok(true);
280 18 : }
281 18 : // fetch directory listing
282 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
283 18 : let buf = version.get(self, key, ctx).await?;
284 :
285 18 : match RelDirectory::des(&buf).context("deserialization failure") {
286 18 : Ok(dir) => {
287 18 : let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
288 18 : Ok(exists)
289 : }
290 0 : Err(e) => Err(PageReconstructError::from(e)),
291 : }
292 6050 : }
293 :
294 : /// Get a list of all existing relations in given tablespace and database.
295 : ///
296 : /// # Cancel-Safety
297 : ///
298 : /// This method is cancellation-safe.
299 0 : pub(crate) async fn list_rels(
300 0 : &self,
301 0 : spcnode: Oid,
302 0 : dbnode: Oid,
303 0 : version: Version<'_>,
304 0 : ctx: &RequestContext,
305 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
306 0 : // fetch directory listing
307 0 : let key = rel_dir_to_key(spcnode, dbnode);
308 0 : let buf = version.get(self, key, ctx).await?;
309 :
310 0 : match RelDirectory::des(&buf).context("deserialization failure") {
311 0 : Ok(dir) => {
312 0 : let rels: HashSet<RelTag> =
313 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
314 0 : spcnode,
315 0 : dbnode,
316 0 : relnode: *relnode,
317 0 : forknum: *forknum,
318 0 : }));
319 0 :
320 0 : Ok(rels)
321 : }
322 0 : Err(e) => Err(PageReconstructError::from(e)),
323 : }
324 0 : }
325 :
326 : /// Get the whole SLRU segment
327 0 : pub(crate) async fn get_slru_segment(
328 0 : &self,
329 0 : kind: SlruKind,
330 0 : segno: u32,
331 0 : lsn: Lsn,
332 0 : ctx: &RequestContext,
333 0 : ) -> Result<Bytes, PageReconstructError> {
334 0 : let n_blocks = self
335 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
336 0 : .await?;
337 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
338 0 : for blkno in 0..n_blocks {
339 0 : let block = self
340 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
341 0 : .await?;
342 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
343 : }
344 0 : Ok(segment.freeze())
345 0 : }
346 :
347 : /// Look up given SLRU page version.
348 0 : pub(crate) async fn get_slru_page_at_lsn(
349 0 : &self,
350 0 : kind: SlruKind,
351 0 : segno: u32,
352 0 : blknum: BlockNumber,
353 0 : lsn: Lsn,
354 0 : ctx: &RequestContext,
355 0 : ) -> Result<Bytes, PageReconstructError> {
356 0 : let key = slru_block_to_key(kind, segno, blknum);
357 0 : self.get(key, lsn, ctx).await
358 0 : }
359 :
360 : /// Get size of an SLRU segment
361 0 : pub(crate) async fn get_slru_segment_size(
362 0 : &self,
363 0 : kind: SlruKind,
364 0 : segno: u32,
365 0 : version: Version<'_>,
366 0 : ctx: &RequestContext,
367 0 : ) -> Result<BlockNumber, PageReconstructError> {
368 0 : let key = slru_segment_size_to_key(kind, segno);
369 0 : let mut buf = version.get(self, key, ctx).await?;
370 0 : Ok(buf.get_u32_le())
371 0 : }
372 :
373 : /// Get size of an SLRU segment
374 0 : pub(crate) async fn get_slru_segment_exists(
375 0 : &self,
376 0 : kind: SlruKind,
377 0 : segno: u32,
378 0 : version: Version<'_>,
379 0 : ctx: &RequestContext,
380 0 : ) -> Result<bool, PageReconstructError> {
381 0 : // fetch directory listing
382 0 : let key = slru_dir_to_key(kind);
383 0 : let buf = version.get(self, key, ctx).await?;
384 :
385 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
386 0 : Ok(dir) => {
387 0 : let exists = dir.segments.contains(&segno);
388 0 : Ok(exists)
389 : }
390 0 : Err(e) => Err(PageReconstructError::from(e)),
391 : }
392 0 : }
393 :
394 : /// Locate LSN, such that all transactions that committed before
395 : /// 'search_timestamp' are visible, but nothing newer is.
396 : ///
397 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
398 : /// so it's not well defined which LSN you get if there were multiple commits
399 : /// "in flight" at that point in time.
400 : ///
401 0 : pub(crate) async fn find_lsn_for_timestamp(
402 0 : &self,
403 0 : search_timestamp: TimestampTz,
404 0 : cancel: &CancellationToken,
405 0 : ctx: &RequestContext,
406 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
407 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
408 0 : // We use this method to figure out the branching LSN for the new branch, but the
409 0 : // GC cutoff could be before the branching point and we cannot create a new branch
410 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
411 0 : // on the safe side.
412 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
413 0 : let max_lsn = self.get_last_record_lsn();
414 0 :
415 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
416 0 : // LSN divided by 8.
417 0 : let mut low = min_lsn.0 / 8;
418 0 : let mut high = max_lsn.0 / 8 + 1;
419 0 :
420 0 : let mut found_smaller = false;
421 0 : let mut found_larger = false;
422 0 : while low < high {
423 0 : if cancel.is_cancelled() {
424 0 : return Err(PageReconstructError::Cancelled);
425 0 : }
426 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
427 0 : let mid = (high + low) / 2;
428 :
429 0 : let cmp = self
430 0 : .is_latest_commit_timestamp_ge_than(
431 0 : search_timestamp,
432 0 : Lsn(mid * 8),
433 0 : &mut found_smaller,
434 0 : &mut found_larger,
435 0 : ctx,
436 0 : )
437 0 : .await?;
438 :
439 0 : if cmp {
440 0 : high = mid;
441 0 : } else {
442 0 : low = mid + 1;
443 0 : }
444 : }
445 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
446 : // so the LSN of the last commit record before or at `search_timestamp`.
447 : // Remove one from `low` to get `t`.
448 : //
449 : // FIXME: it would be better to get the LSN of the previous commit.
450 : // Otherwise, if you restore to the returned LSN, the database will
451 : // include physical changes from later commits that will be marked
452 : // as aborted, and will need to be vacuumed away.
453 0 : let commit_lsn = Lsn((low - 1) * 8);
454 0 : match (found_smaller, found_larger) {
455 : (false, false) => {
456 : // This can happen if no commit records have been processed yet, e.g.
457 : // just after importing a cluster.
458 0 : Ok(LsnForTimestamp::NoData(min_lsn))
459 : }
460 : (false, true) => {
461 : // Didn't find any commit timestamps smaller than the request
462 0 : Ok(LsnForTimestamp::Past(min_lsn))
463 : }
464 0 : (true, _) if commit_lsn < min_lsn => {
465 0 : // the search above did set found_smaller to true but it never increased the lsn.
466 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
467 0 : // below the min_lsn. We should never do that.
468 0 : Ok(LsnForTimestamp::Past(min_lsn))
469 : }
470 : (true, false) => {
471 : // Only found commits with timestamps smaller than the request.
472 : // It's still a valid case for branch creation, return it.
473 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
474 : // case, anyway.
475 0 : Ok(LsnForTimestamp::Future(commit_lsn))
476 : }
477 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
478 : }
479 0 : }
480 :
481 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
482 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
483 : ///
484 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
485 : /// with a smaller/larger timestamp.
486 : ///
487 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
488 0 : &self,
489 0 : search_timestamp: TimestampTz,
490 0 : probe_lsn: Lsn,
491 0 : found_smaller: &mut bool,
492 0 : found_larger: &mut bool,
493 0 : ctx: &RequestContext,
494 0 : ) -> Result<bool, PageReconstructError> {
495 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
496 0 : if timestamp >= search_timestamp {
497 0 : *found_larger = true;
498 0 : return ControlFlow::Break(true);
499 0 : } else {
500 0 : *found_smaller = true;
501 0 : }
502 0 : ControlFlow::Continue(())
503 0 : })
504 0 : .await
505 0 : }
506 :
507 : /// Obtain the possible timestamp range for the given lsn.
508 : ///
509 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
510 0 : pub(crate) async fn get_timestamp_for_lsn(
511 0 : &self,
512 0 : probe_lsn: Lsn,
513 0 : ctx: &RequestContext,
514 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
515 0 : let mut max: Option<TimestampTz> = None;
516 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
517 0 : if let Some(max_prev) = max {
518 0 : max = Some(max_prev.max(timestamp));
519 0 : } else {
520 0 : max = Some(timestamp);
521 0 : }
522 0 : ControlFlow::Continue(())
523 0 : })
524 0 : .await?;
525 :
526 0 : Ok(max)
527 0 : }
528 :
529 : /// Runs the given function on all the timestamps for a given lsn
530 : ///
531 : /// The return value is either given by the closure, or set to the `Default`
532 : /// impl's output.
533 0 : async fn map_all_timestamps<T: Default>(
534 0 : &self,
535 0 : probe_lsn: Lsn,
536 0 : ctx: &RequestContext,
537 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
538 0 : ) -> Result<T, PageReconstructError> {
539 0 : for segno in self
540 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
541 0 : .await?
542 : {
543 0 : let nblocks = self
544 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
545 0 : .await?;
546 0 : for blknum in (0..nblocks).rev() {
547 0 : let clog_page = self
548 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
549 0 : .await?;
550 :
551 0 : if clog_page.len() == BLCKSZ as usize + 8 {
552 0 : let mut timestamp_bytes = [0u8; 8];
553 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
554 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
555 0 :
556 0 : match f(timestamp) {
557 0 : ControlFlow::Break(b) => return Ok(b),
558 0 : ControlFlow::Continue(()) => (),
559 : }
560 0 : }
561 : }
562 : }
563 0 : Ok(Default::default())
564 0 : }
565 :
566 0 : pub(crate) async fn get_slru_keyspace(
567 0 : &self,
568 0 : version: Version<'_>,
569 0 : ctx: &RequestContext,
570 0 : ) -> Result<KeySpace, PageReconstructError> {
571 0 : let mut accum = KeySpaceAccum::new();
572 :
573 0 : for kind in SlruKind::iter() {
574 0 : let mut segments: Vec<u32> = self
575 0 : .list_slru_segments(kind, version, ctx)
576 0 : .await?
577 0 : .into_iter()
578 0 : .collect();
579 0 : segments.sort_unstable();
580 :
581 0 : for seg in segments {
582 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
583 :
584 0 : accum.add_range(
585 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
586 0 : );
587 : }
588 : }
589 :
590 0 : Ok(accum.to_keyspace())
591 0 : }
592 :
593 : /// Get a list of SLRU segments
594 0 : pub(crate) async fn list_slru_segments(
595 0 : &self,
596 0 : kind: SlruKind,
597 0 : version: Version<'_>,
598 0 : ctx: &RequestContext,
599 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
600 0 : // fetch directory entry
601 0 : let key = slru_dir_to_key(kind);
602 :
603 0 : let buf = version.get(self, key, ctx).await?;
604 0 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
605 0 : Ok(dir) => Ok(dir.segments),
606 0 : Err(e) => Err(PageReconstructError::from(e)),
607 : }
608 0 : }
609 :
610 0 : pub(crate) async fn get_relmap_file(
611 0 : &self,
612 0 : spcnode: Oid,
613 0 : dbnode: Oid,
614 0 : version: Version<'_>,
615 0 : ctx: &RequestContext,
616 0 : ) -> Result<Bytes, PageReconstructError> {
617 0 : let key = relmap_file_key(spcnode, dbnode);
618 :
619 0 : let buf = version.get(self, key, ctx).await?;
620 0 : Ok(buf)
621 0 : }
622 :
623 0 : pub(crate) async fn list_dbdirs(
624 0 : &self,
625 0 : lsn: Lsn,
626 0 : ctx: &RequestContext,
627 0 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
628 : // fetch directory entry
629 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
630 :
631 0 : match DbDirectory::des(&buf).context("deserialization failure") {
632 0 : Ok(dir) => Ok(dir.dbdirs),
633 0 : Err(e) => Err(PageReconstructError::from(e)),
634 : }
635 0 : }
636 :
637 0 : pub(crate) async fn get_twophase_file(
638 0 : &self,
639 0 : xid: TransactionId,
640 0 : lsn: Lsn,
641 0 : ctx: &RequestContext,
642 0 : ) -> Result<Bytes, PageReconstructError> {
643 0 : let key = twophase_file_key(xid);
644 0 : let buf = self.get(key, lsn, ctx).await?;
645 0 : Ok(buf)
646 0 : }
647 :
648 0 : pub(crate) async fn list_twophase_files(
649 0 : &self,
650 0 : lsn: Lsn,
651 0 : ctx: &RequestContext,
652 0 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
653 : // fetch directory entry
654 0 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
655 :
656 0 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
657 0 : Ok(dir) => Ok(dir.xids),
658 0 : Err(e) => Err(PageReconstructError::from(e)),
659 : }
660 0 : }
661 :
662 0 : pub(crate) async fn get_control_file(
663 0 : &self,
664 0 : lsn: Lsn,
665 0 : ctx: &RequestContext,
666 0 : ) -> Result<Bytes, PageReconstructError> {
667 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
668 0 : }
669 :
670 12 : pub(crate) async fn get_checkpoint(
671 12 : &self,
672 12 : lsn: Lsn,
673 12 : ctx: &RequestContext,
674 12 : ) -> Result<Bytes, PageReconstructError> {
675 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
676 12 : }
677 :
678 14 : async fn list_aux_files_v1(
679 14 : &self,
680 14 : lsn: Lsn,
681 14 : ctx: &RequestContext,
682 14 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
683 14 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
684 12 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
685 12 : Ok(dir) => Ok(dir.files),
686 0 : Err(e) => Err(PageReconstructError::from(e)),
687 : },
688 2 : Err(e) => {
689 2 : // This is expected: historical databases do not have the key.
690 2 : debug!("Failed to get info about AUX files: {}", e);
691 2 : Ok(HashMap::new())
692 : }
693 : }
694 14 : }
695 :
696 8 : async fn list_aux_files_v2(
697 8 : &self,
698 8 : lsn: Lsn,
699 8 : ctx: &RequestContext,
700 8 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
701 8 : let kv = self
702 8 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
703 0 : .await
704 8 : .context("scan")?;
705 8 : let mut result = HashMap::new();
706 8 : let mut sz = 0;
707 24 : for (_, v) in kv {
708 16 : let v = v.context("get value")?;
709 16 : let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
710 32 : for (fname, content) in v {
711 16 : sz += fname.len();
712 16 : sz += content.len();
713 16 : result.insert(fname, content);
714 16 : }
715 : }
716 8 : self.aux_file_size_estimator.on_base_backup(sz);
717 8 : Ok(result)
718 8 : }
719 :
720 20 : pub(crate) async fn list_aux_files(
721 20 : &self,
722 20 : lsn: Lsn,
723 20 : ctx: &RequestContext,
724 20 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
725 20 : let current_policy = self.last_aux_file_policy.load();
726 20 : match current_policy {
727 12 : Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await,
728 6 : Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
729 : Some(AuxFilePolicy::CrossValidation) => {
730 2 : let v1_result = self.list_aux_files_v1(lsn, ctx).await;
731 2 : let v2_result = self.list_aux_files_v2(lsn, ctx).await;
732 2 : match (v1_result, v2_result) {
733 2 : (Ok(v1), Ok(v2)) => {
734 2 : if v1 != v2 {
735 0 : tracing::error!(
736 0 : "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
737 : );
738 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
739 0 : "unmatched aux file v1 v2 result"
740 0 : )));
741 2 : }
742 2 : Ok(v1)
743 : }
744 0 : (Ok(_), Err(v2)) => {
745 0 : tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
746 0 : Err(v2)
747 : }
748 0 : (Err(v1), Ok(_)) => {
749 0 : tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
750 0 : Err(v1)
751 : }
752 0 : (Err(_), Err(v2)) => Err(v2),
753 : }
754 : }
755 : }
756 20 : }
757 :
758 : /// Does the same as get_current_logical_size but counted on demand.
759 : /// Used to initialize the logical size tracking on startup.
760 : ///
761 : /// Only relation blocks are counted currently. That excludes metadata,
762 : /// SLRUs, twophase files etc.
763 : ///
764 : /// # Cancel-Safety
765 : ///
766 : /// This method is cancellation-safe.
767 0 : pub async fn get_current_logical_size_non_incremental(
768 0 : &self,
769 0 : lsn: Lsn,
770 0 : ctx: &RequestContext,
771 0 : ) -> Result<u64, CalculateLogicalSizeError> {
772 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
773 :
774 : // Fetch list of database dirs and iterate them
775 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
776 0 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
777 :
778 0 : let mut total_size: u64 = 0;
779 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
780 0 : for rel in self
781 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
782 0 : .await?
783 : {
784 0 : if self.cancel.is_cancelled() {
785 0 : return Err(CalculateLogicalSizeError::Cancelled);
786 0 : }
787 0 : let relsize_key = rel_size_to_key(rel);
788 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
789 0 : let relsize = buf.get_u32_le();
790 0 :
791 0 : total_size += relsize as u64;
792 : }
793 : }
794 0 : Ok(total_size * BLCKSZ as u64)
795 0 : }
796 :
797 : ///
798 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
799 : /// Anything that's not listed maybe removed from the underlying storage (from
800 : /// that LSN forwards).
801 : ///
802 : /// The return value is (dense keyspace, sparse keyspace).
803 232 : pub(crate) async fn collect_keyspace(
804 232 : &self,
805 232 : lsn: Lsn,
806 232 : ctx: &RequestContext,
807 232 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
808 232 : // Iterate through key ranges, greedily packing them into partitions
809 232 : let mut result = KeySpaceAccum::new();
810 232 :
811 232 : // The dbdir metadata always exists
812 232 : result.add_key(DBDIR_KEY);
813 :
814 : // Fetch list of database dirs and iterate them
815 2303 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
816 232 : let dbdir = DbDirectory::des(&buf)?;
817 :
818 232 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
819 232 : dbs.sort_unstable();
820 232 : for (spcnode, dbnode) in dbs {
821 0 : result.add_key(relmap_file_key(spcnode, dbnode));
822 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
823 :
824 0 : let mut rels: Vec<RelTag> = self
825 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
826 0 : .await?
827 0 : .into_iter()
828 0 : .collect();
829 0 : rels.sort_unstable();
830 0 : for rel in rels {
831 0 : let relsize_key = rel_size_to_key(rel);
832 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
833 0 : let relsize = buf.get_u32_le();
834 0 :
835 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
836 0 : result.add_key(relsize_key);
837 : }
838 : }
839 :
840 : // Iterate SLRUs next
841 696 : for kind in [
842 232 : SlruKind::Clog,
843 232 : SlruKind::MultiXactMembers,
844 232 : SlruKind::MultiXactOffsets,
845 : ] {
846 696 : let slrudir_key = slru_dir_to_key(kind);
847 696 : result.add_key(slrudir_key);
848 7965 : let buf = self.get(slrudir_key, lsn, ctx).await?;
849 696 : let dir = SlruSegmentDirectory::des(&buf)?;
850 696 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
851 696 : segments.sort_unstable();
852 696 : for segno in segments {
853 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
854 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
855 0 : let segsize = buf.get_u32_le();
856 0 :
857 0 : result.add_range(
858 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
859 0 : );
860 0 : result.add_key(segsize_key);
861 : }
862 : }
863 :
864 : // Then pg_twophase
865 232 : result.add_key(TWOPHASEDIR_KEY);
866 3070 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
867 232 : let twophase_dir = TwoPhaseDirectory::des(&buf)?;
868 232 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
869 232 : xids.sort_unstable();
870 232 : for xid in xids {
871 0 : result.add_key(twophase_file_key(xid));
872 0 : }
873 :
874 232 : result.add_key(CONTROLFILE_KEY);
875 232 : result.add_key(CHECKPOINT_KEY);
876 232 :
877 232 : // Remove v1 keyspace if the user has fully switched to v2.
878 232 : if self.last_aux_file_policy.load() != Some(AuxFilePolicy::V2) {
879 230 : if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
880 126 : result.add_key(AUX_FILES_KEY);
881 126 : }
882 2 : }
883 :
884 232 : Ok((
885 232 : result.to_keyspace(),
886 232 : /* AUX sparse key space */
887 232 : SparseKeySpace(KeySpace::single(Key::metadata_aux_key_range())),
888 232 : ))
889 232 : }
890 :
891 : /// Get cached size of relation if it not updated after specified LSN
892 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
893 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
894 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
895 448518 : if lsn >= *cached_lsn {
896 443372 : return Some(*nblocks);
897 5146 : }
898 22 : }
899 5168 : None
900 448540 : }
901 :
902 : /// Update cached relation size if there is no more recent update
903 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
904 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
905 5136 :
906 5136 : if lsn < rel_size_cache.complete_as_of {
907 : // Do not cache old values. It's safe to cache the size on read, as long as
908 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
909 : // never evict values from the cache, so if the relation size changed after
910 : // 'lsn', the new value is already in the cache.
911 0 : return;
912 5136 : }
913 5136 :
914 5136 : match rel_size_cache.map.entry(tag) {
915 5136 : hash_map::Entry::Occupied(mut entry) => {
916 5136 : let cached_lsn = entry.get_mut();
917 5136 : if lsn >= cached_lsn.0 {
918 0 : *cached_lsn = (lsn, nblocks);
919 5136 : }
920 : }
921 0 : hash_map::Entry::Vacant(entry) => {
922 0 : entry.insert((lsn, nblocks));
923 0 : }
924 : }
925 5136 : }
926 :
927 : /// Store cached relation size
928 288732 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
929 288732 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
930 288732 : rel_size_cache.map.insert(tag, (lsn, nblocks));
931 288732 : }
932 :
933 : /// Remove cached relation size
934 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
935 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
936 2 : rel_size_cache.map.remove(tag);
937 2 : }
938 : }
939 :
940 : /// DatadirModification represents an operation to ingest an atomic set of
941 : /// updates to the repository. It is created by the 'begin_record'
942 : /// function. It is called for each WAL record, so that all the modifications
943 : /// by a one WAL record appear atomic.
944 : pub struct DatadirModification<'a> {
945 : /// The timeline this modification applies to. You can access this to
946 : /// read the state, but note that any pending updates are *not* reflected
947 : /// in the state in 'tline' yet.
948 : pub tline: &'a Timeline,
949 :
950 : /// Current LSN of the modification
951 : lsn: Lsn,
952 :
953 : // The modifications are not applied directly to the underlying key-value store.
954 : // The put-functions add the modifications here, and they are flushed to the
955 : // underlying key-value store by the 'finish' function.
956 : pending_lsns: Vec<Lsn>,
957 : pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
958 : pending_deletions: Vec<(Range<Key>, Lsn)>,
959 : pending_nblocks: i64,
960 :
961 : /// For special "directory" keys that store key-value maps, track the size of the map
962 : /// if it was updated in this modification.
963 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
964 : }
965 :
966 : impl<'a> DatadirModification<'a> {
967 : /// Get the current lsn
968 418056 : pub(crate) fn get_lsn(&self) -> Lsn {
969 418056 : self.lsn
970 418056 : }
971 :
972 : /// Set the current lsn
973 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
974 145858 : ensure!(
975 145858 : lsn >= self.lsn,
976 0 : "setting an older lsn {} than {} is not allowed",
977 : lsn,
978 : self.lsn
979 : );
980 145858 : if lsn > self.lsn {
981 145858 : self.pending_lsns.push(self.lsn);
982 145858 : self.lsn = lsn;
983 145858 : }
984 145858 : Ok(())
985 145858 : }
986 :
987 : /// Initialize a completely new repository.
988 : ///
989 : /// This inserts the directory metadata entries that are assumed to
990 : /// always exist.
991 116 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
992 116 : let buf = DbDirectory::ser(&DbDirectory {
993 116 : dbdirs: HashMap::new(),
994 116 : })?;
995 116 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
996 116 : self.put(DBDIR_KEY, Value::Image(buf.into()));
997 116 :
998 116 : // Create AuxFilesDirectory
999 116 : self.init_aux_dir()?;
1000 :
1001 116 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1002 116 : xids: HashSet::new(),
1003 116 : })?;
1004 116 : self.pending_directory_entries
1005 116 : .push((DirectoryKind::TwoPhase, 0));
1006 116 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1007 :
1008 116 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1009 116 : let empty_dir = Value::Image(buf);
1010 116 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1011 116 : self.pending_directory_entries
1012 116 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1013 116 : self.put(
1014 116 : slru_dir_to_key(SlruKind::MultiXactMembers),
1015 116 : empty_dir.clone(),
1016 116 : );
1017 116 : self.pending_directory_entries
1018 116 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1019 116 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1020 116 : self.pending_directory_entries
1021 116 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1022 116 :
1023 116 : Ok(())
1024 116 : }
1025 :
1026 : #[cfg(test)]
1027 114 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1028 114 : self.init_empty()?;
1029 114 : self.put_control_file(bytes::Bytes::from_static(
1030 114 : b"control_file contents do not matter",
1031 114 : ))
1032 114 : .context("put_control_file")?;
1033 114 : self.put_checkpoint(bytes::Bytes::from_static(
1034 114 : b"checkpoint_file contents do not matter",
1035 114 : ))
1036 114 : .context("put_checkpoint_file")?;
1037 114 : Ok(())
1038 114 : }
1039 :
1040 : /// Put a new page version that can be constructed from a WAL record
1041 : ///
1042 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1043 : /// current end-of-file. It's up to the caller to check that the relation size
1044 : /// matches the blocks inserted!
1045 145630 : pub fn put_rel_wal_record(
1046 145630 : &mut self,
1047 145630 : rel: RelTag,
1048 145630 : blknum: BlockNumber,
1049 145630 : rec: NeonWalRecord,
1050 145630 : ) -> anyhow::Result<()> {
1051 145630 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1052 145630 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1053 145630 : Ok(())
1054 145630 : }
1055 :
1056 : // Same, but for an SLRU.
1057 8 : pub fn put_slru_wal_record(
1058 8 : &mut self,
1059 8 : kind: SlruKind,
1060 8 : segno: u32,
1061 8 : blknum: BlockNumber,
1062 8 : rec: NeonWalRecord,
1063 8 : ) -> anyhow::Result<()> {
1064 8 : self.put(
1065 8 : slru_block_to_key(kind, segno, blknum),
1066 8 : Value::WalRecord(rec),
1067 8 : );
1068 8 : Ok(())
1069 8 : }
1070 :
1071 : /// Like put_wal_record, but with ready-made image of the page.
1072 280864 : pub fn put_rel_page_image(
1073 280864 : &mut self,
1074 280864 : rel: RelTag,
1075 280864 : blknum: BlockNumber,
1076 280864 : img: Bytes,
1077 280864 : ) -> anyhow::Result<()> {
1078 280864 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1079 280864 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1080 280864 : Ok(())
1081 280864 : }
1082 :
1083 6 : pub fn put_slru_page_image(
1084 6 : &mut self,
1085 6 : kind: SlruKind,
1086 6 : segno: u32,
1087 6 : blknum: BlockNumber,
1088 6 : img: Bytes,
1089 6 : ) -> anyhow::Result<()> {
1090 6 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
1091 6 : Ok(())
1092 6 : }
1093 :
1094 : /// Store a relmapper file (pg_filenode.map) in the repository
1095 16 : pub async fn put_relmap_file(
1096 16 : &mut self,
1097 16 : spcnode: Oid,
1098 16 : dbnode: Oid,
1099 16 : img: Bytes,
1100 16 : ctx: &RequestContext,
1101 16 : ) -> anyhow::Result<()> {
1102 : // Add it to the directory (if it doesn't exist already)
1103 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1104 16 : let mut dbdir = DbDirectory::des(&buf)?;
1105 :
1106 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1107 16 : if r.is_none() || r == Some(false) {
1108 : // The dbdir entry didn't exist, or it contained a
1109 : // 'false'. The 'insert' call already updated it with
1110 : // 'true', now write the updated 'dbdirs' map back.
1111 16 : let buf = DbDirectory::ser(&dbdir)?;
1112 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1113 16 :
1114 16 : // Create AuxFilesDirectory as well
1115 16 : self.init_aux_dir()?;
1116 0 : }
1117 16 : if r.is_none() {
1118 8 : // Create RelDirectory
1119 8 : let buf = RelDirectory::ser(&RelDirectory {
1120 8 : rels: HashSet::new(),
1121 8 : })?;
1122 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1123 8 : self.put(
1124 8 : rel_dir_to_key(spcnode, dbnode),
1125 8 : Value::Image(Bytes::from(buf)),
1126 8 : );
1127 8 : }
1128 :
1129 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1130 16 : Ok(())
1131 16 : }
1132 :
1133 0 : pub async fn put_twophase_file(
1134 0 : &mut self,
1135 0 : xid: TransactionId,
1136 0 : img: Bytes,
1137 0 : ctx: &RequestContext,
1138 0 : ) -> anyhow::Result<()> {
1139 : // Add it to the directory entry
1140 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1141 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1142 0 : if !dir.xids.insert(xid) {
1143 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1144 0 : }
1145 0 : self.pending_directory_entries
1146 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1147 0 : self.put(
1148 0 : TWOPHASEDIR_KEY,
1149 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1150 : );
1151 :
1152 0 : self.put(twophase_file_key(xid), Value::Image(img));
1153 0 : Ok(())
1154 0 : }
1155 :
1156 116 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1157 116 : self.put(CONTROLFILE_KEY, Value::Image(img));
1158 116 : Ok(())
1159 116 : }
1160 :
1161 130 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1162 130 : self.put(CHECKPOINT_KEY, Value::Image(img));
1163 130 : Ok(())
1164 130 : }
1165 :
1166 0 : pub async fn drop_dbdir(
1167 0 : &mut self,
1168 0 : spcnode: Oid,
1169 0 : dbnode: Oid,
1170 0 : ctx: &RequestContext,
1171 0 : ) -> anyhow::Result<()> {
1172 0 : let total_blocks = self
1173 0 : .tline
1174 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1175 0 : .await?;
1176 :
1177 : // Remove entry from dbdir
1178 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1179 0 : let mut dir = DbDirectory::des(&buf)?;
1180 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1181 0 : let buf = DbDirectory::ser(&dir)?;
1182 0 : self.pending_directory_entries
1183 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1184 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1185 : } else {
1186 0 : warn!(
1187 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1188 : spcnode, dbnode
1189 : );
1190 : }
1191 :
1192 : // Update logical database size.
1193 0 : self.pending_nblocks -= total_blocks as i64;
1194 0 :
1195 0 : // Delete all relations and metadata files for the spcnode/dnode
1196 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1197 0 : Ok(())
1198 0 : }
1199 :
1200 : /// Create a relation fork.
1201 : ///
1202 : /// 'nblocks' is the initial size.
1203 1920 : pub async fn put_rel_creation(
1204 1920 : &mut self,
1205 1920 : rel: RelTag,
1206 1920 : nblocks: BlockNumber,
1207 1920 : ctx: &RequestContext,
1208 1920 : ) -> Result<(), RelationError> {
1209 1920 : if rel.relnode == 0 {
1210 0 : return Err(RelationError::InvalidRelnode);
1211 1920 : }
1212 : // It's possible that this is the first rel for this db in this
1213 : // tablespace. Create the reldir entry for it if so.
1214 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1215 1920 : .context("deserialize db")?;
1216 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1217 1920 : let mut rel_dir =
1218 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1219 : // Didn't exist. Update dbdir
1220 8 : e.insert(false);
1221 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1222 8 : self.pending_directory_entries
1223 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1224 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1225 8 :
1226 8 : // and create the RelDirectory
1227 8 : RelDirectory::default()
1228 : } else {
1229 : // reldir already exists, fetch it
1230 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1231 1912 : .context("deserialize db")?
1232 : };
1233 :
1234 : // Add the new relation to the rel directory entry, and write it back
1235 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1236 0 : return Err(RelationError::AlreadyExists);
1237 1920 : }
1238 1920 :
1239 1920 : self.pending_directory_entries
1240 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1241 1920 :
1242 1920 : self.put(
1243 1920 : rel_dir_key,
1244 1920 : Value::Image(Bytes::from(
1245 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1246 : )),
1247 : );
1248 :
1249 : // Put size
1250 1920 : let size_key = rel_size_to_key(rel);
1251 1920 : let buf = nblocks.to_le_bytes();
1252 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1253 1920 :
1254 1920 : self.pending_nblocks += nblocks as i64;
1255 1920 :
1256 1920 : // Update relation size cache
1257 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1258 1920 :
1259 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1260 1920 : // caller.
1261 1920 : Ok(())
1262 1920 : }
1263 :
1264 : /// Truncate relation
1265 6012 : pub async fn put_rel_truncation(
1266 6012 : &mut self,
1267 6012 : rel: RelTag,
1268 6012 : nblocks: BlockNumber,
1269 6012 : ctx: &RequestContext,
1270 6012 : ) -> anyhow::Result<()> {
1271 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1272 6012 : if self
1273 6012 : .tline
1274 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1275 0 : .await?
1276 : {
1277 6012 : let size_key = rel_size_to_key(rel);
1278 : // Fetch the old size first
1279 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1280 6012 :
1281 6012 : // Update the entry with the new size.
1282 6012 : let buf = nblocks.to_le_bytes();
1283 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1284 6012 :
1285 6012 : // Update relation size cache
1286 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1287 6012 :
1288 6012 : // Update relation size cache
1289 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1290 6012 :
1291 6012 : // Update logical database size.
1292 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1293 0 : }
1294 6012 : Ok(())
1295 6012 : }
1296 :
1297 : /// Extend relation
1298 : /// If new size is smaller, do nothing.
1299 276680 : pub async fn put_rel_extend(
1300 276680 : &mut self,
1301 276680 : rel: RelTag,
1302 276680 : nblocks: BlockNumber,
1303 276680 : ctx: &RequestContext,
1304 276680 : ) -> anyhow::Result<()> {
1305 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1306 :
1307 : // Put size
1308 276680 : let size_key = rel_size_to_key(rel);
1309 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1310 276680 :
1311 276680 : // only extend relation here. never decrease the size
1312 276680 : if nblocks > old_size {
1313 274788 : let buf = nblocks.to_le_bytes();
1314 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1315 274788 :
1316 274788 : // Update relation size cache
1317 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1318 274788 :
1319 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1320 274788 : }
1321 276680 : Ok(())
1322 276680 : }
1323 :
1324 : /// Drop a relation.
1325 2 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1326 2 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1327 :
1328 : // Remove it from the directory entry
1329 2 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1330 2 : let buf = self.get(dir_key, ctx).await?;
1331 2 : let mut dir = RelDirectory::des(&buf)?;
1332 :
1333 2 : self.pending_directory_entries
1334 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1335 2 :
1336 2 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1337 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1338 : } else {
1339 0 : warn!("dropped rel {} did not exist in rel directory", rel);
1340 : }
1341 :
1342 : // update logical size
1343 2 : let size_key = rel_size_to_key(rel);
1344 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1345 2 : self.pending_nblocks -= old_size as i64;
1346 2 :
1347 2 : // Remove enty from relation size cache
1348 2 : self.tline.remove_cached_rel_size(&rel);
1349 2 :
1350 2 : // Delete size entry, as well as all blocks
1351 2 : self.delete(rel_key_range(rel));
1352 2 :
1353 2 : Ok(())
1354 2 : }
1355 :
1356 6 : pub async fn put_slru_segment_creation(
1357 6 : &mut self,
1358 6 : kind: SlruKind,
1359 6 : segno: u32,
1360 6 : nblocks: BlockNumber,
1361 6 : ctx: &RequestContext,
1362 6 : ) -> anyhow::Result<()> {
1363 6 : // Add it to the directory entry
1364 6 : let dir_key = slru_dir_to_key(kind);
1365 6 : let buf = self.get(dir_key, ctx).await?;
1366 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1367 :
1368 6 : if !dir.segments.insert(segno) {
1369 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1370 6 : }
1371 6 : self.pending_directory_entries
1372 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1373 6 : self.put(
1374 6 : dir_key,
1375 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1376 : );
1377 :
1378 : // Put size
1379 6 : let size_key = slru_segment_size_to_key(kind, segno);
1380 6 : let buf = nblocks.to_le_bytes();
1381 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1382 6 :
1383 6 : // even if nblocks > 0, we don't insert any actual blocks here
1384 6 :
1385 6 : Ok(())
1386 6 : }
1387 :
1388 : /// Extend SLRU segment
1389 0 : pub fn put_slru_extend(
1390 0 : &mut self,
1391 0 : kind: SlruKind,
1392 0 : segno: u32,
1393 0 : nblocks: BlockNumber,
1394 0 : ) -> anyhow::Result<()> {
1395 0 : // Put size
1396 0 : let size_key = slru_segment_size_to_key(kind, segno);
1397 0 : let buf = nblocks.to_le_bytes();
1398 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1399 0 : Ok(())
1400 0 : }
1401 :
1402 : /// This method is used for marking truncated SLRU files
1403 0 : pub async fn drop_slru_segment(
1404 0 : &mut self,
1405 0 : kind: SlruKind,
1406 0 : segno: u32,
1407 0 : ctx: &RequestContext,
1408 0 : ) -> anyhow::Result<()> {
1409 0 : // Remove it from the directory entry
1410 0 : let dir_key = slru_dir_to_key(kind);
1411 0 : let buf = self.get(dir_key, ctx).await?;
1412 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1413 :
1414 0 : if !dir.segments.remove(&segno) {
1415 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1416 0 : }
1417 0 : self.pending_directory_entries
1418 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1419 0 : self.put(
1420 0 : dir_key,
1421 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1422 : );
1423 :
1424 : // Delete size entry, as well as all blocks
1425 0 : self.delete(slru_segment_key_range(kind, segno));
1426 0 :
1427 0 : Ok(())
1428 0 : }
1429 :
1430 : /// Drop a relmapper file (pg_filenode.map)
1431 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1432 0 : // TODO
1433 0 : Ok(())
1434 0 : }
1435 :
1436 : /// This method is used for marking truncated SLRU files
1437 0 : pub async fn drop_twophase_file(
1438 0 : &mut self,
1439 0 : xid: TransactionId,
1440 0 : ctx: &RequestContext,
1441 0 : ) -> anyhow::Result<()> {
1442 : // Remove it from the directory entry
1443 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1444 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1445 :
1446 0 : if !dir.xids.remove(&xid) {
1447 0 : warn!("twophase file for xid {} does not exist", xid);
1448 0 : }
1449 0 : self.pending_directory_entries
1450 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1451 0 : self.put(
1452 0 : TWOPHASEDIR_KEY,
1453 0 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1454 : );
1455 :
1456 : // Delete it
1457 0 : self.delete(twophase_key_range(xid));
1458 0 :
1459 0 : Ok(())
1460 0 : }
1461 :
1462 132 : pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
1463 132 : if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
1464 0 : return Ok(());
1465 132 : }
1466 132 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
1467 132 : files: HashMap::new(),
1468 132 : })?;
1469 132 : self.pending_directory_entries
1470 132 : .push((DirectoryKind::AuxFiles, 0));
1471 132 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
1472 132 : Ok(())
1473 132 : }
1474 :
1475 24 : pub async fn put_file(
1476 24 : &mut self,
1477 24 : path: &str,
1478 24 : content: &[u8],
1479 24 : ctx: &RequestContext,
1480 24 : ) -> anyhow::Result<()> {
1481 24 : let switch_policy = self.tline.get_switch_aux_file_policy();
1482 :
1483 24 : let policy = {
1484 24 : let current_policy = self.tline.last_aux_file_policy.load();
1485 24 : // Allowed switch path:
1486 24 : // * no aux files -> v1/v2/cross-validation
1487 24 : // * cross-validation->v2
1488 24 : if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
1489 10 : self.tline.last_aux_file_policy.store(Some(switch_policy));
1490 10 : self.tline
1491 10 : .remote_client
1492 10 : .schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
1493 10 : info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
1494 10 : switch_policy
1495 : } else {
1496 : // This branch handles non-valid migration path, and the case that switch_policy == current_policy.
1497 : // And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
1498 14 : current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
1499 : }
1500 : };
1501 :
1502 24 : if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
1503 8 : let key = aux_file::encode_aux_file_key(path);
1504 : // retrieve the key from the engine
1505 8 : let old_val = match self.get(key, ctx).await {
1506 2 : Ok(val) => Some(val),
1507 6 : Err(PageReconstructError::MissingKey(_)) => None,
1508 0 : Err(e) => return Err(e.into()),
1509 : };
1510 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
1511 2 : aux_file::decode_file_value(old_val)?
1512 : } else {
1513 6 : Vec::new()
1514 : };
1515 8 : let mut other_files = Vec::with_capacity(files.len());
1516 8 : let mut modifying_file = None;
1517 10 : for file @ (p, content) in files {
1518 2 : if path == p {
1519 2 : assert!(
1520 2 : modifying_file.is_none(),
1521 0 : "duplicated entries found for {}",
1522 : path
1523 : );
1524 2 : modifying_file = Some(content);
1525 0 : } else {
1526 0 : other_files.push(file);
1527 0 : }
1528 : }
1529 8 : let mut new_files = other_files;
1530 8 : match (modifying_file, content.is_empty()) {
1531 2 : (Some(old_content), false) => {
1532 2 : self.tline
1533 2 : .aux_file_size_estimator
1534 2 : .on_update(old_content.len(), content.len());
1535 2 : new_files.push((path, content));
1536 2 : }
1537 0 : (Some(old_content), true) => {
1538 0 : self.tline
1539 0 : .aux_file_size_estimator
1540 0 : .on_remove(old_content.len());
1541 0 : // not adding the file key to the final `new_files` vec.
1542 0 : }
1543 6 : (None, false) => {
1544 6 : self.tline.aux_file_size_estimator.on_add(content.len());
1545 6 : new_files.push((path, content));
1546 6 : }
1547 0 : (None, true) => anyhow::bail!("removing non-existing aux file: {}", path),
1548 : }
1549 8 : let new_val = aux_file::encode_file_value(&new_files)?;
1550 8 : self.put(key, Value::Image(new_val.into()));
1551 16 : }
1552 :
1553 24 : if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
1554 18 : let file_path = path.to_string();
1555 18 : let content = if content.is_empty() {
1556 2 : None
1557 : } else {
1558 16 : Some(Bytes::copy_from_slice(content))
1559 : };
1560 :
1561 : let n_files;
1562 18 : let mut aux_files = self.tline.aux_files.lock().await;
1563 18 : if let Some(mut dir) = aux_files.dir.take() {
1564 : // We already updated aux files in `self`: emit a delta and update our latest value.
1565 10 : dir.upsert(file_path.clone(), content.clone());
1566 10 : n_files = dir.files.len();
1567 10 : if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
1568 0 : self.put(
1569 0 : AUX_FILES_KEY,
1570 0 : Value::Image(Bytes::from(
1571 0 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1572 : )),
1573 : );
1574 0 : aux_files.n_deltas = 0;
1575 10 : } else {
1576 10 : self.put(
1577 10 : AUX_FILES_KEY,
1578 10 : Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
1579 10 : );
1580 10 : aux_files.n_deltas += 1;
1581 10 : }
1582 10 : aux_files.dir = Some(dir);
1583 : } else {
1584 : // Check if the AUX_FILES_KEY is initialized
1585 8 : match self.get(AUX_FILES_KEY, ctx).await {
1586 4 : Ok(dir_bytes) => {
1587 4 : let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
1588 : // Key is already set, we may append a delta
1589 4 : self.put(
1590 4 : AUX_FILES_KEY,
1591 4 : Value::WalRecord(NeonWalRecord::AuxFile {
1592 4 : file_path: file_path.clone(),
1593 4 : content: content.clone(),
1594 4 : }),
1595 4 : );
1596 4 : dir.upsert(file_path, content);
1597 4 : n_files = dir.files.len();
1598 4 : aux_files.dir = Some(dir);
1599 : }
1600 : Err(
1601 0 : e @ (PageReconstructError::AncestorStopping(_)
1602 : | PageReconstructError::Cancelled
1603 : | PageReconstructError::AncestorLsnTimeout(_)),
1604 : ) => {
1605 : // Important that we do not interpret a shutdown error as "not found" and thereby
1606 : // reset the map.
1607 0 : return Err(e.into());
1608 : }
1609 : // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
1610 : // the original code assumes all other errors are missing keys. Therefore, we keep the code path
1611 : // the same for now, though in theory, we should only match the `MissingKey` variant.
1612 : Err(
1613 : PageReconstructError::Other(_)
1614 : | PageReconstructError::WalRedo(_)
1615 : | PageReconstructError::MissingKey { .. },
1616 : ) => {
1617 : // Key is missing, we must insert an image as the basis for subsequent deltas.
1618 :
1619 4 : let mut dir = AuxFilesDirectory {
1620 4 : files: HashMap::new(),
1621 4 : };
1622 4 : dir.upsert(file_path, content);
1623 4 : self.put(
1624 4 : AUX_FILES_KEY,
1625 4 : Value::Image(Bytes::from(
1626 4 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1627 : )),
1628 : );
1629 4 : n_files = 1;
1630 4 : aux_files.dir = Some(dir);
1631 : }
1632 : }
1633 : }
1634 :
1635 18 : self.pending_directory_entries
1636 18 : .push((DirectoryKind::AuxFiles, n_files));
1637 6 : }
1638 :
1639 24 : Ok(())
1640 24 : }
1641 :
1642 : ///
1643 : /// Flush changes accumulated so far to the underlying repository.
1644 : ///
1645 : /// Usually, changes made in DatadirModification are atomic, but this allows
1646 : /// you to flush them to the underlying repository before the final `commit`.
1647 : /// That allows to free up the memory used to hold the pending changes.
1648 : ///
1649 : /// Currently only used during bulk import of a data directory. In that
1650 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1651 : /// whole import fails and the timeline will be deleted anyway.
1652 : /// (Or to be precise, it will be left behind for debugging purposes and
1653 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1654 : ///
1655 : /// Note: A consequence of flushing the pending operations is that they
1656 : /// won't be visible to subsequent operations until `commit`. The function
1657 : /// retains all the metadata, but data pages are flushed. That's again OK
1658 : /// for bulk import, where you are just loading data pages and won't try to
1659 : /// modify the same pages twice.
1660 1930 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1661 1930 : // Unless we have accumulated a decent amount of changes, it's not worth it
1662 1930 : // to scan through the pending_updates list.
1663 1930 : let pending_nblocks = self.pending_nblocks;
1664 1930 : if pending_nblocks < 10000 {
1665 1930 : return Ok(());
1666 0 : }
1667 :
1668 0 : let mut writer = self.tline.writer().await;
1669 :
1670 : // Flush relation and SLRU data blocks, keep metadata.
1671 0 : let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
1672 0 : for (key, values) in self.pending_updates.drain() {
1673 0 : for (lsn, value) in values {
1674 0 : if is_rel_block_key(&key) || is_slru_block_key(key) {
1675 : // This bails out on first error without modifying pending_updates.
1676 : // That's Ok, cf this function's doc comment.
1677 0 : writer.put(key, lsn, &value, ctx).await?;
1678 0 : } else {
1679 0 : retained_pending_updates
1680 0 : .entry(key)
1681 0 : .or_default()
1682 0 : .push((lsn, value));
1683 0 : }
1684 : }
1685 : }
1686 :
1687 0 : self.pending_updates = retained_pending_updates;
1688 0 :
1689 0 : if pending_nblocks != 0 {
1690 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1691 0 : self.pending_nblocks = 0;
1692 0 : }
1693 :
1694 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1695 0 : writer.update_directory_entries_count(kind, count as u64);
1696 0 : }
1697 :
1698 0 : Ok(())
1699 1930 : }
1700 :
1701 : ///
1702 : /// Finish this atomic update, writing all the updated keys to the
1703 : /// underlying timeline.
1704 : /// All the modifications in this atomic update are stamped by the specified LSN.
1705 : ///
1706 743010 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1707 743010 : let mut writer = self.tline.writer().await;
1708 :
1709 743010 : let timer = WAL_INGEST.time_spent_on_ingest.start_timer();
1710 743010 :
1711 743010 : let pending_nblocks = self.pending_nblocks;
1712 743010 : self.pending_nblocks = 0;
1713 743010 :
1714 743010 : if !self.pending_updates.is_empty() {
1715 : // The put_batch call below expects expects the inputs to be sorted by Lsn,
1716 : // so we do that first.
1717 413994 : let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
1718 413994 : self.pending_updates
1719 413994 : .drain()
1720 700034 : .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
1721 413994 : .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
1722 413994 : VecMapOrdering::GreaterOrEqual,
1723 413994 : );
1724 413994 :
1725 413994 : writer.put_batch(lsn_ordered_batch, ctx).await?;
1726 329016 : }
1727 :
1728 743010 : if !self.pending_deletions.is_empty() {
1729 2 : writer.delete_batch(&self.pending_deletions, ctx).await?;
1730 2 : self.pending_deletions.clear();
1731 743008 : }
1732 :
1733 743010 : self.pending_lsns.push(self.lsn);
1734 888868 : for pending_lsn in self.pending_lsns.drain(..) {
1735 888868 : // Ideally, we should be able to call writer.finish_write() only once
1736 888868 : // with the highest LSN. However, the last_record_lsn variable in the
1737 888868 : // timeline keeps track of the latest LSN and the immediate previous LSN
1738 888868 : // so we need to record every LSN to not leave a gap between them.
1739 888868 : writer.finish_write(pending_lsn);
1740 888868 : }
1741 :
1742 743010 : if pending_nblocks != 0 {
1743 270570 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1744 472440 : }
1745 :
1746 743010 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
1747 2674 : writer.update_directory_entries_count(kind, count as u64);
1748 2674 : }
1749 :
1750 743010 : timer.observe_duration();
1751 743010 :
1752 743010 : Ok(())
1753 743010 : }
1754 :
1755 291704 : pub(crate) fn len(&self) -> usize {
1756 291704 : self.pending_updates.len() + self.pending_deletions.len()
1757 291704 : }
1758 :
1759 : // Internal helper functions to batch the modifications
1760 :
1761 286576 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1762 : // Have we already updated the same key? Read the latest pending updated
1763 : // version in that case.
1764 : //
1765 : // Note: we don't check pending_deletions. It is an error to request a
1766 : // value that has been removed, deletion only avoids leaking storage.
1767 286576 : if let Some(values) = self.pending_updates.get(&key) {
1768 15928 : if let Some((_, value)) = values.last() {
1769 15928 : return if let Value::Image(img) = value {
1770 15928 : Ok(img.clone())
1771 : } else {
1772 : // Currently, we never need to read back a WAL record that we
1773 : // inserted in the same "transaction". All the metadata updates
1774 : // work directly with Images, and we never need to read actual
1775 : // data pages. We could handle this if we had to, by calling
1776 : // the walredo manager, but let's keep it simple for now.
1777 0 : Err(PageReconstructError::from(anyhow::anyhow!(
1778 0 : "unexpected pending WAL record"
1779 0 : )))
1780 : };
1781 0 : }
1782 270648 : }
1783 270648 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1784 270648 : self.tline.get(key, lsn, ctx).await
1785 286576 : }
1786 :
1787 712194 : fn put(&mut self, key: Key, val: Value) {
1788 712194 : let values = self.pending_updates.entry(key).or_default();
1789 : // Replace the previous value if it exists at the same lsn
1790 712194 : if let Some((last_lsn, last_value)) = values.last_mut() {
1791 12166 : if *last_lsn == self.lsn {
1792 12160 : *last_value = val;
1793 12160 : return;
1794 6 : }
1795 700028 : }
1796 700034 : values.push((self.lsn, val));
1797 712194 : }
1798 :
1799 2 : fn delete(&mut self, key_range: Range<Key>) {
1800 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1801 2 : self.pending_deletions.push((key_range, self.lsn));
1802 2 : }
1803 : }
1804 :
1805 : /// This struct facilitates accessing either a committed key from the timeline at a
1806 : /// specific LSN, or the latest uncommitted key from a pending modification.
1807 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
1808 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
1809 : /// need to look up the keys in the modification first before looking them up in the
1810 : /// timeline to not miss the latest updates.
1811 : #[derive(Clone, Copy)]
1812 : pub enum Version<'a> {
1813 : Lsn(Lsn),
1814 : Modified(&'a DatadirModification<'a>),
1815 : }
1816 :
1817 : impl<'a> Version<'a> {
1818 23542 : async fn get(
1819 23542 : &self,
1820 23542 : timeline: &Timeline,
1821 23542 : key: Key,
1822 23542 : ctx: &RequestContext,
1823 23542 : ) -> Result<Bytes, PageReconstructError> {
1824 23542 : match self {
1825 23532 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
1826 10 : Version::Modified(modification) => modification.get(key, ctx).await,
1827 : }
1828 23542 : }
1829 :
1830 35620 : fn get_lsn(&self) -> Lsn {
1831 35620 : match self {
1832 29574 : Version::Lsn(lsn) => *lsn,
1833 6046 : Version::Modified(modification) => modification.lsn,
1834 : }
1835 35620 : }
1836 : }
1837 :
1838 : //--- Metadata structs stored in key-value pairs in the repository.
1839 :
1840 2168 : #[derive(Debug, Serialize, Deserialize)]
1841 : struct DbDirectory {
1842 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1843 : dbdirs: HashMap<(Oid, Oid), bool>,
1844 : }
1845 :
1846 232 : #[derive(Debug, Serialize, Deserialize)]
1847 : struct TwoPhaseDirectory {
1848 : xids: HashSet<TransactionId>,
1849 : }
1850 :
1851 1932 : #[derive(Debug, Serialize, Deserialize, Default)]
1852 : struct RelDirectory {
1853 : // Set of relations that exist. (relfilenode, forknum)
1854 : //
1855 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1856 : // key-value pairs, if you have a lot of relations
1857 : rels: HashSet<(Oid, u8)>,
1858 : }
1859 :
1860 42 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
1861 : pub(crate) struct AuxFilesDirectory {
1862 : pub(crate) files: HashMap<String, Bytes>,
1863 : }
1864 :
1865 : impl AuxFilesDirectory {
1866 42 : pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
1867 42 : if let Some(value) = value {
1868 36 : self.files.insert(key, value);
1869 36 : } else {
1870 6 : self.files.remove(&key);
1871 6 : }
1872 42 : }
1873 : }
1874 :
1875 0 : #[derive(Debug, Serialize, Deserialize)]
1876 : struct RelSizeEntry {
1877 : nblocks: u32,
1878 : }
1879 :
1880 702 : #[derive(Debug, Serialize, Deserialize, Default)]
1881 : struct SlruSegmentDirectory {
1882 : // Set of SLRU segments that exist.
1883 : segments: HashSet<u32>,
1884 : }
1885 :
1886 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
1887 : #[repr(u8)]
1888 : pub(crate) enum DirectoryKind {
1889 : Db,
1890 : TwoPhase,
1891 : Rel,
1892 : AuxFiles,
1893 : SlruSegment(SlruKind),
1894 : }
1895 :
1896 : impl DirectoryKind {
1897 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
1898 5348 : pub(crate) fn offset(&self) -> usize {
1899 5348 : self.into_usize()
1900 5348 : }
1901 : }
1902 :
1903 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1904 :
1905 : #[allow(clippy::bool_assert_comparison)]
1906 : #[cfg(test)]
1907 : mod tests {
1908 : use hex_literal::hex;
1909 : use utils::id::TimelineId;
1910 :
1911 : use super::*;
1912 :
1913 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
1914 :
1915 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
1916 : #[tokio::test]
1917 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
1918 2 : let name = "aux_files_round_trip";
1919 2 : let harness = TenantHarness::create(name)?;
1920 2 :
1921 2 : pub const TIMELINE_ID: TimelineId =
1922 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
1923 2 :
1924 8 : let (tenant, ctx) = harness.load().await;
1925 2 : let tline = tenant
1926 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
1927 2 : .await?;
1928 2 : let tline = tline.raw_timeline().unwrap();
1929 2 :
1930 2 : // First modification: insert two keys
1931 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
1932 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
1933 2 : modification.set_lsn(Lsn(0x1008))?;
1934 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
1935 2 : modification.commit(&ctx).await?;
1936 2 : let expect_1008 = HashMap::from([
1937 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
1938 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
1939 2 : ]);
1940 2 :
1941 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
1942 2 : assert_eq!(readback, expect_1008);
1943 2 :
1944 2 : // Second modification: update one key, remove the other
1945 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
1946 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
1947 2 : modification.set_lsn(Lsn(0x2008))?;
1948 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
1949 2 : modification.commit(&ctx).await?;
1950 2 : let expect_2008 =
1951 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
1952 2 :
1953 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
1954 2 : assert_eq!(readback, expect_2008);
1955 2 :
1956 2 : // Reading back in time works
1957 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
1958 2 : assert_eq!(readback, expect_1008);
1959 2 :
1960 2 : Ok(())
1961 2 : }
1962 :
1963 : /*
1964 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1965 : let incremental = timeline.get_current_logical_size();
1966 : let non_incremental = timeline
1967 : .get_current_logical_size_non_incremental(lsn)
1968 : .unwrap();
1969 : assert_eq!(incremental, non_incremental);
1970 : }
1971 : */
1972 :
1973 : /*
1974 : ///
1975 : /// Test list_rels() function, with branches and dropped relations
1976 : ///
1977 : #[test]
1978 : fn test_list_rels_drop() -> Result<()> {
1979 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1980 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1981 : const TESTDB: u32 = 111;
1982 :
1983 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1984 : // after branching fails below
1985 : let mut writer = tline.begin_record(Lsn(0x10));
1986 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1987 : writer.finish()?;
1988 :
1989 : // Create a relation on the timeline
1990 : let mut writer = tline.begin_record(Lsn(0x20));
1991 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1992 : writer.finish()?;
1993 :
1994 : let writer = tline.begin_record(Lsn(0x00));
1995 : writer.finish()?;
1996 :
1997 : // Check that list_rels() lists it after LSN 2, but no before it
1998 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1999 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2000 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2001 :
2002 : // Create a branch, check that the relation is visible there
2003 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2004 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2005 : Some(timeline) => timeline,
2006 : None => panic!("Should have a local timeline"),
2007 : };
2008 : let newtline = DatadirTimelineImpl::new(newtline);
2009 : assert!(newtline
2010 : .list_rels(0, TESTDB, Lsn(0x30))?
2011 : .contains(&TESTREL_A));
2012 :
2013 : // Drop it on the branch
2014 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2015 : new_writer.drop_relation(TESTREL_A)?;
2016 : new_writer.finish()?;
2017 :
2018 : // Check that it's no longer listed on the branch after the point where it was dropped
2019 : assert!(newtline
2020 : .list_rels(0, TESTDB, Lsn(0x30))?
2021 : .contains(&TESTREL_A));
2022 : assert!(!newtline
2023 : .list_rels(0, TESTDB, Lsn(0x40))?
2024 : .contains(&TESTREL_A));
2025 :
2026 : // Run checkpoint and garbage collection and check that it's still not visible
2027 : newtline.checkpoint(CheckpointConfig::Forced)?;
2028 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2029 :
2030 : assert!(!newtline
2031 : .list_rels(0, TESTDB, Lsn(0x40))?
2032 : .contains(&TESTREL_A));
2033 :
2034 : Ok(())
2035 : }
2036 : */
2037 :
2038 : /*
2039 : #[test]
2040 : fn test_read_beyond_eof() -> Result<()> {
2041 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2042 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2043 :
2044 : make_some_layers(&tline, Lsn(0x20))?;
2045 : let mut writer = tline.begin_record(Lsn(0x60));
2046 : walingest.put_rel_page_image(
2047 : &mut writer,
2048 : TESTREL_A,
2049 : 0,
2050 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2051 : )?;
2052 : writer.finish()?;
2053 :
2054 : // Test read before rel creation. Should error out.
2055 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2056 :
2057 : // Read block beyond end of relation at different points in time.
2058 : // These reads should fall into different delta, image, and in-memory layers.
2059 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2060 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2061 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2062 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2063 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2064 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2065 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2066 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2067 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2068 :
2069 : // Test on an in-memory layer with no preceding layer
2070 : let mut writer = tline.begin_record(Lsn(0x70));
2071 : walingest.put_rel_page_image(
2072 : &mut writer,
2073 : TESTREL_B,
2074 : 0,
2075 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2076 : )?;
2077 : writer.finish()?;
2078 :
2079 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
2080 :
2081 : Ok(())
2082 : }
2083 : */
2084 : }
|