Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
13 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes, BytesMut};
16 : use enum_map::Enum;
17 : use pageserver_api::key::{
18 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
19 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
20 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
21 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : };
24 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
25 : use pageserver_api::models::RelSizeMigration;
26 : use pageserver_api::record::NeonWalRecord;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use pageserver_api::shard::ShardIdentity;
29 : use pageserver_api::value::Value;
30 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
31 : use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
32 : use serde::{Deserialize, Serialize};
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, info, info_span, trace, warn};
36 : use utils::bin_ser::{BeSer, DeserializeError};
37 : use utils::lsn::Lsn;
38 : use utils::pausable_failpoint;
39 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
40 :
41 : use super::tenant::{PageReconstructError, Timeline};
42 : use crate::aux_file;
43 : use crate::context::{PerfInstrumentFutureExt, RequestContext};
44 : use crate::keyspace::{KeySpace, KeySpaceAccum};
45 : use crate::metrics::{
46 : RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
47 : };
48 : use crate::span::{
49 : debug_assert_current_span_has_tenant_and_timeline_id,
50 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
51 : };
52 : use crate::tenant::storage_layer::IoConcurrency;
53 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
54 :
55 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
56 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
57 :
58 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
59 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
60 :
61 : #[derive(Debug)]
62 : pub enum LsnForTimestamp {
63 : /// Found commits both before and after the given timestamp
64 : Present(Lsn),
65 :
66 : /// Found no commits after the given timestamp; this means
67 : /// that the newest data in the branch is older than the given
68 : /// timestamp.
69 : ///
70 : /// All commits <= LSN happened before the given timestamp
71 : Future(Lsn),
72 :
73 : /// The queried timestamp is beyond the horizon we look back at (the PITR window)
74 : ///
75 : /// All commits > LSN happened after the given timestamp,
76 : /// but any commits < LSN might have happened before or after
77 : /// the given timestamp. We don't know because no data before
78 : /// the given lsn is available.
79 : Past(Lsn),
80 :
81 : /// We have found no commit with a timestamp,
82 : /// so we can't return anything meaningful.
83 : ///
84 : /// The associated LSN is the lower bound value we can safely
85 : /// create branches on, but no statement is made about whether it is
86 : /// older or newer than the timestamp.
87 : ///
88 : /// This variant can e.g. be returned right after a
89 : /// cluster import.
90 : NoData(Lsn),
91 : }
92 :
93 : #[derive(Debug, thiserror::Error)]
94 : pub(crate) enum CalculateLogicalSizeError {
95 : #[error("cancelled")]
96 : Cancelled,
97 :
98 : /// Something went wrong while reading the metadata we use to calculate logical size
99 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
100 : /// in the `From` implementation for this type.
101 : #[error(transparent)]
102 : PageRead(PageReconstructError),
103 :
104 : /// Something went wrong deserializing metadata that we read to calculate logical size
105 : #[error("decode error: {0}")]
106 : Decode(#[from] DeserializeError),
107 : }
108 :
109 : #[derive(Debug, thiserror::Error)]
110 : pub(crate) enum CollectKeySpaceError {
111 : #[error(transparent)]
112 : Decode(#[from] DeserializeError),
113 : #[error(transparent)]
114 : PageRead(PageReconstructError),
115 : #[error("cancelled")]
116 : Cancelled,
117 : }
118 :
119 : impl From<PageReconstructError> for CollectKeySpaceError {
120 0 : fn from(err: PageReconstructError) -> Self {
121 0 : match err {
122 0 : PageReconstructError::Cancelled => Self::Cancelled,
123 0 : err => Self::PageRead(err),
124 : }
125 0 : }
126 : }
127 :
128 : impl From<PageReconstructError> for CalculateLogicalSizeError {
129 0 : fn from(pre: PageReconstructError) -> Self {
130 0 : match pre {
131 0 : PageReconstructError::Cancelled => Self::Cancelled,
132 0 : _ => Self::PageRead(pre),
133 : }
134 0 : }
135 : }
136 :
137 : #[derive(Debug, thiserror::Error)]
138 : pub enum RelationError {
139 : #[error("invalid relnode")]
140 : InvalidRelnode,
141 : }
142 :
143 : ///
144 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
145 : /// and other special kinds of files, in a versioned key-value store. The
146 : /// Timeline struct provides the key-value store.
147 : ///
148 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
149 : /// implementation; it might be moved into a separate struct later.
150 : impl Timeline {
151 : /// Start ingesting a WAL record, or other atomic modification of
152 : /// the timeline.
153 : ///
154 : /// This provides a transaction-like interface to perform a bunch
155 : /// of modifications atomically.
156 : ///
157 : /// To ingest a WAL record, call begin_modification(lsn) to get a
158 : /// DatadirModification object. Use the functions in the object to
159 : /// modify the repository state, updating all the pages and metadata
160 : /// that the WAL record affects. When you're done, call commit() to
161 : /// commit the changes.
162 : ///
163 : /// The LSN stored in the modification is advanced by `ingest_record` and
164 : /// is used by `commit()` to update `last_record_lsn`.
165 : ///
166 : /// Calling commit() will flush all the changes and reset the state,
167 : /// so the `DatadirModification` struct can be reused to perform the next modification.
168 : ///
169 : /// Note that any pending modifications you make through the
170 : /// modification object won't be visible to calls to the 'get' and list
171 : /// functions of the timeline until you finish! And if you update the
172 : /// same page twice, the last update wins.
173 : ///
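/// A minimal usage sketch (illustrative only: the `put_*` helper and the exact
/// `commit` signature are assumptions, not shown in this section):
///
/// ```ignore
/// let mut modification = timeline.begin_modification(lsn);
/// // Apply whatever the WAL record dictates (hypothetical put helper):
/// modification.put_rel_page_image(rel, blknum, page_image)?;
/// // Flush the pending changes atomically and advance last_record_lsn:
/// modification.commit(ctx).await?;
/// ```
///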
174 1610556 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
175 1610556 : where
176 1610556 : Self: Sized,
177 1610556 : {
178 1610556 : DatadirModification {
179 1610556 : tline: self,
180 1610556 : pending_lsns: Vec::new(),
181 1610556 : pending_metadata_pages: HashMap::new(),
182 1610556 : pending_data_batch: None,
183 1610556 : pending_deletions: Vec::new(),
184 1610556 : pending_nblocks: 0,
185 1610556 : pending_directory_entries: Vec::new(),
186 1610556 : pending_metadata_bytes: 0,
187 1610556 : lsn,
188 1610556 : }
189 1610556 : }
190 :
191 : //------------------------------------------------------------------------------
192 : // Public GET functions
193 : //------------------------------------------------------------------------------
194 :
195 : /// Look up given page version.
196 110304 : pub(crate) async fn get_rel_page_at_lsn(
197 110304 : &self,
198 110304 : tag: RelTag,
199 110304 : blknum: BlockNumber,
200 110304 : version: Version<'_>,
201 110304 : ctx: &RequestContext,
202 110304 : io_concurrency: IoConcurrency,
203 110304 : ) -> Result<Bytes, PageReconstructError> {
204 110304 : match version {
205 110304 : Version::Lsn(effective_lsn) => {
206 110304 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
207 110304 : let res = self
208 110304 : .get_rel_page_at_lsn_batched(
209 110304 : pages.iter().map(|(tag, blknum)| {
210 110304 : (tag, blknum, effective_lsn, ctx.attached_child())
211 110304 : }),
212 110304 : io_concurrency.clone(),
213 110304 : ctx,
214 110304 : )
215 110304 : .await;
216 110304 : assert_eq!(res.len(), 1);
217 110304 : res.into_iter().next().unwrap()
218 : }
219 0 : Version::Modified(modification) => {
220 0 : if tag.relnode == 0 {
221 0 : return Err(PageReconstructError::Other(
222 0 : RelationError::InvalidRelnode.into(),
223 0 : ));
224 0 : }
225 :
226 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
227 0 : if blknum >= nblocks {
228 0 : debug!(
229 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
230 0 : tag,
231 0 : blknum,
232 0 : version.get_lsn(),
233 : nblocks
234 : );
235 0 : return Ok(ZERO_PAGE.clone());
236 0 : }
237 0 :
238 0 : let key = rel_block_to_key(tag, blknum);
239 0 : modification.get(key, ctx).await
240 : }
241 : }
242 110304 : }
243 :
244 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
245 : ///
246 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
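///
/// A hedged sketch of a call site, mirroring [`Self::get_rel_page_at_lsn`]
/// above (`lsn`, `ctx` and `io_concurrency` are assumed to be in scope):
///
/// ```ignore
/// let pages: smallvec::SmallVec<[_; 2]> =
///     smallvec::smallvec![(rel_a, 0u32), (rel_b, 7u32)];
/// let results = timeline
///     .get_rel_page_at_lsn_batched(
///         pages.iter().map(|(tag, blknum)| (tag, blknum, lsn, ctx.attached_child())),
///         io_concurrency.clone(),
///         &ctx,
///     )
///     .await;
/// // results[0] is the page for (rel_a, 0), results[1] the page for (rel_b, 7).
/// ```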
247 110304 : pub(crate) async fn get_rel_page_at_lsn_batched(
248 110304 : &self,
249 110304 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
250 110304 : io_concurrency: IoConcurrency,
251 110304 : ctx: &RequestContext,
252 110304 : ) -> Vec<Result<Bytes, PageReconstructError>> {
253 110304 : debug_assert_current_span_has_tenant_and_timeline_id();
254 110304 :
255 110304 : let mut slots_filled = 0;
256 110304 : let page_count = pages.len();
257 110304 :
258 110304 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
259 110304 : let mut result = Vec::with_capacity(pages.len());
260 110304 : let result_slots = result.spare_capacity_mut();
261 110304 :
262 110304 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
263 110304 : HashMap::with_capacity(pages.len());
264 110304 :
265 110304 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
266 110304 : HashMap::with_capacity(pages.len());
267 :
268 110304 : for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
269 110304 : if tag.relnode == 0 {
270 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
271 0 : RelationError::InvalidRelnode.into(),
272 0 : )));
273 0 :
274 0 : slots_filled += 1;
275 0 : continue;
276 110304 : }
277 :
278 110304 : let nblocks = match self
279 110304 : .get_rel_size(*tag, Version::Lsn(lsn), &ctx)
280 110304 : .maybe_perf_instrument(&ctx, |crnt_perf_span| {
281 0 : info_span!(
282 : target: PERF_TRACE_TARGET,
283 0 : parent: crnt_perf_span,
284 : "GET_REL_SIZE",
285 : reltag=%tag,
286 : lsn=%lsn,
287 : )
288 110304 : })
289 110304 : .await
290 : {
291 110304 : Ok(nblocks) => nblocks,
292 0 : Err(err) => {
293 0 : result_slots[response_slot_idx].write(Err(err));
294 0 : slots_filled += 1;
295 0 : continue;
296 : }
297 : };
298 :
299 110304 : if *blknum >= nblocks {
300 0 : debug!(
301 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
302 : tag, blknum, lsn, nblocks
303 : );
304 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
305 0 : slots_filled += 1;
306 0 : continue;
307 110304 : }
308 110304 :
309 110304 : let key = rel_block_to_key(*tag, *blknum);
310 110304 :
311 110304 : let key_slots = keys_slots.entry(key).or_default();
312 110304 : key_slots.push((response_slot_idx, ctx));
313 110304 :
314 110304 : let acc = req_keyspaces.entry(lsn).or_default();
315 110304 : acc.add_key(key);
316 : }
317 :
318 110304 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
319 110304 : .into_iter()
320 110304 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
321 110304 : .collect();
322 110304 :
323 110304 : let query = VersionedKeySpaceQuery::scattered(query);
324 110304 : let res = self
325 110304 : .get_vectored(query, io_concurrency, ctx)
326 110304 : .maybe_perf_instrument(ctx, |current_perf_span| {
327 0 : info_span!(
328 : target: PERF_TRACE_TARGET,
329 0 : parent: current_perf_span,
330 : "GET_BATCH",
331 : batch_size = %page_count,
332 : )
333 110304 : })
334 110304 : .await;
335 :
336 110304 : match res {
337 110304 : Ok(results) => {
338 220608 : for (key, res) in results {
339 110304 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
340 110304 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
341 :
342 110304 : for (slot, req_ctx) in key_slots {
343 0 : let clone = match &res {
344 0 : Ok(buf) => Ok(buf.clone()),
345 0 : Err(err) => Err(match err {
346 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
347 :
348 0 : x @ PageReconstructError::Other(_)
349 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
350 0 : | x @ PageReconstructError::WalRedo(_)
351 0 : | x @ PageReconstructError::MissingKey(_) => {
352 0 : PageReconstructError::Other(anyhow::anyhow!(
353 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
354 0 : ))
355 : }
356 : }),
357 : };
358 :
359 0 : result_slots[slot].write(clone);
360 0 : // There is no standardized way to express that the batched span followed from N request spans.
361 0 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
362 0 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
363 0 : req_ctx.perf_follows_from(ctx);
364 0 : slots_filled += 1;
365 : }
366 :
367 110304 : result_slots[first_slot].write(res);
368 110304 : first_req_ctx.perf_follows_from(ctx);
369 110304 : slots_filled += 1;
370 : }
371 : }
372 0 : Err(err) => {
373 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
374 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
375 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
376 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
377 : // but without taking ownership of the GetVectoredError
378 0 : let err = match &err {
379 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
380 : // TODO: restructure get_vectored API to make this error per-key
381 0 : GetVectoredError::MissingKey(err) => {
382 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
383 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
384 0 : )))
385 : }
386 : // TODO: restructure get_vectored API to make this error per-key
387 0 : GetVectoredError::GetReadyAncestorError(err) => {
388 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
389 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
390 0 : )))
391 : }
392 : // TODO: restructure get_vectored API to make this error per-key
393 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
394 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
395 0 : )),
396 : // TODO: we can prevent this error class by moving this check into the type system
397 0 : GetVectoredError::InvalidLsn(e) => {
398 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
399 : }
400 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
401 : // TODO: we can prevent this error class by moving this check into the type system
402 0 : GetVectoredError::Oversized(err) => {
403 0 : Err(anyhow::anyhow!("batching oversized: {err:?}").into())
404 : }
405 : };
406 :
407 0 : req_ctx.perf_follows_from(ctx);
408 0 : result_slots[*slot].write(err);
409 : }
410 :
411 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
412 0 : }
413 : };
414 :
415 110304 : assert_eq!(slots_filled, page_count);
416 : // SAFETY:
417 : // 1. `result` and any of its uninit members are not read from until this point
418 : // 2. The length below is tracked at run-time and matches the number of requested pages.
419 110304 : unsafe {
420 110304 : result.set_len(page_count);
421 110304 : }
422 110304 :
423 110304 : result
424 110304 : }
425 :
426 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
427 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
428 : /// for pages up to the highest page number it has stored.
429 0 : pub(crate) async fn get_db_size(
430 0 : &self,
431 0 : spcnode: Oid,
432 0 : dbnode: Oid,
433 0 : version: Version<'_>,
434 0 : ctx: &RequestContext,
435 0 : ) -> Result<usize, PageReconstructError> {
436 0 : let mut total_blocks = 0;
437 :
438 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
439 :
440 0 : for rel in rels {
441 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
442 0 : total_blocks += n_blocks as usize;
443 : }
444 0 : Ok(total_blocks)
445 0 : }
446 :
447 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
448 : ///
449 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
450 : /// page number stored in the shard.
451 146604 : pub(crate) async fn get_rel_size(
452 146604 : &self,
453 146604 : tag: RelTag,
454 146604 : version: Version<'_>,
455 146604 : ctx: &RequestContext,
456 146604 : ) -> Result<BlockNumber, PageReconstructError> {
457 146604 : if tag.relnode == 0 {
458 0 : return Err(PageReconstructError::Other(
459 0 : RelationError::InvalidRelnode.into(),
460 0 : ));
461 146604 : }
462 :
463 146604 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
464 115764 : return Ok(nblocks);
465 30840 : }
466 30840 :
467 30840 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
468 0 : && !self.get_rel_exists(tag, version, ctx).await?
469 : {
470 : // FIXME: Postgres sometimes calls smgrcreate() to create
471 : // FSM, and smgrnblocks() on it immediately afterwards,
472 : // without extending it. Tolerate that by claiming that
473 : // any non-existent FSM fork has size 0.
474 0 : return Ok(0);
475 30840 : }
476 30840 :
477 30840 : let key = rel_size_to_key(tag);
478 30840 : let mut buf = version.get(self, key, ctx).await?;
479 30816 : let nblocks = buf.get_u32_le();
480 30816 :
481 30816 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
482 30816 :
483 30816 : Ok(nblocks)
484 146604 : }
485 :
486 : /// Does the relation exist?
487 : ///
488 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
489 : /// the shard stores pages for.
490 36300 : pub(crate) async fn get_rel_exists(
491 36300 : &self,
492 36300 : tag: RelTag,
493 36300 : version: Version<'_>,
494 36300 : ctx: &RequestContext,
495 36300 : ) -> Result<bool, PageReconstructError> {
496 36300 : if tag.relnode == 0 {
497 0 : return Err(PageReconstructError::Other(
498 0 : RelationError::InvalidRelnode.into(),
499 0 : ));
500 36300 : }
501 :
502 : // first try to lookup relation in cache
503 36300 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
504 36192 : return Ok(true);
505 108 : }
506 : // then check if the database was already initialized.
507 : // get_rel_exists can be called before dbdir is created.
508 108 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
509 108 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
510 108 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
511 0 : return Ok(false);
512 108 : }
513 108 :
514 108 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
515 108 : // Otherwise, read the old reldir keyspace.
516 108 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
517 108 :
518 108 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
519 108 : self.get_rel_size_v2_status()
520 : {
521 : // fetch directory listing (new)
522 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
523 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
524 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
525 0 : let exists_v2 = buf == RelDirExists::Exists;
526 0 : // Fast path: if the relation exists in the new format, return true.
527 0 : // TODO: we should have a verification mode that checks both keyspaces
528 0 : // to ensure the relation only exists in one of them.
529 0 : if exists_v2 {
530 0 : return Ok(true);
531 0 : }
532 108 : }
533 :
534 : // fetch directory listing (old)
535 :
536 108 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
537 108 : let buf = version.get(self, key, ctx).await?;
538 :
539 108 : let dir = RelDirectory::des(&buf)?;
540 108 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
541 108 : Ok(exists_v1)
542 36300 : }
543 :
544 : /// Get a list of all existing relations in given tablespace and database.
545 : ///
546 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
547 : /// the shard stores pages for.
548 : ///
549 : /// # Cancel-Safety
550 : ///
551 : /// This method is cancellation-safe.
552 0 : pub(crate) async fn list_rels(
553 0 : &self,
554 0 : spcnode: Oid,
555 0 : dbnode: Oid,
556 0 : version: Version<'_>,
557 0 : ctx: &RequestContext,
558 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
559 0 : // fetch directory listing (old)
560 0 : let key = rel_dir_to_key(spcnode, dbnode);
561 0 : let buf = version.get(self, key, ctx).await?;
562 :
563 0 : let dir = RelDirectory::des(&buf)?;
564 0 : let rels_v1: HashSet<RelTag> =
565 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
566 0 : spcnode,
567 0 : dbnode,
568 0 : relnode: *relnode,
569 0 : forknum: *forknum,
570 0 : }));
571 0 :
572 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
573 0 : return Ok(rels_v1);
574 0 : }
575 0 :
576 0 : // scan directory listing (new), merge with the old results
577 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
578 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
579 0 : self.conf,
580 0 : self.gate
581 0 : .enter()
582 0 : .map_err(|_| PageReconstructError::Cancelled)?,
583 : );
584 0 : let results = self
585 0 : .scan(
586 0 : KeySpace::single(key_range),
587 0 : version.get_lsn(),
588 0 : ctx,
589 0 : io_concurrency,
590 0 : )
591 0 : .await?;
592 0 : let mut rels = rels_v1;
593 0 : for (key, val) in results {
594 0 : let val = RelDirExists::decode(&val?)
595 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
596 0 : assert_eq!(key.field6, 1);
597 0 : assert_eq!(key.field2, spcnode);
598 0 : assert_eq!(key.field3, dbnode);
599 0 : let tag = RelTag {
600 0 : spcnode,
601 0 : dbnode,
602 0 : relnode: key.field4,
603 0 : forknum: key.field5,
604 0 : };
605 0 : if val == RelDirExists::Removed {
606 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
607 0 : continue;
608 0 : }
609 0 : let did_not_contain = rels.insert(tag);
610 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
611 : }
612 0 : Ok(rels)
613 0 : }
614 :
615 : /// Get the whole SLRU segment
616 0 : pub(crate) async fn get_slru_segment(
617 0 : &self,
618 0 : kind: SlruKind,
619 0 : segno: u32,
620 0 : lsn: Lsn,
621 0 : ctx: &RequestContext,
622 0 : ) -> Result<Bytes, PageReconstructError> {
623 0 : assert!(self.tenant_shard_id.is_shard_zero());
624 0 : let n_blocks = self
625 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
626 0 : .await?;
627 :
628 0 : let keyspace = KeySpace::single(
629 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
630 0 : );
631 0 :
632 0 : let batches = keyspace.partition(
633 0 : self.get_shard_identity(),
634 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
635 0 : );
636 :
637 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
638 0 : self.conf,
639 0 : self.gate
640 0 : .enter()
641 0 : .map_err(|_| PageReconstructError::Cancelled)?,
642 : );
643 :
644 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
645 0 : for batch in batches.parts {
646 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
647 0 : let blocks = self
648 0 : .get_vectored(query, io_concurrency.clone(), ctx)
649 0 : .await?;
650 :
651 0 : for (_key, block) in blocks {
652 0 : let block = block?;
653 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
654 : }
655 : }
656 :
657 0 : Ok(segment.freeze())
658 0 : }
659 :
660 : /// Get size of an SLRU segment
661 0 : pub(crate) async fn get_slru_segment_size(
662 0 : &self,
663 0 : kind: SlruKind,
664 0 : segno: u32,
665 0 : version: Version<'_>,
666 0 : ctx: &RequestContext,
667 0 : ) -> Result<BlockNumber, PageReconstructError> {
668 0 : assert!(self.tenant_shard_id.is_shard_zero());
669 0 : let key = slru_segment_size_to_key(kind, segno);
670 0 : let mut buf = version.get(self, key, ctx).await?;
671 0 : Ok(buf.get_u32_le())
672 0 : }
673 :
674 : /// Does the slru segment exist?
675 0 : pub(crate) async fn get_slru_segment_exists(
676 0 : &self,
677 0 : kind: SlruKind,
678 0 : segno: u32,
679 0 : version: Version<'_>,
680 0 : ctx: &RequestContext,
681 0 : ) -> Result<bool, PageReconstructError> {
682 0 : assert!(self.tenant_shard_id.is_shard_zero());
683 : // fetch directory listing
684 0 : let key = slru_dir_to_key(kind);
685 0 : let buf = version.get(self, key, ctx).await?;
686 :
687 0 : let dir = SlruSegmentDirectory::des(&buf)?;
688 0 : Ok(dir.segments.contains(&segno))
689 0 : }
690 :
691 : /// Locate LSN, such that all transactions that committed before
692 : /// 'search_timestamp' are visible, but nothing newer is.
693 : ///
694 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
695 : /// so it's not well defined which LSN you get if there were multiple commits
696 : /// "in flight" at that point in time.
697 : ///
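///
/// A hedged sketch of how a caller typically interprets the result
/// (`timeline`, `search_timestamp`, `cancel` and `ctx` assumed in scope):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(search_timestamp, &cancel, &ctx).await? {
///     LsnForTimestamp::Present(lsn) => { /* commits on both sides; `lsn` is a good branch point */ }
///     LsnForTimestamp::Future(lsn) => { /* everything in the branch is older than the timestamp */ }
///     LsnForTimestamp::Past(lsn) => { /* the timestamp precedes the horizon we can look back at */ }
///     LsnForTimestamp::NoData(lsn) => { /* no commit timestamps found; `lsn` is a safe lower bound */ }
/// }
/// ```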
698 0 : pub(crate) async fn find_lsn_for_timestamp(
699 0 : &self,
700 0 : search_timestamp: TimestampTz,
701 0 : cancel: &CancellationToken,
702 0 : ctx: &RequestContext,
703 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
704 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
705 :
706 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
707 0 : let gc_cutoff_planned = {
708 0 : let gc_info = self.gc_info.read().unwrap();
709 0 : gc_info.min_cutoff()
710 0 : };
711 0 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
712 0 : // but let's be defensive.
713 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
714 0 : // We use this method to figure out the branching LSN for the new branch, but the
715 0 : // GC cutoff could be before the branching point and we cannot create a new branch
716 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
717 0 : // on the safe side.
718 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
719 0 : let max_lsn = self.get_last_record_lsn();
720 0 :
721 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
722 0 : // LSN divided by 8.
723 0 : let mut low = min_lsn.0 / 8;
724 0 : let mut high = max_lsn.0 / 8 + 1;
725 0 :
726 0 : let mut found_smaller = false;
727 0 : let mut found_larger = false;
728 :
729 0 : while low < high {
730 0 : if cancel.is_cancelled() {
731 0 : return Err(PageReconstructError::Cancelled);
732 0 : }
733 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
734 0 : let mid = (high + low) / 2;
735 :
736 0 : let cmp = match self
737 0 : .is_latest_commit_timestamp_ge_than(
738 0 : search_timestamp,
739 0 : Lsn(mid * 8),
740 0 : &mut found_smaller,
741 0 : &mut found_larger,
742 0 : ctx,
743 0 : )
744 0 : .await
745 : {
746 0 : Ok(res) => res,
747 0 : Err(PageReconstructError::MissingKey(e)) => {
748 0 : warn!(
749 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
750 : e
751 : );
752 : // Return that we didn't find any requests smaller than the LSN, and logging the error.
753 0 : return Ok(LsnForTimestamp::Past(min_lsn));
754 : }
755 0 : Err(e) => return Err(e),
756 : };
757 :
758 0 : if cmp {
759 0 : high = mid;
760 0 : } else {
761 0 : low = mid + 1;
762 0 : }
763 : }
764 :
765 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
766 : // so the LSN of the last commit record before or at `search_timestamp`.
767 : // Remove one from `low` to get `t`.
768 : //
769 : // FIXME: it would be better to get the LSN of the previous commit.
770 : // Otherwise, if you restore to the returned LSN, the database will
771 : // include physical changes from later commits that will be marked
772 : // as aborted, and will need to be vacuumed away.
773 0 : let commit_lsn = Lsn((low - 1) * 8);
774 0 : match (found_smaller, found_larger) {
775 : (false, false) => {
776 : // This can happen if no commit records have been processed yet, e.g.
777 : // just after importing a cluster.
778 0 : Ok(LsnForTimestamp::NoData(min_lsn))
779 : }
780 : (false, true) => {
781 : // Didn't find any commit timestamps smaller than the request
782 0 : Ok(LsnForTimestamp::Past(min_lsn))
783 : }
784 0 : (true, _) if commit_lsn < min_lsn => {
785 0 : // the search above did set found_smaller to true but it never increased the lsn.
786 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
787 0 : // below the min_lsn. We should never do that.
788 0 : Ok(LsnForTimestamp::Past(min_lsn))
789 : }
790 : (true, false) => {
791 : // Only found commits with timestamps smaller than the request.
792 : // It's still a valid case for branch creation, return it.
793 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
794 : // case, anyway.
795 0 : Ok(LsnForTimestamp::Future(commit_lsn))
796 : }
797 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
798 : }
799 0 : }
800 :
801 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
802 : /// commits with a timestamp at or after 'search_timestamp', at LSN 'probe_lsn'.
803 : ///
804 : /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
805 : /// with a smaller/larger timestamp.
806 : ///
807 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
808 0 : &self,
809 0 : search_timestamp: TimestampTz,
810 0 : probe_lsn: Lsn,
811 0 : found_smaller: &mut bool,
812 0 : found_larger: &mut bool,
813 0 : ctx: &RequestContext,
814 0 : ) -> Result<bool, PageReconstructError> {
815 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
816 0 : if timestamp >= search_timestamp {
817 0 : *found_larger = true;
818 0 : return ControlFlow::Break(true);
819 0 : } else {
820 0 : *found_smaller = true;
821 0 : }
822 0 : ControlFlow::Continue(())
823 0 : })
824 0 : .await
825 0 : }
826 :
827 : /// Obtain the timestamp for the given lsn.
828 : ///
829 : /// If the lsn has no timestamps (e.g. no commits), returns None.
830 0 : pub(crate) async fn get_timestamp_for_lsn(
831 0 : &self,
832 0 : probe_lsn: Lsn,
833 0 : ctx: &RequestContext,
834 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
835 0 : let mut max: Option<TimestampTz> = None;
836 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
837 0 : if let Some(max_prev) = max {
838 0 : max = Some(max_prev.max(timestamp));
839 0 : } else {
840 0 : max = Some(timestamp);
841 0 : }
842 0 : ControlFlow::Continue(())
843 0 : })
844 0 : .await?;
845 :
846 0 : Ok(max)
847 0 : }
848 :
849 : /// Runs the given function on all the timestamps for a given lsn
850 : /// Runs the given function on all the timestamps for a given LSN.
851 : /// The return value is either given by the closure, or set to the `Default`
852 : /// impl's output.
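///
/// A hedged sketch of the closure contract, mirroring
/// `is_latest_commit_timestamp_ge_than` above (`cutoff` is a stand-in value):
///
/// ```ignore
/// let found: bool = self
///     .map_all_timestamps(probe_lsn, ctx, |timestamp| {
///         if timestamp >= cutoff {
///             ControlFlow::Break(true) // stop scanning and return true
///         } else {
///             ControlFlow::Continue(()) // keep scanning; falls back to bool::default() (false)
///         }
///     })
///     .await?;
/// ```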
853 0 : async fn map_all_timestamps<T: Default>(
854 0 : &self,
855 0 : probe_lsn: Lsn,
856 0 : ctx: &RequestContext,
857 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
858 0 : ) -> Result<T, PageReconstructError> {
859 0 : for segno in self
860 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
861 0 : .await?
862 : {
863 0 : let nblocks = self
864 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
865 0 : .await?;
866 :
867 0 : let keyspace = KeySpace::single(
868 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
869 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
870 0 : );
871 0 :
872 0 : let batches = keyspace.partition(
873 0 : self.get_shard_identity(),
874 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
875 0 : );
876 :
877 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
878 0 : self.conf,
879 0 : self.gate
880 0 : .enter()
881 0 : .map_err(|_| PageReconstructError::Cancelled)?,
882 : );
883 :
884 0 : for batch in batches.parts.into_iter().rev() {
885 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
886 0 : let blocks = self
887 0 : .get_vectored(query, io_concurrency.clone(), ctx)
888 0 : .await?;
889 :
890 0 : for (_key, clog_page) in blocks.into_iter().rev() {
891 0 : let clog_page = clog_page?;
892 :
893 0 : if clog_page.len() == BLCKSZ as usize + 8 {
894 0 : let mut timestamp_bytes = [0u8; 8];
895 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
896 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
897 0 :
898 0 : match f(timestamp) {
899 0 : ControlFlow::Break(b) => return Ok(b),
900 0 : ControlFlow::Continue(()) => (),
901 : }
902 0 : }
903 : }
904 : }
905 : }
906 0 : Ok(Default::default())
907 0 : }
908 :
909 0 : pub(crate) async fn get_slru_keyspace(
910 0 : &self,
911 0 : version: Version<'_>,
912 0 : ctx: &RequestContext,
913 0 : ) -> Result<KeySpace, PageReconstructError> {
914 0 : let mut accum = KeySpaceAccum::new();
915 :
916 0 : for kind in SlruKind::iter() {
917 0 : let mut segments: Vec<u32> = self
918 0 : .list_slru_segments(kind, version, ctx)
919 0 : .await?
920 0 : .into_iter()
921 0 : .collect();
922 0 : segments.sort_unstable();
923 :
924 0 : for seg in segments {
925 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
926 :
927 0 : accum.add_range(
928 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
929 0 : );
930 : }
931 : }
932 :
933 0 : Ok(accum.to_keyspace())
934 0 : }
935 :
936 : /// Get a list of SLRU segments
937 0 : pub(crate) async fn list_slru_segments(
938 0 : &self,
939 0 : kind: SlruKind,
940 0 : version: Version<'_>,
941 0 : ctx: &RequestContext,
942 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
943 0 : // fetch directory entry
944 0 : let key = slru_dir_to_key(kind);
945 :
946 0 : let buf = version.get(self, key, ctx).await?;
947 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
948 0 : }
949 :
950 0 : pub(crate) async fn get_relmap_file(
951 0 : &self,
952 0 : spcnode: Oid,
953 0 : dbnode: Oid,
954 0 : version: Version<'_>,
955 0 : ctx: &RequestContext,
956 0 : ) -> Result<Bytes, PageReconstructError> {
957 0 : let key = relmap_file_key(spcnode, dbnode);
958 :
959 0 : let buf = version.get(self, key, ctx).await?;
960 0 : Ok(buf)
961 0 : }
962 :
963 2004 : pub(crate) async fn list_dbdirs(
964 2004 : &self,
965 2004 : lsn: Lsn,
966 2004 : ctx: &RequestContext,
967 2004 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
968 : // fetch directory entry
969 2004 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
970 :
971 2004 : Ok(DbDirectory::des(&buf)?.dbdirs)
972 2004 : }
973 :
974 0 : pub(crate) async fn get_twophase_file(
975 0 : &self,
976 0 : xid: u64,
977 0 : lsn: Lsn,
978 0 : ctx: &RequestContext,
979 0 : ) -> Result<Bytes, PageReconstructError> {
980 0 : let key = twophase_file_key(xid);
981 0 : let buf = self.get(key, lsn, ctx).await?;
982 0 : Ok(buf)
983 0 : }
984 :
985 2016 : pub(crate) async fn list_twophase_files(
986 2016 : &self,
987 2016 : lsn: Lsn,
988 2016 : ctx: &RequestContext,
989 2016 : ) -> Result<HashSet<u64>, PageReconstructError> {
990 : // fetch directory entry
991 2016 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
992 :
993 2016 : if self.pg_version >= 17 {
994 1860 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
995 : } else {
996 156 : Ok(TwoPhaseDirectory::des(&buf)?
997 : .xids
998 156 : .iter()
999 156 : .map(|x| u64::from(*x))
1000 156 : .collect())
1001 : }
1002 2016 : }
1003 :
1004 0 : pub(crate) async fn get_control_file(
1005 0 : &self,
1006 0 : lsn: Lsn,
1007 0 : ctx: &RequestContext,
1008 0 : ) -> Result<Bytes, PageReconstructError> {
1009 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1010 0 : }
1011 :
1012 72 : pub(crate) async fn get_checkpoint(
1013 72 : &self,
1014 72 : lsn: Lsn,
1015 72 : ctx: &RequestContext,
1016 72 : ) -> Result<Bytes, PageReconstructError> {
1017 72 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1018 72 : }
1019 :
1020 72 : async fn list_aux_files_v2(
1021 72 : &self,
1022 72 : lsn: Lsn,
1023 72 : ctx: &RequestContext,
1024 72 : io_concurrency: IoConcurrency,
1025 72 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1026 72 : let kv = self
1027 72 : .scan(
1028 72 : KeySpace::single(Key::metadata_aux_key_range()),
1029 72 : lsn,
1030 72 : ctx,
1031 72 : io_concurrency,
1032 72 : )
1033 72 : .await?;
1034 72 : let mut result = HashMap::new();
1035 72 : let mut sz = 0;
1036 180 : for (_, v) in kv {
1037 108 : let v = v?;
1038 108 : let v = aux_file::decode_file_value_bytes(&v)
1039 108 : .context("value decode")
1040 108 : .map_err(PageReconstructError::Other)?;
1041 204 : for (fname, content) in v {
1042 96 : sz += fname.len();
1043 96 : sz += content.len();
1044 96 : result.insert(fname, content);
1045 96 : }
1046 : }
1047 72 : self.aux_file_size_estimator.on_initial(sz);
1048 72 : Ok(result)
1049 72 : }
1050 :
1051 0 : pub(crate) async fn trigger_aux_file_size_computation(
1052 0 : &self,
1053 0 : lsn: Lsn,
1054 0 : ctx: &RequestContext,
1055 0 : io_concurrency: IoConcurrency,
1056 0 : ) -> Result<(), PageReconstructError> {
1057 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1058 0 : Ok(())
1059 0 : }
1060 :
1061 72 : pub(crate) async fn list_aux_files(
1062 72 : &self,
1063 72 : lsn: Lsn,
1064 72 : ctx: &RequestContext,
1065 72 : io_concurrency: IoConcurrency,
1066 72 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1067 72 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1068 72 : }
1069 :
1070 24 : pub(crate) async fn get_replorigins(
1071 24 : &self,
1072 24 : lsn: Lsn,
1073 24 : ctx: &RequestContext,
1074 24 : io_concurrency: IoConcurrency,
1075 24 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1076 24 : let kv = self
1077 24 : .scan(
1078 24 : KeySpace::single(repl_origin_key_range()),
1079 24 : lsn,
1080 24 : ctx,
1081 24 : io_concurrency,
1082 24 : )
1083 24 : .await?;
1084 24 : let mut result = HashMap::new();
1085 72 : for (k, v) in kv {
1086 60 : let v = v?;
1087 60 : if v.is_empty() {
1088 : // This is a tombstone -- we can skip it.
1089 : // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it is part of
1090 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1091 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1092 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1093 24 : continue;
1094 36 : }
1095 36 : let origin_id = k.field6 as RepOriginId;
1096 36 : let origin_lsn = Lsn::des(&v)
1097 36 : .with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
1098 24 : if origin_lsn != Lsn::INVALID {
1099 24 : result.insert(origin_id, origin_lsn);
1100 24 : }
1101 : }
1102 12 : Ok(result)
1103 24 : }
1104 :
1105 : /// Does the same as get_current_logical_size but counted on demand.
1106 : /// Used to initialize the logical size tracking on startup.
1107 : ///
1108 : /// Only relation blocks are counted currently. That excludes metadata,
1109 : /// SLRUs, twophase files etc.
1110 : ///
1111 : /// # Cancel-Safety
1112 : ///
1113 : /// This method is cancellation-safe.
1114 84 : pub(crate) async fn get_current_logical_size_non_incremental(
1115 84 : &self,
1116 84 : lsn: Lsn,
1117 84 : ctx: &RequestContext,
1118 84 : ) -> Result<u64, CalculateLogicalSizeError> {
1119 84 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1120 84 :
1121 84 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1122 :
1123 : // Fetch list of database dirs and iterate them
1124 84 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1125 84 : let dbdir = DbDirectory::des(&buf)?;
1126 :
1127 84 : let mut total_size: u64 = 0;
1128 84 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1129 0 : for rel in self
1130 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
1131 0 : .await?
1132 : {
1133 0 : if self.cancel.is_cancelled() {
1134 0 : return Err(CalculateLogicalSizeError::Cancelled);
1135 0 : }
1136 0 : let relsize_key = rel_size_to_key(rel);
1137 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1138 0 : let relsize = buf.get_u32_le();
1139 0 :
1140 0 : total_size += relsize as u64;
1141 : }
1142 : }
1143 84 : Ok(total_size * BLCKSZ as u64)
1144 84 : }
1145 :
1146 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1147 : /// for gc-compaction.
1148 : ///
1149 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1150 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1151 : /// be kept only for a specific range of LSN.
1152 : ///
1153 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1154 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1155 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1156 : /// determine which keys to retain/drop for gc-compaction.
1157 : ///
1158 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1159 : /// to be retained for each of the branch LSN.
1160 : ///
1161 : /// The return value is (dense keyspace, sparse keyspace).
1162 324 : pub(crate) async fn collect_gc_compaction_keyspace(
1163 324 : &self,
1164 324 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1165 324 : let metadata_key_begin = Key::metadata_key_range().start;
1166 324 : let aux_v1_key = AUX_FILES_KEY;
1167 324 : let dense_keyspace = KeySpace {
1168 324 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1169 324 : };
1170 324 : Ok((
1171 324 : dense_keyspace,
1172 324 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1173 324 : ))
1174 324 : }
1175 :
1176 : ///
1177 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1178 : /// Anything that's not listed may be removed from the underlying storage (from
1179 : /// that LSN forwards).
1180 : ///
1181 : /// The return value is (dense keyspace, sparse keyspace).
1182 2004 : pub(crate) async fn collect_keyspace(
1183 2004 : &self,
1184 2004 : lsn: Lsn,
1185 2004 : ctx: &RequestContext,
1186 2004 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1187 2004 : // Iterate through key ranges, greedily packing them into partitions
1188 2004 : let mut result = KeySpaceAccum::new();
1189 2004 :
1190 2004 : // The dbdir metadata always exists
1191 2004 : result.add_key(DBDIR_KEY);
1192 :
1193 : // Fetch list of database dirs and iterate them
1194 2004 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1195 2004 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1196 2004 :
1197 2004 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1198 2004 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1199 0 : if has_relmap_file {
1200 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1201 0 : }
1202 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1203 :
1204 0 : let mut rels: Vec<RelTag> = self
1205 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
1206 0 : .await?
1207 0 : .into_iter()
1208 0 : .collect();
1209 0 : rels.sort_unstable();
1210 0 : for rel in rels {
1211 0 : let relsize_key = rel_size_to_key(rel);
1212 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1213 0 : let relsize = buf.get_u32_le();
1214 0 :
1215 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1216 0 : result.add_key(relsize_key);
1217 : }
1218 : }
1219 :
1220 : // Iterate SLRUs next
1221 2004 : if self.tenant_shard_id.is_shard_zero() {
1222 5904 : for kind in [
1223 1968 : SlruKind::Clog,
1224 1968 : SlruKind::MultiXactMembers,
1225 1968 : SlruKind::MultiXactOffsets,
1226 : ] {
1227 5904 : let slrudir_key = slru_dir_to_key(kind);
1228 5904 : result.add_key(slrudir_key);
1229 5904 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1230 5904 : let dir = SlruSegmentDirectory::des(&buf)?;
1231 5904 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1232 5904 : segments.sort_unstable();
1233 5904 : for segno in segments {
1234 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1235 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1236 0 : let segsize = buf.get_u32_le();
1237 0 :
1238 0 : result.add_range(
1239 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1240 0 : );
1241 0 : result.add_key(segsize_key);
1242 : }
1243 : }
1244 36 : }
1245 :
1246 : // Then pg_twophase
1247 2004 : result.add_key(TWOPHASEDIR_KEY);
1248 :
1249 2004 : let mut xids: Vec<u64> = self
1250 2004 : .list_twophase_files(lsn, ctx)
1251 2004 : .await?
1252 2004 : .iter()
1253 2004 : .cloned()
1254 2004 : .collect();
1255 2004 : xids.sort_unstable();
1256 2004 : for xid in xids {
1257 0 : result.add_key(twophase_file_key(xid));
1258 0 : }
1259 :
1260 2004 : result.add_key(CONTROLFILE_KEY);
1261 2004 : result.add_key(CHECKPOINT_KEY);
1262 2004 :
1263 2004 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1264 2004 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1265 2004 : // and the keys will not be garbage-colllected.
1266 2004 : #[cfg(test)]
1267 2004 : {
1268 2004 : let guard = self.extra_test_dense_keyspace.load();
1269 2004 : for kr in &guard.ranges {
1270 0 : result.add_range(kr.clone());
1271 0 : }
1272 0 : }
1273 0 :
1274 2004 : let dense_keyspace = result.to_keyspace();
1275 2004 : let sparse_keyspace = SparseKeySpace(KeySpace {
1276 2004 : ranges: vec![
1277 2004 : Key::metadata_aux_key_range(),
1278 2004 : repl_origin_key_range(),
1279 2004 : Key::rel_dir_sparse_key_range(),
1280 2004 : ],
1281 2004 : });
1282 2004 :
1283 2004 : if cfg!(debug_assertions) {
1284 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1285 :
1286 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1287 : // category of sparse keys are split into their own image/delta files. If there
1288 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1289 : // and we want the developer to keep the keyspaces separated.
1290 :
1291 2004 : let ranges = &sparse_keyspace.0.ranges;
1292 :
1293 : // TODO: use a single overlaps_with across the codebase
1294 6012 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1295 6012 : !(a.end <= b.start || b.end <= a.start)
1296 6012 : }
1297 6012 : for i in 0..ranges.len() {
1298 6012 : for j in 0..i {
1299 6012 : if overlaps_with(&ranges[i], &ranges[j]) {
1300 0 : panic!(
1301 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1302 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1303 0 : );
1304 6012 : }
1305 : }
1306 : }
1307 4008 : for i in 1..ranges.len() {
1308 4008 : assert!(
1309 4008 : ranges[i - 1].end <= ranges[i].start,
1310 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1311 0 : ranges[i - 1].start,
1312 0 : ranges[i - 1].end,
1313 0 : ranges[i].start,
1314 0 : ranges[i].end
1315 : );
1316 : }
1317 0 : }
1318 :
1319 2004 : Ok((dense_keyspace, sparse_keyspace))
1320 2004 : }
1321 :
1322 : /// Get the cached size of a relation, if it was not updated after the specified LSN
1323 2691240 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1324 2691240 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1325 2691240 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1326 2691108 : if lsn >= *cached_lsn {
1327 2660232 : RELSIZE_CACHE_HITS.inc();
1328 2660232 : return Some(*nblocks);
1329 30876 : }
1330 30876 : RELSIZE_CACHE_MISSES_OLD.inc();
1331 132 : }
1332 31008 : RELSIZE_CACHE_MISSES.inc();
1333 31008 : None
1334 2691240 : }
1335 :
1336 : /// Update cached relation size if there is no more recent update
1337 30816 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1338 30816 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1339 30816 :
1340 30816 : if lsn < rel_size_cache.complete_as_of {
1341 : // Do not cache old values. It's safe to cache the size on read, as long as
1342 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1343 : // never evict values from the cache, so if the relation size changed after
1344 : // 'lsn', the new value is already in the cache.
1345 0 : return;
1346 30816 : }
1347 30816 :
1348 30816 : match rel_size_cache.map.entry(tag) {
1349 30816 : hash_map::Entry::Occupied(mut entry) => {
1350 30816 : let cached_lsn = entry.get_mut();
1351 30816 : if lsn >= cached_lsn.0 {
1352 0 : *cached_lsn = (lsn, nblocks);
1353 30816 : }
1354 : }
1355 0 : hash_map::Entry::Vacant(entry) => {
1356 0 : entry.insert((lsn, nblocks));
1357 0 : RELSIZE_CACHE_ENTRIES.inc();
1358 0 : }
1359 : }
1360 30816 : }
1361 :
1362 : /// Store cached relation size
1363 1696320 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1364 1696320 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1365 1696320 : if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
1366 11520 : RELSIZE_CACHE_ENTRIES.inc();
1367 1684800 : }
1368 1696320 : }
1369 :
1370 : /// Remove cached relation size
1371 12 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1372 12 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1373 12 : if rel_size_cache.map.remove(tag).is_some() {
1374 12 : RELSIZE_CACHE_ENTRIES.dec();
1375 12 : }
1376 12 : }
1377 : }
1378 :
1379 : /// DatadirModification represents an operation to ingest an atomic set of
1380 : /// updates to the repository.
1381 : ///
1382 : /// It is created by the 'begin_modification' function. It is called for each WAL
1383 : /// record, so that all the modifications by one WAL record appear atomic.
1384 : pub struct DatadirModification<'a> {
1385 : /// The timeline this modification applies to. You can access this to
1386 : /// read the state, but note that any pending updates are *not* reflected
1387 : /// in the state in 'tline' yet.
1388 : pub tline: &'a Timeline,
1389 :
1390 : /// Current LSN of the modification
1391 : lsn: Lsn,
1392 :
1393 : // The modifications are not applied directly to the underlying key-value store.
1394 : // The put-functions add the modifications here, and they are flushed to the
1395 : // underlying key-value store by the 'finish' function.
1396 : pending_lsns: Vec<Lsn>,
1397 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1398 : pending_nblocks: i64,
1399 :
1400 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1401 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1402 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1403 :
1404 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1405 : /// which keys are stored here.
1406 : pending_data_batch: Option<SerializedValueBatch>,
1407 :
1408 : /// For special "directory" keys that store key-value maps, track the size of the map
1409 : /// if it was updated in this modification.
1410 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1411 :
1412 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1413 : pending_metadata_bytes: usize,
1414 : }
1415 :
1416 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1417 : pub enum MetricsUpdate {
1418 : /// Set the metrics to this value
1419 : Set(u64),
1420 : /// Increment the metrics by this value
1421 : Add(u64),
1422 : /// Decrement the metrics by this value
1423 : Sub(u64),
1424 : }
1425 :
1426 : impl DatadirModification<'_> {
1427 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1428 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1429 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1430 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1431 :
1432 : /// Get the current lsn
1433 2508348 : pub(crate) fn get_lsn(&self) -> Lsn {
1434 2508348 : self.lsn
1435 2508348 : }
1436 :
1437 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1438 0 : self.pending_data_batch
1439 0 : .as_ref()
1440 0 : .map_or(0, |b| b.buffer_size())
1441 0 : + self.pending_metadata_bytes
1442 0 : }
1443 :
1444 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1445 0 : self.pending_data_batch
1446 0 : .as_ref()
1447 0 : .is_some_and(|b| b.has_data())
1448 0 : }
1449 :
1450 : /// Returns statistics about the currently pending modifications.
1451 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1452 0 : let mut stats = DatadirModificationStats::default();
1453 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1454 0 : match value {
1455 0 : Value::Image(_) => stats.metadata_images += 1,
1456 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1457 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1458 : }
1459 : }
1460 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1461 0 : match valuemeta {
1462 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1463 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1464 0 : ValueMeta::Observed(_) => {}
1465 : }
1466 : }
1467 0 : stats
1468 0 : }
1469 :
1470 : /// Set the current lsn
1471 875148 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1472 875148 : ensure_walingest!(
1473 875148 : lsn >= self.lsn,
1474 875148 : "setting an older lsn {} than {} is not allowed",
1475 875148 : lsn,
1476 875148 : self.lsn
1477 875148 : );
1478 :
1479 875148 : if lsn > self.lsn {
1480 875148 : self.pending_lsns.push(self.lsn);
1481 875148 : self.lsn = lsn;
1482 875148 : }
1483 875148 : Ok(())
1484 875148 : }
1485 :
1486 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1487 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1488 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1489 : ///
1490 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1491 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1492 : /// via [`Self::get`] as soon as their record has been ingested.
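///
/// A hedged illustration of the split, using the key constructors imported at
/// the top of this file (the exact key layout is an implementation detail):
///
/// ```ignore
/// // A relation block page is "data": fast path, not readable until commit.
/// assert!(DatadirModification::is_data_key(&rel_block_to_key(rel, 0)));
/// // The relation-size entry is "metadata": readable via `get` once ingested.
/// assert!(!DatadirModification::is_data_key(&rel_size_to_key(rel)));
/// ```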
1493 5104284 : fn is_data_key(key: &Key) -> bool {
1494 5104284 : key.is_rel_block_key() || key.is_slru_block_key()
1495 5104284 : }
1496 :
1497 : /// Initialize a completely new repository.
1498 : ///
1499 : /// This inserts the directory metadata entries that are assumed to
1500 : /// always exist.
1501 1320 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1502 1320 : let buf = DbDirectory::ser(&DbDirectory {
1503 1320 : dbdirs: HashMap::new(),
1504 1320 : })?;
1505 1320 : self.pending_directory_entries
1506 1320 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1507 1320 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1508 :
1509 1320 : let buf = if self.tline.pg_version >= 17 {
1510 1164 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1511 1164 : xids: HashSet::new(),
1512 1164 : })
1513 : } else {
1514 156 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1515 156 : xids: HashSet::new(),
1516 156 : })
1517 0 : }?;
1518 1320 : self.pending_directory_entries
1519 1320 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1520 1320 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1521 :
1522 1320 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1523 1320 : let empty_dir = Value::Image(buf);
1524 1320 :
1525 1320 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1526 1320 : // harmless but they'd just be dropped on later compaction.
1527 1320 : if self.tline.tenant_shard_id.is_shard_zero() {
1528 1284 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1529 1284 : self.pending_directory_entries.push((
1530 1284 : DirectoryKind::SlruSegment(SlruKind::Clog),
1531 1284 : MetricsUpdate::Set(0),
1532 1284 : ));
1533 1284 : self.put(
1534 1284 : slru_dir_to_key(SlruKind::MultiXactMembers),
1535 1284 : empty_dir.clone(),
1536 1284 : );
1537 1284 : self.pending_directory_entries.push((
1538 1284 : DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1539 1284 : MetricsUpdate::Set(0),
1540 1284 : ));
1541 1284 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1542 1284 : self.pending_directory_entries.push((
1543 1284 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1544 1284 : MetricsUpdate::Set(0),
1545 1284 : ));
1546 1284 : }
1547 :
1548 1320 : Ok(())
1549 1320 : }
1550 :
1551 : #[cfg(test)]
1552 1308 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1553 1308 : self.init_empty()?;
1554 1308 : self.put_control_file(bytes::Bytes::from_static(
1555 1308 : b"control_file contents do not matter",
1556 1308 : ))
1557 1308 : .context("put_control_file")?;
1558 1308 : self.put_checkpoint(bytes::Bytes::from_static(
1559 1308 : b"checkpoint_file contents do not matter",
1560 1308 : ))
1561 1308 : .context("put_checkpoint_file")?;
1562 1308 : Ok(())
1563 1308 : }
1564 :
1565 : /// Creates a relation if it is not already present.
1566 : /// Returns the current size of the relation
1567 2508336 : pub(crate) async fn create_relation_if_required(
1568 2508336 : &mut self,
1569 2508336 : rel: RelTag,
1570 2508336 : ctx: &RequestContext,
1571 2508336 : ) -> Result<u32, WalIngestError> {
1572 : // Get current size and put rel creation if rel doesn't exist
1573 : //
1574 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1575 : // check the cache too. This is because eagerly checking the cache results in
1576 : // less work overall and 10% better performance. It's more work on a cache miss,
1577 : // but cache misses are rare.
1578 2508336 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1579 2508276 : Ok(nblocks)
1580 60 : } else if !self
1581 60 : .tline
1582 60 : .get_rel_exists(rel, Version::Modified(self), ctx)
1583 60 : .await?
1584 : {
1585 : // create it with 0 size initially, the logic below will extend it
1586 60 : self.put_rel_creation(rel, 0, ctx).await?;
1587 60 : Ok(0)
1588 : } else {
1589 0 : Ok(self
1590 0 : .tline
1591 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1592 0 : .await?)
1593 : }
1594 2508336 : }
1595 :
1596 : /// Given a block number for a relation (which represents a newly written block),
1597 : /// the previous block count of the relation, and the shard info, find the gaps
1598 : /// that were created by the newly written block if any.
1599 874020 : fn find_gaps(
1600 874020 : rel: RelTag,
1601 874020 : blkno: u32,
1602 874020 : previous_nblocks: u32,
1603 874020 : shard: &ShardIdentity,
1604 874020 : ) -> Option<KeySpace> {
1605 874020 : let mut key = rel_block_to_key(rel, blkno);
1606 874020 : let mut gap_accum = None;
1607 :
1608 874020 : for gap_blkno in previous_nblocks..blkno {
1609 192 : key.field6 = gap_blkno;
1610 192 :
1611 192 : if shard.get_shard_number(&key) != shard.number {
1612 48 : continue;
1613 144 : }
1614 144 :
1615 144 : gap_accum
1616 144 : .get_or_insert_with(KeySpaceAccum::new)
1617 144 : .add_key(key);
1618 : }
1619 :
1620 874020 : gap_accum.map(|accum| accum.to_keyspace())
1621 874020 : }
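// A worked example (sketch): if the relation previously had 3 blocks (0..=2) and the newly
// written block is number 6, then blocks 3, 4 and 5 were skipped over. On an unsharded tenant:
//
//     let gaps = DatadirModification::find_gaps(rel, 6, 3, &ShardIdentity::unsharded());
//     // gaps covers exactly the keys of blocks 3..6 of `rel`
//
// On a sharded tenant only the skipped blocks owned by this shard are reported, so the
// result can be None even though blocks were skipped.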
1622 :
1623 875112 : pub async fn ingest_batch(
1624 875112 : &mut self,
1625 875112 : mut batch: SerializedValueBatch,
1626 875112 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1627 875112 : shard: &ShardIdentity,
1628 875112 : ctx: &RequestContext,
1629 875112 : ) -> Result<(), WalIngestError> {
1630 875112 : let mut gaps_at_lsns = Vec::default();
1631 :
1632 875112 : for meta in batch.metadata.iter() {
1633 873852 : let key = Key::from_compact(meta.key());
1634 873852 : let (rel, blkno) = key
1635 873852 : .to_rel_block()
1636 873852 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1637 873852 : let new_nblocks = blkno + 1;
1638 :
1639 873852 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1640 873852 : if new_nblocks > old_nblocks {
1641 14340 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1642 859512 : }
1643 :
1644 873852 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1645 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1646 873852 : }
1647 : }
1648 :
1649 875112 : if !gaps_at_lsns.is_empty() {
1650 0 : batch.zero_gaps(gaps_at_lsns);
1651 875112 : }
1652 :
1653 875112 : match self.pending_data_batch.as_mut() {
1654 120 : Some(pending_batch) => {
1655 120 : pending_batch.extend(batch);
1656 120 : }
1657 874992 : None if batch.has_data() => {
1658 873780 : self.pending_data_batch = Some(batch);
1659 873780 : }
1660 1212 : None => {
1661 1212 : // Nothing to initialize the batch with
1662 1212 : }
1663 : }
1664 :
1665 875112 : Ok(())
1666 875112 : }
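// Usage sketch (hypothetical variables): the WAL decoder produces a `SerializedValueBatch`,
// and the ingest code hands it over wholesale:
//
//     modification.ingest_batch(batch, &shard_identity, &ctx).await?;
//     // relation sizes have been extended and gaps zeroed; the data is flushed on commit()
//     modification.commit(&ctx).await?;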
1667 :
1668 : /// Put a new page version that can be constructed from a WAL record
1669 : ///
1670 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1671 : /// current end-of-file. It's up to the caller to check that the relation size
1672 : /// matches the blocks inserted!
1673 72 : pub fn put_rel_wal_record(
1674 72 : &mut self,
1675 72 : rel: RelTag,
1676 72 : blknum: BlockNumber,
1677 72 : rec: NeonWalRecord,
1678 72 : ) -> Result<(), WalIngestError> {
1679 72 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1680 72 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1681 72 : Ok(())
1682 72 : }
1683 :
1684 : // Same, but for an SLRU.
1685 48 : pub fn put_slru_wal_record(
1686 48 : &mut self,
1687 48 : kind: SlruKind,
1688 48 : segno: u32,
1689 48 : blknum: BlockNumber,
1690 48 : rec: NeonWalRecord,
1691 48 : ) -> Result<(), WalIngestError> {
1692 48 : if !self.tline.tenant_shard_id.is_shard_zero() {
1693 0 : return Ok(());
1694 48 : }
1695 48 :
1696 48 : self.put(
1697 48 : slru_block_to_key(kind, segno, blknum),
1698 48 : Value::WalRecord(rec),
1699 48 : );
1700 48 : Ok(())
1701 48 : }
1702 :
1703 : /// Like put_rel_wal_record, but with a ready-made image of the page.
1704 1667052 : pub fn put_rel_page_image(
1705 1667052 : &mut self,
1706 1667052 : rel: RelTag,
1707 1667052 : blknum: BlockNumber,
1708 1667052 : img: Bytes,
1709 1667052 : ) -> Result<(), WalIngestError> {
1710 1667052 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1711 1667052 : let key = rel_block_to_key(rel, blknum);
1712 1667052 : if !key.is_valid_key_on_write_path() {
1713 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1714 1667052 : }
1715 1667052 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1716 1667052 : Ok(())
1717 1667052 : }
1718 :
1719 36 : pub fn put_slru_page_image(
1720 36 : &mut self,
1721 36 : kind: SlruKind,
1722 36 : segno: u32,
1723 36 : blknum: BlockNumber,
1724 36 : img: Bytes,
1725 36 : ) -> Result<(), WalIngestError> {
1726 36 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1727 :
1728 36 : let key = slru_block_to_key(kind, segno, blknum);
1729 36 : if !key.is_valid_key_on_write_path() {
1730 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1731 36 : }
1732 36 : self.put(key, Value::Image(img));
1733 36 : Ok(())
1734 36 : }
1735 :
1736 17988 : pub(crate) fn put_rel_page_image_zero(
1737 17988 : &mut self,
1738 17988 : rel: RelTag,
1739 17988 : blknum: BlockNumber,
1740 17988 : ) -> Result<(), WalIngestError> {
1741 17988 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1742 17988 : let key = rel_block_to_key(rel, blknum);
1743 17988 : if !key.is_valid_key_on_write_path() {
1744 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1745 17988 : }
1746 :
1747 17988 : let batch = self
1748 17988 : .pending_data_batch
1749 17988 : .get_or_insert_with(SerializedValueBatch::default);
1750 17988 :
1751 17988 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1752 17988 :
1753 17988 : Ok(())
1754 17988 : }
1755 :
1756 0 : pub(crate) fn put_slru_page_image_zero(
1757 0 : &mut self,
1758 0 : kind: SlruKind,
1759 0 : segno: u32,
1760 0 : blknum: BlockNumber,
1761 0 : ) -> Result<(), WalIngestError> {
1762 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1763 0 : let key = slru_block_to_key(kind, segno, blknum);
1764 0 : if !key.is_valid_key_on_write_path() {
1765 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1766 0 : }
1767 :
1768 0 : let batch = self
1769 0 : .pending_data_batch
1770 0 : .get_or_insert_with(SerializedValueBatch::default);
1771 0 :
1772 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1773 0 :
1774 0 : Ok(())
1775 0 : }
1776 :
1777 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1778 : /// we enable it, we also need to persist it in `index_part.json`.
1779 11676 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1780 11676 : let status = self.tline.get_rel_size_v2_status();
1781 11676 : let config = self.tline.get_rel_size_v2_enabled();
1782 11676 : match (config, status) {
1783 : (false, RelSizeMigration::Legacy) => {
1784 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1785 11676 : Ok(false)
1786 : }
1787 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1788 : // index_part already persisted that the timeline has enabled rel_size_v2
1789 0 : Ok(true)
1790 : }
1791 : (true, RelSizeMigration::Legacy) => {
1792 : // The first time we enable it, we need to persist it in `index_part.json`
1793 0 : self.tline
1794 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1795 0 : tracing::info!("enabled rel_size_v2");
1796 0 : Ok(true)
1797 : }
1798 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1799 : // index_part already persisted that the timeline has enabled rel_size_v2
1800 : // and we don't need to do anything
1801 0 : Ok(true)
1802 : }
1803 : }
1804 11676 : }
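// Summary of the decision table above (config = the tenant-level setting read via
// get_rel_size_v2_enabled(), status = what index_part.json has already persisted):
//
//     config=false, status=Legacy              -> false (v2 never enabled)
//     config=false, status=Migrating|Migrated  -> true  (already persisted, stays enabled)
//     config=true,  status=Legacy              -> true  (persist Migrating on first use)
//     config=true,  status=Migrating|Migrated  -> true  (nothing left to persist)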
1805 :
1806 : /// Store a relmapper file (pg_filenode.map) in the repository
1807 96 : pub async fn put_relmap_file(
1808 96 : &mut self,
1809 96 : spcnode: Oid,
1810 96 : dbnode: Oid,
1811 96 : img: Bytes,
1812 96 : ctx: &RequestContext,
1813 96 : ) -> Result<(), WalIngestError> {
1814 96 : let v2_enabled = self
1815 96 : .maybe_enable_rel_size_v2()
1816 96 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1817 :
1818 : // Add it to the directory (if it doesn't exist already)
1819 96 : let buf = self.get(DBDIR_KEY, ctx).await?;
1820 96 : let mut dbdir = DbDirectory::des(&buf)?;
1821 :
1822 96 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1823 96 : if r.is_none() || r == Some(false) {
1824 : // The dbdir entry didn't exist, or it contained a
1825 : // 'false'. The 'insert' call already updated it with
1826 : // 'true', now write the updated 'dbdirs' map back.
1827 96 : let buf = DbDirectory::ser(&dbdir)?;
1828 96 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1829 0 : }
1830 96 : if r.is_none() {
1831 : // Create RelDirectory
1832 : // TODO: if we have fully migrated to v2, no need to create this directory
1833 48 : let buf = RelDirectory::ser(&RelDirectory {
1834 48 : rels: HashSet::new(),
1835 48 : })?;
1836 48 : self.pending_directory_entries
1837 48 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1838 48 : if v2_enabled {
1839 0 : self.pending_directory_entries
1840 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1841 48 : }
1842 48 : self.put(
1843 48 : rel_dir_to_key(spcnode, dbnode),
1844 48 : Value::Image(Bytes::from(buf)),
1845 48 : );
1846 48 : }
1847 :
1848 96 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1849 96 : Ok(())
1850 96 : }
1851 :
1852 0 : pub async fn put_twophase_file(
1853 0 : &mut self,
1854 0 : xid: u64,
1855 0 : img: Bytes,
1856 0 : ctx: &RequestContext,
1857 0 : ) -> Result<(), WalIngestError> {
1858 : // Add it to the directory entry
1859 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1860 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1861 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1862 0 : if !dir.xids.insert(xid) {
1863 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
1864 0 : }
1865 0 : self.pending_directory_entries.push((
1866 0 : DirectoryKind::TwoPhase,
1867 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1868 0 : ));
1869 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1870 : } else {
1871 0 : let xid = xid as u32;
1872 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1873 0 : if !dir.xids.insert(xid) {
1874 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
1875 0 : }
1876 0 : self.pending_directory_entries.push((
1877 0 : DirectoryKind::TwoPhase,
1878 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1879 0 : ));
1880 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1881 : };
1882 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1883 0 :
1884 0 : self.put(twophase_file_key(xid), Value::Image(img));
1885 0 : Ok(())
1886 0 : }
1887 :
1888 12 : pub async fn set_replorigin(
1889 12 : &mut self,
1890 12 : origin_id: RepOriginId,
1891 12 : origin_lsn: Lsn,
1892 12 : ) -> Result<(), WalIngestError> {
1893 12 : let key = repl_origin_key(origin_id);
1894 12 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1895 12 : Ok(())
1896 12 : }
1897 :
1898 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
1899 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1900 0 : }
1901 :
1902 1320 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
1903 1320 : self.put(CONTROLFILE_KEY, Value::Image(img));
1904 1320 : Ok(())
1905 1320 : }
1906 :
1907 1404 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
1908 1404 : self.put(CHECKPOINT_KEY, Value::Image(img));
1909 1404 : Ok(())
1910 1404 : }
1911 :
1912 0 : pub async fn drop_dbdir(
1913 0 : &mut self,
1914 0 : spcnode: Oid,
1915 0 : dbnode: Oid,
1916 0 : ctx: &RequestContext,
1917 0 : ) -> Result<(), WalIngestError> {
1918 0 : let total_blocks = self
1919 0 : .tline
1920 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1921 0 : .await?;
1922 :
1923 : // Remove entry from dbdir
1924 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1925 0 : let mut dir = DbDirectory::des(&buf)?;
1926 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1927 0 : let buf = DbDirectory::ser(&dir)?;
1928 0 : self.pending_directory_entries.push((
1929 0 : DirectoryKind::Db,
1930 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
1931 0 : ));
1932 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1933 : } else {
1934 0 : warn!(
1935 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1936 : spcnode, dbnode
1937 : );
1938 : }
1939 :
1940 : // Update logical database size.
1941 0 : self.pending_nblocks -= total_blocks as i64;
1942 0 :
1943 0 : // Delete all relations and metadata files for the spcnode/dnode
1944 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1945 0 : Ok(())
1946 0 : }
1947 :
1948 : /// Create a relation fork.
1949 : ///
1950 : /// 'nblocks' is the initial size.
1951 11520 : pub async fn put_rel_creation(
1952 11520 : &mut self,
1953 11520 : rel: RelTag,
1954 11520 : nblocks: BlockNumber,
1955 11520 : ctx: &RequestContext,
1956 11520 : ) -> Result<(), WalIngestError> {
1957 11520 : if rel.relnode == 0 {
1958 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
1959 0 : "invalid relnode"
1960 0 : )))?;
1961 11520 : }
1962 : // It's possible that this is the first rel for this db in this
1963 : // tablespace. Create the reldir entry for it if so.
1964 11520 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
1965 :
1966 11520 : let dbdir_exists =
1967 11520 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1968 : // Didn't exist. Update dbdir
1969 48 : e.insert(false);
1970 48 : let buf = DbDirectory::ser(&dbdir)?;
1971 48 : self.pending_directory_entries.push((
1972 48 : DirectoryKind::Db,
1973 48 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
1974 48 : ));
1975 48 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1976 48 : false
1977 : } else {
1978 11472 : true
1979 : };
1980 :
1981 11520 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1982 11520 : let mut rel_dir = if !dbdir_exists {
1983 : // Create the RelDirectory
1984 48 : RelDirectory::default()
1985 : } else {
1986 : // reldir already exists, fetch it
1987 11472 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
1988 : };
1989 :
1990 11520 : let v2_enabled = self
1991 11520 : .maybe_enable_rel_size_v2()
1992 11520 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1993 :
1994 11520 : if v2_enabled {
1995 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
1996 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
1997 0 : }
1998 0 : let sparse_rel_dir_key =
1999 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2000 : // check if the rel_dir_key exists in v2
2001 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2002 0 : let val = RelDirExists::decode_option(val)
2003 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2004 0 : if val == RelDirExists::Exists {
2005 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2006 0 : }
2007 0 : self.put(
2008 0 : sparse_rel_dir_key,
2009 0 : Value::Image(RelDirExists::Exists.encode()),
2010 0 : );
2011 0 : if !dbdir_exists {
2012 0 : self.pending_directory_entries
2013 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2014 0 : self.pending_directory_entries
2015 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2016 0 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2017 0 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2018 0 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2019 0 : self.put(
2020 0 : rel_dir_key,
2021 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2022 : );
2023 0 : }
2024 0 : self.pending_directory_entries
2025 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2026 : } else {
2027 : // Add the new relation to the rel directory entry, and write it back
2028 11520 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2029 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2030 11520 : }
2031 11520 : if !dbdir_exists {
2032 48 : self.pending_directory_entries
2033 48 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2034 11472 : }
2035 11520 : self.pending_directory_entries
2036 11520 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2037 11520 : self.put(
2038 11520 : rel_dir_key,
2039 11520 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2040 : );
2041 : }
2042 :
2043 : // Put size
2044 11520 : let size_key = rel_size_to_key(rel);
2045 11520 : let buf = nblocks.to_le_bytes();
2046 11520 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2047 11520 :
2048 11520 : self.pending_nblocks += nblocks as i64;
2049 11520 :
2050 11520 : // Update relation size cache
2051 11520 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2052 11520 :
2053 11520 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2054 11520 : // caller.
2055 11520 : Ok(())
2056 11520 : }
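// Note on the two paths in put_rel_creation above: the v1 path keeps the whole relation set
// in a single `RelDirectory` value per (spcnode, dbnode) and rewrites it on every create or
// drop, whereas the v2 path stores one sparse key per relation (`rel_tag_sparse_key`), so
// creates and drops touch O(1) keys.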
2057 :
2058 : /// Truncate relation
2059 36072 : pub async fn put_rel_truncation(
2060 36072 : &mut self,
2061 36072 : rel: RelTag,
2062 36072 : nblocks: BlockNumber,
2063 36072 : ctx: &RequestContext,
2064 36072 : ) -> Result<(), WalIngestError> {
2065 36072 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2066 36072 : if self
2067 36072 : .tline
2068 36072 : .get_rel_exists(rel, Version::Modified(self), ctx)
2069 36072 : .await?
2070 : {
2071 36072 : let size_key = rel_size_to_key(rel);
2072 : // Fetch the old size first
2073 36072 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2074 36072 :
2075 36072 : // Update the entry with the new size.
2076 36072 : let buf = nblocks.to_le_bytes();
2077 36072 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2078 36072 :
2079 36072 : // Update relation size cache
2080 36072 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2081 36072 :
2082 36072 : // Update logical database size.
2083 36072 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2084 0 : }
2085 36072 : Ok(())
2086 36072 : }
2087 :
2088 : /// Extend a relation.
2089 : /// If the new size is smaller, do nothing.
2090 1660080 : pub async fn put_rel_extend(
2091 1660080 : &mut self,
2092 1660080 : rel: RelTag,
2093 1660080 : nblocks: BlockNumber,
2094 1660080 : ctx: &RequestContext,
2095 1660080 : ) -> Result<(), WalIngestError> {
2096 1660080 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2097 :
2098 : // Put size
2099 1660080 : let size_key = rel_size_to_key(rel);
2100 1660080 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2101 1660080 :
2102 1660080 : // only extend relation here. never decrease the size
2103 1660080 : if nblocks > old_size {
2104 1648728 : let buf = nblocks.to_le_bytes();
2105 1648728 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2106 1648728 :
2107 1648728 : // Update relation size cache
2108 1648728 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2109 1648728 :
2110 1648728 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2111 1648728 : }
2112 1660080 : Ok(())
2113 1660080 : }
2114 :
2115 : /// Drop some relations
2116 60 : pub(crate) async fn put_rel_drops(
2117 60 : &mut self,
2118 60 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2119 60 : ctx: &RequestContext,
2120 60 : ) -> Result<(), WalIngestError> {
2121 60 : let v2_enabled = self
2122 60 : .maybe_enable_rel_size_v2()
2123 60 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2124 72 : for ((spc_node, db_node), rel_tags) in drop_relations {
2125 12 : let dir_key = rel_dir_to_key(spc_node, db_node);
2126 12 : let buf = self.get(dir_key, ctx).await?;
2127 12 : let mut dir = RelDirectory::des(&buf)?;
2128 :
2129 12 : let mut dirty = false;
2130 24 : for rel_tag in rel_tags {
2131 12 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2132 12 : self.pending_directory_entries
2133 12 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2134 12 : dirty = true;
2135 12 : true
2136 0 : } else if v2_enabled {
2137 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2138 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2139 : // logic).
2140 0 : let key =
2141 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2142 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2143 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2144 0 : if val == RelDirExists::Exists {
2145 0 : self.pending_directory_entries
2146 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2147 0 : // put tombstone
2148 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2149 0 : // no need to set dirty to true
2150 0 : true
2151 : } else {
2152 0 : false
2153 : }
2154 : } else {
2155 0 : false
2156 : };
2157 :
2158 12 : if found {
2159 : // update logical size
2160 12 : let size_key = rel_size_to_key(rel_tag);
2161 12 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2162 12 : self.pending_nblocks -= old_size as i64;
2163 12 :
2164 12 : // Remove entry from relation size cache
2165 12 : self.tline.remove_cached_rel_size(&rel_tag);
2166 12 :
2167 12 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2168 12 : self.delete(rel_key_range(rel_tag));
2169 0 : }
2170 : }
2171 :
2172 12 : if dirty {
2173 12 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2174 0 : }
2175 : }
2176 :
2177 60 : Ok(())
2178 60 : }
2179 :
2180 36 : pub async fn put_slru_segment_creation(
2181 36 : &mut self,
2182 36 : kind: SlruKind,
2183 36 : segno: u32,
2184 36 : nblocks: BlockNumber,
2185 36 : ctx: &RequestContext,
2186 36 : ) -> Result<(), WalIngestError> {
2187 36 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2188 :
2189 : // Add it to the directory entry
2190 36 : let dir_key = slru_dir_to_key(kind);
2191 36 : let buf = self.get(dir_key, ctx).await?;
2192 36 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2193 :
2194 36 : if !dir.segments.insert(segno) {
2195 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2196 36 : }
2197 36 : self.pending_directory_entries.push((
2198 36 : DirectoryKind::SlruSegment(kind),
2199 36 : MetricsUpdate::Set(dir.segments.len() as u64),
2200 36 : ));
2201 36 : self.put(
2202 36 : dir_key,
2203 36 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2204 : );
2205 :
2206 : // Put size
2207 36 : let size_key = slru_segment_size_to_key(kind, segno);
2208 36 : let buf = nblocks.to_le_bytes();
2209 36 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2210 36 :
2211 36 : // even if nblocks > 0, we don't insert any actual blocks here
2212 36 :
2213 36 : Ok(())
2214 36 : }
2215 :
2216 : /// Extend SLRU segment
2217 0 : pub fn put_slru_extend(
2218 0 : &mut self,
2219 0 : kind: SlruKind,
2220 0 : segno: u32,
2221 0 : nblocks: BlockNumber,
2222 0 : ) -> Result<(), WalIngestError> {
2223 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2224 :
2225 : // Put size
2226 0 : let size_key = slru_segment_size_to_key(kind, segno);
2227 0 : let buf = nblocks.to_le_bytes();
2228 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2229 0 : Ok(())
2230 0 : }
2231 :
2232 : /// This method is used for marking truncated SLRU files
2233 0 : pub async fn drop_slru_segment(
2234 0 : &mut self,
2235 0 : kind: SlruKind,
2236 0 : segno: u32,
2237 0 : ctx: &RequestContext,
2238 0 : ) -> Result<(), WalIngestError> {
2239 0 : // Remove it from the directory entry
2240 0 : let dir_key = slru_dir_to_key(kind);
2241 0 : let buf = self.get(dir_key, ctx).await?;
2242 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2243 :
2244 0 : if !dir.segments.remove(&segno) {
2245 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2246 0 : }
2247 0 : self.pending_directory_entries.push((
2248 0 : DirectoryKind::SlruSegment(kind),
2249 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2250 0 : ));
2251 0 : self.put(
2252 0 : dir_key,
2253 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2254 : );
2255 :
2256 : // Delete size entry, as well as all blocks
2257 0 : self.delete(slru_segment_key_range(kind, segno));
2258 0 :
2259 0 : Ok(())
2260 0 : }
2261 :
2262 : /// Drop a relmapper file (pg_filenode.map)
2263 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2264 0 : // TODO
2265 0 : Ok(())
2266 0 : }
2267 :
2268 : /// Drop a two-phase state (pg_twophase) file for the given transaction.
2269 0 : pub async fn drop_twophase_file(
2270 0 : &mut self,
2271 0 : xid: u64,
2272 0 : ctx: &RequestContext,
2273 0 : ) -> Result<(), WalIngestError> {
2274 : // Remove it from the directory entry
2275 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2276 0 : let newdirbuf = if self.tline.pg_version >= 17 {
2277 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2278 :
2279 0 : if !dir.xids.remove(&xid) {
2280 0 : warn!("twophase file for xid {} does not exist", xid);
2281 0 : }
2282 0 : self.pending_directory_entries.push((
2283 0 : DirectoryKind::TwoPhase,
2284 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2285 0 : ));
2286 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2287 : } else {
2288 0 : let xid: u32 = u32::try_from(xid)
2289 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2290 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2291 :
2292 0 : if !dir.xids.remove(&xid) {
2293 0 : warn!("twophase file for xid {} does not exist", xid);
2294 0 : }
2295 0 : self.pending_directory_entries.push((
2296 0 : DirectoryKind::TwoPhase,
2297 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2298 0 : ));
2299 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2300 : };
2301 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2302 0 :
2303 0 : // Delete it
2304 0 : self.delete(twophase_key_range(xid));
2305 0 :
2306 0 : Ok(())
2307 0 : }
2308 :
2309 96 : pub async fn put_file(
2310 96 : &mut self,
2311 96 : path: &str,
2312 96 : content: &[u8],
2313 96 : ctx: &RequestContext,
2314 96 : ) -> Result<(), WalIngestError> {
2315 96 : let key = aux_file::encode_aux_file_key(path);
2316 : // retrieve the key from the engine
2317 96 : let old_val = match self.get(key, ctx).await {
2318 24 : Ok(val) => Some(val),
2319 72 : Err(PageReconstructError::MissingKey(_)) => None,
2320 0 : Err(e) => return Err(e.into()),
2321 : };
2322 96 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2323 24 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2324 : } else {
2325 72 : Vec::new()
2326 : };
2327 96 : let mut other_files = Vec::with_capacity(files.len());
2328 96 : let mut modifying_file = None;
2329 120 : for file @ (p, content) in files {
2330 24 : if path == p {
2331 24 : assert!(
2332 24 : modifying_file.is_none(),
2333 0 : "duplicated entries found for {}",
2334 : path
2335 : );
2336 24 : modifying_file = Some(content);
2337 0 : } else {
2338 0 : other_files.push(file);
2339 0 : }
2340 : }
2341 96 : let mut new_files = other_files;
2342 96 : match (modifying_file, content.is_empty()) {
2343 12 : (Some(old_content), false) => {
2344 12 : self.tline
2345 12 : .aux_file_size_estimator
2346 12 : .on_update(old_content.len(), content.len());
2347 12 : new_files.push((path, content));
2348 12 : }
2349 12 : (Some(old_content), true) => {
2350 12 : self.tline
2351 12 : .aux_file_size_estimator
2352 12 : .on_remove(old_content.len());
2353 12 : // not adding the file key to the final `new_files` vec.
2354 12 : }
2355 72 : (None, false) => {
2356 72 : self.tline.aux_file_size_estimator.on_add(content.len());
2357 72 : new_files.push((path, content));
2358 72 : }
2359 : // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2360 : // Compute doesn't know whether a previous version of this file exists, so an
2361 : // attempt to delete a non-existent file can produce this message.
2362 : // To avoid false alarms, log it as info rather than warning.
2363 0 : (None, true) if path.starts_with("pg_stat/") => {
2364 0 : info!("removing non-existing pg_stat file: {}", path)
2365 : }
2366 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2367 : }
2368 96 : let new_val = aux_file::encode_file_value(&new_files)
2369 96 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2370 96 : self.put(key, Value::Image(new_val.into()));
2371 96 :
2372 96 : Ok(())
2373 96 : }
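// Usage sketch (hypothetical path; mirrors the aux_files_round_trip test at the bottom of
// this file): non-empty `content` inserts or replaces the file, empty `content` deletes it.
//
//     modification.put_file("pg_logical/mappings/map-x", b"payload", &ctx).await?; // insert/update
//     modification.put_file("pg_logical/mappings/map-x", b"", &ctx).await?;        // delete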
2374 :
2375 : ///
2376 : /// Flush changes accumulated so far to the underlying repository.
2377 : ///
2378 : /// Usually, changes made in DatadirModification are atomic, but this allows
2379 : /// you to flush them to the underlying repository before the final `commit`.
2380 : /// That frees up the memory used to hold the pending changes.
2381 : ///
2382 : /// Currently only used during bulk import of a data directory. In that
2383 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2384 : /// whole import fails and the timeline will be deleted anyway.
2385 : /// (Or to be precise, it will be left behind for debugging purposes and
2386 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2387 : ///
2388 : /// Note: A consequence of flushing the pending operations is that they
2389 : /// won't be visible to subsequent operations until `commit`. The function
2390 : /// retains all the metadata, but data pages are flushed. That's again OK
2391 : /// for bulk import, where you are just loading data pages and won't try to
2392 : /// modify the same pages twice.
2393 11580 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2394 11580 : // Unless we have accumulated a decent amount of changes, it's not worth it
2395 11580 : // to scan through the pending_updates list.
2396 11580 : let pending_nblocks = self.pending_nblocks;
2397 11580 : if pending_nblocks < 10000 {
2398 11580 : return Ok(());
2399 0 : }
2400 :
2401 0 : let mut writer = self.tline.writer().await;
2402 :
2403 : // Flush relation and SLRU data blocks, keep metadata.
2404 0 : if let Some(batch) = self.pending_data_batch.take() {
2405 0 : tracing::debug!(
2406 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2407 0 : batch.max_lsn,
2408 0 : self.tline.get_last_record_lsn()
2409 : );
2410 :
2411 : // This bails out on first error without modifying pending_updates.
2412 : // That's Ok, cf this function's doc comment.
2413 0 : writer.put_batch(batch, ctx).await?;
2414 0 : }
2415 :
2416 0 : if pending_nblocks != 0 {
2417 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2418 0 : self.pending_nblocks = 0;
2419 0 : }
2420 :
2421 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2422 0 : writer.update_directory_entries_count(kind, count);
2423 0 : }
2424 :
2425 0 : Ok(())
2426 11580 : }
2427 :
2428 : ///
2429 : /// Finish this atomic update, writing all the updated keys to the
2430 : /// underlying timeline.
2431 : /// All the modifications in this atomic update are stamped by the specified LSN.
2432 : ///
2433 4458660 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2434 4458660 : let mut writer = self.tline.writer().await;
2435 :
2436 4458660 : let pending_nblocks = self.pending_nblocks;
2437 4458660 : self.pending_nblocks = 0;
2438 :
2439 : // Ordering: the items in this batch do not need to be in any global order, but values for
2440 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2441 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2442 : // more details.
2443 :
2444 4458660 : let metadata_batch = {
2445 4458660 : let pending_meta = self
2446 4458660 : .pending_metadata_pages
2447 4458660 : .drain()
2448 4458660 : .flat_map(|(key, values)| {
2449 1644648 : values
2450 1644648 : .into_iter()
2451 1644648 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2452 4458660 : })
2453 4458660 : .collect::<Vec<_>>();
2454 4458660 :
2455 4458660 : if pending_meta.is_empty() {
2456 2833668 : None
2457 : } else {
2458 1624992 : Some(SerializedValueBatch::from_values(pending_meta))
2459 : }
2460 : };
2461 :
2462 4458660 : let data_batch = self.pending_data_batch.take();
2463 :
2464 4458660 : let maybe_batch = match (data_batch, metadata_batch) {
2465 1587336 : (Some(mut data), Some(metadata)) => {
2466 1587336 : data.extend(metadata);
2467 1587336 : Some(data)
2468 : }
2469 859572 : (Some(data), None) => Some(data),
2470 37656 : (None, Some(metadata)) => Some(metadata),
2471 1974096 : (None, None) => None,
2472 : };
2473 :
2474 4458660 : if let Some(batch) = maybe_batch {
2475 2484564 : tracing::debug!(
2476 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2477 0 : batch.max_lsn,
2478 0 : self.tline.get_last_record_lsn()
2479 : );
2480 :
2481 : // This bails out on first error without modifying pending_updates.
2482 : // That's Ok, cf this function's doc comment.
2483 2484564 : writer.put_batch(batch, ctx).await?;
2484 1974096 : }
2485 :
2486 4458660 : if !self.pending_deletions.is_empty() {
2487 12 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2488 12 : self.pending_deletions.clear();
2489 4458648 : }
2490 :
2491 4458660 : self.pending_lsns.push(self.lsn);
2492 5333808 : for pending_lsn in self.pending_lsns.drain(..) {
2493 5333808 : // TODO(vlad): pretty sure the comment below is not valid anymore
2494 5333808 : // and we can call finish write with the latest LSN
2495 5333808 : //
2496 5333808 : // Ideally, we should be able to call writer.finish_write() only once
2497 5333808 : // with the highest LSN. However, the last_record_lsn variable in the
2498 5333808 : // timeline keeps track of the latest LSN and the immediate previous LSN
2499 5333808 : // so we need to record every LSN to not leave a gap between them.
2500 5333808 : writer.finish_write(pending_lsn);
2501 5333808 : }
2502 :
2503 4458660 : if pending_nblocks != 0 {
2504 1623420 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2505 2835240 : }
2506 :
2507 4458660 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2508 18204 : writer.update_directory_entries_count(kind, count);
2509 18204 : }
2510 :
2511 4458660 : self.pending_metadata_bytes = 0;
2512 4458660 :
2513 4458660 : Ok(())
2514 4458660 : }
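// The full lifecycle, as exercised by the tests at the bottom of this file (a sketch;
// `tline` is a `Timeline`, `ctx` a `RequestContext`):
//
//     let mut modification = tline.begin_modification(lsn);
//     modification.put_rel_page_image(rel, blkno, img)?;
//     modification.commit(&ctx).await?;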
2515 :
2516 1750224 : pub(crate) fn len(&self) -> usize {
2517 1750224 : self.pending_metadata_pages.len()
2518 1750224 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2519 1750224 : + self.pending_deletions.len()
2520 1750224 : }
2521 :
2522 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2523 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2524 : /// in the modification.
2525 : ///
2526 : /// For data pages, reads pass directly to the owning Timeline: any ingest code that reads a data
2527 : /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2528 : /// DB create operations are always preceded by a call to commit(). This is special-cased because
2529 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2530 : /// and not data pages.
2531 1719516 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2532 1719516 : if !Self::is_data_key(&key) {
2533 : // Have we already updated the same key? Read the latest pending updated
2534 : // version in that case.
2535 : //
2536 : // Note: we don't check pending_deletions. It is an error to request a
2537 : // value that has been removed, deletion only avoids leaking storage.
2538 1719516 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2539 95568 : if let Some((_, _, value)) = values.last() {
2540 95568 : return if let Value::Image(img) = value {
2541 95568 : Ok(img.clone())
2542 : } else {
2543 : // Currently, we never need to read back a WAL record that we
2544 : // inserted in the same "transaction". All the metadata updates
2545 : // work directly with Images, and we never need to read actual
2546 : // data pages. We could handle this if we had to, by calling
2547 : // the walredo manager, but let's keep it simple for now.
2548 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2549 0 : "unexpected pending WAL record"
2550 0 : )))
2551 : };
2552 0 : }
2553 1623948 : }
2554 : } else {
2555 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2556 : // this key should never be present in pending_data_pages. We ensure this by committing
2557 : // modifications before ingesting DB create operations, which are the only kind that reads
2558 : // data pages during ingest.
2559 0 : if cfg!(debug_assertions) {
2560 0 : assert!(
2561 0 : !self
2562 0 : .pending_data_batch
2563 0 : .as_ref()
2564 0 : .is_some_and(|b| b.updates_key(&key))
2565 0 : );
2566 0 : }
2567 : }
2568 :
2569 : // Metadata page cache miss, or we're reading a data page.
2570 1623948 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2571 1623948 : self.tline.get(key, lsn, ctx).await
2572 1719516 : }
2573 :
2574 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2575 : /// and the empty value into None.
2576 0 : async fn sparse_get(
2577 0 : &self,
2578 0 : key: Key,
2579 0 : ctx: &RequestContext,
2580 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2581 0 : let val = self.get(key, ctx).await;
2582 0 : match val {
2583 0 : Ok(val) if val.is_empty() => Ok(None),
2584 0 : Ok(val) => Ok(Some(val)),
2585 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2586 0 : Err(e) => Err(e),
2587 : }
2588 0 : }
2589 :
2590 : #[cfg(test)]
2591 24 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
2592 24 : self.put(key, val);
2593 24 : }
2594 :
2595 3384768 : fn put(&mut self, key: Key, val: Value) {
2596 3384768 : if Self::is_data_key(&key) {
2597 1667208 : self.put_data(key.to_compact(), val)
2598 : } else {
2599 1717560 : self.put_metadata(key.to_compact(), val)
2600 : }
2601 3384768 : }
2602 :
2603 1667208 : fn put_data(&mut self, key: CompactKey, val: Value) {
2604 1667208 : let batch = self
2605 1667208 : .pending_data_batch
2606 1667208 : .get_or_insert_with(SerializedValueBatch::default);
2607 1667208 : batch.put(key, val, self.lsn);
2608 1667208 : }
2609 :
2610 1717560 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2611 1717560 : let values = self.pending_metadata_pages.entry(key).or_default();
2612 : // Replace the previous value if it exists at the same lsn
2613 1717560 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2614 72912 : if *last_lsn == self.lsn {
2615 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2616 72912 : self.pending_metadata_bytes -= *last_value_ser_size;
2617 72912 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2618 72912 : self.pending_metadata_bytes += *last_value_ser_size;
2619 72912 :
2620 72912 : // Use the latest value: this replaces any earlier write to the same (key, lsn), such as might
2621 72912 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2622 72912 : *last_value = val;
2623 72912 : return;
2624 0 : }
2625 1644648 : }
2626 :
2627 1644648 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2628 1644648 : self.pending_metadata_bytes += val_serialized_size;
2629 1644648 : values.push((self.lsn, val_serialized_size, val));
2630 1644648 :
2631 1644648 : if key == CHECKPOINT_KEY.to_compact() {
2632 1404 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2633 1643244 : }
2634 1717560 : }
2635 :
2636 12 : fn delete(&mut self, key_range: Range<Key>) {
2637 12 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2638 12 : self.pending_deletions.push((key_range, self.lsn));
2639 12 : }
2640 : }
2641 :
2642 : /// Statistics for a DatadirModification.
2643 : #[derive(Default)]
2644 : pub struct DatadirModificationStats {
2645 : pub metadata_images: u64,
2646 : pub metadata_deltas: u64,
2647 : pub data_images: u64,
2648 : pub data_deltas: u64,
2649 : }
2650 :
2651 : /// This struct facilitates accessing either a committed key from the timeline at a
2652 : /// specific LSN, or the latest uncommitted key from a pending modification.
2653 : ///
2654 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2655 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2656 : /// need to look up the keys in the modification first before looking them up in the
2657 : /// timeline to not miss the latest updates.
2658 : #[derive(Clone, Copy)]
2659 : pub enum Version<'a> {
2660 : Lsn(Lsn),
2661 : Modified(&'a DatadirModification<'a>),
2662 : }
2663 :
2664 : impl Version<'_> {
2665 31056 : async fn get(
2666 31056 : &self,
2667 31056 : timeline: &Timeline,
2668 31056 : key: Key,
2669 31056 : ctx: &RequestContext,
2670 31056 : ) -> Result<Bytes, PageReconstructError> {
2671 31056 : match self {
2672 30936 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2673 120 : Version::Modified(modification) => modification.get(key, ctx).await,
2674 : }
2675 31056 : }
2676 :
2677 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2678 : /// and the empty value into None.
2679 0 : async fn sparse_get(
2680 0 : &self,
2681 0 : timeline: &Timeline,
2682 0 : key: Key,
2683 0 : ctx: &RequestContext,
2684 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2685 0 : let val = self.get(timeline, key, ctx).await;
2686 0 : match val {
2687 0 : Ok(val) if val.is_empty() => Ok(None),
2688 0 : Ok(val) => Ok(Some(val)),
2689 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2690 0 : Err(e) => Err(e),
2691 : }
2692 0 : }
2693 :
2694 213720 : fn get_lsn(&self) -> Lsn {
2695 213720 : match self {
2696 177444 : Version::Lsn(lsn) => *lsn,
2697 36276 : Version::Modified(modification) => modification.lsn,
2698 : }
2699 213720 : }
2700 : }
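// A sketch of how callers pick a variant (illustrative only):
//
//     // read relation metadata at a committed LSN:
//     timeline.get_rel_exists(rel, Version::Lsn(lsn), &ctx).await?;
//     // read through an in-progress modification, seeing its uncommitted metadata writes:
//     timeline.get_rel_exists(rel, Version::Modified(&modification), &ctx).await?;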
2701 :
2702 : //--- Metadata structs stored in key-value pairs in the repository.
2703 :
2704 0 : #[derive(Debug, Serialize, Deserialize)]
2705 : pub(crate) struct DbDirectory {
2706 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2707 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2708 : }
2709 :
2710 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2711 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2712 : // were named like "pg_twophase/000002E5", now they're like
2713 : // "pg_twophsae/0000000A000002E4".
2714 :
2715 0 : #[derive(Debug, Serialize, Deserialize)]
2716 : pub(crate) struct TwoPhaseDirectory {
2717 : pub(crate) xids: HashSet<TransactionId>,
2718 : }
2719 :
2720 0 : #[derive(Debug, Serialize, Deserialize)]
2721 : struct TwoPhaseDirectoryV17 {
2722 : xids: HashSet<u64>,
2723 : }
2724 :
2725 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2726 : pub(crate) struct RelDirectory {
2727 : // Set of relations that exist. (relfilenode, forknum)
2728 : //
2729 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2730 : // key-value pairs, if you have a lot of relations
2731 : pub(crate) rels: HashSet<(Oid, u8)>,
2732 : }
2733 :
2734 0 : #[derive(Debug, Serialize, Deserialize)]
2735 : struct RelSizeEntry {
2736 : nblocks: u32,
2737 : }
2738 :
2739 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2740 : pub(crate) struct SlruSegmentDirectory {
2741 : // Set of SLRU segments that exist.
2742 : pub(crate) segments: HashSet<u32>,
2743 : }
2744 :
2745 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2746 : #[repr(u8)]
2747 : pub(crate) enum DirectoryKind {
2748 : Db,
2749 : TwoPhase,
2750 : Rel,
2751 : AuxFiles,
2752 : SlruSegment(SlruKind),
2753 : RelV2,
2754 : }
2755 :
2756 : impl DirectoryKind {
2757 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2758 54624 : pub(crate) fn offset(&self) -> usize {
2759 54624 : self.into_usize()
2760 54624 : }
2761 : }
2762 :
2763 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2764 :
2765 : #[allow(clippy::bool_assert_comparison)]
2766 : #[cfg(test)]
2767 : mod tests {
2768 : use hex_literal::hex;
2769 : use pageserver_api::models::ShardParameters;
2770 : use pageserver_api::shard::ShardStripeSize;
2771 : use utils::id::TimelineId;
2772 : use utils::shard::{ShardCount, ShardNumber};
2773 :
2774 : use super::*;
2775 : use crate::DEFAULT_PG_VERSION;
2776 : use crate::tenant::harness::TenantHarness;
2777 :
2778 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2779 : #[tokio::test]
2780 12 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2781 12 : let name = "aux_files_round_trip";
2782 12 : let harness = TenantHarness::create(name).await?;
2783 12 :
2784 12 : pub const TIMELINE_ID: TimelineId =
2785 12 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2786 12 :
2787 12 : let (tenant, ctx) = harness.load().await;
2788 12 : let (tline, ctx) = tenant
2789 12 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2790 12 : .await?;
2791 12 : let tline = tline.raw_timeline().unwrap();
2792 12 :
2793 12 : // First modification: insert two keys
2794 12 : let mut modification = tline.begin_modification(Lsn(0x1000));
2795 12 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2796 12 : modification.set_lsn(Lsn(0x1008))?;
2797 12 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2798 12 : modification.commit(&ctx).await?;
2799 12 : let expect_1008 = HashMap::from([
2800 12 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2801 12 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2802 12 : ]);
2803 12 :
2804 12 : let io_concurrency = IoConcurrency::spawn_for_test();
2805 12 :
2806 12 : let readback = tline
2807 12 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2808 12 : .await?;
2809 12 : assert_eq!(readback, expect_1008);
2810 12 :
2811 12 : // Second modification: update one key, remove the other
2812 12 : let mut modification = tline.begin_modification(Lsn(0x2000));
2813 12 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2814 12 : modification.set_lsn(Lsn(0x2008))?;
2815 12 : modification.put_file("foo/bar2", b"", &ctx).await?;
2816 12 : modification.commit(&ctx).await?;
2817 12 : let expect_2008 =
2818 12 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2819 12 :
2820 12 : let readback = tline
2821 12 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2822 12 : .await?;
2823 12 : assert_eq!(readback, expect_2008);
2824 12 :
2825 12 : // Reading back in time works
2826 12 : let readback = tline
2827 12 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2828 12 : .await?;
2829 12 : assert_eq!(readback, expect_1008);
2830 12 :
2831 12 : Ok(())
2832 12 : }
2833 :
2834 : #[test]
2835 12 : fn gap_finding() {
2836 12 : let rel = RelTag {
2837 12 : spcnode: 1663,
2838 12 : dbnode: 208101,
2839 12 : relnode: 2620,
2840 12 : forknum: 0,
2841 12 : };
2842 12 : let base_blkno = 1;
2843 12 :
2844 12 : let base_key = rel_block_to_key(rel, base_blkno);
2845 12 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2846 12 :
2847 12 : let shard = ShardIdentity::unsharded();
2848 12 :
2849 12 : let mut previous_nblocks = 0;
2850 132 : for i in 0..10 {
2851 120 : let crnt_blkno = base_blkno + i;
2852 120 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2853 120 :
2854 120 : previous_nblocks = crnt_blkno + 1;
2855 120 :
2856 120 : if i == 0 {
2857 : // The first block we write is 1, so we should find the gap.
2858 12 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2859 : } else {
2860 108 : assert!(gaps.is_none());
2861 : }
2862 : }
2863 :
2864 : // This is an update to an already existing block. No gaps here.
2865 12 : let update_blkno = 5;
2866 12 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2867 12 : assert!(gaps.is_none());
2868 :
2869 : // This is an update past the current end block.
2870 12 : let after_gap_blkno = 20;
2871 12 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2872 12 :
2873 12 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2874 12 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2875 12 : assert_eq!(
2876 12 : gaps.unwrap(),
2877 12 : KeySpace::single(gap_start_key..after_gap_key)
2878 12 : );
2879 12 : }
2880 :
2881 : #[test]
2882 12 : fn sharded_gap_finding() {
2883 12 : let rel = RelTag {
2884 12 : spcnode: 1663,
2885 12 : dbnode: 208101,
2886 12 : relnode: 2620,
2887 12 : forknum: 0,
2888 12 : };
2889 12 :
2890 12 : let first_blkno = 6;
2891 12 :
2892 12 : // This shard will get the even blocks
2893 12 : let shard = ShardIdentity::from_params(
2894 12 : ShardNumber(0),
2895 12 : &ShardParameters {
2896 12 : count: ShardCount(2),
2897 12 : stripe_size: ShardStripeSize(1),
2898 12 : },
2899 12 : );
2900 12 :
2901 12 : // Only keys belonging to this shard are considered as gaps.
2902 12 : let mut previous_nblocks = 0;
2903 12 : let gaps =
2904 12 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2905 12 : assert!(!gaps.ranges.is_empty());
2906 36 : for gap_range in gaps.ranges {
2907 24 : let mut k = gap_range.start;
2908 48 : while k != gap_range.end {
2909 24 : assert_eq!(shard.get_shard_number(&k), shard.number);
2910 24 : k = k.next();
2911 : }
2912 : }
2913 :
2914 12 : previous_nblocks = first_blkno;
2915 12 :
2916 12 : let update_blkno = 2;
2917 12 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2918 12 : assert!(gaps.is_none());
2919 12 : }
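/// A small additional check (a sketch, not part of the original suite): relation block
/// keys are classified as data keys and take the fast write path, while relation size
/// keys are metadata and stay readable through the modification's own cache.
#[test]
fn data_vs_metadata_key_classification() {
    let rel = RelTag {
        spcnode: 1663,
        dbnode: 208101,
        relnode: 2620,
        forknum: 0,
    };
    assert!(DatadirModification::is_data_key(&rel_block_to_key(rel, 0)));
    assert!(!DatadirModification::is_data_key(&rel_size_to_key(rel)));
}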
2920 :
2921 : /*
2922 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2923 : let incremental = timeline.get_current_logical_size();
2924 : let non_incremental = timeline
2925 : .get_current_logical_size_non_incremental(lsn)
2926 : .unwrap();
2927 : assert_eq!(incremental, non_incremental);
2928 : }
2929 : */
2930 :
2931 : /*
2932 : ///
2933 : /// Test list_rels() function, with branches and dropped relations
2934 : ///
2935 : #[test]
2936 : fn test_list_rels_drop() -> Result<()> {
2937 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2938 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2939 : const TESTDB: u32 = 111;
2940 :
2941 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2942 : // after branching fails below
2943 : let mut writer = tline.begin_record(Lsn(0x10));
2944 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2945 : writer.finish()?;
2946 :
2947 : // Create a relation on the timeline
2948 : let mut writer = tline.begin_record(Lsn(0x20));
2949 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2950 : writer.finish()?;
2951 :
2952 : let writer = tline.begin_record(Lsn(0x00));
2953 : writer.finish()?;
2954 :
2955 : // Check that list_rels() lists it after LSN 2, but no before it
2956 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2957 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2958 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2959 :
2960 : // Create a branch, check that the relation is visible there
2961 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2962 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2963 : Some(timeline) => timeline,
2964 : None => panic!("Should have a local timeline"),
2965 : };
2966 : let newtline = DatadirTimelineImpl::new(newtline);
2967 : assert!(newtline
2968 : .list_rels(0, TESTDB, Lsn(0x30))?
2969 : .contains(&TESTREL_A));
2970 :
2971 : // Drop it on the branch
2972 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2973 : new_writer.drop_relation(TESTREL_A)?;
2974 : new_writer.finish()?;
2975 :
2976 : // Check that it's no longer listed on the branch after the point where it was dropped
2977 : assert!(newtline
2978 : .list_rels(0, TESTDB, Lsn(0x30))?
2979 : .contains(&TESTREL_A));
2980 : assert!(!newtline
2981 : .list_rels(0, TESTDB, Lsn(0x40))?
2982 : .contains(&TESTREL_A));
2983 :
2984 : // Run checkpoint and garbage collection and check that it's still not visible
2985 : newtline.checkpoint(CheckpointConfig::Forced)?;
2986 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2987 :
2988 : assert!(!newtline
2989 : .list_rels(0, TESTDB, Lsn(0x40))?
2990 : .contains(&TESTREL_A));
2991 :
2992 : Ok(())
2993 : }
2994 : */
2995 :
2996 : /*
2997 : #[test]
2998 : fn test_read_beyond_eof() -> Result<()> {
2999 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3000 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3001 :
3002 : make_some_layers(&tline, Lsn(0x20))?;
3003 : let mut writer = tline.begin_record(Lsn(0x60));
3004 : walingest.put_rel_page_image(
3005 : &mut writer,
3006 : TESTREL_A,
3007 : 0,
3008 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3009 : )?;
3010 : writer.finish()?;
3011 :
3012 : // Test read before rel creation. Should error out.
3013 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3014 :
3015 : // Read block beyond end of relation at different points in time.
3016 : // These reads should fall into different delta, image, and in-memory layers.
3017 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3018 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3019 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3020 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3021 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3022 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3023 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3024 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3025 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3026 :
3027 : // Test on an in-memory layer with no preceding layer
3028 : let mut writer = tline.begin_record(Lsn(0x70));
3029 : walingest.put_rel_page_image(
3030 : &mut writer,
3031 : TESTREL_B,
3032 : 0,
3033 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3034 : )?;
3035 : writer.finish()?;
3036 :
3037 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3038 :
3039 : Ok(())
3040 : }
3041 : */
3042 : }
|