Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
13 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes, BytesMut};
16 : use enum_map::Enum;
17 : use itertools::Itertools;
18 : use pageserver_api::key::{
19 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
20 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
21 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
22 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
23 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
24 : };
25 : use pageserver_api::keyspace::SparseKeySpace;
26 : use pageserver_api::models::RelSizeMigration;
27 : use pageserver_api::record::NeonWalRecord;
28 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
29 : use pageserver_api::shard::ShardIdentity;
30 : use pageserver_api::value::Value;
31 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
32 : use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
33 : use serde::{Deserialize, Serialize};
34 : use strum::IntoEnumIterator;
35 : use tokio_util::sync::CancellationToken;
36 : use tracing::{debug, info, info_span, trace, warn};
37 : use utils::bin_ser::{BeSer, DeserializeError};
38 : use utils::lsn::Lsn;
39 : use utils::pausable_failpoint;
40 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
41 :
42 : use super::tenant::{PageReconstructError, Timeline};
43 : use crate::aux_file;
44 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
45 : use crate::keyspace::{KeySpace, KeySpaceAccum};
46 : use crate::metrics::{
47 : RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
48 : };
49 : use crate::span::{
50 : debug_assert_current_span_has_tenant_and_timeline_id,
51 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
52 : };
53 : use crate::tenant::storage_layer::IoConcurrency;
54 : use crate::tenant::timeline::GetVectoredError;
55 :
56 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
57 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
58 :
59 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
60 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
61 :
62 : #[derive(Debug)]
63 : pub enum LsnForTimestamp {
64 : /// Found commits both before and after the given timestamp
65 : Present(Lsn),
66 :
67 : /// Found no commits after the given timestamp. This means
68 : /// that the newest data in the branch is older than the given
69 : /// timestamp.
70 : ///
71 : /// All commits <= LSN happened before the given timestamp
72 : Future(Lsn),
73 :
74 : /// The queried timestamp is past the horizon we look back to (the PITR window)
75 : ///
76 : /// All commits > LSN happened after the given timestamp,
77 : /// but any commits < LSN might have happened before or after
78 : /// the given timestamp. We don't know because no data before
79 : /// the given lsn is available.
80 : Past(Lsn),
81 :
82 : /// We have found no commit with a timestamp,
83 : /// so we can't return anything meaningful.
84 : ///
85 : /// The associated LSN is the lower bound value we can safely
86 : /// create branches on, but no statement is made about whether it is
87 : /// older or newer than the timestamp.
88 : ///
89 : /// This variant can e.g. be returned right after a
90 : /// cluster import.
91 : NoData(Lsn),
92 : }
93 :
94 : #[derive(Debug, thiserror::Error)]
95 : pub(crate) enum CalculateLogicalSizeError {
96 : #[error("cancelled")]
97 : Cancelled,
98 :
99 : /// Something went wrong while reading the metadata we use to calculate logical size.
100 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
101 : /// in the `From` implementation for this variant.
102 : #[error(transparent)]
103 : PageRead(PageReconstructError),
104 :
105 : /// Something went wrong deserializing metadata that we read to calculate logical size
106 : #[error("decode error: {0}")]
107 : Decode(#[from] DeserializeError),
108 : }
109 :
110 : #[derive(Debug, thiserror::Error)]
111 : pub(crate) enum CollectKeySpaceError {
112 : #[error(transparent)]
113 : Decode(#[from] DeserializeError),
114 : #[error(transparent)]
115 : PageRead(PageReconstructError),
116 : #[error("cancelled")]
117 : Cancelled,
118 : }
119 :
120 : impl From<PageReconstructError> for CollectKeySpaceError {
121 0 : fn from(err: PageReconstructError) -> Self {
122 0 : match err {
123 0 : PageReconstructError::Cancelled => Self::Cancelled,
124 0 : err => Self::PageRead(err),
125 : }
126 0 : }
127 : }
128 :
129 : impl From<PageReconstructError> for CalculateLogicalSizeError {
130 0 : fn from(pre: PageReconstructError) -> Self {
131 0 : match pre {
132 0 : PageReconstructError::Cancelled => Self::Cancelled,
133 0 : _ => Self::PageRead(pre),
134 : }
135 0 : }
136 : }
137 :
138 : #[derive(Debug, thiserror::Error)]
139 : pub enum RelationError {
140 : #[error("invalid relnode")]
141 : InvalidRelnode,
142 : }
143 :
144 : ///
145 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
146 : /// and other special kinds of files, in a versioned key-value store. The
147 : /// Timeline struct provides the key-value store.
148 : ///
149 : /// This is a separate impl so that we can easily include all these functions in the Timeline
150 : /// implementation; it might be moved into a separate struct later.
151 : impl Timeline {
152 : /// Start ingesting a WAL record, or other atomic modification of
153 : /// the timeline.
154 : ///
155 : /// This provides a transaction-like interface to perform a bunch
156 : /// of modifications atomically.
157 : ///
158 : /// To ingest a WAL record, call begin_modification(lsn) to get a
159 : /// DatadirModification object. Use the functions in the object to
160 : /// modify the repository state, updating all the pages and metadata
161 : /// that the WAL record affects. When you're done, call commit() to
162 : /// commit the changes.
163 : ///
164 : /// The LSN stored in the modification is advanced by `ingest_record` and
165 : /// is used by `commit()` to update `last_record_lsn`.
166 : ///
167 : /// Calling commit() will flush all the changes and reset the state,
168 : /// so the `DatadirModification` struct can be reused to perform the next modification.
169 : ///
170 : /// Note that any pending modifications you make through the
171 : /// modification object won't be visible to calls to the 'get' and list
172 : /// functions of the timeline until you finish! And if you update the
173 : /// same page twice, the last update wins.
174 : ///
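/// A rough sketch of that cycle (illustrative only; the real put-functions and
/// `commit()` take more arguments and are mostly driven by walingest.rs):
///
/// ```ignore
/// let mut modification = timeline.begin_modification(lsn);
/// // Apply all the changes of one WAL record through the put-functions ...
/// modification.put_rel_page_image(rel, blknum, page_image)?;
/// // ... then flush them atomically and advance last_record_lsn.
/// modification.commit(&ctx).await?;
/// ```
///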
175 536836 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
176 536836 : where
177 536836 : Self: Sized,
178 536836 : {
179 536836 : DatadirModification {
180 536836 : tline: self,
181 536836 : pending_lsns: Vec::new(),
182 536836 : pending_metadata_pages: HashMap::new(),
183 536836 : pending_data_batch: None,
184 536836 : pending_deletions: Vec::new(),
185 536836 : pending_nblocks: 0,
186 536836 : pending_directory_entries: Vec::new(),
187 536836 : pending_metadata_bytes: 0,
188 536836 : lsn,
189 536836 : }
190 536836 : }
191 :
192 : //------------------------------------------------------------------------------
193 : // Public GET functions
194 : //------------------------------------------------------------------------------
195 :
196 : /// Look up the given page version.
197 36768 : pub(crate) async fn get_rel_page_at_lsn(
198 36768 : &self,
199 36768 : tag: RelTag,
200 36768 : blknum: BlockNumber,
201 36768 : version: Version<'_>,
202 36768 : ctx: &RequestContext,
203 36768 : io_concurrency: IoConcurrency,
204 36768 : ) -> Result<Bytes, PageReconstructError> {
205 36768 : match version {
206 36768 : Version::Lsn(effective_lsn) => {
207 36768 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
208 36768 : let res = self
209 36768 : .get_rel_page_at_lsn_batched(
210 36768 : pages
211 36768 : .iter()
212 36768 : .map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
213 36768 : effective_lsn,
214 36768 : io_concurrency.clone(),
215 36768 : ctx,
216 36768 : )
217 36768 : .await;
218 36768 : assert_eq!(res.len(), 1);
219 36768 : res.into_iter().next().unwrap()
220 : }
221 0 : Version::Modified(modification) => {
222 0 : if tag.relnode == 0 {
223 0 : return Err(PageReconstructError::Other(
224 0 : RelationError::InvalidRelnode.into(),
225 0 : ));
226 0 : }
227 :
228 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
229 0 : if blknum >= nblocks {
230 0 : debug!(
231 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
232 0 : tag,
233 0 : blknum,
234 0 : version.get_lsn(),
235 : nblocks
236 : );
237 0 : return Ok(ZERO_PAGE.clone());
238 0 : }
239 0 :
240 0 : let key = rel_block_to_key(tag, blknum);
241 0 : modification.get(key, ctx).await
242 : }
243 : }
244 36768 : }
245 :
246 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
247 : ///
248 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
249 36768 : pub(crate) async fn get_rel_page_at_lsn_batched(
250 36768 : &self,
251 36768 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
252 36768 : effective_lsn: Lsn,
253 36768 : io_concurrency: IoConcurrency,
254 36768 : ctx: &RequestContext,
255 36768 : ) -> Vec<Result<Bytes, PageReconstructError>> {
256 36768 : debug_assert_current_span_has_tenant_and_timeline_id();
257 36768 :
258 36768 : let mut slots_filled = 0;
259 36768 : let page_count = pages.len();
260 36768 :
261 36768 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
262 36768 : let mut result = Vec::with_capacity(pages.len());
263 36768 : let result_slots = result.spare_capacity_mut();
264 36768 :
265 36768 : let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
266 36768 : BTreeMap::default();
267 36768 :
268 36768 : let mut perf_instrument = false;
269 36768 : for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
270 36768 : if tag.relnode == 0 {
271 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
272 0 : RelationError::InvalidRelnode.into(),
273 0 : )));
274 0 :
275 0 : slots_filled += 1;
276 0 : continue;
277 36768 : }
278 :
279 36768 : let nblocks = match self
280 36768 : .get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
281 36768 : .maybe_perf_instrument(&ctx, |crnt_perf_span| {
282 0 : info_span!(
283 : target: PERF_TRACE_TARGET,
284 0 : parent: crnt_perf_span,
285 : "GET_REL_SIZE",
286 : reltag=%tag,
287 : lsn=%effective_lsn,
288 : )
289 36768 : })
290 36768 : .await
291 : {
292 36768 : Ok(nblocks) => nblocks,
293 0 : Err(err) => {
294 0 : result_slots[response_slot_idx].write(Err(err));
295 0 : slots_filled += 1;
296 0 : continue;
297 : }
298 : };
299 :
300 36768 : if *blknum >= nblocks {
301 0 : debug!(
302 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
303 : tag, blknum, effective_lsn, nblocks
304 : );
305 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
306 0 : slots_filled += 1;
307 0 : continue;
308 36768 : }
309 36768 :
310 36768 : let key = rel_block_to_key(*tag, *blknum);
311 36768 :
312 36768 : if ctx.has_perf_span() {
313 0 : perf_instrument = true;
314 36768 : }
315 :
316 36768 : let key_slots = keys_slots.entry(key).or_default();
317 36768 : key_slots.push((response_slot_idx, ctx));
318 : }
319 :
320 36768 : let keyspace = {
321 : // add_key requires monotonicity
322 36768 : let mut acc = KeySpaceAccum::new();
323 36768 : for key in keys_slots
324 36768 : .keys()
325 36768 : // in fact it requires strong monotonicity
326 36768 : .dedup()
327 36768 : {
328 36768 : acc.add_key(*key);
329 36768 : }
330 36768 : acc.to_keyspace()
331 : };
332 :
333 36768 : let ctx = match perf_instrument {
334 0 : true => RequestContextBuilder::from(ctx)
335 0 : .root_perf_span(|| {
336 0 : info_span!(
337 : target: PERF_TRACE_TARGET,
338 : "GET_VECTORED",
339 : tenant_id = %self.tenant_shard_id.tenant_id,
340 : timeline_id = %self.timeline_id,
341 : lsn = %effective_lsn,
342 0 : shard = %self.tenant_shard_id.shard_slug(),
343 : )
344 0 : })
345 0 : .attached_child(),
346 36768 : false => ctx.attached_child(),
347 : };
348 :
349 36768 : let res = self
350 36768 : .get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
351 36768 : .maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
352 36768 : .await;
353 :
354 36768 : match res {
355 36768 : Ok(results) => {
356 73536 : for (key, res) in results {
357 36768 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
358 36768 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
359 :
360 36768 : for (slot, req_ctx) in key_slots {
361 0 : let clone = match &res {
362 0 : Ok(buf) => Ok(buf.clone()),
363 0 : Err(err) => Err(match err {
364 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
365 :
366 0 : x @ PageReconstructError::Other(_)
367 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
368 0 : | x @ PageReconstructError::WalRedo(_)
369 0 : | x @ PageReconstructError::MissingKey(_) => {
370 0 : PageReconstructError::Other(anyhow::anyhow!(
371 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
372 0 : ))
373 : }
374 : }),
375 : };
376 :
377 0 : result_slots[slot].write(clone);
378 0 : // There is no standardized way to express that the batched span followed from N request spans.
379 0 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
380 0 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
381 0 : req_ctx.perf_follows_from(&ctx);
382 0 : slots_filled += 1;
383 : }
384 :
385 36768 : result_slots[first_slot].write(res);
386 36768 : first_req_ctx.perf_follows_from(&ctx);
387 36768 : slots_filled += 1;
388 : }
389 : }
390 0 : Err(err) => {
391 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
392 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
393 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
394 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
395 : // but without taking ownership of the GetVectoredError
396 0 : let err = match &err {
397 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
398 : // TODO: restructure get_vectored API to make this error per-key
399 0 : GetVectoredError::MissingKey(err) => {
400 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
401 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
402 0 : )))
403 : }
404 : // TODO: restructure get_vectored API to make this error per-key
405 0 : GetVectoredError::GetReadyAncestorError(err) => {
406 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
407 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
408 0 : )))
409 : }
410 : // TODO: restructure get_vectored API to make this error per-key
411 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
412 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
413 0 : )),
414 : // TODO: we can prevent this error class by moving this check into the type system
415 0 : GetVectoredError::InvalidLsn(e) => {
416 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
417 : }
418 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
419 : // TODO: we can prevent this error class by moving this check into the type system
420 0 : GetVectoredError::Oversized(err) => {
421 0 : Err(anyhow::anyhow!("batching oversized: {err:?}").into())
422 : }
423 : };
424 :
425 0 : req_ctx.perf_follows_from(&ctx);
426 0 : result_slots[*slot].write(err);
427 : }
428 :
429 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
430 0 : }
431 : };
432 :
433 36768 : assert_eq!(slots_filled, page_count);
434 : // SAFETY:
435 : // 1. `result` and any of its uninit members are not read from until this point
436 : // 2. The length below is tracked at run-time and matches the number of requested pages.
437 36768 : unsafe {
438 36768 : result.set_len(page_count);
439 36768 : }
440 36768 :
441 36768 : result
442 36768 : }
443 :
444 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
445 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
446 : /// for pages up to the highest page number it has stored.
447 0 : pub(crate) async fn get_db_size(
448 0 : &self,
449 0 : spcnode: Oid,
450 0 : dbnode: Oid,
451 0 : version: Version<'_>,
452 0 : ctx: &RequestContext,
453 0 : ) -> Result<usize, PageReconstructError> {
454 0 : let mut total_blocks = 0;
455 :
456 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
457 :
458 0 : for rel in rels {
459 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
460 0 : total_blocks += n_blocks as usize;
461 : }
462 0 : Ok(total_blocks)
463 0 : }
464 :
465 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
466 : ///
467 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
468 : /// page number stored in the shard.
469 48868 : pub(crate) async fn get_rel_size(
470 48868 : &self,
471 48868 : tag: RelTag,
472 48868 : version: Version<'_>,
473 48868 : ctx: &RequestContext,
474 48868 : ) -> Result<BlockNumber, PageReconstructError> {
475 48868 : if tag.relnode == 0 {
476 0 : return Err(PageReconstructError::Other(
477 0 : RelationError::InvalidRelnode.into(),
478 0 : ));
479 48868 : }
480 :
481 48868 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
482 38588 : return Ok(nblocks);
483 10280 : }
484 10280 :
485 10280 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
486 0 : && !self.get_rel_exists(tag, version, ctx).await?
487 : {
488 : // FIXME: Postgres sometimes calls smgrcreate() to create
489 : // FSM, and smgrnblocks() on it immediately afterwards,
490 : // without extending it. Tolerate that by claiming that
491 : // any non-existent FSM fork has size 0.
492 0 : return Ok(0);
493 10280 : }
494 10280 :
495 10280 : let key = rel_size_to_key(tag);
496 10280 : let mut buf = version.get(self, key, ctx).await?;
497 10272 : let nblocks = buf.get_u32_le();
498 10272 :
499 10272 : self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
500 10272 :
501 10272 : Ok(nblocks)
502 48868 : }
503 :
504 : /// Does the relation exist?
505 : ///
506 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
507 : /// the shard stores pages for.
508 12100 : pub(crate) async fn get_rel_exists(
509 12100 : &self,
510 12100 : tag: RelTag,
511 12100 : version: Version<'_>,
512 12100 : ctx: &RequestContext,
513 12100 : ) -> Result<bool, PageReconstructError> {
514 12100 : if tag.relnode == 0 {
515 0 : return Err(PageReconstructError::Other(
516 0 : RelationError::InvalidRelnode.into(),
517 0 : ));
518 12100 : }
519 :
520 : // first try to look up the relation in the cache
521 12100 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
522 12064 : return Ok(true);
523 36 : }
524 : // then check if the database was already initialized.
525 : // get_rel_exists can be called before dbdir is created.
526 36 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
527 36 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
528 36 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
529 0 : return Ok(false);
530 36 : }
531 36 :
532 36 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
533 36 : // Otherwise, read the old reldir keyspace.
534 36 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
535 36 :
536 36 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
537 36 : self.get_rel_size_v2_status()
538 : {
539 : // fetch directory listing (new)
540 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
541 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
542 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
543 0 : let exists_v2 = buf == RelDirExists::Exists;
544 0 : // Fast path: if the relation exists in the new format, return true.
545 0 : // TODO: we should have a verification mode that checks both keyspaces
546 0 : // to ensure the relation only exists in one of them.
547 0 : if exists_v2 {
548 0 : return Ok(true);
549 0 : }
550 36 : }
551 :
552 : // fetch directory listing (old)
553 :
554 36 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
555 36 : let buf = version.get(self, key, ctx).await?;
556 :
557 36 : let dir = RelDirectory::des(&buf)?;
558 36 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
559 36 : Ok(exists_v1)
560 12100 : }
561 :
562 : /// Get a list of all existing relations in given tablespace and database.
563 : ///
564 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
565 : /// the shard stores pages for.
566 : ///
567 : /// # Cancel-Safety
568 : ///
569 : /// This method is cancellation-safe.
570 0 : pub(crate) async fn list_rels(
571 0 : &self,
572 0 : spcnode: Oid,
573 0 : dbnode: Oid,
574 0 : version: Version<'_>,
575 0 : ctx: &RequestContext,
576 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
577 0 : // fetch directory listing (old)
578 0 : let key = rel_dir_to_key(spcnode, dbnode);
579 0 : let buf = version.get(self, key, ctx).await?;
580 :
581 0 : let dir = RelDirectory::des(&buf)?;
582 0 : let rels_v1: HashSet<RelTag> =
583 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
584 0 : spcnode,
585 0 : dbnode,
586 0 : relnode: *relnode,
587 0 : forknum: *forknum,
588 0 : }));
589 0 :
590 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
591 0 : return Ok(rels_v1);
592 0 : }
593 0 :
594 0 : // scan directory listing (new), merge with the old results
595 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
596 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
597 0 : self.conf,
598 0 : self.gate
599 0 : .enter()
600 0 : .map_err(|_| PageReconstructError::Cancelled)?,
601 : );
602 0 : let results = self
603 0 : .scan(
604 0 : KeySpace::single(key_range),
605 0 : version.get_lsn(),
606 0 : ctx,
607 0 : io_concurrency,
608 0 : )
609 0 : .await?;
610 0 : let mut rels = rels_v1;
611 0 : for (key, val) in results {
612 0 : let val = RelDirExists::decode(&val?)
613 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
614 0 : assert_eq!(key.field6, 1);
615 0 : assert_eq!(key.field2, spcnode);
616 0 : assert_eq!(key.field3, dbnode);
617 0 : let tag = RelTag {
618 0 : spcnode,
619 0 : dbnode,
620 0 : relnode: key.field4,
621 0 : forknum: key.field5,
622 0 : };
623 0 : if val == RelDirExists::Removed {
624 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
625 0 : continue;
626 0 : }
627 0 : let did_not_contain = rels.insert(tag);
628 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
629 : }
630 0 : Ok(rels)
631 0 : }
632 :
633 : /// Get the whole SLRU segment
634 0 : pub(crate) async fn get_slru_segment(
635 0 : &self,
636 0 : kind: SlruKind,
637 0 : segno: u32,
638 0 : lsn: Lsn,
639 0 : ctx: &RequestContext,
640 0 : ) -> Result<Bytes, PageReconstructError> {
641 0 : assert!(self.tenant_shard_id.is_shard_zero());
642 0 : let n_blocks = self
643 0 : .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
644 0 : .await?;
645 :
646 0 : let keyspace = KeySpace::single(
647 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
648 0 : );
649 0 :
650 0 : let batches = keyspace.partition(
651 0 : self.get_shard_identity(),
652 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
653 0 : );
654 :
655 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
656 0 : self.conf,
657 0 : self.gate
658 0 : .enter()
659 0 : .map_err(|_| PageReconstructError::Cancelled)?,
660 : );
661 :
662 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
663 0 : for batch in batches.parts {
664 0 : let blocks = self
665 0 : .get_vectored(batch, lsn, io_concurrency.clone(), ctx)
666 0 : .await?;
667 :
668 0 : for (_key, block) in blocks {
669 0 : let block = block?;
670 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
671 : }
672 : }
673 :
674 0 : Ok(segment.freeze())
675 0 : }
676 :
677 : /// Get size of an SLRU segment
678 0 : pub(crate) async fn get_slru_segment_size(
679 0 : &self,
680 0 : kind: SlruKind,
681 0 : segno: u32,
682 0 : version: Version<'_>,
683 0 : ctx: &RequestContext,
684 0 : ) -> Result<BlockNumber, PageReconstructError> {
685 0 : assert!(self.tenant_shard_id.is_shard_zero());
686 0 : let key = slru_segment_size_to_key(kind, segno);
687 0 : let mut buf = version.get(self, key, ctx).await?;
688 0 : Ok(buf.get_u32_le())
689 0 : }
690 :
691 : /// Does the slru segment exist?
692 0 : pub(crate) async fn get_slru_segment_exists(
693 0 : &self,
694 0 : kind: SlruKind,
695 0 : segno: u32,
696 0 : version: Version<'_>,
697 0 : ctx: &RequestContext,
698 0 : ) -> Result<bool, PageReconstructError> {
699 0 : assert!(self.tenant_shard_id.is_shard_zero());
700 : // fetch directory listing
701 0 : let key = slru_dir_to_key(kind);
702 0 : let buf = version.get(self, key, ctx).await?;
703 :
704 0 : let dir = SlruSegmentDirectory::des(&buf)?;
705 0 : Ok(dir.segments.contains(&segno))
706 0 : }
707 :
708 : /// Locate an LSN such that all transactions that committed before
709 : /// 'search_timestamp' are visible, but nothing newer is.
710 : ///
711 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
712 : /// so it's not well defined which LSN you get if there were multiple commits
713 : /// "in flight" at that point in time.
714 : ///
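/// A sketch of how a caller might interpret the result (hypothetical caller;
/// `use_lsn`, `too_old` and `no_data` are made-up helpers):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
///     // Commits exist both before and after `ts`: this LSN reflects exactly
///     // the transactions that committed before `ts`.
///     LsnForTimestamp::Present(lsn) => use_lsn(lsn),
///     // Everything in the branch is older than `ts`; `lsn` is the last
///     // commit we know about.
///     LsnForTimestamp::Future(lsn) => use_lsn(lsn),
///     // `ts` is older than the PITR horizon (or the branch point); only a
///     // lower bound is known.
///     LsnForTimestamp::Past(lsn) => return Err(too_old(lsn)),
///     // No commit records with timestamps at all, e.g. right after an import.
///     LsnForTimestamp::NoData(lsn) => return Err(no_data(lsn)),
/// }
/// ```
///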
715 0 : pub(crate) async fn find_lsn_for_timestamp(
716 0 : &self,
717 0 : search_timestamp: TimestampTz,
718 0 : cancel: &CancellationToken,
719 0 : ctx: &RequestContext,
720 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
721 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
722 :
723 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
724 0 : let gc_cutoff_planned = {
725 0 : let gc_info = self.gc_info.read().unwrap();
726 0 : gc_info.min_cutoff()
727 0 : };
728 0 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
729 0 : // but let's be defensive.
730 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
731 0 : // We use this method to figure out the branching LSN for the new branch, but the
732 0 : // GC cutoff could be before the branching point and we cannot create a new branch
733 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
734 0 : // on the safe side.
735 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
736 0 : let max_lsn = self.get_last_record_lsn();
737 0 :
738 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
739 0 : // LSN divided by 8.
740 0 : let mut low = min_lsn.0 / 8;
741 0 : let mut high = max_lsn.0 / 8 + 1;
742 0 :
743 0 : let mut found_smaller = false;
744 0 : let mut found_larger = false;
745 :
746 0 : while low < high {
747 0 : if cancel.is_cancelled() {
748 0 : return Err(PageReconstructError::Cancelled);
749 0 : }
750 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
751 0 : let mid = (high + low) / 2;
752 :
753 0 : let cmp = match self
754 0 : .is_latest_commit_timestamp_ge_than(
755 0 : search_timestamp,
756 0 : Lsn(mid * 8),
757 0 : &mut found_smaller,
758 0 : &mut found_larger,
759 0 : ctx,
760 0 : )
761 0 : .await
762 : {
763 0 : Ok(res) => res,
764 0 : Err(PageReconstructError::MissingKey(e)) => {
765 0 : warn!(
766 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
767 : e
768 : );
769 : // Return as if we found no commits before the given timestamp; the error was logged above.
770 0 : return Ok(LsnForTimestamp::Past(min_lsn));
771 : }
772 0 : Err(e) => return Err(e),
773 : };
774 :
775 0 : if cmp {
776 0 : high = mid;
777 0 : } else {
778 0 : low = mid + 1;
779 0 : }
780 : }
781 :
782 : // If `found_smaller == true`, then `low = t + 1`, where `t` is the target LSN:
783 : // the LSN of the last commit record before or at `search_timestamp`.
784 : // Subtract one from `low` to get `t`.
785 : //
786 : // FIXME: it would be better to get the LSN of the previous commit.
787 : // Otherwise, if you restore to the returned LSN, the database will
788 : // include physical changes from later commits that will be marked
789 : // as aborted, and will need to be vacuumed away.
790 0 : let commit_lsn = Lsn((low - 1) * 8);
791 0 : match (found_smaller, found_larger) {
792 : (false, false) => {
793 : // This can happen if no commit records have been processed yet, e.g.
794 : // just after importing a cluster.
795 0 : Ok(LsnForTimestamp::NoData(min_lsn))
796 : }
797 : (false, true) => {
798 : // Didn't find any commit timestamps smaller than the request
799 0 : Ok(LsnForTimestamp::Past(min_lsn))
800 : }
801 0 : (true, _) if commit_lsn < min_lsn => {
802 0 : // The search above did set found_smaller to true, but it never increased the lsn:
803 0 : // low is still the old min_lsn, and the subtraction above gave a value
804 0 : // below min_lsn. We must never return an LSN below min_lsn.
805 0 : Ok(LsnForTimestamp::Past(min_lsn))
806 : }
807 : (true, false) => {
808 : // Only found commits with timestamps smaller than the request.
809 : // It's still a valid case for branch creation, so return it.
810 : // `update_gc_info()` ignores the LSN for the `LsnForTimestamp::Future`
811 : // case anyway.
812 0 : Ok(LsnForTimestamp::Future(commit_lsn))
813 : }
814 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
815 : }
816 0 : }
817 :
818 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
819 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
820 : ///
821 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
822 : /// with a smaller/larger timestamp.
823 : ///
824 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
825 0 : &self,
826 0 : search_timestamp: TimestampTz,
827 0 : probe_lsn: Lsn,
828 0 : found_smaller: &mut bool,
829 0 : found_larger: &mut bool,
830 0 : ctx: &RequestContext,
831 0 : ) -> Result<bool, PageReconstructError> {
832 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
833 0 : if timestamp >= search_timestamp {
834 0 : *found_larger = true;
835 0 : return ControlFlow::Break(true);
836 0 : } else {
837 0 : *found_smaller = true;
838 0 : }
839 0 : ControlFlow::Continue(())
840 0 : })
841 0 : .await
842 0 : }
843 :
844 : /// Obtain the timestamp for the given lsn.
845 : ///
846 : /// If the lsn has no timestamps (e.g. no commits), returns None.
847 0 : pub(crate) async fn get_timestamp_for_lsn(
848 0 : &self,
849 0 : probe_lsn: Lsn,
850 0 : ctx: &RequestContext,
851 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
852 0 : let mut max: Option<TimestampTz> = None;
853 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
854 0 : if let Some(max_prev) = max {
855 0 : max = Some(max_prev.max(timestamp));
856 0 : } else {
857 0 : max = Some(timestamp);
858 0 : }
859 0 : ControlFlow::Continue(())
860 0 : })
861 0 : .await?;
862 :
863 0 : Ok(max)
864 0 : }
865 :
866 : /// Runs the given function on all the timestamps for a given lsn
867 : ///
868 : /// The return value is either given by the closure, or set to the `Default`
869 : /// impl's output.
870 0 : async fn map_all_timestamps<T: Default>(
871 0 : &self,
872 0 : probe_lsn: Lsn,
873 0 : ctx: &RequestContext,
874 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
875 0 : ) -> Result<T, PageReconstructError> {
876 0 : for segno in self
877 0 : .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
878 0 : .await?
879 : {
880 0 : let nblocks = self
881 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
882 0 : .await?;
883 :
884 0 : let keyspace = KeySpace::single(
885 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
886 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
887 0 : );
888 0 :
889 0 : let batches = keyspace.partition(
890 0 : self.get_shard_identity(),
891 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
892 0 : );
893 :
894 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
895 0 : self.conf,
896 0 : self.gate
897 0 : .enter()
898 0 : .map_err(|_| PageReconstructError::Cancelled)?,
899 : );
900 :
901 0 : for batch in batches.parts.into_iter().rev() {
902 0 : let blocks = self
903 0 : .get_vectored(batch, probe_lsn, io_concurrency.clone(), ctx)
904 0 : .await?;
905 :
906 0 : for (_key, clog_page) in blocks.into_iter().rev() {
907 0 : let clog_page = clog_page?;
908 :
909 0 : if clog_page.len() == BLCKSZ as usize + 8 {
910 0 : let mut timestamp_bytes = [0u8; 8];
911 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
912 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
913 0 :
914 0 : match f(timestamp) {
915 0 : ControlFlow::Break(b) => return Ok(b),
916 0 : ControlFlow::Continue(()) => (),
917 : }
918 0 : }
919 : }
920 : }
921 : }
922 0 : Ok(Default::default())
923 0 : }
924 :
925 0 : pub(crate) async fn get_slru_keyspace(
926 0 : &self,
927 0 : version: Version<'_>,
928 0 : ctx: &RequestContext,
929 0 : ) -> Result<KeySpace, PageReconstructError> {
930 0 : let mut accum = KeySpaceAccum::new();
931 :
932 0 : for kind in SlruKind::iter() {
933 0 : let mut segments: Vec<u32> = self
934 0 : .list_slru_segments(kind, version, ctx)
935 0 : .await?
936 0 : .into_iter()
937 0 : .collect();
938 0 : segments.sort_unstable();
939 :
940 0 : for seg in segments {
941 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
942 :
943 0 : accum.add_range(
944 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
945 0 : );
946 : }
947 : }
948 :
949 0 : Ok(accum.to_keyspace())
950 0 : }
951 :
952 : /// Get a list of SLRU segments
953 0 : pub(crate) async fn list_slru_segments(
954 0 : &self,
955 0 : kind: SlruKind,
956 0 : version: Version<'_>,
957 0 : ctx: &RequestContext,
958 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
959 0 : // fetch directory entry
960 0 : let key = slru_dir_to_key(kind);
961 :
962 0 : let buf = version.get(self, key, ctx).await?;
963 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
964 0 : }
965 :
966 0 : pub(crate) async fn get_relmap_file(
967 0 : &self,
968 0 : spcnode: Oid,
969 0 : dbnode: Oid,
970 0 : version: Version<'_>,
971 0 : ctx: &RequestContext,
972 0 : ) -> Result<Bytes, PageReconstructError> {
973 0 : let key = relmap_file_key(spcnode, dbnode);
974 :
975 0 : let buf = version.get(self, key, ctx).await?;
976 0 : Ok(buf)
977 0 : }
978 :
979 660 : pub(crate) async fn list_dbdirs(
980 660 : &self,
981 660 : lsn: Lsn,
982 660 : ctx: &RequestContext,
983 660 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
984 : // fetch directory entry
985 660 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
986 :
987 660 : Ok(DbDirectory::des(&buf)?.dbdirs)
988 660 : }
989 :
990 0 : pub(crate) async fn get_twophase_file(
991 0 : &self,
992 0 : xid: u64,
993 0 : lsn: Lsn,
994 0 : ctx: &RequestContext,
995 0 : ) -> Result<Bytes, PageReconstructError> {
996 0 : let key = twophase_file_key(xid);
997 0 : let buf = self.get(key, lsn, ctx).await?;
998 0 : Ok(buf)
999 0 : }
1000 :
1001 664 : pub(crate) async fn list_twophase_files(
1002 664 : &self,
1003 664 : lsn: Lsn,
1004 664 : ctx: &RequestContext,
1005 664 : ) -> Result<HashSet<u64>, PageReconstructError> {
1006 : // fetch directory entry
1007 664 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1008 :
1009 664 : if self.pg_version >= 17 {
1010 0 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1011 : } else {
1012 664 : Ok(TwoPhaseDirectory::des(&buf)?
1013 : .xids
1014 664 : .iter()
1015 664 : .map(|x| u64::from(*x))
1016 664 : .collect())
1017 : }
1018 664 : }
1019 :
1020 0 : pub(crate) async fn get_control_file(
1021 0 : &self,
1022 0 : lsn: Lsn,
1023 0 : ctx: &RequestContext,
1024 0 : ) -> Result<Bytes, PageReconstructError> {
1025 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1026 0 : }
1027 :
1028 24 : pub(crate) async fn get_checkpoint(
1029 24 : &self,
1030 24 : lsn: Lsn,
1031 24 : ctx: &RequestContext,
1032 24 : ) -> Result<Bytes, PageReconstructError> {
1033 24 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1034 24 : }
1035 :
1036 24 : async fn list_aux_files_v2(
1037 24 : &self,
1038 24 : lsn: Lsn,
1039 24 : ctx: &RequestContext,
1040 24 : io_concurrency: IoConcurrency,
1041 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1042 24 : let kv = self
1043 24 : .scan(
1044 24 : KeySpace::single(Key::metadata_aux_key_range()),
1045 24 : lsn,
1046 24 : ctx,
1047 24 : io_concurrency,
1048 24 : )
1049 24 : .await?;
1050 24 : let mut result = HashMap::new();
1051 24 : let mut sz = 0;
1052 60 : for (_, v) in kv {
1053 36 : let v = v?;
1054 36 : let v = aux_file::decode_file_value_bytes(&v)
1055 36 : .context("value decode")
1056 36 : .map_err(PageReconstructError::Other)?;
1057 68 : for (fname, content) in v {
1058 32 : sz += fname.len();
1059 32 : sz += content.len();
1060 32 : result.insert(fname, content);
1061 32 : }
1062 : }
1063 24 : self.aux_file_size_estimator.on_initial(sz);
1064 24 : Ok(result)
1065 24 : }
1066 :
1067 0 : pub(crate) async fn trigger_aux_file_size_computation(
1068 0 : &self,
1069 0 : lsn: Lsn,
1070 0 : ctx: &RequestContext,
1071 0 : io_concurrency: IoConcurrency,
1072 0 : ) -> Result<(), PageReconstructError> {
1073 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1074 0 : Ok(())
1075 0 : }
1076 :
1077 24 : pub(crate) async fn list_aux_files(
1078 24 : &self,
1079 24 : lsn: Lsn,
1080 24 : ctx: &RequestContext,
1081 24 : io_concurrency: IoConcurrency,
1082 24 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1083 24 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1084 24 : }
1085 :
1086 0 : pub(crate) async fn get_replorigins(
1087 0 : &self,
1088 0 : lsn: Lsn,
1089 0 : ctx: &RequestContext,
1090 0 : io_concurrency: IoConcurrency,
1091 0 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1092 0 : let kv = self
1093 0 : .scan(
1094 0 : KeySpace::single(repl_origin_key_range()),
1095 0 : lsn,
1096 0 : ctx,
1097 0 : io_concurrency,
1098 0 : )
1099 0 : .await?;
1100 0 : let mut result = HashMap::new();
1101 0 : for (k, v) in kv {
1102 0 : let v = v?;
1103 0 : let origin_id = k.field6 as RepOriginId;
1104 0 : let origin_lsn = Lsn::des(&v).unwrap();
1105 0 : if origin_lsn != Lsn::INVALID {
1106 0 : result.insert(origin_id, origin_lsn);
1107 0 : }
1108 : }
1109 0 : Ok(result)
1110 0 : }
1111 :
1112 : /// Does the same as get_current_logical_size, but computes the size on demand.
1113 : /// Used to initialize the logical size tracking on startup.
1114 : ///
1115 : /// Only relation blocks are counted currently. That excludes metadata,
1116 : /// SLRUs, twophase files etc.
1117 : ///
1118 : /// # Cancel-Safety
1119 : ///
1120 : /// This method is cancellation-safe.
1121 28 : pub(crate) async fn get_current_logical_size_non_incremental(
1122 28 : &self,
1123 28 : lsn: Lsn,
1124 28 : ctx: &RequestContext,
1125 28 : ) -> Result<u64, CalculateLogicalSizeError> {
1126 28 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1127 28 :
1128 28 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1129 :
1130 : // Fetch list of database dirs and iterate them
1131 28 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1132 28 : let dbdir = DbDirectory::des(&buf)?;
1133 :
1134 28 : let mut total_size: u64 = 0;
1135 28 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1136 0 : for rel in self
1137 0 : .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
1138 0 : .await?
1139 : {
1140 0 : if self.cancel.is_cancelled() {
1141 0 : return Err(CalculateLogicalSizeError::Cancelled);
1142 0 : }
1143 0 : let relsize_key = rel_size_to_key(rel);
1144 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1145 0 : let relsize = buf.get_u32_le();
1146 0 :
1147 0 : total_size += relsize as u64;
1148 : }
1149 : }
1150 28 : Ok(total_size * BLCKSZ as u64)
1151 28 : }
1152 :
1153 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1154 : /// for gc-compaction.
1155 : ///
1156 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1157 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1158 : /// be kept only for a specific range of LSN.
1159 : ///
1160 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1161 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1162 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1163 : /// determine which keys to retain/drop for gc-compaction.
1164 : ///
1165 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1166 : /// to be retained for each of the branch LSNs.
1167 : ///
1168 : /// The return value is (dense keyspace, sparse keyspace).
1169 108 : pub(crate) async fn collect_gc_compaction_keyspace(
1170 108 : &self,
1171 108 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1172 108 : let metadata_key_begin = Key::metadata_key_range().start;
1173 108 : let aux_v1_key = AUX_FILES_KEY;
1174 108 : let dense_keyspace = KeySpace {
1175 108 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1176 108 : };
1177 108 : Ok((
1178 108 : dense_keyspace,
1179 108 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1180 108 : ))
1181 108 : }
1182 :
1183 : ///
1184 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1185 : /// Anything that's not listed may be removed from the underlying storage (from
1186 : /// that LSN forwards).
1187 : ///
1188 : /// The return value is (dense keyspace, sparse keyspace).
1189 660 : pub(crate) async fn collect_keyspace(
1190 660 : &self,
1191 660 : lsn: Lsn,
1192 660 : ctx: &RequestContext,
1193 660 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1194 660 : // Iterate through key ranges, greedily packing them into partitions
1195 660 : let mut result = KeySpaceAccum::new();
1196 660 :
1197 660 : // The dbdir metadata always exists
1198 660 : result.add_key(DBDIR_KEY);
1199 :
1200 : // Fetch list of database dirs and iterate them
1201 660 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1202 660 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1203 660 :
1204 660 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1205 660 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1206 0 : if has_relmap_file {
1207 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1208 0 : }
1209 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1210 :
1211 0 : let mut rels: Vec<RelTag> = self
1212 0 : .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
1213 0 : .await?
1214 0 : .into_iter()
1215 0 : .collect();
1216 0 : rels.sort_unstable();
1217 0 : for rel in rels {
1218 0 : let relsize_key = rel_size_to_key(rel);
1219 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1220 0 : let relsize = buf.get_u32_le();
1221 0 :
1222 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1223 0 : result.add_key(relsize_key);
1224 : }
1225 : }
1226 :
1227 : // Iterate SLRUs next
1228 660 : if self.tenant_shard_id.is_shard_zero() {
1229 1944 : for kind in [
1230 648 : SlruKind::Clog,
1231 648 : SlruKind::MultiXactMembers,
1232 648 : SlruKind::MultiXactOffsets,
1233 : ] {
1234 1944 : let slrudir_key = slru_dir_to_key(kind);
1235 1944 : result.add_key(slrudir_key);
1236 1944 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1237 1944 : let dir = SlruSegmentDirectory::des(&buf)?;
1238 1944 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1239 1944 : segments.sort_unstable();
1240 1944 : for segno in segments {
1241 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1242 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1243 0 : let segsize = buf.get_u32_le();
1244 0 :
1245 0 : result.add_range(
1246 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1247 0 : );
1248 0 : result.add_key(segsize_key);
1249 : }
1250 : }
1251 12 : }
1252 :
1253 : // Then pg_twophase
1254 660 : result.add_key(TWOPHASEDIR_KEY);
1255 :
1256 660 : let mut xids: Vec<u64> = self
1257 660 : .list_twophase_files(lsn, ctx)
1258 660 : .await?
1259 660 : .iter()
1260 660 : .cloned()
1261 660 : .collect();
1262 660 : xids.sort_unstable();
1263 660 : for xid in xids {
1264 0 : result.add_key(twophase_file_key(xid));
1265 0 : }
1266 :
1267 660 : result.add_key(CONTROLFILE_KEY);
1268 660 : result.add_key(CHECKPOINT_KEY);
1269 660 :
1270 660 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1271 660 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1272 660 : // and the keys will not be garbage-collected.
1273 660 : #[cfg(test)]
1274 660 : {
1275 660 : let guard = self.extra_test_dense_keyspace.load();
1276 660 : for kr in &guard.ranges {
1277 0 : result.add_range(kr.clone());
1278 0 : }
1279 0 : }
1280 0 :
1281 660 : let dense_keyspace = result.to_keyspace();
1282 660 : let sparse_keyspace = SparseKeySpace(KeySpace {
1283 660 : ranges: vec![
1284 660 : Key::metadata_aux_key_range(),
1285 660 : repl_origin_key_range(),
1286 660 : Key::rel_dir_sparse_key_range(),
1287 660 : ],
1288 660 : });
1289 660 :
1290 660 : if cfg!(debug_assertions) {
1291 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1292 :
1293 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1294 : // category of sparse keys are split into their own image/delta files. If there
1295 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1296 : // and we want the developer to keep the keyspaces separated.
1297 :
1298 660 : let ranges = &sparse_keyspace.0.ranges;
1299 :
1300 : // TODO: use a single overlaps_with across the codebase
1301 1980 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1302 1980 : !(a.end <= b.start || b.end <= a.start)
1303 1980 : }
1304 1980 : for i in 0..ranges.len() {
1305 1980 : for j in 0..i {
1306 1980 : if overlaps_with(&ranges[i], &ranges[j]) {
1307 0 : panic!(
1308 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1309 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1310 0 : );
1311 1980 : }
1312 : }
1313 : }
1314 1320 : for i in 1..ranges.len() {
1315 1320 : assert!(
1316 1320 : ranges[i - 1].end <= ranges[i].start,
1317 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1318 0 : ranges[i - 1].start,
1319 0 : ranges[i - 1].end,
1320 0 : ranges[i].start,
1321 0 : ranges[i].end
1322 : );
1323 : }
1324 0 : }
1325 :
1326 660 : Ok((dense_keyspace, sparse_keyspace))
1327 660 : }
1328 :
1329 : /// Get the cached size of a relation, if it was not updated after the specified LSN
1330 897080 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
1331 897080 : let rel_size_cache = self.rel_size_cache.read().unwrap();
1332 897080 : if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
1333 897036 : if lsn >= *cached_lsn {
1334 886744 : RELSIZE_CACHE_HITS.inc();
1335 886744 : return Some(*nblocks);
1336 10292 : }
1337 10292 : RELSIZE_CACHE_MISSES_OLD.inc();
1338 44 : }
1339 10336 : RELSIZE_CACHE_MISSES.inc();
1340 10336 : None
1341 897080 : }
1342 :
1343 : /// Update cached relation size if there is no more recent update
1344 10272 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1345 10272 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1346 10272 :
1347 10272 : if lsn < rel_size_cache.complete_as_of {
1348 : // Do not cache old values. It's safe to cache the size on read, as long as
1349 : // the read was at an LSN since we started the WAL ingestion. Reasoning: we
1350 : // never evict values from the cache, so if the relation size changed after
1351 : // 'lsn', the new value is already in the cache.
1352 0 : return;
1353 10272 : }
1354 10272 :
1355 10272 : match rel_size_cache.map.entry(tag) {
1356 10272 : hash_map::Entry::Occupied(mut entry) => {
1357 10272 : let cached_lsn = entry.get_mut();
1358 10272 : if lsn >= cached_lsn.0 {
1359 0 : *cached_lsn = (lsn, nblocks);
1360 10272 : }
1361 : }
1362 0 : hash_map::Entry::Vacant(entry) => {
1363 0 : entry.insert((lsn, nblocks));
1364 0 : RELSIZE_CACHE_ENTRIES.inc();
1365 0 : }
1366 : }
1367 10272 : }
1368 :
1369 : /// Store cached relation size
1370 565440 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1371 565440 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1372 565440 : if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
1373 3840 : RELSIZE_CACHE_ENTRIES.inc();
1374 561600 : }
1375 565440 : }
1376 :
1377 : /// Remove cached relation size
1378 4 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1379 4 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
1380 4 : if rel_size_cache.map.remove(tag).is_some() {
1381 4 : RELSIZE_CACHE_ENTRIES.dec();
1382 4 : }
1383 4 : }
1384 : }
1385 :
1386 : /// DatadirModification represents an operation to ingest an atomic set of
1387 : /// updates to the repository.
1388 : ///
1389 : /// It is created by the 'begin_modification' function. One is created for each WAL
1390 : /// record, so that all the modifications made by one WAL record appear atomic.
1391 : pub struct DatadirModification<'a> {
1392 : /// The timeline this modification applies to. You can access this to
1393 : /// read the state, but note that any pending updates are *not* reflected
1394 : /// in the state in 'tline' yet.
1395 : pub tline: &'a Timeline,
1396 :
1397 : /// Current LSN of the modification
1398 : lsn: Lsn,
1399 :
1400 : // The modifications are not applied directly to the underlying key-value store.
1401 : // The put-functions add the modifications here, and they are flushed to the
1402 : // underlying key-value store by the 'commit' function.
1403 : pending_lsns: Vec<Lsn>,
1404 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1405 : pending_nblocks: i64,
1406 :
1407 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1408 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1409 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1410 :
1411 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1412 : /// which keys are stored here.
1413 : pending_data_batch: Option<SerializedValueBatch>,
1414 :
1415 : /// For special "directory" keys that store key-value maps, track the size of the map
1416 : /// if it was updated in this modification.
1417 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1418 :
1419 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1420 : pending_metadata_bytes: usize,
1421 : }
1422 :
1423 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1424 : pub enum MetricsUpdate {
1425 : /// Set the metrics to this value
1426 : Set(u64),
1427 : /// Increment the metrics by this value
1428 : Add(u64),
1429 : /// Decrement the metrics by this value
1430 : Sub(u64),
1431 : }
1432 :
1433 : impl DatadirModification<'_> {
1434 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1435 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1436 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1437 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
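//
// A sketch of how an ingest loop might enforce this limit using
// `approx_pending_bytes()` (illustrative only; the real WAL ingest loop also
// commits on other conditions, and the exact call sites differ):
//
//     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
//         modification.commit(&ctx).await?;
//     }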
1438 :
1439 : /// Get the current lsn
1440 836116 : pub(crate) fn get_lsn(&self) -> Lsn {
1441 836116 : self.lsn
1442 836116 : }
1443 :
1444 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1445 0 : self.pending_data_batch
1446 0 : .as_ref()
1447 0 : .map_or(0, |b| b.buffer_size())
1448 0 : + self.pending_metadata_bytes
1449 0 : }
1450 :
1451 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1452 0 : self.pending_data_batch
1453 0 : .as_ref()
1454 0 : .is_some_and(|b| b.has_data())
1455 0 : }
1456 :
1457 : /// Returns statistics about the currently pending modifications.
1458 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1459 0 : let mut stats = DatadirModificationStats::default();
1460 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1461 0 : match value {
1462 0 : Value::Image(_) => stats.metadata_images += 1,
1463 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1464 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1465 : }
1466 : }
1467 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1468 0 : match valuemeta {
1469 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1470 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1471 0 : ValueMeta::Observed(_) => {}
1472 : }
1473 : }
1474 0 : stats
1475 0 : }
1476 :
1477 : /// Set the current lsn
1478 291716 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1479 291716 : ensure_walingest!(
1480 291716 : lsn >= self.lsn,
1481 291716 : "setting an older lsn {} than {} is not allowed",
1482 291716 : lsn,
1483 291716 : self.lsn
1484 291716 : );
1485 :
1486 291716 : if lsn > self.lsn {
1487 291716 : self.pending_lsns.push(self.lsn);
1488 291716 : self.lsn = lsn;
1489 291716 : }
1490 291716 : Ok(())
1491 291716 : }
1492 :
1493 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1494 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1495 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1496 : ///
1497 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1498 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1499 : /// via [`Self::get`] as soon as their record has been ingested.
1500 1701360 : fn is_data_key(key: &Key) -> bool {
1501 1701360 : key.is_rel_block_key() || key.is_slru_block_key()
1502 1701360 : }
1503 :
1504 : /// Initialize a completely new repository.
1505 : ///
1506 : /// This inserts the directory metadata entries that are assumed to
1507 : /// always exist.
1508 432 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1509 432 : let buf = DbDirectory::ser(&DbDirectory {
1510 432 : dbdirs: HashMap::new(),
1511 432 : })?;
1512 432 : self.pending_directory_entries
1513 432 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1514 432 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1515 :
1516 432 : let buf = if self.tline.pg_version >= 17 {
1517 0 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1518 0 : xids: HashSet::new(),
1519 0 : })
1520 : } else {
1521 432 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1522 432 : xids: HashSet::new(),
1523 432 : })
1524 0 : }?;
1525 432 : self.pending_directory_entries
1526 432 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1527 432 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1528 :
1529 432 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1530 432 : let empty_dir = Value::Image(buf);
1531 432 :
1532 432 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1533 432 : // harmless but they'd just be dropped on later compaction.
1534 432 : if self.tline.tenant_shard_id.is_shard_zero() {
1535 420 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1536 420 : self.pending_directory_entries.push((
1537 420 : DirectoryKind::SlruSegment(SlruKind::Clog),
1538 420 : MetricsUpdate::Set(0),
1539 420 : ));
1540 420 : self.put(
1541 420 : slru_dir_to_key(SlruKind::MultiXactMembers),
1542 420 : empty_dir.clone(),
1543 420 : );
1544 420 : self.pending_directory_entries.push((
1545 420 : DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
1546 420 : MetricsUpdate::Set(0),
1547 420 : ));
1548 420 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1549 420 : self.pending_directory_entries.push((
1550 420 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1551 420 : MetricsUpdate::Set(0),
1552 420 : ));
1553 420 : }
1554 :
1555 432 : Ok(())
1556 432 : }
1557 :
1558 : #[cfg(test)]
1559 428 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1560 428 : self.init_empty()?;
1561 428 : self.put_control_file(bytes::Bytes::from_static(
1562 428 : b"control_file contents do not matter",
1563 428 : ))
1564 428 : .context("put_control_file")?;
1565 428 : self.put_checkpoint(bytes::Bytes::from_static(
1566 428 : b"checkpoint_file contents do not matter",
1567 428 : ))
1568 428 : .context("put_checkpoint_file")?;
1569 428 : Ok(())
1570 428 : }
1571 :
1572 : /// Creates a relation if it is not already present.
1573 : /// Returns the current size of the relation
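///
/// Sketch of the typical caller pattern (mirroring [`Self::ingest_batch`]), where a write
/// to block `blkno` first makes sure the relation exists and then extends it if needed:
///
/// ```ignore
/// let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
/// if blkno + 1 > old_nblocks {
///     self.put_rel_extend(rel, blkno + 1, ctx).await?;
/// }
/// ```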
1574 836112 : pub(crate) async fn create_relation_if_required(
1575 836112 : &mut self,
1576 836112 : rel: RelTag,
1577 836112 : ctx: &RequestContext,
1578 836112 : ) -> Result<u32, WalIngestError> {
1579 : // Get current size and put rel creation if rel doesn't exist
1580 : //
1581 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1582 : // check the cache too. This is because eagerly checking the cache results in
1583 : // less work overall and 10% better performance. It's more work on a cache miss,
1584 : // but cache misses are rare.
1585 836112 : if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
1586 836092 : Ok(nblocks)
1587 20 : } else if !self
1588 20 : .tline
1589 20 : .get_rel_exists(rel, Version::Modified(self), ctx)
1590 20 : .await?
1591 : {
1592 : // create it with 0 size initially, the logic below will extend it
1593 20 : self.put_rel_creation(rel, 0, ctx).await?;
1594 20 : Ok(0)
1595 : } else {
1596 0 : Ok(self
1597 0 : .tline
1598 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1599 0 : .await?)
1600 : }
1601 836112 : }
1602 :
1603 : /// Given a block number for a relation (which represents a newly written block),
1604 : /// the previous block count of the relation, and the shard info, find the gaps
1605 : /// that were created by the newly written block if any.
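///
/// Illustrative example (mirroring the `gap_finding` test below): if the relation
/// previously had 2 blocks and block 5 is now written, blocks 2..5 form the gap,
/// filtered down to the keys owned by this shard:
///
/// ```ignore
/// let gaps = DatadirModification::find_gaps(rel, 5, 2, &shard);
/// // On an unsharded tenant this yields the keyspace covering blocks 2, 3 and 4.
/// ```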
1606 291340 : fn find_gaps(
1607 291340 : rel: RelTag,
1608 291340 : blkno: u32,
1609 291340 : previous_nblocks: u32,
1610 291340 : shard: &ShardIdentity,
1611 291340 : ) -> Option<KeySpace> {
1612 291340 : let mut key = rel_block_to_key(rel, blkno);
1613 291340 : let mut gap_accum = None;
1614 :
1615 291340 : for gap_blkno in previous_nblocks..blkno {
1616 64 : key.field6 = gap_blkno;
1617 64 :
1618 64 : if shard.get_shard_number(&key) != shard.number {
1619 16 : continue;
1620 48 : }
1621 48 :
1622 48 : gap_accum
1623 48 : .get_or_insert_with(KeySpaceAccum::new)
1624 48 : .add_key(key);
1625 : }
1626 :
1627 291340 : gap_accum.map(|accum| accum.to_keyspace())
1628 291340 : }
1629 :
1630 291704 : pub async fn ingest_batch(
1631 291704 : &mut self,
1632 291704 : mut batch: SerializedValueBatch,
1633 291704 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1634 291704 : shard: &ShardIdentity,
1635 291704 : ctx: &RequestContext,
1636 291704 : ) -> Result<(), WalIngestError> {
1637 291704 : let mut gaps_at_lsns = Vec::default();
1638 :
1639 291704 : for meta in batch.metadata.iter() {
1640 291284 : let key = Key::from_compact(meta.key());
1641 291284 : let (rel, blkno) = key
1642 291284 : .to_rel_block()
1643 291284 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1644 291284 : let new_nblocks = blkno + 1;
1645 :
1646 291284 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1647 291284 : if new_nblocks > old_nblocks {
1648 4780 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1649 286504 : }
1650 :
1651 291284 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1652 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1653 291284 : }
1654 : }
1655 :
1656 291704 : if !gaps_at_lsns.is_empty() {
1657 0 : batch.zero_gaps(gaps_at_lsns);
1658 291704 : }
1659 :
1660 291704 : match self.pending_data_batch.as_mut() {
1661 40 : Some(pending_batch) => {
1662 40 : pending_batch.extend(batch);
1663 40 : }
1664 291664 : None if batch.has_data() => {
1665 291260 : self.pending_data_batch = Some(batch);
1666 291260 : }
1667 404 : None => {
1668 404 : // Nothing to initialize the batch with
1669 404 : }
1670 : }
1671 :
1672 291704 : Ok(())
1673 291704 : }
1674 :
1675 : /// Put a new page version that can be constructed from a WAL record
1676 : ///
1677 : /// NOTE: this will *not* implicitly extend the relation if the page is beyond the
1678 : /// current end-of-file. It's up to the caller to check that the relation size
1679 : /// matches the blocks inserted!
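///
/// Sketch of the expected caller pattern (extend first, then append the record); `rel`,
/// `blknum`, `rec` and `ctx` are assumed to be in scope:
///
/// ```ignore
/// self.put_rel_extend(rel, blknum + 1, ctx).await?;
/// self.put_rel_wal_record(rel, blknum, rec)?;
/// ```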
1680 24 : pub fn put_rel_wal_record(
1681 24 : &mut self,
1682 24 : rel: RelTag,
1683 24 : blknum: BlockNumber,
1684 24 : rec: NeonWalRecord,
1685 24 : ) -> Result<(), WalIngestError> {
1686 24 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1687 24 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1688 24 : Ok(())
1689 24 : }
1690 :
1691 : // Same, but for an SLRU.
1692 16 : pub fn put_slru_wal_record(
1693 16 : &mut self,
1694 16 : kind: SlruKind,
1695 16 : segno: u32,
1696 16 : blknum: BlockNumber,
1697 16 : rec: NeonWalRecord,
1698 16 : ) -> Result<(), WalIngestError> {
1699 16 : if !self.tline.tenant_shard_id.is_shard_zero() {
1700 0 : return Ok(());
1701 16 : }
1702 16 :
1703 16 : self.put(
1704 16 : slru_block_to_key(kind, segno, blknum),
1705 16 : Value::WalRecord(rec),
1706 16 : );
1707 16 : Ok(())
1708 16 : }
1709 :
1710 : /// Like put_wal_record, but with ready-made image of the page.
1711 555684 : pub fn put_rel_page_image(
1712 555684 : &mut self,
1713 555684 : rel: RelTag,
1714 555684 : blknum: BlockNumber,
1715 555684 : img: Bytes,
1716 555684 : ) -> Result<(), WalIngestError> {
1717 555684 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1718 555684 : let key = rel_block_to_key(rel, blknum);
1719 555684 : if !key.is_valid_key_on_write_path() {
1720 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1721 555684 : }
1722 555684 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1723 555684 : Ok(())
1724 555684 : }
1725 :
1726 12 : pub fn put_slru_page_image(
1727 12 : &mut self,
1728 12 : kind: SlruKind,
1729 12 : segno: u32,
1730 12 : blknum: BlockNumber,
1731 12 : img: Bytes,
1732 12 : ) -> Result<(), WalIngestError> {
1733 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1734 :
1735 12 : let key = slru_block_to_key(kind, segno, blknum);
1736 12 : if !key.is_valid_key_on_write_path() {
1737 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1738 12 : }
1739 12 : self.put(key, Value::Image(img));
1740 12 : Ok(())
1741 12 : }
1742 :
1743 5996 : pub(crate) fn put_rel_page_image_zero(
1744 5996 : &mut self,
1745 5996 : rel: RelTag,
1746 5996 : blknum: BlockNumber,
1747 5996 : ) -> Result<(), WalIngestError> {
1748 5996 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1749 5996 : let key = rel_block_to_key(rel, blknum);
1750 5996 : if !key.is_valid_key_on_write_path() {
1751 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1752 5996 : }
1753 :
1754 5996 : let batch = self
1755 5996 : .pending_data_batch
1756 5996 : .get_or_insert_with(SerializedValueBatch::default);
1757 5996 :
1758 5996 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1759 5996 :
1760 5996 : Ok(())
1761 5996 : }
1762 :
1763 0 : pub(crate) fn put_slru_page_image_zero(
1764 0 : &mut self,
1765 0 : kind: SlruKind,
1766 0 : segno: u32,
1767 0 : blknum: BlockNumber,
1768 0 : ) -> Result<(), WalIngestError> {
1769 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1770 0 : let key = slru_block_to_key(kind, segno, blknum);
1771 0 : if !key.is_valid_key_on_write_path() {
1772 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1773 0 : }
1774 :
1775 0 : let batch = self
1776 0 : .pending_data_batch
1777 0 : .get_or_insert_with(SerializedValueBatch::default);
1778 0 :
1779 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1780 0 :
1781 0 : Ok(())
1782 0 : }
1783 :
1784 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1785 : /// we enable it, we also need to persist it in `index_part.json`.
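///
/// The decision, summarized (`config` is the tenant setting, `status` is what
/// `index_part.json` currently records):
///
/// | config | status                 | result                               |
/// |--------|------------------------|--------------------------------------|
/// | false  | `Legacy`               | `false`                              |
/// | false  | `Migrating`/`Migrated` | `true`                               |
/// | true   | `Legacy`               | `true`, and `Migrating` is persisted |
/// | true   | `Migrating`/`Migrated` | `true`                               |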
1786 3892 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1787 3892 : let status = self.tline.get_rel_size_v2_status();
1788 3892 : let config = self.tline.get_rel_size_v2_enabled();
1789 3892 : match (config, status) {
1790 : (false, RelSizeMigration::Legacy) => {
1791 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1792 3892 : Ok(false)
1793 : }
1794 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1795 : // index_part already persisted that the timeline has enabled rel_size_v2
1796 0 : Ok(true)
1797 : }
1798 : (true, RelSizeMigration::Legacy) => {
1799 : // The first time we enable it, we need to persist it in `index_part.json`
1800 0 : self.tline
1801 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1802 0 : tracing::info!("enabled rel_size_v2");
1803 0 : Ok(true)
1804 : }
1805 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1806 : // index_part already persisted that the timeline has enabled rel_size_v2
1807 : // and we don't need to do anything
1808 0 : Ok(true)
1809 : }
1810 : }
1811 3892 : }
1812 :
1813 : /// Store a relmapper file (pg_filenode.map) in the repository
1814 32 : pub async fn put_relmap_file(
1815 32 : &mut self,
1816 32 : spcnode: Oid,
1817 32 : dbnode: Oid,
1818 32 : img: Bytes,
1819 32 : ctx: &RequestContext,
1820 32 : ) -> Result<(), WalIngestError> {
1821 32 : let v2_enabled = self
1822 32 : .maybe_enable_rel_size_v2()
1823 32 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1824 :
1825 : // Add it to the directory (if it doesn't exist already)
1826 32 : let buf = self.get(DBDIR_KEY, ctx).await?;
1827 32 : let mut dbdir = DbDirectory::des(&buf)?;
1828 :
1829 32 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1830 32 : if r.is_none() || r == Some(false) {
1831 : // The dbdir entry didn't exist, or it contained a
1832 : // 'false'. The 'insert' call already updated it with
1833 : // 'true', now write the updated 'dbdirs' map back.
1834 32 : let buf = DbDirectory::ser(&dbdir)?;
1835 32 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1836 0 : }
1837 32 : if r.is_none() {
1838 : // Create RelDirectory
1839 : // TODO: if we have fully migrated to v2, no need to create this directory
1840 16 : let buf = RelDirectory::ser(&RelDirectory {
1841 16 : rels: HashSet::new(),
1842 16 : })?;
1843 16 : self.pending_directory_entries
1844 16 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1845 16 : if v2_enabled {
1846 0 : self.pending_directory_entries
1847 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1848 16 : }
1849 16 : self.put(
1850 16 : rel_dir_to_key(spcnode, dbnode),
1851 16 : Value::Image(Bytes::from(buf)),
1852 16 : );
1853 16 : }
1854 :
1855 32 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1856 32 : Ok(())
1857 32 : }
1858 :
1859 0 : pub async fn put_twophase_file(
1860 0 : &mut self,
1861 0 : xid: u64,
1862 0 : img: Bytes,
1863 0 : ctx: &RequestContext,
1864 0 : ) -> Result<(), WalIngestError> {
1865 : // Add it to the directory entry
1866 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1867 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1868 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1869 0 : if !dir.xids.insert(xid) {
1870 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
1871 0 : }
1872 0 : self.pending_directory_entries.push((
1873 0 : DirectoryKind::TwoPhase,
1874 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1875 0 : ));
1876 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1877 : } else {
1878 0 : let xid = xid as u32;
1879 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1880 0 : if !dir.xids.insert(xid) {
1881 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
1882 0 : }
1883 0 : self.pending_directory_entries.push((
1884 0 : DirectoryKind::TwoPhase,
1885 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1886 0 : ));
1887 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1888 : };
1889 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1890 0 :
1891 0 : self.put(twophase_file_key(xid), Value::Image(img));
1892 0 : Ok(())
1893 0 : }
1894 :
1895 0 : pub async fn set_replorigin(
1896 0 : &mut self,
1897 0 : origin_id: RepOriginId,
1898 0 : origin_lsn: Lsn,
1899 0 : ) -> Result<(), WalIngestError> {
1900 0 : let key = repl_origin_key(origin_id);
1901 0 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1902 0 : Ok(())
1903 0 : }
1904 :
1905 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
1906 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1907 0 : }
1908 :
1909 432 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
1910 432 : self.put(CONTROLFILE_KEY, Value::Image(img));
1911 432 : Ok(())
1912 432 : }
1913 :
1914 460 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
1915 460 : self.put(CHECKPOINT_KEY, Value::Image(img));
1916 460 : Ok(())
1917 460 : }
1918 :
1919 0 : pub async fn drop_dbdir(
1920 0 : &mut self,
1921 0 : spcnode: Oid,
1922 0 : dbnode: Oid,
1923 0 : ctx: &RequestContext,
1924 0 : ) -> Result<(), WalIngestError> {
1925 0 : let total_blocks = self
1926 0 : .tline
1927 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1928 0 : .await?;
1929 :
1930 : // Remove entry from dbdir
1931 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1932 0 : let mut dir = DbDirectory::des(&buf)?;
1933 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1934 0 : let buf = DbDirectory::ser(&dir)?;
1935 0 : self.pending_directory_entries.push((
1936 0 : DirectoryKind::Db,
1937 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
1938 0 : ));
1939 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1940 : } else {
1941 0 : warn!(
1942 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1943 : spcnode, dbnode
1944 : );
1945 : }
1946 :
1947 : // Update logical database size.
1948 0 : self.pending_nblocks -= total_blocks as i64;
1949 0 :
1950 0 : // Delete all relations and metadata files for the spcnode/dnode
1951 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1952 0 : Ok(())
1953 0 : }
1954 :
1955 : /// Create a relation fork.
1956 : ///
1957 : /// 'nblocks' is the initial size.
1958 3840 : pub async fn put_rel_creation(
1959 3840 : &mut self,
1960 3840 : rel: RelTag,
1961 3840 : nblocks: BlockNumber,
1962 3840 : ctx: &RequestContext,
1963 3840 : ) -> Result<(), WalIngestError> {
1964 3840 : if rel.relnode == 0 {
1965 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
1966 0 : "invalid relnode"
1967 0 : )))?;
1968 3840 : }
1969 : // It's possible that this is the first rel for this db in this
1970 : // tablespace. Create the reldir entry for it if so.
1971 3840 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
1972 :
1973 3840 : let dbdir_exists =
1974 3840 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
1975 : // Didn't exist. Update dbdir
1976 16 : e.insert(false);
1977 16 : let buf = DbDirectory::ser(&dbdir)?;
1978 16 : self.pending_directory_entries.push((
1979 16 : DirectoryKind::Db,
1980 16 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
1981 16 : ));
1982 16 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1983 16 : false
1984 : } else {
1985 3824 : true
1986 : };
1987 :
1988 3840 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1989 3840 : let mut rel_dir = if !dbdir_exists {
1990 : // Create the RelDirectory
1991 16 : RelDirectory::default()
1992 : } else {
1993 : // reldir already exists, fetch it
1994 3824 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
1995 : };
1996 :
1997 3840 : let v2_enabled = self
1998 3840 : .maybe_enable_rel_size_v2()
1999 3840 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2000 :
2001 3840 : if v2_enabled {
2002 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
2003 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2004 0 : }
2005 0 : let sparse_rel_dir_key =
2006 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2007 : // check if the rel_dir_key exists in v2
2008 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2009 0 : let val = RelDirExists::decode_option(val)
2010 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2011 0 : if val == RelDirExists::Exists {
2012 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2013 0 : }
2014 0 : self.put(
2015 0 : sparse_rel_dir_key,
2016 0 : Value::Image(RelDirExists::Exists.encode()),
2017 0 : );
2018 0 : if !dbdir_exists {
2019 0 : self.pending_directory_entries
2020 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2021 0 : self.pending_directory_entries
2022 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2023 0 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2024 0 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2025 0 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2026 0 : self.put(
2027 0 : rel_dir_key,
2028 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2029 : );
2030 0 : }
2031 0 : self.pending_directory_entries
2032 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2033 : } else {
2034 : // Add the new relation to the rel directory entry, and write it back
2035 3840 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2036 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2037 3840 : }
2038 3840 : if !dbdir_exists {
2039 16 : self.pending_directory_entries
2040 16 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2041 3824 : }
2042 3840 : self.pending_directory_entries
2043 3840 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2044 3840 : self.put(
2045 3840 : rel_dir_key,
2046 3840 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2047 : );
2048 : }
2049 :
2050 : // Put size
2051 3840 : let size_key = rel_size_to_key(rel);
2052 3840 : let buf = nblocks.to_le_bytes();
2053 3840 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2054 3840 :
2055 3840 : self.pending_nblocks += nblocks as i64;
2056 3840 :
2057 3840 : // Update relation size cache
2058 3840 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2059 3840 :
2060 3840 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2061 3840 : // caller.
2062 3840 : Ok(())
2063 3840 : }
2064 :
2065 : /// Truncate relation
2066 12024 : pub async fn put_rel_truncation(
2067 12024 : &mut self,
2068 12024 : rel: RelTag,
2069 12024 : nblocks: BlockNumber,
2070 12024 : ctx: &RequestContext,
2071 12024 : ) -> Result<(), WalIngestError> {
2072 12024 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2073 12024 : if self
2074 12024 : .tline
2075 12024 : .get_rel_exists(rel, Version::Modified(self), ctx)
2076 12024 : .await?
2077 : {
2078 12024 : let size_key = rel_size_to_key(rel);
2079 : // Fetch the old size first
2080 12024 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2081 12024 :
2082 12024 : // Update the entry with the new size.
2083 12024 : let buf = nblocks.to_le_bytes();
2084 12024 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2085 12024 :
2086 12024 : // Update relation size cache
2087 12024 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2088 12024 :
2089 12024 : // Update logical database size.
2090 12024 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2091 0 : }
2092 12024 : Ok(())
2093 12024 : }
2094 :
2095 : /// Extend relation
2096 : /// If new size is smaller, do nothing.
2097 553360 : pub async fn put_rel_extend(
2098 553360 : &mut self,
2099 553360 : rel: RelTag,
2100 553360 : nblocks: BlockNumber,
2101 553360 : ctx: &RequestContext,
2102 553360 : ) -> Result<(), WalIngestError> {
2103 553360 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2104 :
2105 : // Put size
2106 553360 : let size_key = rel_size_to_key(rel);
2107 553360 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2108 553360 :
2109 553360 : // only extend relation here. never decrease the size
2110 553360 : if nblocks > old_size {
2111 549576 : let buf = nblocks.to_le_bytes();
2112 549576 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2113 549576 :
2114 549576 : // Update relation size cache
2115 549576 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2116 549576 :
2117 549576 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2118 549576 : }
2119 553360 : Ok(())
2120 553360 : }
2121 :
2122 : /// Drop some relations
2123 20 : pub(crate) async fn put_rel_drops(
2124 20 : &mut self,
2125 20 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2126 20 : ctx: &RequestContext,
2127 20 : ) -> Result<(), WalIngestError> {
2128 20 : let v2_enabled = self
2129 20 : .maybe_enable_rel_size_v2()
2130 20 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2131 24 : for ((spc_node, db_node), rel_tags) in drop_relations {
2132 4 : let dir_key = rel_dir_to_key(spc_node, db_node);
2133 4 : let buf = self.get(dir_key, ctx).await?;
2134 4 : let mut dir = RelDirectory::des(&buf)?;
2135 :
2136 4 : let mut dirty = false;
2137 8 : for rel_tag in rel_tags {
2138 4 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2139 4 : self.pending_directory_entries
2140 4 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2141 4 : dirty = true;
2142 4 : true
2143 0 : } else if v2_enabled {
2144 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2145 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2146 : // logic).
2147 0 : let key =
2148 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2149 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2150 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2151 0 : if val == RelDirExists::Exists {
2152 0 : self.pending_directory_entries
2153 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2154 0 : // put tombstone
2155 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2156 0 : // no need to set dirty to true
2157 0 : true
2158 : } else {
2159 0 : false
2160 : }
2161 : } else {
2162 0 : false
2163 : };
2164 :
2165 4 : if found {
2166 : // update logical size
2167 4 : let size_key = rel_size_to_key(rel_tag);
2168 4 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2169 4 : self.pending_nblocks -= old_size as i64;
2170 4 :
2171 4 : // Remove entry from relation size cache
2172 4 : self.tline.remove_cached_rel_size(&rel_tag);
2173 4 :
2174 4 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2175 4 : self.delete(rel_key_range(rel_tag));
2176 0 : }
2177 : }
2178 :
2179 4 : if dirty {
2180 4 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2181 0 : }
2182 : }
2183 :
2184 20 : Ok(())
2185 20 : }
2186 :
2187 12 : pub async fn put_slru_segment_creation(
2188 12 : &mut self,
2189 12 : kind: SlruKind,
2190 12 : segno: u32,
2191 12 : nblocks: BlockNumber,
2192 12 : ctx: &RequestContext,
2193 12 : ) -> Result<(), WalIngestError> {
2194 12 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2195 :
2196 : // Add it to the directory entry
2197 12 : let dir_key = slru_dir_to_key(kind);
2198 12 : let buf = self.get(dir_key, ctx).await?;
2199 12 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2200 :
2201 12 : if !dir.segments.insert(segno) {
2202 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2203 12 : }
2204 12 : self.pending_directory_entries.push((
2205 12 : DirectoryKind::SlruSegment(kind),
2206 12 : MetricsUpdate::Set(dir.segments.len() as u64),
2207 12 : ));
2208 12 : self.put(
2209 12 : dir_key,
2210 12 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2211 : );
2212 :
2213 : // Put size
2214 12 : let size_key = slru_segment_size_to_key(kind, segno);
2215 12 : let buf = nblocks.to_le_bytes();
2216 12 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2217 12 :
2218 12 : // even if nblocks > 0, we don't insert any actual blocks here
2219 12 :
2220 12 : Ok(())
2221 12 : }
2222 :
2223 : /// Extend SLRU segment
2224 0 : pub fn put_slru_extend(
2225 0 : &mut self,
2226 0 : kind: SlruKind,
2227 0 : segno: u32,
2228 0 : nblocks: BlockNumber,
2229 0 : ) -> Result<(), WalIngestError> {
2230 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2231 :
2232 : // Put size
2233 0 : let size_key = slru_segment_size_to_key(kind, segno);
2234 0 : let buf = nblocks.to_le_bytes();
2235 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2236 0 : Ok(())
2237 0 : }
2238 :
2239 : /// This method is used for marking truncated SLRU files
2240 0 : pub async fn drop_slru_segment(
2241 0 : &mut self,
2242 0 : kind: SlruKind,
2243 0 : segno: u32,
2244 0 : ctx: &RequestContext,
2245 0 : ) -> Result<(), WalIngestError> {
2246 0 : // Remove it from the directory entry
2247 0 : let dir_key = slru_dir_to_key(kind);
2248 0 : let buf = self.get(dir_key, ctx).await?;
2249 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2250 :
2251 0 : if !dir.segments.remove(&segno) {
2252 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2253 0 : }
2254 0 : self.pending_directory_entries.push((
2255 0 : DirectoryKind::SlruSegment(kind),
2256 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2257 0 : ));
2258 0 : self.put(
2259 0 : dir_key,
2260 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2261 : );
2262 :
2263 : // Delete size entry, as well as all blocks
2264 0 : self.delete(slru_segment_key_range(kind, segno));
2265 0 :
2266 0 : Ok(())
2267 0 : }
2268 :
2269 : /// Drop a relmapper file (pg_filenode.map)
2270 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2271 0 : // TODO
2272 0 : Ok(())
2273 0 : }
2274 :
2275 : /// This method is used for dropping twophase files
2276 0 : pub async fn drop_twophase_file(
2277 0 : &mut self,
2278 0 : xid: u64,
2279 0 : ctx: &RequestContext,
2280 0 : ) -> Result<(), WalIngestError> {
2281 : // Remove it from the directory entry
2282 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2283 0 : let newdirbuf = if self.tline.pg_version >= 17 {
2284 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2285 :
2286 0 : if !dir.xids.remove(&xid) {
2287 0 : warn!("twophase file for xid {} does not exist", xid);
2288 0 : }
2289 0 : self.pending_directory_entries.push((
2290 0 : DirectoryKind::TwoPhase,
2291 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2292 0 : ));
2293 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2294 : } else {
2295 0 : let xid: u32 = u32::try_from(xid)
2296 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2297 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2298 :
2299 0 : if !dir.xids.remove(&xid) {
2300 0 : warn!("twophase file for xid {} does not exist", xid);
2301 0 : }
2302 0 : self.pending_directory_entries.push((
2303 0 : DirectoryKind::TwoPhase,
2304 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2305 0 : ));
2306 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2307 : };
2308 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2309 0 :
2310 0 : // Delete it
2311 0 : self.delete(twophase_key_range(xid));
2312 0 :
2313 0 : Ok(())
2314 0 : }
2315 :
2316 32 : pub async fn put_file(
2317 32 : &mut self,
2318 32 : path: &str,
2319 32 : content: &[u8],
2320 32 : ctx: &RequestContext,
2321 32 : ) -> Result<(), WalIngestError> {
2322 32 : let key = aux_file::encode_aux_file_key(path);
2323 : // retrieve the key from the engine
2324 32 : let old_val = match self.get(key, ctx).await {
2325 8 : Ok(val) => Some(val),
2326 24 : Err(PageReconstructError::MissingKey(_)) => None,
2327 0 : Err(e) => return Err(e.into()),
2328 : };
2329 32 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2330 8 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2331 : } else {
2332 24 : Vec::new()
2333 : };
2334 32 : let mut other_files = Vec::with_capacity(files.len());
2335 32 : let mut modifying_file = None;
2336 40 : for file @ (p, content) in files {
2337 8 : if path == p {
2338 8 : assert!(
2339 8 : modifying_file.is_none(),
2340 0 : "duplicated entries found for {}",
2341 : path
2342 : );
2343 8 : modifying_file = Some(content);
2344 0 : } else {
2345 0 : other_files.push(file);
2346 0 : }
2347 : }
2348 32 : let mut new_files = other_files;
2349 32 : match (modifying_file, content.is_empty()) {
2350 4 : (Some(old_content), false) => {
2351 4 : self.tline
2352 4 : .aux_file_size_estimator
2353 4 : .on_update(old_content.len(), content.len());
2354 4 : new_files.push((path, content));
2355 4 : }
2356 4 : (Some(old_content), true) => {
2357 4 : self.tline
2358 4 : .aux_file_size_estimator
2359 4 : .on_remove(old_content.len());
2360 4 : // not adding the file key to the final `new_files` vec.
2361 4 : }
2362 24 : (None, false) => {
2363 24 : self.tline.aux_file_size_estimator.on_add(content.len());
2364 24 : new_files.push((path, content));
2365 24 : }
2366 : // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2367 : // Compute doesn't know whether a previous version of this file exists, so an
2368 : // attempt to delete a non-existent file can end up here.
2369 : // To avoid false alarms, log it at info level rather than as a warning.
2370 0 : (None, true) if path.starts_with("pg_stat/") => {
2371 0 : info!("removing non-existing pg_stat file: {}", path)
2372 : }
2373 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2374 : }
2375 32 : let new_val = aux_file::encode_file_value(&new_files)
2376 32 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2377 32 : self.put(key, Value::Image(new_val.into()));
2378 32 :
2379 32 : Ok(())
2380 32 : }
2381 :
2382 : ///
2383 : /// Flush changes accumulated so far to the underlying repository.
2384 : ///
2385 : /// Usually, changes made in DatadirModification are atomic, but this allows
2386 : /// you to flush them to the underlying repository before the final `commit`.
2387 : /// That allows to free up the memory used to hold the pending changes.
2388 : ///
2389 : /// Currently only used during bulk import of a data directory. In that
2390 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2391 : /// whole import fails and the timeline will be deleted anyway.
2392 : /// (Or to be precise, it will be left behind for debugging purposes and
2393 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2394 : ///
2395 : /// Note: A consequence of flushing the pending operations is that they
2396 : /// won't be visible to subsequent operations until `commit`. The function
2397 : /// retains all the metadata, but data pages are flushed. That's again OK
2398 : /// for bulk import, where you are just loading data pages and won't try to
2399 : /// modify the same pages twice.
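///
/// Sketch of the bulk-import pattern this is intended for (hypothetical caller code;
/// `pages` and `ctx` are assumptions):
///
/// ```ignore
/// for (rel, blknum, img) in pages {
///     modification.put_rel_page_image(rel, blknum, img)?;
///     // Cheap no-op until enough blocks have accumulated (see pending_nblocks below).
///     modification.flush(ctx).await?;
/// }
/// modification.commit(ctx).await?;
/// ```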
2400 3860 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2401 3860 : // Unless we have accumulated a decent amount of changes, it's not worth it
2402 3860 : // to scan through the pending_updates list.
2403 3860 : let pending_nblocks = self.pending_nblocks;
2404 3860 : if pending_nblocks < 10000 {
2405 3860 : return Ok(());
2406 0 : }
2407 :
2408 0 : let mut writer = self.tline.writer().await;
2409 :
2410 : // Flush relation and SLRU data blocks, keep metadata.
2411 0 : if let Some(batch) = self.pending_data_batch.take() {
2412 0 : tracing::debug!(
2413 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2414 0 : batch.max_lsn,
2415 0 : self.tline.get_last_record_lsn()
2416 : );
2417 :
2418 : // This bails out on first error without modifying pending_updates.
2419 : // That's Ok, cf this function's doc comment.
2420 0 : writer.put_batch(batch, ctx).await?;
2421 0 : }
2422 :
2423 0 : if pending_nblocks != 0 {
2424 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2425 0 : self.pending_nblocks = 0;
2426 0 : }
2427 :
2428 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2429 0 : writer.update_directory_entries_count(kind, count);
2430 0 : }
2431 :
2432 0 : Ok(())
2433 3860 : }
2434 :
2435 : ///
2436 : /// Finish this atomic update, writing all the updated keys to the
2437 : /// underlying timeline.
2438 : /// All the modifications in this atomic update are stamped by the specified LSN.
2439 : ///
2440 1486204 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2441 1486204 : let mut writer = self.tline.writer().await;
2442 :
2443 1486204 : let pending_nblocks = self.pending_nblocks;
2444 1486204 : self.pending_nblocks = 0;
2445 :
2446 : // Ordering: the items in this batch do not need to be in any global order, but values for
2447 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2448 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2449 : // more details.
2450 :
2451 1486204 : let metadata_batch = {
2452 1486204 : let pending_meta = self
2453 1486204 : .pending_metadata_pages
2454 1486204 : .drain()
2455 1486204 : .flat_map(|(key, values)| {
2456 548148 : values
2457 548148 : .into_iter()
2458 548148 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2459 1486204 : })
2460 1486204 : .collect::<Vec<_>>();
2461 1486204 :
2462 1486204 : if pending_meta.is_empty() {
2463 944556 : None
2464 : } else {
2465 541648 : Some(SerializedValueBatch::from_values(pending_meta))
2466 : }
2467 : };
2468 :
2469 1486204 : let data_batch = self.pending_data_batch.take();
2470 :
2471 1486204 : let maybe_batch = match (data_batch, metadata_batch) {
2472 529112 : (Some(mut data), Some(metadata)) => {
2473 529112 : data.extend(metadata);
2474 529112 : Some(data)
2475 : }
2476 286524 : (Some(data), None) => Some(data),
2477 12536 : (None, Some(metadata)) => Some(metadata),
2478 658032 : (None, None) => None,
2479 : };
2480 :
2481 1486204 : if let Some(batch) = maybe_batch {
2482 828172 : tracing::debug!(
2483 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2484 0 : batch.max_lsn,
2485 0 : self.tline.get_last_record_lsn()
2486 : );
2487 :
2488 : // This bails out on first error without modifying pending_updates.
2489 : // That's Ok, cf this function's doc comment.
2490 828172 : writer.put_batch(batch, ctx).await?;
2491 658032 : }
2492 :
2493 1486204 : if !self.pending_deletions.is_empty() {
2494 4 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2495 4 : self.pending_deletions.clear();
2496 1486200 : }
2497 :
2498 1486204 : self.pending_lsns.push(self.lsn);
2499 1777920 : for pending_lsn in self.pending_lsns.drain(..) {
2500 1777920 : // TODO(vlad): pretty sure the comment below is not valid anymore
2501 1777920 : // and we can call finish write with the latest LSN
2502 1777920 : //
2503 1777920 : // Ideally, we should be able to call writer.finish_write() only once
2504 1777920 : // with the highest LSN. However, the last_record_lsn variable in the
2505 1777920 : // timeline keeps track of the latest LSN and the immediate previous LSN
2506 1777920 : // so we need to record every LSN to not leave a gap between them.
2507 1777920 : writer.finish_write(pending_lsn);
2508 1777920 : }
2509 :
2510 1486204 : if pending_nblocks != 0 {
2511 541140 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2512 945064 : }
2513 :
2514 1486204 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2515 6028 : writer.update_directory_entries_count(kind, count);
2516 6028 : }
2517 :
2518 1486204 : self.pending_metadata_bytes = 0;
2519 1486204 :
2520 1486204 : Ok(())
2521 1486204 : }
2522 :
2523 583408 : pub(crate) fn len(&self) -> usize {
2524 583408 : self.pending_metadata_pages.len()
2525 583408 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2526 583408 : + self.pending_deletions.len()
2527 583408 : }
2528 :
2529 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2530 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2531 : /// in the modification.
2532 : ///
2533 : /// For data pages, reads pass directly to the owning Timeline: any ingest code that reads a data
2534 : /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2535 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2536 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2537 : /// and not data pages.
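///
/// Minimal sketch of the read-your-writes behaviour for metadata keys (illustrative only):
///
/// ```ignore
/// self.put(rel_size_to_key(rel), Value::Image(Bytes::from(nblocks.to_le_bytes().to_vec())));
/// // Metadata key: the pending image above is returned without consulting the timeline.
/// let buf = self.get(rel_size_to_key(rel), ctx).await?;
/// ```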
2538 573172 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2539 573172 : if !Self::is_data_key(&key) {
2540 : // Have we already updated the same key? Read the latest pending updated
2541 : // version in that case.
2542 : //
2543 : // Note: we don't check pending_deletions. It is an error to request a
2544 : // value that has been removed, deletion only avoids leaking storage.
2545 573172 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2546 31856 : if let Some((_, _, value)) = values.last() {
2547 31856 : return if let Value::Image(img) = value {
2548 31856 : Ok(img.clone())
2549 : } else {
2550 : // Currently, we never need to read back a WAL record that we
2551 : // inserted in the same "transaction". All the metadata updates
2552 : // work directly with Images, and we never need to read actual
2553 : // data pages. We could handle this if we had to, by calling
2554 : // the walredo manager, but let's keep it simple for now.
2555 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2556 0 : "unexpected pending WAL record"
2557 0 : )))
2558 : };
2559 0 : }
2560 541316 : }
2561 : } else {
2562 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2563 : // this key should never be present in pending_data_pages. We ensure this by committing
2564 : // modifications before ingesting DB create operations, which are the only kind that reads
2565 : // data pages during ingest.
2566 0 : if cfg!(debug_assertions) {
2567 0 : assert!(
2568 0 : !self
2569 0 : .pending_data_batch
2570 0 : .as_ref()
2571 0 : .is_some_and(|b| b.updates_key(&key))
2572 0 : );
2573 0 : }
2574 : }
2575 :
2576 : // Metadata page cache miss, or we're reading a data page.
2577 541316 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2578 541316 : self.tline.get(key, lsn, ctx).await
2579 573172 : }
2580 :
2581 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2582 : /// and the empty value into None.
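///
/// Illustrative use, mirroring the rel_size_v2 lookup in [`Self::put_rel_creation`]:
/// both a missing key and an empty value are treated as "entry absent":
///
/// ```ignore
/// let key = rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
/// let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
///     .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(key))?;
/// let exists = val == RelDirExists::Exists;
/// ```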
2583 0 : async fn sparse_get(
2584 0 : &self,
2585 0 : key: Key,
2586 0 : ctx: &RequestContext,
2587 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2588 0 : let val = self.get(key, ctx).await;
2589 0 : match val {
2590 0 : Ok(val) if val.is_empty() => Ok(None),
2591 0 : Ok(val) => Ok(Some(val)),
2592 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2593 0 : Err(e) => Err(e),
2594 : }
2595 0 : }
2596 :
2597 1128188 : fn put(&mut self, key: Key, val: Value) {
2598 1128188 : if Self::is_data_key(&key) {
2599 555736 : self.put_data(key.to_compact(), val)
2600 : } else {
2601 572452 : self.put_metadata(key.to_compact(), val)
2602 : }
2603 1128188 : }
2604 :
2605 555736 : fn put_data(&mut self, key: CompactKey, val: Value) {
2606 555736 : let batch = self
2607 555736 : .pending_data_batch
2608 555736 : .get_or_insert_with(SerializedValueBatch::default);
2609 555736 : batch.put(key, val, self.lsn);
2610 555736 : }
2611 :
2612 572452 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2613 572452 : let values = self.pending_metadata_pages.entry(key).or_default();
2614 : // Replace the previous value if it exists at the same lsn
2615 572452 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2616 24304 : if *last_lsn == self.lsn {
2617 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2618 24304 : self.pending_metadata_bytes -= *last_value_ser_size;
2619 24304 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2620 24304 : self.pending_metadata_bytes += *last_value_ser_size;
2621 24304 :
2622 24304 : // Use the latest value: this replaces any earlier write to the same (key, lsn), such as might
2623 24304 : // have been generated by synthesized zero-page writes prior to the first real write to a page.
2624 24304 : *last_value = val;
2625 24304 : return;
2626 0 : }
2627 548148 : }
2628 :
2629 548148 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2630 548148 : self.pending_metadata_bytes += val_serialized_size;
2631 548148 : values.push((self.lsn, val_serialized_size, val));
2632 548148 :
2633 548148 : if key == CHECKPOINT_KEY.to_compact() {
2634 460 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2635 547688 : }
2636 572452 : }
2637 :
2638 4 : fn delete(&mut self, key_range: Range<Key>) {
2639 4 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2640 4 : self.pending_deletions.push((key_range, self.lsn));
2641 4 : }
2642 : }
2643 :
2644 : /// Statistics for a DatadirModification.
2645 : #[derive(Default)]
2646 : pub struct DatadirModificationStats {
2647 : pub metadata_images: u64,
2648 : pub metadata_deltas: u64,
2649 : pub data_images: u64,
2650 : pub data_deltas: u64,
2651 : }
2652 :
2653 : /// This struct facilitates accessing either a committed key from the timeline at a
2654 : /// specific LSN, or the latest uncommitted key from a pending modification.
2655 : ///
2656 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2657 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2658 : /// need to look up the keys in the modification first before looking them up in the
2659 : /// timeline to not miss the latest updates.
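///
/// Sketch of the two access modes, borrowing the calls used elsewhere in this file
/// (`tline`, `rel`, `lsn`, `ctx` and `modification` are assumed to be in scope):
///
/// ```ignore
/// // Read committed state at a fixed LSN:
/// let exists = tline.get_rel_exists(rel, Version::Lsn(lsn), ctx).await?;
/// // Read through a pending modification, so its uncommitted metadata writes are visible:
/// let exists = tline.get_rel_exists(rel, Version::Modified(&modification), ctx).await?;
/// ```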
2660 : #[derive(Clone, Copy)]
2661 : pub enum Version<'a> {
2662 : Lsn(Lsn),
2663 : Modified(&'a DatadirModification<'a>),
2664 : }
2665 :
2666 : impl Version<'_> {
2667 10352 : async fn get(
2668 10352 : &self,
2669 10352 : timeline: &Timeline,
2670 10352 : key: Key,
2671 10352 : ctx: &RequestContext,
2672 10352 : ) -> Result<Bytes, PageReconstructError> {
2673 10352 : match self {
2674 10312 : Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
2675 40 : Version::Modified(modification) => modification.get(key, ctx).await,
2676 : }
2677 10352 : }
2678 :
2679 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2680 : /// and the empty value into None.
2681 0 : async fn sparse_get(
2682 0 : &self,
2683 0 : timeline: &Timeline,
2684 0 : key: Key,
2685 0 : ctx: &RequestContext,
2686 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2687 0 : let val = self.get(timeline, key, ctx).await;
2688 0 : match val {
2689 0 : Ok(val) if val.is_empty() => Ok(None),
2690 0 : Ok(val) => Ok(Some(val)),
2691 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2692 0 : Err(e) => Err(e),
2693 : }
2694 0 : }
2695 :
2696 71240 : fn get_lsn(&self) -> Lsn {
2697 71240 : match self {
2698 59148 : Version::Lsn(lsn) => *lsn,
2699 12092 : Version::Modified(modification) => modification.lsn,
2700 : }
2701 71240 : }
2702 : }
2703 :
2704 : //--- Metadata structs stored in key-value pairs in the repository.
2705 :
2706 0 : #[derive(Debug, Serialize, Deserialize)]
2707 : pub(crate) struct DbDirectory {
2708 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2709 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2710 : }
2711 :
2712 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2713 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2714 : // were named like "pg_twophase/000002E5"; now they're like
2715 : // "pg_twophase/0000000A000002E4".
2716 :
2717 0 : #[derive(Debug, Serialize, Deserialize)]
2718 : pub(crate) struct TwoPhaseDirectory {
2719 : pub(crate) xids: HashSet<TransactionId>,
2720 : }
2721 :
2722 0 : #[derive(Debug, Serialize, Deserialize)]
2723 : struct TwoPhaseDirectoryV17 {
2724 : xids: HashSet<u64>,
2725 : }
2726 :
2727 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2728 : pub(crate) struct RelDirectory {
2729 : // Set of relations that exist. (relfilenode, forknum)
2730 : //
2731 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2732 : // key-value pairs, if you have a lot of relations
2733 : pub(crate) rels: HashSet<(Oid, u8)>,
2734 : }
2735 :
2736 0 : #[derive(Debug, Serialize, Deserialize)]
2737 : struct RelSizeEntry {
2738 : nblocks: u32,
2739 : }
2740 :
2741 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2742 : pub(crate) struct SlruSegmentDirectory {
2743 : // Set of SLRU segments that exist.
2744 : pub(crate) segments: HashSet<u32>,
2745 : }
2746 :
2747 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2748 : #[repr(u8)]
2749 : pub(crate) enum DirectoryKind {
2750 : Db,
2751 : TwoPhase,
2752 : Rel,
2753 : AuxFiles,
2754 : SlruSegment(SlruKind),
2755 : RelV2,
2756 : }
2757 :
2758 : impl DirectoryKind {
2759 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2760 18088 : pub(crate) fn offset(&self) -> usize {
2761 18088 : self.into_usize()
2762 18088 : }
2763 : }
2764 :
2765 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2766 :
2767 : #[allow(clippy::bool_assert_comparison)]
2768 : #[cfg(test)]
2769 : mod tests {
2770 : use hex_literal::hex;
2771 : use pageserver_api::models::ShardParameters;
2772 : use pageserver_api::shard::ShardStripeSize;
2773 : use utils::id::TimelineId;
2774 : use utils::shard::{ShardCount, ShardNumber};
2775 :
2776 : use super::*;
2777 : use crate::DEFAULT_PG_VERSION;
2778 : use crate::tenant::harness::TenantHarness;
2779 :
2780 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2781 : #[tokio::test]
2782 4 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2783 4 : let name = "aux_files_round_trip";
2784 4 : let harness = TenantHarness::create(name).await?;
2785 4 :
2786 4 : pub const TIMELINE_ID: TimelineId =
2787 4 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2788 4 :
2789 4 : let (tenant, ctx) = harness.load().await;
2790 4 : let (tline, ctx) = tenant
2791 4 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2792 4 : .await?;
2793 4 : let tline = tline.raw_timeline().unwrap();
2794 4 :
2795 4 : // First modification: insert two keys
2796 4 : let mut modification = tline.begin_modification(Lsn(0x1000));
2797 4 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2798 4 : modification.set_lsn(Lsn(0x1008))?;
2799 4 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2800 4 : modification.commit(&ctx).await?;
2801 4 : let expect_1008 = HashMap::from([
2802 4 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2803 4 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2804 4 : ]);
2805 4 :
2806 4 : let io_concurrency = IoConcurrency::spawn_for_test();
2807 4 :
2808 4 : let readback = tline
2809 4 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2810 4 : .await?;
2811 4 : assert_eq!(readback, expect_1008);
2812 4 :
2813 4 : // Second modification: update one key, remove the other
2814 4 : let mut modification = tline.begin_modification(Lsn(0x2000));
2815 4 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2816 4 : modification.set_lsn(Lsn(0x2008))?;
2817 4 : modification.put_file("foo/bar2", b"", &ctx).await?;
2818 4 : modification.commit(&ctx).await?;
2819 4 : let expect_2008 =
2820 4 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2821 4 :
2822 4 : let readback = tline
2823 4 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2824 4 : .await?;
2825 4 : assert_eq!(readback, expect_2008);
2826 4 :
2827 4 : // Reading back in time works
2828 4 : let readback = tline
2829 4 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2830 4 : .await?;
2831 4 : assert_eq!(readback, expect_1008);
2832 4 :
2833 4 : Ok(())
2834 4 : }
2835 :
2836 : #[test]
2837 4 : fn gap_finding() {
2838 4 : let rel = RelTag {
2839 4 : spcnode: 1663,
2840 4 : dbnode: 208101,
2841 4 : relnode: 2620,
2842 4 : forknum: 0,
2843 4 : };
2844 4 : let base_blkno = 1;
2845 4 :
2846 4 : let base_key = rel_block_to_key(rel, base_blkno);
2847 4 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2848 4 :
2849 4 : let shard = ShardIdentity::unsharded();
2850 4 :
2851 4 : let mut previous_nblocks = 0;
2852 44 : for i in 0..10 {
2853 40 : let crnt_blkno = base_blkno + i;
2854 40 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2855 40 :
2856 40 : previous_nblocks = crnt_blkno + 1;
2857 40 :
2858 40 : if i == 0 {
2859 : // The first block we write is 1, so we should find the gap.
2860 4 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2861 : } else {
2862 36 : assert!(gaps.is_none());
2863 : }
2864 : }
2865 :
2866 : // This is an update to an already existing block. No gaps here.
2867 4 : let update_blkno = 5;
2868 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2869 4 : assert!(gaps.is_none());
2870 :
2871 : // This is an update past the current end block.
2872 4 : let after_gap_blkno = 20;
2873 4 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2874 4 :
2875 4 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2876 4 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2877 4 : assert_eq!(
2878 4 : gaps.unwrap(),
2879 4 : KeySpace::single(gap_start_key..after_gap_key)
2880 4 : );
2881 4 : }
2882 :
2883 : #[test]
2884 4 : fn sharded_gap_finding() {
2885 4 : let rel = RelTag {
2886 4 : spcnode: 1663,
2887 4 : dbnode: 208101,
2888 4 : relnode: 2620,
2889 4 : forknum: 0,
2890 4 : };
2891 4 :
2892 4 : let first_blkno = 6;
2893 4 :
2894 4 : // This shard will get the even blocks
2895 4 : let shard = ShardIdentity::from_params(
2896 4 : ShardNumber(0),
2897 4 : &ShardParameters {
2898 4 : count: ShardCount(2),
2899 4 : stripe_size: ShardStripeSize(1),
2900 4 : },
2901 4 : );
2902 4 :
2903 4 : // Only keys belonging to this shard are considered as gaps.
2904 4 : let mut previous_nblocks = 0;
2905 4 : let gaps =
2906 4 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2907 4 : assert!(!gaps.ranges.is_empty());
2908 12 : for gap_range in gaps.ranges {
2909 8 : let mut k = gap_range.start;
2910 16 : while k != gap_range.end {
2911 8 : assert_eq!(shard.get_shard_number(&k), shard.number);
2912 8 : k = k.next();
2913 : }
2914 : }
2915 :
2916 4 : previous_nblocks = first_blkno;
2917 4 :
2918 4 : let update_blkno = 2;
2919 4 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2920 4 : assert!(gaps.is_none());
2921 4 : }
2922 :
2923 : /*
2924 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2925 : let incremental = timeline.get_current_logical_size();
2926 : let non_incremental = timeline
2927 : .get_current_logical_size_non_incremental(lsn)
2928 : .unwrap();
2929 : assert_eq!(incremental, non_incremental);
2930 : }
2931 : */
2932 :
2933 : /*
2934 : ///
2935 : /// Test list_rels() function, with branches and dropped relations
2936 : ///
2937 : #[test]
2938 : fn test_list_rels_drop() -> Result<()> {
2939 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
2940 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
2941 : const TESTDB: u32 = 111;
2942 :
2943 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
2944 : // after branching fails below
2945 : let mut writer = tline.begin_record(Lsn(0x10));
2946 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
2947 : writer.finish()?;
2948 :
2949 : // Create a relation on the timeline
2950 : let mut writer = tline.begin_record(Lsn(0x20));
2951 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
2952 : writer.finish()?;
2953 :
2954 : let writer = tline.begin_record(Lsn(0x00));
2955 : writer.finish()?;
2956 :
2957 : // Check that list_rels() lists it after LSN 2, but no before it
2958 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
2959 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
2960 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
2961 :
2962 : // Create a branch, check that the relation is visible there
2963 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
2964 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
2965 : Some(timeline) => timeline,
2966 : None => panic!("Should have a local timeline"),
2967 : };
2968 : let newtline = DatadirTimelineImpl::new(newtline);
2969 : assert!(newtline
2970 : .list_rels(0, TESTDB, Lsn(0x30))?
2971 : .contains(&TESTREL_A));
2972 :
2973 : // Drop it on the branch
2974 : let mut new_writer = newtline.begin_record(Lsn(0x40));
2975 : new_writer.drop_relation(TESTREL_A)?;
2976 : new_writer.finish()?;
2977 :
2978 : // Check that it's no longer listed on the branch after the point where it was dropped
2979 : assert!(newtline
2980 : .list_rels(0, TESTDB, Lsn(0x30))?
2981 : .contains(&TESTREL_A));
2982 : assert!(!newtline
2983 : .list_rels(0, TESTDB, Lsn(0x40))?
2984 : .contains(&TESTREL_A));
2985 :
2986 : // Run checkpoint and garbage collection and check that it's still not visible
2987 : newtline.checkpoint(CheckpointConfig::Forced)?;
2988 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
2989 :
2990 : assert!(!newtline
2991 : .list_rels(0, TESTDB, Lsn(0x40))?
2992 : .contains(&TESTREL_A));
2993 :
2994 : Ok(())
2995 : }
2996 : */
2997 :
2998 : /*
2999 : #[test]
3000 : fn test_read_beyond_eof() -> Result<()> {
3001 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3002 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3003 :
3004 : make_some_layers(&tline, Lsn(0x20))?;
3005 : let mut writer = tline.begin_record(Lsn(0x60));
3006 : walingest.put_rel_page_image(
3007 : &mut writer,
3008 : TESTREL_A,
3009 : 0,
3010 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3011 : )?;
3012 : writer.finish()?;
3013 :
3014 : // Test read before rel creation. Should error out.
3015 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3016 :
3017 : // Read block beyond end of relation at different points in time.
3018 : // These reads should fall into different delta, image, and in-memory layers.
3019 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3020 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3021 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3022 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3023 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3024 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3025 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3026 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3027 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3028 :
3029 : // Test on an in-memory layer with no preceding layer
3030 : let mut writer = tline.begin_record(Lsn(0x70));
3031 : walingest.put_rel_page_image(
3032 : &mut writer,
3033 : TESTREL_B,
3034 : 0,
3035 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3036 : )?;
3037 : writer.finish()?;
3038 :
3039 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3040 :
3041 : Ok(())
3042 : }
3043 : */
3044 : }