Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::aux_file;
11 : use crate::context::RequestContext;
12 : use crate::keyspace::{KeySpace, KeySpaceAccum};
13 : use crate::metrics::{
14 : RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
15 : };
16 : use crate::span::{
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
19 : };
20 : use crate::tenant::timeline::GetVectoredError;
21 : use anyhow::{ensure, Context};
22 : use bytes::{Buf, Bytes, BytesMut};
23 : use enum_map::Enum;
24 : use itertools::Itertools;
25 : use pageserver_api::key::Key;
26 : use pageserver_api::key::{
27 : dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
28 : relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
29 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
30 : CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
31 : };
32 : use pageserver_api::keyspace::SparseKeySpace;
33 : use pageserver_api::record::NeonWalRecord;
34 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
35 : use pageserver_api::shard::ShardIdentity;
36 : use pageserver_api::value::Value;
37 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
38 : use postgres_ffi::BLCKSZ;
39 : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
40 : use serde::{Deserialize, Serialize};
41 : use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
42 : use std::ops::ControlFlow;
43 : use std::ops::Range;
44 : use strum::IntoEnumIterator;
45 : use tokio_util::sync::CancellationToken;
46 : use tracing::{debug, trace, warn};
47 : use utils::bin_ser::DeserializeError;
48 : use utils::pausable_failpoint;
49 : use utils::{bin_ser::BeSer, lsn::Lsn};
50 : use wal_decoder::serialized_batch::SerializedValueBatch;
51 :
52 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
53 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
54 :
55 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
56 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
57 :
58 : #[derive(Debug)]
59 : pub enum LsnForTimestamp {
60 : /// Found commits both before and after the given timestamp
61 : Present(Lsn),
62 :
63 : /// Found no commits after the given timestamp. This means
64 : /// that the newest data in the branch is older than the given
65 : /// timestamp.
66 : ///
67 : /// All commits <= LSN happened before the given timestamp
68 : Future(Lsn),
69 :
70 : /// The queried timestamp is beyond the horizon we can look back to (the PITR window)
71 : ///
72 : /// All commits > LSN happened after the given timestamp,
73 : /// but any commits < LSN might have happened before or after
74 : /// the given timestamp. We don't know because no data before
75 : /// the given lsn is available.
76 : Past(Lsn),
77 :
78 : /// We have found no commit with a timestamp,
79 : /// so we can't return anything meaningful.
80 : ///
81 : /// The associated LSN is the lower bound value we can safely
82 : /// create branches on, but no statement is made about whether it is
83 : /// older or newer than the timestamp.
84 : ///
85 : /// This variant can e.g. be returned right after a
86 : /// cluster import.
87 : NoData(Lsn),
88 : }
89 :
90 : #[derive(Debug, thiserror::Error)]
91 : pub(crate) enum CalculateLogicalSizeError {
92 : #[error("cancelled")]
93 : Cancelled,
94 :
95 : /// Something went wrong while reading the metadata we use to calculate logical size
96 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
97 : /// in the `From` implementation for this variant.
98 : #[error(transparent)]
99 : PageRead(PageReconstructError),
100 :
101 : /// Something went wrong deserializing metadata that we read to calculate logical size
102 : #[error("decode error: {0}")]
103 : Decode(#[from] DeserializeError),
104 : }
105 :
106 : #[derive(Debug, thiserror::Error)]
107 : pub(crate) enum CollectKeySpaceError {
108 : #[error(transparent)]
109 : Decode(#[from] DeserializeError),
110 : #[error(transparent)]
111 : PageRead(PageReconstructError),
112 : #[error("cancelled")]
113 : Cancelled,
114 : }
115 :
116 : impl From<PageReconstructError> for CollectKeySpaceError {
117 0 : fn from(err: PageReconstructError) -> Self {
118 0 : match err {
119 0 : PageReconstructError::Cancelled => Self::Cancelled,
120 0 : err => Self::PageRead(err),
121 : }
122 0 : }
123 : }
124 :
125 : impl From<PageReconstructError> for CalculateLogicalSizeError {
126 0 : fn from(pre: PageReconstructError) -> Self {
127 0 : match pre {
128 0 : PageReconstructError::Cancelled => Self::Cancelled,
129 0 : _ => Self::PageRead(pre),
130 : }
131 0 : }
132 : }
133 :
134 : #[derive(Debug, thiserror::Error)]
135 : pub enum RelationError {
136 : #[error("Relation Already Exists")]
137 : AlreadyExists,
138 : #[error("invalid relnode")]
139 : InvalidRelnode,
140 : #[error(transparent)]
141 : Other(#[from] anyhow::Error),
142 : }
143 :
144 : ///
145 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
146 : /// and other special kinds of files, in a versioned key-value store. The
147 : /// Timeline struct provides the key-value store.
148 : ///
149 : /// This is a separate impl block, so that we can easily include all these functions in the
150 : /// Timeline implementation; it might be moved into a separate struct later.
151 : impl Timeline {
152 : /// Start ingesting a WAL record, or other atomic modification of
153 : /// the timeline.
154 : ///
155 : /// This provides a transaction-like interface to perform a bunch
156 : /// of modifications atomically.
157 : ///
158 : /// To ingest a WAL record, call begin_modification(lsn) to get a
159 : /// DatadirModification object. Use the functions in the object to
160 : /// modify the repository state, updating all the pages and metadata
161 : /// that the WAL record affects. When you're done, call commit() to
162 : /// commit the changes.
163 : ///
164 : /// The LSN stored in the modification is advanced by `ingest_record` and
165 : /// is used by `commit()` to update `last_record_lsn`.
166 : ///
167 : /// Calling commit() will flush all the changes and reset the state,
168 : /// so the `DatadirModification` struct can be reused to perform the next modification.
169 : ///
170 : /// Note that any pending modifications you make through the
171 : /// modification object won't be visible to calls to the 'get' and list
172 : /// functions of the timeline until you finish! And if you update the
173 : /// same page twice, the last update wins.
174 : ///
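: /// A minimal usage sketch (`rel`, `blknum`, `img`, `lsn` and `ctx` are placeholder values,
: /// not defined in this module):
: ///
: /// ```ignore
: /// let mut modification = timeline.begin_modification(lsn);
: /// modification.put_rel_page_image(rel, blknum, img)?;
: /// modification.commit(ctx).await?;
: /// ```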
175 268384 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
176 268384 : where
177 268384 : Self: Sized,
178 268384 : {
179 268384 : DatadirModification {
180 268384 : tline: self,
181 268384 : pending_lsns: Vec::new(),
182 268384 : pending_metadata_pages: HashMap::new(),
183 268384 : pending_data_batch: None,
184 268384 : pending_deletions: Vec::new(),
185 268384 : pending_nblocks: 0,
186 268384 : pending_directory_entries: Vec::new(),
187 268384 : pending_metadata_bytes: 0,
188 268384 : lsn,
189 268384 : }
190 268384 : }
191 :
192 : //------------------------------------------------------------------------------
193 : // Public GET functions
194 : //------------------------------------------------------------------------------
195 :
196 : /// Look up given page version.
197 18384 : pub(crate) async fn get_rel_page_at_lsn(
198 18384 : &self,
199 18384 : tag: RelTag,
200 18384 : blknum: BlockNumber,
201 18384 : version: Version<'_>,
202 18384 : ctx: &RequestContext,
203 18384 : ) -> Result<Bytes, PageReconstructError> {
204 18384 : match version {
205 18384 : Version::Lsn(effective_lsn) => {
206 18384 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
207 18384 : let res = self
208 18384 : .get_rel_page_at_lsn_batched(
209 18384 : pages.iter().map(|(tag, blknum)| (tag, blknum)),
210 18384 : effective_lsn,
211 18384 : ctx,
212 18384 : )
213 18384 : .await;
214 18384 : assert_eq!(res.len(), 1);
215 18384 : res.into_iter().next().unwrap()
216 : }
217 0 : Version::Modified(modification) => {
218 0 : if tag.relnode == 0 {
219 0 : return Err(PageReconstructError::Other(
220 0 : RelationError::InvalidRelnode.into(),
221 0 : ));
222 0 : }
223 :
224 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
225 0 : if blknum >= nblocks {
226 0 : debug!(
227 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
228 0 : tag,
229 0 : blknum,
230 0 : version.get_lsn(),
231 : nblocks
232 : );
233 0 : return Ok(ZERO_PAGE.clone());
234 0 : }
235 0 :
236 0 : let key = rel_block_to_key(tag, blknum);
237 0 : modification.get(key, ctx).await
238 : }
239 : }
240 18384 : }
241 :
242 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
243 : ///
244 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
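: ///
: /// A sketch of the calling convention (`rel_a`, `rel_b`, `lsn` and `ctx` are placeholders):
: ///
: /// ```ignore
: /// let pages = [(&rel_a, &0u32), (&rel_b, &7u32)];
: /// let results = timeline
: ///     .get_rel_page_at_lsn_batched(pages.iter().copied(), lsn, ctx)
: ///     .await;
: /// // results[0] is the outcome for (rel_a, 0) and results[1] for (rel_b, 7).
: /// ```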
245 18384 : pub(crate) async fn get_rel_page_at_lsn_batched(
246 18384 : &self,
247 18384 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
248 18384 : effective_lsn: Lsn,
249 18384 : ctx: &RequestContext,
250 18384 : ) -> Vec<Result<Bytes, PageReconstructError>> {
251 18384 : debug_assert_current_span_has_tenant_and_timeline_id();
252 18384 :
253 18384 : let mut slots_filled = 0;
254 18384 : let page_count = pages.len();
255 18384 :
256 18384 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
257 18384 : let mut result = Vec::with_capacity(pages.len());
258 18384 : let result_slots = result.spare_capacity_mut();
259 18384 :
260 18384 : let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
261 18384 : for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
262 18384 : if tag.relnode == 0 {
263 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
264 0 : RelationError::InvalidRelnode.into(),
265 0 : )));
266 0 :
267 0 : slots_filled += 1;
268 0 : continue;
269 18384 : }
270 :
271 18384 : let nblocks = match self
272 18384 : .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
273 18384 : .await
274 : {
275 18384 : Ok(nblocks) => nblocks,
276 0 : Err(err) => {
277 0 : result_slots[response_slot_idx].write(Err(err));
278 0 : slots_filled += 1;
279 0 : continue;
280 : }
281 : };
282 :
283 18384 : if *blknum >= nblocks {
284 0 : debug!(
285 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
286 : tag, blknum, effective_lsn, nblocks
287 : );
288 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
289 0 : slots_filled += 1;
290 0 : continue;
291 18384 : }
292 18384 :
293 18384 : let key = rel_block_to_key(*tag, *blknum);
294 18384 :
295 18384 : let key_slots = keys_slots.entry(key).or_default();
296 18384 : key_slots.push(response_slot_idx);
297 : }
298 :
299 18384 : let keyspace = {
300 : // add_key requires monotonicity
301 18384 : let mut acc = KeySpaceAccum::new();
302 18384 : for key in keys_slots
303 18384 : .keys()
304 18384 : // in fact it requires strong monotonicity
305 18384 : .dedup()
306 18384 : {
307 18384 : acc.add_key(*key);
308 18384 : }
309 18384 : acc.to_keyspace()
310 18384 : };
311 18384 :
312 18384 : match self.get_vectored(keyspace, effective_lsn, ctx).await {
313 18384 : Ok(results) => {
314 36768 : for (key, res) in results {
315 18384 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
316 18384 : let first_slot = key_slots.next().unwrap();
317 :
318 18384 : for slot in key_slots {
319 0 : let clone = match &res {
320 0 : Ok(buf) => Ok(buf.clone()),
321 0 : Err(err) => Err(match err {
322 : PageReconstructError::Cancelled => {
323 0 : PageReconstructError::Cancelled
324 : }
325 :
326 0 : x @ PageReconstructError::Other(_) |
327 0 : x @ PageReconstructError::AncestorLsnTimeout(_) |
328 0 : x @ PageReconstructError::WalRedo(_) |
329 0 : x @ PageReconstructError::MissingKey(_) => {
330 0 : PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
331 : },
332 : }),
333 : };
334 :
335 0 : result_slots[slot].write(clone);
336 0 : slots_filled += 1;
337 : }
338 :
339 18384 : result_slots[first_slot].write(res);
340 18384 : slots_filled += 1;
341 : }
342 : }
343 0 : Err(err) => {
344 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
345 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
346 0 : for slot in keys_slots.values().flatten() {
347 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
348 : // but without taking ownership of the GetVectoredError
349 0 : let err = match &err {
350 : GetVectoredError::Cancelled => {
351 0 : Err(PageReconstructError::Cancelled)
352 : }
353 : // TODO: restructure get_vectored API to make this error per-key
354 0 : GetVectoredError::MissingKey(err) => {
355 0 : Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}")))
356 : }
357 : // TODO: restructure get_vectored API to make this error per-key
358 0 : GetVectoredError::GetReadyAncestorError(err) => {
359 0 : Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more keys required an ancestor that wasn't ready: {err:?}")))
360 : }
361 : // TODO: restructure get_vectored API to make this error per-key
362 0 : GetVectoredError::Other(err) => {
363 0 : Err(PageReconstructError::Other(
364 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
365 0 : ))
366 : }
367 : // TODO: we can prevent this error class by moving this check into the type system
368 0 : GetVectoredError::InvalidLsn(e) => {
369 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
370 : }
371 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
372 : // TODO: we can prevent this error class by moving this check into the type system
373 0 : GetVectoredError::Oversized(err) => {
374 0 : Err(anyhow::anyhow!(
375 0 : "batching oversized: {err:?}"
376 0 : )
377 0 : .into())
378 : }
379 : };
380 :
381 0 : result_slots[*slot].write(err);
382 : }
383 :
384 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
385 0 : }
386 : };
387 :
388 18384 : assert_eq!(slots_filled, page_count);
389 : // SAFETY:
390 : // 1. `result` and any of its uninit members are not read from until this point
391 : // 2. The length below is tracked at run-time and matches the number of requested pages.
392 18384 : unsafe {
393 18384 : result.set_len(page_count);
394 18384 : }
395 18384 :
396 18384 : result
397 18384 : }
398 :
399 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
400 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
401 : /// for pages up to the highest page number it has stored.
402 0 : pub(crate) async fn get_db_size(
403 0 : &self,
404 0 : spcnode: Oid,
405 0 : dbnode: Oid,
406 0 : version: Version<'_>,
407 0 : ctx: &RequestContext,
408 0 : ) -> Result<usize, PageReconstructError> {
409 0 : let mut total_blocks = 0;
410 :
411 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
412 :
413 0 : for rel in rels {
414 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
415 0 : total_blocks += n_blocks as usize;
416 : }
417 0 : Ok(total_blocks)
418 0 : }
419 :
420 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
421 : ///
422 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
423 : /// page number stored in the shard.
424 24434 : pub(crate) async fn get_rel_size(
425 24434 : &self,
426 24434 : tag: RelTag,
427 24434 : version: Version<'_>,
428 24434 : ctx: &RequestContext,
429 24434 : ) -> Result<BlockNumber, PageReconstructError> {
430 24434 : if tag.relnode == 0 {
431 0 : return Err(PageReconstructError::Other(
432 0 : RelationError::InvalidRelnode.into(),
433 0 : ));
434 24434 : }
435 :
436 24434 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
437 19294 : return Ok(nblocks);
438 5140 : }
439 5140 :
440 5140 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
441 0 : && !self.get_rel_exists(tag, version, ctx).await?
442 : {
443 : // FIXME: Postgres sometimes calls smgrcreate() to create
444 : // FSM, and smgrnblocks() on it immediately afterwards,
445 : // without extending it. Tolerate that by claiming that
446 : // any non-existent FSM fork has size 0.
447 0 : return Ok(0);
448 5140 : }
449 5140 :
450 5140 : let key = rel_size_to_key(tag);
451 5140 : let mut buf = version.get(self, key, ctx).await?;
452 5136 : let nblocks = buf.get_u32_le();
453 5136 :
454 5136 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
455 5136 :
456 5136 : Ok(nblocks)
457 24434 : }
458 :
459 : /// Does the relation exist?
460 : ///
461 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
462 : /// the shard stores pages for.
463 6050 : pub(crate) async fn get_rel_exists(
464 6050 : &self,
465 6050 : tag: RelTag,
466 6050 : version: Version<'_>,
467 6050 : ctx: &RequestContext,
468 6050 : ) -> Result<bool, PageReconstructError> {
469 6050 : if tag.relnode == 0 {
470 0 : return Err(PageReconstructError::Other(
471 0 : RelationError::InvalidRelnode.into(),
472 0 : ));
473 6050 : }
474 :
475 : // first try to lookup relation in cache
476 6050 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
477 6032 : return Ok(true);
478 18 : }
479 : // then check if the database was already initialized.
480 : // get_rel_exists can be called before dbdir is created.
481 18 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
482 18 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
483 18 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
484 0 : return Ok(false);
485 18 : }
486 18 : // fetch directory listing
487 18 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
488 18 : let buf = version.get(self, key, ctx).await?;
489 :
490 18 : let dir = RelDirectory::des(&buf)?;
491 18 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
492 6050 : }
493 :
494 : /// Get a list of all existing relations in given tablespace and database.
495 : ///
496 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
497 : /// the shard stores pages for.
498 : ///
499 : /// # Cancel-Safety
500 : ///
501 : /// This method is cancellation-safe.
502 0 : pub(crate) async fn list_rels(
503 0 : &self,
504 0 : spcnode: Oid,
505 0 : dbnode: Oid,
506 0 : version: Version<'_>,
507 0 : ctx: &RequestContext,
508 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
509 0 : // fetch directory listing
510 0 : let key = rel_dir_to_key(spcnode, dbnode);
511 0 : let buf = version.get(self, key, ctx).await?;
512 :
513 0 : let dir = RelDirectory::des(&buf)?;
514 0 : let rels: HashSet<RelTag> =
515 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
516 0 : spcnode,
517 0 : dbnode,
518 0 : relnode: *relnode,
519 0 : forknum: *forknum,
520 0 : }));
521 0 :
522 0 : Ok(rels)
523 0 : }
524 :
525 : /// Get the whole SLRU segment
526 0 : pub(crate) async fn get_slru_segment(
527 0 : &self,
528 0 : kind: SlruKind,
529 0 : segno: u32,
530 0 : lsn: Lsn,
531 0 : ctx: &RequestContext,
532 0 : ) -> Result<Bytes, PageReconstructError> {
533 0 : assert!(self.tenant_shard_id.is_shard_zero());
534 0 : let n_blocks = self
535 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
536 0 : .await?;
537 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
538 0 : for blkno in 0..n_blocks {
539 0 : let block = self
540 0 : .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
541 0 : .await?;
542 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
543 : }
544 0 : Ok(segment.freeze())
545 0 : }
546 :
547 : /// Look up given SLRU page version.
548 0 : pub(crate) async fn get_slru_page_at_lsn(
549 0 : &self,
550 0 : kind: SlruKind,
551 0 : segno: u32,
552 0 : blknum: BlockNumber,
553 0 : lsn: Lsn,
554 0 : ctx: &RequestContext,
555 0 : ) -> Result<Bytes, PageReconstructError> {
556 0 : assert!(self.tenant_shard_id.is_shard_zero());
557 0 : let key = slru_block_to_key(kind, segno, blknum);
558 0 : self.get(key, lsn, ctx).await
559 0 : }
560 :
561 : /// Get size of an SLRU segment
562 0 : pub(crate) async fn get_slru_segment_size(
563 0 : &self,
564 0 : kind: SlruKind,
565 0 : segno: u32,
566 0 : version: Version<'_>,
567 0 : ctx: &RequestContext,
568 0 : ) -> Result<BlockNumber, PageReconstructError> {
569 0 : assert!(self.tenant_shard_id.is_shard_zero());
570 0 : let key = slru_segment_size_to_key(kind, segno);
571 0 : let mut buf = version.get(self, key, ctx).await?;
572 0 : Ok(buf.get_u32_le())
573 0 : }
574 :
575 : /// Does the SLRU segment exist?
576 0 : pub(crate) async fn get_slru_segment_exists(
577 0 : &self,
578 0 : kind: SlruKind,
579 0 : segno: u32,
580 0 : version: Version<'_>,
581 0 : ctx: &RequestContext,
582 0 : ) -> Result<bool, PageReconstructError> {
583 0 : assert!(self.tenant_shard_id.is_shard_zero());
584 : // fetch directory listing
585 0 : let key = slru_dir_to_key(kind);
586 0 : let buf = version.get(self, key, ctx).await?;
587 :
588 0 : let dir = SlruSegmentDirectory::des(&buf)?;
589 0 : Ok(dir.segments.contains(&segno))
590 0 : }
591 :
592 : /// Locate LSN, such that all transactions that committed before
593 : /// 'search_timestamp' are visible, but nothing newer is.
594 : ///
595 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
596 : /// so it's not well defined which LSN you get if there were multiple commits
597 : /// "in flight" at that point in time.
598 : ///
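: /// A sketch of how a caller might handle the result (illustrative only):
: ///
: /// ```ignore
: /// match timeline.find_lsn_for_timestamp(ts, &cancel, ctx).await? {
: ///     LsnForTimestamp::Present(lsn) => { /* commits exist on both sides; branch at `lsn` */ }
: ///     LsnForTimestamp::Future(lsn) => { /* all commits are older than `ts`; `lsn` covers them all */ }
: ///     LsnForTimestamp::Past(_) => { /* `ts` is older than the history we retain */ }
: ///     LsnForTimestamp::NoData(lsn) => { /* no commit timestamps found; `lsn` is a safe lower bound */ }
: /// }
: /// ```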
599 0 : pub(crate) async fn find_lsn_for_timestamp(
600 0 : &self,
601 0 : search_timestamp: TimestampTz,
602 0 : cancel: &CancellationToken,
603 0 : ctx: &RequestContext,
604 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
605 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
606 :
607 0 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
608 0 : // We use this method to figure out the branching LSN for the new branch, but the
609 0 : // GC cutoff could be before the branching point and we cannot create a new branch
610 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
611 0 : // on the safe side.
612 0 : let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
613 0 : let max_lsn = self.get_last_record_lsn();
614 0 :
615 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
616 0 : // LSN divided by 8.
617 0 : let mut low = min_lsn.0 / 8;
618 0 : let mut high = max_lsn.0 / 8 + 1;
619 0 :
620 0 : let mut found_smaller = false;
621 0 : let mut found_larger = false;
622 :
623 0 : while low < high {
624 0 : if cancel.is_cancelled() {
625 0 : return Err(PageReconstructError::Cancelled);
626 0 : }
627 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
628 0 : let mid = (high + low) / 2;
629 :
630 0 : let cmp = match self
631 0 : .is_latest_commit_timestamp_ge_than(
632 0 : search_timestamp,
633 0 : Lsn(mid * 8),
634 0 : &mut found_smaller,
635 0 : &mut found_larger,
636 0 : ctx,
637 0 : )
638 0 : .await
639 : {
640 0 : Ok(res) => res,
641 0 : Err(PageReconstructError::MissingKey(e)) => {
642 0 : warn!("Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}", e);
643 : // Return Past(min_lsn), i.e. act as if the timestamp were older than our retained history; the error was logged above.
644 0 : return Ok(LsnForTimestamp::Past(min_lsn));
645 : }
646 0 : Err(e) => return Err(e),
647 : };
648 :
649 0 : if cmp {
650 0 : high = mid;
651 0 : } else {
652 0 : low = mid + 1;
653 0 : }
654 : }
655 :
656 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
657 : // so the LSN of the last commit record before or at `search_timestamp`.
658 : // Remove one from `low` to get `t`.
659 : //
660 : // FIXME: it would be better to get the LSN of the previous commit.
661 : // Otherwise, if you restore to the returned LSN, the database will
662 : // include physical changes from later commits that will be marked
663 : // as aborted, and will need to be vacuumed away.
664 0 : let commit_lsn = Lsn((low - 1) * 8);
665 0 : match (found_smaller, found_larger) {
666 : (false, false) => {
667 : // This can happen if no commit records have been processed yet, e.g.
668 : // just after importing a cluster.
669 0 : Ok(LsnForTimestamp::NoData(min_lsn))
670 : }
671 : (false, true) => {
672 : // Didn't find any commit timestamps smaller than the request
673 0 : Ok(LsnForTimestamp::Past(min_lsn))
674 : }
675 0 : (true, _) if commit_lsn < min_lsn => {
676 0 : // The search above set found_smaller to true, but `low` was never increased. So `low`
677 0 : // still holds its initial value derived from min_lsn, and the subtraction above gave a
678 0 : // value below min_lsn. We must never return an LSN below min_lsn.
679 0 : Ok(LsnForTimestamp::Past(min_lsn))
680 : }
681 : (true, false) => {
682 : // Only found commits with timestamps smaller than the request.
683 : // It's still a valid case for branch creation; return it.
684 : // `update_gc_info()` ignores the LSN for a `LsnForTimestamp::Future`
685 : // case anyway.
686 0 : Ok(LsnForTimestamp::Future(commit_lsn))
687 : }
688 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
689 : }
690 0 : }
691 :
692 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
693 : /// transactions that committed after 'search_timestamp', as of LSN 'probe_lsn'.
694 : ///
695 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
696 : /// with a smaller/larger timestamp.
697 : ///
698 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
699 0 : &self,
700 0 : search_timestamp: TimestampTz,
701 0 : probe_lsn: Lsn,
702 0 : found_smaller: &mut bool,
703 0 : found_larger: &mut bool,
704 0 : ctx: &RequestContext,
705 0 : ) -> Result<bool, PageReconstructError> {
706 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
707 0 : if timestamp >= search_timestamp {
708 0 : *found_larger = true;
709 0 : return ControlFlow::Break(true);
710 0 : } else {
711 0 : *found_smaller = true;
712 0 : }
713 0 : ControlFlow::Continue(())
714 0 : })
715 0 : .await
716 0 : }
717 :
718 : /// Obtain the latest commit timestamp known at the given LSN.
719 : ///
720 : /// Returns `None` if the LSN has no commit timestamps; otherwise returns the maximum timestamp found.
721 0 : pub(crate) async fn get_timestamp_for_lsn(
722 0 : &self,
723 0 : probe_lsn: Lsn,
724 0 : ctx: &RequestContext,
725 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
726 0 : let mut max: Option<TimestampTz> = None;
727 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
728 0 : if let Some(max_prev) = max {
729 0 : max = Some(max_prev.max(timestamp));
730 0 : } else {
731 0 : max = Some(timestamp);
732 0 : }
733 0 : ControlFlow::Continue(())
734 0 : })
735 0 : .await?;
736 :
737 0 : Ok(max)
738 0 : }
739 :
740 : /// Runs the given function on all the commit timestamps found at the given LSN.
741 : ///
742 : /// The return value is the one produced by the closure via `ControlFlow::Break`, or the
743 : /// `Default` impl's output if the closure never breaks.
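: ///
: /// Sketch of the closure contract (`search_timestamp` is a placeholder; this mirrors
: /// [`Self::is_latest_commit_timestamp_ge_than`] below):
: ///
: /// ```ignore
: /// let found: bool = self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
: ///     if timestamp >= search_timestamp {
: ///         ControlFlow::Break(true)      // short-circuit: the closure's value is returned
: ///     } else {
: ///         ControlFlow::Continue(())     // keep scanning; `bool::default()` (false) if we never break
: ///     }
: /// })
: /// .await?;
: /// ```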
744 0 : async fn map_all_timestamps<T: Default>(
745 0 : &self,
746 0 : probe_lsn: Lsn,
747 0 : ctx: &RequestContext,
748 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
749 0 : ) -> Result<T, PageReconstructError> {
750 0 : for segno in self
751 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
752 0 : .await?
753 : {
754 0 : let nblocks = self
755 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
756 0 : .await?;
757 0 : for blknum in (0..nblocks).rev() {
758 0 : let clog_page = self
759 0 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
760 0 : .await?;
761 :
762 0 : if clog_page.len() == BLCKSZ as usize + 8 {
763 0 : let mut timestamp_bytes = [0u8; 8];
764 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
765 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
766 0 :
767 0 : match f(timestamp) {
768 0 : ControlFlow::Break(b) => return Ok(b),
769 0 : ControlFlow::Continue(()) => (),
770 : }
771 0 : }
772 : }
773 : }
774 0 : Ok(Default::default())
775 0 : }
776 :
777 0 : pub(crate) async fn get_slru_keyspace(
778 0 : &self,
779 0 : version: Version<'_>,
780 0 : ctx: &RequestContext,
781 0 : ) -> Result<KeySpace, PageReconstructError> {
782 0 : let mut accum = KeySpaceAccum::new();
783 :
784 0 : for kind in SlruKind::iter() {
785 0 : let mut segments: Vec<u32> = self
786 0 : .list_slru_segments(kind, version, ctx)
787 0 : .await?
788 0 : .into_iter()
789 0 : .collect();
790 0 : segments.sort_unstable();
791 :
792 0 : for seg in segments {
793 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
794 :
795 0 : accum.add_range(
796 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
797 0 : );
798 : }
799 : }
800 :
801 0 : Ok(accum.to_keyspace())
802 0 : }
803 :
804 : /// Get a list of SLRU segments
805 0 : pub(crate) async fn list_slru_segments(
806 0 : &self,
807 0 : kind: SlruKind,
808 0 : version: Version<'_>,
809 0 : ctx: &RequestContext,
810 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
811 0 : // fetch directory entry
812 0 : let key = slru_dir_to_key(kind);
813 :
814 0 : let buf = version.get(self, key, ctx).await?;
815 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
816 0 : }
817 :
818 0 : pub(crate) async fn get_relmap_file(
819 0 : &self,
820 0 : spcnode: Oid,
821 0 : dbnode: Oid,
822 0 : version: Version<'_>,
823 0 : ctx: &RequestContext,
824 0 : ) -> Result<Bytes, PageReconstructError> {
825 0 : let key = relmap_file_key(spcnode, dbnode);
826 :
827 0 : let buf = version.get(self, key, ctx).await?;
828 0 : Ok(buf)
829 0 : }
830 :
831 296 : pub(crate) async fn list_dbdirs(
832 296 : &self,
833 296 : lsn: Lsn,
834 296 : ctx: &RequestContext,
835 296 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
836 : // fetch directory entry
837 296 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
838 :
839 296 : Ok(DbDirectory::des(&buf)?.dbdirs)
840 296 : }
841 :
842 0 : pub(crate) async fn get_twophase_file(
843 0 : &self,
844 0 : xid: u64,
845 0 : lsn: Lsn,
846 0 : ctx: &RequestContext,
847 0 : ) -> Result<Bytes, PageReconstructError> {
848 0 : let key = twophase_file_key(xid);
849 0 : let buf = self.get(key, lsn, ctx).await?;
850 0 : Ok(buf)
851 0 : }
852 :
853 298 : pub(crate) async fn list_twophase_files(
854 298 : &self,
855 298 : lsn: Lsn,
856 298 : ctx: &RequestContext,
857 298 : ) -> Result<HashSet<u64>, PageReconstructError> {
858 : // fetch directory entry
859 298 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
860 :
861 298 : if self.pg_version >= 17 {
862 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
863 : } else {
864 298 : Ok(TwoPhaseDirectory::des(&buf)?
865 : .xids
866 298 : .iter()
867 298 : .map(|x| u64::from(*x))
868 298 : .collect())
869 : }
870 298 : }
871 :
872 0 : pub(crate) async fn get_control_file(
873 0 : &self,
874 0 : lsn: Lsn,
875 0 : ctx: &RequestContext,
876 0 : ) -> Result<Bytes, PageReconstructError> {
877 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
878 0 : }
879 :
880 12 : pub(crate) async fn get_checkpoint(
881 12 : &self,
882 12 : lsn: Lsn,
883 12 : ctx: &RequestContext,
884 12 : ) -> Result<Bytes, PageReconstructError> {
885 12 : self.get(CHECKPOINT_KEY, lsn, ctx).await
886 12 : }
887 :
888 12 : async fn list_aux_files_v2(
889 12 : &self,
890 12 : lsn: Lsn,
891 12 : ctx: &RequestContext,
892 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
893 12 : let kv = self
894 12 : .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
895 12 : .await?;
896 12 : let mut result = HashMap::new();
897 12 : let mut sz = 0;
898 30 : for (_, v) in kv {
899 18 : let v = v?;
900 18 : let v = aux_file::decode_file_value_bytes(&v)
901 18 : .context("value decode")
902 18 : .map_err(PageReconstructError::Other)?;
903 34 : for (fname, content) in v {
904 16 : sz += fname.len();
905 16 : sz += content.len();
906 16 : result.insert(fname, content);
907 16 : }
908 : }
909 12 : self.aux_file_size_estimator.on_initial(sz);
910 12 : Ok(result)
911 12 : }
912 :
913 0 : pub(crate) async fn trigger_aux_file_size_computation(
914 0 : &self,
915 0 : lsn: Lsn,
916 0 : ctx: &RequestContext,
917 0 : ) -> Result<(), PageReconstructError> {
918 0 : self.list_aux_files_v2(lsn, ctx).await?;
919 0 : Ok(())
920 0 : }
921 :
922 12 : pub(crate) async fn list_aux_files(
923 12 : &self,
924 12 : lsn: Lsn,
925 12 : ctx: &RequestContext,
926 12 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
927 12 : self.list_aux_files_v2(lsn, ctx).await
928 12 : }
929 :
930 0 : pub(crate) async fn get_replorigins(
931 0 : &self,
932 0 : lsn: Lsn,
933 0 : ctx: &RequestContext,
934 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
935 0 : let kv = self
936 0 : .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
937 0 : .await?;
938 0 : let mut result = HashMap::new();
939 0 : for (k, v) in kv {
940 0 : let v = v?;
941 0 : let origin_id = k.field6 as RepOriginId;
942 0 : let origin_lsn = Lsn::des(&v).unwrap();
943 0 : if origin_lsn != Lsn::INVALID {
944 0 : result.insert(origin_id, origin_lsn);
945 0 : }
946 : }
947 0 : Ok(result)
948 0 : }
949 :
950 : /// Does the same as get_current_logical_size, but computes the value on demand.
951 : /// Used to initialize the logical size tracking on startup.
952 : ///
953 : /// Only relation blocks are counted currently. That excludes metadata,
954 : /// SLRUs, twophase files etc.
955 : ///
956 : /// # Cancel-Safety
957 : ///
958 : /// This method is cancellation-safe.
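: ///
: /// The result is effectively `sum(block count of each relation at lsn) * BLCKSZ` bytes,
: /// computed by walking the database directory, so it can be slow when there are many relations.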
959 0 : pub(crate) async fn get_current_logical_size_non_incremental(
960 0 : &self,
961 0 : lsn: Lsn,
962 0 : ctx: &RequestContext,
963 0 : ) -> Result<u64, CalculateLogicalSizeError> {
964 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
965 :
966 : // Fetch list of database dirs and iterate them
967 0 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
968 0 : let dbdir = DbDirectory::des(&buf)?;
969 :
970 0 : let mut total_size: u64 = 0;
971 0 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
972 0 : for rel in self
973 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
974 0 : .await?
975 : {
976 0 : if self.cancel.is_cancelled() {
977 0 : return Err(CalculateLogicalSizeError::Cancelled);
978 0 : }
979 0 : let relsize_key = rel_size_to_key(rel);
980 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
981 0 : let relsize = buf.get_u32_le();
982 0 :
983 0 : total_size += relsize as u64;
984 : }
985 : }
986 0 : Ok(total_size * BLCKSZ as u64)
987 0 : }
988 :
989 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
990 : /// for gc-compaction.
991 : ///
992 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
993 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
994 : /// be kept only for a specific range of LSN.
995 : ///
996 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
997 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
998 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
999 : /// determine which keys to retain/drop for gc-compaction.
1000 : ///
1001 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1002 : /// to be retained for each of the branch LSNs.
1003 : ///
1004 : /// The return value is (dense keyspace, sparse keyspace).
1005 52 : pub(crate) async fn collect_gc_compaction_keyspace(
1006 52 : &self,
1007 52 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1008 52 : let metadata_key_begin = Key::metadata_key_range().start;
1009 52 : let aux_v1_key = AUX_FILES_KEY;
1010 52 : let dense_keyspace = KeySpace {
1011 52 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1012 52 : };
1013 52 : Ok((
1014 52 : dense_keyspace,
1015 52 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1016 52 : ))
1017 52 : }
1018 :
1019 : ///
1020 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1021 : /// Anything that's not listed may be removed from the underlying storage (from
1022 : /// that LSN forwards).
1023 : ///
1024 : /// The return value is (dense keyspace, sparse keyspace).
1025 296 : pub(crate) async fn collect_keyspace(
1026 296 : &self,
1027 296 : lsn: Lsn,
1028 296 : ctx: &RequestContext,
1029 296 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1030 296 : // Iterate through key ranges, greedily packing them into partitions
1031 296 : let mut result = KeySpaceAccum::new();
1032 296 :
1033 296 : // The dbdir metadata always exists
1034 296 : result.add_key(DBDIR_KEY);
1035 :
1036 : // Fetch list of database dirs and iterate them
1037 296 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1038 296 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1039 296 :
1040 296 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1041 296 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1042 0 : if has_relmap_file {
1043 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1044 0 : }
1045 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1046 :
1047 0 : let mut rels: Vec<RelTag> = self
1048 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
1049 0 : .await?
1050 0 : .into_iter()
1051 0 : .collect();
1052 0 : rels.sort_unstable();
1053 0 : for rel in rels {
1054 0 : let relsize_key = rel_size_to_key(rel);
1055 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1056 0 : let relsize = buf.get_u32_le();
1057 0 :
1058 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1059 0 : result.add_key(relsize_key);
1060 : }
1061 : }
1062 :
1063 : // Iterate SLRUs next
1064 296 : if self.tenant_shard_id.is_shard_zero() {
1065 870 : for kind in [
1066 290 : SlruKind::Clog,
1067 290 : SlruKind::MultiXactMembers,
1068 290 : SlruKind::MultiXactOffsets,
1069 : ] {
1070 870 : let slrudir_key = slru_dir_to_key(kind);
1071 870 : result.add_key(slrudir_key);
1072 870 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1073 870 : let dir = SlruSegmentDirectory::des(&buf)?;
1074 870 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1075 870 : segments.sort_unstable();
1076 870 : for segno in segments {
1077 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1078 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1079 0 : let segsize = buf.get_u32_le();
1080 0 :
1081 0 : result.add_range(
1082 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1083 0 : );
1084 0 : result.add_key(segsize_key);
1085 : }
1086 : }
1087 6 : }
1088 :
1089 : // Then pg_twophase
1090 296 : result.add_key(TWOPHASEDIR_KEY);
1091 :
1092 296 : let mut xids: Vec<u64> = self
1093 296 : .list_twophase_files(lsn, ctx)
1094 296 : .await?
1095 296 : .iter()
1096 296 : .cloned()
1097 296 : .collect();
1098 296 : xids.sort_unstable();
1099 296 : for xid in xids {
1100 0 : result.add_key(twophase_file_key(xid));
1101 0 : }
1102 :
1103 296 : result.add_key(CONTROLFILE_KEY);
1104 296 : result.add_key(CHECKPOINT_KEY);
1105 296 :
1106 296 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1107 296 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1108 296 : // and the keys will not be garbage-collected.
1109 296 : #[cfg(test)]
1110 296 : {
1111 296 : let guard = self.extra_test_dense_keyspace.load();
1112 296 : for kr in &guard.ranges {
1113 0 : result.add_range(kr.clone());
1114 0 : }
1115 0 : }
1116 0 :
1117 296 : let dense_keyspace = result.to_keyspace();
1118 296 : let sparse_keyspace = SparseKeySpace(KeySpace {
1119 296 : ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
1120 296 : });
1121 296 :
1122 296 : if cfg!(debug_assertions) {
1123 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1124 :
1125 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1126 : // category of sparse keys is split into its own image/delta files. If there
1127 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1128 : // and we want the developer to keep the keyspaces separated.
1129 :
1130 296 : let ranges = &sparse_keyspace.0.ranges;
1131 :
1132 : // TODO: use a single overlaps_with across the codebase
1133 296 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1134 296 : !(a.end <= b.start || b.end <= a.start)
1135 296 : }
1136 592 : for i in 0..ranges.len() {
1137 592 : for j in 0..i {
1138 296 : if overlaps_with(&ranges[i], &ranges[j]) {
1139 0 : panic!(
1140 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1141 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1142 0 : );
1143 296 : }
1144 : }
1145 : }
1146 296 : for i in 1..ranges.len() {
1147 296 : assert!(
1148 296 : ranges[i - 1].end <= ranges[i].start,
1149 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1150 0 : ranges[i - 1].start,
1151 0 : ranges[i - 1].end,
1152 0 : ranges[i].start,
1153 0 : ranges[i].end
1154 : );
1155 : }
1156 0 : }
1157 :
1158 296 : Ok((dense_keyspace, sparse_keyspace))
1159 296 : }
1160 :
1161 : /// Get the cached size of a relation, if it was not updated after the specified LSN
1162 448540 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1163 448540 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1164 448540 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1165 448518 : if lsn >= *cached_lsn {
1166 443372 : RELSIZE_CACHE_HITS.inc();
1167 443372 : return Some(*nblocks);
1168 5146 : }
1169 5146 : RELSIZE_CACHE_MISSES_OLD.inc();
1170 22 : }
1171 5168 : RELSIZE_CACHE_MISSES.inc();
1172 5168 : None
1173 448540 : }
1174 :
1175 : /// Update cached relation size if there is no more recent update
1176 5136 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1177 5136 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1178 5136 :
1179 5136 : if lsn < rel_size_cache.complete_as_of {
1180 : // Do not cache old values. It's safe to cache the size on read, as long as
1181 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1182 : // never evict values from the cache, so if the relation size changed after
1183 : // 'lsn', the new value is already in the cache.
1184 0 : return;
1185 5136 : }
1186 5136 :
1187 5136 : match rel_size_cache.map.entry(tag) {
1188 5136 : hash_map::Entry::Occupied(mut entry) => {
1189 5136 : let cached_lsn = entry.get_mut();
1190 5136 : if lsn >= cached_lsn.0 {
1191 0 : *cached_lsn = (lsn, nblocks);
1192 5136 : }
1193 : }
1194 0 : hash_map::Entry::Vacant(entry) => {
1195 0 : entry.insert((lsn, nblocks));
1196 0 : RELSIZE_CACHE_ENTRIES.inc();
1197 0 : }
1198 : }
1199 5136 : }
1200 :
1201 : /// Store cached relation size
1202 282720 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1203 282720 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1204 282720 : if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
1205 1920 : RELSIZE_CACHE_ENTRIES.inc();
1206 280800 : }
1207 282720 : }
1208 :
1209 : /// Remove cached relation size
1210 2 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1211 2 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1212 2 : if rel_size_cache.map.remove(tag).is_some() {
1213 2 : RELSIZE_CACHE_ENTRIES.dec();
1214 2 : }
1215 2 : }
1216 : }
1217 :
1218 : /// DatadirModification represents an operation to ingest an atomic set of
1219 : /// updates to the repository.
1220 : ///
1221 : /// It is created by the 'begin_modification' function. It is used for each WAL
1222 : /// record, so that all the modifications from one WAL record appear atomic.
1223 : pub struct DatadirModification<'a> {
1224 : /// The timeline this modification applies to. You can access this to
1225 : /// read the state, but note that any pending updates are *not* reflected
1226 : /// in the state in 'tline' yet.
1227 : pub tline: &'a Timeline,
1228 :
1229 : /// Current LSN of the modification
1230 : lsn: Lsn,
1231 :
1232 : // The modifications are not applied directly to the underlying key-value store.
1233 : // The put-functions add the modifications here, and they are flushed to the
1234 : // underlying key-value store by the 'commit' function.
1235 : pending_lsns: Vec<Lsn>,
1236 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1237 : pending_nblocks: i64,
1238 :
1239 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1240 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1241 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1242 :
1243 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1244 : /// which keys are stored here.
1245 : pending_data_batch: Option<SerializedValueBatch>,
1246 :
1247 : /// For special "directory" keys that store key-value maps, track the size of the map
1248 : /// if it was updated in this modification.
1249 : pending_directory_entries: Vec<(DirectoryKind, usize)>,
1250 :
1251 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1252 : pending_metadata_bytes: usize,
1253 : }
1254 :
1255 : impl DatadirModification<'_> {
1256 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1257 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1258 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1259 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1260 :
1261 : /// Get the current lsn
1262 418058 : pub(crate) fn get_lsn(&self) -> Lsn {
1263 418058 : self.lsn
1264 418058 : }
1265 :
1266 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1267 0 : self.pending_data_batch
1268 0 : .as_ref()
1269 0 : .map_or(0, |b| b.buffer_size())
1270 0 : + self.pending_metadata_bytes
1271 0 : }
1272 :
1273 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1274 0 : self.pending_data_batch
1275 0 : .as_ref()
1276 0 : .is_some_and(|b| b.has_data())
1277 0 : }
1278 :
1279 : /// Set the current lsn
1280 145858 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1281 145858 : ensure!(
1282 145858 : lsn >= self.lsn,
1283 0 : "setting an older lsn {} than {} is not allowed",
1284 : lsn,
1285 : self.lsn
1286 : );
1287 :
1288 145858 : if lsn > self.lsn {
1289 145858 : self.pending_lsns.push(self.lsn);
1290 145858 : self.lsn = lsn;
1291 145858 : }
1292 145858 : Ok(())
1293 145858 : }
1294 :
1295 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1296 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1297 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1298 : ///
1299 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1300 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1301 : /// via [`Self::get`] as soon as their record has been ingested.
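: ///
: /// For example, keys produced by `rel_block_to_key` or `slru_block_to_key` are data keys,
: /// while directory and size keys such as `DBDIR_KEY` or the key from `rel_size_to_key` are
: /// metadata keys.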
1302 850442 : fn is_data_key(key: &Key) -> bool {
1303 850442 : key.is_rel_block_key() || key.is_slru_block_key()
1304 850442 : }
1305 :
1306 : /// Initialize a completely new repository.
1307 : ///
1308 : /// This inserts the directory metadata entries that are assumed to
1309 : /// always exist.
1310 182 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1311 182 : let buf = DbDirectory::ser(&DbDirectory {
1312 182 : dbdirs: HashMap::new(),
1313 182 : })?;
1314 182 : self.pending_directory_entries.push((DirectoryKind::Db, 0));
1315 182 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1316 :
1317 182 : let buf = if self.tline.pg_version >= 17 {
1318 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1319 0 : xids: HashSet::new(),
1320 0 : })
1321 : } else {
1322 182 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1323 182 : xids: HashSet::new(),
1324 182 : })
1325 0 : }?;
1326 182 : self.pending_directory_entries
1327 182 : .push((DirectoryKind::TwoPhase, 0));
1328 182 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1329 :
1330 182 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1331 182 : let empty_dir = Value::Image(buf);
1332 182 :
1333 182 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1334 182 : // harmless but they'd just be dropped on later compaction.
1335 182 : if self.tline.tenant_shard_id.is_shard_zero() {
1336 176 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1337 176 : self.pending_directory_entries
1338 176 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1339 176 : self.put(
1340 176 : slru_dir_to_key(SlruKind::MultiXactMembers),
1341 176 : empty_dir.clone(),
1342 176 : );
1343 176 : self.pending_directory_entries
1344 176 : .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
1345 176 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1346 176 : self.pending_directory_entries
1347 176 : .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
1348 176 : }
1349 :
1350 182 : Ok(())
1351 182 : }
1352 :
1353 : #[cfg(test)]
1354 180 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1355 180 : self.init_empty()?;
1356 180 : self.put_control_file(bytes::Bytes::from_static(
1357 180 : b"control_file contents do not matter",
1358 180 : ))
1359 180 : .context("put_control_file")?;
1360 180 : self.put_checkpoint(bytes::Bytes::from_static(
1361 180 : b"checkpoint_file contents do not matter",
1362 180 : ))
1363 180 : .context("put_checkpoint_file")?;
1364 180 : Ok(())
1365 180 : }
1366 :
1367 : /// Creates a relation if it is not already present.
1368 : /// Returns the current size of the relation
1369 418056 : pub(crate) async fn create_relation_if_required(
1370 418056 : &mut self,
1371 418056 : rel: RelTag,
1372 418056 : ctx: &RequestContext,
1373 418056 : ) -> Result<u32, PageReconstructError> {
1374 : // Get current size and put rel creation if rel doesn't exist
1375 : //
1376 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1377 : // check the cache too. This is because eagerly checking the cache results in
1378 : // less work overall and 10% better performance. It's more work on cache miss
1379 : // but cache miss is rare.
1380 418056 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1381 418046 : Ok(nblocks)
1382 10 : } else if !self
1383 10 : .tline
1384 10 : .get_rel_exists(rel, Version::Modified(self), ctx)
1385 10 : .await?
1386 : {
1387 : // create it with 0 size initially, the logic below will extend it
1388 10 : self.put_rel_creation(rel, 0, ctx)
1389 10 : .await
1390 10 : .context("Relation Error")?;
1391 10 : Ok(0)
1392 : } else {
1393 0 : self.tline
1394 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1395 0 : .await
1396 : }
1397 418056 : }
1398 :
1399 : /// Given a block number for a relation (which represents a newly written block),
1400 : /// the previous block count of the relation, and the shard info, find the gaps
1401 : /// that were created by the newly written block, if any.
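: ///
: /// Worked example (illustrative): if the relation previously had 3 blocks and block 5 is
: /// written, blocks 3 and 4 become gaps; only the gap blocks owned by this shard are
: /// returned (as a `KeySpace`), so the caller can zero-fill them.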
1402 145670 : fn find_gaps(
1403 145670 : rel: RelTag,
1404 145670 : blkno: u32,
1405 145670 : previous_nblocks: u32,
1406 145670 : shard: &ShardIdentity,
1407 145670 : ) -> Option<KeySpace> {
1408 145670 : let mut key = rel_block_to_key(rel, blkno);
1409 145670 : let mut gap_accum = None;
1410 :
1411 145670 : for gap_blkno in previous_nblocks..blkno {
1412 32 : key.field6 = gap_blkno;
1413 32 :
1414 32 : if shard.get_shard_number(&key) != shard.number {
1415 8 : continue;
1416 24 : }
1417 24 :
1418 24 : gap_accum
1419 24 : .get_or_insert_with(KeySpaceAccum::new)
1420 24 : .add_key(key);
1421 : }
1422 :
1423 145670 : gap_accum.map(|accum| accum.to_keyspace())
1424 145670 : }
1425 :
1426 145852 : pub async fn ingest_batch(
1427 145852 : &mut self,
1428 145852 : mut batch: SerializedValueBatch,
1429 145852 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1430 145852 : shard: &ShardIdentity,
1431 145852 : ctx: &RequestContext,
1432 145852 : ) -> anyhow::Result<()> {
1433 145852 : let mut gaps_at_lsns = Vec::default();
1434 :
1435 145852 : for meta in batch.metadata.iter() {
1436 145642 : let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
1437 145642 : let new_nblocks = blkno + 1;
1438 :
1439 145642 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1440 145642 : if new_nblocks > old_nblocks {
1441 2390 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1442 143252 : }
1443 :
1444 145642 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1445 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1446 145642 : }
1447 : }
1448 :
1449 145852 : if !gaps_at_lsns.is_empty() {
1450 0 : batch.zero_gaps(gaps_at_lsns);
1451 145852 : }
1452 :
1453 145852 : match self.pending_data_batch.as_mut() {
1454 20 : Some(pending_batch) => {
1455 20 : pending_batch.extend(batch);
1456 20 : }
1457 145832 : None if batch.has_data() => {
1458 145630 : self.pending_data_batch = Some(batch);
1459 145630 : }
1460 202 : None => {
1461 202 : // Nothing to initialize the batch with
1462 202 : }
1463 : }
1464 :
1465 145852 : Ok(())
1466 145852 : }
1467 :
1468 : /// Put a new page version that can be constructed from a WAL record
1469 : ///
1470 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1471 : /// current end-of-file. It's up to the caller to check that the relation size
1472 : /// matches the blocks inserted!
1473 12 : pub fn put_rel_wal_record(
1474 12 : &mut self,
1475 12 : rel: RelTag,
1476 12 : blknum: BlockNumber,
1477 12 : rec: NeonWalRecord,
1478 12 : ) -> anyhow::Result<()> {
1479 12 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1480 12 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1481 12 : Ok(())
1482 12 : }
1483 :
1484 : // Same, but for an SLRU.
1485 8 : pub fn put_slru_wal_record(
1486 8 : &mut self,
1487 8 : kind: SlruKind,
1488 8 : segno: u32,
1489 8 : blknum: BlockNumber,
1490 8 : rec: NeonWalRecord,
1491 8 : ) -> anyhow::Result<()> {
1492 8 : if !self.tline.tenant_shard_id.is_shard_zero() {
1493 0 : return Ok(());
1494 8 : }
1495 8 :
1496 8 : self.put(
1497 8 : slru_block_to_key(kind, segno, blknum),
1498 8 : Value::WalRecord(rec),
1499 8 : );
1500 8 : Ok(())
1501 8 : }
1502 :
1503 : /// Like put_wal_record, but with a ready-made image of the page.
1504 277842 : pub fn put_rel_page_image(
1505 277842 : &mut self,
1506 277842 : rel: RelTag,
1507 277842 : blknum: BlockNumber,
1508 277842 : img: Bytes,
1509 277842 : ) -> anyhow::Result<()> {
1510 277842 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1511 277842 : let key = rel_block_to_key(rel, blknum);
1512 277842 : if !key.is_valid_key_on_write_path() {
1513 0 : anyhow::bail!(
1514 0 : "the request contains data not supported by pageserver at {}",
1515 0 : key
1516 0 : );
1517 277842 : }
1518 277842 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1519 277842 : Ok(())
1520 277842 : }
1521 :
1522 6 : pub fn put_slru_page_image(
1523 6 : &mut self,
1524 6 : kind: SlruKind,
1525 6 : segno: u32,
1526 6 : blknum: BlockNumber,
1527 6 : img: Bytes,
1528 6 : ) -> anyhow::Result<()> {
1529 6 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1530 :
1531 6 : let key = slru_block_to_key(kind, segno, blknum);
1532 6 : if !key.is_valid_key_on_write_path() {
1533 0 : anyhow::bail!(
1534 0 : "the request contains data not supported by pageserver at {}",
1535 0 : key
1536 0 : );
1537 6 : }
1538 6 : self.put(key, Value::Image(img));
1539 6 : Ok(())
1540 6 : }
1541 :
1542 2998 : pub(crate) fn put_rel_page_image_zero(
1543 2998 : &mut self,
1544 2998 : rel: RelTag,
1545 2998 : blknum: BlockNumber,
1546 2998 : ) -> anyhow::Result<()> {
1547 2998 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1548 2998 : let key = rel_block_to_key(rel, blknum);
1549 2998 : if !key.is_valid_key_on_write_path() {
1550 0 : anyhow::bail!(
1551 0 : "the request contains data not supported by pageserver: {} @ {}",
1552 0 : key,
1553 0 : self.lsn
1554 0 : );
1555 2998 : }
1556 2998 :
1557 2998 : let batch = self
1558 2998 : .pending_data_batch
1559 2998 : .get_or_insert_with(SerializedValueBatch::default);
1560 2998 :
1561 2998 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1562 2998 :
1563 2998 : Ok(())
1564 2998 : }
1565 :
1566 0 : pub(crate) fn put_slru_page_image_zero(
1567 0 : &mut self,
1568 0 : kind: SlruKind,
1569 0 : segno: u32,
1570 0 : blknum: BlockNumber,
1571 0 : ) -> anyhow::Result<()> {
1572 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1573 0 : let key = slru_block_to_key(kind, segno, blknum);
1574 0 : if !key.is_valid_key_on_write_path() {
1575 0 : anyhow::bail!(
1576 0 : "the request contains data not supported by pageserver: {} @ {}",
1577 0 : key,
1578 0 : self.lsn
1579 0 : );
1580 0 : }
1581 0 :
1582 0 : let batch = self
1583 0 : .pending_data_batch
1584 0 : .get_or_insert_with(SerializedValueBatch::default);
1585 0 :
1586 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1587 0 :
1588 0 : Ok(())
1589 0 : }
1590 :
1591 : /// Store a relmapper file (pg_filenode.map) in the repository
1592 16 : pub async fn put_relmap_file(
1593 16 : &mut self,
1594 16 : spcnode: Oid,
1595 16 : dbnode: Oid,
1596 16 : img: Bytes,
1597 16 : ctx: &RequestContext,
1598 16 : ) -> anyhow::Result<()> {
1599 : // Add it to the directory (if it doesn't exist already)
1600 16 : let buf = self.get(DBDIR_KEY, ctx).await?;
1601 16 : let mut dbdir = DbDirectory::des(&buf)?;
1602 :
1603 16 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1604 16 : if r.is_none() || r == Some(false) {
1605 : // The dbdir entry didn't exist, or it contained a
1606 : // 'false'. The 'insert' call already updated it with
1607 : // 'true', now write the updated 'dbdirs' map back.
1608 16 : let buf = DbDirectory::ser(&dbdir)?;
1609 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1610 0 : }
1611 16 : if r.is_none() {
1612 : // Create RelDirectory
1613 8 : let buf = RelDirectory::ser(&RelDirectory {
1614 8 : rels: HashSet::new(),
1615 8 : })?;
1616 8 : self.pending_directory_entries.push((DirectoryKind::Rel, 0));
1617 8 : self.put(
1618 8 : rel_dir_to_key(spcnode, dbnode),
1619 8 : Value::Image(Bytes::from(buf)),
1620 8 : );
1621 8 : }
1622 :
1623 16 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1624 16 : Ok(())
1625 16 : }
1626 :
1627 0 : pub async fn put_twophase_file(
1628 0 : &mut self,
1629 0 : xid: u64,
1630 0 : img: Bytes,
1631 0 : ctx: &RequestContext,
1632 0 : ) -> anyhow::Result<()> {
1633 : // Add it to the directory entry
1634 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1635 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1636 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1637 0 : if !dir.xids.insert(xid) {
1638 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1639 0 : }
1640 0 : self.pending_directory_entries
1641 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1642 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1643 : } else {
1644 0 : let xid = xid as u32;
1645 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1646 0 : if !dir.xids.insert(xid) {
1647 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1648 0 : }
1649 0 : self.pending_directory_entries
1650 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1651 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1652 : };
1653 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1654 0 :
1655 0 : self.put(twophase_file_key(xid), Value::Image(img));
1656 0 : Ok(())
1657 0 : }
1658 :
1659 0 : pub async fn set_replorigin(
1660 0 : &mut self,
1661 0 : origin_id: RepOriginId,
1662 0 : origin_lsn: Lsn,
1663 0 : ) -> anyhow::Result<()> {
1664 0 : let key = repl_origin_key(origin_id);
1665 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1666 0 : Ok(())
1667 0 : }
1668 :
1669 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1670 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1671 0 : }
1672 :
1673 182 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1674 182 : self.put(CONTROLFILE_KEY, Value::Image(img));
1675 182 : Ok(())
1676 182 : }
1677 :
1678 196 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1679 196 : self.put(CHECKPOINT_KEY, Value::Image(img));
1680 196 : Ok(())
1681 196 : }
1682 :
1683 0 : pub async fn drop_dbdir(
1684 0 : &mut self,
1685 0 : spcnode: Oid,
1686 0 : dbnode: Oid,
1687 0 : ctx: &RequestContext,
1688 0 : ) -> anyhow::Result<()> {
1689 0 : let total_blocks = self
1690 0 : .tline
1691 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1692 0 : .await?;
1693 :
1694 : // Remove entry from dbdir
1695 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1696 0 : let mut dir = DbDirectory::des(&buf)?;
1697 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1698 0 : let buf = DbDirectory::ser(&dir)?;
1699 0 : self.pending_directory_entries
1700 0 : .push((DirectoryKind::Db, dir.dbdirs.len()));
1701 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1702 : } else {
1703 0 : warn!(
1704 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1705 : spcnode, dbnode
1706 : );
1707 : }
1708 :
1709 : // Update logical database size.
1710 0 : self.pending_nblocks -= total_blocks as i64;
1711 0 :
1712 0 : // Delete all relations and metadata files for the spcnode/dnode
1713 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1714 0 : Ok(())
1715 0 : }
1716 :
1717 : /// Create a relation fork.
1718 : ///
1719 : /// 'nblocks' is the initial size.
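    ///
    /// Note that this only records the relation and its size; writing the actual blocks is
    /// up to the caller. A minimal sketch (illustrative names; `modification`, `rel`, `ctx`
    /// and a vector of page images `imgs` are assumed):
    ///
    /// ```ignore
    /// modification.put_rel_creation(rel, imgs.len() as BlockNumber, ctx).await?;
    /// for (blkno, img) in imgs.into_iter().enumerate() {
    ///     modification.put_rel_page_image(rel, blkno as BlockNumber, img)?;
    /// }
    /// ```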
1720 1920 : pub async fn put_rel_creation(
1721 1920 : &mut self,
1722 1920 : rel: RelTag,
1723 1920 : nblocks: BlockNumber,
1724 1920 : ctx: &RequestContext,
1725 1920 : ) -> Result<(), RelationError> {
1726 1920 : if rel.relnode == 0 {
1727 0 : return Err(RelationError::InvalidRelnode);
1728 1920 : }
1729 : // It's possible that this is the first rel for this db in this
1730 : // tablespace. Create the reldir entry for it if so.
1731 1920 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1732 1920 : .context("deserialize db")?;
1733 1920 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1734 1920 : let mut rel_dir =
1735 1920 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1736 : // Didn't exist. Update dbdir
1737 8 : e.insert(false);
1738 8 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1739 8 : self.pending_directory_entries
1740 8 : .push((DirectoryKind::Db, dbdir.dbdirs.len()));
1741 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1742 8 :
1743 8 : // and create the RelDirectory
1744 8 : RelDirectory::default()
1745 : } else {
1746 : // reldir already exists, fetch it
1747 1912 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
1748 1912 : .context("deserialize db")?
1749 : };
1750 :
1751 : // Add the new relation to the rel directory entry, and write it back
1752 1920 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
1753 0 : return Err(RelationError::AlreadyExists);
1754 1920 : }
1755 1920 :
1756 1920 : self.pending_directory_entries
1757 1920 : .push((DirectoryKind::Rel, rel_dir.rels.len()));
1758 1920 :
1759 1920 : self.put(
1760 1920 : rel_dir_key,
1761 1920 : Value::Image(Bytes::from(
1762 1920 : RelDirectory::ser(&rel_dir).context("serialize")?,
1763 : )),
1764 : );
1765 :
1766 : // Put size
1767 1920 : let size_key = rel_size_to_key(rel);
1768 1920 : let buf = nblocks.to_le_bytes();
1769 1920 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1770 1920 :
1771 1920 : self.pending_nblocks += nblocks as i64;
1772 1920 :
1773 1920 : // Update relation size cache
1774 1920 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1775 1920 :
1776 1920 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1777 1920 : // caller.
1778 1920 : Ok(())
1779 1920 : }
1780 :
1781 : /// Truncate relation
1782 6012 : pub async fn put_rel_truncation(
1783 6012 : &mut self,
1784 6012 : rel: RelTag,
1785 6012 : nblocks: BlockNumber,
1786 6012 : ctx: &RequestContext,
1787 6012 : ) -> anyhow::Result<()> {
1788 6012 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1789 6012 : if self
1790 6012 : .tline
1791 6012 : .get_rel_exists(rel, Version::Modified(self), ctx)
1792 6012 : .await?
1793 : {
1794 6012 : let size_key = rel_size_to_key(rel);
1795 : // Fetch the old size first
1796 6012 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1797 6012 :
1798 6012 : // Update the entry with the new size.
1799 6012 : let buf = nblocks.to_le_bytes();
1800 6012 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1801 6012 :
1802 6012 : // Update relation size cache
1803 6012 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1804 6012 :
1805 6012 : // Update logical database size.
1806 6012 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1807 0 : }
1808 6012 : Ok(())
1809 6012 : }
1810 :
1811 : /// Extend relation
1812 : /// If new size is smaller, do nothing.
1813 276680 : pub async fn put_rel_extend(
1814 276680 : &mut self,
1815 276680 : rel: RelTag,
1816 276680 : nblocks: BlockNumber,
1817 276680 : ctx: &RequestContext,
1818 276680 : ) -> anyhow::Result<()> {
1819 276680 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1820 :
1821 : // Put size
1822 276680 : let size_key = rel_size_to_key(rel);
1823 276680 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1824 276680 :
1825 276680 : // only extend relation here. never decrease the size
1826 276680 : if nblocks > old_size {
1827 274788 : let buf = nblocks.to_le_bytes();
1828 274788 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1829 274788 :
1830 274788 : // Update relation size cache
1831 274788 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1832 274788 :
1833 274788 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1834 274788 : }
1835 276680 : Ok(())
1836 276680 : }
1837 :
1838 : /// Drop some relations
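    ///
    /// The map is keyed by (spcnode, dbnode) so that each database's relation directory is
    /// read and rewritten only once. A minimal sketch of building the argument (illustrative
    /// names; `modification`, `ctx` and the dropped `RelTag`s are assumed):
    ///
    /// ```ignore
    /// let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
    /// for rel in dropped_rels {
    ///     drop_relations
    ///         .entry((rel.spcnode, rel.dbnode))
    ///         .or_default()
    ///         .push(rel);
    /// }
    /// modification.put_rel_drops(drop_relations, ctx).await?;
    /// ```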
1839 10 : pub(crate) async fn put_rel_drops(
1840 10 : &mut self,
1841 10 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
1842 10 : ctx: &RequestContext,
1843 10 : ) -> anyhow::Result<()> {
1844 12 : for ((spc_node, db_node), rel_tags) in drop_relations {
1845 2 : let dir_key = rel_dir_to_key(spc_node, db_node);
1846 2 : let buf = self.get(dir_key, ctx).await?;
1847 2 : let mut dir = RelDirectory::des(&buf)?;
1848 :
1849 2 : let mut dirty = false;
1850 4 : for rel_tag in rel_tags {
1851 2 : if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
1852 2 : dirty = true;
1853 2 :
1854 2 : // update logical size
1855 2 : let size_key = rel_size_to_key(rel_tag);
1856 2 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1857 2 : self.pending_nblocks -= old_size as i64;
1858 2 :
1859 2 : // Remove entry from relation size cache
1860 2 : self.tline.remove_cached_rel_size(&rel_tag);
1861 2 :
1862 2 : // Delete size entry, as well as all blocks
1863 2 : self.delete(rel_key_range(rel_tag));
1864 0 : }
1865 : }
1866 :
1867 2 : if dirty {
1868 2 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1869 2 : self.pending_directory_entries
1870 2 : .push((DirectoryKind::Rel, dir.rels.len()));
1871 0 : }
1872 : }
1873 :
1874 10 : Ok(())
1875 10 : }
1876 :
1877 6 : pub async fn put_slru_segment_creation(
1878 6 : &mut self,
1879 6 : kind: SlruKind,
1880 6 : segno: u32,
1881 6 : nblocks: BlockNumber,
1882 6 : ctx: &RequestContext,
1883 6 : ) -> anyhow::Result<()> {
1884 6 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1885 :
1886 : // Add it to the directory entry
1887 6 : let dir_key = slru_dir_to_key(kind);
1888 6 : let buf = self.get(dir_key, ctx).await?;
1889 6 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1890 :
1891 6 : if !dir.segments.insert(segno) {
1892 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1893 6 : }
1894 6 : self.pending_directory_entries
1895 6 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1896 6 : self.put(
1897 6 : dir_key,
1898 6 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1899 : );
1900 :
1901 : // Put size
1902 6 : let size_key = slru_segment_size_to_key(kind, segno);
1903 6 : let buf = nblocks.to_le_bytes();
1904 6 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1905 6 :
1906 6 : // even if nblocks > 0, we don't insert any actual blocks here
1907 6 :
1908 6 : Ok(())
1909 6 : }
1910 :
1911 : /// Extend SLRU segment
1912 0 : pub fn put_slru_extend(
1913 0 : &mut self,
1914 0 : kind: SlruKind,
1915 0 : segno: u32,
1916 0 : nblocks: BlockNumber,
1917 0 : ) -> anyhow::Result<()> {
1918 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1919 :
1920 : // Put size
1921 0 : let size_key = slru_segment_size_to_key(kind, segno);
1922 0 : let buf = nblocks.to_le_bytes();
1923 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1924 0 : Ok(())
1925 0 : }
1926 :
1927 : /// This method is used for marking truncated SLRU files
1928 :     /// Drop an SLRU segment. Used when SLRU truncation makes whole segment files obsolete.
1929 0 : &mut self,
1930 0 : kind: SlruKind,
1931 0 : segno: u32,
1932 0 : ctx: &RequestContext,
1933 0 : ) -> anyhow::Result<()> {
1934 0 : // Remove it from the directory entry
1935 0 : let dir_key = slru_dir_to_key(kind);
1936 0 : let buf = self.get(dir_key, ctx).await?;
1937 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1938 :
1939 0 : if !dir.segments.remove(&segno) {
1940 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1941 0 : }
1942 0 : self.pending_directory_entries
1943 0 : .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
1944 0 : self.put(
1945 0 : dir_key,
1946 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1947 : );
1948 :
1949 : // Delete size entry, as well as all blocks
1950 0 : self.delete(slru_segment_key_range(kind, segno));
1951 0 :
1952 0 : Ok(())
1953 0 : }
1954 :
1955 : /// Drop a relmapper file (pg_filenode.map)
1956 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1957 0 : // TODO
1958 0 : Ok(())
1959 0 : }
1960 :
1961 : /// This method is used for marking truncated SLRU files
1962 :     /// Drop a two-phase state file for the given transaction.
1963 0 : &mut self,
1964 0 : xid: u64,
1965 0 : ctx: &RequestContext,
1966 0 : ) -> anyhow::Result<()> {
1967 : // Remove it from the directory entry
1968 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1969 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1970 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
1971 :
1972 0 : if !dir.xids.remove(&xid) {
1973 0 : warn!("twophase file for xid {} does not exist", xid);
1974 0 : }
1975 0 : self.pending_directory_entries
1976 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1977 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1978 : } else {
1979 0 : let xid: u32 = u32::try_from(xid)?;
1980 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1981 :
1982 0 : if !dir.xids.remove(&xid) {
1983 0 : warn!("twophase file for xid {} does not exist", xid);
1984 0 : }
1985 0 : self.pending_directory_entries
1986 0 : .push((DirectoryKind::TwoPhase, dir.xids.len()));
1987 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1988 : };
1989 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1990 0 :
1991 0 : // Delete it
1992 0 : self.delete(twophase_key_range(xid));
1993 0 :
1994 0 : Ok(())
1995 0 : }
1996 :
1997 16 : pub async fn put_file(
1998 16 : &mut self,
1999 16 : path: &str,
2000 16 : content: &[u8],
2001 16 : ctx: &RequestContext,
2002 16 : ) -> anyhow::Result<()> {
2003 16 : let key = aux_file::encode_aux_file_key(path);
2004 : // retrieve the key from the engine
2005 16 : let old_val = match self.get(key, ctx).await {
2006 4 : Ok(val) => Some(val),
2007 12 : Err(PageReconstructError::MissingKey(_)) => None,
2008 0 : Err(e) => return Err(e.into()),
2009 : };
2010 16 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2011 4 : aux_file::decode_file_value(old_val)?
2012 : } else {
2013 12 : Vec::new()
2014 : };
2015 16 : let mut other_files = Vec::with_capacity(files.len());
2016 16 : let mut modifying_file = None;
2017 20 : for file @ (p, content) in files {
2018 4 : if path == p {
2019 4 : assert!(
2020 4 : modifying_file.is_none(),
2021 0 : "duplicated entries found for {}",
2022 : path
2023 : );
2024 4 : modifying_file = Some(content);
2025 0 : } else {
2026 0 : other_files.push(file);
2027 0 : }
2028 : }
2029 16 : let mut new_files = other_files;
2030 16 : match (modifying_file, content.is_empty()) {
2031 2 : (Some(old_content), false) => {
2032 2 : self.tline
2033 2 : .aux_file_size_estimator
2034 2 : .on_update(old_content.len(), content.len());
2035 2 : new_files.push((path, content));
2036 2 : }
2037 2 : (Some(old_content), true) => {
2038 2 : self.tline
2039 2 : .aux_file_size_estimator
2040 2 : .on_remove(old_content.len());
2041 2 : // not adding the file key to the final `new_files` vec.
2042 2 : }
2043 12 : (None, false) => {
2044 12 : self.tline.aux_file_size_estimator.on_add(content.len());
2045 12 : new_files.push((path, content));
2046 12 : }
2047 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2048 : }
2049 16 : let new_val = aux_file::encode_file_value(&new_files)?;
2050 16 : self.put(key, Value::Image(new_val.into()));
2051 16 :
2052 16 : Ok(())
2053 16 : }
2054 :
2055 : ///
2056 : /// Flush changes accumulated so far to the underlying repository.
2057 : ///
2058 : /// Usually, changes made in DatadirModification are atomic, but this allows
2059 : /// you to flush them to the underlying repository before the final `commit`.
2060 :     /// That frees up the memory used to hold the pending changes.
2061 : ///
2062 : /// Currently only used during bulk import of a data directory. In that
2063 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2064 : /// whole import fails and the timeline will be deleted anyway.
2065 : /// (Or to be precise, it will be left behind for debugging purposes and
2066 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2067 : ///
2068 : /// Note: A consequence of flushing the pending operations is that they
2069 : /// won't be visible to subsequent operations until `commit`. The function
2070 : /// retains all the metadata, but data pages are flushed. That's again OK
2071 : /// for bulk import, where you are just loading data pages and won't try to
2072 : /// modify the same pages twice.
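    ///
    /// A minimal sketch of the bulk-import pattern described above (illustrative names;
    /// `tline`, `ctx`, `import_lsn` and an iterator `pages` of (RelTag, BlockNumber, Bytes)
    /// whose relations were already created are assumed):
    ///
    /// ```ignore
    /// let mut modification = tline.begin_modification(import_lsn);
    /// for (rel, blkno, img) in pages {
    ///     modification.put_rel_page_image(rel, blkno, img)?;
    ///     // A cheap no-op until enough changes accumulate; then it writes the pending
    ///     // data pages out and frees their memory, keeping the metadata.
    ///     modification.flush(ctx).await?;
    /// }
    /// // Finish the modification; remaining changes and metadata become visible.
    /// modification.commit(ctx).await?;
    /// ```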
2073 1930 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2074 1930 : // Unless we have accumulated a decent amount of changes, it's not worth it
2075 1930 : // to scan through the pending_updates list.
2076 1930 : let pending_nblocks = self.pending_nblocks;
2077 1930 : if pending_nblocks < 10000 {
2078 1930 : return Ok(());
2079 0 : }
2080 :
2081 0 : let mut writer = self.tline.writer().await;
2082 :
2083 : // Flush relation and SLRU data blocks, keep metadata.
2084 0 : if let Some(batch) = self.pending_data_batch.take() {
2085 0 : tracing::debug!(
2086 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2087 0 : batch.max_lsn,
2088 0 : self.tline.get_last_record_lsn()
2089 : );
2090 :
2091 : // This bails out on first error without modifying pending_updates.
2092 : // That's Ok, cf this function's doc comment.
2093 0 : writer.put_batch(batch, ctx).await?;
2094 0 : }
2095 :
2096 0 : if pending_nblocks != 0 {
2097 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2098 0 : self.pending_nblocks = 0;
2099 0 : }
2100 :
2101 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2102 0 : writer.update_directory_entries_count(kind, count as u64);
2103 0 : }
2104 :
2105 0 : Ok(())
2106 1930 : }
2107 :
2108 : ///
2109 : /// Finish this atomic update, writing all the updated keys to the
2110 : /// underlying timeline.
2111 : /// All the modifications in this atomic update are stamped by the specified LSN.
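    ///
    /// A minimal sketch of the LSN stamping (illustrative names and values; `tline`, `ctx`
    /// and the two serialized images are assumed to be in scope):
    ///
    /// ```ignore
    /// let mut modification = tline.begin_modification(Lsn(0x10));
    /// modification.put_control_file(control_bytes)?;  // stamped with Lsn(0x10)
    /// modification.set_lsn(Lsn(0x18))?;
    /// modification.put_checkpoint(checkpoint_bytes)?; // stamped with Lsn(0x18)
    /// modification.commit(ctx).await?;                // both become visible together
    /// ```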
2112 : ///
2113 743068 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2114 743068 : let mut writer = self.tline.writer().await;
2115 :
2116 743068 : let pending_nblocks = self.pending_nblocks;
2117 743068 : self.pending_nblocks = 0;
2118 :
2119 : // Ordering: the items in this batch do not need to be in any global order, but values for
2120 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2121 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2122 : // more details.
2123 :
2124 743068 : let metadata_batch = {
2125 743068 : let pending_meta = self
2126 743068 : .pending_metadata_pages
2127 743068 : .drain()
2128 743068 : .flat_map(|(key, values)| {
2129 273836 : values
2130 273836 : .into_iter()
2131 273836 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2132 743068 : })
2133 743068 : .collect::<Vec<_>>();
2134 743068 :
2135 743068 : if pending_meta.is_empty() {
2136 472278 : None
2137 : } else {
2138 270790 : Some(SerializedValueBatch::from_values(pending_meta))
2139 : }
2140 : };
2141 :
2142 743068 : let data_batch = self.pending_data_batch.take();
2143 :
2144 743068 : let maybe_batch = match (data_batch, metadata_batch) {
2145 264556 : (Some(mut data), Some(metadata)) => {
2146 264556 : data.extend(metadata);
2147 264556 : Some(data)
2148 : }
2149 143262 : (Some(data), None) => Some(data),
2150 6234 : (None, Some(metadata)) => Some(metadata),
2151 329016 : (None, None) => None,
2152 : };
2153 :
2154 743068 : if let Some(batch) = maybe_batch {
2155 414052 : tracing::debug!(
2156 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2157 0 : batch.max_lsn,
2158 0 : self.tline.get_last_record_lsn()
2159 : );
2160 :
2161 : // This bails out on first error without modifying pending_updates.
2162 : // That's Ok, cf this function's doc comment.
2163 414052 : writer.put_batch(batch, ctx).await?;
2164 329016 : }
2165 :
2166 743068 : if !self.pending_deletions.is_empty() {
2167 2 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2168 2 : self.pending_deletions.clear();
2169 743066 : }
2170 :
2171 743068 : self.pending_lsns.push(self.lsn);
2172 888926 : for pending_lsn in self.pending_lsns.drain(..) {
2173 888926 : // TODO(vlad): pretty sure the comment below is not valid anymore
2174 888926 : // and we can call finish write with the latest LSN
2175 888926 : //
2176 888926 : // Ideally, we should be able to call writer.finish_write() only once
2177 888926 : // with the highest LSN. However, the last_record_lsn variable in the
2178 888926 : // timeline keeps track of the latest LSN and the immediate previous LSN
2179 888926 : // so we need to record every LSN to not leave a gap between them.
2180 888926 : writer.finish_write(pending_lsn);
2181 888926 : }
2182 :
2183 743068 : if pending_nblocks != 0 {
2184 270570 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2185 472498 : }
2186 :
2187 743068 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2188 2836 : writer.update_directory_entries_count(kind, count as u64);
2189 2836 : }
2190 :
2191 743068 : self.pending_metadata_bytes = 0;
2192 743068 :
2193 743068 : Ok(())
2194 743068 : }
2195 :
2196 291704 : pub(crate) fn len(&self) -> usize {
2197 291704 : self.pending_metadata_pages.len()
2198 291704 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2199 291704 : + self.pending_deletions.len()
2200 291704 : }
2201 :
2202 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2203 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2204 : /// in the modification.
2205 : ///
2206 :     /// For data pages, reads pass directly to the owning Timeline: any ingest code that reads a data
2207 :     /// page must ensure that the page is already committed in the Timeline; for example,
2208 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2209 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2210 : /// and not data pages.
2211 286586 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2212 286586 : if !Self::is_data_key(&key) {
2213 : // Have we already updated the same key? Read the latest pending updated
2214 : // version in that case.
2215 : //
2216 : // Note: we don't check pending_deletions. It is an error to request a
2217 : // value that has been removed, deletion only avoids leaking storage.
2218 286586 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2219 15928 : if let Some((_, _, value)) = values.last() {
2220 15928 : return if let Value::Image(img) = value {
2221 15928 : Ok(img.clone())
2222 : } else {
2223 : // Currently, we never need to read back a WAL record that we
2224 : // inserted in the same "transaction". All the metadata updates
2225 : // work directly with Images, and we never need to read actual
2226 : // data pages. We could handle this if we had to, by calling
2227 : // the walredo manager, but let's keep it simple for now.
2228 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2229 0 : "unexpected pending WAL record"
2230 0 : )))
2231 : };
2232 0 : }
2233 270658 : }
2234 : } else {
2235 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2236 : // this key should never be present in pending_data_pages. We ensure this by committing
2237 : // modifications before ingesting DB create operations, which are the only kind that reads
2238 : // data pages during ingest.
2239 0 : if cfg!(debug_assertions) {
2240 0 : assert!(!self
2241 0 : .pending_data_batch
2242 0 : .as_ref()
2243 0 : .is_some_and(|b| b.updates_key(&key)));
2244 0 : }
2245 : }
2246 :
2247 : // Metadata page cache miss, or we're reading a data page.
2248 270658 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2249 270658 : self.tline.get(key, lsn, ctx).await
2250 286586 : }
2251 :
2252 563856 : fn put(&mut self, key: Key, val: Value) {
2253 563856 : if Self::is_data_key(&key) {
2254 277868 : self.put_data(key.to_compact(), val)
2255 : } else {
2256 285988 : self.put_metadata(key.to_compact(), val)
2257 : }
2258 563856 : }
2259 :
2260 277868 : fn put_data(&mut self, key: CompactKey, val: Value) {
2261 277868 : let batch = self
2262 277868 : .pending_data_batch
2263 277868 : .get_or_insert_with(SerializedValueBatch::default);
2264 277868 : batch.put(key, val, self.lsn);
2265 277868 : }
2266 :
2267 285988 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2268 285988 : let values = self.pending_metadata_pages.entry(key).or_default();
2269 : // Replace the previous value if it exists at the same lsn
2270 285988 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2271 12152 : if *last_lsn == self.lsn {
2272 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2273 12152 : self.pending_metadata_bytes -= *last_value_ser_size;
2274 12152 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2275 12152 : self.pending_metadata_bytes += *last_value_ser_size;
2276 12152 :
2277 12152 :                 // Use the latest value: this replaces any earlier write to the same (key, lsn), such as may
2278 12152 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2279 12152 : *last_value = val;
2280 12152 : return;
2281 0 : }
2282 273836 : }
2283 :
2284 273836 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2285 273836 : self.pending_metadata_bytes += val_serialized_size;
2286 273836 : values.push((self.lsn, val_serialized_size, val));
2287 273836 :
2288 273836 : if key == CHECKPOINT_KEY.to_compact() {
2289 196 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2290 273640 : }
2291 285988 : }
2292 :
2293 2 : fn delete(&mut self, key_range: Range<Key>) {
2294 2 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2295 2 : self.pending_deletions.push((key_range, self.lsn));
2296 2 : }
2297 : }
2298 :
2299 : /// This struct facilitates accessing either a committed key from the timeline at a
2300 : /// specific LSN, or the latest uncommitted key from a pending modification.
2301 : ///
2302 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2303 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2304 : /// need to look up the keys in the modification before looking them up in the
2305 : /// timeline, so that they don't miss the latest updates.
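///
/// A minimal sketch of the two access paths (illustrative names; `timeline`, `modification`,
/// `rel`, `lsn` and `ctx` are assumed to be in scope):
///
/// ```ignore
/// // During ingest: sees relations created or extended earlier in this modification.
/// let exists_now = timeline
///     .get_rel_exists(rel, Version::Modified(&modification), ctx)
///     .await?;
/// // Against committed data only, at a specific LSN.
/// let existed_then = timeline.get_rel_exists(rel, Version::Lsn(lsn), ctx).await?;
/// ```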
2306 : #[derive(Clone, Copy)]
2307 : pub enum Version<'a> {
2308 : Lsn(Lsn),
2309 : Modified(&'a DatadirModification<'a>),
2310 : }
2311 :
2312 : impl Version<'_> {
2313 5176 : async fn get(
2314 5176 : &self,
2315 5176 : timeline: &Timeline,
2316 5176 : key: Key,
2317 5176 : ctx: &RequestContext,
2318 5176 : ) -> Result<Bytes, PageReconstructError> {
2319 5176 : match self {
2320 5156 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2321 20 : Version::Modified(modification) => modification.get(key, ctx).await,
2322 : }
2323 5176 : }
2324 :
2325 35620 : fn get_lsn(&self) -> Lsn {
2326 35620 : match self {
2327 29574 : Version::Lsn(lsn) => *lsn,
2328 6046 : Version::Modified(modification) => modification.lsn,
2329 : }
2330 35620 : }
2331 : }
2332 :
2333 : //--- Metadata structs stored in key-value pairs in the repository.
2334 :
2335 0 : #[derive(Debug, Serialize, Deserialize)]
2336 : pub(crate) struct DbDirectory {
2337 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2338 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2339 : }
2340 :
2341 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2342 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2343 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2344 : // were named like "pg_twophase/000002E5"; now they're like
2345 : // "pg_twophase/0000000A000002E4".
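//
// As a sketch of the formatting implied by those examples, with xid = 0x0000000A000002E4:
//
//     format!("pg_twophase/{:08X}", xid as u32) // v16 and below: "pg_twophase/000002E4"
//     format!("pg_twophase/{:016X}", xid)       // v17 and above: "pg_twophase/0000000A000002E4"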
2346 0 : #[derive(Debug, Serialize, Deserialize)]
2347 : pub(crate) struct TwoPhaseDirectory {
2348 : pub(crate) xids: HashSet<TransactionId>,
2349 : }
2350 :
2351 0 : #[derive(Debug, Serialize, Deserialize)]
2352 : struct TwoPhaseDirectoryV17 {
2353 : xids: HashSet<u64>,
2354 : }
2355 :
2356 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2357 : pub(crate) struct RelDirectory {
2358 : // Set of relations that exist. (relfilenode, forknum)
2359 : //
2360 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2361 : // key-value pairs, if you have a lot of relations
2362 : pub(crate) rels: HashSet<(Oid, u8)>,
2363 : }
2364 :
2365 0 : #[derive(Debug, Serialize, Deserialize)]
2366 : struct RelSizeEntry {
2367 : nblocks: u32,
2368 : }
2369 :
2370 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2371 : pub(crate) struct SlruSegmentDirectory {
2372 : // Set of SLRU segments that exist.
2373 : pub(crate) segments: HashSet<u32>,
2374 : }
2375 :
2376 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2377 : #[repr(u8)]
2378 : pub(crate) enum DirectoryKind {
2379 : Db,
2380 : TwoPhase,
2381 : Rel,
2382 : AuxFiles,
2383 : SlruSegment(SlruKind),
2384 : }
2385 :
2386 : impl DirectoryKind {
2387 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2388 5672 : pub(crate) fn offset(&self) -> usize {
2389 5672 : self.into_usize()
2390 5672 : }
2391 : }
2392 :
2393 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2394 :
2395 : #[allow(clippy::bool_assert_comparison)]
2396 : #[cfg(test)]
2397 : mod tests {
2398 : use hex_literal::hex;
2399 : use pageserver_api::{models::ShardParameters, shard::ShardStripeSize};
2400 : use utils::{
2401 : id::TimelineId,
2402 : shard::{ShardCount, ShardNumber},
2403 : };
2404 :
2405 : use super::*;
2406 :
2407 : use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
2408 :
2409 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2410 : #[tokio::test]
2411 2 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2412 2 : let name = "aux_files_round_trip";
2413 2 : let harness = TenantHarness::create(name).await?;
2414 2 :
2415 2 : pub const TIMELINE_ID: TimelineId =
2416 2 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2417 2 :
2418 2 : let (tenant, ctx) = harness.load().await;
2419 2 : let tline = tenant
2420 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2421 2 : .await?;
2422 2 : let tline = tline.raw_timeline().unwrap();
2423 2 :
2424 2 : // First modification: insert two keys
2425 2 : let mut modification = tline.begin_modification(Lsn(0x1000));
2426 2 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2427 2 : modification.set_lsn(Lsn(0x1008))?;
2428 2 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2429 2 : modification.commit(&ctx).await?;
2430 2 : let expect_1008 = HashMap::from([
2431 2 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2432 2 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2433 2 : ]);
2434 2 :
2435 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2436 2 : assert_eq!(readback, expect_1008);
2437 2 :
2438 2 : // Second modification: update one key, remove the other
2439 2 : let mut modification = tline.begin_modification(Lsn(0x2000));
2440 2 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2441 2 : modification.set_lsn(Lsn(0x2008))?;
2442 2 : modification.put_file("foo/bar2", b"", &ctx).await?;
2443 2 : modification.commit(&ctx).await?;
2444 2 : let expect_2008 =
2445 2 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2446 2 :
2447 2 : let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
2448 2 : assert_eq!(readback, expect_2008);
2449 2 :
2450 2 : // Reading back in time works
2451 2 : let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
2452 2 : assert_eq!(readback, expect_1008);
2453 2 :
2454 2 : Ok(())
2455 2 : }
2456 :
2457 : #[test]
2458 2 : fn gap_finding() {
2459 2 : let rel = RelTag {
2460 2 : spcnode: 1663,
2461 2 : dbnode: 208101,
2462 2 : relnode: 2620,
2463 2 : forknum: 0,
2464 2 : };
2465 2 : let base_blkno = 1;
2466 2 :
2467 2 : let base_key = rel_block_to_key(rel, base_blkno);
2468 2 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2469 2 :
2470 2 : let shard = ShardIdentity::unsharded();
2471 2 :
2472 2 : let mut previous_nblocks = 0;
2473 22 : for i in 0..10 {
2474 20 : let crnt_blkno = base_blkno + i;
2475 20 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2476 20 :
2477 20 : previous_nblocks = crnt_blkno + 1;
2478 20 :
2479 20 : if i == 0 {
2480 : // The first block we write is 1, so we should find the gap.
2481 2 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2482 : } else {
2483 18 : assert!(gaps.is_none());
2484 : }
2485 : }
2486 :
2487 : // This is an update to an already existing block. No gaps here.
2488 2 : let update_blkno = 5;
2489 2 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2490 2 : assert!(gaps.is_none());
2491 :
2492 : // This is an update past the current end block.
2493 2 : let after_gap_blkno = 20;
2494 2 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2495 2 :
2496 2 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2497 2 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2498 2 : assert_eq!(
2499 2 : gaps.unwrap(),
2500 2 : KeySpace::single(gap_start_key..after_gap_key)
2501 2 : );
2502 2 : }
2503 :
2504 : #[test]
2505 2 : fn sharded_gap_finding() {
2506 2 : let rel = RelTag {
2507 2 : spcnode: 1663,
2508 2 : dbnode: 208101,
2509 2 : relnode: 2620,
2510 2 : forknum: 0,
2511 2 : };
2512 2 :
2513 2 : let first_blkno = 6;
2514 2 :
2515 2 : // This shard will get the even blocks
2516 2 : let shard = ShardIdentity::from_params(
2517 2 : ShardNumber(0),
2518 2 : &ShardParameters {
2519 2 : count: ShardCount(2),
2520 2 : stripe_size: ShardStripeSize(1),
2521 2 : },
2522 2 : );
2523 2 :
2524 2 : // Only keys belonging to this shard are considered as gaps.
2525 2 : let mut previous_nblocks = 0;
2526 2 : let gaps =
2527 2 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2528 2 : assert!(!gaps.ranges.is_empty());
2529 6 : for gap_range in gaps.ranges {
2530 4 : let mut k = gap_range.start;
2531 8 : while k != gap_range.end {
2532 4 : assert_eq!(shard.get_shard_number(&k), shard.number);
2533 4 : k = k.next();
2534 : }
2535 : }
2536 :
2537 2 : previous_nblocks = first_blkno;
2538 2 :
2539 2 : let update_blkno = 2;
2540 2 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2541 2 : assert!(gaps.is_none());
2542 2 : }
2543 :
2544 : /*
2545 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2546 : let incremental = timeline.get_current_logical_size();
2547 : let non_incremental = timeline
2548 : .get_current_logical_size_non_incremental(lsn)
2549 : .unwrap();
2550 : assert_eq!(incremental, non_incremental);
2551 : }
2552 : */
2553 :
2554 : /*
2555 : ///
2556 : /// Test list_rels() function, with branches and dropped relations
2557 : ///
2558 : #[test]
2559 : fn test_list_rels_drop() -> Result<()> {
2560 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2561 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2562 : const TESTDB: u32 = 111;
2563 :
2564 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2565 : // after branching fails below
2566 : let mut writer = tline.begin_record(Lsn(0x10));
2567 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2568 : writer.finish()?;
2569 :
2570 : // Create a relation on the timeline
2571 : let mut writer = tline.begin_record(Lsn(0x20));
2572 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2573 : writer.finish()?;
2574 :
2575 : let writer = tline.begin_record(Lsn(0x00));
2576 : writer.finish()?;
2577 :
2578 : // Check that list_rels() lists it after LSN 2, but no before it
2579 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2580 :         // Check that list_rels() lists it at and after Lsn(0x20), but not before it
2581 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2582 :
2583 : // Create a branch, check that the relation is visible there
2584 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2585 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2586 : Some(timeline) => timeline,
2587 : None => panic!("Should have a local timeline"),
2588 : };
2589 : let newtline = DatadirTimelineImpl::new(newtline);
2590 : assert!(newtline
2591 : .list_rels(0, TESTDB, Lsn(0x30))?
2592 : .contains(&TESTREL_A));
2593 :
2594 : // Drop it on the branch
2595 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2596 : new_writer.drop_relation(TESTREL_A)?;
2597 : new_writer.finish()?;
2598 :
2599 : // Check that it's no longer listed on the branch after the point where it was dropped
2600 : assert!(newtline
2601 : .list_rels(0, TESTDB, Lsn(0x30))?
2602 : .contains(&TESTREL_A));
2603 : assert!(!newtline
2604 : .list_rels(0, TESTDB, Lsn(0x40))?
2605 : .contains(&TESTREL_A));
2606 :
2607 : // Run checkpoint and garbage collection and check that it's still not visible
2608 : newtline.checkpoint(CheckpointConfig::Forced)?;
2609 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2610 :
2611 : assert!(!newtline
2612 : .list_rels(0, TESTDB, Lsn(0x40))?
2613 : .contains(&TESTREL_A));
2614 :
2615 : Ok(())
2616 : }
2617 : */
2618 :
2619 : /*
2620 : #[test]
2621 : fn test_read_beyond_eof() -> Result<()> {
2622 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
2623 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
2624 :
2625 : make_some_layers(&tline, Lsn(0x20))?;
2626 : let mut writer = tline.begin_record(Lsn(0x60));
2627 : walingest.put_rel_page_image(
2628 : &mut writer,
2629 : TESTREL_A,
2630 : 0,
2631 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
2632 : )?;
2633 : writer.finish()?;
2634 :
2635 : // Test read before rel creation. Should error out.
2636 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
2637 :
2638 : // Read block beyond end of relation at different points in time.
2639 : // These reads should fall into different delta, image, and in-memory layers.
2640 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
2641 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
2642 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
2643 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
2644 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
2645 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
2646 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
2647 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
2648 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
2649 :
2650 : // Test on an in-memory layer with no preceding layer
2651 : let mut writer = tline.begin_record(Lsn(0x70));
2652 : walingest.put_rel_page_image(
2653 : &mut writer,
2654 : TESTREL_B,
2655 : 0,
2656 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
2657 : )?;
2658 : writer.finish()?;
2659 :
2660 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
2661 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
2662 : Ok(())
2663 : }
2664 : */
2665 : }
|