TLA Line data Source code
1 : //!
2 : //! This provides an abstraction to store PostgreSQL relations and other files
3 : //! in the key-value store that implements the Repository interface.
4 : //!
5 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
6 : //! walingest.rs handles a few things like implicit relation creation and extension.
7 : //! Clarify that)
8 : //!
9 : use super::tenant::{PageReconstructError, Timeline};
10 : use crate::context::RequestContext;
11 : use crate::keyspace::{KeySpace, KeySpaceAccum};
12 : use crate::repository::*;
13 : use crate::walrecord::NeonWalRecord;
14 : use anyhow::Context;
15 : use bytes::{Buf, Bytes};
16 : use pageserver_api::reltag::{RelTag, SlruKind};
17 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
18 : use postgres_ffi::BLCKSZ;
19 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
20 : use serde::{Deserialize, Serialize};
21 : use std::collections::{hash_map, HashMap, HashSet};
22 : use std::ops::ControlFlow;
23 : use std::ops::Range;
24 : use tokio_util::sync::CancellationToken;
25 : use tracing::{debug, trace, warn};
26 : use utils::{bin_ser::BeSer, lsn::Lsn};
27 :
28 : /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
29 : pub type BlockNumber = u32;
30 EUB :
31 UIC 0 : #[derive(Debug)]
32 : pub enum LsnForTimestamp {
33 : Present(Lsn),
34 : Future(Lsn),
35 : Past(Lsn),
36 : NoData(Lsn),
37 : }
38 ECB (3) :
39 GIC 3 : #[derive(Debug, thiserror::Error)]
40 : pub enum CalculateLogicalSizeError {
41 : #[error("cancelled")]
42 : Cancelled,
43 : #[error(transparent)]
44 : Other(#[from] anyhow::Error),
45 : }
46 EUB :
47 UIC 0 : #[derive(Debug, thiserror::Error)]
48 : pub enum RelationError {
49 : #[error("Relation Already Exists")]
50 : AlreadyExists,
51 : #[error("invalid relnode")]
52 : InvalidRelnode,
53 : #[error(transparent)]
54 : Other(#[from] anyhow::Error),
55 : }
56 :
57 : ///
58 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
59 : /// and other special kinds of files, in a versioned key-value store. The
60 : /// Timeline struct provides the key-value store.
61 : ///
62 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
63 : /// implementation; it might be moved into a separate struct later.
64 : impl Timeline {
65 : /// Start ingesting a WAL record, or other atomic modification of
66 : /// the timeline.
67 : ///
68 : /// This provides a transaction-like interface to perform a bunch
69 : /// of modifications atomically.
70 : ///
71 : /// To ingest a WAL record, call begin_modification(lsn) to get a
72 : /// DatadirModification object. Use the functions in the object to
73 : /// modify the repository state, updating all the pages and metadata
74 : /// that the WAL record affects. When you're done, call commit() to
75 : /// commit the changes.
76 : ///
77 : /// The LSN stored in the modification is advanced by `ingest_record` and
78 : /// is used by `commit()` to update `last_record_lsn`.
79 : ///
80 : /// Calling commit() will flush all the changes and reset the state,
81 : /// so the `DatadirModification` struct can be reused to perform the next modification.
82 : ///
83 : /// Note that any pending modifications you make through the
84 : /// modification object won't be visible to calls to the 'get' and list
85 : /// functions of the timeline until you finish! And if you update the
86 : /// same page twice, the last update wins.
87 ECB (948596) : ///
88 CBC 910264 : pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
89 910264 : where
90 910264 : Self: Sized,
91 910264 : {
92 910264 : DatadirModification {
93 910264 : tline: self,
94 910264 : pending_updates: HashMap::new(),
95 910264 : pending_deletions: Vec::new(),
96 910264 : pending_nblocks: 0,
97 910264 : lsn,
98 910264 : }
99 GIC 910264 : }
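// A minimal usage sketch of the modification flow described above (illustrative
// only, not part of the original file). `timeline`, `lsn`, `rel`, `blknum`, `img`
// and `ctx` are assumed to be in scope; error handling is elided.
//
//     let mut modification = timeline.begin_modification(lsn);
//     modification.put_rel_page_image(rel, blknum, img)?;
//     modification.commit(ctx).await?; // flushes pending updates and advances last_record_lsn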
100 :
101 : //------------------------------------------------------------------------------
102 : // Public GET functions
103 : //------------------------------------------------------------------------------
104 :
105 ECB (3738587) : /// Look up given page version.
106 CBC 3765663 : pub async fn get_rel_page_at_lsn(
107 3765663 : &self,
108 3765663 : tag: RelTag,
109 3765663 : blknum: BlockNumber,
110 3765663 : lsn: Lsn,
111 3765663 : latest: bool,
112 3765663 : ctx: &RequestContext,
113 3765663 : ) -> Result<Bytes, PageReconstructError> {
114 GBC 3765662 : if tag.relnode == 0 {
115 UBC 0 : return Err(PageReconstructError::Other(
116 0 : RelationError::InvalidRelnode.into(),
117 LBC (3738587) : ));
118 GIC 3765662 : }
119 ECB (3738587) :
120 CBC 3765662 : let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
121 GBC 3765662 : if blknum >= nblocks {
122 UBC 0 : debug!(
123 0 : "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
124 0 : tag, blknum, lsn, nblocks
125 LBC (162580) : );
126 CBC 166513 : return Ok(ZERO_PAGE.clone());
127 3599149 : }
128 3599149 :
129 3599149 : let key = rel_block_to_key(tag, blknum);
130 5291916 : self.get(key, lsn, ctx).await
131 GIC 3765632 : }
132 :
133 ECB (8) : // Get size of a database in blocks
134 CBC 8 : pub async fn get_db_size(
135 8 : &self,
136 8 : spcnode: Oid,
137 8 : dbnode: Oid,
138 8 : lsn: Lsn,
139 8 : latest: bool,
140 8 : ctx: &RequestContext,
141 8 : ) -> Result<usize, PageReconstructError> {
142 GIC 8 : let mut total_blocks = 0;
143 ECB (8) :
144 GIC 8 : let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
145 ECB (2351) :
146 CBC 2351 : for rel in rels {
147 2343 : let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
148 GIC 2343 : total_blocks += n_blocks as usize;
149 ECB (8) : }
150 CBC 8 : Ok(total_blocks)
151 GIC 8 : }
152 :
153 ECB (73842396) : /// Get size of a relation file
154 CBC 74248149 : pub async fn get_rel_size(
155 74248149 : &self,
156 74248149 : tag: RelTag,
157 74248149 : lsn: Lsn,
158 74248149 : latest: bool,
159 74248149 : ctx: &RequestContext,
160 74248149 : ) -> Result<BlockNumber, PageReconstructError> {
161 GBC 74248056 : if tag.relnode == 0 {
162 UBC 0 : return Err(PageReconstructError::Other(
163 0 : RelationError::InvalidRelnode.into(),
164 LBC (73842336) : ));
165 GIC 74248056 : }
166 ECB (73842336) :
167 CBC 74248056 : if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
168 73989791 : return Ok(nblocks);
169 258265 : }
170 258265 :
171 258265 : if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
172 GIC 7329 : && !self.get_rel_exists(tag, lsn, latest, ctx).await?
173 : {
174 : // FIXME: Postgres sometimes calls smgrcreate() to create
175 : // FSM, and smgrnblocks() on it immediately afterwards,
176 : // without extending it. Tolerate that by claiming that
177 EUB : // any non-existent FSM fork has size 0.
178 LBC (272635) : return Ok(0);
179 CBC 258265 : }
180 258265 :
181 258265 : let key = rel_size_to_key(tag);
182 258265 : let mut buf = self.get(key, lsn, ctx).await?;
183 258263 : let nblocks = buf.get_u32_le();
184 258263 :
185 258263 : if latest {
186 139850 : // Update the relation size cache only if the "latest" flag is set.
187 139850 : // The flag is set by the compute node when it is working with the most recent version of the relation.
188 139850 : // Typically, the primary compute node always sets latest=true.
189 139850 : // Note that even if the compute node specifies an old LSN "by mistake" but sets
190 139850 : // latest=true, it cannot corrupt the cache: with latest=true the pageserver
191 139850 : // chooses max(request_lsn, last_written_lsn), so the cached value is always
192 139850 : // associated with the most recent LSN.
193 139850 : self.update_cached_rel_size(tag, lsn, nblocks);
194 139850 : }
195 258263 : Ok(nblocks)
196 GIC 74248056 : }
197 :
198 ECB (70195304) : /// Does relation exist?
199 CBC 70574056 : pub async fn get_rel_exists(
200 70574056 : &self,
201 70574056 : tag: RelTag,
202 70574056 : lsn: Lsn,
203 70574056 : _latest: bool,
204 70574056 : ctx: &RequestContext,
205 70574056 : ) -> Result<bool, PageReconstructError> {
206 GBC 70573964 : if tag.relnode == 0 {
207 UBC 0 : return Err(PageReconstructError::Other(
208 0 : RelationError::InvalidRelnode.into(),
209 LBC (70195244) : ));
210 GIC 70573964 : }
211 :
212 ECB (70195244) : // first try to lookup relation in cache
213 CBC 70573964 : if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
214 70470746 : return Ok(true);
215 103218 : }
216 103218 : // fetch directory listing
217 103218 : let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
218 GIC 103218 : let buf = self.get(key, lsn, ctx).await?;
219 ECB (103123) :
220 CBC 103218 : match RelDirectory::des(&buf).context("deserialization failure") {
221 103218 : Ok(dir) => {
222 103218 : let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
223 GIC 103218 : Ok(exists)
224 EUB : }
225 UIC 0 : Err(e) => Err(PageReconstructError::from(e)),
226 ECB (70195244) : }
227 GIC 70573964 : }
228 :
229 ECB (6649) : /// Get a list of all existing relations in given tablespace and database.
230 CBC 6641 : pub async fn list_rels(
231 6641 : &self,
232 6641 : spcnode: Oid,
233 6641 : dbnode: Oid,
234 6641 : lsn: Lsn,
235 6641 : ctx: &RequestContext,
236 6641 : ) -> Result<HashSet<RelTag>, PageReconstructError> {
237 6641 : // fetch directory listing
238 6641 : let key = rel_dir_to_key(spcnode, dbnode);
239 GIC 6641 : let buf = self.get(key, lsn, ctx).await?;
240 ECB (6649) :
241 CBC 6641 : match RelDirectory::des(&buf).context("deserialization failure") {
242 6641 : Ok(dir) => {
243 6641 : let rels: HashSet<RelTag> =
244 1561081 : HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
245 1561081 : spcnode,
246 1561081 : dbnode,
247 1561081 : relnode: *relnode,
248 1561081 : forknum: *forknum,
249 1561081 : }));
250 6641 :
251 GIC 6641 : Ok(rels)
252 EUB : }
253 UIC 0 : Err(e) => Err(PageReconstructError::from(e)),
254 ECB (6649) : }
255 GIC 6641 : }
256 :
257 ECB (5726) : /// Look up given SLRU page version.
258 CBC 5927 : pub async fn get_slru_page_at_lsn(
259 5927 : &self,
260 5927 : kind: SlruKind,
261 5927 : segno: u32,
262 5927 : blknum: BlockNumber,
263 5927 : lsn: Lsn,
264 5927 : ctx: &RequestContext,
265 5927 : ) -> Result<Bytes, PageReconstructError> {
266 5927 : let key = slru_block_to_key(kind, segno, blknum);
267 365153 : self.get(key, lsn, ctx).await
268 GIC 5927 : }
269 :
270 ECB (5633) : /// Get size of an SLRU segment
271 CBC 5869 : pub async fn get_slru_segment_size(
272 5869 : &self,
273 5869 : kind: SlruKind,
274 5869 : segno: u32,
275 5869 : lsn: Lsn,
276 5869 : ctx: &RequestContext,
277 5869 : ) -> Result<BlockNumber, PageReconstructError> {
278 5869 : let key = slru_segment_size_to_key(kind, segno);
279 5869 : let mut buf = self.get(key, lsn, ctx).await?;
280 5869 : Ok(buf.get_u32_le())
281 GIC 5869 : }
282 :
283 ECB (667) : /// Does the SLRU segment exist?
284 CBC 667 : pub async fn get_slru_segment_exists(
285 667 : &self,
286 667 : kind: SlruKind,
287 667 : segno: u32,
288 667 : lsn: Lsn,
289 667 : ctx: &RequestContext,
290 667 : ) -> Result<bool, PageReconstructError> {
291 667 : // fetch directory listing
292 667 : let key = slru_dir_to_key(kind);
293 GIC 667 : let buf = self.get(key, lsn, ctx).await?;
294 ECB (667) :
295 CBC 667 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
296 667 : Ok(dir) => {
297 667 : let exists = dir.segments.get(&segno).is_some();
298 GIC 667 : Ok(exists)
299 EUB : }
300 UIC 0 : Err(e) => Err(PageReconstructError::from(e)),
301 ECB (667) : }
302 GIC 667 : }
303 :
304 : /// Locate LSN, such that all transactions that committed before
305 : /// 'search_timestamp' are visible, but nothing newer is.
306 : ///
307 : /// This is not exact. Commit timestamps are not guaranteed to be ordered,
308 : /// so it's not well defined which LSN you get if there were multiple commits
309 : /// "in flight" at that point in time.
310 ECB (181) : ///
311 CBC 195 : pub async fn find_lsn_for_timestamp(
312 195 : &self,
313 195 : search_timestamp: TimestampTz,
314 195 : ctx: &RequestContext,
315 195 : ) -> Result<LsnForTimestamp, PageReconstructError> {
316 195 : let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
317 195 : let min_lsn = *gc_cutoff_lsn_guard;
318 195 : let max_lsn = self.get_last_record_lsn();
319 195 :
320 195 : // LSNs are always 8-byte aligned. low/mid/high represent the
321 195 : // LSN divided by 8.
322 195 : let mut low = min_lsn.0 / 8;
323 195 : let mut high = max_lsn.0 / 8 + 1;
324 195 :
325 195 : let mut found_smaller = false;
326 195 : let mut found_larger = false;
327 GIC 3470 : while low < high {
328 ECB (3058) : // cannot overflow, high and low are both smaller than u64::MAX / 2
329 GIC 3275 : let mid = (high + low) / 2;
330 ECB (3058) :
331 CBC 3275 : let cmp = self
332 3275 : .is_latest_commit_timestamp_ge_than(
333 3275 : search_timestamp,
334 3275 : Lsn(mid * 8),
335 3275 : &mut found_smaller,
336 3275 : &mut found_larger,
337 3275 : ctx,
338 3275 : )
339 GIC 338184 : .await?;
340 ECB (3058) :
341 CBC 3275 : if cmp {
342 896 : high = mid;
343 2379 : } else {
344 2379 : low = mid + 1;
345 GIC 2379 : }
346 ECB (181) : }
347 GIC 195 : match (found_smaller, found_larger) {
348 : (false, false) => {
349 : // This can happen if no commit records have been processed yet, e.g.
350 ECB (21) : // just after importing a cluster.
351 GIC 23 : Ok(LsnForTimestamp::NoData(max_lsn))
352 : }
353 : (true, false) => {
354 ECB (73) : // Didn't find any commit timestamps larger than the request
355 GIC 78 : Ok(LsnForTimestamp::Future(max_lsn))
356 : }
357 : (false, true) => {
358 ECB (12) : // Didn't find any commit timestamps smaller than the request
359 GIC 13 : Ok(LsnForTimestamp::Past(max_lsn))
360 : }
361 : (true, true) => {
362 : // low is the LSN of the first commit record *after* the search_timestamp,
363 : // Back off by one to get to the point just before the commit.
364 : //
365 : // FIXME: it would be better to get the LSN of the previous commit.
366 : // Otherwise, if you restore to the returned LSN, the database will
367 : // include physical changes from later commits that will be marked
368 ECB (75) : // as aborted, and will need to be vacuumed away.
369 GIC 81 : Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
370 : }
371 ECB (181) : }
372 GIC 195 : }
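// Illustrative only (not part of the original file): how a caller might interpret
// the result. `timeline`, `search_timestamp` and `ctx` are assumed to be in scope.
//
//     match timeline.find_lsn_for_timestamp(search_timestamp, ctx).await? {
//         LsnForTimestamp::Present(lsn) => { /* LSN just before the first commit after the timestamp */ }
//         LsnForTimestamp::Future(lsn) => { /* every commit found is older than the timestamp */ }
//         LsnForTimestamp::Past(lsn) => { /* every commit found is newer than the timestamp */ }
//         LsnForTimestamp::NoData(lsn) => { /* no commit records were found at all */ }
//     }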
373 :
374 : /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
375 : /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
376 : ///
377 : /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
378 : /// with a smaller/larger timestamp.
379 : ///
380 CBC 3275 : pub async fn is_latest_commit_timestamp_ge_than(
381 3275 : &self,
382 3275 : search_timestamp: TimestampTz,
383 3275 : probe_lsn: Lsn,
384 3275 : found_smaller: &mut bool,
385 3275 : found_larger: &mut bool,
386 3275 : ctx: &RequestContext,
387 3275 : ) -> Result<bool, PageReconstructError> {
388 3275 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
389 3084 : if timestamp >= search_timestamp {
390 896 : *found_larger = true;
391 GIC 896 : return ControlFlow::Break(true);
392 CBC 2188 : } else {
393 2188 : *found_smaller = true;
394 2188 : }
395 2188 : ControlFlow::Continue(())
396 3275 : })
397 338184 : .await
398 3275 : }
399 :
400 ECB (3154) : /// Obtain the latest timestamp known at the given lsn.
401 (2923) : ///
402 (2923) : /// If the lsn has no timestamps, returns None. Otherwise returns the largest timestamp found at that lsn.
403 CBC 12 : pub async fn get_timestamp_for_lsn(
404 12 : &self,
405 12 : probe_lsn: Lsn,
406 12 : ctx: &RequestContext,
407 12 : ) -> Result<Option<TimestampTz>, PageReconstructError> {
408 12 : let mut max: Option<TimestampTz> = None;
409 12 : self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
410 10 : if let Some(max_prev) = max {
411 LBC (231) : max = Some(max_prev.max(timestamp));
412 GIC 10 : } else {
413 10 : max = Some(timestamp);
414 CBC 10 : }
415 10 : ControlFlow::Continue(())
416 GIC 12 : })
417 80 : .await?;
418 ECB (4968) :
419 CBC 10 : Ok(max)
420 12 : }
421 ECB (4968) :
422 (4968) : /// Runs the given function on all the timestamps for a given lsn
423 (4968) : ///
424 (4968) : /// The return value is either given by the closure, or set to the `Default`
425 (4968) : /// impl's output.
426 GIC 3287 : async fn map_all_timestamps<T: Default>(
427 CBC 3287 : &self,
428 3287 : probe_lsn: Lsn,
429 3287 : ctx: &RequestContext,
430 GBC 3287 : mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
431 GIC 3287 : ) -> Result<T, PageReconstructError> {
432 CBC 3287 : for segno in self
433 GIC 3287 : .list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
434 CBC 2583 : .await?
435 ECB (2569) : {
436 CBC 3285 : let nblocks = self
437 3285 : .get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
438 2137 : .await?;
439 3346 : for blknum in (0..nblocks).rev() {
440 3346 : let clog_page = self
441 3346 : .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
442 GIC 333544 : .await?;
443 ECB (2569) :
444 CBC 3346 : if clog_page.len() == BLCKSZ as usize + 8 {
445 3094 : let mut timestamp_bytes = [0u8; 8];
446 GIC 3094 : timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
447 CBC 3094 : let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
448 3094 :
449 3094 : match f(timestamp) {
450 896 : ControlFlow::Break(b) => return Ok(b),
451 2198 : ControlFlow::Continue(()) => (),
452 : }
453 252 : }
454 : }
455 ECB (636) : }
456 CBC 2389 : Ok(Default::default())
457 GBC 3287 : }
458 :
459 ECB (636) : /// Get a list of SLRU segments
460 GIC 5206 : pub async fn list_slru_segments(
461 CBC 5206 : &self,
462 5206 : kind: SlruKind,
463 5206 : lsn: Lsn,
464 5206 : ctx: &RequestContext,
465 5206 : ) -> Result<HashSet<u32>, PageReconstructError> {
466 5206 : // fetch directory entry
467 5206 : let key = slru_dir_to_key(kind);
468 ECB (2) :
469 CBC 5206 : let buf = self.get(key, lsn, ctx).await?;
470 5203 : match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
471 GIC 5203 : Ok(dir) => Ok(dir.segments),
472 LBC (636) : Err(e) => Err(PageReconstructError::from(e)),
473 ECB (636) : }
474 CBC 5206 : }
475 ECB (636) :
476 CBC 2579 : pub async fn get_relmap_file(
477 GIC 2579 : &self,
478 CBC 2579 : spcnode: Oid,
479 GIC 2579 : dbnode: Oid,
480 CBC 2579 : lsn: Lsn,
481 2579 : ctx: &RequestContext,
482 GBC 2579 : ) -> Result<Bytes, PageReconstructError> {
483 GIC 2579 : let key = relmap_file_key(spcnode, dbnode);
484 ECB (636) :
485 GIC 2579 : let buf = self.get(key, lsn, ctx).await?;
486 CBC 2579 : Ok(buf)
487 2579 : }
488 ECB (635) :
489 CBC 639 : pub async fn list_dbdirs(
490 639 : &self,
491 639 : lsn: Lsn,
492 639 : ctx: &RequestContext,
493 GIC 639 : ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
494 ECB (1881) : // fetch directory entry
495 CBC 639 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
496 ECB (1881) :
497 CBC 639 : match DbDirectory::des(&buf).context("deserialization failure") {
498 639 : Ok(dir) => Ok(dir.dbdirs),
499 LBC (1881) : Err(e) => Err(PageReconstructError::from(e)),
500 ECB (1881) : }
501 GIC 639 : }
502 ECB (2557) :
503 CBC 2 : pub async fn get_twophase_file(
504 2 : &self,
505 2 : xid: TransactionId,
506 2 : lsn: Lsn,
507 2 : ctx: &RequestContext,
508 2 : ) -> Result<Bytes, PageReconstructError> {
509 2 : let key = twophase_file_key(xid);
510 GBC 2 : let buf = self.get(key, lsn, ctx).await?;
511 GIC 2 : Ok(buf)
512 CBC 2 : }
513 ECB (4) :
514 CBC 639 : pub async fn list_twophase_files(
515 GIC 639 : &self,
516 639 : lsn: Lsn,
517 CBC 639 : ctx: &RequestContext,
518 GIC 639 : ) -> Result<HashSet<TransactionId>, PageReconstructError> {
519 : // fetch directory entry
520 639 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
521 :
522 639 : match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
523 639 : Ok(dir) => Ok(dir.xids),
524 LBC (428) : Err(e) => Err(PageReconstructError::from(e)),
525 ECB (428) : }
526 CBC 639 : }
527 ECB (428) :
528 CBC 638 : pub async fn get_control_file(
529 638 : &self,
530 638 : lsn: Lsn,
531 GIC 638 : ctx: &RequestContext,
532 638 : ) -> Result<Bytes, PageReconstructError> {
533 CBC 638 : self.get(CONTROLFILE_KEY, lsn, ctx).await
534 638 : }
535 :
536 1895 : pub async fn get_checkpoint(
537 1895 : &self,
538 1895 : lsn: Lsn,
539 1895 : ctx: &RequestContext,
540 1895 : ) -> Result<Bytes, PageReconstructError> {
541 1895 : self.get(CHECKPOINT_KEY, lsn, ctx).await
542 GIC 1895 : }
543 ECB (395532) :
544 CBC 2567 : pub async fn list_aux_files(
545 2567 : &self,
546 2567 : lsn: Lsn,
547 2567 : ctx: &RequestContext,
548 2567 : ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
549 2567 : match self.get(AUX_FILES_KEY, lsn, ctx).await {
550 2563 : Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
551 2563 : Ok(dir) => Ok(dir.files),
552 LBC (395530) : Err(e) => Err(PageReconstructError::from(e)),
553 ECB (395530) : },
554 GIC 4 : Err(e) => {
555 4 : warn!("Failed to get info about AUX files: {}", e);
556 CBC 4 : Ok(HashMap::new())
557 ECB (426) : }
558 : }
559 GIC 2567 : }
560 :
561 : /// Does the same as get_current_logical_size but counted on demand.
562 : /// Used to initialize the logical size tracking on startup.
563 ECB (677) : ///
564 (677) : /// Only relation blocks are counted currently. That excludes metadata,
565 (677) : /// SLRUs, twophase files etc.
566 CBC 431 : pub async fn get_current_logical_size_non_incremental(
567 431 : &self,
568 431 : lsn: Lsn,
569 431 : cancel: CancellationToken,
570 431 : ctx: &RequestContext,
571 431 : ) -> Result<u64, CalculateLogicalSizeError> {
572 431 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
573 :
574 : // Fetch list of database dirs and iterate them
575 431 : let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
576 425 : let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
577 :
578 425 : let mut total_size: u64 = 0;
579 1709 : for (spcnode, dbnode) in dbdir.dbdirs.keys() {
580 399146 : for rel in self
581 1709 : .list_rels(*spcnode, *dbnode, lsn, ctx)
582 949 : .await
583 GIC 1709 : .context("list rels")?
584 ECB (2376) : {
585 CBC 399146 : if cancel.is_cancelled() {
586 LBC (1360) : return Err(CalculateLogicalSizeError::Cancelled);
587 CBC 399146 : }
588 399146 : let relsize_key = rel_size_to_key(rel);
589 399146 : let mut buf = self
590 399146 : .get(relsize_key, lsn, ctx)
591 197707 : .await
592 399146 : .with_context(|| format!("read relation size of {rel:?}"))?;
593 399146 : let relsize = buf.get_u32_le();
594 399146 :
595 399146 : total_size += relsize as u64;
596 ECB (560968) : }
597 : }
598 GIC 425 : Ok(total_size * BLCKSZ as u64)
599 429 : }
600 :
601 ECB (2031) : ///
602 (677) : /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
603 (677) : /// Anything that's not listed may be removed from the underlying storage (from
604 (677) : /// that LSN forwards).
605 GIC 669 : pub async fn collect_keyspace(
606 CBC 669 : &self,
607 669 : lsn: Lsn,
608 669 : ctx: &RequestContext,
609 669 : ) -> anyhow::Result<KeySpace> {
610 669 : // Iterate through key ranges, greedily packing them into partitions
611 669 : let mut result = KeySpaceAccum::new();
612 669 :
613 669 : // The dbdir metadata always exists
614 669 : result.add_key(DBDIR_KEY);
615 ECB (1781) :
616 (1781) : // Fetch list of database dirs and iterate them
617 CBC 669 : let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
618 668 : let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
619 ECB (1781) :
620 CBC 668 : let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
621 GIC 668 : dbs.sort_unstable();
622 3011 : for (spcnode, dbnode) in dbs {
623 2343 : result.add_key(relmap_file_key(spcnode, dbnode));
624 2343 : result.add_key(rel_dir_to_key(spcnode, dbnode));
625 ECB (677) :
626 CBC 2343 : let mut rels: Vec<RelTag> = self
627 2343 : .list_rels(spcnode, dbnode, lsn, ctx)
628 1121 : .await?
629 2343 : .into_iter()
630 2343 : .collect();
631 GBC 2343 : rels.sort_unstable();
632 556534 : for rel in rels {
633 GIC 554191 : let relsize_key = rel_size_to_key(rel);
634 CBC 554191 : let mut buf = self.get(relsize_key, lsn, ctx).await?;
635 554191 : let relsize = buf.get_u32_le();
636 554191 :
637 554191 : result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
638 554191 : result.add_key(relsize_key);
639 ECB (677) : }
640 : }
641 :
642 (144037700) : // Iterate SLRUs next
643 CBC 2004 : for kind in [
644 668 : SlruKind::Clog,
645 668 : SlruKind::MultiXactMembers,
646 668 : SlruKind::MultiXactOffsets,
647 ECB (137872) : ] {
648 CBC 2004 : let slrudir_key = slru_dir_to_key(kind);
649 2004 : result.add_key(slrudir_key);
650 2004 : let buf = self.get(slrudir_key, lsn, ctx).await?;
651 GIC 2004 : let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
652 2004 : let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
653 CBC 2004 : segments.sort_unstable();
654 3756 : for segno in segments {
655 1752 : let segsize_key = slru_segment_size_to_key(kind, segno);
656 1752 : let mut buf = self.get(segsize_key, lsn, ctx).await?;
657 1752 : let segsize = buf.get_u32_le();
658 1752 :
659 1752 : result.add_range(
660 1752 : slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
661 GIC 1752 : );
662 CBC 1752 : result.add_key(segsize_key);
663 ECB (20036) : }
664 (20036) : }
665 :
666 (153446) : // Then pg_twophase
667 GIC 668 : result.add_key(TWOPHASEDIR_KEY);
668 668 : let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
669 CBC 668 : let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
670 668 : let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
671 668 : xids.sort_unstable();
672 668 : for xid in xids {
673 UIC 0 : result.add_key(twophase_file_key(xid));
674 0 : }
675 ECB (17672) :
676 CBC 668 : result.add_key(CONTROLFILE_KEY);
677 668 : result.add_key(CHECKPOINT_KEY);
678 668 : result.add_key(AUX_FILES_KEY);
679 GIC 668 :
680 668 : Ok(result.to_keyspace())
681 668 : }
682 :
683 : /// Get the cached size of a relation, if it has not been updated after the specified LSN
684 144822205 : pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
685 144822205 : let rel_size_cache = self.rel_size_cache.read().unwrap();
686 144822205 : if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
687 144584882 : if lsn >= *cached_lsn {
688 144460722 : return Some(*nblocks);
689 124160 : }
690 237323 : }
691 361483 : None
692 144822205 : }
693 :
694 : /// Update cached relation size if there is no more recent update
695 139850 : pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
696 139850 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
697 139850 : match rel_size_cache.entry(tag) {
698 119708 : hash_map::Entry::Occupied(mut entry) => {
699 119708 : let cached_lsn = entry.get_mut();
700 119708 : if lsn >= cached_lsn.0 {
701 3 : *cached_lsn = (lsn, nblocks);
702 119705 : }
703 : }
704 20142 : hash_map::Entry::Vacant(entry) => {
705 20142 : entry.insert((lsn, nblocks));
706 20142 : }
707 ECB (613) : }
708 CBC 139850 : }
709 ECB (613) :
710 (613) : /// Store cached relation size
711 CBC 1598602 : pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
712 GIC 1598602 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
713 1598602 : rel_size_cache.insert(tag, (lsn, nblocks));
714 CBC 1598602 : }
715 ECB (613) :
716 (613) : /// Remove cached relation size
717 CBC 17673 : pub fn remove_cached_rel_size(&self, tag: &RelTag) {
718 GIC 17673 : let mut rel_size_cache = self.rel_size_cache.write().unwrap();
719 CBC 17673 : rel_size_cache.remove(tag);
720 17673 : }
721 ECB (613) : }
722 (613) :
723 : /// DatadirModification represents an operation to ingest an atomic set of
724 (613) : /// updates to the repository. It is created by the 'begin_modification'
725 (613) : /// function. One is used for each WAL record, so that all the modifications
726 (613) : /// made by a single WAL record appear atomic.
727 (613) : pub struct DatadirModification<'a> {
728 (613) : /// The timeline this modification applies to. You can access this to
729 (613) : /// read the state, but note that any pending updates are *not* reflected
730 (613) : /// in the state in 'tline' yet.
731 (613) : pub tline: &'a Timeline,
732 (613) :
733 (613) : /// Lsn assigned by begin_modification
734 (613) : pub lsn: Lsn,
735 :
736 : // The modifications are not applied directly to the underlying key-value store.
737 : // The put-functions add the modifications here, and they are flushed to the
738 (36) : // underlying key-value store by the 'finish' function.
739 (36) : pending_updates: HashMap<Key, Value>,
740 (36) : pending_deletions: Vec<Range<Key>>,
741 (36) : pending_nblocks: i64,
742 (36) : }
743 (36) :
744 (36) : impl<'a> DatadirModification<'a> {
745 (36) : /// Initialize a completely new repository.
746 (36) : ///
747 (36) : /// This inserts the directory metadata entries that are assumed to
748 (36) : /// always exist.
749 GIC 615 : pub fn init_empty(&mut self) -> anyhow::Result<()> {
750 615 : let buf = DbDirectory::ser(&DbDirectory {
751 615 : dbdirs: HashMap::new(),
752 615 : })?;
753 615 : self.put(DBDIR_KEY, Value::Image(buf.into()));
754 :
755 ECB (69675515) : // Create AuxFilesDirectory
756 CBC 615 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
757 615 : files: HashMap::new(),
758 615 : })?;
759 615 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
760 ECB (69675515) :
761 CBC 615 : let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
762 615 : xids: HashSet::new(),
763 615 : })?;
764 615 : self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
765 :
766 GIC 615 : let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
767 CBC 615 : let empty_dir = Value::Image(buf);
768 615 : self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
769 615 : self.put(
770 615 : slru_dir_to_key(SlruKind::MultiXactMembers),
771 615 : empty_dir.clone(),
772 615 : );
773 615 : self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
774 615 :
775 615 : Ok(())
776 615 : }
777 ECB (1991227) :
778 (1991227) : #[cfg(test)]
779 (1991227) : pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
780 GIC 36 : self.init_empty()?;
781 36 : self.put_control_file(bytes::Bytes::from_static(
782 CBC 36 : b"control_file contents do not matter",
783 36 : ))
784 36 : .context("put_control_file")?;
785 36 : self.put_checkpoint(bytes::Bytes::from_static(
786 36 : b"checkpoint_file contents do not matter",
787 36 : ))
788 36 : .context("put_checkpoint_file")?;
789 36 : Ok(())
790 36 : }
791 ECB (2302125) :
792 : /// Put a new page version that can be constructed from a WAL record
793 (2395) : ///
794 (2395) : /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
795 (2395) : /// current end-of-file. It's up to the caller to check that the relation size
796 (2395) : /// matches the blocks inserted!
797 CBC 70051708 : pub fn put_rel_wal_record(
798 70051708 : &mut self,
799 70051708 : rel: RelTag,
800 70051708 : blknum: BlockNumber,
801 70051708 : rec: NeonWalRecord,
802 70051708 : ) -> anyhow::Result<()> {
803 GIC 70051708 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
804 70051708 : self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
805 CBC 70051708 : Ok(())
806 70051708 : }
807 ECB (2375) :
808 (2375) : // Same, but for an SLRU.
809 CBC 1998464 : pub fn put_slru_wal_record(
810 1998464 : &mut self,
811 1998464 : kind: SlruKind,
812 GIC 1998464 : segno: u32,
813 CBC 1998464 : blknum: BlockNumber,
814 1998464 : rec: NeonWalRecord,
815 GIC 1998464 : ) -> anyhow::Result<()> {
816 CBC 1998464 : self.put(
817 1998464 : slru_block_to_key(kind, segno, blknum),
818 GIC 1998464 : Value::WalRecord(rec),
819 1998464 : );
820 1998464 : Ok(())
821 CBC 1998464 : }
822 ECB (2321) :
823 : /// Like put_wal_record, but with ready-made image of the page.
824 GIC 2312475 : pub fn put_rel_page_image(
825 CBC 2312475 : &mut self,
826 2312475 : rel: RelTag,
827 2312475 : blknum: BlockNumber,
828 2312475 : img: Bytes,
829 2312475 : ) -> anyhow::Result<()> {
830 2312475 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
831 2312475 : self.put(rel_block_to_key(rel, blknum), Value::Image(img));
832 2312475 : Ok(())
833 2312475 : }
834 ECB (21) :
835 CBC 2401 : pub fn put_slru_page_image(
836 2401 : &mut self,
837 2401 : kind: SlruKind,
838 2401 : segno: u32,
839 2401 : blknum: BlockNumber,
840 GIC 2401 : img: Bytes,
841 CBC 2401 : ) -> anyhow::Result<()> {
842 2401 : self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
843 2401 : Ok(())
844 GIC 2401 : }
845 ECB (4) :
846 (4) : /// Store a relmapper file (pg_filenode.map) in the repository
847 CBC 2383 : pub async fn put_relmap_file(
848 2383 : &mut self,
849 2383 : spcnode: Oid,
850 2383 : dbnode: Oid,
851 GIC 2383 : img: Bytes,
852 CBC 2383 : ctx: &RequestContext,
853 2383 : ) -> anyhow::Result<()> {
854 ECB (4) : // Add it to the directory (if it doesn't exist already)
855 GBC 2383 : let buf = self.get(DBDIR_KEY, ctx).await?;
856 CBC 2383 : let mut dbdir = DbDirectory::des(&buf)?;
857 ECB (4) :
858 CBC 2383 : let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
859 2383 : if r.is_none() || r == Some(false) {
860 : // The dbdir entry didn't exist, or it contained a
861 : // 'false'. The 'insert' call already updated it with
862 ECB (4) : // 'true', now write the updated 'dbdirs' map back.
863 CBC 2329 : let buf = DbDirectory::ser(&dbdir)?;
864 2329 : self.put(DBDIR_KEY, Value::Image(buf.into()));
865 :
866 ECB (611) : // Create AuxFilesDirectory as well
867 CBC 2329 : let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
868 2329 : files: HashMap::new(),
869 2329 : })?;
870 GIC 2329 : self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
871 CBC 54 : }
872 2383 : if r.is_none() {
873 21 : // Create RelDirectory
874 21 : let buf = RelDirectory::ser(&RelDirectory {
875 GIC 21 : rels: HashSet::new(),
876 CBC 21 : })?;
877 21 : self.put(
878 21 : rel_dir_to_key(spcnode, dbnode),
879 21 : Value::Image(Bytes::from(buf)),
880 21 : );
881 2362 : }
882 ECB (3) :
883 GIC 2383 : self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
884 CBC 2383 : Ok(())
885 2383 : }
886 ECB (3) :
887 GBC 4 : pub async fn put_twophase_file(
888 GIC 4 : &mut self,
889 4 : xid: TransactionId,
890 CBC 4 : img: Bytes,
891 4 : ctx: &RequestContext,
892 4 : ) -> anyhow::Result<()> {
893 ECB (3) : // Add it to the directory entry
894 CBC 4 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
895 GIC 4 : let mut dir = TwoPhaseDirectory::des(&buf)?;
896 GBC 4 : if !dir.xids.insert(xid) {
897 UBC 0 : anyhow::bail!("twophase file for xid {} already exists", xid);
898 GBC 4 : }
899 4 : self.put(
900 GIC 4 : TWOPHASEDIR_KEY,
901 4 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
902 : );
903 ECB (3) :
904 CBC 4 : self.put(twophase_file_key(xid), Value::Image(img));
905 4 : Ok(())
906 4 : }
907 ECB (3) :
908 CBC 613 : pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
909 GIC 613 : self.put(CONTROLFILE_KEY, Value::Image(img));
910 613 : Ok(())
911 613 : }
912 :
913 CBC 28659 : pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
914 28659 : self.put(CHECKPOINT_KEY, Value::Image(img));
915 28659 : Ok(())
916 28659 : }
917 ECB (563733) :
918 CBC 3 : pub async fn drop_dbdir(
919 3 : &mut self,
920 GBC 3 : spcnode: Oid,
921 CBC 3 : dbnode: Oid,
922 GIC 3 : ctx: &RequestContext,
923 3 : ) -> anyhow::Result<()> {
924 CBC 3 : let req_lsn = self.tline.get_last_record_lsn();
925 ECB (563733) :
926 CBC 3 : let total_blocks = self
927 3 : .tline
928 GIC 3 : .get_db_size(spcnode, dbnode, req_lsn, true, ctx)
929 LBC (2301) : .await?;
930 ECB (2301) :
931 (2301) : // Remove entry from dbdir
932 CBC 3 : let buf = self.get(DBDIR_KEY, ctx).await?;
933 3 : let mut dir = DbDirectory::des(&buf)?;
934 3 : if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
935 GIC 3 : let buf = DbDirectory::ser(&dir)?;
936 3 : self.put(DBDIR_KEY, Value::Image(buf.into()));
937 ECB (561432) : } else {
938 LBC (561432) : warn!(
939 UIC 0 : "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
940 0 : spcnode, dbnode
941 0 : );
942 ECB (563733) : }
943 EUB :
944 ECB (563733) : // Update logical database size.
945 CBC 3 : self.pending_nblocks -= total_blocks as i64;
946 3 :
947 3 : // Delete all relations and metadata files for the spcnode/dnode
948 3 : self.delete(dbdir_key_range(spcnode, dbnode));
949 GIC 3 : Ok(())
950 3 : }
951 :
952 : /// Create a relation fork.
953 ECB (563733) : ///
954 (563733) : /// 'nblocks' is the initial size.
955 CBC 565601 : pub async fn put_rel_creation(
956 565601 : &mut self,
957 565601 : rel: RelTag,
958 565601 : nblocks: BlockNumber,
959 565601 : ctx: &RequestContext,
960 565601 : ) -> Result<(), RelationError> {
961 565601 : if rel.relnode == 0 {
962 LBC (563733) : return Err(RelationError::InvalidRelnode);
963 CBC 565601 : }
964 ECB (563733) : // It's possible that this is the first rel for this db in this
965 (563733) : // tablespace. Create the reldir entry for it if so.
966 GIC 565601 : let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
967 565601 : .context("deserialize db")?;
968 CBC 565601 : let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
969 565601 : let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
970 ECB (3097) : // Didn't exist. Update dbdir
971 CBC 2309 : dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
972 2309 : let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
973 2309 : self.put(DBDIR_KEY, Value::Image(buf.into()));
974 2309 :
975 2309 : // and create the RelDirectory
976 2309 : RelDirectory::default()
977 ECB (3097) : } else {
978 : // reldir already exists, fetch it
979 CBC 563292 : RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
980 563292 : .context("deserialize db")?
981 ECB (3097) : };
982 (3097) :
983 (3097) : // Add the new relation to the rel directory entry, and write it back
984 CBC 565601 : if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
985 LBC (3097) : return Err(RelationError::AlreadyExists);
986 CBC 565601 : }
987 565601 : self.put(
988 565601 : rel_dir_key,
989 565601 : Value::Image(Bytes::from(
990 565601 : RelDirectory::ser(&rel_dir).context("serialize")?,
991 ECB (3097) : )),
992 (3097) : );
993 EUB :
994 ECB (3097) : // Put size
995 CBC 565601 : let size_key = rel_size_to_key(rel);
996 GIC 565601 : let buf = nblocks.to_le_bytes();
997 565601 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
998 565601 :
999 CBC 565601 : self.pending_nblocks += nblocks as i64;
1000 565601 :
1001 565601 : // Update relation size cache
1002 565601 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1003 565601 :
1004 565601 : // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
1005 565601 : // caller.
1006 GIC 565601 : Ok(())
1007 565601 : }
1008 ECB (1564545) :
1009 (1564545) : /// Truncate relation
1010 CBC 3094 : pub async fn put_rel_truncation(
1011 3094 : &mut self,
1012 3094 : rel: RelTag,
1013 3094 : nblocks: BlockNumber,
1014 3094 : ctx: &RequestContext,
1015 3094 : ) -> anyhow::Result<()> {
1016 3094 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1017 3094 : let last_lsn = self.tline.get_last_record_lsn();
1018 3094 : if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
1019 3094 : let size_key = rel_size_to_key(rel);
1020 ECB (1028277) : // Fetch the old size first
1021 CBC 3094 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1022 3094 :
1023 GIC 3094 : // Update the entry with the new size.
1024 3094 : let buf = nblocks.to_le_bytes();
1025 CBC 3094 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1026 3094 :
1027 GIC 3094 : // Update relation size cache
1028 3094 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1029 CBC 3094 :
1033 CBC 3094 : // Update logical database size.
1034 3094 : self.pending_nblocks -= old_size as i64 - nblocks as i64;
1035 UIC 0 : }
1036 GBC 3094 : Ok(())
1037 GIC 3094 : }
1038 :
1039 : /// Extend relation
1040 ECB (17672) : /// If new size is smaller, do nothing.
1041 CBC 1564943 : pub async fn put_rel_extend(
1042 1564943 : &mut self,
1043 1564943 : rel: RelTag,
1044 1564943 : nblocks: BlockNumber,
1045 1564943 : ctx: &RequestContext,
1046 1564943 : ) -> anyhow::Result<()> {
1047 1564943 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1048 ECB (17672) :
1049 (17672) : // Put size
1050 CBC 1564943 : let size_key = rel_size_to_key(rel);
1051 1564943 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1052 GIC 1564943 :
1053 CBC 1564943 : // only extend relation here. never decrease the size
1054 1564943 : if nblocks > old_size {
1055 1026813 : let buf = nblocks.to_le_bytes();
1056 1026813 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1057 1026813 :
1058 1026813 : // Update relation size cache
1059 1026813 : self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
1060 1026813 :
1061 1026813 : self.pending_nblocks += nblocks as i64 - old_size as i64;
1062 1026813 : }
1063 1564943 : Ok(())
1064 GIC 1564943 : }
1065 ECB (1746) :
1066 EUB : /// Drop a relation.
1067 CBC 17673 : pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
1068 17673 : anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
1069 ECB (1746) :
1070 (1746) : // Remove it from the directory entry
1071 GIC 17673 : let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
1072 17673 : let buf = self.get(dir_key, ctx).await?;
1073 17673 : let mut dir = RelDirectory::des(&buf)?;
1074 ECB (1746) :
1075 CBC 17673 : if dir.rels.remove(&(rel.relnode, rel.forknum)) {
1076 17673 : self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
1077 ECB (1746) : } else {
1078 LBC (1746) : warn!("dropped rel {} did not exist in rel directory", rel);
1079 ECB (1746) : }
1080 (1746) :
1081 (1746) : // update logical size
1082 GIC 17673 : let size_key = rel_size_to_key(rel);
1083 17673 : let old_size = self.get(size_key, ctx).await?.get_u32_le();
1084 CBC 17673 : self.pending_nblocks -= old_size as i64;
1085 17673 :
1086 17673 : // Remove entry from relation size cache
1087 17673 : self.tline.remove_cached_rel_size(&rel);
1088 17673 :
1089 17673 : // Delete size entry, as well as all blocks
1090 17673 : self.delete(rel_key_range(rel));
1091 17673 :
1092 17673 : Ok(())
1093 17673 : }
1094 ECB (661) :
1095 CBC 1752 : pub async fn put_slru_segment_creation(
1096 GIC 1752 : &mut self,
1097 1752 : kind: SlruKind,
1098 CBC 1752 : segno: u32,
1099 1752 : nblocks: BlockNumber,
1100 1752 : ctx: &RequestContext,
1101 1752 : ) -> anyhow::Result<()> {
1102 1752 : // Add it to the directory entry
1103 1752 : let dir_key = slru_dir_to_key(kind);
1104 1752 : let buf = self.get(dir_key, ctx).await?;
1105 1752 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1106 ECB (9) :
1107 CBC 1752 : if !dir.segments.insert(segno) {
1108 UIC 0 : anyhow::bail!("slru segment {kind:?}/{segno} already exists");
1109 CBC 1752 : }
1110 GBC 1752 : self.put(
1111 CBC 1752 : dir_key,
1112 1752 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1113 ECB (9) : );
1114 (9) :
1115 : // Put size
1116 GIC 1752 : let size_key = slru_segment_size_to_key(kind, segno);
1117 1752 : let buf = nblocks.to_le_bytes();
1118 CBC 1752 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1119 1752 :
1120 1752 : // even if nblocks > 0, we don't insert any actual blocks here
1121 1752 :
1122 GIC 1752 : Ok(())
1123 1752 : }
1124 EUB :
1125 : /// Extend SLRU segment
1126 GBC 661 : pub fn put_slru_extend(
1127 661 : &mut self,
1128 GIC 661 : kind: SlruKind,
1129 661 : segno: u32,
1130 CBC 661 : nblocks: BlockNumber,
1131 661 : ) -> anyhow::Result<()> {
1132 661 : // Put size
1133 661 : let size_key = slru_segment_size_to_key(kind, segno);
1134 661 : let buf = nblocks.to_le_bytes();
1135 GIC 661 : self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
1136 CBC 661 : Ok(())
1137 661 : }
1138 :
1139 ECB (2) : /// This method is used for marking truncated SLRU files
1140 GBC 9 : pub async fn drop_slru_segment(
1141 CBC 9 : &mut self,
1142 9 : kind: SlruKind,
1143 9 : segno: u32,
1144 9 : ctx: &RequestContext,
1145 GIC 9 : ) -> anyhow::Result<()> {
1146 9 : // Remove it from the directory entry
1147 9 : let dir_key = slru_dir_to_key(kind);
1148 CBC 9 : let buf = self.get(dir_key, ctx).await?;
1149 9 : let mut dir = SlruSegmentDirectory::des(&buf)?;
1150 ECB (2) :
1151 CBC 9 : if !dir.segments.remove(&segno) {
1152 UIC 0 : warn!("slru segment {:?}/{} does not exist", kind, segno);
1153 CBC 9 : }
1154 9 : self.put(
1155 9 : dir_key,
1156 9 : Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
1157 ECB (94) : );
1158 (94) :
1159 (94) : // Delete size entry, as well as all blocks
1160 CBC 9 : self.delete(slru_segment_key_range(kind, segno));
1161 GBC 9 :
1162 9 : Ok(())
1163 9 : }
1164 EUB :
1165 : /// Drop a relmapper file (pg_filenode.map)
1166 UIC 0 : pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
1167 0 : // TODO
1168 LBC (94) : Ok(())
1169 (94) : }
1170 ECB (3) :
1171 (91) : /// Drop the twophase file for the given transaction id
1172 CBC 2 : pub async fn drop_twophase_file(
1173 2 : &mut self,
1174 2 : xid: TransactionId,
1175 2 : ctx: &RequestContext,
1176 2 : ) -> anyhow::Result<()> {
1177 ECB (94) : // Remove it from the directory entry
1178 GIC 2 : let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
1179 2 : let mut dir = TwoPhaseDirectory::des(&buf)?;
1180 ECB (94) :
1181 CBC 2 : if !dir.xids.remove(&xid) {
1182 UIC 0 : warn!("twophase file for xid {} does not exist", xid);
1183 GIC 2 : }
1184 2 : self.put(
1185 2 : TWOPHASEDIR_KEY,
1186 2 : Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
1187 : );
1188 :
1189 : // Delete it
1190 2 : self.delete(twophase_key_range(xid));
1191 2 :
1192 2 : Ok(())
1193 2 : }
1194 :
1195 94 : pub async fn put_file(
1196 94 : &mut self,
1197 94 : path: &str,
1198 94 : content: &[u8],
1199 94 : ctx: &RequestContext,
1200 94 : ) -> anyhow::Result<()> {
1201 CBC 94 : let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
1202 94 : Ok(buf) => AuxFilesDirectory::des(&buf)?,
1203 LBC (546640) : Err(e) => {
1204 (546640) : warn!("Failed to get info about AUX files: {}", e);
1205 (546640) : AuxFilesDirectory {
1206 (546640) : files: HashMap::new(),
1207 UBC 0 : }
1208 : }
1209 EUB : };
1210 GIC 94 : let path = path.to_string();
1211 94 : if content.is_empty() {
1212 GBC 3 : dir.files.remove(&path);
1213 91 : } else {
1214 91 : dir.files.insert(path, Bytes::copy_from_slice(content));
1215 GIC 91 : }
1216 94 : self.put(
1217 GBC 94 : AUX_FILES_KEY,
1218 94 : Value::Image(Bytes::from(
1219 94 : AuxFilesDirectory::ser(&dir).context("serialize")?,
1220 EUB : )),
1221 : );
1222 GBC 94 : Ok(())
1223 94 : }
1224 EUB :
1225 : ///
1226 : /// Flush changes accumulated so far to the underlying repository.
1227 : ///
1228 : /// Usually, changes made in DatadirModification are atomic, but this allows
1229 : /// you to flush them to the underlying repository before the final `commit`.
1230 ECB (546640) : /// That allows to free up the memory used to hold the pending changes.
1231 : ///
1232 : /// Currently only used during bulk import of a data directory. In that
1233 : /// context, breaking the atomicity is OK. If the import is interrupted, the
1234 : /// whole import fails and the timeline will be deleted anyway.
1235 : /// (Or to be precise, it will be left behind for debugging purposes and
1236 : /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
1237 (68371807) : ///
1238 (68371793) : /// Note: A consequence of flushing the pending operations is that they
1239 (68371793) : /// won't be visible to subsequent operations until `commit`. The function
1240 (68371793) : /// retains all the metadata, but data pages are flushed. That's again OK
1241 (68371793) : /// for bulk import, where you are just loading data pages and won't try to
1242 : /// modify the same pages twice.
1243 CBC 548538 : pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1244 548538 : // Unless we have accumulated a decent amount of changes, it's not worth it
1245 GIC 548538 : // to scan through the pending_updates list.
1246 CBC 548538 : let pending_nblocks = self.pending_nblocks;
1247 548538 : if pending_nblocks < 10000 {
1248 GIC 548538 : return Ok(());
1249 UIC 0 : }
1250 ECB (68371785) :
1251 LBC (68371785) : let writer = self.tline.writer().await;
1252 ECB (68371785) :
1253 (1030507) : // Flush relation and SLRU data blocks, keep metadata.
1254 LBC (67341278) : let mut retained_pending_updates = HashMap::new();
1255 UIC 0 : for (key, value) in self.pending_updates.drain() {
1256 LBC (68371785) : if is_rel_block_key(key) || is_slru_block_key(key) {
1257 ECB (68371785) : // This bails out on first error without modifying pending_updates.
1258 : // That's Ok, cf this function's doc comment.
1259 UIC 0 : writer.put(key, self.lsn, &value, ctx).await?;
1260 0 : } else {
1261 LBC (2732384) : retained_pending_updates.insert(key, value);
1262 UIC 0 : }
1263 : }
1264 0 : self.pending_updates.extend(retained_pending_updates);
1265 0 :
1266 0 : if pending_nblocks != 0 {
1267 LBC (2732384) : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1268 (1638695) : self.pending_nblocks = 0;
1269 (1638695) : }
1270 :
1271 UIC 0 : Ok(())
1272 GIC 548538 : }
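// Illustrative only (not part of the original file): the bulk-import pattern the
// doc comment above describes. `timeline`, `lsn`, `pages` and `ctx` are placeholders.
//
//     let mut modification = timeline.begin_modification(lsn);
//     for (rel, blknum, img) in pages {
//         modification.put_rel_page_image(rel, blknum, img)?;
//         modification.flush(ctx).await?; // data pages may be written out early, metadata is retained
//     }
//     modification.commit(ctx).await?; // makes everything visible and updates last_record_lsn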
1273 :
1274 : ///
1275 : /// Finish this atomic update, writing all the updated keys to the
1276 EUB : /// underlying timeline.
1277 : /// All the modifications in this atomic update are stamped by the specified LSN.
1278 : ///
1279 GIC 68723403 : pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
1280 68723388 : let writer = self.tline.writer().await;
1281 CBC 68723388 : let lsn = self.lsn;
1282 68723388 : let pending_nblocks = self.pending_nblocks;
1283 GIC 68723388 : self.pending_nblocks = 0;
1284 ECB (2732384) :
1285 GIC 76014982 : for (key, value) in self.pending_updates.drain() {
1286 CBC 76014982 : writer.put(key, lsn, &value, ctx).await?;
1287 ECB (76194320) : }
1288 CBC 68723381 : for key_range in self.pending_deletions.drain(..) {
1289 GIC 17687 : writer.delete(key_range, lsn).await?;
1290 ECB (17686) : }
1291 (17686) :
1292 CBC 68723381 : writer.finish_write(lsn);
1293 68723381 :
1294 GIC 68723381 : if pending_nblocks != 0 {
1295 1029042 : writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
1296 67694339 : }
1297 :
1298 CBC 68723381 : Ok(())
1299 GIC 68723381 : }
1300 :
1301 : // Internal helper functions to batch the modifications
1302 :
1303 2736523 : async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
1304 ECB (1319) : // Have we already updated the same key? Read the pending updated
1305 : // version in that case.
1306 : //
1307 : // Note: we don't check pending_deletions. It is an error to request a
1308 : // value that has been removed, deletion only avoids leaking storage.
1309 CBC 2736523 : if let Some(value) = self.pending_updates.get(&key) {
1310 GIC 1644295 : if let Value::Image(img) = value {
1311 1644295 : Ok(img.clone())
1312 : } else {
1313 : // Currently, we never need to read back a WAL record that we
1314 : // inserted in the same "transaction". All the metadata updates
1315 : // work directly with Images, and we never need to read actual
1316 : // data pages. We could handle this if we had to, by calling
1317 : // the walredo manager, but let's keep it simple for now.
1318 LBC (6056) : Err(PageReconstructError::from(anyhow::anyhow!(
1319 UIC 0 : "unexpected pending WAL record"
1320 0 : )))
1321 : }
1322 : } else {
1323 GBC 1092228 : let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
1324 GIC 1092228 : self.tline.get(key, lsn, ctx).await
1325 : }
1326 2736523 : }
1327 :
1328 CBC 76590444 : fn put(&mut self, key: Key, val: Value) {
1329 GIC 76590444 : self.pending_updates.insert(key, val);
1330 76590444 : }
1331 :
1332 17687 : fn delete(&mut self, key_range: Range<Key>) {
1333 17687 : trace!("DELETE {}-{}", key_range.start, key_range.end);
1334 17687 : self.pending_deletions.push(key_range);
1335 17687 : }
1336 : }
1337 :
1338 : //--- Metadata structs stored in key-value pairs in the repository.
1339 :
1340 569719 : #[derive(Debug, Serialize, Deserialize)]
1341 : struct DbDirectory {
1342 : // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
1343 : dbdirs: HashMap<(Oid, Oid), bool>,
1344 : }
1345 :
1346 1313 : #[derive(Debug, Serialize, Deserialize)]
1347 : struct TwoPhaseDirectory {
1348 : xids: HashSet<TransactionId>,
1349 : }
1350 :
1351 1166590 : #[derive(Debug, Serialize, Deserialize, Default)]
1352 : struct RelDirectory {
1353 : // Set of relations that exist. (relfilenode, forknum)
1354 : //
1355 : // TODO: Store it as a btree or radix tree or something else that spans multiple
1356 : // key-value pairs, if you have a lot of relations
1357 : rels: HashSet<(Oid, u8)>,
1358 : }
1359 :
1360 6076 : #[derive(Debug, Serialize, Deserialize, Default)]
1361 : struct AuxFilesDirectory {
1362 : files: HashMap<String, Bytes>,
1363 : }
1364 :
1365 UIC 0 : #[derive(Debug, Serialize, Deserialize)]
1366 : struct RelSizeEntry {
1367 : nblocks: u32,
1368 : }
1369 :
1370 GIC 9635 : #[derive(Debug, Serialize, Deserialize, Default)]
1371 : struct SlruSegmentDirectory {
1372 : // Set of SLRU segments that exist.
1373 : segments: HashSet<u32>,
1374 : }
1375 :
1376 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
1377 :
1378 : // Layout of the Key address space
1379 : //
1380 : // The Key struct, used to address the underlying key-value store, consists of
1381 : // 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
1382 : // all the data and metadata keys into those 18 bytes.
1383 : //
1384 : // Principles for the mapping:
1385 : //
1386 : // - Things that are often accessed or modified together, should be close to
1387 : // each other in the key space. For example, if a relation is extended by one
1388 : // block, we create a new key-value pair for the block data, and update the
1389 : // relation size entry. Because of that, the RelSize key comes after all the
1390 : // RelBlocks of a relation: the RelSize and the last RelBlock are always next
1391 : // to each other.
1392 : //
1393 : // The key space is divided into five major sections, identified by the first
1394 : // byte, and they form a hierarchy:
1395 : //
1396 : // 00 Relation data and metadata
1397 : //
1398 : // DbDir () -> (dbnode, spcnode)
1399 : // Filenodemap
1400 : // RelDir -> relnode forknum
1401 : // RelBlocks
1402 : // RelSize
1403 : //
1404 : // 01 SLRUs
1405 : //
1406 : // SlruDir kind
1407 : // SlruSegBlocks segno
1408 : // SlruSegSize
1409 : //
1410 : // 02 pg_twophase
1411 : //
1412 : // 03 misc
1413 : // Controlfile
1414 : // checkpoint
1415 : // pg_version
1416 : //
1417 : // 04 aux files
1418 : //
1419 : // Below is a full list of the keyspace allocation:
1420 : //
1421 : // DbDir:
1422 : // 00 00000000 00000000 00000000 00 00000000
1423 : //
1424 : // Filenodemap:
1425 : // 00 SPCNODE DBNODE 00000000 00 00000000
1426 : //
1427 : // RelDir:
1428 : // 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
1429 : //
1430 ECB (3) : // RelBlock:
1431 (3) : // 00 SPCNODE DBNODE RELNODE FORK BLKNUM
1432 (3) : //
1433 (3) : // RelSize:
1434 (3) : // 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
1435 (3) : //
1436 (3) : // SlruDir:
1437 (3) : // 01 kind 00000000 00000000 00 00000000
1438 (3) : //
1439 (3) : // SlruSegBlock:
1440 (3) : // 01 kind 00000001 SEGNO 00 BLKNUM
1441 (3) : //
1442 (3) : // SlruSegSize:
1443 (3) : // 01 kind 00000001 SEGNO 00 FFFFFFFF
1444 (3) : //
1445 (3) : // TwoPhaseDir:
1446 (3) : // 02 00000000 00000000 00000000 00 00000000
1447 : //
1448 (7320) : // TwoPhaseFile:
1449 (7320) : // 02 00000000 00000000 00000000 00 XID
1450 (7320) : //
1451 (7320) : // ControlFile:
1452 (7320) : // 03 00000000 00000000 00000000 00 00000000
1453 (7320) : //
1454 (7320) : // Checkpoint:
1455 (7320) : // 03 00000000 00000000 00000000 00 00000001
1456 (7320) : //
1457 (7320) : // AuxFiles:
1458 : // 03 00000000 00000000 00000000 00 00000002
1459 (693574) : //
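// A minimal illustrative sketch (not part of the mapping code) of the
// adjacency principle above: a relation's RelSize key shares fields 1-5 with
// its RelBlock keys and occupies the highest block-number slot, so it always
// sorts directly after the relation's last block. The OIDs below are
// arbitrary example values.
#[allow(dead_code)]
fn _relsize_key_adjacency_example() {
    let rel = RelTag {
        spcnode: 1663,
        dbnode: 5,
        relnode: 16384,
        forknum: 0,
    };
    let block_key = rel_block_to_key(rel, 42);
    let size_key = rel_size_to_key(rel);
    // Same prefix: section byte, tablespace, database, relfilenode, fork.
    assert_eq!(block_key.field1, size_key.field1);
    assert_eq!(block_key.field2, size_key.field2);
    assert_eq!(block_key.field3, size_key.field3);
    assert_eq!(block_key.field4, size_key.field4);
    assert_eq!(block_key.field5, size_key.field5);
    // The size entry takes the last possible block-number slot.
    assert_eq!(size_key.field6, 0xffffffff);
}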
1460 (693574) :
1461 (693574) : //-- Section 00: relation data and metadata
1462 (693574) :
1463 (693574) : const DBDIR_KEY: Key = Key {
1464 (693574) : field1: 0x00,
1465 (693574) : field2: 0,
1466 (693574) : field3: 0,
1467 (693574) : field4: 0,
1468 (693574) : field5: 0,
1469 : field6: 0,
1470 (76675583) : };
1471 (76675583) :
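// Range of all keys that belong to one database: its file node map, relation
// directory, and every relation's block and size entries under the given
// tablespace/database pair.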
1472 CBC 3 : fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
1473 3 : Key {
1474 3 : field1: 0x00,
1475 3 : field2: spcnode,
1476 3 : field3: dbnode,
1477 3 : field4: 0,
1478 3 : field5: 0,
1479 3 : field6: 0,
1480 GIC 3 : }..Key {
1481 CBC 3 : field1: 0x00,
1482 3 : field2: spcnode,
1483 3 : field3: dbnode,
1484 3 : field4: 0xffffffff,
1485 3 : field5: 0xff,
1486 3 : field6: 0xffffffff,
1487 3 : }
1488 3 : }
1489 ECB (3378180) :
1490 CBC 7305 : fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
1491 GIC 7305 : Key {
1492 CBC 7305 : field1: 0x00,
1493 7305 : field2: spcnode,
1494 7305 : field3: dbnode,
1495 7305 : field4: 0,
1496 7305 : field5: 0,
1497 7305 : field6: 0,
1498 7305 : }
1499 7305 : }
1500 ECB (17672) :
1501 CBC 695497 : fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
1502 695497 : Key {
1503 695497 : field1: 0x00,
1504 695497 : field2: spcnode,
1505 695497 : field3: dbnode,
1506 695497 : field4: 0,
1507 695497 : field5: 0,
1508 695497 : field6: 1,
1509 GIC 695497 : }
1510 695497 : }
1511 :
1512 CBC 77071715 : fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
1513 77071715 : Key {
1514 77071715 : field1: 0x00,
1515 77071715 : field2: rel.spcnode,
1516 77071715 : field3: rel.dbnode,
1517 77071715 : field4: rel.relnode,
1518 77071715 : field5: rel.forknum,
1519 GIC 77071715 : field6: blknum,
1520 77071715 : }
1521 77071715 : }
1522 :
1523 3362913 : fn rel_size_to_key(rel: RelTag) -> Key {
1524 3362913 : Key {
1525 CBC 3362913 : field1: 0x00,
1526 GIC 3362913 : field2: rel.spcnode,
1527 CBC 3362913 : field3: rel.dbnode,
1528 3362913 : field4: rel.relnode,
1529 3362913 : field5: rel.forknum,
1530 3362913 : field6: 0xffffffff,
1531 3362913 : }
1532 3362913 : }
1533 ECB (26457) :
1534 GIC 17673 : fn rel_key_range(rel: RelTag) -> Range<Key> {
1535 17673 : Key {
1536 CBC 17673 : field1: 0x00,
1537 17673 : field2: rel.spcnode,
1538 17673 : field3: rel.dbnode,
1539 17673 : field4: rel.relnode,
1540 17673 : field5: rel.forknum,
1541 GIC 17673 : field6: 0,
1542 CBC 17673 : }..Key {
1543 17673 : field1: 0x00,
1544 17673 : field2: rel.spcnode,
1545 17673 : field3: rel.dbnode,
1546 17673 : field4: rel.relnode,
1547 17673 : field5: rel.forknum + 1,
1548 17673 : field6: 0,
1549 GIC 17673 : }
1550 17673 : }
1551 ECB (9821) :
1552 (9821) : //-- Section 02: SLRUs
1553 (9821) : //-- Section 01: SLRUs
1554 CBC 11483 : fn slru_dir_to_key(kind: SlruKind) -> Key {
1555 11483 : Key {
1556 GIC 11483 : field1: 0x01,
1557 CBC 11483 : field2: match kind {
1558 6169 : SlruKind::Clog => 0x00,
1559 2800 : SlruKind::MultiXactMembers => 0x01,
1560 GBC 2514 : SlruKind::MultiXactOffsets => 0x02,
1561 EUB : },
1562 : field3: 0,
1563 : field4: 0,
1564 ECB (9) : field5: 0,
1565 (9) : field6: 0,
1566 (9) : }
1567 CBC 11483 : }
1568 ECB (9) :
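// Block keys inside an SLRU segment use field3 = 1, keeping them separate
// from (and sorted after) the SlruDir key of the same kind, which uses
// field3 = 0.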
1569 CBC 2010296 : fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
1570 2010296 : Key {
1571 2010296 : field1: 0x01,
1572 2010296 : field2: match kind {
1573 1956291 : SlruKind::Clog => 0x00,
1574 27561 : SlruKind::MultiXactMembers => 0x01,
1575 26444 : SlruKind::MultiXactOffsets => 0x02,
1576 ECB (9) : },
1577 (9) : field3: 1,
1578 CBC 2010296 : field4: segno,
1579 2010296 : field5: 0,
1580 GIC 2010296 : field6: blknum,
1581 2010296 : }
1582 2010296 : }
1583 :
1584 10034 : fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
1585 10034 : Key {
1586 10034 : field1: 0x01,
1587 10034 : field2: match kind {
1588 5808 : SlruKind::Clog => 0x00,
1589 2402 : SlruKind::MultiXactMembers => 0x01,
1590 1824 : SlruKind::MultiXactOffsets => 0x02,
1591 : },
1592 ECB (6) : field3: 1,
1593 CBC 10034 : field4: segno,
1594 10034 : field5: 0,
1595 10034 : field6: 0xffffffff,
1596 10034 : }
1597 10034 : }
1598 ECB (6) :
1599 CBC 9 : fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
1600 9 : let field2 = match kind {
1601 9 : SlruKind::Clog => 0x00,
1602 UIC 0 : SlruKind::MultiXactMembers => 0x01,
1603 LBC (2) : SlruKind::MultiXactOffsets => 0x02,
1604 ECB (2) : };
1605 (2) :
1606 CBC 9 : Key {
1607 9 : field1: 0x01,
1608 9 : field2,
1609 9 : field3: 1,
1610 9 : field4: segno,
1611 9 : field5: 0,
1612 9 : field6: 0,
1613 9 : }..Key {
1614 9 : field1: 0x01,
1615 9 : field2,
1616 9 : field3: 1,
1617 9 : field4: segno,
1618 9 : field5: 1,
1619 9 : field6: 0,
1620 9 : }
1621 9 : }
1622 :
1623 : //-- Section 02: pg_twophase
1624 :
1625 : const TWOPHASEDIR_KEY: Key = Key {
1626 : field1: 0x02,
1627 : field2: 0,
1628 : field3: 0,
1629 : field4: 0,
1630 : field5: 0,
1631 : field6: 0,
1632 : };
1633 :
1634 GIC 6 : fn twophase_file_key(xid: TransactionId) -> Key {
1635 6 : Key {
1636 6 : field1: 0x02,
1637 6 : field2: 0,
1638 6 : field3: 0,
1639 6 : field4: 0,
1640 6 : field5: 0,
1641 6 : field6: xid,
1642 6 : }
1643 6 : }
1644 :
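// Range covering the state file of a single two-phase transaction. The
// exclusive end bound is the next XID; if the XID wraps around at u32::MAX,
// the carry is folded into field5 so the end key still sorts after the start.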
1645 2 : fn twophase_key_range(xid: TransactionId) -> Range<Key> {
1646 2 : let (next_xid, overflowed) = xid.overflowing_add(1);
1647 2 :
1648 2 : Key {
1649 2 : field1: 0x02,
1650 2 : field2: 0,
1651 2 : field3: 0,
1652 2 : field4: 0,
1653 2 : field5: 0,
1654 CBC 2 : field6: xid,
1655 2 : }..Key {
1656 2 : field1: 0x02,
1657 2 : field2: 0,
1658 2 : field3: 0,
1659 2 : field4: 0,
1660 2 : field5: u8::from(overflowed),
1661 2 : field6: next_xid,
1662 2 : }
1663 2 : }
1664 ECB (2268096) :
1665 EUB : //-- Section 03: Control file, checkpoint, and aux files
1666 : const CONTROLFILE_KEY: Key = Key {
1667 ECB (2268096) : field1: 0x03,
1668 : field2: 0,
1669 EUB : field3: 0,
1670 : field4: 0,
1671 : field5: 0,
1672 : field6: 0,
1673 : };
1674 :
1675 : const CHECKPOINT_KEY: Key = Key {
1676 : field1: 0x03,
1677 : field2: 0,
1678 : field3: 0,
1679 : field4: 0,
1680 : field5: 0,
1681 : field6: 1,
1682 : };
1683 :
1684 ECB (21637090) : const AUX_FILES_KEY: Key = Key {
1685 (21637090) : field1: 0x03,
1686 : field2: 0,
1687 (21637090) : field3: 0,
1688 (21541218) : field4: 0,
1689 (48210) : field5: 0,
1690 (47662) : field6: 2,
1691 EUB : };
1692 :
1693 ECB (21637090) : // Reverse mappings for a few Keys.
1694 (21637090) : // These are needed by the WAL redo manager.
1695 (21637090) :
1696 CBC 2262451 : pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
1697 GIC 2262451 : Ok(match key.field1 {
1698 GBC 2262451 : 0x00 => (
1699 GIC 2262451 : RelTag {
1700 CBC 2262451 : spcnode: key.field2,
1701 GIC 2262451 : dbnode: key.field3,
1702 GBC 2262451 : relnode: key.field4,
1703 2262451 : forknum: key.field5,
1704 2262451 : },
1705 2262451 : key.field6,
1706 2262451 : ),
1707 UIC 0 : _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
1708 : })
1709 GIC 2262451 : }
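// A round-trip sketch (illustrative only, with arbitrary example OIDs):
// encoding a relation block with rel_block_to_key() and decoding it with
// key_to_rel_block() yields the original (RelTag, BlockNumber) pair.
#[allow(dead_code)]
fn _rel_block_key_round_trip_example() -> anyhow::Result<()> {
    let rel = RelTag {
        spcnode: 1663,
        dbnode: 5,
        relnode: 16384,
        forknum: 0,
    };
    let blknum: BlockNumber = 7;
    let (decoded_rel, decoded_blknum) = key_to_rel_block(rel_block_to_key(rel, blknum))?;
    assert_eq!(decoded_rel, rel);
    assert_eq!(decoded_blknum, blknum);
    Ok(())
}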
1710 :
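// True for any key inside a relation's key range: section 0x00 with a
// nonzero relnode. The per-database metadata keys (DbDir, Filenodemap,
// RelDir) all use relnode 0, which Postgres never assigns to a real relation.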
1711 UIC 0 : fn is_rel_block_key(key: Key) -> bool {
1712 0 : key.field1 == 0x00 && key.field4 != 0
1713 0 : }
1714 :
1715 0 : pub fn is_rel_fsm_block_key(key: Key) -> bool {
1716 0 : key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
1717 0 : }
1718 :
1719 0 : pub fn is_rel_vm_block_key(key: Key) -> bool {
1720 0 : key.field1 == 0x00
1721 0 : && key.field4 != 0
1722 0 : && key.field5 == VISIBILITYMAP_FORKNUM
1723 0 : && key.field6 != 0xffffffff
1724 0 : }
1725 :
1726 GIC 23039581 : pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
1727 23039581 : Ok(match key.field1 {
1728 : 0x01 => {
1729 23039581 : let kind = match key.field2 {
1730 22943709 : 0x00 => SlruKind::Clog,
1731 48210 : 0x01 => SlruKind::MultiXactMembers,
1732 47662 : 0x02 => SlruKind::MultiXactOffsets,
1733 UIC 0 : _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
1734 : };
1735 GIC 23039581 : let segno = key.field4;
1736 23039581 : let blknum = key.field6;
1737 23039581 :
1738 23039581 : (kind, segno, blknum)
1739 : }
1740 UIC 0 : _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
1741 : })
1742 GIC 23039581 : }
1743 :
1744 UIC 0 : fn is_slru_block_key(key: Key) -> bool {
1745 0 : key.field1 == 0x01 // SLRU-related
1746 0 : && key.field3 == 0x00000001 // but not SlruDir
1747 0 : && key.field6 != 0xffffffff // and not SlruSegSize
1748 0 : }
1749 :
1750 : #[allow(clippy::bool_assert_comparison)]
1751 : #[cfg(test)]
1752 : mod tests {
1753 : //use super::repo_harness::*;
1754 : //use super::*;
1755 :
1756 : /*
1757 : fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
1758 : let incremental = timeline.get_current_logical_size();
1759 : let non_incremental = timeline
1760 : .get_current_logical_size_non_incremental(lsn)
1761 : .unwrap();
1762 : assert_eq!(incremental, non_incremental);
1763 : }
1764 : */
1765 :
1766 : /*
1767 : ///
1768 : /// Test list_rels() function, with branches and dropped relations
1769 : ///
1770 : #[test]
1771 : fn test_list_rels_drop() -> Result<()> {
1772 : let repo = RepoHarness::create("test_list_rels_drop")?.load();
1773 : let tline = create_empty_timeline(repo, TIMELINE_ID)?;
1774 : const TESTDB: u32 = 111;
1775 :
1776 : // Import initial dummy checkpoint record, otherwise the get_timeline() call
1777 : // after branching fails below
1778 : let mut writer = tline.begin_record(Lsn(0x10));
1779 : writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1780 : writer.finish()?;
1781 :
1782 : // Create a relation on the timeline
1783 : let mut writer = tline.begin_record(Lsn(0x20));
1784 : writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
1785 : writer.finish()?;
1786 :
1787 : let writer = tline.begin_record(Lsn(0x00));
1788 : writer.finish()?;
1789 :
1790 : // Check that list_rels() lists it after LSN 2, but no before it
1791 : assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
1792 : assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
1793 : assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
1794 :
1795 : // Create a branch, check that the relation is visible there
1796 : repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
1797 : let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
1798 : Some(timeline) => timeline,
1799 : None => panic!("Should have a local timeline"),
1800 : };
1801 : let newtline = DatadirTimelineImpl::new(newtline);
1802 : assert!(newtline
1803 : .list_rels(0, TESTDB, Lsn(0x30))?
1804 : .contains(&TESTREL_A));
1805 :
1806 : // Drop it on the branch
1807 : let mut new_writer = newtline.begin_record(Lsn(0x40));
1808 : new_writer.drop_relation(TESTREL_A)?;
1809 : new_writer.finish()?;
1810 :
1811 : // Check that it's no longer listed on the branch after the point where it was dropped
1812 : assert!(newtline
1813 : .list_rels(0, TESTDB, Lsn(0x30))?
1814 : .contains(&TESTREL_A));
1815 : assert!(!newtline
1816 : .list_rels(0, TESTDB, Lsn(0x40))?
1817 : .contains(&TESTREL_A));
1818 :
1819 : // Run checkpoint and garbage collection and check that it's still not visible
1820 : newtline.checkpoint(CheckpointConfig::Forced)?;
1821 : repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
1822 :
1823 : assert!(!newtline
1824 : .list_rels(0, TESTDB, Lsn(0x40))?
1825 : .contains(&TESTREL_A));
1826 :
1827 : Ok(())
1828 : }
1829 : */
1830 :
1831 : /*
1832 : #[test]
1833 : fn test_read_beyond_eof() -> Result<()> {
1834 : let repo = RepoHarness::create("test_read_beyond_eof")?.load();
1835 : let tline = create_test_timeline(repo, TIMELINE_ID)?;
1836 :
1837 : make_some_layers(&tline, Lsn(0x20))?;
1838 : let mut writer = tline.begin_record(Lsn(0x60));
1839 : walingest.put_rel_page_image(
1840 : &mut writer,
1841 : TESTREL_A,
1842 : 0,
1843 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
1844 : )?;
1845 : writer.finish()?;
1846 :
1847 : // Test read before rel creation. Should error out.
1848 : assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
1849 :
1850 : // Read block beyond end of relation at different points in time.
1851 : // These reads should fall into different delta, image, and in-memory layers.
1852 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
1853 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
1854 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
1855 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
1856 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
1857 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
1858 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
1859 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
1860 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
1861 :
1862 : // Test on an in-memory layer with no preceding layer
1863 : let mut writer = tline.begin_record(Lsn(0x70));
1864 : walingest.put_rel_page_image(
1865 : &mut writer,
1866 : TESTREL_B,
1867 : 0,
1868 : TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
1869 : )?;
1870 : writer.finish()?;
1871 :
1872 : assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
1873 :
1874 : Ok(())
1875 : }
1876 : */
1877 : }
|