Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::aux_file;
11 : use crate::context::RequestContext;
12 : use crate::keyspace::{KeySpace, KeySpaceAccum};
13 : use crate::span::{
14 : debug_assert_current_span_has_tenant_and_timeline_id,
15 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
16 : };
17 : use crate::tenant::timeline::GetVectoredError;
18 : use anyhow::{ensure, Context};
19 : use bytes::{Buf, Bytes, BytesMut};
20 : use enum_map::Enum;
21 : use itertools::Itertools;
22 : use pageserver_api::key::Key;
23 : use pageserver_api::key::{
24 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
25 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
26 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
27 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
28 : };
29 : use pageserver_api::keyspace::SparseKeySpace;
30 : use pageserver_api::record::NeonWalRecord;
31 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
32 : use pageserver_api::shard::ShardIdentity;
33 : use pageserver_api::value::Value;
34 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
35 : use postgres_ffi::BLCKSZ;
36 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
37 : use serde::{Deserialize, Serialize};
38 : use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
39 : use std::ops::ControlFlow;
40 : use std::ops::Range;
41 : use strum::IntoEnumIterator;
42 : use tokio_util::sync::CancellationToken;
43 : use tracing::{debug, trace, warn};
44 : use utils::bin_ser::DeserializeError;
45 : use utils::pausable_failpoint;
46 : use utils::{bin_ser::BeSer, lsn::Lsn};
47 : use wal_decoder::serialized_batch::SerializedValueBatch;
48 :
49 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
50 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
51 :
52 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
53 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
54 :
55 : #[derive(Debug)]
56 : pub enum LsnForTimestamp {
57 : /// Found commits both before and after the given timestamp
58 : Present(Lsn),
59 :
60 : /// Found no commits after the given timestamp, this means
61 : /// that the newest data in the branch is older than the given
62 : /// timestamp.
63 : ///
64 : /// All commits <= LSN happened before the given timestamp
65 : Future(Lsn),
66 :
67 : /// The queried timestamp is past our horizon we look back at (PITR)
68 : ///
69 : /// All commits > LSN happened after the given timestamp,
70 : /// but any commits < LSN might have happened before or after
71 : /// the given timestamp. We don't know because no data before
72 : /// the given lsn is available.
73 : Past(Lsn),
74 :
75 : /// We have found no commit with a timestamp,
76 : /// so we can't return anything meaningful.
77 : ///
78 : /// The associated LSN is the lower bound value we can safely
79 : /// create branches on, but no statement is made if it is
80 : /// older or newer than the timestamp.
81 : ///
82 : /// This variant can e.g. be returned right after a
83 : /// cluster import.
84 : NoData(Lsn),
85 : }
86 :
87 0 : #[derive(Debug, thiserror::Error)]
88 : pub(crate) enum CalculateLogicalSizeError {
89 : #[error("cancelled")]
90 : Cancelled,
91 :
92 : /// Something went wrong while reading the metadata we use to calculate logical size
93 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
94 : /// in the `From` implementation for this variant.
95 : #[error(transparent)]
96 : PageRead(PageReconstructError),
97 :
98 : /// Something went wrong deserializing metadata that we read to calculate logical size
99 : #[error("decode error: {0}")]
100 : Decode(#[from] DeserializeError),
101 : }
102 :
103 0 : #[derive(Debug, thiserror::Error)]
104 : pub(crate) enum CollectKeySpaceError {
105 : #[error(transparent)]
106 : Decode(#[from] DeserializeError),
107 : #[error(transparent)]
108 : PageRead(PageReconstructError),
109 : #[error("cancelled")]
110 : Cancelled,
111 : }
112 :
113 : impl From<PageReconstructError> for CollectKeySpaceError {
114 0 : fn from(err: PageReconstructError) -> Self {
115 0 : match err {
116 0 : PageReconstructError::Cancelled => Self::Cancelled,
117 0 : err => Self::PageRead(err),
118 : }
119 0 : }
120 : }
121 :
122 : impl From<PageReconstructError> for CalculateLogicalSizeError {
123 0 : fn from(pre: PageReconstructError) -> Self {
124 0 : match pre {
125 0 : PageReconstructError::Cancelled => Self::Cancelled,
126 0 : _ => Self::PageRead(pre),
127 : }
128 0 : }
129 : }
130 :
131 0 : #[derive(Debug, thiserror::Error)]
132 : pub enum RelationError {
133 : #[error("Relation Already Exists")]
134 : AlreadyExists,
135 : #[error("invalid relnode")]
136 : InvalidRelnode,
137 : #[error(transparent)]
138 : Other(#[from] anyhow::Error),
139 : }
140 :
141 : ///
142 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
143 : /// and other special kinds of files, in a versioned key-value store. The
144 : /// Timeline struct provides the key-value store.
145 : ///
146 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
147 : /// implementation, and might be moved into a separate struct later.
148 : impl Timeline {
149 : /// Start ingesting a WAL record, or other atomic modification of
150 : /// the timeline.
151 : ///
152 : /// This provides a transaction-like interface to perform a bunch
153 : /// of modifications atomically.
154 : ///
155 : /// To ingest a WAL record, call begin_modification(lsn) to get a
156 : /// DatadirModification object. Use the functions in the object to
157 : /// modify the repository state, updating all the pages and metadata
158 : /// that the WAL record affects. When you're done, call commit() to
159 : /// commit the changes.
160 : ///
161 : /// Lsn stored in modification is advanced by `ingest_record` and
162 : /// is used by `commit()` to update `last_record_lsn`.
163 : ///
164 : /// Calling commit() will flush all the changes and reset the state,
165 : /// so the `DatadirModification` struct can be reused to perform the next modification.
166 : ///
167 : /// Note that any pending modifications you make through the
168 : /// modification object won't be visible to calls to the 'get' and list
169 : /// functions of the timeline until you finish! And if you update the
170 : /// same page twice, the last update wins.
171 : ///
172 268380 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
173 268380 : where
174 268380 : Self: Sized,
175 268380 : {
176 268380 : DatadirModification {
177 268380 : tline: self,
178 268380 : pending_lsns: Vec::new(),
179 268380 : pending_metadata_pages: HashMap::new(),
180 268380 : pending_data_batch: None,
181 268380 : pending_deletions: Vec::new(),
182 268380 : pending_nblocks: 0,
183 268380 : pending_directory_entries: Vec::new(),
184 268380 : pending_metadata_bytes: 0,
185 268380 : lsn,
186 268380 : }
187 268380 : }
188 :
189 : //------------------------------------------------------------------------------
190 : // Public GET functions
191 : //------------------------------------------------------------------------------
192 :
193 : /// Look up given page version.
194 18384 : pub(crate) async fn get_rel_page_at_lsn(
195 18384 : &self,
196 18384 : tag: RelTag,
197 18384 : blknum: BlockNumber,
198 18384 : version: Version<'_>,
199 18384 : ctx: &RequestContext,
200 18384 : ) -> Result<Bytes, PageReconstructError> {
201 18384 : match version {
202 18384 : Version::Lsn(effective_lsn) => {
203 18384 : let pages = smallvec::smallvec![(tag, blknum)];
204 18384 : let res = self
205 18384 : .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
206 7028 : .await;
207 18384 : assert_eq!(res.len(), 1);
208 18384 : res.into_iter().next().unwrap()
209 : }
210 0 : Version::Modified(modification) => {
211 0 : if tag.relnode == 0 {
212 0 : return Err(PageReconstructError::Other(
213 0 : RelationError::InvalidRelnode.into(),
214 0 : ));
215 0 : }
216 :
217 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
218 0 : if blknum >= nblocks {
219 0 : debug!(
220 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
221 0 : tag,
222 0 : blknum,
223 0 : version.get_lsn(),
224 : nblocks
225 : );
226 0 : return Ok(ZERO_PAGE.clone());
227 0 : }
228 0 :
229 0 : let key = rel_block_to_key(tag, blknum);
230 0 : modification.get(key, ctx).await
231 : }
232 : }
233 18384 : }
234 :
235 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
236 : ///
237 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
238 18384 : pub(crate) async fn get_rel_page_at_lsn_batched(
239 18384 : &self,
240 18384 : pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
241 18384 : effective_lsn: Lsn,
242 18384 : ctx: &RequestContext,
243 18384 : ) -> Vec<Result<Bytes, PageReconstructError>> {
244 18384 : debug_assert_current_span_has_tenant_and_timeline_id();
245 18384 :
246 18384 : let mut slots_filled = 0;
247 18384 : let page_count = pages.len();
248 18384 :
249 18384 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
250 18384 : let mut result = Vec::with_capacity(pages.len());
251 18384 : let result_slots = result.spare_capacity_mut();
252 18384 :
253 18384 : let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
254 18384 : for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() {
255 18384 : if tag.relnode == 0 {
256 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
257 0 : RelationError::InvalidRelnode.into(),
258 0 : )));
259 0 :
260 0 : slots_filled += 1;
261 0 : continue;
262 18384 : }
263 :
264 18384 : let nblocks = match self
265 18384 : .get_rel_size(tag, Version::Lsn(effective_lsn), ctx)
266 53 : .await
267 : {
268 18384 : Ok(nblocks) => nblocks,
269 0 : Err(err) => {
270 0 : result_slots[response_slot_idx].write(Err(err));
271 0 : slots_filled += 1;
272 0 : continue;
273 : }
274 : };
275 :
276 18384 : if blknum >= nblocks {
277 0 : debug!(
278 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
279 : tag, blknum, effective_lsn, nblocks
280 : );
281 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
282 0 : slots_filled += 1;
283 0 : continue;
284 18384 : }
285 18384 :
286 18384 : let key = rel_block_to_key(tag, blknum);
287 18384 :
288 18384 : let key_slots = keys_slots.entry(key).or_default();
289 18384 : key_slots.push(response_slot_idx);
290 : }
291 :
292 18384 : let keyspace = {
293 : // add_key requires monotonicity
294 18384 : let mut acc = KeySpaceAccum::new();
295 18384 : for key in keys_slots
296 18384 : .keys()
297 18384 : // in fact it requires strong monotonicity
298 18384 : .dedup()
299 18384 : {
300 18384 : acc.add_key(*key);
301 18384 : }
302 18384 : acc.to_keyspace()
303 18384 : };
304 18384 :
305 18384 : match self.get_vectored(keyspace, effective_lsn, ctx).await {
306 18384 : Ok(results) => {
307 36768 : for (key, res) in results {
308 18384 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
309 18384 : let first_slot = key_slots.next().unwrap();
310 :
311 18384 : for slot in key_slots {
312 0 : let clone = match &res {
313 0 : Ok(buf) => Ok(buf.clone()),
314 0 : Err(err) => Err(match err {
315 : PageReconstructError::Cancelled => {
316 0 : PageReconstructError::Cancelled
317 : }
318 :
319 0 : x @ PageReconstructError::Other(_) |
320 0 : x @ PageReconstructError::AncestorLsnTimeout(_) |
321 0 : x @ PageReconstructError::WalRedo(_) |
322 0 : x @ PageReconstructError::MissingKey(_) => {
323 0 : PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
324 : },
325 : }),
326 : };
327 :
328 0 : result_slots[slot].write(clone);
329 0 : slots_filled += 1;
330 : }
331 :
332 18384 : result_slots[first_slot].write(res);
333 18384 : slots_filled += 1;
334 : }
335 : }
336 0 : Err(err) => {
337 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
338 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
339 0 : for slot in keys_slots.values().flatten() {
340 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
341 : // but without taking ownership of the GetVectoredError
342 0 : let err = match &err {
343 : GetVectoredError::Cancelled => {
344 0 : Err(PageReconstructError::Cancelled)
345 : }
346 : // TODO: restructure get_vectored API to make this error per-key
347 0 : GetVectoredError::MissingKey(err) => {
348 0 : Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}")))
349 : }
350 : // TODO: restructure get_vectored API to make this error per-key
351 0 : GetVectoredError::GetReadyAncestorError(err) => {
352 0 : Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}")))
353 : }
354 : // TODO: restructure get_vectored API to make this error per-key
355 0 : GetVectoredError::Other(err) => {
356 0 : Err(PageReconstructError::Other(
357 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
358 0 : ))
359 : }
360 : // TODO: we can prevent this error class by moving this check into the type system
361 0 : GetVectoredError::InvalidLsn(e) => {
362 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
363 : }
364 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
365 : // TODO: we can prevent this error class by moving this check into the type system
366 0 : GetVectoredError::Oversized(err) => {
367 0 : Err(anyhow::anyhow!(
368 0 : "batching oversized: {err:?}"
369 0 : )
370 0 : .into())
371 : }
372 : };
373 :
374 0 : result_slots[*slot].write(err);
375 : }
376 :
377 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
378 0 : }
379 : };
380 :
381 18384 : assert_eq!(slots_filled, page_count);
382 : // SAFETY:
383 : // 1. `result` and any of its uninint members are not read from until this point
384 : // 2. The length below is tracked at run-time and matches the number of requested pages.
385 18384 : unsafe {
386 18384 : result.set_len(page_count);
387 18384 : }
388 18384 :
389 18384 : result
390 18384 : }
391 :
392 : // Get size of a database in blocks
393 0 : pub(crate) async fn get_db_size(
394 0 : &self,
395 0 : spcnode: Oid,
396 0 : dbnode: Oid,
397 0 : version: Version<'_>,
398 0 : ctx: &RequestContext,
399 0 : ) -> Result<usize, PageReconstructError> {
400 0 : let mut total_blocks = 0;
401 :
402 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
403 :
404 0 : for rel in rels {
405 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
406 0 : total_blocks += n_blocks as usize;
407 : }
408 0 : Ok(total_blocks)
409 0 : }
410 :
411 : /// Get size of a relation file
412 24434 : pub(crate) async fn get_rel_size(
413 24434 : &self,
414 24434 : tag: RelTag,
415 24434 : version: Version<'_>,
416 24434 : ctx: &RequestContext,
417 24434 : ) -> Result<BlockNumber, PageReconstructError> {
418 24434 : if tag.relnode == 0 {
419 0 : return Err(PageReconstructError::Other(
420 0 : RelationError::InvalidRelnode.into(),
421 0 : ));
422 24434 : }
423 :
424 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
425 19294 : return Ok(nblocks);
426 5140 : }
427 5140 :
428 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
429 0 : && !self.get_rel_exists(tag, version, ctx).await?
430 : {
431 : // FIXME: Postgres sometimes calls smgrcreate() to create
432 : // FSM, and smgrnblocks() on it immediately afterwards,
433 : // without extending it. Tolerate that by claiming that
434 : // any non-existent FSM fork has size 0.
435 0 : return Ok(0);
436 5140 : }
437 5140 :
438 5140 : let key = rel_size_to_key(tag);
439 5140 : let mut buf = version.get(self, key, ctx).await?;
440 5136 : let nblocks = buf.get_u32_le();
441 5136 :
442 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
443 5136 :
444 5136 : Ok(nblocks)
445 24434 : }
446 :
447 : /// Does relation exist?
448 6050 : pub(crate) async fn get_rel_exists(
449 6050 : &self,
450 6050 : tag: RelTag,
451 6050 : version: Version<'_>,
452 6050 : ctx: &RequestContext,
453 6050 : ) -> Result<bool, PageReconstructError> {
454 6050 : if tag.relnode == 0 {
455 0 : return Err(PageReconstructError::Other(
456 0 : RelationError::InvalidRelnode.into(),
457 0 : ));
458 6050 : }
459 :
460 : // first try to lookup relation in cache
461 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
462 6032 : return Ok(true);
463 18 : }
464 : // then check if the database was already initialized.
465 : // get_rel_exists can be called before dbdir is created.
466 18 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
467 18 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
468 18 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
469 0 : return Ok(false);
470 18 : }
471 18 : // fetch directory listing
472 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
473 18 : let buf = version.get(self, key, ctx).await?;
474 :
475 18 : let dir = RelDirectory::des(&buf)?;
476 18 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
477 6050 : }
478 :
479 : /// Get a list of all existing relations in given tablespace and database.
480 : ///
481 : /// # Cancel-Safety
482 : ///
483 : /// This method is cancellation-safe.
484 0 : pub(crate) async fn list_rels(
485 0 : &self,
486 0 : spcnode: Oid,
487 0 : dbnode: Oid,
488 0 : version: Version<'_>,
489 0 : ctx: &RequestContext,
490 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
491 0 : // fetch directory listing
492 0 : let key = rel_dir_to_key(spcnode, dbnode);
493 0 : let buf = version.get(self, key, ctx).await?;
494 :
495 0 : let dir = RelDirectory::des(&buf)?;
496 0 : let rels: HashSet<RelTag> =
497 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
498 0 : spcnode,
499 0 : dbnode,
500 0 : relnode: *relnode,
501 0 : forknum: *forknum,
502 0 : }));
503 0 :
504 0 : Ok(rels)
505 0 : }
506 :
507 : /// Get the whole SLRU segment
508 0 : pub(crate) async fn get_slru_segment(
509 0 : &self,
510 0 : kind: SlruKind,
511 0 : segno: u32,
512 0 : lsn: Lsn,
513 0 : ctx: &RequestContext,
514 0 : ) -> Result<Bytes, PageReconstructError> {
515 0 : let n_blocks = self
516 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
517 0 : .await?;
518 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
519 0 : for blkno in 0..n_blocks {
520 0 : let block = self
521 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
522 0 : .await?;
523 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
524 : }
525 0 : Ok(segment.freeze())
526 0 : }
527 :
528 : /// Look up given SLRU page version.
529 0 : pub(crate) async fn get_slru_page_at_lsn(
530 0 : &self,
531 0 : kind: SlruKind,
532 0 : segno: u32,
533 0 : blknum: BlockNumber,
534 0 : lsn: Lsn,
535 0 : ctx: &RequestContext,
536 0 : ) -> Result<Bytes, PageReconstructError> {
537 0 : let key = slru_block_to_key(kind, segno, blknum);
538 0 : self.get(key, lsn, ctx).await
539 0 : }
540 :
541 : /// Get size of an SLRU segment
542 0 : pub(crate) async fn get_slru_segment_size(
543 0 : &self,
544 0 : kind: SlruKind,
545 0 : segno: u32,
546 0 : version: Version<'_>,
547 0 : ctx: &RequestContext,
548 0 : ) -> Result<BlockNumber, PageReconstructError> {
549 0 : let key = slru_segment_size_to_key(kind, segno);
550 0 : let mut buf = version.get(self, key, ctx).await?;
551 0 : Ok(buf.get_u32_le())
552 0 : }
553 :
554 : /// Get size of an SLRU segment
555 0 : pub(crate) async fn get_slru_segment_exists(
556 0 : &self,
557 0 : kind: SlruKind,
558 0 : segno: u32,
559 0 : version: Version<'_>,
560 0 : ctx: &RequestContext,
561 0 : ) -> Result<bool, PageReconstructError> {
562 0 : // fetch directory listing
563 0 : let key = slru_dir_to_key(kind);
564 0 : let buf = version.get(self, key, ctx).await?;
565 :
566 0 : let dir = SlruSegmentDirectory::des(&buf)?;
567 0 : Ok(dir.segments.contains(&segno))
568 0 : }
569 :
570 : /// Locate LSN, such that all transactions that committed before
571 : /// 'search_timestamp' are visible, but nothing newer is.
572 : ///
573 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
574 : /// so it's not well defined which LSN you get if there were multiple commits
575 : /// "in flight" at that point in time.
576 : ///
577 0 : pub(crate) async fn find_lsn_for_timestamp(
578 0 : &self,
579 0 : search_timestamp: TimestampTz,
580 0 : cancel: &CancellationToken,
581 0 : ctx: &RequestContext,
582 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
583 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
584 :
585 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
586 0 : // We use this method to figure out the branching LSN for the new branch, but the
587 0 : // GC cutoff could be before the branching point and we cannot create a new branch
588 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
589 0 : // on the safe side.
590 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
591 0 : let max_lsn = self.get_last_record_lsn();
592 0 :
593 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
594 0 : // LSN divided by 8.
595 0 : let mut low = min_lsn.0 / 8;
596 0 : let mut high = max_lsn.0 / 8 + 1;
597 0 :
598 0 : let mut found_smaller = false;
599 0 : let mut found_larger = false;
600 :
601 0 : while low < high {
602 0 : if cancel.is_cancelled() {
603 0 : return Err(PageReconstructError::Cancelled);
604 0 : }
605 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
606 0 : let mid = (high + low) / 2;
607 :
608 0 : let cmp = self
609 0 : .is_latest_commit_timestamp_ge_than(
610 0 : search_timestamp,
611 0 : Lsn(mid * 8),
612 0 : &mut found_smaller,
613 0 : &mut found_larger,
614 0 : ctx,
615 0 : )
616 0 : .await?;
617 :
618 0 : if cmp {
619 0 : high = mid;
620 0 : } else {
621 0 : low = mid + 1;
622 0 : }
623 : }
624 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
625 : // so the LSN of the last commit record before or at `search_timestamp`.
626 : // Remove one from `low` to get `t`.
627 : //
628 : // FIXME: it would be better to get the LSN of the previous commit.
629 : // Otherwise, if you restore to the returned LSN, the database will
630 : // include physical changes from later commits that will be marked
631 : // as aborted, and will need to be vacuumed away.
632 0 : let commit_lsn = Lsn((low - 1) * 8);
633 0 : match (found_smaller, found_larger) {
634 : (false, false) => {
635 : // This can happen if no commit records have been processed yet, e.g.
636 : // just after importing a cluster.
637 0 : Ok(LsnForTimestamp::NoData(min_lsn))
638 : }
639 : (false, true) => {
640 : // Didn't find any commit timestamps smaller than the request
641 0 : Ok(LsnForTimestamp::Past(min_lsn))
642 : }
643 0 : (true, _) if commit_lsn < min_lsn => {
644 0 : // the search above did set found_smaller to true but it never increased the lsn.
645 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
646 0 : // below the min_lsn. We should never do that.
647 0 : Ok(LsnForTimestamp::Past(min_lsn))
648 : }
649 : (true, false) => {
650 : // Only found commits with timestamps smaller than the request.
651 : // It's still a valid case for branch creation, return it.
652 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
653 : // case, anyway.
654 0 : Ok(LsnForTimestamp::Future(commit_lsn))
655 : }
656 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
657 : }
658 0 : }
659 :
660 : /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
661 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
662 : ///
663 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
664 : /// with a smaller/larger timestamp.
665 : ///
666 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
667 0 : &self,
668 0 : search_timestamp: TimestampTz,
669 0 : probe_lsn: Lsn,
670 0 : found_smaller: &mut bool,
671 0 : found_larger: &mut bool,
672 0 : ctx: &RequestContext,
673 0 : ) -> Result<bool, PageReconstructError> {
674 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
675 0 : if timestamp >= search_timestamp {
676 0 : *found_larger = true;
677 0 : return ControlFlow::Break(true);
678 0 : } else {
679 0 : *found_smaller = true;
680 0 : }
681 0 : ControlFlow::Continue(())
682 0 : })
683 0 : .await
684 0 : }
685 :
686 : /// Obtain the possible timestamp range for the given lsn.
687 : ///
688 : /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
689 0 : pub(crate) async fn get_timestamp_for_lsn(
690 0 : &self,
691 0 : probe_lsn: Lsn,
692 0 : ctx: &RequestContext,
693 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
694 0 : let mut max: Option<TimestampTz> = None;
695 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
696 0 : if let Some(max_prev) = max {
697 0 : max = Some(max_prev.max(timestamp));
698 0 : } else {
699 0 : max = Some(timestamp);
700 0 : }
701 0 : ControlFlow::Continue(())
702 0 : })
703 0 : .await?;
704 :
705 0 : Ok(max)
706 0 : }
707 :
708 : /// Runs the given function on all the timestamps for a given lsn
709 : ///
710 : /// The return value is either given by the closure, or set to the `Default`
711 : /// impl's output.
712 0 : async fn map_all_timestamps<T: Default>(
713 0 : &self,
714 0 : probe_lsn: Lsn,
715 0 : ctx: &RequestContext,
716 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
717 0 : ) -> Result<T, PageReconstructError> {
718 0 : for segno in self
719 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
720 0 : .await?
721 : {
722 0 : let nblocks = self
723 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
724 0 : .await?;
725 0 : for blknum in (0..nblocks).rev() {
726 0 : let clog_page = self
727 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
728 0 : .await?;
729 :
730 0 : if clog_page.len() == BLCKSZ as usize + 8 {
731 0 : let mut timestamp_bytes = [0u8; 8];
732 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
733 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
734 0 :
735 0 : match f(timestamp) {
736 0 : ControlFlow::Break(b) => return Ok(b),
737 0 : ControlFlow::Continue(()) => (),
738 : }
739 0 : }
740 : }
741 : }
742 0 : Ok(Default::default())
743 0 : }
744 :
745 0 : pub(crate) async fn get_slru_keyspace(
746 0 : &self,
747 0 : version: Version<'_>,
748 0 : ctx: &RequestContext,
749 0 : ) -> Result<KeySpace, PageReconstructError> {
750 0 : let mut accum = KeySpaceAccum::new();
751 :
752 0 : for kind in SlruKind::iter() {
753 0 : let mut segments: Vec<u32> = self
754 0 : .list_slru_segments(kind, version, ctx)
755 0 : .await?
756 0 : .into_iter()
757 0 : .collect();
758 0 : segments.sort_unstable();
759 :
760 0 : for seg in segments {
761 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
762 :
763 0 : accum.add_range(
764 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
765 0 : );
766 : }
767 : }
768 :
769 0 : Ok(accum.to_keyspace())
770 0 : }
771 :
772 : /// Get a list of SLRU segments
773 0 : pub(crate) async fn list_slru_segments(
774 0 : &self,
775 0 : kind: SlruKind,
776 0 : version: Version<'_>,
777 0 : ctx: &RequestContext,
778 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
779 0 : // fetch directory entry
780 0 : let key = slru_dir_to_key(kind);
781 :
782 0 : let buf = version.get(self, key, ctx).await?;
783 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
784 0 : }
785 :
786 0 : pub(crate) async fn get_relmap_file(
787 0 : &self,
788 0 : spcnode: Oid,
789 0 : dbnode: Oid,
790 0 : version: Version<'_>,
791 0 : ctx: &RequestContext,
792 0 : ) -> Result<Bytes, PageReconstructError> {
793 0 : let key = relmap_file_key(spcnode, dbnode);
794 :
795 0 : let buf = version.get(self, key, ctx).await?;
796 0 : Ok(buf)
797 0 : }
798 :
799 292 : pub(crate) async fn list_dbdirs(
800 292 : &self,
801 292 : lsn: Lsn,
802 292 : ctx: &RequestContext,
803 292 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
804 : // fetch directory entry
805 3117 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
806 :
807 292 : Ok(DbDirectory::des(&buf)?.dbdirs)
808 292 : }
809 :
810 0 : pub(crate) async fn get_twophase_file(
811 0 : &self,
812 0 : xid: u64,
813 0 : lsn: Lsn,
814 0 : ctx: &RequestContext,
815 0 : ) -> Result<Bytes, PageReconstructError> {
816 0 : let key = twophase_file_key(xid);
817 0 : let buf = self.get(key, lsn, ctx).await?;
818 0 : Ok(buf)
819 0 : }
820 :
821 294 : pub(crate) async fn list_twophase_files(
822 294 : &self,
823 294 : lsn: Lsn,
824 294 : ctx: &RequestContext,
825 294 : ) -> Result<HashSet<u64>, PageReconstructError> {
826 : // fetch directory entry
827 3134 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
828 :
829 294 : if self.pg_version >= 17 {
830 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
831 : } else {
832 294 : Ok(TwoPhaseDirectory::des(&buf)?
833 : .xids
834 294 : .iter()
835 294 : .map(|x| u64::from(*x))
836 294 : .collect())
837 : }
838 294 : }
839 :
840 0 : pub(crate) async fn get_control_file(
841 0 : &self,
842 0 : lsn: Lsn,
843 0 : ctx: &RequestContext,
844 0 : ) -> Result<Bytes, PageReconstructError> {
845 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
846 0 : }
847 :
848 12 : pub(crate) async fn get_checkpoint(
849 12 : &self,
850 12 : lsn: Lsn,
851 12 : ctx: &RequestContext,
852 12 : ) -> Result<Bytes, PageReconstructError> {
853 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
854 12 : }
855 :
856 12 : async fn list_aux_files_v2(
857 12 : &self,
858 12 : lsn: Lsn,
859 12 : ctx: &RequestContext,
860 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
861 12 : let kv = self
862 12 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
863 0 : .await?;
864 12 : let mut result = HashMap::new();
865 12 : let mut sz = 0;
866 30 : for (_, v) in kv {
867 18 : let v = v?;
868 18 : let v = aux_file::decode_file_value_bytes(&v)
869 18 : .context("value decode")
870 18 : .map_err(PageReconstructError::Other)?;
871 34 : for (fname, content) in v {
872 16 : sz += fname.len();
873 16 : sz += content.len();
874 16 : result.insert(fname, content);
875 16 : }
876 : }
877 12 : self.aux_file_size_estimator.on_initial(sz);
878 12 : Ok(result)
879 12 : }
880 :
881 0 : pub(crate) async fn trigger_aux_file_size_computation(
882 0 : &self,
883 0 : lsn: Lsn,
884 0 : ctx: &RequestContext,
885 0 : ) -> Result<(), PageReconstructError> {
886 0 : self.list_aux_files_v2(lsn, ctx).await?;
887 0 : Ok(())
888 0 : }
889 :
890 12 : pub(crate) async fn list_aux_files(
891 12 : &self,
892 12 : lsn: Lsn,
893 12 : ctx: &RequestContext,
894 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
895 12 : self.list_aux_files_v2(lsn, ctx).await
896 12 : }
897 :
898 0 : pub(crate) async fn get_replorigins(
899 0 : &self,
900 0 : lsn: Lsn,
901 0 : ctx: &RequestContext,
902 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
903 0 : let kv = self
904 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
905 0 : .await?;
906 0 : let mut result = HashMap::new();
907 0 : for (k, v) in kv {
908 0 : let v = v?;
909 0 : let origin_id = k.field6 as RepOriginId;
910 0 : let origin_lsn = Lsn::des(&v).unwrap();
911 0 : if origin_lsn != Lsn::INVALID {
912 0 : result.insert(origin_id, origin_lsn);
913 0 : }
914 : }
915 0 : Ok(result)
916 0 : }
917 :
918 : /// Does the same as get_current_logical_size but counted on demand.
919 : /// Used to initialize the logical size tracking on startup.
920 : ///
921 : /// Only relation blocks are counted currently. That excludes metadata,
922 : /// SLRUs, twophase files etc.
923 : ///
924 : /// # Cancel-Safety
925 : ///
926 : /// This method is cancellation-safe.
927 0 : pub(crate) async fn get_current_logical_size_non_incremental(
928 0 : &self,
929 0 : lsn: Lsn,
930 0 : ctx: &RequestContext,
931 0 : ) -> Result<u64, CalculateLogicalSizeError> {
932 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
933 :
934 : // Fetch list of database dirs and iterate them
935 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
936 0 : let dbdir = DbDirectory::des(&buf)?;
937 :
938 0 : let mut total_size: u64 = 0;
939 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
940 0 : for rel in self
941 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
942 0 : .await?
943 : {
944 0 : if self.cancel.is_cancelled() {
945 0 : return Err(CalculateLogicalSizeError::Cancelled);
946 0 : }
947 0 : let relsize_key = rel_size_to_key(rel);
948 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
949 0 : let relsize = buf.get_u32_le();
950 0 :
951 0 : total_size += relsize as u64;
952 : }
953 : }
954 0 : Ok(total_size * BLCKSZ as u64)
955 0 : }
956 :
957 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
958 : /// for gc-compaction.
959 : ///
960 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
961 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
962 : /// be kept only for a specific range of LSN.
963 : ///
964 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
965 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
966 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
967 : /// determine which keys to retain/drop for gc-compaction.
968 : ///
969 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
970 : /// to be retained for each of the branch LSN.
971 : ///
972 : /// The return value is (dense keyspace, sparse keyspace).
973 40 : pub(crate) async fn collect_gc_compaction_keyspace(
974 40 : &self,
975 40 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
976 40 : let metadata_key_begin = Key::metadata_key_range().start;
977 40 : let aux_v1_key = AUX_FILES_KEY;
978 40 : let dense_keyspace = KeySpace {
979 40 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
980 40 : };
981 40 : Ok((
982 40 : dense_keyspace,
983 40 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
984 40 : ))
985 40 : }
986 :
    ///
    /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
    /// Anything that's not listed may be removed from the underlying storage (from
    /// that LSN forwards).
    ///
    /// The dense keyspace is built, in this order, from:
    /// - the dbdir key;
    /// - per database: the relmap file key (if present), the rel directory key, and each
    ///   relation's block range plus its size key;
    /// - per SLRU kind: its directory key, and each segment's block range plus size key;
    /// - the twophase directory key and each twophase file key;
    /// - the control file and checkpoint keys.
    ///
    /// Every group is sorted before it is added. NOTE(review): presumably `KeySpaceAccum`
    /// requires keys/ranges in ascending key order and the groups above are visited in
    /// key order -- confirm before reordering anything here.
    ///
    /// The sparse keyspace is fixed: the metadata AUX range and the replication origin range.
    ///
    /// The return value is (dense keyspace, sparse keyspace).
    pub(crate) async fn collect_keyspace(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
        // Iterate through key ranges, greedily packing them into partitions
        let mut result = KeySpaceAccum::new();

        // The dbdir metadata always exists
        result.add_key(DBDIR_KEY);

        // Fetch list of database dirs and iterate them
        let dbdir = self.list_dbdirs(lsn, ctx).await?;
        let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();

        // Sort by (spcnode, dbnode) so databases are visited in a deterministic order
        dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
        for ((spcnode, dbnode), has_relmap_file) in dbs {
            if has_relmap_file {
                result.add_key(relmap_file_key(spcnode, dbnode));
            }
            result.add_key(rel_dir_to_key(spcnode, dbnode));

            let mut rels: Vec<RelTag> = self
                .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
                .await?
                .into_iter()
                .collect();
            rels.sort_unstable();
            for rel in rels {
                // The size key holds the relation's block count as a little-endian u32
                let relsize_key = rel_size_to_key(rel);
                let mut buf = self.get(relsize_key, lsn, ctx).await?;
                let relsize = buf.get_u32_le();

                // Blocks 0..relsize are live; the size key itself is live too
                result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
                result.add_key(relsize_key);
            }
        }

        // Iterate SLRUs next
        for kind in [
            SlruKind::Clog,
            SlruKind::MultiXactMembers,
            SlruKind::MultiXactOffsets,
        ] {
            let slrudir_key = slru_dir_to_key(kind);
            result.add_key(slrudir_key);
            let buf = self.get(slrudir_key, lsn, ctx).await?;
            let dir = SlruSegmentDirectory::des(&buf)?;
            let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
            segments.sort_unstable();
            for segno in segments {
                // Same layout as relations: a little-endian u32 block count per segment
                let segsize_key = slru_segment_size_to_key(kind, segno);
                let mut buf = self.get(segsize_key, lsn, ctx).await?;
                let segsize = buf.get_u32_le();

                result.add_range(
                    slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
                );
                result.add_key(segsize_key);
            }
        }

        // Then pg_twophase
        result.add_key(TWOPHASEDIR_KEY);

        let mut xids: Vec<u64> = self
            .list_twophase_files(lsn, ctx)
            .await?
            .iter()
            .cloned()
            .collect();
        xids.sort_unstable();
        for xid in xids {
            result.add_key(twophase_file_key(xid));
        }

        result.add_key(CONTROLFILE_KEY);
        result.add_key(CHECKPOINT_KEY);

        // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
        // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
        // and the keys will not be garbage-collected.
        #[cfg(test)]
        {
            let guard = self.extra_test_dense_keyspace.load();
            for kr in &guard.ranges {
                result.add_range(kr.clone());
            }
        }

        let dense_keyspace = result.to_keyspace();
        let sparse_keyspace = SparseKeySpace(KeySpace {
            ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
        });

        if cfg!(debug_assertions) {
            // Verify if the sparse keyspaces are ordered and non-overlapping.

            // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
            // category of sparse keys are split into their own image/delta files. If there
            // are overlapping keyspaces, they will be automatically merged by keyspace accum,
            // and we want the developer to keep the keyspaces separated.

            let ranges = &sparse_keyspace.0.ranges;

            // TODO: use a single overlaps_with across the codebase
            fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
                !(a.end <= b.start || b.end <= a.start)
            }
            // Pairwise overlap check; the sparse range list is tiny, so O(n^2) is fine
            for i in 0..ranges.len() {
                for j in 0..i {
                    if overlaps_with(&ranges[i], &ranges[j]) {
                        panic!(
                            "overlapping sparse keyspace: {}..{} and {}..{}",
                            ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
                        );
                    }
                }
            }
            // Adjacent ranges must appear in ascending order
            for i in 1..ranges.len() {
                assert!(
                    ranges[i - 1].end <= ranges[i].start,
                    "unordered sparse keyspace: {}..{} and {}..{}",
                    ranges[i - 1].start,
                    ranges[i - 1].end,
                    ranges[i].start,
                    ranges[i].end
                );
            }
        }

        Ok((dense_keyspace, sparse_keyspace))
    }
1126 :
1127 : /// Get cached size of relation if it not updated after specified LSN
1128 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1129 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1130 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1131 448518 : if lsn >= *cached_lsn {
1132 443372 : return Some(*nblocks);
1133 5146 : }
1134 22 : }
1135 5168 : None
1136 448540 : }
1137 :
1138 : /// Update cached relation size if there is no more recent update
1139 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1140 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1141 5136 :
1142 5136 : if lsn < rel_size_cache.complete_as_of {
1143 : // Do not cache old values. It's safe to cache the size on read, as long as
1144 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1145 : // never evict values from the cache, so if the relation size changed after
1146 : // 'lsn', the new value is already in the cache.
1147 0 : return;
1148 5136 : }
1149 5136 :
1150 5136 : match rel_size_cache.map.entry(tag) {
1151 5136 : hash_map::Entry::Occupied(mut entry) => {
1152 5136 : let cached_lsn = entry.get_mut();
1153 5136 : if lsn >= cached_lsn.0 {
1154 0 : *cached_lsn = (lsn, nblocks);
1155 5136 : }
1156 : }
1157 0 : hash_map::Entry::Vacant(entry) => {
1158 0 : entry.insert((lsn, nblocks));
1159 0 : }
1160 : }
1161 5136 : }
1162 :
1163 : /// Store cached relation size
1164 282720 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1165 282720 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1166 282720 : rel_size_cache.map.insert(tag, (lsn, nblocks));
1167 282720 : }
1168 :
1169 : /// Remove cached relation size
1170 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1171 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1172 2 : rel_size_cache.map.remove(tag);
1173 2 : }
1174 : }
1175 :
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository.
///
/// It is created by the 'begin_record' function, which is called for each WAL
/// record, so that all the modifications made by a single WAL record appear atomic.
pub struct DatadirModification<'a> {
    /// The timeline this modification applies to. You can access this to
    /// read the state, but note that any pending updates are *not* reflected
    /// in the state in 'tline' yet.
    pub tline: &'a Timeline,

    /// Current LSN of the modification. It only moves forward (`set_lsn` rejects older LSNs).
    lsn: Lsn,

    // The modifications are not applied directly to the underlying key-value store.
    // The put-functions add the modifications here, and they are flushed to the
    // underlying key-value store by the 'finish' function.
    //
    // - `pending_lsns`: previous values of `lsn`, pushed by `set_lsn` each time it advances.
    // - `pending_deletions`: key ranges scheduled for deletion, each paired with an LSN
    //   (presumably the LSN at which the deletion was requested -- TODO confirm in `delete`).
    // - `pending_nblocks`: net change in the total relation block count made by this
    //   modification; relation creation/extension add to it, truncation and `drop_dbdir`
    //   subtract, so it can be negative.
    pending_lsns: Vec<Lsn>,
    pending_deletions: Vec<(Range<Key>, Lsn)>,
    pending_nblocks: i64,

    /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    /// NOTE(review): the `usize` in each tuple is presumably the serialized size of the value --
    /// confirm against `put`.
    pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,

    /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    /// which keys are stored here. `None` until the first data write or ingested batch.
    pending_data_batch: Option<SerializedValueBatch>,

    /// For special "directory" keys that store key-value maps, track the size of the map
    /// if it was updated in this modification.
    pending_directory_entries: Vec<(DirectoryKind, usize)>,

    /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
    pending_metadata_bytes: usize,
}
1212 :
1213 : impl<'a> DatadirModification<'a> {
    // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
    // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    //
    // The limit is 8 MiB. NOTE(review): presumably compared against `approx_pending_bytes()` by the ingest loop --
    // confirm at the call sites.
    pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1218 :
1219 : /// Get the current lsn
1220 418058 : pub(crate) fn get_lsn(&self) -> Lsn {
1221 418058 : self.lsn
1222 418058 : }
1223 :
1224 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1225 0 : self.pending_data_batch
1226 0 : .as_ref()
1227 0 : .map_or(0, |b| b.buffer_size())
1228 0 : + self.pending_metadata_bytes
1229 0 : }
1230 :
1231 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1232 0 : !self
1233 0 : .pending_data_batch
1234 0 : .as_ref()
1235 0 : .map_or(true, |b| b.is_empty())
1236 0 : }
1237 :
1238 : /// Set the current lsn
1239 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1240 145858 : ensure!(
1241 145858 : lsn >= self.lsn,
1242 0 : "setting an older lsn {} than {} is not allowed",
1243 : lsn,
1244 : self.lsn
1245 : );
1246 :
1247 145858 : if lsn > self.lsn {
1248 145858 : self.pending_lsns.push(self.lsn);
1249 145858 : self.lsn = lsn;
1250 145858 : }
1251 145858 : Ok(())
1252 145858 : }
1253 :
1254 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1255 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1256 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1257 : ///
1258 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1259 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1260 : /// via [`Self::get`] as soon as their record has been ingested.
1261 850432 : fn is_data_key(key: &Key) -> bool {
1262 850432 : key.is_rel_block_key() || key.is_slru_block_key()
1263 850432 : }
1264 :
1265 : /// Initialize a completely new repository.
1266 : ///
1267 : /// This inserts the directory metadata entries that are assumed to
1268 : /// always exist.
1269 178 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1270 178 : let buf = DbDirectory::ser(&DbDirectory {
1271 178 : dbdirs: HashMap::new(),
1272 178 : })?;
1273 178 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1274 178 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1275 :
1276 178 : let buf = if self.tline.pg_version >= 17 {
1277 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1278 0 : xids: HashSet::new(),
1279 0 : })
1280 : } else {
1281 178 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1282 178 : xids: HashSet::new(),
1283 178 : })
1284 0 : }?;
1285 178 : self.pending_directory_entries
1286 178 : .push((DirectoryKind::TwoPhase, 0));
1287 178 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1288 :
1289 178 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1290 178 : let empty_dir = Value::Image(buf);
1291 178 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1292 178 : self.pending_directory_entries
1293 178 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1294 178 : self.put(
1295 178 : slru_dir_to_key(SlruKind::MultiXactMembers),
1296 178 : empty_dir.clone(),
1297 178 : );
1298 178 : self.pending_directory_entries
1299 178 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1300 178 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1301 178 : self.pending_directory_entries
1302 178 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1303 178 :
1304 178 : Ok(())
1305 178 : }
1306 :
1307 : #[cfg(test)]
1308 176 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1309 176 : self.init_empty()?;
1310 176 : self.put_control_file(bytes::Bytes::from_static(
1311 176 : b"control_file contents do not matter",
1312 176 : ))
1313 176 : .context("put_control_file")?;
1314 176 : self.put_checkpoint(bytes::Bytes::from_static(
1315 176 : b"checkpoint_file contents do not matter",
1316 176 : ))
1317 176 : .context("put_checkpoint_file")?;
1318 176 : Ok(())
1319 176 : }
1320 :
1321 : /// Creates a relation if it is not already present.
1322 : /// Returns the current size of the relation
1323 418056 : pub(crate) async fn create_relation_if_required(
1324 418056 : &mut self,
1325 418056 : rel: RelTag,
1326 418056 : ctx: &RequestContext,
1327 418056 : ) -> Result<u32, PageReconstructError> {
1328 : // Get current size and put rel creation if rel doesn't exist
1329 : //
1330 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1331 : // check the cache too. This is because eagerly checking the cache results in
1332 : // less work overall and 10% better performance. It's more work on cache miss
1333 : // but cache miss is rare.
1334 418056 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1335 418046 : Ok(nblocks)
1336 10 : } else if !self
1337 10 : .tline
1338 10 : .get_rel_exists(rel, Version::Modified(self), ctx)
1339 2 : .await?
1340 : {
1341 : // create it with 0 size initially, the logic below will extend it
1342 10 : self.put_rel_creation(rel, 0, ctx)
1343 3 : .await
1344 10 : .context("Relation Error")?;
1345 10 : Ok(0)
1346 : } else {
1347 0 : self.tline
1348 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1349 0 : .await
1350 : }
1351 418056 : }
1352 :
1353 : /// Given a block number for a relation (which represents a newly written block),
1354 : /// the previous block count of the relation, and the shard info, find the gaps
1355 : /// that were created by the newly written block if any.
1356 145670 : fn find_gaps(
1357 145670 : rel: RelTag,
1358 145670 : blkno: u32,
1359 145670 : previous_nblocks: u32,
1360 145670 : shard: &ShardIdentity,
1361 145670 : ) -> Option<KeySpace> {
1362 145670 : let mut key = rel_block_to_key(rel, blkno);
1363 145670 : let mut gap_accum = None;
1364 :
1365 145670 : for gap_blkno in previous_nblocks..blkno {
1366 32 : key.field6 = gap_blkno;
1367 32 :
1368 32 : if shard.get_shard_number(&key) != shard.number {
1369 8 : continue;
1370 24 : }
1371 24 :
1372 24 : gap_accum
1373 24 : .get_or_insert_with(KeySpaceAccum::new)
1374 24 : .add_key(key);
1375 : }
1376 :
1377 145670 : gap_accum.map(|accum| accum.to_keyspace())
1378 145670 : }
1379 :
1380 145852 : pub async fn ingest_batch(
1381 145852 : &mut self,
1382 145852 : mut batch: SerializedValueBatch,
1383 145852 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1384 145852 : shard: &ShardIdentity,
1385 145852 : ctx: &RequestContext,
1386 145852 : ) -> anyhow::Result<()> {
1387 145852 : let mut gaps_at_lsns = Vec::default();
1388 :
1389 145852 : for meta in batch.metadata.iter() {
1390 145642 : let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
1391 145642 : let new_nblocks = blkno + 1;
1392 :
1393 145642 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1394 145642 : if new_nblocks > old_nblocks {
1395 2390 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1396 143252 : }
1397 :
1398 145642 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1399 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1400 145642 : }
1401 : }
1402 :
1403 145852 : if !gaps_at_lsns.is_empty() {
1404 0 : batch.zero_gaps(gaps_at_lsns);
1405 145852 : }
1406 :
1407 145852 : match self.pending_data_batch.as_mut() {
1408 20 : Some(pending_batch) => {
1409 20 : pending_batch.extend(batch);
1410 20 : }
1411 145832 : None if !batch.is_empty() => {
1412 145630 : self.pending_data_batch = Some(batch);
1413 145630 : }
1414 202 : None => {
1415 202 : // Nothing to initialize the batch with
1416 202 : }
1417 : }
1418 :
1419 145852 : Ok(())
1420 145852 : }
1421 :
1422 : /// Put a new page version that can be constructed from a WAL record
1423 : ///
1424 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1425 : /// current end-of-file. It's up to the caller to check that the relation size
1426 : /// matches the blocks inserted!
1427 12 : pub fn put_rel_wal_record(
1428 12 : &mut self,
1429 12 : rel: RelTag,
1430 12 : blknum: BlockNumber,
1431 12 : rec: NeonWalRecord,
1432 12 : ) -> anyhow::Result<()> {
1433 12 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1434 12 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1435 12 : Ok(())
1436 12 : }
1437 :
1438 : // Same, but for an SLRU.
1439 8 : pub fn put_slru_wal_record(
1440 8 : &mut self,
1441 8 : kind: SlruKind,
1442 8 : segno: u32,
1443 8 : blknum: BlockNumber,
1444 8 : rec: NeonWalRecord,
1445 8 : ) -> anyhow::Result<()> {
1446 8 : self.put(
1447 8 : slru_block_to_key(kind, segno, blknum),
1448 8 : Value::WalRecord(rec),
1449 8 : );
1450 8 : Ok(())
1451 8 : }
1452 :
1453 : /// Like put_wal_record, but with ready-made image of the page.
1454 277842 : pub fn put_rel_page_image(
1455 277842 : &mut self,
1456 277842 : rel: RelTag,
1457 277842 : blknum: BlockNumber,
1458 277842 : img: Bytes,
1459 277842 : ) -> anyhow::Result<()> {
1460 277842 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1461 277842 : let key = rel_block_to_key(rel, blknum);
1462 277842 : if !key.is_valid_key_on_write_path() {
1463 0 : anyhow::bail!(
1464 0 : "the request contains data not supported by pageserver at {}",
1465 0 : key
1466 0 : );
1467 277842 : }
1468 277842 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1469 277842 : Ok(())
1470 277842 : }
1471 :
1472 6 : pub fn put_slru_page_image(
1473 6 : &mut self,
1474 6 : kind: SlruKind,
1475 6 : segno: u32,
1476 6 : blknum: BlockNumber,
1477 6 : img: Bytes,
1478 6 : ) -> anyhow::Result<()> {
1479 6 : let key = slru_block_to_key(kind, segno, blknum);
1480 6 : if !key.is_valid_key_on_write_path() {
1481 0 : anyhow::bail!(
1482 0 : "the request contains data not supported by pageserver at {}",
1483 0 : key
1484 0 : );
1485 6 : }
1486 6 : self.put(key, Value::Image(img));
1487 6 : Ok(())
1488 6 : }
1489 :
1490 2998 : pub(crate) fn put_rel_page_image_zero(
1491 2998 : &mut self,
1492 2998 : rel: RelTag,
1493 2998 : blknum: BlockNumber,
1494 2998 : ) -> anyhow::Result<()> {
1495 2998 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1496 2998 : let key = rel_block_to_key(rel, blknum);
1497 2998 : if !key.is_valid_key_on_write_path() {
1498 0 : anyhow::bail!(
1499 0 : "the request contains data not supported by pageserver: {} @ {}",
1500 0 : key,
1501 0 : self.lsn
1502 0 : );
1503 2998 : }
1504 2998 :
1505 2998 : let batch = self
1506 2998 : .pending_data_batch
1507 2998 : .get_or_insert_with(SerializedValueBatch::default);
1508 2998 :
1509 2998 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1510 2998 :
1511 2998 : Ok(())
1512 2998 : }
1513 :
1514 0 : pub(crate) fn put_slru_page_image_zero(
1515 0 : &mut self,
1516 0 : kind: SlruKind,
1517 0 : segno: u32,
1518 0 : blknum: BlockNumber,
1519 0 : ) -> anyhow::Result<()> {
1520 0 : let key = slru_block_to_key(kind, segno, blknum);
1521 0 : if !key.is_valid_key_on_write_path() {
1522 0 : anyhow::bail!(
1523 0 : "the request contains data not supported by pageserver: {} @ {}",
1524 0 : key,
1525 0 : self.lsn
1526 0 : );
1527 0 : }
1528 0 :
1529 0 : let batch = self
1530 0 : .pending_data_batch
1531 0 : .get_or_insert_with(SerializedValueBatch::default);
1532 0 :
1533 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1534 0 :
1535 0 : Ok(())
1536 0 : }
1537 :
1538 : /// Store a relmapper file (pg_filenode.map) in the repository
1539 16 : pub async fn put_relmap_file(
1540 16 : &mut self,
1541 16 : spcnode: Oid,
1542 16 : dbnode: Oid,
1543 16 : img: Bytes,
1544 16 : ctx: &RequestContext,
1545 16 : ) -> anyhow::Result<()> {
1546 : // Add it to the directory (if it doesn't exist already)
1547 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1548 16 : let mut dbdir = DbDirectory::des(&buf)?;
1549 :
1550 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1551 16 : if r.is_none() || r == Some(false) {
1552 : // The dbdir entry didn't exist, or it contained a
1553 : // 'false'. The 'insert' call already updated it with
1554 : // 'true', now write the updated 'dbdirs' map back.
1555 16 : let buf = DbDirectory::ser(&dbdir)?;
1556 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1557 0 : }
1558 16 : if r.is_none() {
1559 : // Create RelDirectory
1560 8 : let buf = RelDirectory::ser(&RelDirectory {
1561 8 : rels: HashSet::new(),
1562 8 : })?;
1563 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1564 8 : self.put(
1565 8 : rel_dir_to_key(spcnode, dbnode),
1566 8 : Value::Image(Bytes::from(buf)),
1567 8 : );
1568 8 : }
1569 :
1570 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1571 16 : Ok(())
1572 16 : }
1573 :
1574 0 : pub async fn put_twophase_file(
1575 0 : &mut self,
1576 0 : xid: u64,
1577 0 : img: Bytes,
1578 0 : ctx: &RequestContext,
1579 0 : ) -> anyhow::Result<()> {
1580 : // Add it to the directory entry
1581 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1582 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1583 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1584 0 : if !dir.xids.insert(xid) {
1585 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1586 0 : }
1587 0 : self.pending_directory_entries
1588 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1589 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1590 : } else {
1591 0 : let xid = xid as u32;
1592 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1593 0 : if !dir.xids.insert(xid) {
1594 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1595 0 : }
1596 0 : self.pending_directory_entries
1597 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1598 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1599 : };
1600 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1601 0 :
1602 0 : self.put(twophase_file_key(xid), Value::Image(img));
1603 0 : Ok(())
1604 0 : }
1605 :
1606 0 : pub async fn set_replorigin(
1607 0 : &mut self,
1608 0 : origin_id: RepOriginId,
1609 0 : origin_lsn: Lsn,
1610 0 : ) -> anyhow::Result<()> {
1611 0 : let key = repl_origin_key(origin_id);
1612 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1613 0 : Ok(())
1614 0 : }
1615 :
1616 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1617 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1618 0 : }
1619 :
1620 178 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1621 178 : self.put(CONTROLFILE_KEY, Value::Image(img));
1622 178 : Ok(())
1623 178 : }
1624 :
1625 192 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1626 192 : self.put(CHECKPOINT_KEY, Value::Image(img));
1627 192 : Ok(())
1628 192 : }
1629 :
1630 0 : pub async fn drop_dbdir(
1631 0 : &mut self,
1632 0 : spcnode: Oid,
1633 0 : dbnode: Oid,
1634 0 : ctx: &RequestContext,
1635 0 : ) -> anyhow::Result<()> {
1636 0 : let total_blocks = self
1637 0 : .tline
1638 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1639 0 : .await?;
1640 :
1641 : // Remove entry from dbdir
1642 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1643 0 : let mut dir = DbDirectory::des(&buf)?;
1644 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1645 0 : let buf = DbDirectory::ser(&dir)?;
1646 0 : self.pending_directory_entries
1647 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1648 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1649 : } else {
1650 0 : warn!(
1651 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1652 : spcnode, dbnode
1653 : );
1654 : }
1655 :
1656 : // Update logical database size.
1657 0 : self.pending_nblocks -= total_blocks as i64;
1658 0 :
1659 0 : // Delete all relations and metadata files for the spcnode/dnode
1660 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1661 0 : Ok(())
1662 0 : }
1663 :
1664 : /// Create a relation fork.
1665 : ///
1666 : /// 'nblocks' is the initial size.
1667 1920 : pub async fn put_rel_creation(
1668 1920 : &mut self,
1669 1920 : rel: RelTag,
1670 1920 : nblocks: BlockNumber,
1671 1920 : ctx: &RequestContext,
1672 1920 : ) -> Result<(), RelationError> {
1673 1920 : if rel.relnode == 0 {
1674 0 : return Err(RelationError::InvalidRelnode);
1675 1920 : }
1676 : // It's possible that this is the first rel for this db in this
1677 : // tablespace. Create the reldir entry for it if so.
1678 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1679 1920 : .context("deserialize db")?;
1680 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1681 1920 : let mut rel_dir =
1682 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1683 : // Didn't exist. Update dbdir
1684 8 : e.insert(false);
1685 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1686 8 : self.pending_directory_entries
1687 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1688 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1689 8 :
1690 8 : // and create the RelDirectory
1691 8 : RelDirectory::default()
1692 : } else {
1693 : // reldir already exists, fetch it
1694 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1695 1912 : .context("deserialize db")?
1696 : };
1697 :
1698 : // Add the new relation to the rel directory entry, and write it back
1699 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1700 0 : return Err(RelationError::AlreadyExists);
1701 1920 : }
1702 1920 :
1703 1920 : self.pending_directory_entries
1704 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1705 1920 :
1706 1920 : self.put(
1707 1920 : rel_dir_key,
1708 1920 : Value::Image(Bytes::from(
1709 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1710 : )),
1711 : );
1712 :
1713 : // Put size
1714 1920 : let size_key = rel_size_to_key(rel);
1715 1920 : let buf = nblocks.to_le_bytes();
1716 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1717 1920 :
1718 1920 : self.pending_nblocks += nblocks as i64;
1719 1920 :
1720 1920 : // Update relation size cache
1721 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1722 1920 :
1723 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1724 1920 : // caller.
1725 1920 : Ok(())
1726 1920 : }
1727 :
1728 : /// Truncate relation
1729 6012 : pub async fn put_rel_truncation(
1730 6012 : &mut self,
1731 6012 : rel: RelTag,
1732 6012 : nblocks: BlockNumber,
1733 6012 : ctx: &RequestContext,
1734 6012 : ) -> anyhow::Result<()> {
1735 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1736 6012 : if self
1737 6012 : .tline
1738 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1739 0 : .await?
1740 : {
1741 6012 : let size_key = rel_size_to_key(rel);
1742 : // Fetch the old size first
1743 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1744 6012 :
1745 6012 : // Update the entry with the new size.
1746 6012 : let buf = nblocks.to_le_bytes();
1747 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1748 6012 :
1749 6012 : // Update relation size cache
1750 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1751 6012 :
1752 6012 : // Update logical database size.
1753 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1754 0 : }
1755 6012 : Ok(())
1756 6012 : }
1757 :
1758 : /// Extend relation
1759 : /// If new size is smaller, do nothing.
1760 276680 : pub async fn put_rel_extend(
1761 276680 : &mut self,
1762 276680 : rel: RelTag,
1763 276680 : nblocks: BlockNumber,
1764 276680 : ctx: &RequestContext,
1765 276680 : ) -> anyhow::Result<()> {
1766 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1767 :
1768 : // Put size
1769 276680 : let size_key = rel_size_to_key(rel);
1770 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1771 276680 :
1772 276680 : // only extend relation here. never decrease the size
1773 276680 : if nblocks > old_size {
1774 274788 : let buf = nblocks.to_le_bytes();
1775 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1776 274788 :
1777 274788 : // Update relation size cache
1778 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1779 274788 :
1780 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1781 274788 : }
1782 276680 : Ok(())
1783 276680 : }
1784 :
1785 : /// Drop some relations
1786 10 : pub(crate) async fn put_rel_drops(
1787 10 : &mut self,
1788 10 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
1789 10 : ctx: &RequestContext,
1790 10 : ) -> anyhow::Result<()> {
1791 12 : for ((spc_node, db_node), rel_tags) in drop_relations {
1792 2 : let dir_key = rel_dir_to_key(spc_node, db_node);
1793 2 : let buf = self.get(dir_key, ctx).await?;
1794 2 : let mut dir = RelDirectory::des(&buf)?;
1795 :
1796 2 : let mut dirty = false;
1797 4 : for rel_tag in rel_tags {
1798 2 : if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
1799 2 : dirty = true;
1800 2 :
1801 2 : // update logical size
1802 2 : let size_key = rel_size_to_key(rel_tag);
1803 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1804 2 : self.pending_nblocks -= old_size as i64;
1805 2 :
1806 2 : // Remove entry from relation size cache
1807 2 : self.tline.remove_cached_rel_size(&rel_tag);
1808 2 :
1809 2 : // Delete size entry, as well as all blocks
1810 2 : self.delete(rel_key_range(rel_tag));
1811 0 : }
1812 : }
1813 :
1814 2 : if dirty {
1815 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1816 2 : self.pending_directory_entries
1817 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1818 0 : }
1819 : }
1820 :
1821 10 : Ok(())
1822 10 : }
1823 :
1824 6 : pub async fn put_slru_segment_creation(
1825 6 : &mut self,
1826 6 : kind: SlruKind,
1827 6 : segno: u32,
1828 6 : nblocks: BlockNumber,
1829 6 : ctx: &RequestContext,
1830 6 : ) -> anyhow::Result<()> {
1831 6 : // Add it to the directory entry
1832 6 : let dir_key = slru_dir_to_key(kind);
1833 6 : let buf = self.get(dir_key, ctx).await?;
1834 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1835 :
1836 6 : if !dir.segments.insert(segno) {
1837 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1838 6 : }
1839 6 : self.pending_directory_entries
1840 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1841 6 : self.put(
1842 6 : dir_key,
1843 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1844 : );
1845 :
1846 : // Put size
1847 6 : let size_key = slru_segment_size_to_key(kind, segno);
1848 6 : let buf = nblocks.to_le_bytes();
1849 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1850 6 :
1851 6 : // even if nblocks > 0, we don't insert any actual blocks here
1852 6 :
1853 6 : Ok(())
1854 6 : }
1855 :
1856 : /// Extend SLRU segment
1857 0 : pub fn put_slru_extend(
1858 0 : &mut self,
1859 0 : kind: SlruKind,
1860 0 : segno: u32,
1861 0 : nblocks: BlockNumber,
1862 0 : ) -> anyhow::Result<()> {
1863 0 : // Put size
1864 0 : let size_key = slru_segment_size_to_key(kind, segno);
1865 0 : let buf = nblocks.to_le_bytes();
1866 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1867 0 : Ok(())
1868 0 : }
1869 :
1870 : /// This method is used for marking truncated SLRU files
1871 0 : pub async fn drop_slru_segment(
1872 0 : &mut self,
1873 0 : kind: SlruKind,
1874 0 : segno: u32,
1875 0 : ctx: &RequestContext,
1876 0 : ) -> anyhow::Result<()> {
1877 0 : // Remove it from the directory entry
1878 0 : let dir_key = slru_dir_to_key(kind);
1879 0 : let buf = self.get(dir_key, ctx).await?;
1880 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1881 :
1882 0 : if !dir.segments.remove(&segno) {
1883 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1884 0 : }
1885 0 : self.pending_directory_entries
1886 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1887 0 : self.put(
1888 0 : dir_key,
1889 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1890 : );
1891 :
1892 : // Delete size entry, as well as all blocks
1893 0 : self.delete(slru_segment_key_range(kind, segno));
1894 0 :
1895 0 : Ok(())
1896 0 : }
1897 :
1898 : /// Drop a relmapper file (pg_filenode.map)
1899 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1900 0 : // TODO
1901 0 : Ok(())
1902 0 : }
1903 :
1904 : /// This method is used for marking truncated SLRU files
1905 0 : pub async fn drop_twophase_file(
1906 0 : &mut self,
1907 0 : xid: u64,
1908 0 : ctx: &RequestContext,
1909 0 : ) -> anyhow::Result<()> {
1910 : // Remove it from the directory entry
1911 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1912 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1913 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
1914 :
1915 0 : if !dir.xids.remove(&xid) {
1916 0 : warn!("twophase file for xid {} does not exist", xid);
1917 0 : }
1918 0 : self.pending_directory_entries
1919 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1920 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1921 : } else {
1922 0 : let xid: u32 = u32::try_from(xid)?;
1923 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1924 :
1925 0 : if !dir.xids.remove(&xid) {
1926 0 : warn!("twophase file for xid {} does not exist", xid);
1927 0 : }
1928 0 : self.pending_directory_entries
1929 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1930 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1931 : };
1932 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1933 0 :
1934 0 : // Delete it
1935 0 : self.delete(twophase_key_range(xid));
1936 0 :
1937 0 : Ok(())
1938 0 : }
1939 :
1940 16 : pub async fn put_file(
1941 16 : &mut self,
1942 16 : path: &str,
1943 16 : content: &[u8],
1944 16 : ctx: &RequestContext,
1945 16 : ) -> anyhow::Result<()> {
1946 16 : let key = aux_file::encode_aux_file_key(path);
1947 : // retrieve the key from the engine
1948 16 : let old_val = match self.get(key, ctx).await {
1949 4 : Ok(val) => Some(val),
1950 12 : Err(PageReconstructError::MissingKey(_)) => None,
1951 0 : Err(e) => return Err(e.into()),
1952 : };
1953 16 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
1954 4 : aux_file::decode_file_value(old_val)?
1955 : } else {
1956 12 : Vec::new()
1957 : };
1958 16 : let mut other_files = Vec::with_capacity(files.len());
1959 16 : let mut modifying_file = None;
1960 20 : for file @ (p, content) in files {
1961 4 : if path == p {
1962 4 : assert!(
1963 4 : modifying_file.is_none(),
1964 0 : "duplicated entries found for {}",
1965 : path
1966 : );
1967 4 : modifying_file = Some(content);
1968 0 : } else {
1969 0 : other_files.push(file);
1970 0 : }
1971 : }
1972 16 : let mut new_files = other_files;
1973 16 : match (modifying_file, content.is_empty()) {
1974 2 : (Some(old_content), false) => {
1975 2 : self.tline
1976 2 : .aux_file_size_estimator
1977 2 : .on_update(old_content.len(), content.len());
1978 2 : new_files.push((path, content));
1979 2 : }
1980 2 : (Some(old_content), true) => {
1981 2 : self.tline
1982 2 : .aux_file_size_estimator
1983 2 : .on_remove(old_content.len());
1984 2 : // not adding the file key to the final `new_files` vec.
1985 2 : }
1986 12 : (None, false) => {
1987 12 : self.tline.aux_file_size_estimator.on_add(content.len());
1988 12 : new_files.push((path, content));
1989 12 : }
1990 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
1991 : }
1992 16 : let new_val = aux_file::encode_file_value(&new_files)?;
1993 16 : self.put(key, Value::Image(new_val.into()));
1994 16 :
1995 16 : Ok(())
1996 16 : }
1997 :
1998 : ///
1999 : /// Flush changes accumulated so far to the underlying repository.
2000 : ///
2001 : /// Usually, changes made in DatadirModification are atomic, but this allows
2002 : /// you to flush them to the underlying repository before the final `commit`.
2003 : /// That allows to free up the memory used to hold the pending changes.
2004 : ///
2005 : /// Currently only used during bulk import of a data directory. In that
2006 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2007 : /// whole import fails and the timeline will be deleted anyway.
2008 : /// (Or to be precise, it will be left behind for debugging purposes and
2009 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2010 : ///
2011 : /// Note: A consequence of flushing the pending operations is that they
2012 : /// won't be visible to subsequent operations until `commit`. The function
2013 : /// retains all the metadata, but data pages are flushed. That's again OK
2014 : /// for bulk import, where you are just loading data pages and won't try to
2015 : /// modify the same pages twice.
2016 1930 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2017 1930 : // Unless we have accumulated a decent amount of changes, it's not worth it
2018 1930 : // to scan through the pending_updates list.
2019 1930 : let pending_nblocks = self.pending_nblocks;
2020 1930 : if pending_nblocks < 10000 {
2021 1930 : return Ok(());
2022 0 : }
2023 :
2024 0 : let mut writer = self.tline.writer().await;
2025 :
2026 : // Flush relation and SLRU data blocks, keep metadata.
2027 0 : if let Some(batch) = self.pending_data_batch.take() {
2028 0 : tracing::debug!(
2029 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2030 0 : batch.max_lsn,
2031 0 : self.tline.get_last_record_lsn()
2032 : );
2033 :
2034 : // This bails out on first error without modifying pending_updates.
2035 : // That's Ok, cf this function's doc comment.
2036 0 : writer.put_batch(batch, ctx).await?;
2037 0 : }
2038 :
2039 0 : if pending_nblocks != 0 {
2040 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2041 0 : self.pending_nblocks = 0;
2042 0 : }
2043 :
2044 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2045 0 : writer.update_directory_entries_count(kind, count as u64);
2046 0 : }
2047 :
2048 0 : Ok(())
2049 1930 : }
2050 :
2051 : ///
2052 : /// Finish this atomic update, writing all the updated keys to the
2053 : /// underlying timeline.
2054 : /// All the modifications in this atomic update are stamped by the specified LSN.
2055 : ///
2056 743064 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2057 743064 : let mut writer = self.tline.writer().await;
2058 :
2059 743064 : let pending_nblocks = self.pending_nblocks;
2060 743064 : self.pending_nblocks = 0;
2061 :
2062 : // Ordering: the items in this batch do not need to be in any global order, but values for
2063 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2064 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2065 : // more details.
2066 :
2067 743064 : let metadata_batch = {
2068 743064 : let pending_meta = self
2069 743064 : .pending_metadata_pages
2070 743064 : .drain()
2071 743064 : .flat_map(|(key, values)| {
2072 273826 : values
2073 273826 : .into_iter()
2074 273826 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2075 743064 : })
2076 743064 : .collect::<Vec<_>>();
2077 743064 :
2078 743064 : if pending_meta.is_empty() {
2079 472278 : None
2080 : } else {
2081 270786 : Some(SerializedValueBatch::from_values(pending_meta))
2082 : }
2083 : };
2084 :
2085 743064 : let data_batch = self.pending_data_batch.take();
2086 :
2087 743064 : let maybe_batch = match (data_batch, metadata_batch) {
2088 264556 : (Some(mut data), Some(metadata)) => {
2089 264556 : data.extend(metadata);
2090 264556 : Some(data)
2091 : }
2092 143262 : (Some(data), None) => Some(data),
2093 6230 : (None, Some(metadata)) => Some(metadata),
2094 329016 : (None, None) => None,
2095 : };
2096 :
2097 743064 : if let Some(batch) = maybe_batch {
2098 414048 : tracing::debug!(
2099 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2100 0 : batch.max_lsn,
2101 0 : self.tline.get_last_record_lsn()
2102 : );
2103 :
2104 : // This bails out on first error without modifying pending_updates.
2105 : // That's Ok, cf this function's doc comment.
2106 414048 : writer.put_batch(batch, ctx).await?;
2107 329016 : }
2108 :
2109 743064 : if !self.pending_deletions.is_empty() {
2110 2 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2111 2 : self.pending_deletions.clear();
2112 743062 : }
2113 :
2114 743064 : self.pending_lsns.push(self.lsn);
2115 888922 : for pending_lsn in self.pending_lsns.drain(..) {
2116 888922 : // TODO(vlad): pretty sure the comment below is not valid anymore
2117 888922 : // and we can call finish write with the latest LSN
2118 888922 : //
2119 888922 : // Ideally, we should be able to call writer.finish_write() only once
2120 888922 : // with the highest LSN. However, the last_record_lsn variable in the
2121 888922 : // timeline keeps track of the latest LSN and the immediate previous LSN
2122 888922 : // so we need to record every LSN to not leave a gap between them.
2123 888922 : writer.finish_write(pending_lsn);
2124 888922 : }
2125 :
2126 743064 : if pending_nblocks != 0 {
2127 270570 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2128 472494 : }
2129 :
2130 743064 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2131 2834 : writer.update_directory_entries_count(kind, count as u64);
2132 2834 : }
2133 :
2134 743064 : self.pending_metadata_bytes = 0;
2135 743064 :
2136 743064 : Ok(())
2137 743064 : }
2138 :
2139 291704 : pub(crate) fn len(&self) -> usize {
2140 291704 : self.pending_metadata_pages.len()
2141 291704 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2142 291704 : + self.pending_deletions.len()
2143 291704 : }
2144 :
2145 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2146 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2147 : /// in the modification.
2148 : ///
2149 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2150 : /// page must ensure that the pages they read are already committed in Timeline, for example
2151 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2152 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2153 : /// and not data pages.
2154 286586 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2155 286586 : if !Self::is_data_key(&key) {
2156 : // Have we already updated the same key? Read the latest pending updated
2157 : // version in that case.
2158 : //
2159 : // Note: we don't check pending_deletions. It is an error to request a
2160 : // value that has been removed, deletion only avoids leaking storage.
2161 286586 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2162 15928 : if let Some((_, _, value)) = values.last() {
2163 15928 : return if let Value::Image(img) = value {
2164 15928 : Ok(img.clone())
2165 : } else {
2166 : // Currently, we never need to read back a WAL record that we
2167 : // inserted in the same "transaction". All the metadata updates
2168 : // work directly with Images, and we never need to read actual
2169 : // data pages. We could handle this if we had to, by calling
2170 : // the walredo manager, but let's keep it simple for now.
2171 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2172 0 : "unexpected pending WAL record"
2173 0 : )))
2174 : };
2175 0 : }
2176 270658 : }
2177 : } else {
2178 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2179 : // this key should never be present in pending_data_pages. We ensure this by committing
2180 : // modifications before ingesting DB create operations, which are the only kind that reads
2181 : // data pages during ingest.
2182 0 : if cfg!(debug_assertions) {
2183 0 : assert!(!self
2184 0 : .pending_data_batch
2185 0 : .as_ref()
2186 0 : .map_or(false, |b| b.updates_key(&key)));
2187 0 : }
2188 : }
2189 :
2190 : // Metadata page cache miss, or we're reading a data page.
2191 270658 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2192 270658 : self.tline.get(key, lsn, ctx).await
2193 286586 : }
2194 :
2195 563846 : fn put(&mut self, key: Key, val: Value) {
2196 563846 : if Self::is_data_key(&key) {
2197 277868 : self.put_data(key.to_compact(), val)
2198 : } else {
2199 285978 : self.put_metadata(key.to_compact(), val)
2200 : }
2201 563846 : }
2202 :
2203 277868 : fn put_data(&mut self, key: CompactKey, val: Value) {
2204 277868 : let batch = self
2205 277868 : .pending_data_batch
2206 277868 : .get_or_insert_with(SerializedValueBatch::default);
2207 277868 : batch.put(key, val, self.lsn);
2208 277868 : }
2209 :
2210 285978 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2211 285978 : let values = self.pending_metadata_pages.entry(key).or_default();
2212 : // Replace the previous value if it exists at the same lsn
2213 285978 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2214 12152 : if *last_lsn == self.lsn {
2215 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2216 12152 : self.pending_metadata_bytes -= *last_value_ser_size;
2217 12152 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2218 12152 : self.pending_metadata_bytes += *last_value_ser_size;
2219 12152 :
2220 12152 : // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
2221 12152 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2222 12152 : *last_value = val;
2223 12152 : return;
2224 0 : }
2225 273826 : }
2226 :
2227 273826 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2228 273826 : self.pending_metadata_bytes += val_serialized_size;
2229 273826 : values.push((self.lsn, val_serialized_size, val));
2230 273826 :
2231 273826 : if key == CHECKPOINT_KEY.to_compact() {
2232 192 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2233 273634 : }
2234 285978 : }
2235 :
2236 2 : fn delete(&mut self, key_range: Range<Key>) {
2237 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2238 2 : self.pending_deletions.push((key_range, self.lsn));
2239 2 : }
2240 : }
2241 :
2242 : /// This struct facilitates accessing either a committed key from the timeline at a
2243 : /// specific LSN, or the latest uncommitted key from a pending modification.
2244 : ///
2245 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2246 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2247 : /// need to look up the keys in the modification first before looking them up in the
2248 : /// timeline to not miss the latest updates.
2249 : #[derive(Clone, Copy)]
2250 : pub enum Version<'a> {
2251 : Lsn(Lsn),
2252 : Modified(&'a DatadirModification<'a>),
2253 : }
2254 :
2255 : impl<'a> Version<'a> {
2256 5176 : async fn get(
2257 5176 : &self,
2258 5176 : timeline: &Timeline,
2259 5176 : key: Key,
2260 5176 : ctx: &RequestContext,
2261 5176 : ) -> Result<Bytes, PageReconstructError> {
2262 5176 : match self {
2263 5156 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2264 20 : Version::Modified(modification) => modification.get(key, ctx).await,
2265 : }
2266 5176 : }
2267 :
2268 35620 : fn get_lsn(&self) -> Lsn {
2269 35620 : match self {
2270 29574 : Version::Lsn(lsn) => *lsn,
2271 6046 : Version::Modified(modification) => modification.lsn,
2272 : }
2273 35620 : }
2274 : }
2275 :
2276 : //--- Metadata structs stored in key-value pairs in the repository.
2277 :
2278 2246 : #[derive(Debug, Serialize, Deserialize)]
2279 : struct DbDirectory {
2280 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2281 : dbdirs: HashMap<(Oid, Oid), bool>,
2282 : }
2283 :
2284 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2285 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2286 : // were named like "pg_twophase/000002E5", now they're like
2287 : // "pg_twophsae/0000000A000002E4".
2288 :
2289 294 : #[derive(Debug, Serialize, Deserialize)]
2290 : struct TwoPhaseDirectory {
2291 : xids: HashSet<TransactionId>,
2292 : }
2293 :
2294 0 : #[derive(Debug, Serialize, Deserialize)]
2295 : struct TwoPhaseDirectoryV17 {
2296 : xids: HashSet<u64>,
2297 : }
2298 :
2299 1932 : #[derive(Debug, Serialize, Deserialize, Default)]
2300 : struct RelDirectory {
2301 : // Set of relations that exist. (relfilenode, forknum)
2302 : //
2303 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2304 : // key-value pairs, if you have a lot of relations
2305 : rels: HashSet<(Oid, u8)>,
2306 : }
2307 :
2308 0 : #[derive(Debug, Serialize, Deserialize)]
2309 : struct RelSizeEntry {
2310 : nblocks: u32,
2311 : }
2312 :
2313 882 : #[derive(Debug, Serialize, Deserialize, Default)]
2314 : struct SlruSegmentDirectory {
2315 : // Set of SLRU segments that exist.
2316 : segments: HashSet<u32>,
2317 : }
2318 :
2319 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2320 : #[repr(u8)]
2321 : pub(crate) enum DirectoryKind {
2322 : Db,
2323 : TwoPhase,
2324 : Rel,
2325 : AuxFiles,
2326 : SlruSegment(SlruKind),
2327 : }
2328 :
2329 : impl DirectoryKind {
2330 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2331 5668 : pub(crate) fn offset(&self) -> usize {
2332 5668 : self.into_usize()
2333 5668 : }
2334 : }
2335 :
2336 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2337 :
2338 : #[allow(clippy::bool_assert_comparison)]
2339 : #[cfg(test)]
2340 : mod tests {
2341 : use hex_literal::hex;
2342 : use pageserver_api::{models::ShardParameters, shard::ShardStripeSize};
2343 : use utils::{
2344 : id::TimelineId,
2345 : shard::{ShardCount, ShardNumber},
2346 : };
2347 :
2348 : use super::*;
2349 :
2350 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2351 :
2352 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2353 : #[tokio::test]
2354 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2355 2 : let name = "aux_files_round_trip";
2356 2 : let harness = TenantHarness::create(name).await?;
2357 2 :
2358 2 : pub const TIMELINE_ID: TimelineId =
2359 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2360 2 :
2361 20 : let (tenant, ctx) = harness.load().await;
2362 2 : let tline = tenant
2363 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2364 2 : .await?;
2365 2 : let tline = tline.raw_timeline().unwrap();
2366 2 :
2367 2 : // First modification: insert two keys
2368 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
2369 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2370 2 : modification.set_lsn(Lsn(0x1008))?;
2371 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2372 2 : modification.commit(&ctx).await?;
2373 2 : let expect_1008 = HashMap::from([
2374 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2375 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2376 2 : ]);
2377 2 :
2378 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2379 2 : assert_eq!(readback, expect_1008);
2380 2 :
2381 2 : // Second modification: update one key, remove the other
2382 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
2383 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2384 2 : modification.set_lsn(Lsn(0x2008))?;
2385 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
2386 2 : modification.commit(&ctx).await?;
2387 2 : let expect_2008 =
2388 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2389 2 :
2390 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
2391 2 : assert_eq!(readback, expect_2008);
2392 2 :
2393 2 : // Reading back in time works
2394 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2395 2 : assert_eq!(readback, expect_1008);
2396 2 :
2397 2 : Ok(())
2398 2 : }
2399 :
2400 : #[test]
2401 2 : fn gap_finding() {
2402 2 : let rel = RelTag {
2403 2 : spcnode: 1663,
2404 2 : dbnode: 208101,
2405 2 : relnode: 2620,
2406 2 : forknum: 0,
2407 2 : };
2408 2 : let base_blkno = 1;
2409 2 :
2410 2 : let base_key = rel_block_to_key(rel, base_blkno);
2411 2 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2412 2 :
2413 2 : let shard = ShardIdentity::unsharded();
2414 2 :
2415 2 : let mut previous_nblocks = 0;
2416 22 : for i in 0..10 {
2417 20 : let crnt_blkno = base_blkno + i;
2418 20 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2419 20 :
2420 20 : previous_nblocks = crnt_blkno + 1;
2421 20 :
2422 20 : if i == 0 {
2423 : // The first block we write is 1, so we should find the gap.
2424 2 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2425 : } else {
2426 18 : assert!(gaps.is_none());
2427 : }
2428 : }
2429 :
2430 : // This is an update to an already existing block. No gaps here.
2431 2 : let update_blkno = 5;
2432 2 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2433 2 : assert!(gaps.is_none());
2434 :
2435 : // This is an update past the current end block.
2436 2 : let after_gap_blkno = 20;
2437 2 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2438 2 :
2439 2 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2440 2 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2441 2 : assert_eq!(
2442 2 : gaps.unwrap(),
2443 2 : KeySpace::single(gap_start_key..after_gap_key)
2444 2 : );
2445 2 : }
2446 :
2447 : #[test]
2448 2 : fn sharded_gap_finding() {
2449 2 : let rel = RelTag {
2450 2 : spcnode: 1663,
2451 2 : dbnode: 208101,
2452 2 : relnode: 2620,
2453 2 : forknum: 0,
2454 2 : };
2455 2 :
2456 2 : let first_blkno = 6;
2457 2 :
2458 2 : // This shard will get the even blocks
2459 2 : let shard = ShardIdentity::from_params(
2460 2 : ShardNumber(0),
2461 2 : &ShardParameters {
2462 2 : count: ShardCount(2),
2463 2 : stripe_size: ShardStripeSize(1),
2464 2 : },
2465 2 : );
2466 2 :
2467 2 : // Only keys belonging to this shard are considered as gaps.
2468 2 : let mut previous_nblocks = 0;
2469 2 : let gaps =
2470 2 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2471 2 : assert!(!gaps.ranges.is_empty());
2472 6 : for gap_range in gaps.ranges {
2473 4 : let mut k = gap_range.start;
2474 8 : while k != gap_range.end {
2475 4 : assert_eq!(shard.get_shard_number(&k), shard.number);
2476 4 : k = k.next();
2477 : }
2478 : }
2479 :
2480 2 : previous_nblocks = first_blkno;
2481 2 :
2482 2 : let update_blkno = 2;
2483 2 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2484 2 : assert!(gaps.is_none());
2485 2 : }
2486 :
2487 : /*
2488 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2489 : let incremental = timeline.get_current_logical_size();
2490 : let non_incremental = timeline
2491 : .get_current_logical_size_non_incremental(lsn)
2492 : .unwrap();
2493 : assert_eq!(incremental, non_incremental);
2494 : }
2495 : */
2496 :
2497 : /*
2498 : ///
2499 : /// Test list_rels() function, with branches and dropped relations
2500 : ///
2501 : #[test]
2502 : fn test_list_rels_drop() -> Result<()> {
2503 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2504 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2505 : const TESTDB: u32 = 111;
2506 :
2507 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2508 : // after branching fails below
2509 : let mut writer = tline.begin_record(Lsn(0x10));
2510 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2511 : writer.finish()?;
2512 :
2513 : // Create a relation on the timeline
2514 : let mut writer = tline.begin_record(Lsn(0x20));
2515 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2516 : writer.finish()?;
2517 :
2518 : let writer = tline.begin_record(Lsn(0x00));
2519 : writer.finish()?;
2520 :
2521 : // Check that list_rels() lists it after LSN 2, but no before it
2522 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2523 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2524 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2525 :
2526 : // Create a branch, check that the relation is visible there
2527 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2528 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2529 : Some(timeline) => timeline,
2530 : None => panic!("Should have a local timeline"),
2531 : };
2532 : let newtline = DatadirTimelineImpl::new(newtline);
2533 : assert!(newtline
2534 : .list_rels(0, TESTDB, Lsn(0x30))?
2535 : .contains(&TESTREL_A));
2536 :
2537 : // Drop it on the branch
2538 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2539 : new_writer.drop_relation(TESTREL_A)?;
2540 : new_writer.finish()?;
2541 :
2542 : // Check that it's no longer listed on the branch after the point where it was dropped
2543 : assert!(newtline
2544 : .list_rels(0, TESTDB, Lsn(0x30))?
2545 : .contains(&TESTREL_A));
2546 : assert!(!newtline
2547 : .list_rels(0, TESTDB, Lsn(0x40))?
2548 : .contains(&TESTREL_A));
2549 :
2550 : // Run checkpoint and garbage collection and check that it's still not visible
2551 : newtline.checkpoint(CheckpointConfig::Forced)?;
2552 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2553 :
2554 : assert!(!newtline
2555 : .list_rels(0, TESTDB, Lsn(0x40))?
2556 : .contains(&TESTREL_A));
2557 :
2558 : Ok(())
2559 : }
2560 : */
2561 :
2562 : /*
2563 : #[test]
2564 : fn test_read_beyond_eof() -> Result<()> {
2565 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2566 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2567 :
2568 : make_some_layers(&tline, Lsn(0x20))?;
2569 : let mut writer = tline.begin_record(Lsn(0x60));
2570 : walingest.put_rel_page_image(
2571 : &mut writer,
2572 : TESTREL_A,
2573 : 0,
2574 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2575 : )?;
2576 : writer.finish()?;
2577 :
2578 : // Test read before rel creation. Should error out.
2579 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2580 :
2581 : // Read block beyond end of relation at different points in time.
2582 : // These reads should fall into different delta, image, and in-memory layers.
2583 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2584 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2585 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2586 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2587 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2588 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2589 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2590 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2591 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2592 :
2593 : // Test on an in-memory layer with no preceding layer
2594 : let mut writer = tline.begin_record(Lsn(0x70));
2595 : walingest.put_rel_page_image(
2596 : &mut writer,
2597 : TESTREL_B,
2598 : 0,
2599 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2600 : )?;
2601 : writer.finish()?;
2602 :
2603 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
2604 :
2605 : Ok(())
2606 : }
2607 : */
2608 : }
|