Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
9 : //! Records get decoded and interpreted in the [`wal_decoder`] module
10 : //! and then stored to the Repository by WalIngest.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
14 : //! extracts page images out of some WAL records, but most page versions
15 : //! are stored as WAL records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_rel_wal_record or put_rel_page_image function
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but it can handle some records directly with
22 : //! bespoke Rust code.
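//!
//! A minimal sketch of driving this by hand (condensed from `WalIngest::new`,
//! `ingest_record`, and the tests at the bottom of this file; the real driver
//! is the walreceiver, and `interpreted` stands in for a decoded
//! [`wal_decoder::models::InterpretedWalRecord`]):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut modification = timeline.begin_modification(startpoint);
//! walingest
//!     .ingest_record(interpreted, &mut modification, &ctx)
//!     .await?;
//! modification.commit(&ctx).await?; // flush to the repository, advance last LSN
//! ```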
23 :
24 : use std::backtrace::Backtrace;
25 : use std::collections::HashMap;
26 : use std::sync::atomic::AtomicBool;
27 : use std::sync::{Arc, OnceLock};
28 : use std::time::{Duration, Instant, SystemTime};
29 :
30 : use bytes::{Buf, Bytes};
31 : use pageserver_api::key::{Key, rel_block_to_key};
32 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
33 : use pageserver_api::shard::ShardIdentity;
34 : use postgres_ffi::walrecord::*;
35 : use postgres_ffi::{
36 : PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
37 : fsm_logical_to_physical, pg_constants,
38 : };
39 : use postgres_ffi_types::TimestampTz;
40 : use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
41 : use tracing::*;
42 : use utils::bin_ser::{DeserializeError, SerializeError};
43 : use utils::lsn::Lsn;
44 : use utils::rate_limit::RateLimit;
45 : use utils::{critical_timeline, failpoint_support};
46 : use wal_decoder::models::record::NeonWalRecord;
47 : use wal_decoder::models::*;
48 :
49 : use crate::ZERO_PAGE;
50 : use crate::context::RequestContext;
51 : use crate::metrics::WAL_INGEST;
52 : use crate::pgdatadir_mapping::{DatadirModification, Version};
53 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
54 : use crate::tenant::{PageReconstructError, Timeline};
55 :
56 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
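// `enum_pgversion!` (from postgres_ffi) wraps the per-version structs in one
// tagged enum. Roughly the following sketch, not the literal expansion; the
// V14..V17 variant names match the `CheckPoint::V17(cp)` arms used in
// `ingest_raw_xlog_record` below:
//
//     enum CheckPoint {
//         V14(v14::CheckPoint),
//         V15(v15::CheckPoint),
//         V16(v16::CheckPoint),
//         V17(v17::CheckPoint),
//     }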
57 :
58 : impl CheckPoint {
59 3 : fn encode(&self) -> Result<Bytes, SerializeError> {
60 3 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
61 3 : }
62 :
63 72917 : fn update_next_xid(&mut self, xid: u32) -> bool {
64 72917 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
65 72917 : }
66 :
67 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
68 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
69 0 : cp.update_next_multixid(multi_xid, multi_offset)
70 : })
71 0 : }
72 : }
73 :
74 : /// Temporary suppression of WAL lag warnings after attach
75 : ///
76 : /// After tenant attach, we want to suppress WAL lag warnings because
77 : /// we don't look at the WAL until the attach is complete, which
78 : /// might take a while.
79 : pub struct WalLagCooldown {
80 : /// Until when this suppression applies
81 : active_until: std::time::Instant,
82 : /// The maximum lag to suppress. Lags above this limit get reported anyway.
83 : max_lag: Duration,
84 : }
85 :
86 : impl WalLagCooldown {
87 0 : pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
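// Example with hypothetical numbers: an attach that took 5 minutes suppresses
// lags of up to 2 * 5min + 60s = 11 minutes until attach_start + 3 * 5min + 120s,
// i.e. for 17 minutes after the attach began; larger lags are still reported.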
88 0 : Self {
89 0 : active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
90 0 : max_lag: attach_duration * 2 + Duration::from_secs(60),
91 0 : }
92 0 : }
93 : }
94 :
95 : pub struct WalIngest {
96 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
97 : shard: ShardIdentity,
98 : checkpoint: CheckPoint,
99 : checkpoint_modified: bool,
100 : warn_ingest_lag: WarnIngestLag,
101 : }
102 :
103 : struct WarnIngestLag {
104 : lag_msg_ratelimit: RateLimit,
105 : future_lsn_msg_ratelimit: RateLimit,
106 : timestamp_invalid_msg_ratelimit: RateLimit,
107 : }
108 :
109 : pub struct WalIngestError {
110 : pub backtrace: std::backtrace::Backtrace,
111 : pub kind: WalIngestErrorKind,
112 : }
113 :
114 : #[derive(thiserror::Error, Debug)]
115 : pub enum WalIngestErrorKind {
116 : #[error(transparent)]
117 : #[allow(private_interfaces)]
118 : PageReconstructError(#[from] PageReconstructError),
119 : #[error(transparent)]
120 : DeserializationFailure(#[from] DeserializeError),
121 : #[error(transparent)]
122 : SerializationFailure(#[from] SerializeError),
123 : #[error("the request contains data not supported by pageserver: {0} @ {1}")]
124 : InvalidKey(Key, Lsn),
125 : #[error("twophase file for xid {0} already exists")]
126 : FileAlreadyExists(u64),
127 : #[error("slru segment {0:?}/{1} already exists")]
128 : SlruAlreadyExists(SlruKind, u32),
129 : #[error("relation already exists")]
130 : RelationAlreadyExists(RelTag),
131 : #[error("invalid reldir key {0}")]
132 : InvalidRelDirKey(Key),
133 :
134 : #[error(transparent)]
135 : LogicalError(anyhow::Error),
136 : #[error(transparent)]
137 : EncodeAuxFileError(anyhow::Error),
138 : #[error(transparent)]
139 : MaybeRelSizeV2Error(anyhow::Error),
140 :
141 : #[error("timeline shutting down")]
142 : Cancelled,
143 : }
144 :
145 : impl<T> From<T> for WalIngestError
146 : where
147 : WalIngestErrorKind: From<T>,
148 : {
149 0 : fn from(value: T) -> Self {
150 0 : WalIngestError {
151 0 : backtrace: Backtrace::capture(),
152 0 : kind: WalIngestErrorKind::from(value),
153 0 : }
154 0 : }
155 : }
156 :
157 : impl std::error::Error for WalIngestError {
158 0 : fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
159 0 : self.kind.source()
160 0 : }
161 : }
162 :
163 : impl core::fmt::Display for WalIngestError {
164 0 : fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
165 0 : self.kind.fmt(f)
166 0 : }
167 : }
168 :
169 : impl core::fmt::Debug for WalIngestError {
170 0 : fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
171 0 : if f.alternate() {
172 0 : f.debug_map()
173 0 : .key(&"backtrace")
174 0 : .value(&self.backtrace)
175 0 : .key(&"kind")
176 0 : .value(&self.kind)
177 0 : .finish()
178 : } else {
179 0 : writeln!(f, "Error: {:?}", self.kind)?;
180 0 : if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
181 0 : writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
182 0 : }
183 0 : Ok(())
184 : }
185 0 : }
186 : }
187 :
188 : #[macro_export]
189 : macro_rules! ensure_walingest {
190 : ($($t:tt)*) => {
191 354701 : _ = || -> Result<(), anyhow::Error> {
192 354701 : anyhow::ensure!($($t)*);
193 354701 : Ok(())
194 354701 : }().map_err(WalIngestErrorKind::LogicalError)?;
195 : };
196 : }
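// Usage sketch (hypothetical condition): `ensure_walingest!(nblocks > 0, "empty rel")`
// evaluates the condition like `anyhow::ensure!`, but routes a failure through
// `WalIngestErrorKind::LogicalError`, so `?` hands the caller a `WalIngestError`
// with a captured backtrace via the blanket `From` impl above.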
197 :
198 : impl WalIngest {
199 6 : pub async fn new(
200 6 : timeline: &Timeline,
201 6 : startpoint: Lsn,
202 6 : ctx: &RequestContext,
203 6 : ) -> Result<WalIngest, WalIngestError> {
204 : // Fetch the latest checkpoint into memory, so that we can compare with it
205 : // quickly in `ingest_record` and update it when it changes.
206 6 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
207 6 : let pgversion = timeline.pg_version;
208 :
209 6 : let checkpoint = dispatch_pgversion!(pgversion, {
210 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
211 4 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
212 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
213 : });
214 :
215 6 : Ok(WalIngest {
216 6 : shard: *timeline.get_shard_identity(),
217 6 : checkpoint,
218 6 : checkpoint_modified: false,
219 6 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
220 6 : warn_ingest_lag: WarnIngestLag {
221 6 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
222 6 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
223 6 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
224 6 : },
225 6 : })
226 6 : }
227 :
228 : /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key-value
229 : /// storage of a given timeline.
230 : ///
231 : /// This function updates the `lsn` field of the `DatadirModification`.
232 : ///
233 : /// Returns `true` if the record was ingested, and `false` if it was filtered out.
234 72926 : pub async fn ingest_record(
235 72926 : &mut self,
236 72926 : interpreted: InterpretedWalRecord,
237 72926 : modification: &mut DatadirModification<'_>,
238 72926 : ctx: &RequestContext,
239 72926 : ) -> Result<bool, WalIngestError> {
240 72926 : WAL_INGEST.records_received.inc();
241 72926 : let prev_len = modification.len();
242 :
243 72926 : modification.set_lsn(interpreted.next_record_lsn)?;
244 :
245 72926 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
246 : // Records of this type should always be preceded by a commit(), as they
247 : // rely on reading data pages back from the Timeline.
248 0 : assert!(!modification.has_dirty_data());
249 72926 : }
250 :
251 72926 : assert!(!self.checkpoint_modified);
252 72926 : if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
253 72917 : && self.checkpoint.update_next_xid(interpreted.xid)
254 1 : {
255 1 : self.checkpoint_modified = true;
256 72925 : }
257 :
258 72926 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
259 :
260 33 : match interpreted.metadata_record {
261 6 : Some(MetadataRecord::Heapam(rec)) => match rec {
262 6 : HeapamRecord::ClearVmBits(clear_vm_bits) => {
263 6 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
264 6 : .await?;
265 : }
266 : },
267 0 : Some(MetadataRecord::Neonrmgr(rec)) => match rec {
268 0 : NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
269 0 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
270 0 : .await?;
271 : }
272 : },
273 8 : Some(MetadataRecord::Smgr(rec)) => match rec {
274 8 : SmgrRecord::Create(create) => {
275 8 : self.ingest_xlog_smgr_create(create, modification, ctx)
276 8 : .await?;
277 : }
278 0 : SmgrRecord::Truncate(truncate) => {
279 0 : self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
280 0 : .await?;
281 : }
282 : },
283 0 : Some(MetadataRecord::Dbase(rec)) => match rec {
284 0 : DbaseRecord::Create(create) => {
285 0 : self.ingest_xlog_dbase_create(create, modification, ctx)
286 0 : .await?;
287 : }
288 0 : DbaseRecord::Drop(drop) => {
289 0 : self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
290 : }
291 : },
292 0 : Some(MetadataRecord::Clog(rec)) => match rec {
293 0 : ClogRecord::ZeroPage(zero_page) => {
294 0 : self.ingest_clog_zero_page(zero_page, modification, ctx)
295 0 : .await?;
296 : }
297 0 : ClogRecord::Truncate(truncate) => {
298 0 : self.ingest_clog_truncate(truncate, modification, ctx)
299 0 : .await?;
300 : }
301 : },
302 4 : Some(MetadataRecord::Xact(rec)) => {
303 4 : self.ingest_xact_record(rec, modification, ctx).await?;
304 : }
305 0 : Some(MetadataRecord::MultiXact(rec)) => match rec {
306 0 : MultiXactRecord::ZeroPage(zero_page) => {
307 0 : self.ingest_multixact_zero_page(zero_page, modification, ctx)
308 0 : .await?;
309 : }
310 0 : MultiXactRecord::Create(create) => {
311 0 : self.ingest_multixact_create(modification, &create)?;
312 : }
313 0 : MultiXactRecord::Truncate(truncate) => {
314 0 : self.ingest_multixact_truncate(modification, &truncate, ctx)
315 0 : .await?;
316 : }
317 : },
318 0 : Some(MetadataRecord::Relmap(rec)) => match rec {
319 0 : RelmapRecord::Update(update) => {
320 0 : self.ingest_relmap_update(update, modification, ctx).await?;
321 : }
322 : },
323 15 : Some(MetadataRecord::Xlog(rec)) => match rec {
324 15 : XlogRecord::Raw(raw) => {
325 15 : self.ingest_raw_xlog_record(raw, modification, ctx).await?;
326 : }
327 : },
328 0 : Some(MetadataRecord::LogicalMessage(rec)) => match rec {
329 0 : LogicalMessageRecord::Put(put) => {
330 0 : self.ingest_logical_message_put(put, modification, ctx)
331 0 : .await?;
332 : }
333 : #[cfg(feature = "testing")]
334 : LogicalMessageRecord::Failpoint => {
335 : // This is a convenient way to make the WAL ingestion pause at
336 : // a particular point in the WAL. For more fine-grained control,
337 : // we could peek into the message and only pause if it contains
338 : // a particular string, for example, but this is enough for now.
339 0 : failpoint_support::sleep_millis_async!(
340 : "pageserver-wal-ingest-logical-message-sleep"
341 : );
342 : }
343 : },
344 0 : Some(MetadataRecord::Standby(rec)) => {
345 0 : self.ingest_standby_record(rec).unwrap();
346 0 : }
347 0 : Some(MetadataRecord::Replorigin(rec)) => {
348 0 : self.ingest_replorigin_record(rec, modification).await?;
349 : }
350 72893 : None => {
351 72893 : // There are two cases through which we end up here:
352 72893 : // 1. The resource manager for the original PG WAL record
353 72893 : // is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
354 72893 : // record type within Neon.
355 72893 : // 2. The resource manager id was unknown to
356 72893 : // [`wal_decoder::decoder::MetadataRecord::from_decoded`].
357 72893 : // TODO(vlad): Tighten this up more once we build confidence
358 72893 : // that case (2) does not happen in the field.
359 72893 : }
360 : }
361 :
362 72926 : modification
363 72926 : .ingest_batch(interpreted.batch, &self.shard, ctx)
364 72926 : .await?;
365 :
366 : // If checkpoint data was updated, store the new version in the repository
367 72926 : if self.checkpoint_modified {
368 3 : let new_checkpoint_bytes = self.checkpoint.encode()?;
369 :
370 3 : modification.put_checkpoint(new_checkpoint_bytes)?;
371 3 : self.checkpoint_modified = false;
372 72923 : }
373 :
374 : // Note that at this point this record is only cached in the modification
375 : // until commit() is called to flush the data into the repository and update
376 : // the latest LSN.
377 :
378 72926 : Ok(modification.len() > prev_len)
379 72926 : }
380 :
381 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
382 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
383 0 : let next_full_xid =
384 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
385 :
386 0 : let next_xid = (next_full_xid) as u32;
387 0 : let mut epoch = (next_full_xid >> 32) as u32;
388 :
389 0 : if xid > next_xid {
390 : // Wraparound occurred, must be from a prev epoch.
391 0 : if epoch == 0 {
392 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
393 0 : "apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
394 0 : )))?;
395 0 : }
396 0 : epoch -= 1;
397 0 : }
398 :
399 0 : Ok(((epoch as u64) << 32) | xid as u64)
400 0 : }
401 :
402 6 : async fn ingest_clear_vm_bits(
403 6 : &mut self,
404 6 : clear_vm_bits: ClearVmBits,
405 6 : modification: &mut DatadirModification<'_>,
406 6 : ctx: &RequestContext,
407 6 : ) -> Result<(), WalIngestError> {
408 : let ClearVmBits {
409 6 : new_heap_blkno,
410 6 : old_heap_blkno,
411 6 : flags,
412 6 : vm_rel,
413 6 : } = clear_vm_bits;
414 : // Clear the VM bits if required.
415 6 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
416 6 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
417 :
418 : // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
419 : // its view of the VM relation size. Out of caution, log an error instead of failing WAL
420 : // ingestion, as there have historically been cases where PostgreSQL cleared spurious VM pages. See:
421 : // https://github.com/neondatabase/neon/pull/10634.
422 6 : let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
423 0 : critical_timeline!(
424 0 : modification.tline.tenant_shard_id,
425 0 : modification.tline.timeline_id,
426 : // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
427 0 : None::<&AtomicBool>,
428 0 : "clear_vm_bits for unknown VM relation {vm_rel}"
429 : );
430 0 : return Ok(());
431 : };
432 6 : if let Some(blknum) = new_vm_blk {
433 6 : if blknum >= vm_size {
434 0 : critical_timeline!(
435 0 : modification.tline.tenant_shard_id,
436 0 : modification.tline.timeline_id,
437 : // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
438 0 : None::<&AtomicBool>,
439 0 : "new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
440 : );
441 0 : new_vm_blk = None;
442 6 : }
443 0 : }
444 6 : if let Some(blknum) = old_vm_blk {
445 0 : if blknum >= vm_size {
446 0 : critical_timeline!(
447 0 : modification.tline.tenant_shard_id,
448 0 : modification.tline.timeline_id,
449 : // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
450 0 : None::<&AtomicBool>,
451 0 : "old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
452 : );
453 0 : old_vm_blk = None;
454 0 : }
455 6 : }
456 :
457 6 : if new_vm_blk.is_none() && old_vm_blk.is_none() {
458 0 : return Ok(());
459 6 : } else if new_vm_blk == old_vm_blk {
460 : // An UPDATE record that needs to clear the bits for both the old and the new page, both of
461 : // which reside on the same VM page.
462 0 : self.put_rel_wal_record(
463 0 : modification,
464 0 : vm_rel,
465 0 : new_vm_blk.unwrap(),
466 0 : NeonWalRecord::ClearVisibilityMapFlags {
467 0 : new_heap_blkno,
468 0 : old_heap_blkno,
469 0 : flags,
470 0 : },
471 0 : ctx,
472 0 : )
473 0 : .await?;
474 : } else {
475 : // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
476 6 : if let Some(new_vm_blk) = new_vm_blk {
477 6 : self.put_rel_wal_record(
478 6 : modification,
479 6 : vm_rel,
480 6 : new_vm_blk,
481 6 : NeonWalRecord::ClearVisibilityMapFlags {
482 6 : new_heap_blkno,
483 6 : old_heap_blkno: None,
484 6 : flags,
485 6 : },
486 6 : ctx,
487 6 : )
488 6 : .await?;
489 0 : }
490 6 : if let Some(old_vm_blk) = old_vm_blk {
491 0 : self.put_rel_wal_record(
492 0 : modification,
493 0 : vm_rel,
494 0 : old_vm_blk,
495 0 : NeonWalRecord::ClearVisibilityMapFlags {
496 0 : new_heap_blkno: None,
497 0 : old_heap_blkno,
498 0 : flags,
499 0 : },
500 0 : ctx,
501 0 : )
502 0 : .await?;
503 6 : }
504 : }
505 6 : Ok(())
506 6 : }
507 :
508 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
509 0 : async fn ingest_xlog_dbase_create(
510 0 : &mut self,
511 0 : create: DbaseCreate,
512 0 : modification: &mut DatadirModification<'_>,
513 0 : ctx: &RequestContext,
514 0 : ) -> Result<(), WalIngestError> {
515 : let DbaseCreate {
516 0 : db_id,
517 0 : tablespace_id,
518 0 : src_db_id,
519 0 : src_tablespace_id,
520 0 : } = create;
521 :
522 0 : let rels = modification
523 0 : .tline
524 0 : .list_rels(
525 0 : src_tablespace_id,
526 0 : src_db_id,
527 0 : Version::Modified(modification),
528 0 : ctx,
529 0 : )
530 0 : .await?;
531 :
532 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
533 :
534 : // Copy relfilemap
535 0 : let filemap = modification
536 0 : .tline
537 0 : .get_relmap_file(
538 0 : src_tablespace_id,
539 0 : src_db_id,
540 0 : Version::Modified(modification),
541 0 : ctx,
542 0 : )
543 0 : .await?;
544 0 : modification
545 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
546 0 : .await?;
547 :
548 0 : let mut num_rels_copied = 0;
549 0 : let mut num_blocks_copied = 0;
550 0 : for src_rel in rels {
551 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
552 0 : assert_eq!(src_rel.dbnode, src_db_id);
553 :
554 0 : let nblocks = modification
555 0 : .tline
556 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
557 0 : .await?;
558 0 : let dst_rel = RelTag {
559 0 : spcnode: tablespace_id,
560 0 : dbnode: db_id,
561 0 : relnode: src_rel.relnode,
562 0 : forknum: src_rel.forknum,
563 0 : };
564 :
565 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
566 :
567 : // Copy content
568 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
569 0 : for blknum in 0..nblocks {
570 : // Sharding:
571 : // - src and dst are always on the same shard, because they differ only by dbNode, and
572 : // dbNode is not included in the hash inputs for sharding.
573 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
574 : // that belong to it.
575 0 : let src_key = rel_block_to_key(src_rel, blknum);
576 0 : if !self.shard.is_key_local(&src_key) {
577 0 : debug!(
578 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
579 : src_key
580 : );
581 0 : continue;
582 0 : }
583 0 : debug!(
584 0 : "copying block {} from {} ({}) to {}",
585 : blknum, src_rel, src_key, dst_rel
586 : );
587 :
588 0 : let content = modification
589 0 : .tline
590 0 : .get_rel_page_at_lsn(
591 0 : src_rel,
592 0 : blknum,
593 0 : Version::Modified(modification),
594 0 : ctx,
595 0 : crate::tenant::storage_layer::IoConcurrency::sequential(),
596 0 : )
597 0 : .await?;
598 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
599 0 : num_blocks_copied += 1;
600 : }
601 :
602 0 : num_rels_copied += 1;
603 : }
604 :
605 0 : info!(
606 0 : "Created database {}/{}, copied {} blocks in {} rels",
607 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
608 : );
609 0 : Ok(())
610 0 : }
611 :
612 0 : async fn ingest_xlog_dbase_drop(
613 0 : &mut self,
614 0 : dbase_drop: DbaseDrop,
615 0 : modification: &mut DatadirModification<'_>,
616 0 : ctx: &RequestContext,
617 0 : ) -> Result<(), WalIngestError> {
618 : let DbaseDrop {
619 0 : db_id,
620 0 : tablespace_ids,
621 0 : } = dbase_drop;
622 0 : for tablespace_id in tablespace_ids {
623 0 : trace!("Drop db {}, {}", tablespace_id, db_id);
624 0 : modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
625 : }
626 :
627 0 : Ok(())
628 0 : }
629 :
630 8 : async fn ingest_xlog_smgr_create(
631 8 : &mut self,
632 8 : create: SmgrCreate,
633 8 : modification: &mut DatadirModification<'_>,
634 8 : ctx: &RequestContext,
635 8 : ) -> Result<(), WalIngestError> {
636 8 : let SmgrCreate { rel } = create;
637 8 : self.put_rel_creation(modification, rel, ctx).await?;
638 8 : Ok(())
639 8 : }
640 :
641 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
642 : ///
643 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
644 0 : async fn ingest_xlog_smgr_truncate(
645 0 : &mut self,
646 0 : truncate: XlSmgrTruncate,
647 0 : modification: &mut DatadirModification<'_>,
648 0 : ctx: &RequestContext,
649 0 : ) -> Result<(), WalIngestError> {
650 : let XlSmgrTruncate {
651 0 : blkno,
652 0 : rnode,
653 0 : flags,
654 0 : } = truncate;
655 :
656 0 : let spcnode = rnode.spcnode;
657 0 : let dbnode = rnode.dbnode;
658 0 : let relnode = rnode.relnode;
659 :
660 0 : if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
661 0 : let rel = RelTag {
662 0 : spcnode,
663 0 : dbnode,
664 0 : relnode,
665 0 : forknum: MAIN_FORKNUM,
666 0 : };
667 :
668 0 : self.put_rel_truncation(modification, rel, blkno, ctx)
669 0 : .await?;
670 0 : }
671 0 : if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
672 0 : let rel = RelTag {
673 0 : spcnode,
674 0 : dbnode,
675 0 : relnode,
676 0 : forknum: FSM_FORKNUM,
677 0 : };
678 :
679 : // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
680 : // and instead of digging into the FSM bitmap format we just clear the whole page.
681 0 : let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
682 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
683 0 : if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
684 0 : && self
685 0 : .shard
686 0 : .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
687 : {
688 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
689 0 : fsm_physical_page_no += 1;
690 0 : }
691 : // Truncate this shard's view of the FSM relation size, if it even has one.
692 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
693 0 : if nblocks > fsm_physical_page_no {
694 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
695 0 : .await?;
696 0 : }
697 0 : }
698 0 : if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
699 0 : let rel = RelTag {
700 0 : spcnode,
701 0 : dbnode,
702 0 : relnode,
703 0 : forknum: VISIBILITYMAP_FORKNUM,
704 0 : };
705 :
706 : // last remaining block, byte, and bit
707 0 : let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
708 0 : let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
709 0 : / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
710 0 : let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
711 0 : * pg_constants::VM_BITS_PER_HEAPBLOCK;
712 :
713 : // Unless the new size is exactly at a visibility map page boundary, the
714 : // tail bits in the last remaining map page, representing truncated heap
715 : // blocks, need to be cleared. This is not only tidy, but also necessary
716 : // because we don't get a chance to clear the bits if the heap is extended
717 : // again. Only do this on the shard that owns the page.
718 0 : if (trunc_byte != 0 || trunc_offs != 0)
719 0 : && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
720 : {
721 0 : modification.put_rel_wal_record(
722 0 : rel,
723 0 : vm_page_no,
724 0 : NeonWalRecord::TruncateVisibilityMap {
725 0 : trunc_byte,
726 0 : trunc_offs,
727 0 : },
728 0 : )?;
729 0 : vm_page_no += 1;
730 0 : }
731 : // Truncate this shard's view of the VM relation size, if it even has one.
732 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
733 0 : if nblocks > vm_page_no {
734 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
735 0 : .await?;
736 0 : }
737 0 : }
738 0 : Ok(())
739 0 : }
740 :
741 4 : fn warn_on_ingest_lag(
742 4 : &mut self,
743 4 : conf: &crate::config::PageServerConf,
744 4 : wal_timestamp: TimestampTz,
745 4 : ) {
746 4 : debug_assert_current_span_has_tenant_and_timeline_id();
747 4 : let now = SystemTime::now();
748 4 : let rate_limits = &mut self.warn_ingest_lag;
749 :
750 4 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
751 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
752 : });
753 :
754 4 : match ts {
755 4 : Ok(ts) => {
756 4 : match now.duration_since(ts) {
757 4 : Ok(lag) => {
758 4 : if lag > conf.wait_lsn_timeout {
759 4 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
760 1 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
761 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
762 0 : return;
763 0 : }
764 1 : } else {
765 1 : // Still loading? We shouldn't be here
766 1 : }
767 1 : let lag = humantime::format_duration(lag);
768 1 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
769 1 : })
770 0 : }
771 : }
772 0 : Err(e) => {
773 0 : let delta_t = e.duration();
774 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
775 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
776 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
777 0 : if delta_t > IGNORED_DRIFT {
778 0 : let delta_t = humantime::format_duration(delta_t);
779 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
780 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
781 0 : })
782 0 : }
783 : }
784 : };
785 : }
786 0 : Err(error) => {
787 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
788 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
789 0 : })
790 : }
791 : }
792 4 : }
793 :
794 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
795 : ///
796 4 : async fn ingest_xact_record(
797 4 : &mut self,
798 4 : record: XactRecord,
799 4 : modification: &mut DatadirModification<'_>,
800 4 : ctx: &RequestContext,
801 4 : ) -> Result<(), WalIngestError> {
802 4 : let (xact_common, is_commit, is_prepared) = match record {
803 0 : XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
804 0 : let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
805 0 : self.adjust_to_full_transaction_id(xl_xid)?
806 : } else {
807 0 : xl_xid as u64
808 : };
809 0 : return modification.put_twophase_file(xid, data, ctx).await;
810 : }
811 4 : XactRecord::Commit(common) => (common, true, false),
812 0 : XactRecord::Abort(common) => (common, false, false),
813 0 : XactRecord::CommitPrepared(common) => (common, true, true),
814 0 : XactRecord::AbortPrepared(common) => (common, false, true),
815 : };
816 :
817 : let XactCommon {
818 4 : parsed,
819 4 : origin_id,
820 4 : xl_xid,
821 4 : lsn,
822 4 : } = xact_common;
823 :
824 : // Record update of CLOG pages
825 4 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
826 4 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
827 4 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
828 4 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
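// Worked example of the addressing math above (hypothetical xid, assuming the
// usual CLOG_XACTS_PER_PAGE = 32768 and SLRU_PAGES_PER_SEGMENT = 32):
// xid 1_000_000 maps to pageno = 30, segno = 0, rpageno = 30.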
829 :
830 4 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
831 :
832 4 : for subxact in &parsed.subxacts {
833 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
834 0 : if subxact_pageno != pageno {
835 : // This subxact goes to a different page. Write the record
836 : // for all the XIDs on the previous page, and continue
837 : // accumulating XIDs on this new page.
838 0 : modification.put_slru_wal_record(
839 0 : SlruKind::Clog,
840 0 : segno,
841 0 : rpageno,
842 0 : if is_commit {
843 0 : NeonWalRecord::ClogSetCommitted {
844 0 : xids: page_xids,
845 0 : timestamp: parsed.xact_time,
846 0 : }
847 : } else {
848 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
849 : },
850 0 : )?;
851 0 : page_xids = Vec::new();
852 0 : }
853 0 : pageno = subxact_pageno;
854 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
855 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
856 0 : page_xids.push(*subxact);
857 : }
858 4 : modification.put_slru_wal_record(
859 4 : SlruKind::Clog,
860 4 : segno,
861 4 : rpageno,
862 4 : if is_commit {
863 4 : NeonWalRecord::ClogSetCommitted {
864 4 : xids: page_xids,
865 4 : timestamp: parsed.xact_time,
866 4 : }
867 : } else {
868 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
869 : },
870 0 : )?;
871 :
872 : // Group relations to drop by dbNode. This map will contain all relations that _might_
873 : // exist; we will later reduce it to the ones that really do. This map can be huge if
874 : // the transaction touches a huge number of relations (there is no bound on this in
875 : // postgres).
876 4 : let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
877 :
878 4 : for xnode in &parsed.xnodes {
879 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
880 0 : let rel = RelTag {
881 0 : forknum,
882 0 : spcnode: xnode.spcnode,
883 0 : dbnode: xnode.dbnode,
884 0 : relnode: xnode.relnode,
885 0 : };
886 0 : drop_relations
887 0 : .entry((xnode.spcnode, xnode.dbnode))
888 0 : .or_default()
889 0 : .push(rel);
890 0 : }
891 : }
892 :
893 : // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
894 4 : modification.put_rel_drops(drop_relations, ctx).await?;
895 :
896 4 : if origin_id != 0 {
897 0 : modification
898 0 : .set_replorigin(origin_id, parsed.origin_lsn)
899 0 : .await?;
900 4 : }
901 :
902 4 : if is_prepared {
903 : // Remove the twophase file. See RemoveTwoPhaseFile() in the postgres code.
904 0 : trace!(
905 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
906 : xl_xid, parsed.xid, lsn,
907 : );
908 :
909 0 : let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
910 0 : self.adjust_to_full_transaction_id(parsed.xid)?
911 : } else {
912 0 : parsed.xid as u64
913 : };
914 0 : modification.drop_twophase_file(xid, ctx).await?;
915 4 : }
916 :
917 4 : Ok(())
918 4 : }
919 :
920 0 : async fn ingest_clog_truncate(
921 0 : &mut self,
922 0 : truncate: ClogTruncate,
923 0 : modification: &mut DatadirModification<'_>,
924 0 : ctx: &RequestContext,
925 0 : ) -> Result<(), WalIngestError> {
926 : let ClogTruncate {
927 0 : pageno,
928 0 : oldest_xid,
929 0 : oldest_xid_db,
930 0 : } = truncate;
931 :
932 0 : info!(
933 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
934 : pageno, oldest_xid, oldest_xid_db
935 : );
936 :
937 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
938 : // truncated, but a checkpoint record with the updated values isn't written until
939 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
940 : // so we keep the oldestXid and oldestXidDB up-to-date.
941 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
942 0 : cp.oldestXid = oldest_xid;
943 0 : cp.oldestXidDB = oldest_xid_db;
944 0 : });
945 0 : self.checkpoint_modified = true;
946 :
947 : // TODO: Handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
948 :
949 0 : let latest_page_number =
950 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
951 : / pg_constants::CLOG_XACTS_PER_PAGE;
952 :
953 : // Now delete all segments containing pages between xlrec.pageno
954 : // and latest_page_number.
955 :
956 : // First, make an important safety check:
957 : // the current endpoint page must not be eligible for removal.
958 : // See SimpleLruTruncate() in slru.c
959 0 : if dispatch_pgversion!(modification.tline.pg_version, {
960 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
961 : }) {
962 0 : info!("could not truncate directory pg_xact apparent wraparound");
963 0 : return Ok(());
964 0 : }
965 :
966 : // Iterate over the SLRU CLOG segments and drop the segments that we're ready to truncate
967 : //
968 : // We cannot pass 'lsn' to Timeline::list_slru_segments() below, or it
969 : // will block waiting for the last valid LSN to advance up to
970 : // it. So we use the previous record's LSN in the get calls
971 : // instead.
972 0 : if modification.tline.get_shard_identity().is_shard_zero() {
973 0 : for segno in modification
974 0 : .tline
975 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
976 0 : .await?
977 : {
978 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
979 :
980 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
981 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
982 : });
983 :
984 0 : if may_delete {
985 0 : modification
986 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
987 0 : .await?;
988 0 : trace!("Drop CLOG segment {:>04X}", segno);
989 0 : }
990 : }
991 0 : }
992 :
993 0 : Ok(())
994 0 : }
995 :
996 0 : async fn ingest_clog_zero_page(
997 0 : &mut self,
998 0 : zero_page: ClogZeroPage,
999 0 : modification: &mut DatadirModification<'_>,
1000 0 : ctx: &RequestContext,
1001 0 : ) -> Result<(), WalIngestError> {
1002 0 : let ClogZeroPage { segno, rpageno } = zero_page;
1003 :
1004 0 : self.put_slru_page_image(
1005 0 : modification,
1006 0 : SlruKind::Clog,
1007 0 : segno,
1008 0 : rpageno,
1009 0 : ZERO_PAGE.clone(),
1010 0 : ctx,
1011 0 : )
1012 0 : .await
1013 0 : }
1014 :
1015 0 : fn ingest_multixact_create(
1016 0 : &mut self,
1017 0 : modification: &mut DatadirModification,
1018 0 : xlrec: &XlMultiXactCreate,
1019 0 : ) -> Result<(), WalIngestError> {
1020 : // Create WAL record for updating the multixact-offsets page
1021 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1022 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1023 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1024 :
1025 0 : modification.put_slru_wal_record(
1026 0 : SlruKind::MultiXactOffsets,
1027 0 : segno,
1028 0 : rpageno,
1029 0 : NeonWalRecord::MultixactOffsetCreate {
1030 0 : mid: xlrec.mid,
1031 0 : moff: xlrec.moff,
1032 0 : },
1033 0 : )?;
1034 :
1035 : // Create WAL records for the update of each affected multixact-members page
1036 0 : let mut members = xlrec.members.iter();
1037 0 : let mut offset = xlrec.moff;
1038 : loop {
1039 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1040 :
1041 : // How many members fit on this page?
1042 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1043 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1044 :
1045 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1046 0 : for _ in 0..page_remain {
1047 0 : if let Some(m) = members.next() {
1048 0 : this_page_members.push(m.clone());
1049 0 : } else {
1050 0 : break;
1051 : }
1052 : }
1053 0 : if this_page_members.is_empty() {
1054 : // all done
1055 0 : break;
1056 0 : }
1057 0 : let n_this_page = this_page_members.len();
1058 :
1059 0 : modification.put_slru_wal_record(
1060 0 : SlruKind::MultiXactMembers,
1061 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1062 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1063 0 : NeonWalRecord::MultixactMembersCreate {
1064 0 : moff: offset,
1065 0 : members: this_page_members,
1066 0 : },
1067 0 : )?;
1068 :
1069 : // Note: The multixact members can wrap around, even within one WAL record.
1070 0 : offset = offset.wrapping_add(n_this_page as u32);
1071 : }
1072 0 : let next_offset = offset;
1073 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1074 :
1075 : // Update next-multi-xid and next-offset
1076 : //
1077 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1078 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1079 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1080 : // incremented! nextXid skips over < FirstNormalTransactionId when the value
1081 : // is stored, so it's never 0 in a checkpoint.
1082 : //
1083 : // I don't know why it's done that way; it seems less error-prone to skip over 0
1084 : // when the value is stored rather than when it's read. But let's do it the same
1085 : // way here.
1086 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
1087 :
1088 0 : if self
1089 0 : .checkpoint
1090 0 : .update_next_multixid(next_multi_xid, next_offset)
1091 0 : {
1092 0 : self.checkpoint_modified = true;
1093 0 : }
1094 :
1095 : // Also update the next-xid with the highest member. According to the comments in
1096 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
1097 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1098 0 : if let Some(max_xid) = acc {
1099 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1100 0 : Some(mbr.xid)
1101 : } else {
1102 0 : acc
1103 : }
1104 : } else {
1105 0 : Some(mbr.xid)
1106 : }
1107 0 : });
1108 :
1109 0 : if let Some(max_xid) = max_mbr_xid {
1110 0 : if self.checkpoint.update_next_xid(max_xid) {
1111 0 : self.checkpoint_modified = true;
1112 0 : }
1113 0 : }
1114 0 : Ok(())
1115 0 : }
1116 :
1117 0 : async fn ingest_multixact_truncate(
1118 0 : &mut self,
1119 0 : modification: &mut DatadirModification<'_>,
1120 0 : xlrec: &XlMultiXactTruncate,
1121 0 : ctx: &RequestContext,
1122 0 : ) -> Result<(), WalIngestError> {
1123 0 : let (maxsegment, startsegment, endsegment) =
1124 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1125 0 : cp.oldestMulti = xlrec.end_trunc_off;
1126 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1127 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1128 : pg_constants::MAX_MULTIXACT_OFFSET,
1129 : );
1130 0 : let startsegment: i32 =
1131 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1132 0 : let endsegment: i32 =
1133 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1134 0 : (maxsegment, startsegment, endsegment)
1135 : });
1136 :
1137 0 : self.checkpoint_modified = true;
1138 :
1139 : // PerformMembersTruncation
1140 0 : let mut segment: i32 = startsegment;
1141 :
1142 : // Delete all the segments except the last one. The last segment can still
1143 : // contain, possibly partially, valid data.
1144 0 : if modification.tline.get_shard_identity().is_shard_zero() {
1145 0 : while segment != endsegment {
1146 0 : modification
1147 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1148 0 : .await?;
1149 :
1150 : /* move to next segment, handling wraparound correctly */
1151 0 : if segment == maxsegment {
1152 0 : segment = 0;
1153 0 : } else {
1154 0 : segment += 1;
1155 0 : }
1156 : }
1157 0 : }
1158 :
1159 : // Truncate offsets
1160 : // FIXME: this did not handle wraparound correctly
1161 :
1162 0 : Ok(())
1163 0 : }
1164 :
1165 0 : async fn ingest_multixact_zero_page(
1166 0 : &mut self,
1167 0 : zero_page: MultiXactZeroPage,
1168 0 : modification: &mut DatadirModification<'_>,
1169 0 : ctx: &RequestContext,
1170 0 : ) -> Result<(), WalIngestError> {
1171 : let MultiXactZeroPage {
1172 0 : slru_kind,
1173 0 : segno,
1174 0 : rpageno,
1175 0 : } = zero_page;
1176 0 : self.put_slru_page_image(
1177 0 : modification,
1178 0 : slru_kind,
1179 0 : segno,
1180 0 : rpageno,
1181 0 : ZERO_PAGE.clone(),
1182 0 : ctx,
1183 0 : )
1184 0 : .await
1185 0 : }
1186 :
1187 0 : async fn ingest_relmap_update(
1188 0 : &mut self,
1189 0 : update: RelmapUpdate,
1190 0 : modification: &mut DatadirModification<'_>,
1191 0 : ctx: &RequestContext,
1192 0 : ) -> Result<(), WalIngestError> {
1193 0 : let RelmapUpdate { update, buf } = update;
1194 :
1195 0 : modification
1196 0 : .put_relmap_file(update.tsid, update.dbid, buf, ctx)
1197 0 : .await
1198 0 : }
1199 :
1200 15 : async fn ingest_raw_xlog_record(
1201 15 : &mut self,
1202 15 : raw_record: RawXlogRecord,
1203 15 : modification: &mut DatadirModification<'_>,
1204 15 : ctx: &RequestContext,
1205 15 : ) -> Result<(), WalIngestError> {
1206 15 : let RawXlogRecord { info, lsn, mut buf } = raw_record;
1207 15 : let pg_version = modification.tline.pg_version;
1208 :
1209 15 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
1210 1 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1211 0 : let rec = v17::XlParameterChange::decode(&mut buf);
1212 0 : cp.wal_level = rec.wal_level;
1213 0 : self.checkpoint_modified = true;
1214 1 : }
1215 14 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
1216 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1217 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
1218 0 : cp.wal_level = rec.wal_level;
1219 0 : self.checkpoint_modified = true;
1220 0 : }
1221 14 : }
1222 :
1223 15 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1224 0 : if info == pg_constants::XLOG_NEXTOID {
1225 0 : let next_oid = buf.get_u32_le();
1226 0 : if cp.nextOid != next_oid {
1227 0 : cp.nextOid = next_oid;
1228 0 : self.checkpoint_modified = true;
1229 0 : }
1230 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
1231 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1232 : {
1233 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
1234 0 : buf.copy_to_slice(&mut checkpoint_bytes);
1235 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
1236 0 : trace!(
1237 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
1238 : xlog_checkpoint.oldestXid, cp.oldestXid
1239 : );
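// Circular 32-bit XID comparison via the signed-difference trick: the cast
// below is negative iff our oldestXid is behind the checkpoint's, in which
// case we advance ours to match.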
1240 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
1241 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
1242 0 : }
1243 0 : trace!(
1244 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
1245 : xlog_checkpoint.oldestActiveXid, cp.oldestActiveXid
1246 : );
1247 :
1248 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
1249 : // because at shutdown, all in-progress transactions will implicitly
1250 : // end. Postgres startup code knows that, and allows hot standby to start
1251 : // immediately from a shutdown checkpoint.
1252 : //
1253 : // In Neon, Postgres hot standby startup always behaves as if starting from
1254 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
1255 : // instead of overwriting self.checkpoint.oldestActiveXid with
1256 : // InvalidTransactionId from the checkpoint WAL record, update it to a
1257 : // proper value, knowing that there are no in-progress transactions at this
1258 : // point, except for prepared transactions.
1259 : //
1260 : // See also the neon code changes in the InitWalRecovery() function.
1261 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
1262 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1263 : {
1264 0 : let oldest_active_xid = if pg_version >= PgMajorVersion::PG17 {
1265 0 : let mut oldest_active_full_xid = cp.nextXid.value;
1266 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1267 0 : if xid < oldest_active_full_xid {
1268 0 : oldest_active_full_xid = xid;
1269 0 : }
1270 : }
1271 0 : oldest_active_full_xid as u32
1272 : } else {
1273 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
1274 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1275 0 : let narrow_xid = xid as u32;
1276 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
1277 0 : oldest_active_xid = narrow_xid;
1278 0 : }
1279 : }
1280 0 : oldest_active_xid
1281 : };
1282 0 : cp.oldestActiveXid = oldest_active_xid;
1283 0 : } else {
1284 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
1285 0 : }
1286 : // NB: We abuse the Checkpoint.redo field:
1287 : //
1288 : // - In PostgreSQL, the Checkpoint struct doesn't store the information
1289 : // of whether this is an online checkpoint or a shutdown checkpoint. It's
1290 : // stored in the XLOG info field of the WAL record, shutdown checkpoints
1291 : // use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
1292 : // XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
1293 : // in the pageserver, however.
1294 : //
1295 : // - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
1296 : // checkpoint record, if it's a shutdown checkpoint. But when we are
1297 : // starting from a shutdown checkpoint, the basebackup LSN is the *end*
1298 : // of the shutdown checkpoint WAL record. That makes it difficult to
1299 : // correctly detect whether we're starting from a shutdown record or
1300 : // not.
1301 : //
1302 : // To address both of those issues, we store 0 in the redo field if it's
1303 : // an online checkpoint record, and the record's *end* LSN if it's a
1304 : // shutdown checkpoint. We don't need the original redo pointer in neon,
1305 : // because we don't perform WAL replay at startup anyway, so we can get
1306 : // away with abusing the redo field like this.
1307 : //
1308 : // XXX: Ideally, we would persist the extra information in a more
1309 : // explicit format, rather than repurpose the fields of the Postgres
1310 : // struct like this. However, we already have persisted data like this,
1311 : // so we need to maintain backwards compatibility.
1312 : //
1313 : // NB: We didn't originally have this convention, so there are still old
1314 : // persisted records that didn't do this. Before, we didn't update the
1315 : // persisted redo field at all. That means that old records have a bogus
1316 : // redo pointer that points to some old value, from the checkpoint record
1317 : // that was originally imported from the data directory. If it was a
1318 : // project created in Neon, that means it points to the first checkpoint
1319 : // after initdb. That's OK for our purposes: all such old checkpoints are
1320 : // treated as old online checkpoints when the basebackup is created.
1321 0 : cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
1322 : // Store the *end* LSN of the checkpoint record. Or to be precise,
1323 : // the start LSN of the *next* record, i.e. if the record ends
1324 : // exactly at page boundary, the redo LSN points to just after the
1325 : // page header on the next page.
1326 0 : lsn.into()
1327 : } else {
1328 0 : Lsn::INVALID.into()
1329 : };
1330 :
1331 : // Write a new checkpoint key-value pair on every checkpoint record, even
1332 : // if nothing really changed. Not strictly required, but it seems nice to
1333 : // have some trace of the checkpoint records in the layer files at the same
1334 : // LSNs.
1335 0 : self.checkpoint_modified = true;
1336 0 : }
1337 : });
1338 :
1339 15 : if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
1340 1 : modification.tline.prepare_basebackup(lsn);
1341 14 : }
1342 :
1343 15 : Ok(())
1344 15 : }
1345 :
1346 0 : async fn ingest_logical_message_put(
1347 0 : &mut self,
1348 0 : put: PutLogicalMessage,
1349 0 : modification: &mut DatadirModification<'_>,
1350 0 : ctx: &RequestContext,
1351 0 : ) -> Result<(), WalIngestError> {
1352 0 : let PutLogicalMessage { path, buf } = put;
1353 0 : modification.put_file(path.as_str(), &buf, ctx).await
1354 0 : }
1355 :
1356 0 : fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
1357 0 : match record {
1358 0 : StandbyRecord::RunningXacts(running_xacts) => {
1359 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1360 0 : cp.oldestActiveXid = running_xacts.oldest_running_xid;
1361 0 : });
1362 :
1363 0 : self.checkpoint_modified = true;
1364 : }
1365 : }
1366 :
1367 0 : Ok(())
1368 0 : }
1369 :
1370 0 : async fn ingest_replorigin_record(
1371 0 : &mut self,
1372 0 : record: ReploriginRecord,
1373 0 : modification: &mut DatadirModification<'_>,
1374 0 : ) -> Result<(), WalIngestError> {
1375 0 : match record {
1376 0 : ReploriginRecord::Set(set) => {
1377 0 : modification
1378 0 : .set_replorigin(set.node_id, set.remote_lsn)
1379 0 : .await?;
1380 : }
1381 0 : ReploriginRecord::Drop(drop) => {
1382 0 : modification.drop_replorigin(drop.node_id).await?;
1383 : }
1384 : }
1385 :
1386 0 : Ok(())
1387 0 : }
1388 :
1389 9 : async fn put_rel_creation(
1390 9 : &mut self,
1391 9 : modification: &mut DatadirModification<'_>,
1392 9 : rel: RelTag,
1393 9 : ctx: &RequestContext,
1394 9 : ) -> Result<(), WalIngestError> {
1395 9 : modification.put_rel_creation(rel, 0, ctx).await?;
1396 9 : Ok(())
1397 9 : }
1398 :
1399 : #[cfg(test)]
1400 136201 : async fn put_rel_page_image(
1401 136201 : &mut self,
1402 136201 : modification: &mut DatadirModification<'_>,
1403 136201 : rel: RelTag,
1404 136201 : blknum: BlockNumber,
1405 136201 : img: Bytes,
1406 136201 : ctx: &RequestContext,
1407 136201 : ) -> Result<(), WalIngestError> {
1408 136201 : self.handle_rel_extend(modification, rel, blknum, ctx)
1409 136201 : .await?;
1410 136201 : modification.put_rel_page_image(rel, blknum, img)?;
1411 136201 : Ok(())
1412 136201 : }
1413 :
1414 6 : async fn put_rel_wal_record(
1415 6 : &mut self,
1416 6 : modification: &mut DatadirModification<'_>,
1417 6 : rel: RelTag,
1418 6 : blknum: BlockNumber,
1419 6 : rec: NeonWalRecord,
1420 6 : ctx: &RequestContext,
1421 6 : ) -> Result<(), WalIngestError> {
1422 6 : self.handle_rel_extend(modification, rel, blknum, ctx)
1423 6 : .await?;
1424 6 : modification.put_rel_wal_record(rel, blknum, rec)?;
1425 6 : Ok(())
1426 6 : }
1427 :
1428 3006 : async fn put_rel_truncation(
1429 3006 : &mut self,
1430 3006 : modification: &mut DatadirModification<'_>,
1431 3006 : rel: RelTag,
1432 3006 : nblocks: BlockNumber,
1433 3006 : ctx: &RequestContext,
1434 3006 : ) -> Result<(), WalIngestError> {
1435 3006 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1436 3006 : Ok(())
1437 3006 : }
1438 :
1439 136207 : async fn handle_rel_extend(
1440 136207 : &mut self,
1441 136207 : modification: &mut DatadirModification<'_>,
1442 136207 : rel: RelTag,
1443 136207 : blknum: BlockNumber,
1444 136207 : ctx: &RequestContext,
1445 136207 : ) -> Result<(), WalIngestError> {
1446 136207 : let new_nblocks = blknum + 1;
1447 : // Check if the relation exists. We implicitly create relations on the
1448 : // first record.
1449 136207 : let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
1450 :
1451 136207 : if new_nblocks > old_nblocks {
1452 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1453 136199 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1454 :
1455 136199 : let mut key = rel_block_to_key(rel, blknum);
1456 :
1457 : // fill the gap with zeros
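// (e.g. extending from old_nblocks = 3 to blknum = 7 zero-fills blocks 3..=6
// on this shard before block 7 itself is written by the caller)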
1458 136199 : let mut gap_blocks_filled: u64 = 0;
1459 136199 : for gap_blknum in old_nblocks..blknum {
1460 1499 : key.field6 = gap_blknum;
1461 :
1462 1499 : if self.shard.get_shard_number(&key) != self.shard.number {
1463 0 : continue;
1464 1499 : }
1465 :
1466 1499 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1467 1499 : gap_blocks_filled += 1;
1468 : }
1469 :
1470 136199 : WAL_INGEST
1471 136199 : .gap_blocks_zeroed_on_rel_extend
1472 136199 : .inc_by(gap_blocks_filled);
1473 :
1474 : // Log something when a relation extend causes us to fill gaps
1475 : // with zero pages. Logging is rate-limited per pg version to
1476 : // avoid skewing.
1477 136199 : if gap_blocks_filled > 0 {
1478 : use std::sync::Mutex;
1479 :
1480 : use once_cell::sync::Lazy;
1481 : use utils::rate_limit::RateLimit;
1482 :
1483 : struct RateLimitPerPgVersion {
1484 : rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
1485 : }
1486 :
1487 : impl RateLimitPerPgVersion {
1488 0 : const fn new() -> Self {
1489 : Self {
1490 : rate_limiters: [const {
1491 1 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
1492 : }; 4],
1493 : }
1494 0 : }
1495 :
1496 2 : const fn rate_limiter(
1497 2 : &self,
1498 2 : pg_version: PgMajorVersion,
1499 2 : ) -> Option<&Lazy<Mutex<RateLimit>>> {
1500 : const MIN_PG_VERSION: u32 = PgMajorVersion::PG14.major_version_num();
1501 : const MAX_PG_VERSION: u32 = PgMajorVersion::PG17.major_version_num();
1502 2 : let pg_version = pg_version.major_version_num();
1503 :
1504 2 : if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
1505 0 : return None;
1506 2 : }
1507 :
1508 2 : Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
1509 2 : }
1510 : }
1511 :
1512 : static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
1513 2 : if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
1514 2 : if let Ok(mut locked) = rate_limiter.try_lock() {
1515 2 : locked.call(|| {
1516 1 : info!(
1517 0 : lsn=%modification.get_lsn(),
1518 : pg_version=%modification.tline.pg_version,
1519 : rel=%rel,
1520 0 : "Filled {} gap blocks on rel extend to {} from {}",
1521 : gap_blocks_filled,
1522 : new_nblocks,
1523 : old_nblocks);
1524 1 : });
1525 0 : }
1526 0 : }
1527 136197 : }
1528 8 : }
1529 136207 : Ok(())
1530 136207 : }
1531 :
1532 0 : async fn put_slru_page_image(
1533 0 : &mut self,
1534 0 : modification: &mut DatadirModification<'_>,
1535 0 : kind: SlruKind,
1536 0 : segno: u32,
1537 0 : blknum: BlockNumber,
1538 0 : img: Bytes,
1539 0 : ctx: &RequestContext,
1540 0 : ) -> Result<(), WalIngestError> {
1541 0 : if !self.shard.is_shard_zero() {
1542 0 : return Ok(());
1543 0 : }
1544 :
1545 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1546 0 : .await?;
1547 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1548 0 : Ok(())
1549 0 : }
1550 :
1551 0 : async fn handle_slru_extend(
1552 0 : &mut self,
1553 0 : modification: &mut DatadirModification<'_>,
1554 0 : kind: SlruKind,
1555 0 : segno: u32,
1556 0 : blknum: BlockNumber,
1557 0 : ctx: &RequestContext,
1558 0 : ) -> Result<(), WalIngestError> {
1559 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1560 : // extended with ZEROPAGE records, not with commit records, so it happens
1561 : // a lot less frequently.
1562 :
1563 0 : let new_nblocks = blknum + 1;
1564 : // Check if the segment exists. We implicitly create SLRU segments on the
1565 : // first record.
1566 : // TODO: would be nice to be more explicit about it
1567 0 : let old_nblocks = if !modification
1568 0 : .tline
1569 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1570 0 : .await?
1571 : {
1572 : // create it with 0 size initially, the logic below will extend it
1573 0 : modification
1574 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1575 0 : .await?;
1576 0 : 0
1577 : } else {
1578 0 : modification
1579 0 : .tline
1580 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1581 0 : .await?
1582 : };
1583 :
1584 0 : if new_nblocks > old_nblocks {
1585 0 : trace!(
1586 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1587 : kind, segno, old_nblocks, new_nblocks
1588 : );
1589 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1590 :
1591 : // fill the gap with zeros
1592 0 : for gap_blknum in old_nblocks..blknum {
1593 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1594 : }
1595 0 : }
1596 0 : Ok(())
1597 0 : }
1598 : }
1599 :
1600 : /// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
1601 : ///
1602 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
1603 : /// page number stored in the shard, or None if the shard does not have any pages for it.
1604 6 : async fn get_relsize(
1605 6 : modification: &DatadirModification<'_>,
1606 6 : rel: RelTag,
1607 6 : ctx: &RequestContext,
1608 6 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
1609 6 : if !modification
1610 6 : .tline
1611 6 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1612 6 : .await?
1613 : {
1614 0 : return Ok(None);
1615 6 : }
1616 6 : modification
1617 6 : .tline
1618 6 : .get_rel_size(rel, Version::Modified(modification), ctx)
1619 6 : .await
1620 6 : .map(Some)
1621 6 : }
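// For illustration (hypothetical values): on shard 0, a relation holding
// three blocks yields `Ok(Some(3))`, while a relation that was never created
// or has been dropped yields `Ok(None)`.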
1622 :
1623 : #[allow(clippy::bool_assert_comparison)]
1624 : #[cfg(test)]
1625 : mod tests {
1626 : use anyhow::Result;
1627 : use postgres_ffi::PgMajorVersion;
1628 : use postgres_ffi::RELSEG_SIZE;
1629 :
1630 : use super::*;
1631 : use crate::DEFAULT_PG_VERSION;
1632 : use crate::tenant::harness::*;
1633 : use crate::tenant::remote_timeline_client::{INITDB_PATH, remote_initdb_archive_path};
1634 : use crate::tenant::storage_layer::IoConcurrency;
1635 :
1636 : /// Arbitrary relation tag, for testing.
1637 : const TESTREL_A: RelTag = RelTag {
1638 : spcnode: 0,
1639 : dbnode: 111,
1640 : relnode: 1000,
1641 : forknum: 0,
1642 : };
1643 :
1644 6 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1645 : // TODO
1646 6 : }
1647 :
1648 : #[tokio::test]
1649 1 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
1650 5 : for i in PgMajorVersion::ALL {
1651 4 : dispatch_pgversion!(i, {
1652 1 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
1653 1 : });
1654 1 : }
1655 1 :
1656 1 : Ok(())
1657 1 : }
1658 :
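    /// Test helper: seed a fresh timeline with a zeroed checkpoint and a
    /// dummy relmapper file at Lsn(0x10), then build a `WalIngest` over it.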
1659 4 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1660 4 : let mut m = tline.begin_modification(Lsn(0x10));
1661 4 : m.put_checkpoint(dispatch_pgversion!(
1662 4 : tline.pg_version,
1663 0 : pgv::ZERO_CHECKPOINT.clone()
1664 0 : ))?;
1665 4 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1666 4 : m.commit(ctx).await?;
1667 4 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1668 :
1669 4 : Ok(walingest)
1670 4 : }
1671 :
1672 : #[tokio::test]
1673 1 : async fn test_relsize() -> Result<()> {
1674 1 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
1675 1 : let io_concurrency = IoConcurrency::spawn_for_test();
1676 1 : let tline = tenant
1677 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1678 1 : .await?;
1679 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1680 :
1681 1 : let mut m = tline.begin_modification(Lsn(0x20));
1682 1 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1683 1 : walingest
1684 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1685 1 : .await?;
1686 1 : m.commit(&ctx).await?;
1687 1 : let mut m = tline.begin_modification(Lsn(0x30));
1688 1 : walingest
1689 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1690 1 : .await?;
1691 1 : m.commit(&ctx).await?;
1692 1 : let mut m = tline.begin_modification(Lsn(0x40));
1693 1 : walingest
1694 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1695 1 : .await?;
1696 1 : m.commit(&ctx).await?;
1697 1 : let mut m = tline.begin_modification(Lsn(0x50));
1698 1 : walingest
1699 1 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1700 1 : .await?;
1701 1 : m.commit(&ctx).await?;
1702 :
1703 1 : assert_current_logical_size(&tline, Lsn(0x50));
1704 :
1705 1 : let test_span = tracing::info_span!(parent: None, "test",
1706 0 : tenant_id=%tline.tenant_shard_id.tenant_id,
1707 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1708 0 : timeline_id=%tline.timeline_id);
1709 :
1710 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
1711 1 : assert_eq!(
1712 1 : tline
1713 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
1714 1 : .await?,
1715 : false
1716 : );
1717 1 : assert!(
1718 1 : tline
1719 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
1720 1 : .await
1721 1 : .is_err()
1722 : );
1723 1 : assert_eq!(
1724 1 : tline
1725 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
1726 1 : .await?,
1727 : true
1728 : );
1729 1 : assert_eq!(
1730 1 : tline
1731 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
1732 1 : .await?,
1733 : 1
1734 : );
1735 1 : assert_eq!(
1736 1 : tline
1737 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
1738 1 : .await?,
1739 : 3
1740 : );
1741 :
1742 : // Check page contents at each LSN
1743 1 : assert_eq!(
1744 1 : tline
1745 1 : .get_rel_page_at_lsn(
1746 1 : TESTREL_A,
1747 1 : 0,
1748 1 : Version::at(Lsn(0x20)),
1749 1 : &ctx,
1750 1 : io_concurrency.clone()
1751 1 : )
1752 1 : .instrument(test_span.clone())
1753 1 : .await?,
1754 1 : test_img("foo blk 0 at 2")
1755 : );
1756 :
1757 1 : assert_eq!(
1758 1 : tline
1759 1 : .get_rel_page_at_lsn(
1760 1 : TESTREL_A,
1761 1 : 0,
1762 1 : Version::at(Lsn(0x30)),
1763 1 : &ctx,
1764 1 : io_concurrency.clone()
1765 1 : )
1766 1 : .instrument(test_span.clone())
1767 1 : .await?,
1768 1 : test_img("foo blk 0 at 3")
1769 : );
1770 :
1771 1 : assert_eq!(
1772 1 : tline
1773 1 : .get_rel_page_at_lsn(
1774 1 : TESTREL_A,
1775 1 : 0,
1776 1 : Version::at(Lsn(0x40)),
1777 1 : &ctx,
1778 1 : io_concurrency.clone()
1779 1 : )
1780 1 : .instrument(test_span.clone())
1781 1 : .await?,
1782 1 : test_img("foo blk 0 at 3")
1783 : );
1784 1 : assert_eq!(
1785 1 : tline
1786 1 : .get_rel_page_at_lsn(
1787 1 : TESTREL_A,
1788 1 : 1,
1789 1 : Version::at(Lsn(0x40)),
1790 1 : &ctx,
1791 1 : io_concurrency.clone()
1792 1 : )
1793 1 : .instrument(test_span.clone())
1794 1 : .await?,
1795 1 : test_img("foo blk 1 at 4")
1796 : );
1797 :
1798 1 : assert_eq!(
1799 1 : tline
1800 1 : .get_rel_page_at_lsn(
1801 1 : TESTREL_A,
1802 1 : 0,
1803 1 : Version::at(Lsn(0x50)),
1804 1 : &ctx,
1805 1 : io_concurrency.clone()
1806 1 : )
1807 1 : .instrument(test_span.clone())
1808 1 : .await?,
1809 1 : test_img("foo blk 0 at 3")
1810 : );
1811 1 : assert_eq!(
1812 1 : tline
1813 1 : .get_rel_page_at_lsn(
1814 1 : TESTREL_A,
1815 1 : 1,
1816 1 : Version::at(Lsn(0x50)),
1817 1 : &ctx,
1818 1 : io_concurrency.clone()
1819 1 : )
1820 1 : .instrument(test_span.clone())
1821 1 : .await?,
1822 1 : test_img("foo blk 1 at 4")
1823 : );
1824 1 : assert_eq!(
1825 1 : tline
1826 1 : .get_rel_page_at_lsn(
1827 1 : TESTREL_A,
1828 1 : 2,
1829 1 : Version::at(Lsn(0x50)),
1830 1 : &ctx,
1831 1 : io_concurrency.clone()
1832 1 : )
1833 1 : .instrument(test_span.clone())
1834 1 : .await?,
1835 1 : test_img("foo blk 2 at 5")
1836 : );
1837 :
1838 : // Truncate last block
1839 1 : let mut m = tline.begin_modification(Lsn(0x60));
1840 1 : walingest
1841 1 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1842 1 : .await?;
1843 1 : m.commit(&ctx).await?;
1844 1 : assert_current_logical_size(&tline, Lsn(0x60));
1845 :
1846 : // Check reported size and contents after truncation
1847 1 : assert_eq!(
1848 1 : tline
1849 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
1850 1 : .await?,
1851 : 2
1852 : );
1853 1 : assert_eq!(
1854 1 : tline
1855 1 : .get_rel_page_at_lsn(
1856 1 : TESTREL_A,
1857 1 : 0,
1858 1 : Version::at(Lsn(0x60)),
1859 1 : &ctx,
1860 1 : io_concurrency.clone()
1861 1 : )
1862 1 : .instrument(test_span.clone())
1863 1 : .await?,
1864 1 : test_img("foo blk 0 at 3")
1865 : );
1866 1 : assert_eq!(
1867 1 : tline
1868 1 : .get_rel_page_at_lsn(
1869 1 : TESTREL_A,
1870 1 : 1,
1871 1 : Version::at(Lsn(0x60)),
1872 1 : &ctx,
1873 1 : io_concurrency.clone()
1874 1 : )
1875 1 : .instrument(test_span.clone())
1876 1 : .await?,
1877 1 : test_img("foo blk 1 at 4")
1878 : );
1879 :
1880 : // should still see the truncated block with older LSN
1881 1 : assert_eq!(
1882 1 : tline
1883 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
1884 1 : .await?,
1885 : 3
1886 : );
1887 1 : assert_eq!(
1888 1 : tline
1889 1 : .get_rel_page_at_lsn(
1890 1 : TESTREL_A,
1891 1 : 2,
1892 1 : Version::at(Lsn(0x50)),
1893 1 : &ctx,
1894 1 : io_concurrency.clone()
1895 1 : )
1896 1 : .instrument(test_span.clone())
1897 1 : .await?,
1898 1 : test_img("foo blk 2 at 5")
1899 : );
1900 :
1901 : // Truncate to zero length
1902 1 : let mut m = tline.begin_modification(Lsn(0x68));
1903 1 : walingest
1904 1 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1905 1 : .await?;
1906 1 : m.commit(&ctx).await?;
1907 1 : assert_eq!(
1908 1 : tline
1909 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
1910 1 : .await?,
1911 : 0
1912 : );
1913 :
1914 : // Extend from 0 to 2 blocks, leaving a gap
1915 1 : let mut m = tline.begin_modification(Lsn(0x70));
1916 1 : walingest
1917 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1918 1 : .await?;
1919 1 : m.commit(&ctx).await?;
1920 1 : assert_eq!(
1921 1 : tline
1922 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
1923 1 : .await?,
1924 : 2
1925 : );
1926 1 : assert_eq!(
1927 1 : tline
1928 1 : .get_rel_page_at_lsn(
1929 1 : TESTREL_A,
1930 1 : 0,
1931 1 : Version::at(Lsn(0x70)),
1932 1 : &ctx,
1933 1 : io_concurrency.clone()
1934 1 : )
1935 1 : .instrument(test_span.clone())
1936 1 : .await?,
1937 1 : ZERO_PAGE
1938 : );
1939 1 : assert_eq!(
1940 1 : tline
1941 1 : .get_rel_page_at_lsn(
1942 1 : TESTREL_A,
1943 1 : 1,
1944 1 : Version::at(Lsn(0x70)),
1945 1 : &ctx,
1946 1 : io_concurrency.clone()
1947 1 : )
1948 1 : .instrument(test_span.clone())
1949 1 : .await?,
1950 1 : test_img("foo blk 1")
1951 : );
1952 :
1953 : // Extend a lot more, leaving a big gap that spans across segments
1954 1 : let mut m = tline.begin_modification(Lsn(0x80));
1955 1 : walingest
1956 1 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
1957 1 : .await?;
1958 1 : m.commit(&ctx).await?;
1959 1 : assert_eq!(
1960 1 : tline
1961 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
1962 1 : .await?,
1963 : 1501
1964 : );
1965 1499 : for blk in 2..1500 {
1966 1498 : assert_eq!(
1967 1498 : tline
1968 1498 : .get_rel_page_at_lsn(
1969 1498 : TESTREL_A,
1970 1498 : blk,
1971 1498 : Version::at(Lsn(0x80)),
1972 1498 : &ctx,
1973 1498 : io_concurrency.clone()
1974 1498 : )
1975 1498 : .instrument(test_span.clone())
1976 1498 : .await?,
1977 1498 : ZERO_PAGE
1978 : );
1979 : }
1980 1 : assert_eq!(
1981 1 : tline
1982 1 : .get_rel_page_at_lsn(
1983 1 : TESTREL_A,
1984 1 : 1500,
1985 1 : Version::at(Lsn(0x80)),
1986 1 : &ctx,
1987 1 : io_concurrency.clone()
1988 1 : )
1989 1 : .instrument(test_span.clone())
1990 1 : .await?,
1991 1 : test_img("foo blk 1500")
1992 : );
1993 :
1994 2 : Ok(())
1995 1 : }
1996 :
1997 : // Test what happens if we dropped a relation
1998 : // and then created it again within the same layer.
1999 : #[tokio::test]
2000 1 : async fn test_drop_extend() -> Result<()> {
2001 1 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
2002 1 : .await?
2003 1 : .load()
2004 1 : .await;
2005 1 : let tline = tenant
2006 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2007 1 : .await?;
2008 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2009 :
2010 1 : let mut m = tline.begin_modification(Lsn(0x20));
2011 1 : walingest
2012 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2013 1 : .await?;
2014 1 : m.commit(&ctx).await?;
2015 :
2016 : // Check that rel exists and size is correct
2017 1 : assert_eq!(
2018 1 : tline
2019 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
2020 1 : .await?,
2021 : true
2022 : );
2023 1 : assert_eq!(
2024 1 : tline
2025 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
2026 1 : .await?,
2027 : 1
2028 : );
2029 :
2030 : // Drop rel
2031 1 : let mut m = tline.begin_modification(Lsn(0x30));
2032 1 : let mut rel_drops = HashMap::new();
2033 1 : rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
2034 1 : m.put_rel_drops(rel_drops, &ctx).await?;
2035 1 : m.commit(&ctx).await?;
2036 :
2037 : // Check that rel is not visible anymore
2038 1 : assert_eq!(
2039 1 : tline
2040 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
2041 1 : .await?,
2042 : false
2043 : );
2044 :
2045 : // FIXME: should fail
2046 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2047 :
2048 : // Re-create it
2049 1 : let mut m = tline.begin_modification(Lsn(0x40));
2050 1 : walingest
2051 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2052 1 : .await?;
2053 1 : m.commit(&ctx).await?;
2054 :
2055 : // Check that rel exists and size is correct
2056 1 : assert_eq!(
2057 1 : tline
2058 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
2059 1 : .await?,
2060 : true
2061 : );
2062 1 : assert_eq!(
2063 1 : tline
2064 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
2065 1 : .await?,
2066 : 1
2067 : );
2068 :
2069 2 : Ok(())
2070 1 : }
2071 :
2072 : // Test what happens if we truncated a relation
2073 : // so that one of its segments was dropped
2074 : // and then extended it again within the same layer.
2075 : #[tokio::test]
2076 1 : async fn test_truncate_extend() -> Result<()> {
2077 1 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
2078 1 : .await?
2079 1 : .load()
2080 1 : .await;
2081 1 : let io_concurrency = IoConcurrency::spawn_for_test();
2082 1 : let tline = tenant
2083 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2084 1 : .await?;
2085 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2086 :
2087 : // Create a 20 MB relation (the size is arbitrary)
2088 1 : let relsize = 20 * 1024 * 1024 / 8192;
2089 1 : let mut m = tline.begin_modification(Lsn(0x20));
2090 2560 : for blkno in 0..relsize {
2091 2560 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2092 2560 : walingest
2093 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2094 2560 : .await?;
2095 : }
2096 1 : m.commit(&ctx).await?;
2097 :
2098 1 : let test_span = tracing::info_span!(parent: None, "test",
2099 0 : tenant_id=%tline.tenant_shard_id.tenant_id,
2100 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
2101 0 : timeline_id=%tline.timeline_id);
2102 :
2103 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
2104 1 : assert_eq!(
2105 1 : tline
2106 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
2107 1 : .await?,
2108 : false
2109 : );
2110 1 : assert!(
2111 1 : tline
2112 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
2113 1 : .await
2114 1 : .is_err()
2115 : );
2116 :
2117 1 : assert_eq!(
2118 1 : tline
2119 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
2120 1 : .await?,
2121 : true
2122 : );
2123 1 : assert_eq!(
2124 1 : tline
2125 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
2126 1 : .await?,
2127 : relsize
2128 : );
2129 :
2130 : // Check relation content
2131 2560 : for blkno in 0..relsize {
2132 2560 : let lsn = Lsn(0x20);
2133 2560 : let data = format!("foo blk {blkno} at {lsn}");
2134 2560 : assert_eq!(
2135 2560 : tline
2136 2560 : .get_rel_page_at_lsn(
2137 2560 : TESTREL_A,
2138 2560 : blkno,
2139 2560 : Version::at(lsn),
2140 2560 : &ctx,
2141 2560 : io_concurrency.clone()
2142 2560 : )
2143 2560 : .instrument(test_span.clone())
2144 2560 : .await?,
2145 2560 : test_img(&data)
2146 : );
2147 : }
2148 :
2149 :         // Truncate the relation so that its second segment is dropped,
2150 :         // leaving only one page
2151 1 : let mut m = tline.begin_modification(Lsn(0x60));
2152 1 : walingest
2153 1 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2154 1 : .await?;
2155 1 : m.commit(&ctx).await?;
2156 :
2157 : // Check reported size and contents after truncation
2158 1 : assert_eq!(
2159 1 : tline
2160 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
2161 1 : .await?,
2162 : 1
2163 : );
2164 :
2165 2 : for blkno in 0..1 {
2166 1 : let lsn = Lsn(0x20);
2167 1 : let data = format!("foo blk {blkno} at {lsn}");
2168 1 : assert_eq!(
2169 1 : tline
2170 1 : .get_rel_page_at_lsn(
2171 1 : TESTREL_A,
2172 1 : blkno,
2173 1 : Version::at(Lsn(0x60)),
2174 1 : &ctx,
2175 1 : io_concurrency.clone()
2176 1 : )
2177 1 : .instrument(test_span.clone())
2178 1 : .await?,
2179 1 : test_img(&data)
2180 : );
2181 : }
2182 :
2183 : // should still see all blocks with older LSN
2184 1 : assert_eq!(
2185 1 : tline
2186 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
2187 1 : .await?,
2188 : relsize
2189 : );
2190 2560 : for blkno in 0..relsize {
2191 2560 : let lsn = Lsn(0x20);
2192 2560 : let data = format!("foo blk {blkno} at {lsn}");
2193 2560 : assert_eq!(
2194 2560 : tline
2195 2560 : .get_rel_page_at_lsn(
2196 2560 : TESTREL_A,
2197 2560 : blkno,
2198 2560 : Version::at(Lsn(0x50)),
2199 2560 : &ctx,
2200 2560 : io_concurrency.clone()
2201 2560 : )
2202 2560 : .instrument(test_span.clone())
2203 2560 : .await?,
2204 2560 : test_img(&data)
2205 : );
2206 : }
2207 :
2208 :         // Extend the relation again, adding enough blocks
2209 :         // to re-create the second segment
2210 1 : let lsn = Lsn(0x80);
2211 1 : let mut m = tline.begin_modification(lsn);
2212 2560 : for blkno in 0..relsize {
2213 2560 : let data = format!("foo blk {blkno} at {lsn}");
2214 2560 : walingest
2215 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2216 2560 : .await?;
2217 : }
2218 1 : m.commit(&ctx).await?;
2219 :
2220 1 : assert_eq!(
2221 1 : tline
2222 1 : .get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
2223 1 : .await?,
2224 : true
2225 : );
2226 1 : assert_eq!(
2227 1 : tline
2228 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
2229 1 : .await?,
2230 : relsize
2231 : );
2232 : // Check relation content
2233 2560 : for blkno in 0..relsize {
2234 2560 : let lsn = Lsn(0x80);
2235 2560 : let data = format!("foo blk {blkno} at {lsn}");
2236 2560 : assert_eq!(
2237 2560 : tline
2238 2560 : .get_rel_page_at_lsn(
2239 2560 : TESTREL_A,
2240 2560 : blkno,
2241 2560 : Version::at(Lsn(0x80)),
2242 2560 : &ctx,
2243 2560 : io_concurrency.clone()
2244 2560 : )
2245 2560 : .instrument(test_span.clone())
2246 2560 : .await?,
2247 2560 : test_img(&data)
2248 1 : );
2249 1 : }
2250 1 :
2251 1 : Ok(())
2252 1 : }
2253 :
2254 :     /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
2255 : /// split into multiple 1 GB segments in Postgres.
2256 : #[tokio::test]
2257 1 : async fn test_large_rel() -> Result<()> {
2258 1 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2259 1 : let tline = tenant
2260 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2261 1 : .await?;
2262 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2263 :
2264 1 : let mut lsn = 0x10;
2265 131073 : for blknum in 0..RELSEG_SIZE + 1 {
2266 131073 : lsn += 0x10;
2267 131073 : let mut m = tline.begin_modification(Lsn(lsn));
2268 131073 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2269 131073 : walingest
2270 131073 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2271 131073 : .await?;
2272 131073 : m.commit(&ctx).await?;
2273 : }
2274 :
2275 1 : assert_current_logical_size(&tline, Lsn(lsn));
2276 :
2277 1 : assert_eq!(
2278 1 : tline
2279 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
2280 1 : .await?,
2281 1 : RELSEG_SIZE + 1
2282 : );
2283 :
2284 : // Truncate one block
2285 1 : lsn += 0x10;
2286 1 : let mut m = tline.begin_modification(Lsn(lsn));
2287 1 : walingest
2288 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2289 1 : .await?;
2290 1 : m.commit(&ctx).await?;
2291 1 : assert_eq!(
2292 1 : tline
2293 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
2294 1 : .await?,
2295 : RELSEG_SIZE
2296 : );
2297 1 : assert_current_logical_size(&tline, Lsn(lsn));
2298 :
2299 : // Truncate another block
2300 1 : lsn += 0x10;
2301 1 : let mut m = tline.begin_modification(Lsn(lsn));
2302 1 : walingest
2303 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2304 1 : .await?;
2305 1 : m.commit(&ctx).await?;
2306 1 : assert_eq!(
2307 1 : tline
2308 1 : .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
2309 1 : .await?,
2310 1 : RELSEG_SIZE - 1
2311 : );
2312 1 : assert_current_logical_size(&tline, Lsn(lsn));
2313 :
2314 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
2315 : // This tests the behavior at segment boundaries
2316 1 : let mut size: i32 = 3000;
2317 3002 : while size >= 0 {
2318 3001 : lsn += 0x10;
2319 3001 : let mut m = tline.begin_modification(Lsn(lsn));
2320 3001 : walingest
2321 3001 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2322 3001 : .await?;
2323 3001 : m.commit(&ctx).await?;
2324 3001 : assert_eq!(
2325 3001 : tline
2326 3001 : .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
2327 3001 : .await?,
2328 3001 : size as BlockNumber
2329 : );
2330 :
2331 3001 : size -= 1;
2332 : }
2333 1 : assert_current_logical_size(&tline, Lsn(lsn));
2334 :
2335 2 : Ok(())
2336 1 : }
2337 :
2338 :     /// Replay a WAL segment file taken directly from safekeepers.
2339 : ///
2340 : /// This test is useful for benchmarking since it allows us to profile only
2341 : /// the walingest code in a single-threaded executor, and iterate more quickly
2342 : /// without waiting for unrelated steps.
2343 : #[tokio::test]
2344 1 : async fn test_ingest_real_wal() {
2345 : use postgres_ffi::WAL_SEGMENT_SIZE;
2346 : use postgres_ffi::waldecoder::WalStreamDecoder;
2347 :
2348 : use crate::tenant::harness::*;
2349 :
2350 : // Define test data path and constants.
2351 : //
2352 : // Steps to reconstruct the data, if needed:
2353 :         // 1. Run the pgbench Python test
2354 :         // 2. Take the first WAL segment file from a safekeeper
2355 : // 3. Compress it using `zstd --long input_file`
2356 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2357 :         // 5. Grep safekeeper logs for "restart decoder" to get the startpoint
2358 : // 6. Run just the decoder from this test to get the endpoint.
2359 : // It's the last LSN the decoder will output.
2360 1 : let pg_version = PgMajorVersion::PG15; // The test data was generated by pg15
2361 1 : let path = "test_data/sk_wal_segment_from_pgbench";
2362 1 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2363 1 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2364 1 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2365 1 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2366 :
2367 1 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2368 1 : let span = harness
2369 1 : .span()
2370 1 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2371 1 : let (tenant, ctx) = harness.load().await;
2372 :
2373 1 : let remote_initdb_path =
2374 1 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2375 1 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2376 :
2377 1 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2378 1 : .expect("creating test dir should work");
2379 1 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2380 :
2381 : // Bootstrap a real timeline. We can't use create_test_timeline because
2382 :         // it doesn't create a real checkpoint, and WalIngest::new tries to parse
2383 : // the garbage data.
2384 1 : let tline = tenant
2385 1 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2386 1 : .await
2387 1 : .unwrap();
2388 :
2389 : // We fully read and decompress this into memory before decoding
2390 : // to get a more accurate perf profile of the decoder.
2391 1 : let bytes = {
2392 : use async_compression::tokio::bufread::ZstdDecoder;
2393 1 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2394 1 : let reader = tokio::io::BufReader::new(file);
2395 1 : let decoder = ZstdDecoder::new(reader);
2396 1 : let mut reader = tokio::io::BufReader::new(decoder);
2397 1 : let mut buffer = Vec::new();
2398 1 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2399 1 : buffer
2400 : };
2401 :
2402 : // TODO start a profiler too
2403 1 : let started_at = std::time::Instant::now();
2404 :
2405 : // Initialize walingest
2406 1 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2407 1 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2408 1 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2409 1 : .await
2410 1 : .unwrap();
2411 1 : let mut modification = tline.begin_modification(startpoint);
2412 1 : println!("decoding {} bytes", bytes.len() - xlogoff);
2413 :
2414 :         // Decode and ingest WAL. We process the WAL in chunks because
2415 :         // that's what happens when we get bytes from safekeepers.
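        // (Decoder-protocol note, inferred from the calls below: `feed_bytes`
        // buffers input of any size and `poll_decode` yields complete records
        // until it runs out, so the 50-byte chunks merely exaggerate the
        // reassembly path; any chunk size would produce the same records.)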
2416 237343 : for chunk in bytes[xlogoff..].chunks(50) {
2417 237343 : decoder.feed_bytes(chunk);
2418 310268 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2419 72925 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
2420 72925 : recdata,
2421 72925 : &[*modification.tline.get_shard_identity()],
2422 72925 : lsn,
2423 72925 : modification.tline.pg_version,
2424 72925 : )
2425 72925 : .unwrap()
2426 72925 : .remove(modification.tline.get_shard_identity())
2427 72925 : .unwrap();
2428 :
2429 72925 : walingest
2430 72925 : .ingest_record(interpreted, &mut modification, &ctx)
2431 72925 : .instrument(span.clone())
2432 72925 : .await
2433 72925 : .unwrap();
2434 : }
2435 237343 : modification.commit(&ctx).await.unwrap();
2436 : }
2437 :
2438 1 : let duration = started_at.elapsed();
2439 1 : println!("done in {duration:?}");
2440 1 : }
2441 : }