Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{BTreeSet, HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 : use std::sync::Arc;
12 :
13 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
14 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
15 : use anyhow::Context;
16 : use bytes::{Buf, Bytes, BytesMut};
17 : use enum_map::Enum;
18 : use pageserver_api::key::{
19 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
20 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
21 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
22 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
23 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
24 : };
25 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
26 : use pageserver_api::models::RelSizeMigration;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use pageserver_api::shard::ShardIdentity;
29 : use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
30 : use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
31 : use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
32 : use serde::{Deserialize, Serialize};
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, info, info_span, trace, warn};
36 : use utils::bin_ser::{BeSer, DeserializeError};
37 : use utils::lsn::Lsn;
38 : use utils::pausable_failpoint;
39 : use wal_decoder::models::record::NeonWalRecord;
40 : use wal_decoder::models::value::Value;
41 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
42 :
43 : use super::tenant::{PageReconstructError, Timeline};
44 : use crate::aux_file;
45 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
46 : use crate::keyspace::{KeySpace, KeySpaceAccum};
47 : use crate::metrics::{
48 : RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
49 : RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
50 : RELSIZE_SNAPSHOT_CACHE_MISSES,
51 : };
52 : use crate::span::{
53 : debug_assert_current_span_has_tenant_and_timeline_id,
54 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
55 : };
56 : use crate::tenant::storage_layer::IoConcurrency;
57 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
58 :
59 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
60 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
61 :
62 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
63 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
64 :
65 : #[derive(Debug)]
66 : pub enum LsnForTimestamp {
67 : /// Found commits both before and after the given timestamp
68 : Present(Lsn),
69 :
70 : /// Found no commits after the given timestamp. This means
71 : /// that the newest data in the branch is older than the given
72 : /// timestamp.
73 : ///
74 : /// All commits <= LSN happened before the given timestamp
75 : Future(Lsn),
76 :
77 : /// The queried timestamp is past the horizon we can look back to (PITR)
78 : ///
79 : /// All commits > LSN happened after the given timestamp,
80 : /// but any commits < LSN might have happened before or after
81 : /// the given timestamp. We don't know because no data before
82 : /// the given lsn is available.
83 : Past(Lsn),
84 :
85 : /// We have found no commit with a timestamp,
86 : /// so we can't return anything meaningful.
87 : ///
88 : /// The associated LSN is the lower bound value we can safely
89 : /// create branches on, but no statement is made about whether it is
90 : /// older or newer than the timestamp.
91 : ///
92 : /// This variant can e.g. be returned right after a
93 : /// cluster import.
94 : NoData(Lsn),
95 : }
96 :
97 : /// Each request to the pageserver contains an LSN range: `not_modified_since..request_lsn`.
98 : /// See the comments in libs/pageserver_api/src/models.rs.
99 : /// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
100 : /// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
101 : #[derive(Debug, Clone, Copy, Default)]
102 : pub struct LsnRange {
103 : pub effective_lsn: Lsn,
104 : pub request_lsn: Lsn,
105 : }
106 :
107 : impl LsnRange {
108 0 : pub fn at(lsn: Lsn) -> LsnRange {
109 0 : LsnRange {
110 0 : effective_lsn: lsn,
111 0 : request_lsn: lsn,
112 0 : }
113 0 : }
114 16 : pub fn is_latest(&self) -> bool {
115 16 : self.request_lsn == Lsn::MAX
116 16 : }
117 : }
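// A small sketch of how the two fields differ (illustrative only): the primary
// sends `request_lsn == Lsn::MAX` ("read latest"), while a standby pinned to a
// specific LSN sends the same value for both fields.
//
// let primary = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
// assert!(primary.is_latest());
// let standby = LsnRange::at(standby_lsn);
// assert!(!standby.is_latest());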
118 :
119 : #[derive(Debug, thiserror::Error)]
120 : pub(crate) enum CalculateLogicalSizeError {
121 : #[error("cancelled")]
122 : Cancelled,
123 :
124 : /// Something went wrong while reading the metadata we use to calculate logical size
125 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
126 : /// in the `From` implementation for this variant.
127 : #[error(transparent)]
128 : PageRead(PageReconstructError),
129 :
130 : /// Something went wrong deserializing metadata that we read to calculate logical size
131 : #[error("decode error: {0}")]
132 : Decode(#[from] DeserializeError),
133 : }
134 :
135 : #[derive(Debug, thiserror::Error)]
136 : pub(crate) enum CollectKeySpaceError {
137 : #[error(transparent)]
138 : Decode(#[from] DeserializeError),
139 : #[error(transparent)]
140 : PageRead(PageReconstructError),
141 : #[error("cancelled")]
142 : Cancelled,
143 : }
144 :
145 : impl CollectKeySpaceError {
146 0 : pub(crate) fn is_cancel(&self) -> bool {
147 0 : match self {
148 0 : CollectKeySpaceError::Decode(_) => false,
149 0 : CollectKeySpaceError::PageRead(e) => e.is_cancel(),
150 0 : CollectKeySpaceError::Cancelled => true,
151 : }
152 0 : }
153 0 : pub(crate) fn into_anyhow(self) -> anyhow::Error {
154 0 : match self {
155 0 : CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
156 0 : CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
157 0 : CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
158 : }
159 0 : }
160 : }
161 :
162 : impl From<PageReconstructError> for CollectKeySpaceError {
163 0 : fn from(err: PageReconstructError) -> Self {
164 0 : match err {
165 0 : PageReconstructError::Cancelled => Self::Cancelled,
166 0 : err => Self::PageRead(err),
167 : }
168 0 : }
169 : }
170 :
171 : impl From<PageReconstructError> for CalculateLogicalSizeError {
172 0 : fn from(pre: PageReconstructError) -> Self {
173 0 : match pre {
174 0 : PageReconstructError::Cancelled => Self::Cancelled,
175 0 : _ => Self::PageRead(pre),
176 : }
177 0 : }
178 : }
179 :
180 : #[derive(Debug, thiserror::Error)]
181 : pub enum RelationError {
182 : #[error("invalid relnode")]
183 : InvalidRelnode,
184 : }
185 :
186 : ///
187 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
188 : /// and other special kinds of files, in a versioned key-value store. The
189 : /// Timeline struct provides the key-value store.
190 : ///
191 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
192 : /// implementation, and might be moved into a separate struct later.
193 : impl Timeline {
194 : /// Start ingesting a WAL record, or other atomic modification of
195 : /// the timeline.
196 : ///
197 : /// This provides a transaction-like interface to perform a bunch
198 : /// of modifications atomically.
199 : ///
200 : /// To ingest a WAL record, call begin_modification(lsn) to get a
201 : /// DatadirModification object. Use the functions in the object to
202 : /// modify the repository state, updating all the pages and metadata
203 : /// that the WAL record affects. When you're done, call commit() to
204 : /// commit the changes.
205 : ///
206 : /// The LSN stored in the modification is advanced by `ingest_record` and
207 : /// is used by `commit()` to update `last_record_lsn`.
208 : ///
209 : /// Calling commit() will flush all the changes and reset the state,
210 : /// so the `DatadirModification` struct can be reused to perform the next modification.
211 : ///
212 : /// Note that any pending modifications you make through the
213 : /// modification object won't be visible to calls to the 'get' and list
214 : /// functions of the timeline until you finish! And if you update the
215 : /// same page twice, the last update wins.
216 : ///
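/// A minimal usage sketch (not compiled here; the exact names of the put/commit
/// helpers on `DatadirModification` are assumed from the rest of this module):
///
/// ```ignore
/// // Stage all changes of one WAL record atomically at `lsn`.
/// let mut modification = timeline.begin_modification(lsn);
/// modification.put_rel_page_image(rel, blknum, page_image)?;
/// // Flush the pending changes and advance `last_record_lsn`.
/// modification.commit(&ctx).await?;
/// ```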
217 134213 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
218 134213 : where
219 134213 : Self: Sized,
220 : {
221 134213 : DatadirModification {
222 134213 : tline: self,
223 134213 : pending_lsns: Vec::new(),
224 134213 : pending_metadata_pages: HashMap::new(),
225 134213 : pending_data_batch: None,
226 134213 : pending_deletions: Vec::new(),
227 134213 : pending_nblocks: 0,
228 134213 : pending_directory_entries: Vec::new(),
229 134213 : pending_metadata_bytes: 0,
230 134213 : is_importing_pgdata: false,
231 134213 : lsn,
232 134213 : }
233 134213 : }
234 :
235 2 : pub fn begin_modification_for_import(&self, lsn: Lsn) -> DatadirModification
236 2 : where
237 2 : Self: Sized,
238 : {
239 2 : DatadirModification {
240 2 : tline: self,
241 2 : pending_lsns: Vec::new(),
242 2 : pending_metadata_pages: HashMap::new(),
243 2 : pending_data_batch: None,
244 2 : pending_deletions: Vec::new(),
245 2 : pending_nblocks: 0,
246 2 : pending_directory_entries: Vec::new(),
247 2 : pending_metadata_bytes: 0,
248 2 : is_importing_pgdata: true,
249 2 : lsn,
250 2 : }
251 2 : }
252 :
253 : //------------------------------------------------------------------------------
254 : // Public GET functions
255 : //------------------------------------------------------------------------------
256 :
257 : /// Look up given page version.
258 9192 : pub(crate) async fn get_rel_page_at_lsn(
259 9192 : &self,
260 9192 : tag: RelTag,
261 9192 : blknum: BlockNumber,
262 9192 : version: Version<'_>,
263 9192 : ctx: &RequestContext,
264 9192 : io_concurrency: IoConcurrency,
265 9192 : ) -> Result<Bytes, PageReconstructError> {
266 9192 : match version {
267 9192 : Version::LsnRange(lsns) => {
268 9192 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
269 9192 : let res = self
270 9192 : .get_rel_page_at_lsn_batched(
271 9192 : pages
272 9192 : .iter()
273 9192 : .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
274 9192 : io_concurrency.clone(),
275 9192 : ctx,
276 : )
277 9192 : .await;
278 9192 : assert_eq!(res.len(), 1);
279 9192 : res.into_iter().next().unwrap()
280 : }
281 0 : Version::Modified(modification) => {
282 0 : if tag.relnode == 0 {
283 0 : return Err(PageReconstructError::Other(
284 0 : RelationError::InvalidRelnode.into(),
285 0 : ));
286 0 : }
287 :
288 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
289 0 : if blknum >= nblocks {
290 0 : debug!(
291 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
292 : tag,
293 : blknum,
294 0 : version.get_lsn(),
295 : nblocks
296 : );
297 0 : return Ok(ZERO_PAGE.clone());
298 0 : }
299 :
300 0 : let key = rel_block_to_key(tag, blknum);
301 0 : modification.get(key, ctx).await
302 : }
303 : }
304 9192 : }
305 :
306 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
307 : ///
308 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
309 : ///
310 : /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
311 : /// if the client goes away (e.g. due to timeout or cancellation).
312 : /// TODO: verify that it actually is cancellation-safe.
313 9192 : pub(crate) async fn get_rel_page_at_lsn_batched(
314 9192 : &self,
315 9192 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
316 9192 : io_concurrency: IoConcurrency,
317 9192 : ctx: &RequestContext,
318 9192 : ) -> Vec<Result<Bytes, PageReconstructError>> {
319 9192 : debug_assert_current_span_has_tenant_and_timeline_id();
320 :
321 9192 : let mut slots_filled = 0;
322 9192 : let page_count = pages.len();
323 :
324 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
325 9192 : let mut result = Vec::with_capacity(pages.len());
326 9192 : let result_slots = result.spare_capacity_mut();
327 :
328 9192 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
329 9192 : HashMap::with_capacity(pages.len());
330 :
331 9192 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
332 9192 : HashMap::with_capacity(pages.len());
333 :
334 9192 : for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
335 9192 : if tag.relnode == 0 {
336 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
337 0 : RelationError::InvalidRelnode.into(),
338 0 : )));
339 :
340 0 : slots_filled += 1;
341 0 : continue;
342 9192 : }
343 9192 : let lsn = lsns.effective_lsn;
344 9192 : let nblocks = {
345 9192 : let ctx = RequestContextBuilder::from(&ctx)
346 9192 : .perf_span(|crnt_perf_span| {
347 0 : info_span!(
348 : target: PERF_TRACE_TARGET,
349 0 : parent: crnt_perf_span,
350 : "GET_REL_SIZE",
351 : reltag=%tag,
352 : lsn=%lsn,
353 : )
354 0 : })
355 9192 : .attached_child();
356 :
357 9192 : match self
358 9192 : .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
359 9192 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
360 9192 : .await
361 : {
362 9192 : Ok(nblocks) => nblocks,
363 0 : Err(err) => {
364 0 : result_slots[response_slot_idx].write(Err(err));
365 0 : slots_filled += 1;
366 0 : continue;
367 : }
368 : }
369 : };
370 :
371 9192 : if *blknum >= nblocks {
372 0 : debug!(
373 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
374 : tag, blknum, lsn, nblocks
375 : );
376 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
377 0 : slots_filled += 1;
378 0 : continue;
379 9192 : }
380 :
381 9192 : let key = rel_block_to_key(*tag, *blknum);
382 :
383 9192 : let ctx = RequestContextBuilder::from(&ctx)
384 9192 : .perf_span(|crnt_perf_span| {
385 0 : info_span!(
386 : target: PERF_TRACE_TARGET,
387 0 : parent: crnt_perf_span,
388 : "GET_BATCH",
389 : batch_size = %page_count,
390 : )
391 0 : })
392 9192 : .attached_child();
393 :
394 9192 : let key_slots = keys_slots.entry(key).or_default();
395 9192 : key_slots.push((response_slot_idx, ctx));
396 :
397 9192 : let acc = req_keyspaces.entry(lsn).or_default();
398 9192 : acc.add_key(key);
399 : }
400 :
401 9192 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
402 9192 : .into_iter()
403 9192 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
404 9192 : .collect();
405 :
406 9192 : let query = VersionedKeySpaceQuery::scattered(query);
407 9192 : let res = self
408 9192 : .get_vectored(query, io_concurrency, ctx)
409 9192 : .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
410 9192 : .await;
411 :
412 9192 : match res {
413 9192 : Ok(results) => {
414 18384 : for (key, res) in results {
415 9192 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
416 9192 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
417 :
418 9192 : for (slot, req_ctx) in key_slots {
419 0 : let clone = match &res {
420 0 : Ok(buf) => Ok(buf.clone()),
421 0 : Err(err) => Err(match err {
422 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
423 :
424 0 : x @ PageReconstructError::Other(_)
425 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
426 0 : | x @ PageReconstructError::WalRedo(_)
427 0 : | x @ PageReconstructError::MissingKey(_) => {
428 0 : PageReconstructError::Other(anyhow::anyhow!(
429 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
430 0 : ))
431 : }
432 : }),
433 : };
434 :
435 0 : result_slots[slot].write(clone);
436 : // There is no standardized way to express that the batched span followed from N request spans.
437 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
438 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
439 0 : req_ctx.perf_follows_from(ctx);
440 0 : slots_filled += 1;
441 : }
442 :
443 9192 : result_slots[first_slot].write(res);
444 9192 : first_req_ctx.perf_follows_from(ctx);
445 9192 : slots_filled += 1;
446 : }
447 : }
448 0 : Err(err) => {
449 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
450 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
451 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
452 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
453 : // but without taking ownership of the GetVectoredError
454 0 : let err = match &err {
455 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
456 : // TODO: restructure get_vectored API to make this error per-key
457 0 : GetVectoredError::MissingKey(err) => {
458 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
459 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
460 0 : )))
461 : }
462 : // TODO: restructure get_vectored API to make this error per-key
463 0 : GetVectoredError::GetReadyAncestorError(err) => {
464 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
465 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
466 0 : )))
467 : }
468 : // TODO: restructure get_vectored API to make this error per-key
469 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
470 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
471 0 : )),
472 : // TODO: we can prevent this error class by moving this check into the type system
473 0 : GetVectoredError::InvalidLsn(e) => {
474 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
475 : }
476 : // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
477 : // TODO: we can prevent this error class by moving this check into the type system
478 0 : GetVectoredError::Oversized(err, max) => {
479 0 : Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
480 : }
481 : };
482 :
483 0 : req_ctx.perf_follows_from(ctx);
484 0 : result_slots[*slot].write(err);
485 : }
486 :
487 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
488 : }
489 : };
490 :
491 9192 : assert_eq!(slots_filled, page_count);
492 : // SAFETY:
493 : // 1. `result` and any of its uninitialized members are not read from until this point
494 : // 2. The length below is tracked at run-time and matches the number of requested pages.
495 9192 : unsafe {
496 9192 : result.set_len(page_count);
497 9192 : }
498 :
499 9192 : result
500 9192 : }
501 :
502 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
503 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
504 : /// for pages up to the highest page number it has stored.
505 0 : pub(crate) async fn get_db_size(
506 0 : &self,
507 0 : spcnode: Oid,
508 0 : dbnode: Oid,
509 0 : version: Version<'_>,
510 0 : ctx: &RequestContext,
511 0 : ) -> Result<usize, PageReconstructError> {
512 0 : let mut total_blocks = 0;
513 :
514 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
515 :
516 0 : if rels.is_empty() {
517 0 : return Ok(0);
518 0 : }
519 :
520 : // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
521 0 : let reldir_key = rel_dir_to_key(spcnode, dbnode);
522 0 : let buf = version.get(self, reldir_key, ctx).await?;
523 0 : let reldir = RelDirectory::des(&buf)?;
524 :
525 0 : for rel in rels {
526 0 : let n_blocks = self
527 0 : .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
528 0 : .await?
529 0 : .expect("allow_missing=false");
530 0 : total_blocks += n_blocks as usize;
531 : }
532 0 : Ok(total_blocks)
533 0 : }
534 :
535 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
536 : ///
537 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
538 : /// page number stored in the shard.
539 12217 : pub(crate) async fn get_rel_size(
540 12217 : &self,
541 12217 : tag: RelTag,
542 12217 : version: Version<'_>,
543 12217 : ctx: &RequestContext,
544 12217 : ) -> Result<BlockNumber, PageReconstructError> {
545 12217 : Ok(self
546 12217 : .get_rel_size_in_reldir(tag, version, None, false, ctx)
547 12217 : .await?
548 12215 : .expect("allow_missing=false"))
549 12217 : }
550 :
551 : /// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
552 : /// otherwise errors.
553 : ///
554 : /// INVARIANT: never returns None if `allow_missing=false`.
555 : ///
556 : /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
557 12217 : pub(crate) async fn get_rel_size_in_reldir(
558 12217 : &self,
559 12217 : tag: RelTag,
560 12217 : version: Version<'_>,
561 12217 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
562 12217 : allow_missing: bool,
563 12217 : ctx: &RequestContext,
564 12217 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
565 12217 : if tag.relnode == 0 {
566 0 : return Err(PageReconstructError::Other(
567 0 : RelationError::InvalidRelnode.into(),
568 0 : ));
569 12217 : }
570 :
571 12217 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
572 12210 : return Ok(Some(nblocks));
573 7 : }
574 :
575 7 : if allow_missing
576 0 : && !self
577 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
578 0 : .await?
579 : {
580 0 : return Ok(None);
581 7 : }
582 :
583 7 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
584 0 : && !self
585 0 : .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
586 0 : .await?
587 : {
588 : // FIXME: Postgres sometimes calls smgrcreate() to create
589 : // FSM, and smgrnblocks() on it immediately afterwards,
590 : // without extending it. Tolerate that by claiming that
591 : // any non-existent FSM fork has size 0.
592 0 : return Ok(Some(0));
593 7 : }
594 :
595 7 : let key = rel_size_to_key(tag);
596 7 : let mut buf = version.get(self, key, ctx).await?;
597 5 : let nblocks = buf.get_u32_le();
598 :
599 5 : self.update_cached_rel_size(tag, version, nblocks);
600 :
601 5 : Ok(Some(nblocks))
602 12217 : }
603 :
604 : /// Does the relation exist?
605 : ///
606 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
607 : /// the shard stores pages for.
608 : ///
609 3025 : pub(crate) async fn get_rel_exists(
610 3025 : &self,
611 3025 : tag: RelTag,
612 3025 : version: Version<'_>,
613 3025 : ctx: &RequestContext,
614 3025 : ) -> Result<bool, PageReconstructError> {
615 3025 : self.get_rel_exists_in_reldir(tag, version, None, ctx).await
616 3025 : }
617 :
618 9 : async fn get_rel_exists_in_reldir_v1(
619 9 : &self,
620 9 : tag: RelTag,
621 9 : version: Version<'_>,
622 9 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
623 9 : ctx: &RequestContext,
624 9 : ) -> Result<bool, PageReconstructError> {
625 9 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
626 9 : if let Some((cached_key, dir)) = deserialized_reldir_v1 {
627 0 : if cached_key == key {
628 0 : return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
629 0 : } else if cfg!(test) || cfg!(feature = "testing") {
630 0 : panic!("cached reldir key mismatch: {cached_key} != {key}");
631 : } else {
632 0 : warn!("cached reldir key mismatch: {cached_key} != {key}");
633 : }
634 : // Fallback to reading the directory from the datadir.
635 9 : }
636 :
637 9 : let buf = version.get(self, key, ctx).await?;
638 :
639 9 : let dir = RelDirectory::des(&buf)?;
640 9 : Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
641 9 : }
642 :
643 0 : async fn get_rel_exists_in_reldir_v2(
644 0 : &self,
645 0 : tag: RelTag,
646 0 : version: Version<'_>,
647 0 : ctx: &RequestContext,
648 0 : ) -> Result<bool, PageReconstructError> {
649 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
650 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?).map_err(
651 0 : |_| {
652 0 : PageReconstructError::Other(anyhow::anyhow!(
653 0 : "invalid reldir key: decode failed, {}",
654 0 : key
655 0 : ))
656 0 : },
657 0 : )?;
658 0 : let exists_v2 = buf == RelDirExists::Exists;
659 0 : Ok(exists_v2)
660 0 : }
661 :
662 : /// Does the relation exist? With a cached deserialized `RelDirectory`.
663 : ///
664 : /// There are some cases where the caller loops across all relations. In that specific case,
665 : /// the caller should obtain the deserialized `RelDirectory` first and then call this function
666 : /// to avoid duplicated deserialization work. This is a hack and should be removed by introducing
667 : /// a new API (e.g., `get_rel_exists_batched`).
668 3025 : pub(crate) async fn get_rel_exists_in_reldir(
669 3025 : &self,
670 3025 : tag: RelTag,
671 3025 : version: Version<'_>,
672 3025 : deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
673 3025 : ctx: &RequestContext,
674 3025 : ) -> Result<bool, PageReconstructError> {
675 3025 : if tag.relnode == 0 {
676 0 : return Err(PageReconstructError::Other(
677 0 : RelationError::InvalidRelnode.into(),
678 0 : ));
679 3025 : }
680 :
681 : // first try to lookup relation in cache
682 3025 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
683 3016 : return Ok(true);
684 9 : }
685 : // then check if the database was already initialized.
686 : // get_rel_exists can be called before dbdir is created.
687 9 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
688 9 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
689 9 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
690 0 : return Ok(false);
691 9 : }
692 :
693 9 : let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
694 :
695 0 : match v2_status {
696 : RelSizeMigration::Legacy => {
697 9 : let v1_exists = self
698 9 : .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
699 9 : .await?;
700 9 : Ok(v1_exists)
701 : }
702 : RelSizeMigration::Migrating | RelSizeMigration::Migrated
703 0 : if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
704 : {
705 : // For requests below the migrated LSN, we still use the v1 read path.
706 0 : let v1_exists = self
707 0 : .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
708 0 : .await?;
709 0 : Ok(v1_exists)
710 : }
711 : RelSizeMigration::Migrating => {
712 0 : let v1_exists = self
713 0 : .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
714 0 : .await?;
715 0 : let v2_exists_res = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await;
716 0 : match v2_exists_res {
717 0 : Ok(v2_exists) if v1_exists == v2_exists => {}
718 0 : Ok(v2_exists) => {
719 0 : tracing::warn!(
720 0 : "inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
721 : tag,
722 : v1_exists,
723 : v2_exists
724 : );
725 : }
726 0 : Err(e) => {
727 0 : tracing::warn!("failed to get rel exists in v2: {e}");
728 : }
729 : }
730 0 : Ok(v1_exists)
731 : }
732 : RelSizeMigration::Migrated => {
733 0 : let v2_exists = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await?;
734 0 : Ok(v2_exists)
735 : }
736 : }
737 3025 : }
738 :
739 0 : async fn list_rels_v1(
740 0 : &self,
741 0 : spcnode: Oid,
742 0 : dbnode: Oid,
743 0 : version: Version<'_>,
744 0 : ctx: &RequestContext,
745 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
746 0 : let key = rel_dir_to_key(spcnode, dbnode);
747 0 : let buf = version.get(self, key, ctx).await?;
748 0 : let dir = RelDirectory::des(&buf)?;
749 0 : let rels_v1: HashSet<RelTag> =
750 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
751 0 : spcnode,
752 0 : dbnode,
753 0 : relnode: *relnode,
754 0 : forknum: *forknum,
755 0 : }));
756 0 : Ok(rels_v1)
757 0 : }
758 :
759 0 : async fn list_rels_v2(
760 0 : &self,
761 0 : spcnode: Oid,
762 0 : dbnode: Oid,
763 0 : version: Version<'_>,
764 0 : ctx: &RequestContext,
765 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
766 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
767 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
768 0 : self.conf.get_vectored_concurrent_io,
769 0 : self.gate
770 0 : .enter()
771 0 : .map_err(|_| PageReconstructError::Cancelled)?,
772 : );
773 0 : let results = self
774 0 : .scan(
775 0 : KeySpace::single(key_range),
776 0 : version.get_lsn(),
777 0 : ctx,
778 0 : io_concurrency,
779 0 : )
780 0 : .await?;
781 0 : let mut rels = HashSet::new();
782 0 : for (key, val) in results {
783 0 : let val = RelDirExists::decode(&val?).map_err(|_| {
784 0 : PageReconstructError::Other(anyhow::anyhow!(
785 0 : "invalid reldir key: decode failed, {}",
786 0 : key
787 0 : ))
788 0 : })?;
789 0 : if key.field6 != 1 {
790 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
791 0 : "invalid reldir key: field6 != 1, {}",
792 0 : key
793 0 : )));
794 0 : }
795 0 : if key.field2 != spcnode {
796 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
797 0 : "invalid reldir key: field2 != spcnode, {}",
798 0 : key
799 0 : )));
800 0 : }
801 0 : if key.field3 != dbnode {
802 0 : return Err(PageReconstructError::Other(anyhow::anyhow!(
803 0 : "invalid reldir key: field3 != dbnode, {}",
804 0 : key
805 0 : )));
806 0 : }
807 0 : let tag = RelTag {
808 0 : spcnode,
809 0 : dbnode,
810 0 : relnode: key.field4,
811 0 : forknum: key.field5,
812 0 : };
813 0 : if val == RelDirExists::Removed {
814 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
815 0 : continue;
816 0 : }
817 0 : let did_not_contain = rels.insert(tag);
818 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
819 : }
820 0 : Ok(rels)
821 0 : }
822 :
823 : /// Get a list of all existing relations in given tablespace and database.
824 : ///
825 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
826 : /// the shard stores pages for.
827 : ///
828 : /// # Cancel-Safety
829 : ///
830 : /// This method is cancellation-safe.
831 0 : pub(crate) async fn list_rels(
832 0 : &self,
833 0 : spcnode: Oid,
834 0 : dbnode: Oid,
835 0 : version: Version<'_>,
836 0 : ctx: &RequestContext,
837 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
838 0 : let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
839 :
840 0 : match v2_status {
841 : RelSizeMigration::Legacy => {
842 0 : let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
843 0 : Ok(rels_v1)
844 : }
845 : RelSizeMigration::Migrating | RelSizeMigration::Migrated
846 0 : if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
847 : {
848 : // For requests below the migrated LSN, we still use the v1 read path.
849 0 : let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
850 0 : Ok(rels_v1)
851 : }
852 : RelSizeMigration::Migrating => {
853 0 : let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
854 0 : let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
855 0 : match rels_v2_res {
856 0 : Ok(rels_v2) if rels_v1 == rels_v2 => {}
857 0 : Ok(rels_v2) => {
858 0 : tracing::warn!(
859 0 : "inconsistent v1/v2 reldir keyspace for db {} {}: v1_rels.len()={}, v2_rels.len()={}",
860 : spcnode,
861 : dbnode,
862 0 : rels_v1.len(),
863 0 : rels_v2.len()
864 : );
865 : }
866 0 : Err(e) => {
867 0 : tracing::warn!("failed to list rels in v2: {e}");
868 : }
869 : }
870 0 : Ok(rels_v1)
871 : }
872 : RelSizeMigration::Migrated => {
873 0 : let rels_v2 = self.list_rels_v2(spcnode, dbnode, version, ctx).await?;
874 0 : Ok(rels_v2)
875 : }
876 : }
877 0 : }
878 :
879 : /// Get the whole SLRU segment
880 0 : pub(crate) async fn get_slru_segment(
881 0 : &self,
882 0 : kind: SlruKind,
883 0 : segno: u32,
884 0 : lsn: Lsn,
885 0 : ctx: &RequestContext,
886 0 : ) -> Result<Bytes, PageReconstructError> {
887 0 : assert!(self.tenant_shard_id.is_shard_zero());
888 0 : let n_blocks = self
889 0 : .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
890 0 : .await?;
891 :
892 0 : let keyspace = KeySpace::single(
893 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
894 : );
895 :
896 0 : let batches = keyspace.partition(
897 0 : self.get_shard_identity(),
898 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
899 0 : BLCKSZ as u64,
900 : );
901 :
902 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
903 0 : self.conf.get_vectored_concurrent_io,
904 0 : self.gate
905 0 : .enter()
906 0 : .map_err(|_| PageReconstructError::Cancelled)?,
907 : );
908 :
909 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
910 0 : for batch in batches.parts {
911 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
912 0 : let blocks = self
913 0 : .get_vectored(query, io_concurrency.clone(), ctx)
914 0 : .await?;
915 :
916 0 : for (_key, block) in blocks {
917 0 : let block = block?;
918 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
919 : }
920 : }
921 :
922 0 : Ok(segment.freeze())
923 0 : }
924 :
925 : /// Get size of an SLRU segment
926 0 : pub(crate) async fn get_slru_segment_size(
927 0 : &self,
928 0 : kind: SlruKind,
929 0 : segno: u32,
930 0 : version: Version<'_>,
931 0 : ctx: &RequestContext,
932 0 : ) -> Result<BlockNumber, PageReconstructError> {
933 0 : assert!(self.tenant_shard_id.is_shard_zero());
934 0 : let key = slru_segment_size_to_key(kind, segno);
935 0 : let mut buf = version.get(self, key, ctx).await?;
936 0 : Ok(buf.get_u32_le())
937 0 : }
938 :
939 : /// Does the slru segment exist?
940 0 : pub(crate) async fn get_slru_segment_exists(
941 0 : &self,
942 0 : kind: SlruKind,
943 0 : segno: u32,
944 0 : version: Version<'_>,
945 0 : ctx: &RequestContext,
946 0 : ) -> Result<bool, PageReconstructError> {
947 0 : assert!(self.tenant_shard_id.is_shard_zero());
948 : // fetch directory listing
949 0 : let key = slru_dir_to_key(kind);
950 0 : let buf = version.get(self, key, ctx).await?;
951 :
952 0 : let dir = SlruSegmentDirectory::des(&buf)?;
953 0 : Ok(dir.segments.contains(&segno))
954 0 : }
955 :
956 : /// Locate LSN, such that all transactions that committed before
957 : /// Locate an LSN such that all transactions that committed before
958 : ///
959 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
960 : /// so it's not well defined which LSN you get if there were multiple commits
961 : /// "in flight" at that point in time.
962 : ///
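/// A sketch of how a caller might act on the result (the branch-creation helper
/// name is hypothetical):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
///     LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => branch_at(lsn),
///     LsnForTimestamp::Past(lsn) => bail!("timestamp is before the PITR horizon (min usable LSN {lsn})"),
///     LsnForTimestamp::NoData(lsn) => bail!("no commit timestamps found; lower bound LSN is {lsn}"),
/// }
/// ```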
963 0 : pub(crate) async fn find_lsn_for_timestamp(
964 0 : &self,
965 0 : search_timestamp: TimestampTz,
966 0 : cancel: &CancellationToken,
967 0 : ctx: &RequestContext,
968 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
969 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
970 :
971 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
972 0 : let gc_cutoff_planned = {
973 0 : let gc_info = self.gc_info.read().unwrap();
974 0 : info!(cutoffs=?gc_info.cutoffs, applied_cutoff=%*gc_cutoff_lsn_guard, "starting find_lsn_for_timestamp");
975 0 : gc_info.min_cutoff()
976 : };
977 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
978 : // but let's be defensive.
979 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
980 : // We use this method to figure out the branching LSN for the new branch, but the
981 : // GC cutoff could be before the branching point and we cannot create a new branch
982 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
983 : // on the safe side.
984 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
985 0 : let max_lsn = self.get_last_record_lsn();
986 :
987 : // LSNs are always 8-byte aligned. low/mid/high represent the
988 : // LSN divided by 8.
989 0 : let mut low = min_lsn.0 / 8;
990 0 : let mut high = max_lsn.0 / 8 + 1;
991 :
992 0 : let mut found_smaller = false;
993 0 : let mut found_larger = false;
994 :
995 0 : while low < high {
996 0 : if cancel.is_cancelled() {
997 0 : return Err(PageReconstructError::Cancelled);
998 0 : }
999 : // cannot overflow, high and low are both smaller than u64::MAX / 2
1000 0 : let mid = (high + low) / 2;
1001 :
1002 0 : let cmp = match self
1003 0 : .is_latest_commit_timestamp_ge_than(
1004 0 : search_timestamp,
1005 0 : Lsn(mid * 8),
1006 0 : &mut found_smaller,
1007 0 : &mut found_larger,
1008 0 : ctx,
1009 : )
1010 0 : .await
1011 : {
1012 0 : Ok(res) => res,
1013 0 : Err(PageReconstructError::MissingKey(e)) => {
1014 0 : warn!(
1015 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
1016 : e
1017 : );
1018 : // Return that we didn't find any requests smaller than the LSN, and logging the error.
1019 0 : return Ok(LsnForTimestamp::Past(min_lsn));
1020 : }
1021 0 : Err(e) => return Err(e),
1022 : };
1023 :
1024 0 : if cmp {
1025 0 : high = mid;
1026 0 : } else {
1027 0 : low = mid + 1;
1028 0 : }
1029 : }
1030 :
1031 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
1032 : // so the LSN of the last commit record before or at `search_timestamp`.
1033 : // Remove one from `low` to get `t`.
1034 : //
1035 : // FIXME: it would be better to get the LSN of the previous commit.
1036 : // Otherwise, if you restore to the returned LSN, the database will
1037 : // include physical changes from later commits that will be marked
1038 : // as aborted, and will need to be vacuumed away.
1039 0 : let commit_lsn = Lsn((low - 1) * 8);
1040 0 : match (found_smaller, found_larger) {
1041 : (false, false) => {
1042 : // This can happen if no commit records have been processed yet, e.g.
1043 : // just after importing a cluster.
1044 0 : Ok(LsnForTimestamp::NoData(min_lsn))
1045 : }
1046 : (false, true) => {
1047 : // Didn't find any commit timestamps smaller than the request
1048 0 : Ok(LsnForTimestamp::Past(min_lsn))
1049 : }
1050 0 : (true, _) if commit_lsn < min_lsn => {
1051 : // the search above did set found_smaller to true but it never increased the lsn.
1052 : // Then, low is still the old min_lsn, and the subtraction above gave a value
1053 : // below the min_lsn. We should never do that.
1054 0 : Ok(LsnForTimestamp::Past(min_lsn))
1055 : }
1056 : (true, false) => {
1057 : // Only found commits with timestamps smaller than the request.
1058 : // It's still a valid case for branch creation, return it.
1059 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
1060 : // case, anyway.
1061 0 : Ok(LsnForTimestamp::Future(commit_lsn))
1062 : }
1063 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
1064 : }
1065 0 : }
1066 :
1067 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
1068 : /// commits with a timestamp at or after 'search_timestamp', as of LSN 'probe_lsn'.
1069 : ///
1070 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
1071 : /// with a smaller/larger timestamp.
1072 : ///
1073 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
1074 0 : &self,
1075 0 : search_timestamp: TimestampTz,
1076 0 : probe_lsn: Lsn,
1077 0 : found_smaller: &mut bool,
1078 0 : found_larger: &mut bool,
1079 0 : ctx: &RequestContext,
1080 0 : ) -> Result<bool, PageReconstructError> {
1081 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
1082 0 : if timestamp >= search_timestamp {
1083 0 : *found_larger = true;
1084 0 : return ControlFlow::Break(true);
1085 0 : } else {
1086 0 : *found_smaller = true;
1087 0 : }
1088 0 : ControlFlow::Continue(())
1089 0 : })
1090 0 : .await
1091 0 : }
1092 :
1093 : /// Obtain the timestamp for the given lsn.
1094 : ///
1095 : /// If the lsn has no timestamps (e.g. no commits), returns None.
1096 0 : pub(crate) async fn get_timestamp_for_lsn(
1097 0 : &self,
1098 0 : probe_lsn: Lsn,
1099 0 : ctx: &RequestContext,
1100 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
1101 0 : let mut max: Option<TimestampTz> = None;
1102 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
1103 0 : if let Some(max_prev) = max {
1104 0 : max = Some(max_prev.max(timestamp));
1105 0 : } else {
1106 0 : max = Some(timestamp);
1107 0 : }
1108 0 : ControlFlow::Continue(())
1109 0 : })
1110 0 : .await?;
1111 :
1112 0 : Ok(max)
1113 0 : }
1114 :
1115 : /// Runs the given function on all the timestamps for a given lsn
1116 : /// Runs the given function on all the timestamps for a given LSN.
1117 : /// The return value is either given by the closure, or set to the `Default`
1118 : /// impl's output.
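/// A sketch of a typical closure (mirrors `is_latest_commit_timestamp_ge_than` above;
/// `cutoff` is a placeholder timestamp):
///
/// ```ignore
/// let any_newer = self
///     .map_all_timestamps(probe_lsn, ctx, |ts| {
///         if ts >= cutoff {
///             ControlFlow::Break(true) // stop scanning, return true
///         } else {
///             ControlFlow::Continue(())
///         }
///     })
///     .await?; // falls back to `bool::default()` (false) if no timestamp broke out
/// ```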
1119 0 : async fn map_all_timestamps<T: Default>(
1120 0 : &self,
1121 0 : probe_lsn: Lsn,
1122 0 : ctx: &RequestContext,
1123 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
1124 0 : ) -> Result<T, PageReconstructError> {
1125 0 : for segno in self
1126 0 : .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
1127 0 : .await?
1128 : {
1129 0 : let nblocks = self
1130 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
1131 0 : .await?;
1132 :
1133 0 : let keyspace = KeySpace::single(
1134 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
1135 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
1136 : );
1137 :
1138 0 : let batches = keyspace.partition(
1139 0 : self.get_shard_identity(),
1140 0 : self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
1141 0 : BLCKSZ as u64,
1142 : );
1143 :
1144 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
1145 0 : self.conf.get_vectored_concurrent_io,
1146 0 : self.gate
1147 0 : .enter()
1148 0 : .map_err(|_| PageReconstructError::Cancelled)?,
1149 : );
1150 :
1151 0 : for batch in batches.parts.into_iter().rev() {
1152 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
1153 0 : let blocks = self
1154 0 : .get_vectored(query, io_concurrency.clone(), ctx)
1155 0 : .await?;
1156 :
1157 0 : for (_key, clog_page) in blocks.into_iter().rev() {
1158 0 : let clog_page = clog_page?;
1159 :
1160 0 : if clog_page.len() == BLCKSZ as usize + 8 {
1161 0 : let mut timestamp_bytes = [0u8; 8];
1162 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
1163 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
1164 :
1165 0 : match f(timestamp) {
1166 0 : ControlFlow::Break(b) => return Ok(b),
1167 0 : ControlFlow::Continue(()) => (),
1168 : }
1169 0 : }
1170 : }
1171 : }
1172 : }
1173 0 : Ok(Default::default())
1174 0 : }
1175 :
1176 0 : pub(crate) async fn get_slru_keyspace(
1177 0 : &self,
1178 0 : version: Version<'_>,
1179 0 : ctx: &RequestContext,
1180 0 : ) -> Result<KeySpace, PageReconstructError> {
1181 0 : let mut accum = KeySpaceAccum::new();
1182 :
1183 0 : for kind in SlruKind::iter() {
1184 0 : let mut segments: Vec<u32> = self
1185 0 : .list_slru_segments(kind, version, ctx)
1186 0 : .await?
1187 0 : .into_iter()
1188 0 : .collect();
1189 0 : segments.sort_unstable();
1190 :
1191 0 : for seg in segments {
1192 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
1193 :
1194 0 : accum.add_range(
1195 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
1196 : );
1197 : }
1198 : }
1199 :
1200 0 : Ok(accum.to_keyspace())
1201 0 : }
1202 :
1203 : /// Get a list of SLRU segments
1204 0 : pub(crate) async fn list_slru_segments(
1205 0 : &self,
1206 0 : kind: SlruKind,
1207 0 : version: Version<'_>,
1208 0 : ctx: &RequestContext,
1209 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
1210 : // fetch directory entry
1211 0 : let key = slru_dir_to_key(kind);
1212 :
1213 0 : let buf = version.get(self, key, ctx).await?;
1214 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
1215 0 : }
1216 :
1217 0 : pub(crate) async fn get_relmap_file(
1218 0 : &self,
1219 0 : spcnode: Oid,
1220 0 : dbnode: Oid,
1221 0 : version: Version<'_>,
1222 0 : ctx: &RequestContext,
1223 0 : ) -> Result<Bytes, PageReconstructError> {
1224 0 : let key = relmap_file_key(spcnode, dbnode);
1225 :
1226 0 : let buf = version.get(self, key, ctx).await?;
1227 0 : Ok(buf)
1228 0 : }
1229 :
1230 169 : pub(crate) async fn list_dbdirs(
1231 169 : &self,
1232 169 : lsn: Lsn,
1233 169 : ctx: &RequestContext,
1234 169 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
1235 : // fetch directory entry
1236 169 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1237 :
1238 169 : Ok(DbDirectory::des(&buf)?.dbdirs)
1239 169 : }
1240 :
1241 0 : pub(crate) async fn get_twophase_file(
1242 0 : &self,
1243 0 : xid: u64,
1244 0 : lsn: Lsn,
1245 0 : ctx: &RequestContext,
1246 0 : ) -> Result<Bytes, PageReconstructError> {
1247 0 : let key = twophase_file_key(xid);
1248 0 : let buf = self.get(key, lsn, ctx).await?;
1249 0 : Ok(buf)
1250 0 : }
1251 :
1252 170 : pub(crate) async fn list_twophase_files(
1253 170 : &self,
1254 170 : lsn: Lsn,
1255 170 : ctx: &RequestContext,
1256 170 : ) -> Result<HashSet<u64>, PageReconstructError> {
1257 : // fetch directory entry
1258 170 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1259 :
1260 170 : if self.pg_version >= PgMajorVersion::PG17 {
1261 157 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1262 : } else {
1263 13 : Ok(TwoPhaseDirectory::des(&buf)?
1264 : .xids
1265 13 : .iter()
1266 13 : .map(|x| u64::from(*x))
1267 13 : .collect())
1268 : }
1269 170 : }
1270 :
1271 0 : pub(crate) async fn get_control_file(
1272 0 : &self,
1273 0 : lsn: Lsn,
1274 0 : ctx: &RequestContext,
1275 0 : ) -> Result<Bytes, PageReconstructError> {
1276 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1277 0 : }
1278 :
1279 6 : pub(crate) async fn get_checkpoint(
1280 6 : &self,
1281 6 : lsn: Lsn,
1282 6 : ctx: &RequestContext,
1283 6 : ) -> Result<Bytes, PageReconstructError> {
1284 6 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1285 6 : }
1286 :
1287 6 : async fn list_aux_files_v2(
1288 6 : &self,
1289 6 : lsn: Lsn,
1290 6 : ctx: &RequestContext,
1291 6 : io_concurrency: IoConcurrency,
1292 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1293 6 : let kv = self
1294 6 : .scan(
1295 6 : KeySpace::single(Key::metadata_aux_key_range()),
1296 6 : lsn,
1297 6 : ctx,
1298 6 : io_concurrency,
1299 6 : )
1300 6 : .await?;
1301 6 : let mut result = HashMap::new();
1302 6 : let mut sz = 0;
1303 15 : for (_, v) in kv {
1304 9 : let v = v?;
1305 9 : let v = aux_file::decode_file_value_bytes(&v)
1306 9 : .context("value decode")
1307 9 : .map_err(PageReconstructError::Other)?;
1308 17 : for (fname, content) in v {
1309 8 : sz += fname.len();
1310 8 : sz += content.len();
1311 8 : result.insert(fname, content);
1312 8 : }
1313 : }
1314 6 : self.aux_file_size_estimator.on_initial(sz);
1315 6 : Ok(result)
1316 6 : }
1317 :
1318 0 : pub(crate) async fn trigger_aux_file_size_computation(
1319 0 : &self,
1320 0 : lsn: Lsn,
1321 0 : ctx: &RequestContext,
1322 0 : io_concurrency: IoConcurrency,
1323 0 : ) -> Result<(), PageReconstructError> {
1324 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1325 0 : Ok(())
1326 0 : }
1327 :
1328 6 : pub(crate) async fn list_aux_files(
1329 6 : &self,
1330 6 : lsn: Lsn,
1331 6 : ctx: &RequestContext,
1332 6 : io_concurrency: IoConcurrency,
1333 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1334 6 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1335 6 : }
1336 :
1337 2 : pub(crate) async fn get_replorigins(
1338 2 : &self,
1339 2 : lsn: Lsn,
1340 2 : ctx: &RequestContext,
1341 2 : io_concurrency: IoConcurrency,
1342 2 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1343 2 : let kv = self
1344 2 : .scan(
1345 2 : KeySpace::single(repl_origin_key_range()),
1346 2 : lsn,
1347 2 : ctx,
1348 2 : io_concurrency,
1349 2 : )
1350 2 : .await?;
1351 2 : let mut result = HashMap::new();
1352 6 : for (k, v) in kv {
1353 5 : let v = v?;
1354 5 : if v.is_empty() {
1355 : // This is a tombstone -- we can skip it.
1356 : // Originally, the replorigin code used `Lsn::INVALID` to represent a tombstone. However, as it is part of
1357 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1358 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1359 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1360 2 : continue;
1361 3 : }
1362 3 : let origin_id = k.field6 as RepOriginId;
1363 3 : let origin_lsn = Lsn::des(&v)
1364 3 : .with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
1365 2 : if origin_lsn != Lsn::INVALID {
1366 2 : result.insert(origin_id, origin_lsn);
1367 2 : }
1368 : }
1369 1 : Ok(result)
1370 2 : }
1371 :
1372 : /// Does the same as get_current_logical_size but counted on demand.
1373 : /// Does the same as get_current_logical_size, but computes the size on demand.
1374 : ///
1375 : /// Only relation blocks are counted currently. That excludes metadata,
1376 : /// SLRUs, twophase files etc.
1377 : ///
1378 : /// # Cancel-Safety
1379 : ///
1380 : /// This method is cancellation-safe.
1381 7 : pub(crate) async fn get_current_logical_size_non_incremental(
1382 7 : &self,
1383 7 : lsn: Lsn,
1384 7 : ctx: &RequestContext,
1385 7 : ) -> Result<u64, CalculateLogicalSizeError> {
1386 7 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1387 :
1388 7 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1389 :
1390 : // Fetch list of database dirs and iterate them
1391 7 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1392 7 : let dbdir = DbDirectory::des(&buf)?;
1393 :
1394 7 : let mut total_size: u64 = 0;
1395 7 : let mut dbdir_cnt = 0;
1396 7 : let mut rel_cnt = 0;
1397 :
1398 7 : for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
1399 0 : dbdir_cnt += 1;
1400 0 : for rel in self
1401 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1402 0 : .await?
1403 : {
1404 0 : rel_cnt += 1;
1405 0 : if self.cancel.is_cancelled() {
1406 0 : return Err(CalculateLogicalSizeError::Cancelled);
1407 0 : }
1408 0 : let relsize_key = rel_size_to_key(rel);
1409 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1410 0 : let relsize = buf.get_u32_le();
1411 :
1412 0 : total_size += relsize as u64;
1413 : }
1414 : }
1415 :
1416 7 : self.db_rel_count
1417 7 : .store(Some(Arc::new((dbdir_cnt, rel_cnt))));
1418 :
1419 7 : Ok(total_size * BLCKSZ as u64)
1420 7 : }
1421 :
1422 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1423 : /// for gc-compaction.
1424 : ///
1425 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1426 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1427 : /// be kept only for a specific range of LSN.
1428 : /// be kept only for a specific range of LSNs.
1429 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1430 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1431 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1432 : /// determine which keys to retain/drop for gc-compaction.
1433 : ///
1434 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1435 : /// to be retained for each of the branch LSN.
1436 : ///
1437 : /// The return value is (dense keyspace, sparse keyspace).
1438 27 : pub(crate) async fn collect_gc_compaction_keyspace(
1439 27 : &self,
1440 27 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1441 27 : let metadata_key_begin = Key::metadata_key_range().start;
1442 27 : let aux_v1_key = AUX_FILES_KEY;
1443 27 : let dense_keyspace = KeySpace {
1444 27 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1445 27 : };
1446 27 : Ok((
1447 27 : dense_keyspace,
1448 27 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1449 27 : ))
1450 27 : }
1451 :
1452 : ///
1453 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1454 : /// Anything that's not listed may be removed from the underlying storage (from
1455 : /// that LSN forwards).
1456 : ///
1457 : /// The return value is (dense keyspace, sparse keyspace).
1458 169 : pub(crate) async fn collect_keyspace(
1459 169 : &self,
1460 169 : lsn: Lsn,
1461 169 : ctx: &RequestContext,
1462 169 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1463 : // Iterate through key ranges, greedily packing them into partitions
1464 169 : let mut result = KeySpaceAccum::new();
1465 :
1466 : // The dbdir metadata always exists
1467 169 : result.add_key(DBDIR_KEY);
1468 :
1469 : // Fetch list of database dirs and iterate them
1470 169 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1471 169 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1472 :
1473 169 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1474 169 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1475 0 : if has_relmap_file {
1476 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1477 0 : }
1478 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1479 :
1480 0 : let mut rels: Vec<RelTag> = self
1481 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1482 0 : .await?
1483 0 : .into_iter()
1484 0 : .collect();
1485 0 : rels.sort_unstable();
1486 0 : for rel in rels {
1487 0 : let relsize_key = rel_size_to_key(rel);
1488 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1489 0 : let relsize = buf.get_u32_le();
1490 :
1491 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1492 0 : result.add_key(relsize_key);
1493 : }
1494 : }
1495 :
1496 : // Iterate SLRUs next
1497 169 : if self.tenant_shard_id.is_shard_zero() {
1498 498 : for kind in [
1499 166 : SlruKind::Clog,
1500 166 : SlruKind::MultiXactMembers,
1501 166 : SlruKind::MultiXactOffsets,
1502 : ] {
1503 498 : let slrudir_key = slru_dir_to_key(kind);
1504 498 : result.add_key(slrudir_key);
1505 498 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1506 498 : let dir = SlruSegmentDirectory::des(&buf)?;
1507 498 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1508 498 : segments.sort_unstable();
1509 498 : for segno in segments {
1510 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1511 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1512 0 : let segsize = buf.get_u32_le();
1513 :
1514 0 : result.add_range(
1515 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1516 : );
1517 0 : result.add_key(segsize_key);
1518 : }
1519 : }
1520 3 : }
1521 :
1522 : // Then pg_twophase
1523 169 : result.add_key(TWOPHASEDIR_KEY);
1524 :
1525 169 : let mut xids: Vec<u64> = self
1526 169 : .list_twophase_files(lsn, ctx)
1527 169 : .await?
1528 169 : .iter()
1529 169 : .cloned()
1530 169 : .collect();
1531 169 : xids.sort_unstable();
1532 169 : for xid in xids {
1533 0 : result.add_key(twophase_file_key(xid));
1534 0 : }
1535 :
1536 169 : result.add_key(CONTROLFILE_KEY);
1537 169 : result.add_key(CHECKPOINT_KEY);
1538 :
1539 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1540 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1541 : // and the keys will not be garbage-colllected.
1542 : // and the keys will not be garbage-collected.
1543 : {
1544 169 : let guard = self.extra_test_dense_keyspace.load();
1545 169 : for kr in &guard.ranges {
1546 0 : result.add_range(kr.clone());
1547 0 : }
1548 : }
1549 :
1550 169 : let dense_keyspace = result.to_keyspace();
1551 169 : let sparse_keyspace = SparseKeySpace(KeySpace {
1552 169 : ranges: vec![
1553 169 : Key::metadata_aux_key_range(),
1554 169 : repl_origin_key_range(),
1555 169 : Key::rel_dir_sparse_key_range(),
1556 169 : ],
1557 169 : });
1558 :
1559 169 : if cfg!(debug_assertions) {
1560 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1561 :
1562 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1563 : // category of sparse keys are split into their own image/delta files. If there
1564 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1565 : // and we want the developer to keep the keyspaces separated.
1566 :
1567 169 : let ranges = &sparse_keyspace.0.ranges;
1568 :
1569 : // TODO: use a single overlaps_with across the codebase
1570 507 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1571 507 : !(a.end <= b.start || b.end <= a.start)
1572 507 : }
1573 507 : for i in 0..ranges.len() {
1574 507 : for j in 0..i {
1575 507 : if overlaps_with(&ranges[i], &ranges[j]) {
1576 0 : panic!(
1577 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1578 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1579 : );
1580 507 : }
1581 : }
1582 : }
1583 338 : for i in 1..ranges.len() {
1584 338 : assert!(
1585 338 : ranges[i - 1].end <= ranges[i].start,
1586 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1587 0 : ranges[i - 1].start,
1588 0 : ranges[i - 1].end,
1589 0 : ranges[i].start,
1590 0 : ranges[i].end
1591 : );
1592 : }
1593 0 : }
1594 :
1595 169 : Ok((dense_keyspace, sparse_keyspace))
1596 169 : }
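       : // Editor's summary (not part of the original source): the dense keyspace collected
       : // above consists of DBDIR_KEY, per-database relmap and reldir keys, per-relation
       : // size keys plus their block ranges, SLRU dir/size/block keys (shard zero only),
       : // TWOPHASEDIR_KEY and twophase file keys, CONTROLFILE_KEY and CHECKPOINT_KEY.
       : // The sparse keyspace covers the aux file, replication origin, and sparse reldir
       : // ranges, kept as separate ranges so each category gets its own image/delta files.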
1597 :
1598 : /// Get the cached size of a relation. There are two caches: one for primary updates, which captures the latest
1599 : /// state of the timeline, and a snapshot cache, whose key includes the LSN and which can therefore be used by
1600 : /// replicas to get the relation size at a particular LSN (snapshot).
1601 224270 : pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
1602 224270 : let lsn = version.get_lsn();
1603 : {
1604 224270 : let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
1605 224270 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
1606 224259 : if lsn >= *cached_lsn {
1607 221686 : RELSIZE_LATEST_CACHE_HITS.inc();
1608 221686 : return Some(*nblocks);
1609 2573 : }
1610 2573 : RELSIZE_CACHE_MISSES_OLD.inc();
1611 11 : }
1612 : }
1613 : {
1614 2584 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1615 2584 : if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
1616 2563 : RELSIZE_SNAPSHOT_CACHE_HITS.inc();
1617 2563 : return Some(*nblock);
1618 21 : }
1619 : }
1620 21 : if version.is_latest() {
1621 10 : RELSIZE_LATEST_CACHE_MISSES.inc();
1622 11 : } else {
1623 11 : RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
1624 11 : }
1625 21 : None
1626 224270 : }
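       : // Editor's illustrative sketch (not part of the original source): the intended
       : // lookup order for callers. The latest cache answers reads at or after its cached
       : // LSN; older reads (e.g. from replicas) fall back to the snapshot cache keyed by
       : // (lsn, tag):
       : //
       : //     if let Some(nblocks) = timeline.get_cached_rel_size(&rel, Version::at(lsn)) {
       : //         return Ok(nblocks); // served from cache, no layer read needed
       : //     }
       : //     // miss: read rel_size_to_key(rel) at `lsn` and repopulate via update_cached_rel_size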
1627 :
1628 : /// Update cached relation size if there is no more recent update
1629 5 : pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
1630 5 : let lsn = version.get_lsn();
1631 5 : if version.is_latest() {
1632 0 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1633 0 : match rel_size_cache.entry(tag) {
1634 0 : hash_map::Entry::Occupied(mut entry) => {
1635 0 : let cached_lsn = entry.get_mut();
1636 0 : if lsn >= cached_lsn.0 {
1637 0 : *cached_lsn = (lsn, nblocks);
1638 0 : }
1639 : }
1640 0 : hash_map::Entry::Vacant(entry) => {
1641 0 : entry.insert((lsn, nblocks));
1642 0 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1643 0 : }
1644 : }
1645 : } else {
1646 5 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1647 5 : if rel_size_cache.capacity() != 0 {
1648 5 : rel_size_cache.insert((lsn, tag), nblocks);
1649 5 : RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
1650 5 : }
1651 : }
1652 5 : }
1653 :
1654 : /// Store cached relation size
1655 141360 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1656 141360 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1657 141360 : if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
1658 960 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1659 140400 : }
1660 141360 : }
1661 :
1662 : /// Remove cached relation size
1663 1 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1664 1 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1665 1 : if rel_size_cache.remove(tag).is_some() {
1666 1 : RELSIZE_LATEST_CACHE_ENTRIES.dec();
1667 1 : }
1668 1 : }
1669 : }
1670 :
1671 : /// DatadirModification represents an operation to ingest an atomic set of
1672 : /// updates to the repository.
1673 : ///
1674 : /// It is created by the 'begin_record' function. It is called for each WAL
1675 : /// record, so that all the modifications by one WAL record appear atomic.
1676 : pub struct DatadirModification<'a> {
1677 : /// The timeline this modification applies to. You can access this to
1678 : /// read the state, but note that any pending updates are *not* reflected
1679 : /// in the state in 'tline' yet.
1680 : pub tline: &'a Timeline,
1681 :
1682 : /// Current LSN of the modification
1683 : lsn: Lsn,
1684 :
1685 : // The modifications are not applied directly to the underlying key-value store.
1686 : // The put-functions add the modifications here, and they are flushed to the
1687 : // underlying key-value store by the 'commit' function.
1688 : pending_lsns: Vec<Lsn>,
1689 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1690 : pending_nblocks: i64,
1691 :
1692 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1693 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1694 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1695 :
1696 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1697 : /// which keys are stored here.
1698 : pending_data_batch: Option<SerializedValueBatch>,
1699 :
1700 : /// For special "directory" keys that store key-value maps, track the size of the map
1701 : /// if it was updated in this modification.
1702 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1703 :
1704 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1705 : pending_metadata_bytes: usize,
1706 :
1707 : /// Whether we are importing a pgdata directory.
1708 : is_importing_pgdata: bool,
1709 : }
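       : // Editor's sketch of the intended lifecycle (not part of the original source; the
       : // constructor is the 'begin_record' entry point named in the doc comment above and
       : // its exact signature may differ):
       : //
       : //     let mut m = timeline.begin_record(lsn);   // hypothetical constructor name
       : //     m.put_rel_page_image(rel, blkno, img)?;   // buffer this record's effects
       : //     m.set_lsn(next_lsn)?;                     // advance within the same batch
       : //     m.commit(ctx).await?;                     // stamp pending LSNs and flush atomically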
1710 :
1711 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1712 : pub enum MetricsUpdate {
1713 : /// Set the metrics to this value
1714 : Set(u64),
1715 : /// Increment the metrics by this value
1716 : Add(u64),
1717 : /// Decrement the metrics by this value
1718 : Sub(u64),
1719 : }
1720 :
1721 : /// Controls the behavior of the reldir keyspace.
1722 : pub struct RelDirMode {
1723 : // Whether we can read the v2 keyspace or not.
1724 : current_status: RelSizeMigration,
1725 : // Whether we should initialize the v2 keyspace or not.
1726 : initialize: bool,
1727 : }
1728 :
1729 : impl DatadirModification<'_> {
1730 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1731 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1732 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1733 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
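       : // Editor's illustrative sketch (not part of the original source): how ingest code
       : // is expected to combine this limit with `approx_pending_bytes()`:
       : //
       : //     if modification.approx_pending_bytes() >= DatadirModification::MAX_PENDING_BYTES {
       : //         modification.commit(ctx).await?; // bound the size of the monolithic serialization
       : //     }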
1734 :
1735 : /// Get the current lsn
1736 1 : pub(crate) fn get_lsn(&self) -> Lsn {
1737 1 : self.lsn
1738 1 : }
1739 :
1740 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1741 0 : self.pending_data_batch
1742 0 : .as_ref()
1743 0 : .map_or(0, |b| b.buffer_size())
1744 0 : + self.pending_metadata_bytes
1745 0 : }
1746 :
1747 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1748 0 : self.pending_data_batch
1749 0 : .as_ref()
1750 0 : .is_some_and(|b| b.has_data())
1751 0 : }
1752 :
1753 : /// Returns statistics about the currently pending modifications.
1754 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1755 0 : let mut stats = DatadirModificationStats::default();
1756 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1757 0 : match value {
1758 0 : Value::Image(_) => stats.metadata_images += 1,
1759 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1760 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1761 : }
1762 : }
1763 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1764 0 : match valuemeta {
1765 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1766 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1767 0 : ValueMeta::Observed(_) => {}
1768 : }
1769 : }
1770 0 : stats
1771 0 : }
1772 :
1773 : /// Set the current lsn
1774 72929 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1775 72929 : ensure_walingest!(
1776 : lsn >= self.lsn,
1777 : "setting an older lsn {} than {} is not allowed",
1778 : lsn,
1779 : self.lsn
1780 : );
1781 :
1782 72929 : if lsn > self.lsn {
1783 72929 : self.pending_lsns.push(self.lsn);
1784 72929 : self.lsn = lsn;
1785 72929 : }
1786 72929 : Ok(())
1787 72929 : }
1788 :
1789 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1790 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1791 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1792 : ///
1793 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1794 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1795 : /// via [`Self::get`] as soon as their record has been ingested.
1796 425371 : fn is_data_key(key: &Key) -> bool {
1797 425371 : key.is_rel_block_key() || key.is_slru_block_key()
1798 425371 : }
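       : // Editor's example (not part of the original source): rel_block_to_key(rel, blkno)
       : // and slru_block_to_key(kind, segno, blkno) are data keys and take the fast path via
       : // put_data; directory keys such as DBDIR_KEY or rel_size_to_key(rel) are metadata and
       : // remain readable through Self::get before the modification is committed.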
1799 :
1800 : /// Initialize a completely new repository.
1801 : ///
1802 : /// This inserts the directory metadata entries that are assumed to
1803 : /// always exist.
1804 112 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1805 112 : let buf = DbDirectory::ser(&DbDirectory {
1806 112 : dbdirs: HashMap::new(),
1807 112 : })?;
1808 112 : self.pending_directory_entries
1809 112 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1810 112 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1811 :
1812 112 : let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
1813 99 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1814 99 : xids: HashSet::new(),
1815 99 : })
1816 : } else {
1817 13 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1818 13 : xids: HashSet::new(),
1819 13 : })
1820 0 : }?;
1821 112 : self.pending_directory_entries
1822 112 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1823 112 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1824 :
1825 112 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1826 112 : let empty_dir = Value::Image(buf);
1827 :
1828 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1829 : // harmless but they'd just be dropped on later compaction.
1830 112 : if self.tline.tenant_shard_id.is_shard_zero() {
1831 109 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1832 109 : self.pending_directory_entries.push((
1833 109 : DirectoryKind::SlruSegment(SlruKind::Clog),
1834 109 : MetricsUpdate::Set(0),
1835 109 : ));
1836 109 : self.put(
1837 109 : slru_dir_to_key(SlruKind::MultiXactMembers),
1838 109 : empty_dir.clone(),
1839 109 : );
1840 109 : self.pending_directory_entries.push((
1841 109 : DirectoryKind::SlruSegment(SlruKind::Clog),
1842 109 : MetricsUpdate::Set(0),
1843 109 : ));
1844 109 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1845 109 : self.pending_directory_entries.push((
1846 109 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1847 109 : MetricsUpdate::Set(0),
1848 109 : ));
1849 109 : }
1850 :
1851 112 : Ok(())
1852 112 : }
1853 :
1854 : #[cfg(test)]
1855 111 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1856 111 : self.init_empty()?;
1857 111 : self.put_control_file(bytes::Bytes::from_static(
1858 111 : b"control_file contents do not matter",
1859 : ))
1860 111 : .context("put_control_file")?;
1861 111 : self.put_checkpoint(bytes::Bytes::from_static(
1862 111 : b"checkpoint_file contents do not matter",
1863 : ))
1864 111 : .context("put_checkpoint_file")?;
1865 111 : Ok(())
1866 111 : }
1867 :
1868 : /// Creates a relation if it is not already present.
1869 : /// Returns the current size of the relation
1870 209028 : pub(crate) async fn create_relation_if_required(
1871 209028 : &mut self,
1872 209028 : rel: RelTag,
1873 209028 : ctx: &RequestContext,
1874 209028 : ) -> Result<u32, WalIngestError> {
1875 : // Get current size and put rel creation if rel doesn't exist
1876 : //
1877 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1878 : // check the cache too. This is because eagerly checking the cache results in
1879 : // less work overall and 10% better performance. It's more work on cache miss
1880 : // but cache miss is rare.
1881 209028 : if let Some(nblocks) = self
1882 209028 : .tline
1883 209028 : .get_cached_rel_size(&rel, Version::Modified(self))
1884 : {
1885 209023 : Ok(nblocks)
1886 5 : } else if !self
1887 5 : .tline
1888 5 : .get_rel_exists(rel, Version::Modified(self), ctx)
1889 5 : .await?
1890 : {
1891 : // create it with 0 size initially, the logic below will extend it
1892 5 : self.put_rel_creation(rel, 0, ctx).await?;
1893 5 : Ok(0)
1894 : } else {
1895 0 : Ok(self
1896 0 : .tline
1897 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1898 0 : .await?)
1899 : }
1900 209028 : }
1901 :
1902 : /// Given a block number for a relation (which represents a newly written block),
1903 : /// the previous block count of the relation, and the shard info, find the gaps
1904 : /// that were created by the newly written block if any.
1905 72835 : fn find_gaps(
1906 72835 : rel: RelTag,
1907 72835 : blkno: u32,
1908 72835 : previous_nblocks: u32,
1909 72835 : shard: &ShardIdentity,
1910 72835 : ) -> Option<KeySpace> {
1911 72835 : let mut key = rel_block_to_key(rel, blkno);
1912 72835 : let mut gap_accum = None;
1913 :
1914 72835 : for gap_blkno in previous_nblocks..blkno {
1915 16 : key.field6 = gap_blkno;
1916 :
1917 16 : if shard.get_shard_number(&key) != shard.number {
1918 4 : continue;
1919 12 : }
1920 :
1921 12 : gap_accum
1922 12 : .get_or_insert_with(KeySpaceAccum::new)
1923 12 : .add_key(key);
1924 : }
1925 :
1926 72835 : gap_accum.map(|accum| accum.to_keyspace())
1927 72835 : }
1928 :
1929 72926 : pub async fn ingest_batch(
1930 72926 : &mut self,
1931 72926 : mut batch: SerializedValueBatch,
1932 72926 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1933 72926 : shard: &ShardIdentity,
1934 72926 : ctx: &RequestContext,
1935 72926 : ) -> Result<(), WalIngestError> {
1936 72926 : let mut gaps_at_lsns = Vec::default();
1937 :
1938 72926 : for meta in batch.metadata.iter() {
1939 72821 : let key = Key::from_compact(meta.key());
1940 72821 : let (rel, blkno) = key
1941 72821 : .to_rel_block()
1942 72821 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1943 72821 : let new_nblocks = blkno + 1;
1944 :
1945 72821 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1946 72821 : if new_nblocks > old_nblocks {
1947 1195 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1948 71626 : }
1949 :
1950 72821 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1951 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1952 72821 : }
1953 : }
1954 :
1955 72926 : if !gaps_at_lsns.is_empty() {
1956 0 : batch.zero_gaps(gaps_at_lsns);
1957 72926 : }
1958 :
1959 72926 : match self.pending_data_batch.as_mut() {
1960 10 : Some(pending_batch) => {
1961 10 : pending_batch.extend(batch);
1962 10 : }
1963 72916 : None if batch.has_data() => {
1964 72815 : self.pending_data_batch = Some(batch);
1965 72815 : }
1966 101 : None => {
1967 101 : // Nothing to initialize the batch with
1968 101 : }
1969 : }
1970 :
1971 72926 : Ok(())
1972 72926 : }
1973 :
1974 : /// Put a new page version that can be constructed from a WAL record
1975 : ///
1976 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1977 : /// current end-of-file. It's up to the caller to check that the relation size
1978 : /// matches the blocks inserted!
1979 6 : pub fn put_rel_wal_record(
1980 6 : &mut self,
1981 6 : rel: RelTag,
1982 6 : blknum: BlockNumber,
1983 6 : rec: NeonWalRecord,
1984 6 : ) -> Result<(), WalIngestError> {
1985 6 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1986 6 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1987 6 : Ok(())
1988 6 : }
1989 :
1990 : // Same, but for an SLRU.
1991 4 : pub fn put_slru_wal_record(
1992 4 : &mut self,
1993 4 : kind: SlruKind,
1994 4 : segno: u32,
1995 4 : blknum: BlockNumber,
1996 4 : rec: NeonWalRecord,
1997 4 : ) -> Result<(), WalIngestError> {
1998 4 : if !self.tline.tenant_shard_id.is_shard_zero() {
1999 0 : return Ok(());
2000 4 : }
2001 :
2002 4 : self.put(
2003 4 : slru_block_to_key(kind, segno, blknum),
2004 4 : Value::WalRecord(rec),
2005 : );
2006 4 : Ok(())
2007 4 : }
2008 :
2009 : /// Like put_rel_wal_record, but with a ready-made image of the page.
2010 138921 : pub fn put_rel_page_image(
2011 138921 : &mut self,
2012 138921 : rel: RelTag,
2013 138921 : blknum: BlockNumber,
2014 138921 : img: Bytes,
2015 138921 : ) -> Result<(), WalIngestError> {
2016 138921 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2017 138921 : let key = rel_block_to_key(rel, blknum);
2018 138921 : if !key.is_valid_key_on_write_path() {
2019 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2020 138921 : }
2021 138921 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
2022 138921 : Ok(())
2023 138921 : }
2024 :
2025 3 : pub fn put_slru_page_image(
2026 3 : &mut self,
2027 3 : kind: SlruKind,
2028 3 : segno: u32,
2029 3 : blknum: BlockNumber,
2030 3 : img: Bytes,
2031 3 : ) -> Result<(), WalIngestError> {
2032 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2033 :
2034 3 : let key = slru_block_to_key(kind, segno, blknum);
2035 3 : if !key.is_valid_key_on_write_path() {
2036 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2037 3 : }
2038 3 : self.put(key, Value::Image(img));
2039 3 : Ok(())
2040 3 : }
2041 :
2042 1499 : pub(crate) fn put_rel_page_image_zero(
2043 1499 : &mut self,
2044 1499 : rel: RelTag,
2045 1499 : blknum: BlockNumber,
2046 1499 : ) -> Result<(), WalIngestError> {
2047 1499 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2048 1499 : let key = rel_block_to_key(rel, blknum);
2049 1499 : if !key.is_valid_key_on_write_path() {
2050 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2051 1499 : }
2052 :
2053 1499 : let batch = self
2054 1499 : .pending_data_batch
2055 1499 : .get_or_insert_with(SerializedValueBatch::default);
2056 :
2057 1499 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
2058 :
2059 1499 : Ok(())
2060 1499 : }
2061 :
2062 0 : pub(crate) fn put_slru_page_image_zero(
2063 0 : &mut self,
2064 0 : kind: SlruKind,
2065 0 : segno: u32,
2066 0 : blknum: BlockNumber,
2067 0 : ) -> Result<(), WalIngestError> {
2068 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2069 0 : let key = slru_block_to_key(kind, segno, blknum);
2070 0 : if !key.is_valid_key_on_write_path() {
2071 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2072 0 : }
2073 :
2074 0 : let batch = self
2075 0 : .pending_data_batch
2076 0 : .get_or_insert_with(SerializedValueBatch::default);
2077 :
2078 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
2079 :
2080 0 : Ok(())
2081 0 : }
2082 :
2083 : /// Returns a [`RelDirMode`] indicating whether the rel_size_v2 write path is enabled. If it is the first time that
2084 : /// we enable it, we also need to persist it in `index_part.json` (`initialize` is true).
2085 : ///
2086 : /// As this function is only used on the write path, we do not need to read the migrated_at
2087 : /// field.
2088 973 : pub fn maybe_enable_rel_size_v2(&mut self, is_create: bool) -> anyhow::Result<RelDirMode> {
2089 : // TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
2090 :
2091 973 : let (status, _) = self.tline.get_rel_size_v2_status();
2092 973 : let config = self.tline.get_rel_size_v2_enabled();
2093 973 : match (config, status) {
2094 : (false, RelSizeMigration::Legacy) => {
2095 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
2096 973 : Ok(RelDirMode {
2097 973 : current_status: RelSizeMigration::Legacy,
2098 973 : initialize: false,
2099 973 : })
2100 : }
2101 0 : (false, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
2102 : // index_part already persisted that the timeline has enabled rel_size_v2
2103 0 : Ok(RelDirMode {
2104 0 : current_status: status,
2105 0 : initialize: false,
2106 0 : })
2107 : }
2108 : (true, RelSizeMigration::Legacy) => {
2109 : // The first time we enable it, we need to persist it in `index_part.json`
2110 : // The caller should update the reldir status once the initialization is done.
2111 : //
2112 : // Only initialize the v2 keyspace on new relation creation. No initialization
2113 : // during `timeline_create` (TODO: fix this, we should allow, but currently it
2114 : // hits consistency issues).
2115 : Ok(RelDirMode {
2116 0 : current_status: RelSizeMigration::Legacy,
2117 0 : initialize: is_create && !self.is_importing_pgdata,
2118 : })
2119 : }
2120 0 : (true, status @ RelSizeMigration::Migrating | status @ RelSizeMigration::Migrated) => {
2121 : // index_part already persisted that the timeline has enabled rel_size_v2
2122 : // and we don't need to do anything
2123 0 : Ok(RelDirMode {
2124 0 : current_status: status,
2125 0 : initialize: false,
2126 0 : })
2127 : }
2128 : }
2129 973 : }
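       : // Editor's summary of the (config, status) matrix above (not part of the original source):
       : //
       : //     config=false, Legacy              -> Legacy, initialize=false (v1 only)
       : //     config=false, Migrating|Migrated  -> keep persisted status, initialize=false
       : //     config=true,  Legacy              -> Legacy, initialize = is_create && !is_importing_pgdata
       : //     config=true,  Migrating|Migrated  -> keep persisted status, initialize=false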
2130 :
2131 : /// Store a relmapper file (pg_filenode.map) in the repository
2132 8 : pub async fn put_relmap_file(
2133 8 : &mut self,
2134 8 : spcnode: Oid,
2135 8 : dbnode: Oid,
2136 8 : img: Bytes,
2137 8 : ctx: &RequestContext,
2138 8 : ) -> Result<(), WalIngestError> {
2139 8 : let v2_mode = self
2140 8 : .maybe_enable_rel_size_v2(false)
2141 8 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2142 :
2143 : // Add it to the directory (if it doesn't exist already)
2144 8 : let buf = self.get(DBDIR_KEY, ctx).await?;
2145 8 : let mut dbdir = DbDirectory::des(&buf)?;
2146 :
2147 8 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
2148 8 : if r.is_none() || r == Some(false) {
2149 : // The dbdir entry didn't exist, or it contained a
2150 : // 'false'. The 'insert' call already updated it with
2151 : // 'true', now write the updated 'dbdirs' map back.
2152 8 : let buf = DbDirectory::ser(&dbdir)?;
2153 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2154 0 : }
2155 8 : if r.is_none() {
2156 4 : if v2_mode.current_status != RelSizeMigration::Legacy {
2157 0 : self.pending_directory_entries
2158 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2159 4 : }
2160 :
2161 : // Create RelDirectory in v1 keyspace. TODO: if we have fully migrated to v2, no need to create this directory.
2162 : // Some code paths rely on this directory being present. We should remove it once we start to set tenants to the
2163 : // `RelSizeMigration::Migrated` state (currently we don't; all tenants will have `RelSizeMigration::Migrating`).
2164 4 : let buf = RelDirectory::ser(&RelDirectory {
2165 4 : rels: HashSet::new(),
2166 4 : })?;
2167 4 : self.pending_directory_entries
2168 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2169 4 : self.put(
2170 4 : rel_dir_to_key(spcnode, dbnode),
2171 4 : Value::Image(Bytes::from(buf)),
2172 : );
2173 4 : }
2174 :
2175 8 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
2176 8 : Ok(())
2177 8 : }
2178 :
2179 0 : pub async fn put_twophase_file(
2180 0 : &mut self,
2181 0 : xid: u64,
2182 0 : img: Bytes,
2183 0 : ctx: &RequestContext,
2184 0 : ) -> Result<(), WalIngestError> {
2185 : // Add it to the directory entry
2186 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2187 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2188 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
2189 0 : if !dir.xids.insert(xid) {
2190 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
2191 0 : }
2192 0 : self.pending_directory_entries.push((
2193 0 : DirectoryKind::TwoPhase,
2194 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2195 0 : ));
2196 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2197 : } else {
2198 0 : let xid = xid as u32;
2199 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
2200 0 : if !dir.xids.insert(xid) {
2201 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
2202 0 : }
2203 0 : self.pending_directory_entries.push((
2204 0 : DirectoryKind::TwoPhase,
2205 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2206 0 : ));
2207 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2208 : };
2209 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2210 :
2211 0 : self.put(twophase_file_key(xid), Value::Image(img));
2212 0 : Ok(())
2213 0 : }
2214 :
2215 1 : pub async fn set_replorigin(
2216 1 : &mut self,
2217 1 : origin_id: RepOriginId,
2218 1 : origin_lsn: Lsn,
2219 1 : ) -> Result<(), WalIngestError> {
2220 1 : let key = repl_origin_key(origin_id);
2221 1 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
2222 1 : Ok(())
2223 1 : }
2224 :
2225 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
2226 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
2227 0 : }
2228 :
2229 112 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2230 112 : self.put(CONTROLFILE_KEY, Value::Image(img));
2231 112 : Ok(())
2232 112 : }
2233 :
2234 119 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
2235 119 : self.put(CHECKPOINT_KEY, Value::Image(img));
2236 119 : Ok(())
2237 119 : }
2238 :
2239 0 : pub async fn drop_dbdir(
2240 0 : &mut self,
2241 0 : spcnode: Oid,
2242 0 : dbnode: Oid,
2243 0 : ctx: &RequestContext,
2244 0 : ) -> Result<(), WalIngestError> {
2245 0 : let total_blocks = self
2246 0 : .tline
2247 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
2248 0 : .await?;
2249 :
2250 : // Remove entry from dbdir
2251 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
2252 0 : let mut dir = DbDirectory::des(&buf)?;
2253 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
2254 0 : let buf = DbDirectory::ser(&dir)?;
2255 0 : self.pending_directory_entries.push((
2256 0 : DirectoryKind::Db,
2257 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
2258 0 : ));
2259 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2260 : } else {
2261 0 : warn!(
2262 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
2263 : spcnode, dbnode
2264 : );
2265 : }
2266 :
2267 : // Update logical database size.
2268 0 : self.pending_nblocks -= total_blocks as i64;
2269 :
2270 : // Delete all relations and metadata files for the spcnode/dnode
2271 0 : self.delete(dbdir_key_range(spcnode, dbnode));
2272 0 : Ok(())
2273 0 : }
2274 :
2275 0 : async fn initialize_rel_size_v2_keyspace(
2276 0 : &mut self,
2277 0 : ctx: &RequestContext,
2278 0 : dbdir: &DbDirectory,
2279 0 : ) -> Result<(), WalIngestError> {
2280 : // Copy everything from relv1 to relv2; TODO: check if there's any key in the v2 keyspace, if so, abort.
2281 0 : tracing::info!("initializing rel_size_v2 keyspace");
2282 0 : let mut rel_cnt = 0;
2283 : // relmap_exists (the value of the dbdirs hashmap) does not affect the migration: we need to copy things over anyway
2284 0 : for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
2285 0 : let rel_dir_key = rel_dir_to_key(spcnode, dbnode);
2286 0 : let rel_dir = RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?;
2287 0 : for (relnode, forknum) in rel_dir.rels {
2288 0 : let sparse_rel_dir_key = rel_tag_sparse_key(spcnode, dbnode, relnode, forknum);
2289 0 : self.put(
2290 0 : sparse_rel_dir_key,
2291 0 : Value::Image(RelDirExists::Exists.encode()),
2292 : );
2293 0 : tracing::info!(
2294 0 : "migrated rel_size_v2: {}",
2295 0 : RelTag {
2296 0 : spcnode,
2297 0 : dbnode,
2298 0 : relnode,
2299 0 : forknum
2300 0 : }
2301 : );
2302 0 : rel_cnt += 1;
2303 : }
2304 : }
2305 0 : tracing::info!(
2306 0 : "initialized rel_size_v2 keyspace at lsn {}: migrated {} relations",
2307 : self.lsn,
2308 : rel_cnt
2309 : );
2310 0 : self.tline
2311 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
2312 0 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2313 0 : Ok::<_, WalIngestError>(())
2314 0 : }
2315 :
2316 960 : async fn put_rel_creation_v1(
2317 960 : &mut self,
2318 960 : rel: RelTag,
2319 960 : dbdir_exists: bool,
2320 960 : ctx: &RequestContext,
2321 960 : ) -> Result<(), WalIngestError> {
2322 : // Reldir v1 write path
2323 960 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2324 960 : let mut rel_dir = if !dbdir_exists {
2325 : // Create the RelDirectory
2326 4 : RelDirectory::default()
2327 : } else {
2328 : // reldir already exists, fetch it
2329 956 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
2330 : };
2331 :
2332 : // Add the new relation to the rel directory entry, and write it back
2333 960 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2334 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2335 960 : }
2336 960 : if !dbdir_exists {
2337 4 : self.pending_directory_entries
2338 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2339 956 : }
2340 960 : self.pending_directory_entries
2341 960 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2342 960 : self.put(
2343 960 : rel_dir_key,
2344 960 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2345 : );
2346 960 : Ok(())
2347 960 : }
2348 :
2349 0 : async fn put_rel_creation_v2(
2350 0 : &mut self,
2351 0 : rel: RelTag,
2352 0 : dbdir_exists: bool,
2353 0 : ctx: &RequestContext,
2354 0 : ) -> Result<(), WalIngestError> {
2355 : // Reldir v2 write path
2356 0 : let sparse_rel_dir_key =
2357 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2358 : // check if the rel_dir_key exists in v2
2359 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2360 0 : let val = RelDirExists::decode_option(val)
2361 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2362 0 : if val == RelDirExists::Exists {
2363 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2364 0 : }
2365 0 : self.put(
2366 0 : sparse_rel_dir_key,
2367 0 : Value::Image(RelDirExists::Exists.encode()),
2368 : );
2369 0 : if !dbdir_exists {
2370 0 : self.pending_directory_entries
2371 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2372 0 : }
2373 0 : self.pending_directory_entries
2374 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2375 0 : Ok(())
2376 0 : }
2377 :
2378 : /// Create a relation fork.
2379 : ///
2380 : /// 'nblocks' is the initial size.
2381 960 : pub async fn put_rel_creation(
2382 960 : &mut self,
2383 960 : rel: RelTag,
2384 960 : nblocks: BlockNumber,
2385 960 : ctx: &RequestContext,
2386 960 : ) -> Result<(), WalIngestError> {
2387 960 : if rel.relnode == 0 {
2388 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
2389 0 : "invalid relnode"
2390 0 : )))?;
2391 960 : }
2392 : // It's possible that this is the first rel for this db in this
2393 : // tablespace. Create the reldir entry for it if so.
2394 960 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
2395 :
2396 960 : let dbdir_exists =
2397 960 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
2398 : // Didn't exist. Update dbdir
2399 4 : e.insert(false);
2400 4 : let buf = DbDirectory::ser(&dbdir)?;
2401 4 : self.pending_directory_entries.push((
2402 4 : DirectoryKind::Db,
2403 4 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
2404 4 : ));
2405 4 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2406 4 : false
2407 : } else {
2408 956 : true
2409 : };
2410 :
2411 960 : let mut v2_mode = self
2412 960 : .maybe_enable_rel_size_v2(true)
2413 960 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2414 :
2415 960 : if v2_mode.initialize {
2416 0 : if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
2417 0 : tracing::warn!("error initializing rel_size_v2 keyspace: {}", e);
2418 : // TODO: circuit breaker so that it won't retry forever
2419 0 : } else {
2420 0 : v2_mode.current_status = RelSizeMigration::Migrating;
2421 0 : }
2422 960 : }
2423 :
2424 960 : if v2_mode.current_status != RelSizeMigration::Migrated {
2425 960 : self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
2426 0 : }
2427 :
2428 960 : if v2_mode.current_status != RelSizeMigration::Legacy {
2429 0 : let write_v2_res = self.put_rel_creation_v2(rel, dbdir_exists, ctx).await;
2430 0 : if let Err(e) = write_v2_res {
2431 0 : if v2_mode.current_status == RelSizeMigration::Migrated {
2432 0 : return Err(e);
2433 0 : }
2434 0 : tracing::warn!("error writing rel_size_v2 keyspace: {}", e);
2435 0 : }
2436 960 : }
2437 :
2438 : // Put size
2439 960 : let size_key = rel_size_to_key(rel);
2440 960 : let buf = nblocks.to_le_bytes();
2441 960 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2442 :
2443 960 : self.pending_nblocks += nblocks as i64;
2444 :
2445 : // Update relation size cache
2446 960 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2447 :
2448 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2449 : // caller.
2450 960 : Ok(())
2451 960 : }
2452 :
2453 : /// Truncate relation
2454 3006 : pub async fn put_rel_truncation(
2455 3006 : &mut self,
2456 3006 : rel: RelTag,
2457 3006 : nblocks: BlockNumber,
2458 3006 : ctx: &RequestContext,
2459 3006 : ) -> Result<(), WalIngestError> {
2460 3006 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2461 3006 : if self
2462 3006 : .tline
2463 3006 : .get_rel_exists(rel, Version::Modified(self), ctx)
2464 3006 : .await?
2465 : {
2466 3006 : let size_key = rel_size_to_key(rel);
2467 : // Fetch the old size first
2468 3006 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2469 :
2470 : // Update the entry with the new size.
2471 3006 : let buf = nblocks.to_le_bytes();
2472 3006 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2473 :
2474 : // Update relation size cache
2475 3006 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2476 :
2477 : // Update logical database size.
2478 3006 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2479 0 : }
2480 3006 : Ok(())
2481 3006 : }
2482 :
2483 : /// Extend relation
2484 : /// If new size is smaller, do nothing.
2485 138340 : pub async fn put_rel_extend(
2486 138340 : &mut self,
2487 138340 : rel: RelTag,
2488 138340 : nblocks: BlockNumber,
2489 138340 : ctx: &RequestContext,
2490 138340 : ) -> Result<(), WalIngestError> {
2491 138340 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2492 :
2493 : // Put size
2494 138340 : let size_key = rel_size_to_key(rel);
2495 138340 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2496 :
2497 : // only extend relation here. never decrease the size
2498 138340 : if nblocks > old_size {
2499 137394 : let buf = nblocks.to_le_bytes();
2500 137394 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2501 137394 :
2502 137394 : // Update relation size cache
2503 137394 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2504 137394 :
2505 137394 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2506 137394 : }
2507 138340 : Ok(())
2508 138340 : }
2509 :
2510 5 : async fn put_rel_drop_v1(
2511 5 : &mut self,
2512 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2513 5 : ctx: &RequestContext,
2514 5 : ) -> Result<BTreeSet<RelTag>, WalIngestError> {
2515 5 : let mut dropped_rels = BTreeSet::new();
2516 6 : for ((spc_node, db_node), rel_tags) in drop_relations {
2517 1 : let dir_key = rel_dir_to_key(spc_node, db_node);
2518 1 : let buf = self.get(dir_key, ctx).await?;
2519 1 : let mut dir = RelDirectory::des(&buf)?;
2520 :
2521 1 : let mut dirty = false;
2522 2 : for rel_tag in rel_tags {
2523 1 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2524 1 : self.pending_directory_entries
2525 1 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2526 1 : dirty = true;
2527 1 : dropped_rels.insert(rel_tag);
2528 1 : true
2529 : } else {
2530 0 : false
2531 : };
2532 :
2533 1 : if found {
2534 : // update logical size
2535 1 : let size_key = rel_size_to_key(rel_tag);
2536 1 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2537 1 : self.pending_nblocks -= old_size as i64;
2538 :
2539 : // Remove entry from relation size cache
2540 1 : self.tline.remove_cached_rel_size(&rel_tag);
2541 :
2542 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2543 1 : self.delete(rel_key_range(rel_tag));
2544 0 : }
2545 : }
2546 :
2547 1 : if dirty {
2548 1 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2549 0 : }
2550 : }
2551 5 : Ok(dropped_rels)
2552 5 : }
2553 :
2554 0 : async fn put_rel_drop_v2(
2555 0 : &mut self,
2556 0 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2557 0 : ctx: &RequestContext,
2558 0 : ) -> Result<BTreeSet<RelTag>, WalIngestError> {
2559 0 : let mut dropped_rels = BTreeSet::new();
2560 0 : for ((spc_node, db_node), rel_tags) in drop_relations {
2561 0 : for rel_tag in rel_tags {
2562 0 : let key = rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2563 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2564 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2565 0 : if val == RelDirExists::Exists {
2566 0 : dropped_rels.insert(rel_tag);
2567 0 : self.pending_directory_entries
2568 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2569 0 : // put tombstone
2570 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2571 0 : }
2572 : }
2573 : }
2574 0 : Ok(dropped_rels)
2575 0 : }
2576 :
2577 : /// Drop some relations
2578 5 : pub(crate) async fn put_rel_drops(
2579 5 : &mut self,
2580 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2581 5 : ctx: &RequestContext,
2582 5 : ) -> Result<(), WalIngestError> {
2583 5 : let v2_mode = self
2584 5 : .maybe_enable_rel_size_v2(false)
2585 5 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2586 5 : match v2_mode.current_status {
2587 : RelSizeMigration::Legacy => {
2588 5 : self.put_rel_drop_v1(drop_relations, ctx).await?;
2589 : }
2590 : RelSizeMigration::Migrating => {
2591 0 : let dropped_rels_v1 = self.put_rel_drop_v1(drop_relations.clone(), ctx).await?;
2592 0 : let dropped_rels_v2_res = self.put_rel_drop_v2(drop_relations, ctx).await;
2593 0 : match dropped_rels_v2_res {
2594 0 : Ok(dropped_rels_v2) => {
2595 0 : if dropped_rels_v1 != dropped_rels_v2 {
2596 0 : tracing::warn!(
2597 0 : "inconsistent v1/v2 rel drop: dropped_rels_v1.len()={}, dropped_rels_v2.len()={}",
2598 0 : dropped_rels_v1.len(),
2599 0 : dropped_rels_v2.len()
2600 : );
2601 0 : }
2602 : }
2603 0 : Err(e) => {
2604 0 : tracing::warn!("error dropping rels: {}", e);
2605 : }
2606 : }
2607 : }
2608 : RelSizeMigration::Migrated => {
2609 0 : self.put_rel_drop_v2(drop_relations, ctx).await?;
2610 : }
2611 : }
2612 5 : Ok(())
2613 5 : }
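       : // Editor's note (not part of the original source): like creation, drops are dual-written
       : // while the status is Migrating; v1 stays authoritative, and a v2 failure or a v1/v2 mismatch
       : // only logs a warning, whereas in the Migrated state a v2 failure is returned to the caller.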
2614 :
2615 3 : pub async fn put_slru_segment_creation(
2616 3 : &mut self,
2617 3 : kind: SlruKind,
2618 3 : segno: u32,
2619 3 : nblocks: BlockNumber,
2620 3 : ctx: &RequestContext,
2621 3 : ) -> Result<(), WalIngestError> {
2622 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2623 :
2624 : // Add it to the directory entry
2625 3 : let dir_key = slru_dir_to_key(kind);
2626 3 : let buf = self.get(dir_key, ctx).await?;
2627 3 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2628 :
2629 3 : if !dir.segments.insert(segno) {
2630 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2631 3 : }
2632 3 : self.pending_directory_entries.push((
2633 3 : DirectoryKind::SlruSegment(kind),
2634 3 : MetricsUpdate::Set(dir.segments.len() as u64),
2635 3 : ));
2636 3 : self.put(
2637 3 : dir_key,
2638 3 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2639 : );
2640 :
2641 : // Put size
2642 3 : let size_key = slru_segment_size_to_key(kind, segno);
2643 3 : let buf = nblocks.to_le_bytes();
2644 3 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2645 :
2646 : // even if nblocks > 0, we don't insert any actual blocks here
2647 :
2648 3 : Ok(())
2649 3 : }
2650 :
2651 : /// Extend SLRU segment
2652 0 : pub fn put_slru_extend(
2653 0 : &mut self,
2654 0 : kind: SlruKind,
2655 0 : segno: u32,
2656 0 : nblocks: BlockNumber,
2657 0 : ) -> Result<(), WalIngestError> {
2658 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2659 :
2660 : // Put size
2661 0 : let size_key = slru_segment_size_to_key(kind, segno);
2662 0 : let buf = nblocks.to_le_bytes();
2663 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2664 0 : Ok(())
2665 0 : }
2666 :
2667 : /// This method is used for marking truncated SLRU files
2668 0 : pub async fn drop_slru_segment(
2669 0 : &mut self,
2670 0 : kind: SlruKind,
2671 0 : segno: u32,
2672 0 : ctx: &RequestContext,
2673 0 : ) -> Result<(), WalIngestError> {
2674 : // Remove it from the directory entry
2675 0 : let dir_key = slru_dir_to_key(kind);
2676 0 : let buf = self.get(dir_key, ctx).await?;
2677 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2678 :
2679 0 : if !dir.segments.remove(&segno) {
2680 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2681 0 : }
2682 0 : self.pending_directory_entries.push((
2683 0 : DirectoryKind::SlruSegment(kind),
2684 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2685 0 : ));
2686 0 : self.put(
2687 0 : dir_key,
2688 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2689 : );
2690 :
2691 : // Delete size entry, as well as all blocks
2692 0 : self.delete(slru_segment_key_range(kind, segno));
2693 :
2694 0 : Ok(())
2695 0 : }
2696 :
2697 : /// Drop a relmapper file (pg_filenode.map)
2698 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2699 : // TODO
2700 0 : Ok(())
2701 0 : }
2702 :
2703 : /// Drop a two-phase state file: remove it from the directory entry and delete its contents
2704 0 : pub async fn drop_twophase_file(
2705 0 : &mut self,
2706 0 : xid: u64,
2707 0 : ctx: &RequestContext,
2708 0 : ) -> Result<(), WalIngestError> {
2709 : // Remove it from the directory entry
2710 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2711 0 : let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
2712 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2713 :
2714 0 : if !dir.xids.remove(&xid) {
2715 0 : warn!("twophase file for xid {} does not exist", xid);
2716 0 : }
2717 0 : self.pending_directory_entries.push((
2718 0 : DirectoryKind::TwoPhase,
2719 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2720 0 : ));
2721 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2722 : } else {
2723 0 : let xid: u32 = u32::try_from(xid)
2724 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2725 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2726 :
2727 0 : if !dir.xids.remove(&xid) {
2728 0 : warn!("twophase file for xid {} does not exist", xid);
2729 0 : }
2730 0 : self.pending_directory_entries.push((
2731 0 : DirectoryKind::TwoPhase,
2732 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2733 0 : ));
2734 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2735 : };
2736 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2737 :
2738 : // Delete it
2739 0 : self.delete(twophase_key_range(xid));
2740 :
2741 0 : Ok(())
2742 0 : }
2743 :
2744 8 : pub async fn put_file(
2745 8 : &mut self,
2746 8 : path: &str,
2747 8 : content: &[u8],
2748 8 : ctx: &RequestContext,
2749 8 : ) -> Result<(), WalIngestError> {
2750 8 : let key = aux_file::encode_aux_file_key(path);
2751 : // retrieve the key from the engine
2752 8 : let old_val = match self.get(key, ctx).await {
2753 2 : Ok(val) => Some(val),
2754 6 : Err(PageReconstructError::MissingKey(_)) => None,
2755 0 : Err(e) => return Err(e.into()),
2756 : };
2757 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2758 2 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2759 : } else {
2760 6 : Vec::new()
2761 : };
2762 8 : let mut other_files = Vec::with_capacity(files.len());
2763 8 : let mut modifying_file = None;
2764 10 : for file @ (p, content) in files {
2765 2 : if path == p {
2766 2 : assert!(
2767 2 : modifying_file.is_none(),
2768 0 : "duplicated entries found for {path}"
2769 : );
2770 2 : modifying_file = Some(content);
2771 0 : } else {
2772 0 : other_files.push(file);
2773 0 : }
2774 : }
2775 8 : let mut new_files = other_files;
2776 8 : match (modifying_file, content.is_empty()) {
2777 1 : (Some(old_content), false) => {
2778 1 : self.tline
2779 1 : .aux_file_size_estimator
2780 1 : .on_update(old_content.len(), content.len());
2781 1 : new_files.push((path, content));
2782 1 : }
2783 1 : (Some(old_content), true) => {
2784 1 : self.tline
2785 1 : .aux_file_size_estimator
2786 1 : .on_remove(old_content.len());
2787 1 : // not adding the file key to the final `new_files` vec.
2788 1 : }
2789 6 : (None, false) => {
2790 6 : self.tline.aux_file_size_estimator.on_add(content.len());
2791 6 : new_files.push((path, content));
2792 6 : }
2793 : // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2794 : // Compute doesn't know whether a previous version of this file exists, so
2795 : // an attempt to delete a non-existent file can trigger this message.
2796 : // To avoid false alarms, log it as info rather than warning.
2797 0 : (None, true) if path.starts_with("pg_stat/") => {
2798 0 : info!("removing non-existing pg_stat file: {}", path)
2799 : }
2800 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2801 : }
2802 8 : let new_val = aux_file::encode_file_value(&new_files)
2803 8 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2804 8 : self.put(key, Value::Image(new_val.into()));
2805 :
2806 8 : Ok(())
2807 8 : }
2808 :
2809 : ///
2810 : /// Flush changes accumulated so far to the underlying repository.
2811 : ///
2812 : /// Usually, changes made in DatadirModification are atomic, but this allows
2813 : /// you to flush them to the underlying repository before the final `commit`.
2814 : /// That frees up the memory used to hold the pending changes.
2815 : ///
2816 : /// Currently only used during bulk import of a data directory. In that
2817 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2818 : /// whole import fails and the timeline will be deleted anyway.
2819 : /// (Or to be precise, it will be left behind for debugging purposes and
2820 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2821 : ///
2822 : /// Note: A consequence of flushing the pending operations is that they
2823 : /// won't be visible to subsequent operations until `commit`. The function
2824 : /// retains all the metadata, but data pages are flushed. That's again OK
2825 : /// for bulk import, where you are just loading data pages and won't try to
2826 : /// modify the same pages twice.
2827 965 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2828 : // Unless we have accumulated a decent amount of changes, it's not worth it
2829 : // to scan through the pending_updates list.
2830 965 : let pending_nblocks = self.pending_nblocks;
2831 965 : if pending_nblocks < 10000 {
2832 965 : return Ok(());
2833 0 : }
2834 :
2835 0 : let mut writer = self.tline.writer().await;
2836 :
2837 : // Flush relation and SLRU data blocks, keep metadata.
2838 0 : if let Some(batch) = self.pending_data_batch.take() {
2839 0 : tracing::debug!(
2840 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2841 : batch.max_lsn,
2842 0 : self.tline.get_last_record_lsn()
2843 : );
2844 :
2845 : // This bails out on first error without modifying pending_updates.
2846 : // That's Ok, cf this function's doc comment.
2847 0 : writer.put_batch(batch, ctx).await?;
2848 0 : }
2849 :
2850 0 : if pending_nblocks != 0 {
2851 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2852 0 : self.pending_nblocks = 0;
2853 0 : }
2854 :
2855 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2856 0 : writer.update_directory_entries_count(kind, count);
2857 0 : }
2858 :
2859 0 : Ok(())
2860 965 : }
2861 :
2862 : ///
2863 : /// Finish this atomic update, writing all the updated keys to the
2864 : /// underlying timeline.
2865 : /// All the modifications in this atomic update are stamped by the specified LSN.
2866 : ///
2867 371557 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2868 371557 : let mut writer = self.tline.writer().await;
2869 :
2870 371557 : let pending_nblocks = self.pending_nblocks;
2871 371557 : self.pending_nblocks = 0;
2872 :
2873 : // Ordering: the items in this batch do not need to be in any global order, but values for
2874 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2875 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2876 : // more details.
2877 :
2878 371557 : let metadata_batch = {
2879 371557 : let pending_meta = self
2880 371557 : .pending_metadata_pages
2881 371557 : .drain()
2882 371557 : .flat_map(|(key, values)| {
2883 137068 : values
2884 137068 : .into_iter()
2885 137068 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2886 137068 : })
2887 371557 : .collect::<Vec<_>>();
2888 :
2889 371557 : if pending_meta.is_empty() {
2890 236139 : None
2891 : } else {
2892 135418 : Some(SerializedValueBatch::from_values(pending_meta))
2893 : }
2894 : };
2895 :
2896 371557 : let data_batch = self.pending_data_batch.take();
2897 :
2898 371557 : let maybe_batch = match (data_batch, metadata_batch) {
2899 132278 : (Some(mut data), Some(metadata)) => {
2900 132278 : data.extend(metadata);
2901 132278 : Some(data)
2902 : }
2903 71631 : (Some(data), None) => Some(data),
2904 3140 : (None, Some(metadata)) => Some(metadata),
2905 164508 : (None, None) => None,
2906 : };
2907 :
2908 371557 : if let Some(batch) = maybe_batch {
2909 207049 : tracing::debug!(
2910 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2911 : batch.max_lsn,
2912 0 : self.tline.get_last_record_lsn()
2913 : );
2914 :
2915 : // This bails out on first error without modifying pending_updates.
2916 : // That's Ok, cf this function's doc comment.
2917 207049 : writer.put_batch(batch, ctx).await?;
2918 164508 : }
2919 :
2920 371557 : if !self.pending_deletions.is_empty() {
2921 1 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2922 1 : self.pending_deletions.clear();
2923 371556 : }
2924 :
2925 371557 : self.pending_lsns.push(self.lsn);
2926 444486 : for pending_lsn in self.pending_lsns.drain(..) {
2927 444486 : // TODO(vlad): pretty sure the comment below is not valid anymore
2928 444486 : // and we can call finish write with the latest LSN
2929 444486 : //
2930 444486 : // Ideally, we should be able to call writer.finish_write() only once
2931 444486 : // with the highest LSN. However, the last_record_lsn variable in the
2932 444486 : // timeline keeps track of the latest LSN and the immediate previous LSN
2933 444486 : // so we need to record every LSN to not leave a gap between them.
2934 444486 : writer.finish_write(pending_lsn);
2935 444486 : }
2936 :
2937 371557 : if pending_nblocks != 0 {
2938 135285 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2939 236272 : }
2940 :
2941 371557 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2942 1527 : writer.update_directory_entries_count(kind, count);
2943 1527 : }
2944 :
2945 371557 : self.pending_metadata_bytes = 0;
2946 :
2947 371557 : Ok(())
2948 371557 : }
2949 :
2950 145852 : pub(crate) fn len(&self) -> usize {
2951 145852 : self.pending_metadata_pages.len()
2952 145852 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2953 145852 : + self.pending_deletions.len()
2954 145852 : }
2955 :
2956 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2957 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2958 : /// in the modification.
2959 : ///
2960 : /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
2961 : /// page must ensure that the pages they read are already committed in Timeline, for example
2962 : /// DB create operations are always preceded by a call to commit(). This is special cased because
2963 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2964 : /// and not data pages.
2965 143293 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2966 143293 : if !Self::is_data_key(&key) {
2967 : // Have we already updated the same key? Read the latest pending updated
2968 : // version in that case.
2969 : //
2970 : // Note: we don't check pending_deletions. It is an error to request a
2971 : // value that has been removed, deletion only avoids leaking storage.
2972 143293 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2973 7964 : if let Some((_, _, value)) = values.last() {
2974 7964 : return if let Value::Image(img) = value {
2975 7964 : Ok(img.clone())
2976 : } else {
2977 : // Currently, we never need to read back a WAL record that we
2978 : // inserted in the same "transaction". All the metadata updates
2979 : // work directly with Images, and we never need to read actual
2980 : // data pages. We could handle this if we had to, by calling
2981 : // the walredo manager, but let's keep it simple for now.
2982 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2983 0 : "unexpected pending WAL record"
2984 0 : )))
2985 : };
2986 0 : }
2987 135329 : }
2988 : } else {
2989 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2990 : // this key should never be present in pending_data_pages. We ensure this by committing
2991 : // modifications before ingesting DB create operations, which are the only kind that reads
2992 : // data pages during ingest.
2993 0 : if cfg!(debug_assertions) {
2994 0 : assert!(
2995 0 : !self
2996 0 : .pending_data_batch
2997 0 : .as_ref()
2998 0 : .is_some_and(|b| b.updates_key(&key))
2999 : );
3000 0 : }
3001 : }
3002 :
3003 : // Metadata page cache miss, or we're reading a data page.
3004 135329 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
3005 135329 : self.tline.get(key, lsn, ctx).await
3006 143293 : }
3007 :
3008 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
3009 : /// and the empty value into None.
3010 0 : async fn sparse_get(
3011 0 : &self,
3012 0 : key: Key,
3013 0 : ctx: &RequestContext,
3014 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
3015 0 : let val = self.get(key, ctx).await;
3016 0 : match val {
3017 0 : Ok(val) if val.is_empty() => Ok(None),
3018 0 : Ok(val) => Ok(Some(val)),
3019 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
3020 0 : Err(e) => Err(e),
3021 : }
3022 0 : }
3023 :
3024 : #[cfg(test)]
3025 2 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
3026 2 : self.put(key, val);
3027 2 : }
3028 :
3029 282078 : fn put(&mut self, key: Key, val: Value) {
3030 282078 : if Self::is_data_key(&key) {
3031 138934 : self.put_data(key.to_compact(), val)
3032 : } else {
3033 143144 : self.put_metadata(key.to_compact(), val)
3034 : }
3035 282078 : }
3036 :
3037 138934 : fn put_data(&mut self, key: CompactKey, val: Value) {
3038 138934 : let batch = self
3039 138934 : .pending_data_batch
3040 138934 : .get_or_insert_with(SerializedValueBatch::default);
3041 138934 : batch.put(key, val, self.lsn);
3042 138934 : }
3043 :
3044 143144 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
3045 143144 : let values = self.pending_metadata_pages.entry(key).or_default();
3046 : // Replace the previous value if it exists at the same lsn
3047 143144 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
3048 6076 : if *last_lsn == self.lsn {
3049 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
3050 6076 : self.pending_metadata_bytes -= *last_value_ser_size;
3051 6076 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
3052 6076 : self.pending_metadata_bytes += *last_value_ser_size;
3053 :
3054 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as might
3055 : // have been generated by synthesized zero-page writes prior to the first real write to a page.
3056 6076 : *last_value = val;
3057 6076 : return;
3058 0 : }
3059 137068 : }
3060 :
3061 137068 : let val_serialized_size = val.serialized_size().unwrap() as usize;
3062 137068 : self.pending_metadata_bytes += val_serialized_size;
3063 137068 : values.push((self.lsn, val_serialized_size, val));
3064 :
3065 137068 : if key == CHECKPOINT_KEY.to_compact() {
3066 119 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
3067 136949 : }
3068 143144 : }
3069 :
3070 1 : fn delete(&mut self, key_range: Range<Key>) {
3071 1 : trace!("DELETE {}-{}", key_range.start, key_range.end);
3072 1 : self.pending_deletions.push((key_range, self.lsn));
3073 1 : }
3074 : }
3075 :
3076 : /// Statistics for a DatadirModification.
3077 : #[derive(Default)]
3078 : pub struct DatadirModificationStats {
3079 : pub metadata_images: u64,
3080 : pub metadata_deltas: u64,
3081 : pub data_images: u64,
3082 : pub data_deltas: u64,
3083 : }
3084 :
3085 : /// This struct facilitates accessing either a committed key from the timeline at a
3086 : /// specific LSN, or the latest uncommitted key from a pending modification.
3087 : ///
3088 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
3089 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
3090 : /// need to look up the keys in the modification first before looking them up in the
3091 : /// timeline to not miss the latest updates.
3092 : #[derive(Clone, Copy)]
3093 : pub enum Version<'a> {
3094 : LsnRange(LsnRange),
3095 : Modified(&'a DatadirModification<'a>),
3096 : }
3097 :
3098 : impl Version<'_> {
3099 25 : async fn get(
3100 25 : &self,
3101 25 : timeline: &Timeline,
3102 25 : key: Key,
3103 25 : ctx: &RequestContext,
3104 25 : ) -> Result<Bytes, PageReconstructError> {
3105 25 : match self {
3106 15 : Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
3107 10 : Version::Modified(modification) => modification.get(key, ctx).await,
3108 : }
3109 25 : }
3110 :
3111 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
3112 : /// and the empty value into None.
3113 0 : async fn sparse_get(
3114 0 : &self,
3115 0 : timeline: &Timeline,
3116 0 : key: Key,
3117 0 : ctx: &RequestContext,
3118 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
3119 0 : let val = self.get(timeline, key, ctx).await;
3120 0 : match val {
3121 0 : Ok(val) if val.is_empty() => Ok(None),
3122 0 : Ok(val) => Ok(Some(val)),
3123 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
3124 0 : Err(e) => Err(e),
3125 : }
3126 0 : }
3127 :
3128 26 : pub fn is_latest(&self) -> bool {
3129 26 : match self {
3130 16 : Version::LsnRange(lsns) => lsns.is_latest(),
3131 10 : Version::Modified(_) => true,
3132 : }
3133 26 : }
3134 :
3135 224275 : pub fn get_lsn(&self) -> Lsn {
3136 224275 : match self {
3137 12224 : Version::LsnRange(lsns) => lsns.effective_lsn,
3138 212051 : Version::Modified(modification) => modification.lsn,
3139 : }
3140 224275 : }
3141 :
3142 12219 : pub fn at(lsn: Lsn) -> Self {
3143 12219 : Version::LsnRange(LsnRange {
3144 12219 : effective_lsn: lsn,
3145 12219 : request_lsn: lsn,
3146 12219 : })
3147 12219 : }
3148 : }
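     :
     : // Illustrative sketch (not actual call sites): a point-in-time reader goes through
     : // `Version::at`, while WAL ingest wraps the in-progress modification so that reads
     : // also observe keys buffered in it but not yet committed:
     : //
     : //     let committed = Version::at(lsn).get(timeline, key, ctx).await?;
     : //     let pending   = Version::Modified(&modification).get(timeline, key, ctx).await?;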
3149 :
3150 : //--- Metadata structs stored in key-value pairs in the repository.
3151 :
3152 0 : #[derive(Debug, Serialize, Deserialize)]
3153 : pub(crate) struct DbDirectory {
3154 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
3155 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
3156 : }
3157 :
3158 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
3159 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
3160 : // were named like "pg_twophase/000002E5"; now they're like
3161 : // "pg_twophase/0000000A000002E4".
3162 :
3163 0 : #[derive(Debug, Serialize, Deserialize)]
3164 : pub(crate) struct TwoPhaseDirectory {
3165 : pub(crate) xids: HashSet<TransactionId>,
3166 : }
3167 :
3168 0 : #[derive(Debug, Serialize, Deserialize)]
3169 : struct TwoPhaseDirectoryV17 {
3170 : xids: HashSet<u64>,
3171 : }
3172 :
3173 0 : #[derive(Debug, Serialize, Deserialize, Default)]
3174 : pub(crate) struct RelDirectory {
3175 : // Set of relations that exist. (relfilenode, forknum)
3176 : //
3177 : // TODO: Store it as a btree or radix tree or something else that spans multiple
3178 : // key-value pairs, if you have a lot of relations
3179 : pub(crate) rels: HashSet<(Oid, u8)>,
3180 : }
3181 :
3182 0 : #[derive(Debug, Serialize, Deserialize)]
3183 : struct RelSizeEntry {
3184 : nblocks: u32,
3185 : }
3186 :
3187 0 : #[derive(Debug, Serialize, Deserialize, Default)]
3188 : pub(crate) struct SlruSegmentDirectory {
3189 : // Set of SLRU segments that exist.
3190 : pub(crate) segments: HashSet<u32>,
3191 : }
3192 :
3193 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
3194 : #[repr(u8)]
3195 : pub(crate) enum DirectoryKind {
3196 : Db,
3197 : TwoPhase,
3198 : Rel,
3199 : AuxFiles,
3200 : SlruSegment(SlruKind),
3201 : RelV2,
3202 : }
3203 :
3204 : impl DirectoryKind {
3205 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
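     : /// Stable index of this kind within the derived `Enum` encoding; suitable for
     : /// addressing per-kind slots in fixed-size arrays of length `KINDS_NUM`.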
3206 4582 : pub(crate) fn offset(&self) -> usize {
3207 4582 : self.into_usize()
3208 4582 : }
3209 : }
3210 :
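     : /// A BLCKSZ-sized page of zeroes. Built with `Bytes::from_static`, so clones are
     : /// cheap and never copy the underlying buffer.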
3211 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
3212 :
3213 : #[allow(clippy::bool_assert_comparison)]
3214 : #[cfg(test)]
3215 : mod tests {
3216 : use hex_literal::hex;
3217 : use pageserver_api::models::ShardParameters;
3218 : use utils::id::TimelineId;
3219 : use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
3220 :
3221 : use super::*;
3222 : use crate::DEFAULT_PG_VERSION;
3223 : use crate::tenant::harness::TenantHarness;
3224 :
3225 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
3226 : #[tokio::test]
3227 1 : async fn aux_files_round_trip() -> anyhow::Result<()> {
3228 1 : let name = "aux_files_round_trip";
3229 1 : let harness = TenantHarness::create(name).await?;
3230 :
3231 : pub const TIMELINE_ID: TimelineId =
3232 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
3233 :
3234 1 : let (tenant, ctx) = harness.load().await;
3235 1 : let (tline, ctx) = tenant
3236 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3237 1 : .await?;
3238 1 : let tline = tline.raw_timeline().unwrap();
3239 :
3240 : // First modification: insert two keys
3241 1 : let mut modification = tline.begin_modification(Lsn(0x1000));
3242 1 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
3243 1 : modification.set_lsn(Lsn(0x1008))?;
3244 1 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
3245 1 : modification.commit(&ctx).await?;
3246 1 : let expect_1008 = HashMap::from([
3247 1 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
3248 1 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
3249 1 : ]);
3250 :
3251 1 : let io_concurrency = IoConcurrency::spawn_for_test();
3252 :
3253 1 : let readback = tline
3254 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
3255 1 : .await?;
3256 1 : assert_eq!(readback, expect_1008);
3257 :
3258 : // Second modification: update one key, remove the other
3259 1 : let mut modification = tline.begin_modification(Lsn(0x2000));
3260 1 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
3261 1 : modification.set_lsn(Lsn(0x2008))?;
3262 1 : modification.put_file("foo/bar2", b"", &ctx).await?;
3263 1 : modification.commit(&ctx).await?;
3264 1 : let expect_2008 =
3265 1 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
3266 :
3267 1 : let readback = tline
3268 1 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
3269 1 : .await?;
3270 1 : assert_eq!(readback, expect_2008);
3271 :
3272 : // Reading back in time works
3273 1 : let readback = tline
3274 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
3275 1 : .await?;
3276 1 : assert_eq!(readback, expect_1008);
3277 :
3278 2 : Ok(())
3279 1 : }
3280 :
3281 : #[test]
3282 1 : fn gap_finding() {
3283 1 : let rel = RelTag {
3284 1 : spcnode: 1663,
3285 1 : dbnode: 208101,
3286 1 : relnode: 2620,
3287 1 : forknum: 0,
3288 1 : };
3289 1 : let base_blkno = 1;
3290 :
3291 1 : let base_key = rel_block_to_key(rel, base_blkno);
3292 1 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
3293 :
3294 1 : let shard = ShardIdentity::unsharded();
3295 :
3296 1 : let mut previous_nblocks = 0;
3297 11 : for i in 0..10 {
3298 10 : let crnt_blkno = base_blkno + i;
3299 10 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
3300 :
3301 10 : previous_nblocks = crnt_blkno + 1;
3302 :
3303 10 : if i == 0 {
3304 : // The first block we write is 1, so we should find the gap.
3305 1 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
3306 : } else {
3307 9 : assert!(gaps.is_none());
3308 : }
3309 : }
3310 :
3311 : // This is an update to an already existing block. No gaps here.
3312 1 : let update_blkno = 5;
3313 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3314 1 : assert!(gaps.is_none());
3315 :
3316 : // This is an update past the current end block.
3317 1 : let after_gap_blkno = 20;
3318 1 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
3319 :
3320 1 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
3321 1 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
3322 1 : assert_eq!(
3323 1 : gaps.unwrap(),
3324 1 : KeySpace::single(gap_start_key..after_gap_key)
3325 : );
3326 1 : }
3327 :
3328 : #[test]
3329 1 : fn sharded_gap_finding() {
3330 1 : let rel = RelTag {
3331 1 : spcnode: 1663,
3332 1 : dbnode: 208101,
3333 1 : relnode: 2620,
3334 1 : forknum: 0,
3335 1 : };
3336 :
3337 1 : let first_blkno = 6;
3338 :
3339 : // This shard will get the even blocks
3340 1 : let shard = ShardIdentity::from_params(
3341 1 : ShardNumber(0),
3342 1 : ShardParameters {
3343 1 : count: ShardCount(2),
3344 1 : stripe_size: ShardStripeSize(1),
3345 1 : },
3346 : );
3347 :
3348 : // Only keys belonging to this shard are considered as gaps.
3349 1 : let mut previous_nblocks = 0;
3350 1 : let gaps =
3351 1 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
3352 1 : assert!(!gaps.ranges.is_empty());
3353 3 : for gap_range in gaps.ranges {
3354 2 : let mut k = gap_range.start;
3355 4 : while k != gap_range.end {
3356 2 : assert_eq!(shard.get_shard_number(&k), shard.number);
3357 2 : k = k.next();
3358 : }
3359 : }
3360 :
3361 1 : previous_nblocks = first_blkno;
3362 :
3363 1 : let update_blkno = 2;
3364 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
3365 1 : assert!(gaps.is_none());
3366 1 : }
3367 :
3368 : /*
3369 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
3370 : let incremental = timeline.get_current_logical_size();
3371 : let non_incremental = timeline
3372 : .get_current_logical_size_non_incremental(lsn)
3373 : .unwrap();
3374 : assert_eq!(incremental, non_incremental);
3375 : }
3376 : */
3377 :
3378 : /*
3379 : ///
3380 : /// Test list_rels() function, with branches and dropped relations
3381 : ///
3382 : #[test]
3383 : fn test_list_rels_drop() -> Result<()> {
3384 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
3385 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
3386 : const TESTDB: u32 = 111;
3387 :
3388 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
3389 : // after branching fails below
3390 : let mut writer = tline.begin_record(Lsn(0x10));
3391 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
3392 : writer.finish()?;
3393 :
3394 : // Create a relation on the timeline
3395 : let mut writer = tline.begin_record(Lsn(0x20));
3396 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
3397 : writer.finish()?;
3398 :
3399 : let writer = tline.begin_record(Lsn(0x00));
3400 : writer.finish()?;
3401 :
3402 : // Check that list_rels() lists it after LSN 2, but no before it
3403 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
3404 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
3405 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
3406 :
3407 : // Create a branch, check that the relation is visible there
3408 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
3409 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
3410 : Some(timeline) => timeline,
3411 : None => panic!("Should have a local timeline"),
3412 : };
3413 : let newtline = DatadirTimelineImpl::new(newtline);
3414 : assert!(newtline
3415 : .list_rels(0, TESTDB, Lsn(0x30))?
3416 : .contains(&TESTREL_A));
3417 :
3418 : // Drop it on the branch
3419 : let mut new_writer = newtline.begin_record(Lsn(0x40));
3420 : new_writer.drop_relation(TESTREL_A)?;
3421 : new_writer.finish()?;
3422 :
3423 : // Check that it's no longer listed on the branch after the point where it was dropped
3424 : assert!(newtline
3425 : .list_rels(0, TESTDB, Lsn(0x30))?
3426 : .contains(&TESTREL_A));
3427 : assert!(!newtline
3428 : .list_rels(0, TESTDB, Lsn(0x40))?
3429 : .contains(&TESTREL_A));
3430 :
3431 : // Run checkpoint and garbage collection and check that it's still not visible
3432 : newtline.checkpoint(CheckpointConfig::Forced)?;
3433 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3434 :
3435 : assert!(!newtline
3436 : .list_rels(0, TESTDB, Lsn(0x40))?
3437 : .contains(&TESTREL_A));
3438 :
3439 : Ok(())
3440 : }
3441 : */
3442 :
3443 : /*
3444 : #[test]
3445 : fn test_read_beyond_eof() -> Result<()> {
3446 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3447 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3448 :
3449 : make_some_layers(&tline, Lsn(0x20))?;
3450 : let mut writer = tline.begin_record(Lsn(0x60));
3451 : walingest.put_rel_page_image(
3452 : &mut writer,
3453 : TESTREL_A,
3454 : 0,
3455 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3456 : )?;
3457 : writer.finish()?;
3458 :
3459 : // Test read before rel creation. Should error out.
3460 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3461 :
3462 : // Read block beyond end of relation at different points in time.
3463 : // These reads should fall into different delta, image, and in-memory layers.
3464 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3465 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3466 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3467 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3468 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3469 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3470 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3471 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3472 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3473 :
3474 : // Test on an in-memory layer with no preceding layer
3475 : let mut writer = tline.begin_record(Lsn(0x70));
3476 : walingest.put_rel_page_image(
3477 : &mut writer,
3478 : TESTREL_B,
3479 : 0,
3480 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3481 : )?;
3482 : writer.finish()?;
3483 :
3484 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3485 :
3486 : Ok(())
3487 : }
3488 : */
3489 : }
|