Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use std::collections::{HashMap, HashSet, hash_map};
10 : use std::ops::{ControlFlow, Range};
11 :
12 : use crate::walingest::{WalIngestError, WalIngestErrorKind};
13 : use crate::{PERF_TRACE_TARGET, ensure_walingest};
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes, BytesMut};
16 : use enum_map::Enum;
17 : use pageserver_api::key::{
18 : AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
19 : TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
20 : rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
21 : repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
22 : slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
23 : };
24 : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
25 : use pageserver_api::models::RelSizeMigration;
26 : use pageserver_api::record::NeonWalRecord;
27 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
28 : use pageserver_api::shard::ShardIdentity;
29 : use pageserver_api::value::Value;
30 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
31 : use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
32 : use serde::{Deserialize, Serialize};
33 : use strum::IntoEnumIterator;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::{debug, info, info_span, trace, warn};
36 : use utils::bin_ser::{BeSer, DeserializeError};
37 : use utils::lsn::Lsn;
38 : use utils::pausable_failpoint;
39 : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
40 :
41 : use super::tenant::{PageReconstructError, Timeline};
42 : use crate::aux_file;
43 : use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
44 : use crate::keyspace::{KeySpace, KeySpaceAccum};
45 : use crate::metrics::{
46 : RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
47 : RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
48 : RELSIZE_SNAPSHOT_CACHE_MISSES,
49 : };
50 : use crate::span::{
51 : debug_assert_current_span_has_tenant_and_timeline_id,
52 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
53 : };
54 : use crate::tenant::storage_layer::IoConcurrency;
55 : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
56 :
57 : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
58 : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
59 :
60 : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
61 : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
62 :
63 : #[derive(Debug)]
64 : pub enum LsnForTimestamp {
65 : /// Found commits both before and after the given timestamp
66 : Present(Lsn),
67 :
68 : /// Found no commits after the given timestamp. This means
69 : /// that the newest data in the branch is older than the given
70 : /// timestamp.
71 : ///
72 : /// All commits <= LSN happened before the given timestamp
73 : Future(Lsn),
74 :
75 : /// The queried timestamp is beyond the horizon we can look back to (the PITR window)
76 : ///
77 : /// All commits > LSN happened after the given timestamp,
78 : /// but any commits < LSN might have happened before or after
79 : /// the given timestamp. We don't know because no data before
80 : /// the given lsn is available.
81 : Past(Lsn),
82 :
83 : /// We have found no commit with a timestamp,
84 : /// so we can't return anything meaningful.
85 : ///
86 : /// The associated LSN is the lower bound value we can safely
87 : /// create branches on, but no statement is made about whether it is
88 : /// older or newer than the timestamp.
89 : ///
90 : /// This variant can e.g. be returned right after a
91 : /// cluster import.
92 : NoData(Lsn),
93 : }
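// A minimal sketch: a hypothetical helper (`describe_lsn_for_timestamp` is not part of the
// module's API) that restates the semantics documented on each variant above in a
// log-friendly form.
fn describe_lsn_for_timestamp(result: &LsnForTimestamp) -> String {
    match result {
        LsnForTimestamp::Present(lsn) => {
            format!("commits exist before and after the timestamp; branch point {lsn}")
        }
        LsnForTimestamp::Future(lsn) => {
            format!("all data is older than the timestamp; commits at or before {lsn}")
        }
        LsnForTimestamp::Past(lsn) => {
            format!("timestamp is older than the PITR horizon; only {lsn} and newer are known")
        }
        LsnForTimestamp::NoData(lsn) => {
            format!("no commit timestamps found; {lsn} is a safe lower bound for branching")
        }
    }
}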
94 :
95 : /// Each request to the pageserver contains an LSN range: `not_modified_since..request_lsn`.
96 : /// See the comments in libs/pageserver_api/src/models.rs.
97 : /// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
98 : /// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
99 : #[derive(Debug, Clone, Copy, Default)]
100 : pub struct LsnRange {
101 : pub effective_lsn: Lsn,
102 : pub request_lsn: Lsn,
103 : }
104 :
105 : impl LsnRange {
106 0 : pub fn at(lsn: Lsn) -> LsnRange {
107 0 : LsnRange {
108 0 : effective_lsn: lsn,
109 0 : request_lsn: lsn,
110 0 : }
111 0 : }
112 16 : pub fn is_latest(&self) -> bool {
113 16 : self.request_lsn == Lsn::MAX
114 16 : }
115 : }
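// A minimal sketch of how the fields above are meant to be used: a primary reads the
// "latest" state by sending `request_lsn == Lsn::MAX`, while a replica pins a concrete
// LSN via `LsnRange::at`. The LSN values below are made up for the example.
#[test]
fn lsn_range_latest_vs_snapshot() {
    let primary = LsnRange {
        effective_lsn: Lsn(0x10_0000),
        request_lsn: Lsn::MAX,
    };
    assert!(primary.is_latest());

    let replica = LsnRange::at(Lsn(0x10_0000));
    assert!(!replica.is_latest());
}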
116 :
117 : #[derive(Debug, thiserror::Error)]
118 : pub(crate) enum CalculateLogicalSizeError {
119 : #[error("cancelled")]
120 : Cancelled,
121 :
122 : /// Something went wrong while reading the metadata we use to calculate logical size
123 : /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
124 : /// in the `From` implementation for this variant.
125 : #[error(transparent)]
126 : PageRead(PageReconstructError),
127 :
128 : /// Something went wrong deserializing metadata that we read to calculate logical size
129 : #[error("decode error: {0}")]
130 : Decode(#[from] DeserializeError),
131 : }
132 :
133 : #[derive(Debug, thiserror::Error)]
134 : pub(crate) enum CollectKeySpaceError {
135 : #[error(transparent)]
136 : Decode(#[from] DeserializeError),
137 : #[error(transparent)]
138 : PageRead(PageReconstructError),
139 : #[error("cancelled")]
140 : Cancelled,
141 : }
142 :
143 : impl From<PageReconstructError> for CollectKeySpaceError {
144 0 : fn from(err: PageReconstructError) -> Self {
145 0 : match err {
146 0 : PageReconstructError::Cancelled => Self::Cancelled,
147 0 : err => Self::PageRead(err),
148 : }
149 0 : }
150 : }
151 :
152 : impl From<PageReconstructError> for CalculateLogicalSizeError {
153 0 : fn from(pre: PageReconstructError) -> Self {
154 0 : match pre {
155 0 : PageReconstructError::Cancelled => Self::Cancelled,
156 0 : _ => Self::PageRead(pre),
157 : }
158 0 : }
159 : }
160 :
161 : #[derive(Debug, thiserror::Error)]
162 : pub enum RelationError {
163 : #[error("invalid relnode")]
164 : InvalidRelnode,
165 : }
166 :
167 : ///
168 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
169 : /// and other special kinds of files, in a versioned key-value store. The
170 : /// Timeline struct provides the key-value store.
171 : ///
172 : /// This is a separate impl block so that we can easily include all these functions in the Timeline
173 : /// implementation; they might be moved into a separate struct later.
174 : impl Timeline {
175 : /// Start ingesting a WAL record, or other atomic modification of
176 : /// the timeline.
177 : ///
178 : /// This provides a transaction-like interface to perform a bunch
179 : /// of modifications atomically.
180 : ///
181 : /// To ingest a WAL record, call begin_modification(lsn) to get a
182 : /// DatadirModification object. Use the functions in the object to
183 : /// modify the repository state, updating all the pages and metadata
184 : /// that the WAL record affects. When you're done, call commit() to
185 : /// commit the changes.
186 : ///
187 :     /// The LSN stored in the modification is advanced by `ingest_record` and
188 : /// is used by `commit()` to update `last_record_lsn`.
189 : ///
190 : /// Calling commit() will flush all the changes and reset the state,
191 : /// so the `DatadirModification` struct can be reused to perform the next modification.
192 : ///
193 : /// Note that any pending modifications you make through the
194 : /// modification object won't be visible to calls to the 'get' and list
195 : /// functions of the timeline until you finish! And if you update the
196 : /// same page twice, the last update wins.
197 : ///
198 134213 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
199 134213 : where
200 134213 : Self: Sized,
201 134213 : {
202 134213 : DatadirModification {
203 134213 : tline: self,
204 134213 : pending_lsns: Vec::new(),
205 134213 : pending_metadata_pages: HashMap::new(),
206 134213 : pending_data_batch: None,
207 134213 : pending_deletions: Vec::new(),
208 134213 : pending_nblocks: 0,
209 134213 : pending_directory_entries: Vec::new(),
210 134213 : pending_metadata_bytes: 0,
211 134213 : lsn,
212 134213 : }
213 134213 : }
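    // A minimal sketch (in comments, since `commit()`'s exact signature is not shown
    // here) of the per-WAL-record flow described above:
    //
    //     let mut modification = timeline.begin_modification(start_lsn);
    //     // ... call the put-functions for every page and metadata entry the record
    //     //     touches; pending writes are readable through `modification` itself ...
    //     modification.set_lsn(record_end_lsn)?;   // advance to the record's end LSN
    //     modification.commit(/* ... */).await?;   // flush and update last_record_lsn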
214 :
215 : //------------------------------------------------------------------------------
216 : // Public GET functions
217 : //------------------------------------------------------------------------------
218 :
219 : /// Look up given page version.
220 9192 : pub(crate) async fn get_rel_page_at_lsn(
221 9192 : &self,
222 9192 : tag: RelTag,
223 9192 : blknum: BlockNumber,
224 9192 : version: Version<'_>,
225 9192 : ctx: &RequestContext,
226 9192 : io_concurrency: IoConcurrency,
227 9192 : ) -> Result<Bytes, PageReconstructError> {
228 9192 : match version {
229 9192 : Version::LsnRange(lsns) => {
230 9192 : let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
231 9192 : let res = self
232 9192 : .get_rel_page_at_lsn_batched(
233 9192 : pages
234 9192 : .iter()
235 9192 : .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
236 9192 : io_concurrency.clone(),
237 9192 : ctx,
238 9192 : )
239 9192 : .await;
240 9192 : assert_eq!(res.len(), 1);
241 9192 : res.into_iter().next().unwrap()
242 : }
243 0 : Version::Modified(modification) => {
244 0 : if tag.relnode == 0 {
245 0 : return Err(PageReconstructError::Other(
246 0 : RelationError::InvalidRelnode.into(),
247 0 : ));
248 0 : }
249 :
250 0 : let nblocks = self.get_rel_size(tag, version, ctx).await?;
251 0 : if blknum >= nblocks {
252 0 : debug!(
253 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
254 0 : tag,
255 0 : blknum,
256 0 : version.get_lsn(),
257 : nblocks
258 : );
259 0 : return Ok(ZERO_PAGE.clone());
260 0 : }
261 0 :
262 0 : let key = rel_block_to_key(tag, blknum);
263 0 : modification.get(key, ctx).await
264 : }
265 : }
266 9192 : }
267 :
268 : /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
269 : ///
270 : /// The ordering of the returned vec corresponds to the ordering of `pages`.
271 9192 : pub(crate) async fn get_rel_page_at_lsn_batched(
272 9192 : &self,
273 9192 : pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
274 9192 : io_concurrency: IoConcurrency,
275 9192 : ctx: &RequestContext,
276 9192 : ) -> Vec<Result<Bytes, PageReconstructError>> {
277 9192 : debug_assert_current_span_has_tenant_and_timeline_id();
278 9192 :
279 9192 : let mut slots_filled = 0;
280 9192 : let page_count = pages.len();
281 9192 :
282 9192 : // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
283 9192 : let mut result = Vec::with_capacity(pages.len());
284 9192 : let result_slots = result.spare_capacity_mut();
285 9192 :
286 9192 : let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
287 9192 : HashMap::with_capacity(pages.len());
288 9192 :
289 9192 : let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
290 9192 : HashMap::with_capacity(pages.len());
291 :
292 9192 : for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
293 9192 : if tag.relnode == 0 {
294 0 : result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
295 0 : RelationError::InvalidRelnode.into(),
296 0 : )));
297 0 :
298 0 : slots_filled += 1;
299 0 : continue;
300 9192 : }
301 9192 : let lsn = lsns.effective_lsn;
302 9192 : let nblocks = {
303 9192 : let ctx = RequestContextBuilder::from(&ctx)
304 9192 : .perf_span(|crnt_perf_span| {
305 0 : info_span!(
306 : target: PERF_TRACE_TARGET,
307 0 : parent: crnt_perf_span,
308 : "GET_REL_SIZE",
309 : reltag=%tag,
310 : lsn=%lsn,
311 : )
312 9192 : })
313 9192 : .attached_child();
314 9192 :
315 9192 : match self
316 9192 : .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
317 9192 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
318 9192 : .await
319 : {
320 9192 : Ok(nblocks) => nblocks,
321 0 : Err(err) => {
322 0 : result_slots[response_slot_idx].write(Err(err));
323 0 : slots_filled += 1;
324 0 : continue;
325 : }
326 : }
327 : };
328 :
329 9192 : if *blknum >= nblocks {
330 0 : debug!(
331 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
332 : tag, blknum, lsn, nblocks
333 : );
334 0 : result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
335 0 : slots_filled += 1;
336 0 : continue;
337 9192 : }
338 9192 :
339 9192 : let key = rel_block_to_key(*tag, *blknum);
340 9192 :
341 9192 : let ctx = RequestContextBuilder::from(&ctx)
342 9192 : .perf_span(|crnt_perf_span| {
343 0 : info_span!(
344 : target: PERF_TRACE_TARGET,
345 0 : parent: crnt_perf_span,
346 : "GET_BATCH",
347 : batch_size = %page_count,
348 : )
349 9192 : })
350 9192 : .attached_child();
351 9192 :
352 9192 : let key_slots = keys_slots.entry(key).or_default();
353 9192 : key_slots.push((response_slot_idx, ctx));
354 9192 :
355 9192 : let acc = req_keyspaces.entry(lsn).or_default();
356 9192 : acc.add_key(key);
357 9192 : }
358 :
359 9192 : let query: Vec<(Lsn, KeySpace)> = req_keyspaces
360 9192 : .into_iter()
361 9192 : .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
362 9192 : .collect();
363 9192 :
364 9192 : let query = VersionedKeySpaceQuery::scattered(query);
365 9192 : let res = self
366 9192 : .get_vectored(query, io_concurrency, ctx)
367 9192 : .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
368 9192 : .await;
369 :
370 9192 : match res {
371 9192 : Ok(results) => {
372 18384 : for (key, res) in results {
373 9192 : let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
374 9192 : let (first_slot, first_req_ctx) = key_slots.next().unwrap();
375 :
376 9192 : for (slot, req_ctx) in key_slots {
377 0 : let clone = match &res {
378 0 : Ok(buf) => Ok(buf.clone()),
379 0 : Err(err) => Err(match err {
380 0 : PageReconstructError::Cancelled => PageReconstructError::Cancelled,
381 :
382 0 : x @ PageReconstructError::Other(_)
383 0 : | x @ PageReconstructError::AncestorLsnTimeout(_)
384 0 : | x @ PageReconstructError::WalRedo(_)
385 0 : | x @ PageReconstructError::MissingKey(_) => {
386 0 : PageReconstructError::Other(anyhow::anyhow!(
387 0 : "there was more than one request for this key in the batch, error logged once: {x:?}"
388 0 : ))
389 : }
390 : }),
391 : };
392 :
393 0 : result_slots[slot].write(clone);
394 0 : // There is no standardized way to express that the batched span followed from N request spans.
395 0 : // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
396 0 : // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
397 0 : req_ctx.perf_follows_from(ctx);
398 0 : slots_filled += 1;
399 : }
400 :
401 9192 : result_slots[first_slot].write(res);
402 9192 : first_req_ctx.perf_follows_from(ctx);
403 9192 : slots_filled += 1;
404 : }
405 : }
406 0 : Err(err) => {
407 : // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
408 : // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
409 0 : for (slot, req_ctx) in keys_slots.values().flatten() {
410 : // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
411 : // but without taking ownership of the GetVectoredError
412 0 : let err = match &err {
413 0 : GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
414 : // TODO: restructure get_vectored API to make this error per-key
415 0 : GetVectoredError::MissingKey(err) => {
416 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
417 0 : "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
418 0 : )))
419 : }
420 : // TODO: restructure get_vectored API to make this error per-key
421 0 : GetVectoredError::GetReadyAncestorError(err) => {
422 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
423 0 : "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
424 0 : )))
425 : }
426 : // TODO: restructure get_vectored API to make this error per-key
427 0 : GetVectoredError::Other(err) => Err(PageReconstructError::Other(
428 0 : anyhow::anyhow!("whole vectored get request failed: {err:?}"),
429 0 : )),
430 : // TODO: we can prevent this error class by moving this check into the type system
431 0 : GetVectoredError::InvalidLsn(e) => {
432 0 : Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
433 : }
434 : // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
435 : // TODO: we can prevent this error class by moving this check into the type system
436 0 : GetVectoredError::Oversized(err) => {
437 0 : Err(anyhow::anyhow!("batching oversized: {err:?}").into())
438 : }
439 : };
440 :
441 0 : req_ctx.perf_follows_from(ctx);
442 0 : result_slots[*slot].write(err);
443 : }
444 :
445 0 : slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
446 0 : }
447 : };
448 :
449 9192 : assert_eq!(slots_filled, page_count);
450 : // SAFETY:
451 :         // 1. `result` and any of its uninitialized elements are not read from until this point
452 : // 2. The length below is tracked at run-time and matches the number of requested pages.
453 9192 : unsafe {
454 9192 : result.set_len(page_count);
455 9192 : }
456 9192 :
457 9192 : result
458 9192 : }
459 :
460 : /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
461 : /// other shards, by only accounting for relations the shard has pages for, and only accounting
462 : /// for pages up to the highest page number it has stored.
463 0 : pub(crate) async fn get_db_size(
464 0 : &self,
465 0 : spcnode: Oid,
466 0 : dbnode: Oid,
467 0 : version: Version<'_>,
468 0 : ctx: &RequestContext,
469 0 : ) -> Result<usize, PageReconstructError> {
470 0 : let mut total_blocks = 0;
471 :
472 0 : let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
473 :
474 0 : for rel in rels {
475 0 : let n_blocks = self.get_rel_size(rel, version, ctx).await?;
476 0 : total_blocks += n_blocks as usize;
477 : }
478 0 : Ok(total_blocks)
479 0 : }
480 :
481 : /// Get size of a relation file. The relation must exist, otherwise an error is returned.
482 : ///
483 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
484 : /// page number stored in the shard.
485 12217 : pub(crate) async fn get_rel_size(
486 12217 : &self,
487 12217 : tag: RelTag,
488 12217 : version: Version<'_>,
489 12217 : ctx: &RequestContext,
490 12217 : ) -> Result<BlockNumber, PageReconstructError> {
491 12217 : if tag.relnode == 0 {
492 0 : return Err(PageReconstructError::Other(
493 0 : RelationError::InvalidRelnode.into(),
494 0 : ));
495 12217 : }
496 :
497 12217 : if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
498 12210 : return Ok(nblocks);
499 7 : }
500 7 :
501 7 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
502 0 : && !self.get_rel_exists(tag, version, ctx).await?
503 : {
504 : // FIXME: Postgres sometimes calls smgrcreate() to create
505 : // FSM, and smgrnblocks() on it immediately afterwards,
506 : // without extending it. Tolerate that by claiming that
507 : // any non-existent FSM fork has size 0.
508 0 : return Ok(0);
509 7 : }
510 7 :
511 7 : let key = rel_size_to_key(tag);
512 7 : let mut buf = version.get(self, key, ctx).await?;
513 5 : let nblocks = buf.get_u32_le();
514 5 :
515 5 : self.update_cached_rel_size(tag, version, nblocks);
516 5 :
517 5 : Ok(nblocks)
518 12217 : }
519 :
520 : /// Does the relation exist?
521 : ///
522 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
523 : /// the shard stores pages for.
524 3025 : pub(crate) async fn get_rel_exists(
525 3025 : &self,
526 3025 : tag: RelTag,
527 3025 : version: Version<'_>,
528 3025 : ctx: &RequestContext,
529 3025 : ) -> Result<bool, PageReconstructError> {
530 3025 : if tag.relnode == 0 {
531 0 : return Err(PageReconstructError::Other(
532 0 : RelationError::InvalidRelnode.into(),
533 0 : ));
534 3025 : }
535 :
536 : // first try to lookup relation in cache
537 3025 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
538 3016 : return Ok(true);
539 9 : }
540 : // then check if the database was already initialized.
541 : // get_rel_exists can be called before dbdir is created.
542 9 : let buf = version.get(self, DBDIR_KEY, ctx).await?;
543 9 : let dbdirs = DbDirectory::des(&buf)?.dbdirs;
544 9 : if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
545 0 : return Ok(false);
546 9 : }
547 9 :
548 9 : // Read path: first read the new reldir keyspace. Early return if the relation exists.
549 9 : // Otherwise, read the old reldir keyspace.
550 9 : // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
551 9 :
552 9 : if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
553 9 : self.get_rel_size_v2_status()
554 : {
555 : // fetch directory listing (new)
556 0 : let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
557 0 : let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
558 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
559 0 : let exists_v2 = buf == RelDirExists::Exists;
560 0 : // Fast path: if the relation exists in the new format, return true.
561 0 : // TODO: we should have a verification mode that checks both keyspaces
562 0 : // to ensure the relation only exists in one of them.
563 0 : if exists_v2 {
564 0 : return Ok(true);
565 0 : }
566 9 : }
567 :
568 : // fetch directory listing (old)
569 :
570 9 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
571 9 : let buf = version.get(self, key, ctx).await?;
572 :
573 9 : let dir = RelDirectory::des(&buf)?;
574 9 : let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
575 9 : Ok(exists_v1)
576 3025 : }
577 :
578 : /// Get a list of all existing relations in given tablespace and database.
579 : ///
580 : /// Only shard 0 has a full view of the relations. Other shards only know about relations that
581 : /// the shard stores pages for.
582 : ///
583 : /// # Cancel-Safety
584 : ///
585 : /// This method is cancellation-safe.
586 0 : pub(crate) async fn list_rels(
587 0 : &self,
588 0 : spcnode: Oid,
589 0 : dbnode: Oid,
590 0 : version: Version<'_>,
591 0 : ctx: &RequestContext,
592 0 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
593 0 : // fetch directory listing (old)
594 0 : let key = rel_dir_to_key(spcnode, dbnode);
595 0 : let buf = version.get(self, key, ctx).await?;
596 :
597 0 : let dir = RelDirectory::des(&buf)?;
598 0 : let rels_v1: HashSet<RelTag> =
599 0 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
600 0 : spcnode,
601 0 : dbnode,
602 0 : relnode: *relnode,
603 0 : forknum: *forknum,
604 0 : }));
605 0 :
606 0 : if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
607 0 : return Ok(rels_v1);
608 0 : }
609 0 :
610 0 : // scan directory listing (new), merge with the old results
611 0 : let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
612 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
613 0 : self.conf.get_vectored_concurrent_io,
614 0 : self.gate
615 0 : .enter()
616 0 : .map_err(|_| PageReconstructError::Cancelled)?,
617 : );
618 0 : let results = self
619 0 : .scan(
620 0 : KeySpace::single(key_range),
621 0 : version.get_lsn(),
622 0 : ctx,
623 0 : io_concurrency,
624 0 : )
625 0 : .await?;
626 0 : let mut rels = rels_v1;
627 0 : for (key, val) in results {
628 0 : let val = RelDirExists::decode(&val?)
629 0 : .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
630 0 : assert_eq!(key.field6, 1);
631 0 : assert_eq!(key.field2, spcnode);
632 0 : assert_eq!(key.field3, dbnode);
633 0 : let tag = RelTag {
634 0 : spcnode,
635 0 : dbnode,
636 0 : relnode: key.field4,
637 0 : forknum: key.field5,
638 0 : };
639 0 : if val == RelDirExists::Removed {
640 0 : debug_assert!(!rels.contains(&tag), "removed reltag in v2");
641 0 : continue;
642 0 : }
643 0 : let did_not_contain = rels.insert(tag);
644 0 : debug_assert!(did_not_contain, "duplicate reltag in v2");
645 : }
646 0 : Ok(rels)
647 0 : }
648 :
649 : /// Get the whole SLRU segment
650 0 : pub(crate) async fn get_slru_segment(
651 0 : &self,
652 0 : kind: SlruKind,
653 0 : segno: u32,
654 0 : lsn: Lsn,
655 0 : ctx: &RequestContext,
656 0 : ) -> Result<Bytes, PageReconstructError> {
657 0 : assert!(self.tenant_shard_id.is_shard_zero());
658 0 : let n_blocks = self
659 0 : .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
660 0 : .await?;
661 :
662 0 : let keyspace = KeySpace::single(
663 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
664 0 : );
665 0 :
666 0 : let batches = keyspace.partition(
667 0 : self.get_shard_identity(),
668 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
669 0 : );
670 :
671 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
672 0 : self.conf.get_vectored_concurrent_io,
673 0 : self.gate
674 0 : .enter()
675 0 : .map_err(|_| PageReconstructError::Cancelled)?,
676 : );
677 :
678 0 : let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
679 0 : for batch in batches.parts {
680 0 : let query = VersionedKeySpaceQuery::uniform(batch, lsn);
681 0 : let blocks = self
682 0 : .get_vectored(query, io_concurrency.clone(), ctx)
683 0 : .await?;
684 :
685 0 : for (_key, block) in blocks {
686 0 : let block = block?;
687 0 : segment.extend_from_slice(&block[..BLCKSZ as usize]);
688 : }
689 : }
690 :
691 0 : Ok(segment.freeze())
692 0 : }
693 :
694 : /// Get size of an SLRU segment
695 0 : pub(crate) async fn get_slru_segment_size(
696 0 : &self,
697 0 : kind: SlruKind,
698 0 : segno: u32,
699 0 : version: Version<'_>,
700 0 : ctx: &RequestContext,
701 0 : ) -> Result<BlockNumber, PageReconstructError> {
702 0 : assert!(self.tenant_shard_id.is_shard_zero());
703 0 : let key = slru_segment_size_to_key(kind, segno);
704 0 : let mut buf = version.get(self, key, ctx).await?;
705 0 : Ok(buf.get_u32_le())
706 0 : }
707 :
708 : /// Does the slru segment exist?
709 0 : pub(crate) async fn get_slru_segment_exists(
710 0 : &self,
711 0 : kind: SlruKind,
712 0 : segno: u32,
713 0 : version: Version<'_>,
714 0 : ctx: &RequestContext,
715 0 : ) -> Result<bool, PageReconstructError> {
716 0 : assert!(self.tenant_shard_id.is_shard_zero());
717 : // fetch directory listing
718 0 : let key = slru_dir_to_key(kind);
719 0 : let buf = version.get(self, key, ctx).await?;
720 :
721 0 : let dir = SlruSegmentDirectory::des(&buf)?;
722 0 : Ok(dir.segments.contains(&segno))
723 0 : }
724 :
725 : /// Locate LSN, such that all transactions that committed before
726 : /// 'search_timestamp' are visible, but nothing newer is.
727 : ///
728 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
729 : /// so it's not well defined which LSN you get if there were multiple commits
730 : /// "in flight" at that point in time.
731 : ///
732 0 : pub(crate) async fn find_lsn_for_timestamp(
733 0 : &self,
734 0 : search_timestamp: TimestampTz,
735 0 : cancel: &CancellationToken,
736 0 : ctx: &RequestContext,
737 0 : ) -> Result<LsnForTimestamp, PageReconstructError> {
738 0 : pausable_failpoint!("find-lsn-for-timestamp-pausable");
739 :
740 0 : let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
741 0 : let gc_cutoff_planned = {
742 0 : let gc_info = self.gc_info.read().unwrap();
743 0 : gc_info.min_cutoff()
744 0 : };
745 0 : // Usually the planned cutoff is newer than the cutoff of the last gc run,
746 0 : // but let's be defensive.
747 0 : let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
748 0 : // We use this method to figure out the branching LSN for the new branch, but the
749 0 : // GC cutoff could be before the branching point and we cannot create a new branch
750 0 : // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
751 0 : // on the safe side.
752 0 : let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
753 0 : let max_lsn = self.get_last_record_lsn();
754 0 :
755 0 : // LSNs are always 8-byte aligned. low/mid/high represent the
756 0 : // LSN divided by 8.
757 0 : let mut low = min_lsn.0 / 8;
758 0 : let mut high = max_lsn.0 / 8 + 1;
759 0 :
760 0 : let mut found_smaller = false;
761 0 : let mut found_larger = false;
762 :
763 0 : while low < high {
764 0 : if cancel.is_cancelled() {
765 0 : return Err(PageReconstructError::Cancelled);
766 0 : }
767 0 : // cannot overflow, high and low are both smaller than u64::MAX / 2
768 0 : let mid = (high + low) / 2;
769 :
770 0 : let cmp = match self
771 0 : .is_latest_commit_timestamp_ge_than(
772 0 : search_timestamp,
773 0 : Lsn(mid * 8),
774 0 : &mut found_smaller,
775 0 : &mut found_larger,
776 0 : ctx,
777 0 : )
778 0 : .await
779 : {
780 0 : Ok(res) => res,
781 0 : Err(PageReconstructError::MissingKey(e)) => {
782 0 : warn!(
783 0 : "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
784 : e
785 : );
786 :                     // Return that we didn't find any commits before the given timestamp; the error was logged above.
787 0 : return Ok(LsnForTimestamp::Past(min_lsn));
788 : }
789 0 : Err(e) => return Err(e),
790 : };
791 :
792 0 : if cmp {
793 0 : high = mid;
794 0 : } else {
795 0 : low = mid + 1;
796 0 : }
797 : }
798 :
799 : // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
800 : // so the LSN of the last commit record before or at `search_timestamp`.
801 : // Remove one from `low` to get `t`.
802 : //
803 : // FIXME: it would be better to get the LSN of the previous commit.
804 : // Otherwise, if you restore to the returned LSN, the database will
805 : // include physical changes from later commits that will be marked
806 : // as aborted, and will need to be vacuumed away.
807 0 : let commit_lsn = Lsn((low - 1) * 8);
808 0 : match (found_smaller, found_larger) {
809 : (false, false) => {
810 : // This can happen if no commit records have been processed yet, e.g.
811 : // just after importing a cluster.
812 0 : Ok(LsnForTimestamp::NoData(min_lsn))
813 : }
814 : (false, true) => {
815 : // Didn't find any commit timestamps smaller than the request
816 0 : Ok(LsnForTimestamp::Past(min_lsn))
817 : }
818 0 : (true, _) if commit_lsn < min_lsn => {
819 0 : // the search above did set found_smaller to true but it never increased the lsn.
820 0 : // Then, low is still the old min_lsn, and the subtraction above gave a value
821 0 : // below the min_lsn. We should never do that.
822 0 : Ok(LsnForTimestamp::Past(min_lsn))
823 : }
824 : (true, false) => {
825 : // Only found commits with timestamps smaller than the request.
826 : // It's still a valid case for branch creation, return it.
827 : // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
828 : // case, anyway.
829 0 : Ok(LsnForTimestamp::Future(commit_lsn))
830 : }
831 0 : (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
832 : }
833 0 : }
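    // A minimal sketch of the same lower-bound binary search over a toy model:
    // `timestamps[i]` stands in for "the latest commit timestamp visible at LSN i*8",
    // which is non-decreasing in i, so the `>= search` predicate is monotone and the
    // first index where it holds is well defined. The helper name is hypothetical and
    // exists only to make the invariant of the loop above explicit.
    fn first_commit_at_or_after(timestamps: &[i64], search: i64) -> (usize, bool, bool) {
        let (mut low, mut high) = (0usize, timestamps.len());
        let (mut found_smaller, mut found_larger) = (false, false);
        while low < high {
            let mid = (low + high) / 2;
            // Analogue of `is_latest_commit_timestamp_ge_than` for the toy model.
            if timestamps[mid] >= search {
                found_larger = true;
                high = mid;
            } else {
                found_smaller = true;
                low = mid + 1;
            }
        }
        // `low` is the first index whose timestamp is >= search; `low - 1` (if any) is
        // the last commit strictly before it, matching `commit_lsn = Lsn((low - 1) * 8)`.
        (low, found_smaller, found_larger)
    }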
834 :
835 :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
836 :     /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
837 :     ///
838 :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
839 :     /// with a smaller/larger timestamp.
840 : ///
841 0 : pub(crate) async fn is_latest_commit_timestamp_ge_than(
842 0 : &self,
843 0 : search_timestamp: TimestampTz,
844 0 : probe_lsn: Lsn,
845 0 : found_smaller: &mut bool,
846 0 : found_larger: &mut bool,
847 0 : ctx: &RequestContext,
848 0 : ) -> Result<bool, PageReconstructError> {
849 0 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
850 0 : if timestamp >= search_timestamp {
851 0 : *found_larger = true;
852 0 : return ControlFlow::Break(true);
853 0 : } else {
854 0 : *found_smaller = true;
855 0 : }
856 0 : ControlFlow::Continue(())
857 0 : })
858 0 : .await
859 0 : }
860 :
861 : /// Obtain the timestamp for the given lsn.
862 : ///
863 : /// If the lsn has no timestamps (e.g. no commits), returns None.
864 0 : pub(crate) async fn get_timestamp_for_lsn(
865 0 : &self,
866 0 : probe_lsn: Lsn,
867 0 : ctx: &RequestContext,
868 0 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
869 0 : let mut max: Option<TimestampTz> = None;
870 0 : self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
871 0 : if let Some(max_prev) = max {
872 0 : max = Some(max_prev.max(timestamp));
873 0 : } else {
874 0 : max = Some(timestamp);
875 0 : }
876 0 : ControlFlow::Continue(())
877 0 : })
878 0 : .await?;
879 :
880 0 : Ok(max)
881 0 : }
882 :
883 : /// Runs the given function on all the timestamps for a given lsn
884 : ///
885 : /// The return value is either given by the closure, or set to the `Default`
886 : /// impl's output.
887 0 : async fn map_all_timestamps<T: Default>(
888 0 : &self,
889 0 : probe_lsn: Lsn,
890 0 : ctx: &RequestContext,
891 0 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
892 0 : ) -> Result<T, PageReconstructError> {
893 0 : for segno in self
894 0 : .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
895 0 : .await?
896 : {
897 0 : let nblocks = self
898 0 : .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
899 0 : .await?;
900 :
901 0 : let keyspace = KeySpace::single(
902 0 : slru_block_to_key(SlruKind::Clog, segno, 0)
903 0 : ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
904 0 : );
905 0 :
906 0 : let batches = keyspace.partition(
907 0 : self.get_shard_identity(),
908 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
909 0 : );
910 :
911 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
912 0 : self.conf.get_vectored_concurrent_io,
913 0 : self.gate
914 0 : .enter()
915 0 : .map_err(|_| PageReconstructError::Cancelled)?,
916 : );
917 :
918 0 : for batch in batches.parts.into_iter().rev() {
919 0 : let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
920 0 : let blocks = self
921 0 : .get_vectored(query, io_concurrency.clone(), ctx)
922 0 : .await?;
923 :
924 0 : for (_key, clog_page) in blocks.into_iter().rev() {
925 0 : let clog_page = clog_page?;
926 :
927 0 : if clog_page.len() == BLCKSZ as usize + 8 {
928 0 : let mut timestamp_bytes = [0u8; 8];
929 0 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
930 0 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
931 0 :
932 0 : match f(timestamp) {
933 0 : ControlFlow::Break(b) => return Ok(b),
934 0 : ControlFlow::Continue(()) => (),
935 : }
936 0 : }
937 : }
938 : }
939 : }
940 0 : Ok(Default::default())
941 0 : }
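    // A minimal sketch of the page-suffix parsing done in the loop above: an extended
    // CLOG page carries an extra 8-byte big-endian `TimestampTz` after the BLCKSZ page
    // image, and pages without that suffix contribute no timestamp. The helper name is
    // hypothetical.
    fn clog_page_timestamp(page: &[u8]) -> Option<TimestampTz> {
        if page.len() == BLCKSZ as usize + 8 {
            let mut ts_bytes = [0u8; 8];
            ts_bytes.copy_from_slice(&page[BLCKSZ as usize..]);
            Some(TimestampTz::from_be_bytes(ts_bytes))
        } else {
            None
        }
    }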
942 :
943 0 : pub(crate) async fn get_slru_keyspace(
944 0 : &self,
945 0 : version: Version<'_>,
946 0 : ctx: &RequestContext,
947 0 : ) -> Result<KeySpace, PageReconstructError> {
948 0 : let mut accum = KeySpaceAccum::new();
949 :
950 0 : for kind in SlruKind::iter() {
951 0 : let mut segments: Vec<u32> = self
952 0 : .list_slru_segments(kind, version, ctx)
953 0 : .await?
954 0 : .into_iter()
955 0 : .collect();
956 0 : segments.sort_unstable();
957 :
958 0 : for seg in segments {
959 0 : let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
960 :
961 0 : accum.add_range(
962 0 : slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
963 0 : );
964 : }
965 : }
966 :
967 0 : Ok(accum.to_keyspace())
968 0 : }
969 :
970 : /// Get a list of SLRU segments
971 0 : pub(crate) async fn list_slru_segments(
972 0 : &self,
973 0 : kind: SlruKind,
974 0 : version: Version<'_>,
975 0 : ctx: &RequestContext,
976 0 : ) -> Result<HashSet<u32>, PageReconstructError> {
977 0 : // fetch directory entry
978 0 : let key = slru_dir_to_key(kind);
979 :
980 0 : let buf = version.get(self, key, ctx).await?;
981 0 : Ok(SlruSegmentDirectory::des(&buf)?.segments)
982 0 : }
983 :
984 0 : pub(crate) async fn get_relmap_file(
985 0 : &self,
986 0 : spcnode: Oid,
987 0 : dbnode: Oid,
988 0 : version: Version<'_>,
989 0 : ctx: &RequestContext,
990 0 : ) -> Result<Bytes, PageReconstructError> {
991 0 : let key = relmap_file_key(spcnode, dbnode);
992 :
993 0 : let buf = version.get(self, key, ctx).await?;
994 0 : Ok(buf)
995 0 : }
996 :
997 167 : pub(crate) async fn list_dbdirs(
998 167 : &self,
999 167 : lsn: Lsn,
1000 167 : ctx: &RequestContext,
1001 167 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
1002 : // fetch directory entry
1003 167 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1004 :
1005 167 : Ok(DbDirectory::des(&buf)?.dbdirs)
1006 167 : }
1007 :
1008 0 : pub(crate) async fn get_twophase_file(
1009 0 : &self,
1010 0 : xid: u64,
1011 0 : lsn: Lsn,
1012 0 : ctx: &RequestContext,
1013 0 : ) -> Result<Bytes, PageReconstructError> {
1014 0 : let key = twophase_file_key(xid);
1015 0 : let buf = self.get(key, lsn, ctx).await?;
1016 0 : Ok(buf)
1017 0 : }
1018 :
1019 168 : pub(crate) async fn list_twophase_files(
1020 168 : &self,
1021 168 : lsn: Lsn,
1022 168 : ctx: &RequestContext,
1023 168 : ) -> Result<HashSet<u64>, PageReconstructError> {
1024 : // fetch directory entry
1025 168 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
1026 :
1027 168 : if self.pg_version >= 17 {
1028 155 : Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
1029 : } else {
1030 13 : Ok(TwoPhaseDirectory::des(&buf)?
1031 : .xids
1032 13 : .iter()
1033 13 : .map(|x| u64::from(*x))
1034 13 : .collect())
1035 : }
1036 168 : }
1037 :
1038 0 : pub(crate) async fn get_control_file(
1039 0 : &self,
1040 0 : lsn: Lsn,
1041 0 : ctx: &RequestContext,
1042 0 : ) -> Result<Bytes, PageReconstructError> {
1043 0 : self.get(CONTROLFILE_KEY, lsn, ctx).await
1044 0 : }
1045 :
1046 6 : pub(crate) async fn get_checkpoint(
1047 6 : &self,
1048 6 : lsn: Lsn,
1049 6 : ctx: &RequestContext,
1050 6 : ) -> Result<Bytes, PageReconstructError> {
1051 6 : self.get(CHECKPOINT_KEY, lsn, ctx).await
1052 6 : }
1053 :
1054 6 : async fn list_aux_files_v2(
1055 6 : &self,
1056 6 : lsn: Lsn,
1057 6 : ctx: &RequestContext,
1058 6 : io_concurrency: IoConcurrency,
1059 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1060 6 : let kv = self
1061 6 : .scan(
1062 6 : KeySpace::single(Key::metadata_aux_key_range()),
1063 6 : lsn,
1064 6 : ctx,
1065 6 : io_concurrency,
1066 6 : )
1067 6 : .await?;
1068 6 : let mut result = HashMap::new();
1069 6 : let mut sz = 0;
1070 15 : for (_, v) in kv {
1071 9 : let v = v?;
1072 9 : let v = aux_file::decode_file_value_bytes(&v)
1073 9 : .context("value decode")
1074 9 : .map_err(PageReconstructError::Other)?;
1075 17 : for (fname, content) in v {
1076 8 : sz += fname.len();
1077 8 : sz += content.len();
1078 8 : result.insert(fname, content);
1079 8 : }
1080 : }
1081 6 : self.aux_file_size_estimator.on_initial(sz);
1082 6 : Ok(result)
1083 6 : }
1084 :
1085 0 : pub(crate) async fn trigger_aux_file_size_computation(
1086 0 : &self,
1087 0 : lsn: Lsn,
1088 0 : ctx: &RequestContext,
1089 0 : io_concurrency: IoConcurrency,
1090 0 : ) -> Result<(), PageReconstructError> {
1091 0 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
1092 0 : Ok(())
1093 0 : }
1094 :
1095 6 : pub(crate) async fn list_aux_files(
1096 6 : &self,
1097 6 : lsn: Lsn,
1098 6 : ctx: &RequestContext,
1099 6 : io_concurrency: IoConcurrency,
1100 6 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
1101 6 : self.list_aux_files_v2(lsn, ctx, io_concurrency).await
1102 6 : }
1103 :
1104 2 : pub(crate) async fn get_replorigins(
1105 2 : &self,
1106 2 : lsn: Lsn,
1107 2 : ctx: &RequestContext,
1108 2 : io_concurrency: IoConcurrency,
1109 2 : ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
1110 2 : let kv = self
1111 2 : .scan(
1112 2 : KeySpace::single(repl_origin_key_range()),
1113 2 : lsn,
1114 2 : ctx,
1115 2 : io_concurrency,
1116 2 : )
1117 2 : .await?;
1118 2 : let mut result = HashMap::new();
1119 6 : for (k, v) in kv {
1120 5 : let v = v?;
1121 5 : if v.is_empty() {
1122 : // This is a tombstone -- we can skip it.
1123 :                 // Originally, the replorigin code used `Lsn::INVALID` to represent a tombstone. However, as it is part of
1124 : // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
1125 : // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
1126 : // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
1127 2 : continue;
1128 3 : }
1129 3 : let origin_id = k.field6 as RepOriginId;
1130 3 : let origin_lsn = Lsn::des(&v)
1131 3 : .with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
1132 2 : if origin_lsn != Lsn::INVALID {
1133 2 : result.insert(origin_id, origin_lsn);
1134 2 : }
1135 : }
1136 1 : Ok(result)
1137 2 : }
1138 :
1139 : /// Does the same as get_current_logical_size but counted on demand.
1140 : /// Used to initialize the logical size tracking on startup.
1141 : ///
1142 : /// Only relation blocks are counted currently. That excludes metadata,
1143 : /// SLRUs, twophase files etc.
1144 : ///
1145 : /// # Cancel-Safety
1146 : ///
1147 : /// This method is cancellation-safe.
1148 7 : pub(crate) async fn get_current_logical_size_non_incremental(
1149 7 : &self,
1150 7 : lsn: Lsn,
1151 7 : ctx: &RequestContext,
1152 7 : ) -> Result<u64, CalculateLogicalSizeError> {
1153 7 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1154 7 :
1155 7 : fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
1156 :
1157 : // Fetch list of database dirs and iterate them
1158 7 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
1159 7 : let dbdir = DbDirectory::des(&buf)?;
1160 :
1161 7 : let mut total_size: u64 = 0;
1162 7 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
1163 0 : for rel in self
1164 0 : .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
1165 0 : .await?
1166 : {
1167 0 : if self.cancel.is_cancelled() {
1168 0 : return Err(CalculateLogicalSizeError::Cancelled);
1169 0 : }
1170 0 : let relsize_key = rel_size_to_key(rel);
1171 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1172 0 : let relsize = buf.get_u32_le();
1173 0 :
1174 0 : total_size += relsize as u64;
1175 : }
1176 : }
1177 7 : Ok(total_size * BLCKSZ as u64)
1178 7 : }
1179 :
1180 : /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
1181 : /// for gc-compaction.
1182 : ///
1183 : /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
1184 : /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
1185 : /// be kept only for a specific range of LSN.
1186 :     /// be kept only for a specific range of LSNs.
1187 : /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
1188 : /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
1189 : /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
1190 : /// determine which keys to retain/drop for gc-compaction.
1191 : ///
1192 : /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
1193 : /// to be retained for each of the branch LSN.
1194 : ///
1195 : /// The return value is (dense keyspace, sparse keyspace).
1196 27 : pub(crate) async fn collect_gc_compaction_keyspace(
1197 27 : &self,
1198 27 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1199 27 : let metadata_key_begin = Key::metadata_key_range().start;
1200 27 : let aux_v1_key = AUX_FILES_KEY;
1201 27 : let dense_keyspace = KeySpace {
1202 27 : ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
1203 27 : };
1204 27 : Ok((
1205 27 : dense_keyspace,
1206 27 : SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
1207 27 : ))
1208 27 : }
1209 :
1210 : ///
1211 : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
1212 :     /// Anything that's not listed may be removed from the underlying storage (from
1213 : /// that LSN forwards).
1214 : ///
1215 : /// The return value is (dense keyspace, sparse keyspace).
1216 167 : pub(crate) async fn collect_keyspace(
1217 167 : &self,
1218 167 : lsn: Lsn,
1219 167 : ctx: &RequestContext,
1220 167 : ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
1221 167 : // Iterate through key ranges, greedily packing them into partitions
1222 167 : let mut result = KeySpaceAccum::new();
1223 167 :
1224 167 : // The dbdir metadata always exists
1225 167 : result.add_key(DBDIR_KEY);
1226 :
1227 : // Fetch list of database dirs and iterate them
1228 167 : let dbdir = self.list_dbdirs(lsn, ctx).await?;
1229 167 : let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
1230 167 :
1231 167 : dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
1232 167 : for ((spcnode, dbnode), has_relmap_file) in dbs {
1233 0 : if has_relmap_file {
1234 0 : result.add_key(relmap_file_key(spcnode, dbnode));
1235 0 : }
1236 0 : result.add_key(rel_dir_to_key(spcnode, dbnode));
1237 :
1238 0 : let mut rels: Vec<RelTag> = self
1239 0 : .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
1240 0 : .await?
1241 0 : .into_iter()
1242 0 : .collect();
1243 0 : rels.sort_unstable();
1244 0 : for rel in rels {
1245 0 : let relsize_key = rel_size_to_key(rel);
1246 0 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
1247 0 : let relsize = buf.get_u32_le();
1248 0 :
1249 0 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
1250 0 : result.add_key(relsize_key);
1251 : }
1252 : }
1253 :
1254 : // Iterate SLRUs next
1255 167 : if self.tenant_shard_id.is_shard_zero() {
1256 492 : for kind in [
1257 164 : SlruKind::Clog,
1258 164 : SlruKind::MultiXactMembers,
1259 164 : SlruKind::MultiXactOffsets,
1260 : ] {
1261 492 : let slrudir_key = slru_dir_to_key(kind);
1262 492 : result.add_key(slrudir_key);
1263 492 : let buf = self.get(slrudir_key, lsn, ctx).await?;
1264 492 : let dir = SlruSegmentDirectory::des(&buf)?;
1265 492 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
1266 492 : segments.sort_unstable();
1267 492 : for segno in segments {
1268 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
1269 0 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
1270 0 : let segsize = buf.get_u32_le();
1271 0 :
1272 0 : result.add_range(
1273 0 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
1274 0 : );
1275 0 : result.add_key(segsize_key);
1276 : }
1277 : }
1278 3 : }
1279 :
1280 : // Then pg_twophase
1281 167 : result.add_key(TWOPHASEDIR_KEY);
1282 :
1283 167 : let mut xids: Vec<u64> = self
1284 167 : .list_twophase_files(lsn, ctx)
1285 167 : .await?
1286 167 : .iter()
1287 167 : .cloned()
1288 167 : .collect();
1289 167 : xids.sort_unstable();
1290 167 : for xid in xids {
1291 0 : result.add_key(twophase_file_key(xid));
1292 0 : }
1293 :
1294 167 : result.add_key(CONTROLFILE_KEY);
1295 167 : result.add_key(CHECKPOINT_KEY);
1296 167 :
1297 167 : // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
1298 167 : // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
1299 167 :         // and the keys will not be garbage-collected.
1300 167 : #[cfg(test)]
1301 167 : {
1302 167 : let guard = self.extra_test_dense_keyspace.load();
1303 167 : for kr in &guard.ranges {
1304 0 : result.add_range(kr.clone());
1305 0 : }
1306 0 : }
1307 0 :
1308 167 : let dense_keyspace = result.to_keyspace();
1309 167 : let sparse_keyspace = SparseKeySpace(KeySpace {
1310 167 : ranges: vec![
1311 167 : Key::metadata_aux_key_range(),
1312 167 : repl_origin_key_range(),
1313 167 : Key::rel_dir_sparse_key_range(),
1314 167 : ],
1315 167 : });
1316 167 :
1317 167 : if cfg!(debug_assertions) {
1318 : // Verify if the sparse keyspaces are ordered and non-overlapping.
1319 :
1320 : // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
1321 : // category of sparse keys are split into their own image/delta files. If there
1322 : // are overlapping keyspaces, they will be automatically merged by keyspace accum,
1323 : // and we want the developer to keep the keyspaces separated.
1324 :
1325 167 : let ranges = &sparse_keyspace.0.ranges;
1326 :
1327 : // TODO: use a single overlaps_with across the codebase
1328 501 : fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
1329 501 : !(a.end <= b.start || b.end <= a.start)
1330 501 : }
1331 501 : for i in 0..ranges.len() {
1332 501 : for j in 0..i {
1333 501 : if overlaps_with(&ranges[i], &ranges[j]) {
1334 0 : panic!(
1335 0 : "overlapping sparse keyspace: {}..{} and {}..{}",
1336 0 : ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
1337 0 : );
1338 501 : }
1339 : }
1340 : }
1341 334 : for i in 1..ranges.len() {
1342 334 : assert!(
1343 334 : ranges[i - 1].end <= ranges[i].start,
1344 0 : "unordered sparse keyspace: {}..{} and {}..{}",
1345 0 : ranges[i - 1].start,
1346 0 : ranges[i - 1].end,
1347 0 : ranges[i].start,
1348 0 : ranges[i].end
1349 : );
1350 : }
1351 0 : }
1352 :
1353 167 : Ok((dense_keyspace, sparse_keyspace))
1354 167 : }
1355 :
1356 :     /// Get the cached size of a relation. There are two caches: the "latest" cache, updated by the primary, which
1357 :     /// captures the latest state of the timeline, and the "snapshot" cache, whose key includes the LSN and which can
1358 :     /// therefore be used by replicas to get the relation size at a particular LSN (snapshot).
1359 224270 : pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
1360 224270 : let lsn = version.get_lsn();
1361 224270 : {
1362 224270 : let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
1363 224270 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
1364 224259 : if lsn >= *cached_lsn {
1365 221686 : RELSIZE_LATEST_CACHE_HITS.inc();
1366 221686 : return Some(*nblocks);
1367 2573 : }
1368 2573 : RELSIZE_CACHE_MISSES_OLD.inc();
1369 11 : }
1370 : }
1371 : {
1372 2584 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1373 2584 : if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
1374 2563 : RELSIZE_SNAPSHOT_CACHE_HITS.inc();
1375 2563 : return Some(*nblock);
1376 21 : }
1377 21 : }
1378 21 : if version.is_latest() {
1379 10 : RELSIZE_LATEST_CACHE_MISSES.inc();
1380 11 : } else {
1381 11 : RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
1382 11 : }
1383 21 : None
1384 224270 : }
1385 :
1386 : /// Update cached relation size if there is no more recent update
1387 5 : pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
1388 5 : let lsn = version.get_lsn();
1389 5 : if version.is_latest() {
1390 0 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1391 0 : match rel_size_cache.entry(tag) {
1392 0 : hash_map::Entry::Occupied(mut entry) => {
1393 0 : let cached_lsn = entry.get_mut();
1394 0 : if lsn >= cached_lsn.0 {
1395 0 : *cached_lsn = (lsn, nblocks);
1396 0 : }
1397 : }
1398 0 : hash_map::Entry::Vacant(entry) => {
1399 0 : entry.insert((lsn, nblocks));
1400 0 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1401 0 : }
1402 : }
1403 : } else {
1404 5 : let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
1405 5 : if rel_size_cache.capacity() != 0 {
1406 5 : rel_size_cache.insert((lsn, tag), nblocks);
1407 5 : RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
1408 5 : }
1409 : }
1410 5 : }
1411 :
1412 : /// Store cached relation size
1413 141360 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
1414 141360 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1415 141360 : if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
1416 960 : RELSIZE_LATEST_CACHE_ENTRIES.inc();
1417 140400 : }
1418 141360 : }
1419 :
1420 : /// Remove cached relation size
1421 1 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
1422 1 : let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
1423 1 : if rel_size_cache.remove(tag).is_some() {
1424 1 : RELSIZE_LATEST_CACHE_ENTRIES.dec();
1425 1 : }
1426 1 : }
1427 : }
1428 :
1429 : /// DatadirModification represents an operation to ingest an atomic set of
1430 : /// updates to the repository.
1431 : ///
1432 : /// It is created by the 'begin_modification' function. It is used for each WAL
1433 : /// record, so that all the modifications made by one WAL record appear atomic.
1434 : pub struct DatadirModification<'a> {
1435 : /// The timeline this modification applies to. You can access this to
1436 : /// read the state, but note that any pending updates are *not* reflected
1437 : /// in the state in 'tline' yet.
1438 : pub tline: &'a Timeline,
1439 :
1440 : /// Current LSN of the modification
1441 : lsn: Lsn,
1442 :
1443 : // The modifications are not applied directly to the underlying key-value store.
1444 : // The put-functions add the modifications here, and they are flushed to the
1445 : // underlying key-value store by the 'finish' function.
1446 : pending_lsns: Vec<Lsn>,
1447 : pending_deletions: Vec<(Range<Key>, Lsn)>,
1448 : pending_nblocks: i64,
1449 :
1450 : /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
1451 : /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
1452 : pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
1453 :
1454 : /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
1455 : /// which keys are stored here.
1456 : pending_data_batch: Option<SerializedValueBatch>,
1457 :
1458 : /// For special "directory" keys that store key-value maps, track the size of the map
1459 : /// if it was updated in this modification.
1460 : pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
1461 :
1462 : /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
1463 : pending_metadata_bytes: usize,
1464 : }
1465 :
1466 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1467 : pub enum MetricsUpdate {
1468 : /// Set the metrics to this value
1469 : Set(u64),
1470 : /// Increment the metrics by this value
1471 : Add(u64),
1472 : /// Decrement the metrics by this value
1473 : Sub(u64),
1474 : }
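// A minimal sketch of the intended interpretation of the variants above; the helper
// name and the saturating behaviour on underflow are assumptions made only for this
// example.
fn apply_metrics_update(current: u64, update: MetricsUpdate) -> u64 {
    match update {
        MetricsUpdate::Set(v) => v,                          // overwrite the metric
        MetricsUpdate::Add(v) => current + v,                // increment by v
        MetricsUpdate::Sub(v) => current.saturating_sub(v),  // decrement by v
    }
}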
1475 :
1476 : impl DatadirModification<'_> {
1477 : // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
1478 : // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
1479 : // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
1480 : pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
1481 :
1482 : /// Get the current lsn
1483 1 : pub(crate) fn get_lsn(&self) -> Lsn {
1484 1 : self.lsn
1485 1 : }
1486 :
1487 0 : pub(crate) fn approx_pending_bytes(&self) -> usize {
1488 0 : self.pending_data_batch
1489 0 : .as_ref()
1490 0 : .map_or(0, |b| b.buffer_size())
1491 0 : + self.pending_metadata_bytes
1492 0 : }
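    // A minimal sketch (in comments, since the commit call's signature is not shown
    // here) of how an ingest loop might use `MAX_PENDING_BYTES` together with
    // `approx_pending_bytes()` to bound the size of a single commit:
    //
    //     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
    //         // flush what has accumulated so far before ingesting the next record,
    //         // e.g. via the `commit()` described in `begin_modification`'s docs
    //     }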
1493 :
1494 0 : pub(crate) fn has_dirty_data(&self) -> bool {
1495 0 : self.pending_data_batch
1496 0 : .as_ref()
1497 0 : .is_some_and(|b| b.has_data())
1498 0 : }
1499 :
1500 : /// Returns statistics about the currently pending modifications.
1501 0 : pub(crate) fn stats(&self) -> DatadirModificationStats {
1502 0 : let mut stats = DatadirModificationStats::default();
1503 0 : for (_, _, value) in self.pending_metadata_pages.values().flatten() {
1504 0 : match value {
1505 0 : Value::Image(_) => stats.metadata_images += 1,
1506 0 : Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
1507 0 : Value::WalRecord(_) => stats.metadata_deltas += 1,
1508 : }
1509 : }
1510 0 : for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
1511 0 : match valuemeta {
1512 0 : ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
1513 0 : ValueMeta::Serialized(_) => stats.data_deltas += 1,
1514 0 : ValueMeta::Observed(_) => {}
1515 : }
1516 : }
1517 0 : stats
1518 0 : }
1519 :
1520 : /// Set the current lsn
1521 72929 : pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
1522 72929 : ensure_walingest!(
1523 72929 : lsn >= self.lsn,
1524 72929 : "setting an older lsn {} than {} is not allowed",
1525 72929 : lsn,
1526 72929 : self.lsn
1527 72929 : );
1528 :
1529 72929 : if lsn > self.lsn {
1530 72929 : self.pending_lsns.push(self.lsn);
1531 72929 : self.lsn = lsn;
1532 72929 : }
1533 72929 : Ok(())
1534 72929 : }
1535 :
1536 : /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
1537 : /// keys that represent literal blocks that postgres can read. So data includes relation blocks and
1538 : /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
1539 : ///
1540 : /// The distinction is important because data keys are handled on a fast path where dirty writes are
1541 : /// not readable until this modification is committed, whereas metadata keys are visible for read
1542 : /// via [`Self::get`] as soon as their record has been ingested.
1543 425357 : fn is_data_key(key: &Key) -> bool {
1544 425357 : key.is_rel_block_key() || key.is_slru_block_key()
1545 425357 : }
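     : //
     : // Illustration (a sketch; `rel` stands for some RelTag): relation and SLRU block keys
     : // take the "data" fast path, while directory keys such as DBDIR_KEY are "metadata".
     : //
     : //     assert!(DatadirModification::is_data_key(&rel_block_to_key(rel, 0)));
     : //     assert!(!DatadirModification::is_data_key(&DBDIR_KEY));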
1546 :
1547 : /// Initialize a completely new repository.
1548 : ///
1549 : /// This inserts the directory metadata entries that are assumed to
1550 : /// always exist.
1551 110 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
1552 110 : let buf = DbDirectory::ser(&DbDirectory {
1553 110 : dbdirs: HashMap::new(),
1554 110 : })?;
1555 110 : self.pending_directory_entries
1556 110 : .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
1557 110 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1558 :
1559 110 : let buf = if self.tline.pg_version >= 17 {
1560 97 : TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
1561 97 : xids: HashSet::new(),
1562 97 : })
1563 : } else {
1564 13 : TwoPhaseDirectory::ser(&TwoPhaseDirectory {
1565 13 : xids: HashSet::new(),
1566 13 : })
1567 0 : }?;
1568 110 : self.pending_directory_entries
1569 110 : .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
1570 110 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
1571 :
1572 110 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
1573 110 : let empty_dir = Value::Image(buf);
1574 110 :
1575 110 : // Initialize SLRUs on shard 0 only: creating these on other shards would be
1576 110 : // harmless but they'd just be dropped on later compaction.
1577 110 : if self.tline.tenant_shard_id.is_shard_zero() {
1578 107 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
1579 107 : self.pending_directory_entries.push((
1580 107 : DirectoryKind::SlruSegment(SlruKind::Clog),
1581 107 : MetricsUpdate::Set(0),
1582 107 : ));
1583 107 : self.put(
1584 107 : slru_dir_to_key(SlruKind::MultiXactMembers),
1585 107 : empty_dir.clone(),
1586 107 : );
1587 107 : self.pending_directory_entries.push((
1588 107 : DirectoryKind::SlruSegment(SlruKind::Clog),
1589 107 : MetricsUpdate::Set(0),
1590 107 : ));
1591 107 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
1592 107 : self.pending_directory_entries.push((
1593 107 : DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
1594 107 : MetricsUpdate::Set(0),
1595 107 : ));
1596 107 : }
1597 :
1598 110 : Ok(())
1599 110 : }
1600 :
1601 : #[cfg(test)]
1602 109 : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
1603 109 : self.init_empty()?;
1604 109 : self.put_control_file(bytes::Bytes::from_static(
1605 109 : b"control_file contents do not matter",
1606 109 : ))
1607 109 : .context("put_control_file")?;
1608 109 : self.put_checkpoint(bytes::Bytes::from_static(
1609 109 : b"checkpoint_file contents do not matter",
1610 109 : ))
1611 109 : .context("put_checkpoint_file")?;
1612 109 : Ok(())
1613 109 : }
1614 :
1615 : /// Creates a relation if it is not already present.
1616 : /// Returns the current size of the relation
1617 209028 : pub(crate) async fn create_relation_if_required(
1618 209028 : &mut self,
1619 209028 : rel: RelTag,
1620 209028 : ctx: &RequestContext,
1621 209028 : ) -> Result<u32, WalIngestError> {
1622 : // Get current size and put rel creation if rel doesn't exist
1623 : //
1624 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1625 : // check the cache too. This is because eagerly checking the cache results in
1626 : // less work overall and 10% better performance. It's more work on a cache miss,
1627 : // but cache misses are rare.
1628 209028 : if let Some(nblocks) = self
1629 209028 : .tline
1630 209028 : .get_cached_rel_size(&rel, Version::Modified(self))
1631 : {
1632 209023 : Ok(nblocks)
1633 5 : } else if !self
1634 5 : .tline
1635 5 : .get_rel_exists(rel, Version::Modified(self), ctx)
1636 5 : .await?
1637 : {
1638 : // create it with 0 size initially, the logic below will extend it
1639 5 : self.put_rel_creation(rel, 0, ctx).await?;
1640 5 : Ok(0)
1641 : } else {
1642 0 : Ok(self
1643 0 : .tline
1644 0 : .get_rel_size(rel, Version::Modified(self), ctx)
1645 0 : .await?)
1646 : }
1647 209028 : }
1648 :
1649 : /// Given a block number for a relation (which represents a newly written block),
1650 : /// the previous block count of the relation, and the shard info, find the gaps
1651 : /// that were created by the newly written block, if any.
1652 72835 : fn find_gaps(
1653 72835 : rel: RelTag,
1654 72835 : blkno: u32,
1655 72835 : previous_nblocks: u32,
1656 72835 : shard: &ShardIdentity,
1657 72835 : ) -> Option<KeySpace> {
1658 72835 : let mut key = rel_block_to_key(rel, blkno);
1659 72835 : let mut gap_accum = None;
1660 :
1661 72835 : for gap_blkno in previous_nblocks..blkno {
1662 16 : key.field6 = gap_blkno;
1663 16 :
1664 16 : if shard.get_shard_number(&key) != shard.number {
1665 4 : continue;
1666 12 : }
1667 12 :
1668 12 : gap_accum
1669 12 : .get_or_insert_with(KeySpaceAccum::new)
1670 12 : .add_key(key);
1671 : }
1672 :
1673 72835 : gap_accum.map(|accum| accum.to_keyspace())
1674 72835 : }
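     : //
     : // Worked example (sketch; `rel` and `shard` assumed): writing block 4 of a relation whose
     : // previous size was 1 block leaves blocks 1, 2 and 3 unwritten; only the subset of those
     : // blocks owned by this shard is returned as gaps to be zero-filled.
     : //
     : //     let gaps = DatadirModification::find_gaps(rel, 4, 1, &shard);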
1675 :
1676 72926 : pub async fn ingest_batch(
1677 72926 : &mut self,
1678 72926 : mut batch: SerializedValueBatch,
1679 72926 : // TODO(vlad): remove this argument and replace the shard check with is_key_local
1680 72926 : shard: &ShardIdentity,
1681 72926 : ctx: &RequestContext,
1682 72926 : ) -> Result<(), WalIngestError> {
1683 72926 : let mut gaps_at_lsns = Vec::default();
1684 :
1685 72926 : for meta in batch.metadata.iter() {
1686 72821 : let key = Key::from_compact(meta.key());
1687 72821 : let (rel, blkno) = key
1688 72821 : .to_rel_block()
1689 72821 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
1690 72821 : let new_nblocks = blkno + 1;
1691 :
1692 72821 : let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
1693 72821 : if new_nblocks > old_nblocks {
1694 1195 : self.put_rel_extend(rel, new_nblocks, ctx).await?;
1695 71626 : }
1696 :
1697 72821 : if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
1698 0 : gaps_at_lsns.push((gaps, meta.lsn()));
1699 72821 : }
1700 : }
1701 :
1702 72926 : if !gaps_at_lsns.is_empty() {
1703 0 : batch.zero_gaps(gaps_at_lsns);
1704 72926 : }
1705 :
1706 72926 : match self.pending_data_batch.as_mut() {
1707 10 : Some(pending_batch) => {
1708 10 : pending_batch.extend(batch);
1709 10 : }
1710 72916 : None if batch.has_data() => {
1711 72815 : self.pending_data_batch = Some(batch);
1712 72815 : }
1713 101 : None => {
1714 101 : // Nothing to initialize the batch with
1715 101 : }
1716 : }
1717 :
1718 72926 : Ok(())
1719 72926 : }
1720 :
1721 : /// Put a new page version that can be constructed from a WAL record
1722 : ///
1723 : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
1724 : /// current end-of-file. It's up to the caller to check that the relation size
1725 : /// matches the blocks inserted!
1726 6 : pub fn put_rel_wal_record(
1727 6 : &mut self,
1728 6 : rel: RelTag,
1729 6 : blknum: BlockNumber,
1730 6 : rec: NeonWalRecord,
1731 6 : ) -> Result<(), WalIngestError> {
1732 6 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1733 6 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
1734 6 : Ok(())
1735 6 : }
1736 :
1737 : // Same, but for an SLRU.
1738 4 : pub fn put_slru_wal_record(
1739 4 : &mut self,
1740 4 : kind: SlruKind,
1741 4 : segno: u32,
1742 4 : blknum: BlockNumber,
1743 4 : rec: NeonWalRecord,
1744 4 : ) -> Result<(), WalIngestError> {
1745 4 : if !self.tline.tenant_shard_id.is_shard_zero() {
1746 0 : return Ok(());
1747 4 : }
1748 4 :
1749 4 : self.put(
1750 4 : slru_block_to_key(kind, segno, blknum),
1751 4 : Value::WalRecord(rec),
1752 4 : );
1753 4 : Ok(())
1754 4 : }
1755 :
1756 : /// Like put_wal_record, but with ready-made image of the page.
1757 138921 : pub fn put_rel_page_image(
1758 138921 : &mut self,
1759 138921 : rel: RelTag,
1760 138921 : blknum: BlockNumber,
1761 138921 : img: Bytes,
1762 138921 : ) -> Result<(), WalIngestError> {
1763 138921 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1764 138921 : let key = rel_block_to_key(rel, blknum);
1765 138921 : if !key.is_valid_key_on_write_path() {
1766 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1767 138921 : }
1768 138921 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
1769 138921 : Ok(())
1770 138921 : }
1771 :
1772 3 : pub fn put_slru_page_image(
1773 3 : &mut self,
1774 3 : kind: SlruKind,
1775 3 : segno: u32,
1776 3 : blknum: BlockNumber,
1777 3 : img: Bytes,
1778 3 : ) -> Result<(), WalIngestError> {
1779 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1780 :
1781 3 : let key = slru_block_to_key(kind, segno, blknum);
1782 3 : if !key.is_valid_key_on_write_path() {
1783 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1784 3 : }
1785 3 : self.put(key, Value::Image(img));
1786 3 : Ok(())
1787 3 : }
1788 :
1789 1499 : pub(crate) fn put_rel_page_image_zero(
1790 1499 : &mut self,
1791 1499 : rel: RelTag,
1792 1499 : blknum: BlockNumber,
1793 1499 : ) -> Result<(), WalIngestError> {
1794 1499 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
1795 1499 : let key = rel_block_to_key(rel, blknum);
1796 1499 : if !key.is_valid_key_on_write_path() {
1797 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1798 1499 : }
1799 :
1800 1499 : let batch = self
1801 1499 : .pending_data_batch
1802 1499 : .get_or_insert_with(SerializedValueBatch::default);
1803 1499 :
1804 1499 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1805 1499 :
1806 1499 : Ok(())
1807 1499 : }
1808 :
1809 0 : pub(crate) fn put_slru_page_image_zero(
1810 0 : &mut self,
1811 0 : kind: SlruKind,
1812 0 : segno: u32,
1813 0 : blknum: BlockNumber,
1814 0 : ) -> Result<(), WalIngestError> {
1815 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
1816 0 : let key = slru_block_to_key(kind, segno, blknum);
1817 0 : if !key.is_valid_key_on_write_path() {
1818 0 : Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
1819 0 : }
1820 :
1821 0 : let batch = self
1822 0 : .pending_data_batch
1823 0 : .get_or_insert_with(SerializedValueBatch::default);
1824 0 :
1825 0 : batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
1826 0 :
1827 0 : Ok(())
1828 0 : }
1829 :
1830 : /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
1831 : /// we enable it, we also need to persist it in `index_part.json`.
1832 973 : pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
1833 973 : let status = self.tline.get_rel_size_v2_status();
1834 973 : let config = self.tline.get_rel_size_v2_enabled();
1835 973 : match (config, status) {
1836 : (false, RelSizeMigration::Legacy) => {
1837 : // tenant config didn't enable it and we didn't write any reldir_v2 key yet
1838 973 : Ok(false)
1839 : }
1840 : (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1841 : // index_part already persisted that the timeline has enabled rel_size_v2
1842 0 : Ok(true)
1843 : }
1844 : (true, RelSizeMigration::Legacy) => {
1845 : // The first time we enable it, we need to persist it in `index_part.json`
1846 0 : self.tline
1847 0 : .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
1848 0 : tracing::info!("enabled rel_size_v2");
1849 0 : Ok(true)
1850 : }
1851 : (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
1852 : // index_part already persisted that the timeline has enabled rel_size_v2
1853 : // and we don't need to do anything
1854 0 : Ok(true)
1855 : }
1856 : }
1857 973 : }
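     : //
     : // Decision summary of the match above: the v2 write path is used iff the tenant config
     : // enables it or `index_part.json` already records Migrating/Migrated; the Legacy ->
     : // Migrating transition is persisted exactly once, on the first enabled call.
     : //
     : //     config | status               | v2 used
     : //     -------+----------------------+--------
     : //     false  | Legacy               | no
     : //     false  | Migrating/Migrated   | yes
     : //     true   | Legacy               | yes (persist Migrating first)
     : //     true   | Migrating/Migrated   | yes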
1858 :
1859 : /// Store a relmapper file (pg_filenode.map) in the repository
1860 8 : pub async fn put_relmap_file(
1861 8 : &mut self,
1862 8 : spcnode: Oid,
1863 8 : dbnode: Oid,
1864 8 : img: Bytes,
1865 8 : ctx: &RequestContext,
1866 8 : ) -> Result<(), WalIngestError> {
1867 8 : let v2_enabled = self
1868 8 : .maybe_enable_rel_size_v2()
1869 8 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
1870 :
1871 : // Add it to the directory (if it doesn't exist already)
1872 8 : let buf = self.get(DBDIR_KEY, ctx).await?;
1873 8 : let mut dbdir = DbDirectory::des(&buf)?;
1874 :
1875 8 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
1876 8 : if r.is_none() || r == Some(false) {
1877 : // The dbdir entry didn't exist, or it contained a
1878 : // 'false'. The 'insert' call already updated it with
1879 : // 'true', now write the updated 'dbdirs' map back.
1880 8 : let buf = DbDirectory::ser(&dbdir)?;
1881 8 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1882 0 : }
1883 8 : if r.is_none() {
1884 : // Create RelDirectory
1885 : // TODO: if we have fully migrated to v2, no need to create this directory
1886 4 : let buf = RelDirectory::ser(&RelDirectory {
1887 4 : rels: HashSet::new(),
1888 4 : })?;
1889 4 : self.pending_directory_entries
1890 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
1891 4 : if v2_enabled {
1892 0 : self.pending_directory_entries
1893 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
1894 4 : }
1895 4 : self.put(
1896 4 : rel_dir_to_key(spcnode, dbnode),
1897 4 : Value::Image(Bytes::from(buf)),
1898 4 : );
1899 4 : }
1900 :
1901 8 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
1902 8 : Ok(())
1903 8 : }
1904 :
1905 0 : pub async fn put_twophase_file(
1906 0 : &mut self,
1907 0 : xid: u64,
1908 0 : img: Bytes,
1909 0 : ctx: &RequestContext,
1910 0 : ) -> Result<(), WalIngestError> {
1911 : // Add it to the directory entry
1912 0 : let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1913 0 : let newdirbuf = if self.tline.pg_version >= 17 {
1914 0 : let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
1915 0 : if !dir.xids.insert(xid) {
1916 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
1917 0 : }
1918 0 : self.pending_directory_entries.push((
1919 0 : DirectoryKind::TwoPhase,
1920 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1921 0 : ));
1922 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
1923 : } else {
1924 0 : let xid = xid as u32;
1925 0 : let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
1926 0 : if !dir.xids.insert(xid) {
1927 0 : Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
1928 0 : }
1929 0 : self.pending_directory_entries.push((
1930 0 : DirectoryKind::TwoPhase,
1931 0 : MetricsUpdate::Set(dir.xids.len() as u64),
1932 0 : ));
1933 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
1934 : };
1935 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
1936 0 :
1937 0 : self.put(twophase_file_key(xid), Value::Image(img));
1938 0 : Ok(())
1939 0 : }
1940 :
1941 1 : pub async fn set_replorigin(
1942 1 : &mut self,
1943 1 : origin_id: RepOriginId,
1944 1 : origin_lsn: Lsn,
1945 1 : ) -> Result<(), WalIngestError> {
1946 1 : let key = repl_origin_key(origin_id);
1947 1 : self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
1948 1 : Ok(())
1949 1 : }
1950 :
1951 0 : pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
1952 0 : self.set_replorigin(origin_id, Lsn::INVALID).await
1953 0 : }
1954 :
1955 110 : pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
1956 110 : self.put(CONTROLFILE_KEY, Value::Image(img));
1957 110 : Ok(())
1958 110 : }
1959 :
1960 117 : pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
1961 117 : self.put(CHECKPOINT_KEY, Value::Image(img));
1962 117 : Ok(())
1963 117 : }
1964 :
1965 0 : pub async fn drop_dbdir(
1966 0 : &mut self,
1967 0 : spcnode: Oid,
1968 0 : dbnode: Oid,
1969 0 : ctx: &RequestContext,
1970 0 : ) -> Result<(), WalIngestError> {
1971 0 : let total_blocks = self
1972 0 : .tline
1973 0 : .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
1974 0 : .await?;
1975 :
1976 : // Remove entry from dbdir
1977 0 : let buf = self.get(DBDIR_KEY, ctx).await?;
1978 0 : let mut dir = DbDirectory::des(&buf)?;
1979 0 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
1980 0 : let buf = DbDirectory::ser(&dir)?;
1981 0 : self.pending_directory_entries.push((
1982 0 : DirectoryKind::Db,
1983 0 : MetricsUpdate::Set(dir.dbdirs.len() as u64),
1984 0 : ));
1985 0 : self.put(DBDIR_KEY, Value::Image(buf.into()));
1986 : } else {
1987 0 : warn!(
1988 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
1989 : spcnode, dbnode
1990 : );
1991 : }
1992 :
1993 : // Update logical database size.
1994 0 : self.pending_nblocks -= total_blocks as i64;
1995 0 :
1996 0 : // Delete all relations and metadata files for the spcnode/dnode
1997 0 : self.delete(dbdir_key_range(spcnode, dbnode));
1998 0 : Ok(())
1999 0 : }
2000 :
2001 : /// Create a relation fork.
2002 : ///
2003 : /// 'nblocks' is the initial size.
2004 960 : pub async fn put_rel_creation(
2005 960 : &mut self,
2006 960 : rel: RelTag,
2007 960 : nblocks: BlockNumber,
2008 960 : ctx: &RequestContext,
2009 960 : ) -> Result<(), WalIngestError> {
2010 960 : if rel.relnode == 0 {
2011 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
2012 0 : "invalid relnode"
2013 0 : )))?;
2014 960 : }
2015 : // It's possible that this is the first rel for this db in this
2016 : // tablespace. Create the reldir entry for it if so.
2017 960 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
2018 :
2019 960 : let dbdir_exists =
2020 960 : if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
2021 : // Didn't exist. Update dbdir
2022 4 : e.insert(false);
2023 4 : let buf = DbDirectory::ser(&dbdir)?;
2024 4 : self.pending_directory_entries.push((
2025 4 : DirectoryKind::Db,
2026 4 : MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
2027 4 : ));
2028 4 : self.put(DBDIR_KEY, Value::Image(buf.into()));
2029 4 : false
2030 : } else {
2031 956 : true
2032 : };
2033 :
2034 960 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
2035 960 : let mut rel_dir = if !dbdir_exists {
2036 : // Create the RelDirectory
2037 4 : RelDirectory::default()
2038 : } else {
2039 : // reldir already exists, fetch it
2040 956 : RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
2041 : };
2042 :
2043 960 : let v2_enabled = self
2044 960 : .maybe_enable_rel_size_v2()
2045 960 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2046 :
2047 960 : if v2_enabled {
2048 0 : if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
2049 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2050 0 : }
2051 0 : let sparse_rel_dir_key =
2052 0 : rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
2053 : // check if the rel_dir_key exists in v2
2054 0 : let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
2055 0 : let val = RelDirExists::decode_option(val)
2056 0 : .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
2057 0 : if val == RelDirExists::Exists {
2058 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2059 0 : }
2060 0 : self.put(
2061 0 : sparse_rel_dir_key,
2062 0 : Value::Image(RelDirExists::Exists.encode()),
2063 0 : );
2064 0 : if !dbdir_exists {
2065 0 : self.pending_directory_entries
2066 0 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
2067 0 : self.pending_directory_entries
2068 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
2069 0 : // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
2070 0 : // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
2071 0 : // will be key not found errors if we don't create an empty one for rel_size_v2.
2072 0 : self.put(
2073 0 : rel_dir_key,
2074 0 : Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
2075 : );
2076 0 : }
2077 0 : self.pending_directory_entries
2078 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
2079 : } else {
2080 : // Add the new relation to the rel directory entry, and write it back
2081 960 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
2082 0 : Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
2083 960 : }
2084 960 : if !dbdir_exists {
2085 4 : self.pending_directory_entries
2086 4 : .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
2087 956 : }
2088 960 : self.pending_directory_entries
2089 960 : .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
2090 960 : self.put(
2091 960 : rel_dir_key,
2092 960 : Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
2093 : );
2094 : }
2095 :
2096 : // Put size
2097 960 : let size_key = rel_size_to_key(rel);
2098 960 : let buf = nblocks.to_le_bytes();
2099 960 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2100 960 :
2101 960 : self.pending_nblocks += nblocks as i64;
2102 960 :
2103 960 : // Update relation size cache
2104 960 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2105 960 :
2106 960 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
2107 960 : // caller.
2108 960 : Ok(())
2109 960 : }
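     : //
     : // Sketch of a typical caller (assuming `rel: RelTag`, a page image `img: Bytes`, and `ctx`):
     : // create the fork, then extend it and write the block, mirroring what walingest does.
     : //
     : //     modification.put_rel_creation(rel, 0, ctx).await?;
     : //     modification.put_rel_extend(rel, 1, ctx).await?;
     : //     modification.put_rel_page_image(rel, 0, img)?;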
2110 :
2111 : /// Truncate relation
2112 3006 : pub async fn put_rel_truncation(
2113 3006 : &mut self,
2114 3006 : rel: RelTag,
2115 3006 : nblocks: BlockNumber,
2116 3006 : ctx: &RequestContext,
2117 3006 : ) -> Result<(), WalIngestError> {
2118 3006 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2119 3006 : if self
2120 3006 : .tline
2121 3006 : .get_rel_exists(rel, Version::Modified(self), ctx)
2122 3006 : .await?
2123 : {
2124 3006 : let size_key = rel_size_to_key(rel);
2125 : // Fetch the old size first
2126 3006 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2127 3006 :
2128 3006 : // Update the entry with the new size.
2129 3006 : let buf = nblocks.to_le_bytes();
2130 3006 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2131 3006 :
2132 3006 : // Update relation size cache
2133 3006 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2134 3006 :
2135 3006 : // Update logical database size.
2136 3006 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
2137 0 : }
2138 3006 : Ok(())
2139 3006 : }
2140 :
2141 : /// Extend relation
2142 : /// If new size is smaller, do nothing.
2143 138340 : pub async fn put_rel_extend(
2144 138340 : &mut self,
2145 138340 : rel: RelTag,
2146 138340 : nblocks: BlockNumber,
2147 138340 : ctx: &RequestContext,
2148 138340 : ) -> Result<(), WalIngestError> {
2149 138340 : ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
2150 :
2151 : // Put size
2152 138340 : let size_key = rel_size_to_key(rel);
2153 138340 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2154 138340 :
2155 138340 : // only extend relation here. never decrease the size
2156 138340 : if nblocks > old_size {
2157 137394 : let buf = nblocks.to_le_bytes();
2158 137394 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2159 137394 :
2160 137394 : // Update relation size cache
2161 137394 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
2162 137394 :
2163 137394 : self.pending_nblocks += nblocks as i64 - old_size as i64;
2164 137394 : }
2165 138340 : Ok(())
2166 138340 : }
2167 :
2168 : /// Drop some relations
2169 5 : pub(crate) async fn put_rel_drops(
2170 5 : &mut self,
2171 5 : drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
2172 5 : ctx: &RequestContext,
2173 5 : ) -> Result<(), WalIngestError> {
2174 5 : let v2_enabled = self
2175 5 : .maybe_enable_rel_size_v2()
2176 5 : .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
2177 6 : for ((spc_node, db_node), rel_tags) in drop_relations {
2178 1 : let dir_key = rel_dir_to_key(spc_node, db_node);
2179 1 : let buf = self.get(dir_key, ctx).await?;
2180 1 : let mut dir = RelDirectory::des(&buf)?;
2181 :
2182 1 : let mut dirty = false;
2183 2 : for rel_tag in rel_tags {
2184 1 : let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
2185 1 : self.pending_directory_entries
2186 1 : .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
2187 1 : dirty = true;
2188 1 : true
2189 0 : } else if v2_enabled {
2190 : // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
2191 : // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
2192 : // logic).
2193 0 : let key =
2194 0 : rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
2195 0 : let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
2196 0 : .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
2197 0 : if val == RelDirExists::Exists {
2198 0 : self.pending_directory_entries
2199 0 : .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
2200 0 : // put tombstone
2201 0 : self.put(key, Value::Image(RelDirExists::Removed.encode()));
2202 0 : // no need to set dirty to true
2203 0 : true
2204 : } else {
2205 0 : false
2206 : }
2207 : } else {
2208 0 : false
2209 : };
2210 :
2211 1 : if found {
2212 : // update logical size
2213 1 : let size_key = rel_size_to_key(rel_tag);
2214 1 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
2215 1 : self.pending_nblocks -= old_size as i64;
2216 1 :
2217 1 : // Remove entry from relation size cache
2218 1 : self.tline.remove_cached_rel_size(&rel_tag);
2219 1 :
2220 1 : // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
2221 1 : self.delete(rel_key_range(rel_tag));
2222 0 : }
2223 : }
2224 :
2225 1 : if dirty {
2226 1 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
2227 0 : }
2228 : }
2229 :
2230 5 : Ok(())
2231 5 : }
2232 :
2233 3 : pub async fn put_slru_segment_creation(
2234 3 : &mut self,
2235 3 : kind: SlruKind,
2236 3 : segno: u32,
2237 3 : nblocks: BlockNumber,
2238 3 : ctx: &RequestContext,
2239 3 : ) -> Result<(), WalIngestError> {
2240 3 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2241 :
2242 : // Add it to the directory entry
2243 3 : let dir_key = slru_dir_to_key(kind);
2244 3 : let buf = self.get(dir_key, ctx).await?;
2245 3 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2246 :
2247 3 : if !dir.segments.insert(segno) {
2248 0 : Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
2249 3 : }
2250 3 : self.pending_directory_entries.push((
2251 3 : DirectoryKind::SlruSegment(kind),
2252 3 : MetricsUpdate::Set(dir.segments.len() as u64),
2253 3 : ));
2254 3 : self.put(
2255 3 : dir_key,
2256 3 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2257 : );
2258 :
2259 : // Put size
2260 3 : let size_key = slru_segment_size_to_key(kind, segno);
2261 3 : let buf = nblocks.to_le_bytes();
2262 3 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2263 3 :
2264 3 : // even if nblocks > 0, we don't insert any actual blocks here
2265 3 :
2266 3 : Ok(())
2267 3 : }
2268 :
2269 : /// Extend SLRU segment
2270 0 : pub fn put_slru_extend(
2271 0 : &mut self,
2272 0 : kind: SlruKind,
2273 0 : segno: u32,
2274 0 : nblocks: BlockNumber,
2275 0 : ) -> Result<(), WalIngestError> {
2276 0 : assert!(self.tline.tenant_shard_id.is_shard_zero());
2277 :
2278 : // Put size
2279 0 : let size_key = slru_segment_size_to_key(kind, segno);
2280 0 : let buf = nblocks.to_le_bytes();
2281 0 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
2282 0 : Ok(())
2283 0 : }
2284 :
2285 : /// This method is used for marking truncated SLRU files
2286 0 : pub async fn drop_slru_segment(
2287 0 : &mut self,
2288 0 : kind: SlruKind,
2289 0 : segno: u32,
2290 0 : ctx: &RequestContext,
2291 0 : ) -> Result<(), WalIngestError> {
2292 0 : // Remove it from the directory entry
2293 0 : let dir_key = slru_dir_to_key(kind);
2294 0 : let buf = self.get(dir_key, ctx).await?;
2295 0 : let mut dir = SlruSegmentDirectory::des(&buf)?;
2296 :
2297 0 : if !dir.segments.remove(&segno) {
2298 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
2299 0 : }
2300 0 : self.pending_directory_entries.push((
2301 0 : DirectoryKind::SlruSegment(kind),
2302 0 : MetricsUpdate::Set(dir.segments.len() as u64),
2303 0 : ));
2304 0 : self.put(
2305 0 : dir_key,
2306 0 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
2307 : );
2308 :
2309 : // Delete size entry, as well as all blocks
2310 0 : self.delete(slru_segment_key_range(kind, segno));
2311 0 :
2312 0 : Ok(())
2313 0 : }
2314 :
2315 : /// Drop a relmapper file (pg_filenode.map)
2316 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
2317 0 : // TODO
2318 0 : Ok(())
2319 0 : }
2320 :
2321 : /// Drop a twophase file: remove it from the directory entry and delete its key range.
2322 0 : pub async fn drop_twophase_file(
2323 0 : &mut self,
2324 0 : xid: u64,
2325 0 : ctx: &RequestContext,
2326 0 : ) -> Result<(), WalIngestError> {
2327 : // Remove it from the directory entry
2328 0 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
2329 0 : let newdirbuf = if self.tline.pg_version >= 17 {
2330 0 : let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
2331 :
2332 0 : if !dir.xids.remove(&xid) {
2333 0 : warn!("twophase file for xid {} does not exist", xid);
2334 0 : }
2335 0 : self.pending_directory_entries.push((
2336 0 : DirectoryKind::TwoPhase,
2337 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2338 0 : ));
2339 0 : Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
2340 : } else {
2341 0 : let xid: u32 = u32::try_from(xid)
2342 0 : .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
2343 0 : let mut dir = TwoPhaseDirectory::des(&buf)?;
2344 :
2345 0 : if !dir.xids.remove(&xid) {
2346 0 : warn!("twophase file for xid {} does not exist", xid);
2347 0 : }
2348 0 : self.pending_directory_entries.push((
2349 0 : DirectoryKind::TwoPhase,
2350 0 : MetricsUpdate::Set(dir.xids.len() as u64),
2351 0 : ));
2352 0 : Bytes::from(TwoPhaseDirectory::ser(&dir)?)
2353 : };
2354 0 : self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
2355 0 :
2356 0 : // Delete it
2357 0 : self.delete(twophase_key_range(xid));
2358 0 :
2359 0 : Ok(())
2360 0 : }
2361 :
2362 8 : pub async fn put_file(
2363 8 : &mut self,
2364 8 : path: &str,
2365 8 : content: &[u8],
2366 8 : ctx: &RequestContext,
2367 8 : ) -> Result<(), WalIngestError> {
2368 8 : let key = aux_file::encode_aux_file_key(path);
2369 : // retrieve the key from the engine
2370 8 : let old_val = match self.get(key, ctx).await {
2371 2 : Ok(val) => Some(val),
2372 6 : Err(PageReconstructError::MissingKey(_)) => None,
2373 0 : Err(e) => return Err(e.into()),
2374 : };
2375 8 : let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
2376 2 : aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
2377 : } else {
2378 6 : Vec::new()
2379 : };
2380 8 : let mut other_files = Vec::with_capacity(files.len());
2381 8 : let mut modifying_file = None;
2382 10 : for file @ (p, content) in files {
2383 2 : if path == p {
2384 2 : assert!(
2385 2 : modifying_file.is_none(),
2386 0 : "duplicated entries found for {}",
2387 : path
2388 : );
2389 2 : modifying_file = Some(content);
2390 0 : } else {
2391 0 : other_files.push(file);
2392 0 : }
2393 : }
2394 8 : let mut new_files = other_files;
2395 8 : match (modifying_file, content.is_empty()) {
2396 1 : (Some(old_content), false) => {
2397 1 : self.tline
2398 1 : .aux_file_size_estimator
2399 1 : .on_update(old_content.len(), content.len());
2400 1 : new_files.push((path, content));
2401 1 : }
2402 1 : (Some(old_content), true) => {
2403 1 : self.tline
2404 1 : .aux_file_size_estimator
2405 1 : .on_remove(old_content.len());
2406 1 : // not adding the file key to the final `new_files` vec.
2407 1 : }
2408 6 : (None, false) => {
2409 6 : self.tline.aux_file_size_estimator.on_add(content.len());
2410 6 : new_files.push((path, content));
2411 6 : }
2412 : // Compute may request deletion of an old version of a pgstat AUX file if the new one exceeds the size limit.
2413 : // Compute doesn't know whether a previous version of this file exists, so an
2414 : // attempt to delete a non-existent file can trigger this case.
2415 : // To avoid false alarms, log it at info rather than warning level.
2416 0 : (None, true) if path.starts_with("pg_stat/") => {
2417 0 : info!("removing non-existing pg_stat file: {}", path)
2418 : }
2419 0 : (None, true) => warn!("removing non-existing aux file: {}", path),
2420 : }
2421 8 : let new_val = aux_file::encode_file_value(&new_files)
2422 8 : .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
2423 8 : self.put(key, Value::Image(new_val.into()));
2424 8 :
2425 8 : Ok(())
2426 8 : }
2427 :
2428 : ///
2429 : /// Flush changes accumulated so far to the underlying repository.
2430 : ///
2431 : /// Usually, changes made in DatadirModification are atomic, but this allows
2432 : /// you to flush them to the underlying repository before the final `commit`.
2433 : /// That allows to free up the memory used to hold the pending changes.
2434 : ///
2435 : /// Currently only used during bulk import of a data directory. In that
2436 : /// context, breaking the atomicity is OK. If the import is interrupted, the
2437 : /// whole import fails and the timeline will be deleted anyway.
2438 : /// (Or to be precise, it will be left behind for debugging purposes and
2439 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
2440 : ///
2441 : /// Note: A consequence of flushing the pending operations is that they
2442 : /// won't be visible to subsequent operations until `commit`. The function
2443 : /// retains all the metadata, but data pages are flushed. That's again OK
2444 : /// for bulk import, where you are just loading data pages and won't try to
2445 : /// modify the same pages twice.
2446 965 : pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2447 965 : // Unless we have accumulated a decent amount of changes, it's not worth it
2448 965 : // to scan through the pending_updates list.
2449 965 : let pending_nblocks = self.pending_nblocks;
2450 965 : if pending_nblocks < 10000 {
2451 965 : return Ok(());
2452 0 : }
2453 :
2454 0 : let mut writer = self.tline.writer().await;
2455 :
2456 : // Flush relation and SLRU data blocks, keep metadata.
2457 0 : if let Some(batch) = self.pending_data_batch.take() {
2458 0 : tracing::debug!(
2459 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2460 0 : batch.max_lsn,
2461 0 : self.tline.get_last_record_lsn()
2462 : );
2463 :
2464 : // This bails out on first error without modifying pending_updates.
2465 : // That's Ok, cf this function's doc comment.
2466 0 : writer.put_batch(batch, ctx).await?;
2467 0 : }
2468 :
2469 0 : if pending_nblocks != 0 {
2470 0 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2471 0 : self.pending_nblocks = 0;
2472 0 : }
2473 :
2474 0 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2475 0 : writer.update_directory_entries_count(kind, count);
2476 0 : }
2477 :
2478 0 : Ok(())
2479 965 : }
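     : //
     : // Bulk-import sketch (assuming `pages` yields `(RelTag, BlockNumber, Bytes)` for relations
     : // that already exist, plus `modification` and `ctx`): flush periodically to cap memory,
     : // then make everything visible with a final commit.
     : //
     : //     for (rel, blkno, img) in pages {
     : //         modification.put_rel_page_image(rel, blkno, img)?;
     : //         if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
     : //             modification.flush(ctx).await?;
     : //         }
     : //     }
     : //     modification.commit(ctx).await?;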
2480 :
2481 : ///
2482 : /// Finish this atomic update, writing all the updated keys to the
2483 : /// underlying timeline.
2484 : /// All the modifications in this atomic update are stamped by the specified LSN.
2485 : ///
2486 371555 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
2487 371555 : let mut writer = self.tline.writer().await;
2488 :
2489 371555 : let pending_nblocks = self.pending_nblocks;
2490 371555 : self.pending_nblocks = 0;
2491 :
2492 : // Ordering: the items in this batch do not need to be in any global order, but values for
2493 : // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
2494 : // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
2495 : // more details.
2496 :
2497 371555 : let metadata_batch = {
2498 371555 : let pending_meta = self
2499 371555 : .pending_metadata_pages
2500 371555 : .drain()
2501 371555 : .flat_map(|(key, values)| {
2502 137054 : values
2503 137054 : .into_iter()
2504 137054 : .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
2505 371555 : })
2506 371555 : .collect::<Vec<_>>();
2507 371555 :
2508 371555 : if pending_meta.is_empty() {
2509 236139 : None
2510 : } else {
2511 135416 : Some(SerializedValueBatch::from_values(pending_meta))
2512 : }
2513 : };
2514 :
2515 371555 : let data_batch = self.pending_data_batch.take();
2516 :
2517 371555 : let maybe_batch = match (data_batch, metadata_batch) {
2518 132278 : (Some(mut data), Some(metadata)) => {
2519 132278 : data.extend(metadata);
2520 132278 : Some(data)
2521 : }
2522 71631 : (Some(data), None) => Some(data),
2523 3138 : (None, Some(metadata)) => Some(metadata),
2524 164508 : (None, None) => None,
2525 : };
2526 :
2527 371555 : if let Some(batch) = maybe_batch {
2528 207047 : tracing::debug!(
2529 0 : "Flushing batch with max_lsn={}. Last record LSN is {}",
2530 0 : batch.max_lsn,
2531 0 : self.tline.get_last_record_lsn()
2532 : );
2533 :
2534 : // This bails out on first error without modifying pending_updates.
2535 : // That's Ok, cf this function's doc comment.
2536 207047 : writer.put_batch(batch, ctx).await?;
2537 164508 : }
2538 :
2539 371555 : if !self.pending_deletions.is_empty() {
2540 1 : writer.delete_batch(&self.pending_deletions, ctx).await?;
2541 1 : self.pending_deletions.clear();
2542 371554 : }
2543 :
2544 371555 : self.pending_lsns.push(self.lsn);
2545 444484 : for pending_lsn in self.pending_lsns.drain(..) {
2546 444484 : // TODO(vlad): pretty sure the comment below is not valid anymore
2547 444484 : // and we can call finish write with the latest LSN
2548 444484 : //
2549 444484 : // Ideally, we should be able to call writer.finish_write() only once
2550 444484 : // with the highest LSN. However, the last_record_lsn variable in the
2551 444484 : // timeline keeps track of the latest LSN and the immediate previous LSN
2552 444484 : // so we need to record every LSN to not leave a gap between them.
2553 444484 : writer.finish_write(pending_lsn);
2554 444484 : }
2555 :
2556 371555 : if pending_nblocks != 0 {
2557 135285 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
2558 236270 : }
2559 :
2560 371555 : for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
2561 1517 : writer.update_directory_entries_count(kind, count);
2562 1517 : }
2563 :
2564 371555 : self.pending_metadata_bytes = 0;
2565 371555 :
2566 371555 : Ok(())
2567 371555 : }
2568 :
2569 145852 : pub(crate) fn len(&self) -> usize {
2570 145852 : self.pending_metadata_pages.len()
2571 145852 : + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
2572 145852 : + self.pending_deletions.len()
2573 145852 : }
2574 :
2575 : /// Read a page from the Timeline we are writing to. For metadata pages, this passes through
2576 : /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
2577 : /// in the modification.
2578 : ///
2579 : /// For data pages, reads pass directly to the owning Timeline: any ingest code that reads a data
2580 : /// page must ensure that the pages it reads are already committed in the Timeline; for example,
2581 : /// DB create operations are always preceded by a call to commit(). This is special-cased because
2582 : /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
2583 : /// and not data pages.
2584 143293 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
2585 143293 : if !Self::is_data_key(&key) {
2586 : // Have we already updated the same key? Read the latest pending updated
2587 : // version in that case.
2588 : //
2589 : // Note: we don't check pending_deletions. It is an error to request a
2590 : // value that has been removed, deletion only avoids leaking storage.
2591 143293 : if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
2592 7964 : if let Some((_, _, value)) = values.last() {
2593 7964 : return if let Value::Image(img) = value {
2594 7964 : Ok(img.clone())
2595 : } else {
2596 : // Currently, we never need to read back a WAL record that we
2597 : // inserted in the same "transaction". All the metadata updates
2598 : // work directly with Images, and we never need to read actual
2599 : // data pages. We could handle this if we had to, by calling
2600 : // the walredo manager, but let's keep it simple for now.
2601 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
2602 0 : "unexpected pending WAL record"
2603 0 : )))
2604 : };
2605 0 : }
2606 135329 : }
2607 : } else {
2608 : // This is an expensive check, so we only do it in debug mode. If reading a data key,
2609 : // this key should never be present in pending_data_pages. We ensure this by committing
2610 : // modifications before ingesting DB create operations, which are the only kind that reads
2611 : // data pages during ingest.
2612 0 : if cfg!(debug_assertions) {
2613 0 : assert!(
2614 0 : !self
2615 0 : .pending_data_batch
2616 0 : .as_ref()
2617 0 : .is_some_and(|b| b.updates_key(&key))
2618 0 : );
2619 0 : }
2620 : }
2621 :
2622 : // Metadata page cache miss, or we're reading a data page.
2623 135329 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
2624 135329 : self.tline.get(key, lsn, ctx).await
2625 143293 : }
2626 :
2627 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2628 : /// and the empty value into None.
2629 0 : async fn sparse_get(
2630 0 : &self,
2631 0 : key: Key,
2632 0 : ctx: &RequestContext,
2633 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2634 0 : let val = self.get(key, ctx).await;
2635 0 : match val {
2636 0 : Ok(val) if val.is_empty() => Ok(None),
2637 0 : Ok(val) => Ok(Some(val)),
2638 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2639 0 : Err(e) => Err(e),
2640 : }
2641 0 : }
2642 :
2643 : #[cfg(test)]
2644 2 : pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
2645 2 : self.put(key, val);
2646 2 : }
2647 :
2648 282064 : fn put(&mut self, key: Key, val: Value) {
2649 282064 : if Self::is_data_key(&key) {
2650 138934 : self.put_data(key.to_compact(), val)
2651 : } else {
2652 143130 : self.put_metadata(key.to_compact(), val)
2653 : }
2654 282064 : }
2655 :
2656 138934 : fn put_data(&mut self, key: CompactKey, val: Value) {
2657 138934 : let batch = self
2658 138934 : .pending_data_batch
2659 138934 : .get_or_insert_with(SerializedValueBatch::default);
2660 138934 : batch.put(key, val, self.lsn);
2661 138934 : }
2662 :
2663 143130 : fn put_metadata(&mut self, key: CompactKey, val: Value) {
2664 143130 : let values = self.pending_metadata_pages.entry(key).or_default();
2665 : // Replace the previous value if it exists at the same lsn
2666 143130 : if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
2667 6076 : if *last_lsn == self.lsn {
2668 : // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
2669 6076 : self.pending_metadata_bytes -= *last_value_ser_size;
2670 6076 : *last_value_ser_size = val.serialized_size().unwrap() as usize;
2671 6076 : self.pending_metadata_bytes += *last_value_ser_size;
2672 6076 :
2673 6076 : // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
2674 6076 : // have been generated by synthesized zero page writes prior to the first real write to a page.
2675 6076 : *last_value = val;
2676 6076 : return;
2677 0 : }
2678 137054 : }
2679 :
2680 137054 : let val_serialized_size = val.serialized_size().unwrap() as usize;
2681 137054 : self.pending_metadata_bytes += val_serialized_size;
2682 137054 : values.push((self.lsn, val_serialized_size, val));
2683 137054 :
2684 137054 : if key == CHECKPOINT_KEY.to_compact() {
2685 117 : tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
2686 136937 : }
2687 143130 : }
2688 :
2689 1 : fn delete(&mut self, key_range: Range<Key>) {
2690 1 : trace!("DELETE {}-{}", key_range.start, key_range.end);
2691 1 : self.pending_deletions.push((key_range, self.lsn));
2692 1 : }
2693 : }
2694 :
2695 : /// Statistics for a DatadirModification.
2696 : #[derive(Default)]
2697 : pub struct DatadirModificationStats {
2698 : pub metadata_images: u64,
2699 : pub metadata_deltas: u64,
2700 : pub data_images: u64,
2701 : pub data_deltas: u64,
2702 : }
2703 :
2704 : /// This enum facilitates accessing either a committed key from the timeline at a
2705 : /// specific LSN, or the latest uncommitted key from a pending modification.
2706 : ///
2707 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
2708 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
2709 : /// need to look up the keys in the modification first before looking them up in the
2710 : /// timeline to not miss the latest updates.
2711 : #[derive(Clone, Copy)]
2712 : pub enum Version<'a> {
2713 : LsnRange(LsnRange),
2714 : Modified(&'a DatadirModification<'a>),
2715 : }
2716 :
2717 : impl Version<'_> {
2718 25 : async fn get(
2719 25 : &self,
2720 25 : timeline: &Timeline,
2721 25 : key: Key,
2722 25 : ctx: &RequestContext,
2723 25 : ) -> Result<Bytes, PageReconstructError> {
2724 25 : match self {
2725 15 : Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
2726 10 : Version::Modified(modification) => modification.get(key, ctx).await,
2727 : }
2728 25 : }
2729 :
2730 : /// Get a key from the sparse keyspace. Automatically converts the missing key error
2731 : /// and the empty value into None.
2732 0 : async fn sparse_get(
2733 0 : &self,
2734 0 : timeline: &Timeline,
2735 0 : key: Key,
2736 0 : ctx: &RequestContext,
2737 0 : ) -> Result<Option<Bytes>, PageReconstructError> {
2738 0 : let val = self.get(timeline, key, ctx).await;
2739 0 : match val {
2740 0 : Ok(val) if val.is_empty() => Ok(None),
2741 0 : Ok(val) => Ok(Some(val)),
2742 0 : Err(PageReconstructError::MissingKey(_)) => Ok(None),
2743 0 : Err(e) => Err(e),
2744 : }
2745 0 : }
2746 :
2747 26 : pub fn is_latest(&self) -> bool {
2748 26 : match self {
2749 16 : Version::LsnRange(lsns) => lsns.is_latest(),
2750 10 : Version::Modified(_) => true,
2751 : }
2752 26 : }
2753 :
2754 224275 : pub fn get_lsn(&self) -> Lsn {
2755 224275 : match self {
2756 12224 : Version::LsnRange(lsns) => lsns.effective_lsn,
2757 212051 : Version::Modified(modification) => modification.lsn,
2758 : }
2759 224275 : }
2760 :
2761 12219 : pub fn at(lsn: Lsn) -> Self {
2762 12219 : Version::LsnRange(LsnRange {
2763 12219 : effective_lsn: lsn,
2764 12219 : request_lsn: lsn,
2765 12219 : })
2766 12219 : }
2767 : }
2768 :
2769 : //--- Metadata structs stored in key-value pairs in the repository.
2770 :
2771 0 : #[derive(Debug, Serialize, Deserialize)]
2772 : pub(crate) struct DbDirectory {
2773 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
2774 : pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
2775 : }
2776 :
2777 : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
2778 : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
2779 : // were named like "pg_twophase/000002E5"; now they're like
2780 : // "pg_twophase/0000000A000002E4".
2781 :
2782 0 : #[derive(Debug, Serialize, Deserialize)]
2783 : pub(crate) struct TwoPhaseDirectory {
2784 : pub(crate) xids: HashSet<TransactionId>,
2785 : }
2786 :
2787 0 : #[derive(Debug, Serialize, Deserialize)]
2788 : struct TwoPhaseDirectoryV17 {
2789 : xids: HashSet<u64>,
2790 : }
2791 :
2792 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2793 : pub(crate) struct RelDirectory {
2794 : // Set of relations that exist. (relfilenode, forknum)
2795 : //
2796 : // TODO: Store it as a btree or radix tree or something else that spans multiple
2797 : // key-value pairs, if you have a lot of relations
2798 : pub(crate) rels: HashSet<(Oid, u8)>,
2799 : }
2800 :
2801 0 : #[derive(Debug, Serialize, Deserialize)]
2802 : struct RelSizeEntry {
2803 : nblocks: u32,
2804 : }
2805 :
2806 0 : #[derive(Debug, Serialize, Deserialize, Default)]
2807 : pub(crate) struct SlruSegmentDirectory {
2808 : // Set of SLRU segments that exist.
2809 : pub(crate) segments: HashSet<u32>,
2810 : }
2811 :
2812 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
2813 : #[repr(u8)]
2814 : pub(crate) enum DirectoryKind {
2815 : Db,
2816 : TwoPhase,
2817 : Rel,
2818 : AuxFiles,
2819 : SlruSegment(SlruKind),
2820 : RelV2,
2821 : }
2822 :
2823 : impl DirectoryKind {
2824 : pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
2825 4552 : pub(crate) fn offset(&self) -> usize {
2826 4552 : self.into_usize()
2827 4552 : }
2828 : }
2829 :
2830 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
2831 :
2832 : #[allow(clippy::bool_assert_comparison)]
2833 : #[cfg(test)]
2834 : mod tests {
2835 : use hex_literal::hex;
2836 : use pageserver_api::models::ShardParameters;
2837 : use pageserver_api::shard::ShardStripeSize;
2838 : use utils::id::TimelineId;
2839 : use utils::shard::{ShardCount, ShardNumber};
2840 :
2841 : use super::*;
2842 : use crate::DEFAULT_PG_VERSION;
2843 : use crate::tenant::harness::TenantHarness;
2844 :
2845 : /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
2846 : #[tokio::test]
2847 1 : async fn aux_files_round_trip() -> anyhow::Result<()> {
2848 1 : let name = "aux_files_round_trip";
2849 1 : let harness = TenantHarness::create(name).await?;
2850 1 :
2851 1 : pub const TIMELINE_ID: TimelineId =
2852 1 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
2853 1 :
2854 1 : let (tenant, ctx) = harness.load().await;
2855 1 : let (tline, ctx) = tenant
2856 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2857 1 : .await?;
2858 1 : let tline = tline.raw_timeline().unwrap();
2859 1 :
2860 1 : // First modification: insert two keys
2861 1 : let mut modification = tline.begin_modification(Lsn(0x1000));
2862 1 : modification.put_file("foo/bar1", b"content1", &ctx).await?;
2863 1 : modification.set_lsn(Lsn(0x1008))?;
2864 1 : modification.put_file("foo/bar2", b"content2", &ctx).await?;
2865 1 : modification.commit(&ctx).await?;
2866 1 : let expect_1008 = HashMap::from([
2867 1 : ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
2868 1 : ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
2869 1 : ]);
2870 1 :
2871 1 : let io_concurrency = IoConcurrency::spawn_for_test();
2872 1 :
2873 1 : let readback = tline
2874 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2875 1 : .await?;
2876 1 : assert_eq!(readback, expect_1008);
2877 1 :
2878 1 : // Second modification: update one key, remove the other
2879 1 : let mut modification = tline.begin_modification(Lsn(0x2000));
2880 1 : modification.put_file("foo/bar1", b"content3", &ctx).await?;
2881 1 : modification.set_lsn(Lsn(0x2008))?;
2882 1 : modification.put_file("foo/bar2", b"", &ctx).await?;
2883 1 : modification.commit(&ctx).await?;
2884 1 : let expect_2008 =
2885 1 : HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
2886 1 :
2887 1 : let readback = tline
2888 1 : .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
2889 1 : .await?;
2890 1 : assert_eq!(readback, expect_2008);
2891 1 :
2892 1 : // Reading back in time works
2893 1 : let readback = tline
2894 1 : .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
2895 1 : .await?;
2896 1 : assert_eq!(readback, expect_1008);
2897 1 :
2898 1 : Ok(())
2899 1 : }
2900 :
2901 : #[test]
2902 1 : fn gap_finding() {
2903 1 : let rel = RelTag {
2904 1 : spcnode: 1663,
2905 1 : dbnode: 208101,
2906 1 : relnode: 2620,
2907 1 : forknum: 0,
2908 1 : };
2909 1 : let base_blkno = 1;
2910 1 :
2911 1 : let base_key = rel_block_to_key(rel, base_blkno);
2912 1 : let before_base_key = rel_block_to_key(rel, base_blkno - 1);
2913 1 :
2914 1 : let shard = ShardIdentity::unsharded();
2915 1 :
2916 1 : let mut previous_nblocks = 0;
2917 11 : for i in 0..10 {
2918 10 : let crnt_blkno = base_blkno + i;
2919 10 : let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
2920 10 :
2921 10 : previous_nblocks = crnt_blkno + 1;
2922 10 :
2923 10 : if i == 0 {
2924 : // The first block we write is 1, so we should find the gap.
2925 1 : assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
2926 : } else {
2927 9 : assert!(gaps.is_none());
2928 : }
2929 : }
2930 :
2931 : // This is an update to an already existing block. No gaps here.
2932 1 : let update_blkno = 5;
2933 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2934 1 : assert!(gaps.is_none());
2935 :
2936 : // This is an update past the current end block.
2937 1 : let after_gap_blkno = 20;
2938 1 : let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
2939 1 :
2940 1 : let gap_start_key = rel_block_to_key(rel, previous_nblocks);
2941 1 : let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
2942 1 : assert_eq!(
2943 1 : gaps.unwrap(),
2944 1 : KeySpace::single(gap_start_key..after_gap_key)
2945 1 : );
2946 1 : }
2947 :
2948 : #[test]
2949 1 : fn sharded_gap_finding() {
2950 1 : let rel = RelTag {
2951 1 : spcnode: 1663,
2952 1 : dbnode: 208101,
2953 1 : relnode: 2620,
2954 1 : forknum: 0,
2955 1 : };
2956 1 :
2957 1 : let first_blkno = 6;
2958 1 :
2959 1 : // This shard will get the even blocks
2960 1 : let shard = ShardIdentity::from_params(
2961 1 : ShardNumber(0),
2962 1 : &ShardParameters {
2963 1 : count: ShardCount(2),
2964 1 : stripe_size: ShardStripeSize(1),
2965 1 : },
2966 1 : );
2967 1 :
2968 1 : // Only keys belonging to this shard are considered as gaps.
2969 1 : let mut previous_nblocks = 0;
2970 1 : let gaps =
2971 1 : DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
2972 1 : assert!(!gaps.ranges.is_empty());
2973 3 : for gap_range in gaps.ranges {
2974 2 : let mut k = gap_range.start;
2975 4 : while k != gap_range.end {
2976 2 : assert_eq!(shard.get_shard_number(&k), shard.number);
2977 2 : k = k.next();
2978 : }
2979 : }
2980 :
2981 1 : previous_nblocks = first_blkno;
2982 1 :
2983 1 : let update_blkno = 2;
2984 1 : let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
2985 1 : assert!(gaps.is_none());
2986 1 : }
2987 :
2988 : /*
2989 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
2990 : let incremental = timeline.get_current_logical_size();
2991 : let non_incremental = timeline
2992 : .get_current_logical_size_non_incremental(lsn)
2993 : .unwrap();
2994 : assert_eq!(incremental, non_incremental);
2995 : }
2996 : */
2997 :
2998 : /*
2999 : ///
3000 : /// Test list_rels() function, with branches and dropped relations
3001 : ///
3002 : #[test]
3003 : fn test_list_rels_drop() -> Result<()> {
3004 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
3005 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
3006 : const TESTDB: u32 = 111;
3007 :
3008 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
3009 : // after branching fails below
3010 : let mut writer = tline.begin_record(Lsn(0x10));
3011 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
3012 : writer.finish()?;
3013 :
3014 : // Create a relation on the timeline
3015 : let mut writer = tline.begin_record(Lsn(0x20));
3016 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
3017 : writer.finish()?;
3018 :
3019 : let writer = tline.begin_record(Lsn(0x00));
3020 : writer.finish()?;
3021 :
3022 : // Check that list_rels() lists it after LSN 2, but no before it
3023 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
3024 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
3025 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
3026 :
3027 : // Create a branch, check that the relation is visible there
3028 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
3029 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
3030 : Some(timeline) => timeline,
3031 : None => panic!("Should have a local timeline"),
3032 : };
3033 : let newtline = DatadirTimelineImpl::new(newtline);
3034 : assert!(newtline
3035 : .list_rels(0, TESTDB, Lsn(0x30))?
3036 : .contains(&TESTREL_A));
3037 :
3038 : // Drop it on the branch
3039 : let mut new_writer = newtline.begin_record(Lsn(0x40));
3040 : new_writer.drop_relation(TESTREL_A)?;
3041 : new_writer.finish()?;
3042 :
3043 : // Check that it's no longer listed on the branch after the point where it was dropped
3044 : assert!(newtline
3045 : .list_rels(0, TESTDB, Lsn(0x30))?
3046 : .contains(&TESTREL_A));
3047 : assert!(!newtline
3048 : .list_rels(0, TESTDB, Lsn(0x40))?
3049 : .contains(&TESTREL_A));
3050 :
3051 : // Run checkpoint and garbage collection and check that it's still not visible
3052 : newtline.checkpoint(CheckpointConfig::Forced)?;
3053 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
3054 :
3055 : assert!(!newtline
3056 : .list_rels(0, TESTDB, Lsn(0x40))?
3057 : .contains(&TESTREL_A));
3058 :
3059 : Ok(())
3060 : }
3061 : */
3062 :
3063 : /*
3064 : #[test]
3065 : fn test_read_beyond_eof() -> Result<()> {
3066 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
3067 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
3068 :
3069 : make_some_layers(&tline, Lsn(0x20))?;
3070 : let mut writer = tline.begin_record(Lsn(0x60));
3071 : walingest.put_rel_page_image(
3072 : &mut writer,
3073 : TESTREL_A,
3074 : 0,
3075 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
3076 : )?;
3077 : writer.finish()?;
3078 :
3079 : // Test read before rel creation. Should error out.
3080 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
3081 :
3082 : // Read block beyond end of relation at different points in time.
3083 : // These reads should fall into different delta, image, and in-memory layers.
3084 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
3085 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
3086 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
3087 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
3088 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
3089 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
3090 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
3091 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
3092 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
3093 :
3094 : // Test on an in-memory layer with no preceding layer
3095 : let mut writer = tline.begin_record(Lsn(0x70));
3096 : walingest.put_rel_page_image(
3097 : &mut writer,
3098 : TESTREL_B,
3099 : 0,
3100 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
3101 : )?;
3102 : writer.finish()?;
3103 :
3104 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
3105 :
3106 : Ok(())
3107 : }
3108 : */
3109 : }
|