Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
9 : //! Records get decoded and interpreted in the [`wal_decoder`] module
10 : //! and then stored to the Repository by WalIngest.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
14 : //! extracts page images out of some WAL records, but most page versions
15 : //! are stored as WAL records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_rel_wal_record or put_rel_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but it can handle some records directly with
22 : //! bespoke Rust code.
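//!
//! A minimal sketch of the driver loop (hypothetical: `wal_stream`, `ctx` and
//! `last_lsn` are placeholders, and the real caller lives in the walreceiver
//! code), using the `ingest_record` API defined below:
//!
//! ```ignore
//! while let Some(interpreted) = wal_stream.next().await {
//!     let mut modification = timeline.begin_modification(last_lsn);
//!     walingest.ingest_record(interpreted, &mut modification, &ctx).await?;
//!     modification.commit(&ctx).await?;
//! }
//! ```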
23 :
24 : use std::backtrace::Backtrace;
25 : use std::collections::HashMap;
26 : use std::sync::{Arc, OnceLock};
27 : use std::time::{Duration, Instant, SystemTime};
28 :
29 : use bytes::{Buf, Bytes};
30 : use pageserver_api::key::{Key, rel_block_to_key};
31 : use pageserver_api::record::NeonWalRecord;
32 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
33 : use pageserver_api::shard::ShardIdentity;
34 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
35 : use postgres_ffi::walrecord::*;
36 : use postgres_ffi::{
37 : TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
38 : fsm_logical_to_physical, pg_constants,
39 : };
40 : use tracing::*;
41 : use utils::bin_ser::{DeserializeError, SerializeError};
42 : use utils::lsn::Lsn;
43 : use utils::rate_limit::RateLimit;
44 : use utils::{critical, failpoint_support};
45 : use wal_decoder::models::*;
46 :
47 : use crate::ZERO_PAGE;
48 : use crate::context::RequestContext;
49 : use crate::metrics::WAL_INGEST;
50 : use crate::pgdatadir_mapping::{DatadirModification, Version};
51 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
52 : use crate::tenant::{PageReconstructError, Timeline};
53 :
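// `enum_pgversion!` generates a version-dispatched `CheckPoint` enum with one
// variant per supported Postgres version (e.g. `CheckPoint::V17`), so the rest
// of this file can handle checkpoints without caring which version wrote them.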
54 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
55 :
56 : impl CheckPoint {
57 12 : fn encode(&self) -> Result<Bytes, SerializeError> {
58 12 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
59 12 : }
60 :
61 291668 : fn update_next_xid(&mut self, xid: u32) -> bool {
62 291668 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
63 291668 : }
64 :
65 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
66 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
67 0 : cp.update_next_multixid(multi_xid, multi_offset)
68 : })
69 0 : }
70 : }
71 :
72 : /// Temporary suppression of WAL lag warnings after attach
73 : ///
74 : /// After tenant attach, we want to suppress WAL lag warnings because
75 : /// we don't look at the WAL until the attach is complete, which
76 : /// might take a while.
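///
/// For example, with the formula in [`WalLagCooldown::new`], an attach that
/// took 10 minutes suppresses warnings for lags of up to 21 minutes
/// (2 * 10 min + 60 s) until 32 minutes (3 * 10 min + 120 s) after the attach
/// started.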
77 : pub struct WalLagCooldown {
78 : /// The time until which this suppression applies at all
79 : active_until: std::time::Instant,
80 : /// The maximum lag to suppress. Lags above this limit get reported anyway.
81 : max_lag: Duration,
82 : }
83 :
84 : impl WalLagCooldown {
85 0 : pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
86 0 : Self {
87 0 : active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
88 0 : max_lag: attach_duration * 2 + Duration::from_secs(60),
89 0 : }
90 0 : }
91 : }
92 :
93 : pub struct WalIngest {
94 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
95 : shard: ShardIdentity,
96 : checkpoint: CheckPoint,
97 : checkpoint_modified: bool,
98 : warn_ingest_lag: WarnIngestLag,
99 : }
100 :
101 : struct WarnIngestLag {
102 : lag_msg_ratelimit: RateLimit,
103 : future_lsn_msg_ratelimit: RateLimit,
104 : timestamp_invalid_msg_ratelimit: RateLimit,
105 : }
106 :
107 : pub struct WalIngestError {
108 : pub backtrace: std::backtrace::Backtrace,
109 : pub kind: WalIngestErrorKind,
110 : }
111 :
112 : #[derive(thiserror::Error, Debug)]
113 : pub enum WalIngestErrorKind {
114 : #[error(transparent)]
115 : #[allow(private_interfaces)]
116 : PageReconstructError(#[from] PageReconstructError),
117 : #[error(transparent)]
118 : DeserializationFailure(#[from] DeserializeError),
119 : #[error(transparent)]
120 : SerializationFailure(#[from] SerializeError),
121 : #[error("the request contains data not supported by pageserver: {0} @ {1}")]
122 : InvalidKey(Key, Lsn),
123 : #[error("twophase file for xid {0} already exists")]
124 : FileAlreadyExists(u64),
125 : #[error("slru segment {0:?}/{1} already exists")]
126 : SlruAlreadyExists(SlruKind, u32),
127 : #[error("relation already exists")]
128 : RelationAlreadyExists(RelTag),
129 : #[error("invalid reldir key {0}")]
130 : InvalidRelDirKey(Key),
131 :
132 : #[error(transparent)]
133 : LogicalError(anyhow::Error),
134 : #[error(transparent)]
135 : EncodeAuxFileError(anyhow::Error),
136 : #[error(transparent)]
137 : MaybeRelSizeV2Error(anyhow::Error),
138 :
139 : #[error("timeline shutting down")]
140 : Cancelled,
141 : }
142 :
143 : impl<T> From<T> for WalIngestError
144 : where
145 : WalIngestErrorKind: From<T>,
146 : {
147 0 : fn from(value: T) -> Self {
148 0 : WalIngestError {
149 0 : backtrace: Backtrace::capture(),
150 0 : kind: WalIngestErrorKind::from(value),
151 0 : }
152 0 : }
153 : }
154 :
155 : impl std::error::Error for WalIngestError {
156 0 : fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
157 0 : self.kind.source()
158 0 : }
159 : }
160 :
161 : impl core::fmt::Display for WalIngestError {
162 0 : fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
163 0 : self.kind.fmt(f)
164 0 : }
165 : }
166 :
167 : impl core::fmt::Debug for WalIngestError {
168 0 : fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
169 0 : if f.alternate() {
170 0 : f.debug_map()
171 0 : .key(&"backtrace")
172 0 : .value(&self.backtrace)
173 0 : .key(&"kind")
174 0 : .value(&self.kind)
175 0 : .finish()
176 : } else {
177 0 : writeln!(f, "Error: {:?}", self.kind)?;
178 0 : if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
179 0 : writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
180 0 : }
181 0 : Ok(())
182 : }
183 0 : }
184 : }
185 :
186 : #[macro_export]
187 : macro_rules! ensure_walingest {
188 : ($($t:tt)*) => {
189 1418804 : _ = || -> Result<(), anyhow::Error> {
190 1418804 : anyhow::ensure!($($t)*);
191 1418804 : Ok(())
192 1418804 : }().map_err(WalIngestErrorKind::LogicalError)?;
193 : };
194 : }
195 :
196 : impl WalIngest {
197 24 : pub async fn new(
198 24 : timeline: &Timeline,
199 24 : startpoint: Lsn,
200 24 : ctx: &RequestContext,
201 24 : ) -> Result<WalIngest, WalIngestError> {
202 : // Fetch the latest checkpoint into memory, so that we can compare with it
203 : // quickly in `ingest_record` and update it when it changes.
204 24 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
205 24 : let pgversion = timeline.pg_version;
206 :
207 24 : let checkpoint = dispatch_pgversion!(pgversion, {
208 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
209 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
210 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
211 : });
212 :
213 24 : Ok(WalIngest {
214 24 : shard: *timeline.get_shard_identity(),
215 24 : checkpoint,
216 24 : checkpoint_modified: false,
217 24 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
218 24 : warn_ingest_lag: WarnIngestLag {
219 24 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
220 24 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
221 24 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
222 24 : },
223 24 : })
224 24 : }
225 :
226 : /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
227 : /// storage of a given timeline.
228 : ///
229 : /// This function updates the `lsn` field of the `DatadirModification`.
230 : ///
231 : /// It returns `true` if the record was ingested, and `false` if it was filtered out.
232 291704 : pub async fn ingest_record(
233 291704 : &mut self,
234 291704 : interpreted: InterpretedWalRecord,
235 291704 : modification: &mut DatadirModification<'_>,
236 291704 : ctx: &RequestContext,
237 291704 : ) -> Result<bool, WalIngestError> {
238 291704 : WAL_INGEST.records_received.inc();
239 291704 : let prev_len = modification.len();
240 291704 :
241 291704 : modification.set_lsn(interpreted.next_record_lsn)?;
242 :
243 291704 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
244 : // Records of this type should always be preceded by a commit(), as they
245 : // rely on reading data pages back from the Timeline.
246 0 : assert!(!modification.has_dirty_data());
247 291704 : }
248 :
249 291704 : assert!(!self.checkpoint_modified);
250 291704 : if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
251 291668 : && self.checkpoint.update_next_xid(interpreted.xid)
252 4 : {
253 4 : self.checkpoint_modified = true;
254 291700 : }
255 :
256 291704 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
257 :
258 132 : match interpreted.metadata_record {
259 24 : Some(MetadataRecord::Heapam(rec)) => match rec {
260 24 : HeapamRecord::ClearVmBits(clear_vm_bits) => {
261 24 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
262 24 : .await?;
263 : }
264 : },
265 0 : Some(MetadataRecord::Neonrmgr(rec)) => match rec {
266 0 : NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
267 0 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
268 0 : .await?;
269 : }
270 : },
271 32 : Some(MetadataRecord::Smgr(rec)) => match rec {
272 32 : SmgrRecord::Create(create) => {
273 32 : self.ingest_xlog_smgr_create(create, modification, ctx)
274 32 : .await?;
275 : }
276 0 : SmgrRecord::Truncate(truncate) => {
277 0 : self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
278 0 : .await?;
279 : }
280 : },
281 0 : Some(MetadataRecord::Dbase(rec)) => match rec {
282 0 : DbaseRecord::Create(create) => {
283 0 : self.ingest_xlog_dbase_create(create, modification, ctx)
284 0 : .await?;
285 : }
286 0 : DbaseRecord::Drop(drop) => {
287 0 : self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
288 : }
289 : },
290 0 : Some(MetadataRecord::Clog(rec)) => match rec {
291 0 : ClogRecord::ZeroPage(zero_page) => {
292 0 : self.ingest_clog_zero_page(zero_page, modification, ctx)
293 0 : .await?;
294 : }
295 0 : ClogRecord::Truncate(truncate) => {
296 0 : self.ingest_clog_truncate(truncate, modification, ctx)
297 0 : .await?;
298 : }
299 : },
300 16 : Some(MetadataRecord::Xact(rec)) => {
301 16 : self.ingest_xact_record(rec, modification, ctx).await?;
302 : }
303 0 : Some(MetadataRecord::MultiXact(rec)) => match rec {
304 0 : MultiXactRecord::ZeroPage(zero_page) => {
305 0 : self.ingest_multixact_zero_page(zero_page, modification, ctx)
306 0 : .await?;
307 : }
308 0 : MultiXactRecord::Create(create) => {
309 0 : self.ingest_multixact_create(modification, &create)?;
310 : }
311 0 : MultiXactRecord::Truncate(truncate) => {
312 0 : self.ingest_multixact_truncate(modification, &truncate, ctx)
313 0 : .await?;
314 : }
315 : },
316 0 : Some(MetadataRecord::Relmap(rec)) => match rec {
317 0 : RelmapRecord::Update(update) => {
318 0 : self.ingest_relmap_update(update, modification, ctx).await?;
319 : }
320 : },
321 60 : Some(MetadataRecord::Xlog(rec)) => match rec {
322 60 : XlogRecord::Raw(raw) => {
323 60 : self.ingest_raw_xlog_record(raw, modification, ctx).await?;
324 : }
325 : },
326 0 : Some(MetadataRecord::LogicalMessage(rec)) => match rec {
327 0 : LogicalMessageRecord::Put(put) => {
328 0 : self.ingest_logical_message_put(put, modification, ctx)
329 0 : .await?;
330 : }
331 : #[cfg(feature = "testing")]
332 : LogicalMessageRecord::Failpoint => {
333 : // This is a convenient way to make the WAL ingestion pause at
334 : // a particular point in the WAL. For more fine-grained control,
335 : // we could peek into the message and only pause if it contains
336 : // a particular string, for example, but this is enough for now.
337 0 : failpoint_support::sleep_millis_async!(
338 0 : "pageserver-wal-ingest-logical-message-sleep"
339 0 : );
340 : }
341 : },
342 0 : Some(MetadataRecord::Standby(rec)) => {
343 0 : self.ingest_standby_record(rec).unwrap();
344 0 : }
345 0 : Some(MetadataRecord::Replorigin(rec)) => {
346 0 : self.ingest_replorigin_record(rec, modification).await?;
347 : }
348 291572 : None => {
349 291572 : // There are two cases through which we end up here:
350 291572 : // 1. The resource manager for the original PG WAL record
351 291572 : // is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
352 291572 : // record type within Neon.
353 291572 : // 2. The resource manager id was unknown to
354 291572 : // [`wal_decoder::decoder::MetadataRecord::from_decoded`].
355 291572 : // TODO(vlad): Tighten this up more once we build confidence
356 291572 : // that case (2) does not happen in the field.
357 291572 : }
358 : }
359 :
360 291704 : modification
361 291704 : .ingest_batch(interpreted.batch, &self.shard, ctx)
362 291704 : .await?;
363 :
364 : // If checkpoint data was updated, store the new version in the repository
365 291704 : if self.checkpoint_modified {
366 12 : let new_checkpoint_bytes = self.checkpoint.encode()?;
367 :
368 12 : modification.put_checkpoint(new_checkpoint_bytes)?;
369 12 : self.checkpoint_modified = false;
370 291692 : }
371 :
372 : // Note that at this point this record is only cached in the modification
373 : // until commit() is called to flush the data into the repository and update
374 : // the latest LSN.
375 :
376 291704 : Ok(modification.len() > prev_len)
377 291704 : }
378 :
379 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
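/// For example, if `nextXid` is epoch 5, xid 100, then xid 50 maps to
/// `(5 << 32) | 50`, while xid 200 must come from the previous epoch and maps
/// to `(4 << 32) | 200`.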
380 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
381 0 : let next_full_xid =
382 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
383 :
384 0 : let next_xid = (next_full_xid) as u32;
385 0 : let mut epoch = (next_full_xid >> 32) as u32;
386 0 :
387 0 : if xid > next_xid {
388 : // Wraparound occurred, must be from a prev epoch.
389 0 : if epoch == 0 {
390 0 : Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
391 0 : "apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
392 0 : )))?;
393 0 : }
394 0 : epoch -= 1;
395 0 : }
396 :
397 0 : Ok(((epoch as u64) << 32) | xid as u64)
398 0 : }
399 :
400 24 : async fn ingest_clear_vm_bits(
401 24 : &mut self,
402 24 : clear_vm_bits: ClearVmBits,
403 24 : modification: &mut DatadirModification<'_>,
404 24 : ctx: &RequestContext,
405 24 : ) -> Result<(), WalIngestError> {
406 24 : let ClearVmBits {
407 24 : new_heap_blkno,
408 24 : old_heap_blkno,
409 24 : flags,
410 24 : vm_rel,
411 24 : } = clear_vm_bits;
412 24 : // Clear the VM bits if required.
413 24 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
414 24 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
415 :
416 : // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
417 : // its view of the VM relation size. Out of caution, log a critical error instead of failing
418 : // WAL ingestion, as there have historically been cases where PostgreSQL has cleared spurious VM pages. See:
419 : // https://github.com/neondatabase/neon/pull/10634.
420 24 : let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
421 0 : critical!("clear_vm_bits for unknown VM relation {vm_rel}");
422 0 : return Ok(());
423 : };
424 24 : if let Some(blknum) = new_vm_blk {
425 24 : if blknum >= vm_size {
426 0 : critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
427 0 : new_vm_blk = None;
428 24 : }
429 0 : }
430 24 : if let Some(blknum) = old_vm_blk {
431 0 : if blknum >= vm_size {
432 0 : critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
433 0 : old_vm_blk = None;
434 0 : }
435 24 : }
436 :
437 24 : if new_vm_blk.is_none() && old_vm_blk.is_none() {
438 0 : return Ok(());
439 24 : } else if new_vm_blk == old_vm_blk {
440 : // An UPDATE record that needs to clear the bits for both the old and the new page, both of
441 : // which reside on the same VM page.
442 0 : self.put_rel_wal_record(
443 0 : modification,
444 0 : vm_rel,
445 0 : new_vm_blk.unwrap(),
446 0 : NeonWalRecord::ClearVisibilityMapFlags {
447 0 : new_heap_blkno,
448 0 : old_heap_blkno,
449 0 : flags,
450 0 : },
451 0 : ctx,
452 0 : )
453 0 : .await?;
454 : } else {
455 : // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
456 24 : if let Some(new_vm_blk) = new_vm_blk {
457 24 : self.put_rel_wal_record(
458 24 : modification,
459 24 : vm_rel,
460 24 : new_vm_blk,
461 24 : NeonWalRecord::ClearVisibilityMapFlags {
462 24 : new_heap_blkno,
463 24 : old_heap_blkno: None,
464 24 : flags,
465 24 : },
466 24 : ctx,
467 24 : )
468 24 : .await?;
469 0 : }
470 24 : if let Some(old_vm_blk) = old_vm_blk {
471 0 : self.put_rel_wal_record(
472 0 : modification,
473 0 : vm_rel,
474 0 : old_vm_blk,
475 0 : NeonWalRecord::ClearVisibilityMapFlags {
476 0 : new_heap_blkno: None,
477 0 : old_heap_blkno,
478 0 : flags,
479 0 : },
480 0 : ctx,
481 0 : )
482 0 : .await?;
483 24 : }
484 : }
485 24 : Ok(())
486 24 : }
487 :
488 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
489 0 : async fn ingest_xlog_dbase_create(
490 0 : &mut self,
491 0 : create: DbaseCreate,
492 0 : modification: &mut DatadirModification<'_>,
493 0 : ctx: &RequestContext,
494 0 : ) -> Result<(), WalIngestError> {
495 0 : let DbaseCreate {
496 0 : db_id,
497 0 : tablespace_id,
498 0 : src_db_id,
499 0 : src_tablespace_id,
500 0 : } = create;
501 :
502 0 : let rels = modification
503 0 : .tline
504 0 : .list_rels(
505 0 : src_tablespace_id,
506 0 : src_db_id,
507 0 : Version::Modified(modification),
508 0 : ctx,
509 0 : )
510 0 : .await?;
511 :
512 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
513 :
514 : // Copy relfilemap
515 0 : let filemap = modification
516 0 : .tline
517 0 : .get_relmap_file(
518 0 : src_tablespace_id,
519 0 : src_db_id,
520 0 : Version::Modified(modification),
521 0 : ctx,
522 0 : )
523 0 : .await?;
524 0 : modification
525 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
526 0 : .await?;
527 :
528 0 : let mut num_rels_copied = 0;
529 0 : let mut num_blocks_copied = 0;
530 0 : for src_rel in rels {
531 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
532 0 : assert_eq!(src_rel.dbnode, src_db_id);
533 :
534 0 : let nblocks = modification
535 0 : .tline
536 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
537 0 : .await?;
538 0 : let dst_rel = RelTag {
539 0 : spcnode: tablespace_id,
540 0 : dbnode: db_id,
541 0 : relnode: src_rel.relnode,
542 0 : forknum: src_rel.forknum,
543 0 : };
544 0 :
545 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
546 :
547 : // Copy content
548 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
549 0 : for blknum in 0..nblocks {
550 : // Sharding:
551 : // - src and dst are always on the same shard, because they differ only by dbNode, and
552 : // dbNode is not included in the hash inputs for sharding.
553 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
554 : // that belong to it.
555 0 : let src_key = rel_block_to_key(src_rel, blknum);
556 0 : if !self.shard.is_key_local(&src_key) {
557 0 : debug!(
558 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
559 : src_key
560 : );
561 0 : continue;
562 0 : }
563 0 : debug!(
564 0 : "copying block {} from {} ({}) to {}",
565 : blknum, src_rel, src_key, dst_rel
566 : );
567 :
568 0 : let content = modification
569 0 : .tline
570 0 : .get_rel_page_at_lsn(
571 0 : src_rel,
572 0 : blknum,
573 0 : Version::Modified(modification),
574 0 : ctx,
575 0 : crate::tenant::storage_layer::IoConcurrency::sequential(),
576 0 : )
577 0 : .await?;
578 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
579 0 : num_blocks_copied += 1;
580 : }
581 :
582 0 : num_rels_copied += 1;
583 : }
584 :
585 0 : info!(
586 0 : "Created database {}/{}, copied {} blocks in {} rels",
587 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
588 : );
589 0 : Ok(())
590 0 : }
591 :
592 0 : async fn ingest_xlog_dbase_drop(
593 0 : &mut self,
594 0 : dbase_drop: DbaseDrop,
595 0 : modification: &mut DatadirModification<'_>,
596 0 : ctx: &RequestContext,
597 0 : ) -> Result<(), WalIngestError> {
598 0 : let DbaseDrop {
599 0 : db_id,
600 0 : tablespace_ids,
601 0 : } = dbase_drop;
602 0 : for tablespace_id in tablespace_ids {
603 0 : trace!("Drop db {}, {}", tablespace_id, db_id);
604 0 : modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
605 : }
606 :
607 0 : Ok(())
608 0 : }
609 :
610 32 : async fn ingest_xlog_smgr_create(
611 32 : &mut self,
612 32 : create: SmgrCreate,
613 32 : modification: &mut DatadirModification<'_>,
614 32 : ctx: &RequestContext,
615 32 : ) -> Result<(), WalIngestError> {
616 32 : let SmgrCreate { rel } = create;
617 32 : self.put_rel_creation(modification, rel, ctx).await?;
618 32 : Ok(())
619 32 : }
620 :
621 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
622 : ///
623 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
624 0 : async fn ingest_xlog_smgr_truncate(
625 0 : &mut self,
626 0 : truncate: XlSmgrTruncate,
627 0 : modification: &mut DatadirModification<'_>,
628 0 : ctx: &RequestContext,
629 0 : ) -> Result<(), WalIngestError> {
630 0 : let XlSmgrTruncate {
631 0 : blkno,
632 0 : rnode,
633 0 : flags,
634 0 : } = truncate;
635 0 :
636 0 : let spcnode = rnode.spcnode;
637 0 : let dbnode = rnode.dbnode;
638 0 : let relnode = rnode.relnode;
639 0 :
640 0 : if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
641 0 : let rel = RelTag {
642 0 : spcnode,
643 0 : dbnode,
644 0 : relnode,
645 0 : forknum: MAIN_FORKNUM,
646 0 : };
647 0 :
648 0 : self.put_rel_truncation(modification, rel, blkno, ctx)
649 0 : .await?;
650 0 : }
651 0 : if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
652 0 : let rel = RelTag {
653 0 : spcnode,
654 0 : dbnode,
655 0 : relnode,
656 0 : forknum: FSM_FORKNUM,
657 0 : };
658 0 :
659 0 : // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
660 0 : // and instead of digging in the FSM bitmap format we just clear the whole page.
661 0 : let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
662 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
663 0 : if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
664 0 : && self
665 0 : .shard
666 0 : .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
667 : {
668 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
669 0 : fsm_physical_page_no += 1;
670 0 : }
671 : // Truncate this shard's view of the FSM relation size, if it even has one.
672 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
673 0 : if nblocks > fsm_physical_page_no {
674 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
675 0 : .await?;
676 0 : }
677 0 : }
678 0 : if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
679 0 : let rel = RelTag {
680 0 : spcnode,
681 0 : dbnode,
682 0 : relnode,
683 0 : forknum: VISIBILITYMAP_FORKNUM,
684 0 : };
685 0 :
686 0 : // last remaining block, byte, and bit
687 0 : let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
688 0 : let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
689 0 : / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
690 0 : let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
691 0 : * pg_constants::VM_BITS_PER_HEAPBLOCK;
692 0 :
693 0 : // Unless the new size is exactly at a visibility map page boundary, the
694 0 : // tail bits in the last remaining map page, representing truncated heap
695 0 : // blocks, need to be cleared. This is not only tidy, but also necessary
696 0 : // because we don't get a chance to clear the bits if the heap is extended
697 0 : // again. Only do this on the shard that owns the page.
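// Worked example (assuming 8 KB pages with two VM bits per heap block, i.e.
// VM_HEAPBLOCKS_PER_PAGE = 32672 and VM_HEAPBLOCKS_PER_BYTE = 4): truncating
// the heap to blkno = 1001 gives vm_page_no = 0, trunc_byte = 250 and
// trunc_offs = 2, so the tail of VM page 0 from that bit onwards is cleared.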
698 0 : if (trunc_byte != 0 || trunc_offs != 0)
699 0 : && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
700 : {
701 0 : modification.put_rel_wal_record(
702 0 : rel,
703 0 : vm_page_no,
704 0 : NeonWalRecord::TruncateVisibilityMap {
705 0 : trunc_byte,
706 0 : trunc_offs,
707 0 : },
708 0 : )?;
709 0 : vm_page_no += 1;
710 0 : }
711 : // Truncate this shard's view of the VM relation size, if it even has one.
712 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
713 0 : if nblocks > vm_page_no {
714 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
715 0 : .await?;
716 0 : }
717 0 : }
718 0 : Ok(())
719 0 : }
720 :
721 16 : fn warn_on_ingest_lag(
722 16 : &mut self,
723 16 : conf: &crate::config::PageServerConf,
724 16 : wal_timestamp: TimestampTz,
725 16 : ) {
726 16 : debug_assert_current_span_has_tenant_and_timeline_id();
727 16 : let now = SystemTime::now();
728 16 : let rate_limits = &mut self.warn_ingest_lag;
729 :
730 16 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
731 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
732 : });
733 :
734 16 : match ts {
735 16 : Ok(ts) => {
736 16 : match now.duration_since(ts) {
737 16 : Ok(lag) => {
738 16 : if lag > conf.wait_lsn_timeout {
739 16 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
740 4 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
741 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
742 0 : return;
743 0 : }
744 4 : } else {
745 4 : // Still loading? We shouldn't be here
746 4 : }
747 4 : let lag = humantime::format_duration(lag);
748 4 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
749 16 : })
750 0 : }
751 : }
752 0 : Err(e) => {
753 0 : let delta_t = e.duration();
754 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
755 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
756 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
757 0 : if delta_t > IGNORED_DRIFT {
758 0 : let delta_t = humantime::format_duration(delta_t);
759 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
760 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
761 0 : })
762 0 : }
763 : }
764 : };
765 : }
766 0 : Err(error) => {
767 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
768 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
769 0 : })
770 : }
771 : }
772 16 : }
773 :
774 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
775 : ///
776 16 : async fn ingest_xact_record(
777 16 : &mut self,
778 16 : record: XactRecord,
779 16 : modification: &mut DatadirModification<'_>,
780 16 : ctx: &RequestContext,
781 16 : ) -> Result<(), WalIngestError> {
782 16 : let (xact_common, is_commit, is_prepared) = match record {
783 0 : XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
784 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
785 0 : self.adjust_to_full_transaction_id(xl_xid)?
786 : } else {
787 0 : xl_xid as u64
788 : };
789 0 : return modification.put_twophase_file(xid, data, ctx).await;
790 : }
791 16 : XactRecord::Commit(common) => (common, true, false),
792 0 : XactRecord::Abort(common) => (common, false, false),
793 0 : XactRecord::CommitPrepared(common) => (common, true, true),
794 0 : XactRecord::AbortPrepared(common) => (common, false, true),
795 : };
796 :
797 : let XactCommon {
798 16 : parsed,
799 16 : origin_id,
800 16 : xl_xid,
801 16 : lsn,
802 16 : } = xact_common;
803 16 :
804 16 : // Record update of CLOG pages
805 16 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
806 16 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
807 16 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
808 16 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
809 16 :
810 16 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
811 :
812 16 : for subxact in &parsed.subxacts {
813 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
814 0 : if subxact_pageno != pageno {
815 : // This subxact goes to a different page. Write the record
816 : // for all the XIDs on the previous page, and continue
817 : // accumulating XIDs on this new page.
818 0 : modification.put_slru_wal_record(
819 0 : SlruKind::Clog,
820 0 : segno,
821 0 : rpageno,
822 0 : if is_commit {
823 0 : NeonWalRecord::ClogSetCommitted {
824 0 : xids: page_xids,
825 0 : timestamp: parsed.xact_time,
826 0 : }
827 : } else {
828 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
829 : },
830 0 : )?;
831 0 : page_xids = Vec::new();
832 0 : }
833 0 : pageno = subxact_pageno;
834 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
835 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
836 0 : page_xids.push(*subxact);
837 : }
838 16 : modification.put_slru_wal_record(
839 16 : SlruKind::Clog,
840 16 : segno,
841 16 : rpageno,
842 16 : if is_commit {
843 16 : NeonWalRecord::ClogSetCommitted {
844 16 : xids: page_xids,
845 16 : timestamp: parsed.xact_time,
846 16 : }
847 : } else {
848 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
849 : },
850 0 : )?;
851 :
852 : // Group relations to drop by dbNode. This map will contain all relations that _might_
853 : // exist; we will later reduce it to the ones that really exist. This map can be huge if
854 : // the transaction touches a huge number of relations (there is no bound on this in
855 : // postgres).
856 16 : let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
857 :
858 16 : for xnode in &parsed.xnodes {
859 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
860 0 : let rel = RelTag {
861 0 : forknum,
862 0 : spcnode: xnode.spcnode,
863 0 : dbnode: xnode.dbnode,
864 0 : relnode: xnode.relnode,
865 0 : };
866 0 : drop_relations
867 0 : .entry((xnode.spcnode, xnode.dbnode))
868 0 : .or_default()
869 0 : .push(rel);
870 0 : }
871 : }
872 :
873 : // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
874 16 : modification.put_rel_drops(drop_relations, ctx).await?;
875 :
876 16 : if origin_id != 0 {
877 0 : modification
878 0 : .set_replorigin(origin_id, parsed.origin_lsn)
879 0 : .await?;
880 16 : }
881 :
882 16 : if is_prepared {
883 : // Remove the twophase file. See RemoveTwoPhaseFile() in postgres code.
884 0 : trace!(
885 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
886 : xl_xid, parsed.xid, lsn,
887 : );
888 :
889 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
890 0 : self.adjust_to_full_transaction_id(parsed.xid)?
891 : } else {
892 0 : parsed.xid as u64
893 : };
894 0 : modification.drop_twophase_file(xid, ctx).await?;
895 16 : }
896 :
897 16 : Ok(())
898 16 : }
899 :
900 0 : async fn ingest_clog_truncate(
901 0 : &mut self,
902 0 : truncate: ClogTruncate,
903 0 : modification: &mut DatadirModification<'_>,
904 0 : ctx: &RequestContext,
905 0 : ) -> Result<(), WalIngestError> {
906 0 : let ClogTruncate {
907 0 : pageno,
908 0 : oldest_xid,
909 0 : oldest_xid_db,
910 0 : } = truncate;
911 0 :
912 0 : info!(
913 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
914 : pageno, oldest_xid, oldest_xid_db
915 : );
916 :
917 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
918 : // truncated, but a checkpoint record with the updated values isn't written until
919 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
920 : // so we keep the oldestXid and oldestXidDB up-to-date.
921 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
922 0 : cp.oldestXid = oldest_xid;
923 0 : cp.oldestXidDB = oldest_xid_db;
924 0 : });
925 0 : self.checkpoint_modified = true;
926 :
927 : // TODO: Handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
928 :
929 0 : let latest_page_number =
930 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
931 : / pg_constants::CLOG_XACTS_PER_PAGE;
932 :
933 : // Now delete all segments containing pages between xlrec.pageno
934 : // and latest_page_number.
935 :
936 : // First, make an important safety check:
937 : // the current endpoint page must not be eligible for removal.
938 : // See SimpleLruTruncate() in slru.c
939 0 : if dispatch_pgversion!(modification.tline.pg_version, {
940 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
941 : }) {
942 0 : info!("could not truncate directory pg_xact apparent wraparound");
943 0 : return Ok(());
944 0 : }
945 0 :
946 0 : // Iterate over the SLRU CLOG segments and drop the segments that we're ready to truncate
947 0 : //
948 0 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
949 0 : // will block waiting for the last valid LSN to advance up to
950 0 : // it. So we use the previous record's LSN in the get calls
951 0 : // instead.
952 0 : if modification.tline.get_shard_identity().is_shard_zero() {
953 0 : for segno in modification
954 0 : .tline
955 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
956 0 : .await?
957 : {
958 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
959 :
960 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
961 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
962 : });
963 :
964 0 : if may_delete {
965 0 : modification
966 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
967 0 : .await?;
968 0 : trace!("Drop CLOG segment {:>04X}", segno);
969 0 : }
970 : }
971 0 : }
972 :
973 0 : Ok(())
974 0 : }
975 :
976 0 : async fn ingest_clog_zero_page(
977 0 : &mut self,
978 0 : zero_page: ClogZeroPage,
979 0 : modification: &mut DatadirModification<'_>,
980 0 : ctx: &RequestContext,
981 0 : ) -> Result<(), WalIngestError> {
982 0 : let ClogZeroPage { segno, rpageno } = zero_page;
983 0 :
984 0 : self.put_slru_page_image(
985 0 : modification,
986 0 : SlruKind::Clog,
987 0 : segno,
988 0 : rpageno,
989 0 : ZERO_PAGE.clone(),
990 0 : ctx,
991 0 : )
992 0 : .await
993 0 : }
994 :
995 0 : fn ingest_multixact_create(
996 0 : &mut self,
997 0 : modification: &mut DatadirModification,
998 0 : xlrec: &XlMultiXactCreate,
999 0 : ) -> Result<(), WalIngestError> {
1000 0 : // Create WAL record for updating the multixact-offsets page
1001 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1002 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1003 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1004 0 :
1005 0 : modification.put_slru_wal_record(
1006 0 : SlruKind::MultiXactOffsets,
1007 0 : segno,
1008 0 : rpageno,
1009 0 : NeonWalRecord::MultixactOffsetCreate {
1010 0 : mid: xlrec.mid,
1011 0 : moff: xlrec.moff,
1012 0 : },
1013 0 : )?;
1014 :
1015 : // Create WAL records for the update of each affected multixact-members page
1016 0 : let mut members = xlrec.members.iter();
1017 0 : let mut offset = xlrec.moff;
1018 : loop {
1019 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1020 0 :
1021 0 : // How many members fit on this page?
1022 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1023 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1024 0 :
1025 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1026 0 : for _ in 0..page_remain {
1027 0 : if let Some(m) = members.next() {
1028 0 : this_page_members.push(m.clone());
1029 0 : } else {
1030 0 : break;
1031 : }
1032 : }
1033 0 : if this_page_members.is_empty() {
1034 : // all done
1035 0 : break;
1036 0 : }
1037 0 : let n_this_page = this_page_members.len();
1038 0 :
1039 0 : modification.put_slru_wal_record(
1040 0 : SlruKind::MultiXactMembers,
1041 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1042 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1043 0 : NeonWalRecord::MultixactMembersCreate {
1044 0 : moff: offset,
1045 0 : members: this_page_members,
1046 0 : },
1047 0 : )?;
1048 :
1049 : // Note: The multixact members can wrap around, even within one WAL record.
1050 0 : offset = offset.wrapping_add(n_this_page as u32);
1051 : }
1052 0 : let next_offset = offset;
1053 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1054 :
1055 : // Update next-multi-xid and next-offset
1056 : //
1057 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1058 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1059 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1060 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
1061 : // is stored, so it's never 0 in a checkpoint.
1062 : //
1063 : // I don't know why it's done that way; it seems less error-prone to skip over 0
1064 : // when the value is stored rather than when it's read. But let's do it the same
1065 : // way here.
1066 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
1067 0 :
1068 0 : if self
1069 0 : .checkpoint
1070 0 : .update_next_multixid(next_multi_xid, next_offset)
1071 0 : {
1072 0 : self.checkpoint_modified = true;
1073 0 : }
1074 :
1075 : // Also update the next-xid with the highest member. According to the comments in
1076 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
1077 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1078 0 : if let Some(max_xid) = acc {
1079 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1080 0 : Some(mbr.xid)
1081 : } else {
1082 0 : acc
1083 : }
1084 : } else {
1085 0 : Some(mbr.xid)
1086 : }
1087 0 : });
1088 :
1089 0 : if let Some(max_xid) = max_mbr_xid {
1090 0 : if self.checkpoint.update_next_xid(max_xid) {
1091 0 : self.checkpoint_modified = true;
1092 0 : }
1093 0 : }
1094 0 : Ok(())
1095 0 : }
1096 :
1097 0 : async fn ingest_multixact_truncate(
1098 0 : &mut self,
1099 0 : modification: &mut DatadirModification<'_>,
1100 0 : xlrec: &XlMultiXactTruncate,
1101 0 : ctx: &RequestContext,
1102 0 : ) -> Result<(), WalIngestError> {
1103 0 : let (maxsegment, startsegment, endsegment) =
1104 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1105 0 : cp.oldestMulti = xlrec.end_trunc_off;
1106 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1107 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1108 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1109 0 : );
1110 0 : let startsegment: i32 =
1111 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1112 0 : let endsegment: i32 =
1113 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1114 0 : (maxsegment, startsegment, endsegment)
1115 : });
1116 :
1117 0 : self.checkpoint_modified = true;
1118 0 :
1119 0 : // PerformMembersTruncation
1120 0 : let mut segment: i32 = startsegment;
1121 0 :
1122 0 : // Delete all the segments except the last one. The last segment can still
1123 0 : // contain, possibly partially, valid data.
1124 0 : if modification.tline.get_shard_identity().is_shard_zero() {
1125 0 : while segment != endsegment {
1126 0 : modification
1127 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1128 0 : .await?;
1129 :
1130 : /* move to next segment, handling wraparound correctly */
1131 0 : if segment == maxsegment {
1132 0 : segment = 0;
1133 0 : } else {
1134 0 : segment += 1;
1135 0 : }
1136 : }
1137 0 : }
1138 :
1139 : // Truncate offsets
1140 : // FIXME: this did not handle wraparound correctly
1141 :
1142 0 : Ok(())
1143 0 : }
1144 :
1145 0 : async fn ingest_multixact_zero_page(
1146 0 : &mut self,
1147 0 : zero_page: MultiXactZeroPage,
1148 0 : modification: &mut DatadirModification<'_>,
1149 0 : ctx: &RequestContext,
1150 0 : ) -> Result<(), WalIngestError> {
1151 0 : let MultiXactZeroPage {
1152 0 : slru_kind,
1153 0 : segno,
1154 0 : rpageno,
1155 0 : } = zero_page;
1156 0 : self.put_slru_page_image(
1157 0 : modification,
1158 0 : slru_kind,
1159 0 : segno,
1160 0 : rpageno,
1161 0 : ZERO_PAGE.clone(),
1162 0 : ctx,
1163 0 : )
1164 0 : .await
1165 0 : }
1166 :
1167 0 : async fn ingest_relmap_update(
1168 0 : &mut self,
1169 0 : update: RelmapUpdate,
1170 0 : modification: &mut DatadirModification<'_>,
1171 0 : ctx: &RequestContext,
1172 0 : ) -> Result<(), WalIngestError> {
1173 0 : let RelmapUpdate { update, buf } = update;
1174 0 :
1175 0 : modification
1176 0 : .put_relmap_file(update.tsid, update.dbid, buf, ctx)
1177 0 : .await
1178 0 : }
1179 :
1180 60 : async fn ingest_raw_xlog_record(
1181 60 : &mut self,
1182 60 : raw_record: RawXlogRecord,
1183 60 : modification: &mut DatadirModification<'_>,
1184 60 : ctx: &RequestContext,
1185 60 : ) -> Result<(), WalIngestError> {
1186 60 : let RawXlogRecord { info, lsn, mut buf } = raw_record;
1187 60 : let pg_version = modification.tline.pg_version;
1188 60 :
1189 60 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
1190 4 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1191 0 : let rec = v17::XlParameterChange::decode(&mut buf);
1192 0 : cp.wal_level = rec.wal_level;
1193 0 : self.checkpoint_modified = true;
1194 4 : }
1195 56 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
1196 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1197 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
1198 0 : cp.wal_level = rec.wal_level;
1199 0 : self.checkpoint_modified = true;
1200 0 : }
1201 56 : }
1202 :
1203 60 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1204 0 : if info == pg_constants::XLOG_NEXTOID {
1205 0 : let next_oid = buf.get_u32_le();
1206 0 : if cp.nextOid != next_oid {
1207 0 : cp.nextOid = next_oid;
1208 0 : self.checkpoint_modified = true;
1209 0 : }
1210 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
1211 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1212 : {
1213 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
1214 0 : buf.copy_to_slice(&mut checkpoint_bytes);
1215 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
1216 0 : trace!(
1217 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
1218 : xlog_checkpoint.oldestXid, cp.oldestXid
1219 : );
1220 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
1221 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
1222 0 : }
1223 0 : trace!(
1224 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
1225 : xlog_checkpoint.oldestActiveXid, cp.oldestActiveXid
1226 : );
1227 :
1228 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
1229 : // because at shutdown, all in-progress transactions will implicitly
1230 : // end. Postgres startup code knows that, and allows hot standby to start
1231 : // immediately from a shutdown checkpoint.
1232 : //
1233 : // In Neon, Postgres hot standby startup always behaves as if starting from
1234 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
1235 : // instead of overwriting self.checkpoint.oldestActiveXid with
1236 : // InvalidTransactionid from the checkpoint WAL record, update it to a
1237 : // proper value, knowing that there are no in-progress transactions at this
1238 : // point, except for prepared transactions.
1239 : //
1240 : // See also the neon code changes in the InitWalRecovery() function.
1241 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
1242 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1243 : {
1244 0 : let oldest_active_xid = if pg_version >= 17 {
1245 0 : let mut oldest_active_full_xid = cp.nextXid.value;
1246 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1247 0 : if xid < oldest_active_full_xid {
1248 0 : oldest_active_full_xid = xid;
1249 0 : }
1250 : }
1251 0 : oldest_active_full_xid as u32
1252 : } else {
1253 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
1254 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1255 0 : let narrow_xid = xid as u32;
1256 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
1257 0 : oldest_active_xid = narrow_xid;
1258 0 : }
1259 : }
1260 0 : oldest_active_xid
1261 : };
1262 0 : cp.oldestActiveXid = oldest_active_xid;
1263 0 : } else {
1264 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
1265 0 : }
1266 : // NB: We abuse the Checkpoint.redo field:
1267 : //
1268 : // - In PostgreSQL, the Checkpoint struct doesn't store the information
1269 : // of whether this is an online checkpoint or a shutdown checkpoint. It's
1270 : // stored in the XLOG info field of the WAL record, shutdown checkpoints
1271 : // use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
1272 : // XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
1273 : // in the pageserver, however.
1274 : //
1275 : // - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
1276 : // checkpoint record, if it's a shutdown checkpoint. But when we are
1277 : // starting from a shutdown checkpoint, the basebackup LSN is the *end*
1278 : // of the shutdown checkpoint WAL record. That makes it difficult to
1279 : // correctly detect whether we're starting from a shutdown record or
1280 : // not.
1281 : //
1282 : // To address both of those issues, we store 0 in the redo field if it's
1283 : // an online checkpoint record, and the record's *end* LSN if it's a
1284 : // shutdown checkpoint. We don't need the original redo pointer in neon,
1285 : // because we don't perform WAL replay at startup anyway, so we can get
1286 : // away with abusing the redo field like this.
1287 : //
1288 : // XXX: Ideally, we would persist the extra information in a more
1289 : // explicit format, rather than repurpose the fields of the Postgres
1290 : // struct like this. However, we already have persisted data like this,
1291 : // so we need to maintain backwards compatibility.
1292 : //
1293 : // NB: We didn't originally have this convention, so there are still old
1294 : // persisted records that didn't do this. Before, we didn't update the
1295 : // persisted redo field at all. That means that old records have a bogus
1296 : // redo pointer that points to some old value, from the checkpoint record
1297 : // that was originally imported from the data directory. If it was a
1298 : // project created in Neon, that means it points to the first checkpoint
1299 : // after initdb. That's OK for our purposes: all such old checkpoints are
1300 : // treated as old online checkpoints when the basebackup is created.
1301 0 : cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
1302 : // Store the *end* LSN of the checkpoint record. Or to be precise,
1303 : // the start LSN of the *next* record, i.e. if the record ends
1304 : // exactly at page boundary, the redo LSN points to just after the
1305 : // page header on the next page.
1306 0 : lsn.into()
1307 : } else {
1308 0 : Lsn::INVALID.into()
1309 : };
1310 :
1311 : // Write a new checkpoint key-value pair on every checkpoint record, even
1312 : // if nothing really changed. Not strictly required, but it seems nice to
1313 : // have some trace of the checkpoint records in the layer files at the same
1314 : // LSNs.
1315 0 : self.checkpoint_modified = true;
1316 0 : }
1317 : });
1318 :
1319 60 : Ok(())
1320 60 : }
1321 :
1322 0 : async fn ingest_logical_message_put(
1323 0 : &mut self,
1324 0 : put: PutLogicalMessage,
1325 0 : modification: &mut DatadirModification<'_>,
1326 0 : ctx: &RequestContext,
1327 0 : ) -> Result<(), WalIngestError> {
1328 0 : let PutLogicalMessage { path, buf } = put;
1329 0 : modification.put_file(path.as_str(), &buf, ctx).await
1330 0 : }
1331 :
1332 0 : fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
1333 0 : match record {
1334 0 : StandbyRecord::RunningXacts(running_xacts) => {
1335 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1336 0 : cp.oldestActiveXid = running_xacts.oldest_running_xid;
1337 0 : });
1338 :
1339 0 : self.checkpoint_modified = true;
1340 0 : }
1341 0 : }
1342 0 :
1343 0 : Ok(())
1344 0 : }
1345 :
1346 0 : async fn ingest_replorigin_record(
1347 0 : &mut self,
1348 0 : record: ReploriginRecord,
1349 0 : modification: &mut DatadirModification<'_>,
1350 0 : ) -> Result<(), WalIngestError> {
1351 0 : match record {
1352 0 : ReploriginRecord::Set(set) => {
1353 0 : modification
1354 0 : .set_replorigin(set.node_id, set.remote_lsn)
1355 0 : .await?;
1356 : }
1357 0 : ReploriginRecord::Drop(drop) => {
1358 0 : modification.drop_replorigin(drop.node_id).await?;
1359 : }
1360 : }
1361 :
1362 0 : Ok(())
1363 0 : }
1364 :
1365 36 : async fn put_rel_creation(
1366 36 : &mut self,
1367 36 : modification: &mut DatadirModification<'_>,
1368 36 : rel: RelTag,
1369 36 : ctx: &RequestContext,
1370 36 : ) -> Result<(), WalIngestError> {
1371 36 : modification.put_rel_creation(rel, 0, ctx).await?;
1372 36 : Ok(())
1373 36 : }
1374 :
1375 : #[cfg(test)]
1376 544804 : async fn put_rel_page_image(
1377 544804 : &mut self,
1378 544804 : modification: &mut DatadirModification<'_>,
1379 544804 : rel: RelTag,
1380 544804 : blknum: BlockNumber,
1381 544804 : img: Bytes,
1382 544804 : ctx: &RequestContext,
1383 544804 : ) -> Result<(), WalIngestError> {
1384 544804 : self.handle_rel_extend(modification, rel, blknum, ctx)
1385 544804 : .await?;
1386 544804 : modification.put_rel_page_image(rel, blknum, img)?;
1387 544804 : Ok(())
1388 544804 : }
1389 :
1390 24 : async fn put_rel_wal_record(
1391 24 : &mut self,
1392 24 : modification: &mut DatadirModification<'_>,
1393 24 : rel: RelTag,
1394 24 : blknum: BlockNumber,
1395 24 : rec: NeonWalRecord,
1396 24 : ctx: &RequestContext,
1397 24 : ) -> Result<(), WalIngestError> {
1398 24 : self.handle_rel_extend(modification, rel, blknum, ctx)
1399 24 : .await?;
1400 24 : modification.put_rel_wal_record(rel, blknum, rec)?;
1401 24 : Ok(())
1402 24 : }
1403 :
1404 12024 : async fn put_rel_truncation(
1405 12024 : &mut self,
1406 12024 : modification: &mut DatadirModification<'_>,
1407 12024 : rel: RelTag,
1408 12024 : nblocks: BlockNumber,
1409 12024 : ctx: &RequestContext,
1410 12024 : ) -> Result<(), WalIngestError> {
1411 12024 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1412 12024 : Ok(())
1413 12024 : }
1414 :
1415 544828 : async fn handle_rel_extend(
1416 544828 : &mut self,
1417 544828 : modification: &mut DatadirModification<'_>,
1418 544828 : rel: RelTag,
1419 544828 : blknum: BlockNumber,
1420 544828 : ctx: &RequestContext,
1421 544828 : ) -> Result<(), WalIngestError> {
1422 544828 : let new_nblocks = blknum + 1;
1423 : // Check if the relation exists. We implicitly create relations on first
1424 : // record.
1425 544828 : let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
1426 :
1427 544828 : if new_nblocks > old_nblocks {
1428 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1429 544796 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1430 :
1431 544796 : let mut key = rel_block_to_key(rel, blknum);
1432 544796 :
1433 544796 : // fill the gap with zeros
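// For example, if the relation previously had 3 blocks and this record
// writes block 7, blocks 3..=6 (those of them owned by this shard) are
// zero-filled and the relation is extended to 8 blocks.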
1434 544796 : let mut gap_blocks_filled: u64 = 0;
1435 544796 : for gap_blknum in old_nblocks..blknum {
1436 5996 : key.field6 = gap_blknum;
1437 5996 :
1438 5996 : if self.shard.get_shard_number(&key) != self.shard.number {
1439 0 : continue;
1440 5996 : }
1441 5996 :
1442 5996 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1443 5996 : gap_blocks_filled += 1;
1444 : }
1445 :
1446 544796 : WAL_INGEST
1447 544796 : .gap_blocks_zeroed_on_rel_extend
1448 544796 : .inc_by(gap_blocks_filled);
1449 544796 :
1450 544796 : // Log something when relation extension causes us to fill gaps
1451 544796 : // with zero pages. Logging is rate limited per pg version to
1452 544796 : // avoid skewing.
1453 544796 : if gap_blocks_filled > 0 {
1454 : use std::sync::Mutex;
1455 :
1456 : use once_cell::sync::Lazy;
1457 : use utils::rate_limit::RateLimit;
1458 :
1459 : struct RateLimitPerPgVersion {
1460 : rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
1461 : }
1462 :
1463 : impl RateLimitPerPgVersion {
1464 0 : const fn new() -> Self {
1465 0 : Self {
1466 0 : rate_limiters: [const {
1467 4 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
1468 0 : }; 4],
1469 0 : }
1470 0 : }
1471 :
1472 8 : const fn rate_limiter(
1473 8 : &self,
1474 8 : pg_version: u32,
1475 8 : ) -> Option<&Lazy<Mutex<RateLimit>>> {
1476 : const MIN_PG_VERSION: u32 = 14;
1477 : const MAX_PG_VERSION: u32 = 17;
1478 :
1479 8 : if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
1480 0 : return None;
1481 8 : }
1482 8 :
1483 8 : Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
1484 8 : }
1485 : }
1486 :
1487 : static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
1488 8 : if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
1489 8 : if let Ok(mut locked) = rate_limiter.try_lock() {
1490 8 : locked.call(|| {
1491 4 : info!(
1492 0 : lsn=%modification.get_lsn(),
1493 0 : pg_version=%modification.tline.pg_version,
1494 0 : rel=%rel,
1495 0 : "Filled {} gap blocks on rel extend to {} from {}",
1496 : gap_blocks_filled,
1497 : new_nblocks,
1498 : old_nblocks);
1499 8 : });
1500 8 : }
1501 0 : }
1502 544788 : }
1503 32 : }
1504 544828 : Ok(())
1505 544828 : }
1506 :
1507 0 : async fn put_slru_page_image(
1508 0 : &mut self,
1509 0 : modification: &mut DatadirModification<'_>,
1510 0 : kind: SlruKind,
1511 0 : segno: u32,
1512 0 : blknum: BlockNumber,
1513 0 : img: Bytes,
1514 0 : ctx: &RequestContext,
1515 0 : ) -> Result<(), WalIngestError> {
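// SLRU content is stored only on shard zero; all other shards ignore
// SLRU page images.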
1516 0 : if !self.shard.is_shard_zero() {
1517 0 : return Ok(());
1518 0 : }
1519 0 :
1520 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1521 0 : .await?;
1522 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1523 0 : Ok(())
1524 0 : }
1525 :
1526 0 : async fn handle_slru_extend(
1527 0 : &mut self,
1528 0 : modification: &mut DatadirModification<'_>,
1529 0 : kind: SlruKind,
1530 0 : segno: u32,
1531 0 : blknum: BlockNumber,
1532 0 : ctx: &RequestContext,
1533 0 : ) -> Result<(), WalIngestError> {
1534 0 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1535 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1536 0 : // a lot less frequently.
1537 0 :
1538 0 : let new_nblocks = blknum + 1;
1539 : // Check if the segment exists. We implicitly create SLRU segments on the
1540 : // first record.
1541 : // TODO: it would be nice to be more explicit about it
1542 0 : let old_nblocks = if !modification
1543 0 : .tline
1544 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1545 0 : .await?
1546 : {
1547 : // create it with 0 size initially, the logic below will extend it
1548 0 : modification
1549 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1550 0 : .await?;
1551 0 : 0
1552 : } else {
1553 0 : modification
1554 0 : .tline
1555 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1556 0 : .await?
1557 : };
1558 :
1559 0 : if new_nblocks > old_nblocks {
1560 0 : trace!(
1561 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1562 : kind, segno, old_nblocks, new_nblocks
1563 : );
1564 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1565 :
1566 : // fill the gap with zeros
1567 0 : for gap_blknum in old_nblocks..blknum {
1568 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1569 : }
1570 0 : }
1571 0 : Ok(())
1572 0 : }
1573 : }
1574 :
1575 : /// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
1576 : ///
1577 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
1578 : /// page number stored in the shard, or None if the shard does not have any pages for it.
1579 24 : async fn get_relsize(
1580 24 : modification: &DatadirModification<'_>,
1581 24 : rel: RelTag,
1582 24 : ctx: &RequestContext,
1583 24 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
1584 24 : if !modification
1585 24 : .tline
1586 24 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1587 24 : .await?
1588 : {
1589 0 : return Ok(None);
1590 24 : }
1591 24 : modification
1592 24 : .tline
1593 24 : .get_rel_size(rel, Version::Modified(modification), ctx)
1594 24 : .await
1595 24 : .map(Some)
1596 24 : }
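// Hypothetical usage sketch (not part of this file): because get_relsize
// folds existence and size into a single Option, callers can match on it
// instead of calling get_rel_exists and get_rel_size separately:
//
//     match get_relsize(&m, TESTREL_A, &ctx).await? {
//         Some(nblocks) => println!("rel has {nblocks} blocks"),
//         None => println!("rel does not exist at this LSN"),
//     }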
1597 :
1598 : #[allow(clippy::bool_assert_comparison)]
1599 : #[cfg(test)]
1600 : mod tests {
1601 : use anyhow::Result;
1602 : use postgres_ffi::RELSEG_SIZE;
1603 :
1604 : use super::*;
1605 : use crate::DEFAULT_PG_VERSION;
1606 : use crate::tenant::harness::*;
1607 : use crate::tenant::remote_timeline_client::{INITDB_PATH, remote_initdb_archive_path};
1608 : use crate::tenant::storage_layer::IoConcurrency;
1609 :
1610 : /// Arbitrary relation tag, for testing.
1611 : const TESTREL_A: RelTag = RelTag {
1612 : spcnode: 0,
1613 : dbnode: 111,
1614 : relnode: 1000,
1615 : forknum: 0,
1616 : };
1617 :
1618 24 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1619 24 : // TODO
1620 24 : }
1621 :
1622 : #[tokio::test]
1623 4 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
1624 16 : for i in 14..=16 {
1625 12 : dispatch_pgversion!(i, {
1626 4 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
1627 4 : });
1628 4 : }
1629 4 :
1630 4 : Ok(())
1631 4 : }
1632 :
1633 16 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1634 16 : let mut m = tline.begin_modification(Lsn(0x10));
1635 16 : m.put_checkpoint(dispatch_pgversion!(
1636 16 : tline.pg_version,
1637 0 : pgv::ZERO_CHECKPOINT.clone()
1638 0 : ))?;
1639 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1640 16 : m.commit(ctx).await?;
1641 16 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1642 :
1643 16 : Ok(walingest)
1644 16 : }
1645 :
1646 : #[tokio::test]
1647 4 : async fn test_relsize() -> Result<()> {
1648 4 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
1649 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1650 4 : let tline = tenant
1651 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1652 4 : .await?;
1653 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1654 4 :
1655 4 : let mut m = tline.begin_modification(Lsn(0x20));
1656 4 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1657 4 : walingest
1658 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1659 4 : .await?;
1660 4 : m.commit(&ctx).await?;
1661 4 : let mut m = tline.begin_modification(Lsn(0x30));
1662 4 : walingest
1663 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1664 4 : .await?;
1665 4 : m.commit(&ctx).await?;
1666 4 : let mut m = tline.begin_modification(Lsn(0x40));
1667 4 : walingest
1668 4 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1669 4 : .await?;
1670 4 : m.commit(&ctx).await?;
1671 4 : let mut m = tline.begin_modification(Lsn(0x50));
1672 4 : walingest
1673 4 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1674 4 : .await?;
1675 4 : m.commit(&ctx).await?;
1676 4 :
1677 4 : assert_current_logical_size(&tline, Lsn(0x50));
1678 4 :
1679 4 : let test_span = tracing::info_span!(parent: None, "test",
1680 4 : tenant_id=%tline.tenant_shard_id.tenant_id,
1681 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1682 0 : timeline_id=%tline.timeline_id);
1683 4 :
1684 4 : // The relation was created at LSN 0x20; it's not visible at LSN 0x10 yet.
1685 4 : assert_eq!(
1686 4 : tline
1687 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1688 4 : .await?,
1689 4 : false
1690 4 : );
1691 4 : assert!(
1692 4 : tline
1693 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1694 4 : .await
1695 4 : .is_err()
1696 4 : );
1697 4 : assert_eq!(
1698 4 : tline
1699 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1700 4 : .await?,
1701 4 : true
1702 4 : );
1703 4 : assert_eq!(
1704 4 : tline
1705 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1706 4 : .await?,
1707 4 : 1
1708 4 : );
1709 4 : assert_eq!(
1710 4 : tline
1711 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1712 4 : .await?,
1713 4 : 3
1714 4 : );
1715 4 :
1716 4 : // Check page contents at each LSN
1717 4 : assert_eq!(
1718 4 : tline
1719 4 : .get_rel_page_at_lsn(
1720 4 : TESTREL_A,
1721 4 : 0,
1722 4 : Version::Lsn(Lsn(0x20)),
1723 4 : &ctx,
1724 4 : io_concurrency.clone()
1725 4 : )
1726 4 : .instrument(test_span.clone())
1727 4 : .await?,
1728 4 : test_img("foo blk 0 at 2")
1729 4 : );
1730 4 :
1731 4 : assert_eq!(
1732 4 : tline
1733 4 : .get_rel_page_at_lsn(
1734 4 : TESTREL_A,
1735 4 : 0,
1736 4 : Version::Lsn(Lsn(0x30)),
1737 4 : &ctx,
1738 4 : io_concurrency.clone()
1739 4 : )
1740 4 : .instrument(test_span.clone())
1741 4 : .await?,
1742 4 : test_img("foo blk 0 at 3")
1743 4 : );
1744 4 :
1745 4 : assert_eq!(
1746 4 : tline
1747 4 : .get_rel_page_at_lsn(
1748 4 : TESTREL_A,
1749 4 : 0,
1750 4 : Version::Lsn(Lsn(0x40)),
1751 4 : &ctx,
1752 4 : io_concurrency.clone()
1753 4 : )
1754 4 : .instrument(test_span.clone())
1755 4 : .await?,
1756 4 : test_img("foo blk 0 at 3")
1757 4 : );
1758 4 : assert_eq!(
1759 4 : tline
1760 4 : .get_rel_page_at_lsn(
1761 4 : TESTREL_A,
1762 4 : 1,
1763 4 : Version::Lsn(Lsn(0x40)),
1764 4 : &ctx,
1765 4 : io_concurrency.clone()
1766 4 : )
1767 4 : .instrument(test_span.clone())
1768 4 : .await?,
1769 4 : test_img("foo blk 1 at 4")
1770 4 : );
1771 4 :
1772 4 : assert_eq!(
1773 4 : tline
1774 4 : .get_rel_page_at_lsn(
1775 4 : TESTREL_A,
1776 4 : 0,
1777 4 : Version::Lsn(Lsn(0x50)),
1778 4 : &ctx,
1779 4 : io_concurrency.clone()
1780 4 : )
1781 4 : .instrument(test_span.clone())
1782 4 : .await?,
1783 4 : test_img("foo blk 0 at 3")
1784 4 : );
1785 4 : assert_eq!(
1786 4 : tline
1787 4 : .get_rel_page_at_lsn(
1788 4 : TESTREL_A,
1789 4 : 1,
1790 4 : Version::Lsn(Lsn(0x50)),
1791 4 : &ctx,
1792 4 : io_concurrency.clone()
1793 4 : )
1794 4 : .instrument(test_span.clone())
1795 4 : .await?,
1796 4 : test_img("foo blk 1 at 4")
1797 4 : );
1798 4 : assert_eq!(
1799 4 : tline
1800 4 : .get_rel_page_at_lsn(
1801 4 : TESTREL_A,
1802 4 : 2,
1803 4 : Version::Lsn(Lsn(0x50)),
1804 4 : &ctx,
1805 4 : io_concurrency.clone()
1806 4 : )
1807 4 : .instrument(test_span.clone())
1808 4 : .await?,
1809 4 : test_img("foo blk 2 at 5")
1810 4 : );
1811 4 :
1812 4 : // Truncate last block
1813 4 : let mut m = tline.begin_modification(Lsn(0x60));
1814 4 : walingest
1815 4 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1816 4 : .await?;
1817 4 : m.commit(&ctx).await?;
1818 4 : assert_current_logical_size(&tline, Lsn(0x60));
1819 4 :
1820 4 : // Check reported size and contents after truncation
1821 4 : assert_eq!(
1822 4 : tline
1823 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1824 4 : .await?,
1825 4 : 2
1826 4 : );
1827 4 : assert_eq!(
1828 4 : tline
1829 4 : .get_rel_page_at_lsn(
1830 4 : TESTREL_A,
1831 4 : 0,
1832 4 : Version::Lsn(Lsn(0x60)),
1833 4 : &ctx,
1834 4 : io_concurrency.clone()
1835 4 : )
1836 4 : .instrument(test_span.clone())
1837 4 : .await?,
1838 4 : test_img("foo blk 0 at 3")
1839 4 : );
1840 4 : assert_eq!(
1841 4 : tline
1842 4 : .get_rel_page_at_lsn(
1843 4 : TESTREL_A,
1844 4 : 1,
1845 4 : Version::Lsn(Lsn(0x60)),
1846 4 : &ctx,
1847 4 : io_concurrency.clone()
1848 4 : )
1849 4 : .instrument(test_span.clone())
1850 4 : .await?,
1851 4 : test_img("foo blk 1 at 4")
1852 4 : );
1853 4 :
1854 4 : // should still see the truncated block with older LSN
1855 4 : assert_eq!(
1856 4 : tline
1857 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1858 4 : .await?,
1859 4 : 3
1860 4 : );
1861 4 : assert_eq!(
1862 4 : tline
1863 4 : .get_rel_page_at_lsn(
1864 4 : TESTREL_A,
1865 4 : 2,
1866 4 : Version::Lsn(Lsn(0x50)),
1867 4 : &ctx,
1868 4 : io_concurrency.clone()
1869 4 : )
1870 4 : .instrument(test_span.clone())
1871 4 : .await?,
1872 4 : test_img("foo blk 2 at 5")
1873 4 : );
1874 4 :
1875 4 : // Truncate to zero length
1876 4 : let mut m = tline.begin_modification(Lsn(0x68));
1877 4 : walingest
1878 4 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1879 4 : .await?;
1880 4 : m.commit(&ctx).await?;
1881 4 : assert_eq!(
1882 4 : tline
1883 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
1884 4 : .await?,
1885 4 : 0
1886 4 : );
1887 4 :
1888 4 : // Extend from 0 to 2 blocks, leaving a gap
1889 4 : let mut m = tline.begin_modification(Lsn(0x70));
1890 4 : walingest
1891 4 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1892 4 : .await?;
1893 4 : m.commit(&ctx).await?;
1894 4 : assert_eq!(
1895 4 : tline
1896 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
1897 4 : .await?,
1898 4 : 2
1899 4 : );
1900 4 : assert_eq!(
1901 4 : tline
1902 4 : .get_rel_page_at_lsn(
1903 4 : TESTREL_A,
1904 4 : 0,
1905 4 : Version::Lsn(Lsn(0x70)),
1906 4 : &ctx,
1907 4 : io_concurrency.clone()
1908 4 : )
1909 4 : .instrument(test_span.clone())
1910 4 : .await?,
1911 4 : ZERO_PAGE
1912 4 : );
1913 4 : assert_eq!(
1914 4 : tline
1915 4 : .get_rel_page_at_lsn(
1916 4 : TESTREL_A,
1917 4 : 1,
1918 4 : Version::Lsn(Lsn(0x70)),
1919 4 : &ctx,
1920 4 : io_concurrency.clone()
1921 4 : )
1922 4 : .instrument(test_span.clone())
1923 4 : .await?,
1924 4 : test_img("foo blk 1")
1925 4 : );
1926 4 :
1927 4 : // Extend a lot more, leaving a big gap that spans across segments
1928 4 : let mut m = tline.begin_modification(Lsn(0x80));
1929 4 : walingest
1930 4 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
1931 4 : .await?;
1932 4 : m.commit(&ctx).await?;
1933 4 : assert_eq!(
1934 4 : tline
1935 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1936 4 : .await?,
1937 4 : 1501
1938 4 : );
1939 5996 : for blk in 2..1500 {
1940 5992 : assert_eq!(
1941 5992 : tline
1942 5992 : .get_rel_page_at_lsn(
1943 5992 : TESTREL_A,
1944 5992 : blk,
1945 5992 : Version::Lsn(Lsn(0x80)),
1946 5992 : &ctx,
1947 5992 : io_concurrency.clone()
1948 5992 : )
1949 5992 : .instrument(test_span.clone())
1950 5992 : .await?,
1951 5992 : ZERO_PAGE
1952 4 : );
1953 4 : }
1954 4 : assert_eq!(
1955 4 : tline
1956 4 : .get_rel_page_at_lsn(
1957 4 : TESTREL_A,
1958 4 : 1500,
1959 4 : Version::Lsn(Lsn(0x80)),
1960 4 : &ctx,
1961 4 : io_concurrency.clone()
1962 4 : )
1963 4 : .instrument(test_span.clone())
1964 4 : .await?,
1965 4 : test_img("foo blk 1500")
1966 4 : );
1967 4 :
1968 4 : Ok(())
1969 4 : }
1970 :
1971 : // Test what happens if we drop a relation
1972 : // and then create it again within the same layer.
1973 : #[tokio::test]
1974 4 : async fn test_drop_extend() -> Result<()> {
1975 4 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
1976 4 : .await?
1977 4 : .load()
1978 4 : .await;
1979 4 : let tline = tenant
1980 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1981 4 : .await?;
1982 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1983 4 :
1984 4 : let mut m = tline.begin_modification(Lsn(0x20));
1985 4 : walingest
1986 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1987 4 : .await?;
1988 4 : m.commit(&ctx).await?;
1989 4 :
1990 4 : // Check that rel exists and size is correct
1991 4 : assert_eq!(
1992 4 : tline
1993 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1994 4 : .await?,
1995 4 : true
1996 4 : );
1997 4 : assert_eq!(
1998 4 : tline
1999 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2000 4 : .await?,
2001 4 : 1
2002 4 : );
2003 4 :
2004 4 : // Drop rel
2005 4 : let mut m = tline.begin_modification(Lsn(0x30));
2006 4 : let mut rel_drops = HashMap::new();
2007 4 : rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
2008 4 : m.put_rel_drops(rel_drops, &ctx).await?;
2009 4 : m.commit(&ctx).await?;
2010 4 :
2011 4 : // Check that rel is not visible anymore
2012 4 : assert_eq!(
2013 4 : tline
2014 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
2015 4 : .await?,
2016 4 : false
2017 4 : );
2018 4 :
2019 4 : // FIXME: should fail
2020 4 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2021 4 :
2022 4 : // Re-create it
2023 4 : let mut m = tline.begin_modification(Lsn(0x40));
2024 4 : walingest
2025 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2026 4 : .await?;
2027 4 : m.commit(&ctx).await?;
2028 4 :
2029 4 : // Check that rel exists and size is correct
2030 4 : assert_eq!(
2031 4 : tline
2032 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2033 4 : .await?,
2034 4 : true
2035 4 : );
2036 4 : assert_eq!(
2037 4 : tline
2038 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2039 4 : .await?,
2040 4 : 1
2041 4 : );
2042 4 :
2043 4 : Ok(())
2044 4 : }
2045 :
2046 : // Test what happens if we truncate a relation
2047 : // so that one of its segments is dropped,
2048 : // and then extend it again within the same layer.
2049 : #[tokio::test]
2050 4 : async fn test_truncate_extend() -> Result<()> {
2051 4 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
2052 4 : .await?
2053 4 : .load()
2054 4 : .await;
2055 4 : let io_concurrency = IoConcurrency::spawn_for_test();
2056 4 : let tline = tenant
2057 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2058 4 : .await?;
2059 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2060 4 :
2061 4 : // Create a 20 MB relation (the size is arbitrary)
2062 4 : let relsize = 20 * 1024 * 1024 / 8192;
2063 4 : let mut m = tline.begin_modification(Lsn(0x20));
2064 10240 : for blkno in 0..relsize {
2065 10240 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2066 10240 : walingest
2067 10240 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2068 10240 : .await?;
2069 4 : }
2070 4 : m.commit(&ctx).await?;
2071 4 :
2072 4 : let test_span = tracing::info_span!(parent: None, "test",
2073 4 : tenant_id=%tline.tenant_shard_id.tenant_id,
2074 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
2075 0 : timeline_id=%tline.timeline_id);
2076 4 :
2077 4 : // The relation was created at LSN 0x20; it's not visible at LSN 0x10 yet.
2078 4 : assert_eq!(
2079 4 : tline
2080 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2081 4 : .await?,
2082 4 : false
2083 4 : );
2084 4 : assert!(
2085 4 : tline
2086 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2087 4 : .await
2088 4 : .is_err()
2089 4 : );
2090 4 :
2091 4 : assert_eq!(
2092 4 : tline
2093 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2094 4 : .await?,
2095 4 : true
2096 4 : );
2097 4 : assert_eq!(
2098 4 : tline
2099 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2100 4 : .await?,
2101 4 : relsize
2102 4 : );
2103 4 :
2104 4 : // Check relation content
2105 10240 : for blkno in 0..relsize {
2106 10240 : let lsn = Lsn(0x20);
2107 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2108 10240 : assert_eq!(
2109 10240 : tline
2110 10240 : .get_rel_page_at_lsn(
2111 10240 : TESTREL_A,
2112 10240 : blkno,
2113 10240 : Version::Lsn(lsn),
2114 10240 : &ctx,
2115 10240 : io_concurrency.clone()
2116 10240 : )
2117 10240 : .instrument(test_span.clone())
2118 10240 : .await?,
2119 10240 : test_img(&data)
2120 4 : );
2121 4 : }
2122 4 :
2123 4 : // Truncate the relation so that the second segment is dropped,
2124 4 : // leaving only one page.
2125 4 : let mut m = tline.begin_modification(Lsn(0x60));
2126 4 : walingest
2127 4 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2128 4 : .await?;
2129 4 : m.commit(&ctx).await?;
2130 4 :
2131 4 : // Check reported size and contents after truncation
2132 4 : assert_eq!(
2133 4 : tline
2134 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2135 4 : .await?,
2136 4 : 1
2137 4 : );
2138 4 :
2139 8 : for blkno in 0..1 {
2140 4 : let lsn = Lsn(0x20);
2141 4 : let data = format!("foo blk {} at {}", blkno, lsn);
2142 4 : assert_eq!(
2143 4 : tline
2144 4 : .get_rel_page_at_lsn(
2145 4 : TESTREL_A,
2146 4 : blkno,
2147 4 : Version::Lsn(Lsn(0x60)),
2148 4 : &ctx,
2149 4 : io_concurrency.clone()
2150 4 : )
2151 4 : .instrument(test_span.clone())
2152 4 : .await?,
2153 4 : test_img(&data)
2154 4 : );
2155 4 : }
2156 4 :
2157 4 : // should still see all blocks with older LSN
2158 4 : assert_eq!(
2159 4 : tline
2160 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2161 4 : .await?,
2162 4 : relsize
2163 4 : );
2164 10240 : for blkno in 0..relsize {
2165 10240 : let lsn = Lsn(0x20);
2166 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2167 10240 : assert_eq!(
2168 10240 : tline
2169 10240 : .get_rel_page_at_lsn(
2170 10240 : TESTREL_A,
2171 10240 : blkno,
2172 10240 : Version::Lsn(Lsn(0x50)),
2173 10240 : &ctx,
2174 10240 : io_concurrency.clone()
2175 10240 : )
2176 10240 : .instrument(test_span.clone())
2177 10240 : .await?,
2178 10240 : test_img(&data)
2179 4 : );
2180 4 : }
2181 4 :
2182 4 : // Extend relation again.
2183 4 : // Add enough blocks to create second segment
2184 4 : let lsn = Lsn(0x80);
2185 4 : let mut m = tline.begin_modification(lsn);
2186 10240 : for blkno in 0..relsize {
2187 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2188 10240 : walingest
2189 10240 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2190 10240 : .await?;
2191 4 : }
2192 4 : m.commit(&ctx).await?;
2193 4 :
2194 4 : assert_eq!(
2195 4 : tline
2196 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2197 4 : .await?,
2198 4 : true
2199 4 : );
2200 4 : assert_eq!(
2201 4 : tline
2202 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2203 4 : .await?,
2204 4 : relsize
2205 4 : );
2206 4 : // Check relation content
2207 10240 : for blkno in 0..relsize {
2208 10240 : let lsn = Lsn(0x80);
2209 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2210 10240 : assert_eq!(
2211 10240 : tline
2212 10240 : .get_rel_page_at_lsn(
2213 10240 : TESTREL_A,
2214 10240 : blkno,
2215 10240 : Version::Lsn(Lsn(0x80)),
2216 10240 : &ctx,
2217 10240 : io_concurrency.clone()
2218 10240 : )
2219 10240 : .instrument(test_span.clone())
2220 10240 : .await?,
2221 10240 : test_img(&data)
2222 4 : );
2223 4 : }
2224 4 :
2225 4 : Ok(())
2226 4 : }
2227 :
2228 : /// Test get_rel_size() and truncation with a relation larger than 1 GB, so that
2229 : /// it's split into multiple 1 GB segments in Postgres.
2230 : #[tokio::test]
2231 4 : async fn test_large_rel() -> Result<()> {
2232 4 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2233 4 : let tline = tenant
2234 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2235 4 : .await?;
2236 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2237 4 :
2238 4 : let mut lsn = 0x10;
2239 524292 : for blknum in 0..RELSEG_SIZE + 1 {
2240 524292 : lsn += 0x10;
2241 524292 : let mut m = tline.begin_modification(Lsn(lsn));
2242 524292 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2243 524292 : walingest
2244 524292 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2245 524292 : .await?;
2246 524292 : m.commit(&ctx).await?;
2247 4 : }
2248 4 :
2249 4 : assert_current_logical_size(&tline, Lsn(lsn));
2250 4 :
2251 4 : assert_eq!(
2252 4 : tline
2253 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2254 4 : .await?,
2255 4 : RELSEG_SIZE + 1
2256 4 : );
2257 4 :
2258 4 : // Truncate one block
2259 4 : lsn += 0x10;
2260 4 : let mut m = tline.begin_modification(Lsn(lsn));
2261 4 : walingest
2262 4 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2263 4 : .await?;
2264 4 : m.commit(&ctx).await?;
2265 4 : assert_eq!(
2266 4 : tline
2267 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2268 4 : .await?,
2269 4 : RELSEG_SIZE
2270 4 : );
2271 4 : assert_current_logical_size(&tline, Lsn(lsn));
2272 4 :
2273 4 : // Truncate another block
2274 4 : lsn += 0x10;
2275 4 : let mut m = tline.begin_modification(Lsn(lsn));
2276 4 : walingest
2277 4 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2278 4 : .await?;
2279 4 : m.commit(&ctx).await?;
2280 4 : assert_eq!(
2281 4 : tline
2282 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2283 4 : .await?,
2284 4 : RELSEG_SIZE - 1
2285 4 : );
2286 4 : assert_current_logical_size(&tline, Lsn(lsn));
2287 4 :
2288 4 : // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
2289 4 : // This tests the behavior at segment boundaries.
2290 4 : let mut size: i32 = 3000;
2291 12008 : while size >= 0 {
2292 12004 : lsn += 0x10;
2293 12004 : let mut m = tline.begin_modification(Lsn(lsn));
2294 12004 : walingest
2295 12004 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2296 12004 : .await?;
2297 12004 : m.commit(&ctx).await?;
2298 12004 : assert_eq!(
2299 12004 : tline
2300 12004 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2301 12004 : .await?,
2302 12004 : size as BlockNumber
2303 4 : );
2304 4 :
2305 12004 : size -= 1;
2306 4 : }
2307 4 : assert_current_logical_size(&tline, Lsn(lsn));
2308 4 :
2309 4 : Ok(())
2310 4 : }
2311 :
2312 : /// Replay a WAL segment file taken directly from safekeepers.
2313 : ///
2314 : /// This test is useful for benchmarking since it allows us to profile only
2315 : /// the walingest code in a single-threaded executor, and iterate more quickly
2316 : /// without waiting for unrelated steps.
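/// Running just this test, e.g. `cargo test test_ingest_real_wal`, is one way
/// to do that (the exact cargo invocation depends on the workspace layout).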
2317 : #[tokio::test]
2318 4 : async fn test_ingest_real_wal() {
2319 4 : use postgres_ffi::WAL_SEGMENT_SIZE;
2320 4 : use postgres_ffi::waldecoder::WalStreamDecoder;
2321 4 :
2322 4 : use crate::tenant::harness::*;
2323 4 :
2324 4 : // Define test data path and constants.
2325 4 : //
2326 4 : // Steps to reconstruct the data, if needed:
2327 4 : // 1. Run the pgbench Python test
2328 4 : // 2. Take the first WAL segment file from a safekeeper
2329 4 : // 3. Compress it using `zstd --long input_file`
2330 4 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2331 4 : // 5. Grep the safekeeper logs for "restart decoder" to get the startpoint
2332 4 : // 6. Run just the decoder from this test to get the endpoint.
2333 4 : // It's the last LSN the decoder will output.
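// As a concrete (hypothetical) instance of step 3:
// `zstd --long 000000010000000000000001` produces the
// `000000010000000000000001.zst` file referenced just below.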
2334 4 : let pg_version = 15; // The test data was generated by pg15
2335 4 : let path = "test_data/sk_wal_segment_from_pgbench";
2336 4 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2337 4 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2338 4 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2339 4 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2340 4 :
2341 4 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2342 4 : let span = harness
2343 4 : .span()
2344 4 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2345 4 : let (tenant, ctx) = harness.load().await;
2346 4 :
2347 4 : let remote_initdb_path =
2348 4 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2349 4 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2350 4 :
2351 4 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2352 4 : .expect("creating test dir should work");
2353 4 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2354 4 :
2355 4 : // Bootstrap a real timeline. We can't use create_test_timeline because
2356 4 : // it doesn't create a real checkpoint, and WalIngest::new would try to
2357 4 : // parse the garbage data.
2358 4 : let tline = tenant
2359 4 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2360 4 : .await
2361 4 : .unwrap();
2362 4 :
2363 4 : // We fully read and decompress this into memory before decoding
2364 4 : // to get a more accurate perf profile of the decoder.
2365 4 : let bytes = {
2366 4 : use async_compression::tokio::bufread::ZstdDecoder;
2367 4 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2368 4 : let reader = tokio::io::BufReader::new(file);
2369 4 : let decoder = ZstdDecoder::new(reader);
2370 4 : let mut reader = tokio::io::BufReader::new(decoder);
2371 4 : let mut buffer = Vec::new();
2372 4 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2373 4 : buffer
2374 4 : };
2375 4 :
2376 4 : // TODO start a profiler too
2377 4 : let started_at = std::time::Instant::now();
2378 4 :
2379 4 : // Initialize walingest
2380 4 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2381 4 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2382 4 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2383 4 : .await
2384 4 : .unwrap();
2385 4 : let mut modification = tline.begin_modification(startpoint);
2386 4 : println!("decoding {} bytes", bytes.len() - xlogoff);
2387 4 :
2388 4 : // Decode and ingest WAL. We process the WAL in chunks because
2389 4 : // that's what happens when we get bytes from safekeepers.
2390 949372 : for chunk in bytes[xlogoff..].chunks(50) {
2391 949372 : decoder.feed_bytes(chunk);
2392 1241072 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2393 291700 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
2394 291700 : recdata,
2395 291700 : &[*modification.tline.get_shard_identity()],
2396 291700 : lsn,
2397 291700 : modification.tline.pg_version,
2398 291700 : )
2399 291700 : .unwrap()
2400 291700 : .remove(modification.tline.get_shard_identity())
2401 291700 : .unwrap();
2402 291700 :
2403 291700 : walingest
2404 291700 : .ingest_record(interpreted, &mut modification, &ctx)
2405 291700 : .instrument(span.clone())
2406 291700 : .await
2407 291700 : .unwrap();
2408 4 : }
2409 949372 : modification.commit(&ctx).await.unwrap();
2410 4 : }
2411 4 :
2412 4 : let duration = started_at.elapsed();
2413 4 : println!("done in {:?}", duration);
2414 4 : }
2415 : }