Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use crate::PERF_TRACE_TARGET;
13 : use anyhow::{Context, ensure};
14 : use bytes::{Buf, Bytes, BytesMut};
15 : use enum_map::Enum;
16 : use itertools::Itertools;
17 : use pageserver_api::key::{
18 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
19 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
20 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
21 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : };
24 : use pageserver_api::keyspace::SparseKeySpace;
25 : use pageserver_api::models::RelSizeMigration;
26 : use pageserver_api::record::NeonWalRecord;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use pageserver_api::shard::ShardIdentity;
29 : use pageserver_api::value::Value;
30 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
31 : use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
32 : use serde::{Deserialize, Serialize};
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, info, info_span, trace, warn};
36 : use utils::bin_ser::{BeSer, DeserializeError};
37 : use utils::lsn::Lsn;
38 : use utils::pausable_failpoint;
39 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
40 :
41 : use super::tenant::{PageReconstructError, Timeline};
42 : use crate::aux_file;
43 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
44 : use crate::keyspace::{KeySpace, KeySpaceAccum};
45 : use crate::metrics::{
46 : RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
47 : };
48 : use crate::span::{
49 : debug_assert_current_span_has_tenant_and_timeline_id,
50 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
51 : };
52 : use crate::tenant::storage_layer::IoConcurrency;
53 : use crate::tenant::timeline::GetVectoredError;
54 :
55 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
56 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
57 :
58 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
59 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
60 :
61 : #[derive(Debug)]
62 : pub enum LsnForTimestamp {
63 : /// Found commits both before and after the given timestamp
64 : Present(Lsn),
65 :
66 : /// Found no commits after the given timestamp; this means
67 : /// that the newest data in the branch is older than the given
68 : /// timestamp.
69 : ///
70 : /// All commits <= LSN happened before the given timestamp
71 : Future(Lsn),
72 :
73 : /// The queried timestamp is older than the history we retain (the PITR horizon)
74 : ///
75 : /// All commits > LSN happened after the given timestamp,
76 : /// but any commits < LSN might have happened before or after
77 : /// the given timestamp. We don't know because no data before
78 : /// the given lsn is available.
79 : Past(Lsn),
80 :
81 : /// We have found no commit with a timestamp,
82 : /// so we can't return anything meaningful.
83 : ///
84 : /// The associated LSN is the lower bound value we can safely
85 : /// create branches on, but no statement is made if it is
86 : /// older or newer than the timestamp.
87 : ///
88 : /// This variant can e.g. be returned right after a
89 : /// cluster import.
90 : NoData(Lsn),
91 : }
92 :
93 : #[derive(Debug, thiserror::Error)]
94 : pub(crate) enum CalculateLogicalSizeError {
95 : #[error("cancelled")]
96 : Cancelled,
97 :
98 : /// Something went wrong while reading the metadata we use to calculate logical size
99 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
100 : /// in the `From` implementation for this variant.
101 : #[error(transparent)]
102 : PageRead(PageReconstructError),
103 :
104 : /// Something went wrong deserializing metadata that we read to calculate logical size
105 : #[error("decode error: {0}")]
106 : Decode(#[from] DeserializeError),
107 : }
108 :
109 : #[derive(Debug, thiserror::Error)]
110 : pub(crate) enum CollectKeySpaceError {
111 : #[error(transparent)]
112 : Decode(#[from] DeserializeError),
113 : #[error(transparent)]
114 : PageRead(PageReconstructError),
115 : #[error("cancelled")]
116 : Cancelled,
117 : }
118 :
119 : impl From<PageReconstructError> for CollectKeySpaceError {
120 0 : fn from(err: PageReconstructError) -> Self {
121 0 : match err {
122 0 : PageReconstructError::Cancelled => Self::Cancelled,
123 0 : err => Self::PageRead(err),
124 : }
125 0 : }
126 : }
127 :
128 : impl From<PageReconstructError> for CalculateLogicalSizeError {
129 0 : fn from(pre: PageReconstructError) -> Self {
130 0 : match pre {
131 0 : PageReconstructError::Cancelled => Self::Cancelled,
132 0 : _ => Self::PageRead(pre),
133 : }
134 0 : }
135 : }
136 :
137 : #[derive(Debug, thiserror::Error)]
138 : pub enum RelationError {
139 : #[error("Relation Already Exists")]
140 : AlreadyExists,
141 : #[error("invalid relnode")]
142 : InvalidRelnode,
143 : #[error(transparent)]
144 : Other(#[from] anyhow::Error),
145 : }
146 :
147 : ///
148 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
149 : /// and other special kinds of files, in a versioned key-value store. The
150 : /// Timeline struct provides the key-value store.
151 : ///
152 : /// This is a separate impl so that we can easily include all these functions in the Timeline
153 : /// implementation; it might be moved into a separate struct later.
154 : impl Timeline {
155 : /// Start ingesting a WAL record, or other atomic modification of
156 : /// the timeline.
157 : ///
158 : /// This provides a transaction-like interface to perform a bunch
159 : /// of modifications atomically.
160 : ///
161 : /// To ingest a WAL record, call begin_modification(lsn) to get a
162 : /// DatadirModification object. Use the functions in the object to
163 : /// modify the repository state, updating all the pages and metadata
164 : /// that the WAL record affects. When you're done, call commit() to
165 : /// commit the changes.
166 : ///
167 : /// The LSN stored in the modification is advanced by `ingest_record` and
168 : /// is used by `commit()` to update `last_record_lsn`.
169 : ///
170 : /// Calling commit() will flush all the changes and reset the state,
171 : /// so the `DatadirModification` struct can be reused to perform the next modification.
172 : ///
173 : /// Note that any pending modifications you make through the
174 : /// modification object won't be visible to calls to the 'get' and list
175 : /// functions of the timeline until you finish! And if you update the
176 : /// same page twice, the last update wins.
177 : ///
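/// A minimal usage sketch (illustrative only; `put_rel_page_image` and the exact
/// `commit` signature are assumptions, not taken from this file):
///
/// ```ignore
/// // Begin an atomic modification at the record's LSN.
/// let mut modification = timeline.begin_modification(lsn);
/// // Apply the changes described by the WAL record (hypothetical put call).
/// modification.put_rel_page_image(rel, blknum, page_image)?;
/// // Flush the pending changes and advance last_record_lsn.
/// modification.commit(ctx).await?;
/// ```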
178 536836 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
179 536836 : where
180 536836 : Self: Sized,
181 536836 : {
182 536836 : DatadirModification {
183 536836 : tline: self,
184 536836 : pending_lsns: Vec::new(),
185 536836 : pending_metadata_pages: HashMap::new(),
186 536836 : pending_data_batch: None,
187 536836 : pending_deletions: Vec::new(),
188 536836 : pending_nblocks: 0,
189 536836 : pending_directory_entries: Vec::new(),
190 536836 : pending_metadata_bytes: 0,
191 536836 : lsn,
192 536836 : }
193 536836 : }
194 :
195 : //------------------------------------------------------------------------------
196 : // Public GET functions
197 : //------------------------------------------------------------------------------
198 :
199 : /// Look up given page version.
200 36768 : pub(crate) async fn get_rel_page_at_lsn(
201 36768 : &self,
202 36768 : tag: RelTag,
203 36768 : blknum: BlockNumber,
204 36768 : version: Version<'_>,
205 36768 : ctx: &RequestContext,
206 36768 : io_concurrency: IoConcurrency,
207 36768 : ) -> Result<Bytes, PageReconstructError> {
208 36768 : match version {
209 36768 : Version::Lsn(effective_lsn) => {
210 36768 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
211 36768 : let res = self
212 36768 : .get_rel_page_at_lsn_batched(
213 36768 : pages
214 36768 : .iter()
215 36768 : .map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
216 36768 : effective_lsn,
217 36768 : io_concurrency.clone(),
218 36768 : ctx,
219 36768 : )
220 36768 : .await;
221 36768 : assert_eq!(res.len(), 1);
222 36768 : res.into_iter().next().unwrap()
223 : }
224 0 : Version::Modified(modification) => {
225 0 : if tag.relnode == 0 {
226 0 : return Err(PageReconstructError::Other(
227 0 : RelationError::InvalidRelnode.into(),
228 0 : ));
229 0 : }
230 :
231 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
232 0 : if blknum >= nblocks {
233 0 : debug!(
234 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
235 0 : tag,
236 0 : blknum,
237 0 : version.get_lsn(),
238 : nblocks
239 : );
240 0 : return Ok(ZERO_PAGE.clone());
241 0 : }
242 0 :
243 0 : let key = rel_block_to_key(tag, blknum);
244 0 : modification.get(key, ctx).await
245 : }
246 : }
247 36768 : }
248 :
249 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
250 : ///
251 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
252 36768 : pub(crate) async fn get_rel_page_at_lsn_batched(
253 36768 : &self,
254 36768 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
255 36768 : effective_lsn: Lsn,
256 36768 : io_concurrency: IoConcurrency,
257 36768 : ctx: &RequestContext,
258 36768 : ) -> Vec<Result<Bytes, PageReconstructError>> {
259 36768 : debug_assert_current_span_has_tenant_and_timeline_id();
260 36768 :
261 36768 : let mut slots_filled = 0;
262 36768 : let page_count = pages.len();
263 36768 :
264 36768 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
265 36768 : let mut result = Vec::with_capacity(pages.len());
266 36768 : let result_slots = result.spare_capacity_mut();
267 36768 :
268 36768 : let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
269 36768 : BTreeMap::default();
270 36768 :
271 36768 : let mut perf_instrument = false;
272 36768 : for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
273 36768 : if tag.relnode == 0 {
274 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
275 0 : RelationError::InvalidRelnode.into(),
276 0 : )));
277 0 :
278 0 : slots_filled += 1;
279 0 : continue;
280 36768 : }
281 :
282 36768 : let nblocks = match self
283 36768 : .get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
284 36768 : .maybe_perf_instrument(&ctx, |crnt_perf_span| {
285 0 : info_span!(
286 : target: PERF_TRACE_TARGET,
287 0 : parent: crnt_perf_span,
288 : "GET_REL_SIZE",
289 : reltag=%tag,
290 : lsn=%effective_lsn,
291 : )
292 36768 : })
293 36768 : .await
294 : {
295 36768 : Ok(nblocks) => nblocks,
296 0 : Err(err) => {
297 0 : result_slots[response_slot_idx].write(Err(err));
298 0 : slots_filled += 1;
299 0 : continue;
300 : }
301 : };
302 :
303 36768 : if *blknum >= nblocks {
304 0 : debug!(
305 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
306 : tag, blknum, effective_lsn, nblocks
307 : );
308 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
309 0 : slots_filled += 1;
310 0 : continue;
311 36768 : }
312 36768 :
313 36768 : let key = rel_block_to_key(*tag, *blknum);
314 36768 :
315 36768 : if ctx.has_perf_span() {
316 0 : perf_instrument = true;
317 36768 : }
318 :
319 36768 : let key_slots = keys_slots.entry(key).or_default();
320 36768 : key_slots.push((response_slot_idx, ctx));
321 : }
322 :
323 36768 : let keyspace = {
324 : // add_key requires monotonicity
325 36768 : let mut acc = KeySpaceAccum::new();
326 36768 : for key in keys_slots
327 36768 : .keys()
328 36768 : // in fact it requires strong monotonicity
329 36768 : .dedup()
330 36768 : {
331 36768 : acc.add_key(*key);
332 36768 : }
333 36768 : acc.to_keyspace()
334 : };
335 :
336 36768 : let ctx = match perf_instrument {
337 0 : true => RequestContextBuilder::from(ctx)
338 0 : .root_perf_span(|| {
339 0 : info_span!(
340 : target: PERF_TRACE_TARGET,
341 : "GET_VECTORED",
342 : tenant_id = %self.tenant_shard_id.tenant_id,
343 : timeline_id = %self.timeline_id,
344 : lsn = %effective_lsn,
345 0 : shard = %self.tenant_shard_id.shard_slug(),
346 : )
347 0 : })
348 0 : .attached_child(),
349 36768 : false => ctx.attached_child(),
350 : };
351 :
352 36768 : let res = self
353 36768 : .get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
354 36768 : .maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
355 36768 : .await;
356 :
357 36768 : match res {
358 36768 : Ok(results) => {
359 73536 : for (key, res) in results {
360 36768 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
361 36768 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
362 :
363 36768 : for (slot, req_ctx) in key_slots {
364 0 : let clone = match &res {
365 0 : Ok(buf) => Ok(buf.clone()),
366 0 : Err(err) => Err(match err {
367 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
368 :
369 0 : x @ PageReconstructError::Other(_)
370 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
371 0 : | x @ PageReconstructError::WalRedo(_)
372 0 : | x @ PageReconstructError::MissingKey(_) => {
373 0 : PageReconstructError::Other(anyhow::anyhow!(
374 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
375 0 : ))
376 : }
377 : }),
378 : };
379 :
380 0 : result_slots[slot].write(clone);
381 0 : // There is no standardized way to express that the batched span followed from N request spans.
382 0 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
383 0 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
384 0 : req_ctx.perf_follows_from(&ctx);
385 0 : slots_filled += 1;
386 : }
387 :
388 36768 : result_slots[first_slot].write(res);
389 36768 : first_req_ctx.perf_follows_from(&ctx);
390 36768 : slots_filled += 1;
391 : }
392 : }
393 0 : Err(err) => {
394 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
395 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
396 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
397 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
398 : // but without taking ownership of the GetVectoredError
399 0 : let err = match &err {
400 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
401 : // TODO: restructure get_vectored API to make this error per-key
402 0 : GetVectoredError::MissingKey(err) => {
403 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
404 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
405 0 : )))
406 : }
407 : // TODO: restructure get_vectored API to make this error per-key
408 0 : GetVectoredError::GetReadyAncestorError(err) => {
409 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
410 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
411 0 : )))
412 : }
413 : // TODO: restructure get_vectored API to make this error per-key
414 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
415 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
416 0 : )),
417 : // TODO: we can prevent this error class by moving this check into the type system
418 0 : GetVectoredError::InvalidLsn(e) => {
419 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
420 : }
421 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
422 : // TODO: we can prevent this error class by moving this check into the type system
423 0 : GetVectoredError::Oversized(err) => {
424 0 : Err(anyhow::anyhow!("batching oversized: {err:?}").into())
425 : }
426 : };
427 :
428 0 : req_ctx.perf_follows_from(&ctx);
429 0 : result_slots[*slot].write(err);
430 : }
431 :
432 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
433 0 : }
434 : };
435 :
436 36768 : assert_eq!(slots_filled, page_count);
437 : // SAFETY:
438 : // 1. `result` and any of its uninit members are not read from until this point
439 : // 2. The length below is tracked at run-time and matches the number of requested pages.
440 36768 : unsafe {
441 36768 : result.set_len(page_count);
442 36768 : }
443 36768 :
444 36768 : result
445 36768 : }
446 :
447 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
448 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
449 : /// for pages up to the highest page number it has stored.
450 0 : pub(crate) async fn get_db_size(
451 0 : &self,
452 0 : spcnode: Oid,
453 0 : dbnode: Oid,
454 0 : version: Version<'_>,
455 0 : ctx: &RequestContext,
456 0 : ) -> Result<usize, PageReconstructError> {
457 0 : let mut total_blocks = 0;
458 :
459 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
460 :
461 0 : for rel in rels {
462 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
463 0 : total_blocks += n_blocks as usize;
464 : }
465 0 : Ok(total_blocks)
466 0 : }
467 :
468 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
469 : ///
470 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
471 : /// page number stored in the shard.
472 48868 : pub(crate) async fn get_rel_size(
473 48868 : &self,
474 48868 : tag: RelTag,
475 48868 : version: Version<'_>,
476 48868 : ctx: &RequestContext,
477 48868 : ) -> Result<BlockNumber, PageReconstructError> {
478 48868 : if tag.relnode == 0 {
479 0 : return Err(PageReconstructError::Other(
480 0 : RelationError::InvalidRelnode.into(),
481 0 : ));
482 48868 : }
483 :
484 48868 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
485 38588 : return Ok(nblocks);
486 10280 : }
487 10280 :
488 10280 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
489 0 : && !self.get_rel_exists(tag, version, ctx).await?
490 : {
491 : // FIXME: Postgres sometimes calls smgrcreate() to create
492 : // FSM, and smgrnblocks() on it immediately afterwards,
493 : // without extending it. Tolerate that by claiming that
494 : // any non-existent FSM fork has size 0.
495 0 : return Ok(0);
496 10280 : }
497 10280 :
498 10280 : let key = rel_size_to_key(tag);
499 10280 : let mut buf = version.get(self, key, ctx).await?;
500 10272 : let nblocks = buf.get_u32_le();
501 10272 :
502 10272 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
503 10272 :
504 10272 : Ok(nblocks)
505 48868 : }
506 :
507 : /// Does the relation exist?
508 : ///
509 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
510 : /// the shard stores pages for.
511 12100 : pub(crate) async fn get_rel_exists(
512 12100 : &self,
513 12100 : tag: RelTag,
514 12100 : version: Version<'_>,
515 12100 : ctx: &RequestContext,
516 12100 : ) -> Result<bool, PageReconstructError> {
517 12100 : if tag.relnode == 0 {
518 0 : return Err(PageReconstructError::Other(
519 0 : RelationError::InvalidRelnode.into(),
520 0 : ));
521 12100 : }
522 :
523 : // first try to lookup relation in cache
524 12100 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
525 12064 : return Ok(true);
526 36 : }
527 : // then check if the database was already initialized.
528 : // get_rel_exists can be called before dbdir is created.
529 36 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
530 36 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
531 36 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
532 0 : return Ok(false);
533 36 : }
534 36 :
535 36 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
536 36 : // Otherwise, read the old reldir keyspace.
537 36 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
538 36 :
539 36 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
540 36 : self.get_rel_size_v2_status()
541 : {
542 : // fetch directory listing (new)
543 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
544 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
545 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
546 0 : let exists_v2 = buf == RelDirExists::Exists;
547 0 : // Fast path: if the relation exists in the new format, return true.
548 0 : // TODO: we should have a verification mode that checks both keyspaces
549 0 : // to ensure the relation only exists in one of them.
550 0 : if exists_v2 {
551 0 : return Ok(true);
552 0 : }
553 36 : }
554 :
555 : // fetch directory listing (old)
556 :
557 36 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
558 36 : let buf = version.get(self, key, ctx).await?;
559 :
560 36 : let dir = RelDirectory::des(&buf)?;
561 36 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
562 36 : Ok(exists_v1)
563 12100 : }
564 :
565 : /// Get a list of all existing relations in given tablespace and database.
566 : ///
567 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
568 : /// the shard stores pages for.
569 : ///
570 : /// # Cancel-Safety
571 : ///
572 : /// This method is cancellation-safe.
573 0 : pub(crate) async fn list_rels(
574 0 : &self,
575 0 : spcnode: Oid,
576 0 : dbnode: Oid,
577 0 : version: Version<'_>,
578 0 : ctx: &RequestContext,
579 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
580 0 : // fetch directory listing (old)
581 0 : let key = rel_dir_to_key(spcnode, dbnode);
582 0 : let buf = version.get(self, key, ctx).await?;
583 :
584 0 : let dir = RelDirectory::des(&buf)?;
585 0 : let rels_v1: HashSet<RelTag> =
586 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
587 0 : spcnode,
588 0 : dbnode,
589 0 : relnode: *relnode,
590 0 : forknum: *forknum,
591 0 : }));
592 0 :
593 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
594 0 : return Ok(rels_v1);
595 0 : }
596 0 :
597 0 : // scan directory listing (new), merge with the old results
598 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
599 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
600 0 : self.conf,
601 0 : self.gate
602 0 : .enter()
603 0 : .map_err(|_| PageReconstructError::Cancelled)?,
604 : );
605 0 : let results = self
606 0 : .scan(
607 0 : KeySpace::single(key_range),
608 0 : version.get_lsn(),
609 0 : ctx,
610 0 : io_concurrency,
611 0 : )
612 0 : .await?;
613 0 : let mut rels = rels_v1;
614 0 : for (key, val) in results {
615 0 : let val = RelDirExists::decode(&val?)
616 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
617 0 : assert_eq!(key.field6, 1);
618 0 : assert_eq!(key.field2, spcnode);
619 0 : assert_eq!(key.field3, dbnode);
620 0 : let tag = RelTag {
621 0 : spcnode,
622 0 : dbnode,
623 0 : relnode: key.field4,
624 0 : forknum: key.field5,
625 0 : };
626 0 : if val == RelDirExists::Removed {
627 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
628 0 : continue;
629 0 : }
630 0 : let did_not_contain = rels.insert(tag);
631 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
632 : }
633 0 : Ok(rels)
634 0 : }
635 :
636 : /// Get the whole SLRU segment
637 0 : pub(crate) async fn get_slru_segment(
638 0 : &self,
639 0 : kind: SlruKind,
640 0 : segno: u32,
641 0 : lsn: Lsn,
642 0 : ctx: &RequestContext,
643 0 : ) -> Result<Bytes, PageReconstructError> {
644 0 : assert!(self.tenant_shard_id.is_shard_zero());
645 0 : let n_blocks = self
646 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
647 0 : .await?;
648 :
649 0 : let keyspace = KeySpace::single(
650 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
651 0 : );
652 0 :
653 0 : let batches = keyspace.partition(
654 0 : self.get_shard_identity(),
655 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
656 0 : );
657 :
658 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
659 0 : self.conf,
660 0 : self.gate
661 0 : .enter()
662 0 : .map_err(|_| PageReconstructError::Cancelled)?,
663 : );
664 :
665 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
666 0 : for batch in batches.parts {
667 0 : let blocks = self
668 0 : .get_vectored(batch, lsn, io_concurrency.clone(), ctx)
669 0 : .await?;
670 :
671 0 : for (_key, block) in blocks {
672 0 : let block = block?;
673 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
674 : }
675 : }
676 :
677 0 : Ok(segment.freeze())
678 0 : }
679 :
680 : /// Get size of an SLRU segment
681 0 : pub(crate) async fn get_slru_segment_size(
682 0 : &self,
683 0 : kind: SlruKind,
684 0 : segno: u32,
685 0 : version: Version<'_>,
686 0 : ctx: &RequestContext,
687 0 : ) -> Result<BlockNumber, PageReconstructError> {
688 0 : assert!(self.tenant_shard_id.is_shard_zero());
689 0 : let key = slru_segment_size_to_key(kind, segno);
690 0 : let mut buf = version.get(self, key, ctx).await?;
691 0 : Ok(buf.get_u32_le())
692 0 : }
693 :
694 : /// Does the slru segment exist?
695 0 : pub(crate) async fn get_slru_segment_exists(
696 0 : &self,
697 0 : kind: SlruKind,
698 0 : segno: u32,
699 0 : version: Version<'_>,
700 0 : ctx: &RequestContext,
701 0 : ) -> Result<bool, PageReconstructError> {
702 0 : assert!(self.tenant_shard_id.is_shard_zero());
703 : // fetch directory listing
704 0 : let key = slru_dir_to_key(kind);
705 0 : let buf = version.get(self, key, ctx).await?;
706 :
707 0 : let dir = SlruSegmentDirectory::des(&buf)?;
708 0 : Ok(dir.segments.contains(&segno))
709 0 : }
710 :
711 : /// Locate LSN, such that all transactions that committed before
712 : /// 'search_timestamp' are visible, but nothing newer is.
713 : ///
714 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
715 : /// so it's not well defined which LSN you get if there were multiple commits
716 : /// "in flight" at that point in time.
717 : ///
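/// A sketch of how a caller might interpret the result (illustrative; the helper
/// functions are hypothetical and not part of this file):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
///     // Commits exist on both sides of 'ts': branch at the returned LSN.
///     LsnForTimestamp::Present(lsn) => branch_at(lsn),
///     // Everything in the branch is older than 'ts': all commits <= lsn precede it.
///     LsnForTimestamp::Future(lsn) => branch_at(lsn),
///     // 'ts' is older than the history we retain: only the minimum LSN is safe.
///     LsnForTimestamp::Past(min_lsn) => reject_or_branch_at(min_lsn),
///     // No commit timestamps at all (e.g. right after an import).
///     LsnForTimestamp::NoData(min_lsn) => branch_at(min_lsn),
/// }
/// ```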
718 0 : pub(crate) async fn find_lsn_for_timestamp(
719 0 : &self,
720 0 : search_timestamp: TimestampTz,
721 0 : cancel: &CancellationToken,
722 0 : ctx: &RequestContext,
723 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
724 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
725 :
726 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
727 0 : let gc_cutoff_planned = {
728 0 : let gc_info = self.gc_info.read().unwrap();
729 0 : gc_info.min_cutoff()
730 0 : };
731 0 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
732 0 : // but let's be defensive.
733 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
734 0 : // We use this method to figure out the branching LSN for the new branch, but the
735 0 : // GC cutoff could be before the branching point and we cannot create a new branch
736 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
737 0 : // on the safe side.
738 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
739 0 : let max_lsn = self.get_last_record_lsn();
740 0 :
741 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
742 0 : // LSN divided by 8.
743 0 : let mut low = min_lsn.0 / 8;
744 0 : let mut high = max_lsn.0 / 8 + 1;
745 0 :
746 0 : let mut found_smaller = false;
747 0 : let mut found_larger = false;
748 :
749 0 : while low < high {
750 0 : if cancel.is_cancelled() {
751 0 : return Err(PageReconstructError::Cancelled);
752 0 : }
753 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
754 0 : let mid = (high + low) / 2;
755 :
756 0 : let cmp = match self
757 0 : .is_latest_commit_timestamp_ge_than(
758 0 : search_timestamp,
759 0 : Lsn(mid * 8),
760 0 : &mut found_smaller,
761 0 : &mut found_larger,
762 0 : ctx,
763 0 : )
764 0 : .await
765 : {
766 0 : Ok(res) => res,
767 0 : Err(PageReconstructError::MissingKey(e)) => {
768 0 : warn!(
769 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
770 : e
771 : );
772 : // Return that we didn't find any commits before the given timestamp; the error was logged above.
773 0 : return Ok(LsnForTimestamp::Past(min_lsn));
774 : }
775 0 : Err(e) => return Err(e),
776 : };
777 :
778 0 : if cmp {
779 0 : high = mid;
780 0 : } else {
781 0 : low = mid + 1;
782 0 : }
783 : }
784 :
785 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
786 : // so the LSN of the last commit record before or at `search_timestamp`.
787 : // Remove one from `low` to get `t`.
788 : //
789 : // FIXME: it would be better to get the LSN of the previous commit.
790 : // Otherwise, if you restore to the returned LSN, the database will
791 : // include physical changes from later commits that will be marked
792 : // as aborted, and will need to be vacuumed away.
793 0 : let commit_lsn = Lsn((low - 1) * 8);
794 0 : match (found_smaller, found_larger) {
795 : (false, false) => {
796 : // This can happen if no commit records have been processed yet, e.g.
797 : // just after importing a cluster.
798 0 : Ok(LsnForTimestamp::NoData(min_lsn))
799 : }
800 : (false, true) => {
801 : // Didn't find any commit timestamps smaller than the request
802 0 : Ok(LsnForTimestamp::Past(min_lsn))
803 : }
804 0 : (true, _) if commit_lsn < min_lsn => {
805 0 : // the search above did set found_smaller to true but it never increased the lsn.
806 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
807 0 : // below the min_lsn. We should never do that.
808 0 : Ok(LsnForTimestamp::Past(min_lsn))
809 : }
810 : (true, false) => {
811 : // Only found commits with timestamps smaller than the request.
812 : // It's still a valid case for branch creation, return it.
813 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
814 : // case, anyway.
815 0 : Ok(LsnForTimestamp::Future(commit_lsn))
816 : }
817 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
818 : }
819 0 : }
820 :
821 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
822 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
823 : ///
824 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
825 : /// with a smaller/larger timestamp.
826 : ///
827 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
828 0 : &self,
829 0 : search_timestamp: TimestampTz,
830 0 : probe_lsn: Lsn,
831 0 : found_smaller: &mut bool,
832 0 : found_larger: &mut bool,
833 0 : ctx: &RequestContext,
834 0 : ) -> Result<bool, PageReconstructError> {
835 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
836 0 : if timestamp >= search_timestamp {
837 0 : *found_larger = true;
838 0 : return ControlFlow::Break(true);
839 0 : } else {
840 0 : *found_smaller = true;
841 0 : }
842 0 : ControlFlow::Continue(())
843 0 : })
844 0 : .await
845 0 : }
846 :
847 : /// Obtain the timestamp for the given lsn.
848 : ///
849 : /// If the lsn has no timestamps (e.g. no commits), returns None.
850 0 : pub(crate) async fn get_timestamp_for_lsn(
851 0 : &self,
852 0 : probe_lsn: Lsn,
853 0 : ctx: &RequestContext,
854 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
855 0 : let mut max: Option<TimestampTz> = None;
856 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
857 0 : if let Some(max_prev) = max {
858 0 : max = Some(max_prev.max(timestamp));
859 0 : } else {
860 0 : max = Some(timestamp);
861 0 : }
862 0 : ControlFlow::Continue(())
863 0 : })
864 0 : .await?;
865 :
866 0 : Ok(max)
867 0 : }
868 :
869 : /// Runs the given function on all the timestamps for a given lsn
870 : ///
871 : /// The return value is either given by the closure, or set to the `Default`
872 : /// impl's output.
873 0 : async fn map_all_timestamps<T: Default>(
874 0 : &self,
875 0 : probe_lsn: Lsn,
876 0 : ctx: &RequestContext,
877 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
878 0 : ) -> Result<T, PageReconstructError> {
879 0 : for segno in self
880 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
881 0 : .await?
882 : {
883 0 : let nblocks = self
884 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
885 0 : .await?;
886 :
887 0 : let keyspace = KeySpace::single(
888 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
889 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
890 0 : );
891 0 :
892 0 : let batches = keyspace.partition(
893 0 : self.get_shard_identity(),
894 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
895 0 : );
896 :
897 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
898 0 : self.conf,
899 0 : self.gate
900 0 : .enter()
901 0 : .map_err(|_| PageReconstructError::Cancelled)?,
902 : );
903 :
904 0 : for batch in batches.parts.into_iter().rev() {
905 0 : let blocks = self
906 0 : .get_vectored(batch, probe_lsn, io_concurrency.clone(), ctx)
907 0 : .await?;
908 :
909 0 : for (_key, clog_page) in blocks.into_iter().rev() {
910 0 : let clog_page = clog_page?;
911 :
912 0 : if clog_page.len() == BLCKSZ as usize + 8 {
913 0 : let mut timestamp_bytes = [0u8; 8];
914 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
915 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
916 0 :
917 0 : match f(timestamp) {
918 0 : ControlFlow::Break(b) => return Ok(b),
919 0 : ControlFlow::Continue(()) => (),
920 : }
921 0 : }
922 : }
923 : }
924 : }
925 0 : Ok(Default::default())
926 0 : }
927 :
928 0 : pub(crate) async fn get_slru_keyspace(
929 0 : &self,
930 0 : version: Version<'_>,
931 0 : ctx: &RequestContext,
932 0 : ) -> Result<KeySpace, PageReconstructError> {
933 0 : let mut accum = KeySpaceAccum::new();
934 :
935 0 : for kind in SlruKind::iter() {
936 0 : let mut segments: Vec<u32> = self
937 0 : .list_slru_segments(kind, version, ctx)
938 0 : .await?
939 0 : .into_iter()
940 0 : .collect();
941 0 : segments.sort_unstable();
942 :
943 0 : for seg in segments {
944 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
945 :
946 0 : accum.add_range(
947 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
948 0 : );
949 : }
950 : }
951 :
952 0 : Ok(accum.to_keyspace())
953 0 : }
954 :
955 : /// Get a list of SLRU segments
956 0 : pub(crate) async fn list_slru_segments(
957 0 : &self,
958 0 : kind: SlruKind,
959 0 : version: Version<'_>,
960 0 : ctx: &RequestContext,
961 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
962 0 : // fetch directory entry
963 0 : let key = slru_dir_to_key(kind);
964 :
965 0 : let buf = version.get(self, key, ctx).await?;
966 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
967 0 : }
968 :
969 0 : pub(crate) async fn get_relmap_file(
970 0 : &self,
971 0 : spcnode: Oid,
972 0 : dbnode: Oid,
973 0 : version: Version<'_>,
974 0 : ctx: &RequestContext,
975 0 : ) -> Result<Bytes, PageReconstructError> {
976 0 : let key = relmap_file_key(spcnode, dbnode);
977 :
978 0 : let buf = version.get(self, key, ctx).await?;
979 0 : Ok(buf)
980 0 : }
981 :
982 660 : pub(crate) async fn list_dbdirs(
983 660 : &self,
984 660 : lsn: Lsn,
985 660 : ctx: &RequestContext,
986 660 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
987 : // fetch directory entry
988 660 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
989 :
990 660 : Ok(DbDirectory::des(&buf)?.dbdirs)
991 660 : }
992 :
993 0 : pub(crate) async fn get_twophase_file(
994 0 : &self,
995 0 : xid: u64,
996 0 : lsn: Lsn,
997 0 : ctx: &RequestContext,
998 0 : ) -> Result<Bytes, PageReconstructError> {
999 0 : let key = twophase_file_key(xid);
1000 0 : let buf = self.get(key, lsn, ctx).await?;
1001 0 : Ok(buf)
1002 0 : }
1003 :
1004 664 : pub(crate) async fn list_twophase_files(
1005 664 : &self,
1006 664 : lsn: Lsn,
1007 664 : ctx: &RequestContext,
1008 664 : ) -> Result<HashSet<u64>, PageReconstructError> {
1009 : // fetch directory entry
1010 664 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1011 :
1012 664 : if self.pg_version >= 17 {
1013 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1014 : } else {
1015 664 : Ok(TwoPhaseDirectory::des(&buf)?
1016 : .xids
1017 664 : .iter()
1018 664 : .map(|x| u64::from(*x))
1019 664 : .collect())
1020 : }
1021 664 : }
1022 :
1023 0 : pub(crate) async fn get_control_file(
1024 0 : &self,
1025 0 : lsn: Lsn,
1026 0 : ctx: &RequestContext,
1027 0 : ) -> Result<Bytes, PageReconstructError> {
1028 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1029 0 : }
1030 :
1031 24 : pub(crate) async fn get_checkpoint(
1032 24 : &self,
1033 24 : lsn: Lsn,
1034 24 : ctx: &RequestContext,
1035 24 : ) -> Result<Bytes, PageReconstructError> {
1036 24 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1037 24 : }
1038 :
1039 24 : async fn list_aux_files_v2(
1040 24 : &self,
1041 24 : lsn: Lsn,
1042 24 : ctx: &RequestContext,
1043 24 : io_concurrency: IoConcurrency,
1044 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1045 24 : let kv = self
1046 24 : .scan(
1047 24 : KeySpace::single(Key::metadata_aux_key_range()),
1048 24 : lsn,
1049 24 : ctx,
1050 24 : io_concurrency,
1051 24 : )
1052 24 : .await?;
1053 24 : let mut result = HashMap::new();
1054 24 : let mut sz = 0;
1055 60 : for (_, v) in kv {
1056 36 : let v = v?;
1057 36 : let v = aux_file::decode_file_value_bytes(&v)
1058 36 : .context("value decode")
1059 36 : .map_err(PageReconstructError::Other)?;
1060 68 : for (fname, content) in v {
1061 32 : sz += fname.len();
1062 32 : sz += content.len();
1063 32 : result.insert(fname, content);
1064 32 : }
1065 : }
1066 24 : self.aux_file_size_estimator.on_initial(sz);
1067 24 : Ok(result)
1068 24 : }
1069 :
1070 0 : pub(crate) async fn trigger_aux_file_size_computation(
1071 0 : &self,
1072 0 : lsn: Lsn,
1073 0 : ctx: &RequestContext,
1074 0 : io_concurrency: IoConcurrency,
1075 0 : ) -> Result<(), PageReconstructError> {
1076 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1077 0 : Ok(())
1078 0 : }
1079 :
1080 24 : pub(crate) async fn list_aux_files(
1081 24 : &self,
1082 24 : lsn: Lsn,
1083 24 : ctx: &RequestContext,
1084 24 : io_concurrency: IoConcurrency,
1085 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1086 24 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1087 24 : }
1088 :
1089 0 : pub(crate) async fn get_replorigins(
1090 0 : &self,
1091 0 : lsn: Lsn,
1092 0 : ctx: &RequestContext,
1093 0 : io_concurrency: IoConcurrency,
1094 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1095 0 : let kv = self
1096 0 : .scan(
1097 0 : KeySpace::single(repl_origin_key_range()),
1098 0 : lsn,
1099 0 : ctx,
1100 0 : io_concurrency,
1101 0 : )
1102 0 : .await?;
1103 0 : let mut result = HashMap::new();
1104 0 : for (k, v) in kv {
1105 0 : let v = v?;
1106 0 : let origin_id = k.field6 as RepOriginId;
1107 0 : let origin_lsn = Lsn::des(&v).unwrap();
1108 0 : if origin_lsn != Lsn::INVALID {
1109 0 : result.insert(origin_id, origin_lsn);
1110 0 : }
1111 : }
1112 0 : Ok(result)
1113 0 : }
1114 :
1115 : /// Does the same as get_current_logical_size but counted on demand.
1116 : /// Used to initialize the logical size tracking on startup.
1117 : ///
1118 : /// Only relation blocks are counted currently. That excludes metadata,
1119 : /// SLRUs, twophase files etc.
1120 : ///
1121 : /// # Cancel-Safety
1122 : ///
1123 : /// This method is cancellation-safe.
1124 28 : pub(crate) async fn get_current_logical_size_non_incremental(
1125 28 : &self,
1126 28 : lsn: Lsn,
1127 28 : ctx: &RequestContext,
1128 28 : ) -> Result<u64, CalculateLogicalSizeError> {
1129 28 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1130 28 :
1131 28 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1132 :
1133 : // Fetch list of database dirs and iterate them
1134 28 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1135 28 : let dbdir = DbDirectory::des(&buf)?;
1136 :
1137 28 : let mut total_size: u64 = 0;
1138 28 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1139 0 : for rel in self
1140 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
1141 0 : .await?
1142 : {
1143 0 : if self.cancel.is_cancelled() {
1144 0 : return Err(CalculateLogicalSizeError::Cancelled);
1145 0 : }
1146 0 : let relsize_key = rel_size_to_key(rel);
1147 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1148 0 : let relsize = buf.get_u32_le();
1149 0 :
1150 0 : total_size += relsize as u64;
1151 : }
1152 : }
1153 28 : Ok(total_size * BLCKSZ as u64)
1154 28 : }
1155 :
1156 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1157 : /// for gc-compaction.
1158 : ///
1159 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1160 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1161 : /// be kept only for a specific range of LSN.
1162 : ///
1163 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1164 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1165 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1166 : /// determine which keys to retain/drop for gc-compaction.
1167 : ///
1168 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1169 : /// to be retained for each of the branch LSN.
1170 : ///
1171 : /// The return value is (dense keyspace, sparse keyspace).
1172 108 : pub(crate) async fn collect_gc_compaction_keyspace(
1173 108 : &self,
1174 108 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1175 108 : let metadata_key_begin = Key::metadata_key_range().start;
1176 108 : let aux_v1_key = AUX_FILES_KEY;
1177 108 : let dense_keyspace = KeySpace {
1178 108 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1179 108 : };
1180 108 : Ok((
1181 108 : dense_keyspace,
1182 108 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1183 108 : ))
1184 108 : }
1185 :
1186 : ///
1187 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1188 : /// Anything that's not listed may be removed from the underlying storage (from
1189 : /// that LSN forwards).
1190 : ///
1191 : /// The return value is (dense keyspace, sparse keyspace).
1192 660 : pub(crate) async fn collect_keyspace(
1193 660 : &self,
1194 660 : lsn: Lsn,
1195 660 : ctx: &RequestContext,
1196 660 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1197 660 : // Iterate through key ranges, greedily packing them into partitions
1198 660 : let mut result = KeySpaceAccum::new();
1199 660 :
1200 660 : // The dbdir metadata always exists
1201 660 : result.add_key(DBDIR_KEY);
1202 :
1203 : // Fetch list of database dirs and iterate them
1204 660 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1205 660 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1206 660 :
1207 660 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1208 660 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1209 0 : if has_relmap_file {
1210 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1211 0 : }
1212 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1213 :
1214 0 : let mut rels: Vec<RelTag> = self
1215 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
1216 0 : .await?
1217 0 : .into_iter()
1218 0 : .collect();
1219 0 : rels.sort_unstable();
1220 0 : for rel in rels {
1221 0 : let relsize_key = rel_size_to_key(rel);
1222 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1223 0 : let relsize = buf.get_u32_le();
1224 0 :
1225 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1226 0 : result.add_key(relsize_key);
1227 : }
1228 : }
1229 :
1230 : // Iterate SLRUs next
1231 660 : if self.tenant_shard_id.is_shard_zero() {
1232 1944 : for kind in [
1233 648 : SlruKind::Clog,
1234 648 : SlruKind::MultiXactMembers,
1235 648 : SlruKind::MultiXactOffsets,
1236 : ] {
1237 1944 : let slrudir_key = slru_dir_to_key(kind);
1238 1944 : result.add_key(slrudir_key);
1239 1944 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1240 1944 : let dir = SlruSegmentDirectory::des(&buf)?;
1241 1944 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1242 1944 : segments.sort_unstable();
1243 1944 : for segno in segments {
1244 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1245 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1246 0 : let segsize = buf.get_u32_le();
1247 0 :
1248 0 : result.add_range(
1249 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1250 0 : );
1251 0 : result.add_key(segsize_key);
1252 : }
1253 : }
1254 12 : }
1255 :
1256 : // Then pg_twophase
1257 660 : result.add_key(TWOPHASEDIR_KEY);
1258 :
1259 660 : let mut xids: Vec<u64> = self
1260 660 : .list_twophase_files(lsn, ctx)
1261 660 : .await?
1262 660 : .iter()
1263 660 : .cloned()
1264 660 : .collect();
1265 660 : xids.sort_unstable();
1266 660 : for xid in xids {
1267 0 : result.add_key(twophase_file_key(xid));
1268 0 : }
1269 :
1270 660 : result.add_key(CONTROLFILE_KEY);
1271 660 : result.add_key(CHECKPOINT_KEY);
1272 660 :
1273 660 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1274 660 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1275 660 : // and the keys will not be garbage-collected.
1276 660 : #[cfg(test)]
1277 660 : {
1278 660 : let guard = self.extra_test_dense_keyspace.load();
1279 660 : for kr in &guard.ranges {
1280 0 : result.add_range(kr.clone());
1281 0 : }
1282 0 : }
1283 0 :
1284 660 : let dense_keyspace = result.to_keyspace();
1285 660 : let sparse_keyspace = SparseKeySpace(KeySpace {
1286 660 : ranges: vec![
1287 660 : Key::metadata_aux_key_range(),
1288 660 : repl_origin_key_range(),
1289 660 : Key::rel_dir_sparse_key_range(),
1290 660 : ],
1291 660 : });
1292 660 :
1293 660 : if cfg!(debug_assertions) {
1294 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1295 :
1296 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1297 : // category of sparse keys are split into their own image/delta files. If there
1298 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1299 : // and we want the developer to keep the keyspaces separated.
1300 :
1301 660 : let ranges = &sparse_keyspace.0.ranges;
1302 :
1303 : // TODO: use a single overlaps_with across the codebase
1304 1980 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1305 1980 : !(a.end <= b.start || b.end <= a.start)
1306 1980 : }
1307 1980 : for i in 0..ranges.len() {
1308 1980 : for j in 0..i {
1309 1980 : if overlaps_with(&ranges[i], &ranges[j]) {
1310 0 : panic!(
1311 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1312 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1313 0 : );
1314 1980 : }
1315 : }
1316 : }
1317 1320 : for i in 1..ranges.len() {
1318 1320 : assert!(
1319 1320 : ranges[i - 1].end <= ranges[i].start,
1320 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1321 0 : ranges[i - 1].start,
1322 0 : ranges[i - 1].end,
1323 0 : ranges[i].start,
1324 0 : ranges[i].end
1325 : );
1326 : }
1327 0 : }
1328 :
1329 660 : Ok((dense_keyspace, sparse_keyspace))
1330 660 : }
1331 :
1332 : /// Get the cached size of a relation if it was not updated after the specified LSN
1333 897080 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1334 897080 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1335 897080 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1336 897036 : if lsn >= *cached_lsn {
1337 886744 : RELSIZE_CACHE_HITS.inc();
1338 886744 : return Some(*nblocks);
1339 10292 : }
1340 10292 : RELSIZE_CACHE_MISSES_OLD.inc();
1341 44 : }
1342 10336 : RELSIZE_CACHE_MISSES.inc();
1343 10336 : None
1344 897080 : }
1345 :
1346 : /// Update cached relation size if there is no more recent update
1347 10272 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1348 10272 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1349 10272 :
1350 10272 : if lsn < rel_size_cache.complete_as_of {
1351 : // Do not cache old values. It's safe to cache the size on read, as long as
1352 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1353 : // never evict values from the cache, so if the relation size changed after
1354 : // 'lsn', the new value is already in the cache.
1355 0 : return;
1356 10272 : }
1357 10272 :
1358 10272 : match rel_size_cache.map.entry(tag) {
1359 10272 : hash_map::Entry::Occupied(mut entry) => {
1360 10272 : let cached_lsn = entry.get_mut();
1361 10272 : if lsn >= cached_lsn.0 {
1362 0 : *cached_lsn = (lsn, nblocks);
1363 10272 : }
1364 : }
1365 0 : hash_map::Entry::Vacant(entry) => {
1366 0 : entry.insert((lsn, nblocks));
1367 0 : RELSIZE_CACHE_ENTRIES.inc();
1368 0 : }
1369 : }
1370 10272 : }
1371 :
1372 : /// Store cached relation size
1373 565440 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1374 565440 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1375 565440 : if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
1376 3840 : RELSIZE_CACHE_ENTRIES.inc();
1377 561600 : }
1378 565440 : }
1379 :
1380 : /// Remove cached relation size
1381 4 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1382 4 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1383 4 : if rel_size_cache.map.remove(tag).is_some() {
1384 4 : RELSIZE_CACHE_ENTRIES.dec();
1385 4 : }
1386 4 : }
1387 : }
1388 :
1389 : /// DatadirModification represents an operation to ingest an atomic set of
1390 : /// updates to the repository.
1391 : ///
1392 : /// It is created by the 'begin_modification' function. It is called for each WAL
1393 : /// record, so that all the modifications by one WAL record appear atomic.
1394 : pub struct DatadirModification<'a> {
1395 : /// The timeline this modification applies to. You can access this to
1396 : /// read the state, but note that any pending updates are *not* reflected
1397 : /// in the state in 'tline' yet.
1398 : pub tline: &'a Timeline,
1399 :
1400 : /// Current LSN of the modification
1401 : lsn: Lsn,
1402 :
1403 : // The modifications are not applied directly to the underlying key-value store.
1404 : // The put-functions add the modifications here, and they are flushed to the
1405 : // underlying key-value store by the 'commit' function.
1406 : pending_lsns: Vec<Lsn>,
1407 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1408 : pending_nblocks: i64,
1409 :
1410 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1411 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1412 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1413 :
1414 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1415 : /// which keys are stored here.
1416 : pending_data_batch: Option<SerializedValueBatch>,
1417 :
1418 : /// For special "directory" keys that store key-value maps, track the size of the map
1419 : /// if it was updated in this modification.
1420 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1421 :
1422 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1423 : pending_metadata_bytes: usize,
1424 : }
1425 :
1426 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1427 : pub enum MetricsUpdate {
1428 : /// Set the metrics to this value
1429 : Set(u64),
1430 : /// Increment the metrics by this value
1431 : Add(u64),
1432 : /// Decrement the metrics by this value
1433 : Sub(u64),
1434 : }
1435 :
1436 : impl DatadirModification<'_> {
1437 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1438 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1439 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1440 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
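// A sketch of how ingest code might use this limit together with
// `approx_pending_bytes` (illustrative; the commit call is assumed, not shown here):
//
//     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//         modification.commit(ctx).await?;
//     }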
1441 :
1442 : /// Get the current lsn
1443 836116 : pub(crate) fn get_lsn(&self) -> Lsn {
1444 836116 : self.lsn
1445 836116 : }
1446 :
1447 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1448 0 : self.pending_data_batch
1449 0 : .as_ref()
1450 0 : .map_or(0, |b| b.buffer_size())
1451 0 : + self.pending_metadata_bytes
1452 0 : }
1453 :
1454 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1455 0 : self.pending_data_batch
1456 0 : .as_ref()
1457 0 : .is_some_and(|b| b.has_data())
1458 0 : }
1459 :
1460 : /// Returns statistics about the currently pending modifications.
1461 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1462 0 : let mut stats = DatadirModificationStats::default();
1463 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1464 0 : match value {
1465 0 : Value::Image(_) => stats.metadata_images += 1,
1466 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1467 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1468 : }
1469 : }
1470 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1471 0 : match valuemeta {
1472 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1473 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1474 0 : ValueMeta::Observed(_) => {}
1475 : }
1476 : }
1477 0 : stats
1478 0 : }
1479 :
1480 : /// Set the current lsn
1481 291716 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
1482 291716 : ensure!(
1483 291716 : lsn >= self.lsn,
1484 0 : "setting an older lsn {} than {} is not allowed",
1485 : lsn,
1486 : self.lsn
1487 : );
1488 :
1489 291716 : if lsn > self.lsn {
1490 291716 : self.pending_lsns.push(self.lsn);
1491 291716 : self.lsn = lsn;
1492 291716 : }
1493 291716 : Ok(())
1494 291716 : }
1495 :
1496 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1497 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1498 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1499 : ///
1500 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1501 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1502 : /// via [`Self::get`] as soon as their record has been ingested.
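/// A tiny illustration (sketch; assumes `rel` and `blknum` are in scope):
///
/// ```ignore
/// // Relation and SLRU block keys are "data"; directory/size keys are "metadata".
/// assert!(DatadirModification::is_data_key(&rel_block_to_key(rel, blknum)));
/// assert!(!DatadirModification::is_data_key(&DBDIR_KEY));
/// ```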
1503 1701360 : fn is_data_key(key: &Key) -> bool {
1504 1701360 : key.is_rel_block_key() || key.is_slru_block_key()
1505 1701360 : }
1506 :
1507 : /// Initialize a completely new repository.
1508 : ///
1509 : /// This inserts the directory metadata entries that are assumed to
1510 : /// always exist.
1511 432 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1512 432 : let buf = DbDirectory::ser(&DbDirectory {
1513 432 : dbdirs: HashMap::new(),
1514 432 : })?;
1515 432 : self.pending_directory_entries
1516 432 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1517 432 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1518 :
1519 432 : let buf = if self.tline.pg_version >= 17 {
1520 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1521 0 : xids: HashSet::new(),
1522 0 : })
1523 : } else {
1524 432 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1525 432 : xids: HashSet::new(),
1526 432 : })
1527 0 : }?;
1528 432 : self.pending_directory_entries
1529 432 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1530 432 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1531 :
1532 432 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1533 432 : let empty_dir = Value::Image(buf);
1534 432 :
1535 432 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1536 432 : // harmless but they'd just be dropped on later compaction.
1537 432 : if self.tline.tenant_shard_id.is_shard_zero() {
1538 420 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1539 420 : self.pending_directory_entries.push((
1540 420 : DirectoryKind::SlruSegment(SlruKind::Clog),
1541 420 : MetricsUpdate::Set(0),
1542 420 : ));
1543 420 : self.put(
1544 420 : slru_dir_to_key(SlruKind::MultiXactMembers),
1545 420 : empty_dir.clone(),
1546 420 : );
1547 420 : self.pending_directory_entries.push((
1548 420 : DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1549 420 : MetricsUpdate::Set(0),
1550 420 : ));
1551 420 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1552 420 : self.pending_directory_entries.push((
1553 420 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1554 420 : MetricsUpdate::Set(0),
1555 420 : ));
1556 420 : }
1557 :
1558 432 : Ok(())
1559 432 : }
1560 :
1561 : #[cfg(test)]
1562 428 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1563 428 : self.init_empty()?;
1564 428 : self.put_control_file(bytes::Bytes::from_static(
1565 428 : b"control_file contents do not matter",
1566 428 : ))
1567 428 : .context("put_control_file")?;
1568 428 : self.put_checkpoint(bytes::Bytes::from_static(
1569 428 : b"checkpoint_file contents do not matter",
1570 428 : ))
1571 428 : .context("put_checkpoint_file")?;
1572 428 : Ok(())
1573 428 : }
1574 :
1575 : /// Creates a relation if it is not already present.
1576 : /// Returns the current size of the relation
1577 836112 : pub(crate) async fn create_relation_if_required(
1578 836112 : &mut self,
1579 836112 : rel: RelTag,
1580 836112 : ctx: &RequestContext,
1581 836112 : ) -> Result<u32, PageReconstructError> {
1582 : // Get current size and put rel creation if rel doesn't exist
1583 : //
1584 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1585 : // check the cache too. This is because eagerly checking the cache results in
1586 : // less work overall and 10% better performance. It's more work on a cache miss,
1587 : // but cache misses are rare.
1588 836112 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1589 836092 : Ok(nblocks)
1590 20 : } else if !self
1591 20 : .tline
1592 20 : .get_rel_exists(rel, Version::Modified(self), ctx)
1593 20 : .await?
1594 : {
1595 : // create it with 0 size initially, the logic below will extend it
1596 20 : self.put_rel_creation(rel, 0, ctx)
1597 20 : .await
1598 20 : .context("Relation Error")?;
1599 20 : Ok(0)
1600 : } else {
1601 0 : self.tline
1602 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1603 0 : .await
1604 : }
1605 836112 : }
1606 :
1607 : /// Given a block number for a relation (which represents a newly written block),
1608 : /// the previous block count of the relation, and the shard info, find the gaps
1609 : /// that were created by the newly written block if any.
1610 291340 : fn find_gaps(
1611 291340 : rel: RelTag,
1612 291340 : blkno: u32,
1613 291340 : previous_nblocks: u32,
1614 291340 : shard: &ShardIdentity,
1615 291340 : ) -> Option<KeySpace> {
1616 291340 : let mut key = rel_block_to_key(rel, blkno);
1617 291340 : let mut gap_accum = None;
1618 :
1619 291340 : for gap_blkno in previous_nblocks..blkno {
1620 64 : key.field6 = gap_blkno;
1621 64 :
1622 64 : if shard.get_shard_number(&key) != shard.number {
1623 16 : continue;
1624 48 : }
1625 48 :
1626 48 : gap_accum
1627 48 : .get_or_insert_with(KeySpaceAccum::new)
1628 48 : .add_key(key);
1629 : }
1630 :
1631 291340 : gap_accum.map(|accum| accum.to_keyspace())
1632 291340 : }
1633 :
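// For example, if the relation previously had 3 blocks and block 5 is now written, the
// gap covers blocks 3 and 4; only the keys owned by this shard are returned (sketch,
// assuming `rel: RelTag` and `shard: ShardIdentity` are in scope):
//
//     let gaps = DatadirModification::find_gaps(rel, 5, 3, &shard);
//     // On an unsharded tenant this is Some(keyspace covering blocks 3..5 of `rel`).
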
1634 291704 : pub async fn ingest_batch(
1635 291704 : &mut self,
1636 291704 : mut batch: SerializedValueBatch,
1637 291704 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1638 291704 : shard: &ShardIdentity,
1639 291704 : ctx: &RequestContext,
1640 291704 : ) -> anyhow::Result<()> {
1641 291704 : let mut gaps_at_lsns = Vec::default();
1642 :
1643 291704 : for meta in batch.metadata.iter() {
1644 291284 : let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
1645 291284 : let new_nblocks = blkno + 1;
1646 :
1647 291284 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1648 291284 : if new_nblocks > old_nblocks {
1649 4780 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1650 286504 : }
1651 :
1652 291284 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1653 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1654 291284 : }
1655 : }
1656 :
1657 291704 : if !gaps_at_lsns.is_empty() {
1658 0 : batch.zero_gaps(gaps_at_lsns);
1659 291704 : }
1660 :
1661 291704 : match self.pending_data_batch.as_mut() {
1662 40 : Some(pending_batch) => {
1663 40 : pending_batch.extend(batch);
1664 40 : }
1665 291664 : None if batch.has_data() => {
1666 291260 : self.pending_data_batch = Some(batch);
1667 291260 : }
1668 404 : None => {
1669 404 : // Nothing to initialize the batch with
1670 404 : }
1671 : }
1672 :
1673 291704 : Ok(())
1674 291704 : }
1675 :
1676 : /// Put a new page version that can be constructed from a WAL record
1677 : ///
1678 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1679 : /// current end-of-file. It's up to the caller to check that the relation size
1680 : /// matches the blocks inserted!
1681 24 : pub fn put_rel_wal_record(
1682 24 : &mut self,
1683 24 : rel: RelTag,
1684 24 : blknum: BlockNumber,
1685 24 : rec: NeonWalRecord,
1686 24 : ) -> anyhow::Result<()> {
1687 24 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1688 24 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1689 24 : Ok(())
1690 24 : }
1691 :
1692 : // Same, but for an SLRU.
1693 16 : pub fn put_slru_wal_record(
1694 16 : &mut self,
1695 16 : kind: SlruKind,
1696 16 : segno: u32,
1697 16 : blknum: BlockNumber,
1698 16 : rec: NeonWalRecord,
1699 16 : ) -> anyhow::Result<()> {
1700 16 : if !self.tline.tenant_shard_id.is_shard_zero() {
1701 0 : return Ok(());
1702 16 : }
1703 16 :
1704 16 : self.put(
1705 16 : slru_block_to_key(kind, segno, blknum),
1706 16 : Value::WalRecord(rec),
1707 16 : );
1708 16 : Ok(())
1709 16 : }
1710 :
1711 : /// Like put_wal_record, but with ready-made image of the page.
1712 555684 : pub fn put_rel_page_image(
1713 555684 : &mut self,
1714 555684 : rel: RelTag,
1715 555684 : blknum: BlockNumber,
1716 555684 : img: Bytes,
1717 555684 : ) -> anyhow::Result<()> {
1718 555684 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1719 555684 : let key = rel_block_to_key(rel, blknum);
1720 555684 : if !key.is_valid_key_on_write_path() {
1721 0 : anyhow::bail!(
1722 0 : "the request contains data not supported by pageserver at {}",
1723 0 : key
1724 0 : );
1725 555684 : }
1726 555684 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1727 555684 : Ok(())
1728 555684 : }
1729 :
1730 12 : pub fn put_slru_page_image(
1731 12 : &mut self,
1732 12 : kind: SlruKind,
1733 12 : segno: u32,
1734 12 : blknum: BlockNumber,
1735 12 : img: Bytes,
1736 12 : ) -> anyhow::Result<()> {
1737 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1738 :
1739 12 : let key = slru_block_to_key(kind, segno, blknum);
1740 12 : if !key.is_valid_key_on_write_path() {
1741 0 : anyhow::bail!(
1742 0 : "the request contains data not supported by pageserver at {}",
1743 0 : key
1744 0 : );
1745 12 : }
1746 12 : self.put(key, Value::Image(img));
1747 12 : Ok(())
1748 12 : }
1749 :
1750 5996 : pub(crate) fn put_rel_page_image_zero(
1751 5996 : &mut self,
1752 5996 : rel: RelTag,
1753 5996 : blknum: BlockNumber,
1754 5996 : ) -> anyhow::Result<()> {
1755 5996 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1756 5996 : let key = rel_block_to_key(rel, blknum);
1757 5996 : if !key.is_valid_key_on_write_path() {
1758 0 : anyhow::bail!(
1759 0 : "the request contains data not supported by pageserver: {} @ {}",
1760 0 : key,
1761 0 : self.lsn
1762 0 : );
1763 5996 : }
1764 5996 :
1765 5996 : let batch = self
1766 5996 : .pending_data_batch
1767 5996 : .get_or_insert_with(SerializedValueBatch::default);
1768 5996 :
1769 5996 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1770 5996 :
1771 5996 : Ok(())
1772 5996 : }
1773 :
1774 0 : pub(crate) fn put_slru_page_image_zero(
1775 0 : &mut self,
1776 0 : kind: SlruKind,
1777 0 : segno: u32,
1778 0 : blknum: BlockNumber,
1779 0 : ) -> anyhow::Result<()> {
1780 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1781 0 : let key = slru_block_to_key(kind, segno, blknum);
1782 0 : if !key.is_valid_key_on_write_path() {
1783 0 : anyhow::bail!(
1784 0 : "the request contains data not supported by pageserver: {} @ {}",
1785 0 : key,
1786 0 : self.lsn
1787 0 : );
1788 0 : }
1789 0 :
1790 0 : let batch = self
1791 0 : .pending_data_batch
1792 0 : .get_or_insert_with(SerializedValueBatch::default);
1793 0 :
1794 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1795 0 :
1796 0 : Ok(())
1797 0 : }
1798 :
1799 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1800 : /// we enable it, we also need to persist it in `index_part.json`.
1801 3892 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1802 3892 : let status = self.tline.get_rel_size_v2_status();
1803 3892 : let config = self.tline.get_rel_size_v2_enabled();
1804 3892 : match (config, status) {
1805 : (false, RelSizeMigration::Legacy) => {
1806 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1807 3892 : Ok(false)
1808 : }
1809 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1810 : // index_part already persisted that the timeline has enabled rel_size_v2
1811 0 : Ok(true)
1812 : }
1813 : (true, RelSizeMigration::Legacy) => {
1814 : // The first time we enable it, we need to persist it in `index_part.json`
1815 0 : self.tline
1816 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1817 0 : tracing::info!("enabled rel_size_v2");
1818 0 : Ok(true)
1819 : }
1820 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1821 : // index_part already persisted that the timeline has enabled rel_size_v2
1822 : // and we don't need to do anything
1823 0 : Ok(true)
1824 : }
1825 : }
1826 3892 : }
1827 :
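// Summarizing the match above, the (tenant config, persisted status) combinations
// behave as follows:
//
//     config | index_part status    | result
//     -------+----------------------+----------------------------------------
//     false  | Legacy               | v2 write path stays off
//     false  | Migrating / Migrated | v2 stays on (already persisted)
//     true   | Legacy               | persist Migrating, turn v2 write path on
//     true   | Migrating / Migrated | v2 stays on, nothing to persist
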
1828 : /// Store a relmapper file (pg_filenode.map) in the repository
1829 32 : pub async fn put_relmap_file(
1830 32 : &mut self,
1831 32 : spcnode: Oid,
1832 32 : dbnode: Oid,
1833 32 : img: Bytes,
1834 32 : ctx: &RequestContext,
1835 32 : ) -> anyhow::Result<()> {
1836 32 : let v2_enabled = self.maybe_enable_rel_size_v2()?;
1837 :
1838 : // Add it to the directory (if it doesn't exist already)
1839 32 : let buf = self.get(DBDIR_KEY, ctx).await?;
1840 32 : let mut dbdir = DbDirectory::des(&buf)?;
1841 :
1842 32 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1843 32 : if r.is_none() || r == Some(false) {
1844 : // The dbdir entry didn't exist, or it contained a
1845 : // 'false'. The 'insert' call already updated it with
1846 : // 'true', now write the updated 'dbdirs' map back.
1847 32 : let buf = DbDirectory::ser(&dbdir)?;
1848 32 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1849 0 : }
1850 32 : if r.is_none() {
1851 : // Create RelDirectory
1852 : // TODO: if we have fully migrated to v2, no need to create this directory
1853 16 : let buf = RelDirectory::ser(&RelDirectory {
1854 16 : rels: HashSet::new(),
1855 16 : })?;
1856 16 : self.pending_directory_entries
1857 16 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1858 16 : if v2_enabled {
1859 0 : self.pending_directory_entries
1860 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1861 16 : }
1862 16 : self.put(
1863 16 : rel_dir_to_key(spcnode, dbnode),
1864 16 : Value::Image(Bytes::from(buf)),
1865 16 : );
1866 16 : }
1867 :
1868 32 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1869 32 : Ok(())
1870 32 : }
1871 :
1872 0 : pub async fn put_twophase_file(
1873 0 : &mut self,
1874 0 : xid: u64,
1875 0 : img: Bytes,
1876 0 : ctx: &RequestContext,
1877 0 : ) -> anyhow::Result<()> {
1878 : // Add it to the directory entry
1879 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1880 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1881 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1882 0 : if !dir.xids.insert(xid) {
1883 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1884 0 : }
1885 0 : self.pending_directory_entries.push((
1886 0 : DirectoryKind::TwoPhase,
1887 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1888 0 : ));
1889 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1890 : } else {
1891 0 : let xid = xid as u32;
1892 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1893 0 : if !dir.xids.insert(xid) {
1894 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
1895 0 : }
1896 0 : self.pending_directory_entries.push((
1897 0 : DirectoryKind::TwoPhase,
1898 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1899 0 : ));
1900 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1901 : };
1902 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1903 0 :
1904 0 : self.put(twophase_file_key(xid), Value::Image(img));
1905 0 : Ok(())
1906 0 : }
1907 :
1908 0 : pub async fn set_replorigin(
1909 0 : &mut self,
1910 0 : origin_id: RepOriginId,
1911 0 : origin_lsn: Lsn,
1912 0 : ) -> anyhow::Result<()> {
1913 0 : let key = repl_origin_key(origin_id);
1914 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1915 0 : Ok(())
1916 0 : }
1917 :
1918 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
1919 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1920 0 : }
1921 :
1922 432 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
1923 432 : self.put(CONTROLFILE_KEY, Value::Image(img));
1924 432 : Ok(())
1925 432 : }
1926 :
1927 460 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
1928 460 : self.put(CHECKPOINT_KEY, Value::Image(img));
1929 460 : Ok(())
1930 460 : }
1931 :
1932 0 : pub async fn drop_dbdir(
1933 0 : &mut self,
1934 0 : spcnode: Oid,
1935 0 : dbnode: Oid,
1936 0 : ctx: &RequestContext,
1937 0 : ) -> anyhow::Result<()> {
1938 0 : let total_blocks = self
1939 0 : .tline
1940 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1941 0 : .await?;
1942 :
1943 : // Remove entry from dbdir
1944 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1945 0 : let mut dir = DbDirectory::des(&buf)?;
1946 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1947 0 : let buf = DbDirectory::ser(&dir)?;
1948 0 : self.pending_directory_entries.push((
1949 0 : DirectoryKind::Db,
1950 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
1951 0 : ));
1952 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1953 : } else {
1954 0 : warn!(
1955 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1956 : spcnode, dbnode
1957 : );
1958 : }
1959 :
1960 : // Update logical database size.
1961 0 : self.pending_nblocks -= total_blocks as i64;
1962 0 :
1963 0 : // Delete all relations and metadata files for the spcnode/dnode
1964 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1965 0 : Ok(())
1966 0 : }
1967 :
1968 : /// Create a relation fork.
1969 : ///
1970 : /// 'nblocks' is the initial size.
1971 3840 : pub async fn put_rel_creation(
1972 3840 : &mut self,
1973 3840 : rel: RelTag,
1974 3840 : nblocks: BlockNumber,
1975 3840 : ctx: &RequestContext,
1976 3840 : ) -> Result<(), RelationError> {
1977 3840 : if rel.relnode == 0 {
1978 0 : return Err(RelationError::InvalidRelnode);
1979 3840 : }
1980 : // It's possible that this is the first rel for this db in this
1981 : // tablespace. Create the reldir entry for it if so.
1982 3840 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
1983 3840 : .context("deserialize db")?;
1984 :
1985 3840 : let dbdir_exists =
1986 3840 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1987 : // Didn't exist. Update dbdir
1988 16 : e.insert(false);
1989 16 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
1990 16 : self.pending_directory_entries.push((
1991 16 : DirectoryKind::Db,
1992 16 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
1993 16 : ));
1994 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1995 16 : false
1996 : } else {
1997 3824 : true
1998 : };
1999 :
2000 3840 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2001 3840 : let mut rel_dir = if !dbdir_exists {
2002 : // Create the RelDirectory
2003 16 : RelDirectory::default()
2004 : } else {
2005 : // reldir already exists, fetch it
2006 3824 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
2007 3824 : .context("deserialize db")?
2008 : };
2009 :
2010 3840 : let v2_enabled = self.maybe_enable_rel_size_v2()?;
2011 :
2012 3840 : if v2_enabled {
2013 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
2014 0 : return Err(RelationError::AlreadyExists);
2015 0 : }
2016 0 : let sparse_rel_dir_key =
2017 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2018 : // check if the rel_dir_key exists in v2
2019 0 : let val = self
2020 0 : .sparse_get(sparse_rel_dir_key, ctx)
2021 0 : .await
2022 0 : .map_err(|e| RelationError::Other(e.into()))?;
2023 0 : let val = RelDirExists::decode_option(val)
2024 0 : .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
2025 0 : if val == RelDirExists::Exists {
2026 0 : return Err(RelationError::AlreadyExists);
2027 0 : }
2028 0 : self.put(
2029 0 : sparse_rel_dir_key,
2030 0 : Value::Image(RelDirExists::Exists.encode()),
2031 0 : );
2032 0 : if !dbdir_exists {
2033 0 : self.pending_directory_entries
2034 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2035 0 : self.pending_directory_entries
2036 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2037 0 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2038 0 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2039 0 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2040 0 : self.put(
2041 0 : rel_dir_key,
2042 0 : Value::Image(Bytes::from(
2043 0 : RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
2044 : )),
2045 : );
2046 0 : }
2047 0 : self.pending_directory_entries
2048 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2049 : } else {
2050 : // Add the new relation to the rel directory entry, and write it back
2051 3840 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2052 0 : return Err(RelationError::AlreadyExists);
2053 3840 : }
2054 3840 : if !dbdir_exists {
2055 16 : self.pending_directory_entries
2056 16 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2057 3824 : }
2058 3840 : self.pending_directory_entries
2059 3840 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2060 3840 : self.put(
2061 3840 : rel_dir_key,
2062 3840 : Value::Image(Bytes::from(
2063 3840 : RelDirectory::ser(&rel_dir).context("serialize")?,
2064 : )),
2065 : );
2066 : }
2067 :
2068 : // Put size
2069 3840 : let size_key = rel_size_to_key(rel);
2070 3840 : let buf = nblocks.to_le_bytes();
2071 3840 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2072 3840 :
2073 3840 : self.pending_nblocks += nblocks as i64;
2074 3840 :
2075 3840 : // Update relation size cache
2076 3840 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2077 3840 :
2078 3840 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2079 3840 : // caller.
2080 3840 : Ok(())
2081 3840 : }
2082 :
2083 : /// Truncate relation
2084 12024 : pub async fn put_rel_truncation(
2085 12024 : &mut self,
2086 12024 : rel: RelTag,
2087 12024 : nblocks: BlockNumber,
2088 12024 : ctx: &RequestContext,
2089 12024 : ) -> anyhow::Result<()> {
2090 12024 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
2091 12024 : if self
2092 12024 : .tline
2093 12024 : .get_rel_exists(rel, Version::Modified(self), ctx)
2094 12024 : .await?
2095 : {
2096 12024 : let size_key = rel_size_to_key(rel);
2097 : // Fetch the old size first
2098 12024 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2099 12024 :
2100 12024 : // Update the entry with the new size.
2101 12024 : let buf = nblocks.to_le_bytes();
2102 12024 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2103 12024 :
2104 12024 : // Update relation size cache
2105 12024 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2106 12024 :
2107 12024 : // Update logical database size.
2108 12024 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2109 0 : }
2110 12024 : Ok(())
2111 12024 : }
2112 :
2113 : /// Extend relation
2114 : /// If new size is smaller, do nothing.
2115 553360 : pub async fn put_rel_extend(
2116 553360 : &mut self,
2117 553360 : rel: RelTag,
2118 553360 : nblocks: BlockNumber,
2119 553360 : ctx: &RequestContext,
2120 553360 : ) -> anyhow::Result<()> {
2121 553360 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
2122 :
2123 : // Put size
2124 553360 : let size_key = rel_size_to_key(rel);
2125 553360 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2126 553360 :
2127 553360 : // only extend relation here. never decrease the size
2128 553360 : if nblocks > old_size {
2129 549576 : let buf = nblocks.to_le_bytes();
2130 549576 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2131 549576 :
2132 549576 : // Update relation size cache
2133 549576 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2134 549576 :
2135 549576 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2136 549576 : }
2137 553360 : Ok(())
2138 553360 : }
2139 :
2140 : /// Drop some relations
2141 20 : pub(crate) async fn put_rel_drops(
2142 20 : &mut self,
2143 20 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2144 20 : ctx: &RequestContext,
2145 20 : ) -> anyhow::Result<()> {
2146 20 : let v2_enabled = self.maybe_enable_rel_size_v2()?;
2147 24 : for ((spc_node, db_node), rel_tags) in drop_relations {
2148 4 : let dir_key = rel_dir_to_key(spc_node, db_node);
2149 4 : let buf = self.get(dir_key, ctx).await?;
2150 4 : let mut dir = RelDirectory::des(&buf)?;
2151 :
2152 4 : let mut dirty = false;
2153 8 : for rel_tag in rel_tags {
2154 4 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2155 4 : self.pending_directory_entries
2156 4 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2157 4 : dirty = true;
2158 4 : true
2159 0 : } else if v2_enabled {
2160 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2161 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2162 : // logic).
2163 0 : let key =
2164 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2165 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2166 0 : .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
2167 0 : if val == RelDirExists::Exists {
2168 0 : self.pending_directory_entries
2169 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2170 0 : // put tombstone
2171 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2172 0 : // no need to set dirty to true
2173 0 : true
2174 : } else {
2175 0 : false
2176 : }
2177 : } else {
2178 0 : false
2179 : };
2180 :
2181 4 : if found {
2182 : // update logical size
2183 4 : let size_key = rel_size_to_key(rel_tag);
2184 4 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2185 4 : self.pending_nblocks -= old_size as i64;
2186 4 :
2187 4 : // Remove entry from relation size cache
2188 4 : self.tline.remove_cached_rel_size(&rel_tag);
2189 4 :
2190 4 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2191 4 : self.delete(rel_key_range(rel_tag));
2192 0 : }
2193 : }
2194 :
2195 4 : if dirty {
2196 4 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2197 0 : }
2198 : }
2199 :
2200 20 : Ok(())
2201 20 : }
2202 :
2203 12 : pub async fn put_slru_segment_creation(
2204 12 : &mut self,
2205 12 : kind: SlruKind,
2206 12 : segno: u32,
2207 12 : nblocks: BlockNumber,
2208 12 : ctx: &RequestContext,
2209 12 : ) -> anyhow::Result<()> {
2210 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2211 :
2212 : // Add it to the directory entry
2213 12 : let dir_key = slru_dir_to_key(kind);
2214 12 : let buf = self.get(dir_key, ctx).await?;
2215 12 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2216 :
2217 12 : if !dir.segments.insert(segno) {
2218 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
2219 12 : }
2220 12 : self.pending_directory_entries.push((
2221 12 : DirectoryKind::SlruSegment(kind),
2222 12 : MetricsUpdate::Set(dir.segments.len() as u64),
2223 12 : ));
2224 12 : self.put(
2225 12 : dir_key,
2226 12 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2227 : );
2228 :
2229 : // Put size
2230 12 : let size_key = slru_segment_size_to_key(kind, segno);
2231 12 : let buf = nblocks.to_le_bytes();
2232 12 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2233 12 :
2234 12 : // even if nblocks > 0, we don't insert any actual blocks here
2235 12 :
2236 12 : Ok(())
2237 12 : }
2238 :
2239 : /// Extend SLRU segment
2240 0 : pub fn put_slru_extend(
2241 0 : &mut self,
2242 0 : kind: SlruKind,
2243 0 : segno: u32,
2244 0 : nblocks: BlockNumber,
2245 0 : ) -> anyhow::Result<()> {
2246 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2247 :
2248 : // Put size
2249 0 : let size_key = slru_segment_size_to_key(kind, segno);
2250 0 : let buf = nblocks.to_le_bytes();
2251 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2252 0 : Ok(())
2253 0 : }
2254 :
2255 : /// This method is used for marking truncated SLRU files
2256 0 : pub async fn drop_slru_segment(
2257 0 : &mut self,
2258 0 : kind: SlruKind,
2259 0 : segno: u32,
2260 0 : ctx: &RequestContext,
2261 0 : ) -> anyhow::Result<()> {
2262 0 : // Remove it from the directory entry
2263 0 : let dir_key = slru_dir_to_key(kind);
2264 0 : let buf = self.get(dir_key, ctx).await?;
2265 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2266 :
2267 0 : if !dir.segments.remove(&segno) {
2268 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2269 0 : }
2270 0 : self.pending_directory_entries.push((
2271 0 : DirectoryKind::SlruSegment(kind),
2272 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2273 0 : ));
2274 0 : self.put(
2275 0 : dir_key,
2276 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2277 : );
2278 :
2279 : // Delete size entry, as well as all blocks
2280 0 : self.delete(slru_segment_key_range(kind, segno));
2281 0 :
2282 0 : Ok(())
2283 0 : }
2284 :
2285 : /// Drop a relmapper file (pg_filenode.map)
2286 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
2287 0 : // TODO
2288 0 : Ok(())
2289 0 : }
2290 :
2291 : /// Drop the two-phase state file for the given transaction and remove it from the directory entry
2292 0 : pub async fn drop_twophase_file(
2293 0 : &mut self,
2294 0 : xid: u64,
2295 0 : ctx: &RequestContext,
2296 0 : ) -> anyhow::Result<()> {
2297 : // Remove it from the directory entry
2298 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2299 0 : let newdirbuf = if self.tline.pg_version >= 17 {
2300 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2301 :
2302 0 : if !dir.xids.remove(&xid) {
2303 0 : warn!("twophase file for xid {} does not exist", xid);
2304 0 : }
2305 0 : self.pending_directory_entries.push((
2306 0 : DirectoryKind::TwoPhase,
2307 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2308 0 : ));
2309 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2310 : } else {
2311 0 : let xid: u32 = u32::try_from(xid)?;
2312 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2313 :
2314 0 : if !dir.xids.remove(&xid) {
2315 0 : warn!("twophase file for xid {} does not exist", xid);
2316 0 : }
2317 0 : self.pending_directory_entries.push((
2318 0 : DirectoryKind::TwoPhase,
2319 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2320 0 : ));
2321 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2322 : };
2323 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2324 0 :
2325 0 : // Delete it
2326 0 : self.delete(twophase_key_range(xid));
2327 0 :
2328 0 : Ok(())
2329 0 : }
2330 :
2331 32 : pub async fn put_file(
2332 32 : &mut self,
2333 32 : path: &str,
2334 32 : content: &[u8],
2335 32 : ctx: &RequestContext,
2336 32 : ) -> anyhow::Result<()> {
2337 32 : let key = aux_file::encode_aux_file_key(path);
2338 : // retrieve the key from the engine
2339 32 : let old_val = match self.get(key, ctx).await {
2340 8 : Ok(val) => Some(val),
2341 24 : Err(PageReconstructError::MissingKey(_)) => None,
2342 0 : Err(e) => return Err(e.into()),
2343 : };
2344 32 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2345 8 : aux_file::decode_file_value(old_val)?
2346 : } else {
2347 24 : Vec::new()
2348 : };
2349 32 : let mut other_files = Vec::with_capacity(files.len());
2350 32 : let mut modifying_file = None;
2351 40 : for file @ (p, content) in files {
2352 8 : if path == p {
2353 8 : assert!(
2354 8 : modifying_file.is_none(),
2355 0 : "duplicated entries found for {}",
2356 : path
2357 : );
2358 8 : modifying_file = Some(content);
2359 0 : } else {
2360 0 : other_files.push(file);
2361 0 : }
2362 : }
2363 32 : let mut new_files = other_files;
2364 32 : match (modifying_file, content.is_empty()) {
2365 4 : (Some(old_content), false) => {
2366 4 : self.tline
2367 4 : .aux_file_size_estimator
2368 4 : .on_update(old_content.len(), content.len());
2369 4 : new_files.push((path, content));
2370 4 : }
2371 4 : (Some(old_content), true) => {
2372 4 : self.tline
2373 4 : .aux_file_size_estimator
2374 4 : .on_remove(old_content.len());
2375 4 : // not adding the file key to the final `new_files` vec.
2376 4 : }
2377 24 : (None, false) => {
2378 24 : self.tline.aux_file_size_estimator.on_add(content.len());
2379 24 : new_files.push((path, content));
2380 24 : }
2381 : // Compute may request deletion of the old version of a pgstat AUX file if the new one exceeds the size limit.
2382 : // Compute doesn't know whether a previous version of this file exists, so an
2383 : // attempt to delete a non-existent file can trigger this message.
2384 : // To avoid false alarms, log it as info rather than warning.
2385 0 : (None, true) if path.starts_with("pg_stat/") => {
2386 0 : info!("removing non-existing pg_stat file: {}", path)
2387 : }
2388 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2389 : }
2390 32 : let new_val = aux_file::encode_file_value(&new_files)?;
2391 32 : self.put(key, Value::Image(new_val.into()));
2392 32 :
2393 32 : Ok(())
2394 32 : }
2395 :
2396 : ///
2397 : /// Flush changes accumulated so far to the underlying repository.
2398 : ///
2399 : /// Usually, changes made in DatadirModification are atomic, but this allows
2400 : /// you to flush them to the underlying repository before the final `commit`.
2401 : /// That frees up the memory used to hold the pending changes.
2402 : ///
2403 : /// Currently only used during bulk import of a data directory. In that
2404 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2405 : /// whole import fails and the timeline will be deleted anyway.
2406 : /// (Or to be precise, it will be left behind for debugging purposes and
2407 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2408 : ///
2409 : /// Note: A consequence of flushing the pending operations is that they
2410 : /// won't be visible to subsequent operations until `commit`. The function
2411 : /// retains all the metadata, but data pages are flushed. That's again OK
2412 : /// for bulk import, where you are just loading data pages and won't try to
2413 : /// modify the same pages twice.
2414 3860 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2415 3860 : // Unless we have accumulated a decent amount of changes, it's not worth it
2416 3860 : // to scan through the pending_updates list.
2417 3860 : let pending_nblocks = self.pending_nblocks;
2418 3860 : if pending_nblocks < 10000 {
2419 3860 : return Ok(());
2420 0 : }
2421 :
2422 0 : let mut writer = self.tline.writer().await;
2423 :
2424 : // Flush relation and SLRU data blocks, keep metadata.
2425 0 : if let Some(batch) = self.pending_data_batch.take() {
2426 0 : tracing::debug!(
2427 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2428 0 : batch.max_lsn,
2429 0 : self.tline.get_last_record_lsn()
2430 : );
2431 :
2432 : // This bails out on first error without modifying pending_updates.
2433 : // That's Ok, cf this function's doc comment.
2434 0 : writer.put_batch(batch, ctx).await?;
2435 0 : }
2436 :
2437 0 : if pending_nblocks != 0 {
2438 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2439 0 : self.pending_nblocks = 0;
2440 0 : }
2441 :
2442 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2443 0 : writer.update_directory_entries_count(kind, count);
2444 0 : }
2445 :
2446 0 : Ok(())
2447 3860 : }
2448 :
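// A rough sketch of the bulk-import pattern described above (illustrative only;
// `modification` and `ctx` are assumed to be in scope, and `pages` stands in for
// whatever source the importer reads relation blocks from):
//
//     for (rel, blknum, img) in pages {
//         modification.put_rel_page_image(rel, blknum, img)?;
//         if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//             modification.flush(&ctx).await?;
//         }
//     }
//     modification.commit(&ctx).await?;
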
2449 : ///
2450 : /// Finish this atomic update, writing all the updated keys to the
2451 : /// underlying timeline.
2452 : /// All the modifications in this atomic update are stamped by the specified LSN.
2453 : ///
2454 1486204 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2455 1486204 : let mut writer = self.tline.writer().await;
2456 :
2457 1486204 : let pending_nblocks = self.pending_nblocks;
2458 1486204 : self.pending_nblocks = 0;
2459 :
2460 : // Ordering: the items in this batch do not need to be in any global order, but values for
2461 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2462 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2463 : // more details.
2464 :
2465 1486204 : let metadata_batch = {
2466 1486204 : let pending_meta = self
2467 1486204 : .pending_metadata_pages
2468 1486204 : .drain()
2469 1486204 : .flat_map(|(key, values)| {
2470 548148 : values
2471 548148 : .into_iter()
2472 548148 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2473 1486204 : })
2474 1486204 : .collect::<Vec<_>>();
2475 1486204 :
2476 1486204 : if pending_meta.is_empty() {
2477 944556 : None
2478 : } else {
2479 541648 : Some(SerializedValueBatch::from_values(pending_meta))
2480 : }
2481 : };
2482 :
2483 1486204 : let data_batch = self.pending_data_batch.take();
2484 :
2485 1486204 : let maybe_batch = match (data_batch, metadata_batch) {
2486 529112 : (Some(mut data), Some(metadata)) => {
2487 529112 : data.extend(metadata);
2488 529112 : Some(data)
2489 : }
2490 286524 : (Some(data), None) => Some(data),
2491 12536 : (None, Some(metadata)) => Some(metadata),
2492 658032 : (None, None) => None,
2493 : };
2494 :
2495 1486204 : if let Some(batch) = maybe_batch {
2496 828172 : tracing::debug!(
2497 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2498 0 : batch.max_lsn,
2499 0 : self.tline.get_last_record_lsn()
2500 : );
2501 :
2502 : // This bails out on first error without modifying pending_updates.
2503 : // That's Ok, cf this function's doc comment.
2504 828172 : writer.put_batch(batch, ctx).await?;
2505 658032 : }
2506 :
2507 1486204 : if !self.pending_deletions.is_empty() {
2508 4 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2509 4 : self.pending_deletions.clear();
2510 1486200 : }
2511 :
2512 1486204 : self.pending_lsns.push(self.lsn);
2513 1777920 : for pending_lsn in self.pending_lsns.drain(..) {
2514 1777920 : // TODO(vlad): pretty sure the comment below is not valid anymore
2515 1777920 : // and we can call finish write with the latest LSN
2516 1777920 : //
2517 1777920 : // Ideally, we should be able to call writer.finish_write() only once
2518 1777920 : // with the highest LSN. However, the last_record_lsn variable in the
2519 1777920 : // timeline keeps track of the latest LSN and the immediate previous LSN
2520 1777920 : // so we need to record every LSN to not leave a gap between them.
2521 1777920 : writer.finish_write(pending_lsn);
2522 1777920 : }
2523 :
2524 1486204 : if pending_nblocks != 0 {
2525 541140 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2526 945064 : }
2527 :
2528 1486204 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2529 6028 : writer.update_directory_entries_count(kind, count);
2530 6028 : }
2531 :
2532 1486204 : self.pending_metadata_bytes = 0;
2533 1486204 :
2534 1486204 : Ok(())
2535 1486204 : }
2536 :
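// The typical ingest flow around commit() looks roughly like this (sketch; `tline` is
// a `Timeline` handle and `ctx` a `RequestContext`, both assumed to be in scope):
//
//     let mut modification = tline.begin_modification(start_lsn);
//     modification.put_rel_page_image(rel, blknum, img)?;
//     modification.set_lsn(next_lsn)?;
//     // ... more puts stamped with later LSNs ...
//     modification.commit(&ctx).await?;
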
2537 583408 : pub(crate) fn len(&self) -> usize {
2538 583408 : self.pending_metadata_pages.len()
2539 583408 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2540 583408 : + self.pending_deletions.len()
2541 583408 : }
2542 :
2543 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2544 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2545 : /// in the modification.
2546 : ///
2547 : /// For data pages, reads pass directly to the owning Timeline: any ingest code that reads a data
2548 : /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2549 : /// DB create operations are always preceded by a call to commit(). This is special-cased because
2550 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2551 : /// and not data pages.
2552 573172 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2553 573172 : if !Self::is_data_key(&key) {
2554 : // Have we already updated the same key? Read the latest pending updated
2555 : // version in that case.
2556 : //
2557 : // Note: we don't check pending_deletions. It is an error to request a
2558 : // value that has been removed, deletion only avoids leaking storage.
2559 573172 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2560 31856 : if let Some((_, _, value)) = values.last() {
2561 31856 : return if let Value::Image(img) = value {
2562 31856 : Ok(img.clone())
2563 : } else {
2564 : // Currently, we never need to read back a WAL record that we
2565 : // inserted in the same "transaction". All the metadata updates
2566 : // work directly with Images, and we never need to read actual
2567 : // data pages. We could handle this if we had to, by calling
2568 : // the walredo manager, but let's keep it simple for now.
2569 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2570 0 : "unexpected pending WAL record"
2571 0 : )))
2572 : };
2573 0 : }
2574 541316 : }
2575 : } else {
2576 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2577 : // this key should never be present in pending_data_pages. We ensure this by committing
2578 : // modifications before ingesting DB create operations, which are the only kind that reads
2579 : // data pages during ingest.
2580 0 : if cfg!(debug_assertions) {
2581 0 : assert!(
2582 0 : !self
2583 0 : .pending_data_batch
2584 0 : .as_ref()
2585 0 : .is_some_and(|b| b.updates_key(&key))
2586 0 : );
2587 0 : }
2588 : }
2589 :
2590 : // Metadata page cache miss, or we're reading a data page.
2591 541316 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2592 541316 : self.tline.get(key, lsn, ctx).await
2593 573172 : }
2594 :
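// Consequently, a metadata write made earlier in the same modification is visible to a
// later read before commit() (sketch, within this module; `ctx: &RequestContext` is
// assumed to be in scope):
//
//     modification.put_checkpoint(img.clone())?;
//     // ... while ingesting a later record in the same batch ...
//     let read_back = modification.get(CHECKPOINT_KEY, ctx).await?; // returns `img`
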
2595 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2596 : /// and the empty value into None.
2597 0 : async fn sparse_get(
2598 0 : &self,
2599 0 : key: Key,
2600 0 : ctx: &RequestContext,
2601 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2602 0 : let val = self.get(key, ctx).await;
2603 0 : match val {
2604 0 : Ok(val) if val.is_empty() => Ok(None),
2605 0 : Ok(val) => Ok(Some(val)),
2606 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2607 0 : Err(e) => Err(e),
2608 : }
2609 0 : }
2610 :
2611 1128188 : fn put(&mut self, key: Key, val: Value) {
2612 1128188 : if Self::is_data_key(&key) {
2613 555736 : self.put_data(key.to_compact(), val)
2614 : } else {
2615 572452 : self.put_metadata(key.to_compact(), val)
2616 : }
2617 1128188 : }
2618 :
2619 555736 : fn put_data(&mut self, key: CompactKey, val: Value) {
2620 555736 : let batch = self
2621 555736 : .pending_data_batch
2622 555736 : .get_or_insert_with(SerializedValueBatch::default);
2623 555736 : batch.put(key, val, self.lsn);
2624 555736 : }
2625 :
2626 572452 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2627 572452 : let values = self.pending_metadata_pages.entry(key).or_default();
2628 : // Replace the previous value if it exists at the same lsn
2629 572452 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2630 24304 : if *last_lsn == self.lsn {
2631 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2632 24304 : self.pending_metadata_bytes -= *last_value_ser_size;
2633 24304 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2634 24304 : self.pending_metadata_bytes += *last_value_ser_size;
2635 24304 :
2636 24304 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
2637 24304 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2638 24304 : *last_value = val;
2639 24304 : return;
2640 0 : }
2641 548148 : }
2642 :
2643 548148 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2644 548148 : self.pending_metadata_bytes += val_serialized_size;
2645 548148 : values.push((self.lsn, val_serialized_size, val));
2646 548148 :
2647 548148 : if key == CHECKPOINT_KEY.to_compact() {
2648 460 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2649 547688 : }
2650 572452 : }
2651 :
2652 4 : fn delete(&mut self, key_range: Range<Key>) {
2653 4 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2654 4 : self.pending_deletions.push((key_range, self.lsn));
2655 4 : }
2656 : }
2657 :
2658 : /// Statistics for a DatadirModification.
2659 : #[derive(Default)]
2660 : pub struct DatadirModificationStats {
2661 : pub metadata_images: u64,
2662 : pub metadata_deltas: u64,
2663 : pub data_images: u64,
2664 : pub data_deltas: u64,
2665 : }
2666 :
2667 : /// This struct facilitates accessing either a committed key from the timeline at a
2668 : /// specific LSN, or the latest uncommitted key from a pending modification.
2669 : ///
2670 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2671 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2672 : /// need to look up the keys in the modification first before looking them up in the
2673 : /// timeline to not miss the latest updates.
2674 : #[derive(Clone, Copy)]
2675 : pub enum Version<'a> {
2676 : Lsn(Lsn),
2677 : Modified(&'a DatadirModification<'a>),
2678 : }
2679 :
2680 : impl Version<'_> {
2681 10352 : async fn get(
2682 10352 : &self,
2683 10352 : timeline: &Timeline,
2684 10352 : key: Key,
2685 10352 : ctx: &RequestContext,
2686 10352 : ) -> Result<Bytes, PageReconstructError> {
2687 10352 : match self {
2688 10312 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2689 40 : Version::Modified(modification) => modification.get(key, ctx).await,
2690 : }
2691 10352 : }
2692 :
2693 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2694 : /// and the empty value into None.
2695 0 : async fn sparse_get(
2696 0 : &self,
2697 0 : timeline: &Timeline,
2698 0 : key: Key,
2699 0 : ctx: &RequestContext,
2700 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2701 0 : let val = self.get(timeline, key, ctx).await;
2702 0 : match val {
2703 0 : Ok(val) if val.is_empty() => Ok(None),
2704 0 : Ok(val) => Ok(Some(val)),
2705 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2706 0 : Err(e) => Err(e),
2707 : }
2708 0 : }
2709 :
2710 71240 : fn get_lsn(&self) -> Lsn {
2711 71240 : match self {
2712 59148 : Version::Lsn(lsn) => *lsn,
2713 12092 : Version::Modified(modification) => modification.lsn,
2714 : }
2715 71240 : }
2716 : }
2717 :
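// A sketch of how the two variants are typically used during ingest (illustrative;
// assumes `timeline`, `modification`, `rel`, `lsn` and `ctx` are in scope):
//
//     // Read a committed value at a specific LSN:
//     let exists = timeline.get_rel_exists(rel, Version::Lsn(lsn), ctx).await?;
//     // Read through a pending modification, seeing its uncommitted metadata writes:
//     let exists = timeline.get_rel_exists(rel, Version::Modified(&modification), ctx).await?;
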
2718 : //--- Metadata structs stored in key-value pairs in the repository.
2719 :
2720 0 : #[derive(Debug, Serialize, Deserialize)]
2721 : pub(crate) struct DbDirectory {
2722 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2723 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2724 : }
2725 :
2726 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2727 : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2728 : // were named like "pg_twophase/000002E5", now they're like
2729 : // "pg_twophsae/0000000A000002E4".
2730 :
2731 0 : #[derive(Debug, Serialize, Deserialize)]
2732 : pub(crate) struct TwoPhaseDirectory {
2733 : pub(crate) xids: HashSet<TransactionId>,
2734 : }
2735 :
2736 0 : #[derive(Debug, Serialize, Deserialize)]
2737 : struct TwoPhaseDirectoryV17 {
2738 : xids: HashSet<u64>,
2739 : }
2740 :
2741 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2742 : pub(crate) struct RelDirectory {
2743 : // Set of relations that exist. (relfilenode, forknum)
2744 : //
2745 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2746 : // key-value pairs, if you have a lot of relations
2747 : pub(crate) rels: HashSet<(Oid, u8)>,
2748 : }
2749 :
2750 0 : #[derive(Debug, Serialize, Deserialize)]
2751 : struct RelSizeEntry {
2752 : nblocks: u32,
2753 : }
2754 :
2755 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2756 : pub(crate) struct SlruSegmentDirectory {
2757 : // Set of SLRU segments that exist.
2758 : pub(crate) segments: HashSet<u32>,
2759 : }
2760 :
2761 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2762 : #[repr(u8)]
2763 : pub(crate) enum DirectoryKind {
2764 : Db,
2765 : TwoPhase,
2766 : Rel,
2767 : AuxFiles,
2768 : SlruSegment(SlruKind),
2769 : RelV2,
2770 : }
2771 :
2772 : impl DirectoryKind {
2773 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2774 18088 : pub(crate) fn offset(&self) -> usize {
2775 18088 : self.into_usize()
2776 18088 : }
2777 : }
2778 :
2779 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2780 :
2781 : #[allow(clippy::bool_assert_comparison)]
2782 : #[cfg(test)]
2783 : mod tests {
2784 : use hex_literal::hex;
2785 : use pageserver_api::models::ShardParameters;
2786 : use pageserver_api::shard::ShardStripeSize;
2787 : use utils::id::TimelineId;
2788 : use utils::shard::{ShardCount, ShardNumber};
2789 :
2790 : use super::*;
2791 : use crate::DEFAULT_PG_VERSION;
2792 : use crate::tenant::harness::TenantHarness;
2793 :
2794 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2795 : #[tokio::test]
2796 4 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2797 4 : let name = "aux_files_round_trip";
2798 4 : let harness = TenantHarness::create(name).await?;
2799 4 :
2800 4 : pub const TIMELINE_ID: TimelineId =
2801 4 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2802 4 :
2803 4 : let (tenant, ctx) = harness.load().await;
2804 4 : let (tline, ctx) = tenant
2805 4 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2806 4 : .await?;
2807 4 : let tline = tline.raw_timeline().unwrap();
2808 4 :
2809 4 : // First modification: insert two keys
2810 4 : let mut modification = tline.begin_modification(Lsn(0x1000));
2811 4 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2812 4 : modification.set_lsn(Lsn(0x1008))?;
2813 4 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2814 4 : modification.commit(&ctx).await?;
2815 4 : let expect_1008 = HashMap::from([
2816 4 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2817 4 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2818 4 : ]);
2819 4 :
2820 4 : let io_concurrency = IoConcurrency::spawn_for_test();
2821 4 :
2822 4 : let readback = tline
2823 4 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2824 4 : .await?;
2825 4 : assert_eq!(readback, expect_1008);
2826 4 :
2827 4 : // Second modification: update one key, remove the other
2828 4 : let mut modification = tline.begin_modification(Lsn(0x2000));
2829 4 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2830 4 : modification.set_lsn(Lsn(0x2008))?;
2831 4 : modification.put_file("foo/bar2", b"", &ctx).await?;
2832 4 : modification.commit(&ctx).await?;
2833 4 : let expect_2008 =
2834 4 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2835 4 :
2836 4 : let readback = tline
2837 4 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2838 4 : .await?;
2839 4 : assert_eq!(readback, expect_2008);
2840 4 :
2841 4 : // Reading back in time works
2842 4 : let readback = tline
2843 4 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2844 4 : .await?;
2845 4 : assert_eq!(readback, expect_1008);
2846 4 :
2847 4 : Ok(())
2848 4 : }
2849 :
2850 : #[test]
2851 4 : fn gap_finding() {
2852 4 : let rel = RelTag {
2853 4 : spcnode: 1663,
2854 4 : dbnode: 208101,
2855 4 : relnode: 2620,
2856 4 : forknum: 0,
2857 4 : };
2858 4 : let base_blkno = 1;
2859 4 :
2860 4 : let base_key = rel_block_to_key(rel, base_blkno);
2861 4 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2862 4 :
2863 4 : let shard = ShardIdentity::unsharded();
2864 4 :
2865 4 : let mut previous_nblocks = 0;
2866 44 : for i in 0..10 {
2867 40 : let crnt_blkno = base_blkno + i;
2868 40 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2869 40 :
2870 40 : previous_nblocks = crnt_blkno + 1;
2871 40 :
2872 40 : if i == 0 {
2873 : // The first block we write is 1, so we should find the gap.
2874 4 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2875 : } else {
2876 36 : assert!(gaps.is_none());
2877 : }
2878 : }
2879 :
2880 : // This is an update to an already existing block. No gaps here.
2881 4 : let update_blkno = 5;
2882 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2883 4 : assert!(gaps.is_none());
2884 :
2885 : // This is an update past the current end block.
2886 4 : let after_gap_blkno = 20;
2887 4 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2888 4 :
2889 4 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2890 4 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2891 4 : assert_eq!(
2892 4 : gaps.unwrap(),
2893 4 : KeySpace::single(gap_start_key..after_gap_key)
2894 4 : );
2895 4 : }
2896 :
2897 : #[test]
2898 4 : fn sharded_gap_finding() {
2899 4 : let rel = RelTag {
2900 4 : spcnode: 1663,
2901 4 : dbnode: 208101,
2902 4 : relnode: 2620,
2903 4 : forknum: 0,
2904 4 : };
2905 4 :
2906 4 : let first_blkno = 6;
2907 4 :
2908 4 : // This shard will get the even blocks
2909 4 : let shard = ShardIdentity::from_params(
2910 4 : ShardNumber(0),
2911 4 : &ShardParameters {
2912 4 : count: ShardCount(2),
2913 4 : stripe_size: ShardStripeSize(1),
2914 4 : },
2915 4 : );
2916 4 :
2917 4 : // Only keys belonging to this shard are considered as gaps.
2918 4 : let mut previous_nblocks = 0;
2919 4 : let gaps =
2920 4 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2921 4 : assert!(!gaps.ranges.is_empty());
2922 12 : for gap_range in gaps.ranges {
2923 8 : let mut k = gap_range.start;
2924 16 : while k != gap_range.end {
2925 8 : assert_eq!(shard.get_shard_number(&k), shard.number);
2926 8 : k = k.next();
2927 : }
2928 : }
2929 :
2930 4 : previous_nblocks = first_blkno;
2931 4 :
2932 4 : let update_blkno = 2;
2933 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2934 4 : assert!(gaps.is_none());
2935 4 : }
2936 :
2937 : /*
2938 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2939 : let incremental = timeline.get_current_logical_size();
2940 : let non_incremental = timeline
2941 : .get_current_logical_size_non_incremental(lsn)
2942 : .unwrap();
2943 : assert_eq!(incremental, non_incremental);
2944 : }
2945 : */
2946 :
2947 : /*
2948 : ///
2949 : /// Test list_rels() function, with branches and dropped relations
2950 : ///
2951 : #[test]
2952 : fn test_list_rels_drop() -> Result<()> {
2953 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2954 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2955 : const TESTDB: u32 = 111;
2956 :
2957 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2958 : // after branching fails below
2959 : let mut writer = tline.begin_record(Lsn(0x10));
2960 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2961 : writer.finish()?;
2962 :
2963 : // Create a relation on the timeline
2964 : let mut writer = tline.begin_record(Lsn(0x20));
2965 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2966 : writer.finish()?;
2967 :
2968 : let writer = tline.begin_record(Lsn(0x00));
2969 : writer.finish()?;
2970 :
2971 : // Check that list_rels() lists it after LSN 2, but no before it
2972 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2973 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2974 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2975 :
2976 : // Create a branch, check that the relation is visible there
2977 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2978 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2979 : Some(timeline) => timeline,
2980 : None => panic!("Should have a local timeline"),
2981 : };
2982 : let newtline = DatadirTimelineImpl::new(newtline);
2983 : assert!(newtline
2984 : .list_rels(0, TESTDB, Lsn(0x30))?
2985 : .contains(&TESTREL_A));
2986 :
2987 : // Drop it on the branch
2988 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2989 : new_writer.drop_relation(TESTREL_A)?;
2990 : new_writer.finish()?;
2991 :
2992 : // Check that it's no longer listed on the branch after the point where it was dropped
2993 : assert!(newtline
2994 : .list_rels(0, TESTDB, Lsn(0x30))?
2995 : .contains(&TESTREL_A));
2996 : assert!(!newtline
2997 : .list_rels(0, TESTDB, Lsn(0x40))?
2998 : .contains(&TESTREL_A));
2999 :
3000 : // Run checkpoint and garbage collection and check that it's still not visible
3001 : newtline.checkpoint(CheckpointConfig::Forced)?;
3002 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3003 :
3004 : assert!(!newtline
3005 : .list_rels(0, TESTDB, Lsn(0x40))?
3006 : .contains(&TESTREL_A));
3007 :
3008 : Ok(())
3009 : }
3010 : */
3011 :
3012 : /*
3013 : #[test]
3014 : fn test_read_beyond_eof() -> Result<()> {
3015 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3016 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3017 :
3018 : make_some_layers(&tline, Lsn(0x20))?;
3019 : let mut writer = tline.begin_record(Lsn(0x60));
3020 : walingest.put_rel_page_image(
3021 : &mut writer,
3022 : TESTREL_A,
3023 : 0,
3024 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3025 : )?;
3026 : writer.finish()?;
3027 :
3028 : // Test read before rel creation. Should error out.
3029 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3030 :
3031 : // Read block beyond end of relation at different points in time.
3032 : // These reads should fall into different delta, image, and in-memory layers.
3033 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3034 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3035 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3036 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3037 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3038 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3039 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3040 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3041 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3042 :
3043 : // Test on an in-memory layer with no preceding layer
3044 : let mut writer = tline.begin_record(Lsn(0x70));
3045 : walingest.put_rel_page_image(
3046 : &mut writer,
3047 : TESTREL_B,
3048 : 0,
3049 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3050 : )?;
3051 : writer.finish()?;
3052 :
3053 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3054 :
3055 : Ok(())
3056 : }
3057 : */
3058 : }
|