Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
9 : //! Records get decoded and interpreted in the [`wal_decoder`] module
10 : //! and then stored to the Repository by WalIngest.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or a WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
14 : //! extracts page images out of some WAL records, but mostly it's WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! will call Repository::put_rel_wal_record or put_rel_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoken Rust code.
23 :
24 : use std::collections::HashMap;
25 : use std::sync::{Arc, OnceLock};
26 : use std::time::{Duration, Instant, SystemTime};
27 :
28 : use anyhow::{Result, bail};
29 : use bytes::{Buf, Bytes};
30 : use pageserver_api::key::rel_block_to_key;
31 : use pageserver_api::record::NeonWalRecord;
32 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
33 : use pageserver_api::shard::ShardIdentity;
34 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
35 : use postgres_ffi::walrecord::*;
36 : use postgres_ffi::{
37 : TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
38 : fsm_logical_to_physical, pg_constants,
39 : };
40 : use tracing::*;
41 : use utils::bin_ser::SerializeError;
42 : use utils::lsn::Lsn;
43 : use utils::rate_limit::RateLimit;
44 : use utils::{critical, failpoint_support};
45 : use wal_decoder::models::*;
46 :
47 : use crate::ZERO_PAGE;
48 : use crate::context::RequestContext;
49 : use crate::metrics::WAL_INGEST;
50 : use crate::pgdatadir_mapping::{DatadirModification, Version};
51 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
52 : use crate::tenant::{PageReconstructError, Timeline};
53 :
54 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
55 :
56 : impl CheckPoint {
57 12 : fn encode(&self) -> Result<Bytes, SerializeError> {
58 12 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
59 12 : }
60 :
61 291668 : fn update_next_xid(&mut self, xid: u32) -> bool {
62 291668 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
63 291668 : }
64 :
65 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
66 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
67 0 : cp.update_next_multixid(multi_xid, multi_offset)
68 : })
69 0 : }
70 : }
71 :
72 : /// Temporary limitation of WAL lag warnings after attach
73 : ///
74 : /// After tenant attach, we want to limit WAL lag warnings because
75 : /// we don't look at the WAL until the attach is complete, which
76 : /// might take a while.
77 : pub struct WalLagCooldown {
78 : /// Until when should this limitation apply at all
79 : active_until: std::time::Instant,
80 : /// The maximum lag to suppress. Lags above this limit get reported anyways.
81 : max_lag: Duration,
82 : }
83 :
84 : impl WalLagCooldown {
85 0 : pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
86 0 : Self {
87 0 : active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
88 0 : max_lag: attach_duration * 2 + Duration::from_secs(60),
89 0 : }
90 0 : }
91 : }
92 :
93 : pub struct WalIngest {
94 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
95 : shard: ShardIdentity,
96 : checkpoint: CheckPoint,
97 : checkpoint_modified: bool,
98 : warn_ingest_lag: WarnIngestLag,
99 : }
100 :
101 : struct WarnIngestLag {
102 : lag_msg_ratelimit: RateLimit,
103 : future_lsn_msg_ratelimit: RateLimit,
104 : timestamp_invalid_msg_ratelimit: RateLimit,
105 : }
106 :
107 : impl WalIngest {
108 24 : pub async fn new(
109 24 : timeline: &Timeline,
110 24 : startpoint: Lsn,
111 24 : ctx: &RequestContext,
112 24 : ) -> anyhow::Result<WalIngest> {
113 : // Fetch the latest checkpoint into memory, so that we can compare with it
114 : // quickly in `ingest_record` and update it when it changes.
115 24 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
116 24 : let pgversion = timeline.pg_version;
117 :
118 24 : let checkpoint = dispatch_pgversion!(pgversion, {
119 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
120 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
121 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
122 : });
123 :
124 24 : Ok(WalIngest {
125 24 : shard: *timeline.get_shard_identity(),
126 24 : checkpoint,
127 24 : checkpoint_modified: false,
128 24 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
129 24 : warn_ingest_lag: WarnIngestLag {
130 24 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
131 24 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
132 24 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
133 24 : },
134 24 : })
135 24 : }
136 :
137 : /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
138 : /// storage of a given timeline.
139 : ///
140 : /// This function updates `lsn` field of `DatadirModification`
141 : ///
142 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
143 291704 : pub async fn ingest_record(
144 291704 : &mut self,
145 291704 : interpreted: InterpretedWalRecord,
146 291704 : modification: &mut DatadirModification<'_>,
147 291704 : ctx: &RequestContext,
148 291704 : ) -> anyhow::Result<bool> {
149 291704 : WAL_INGEST.records_received.inc();
150 291704 : let prev_len = modification.len();
151 291704 :
152 291704 : modification.set_lsn(interpreted.next_record_lsn)?;
153 :
154 291704 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
155 : // Records of this type should always be preceded by a commit(), as they
156 : // rely on reading data pages back from the Timeline.
157 0 : assert!(!modification.has_dirty_data());
158 291704 : }
159 :
160 291704 : assert!(!self.checkpoint_modified);
161 291704 : if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
162 291668 : && self.checkpoint.update_next_xid(interpreted.xid)
163 4 : {
164 4 : self.checkpoint_modified = true;
165 291700 : }
166 :
167 291704 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
168 :
169 132 : match interpreted.metadata_record {
170 24 : Some(MetadataRecord::Heapam(rec)) => match rec {
171 24 : HeapamRecord::ClearVmBits(clear_vm_bits) => {
172 24 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
173 24 : .await?;
174 : }
175 : },
176 0 : Some(MetadataRecord::Neonrmgr(rec)) => match rec {
177 0 : NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
178 0 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
179 0 : .await?;
180 : }
181 : },
182 32 : Some(MetadataRecord::Smgr(rec)) => match rec {
183 32 : SmgrRecord::Create(create) => {
184 32 : self.ingest_xlog_smgr_create(create, modification, ctx)
185 32 : .await?;
186 : }
187 0 : SmgrRecord::Truncate(truncate) => {
188 0 : self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
189 0 : .await?;
190 : }
191 : },
192 0 : Some(MetadataRecord::Dbase(rec)) => match rec {
193 0 : DbaseRecord::Create(create) => {
194 0 : self.ingest_xlog_dbase_create(create, modification, ctx)
195 0 : .await?;
196 : }
197 0 : DbaseRecord::Drop(drop) => {
198 0 : self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
199 : }
200 : },
201 0 : Some(MetadataRecord::Clog(rec)) => match rec {
202 0 : ClogRecord::ZeroPage(zero_page) => {
203 0 : self.ingest_clog_zero_page(zero_page, modification, ctx)
204 0 : .await?;
205 : }
206 0 : ClogRecord::Truncate(truncate) => {
207 0 : self.ingest_clog_truncate(truncate, modification, ctx)
208 0 : .await?;
209 : }
210 : },
211 16 : Some(MetadataRecord::Xact(rec)) => {
212 16 : self.ingest_xact_record(rec, modification, ctx).await?;
213 : }
214 0 : Some(MetadataRecord::MultiXact(rec)) => match rec {
215 0 : MultiXactRecord::ZeroPage(zero_page) => {
216 0 : self.ingest_multixact_zero_page(zero_page, modification, ctx)
217 0 : .await?;
218 : }
219 0 : MultiXactRecord::Create(create) => {
220 0 : self.ingest_multixact_create(modification, &create)?;
221 : }
222 0 : MultiXactRecord::Truncate(truncate) => {
223 0 : self.ingest_multixact_truncate(modification, &truncate, ctx)
224 0 : .await?;
225 : }
226 : },
227 0 : Some(MetadataRecord::Relmap(rec)) => match rec {
228 0 : RelmapRecord::Update(update) => {
229 0 : self.ingest_relmap_update(update, modification, ctx).await?;
230 : }
231 : },
232 60 : Some(MetadataRecord::Xlog(rec)) => match rec {
233 60 : XlogRecord::Raw(raw) => {
234 60 : self.ingest_raw_xlog_record(raw, modification, ctx).await?;
235 : }
236 : },
237 0 : Some(MetadataRecord::LogicalMessage(rec)) => match rec {
238 0 : LogicalMessageRecord::Put(put) => {
239 0 : self.ingest_logical_message_put(put, modification, ctx)
240 0 : .await?;
241 : }
242 : #[cfg(feature = "testing")]
243 : LogicalMessageRecord::Failpoint => {
244 : // This is a convenient way to make the WAL ingestion pause at
245 : // particular point in the WAL. For more fine-grained control,
246 : // we could peek into the message and only pause if it contains
247 : // a particular string, for example, but this is enough for now.
248 0 : failpoint_support::sleep_millis_async!(
249 0 : "pageserver-wal-ingest-logical-message-sleep"
250 0 : );
251 : }
252 : },
253 0 : Some(MetadataRecord::Standby(rec)) => {
254 0 : self.ingest_standby_record(rec).unwrap();
255 0 : }
256 0 : Some(MetadataRecord::Replorigin(rec)) => {
257 0 : self.ingest_replorigin_record(rec, modification).await?;
258 : }
259 291572 : None => {
260 291572 : // There are two cases through which we end up here:
261 291572 : // 1. The resource manager for the original PG WAL record
262 291572 : // is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
263 291572 : // record type within Neon.
264 291572 : // 2. The resource manager id was unknown to
265 291572 : // [`wal_decoder::decoder::MetadataRecord::from_decoded`].
266 291572 : // TODO(vlad): Tighten this up more once we build confidence
267 291572 : // that case (2) does not happen in the field.
268 291572 : }
269 : }
270 :
271 291704 : modification
272 291704 : .ingest_batch(interpreted.batch, &self.shard, ctx)
273 291704 : .await?;
274 :
275 : // If checkpoint data was updated, store the new version in the repository
276 291704 : if self.checkpoint_modified {
277 12 : let new_checkpoint_bytes = self.checkpoint.encode()?;
278 :
279 12 : modification.put_checkpoint(new_checkpoint_bytes)?;
280 12 : self.checkpoint_modified = false;
281 291692 : }
282 :
283 : // Note that at this point this record is only cached in the modification
284 : // until commit() is called to flush the data into the repository and update
285 : // the latest LSN.
286 :
287 291704 : Ok(modification.len() > prev_len)
288 291704 : }
289 :
290 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
291 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
292 0 : let next_full_xid =
293 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
294 :
295 0 : let next_xid = (next_full_xid) as u32;
296 0 : let mut epoch = (next_full_xid >> 32) as u32;
297 0 :
298 0 : if xid > next_xid {
299 : // Wraparound occurred, must be from a prev epoch.
300 0 : if epoch == 0 {
301 0 : bail!(
302 0 : "apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
303 0 : );
304 0 : }
305 0 : epoch -= 1;
306 0 : }
307 :
308 0 : Ok(((epoch as u64) << 32) | xid as u64)
309 0 : }
310 :
311 24 : async fn ingest_clear_vm_bits(
312 24 : &mut self,
313 24 : clear_vm_bits: ClearVmBits,
314 24 : modification: &mut DatadirModification<'_>,
315 24 : ctx: &RequestContext,
316 24 : ) -> anyhow::Result<()> {
317 24 : let ClearVmBits {
318 24 : new_heap_blkno,
319 24 : old_heap_blkno,
320 24 : flags,
321 24 : vm_rel,
322 24 : } = clear_vm_bits;
323 24 : // Clear the VM bits if required.
324 24 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
325 24 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
326 :
327 : // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
328 : // its view of the VM relation size. Out of caution, error instead of failing WAL ingestion,
329 : // as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
330 : // https://github.com/neondatabase/neon/pull/10634.
331 24 : let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
332 0 : critical!("clear_vm_bits for unknown VM relation {vm_rel}");
333 0 : return Ok(());
334 : };
335 24 : if let Some(blknum) = new_vm_blk {
336 24 : if blknum >= vm_size {
337 0 : critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
338 0 : new_vm_blk = None;
339 24 : }
340 0 : }
341 24 : if let Some(blknum) = old_vm_blk {
342 0 : if blknum >= vm_size {
343 0 : critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
344 0 : old_vm_blk = None;
345 0 : }
346 24 : }
347 :
348 24 : if new_vm_blk.is_none() && old_vm_blk.is_none() {
349 0 : return Ok(());
350 24 : } else if new_vm_blk == old_vm_blk {
351 : // An UPDATE record that needs to clear the bits for both old and the new page, both of
352 : // which reside on the same VM page.
353 0 : self.put_rel_wal_record(
354 0 : modification,
355 0 : vm_rel,
356 0 : new_vm_blk.unwrap(),
357 0 : NeonWalRecord::ClearVisibilityMapFlags {
358 0 : new_heap_blkno,
359 0 : old_heap_blkno,
360 0 : flags,
361 0 : },
362 0 : ctx,
363 0 : )
364 0 : .await?;
365 : } else {
366 : // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
367 24 : if let Some(new_vm_blk) = new_vm_blk {
368 24 : self.put_rel_wal_record(
369 24 : modification,
370 24 : vm_rel,
371 24 : new_vm_blk,
372 24 : NeonWalRecord::ClearVisibilityMapFlags {
373 24 : new_heap_blkno,
374 24 : old_heap_blkno: None,
375 24 : flags,
376 24 : },
377 24 : ctx,
378 24 : )
379 24 : .await?;
380 0 : }
381 24 : if let Some(old_vm_blk) = old_vm_blk {
382 0 : self.put_rel_wal_record(
383 0 : modification,
384 0 : vm_rel,
385 0 : old_vm_blk,
386 0 : NeonWalRecord::ClearVisibilityMapFlags {
387 0 : new_heap_blkno: None,
388 0 : old_heap_blkno,
389 0 : flags,
390 0 : },
391 0 : ctx,
392 0 : )
393 0 : .await?;
394 24 : }
395 : }
396 24 : Ok(())
397 24 : }
398 :
399 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
400 0 : async fn ingest_xlog_dbase_create(
401 0 : &mut self,
402 0 : create: DbaseCreate,
403 0 : modification: &mut DatadirModification<'_>,
404 0 : ctx: &RequestContext,
405 0 : ) -> anyhow::Result<()> {
406 0 : let DbaseCreate {
407 0 : db_id,
408 0 : tablespace_id,
409 0 : src_db_id,
410 0 : src_tablespace_id,
411 0 : } = create;
412 :
413 0 : let rels = modification
414 0 : .tline
415 0 : .list_rels(
416 0 : src_tablespace_id,
417 0 : src_db_id,
418 0 : Version::Modified(modification),
419 0 : ctx,
420 0 : )
421 0 : .await?;
422 :
423 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
424 :
425 : // Copy relfilemap
426 0 : let filemap = modification
427 0 : .tline
428 0 : .get_relmap_file(
429 0 : src_tablespace_id,
430 0 : src_db_id,
431 0 : Version::Modified(modification),
432 0 : ctx,
433 0 : )
434 0 : .await?;
435 0 : modification
436 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
437 0 : .await?;
438 :
439 0 : let mut num_rels_copied = 0;
440 0 : let mut num_blocks_copied = 0;
441 0 : for src_rel in rels {
442 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
443 0 : assert_eq!(src_rel.dbnode, src_db_id);
444 :
445 0 : let nblocks = modification
446 0 : .tline
447 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
448 0 : .await?;
449 0 : let dst_rel = RelTag {
450 0 : spcnode: tablespace_id,
451 0 : dbnode: db_id,
452 0 : relnode: src_rel.relnode,
453 0 : forknum: src_rel.forknum,
454 0 : };
455 0 :
456 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
457 :
458 : // Copy content
459 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
460 0 : for blknum in 0..nblocks {
461 : // Sharding:
462 : // - src and dst are always on the same shard, because they differ only by dbNode, and
463 : // dbNode is not included in the hash inputs for sharding.
464 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
465 : // that belong to it.
466 0 : let src_key = rel_block_to_key(src_rel, blknum);
467 0 : if !self.shard.is_key_local(&src_key) {
468 0 : debug!(
469 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
470 : src_key
471 : );
472 0 : continue;
473 0 : }
474 0 : debug!(
475 0 : "copying block {} from {} ({}) to {}",
476 : blknum, src_rel, src_key, dst_rel
477 : );
478 :
479 0 : let content = modification
480 0 : .tline
481 0 : .get_rel_page_at_lsn(
482 0 : src_rel,
483 0 : blknum,
484 0 : Version::Modified(modification),
485 0 : ctx,
486 0 : crate::tenant::storage_layer::IoConcurrency::sequential(),
487 0 : )
488 0 : .await?;
489 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
490 0 : num_blocks_copied += 1;
491 : }
492 :
493 0 : num_rels_copied += 1;
494 : }
495 :
496 0 : info!(
497 0 : "Created database {}/{}, copied {} blocks in {} rels",
498 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
499 : );
500 0 : Ok(())
501 0 : }
502 :
503 0 : async fn ingest_xlog_dbase_drop(
504 0 : &mut self,
505 0 : dbase_drop: DbaseDrop,
506 0 : modification: &mut DatadirModification<'_>,
507 0 : ctx: &RequestContext,
508 0 : ) -> anyhow::Result<()> {
509 0 : let DbaseDrop {
510 0 : db_id,
511 0 : tablespace_ids,
512 0 : } = dbase_drop;
513 0 : for tablespace_id in tablespace_ids {
514 0 : trace!("Drop db {}, {}", tablespace_id, db_id);
515 0 : modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
516 : }
517 :
518 0 : Ok(())
519 0 : }
520 :
521 32 : async fn ingest_xlog_smgr_create(
522 32 : &mut self,
523 32 : create: SmgrCreate,
524 32 : modification: &mut DatadirModification<'_>,
525 32 : ctx: &RequestContext,
526 32 : ) -> anyhow::Result<()> {
527 32 : let SmgrCreate { rel } = create;
528 32 : self.put_rel_creation(modification, rel, ctx).await?;
529 32 : Ok(())
530 32 : }
531 :
532 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
533 : ///
534 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
535 0 : async fn ingest_xlog_smgr_truncate(
536 0 : &mut self,
537 0 : truncate: XlSmgrTruncate,
538 0 : modification: &mut DatadirModification<'_>,
539 0 : ctx: &RequestContext,
540 0 : ) -> anyhow::Result<()> {
541 0 : let XlSmgrTruncate {
542 0 : blkno,
543 0 : rnode,
544 0 : flags,
545 0 : } = truncate;
546 0 :
547 0 : let spcnode = rnode.spcnode;
548 0 : let dbnode = rnode.dbnode;
549 0 : let relnode = rnode.relnode;
550 0 :
551 0 : if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
552 0 : let rel = RelTag {
553 0 : spcnode,
554 0 : dbnode,
555 0 : relnode,
556 0 : forknum: MAIN_FORKNUM,
557 0 : };
558 0 :
559 0 : self.put_rel_truncation(modification, rel, blkno, ctx)
560 0 : .await?;
561 0 : }
562 0 : if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
563 0 : let rel = RelTag {
564 0 : spcnode,
565 0 : dbnode,
566 0 : relnode,
567 0 : forknum: FSM_FORKNUM,
568 0 : };
569 0 :
570 0 : // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
571 0 : // and instead of digging in the FSM bitmap format we just clear the whole page.
572 0 : let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
573 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
574 0 : if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
575 0 : && self
576 0 : .shard
577 0 : .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
578 : {
579 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
580 0 : fsm_physical_page_no += 1;
581 0 : }
582 : // Truncate this shard's view of the FSM relation size, if it even has one.
583 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
584 0 : if nblocks > fsm_physical_page_no {
585 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
586 0 : .await?;
587 0 : }
588 0 : }
589 0 : if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
590 0 : let rel = RelTag {
591 0 : spcnode,
592 0 : dbnode,
593 0 : relnode,
594 0 : forknum: VISIBILITYMAP_FORKNUM,
595 0 : };
596 0 :
597 0 : // last remaining block, byte, and bit
598 0 : let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
599 0 : let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
600 0 : / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
601 0 : let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
602 0 : * pg_constants::VM_BITS_PER_HEAPBLOCK;
603 0 :
604 0 : // Unless the new size is exactly at a visibility map page boundary, the
605 0 : // tail bits in the last remaining map page, representing truncated heap
606 0 : // blocks, need to be cleared. This is not only tidy, but also necessary
607 0 : // because we don't get a chance to clear the bits if the heap is extended
608 0 : // again. Only do this on the shard that owns the page.
609 0 : if (trunc_byte != 0 || trunc_offs != 0)
610 0 : && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
611 : {
612 0 : modification.put_rel_wal_record(
613 0 : rel,
614 0 : vm_page_no,
615 0 : NeonWalRecord::TruncateVisibilityMap {
616 0 : trunc_byte,
617 0 : trunc_offs,
618 0 : },
619 0 : )?;
620 0 : vm_page_no += 1;
621 0 : }
622 : // Truncate this shard's view of the VM relation size, if it even has one.
623 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
624 0 : if nblocks > vm_page_no {
625 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
626 0 : .await?;
627 0 : }
628 0 : }
629 0 : Ok(())
630 0 : }
631 :
632 16 : fn warn_on_ingest_lag(
633 16 : &mut self,
634 16 : conf: &crate::config::PageServerConf,
635 16 : wal_timestamp: TimestampTz,
636 16 : ) {
637 16 : debug_assert_current_span_has_tenant_and_timeline_id();
638 16 : let now = SystemTime::now();
639 16 : let rate_limits = &mut self.warn_ingest_lag;
640 :
641 16 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
642 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
643 : });
644 :
645 16 : match ts {
646 16 : Ok(ts) => {
647 16 : match now.duration_since(ts) {
648 16 : Ok(lag) => {
649 16 : if lag > conf.wait_lsn_timeout {
650 16 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
651 4 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
652 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
653 0 : return;
654 0 : }
655 4 : } else {
656 4 : // Still loading? We shouldn't be here
657 4 : }
658 4 : let lag = humantime::format_duration(lag);
659 4 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
660 16 : })
661 0 : }
662 : }
663 0 : Err(e) => {
664 0 : let delta_t = e.duration();
665 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
666 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
667 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
668 0 : if delta_t > IGNORED_DRIFT {
669 0 : let delta_t = humantime::format_duration(delta_t);
670 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
671 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
672 0 : })
673 0 : }
674 : }
675 : };
676 : }
677 0 : Err(error) => {
678 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
679 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
680 0 : })
681 : }
682 : }
683 16 : }
684 :
685 : /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
686 : ///
687 16 : async fn ingest_xact_record(
688 16 : &mut self,
689 16 : record: XactRecord,
690 16 : modification: &mut DatadirModification<'_>,
691 16 : ctx: &RequestContext,
692 16 : ) -> anyhow::Result<()> {
693 16 : let (xact_common, is_commit, is_prepared) = match record {
694 0 : XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
695 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
696 0 : self.adjust_to_full_transaction_id(xl_xid)?
697 : } else {
698 0 : xl_xid as u64
699 : };
700 0 : return modification.put_twophase_file(xid, data, ctx).await;
701 : }
702 16 : XactRecord::Commit(common) => (common, true, false),
703 0 : XactRecord::Abort(common) => (common, false, false),
704 0 : XactRecord::CommitPrepared(common) => (common, true, true),
705 0 : XactRecord::AbortPrepared(common) => (common, false, true),
706 : };
707 :
708 : let XactCommon {
709 16 : parsed,
710 16 : origin_id,
711 16 : xl_xid,
712 16 : lsn,
713 16 : } = xact_common;
714 16 :
715 16 : // Record update of CLOG pages
716 16 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
717 16 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
718 16 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
719 16 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
720 16 :
721 16 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
722 :
723 16 : for subxact in &parsed.subxacts {
724 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
725 0 : if subxact_pageno != pageno {
726 : // This subxact goes to different page. Write the record
727 : // for all the XIDs on the previous page, and continue
728 : // accumulating XIDs on this new page.
729 0 : modification.put_slru_wal_record(
730 0 : SlruKind::Clog,
731 0 : segno,
732 0 : rpageno,
733 0 : if is_commit {
734 0 : NeonWalRecord::ClogSetCommitted {
735 0 : xids: page_xids,
736 0 : timestamp: parsed.xact_time,
737 0 : }
738 : } else {
739 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
740 : },
741 0 : )?;
742 0 : page_xids = Vec::new();
743 0 : }
744 0 : pageno = subxact_pageno;
745 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
746 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
747 0 : page_xids.push(*subxact);
748 : }
749 16 : modification.put_slru_wal_record(
750 16 : SlruKind::Clog,
751 16 : segno,
752 16 : rpageno,
753 16 : if is_commit {
754 16 : NeonWalRecord::ClogSetCommitted {
755 16 : xids: page_xids,
756 16 : timestamp: parsed.xact_time,
757 16 : }
758 : } else {
759 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
760 : },
761 0 : )?;
762 :
763 : // Group relations to drop by dbNode. This map will contain all relations that _might_
764 : // exist, we will reduce it to which ones really exist later. This map can be huge if
765 : // the transaction touches a huge number of relations (there is no bound on this in
766 : // postgres).
767 16 : let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
768 :
769 16 : for xnode in &parsed.xnodes {
770 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
771 0 : let rel = RelTag {
772 0 : forknum,
773 0 : spcnode: xnode.spcnode,
774 0 : dbnode: xnode.dbnode,
775 0 : relnode: xnode.relnode,
776 0 : };
777 0 : drop_relations
778 0 : .entry((xnode.spcnode, xnode.dbnode))
779 0 : .or_default()
780 0 : .push(rel);
781 0 : }
782 : }
783 :
784 : // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
785 16 : modification.put_rel_drops(drop_relations, ctx).await?;
786 :
787 16 : if origin_id != 0 {
788 0 : modification
789 0 : .set_replorigin(origin_id, parsed.origin_lsn)
790 0 : .await?;
791 16 : }
792 :
793 16 : if is_prepared {
794 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
795 0 : trace!(
796 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
797 : xl_xid, parsed.xid, lsn,
798 : );
799 :
800 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
801 0 : self.adjust_to_full_transaction_id(parsed.xid)?
802 : } else {
803 0 : parsed.xid as u64
804 : };
805 0 : modification.drop_twophase_file(xid, ctx).await?;
806 16 : }
807 :
808 16 : Ok(())
809 16 : }
810 :
811 0 : async fn ingest_clog_truncate(
812 0 : &mut self,
813 0 : truncate: ClogTruncate,
814 0 : modification: &mut DatadirModification<'_>,
815 0 : ctx: &RequestContext,
816 0 : ) -> anyhow::Result<()> {
817 0 : let ClogTruncate {
818 0 : pageno,
819 0 : oldest_xid,
820 0 : oldest_xid_db,
821 0 : } = truncate;
822 0 :
823 0 : info!(
824 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
825 : pageno, oldest_xid, oldest_xid_db
826 : );
827 :
828 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
829 : // truncated, but a checkpoint record with the updated values isn't written until
830 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
831 : // so we keep the oldestXid and oldestXidDB up-to-date.
832 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
833 0 : cp.oldestXid = oldest_xid;
834 0 : cp.oldestXidDB = oldest_xid_db;
835 0 : });
836 0 : self.checkpoint_modified = true;
837 :
838 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
839 :
840 0 : let latest_page_number =
841 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
842 : / pg_constants::CLOG_XACTS_PER_PAGE;
843 :
844 : // Now delete all segments containing pages between xlrec.pageno
845 : // and latest_page_number.
846 :
847 : // First, make an important safety check:
848 : // the current endpoint page must not be eligible for removal.
849 : // See SimpleLruTruncate() in slru.c
850 0 : if dispatch_pgversion!(modification.tline.pg_version, {
851 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
852 : }) {
853 0 : info!("could not truncate directory pg_xact apparent wraparound");
854 0 : return Ok(());
855 0 : }
856 0 :
857 0 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
858 0 : //
859 0 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
860 0 : // will block waiting for the last valid LSN to advance up to
861 0 : // it. So we use the previous record's LSN in the get calls
862 0 : // instead.
863 0 : if modification.tline.get_shard_identity().is_shard_zero() {
864 0 : for segno in modification
865 0 : .tline
866 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
867 0 : .await?
868 : {
869 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
870 :
871 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
872 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
873 : });
874 :
875 0 : if may_delete {
876 0 : modification
877 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
878 0 : .await?;
879 0 : trace!("Drop CLOG segment {:>04X}", segno);
880 0 : }
881 : }
882 0 : }
883 :
884 0 : Ok(())
885 0 : }
886 :
887 0 : async fn ingest_clog_zero_page(
888 0 : &mut self,
889 0 : zero_page: ClogZeroPage,
890 0 : modification: &mut DatadirModification<'_>,
891 0 : ctx: &RequestContext,
892 0 : ) -> anyhow::Result<()> {
893 0 : let ClogZeroPage { segno, rpageno } = zero_page;
894 0 :
895 0 : self.put_slru_page_image(
896 0 : modification,
897 0 : SlruKind::Clog,
898 0 : segno,
899 0 : rpageno,
900 0 : ZERO_PAGE.clone(),
901 0 : ctx,
902 0 : )
903 0 : .await
904 0 : }
905 :
906 0 : fn ingest_multixact_create(
907 0 : &mut self,
908 0 : modification: &mut DatadirModification,
909 0 : xlrec: &XlMultiXactCreate,
910 0 : ) -> Result<()> {
911 0 : // Create WAL record for updating the multixact-offsets page
912 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
913 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
914 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
915 0 :
916 0 : modification.put_slru_wal_record(
917 0 : SlruKind::MultiXactOffsets,
918 0 : segno,
919 0 : rpageno,
920 0 : NeonWalRecord::MultixactOffsetCreate {
921 0 : mid: xlrec.mid,
922 0 : moff: xlrec.moff,
923 0 : },
924 0 : )?;
925 :
926 : // Create WAL records for the update of each affected multixact-members page
927 0 : let mut members = xlrec.members.iter();
928 0 : let mut offset = xlrec.moff;
929 : loop {
930 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
931 0 :
932 0 : // How many members fit on this page?
933 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
934 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
935 0 :
936 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
937 0 : for _ in 0..page_remain {
938 0 : if let Some(m) = members.next() {
939 0 : this_page_members.push(m.clone());
940 0 : } else {
941 0 : break;
942 : }
943 : }
944 0 : if this_page_members.is_empty() {
945 : // all done
946 0 : break;
947 0 : }
948 0 : let n_this_page = this_page_members.len();
949 0 :
950 0 : modification.put_slru_wal_record(
951 0 : SlruKind::MultiXactMembers,
952 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
953 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
954 0 : NeonWalRecord::MultixactMembersCreate {
955 0 : moff: offset,
956 0 : members: this_page_members,
957 0 : },
958 0 : )?;
959 :
960 : // Note: The multixact members can wrap around, even within one WAL record.
961 0 : offset = offset.wrapping_add(n_this_page as u32);
962 : }
963 0 : let next_offset = offset;
964 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
965 :
966 : // Update next-multi-xid and next-offset
967 : //
968 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
969 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
970 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
971 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
972 : // is stored, so it's never 0 in a checkpoint.
973 : //
974 : // I don't know why it's done that way, it seems less error-prone to skip over 0
975 : // when the value is stored rather than when it's read. But let's do it the same
976 : // way here.
977 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
978 0 :
979 0 : if self
980 0 : .checkpoint
981 0 : .update_next_multixid(next_multi_xid, next_offset)
982 0 : {
983 0 : self.checkpoint_modified = true;
984 0 : }
985 :
986 : // Also update the next-xid with the highest member. According to the comments in
987 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
988 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
989 0 : if let Some(max_xid) = acc {
990 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
991 0 : Some(mbr.xid)
992 : } else {
993 0 : acc
994 : }
995 : } else {
996 0 : Some(mbr.xid)
997 : }
998 0 : });
999 :
1000 0 : if let Some(max_xid) = max_mbr_xid {
1001 0 : if self.checkpoint.update_next_xid(max_xid) {
1002 0 : self.checkpoint_modified = true;
1003 0 : }
1004 0 : }
1005 0 : Ok(())
1006 0 : }
1007 :
1008 0 : async fn ingest_multixact_truncate(
1009 0 : &mut self,
1010 0 : modification: &mut DatadirModification<'_>,
1011 0 : xlrec: &XlMultiXactTruncate,
1012 0 : ctx: &RequestContext,
1013 0 : ) -> Result<()> {
1014 0 : let (maxsegment, startsegment, endsegment) =
1015 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1016 0 : cp.oldestMulti = xlrec.end_trunc_off;
1017 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1018 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1019 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1020 0 : );
1021 0 : let startsegment: i32 =
1022 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1023 0 : let endsegment: i32 =
1024 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1025 0 : (maxsegment, startsegment, endsegment)
1026 : });
1027 :
1028 0 : self.checkpoint_modified = true;
1029 0 :
1030 0 : // PerformMembersTruncation
1031 0 : let mut segment: i32 = startsegment;
1032 0 :
1033 0 : // Delete all the segments except the last one. The last segment can still
1034 0 : // contain, possibly partially, valid data.
1035 0 : if modification.tline.get_shard_identity().is_shard_zero() {
1036 0 : while segment != endsegment {
1037 0 : modification
1038 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1039 0 : .await?;
1040 :
1041 : /* move to next segment, handling wraparound correctly */
1042 0 : if segment == maxsegment {
1043 0 : segment = 0;
1044 0 : } else {
1045 0 : segment += 1;
1046 0 : }
1047 : }
1048 0 : }
1049 :
1050 : // Truncate offsets
1051 : // FIXME: this did not handle wraparound correctly
1052 :
1053 0 : Ok(())
1054 0 : }
1055 :
1056 0 : async fn ingest_multixact_zero_page(
1057 0 : &mut self,
1058 0 : zero_page: MultiXactZeroPage,
1059 0 : modification: &mut DatadirModification<'_>,
1060 0 : ctx: &RequestContext,
1061 0 : ) -> Result<()> {
1062 0 : let MultiXactZeroPage {
1063 0 : slru_kind,
1064 0 : segno,
1065 0 : rpageno,
1066 0 : } = zero_page;
1067 0 : self.put_slru_page_image(
1068 0 : modification,
1069 0 : slru_kind,
1070 0 : segno,
1071 0 : rpageno,
1072 0 : ZERO_PAGE.clone(),
1073 0 : ctx,
1074 0 : )
1075 0 : .await
1076 0 : }
1077 :
1078 0 : async fn ingest_relmap_update(
1079 0 : &mut self,
1080 0 : update: RelmapUpdate,
1081 0 : modification: &mut DatadirModification<'_>,
1082 0 : ctx: &RequestContext,
1083 0 : ) -> Result<()> {
1084 0 : let RelmapUpdate { update, buf } = update;
1085 0 :
1086 0 : modification
1087 0 : .put_relmap_file(update.tsid, update.dbid, buf, ctx)
1088 0 : .await
1089 0 : }
1090 :
1091 60 : async fn ingest_raw_xlog_record(
1092 60 : &mut self,
1093 60 : raw_record: RawXlogRecord,
1094 60 : modification: &mut DatadirModification<'_>,
1095 60 : ctx: &RequestContext,
1096 60 : ) -> Result<()> {
1097 60 : let RawXlogRecord { info, lsn, mut buf } = raw_record;
1098 60 : let pg_version = modification.tline.pg_version;
1099 60 :
1100 60 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
1101 4 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1102 0 : let rec = v17::XlParameterChange::decode(&mut buf);
1103 0 : cp.wal_level = rec.wal_level;
1104 0 : self.checkpoint_modified = true;
1105 4 : }
1106 56 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
1107 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1108 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
1109 0 : cp.wal_level = rec.wal_level;
1110 0 : self.checkpoint_modified = true;
1111 0 : }
1112 56 : }
1113 :
1114 60 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1115 0 : if info == pg_constants::XLOG_NEXTOID {
1116 0 : let next_oid = buf.get_u32_le();
1117 0 : if cp.nextOid != next_oid {
1118 0 : cp.nextOid = next_oid;
1119 0 : self.checkpoint_modified = true;
1120 0 : }
1121 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
1122 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1123 : {
1124 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
1125 0 : buf.copy_to_slice(&mut checkpoint_bytes);
1126 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
1127 0 : trace!(
1128 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
1129 : xlog_checkpoint.oldestXid, cp.oldestXid
1130 : );
1131 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
1132 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
1133 0 : }
1134 0 : trace!(
1135 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
1136 : xlog_checkpoint.oldestActiveXid, cp.oldestActiveXid
1137 : );
1138 :
1139 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
1140 : // because at shutdown, all in-progress transactions will implicitly
1141 : // end. Postgres startup code knows that, and allows hot standby to start
1142 : // immediately from a shutdown checkpoint.
1143 : //
1144 : // In Neon, Postgres hot standby startup always behaves as if starting from
1145 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
1146 : // instead of overwriting self.checkpoint.oldestActiveXid with
1147 : // InvalidTransactionid from the checkpoint WAL record, update it to a
1148 : // proper value, knowing that there are no in-progress transactions at this
1149 : // point, except for prepared transactions.
1150 : //
1151 : // See also the neon code changes in the InitWalRecovery() function.
1152 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
1153 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1154 : {
1155 0 : let oldest_active_xid = if pg_version >= 17 {
1156 0 : let mut oldest_active_full_xid = cp.nextXid.value;
1157 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1158 0 : if xid < oldest_active_full_xid {
1159 0 : oldest_active_full_xid = xid;
1160 0 : }
1161 : }
1162 0 : oldest_active_full_xid as u32
1163 : } else {
1164 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
1165 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1166 0 : let narrow_xid = xid as u32;
1167 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
1168 0 : oldest_active_xid = narrow_xid;
1169 0 : }
1170 : }
1171 0 : oldest_active_xid
1172 : };
1173 0 : cp.oldestActiveXid = oldest_active_xid;
1174 0 : } else {
1175 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
1176 0 : }
1177 : // NB: We abuse the Checkpoint.redo field:
1178 : //
1179 : // - In PostgreSQL, the Checkpoint struct doesn't store the information
1180 : // of whether this is an online checkpoint or a shutdown checkpoint. It's
1181 : // stored in the XLOG info field of the WAL record, shutdown checkpoints
1182 : // use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
1183 : // XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
1184 : // in the pageserver, however.
1185 : //
1186 : // - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
1187 : // checkpoint record, if it's a shutdown checkpoint. But when we are
1188 : // starting from a shutdown checkpoint, the basebackup LSN is the *end*
1189 : // of the shutdown checkpoint WAL record. That makes it difficult to
1190 : // correctly detect whether we're starting from a shutdown record or
1191 : // not.
1192 : //
1193 : // To address both of those issues, we store 0 in the redo field if it's
1194 : // an online checkpoint record, and the record's *end* LSN if it's a
1195 : // shutdown checkpoint. We don't need the original redo pointer in neon,
1196 : // because we don't perform WAL replay at startup anyway, so we can get
1197 : // away with abusing the redo field like this.
1198 : //
1199 : // XXX: Ideally, we would persist the extra information in a more
1200 : // explicit format, rather than repurpose the fields of the Postgres
1201 : // struct like this. However, we already have persisted data like this,
1202 : // so we need to maintain backwards compatibility.
1203 : //
1204 : // NB: We didn't originally have this convention, so there are still old
1205 : // persisted records that didn't do this. Before, we didn't update the
1206 : // persisted redo field at all. That means that old records have a bogus
1207 : // redo pointer that points to some old value, from the checkpoint record
1208 : // that was originally imported from the data directory. If it was a
1209 : // project created in Neon, that means it points to the first checkpoint
1210 : // after initdb. That's OK for our purposes: all such old checkpoints are
1211 : // treated as old online checkpoints when the basebackup is created.
1212 0 : cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
1213 : // Store the *end* LSN of the checkpoint record. Or to be precise,
1214 : // the start LSN of the *next* record, i.e. if the record ends
1215 : // exactly at page boundary, the redo LSN points to just after the
1216 : // page header on the next page.
1217 0 : lsn.into()
1218 : } else {
1219 0 : Lsn::INVALID.into()
1220 : };
1221 :
1222 : // Write a new checkpoint key-value pair on every checkpoint record, even
1223 : // if nothing really changed. Not strictly required, but it seems nice to
1224 : // have some trace of the checkpoint records in the layer files at the same
1225 : // LSNs.
1226 0 : self.checkpoint_modified = true;
1227 0 : }
1228 : });
1229 :
1230 60 : Ok(())
1231 60 : }
1232 :
1233 0 : async fn ingest_logical_message_put(
1234 0 : &mut self,
1235 0 : put: PutLogicalMessage,
1236 0 : modification: &mut DatadirModification<'_>,
1237 0 : ctx: &RequestContext,
1238 0 : ) -> Result<()> {
1239 0 : let PutLogicalMessage { path, buf } = put;
1240 0 : modification.put_file(path.as_str(), &buf, ctx).await
1241 0 : }
1242 :
1243 0 : fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
1244 0 : match record {
1245 0 : StandbyRecord::RunningXacts(running_xacts) => {
1246 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1247 0 : cp.oldestActiveXid = running_xacts.oldest_running_xid;
1248 0 : });
1249 :
1250 0 : self.checkpoint_modified = true;
1251 0 : }
1252 0 : }
1253 0 :
1254 0 : Ok(())
1255 0 : }
1256 :
1257 0 : async fn ingest_replorigin_record(
1258 0 : &mut self,
1259 0 : record: ReploriginRecord,
1260 0 : modification: &mut DatadirModification<'_>,
1261 0 : ) -> Result<()> {
1262 0 : match record {
1263 0 : ReploriginRecord::Set(set) => {
1264 0 : modification
1265 0 : .set_replorigin(set.node_id, set.remote_lsn)
1266 0 : .await?;
1267 : }
1268 0 : ReploriginRecord::Drop(drop) => {
1269 0 : modification.drop_replorigin(drop.node_id).await?;
1270 : }
1271 : }
1272 :
1273 0 : Ok(())
1274 0 : }
1275 :
1276 36 : async fn put_rel_creation(
1277 36 : &mut self,
1278 36 : modification: &mut DatadirModification<'_>,
1279 36 : rel: RelTag,
1280 36 : ctx: &RequestContext,
1281 36 : ) -> Result<()> {
1282 36 : modification.put_rel_creation(rel, 0, ctx).await?;
1283 36 : Ok(())
1284 36 : }
1285 :
1286 : #[cfg(test)]
1287 544804 : async fn put_rel_page_image(
1288 544804 : &mut self,
1289 544804 : modification: &mut DatadirModification<'_>,
1290 544804 : rel: RelTag,
1291 544804 : blknum: BlockNumber,
1292 544804 : img: Bytes,
1293 544804 : ctx: &RequestContext,
1294 544804 : ) -> Result<(), PageReconstructError> {
1295 544804 : self.handle_rel_extend(modification, rel, blknum, ctx)
1296 544804 : .await?;
1297 544804 : modification.put_rel_page_image(rel, blknum, img)?;
1298 544804 : Ok(())
1299 544804 : }
1300 :
1301 24 : async fn put_rel_wal_record(
1302 24 : &mut self,
1303 24 : modification: &mut DatadirModification<'_>,
1304 24 : rel: RelTag,
1305 24 : blknum: BlockNumber,
1306 24 : rec: NeonWalRecord,
1307 24 : ctx: &RequestContext,
1308 24 : ) -> Result<()> {
1309 24 : self.handle_rel_extend(modification, rel, blknum, ctx)
1310 24 : .await?;
1311 24 : modification.put_rel_wal_record(rel, blknum, rec)?;
1312 24 : Ok(())
1313 24 : }
1314 :
1315 12024 : async fn put_rel_truncation(
1316 12024 : &mut self,
1317 12024 : modification: &mut DatadirModification<'_>,
1318 12024 : rel: RelTag,
1319 12024 : nblocks: BlockNumber,
1320 12024 : ctx: &RequestContext,
1321 12024 : ) -> anyhow::Result<()> {
1322 12024 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1323 12024 : Ok(())
1324 12024 : }
1325 :
1326 544828 : async fn handle_rel_extend(
1327 544828 : &mut self,
1328 544828 : modification: &mut DatadirModification<'_>,
1329 544828 : rel: RelTag,
1330 544828 : blknum: BlockNumber,
1331 544828 : ctx: &RequestContext,
1332 544828 : ) -> Result<(), PageReconstructError> {
1333 544828 : let new_nblocks = blknum + 1;
1334 : // Check if the relation exists. We implicitly create relations on first
1335 : // record.
1336 544828 : let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
1337 :
1338 544828 : if new_nblocks > old_nblocks {
1339 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1340 544796 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1341 :
1342 544796 : let mut key = rel_block_to_key(rel, blknum);
1343 544796 :
1344 544796 : // fill the gap with zeros
1345 544796 : let mut gap_blocks_filled: u64 = 0;
1346 544796 : for gap_blknum in old_nblocks..blknum {
1347 5996 : key.field6 = gap_blknum;
1348 5996 :
1349 5996 : if self.shard.get_shard_number(&key) != self.shard.number {
1350 0 : continue;
1351 5996 : }
1352 5996 :
1353 5996 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1354 5996 : gap_blocks_filled += 1;
1355 : }
1356 :
1357 544796 : WAL_INGEST
1358 544796 : .gap_blocks_zeroed_on_rel_extend
1359 544796 : .inc_by(gap_blocks_filled);
1360 544796 :
1361 544796 : // Log something when relation extends cause use to fill gaps
1362 544796 : // with zero pages. Logging is rate limited per pg version to
1363 544796 : // avoid skewing.
1364 544796 : if gap_blocks_filled > 0 {
1365 : use std::sync::Mutex;
1366 :
1367 : use once_cell::sync::Lazy;
1368 : use utils::rate_limit::RateLimit;
1369 :
1370 : struct RateLimitPerPgVersion {
1371 : rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
1372 : }
1373 :
1374 : impl RateLimitPerPgVersion {
1375 0 : const fn new() -> Self {
1376 0 : Self {
1377 0 : rate_limiters: [const {
1378 4 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
1379 0 : }; 4],
1380 0 : }
1381 0 : }
1382 :
1383 8 : const fn rate_limiter(
1384 8 : &self,
1385 8 : pg_version: u32,
1386 8 : ) -> Option<&Lazy<Mutex<RateLimit>>> {
1387 : const MIN_PG_VERSION: u32 = 14;
1388 : const MAX_PG_VERSION: u32 = 17;
1389 :
1390 8 : if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
1391 0 : return None;
1392 8 : }
1393 8 :
1394 8 : Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
1395 8 : }
1396 : }
1397 :
1398 : static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
1399 8 : if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
1400 8 : if let Ok(mut locked) = rate_limiter.try_lock() {
1401 8 : locked.call(|| {
1402 4 : info!(
1403 0 : lsn=%modification.get_lsn(),
1404 0 : pg_version=%modification.tline.pg_version,
1405 0 : rel=%rel,
1406 0 : "Filled {} gap blocks on rel extend to {} from {}",
1407 : gap_blocks_filled,
1408 : new_nblocks,
1409 : old_nblocks);
1410 8 : });
1411 8 : }
1412 0 : }
1413 544788 : }
1414 32 : }
1415 544828 : Ok(())
1416 544828 : }
1417 :
1418 0 : async fn put_slru_page_image(
1419 0 : &mut self,
1420 0 : modification: &mut DatadirModification<'_>,
1421 0 : kind: SlruKind,
1422 0 : segno: u32,
1423 0 : blknum: BlockNumber,
1424 0 : img: Bytes,
1425 0 : ctx: &RequestContext,
1426 0 : ) -> Result<()> {
1427 0 : if !self.shard.is_shard_zero() {
1428 0 : return Ok(());
1429 0 : }
1430 0 :
1431 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1432 0 : .await?;
1433 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1434 0 : Ok(())
1435 0 : }
1436 :
1437 0 : async fn handle_slru_extend(
1438 0 : &mut self,
1439 0 : modification: &mut DatadirModification<'_>,
1440 0 : kind: SlruKind,
1441 0 : segno: u32,
1442 0 : blknum: BlockNumber,
1443 0 : ctx: &RequestContext,
1444 0 : ) -> anyhow::Result<()> {
1445 0 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
1446 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1447 0 : // a lot less frequently.
1448 0 :
1449 0 : let new_nblocks = blknum + 1;
1450 : // Check if the relation exists. We implicitly create relations on first
1451 : // record.
1452 : // TODO: would be nice if to be more explicit about it
1453 0 : let old_nblocks = if !modification
1454 0 : .tline
1455 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1456 0 : .await?
1457 : {
1458 : // create it with 0 size initially, the logic below will extend it
1459 0 : modification
1460 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1461 0 : .await?;
1462 0 : 0
1463 : } else {
1464 0 : modification
1465 0 : .tline
1466 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1467 0 : .await?
1468 : };
1469 :
1470 0 : if new_nblocks > old_nblocks {
1471 0 : trace!(
1472 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1473 : kind, segno, old_nblocks, new_nblocks
1474 : );
1475 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1476 :
1477 : // fill the gap with zeros
1478 0 : for gap_blknum in old_nblocks..blknum {
1479 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1480 : }
1481 0 : }
1482 0 : Ok(())
1483 0 : }
1484 : }
1485 :
1486 : /// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
1487 : ///
1488 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
1489 : /// page number stored in the shard, or None if the shard does not have any pages for it.
1490 24 : async fn get_relsize(
1491 24 : modification: &DatadirModification<'_>,
1492 24 : rel: RelTag,
1493 24 : ctx: &RequestContext,
1494 24 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
1495 24 : if !modification
1496 24 : .tline
1497 24 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1498 24 : .await?
1499 : {
1500 0 : return Ok(None);
1501 24 : }
1502 24 : modification
1503 24 : .tline
1504 24 : .get_rel_size(rel, Version::Modified(modification), ctx)
1505 24 : .await
1506 24 : .map(Some)
1507 24 : }
1508 :
1509 : #[allow(clippy::bool_assert_comparison)]
1510 : #[cfg(test)]
1511 : mod tests {
1512 : use postgres_ffi::RELSEG_SIZE;
1513 :
1514 : use super::*;
1515 : use crate::DEFAULT_PG_VERSION;
1516 : use crate::tenant::harness::*;
1517 : use crate::tenant::remote_timeline_client::{INITDB_PATH, remote_initdb_archive_path};
1518 : use crate::tenant::storage_layer::IoConcurrency;
1519 :
1520 : /// Arbitrary relation tag, for testing.
1521 : const TESTREL_A: RelTag = RelTag {
1522 : spcnode: 0,
1523 : dbnode: 111,
1524 : relnode: 1000,
1525 : forknum: 0,
1526 : };
1527 :
1528 24 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1529 24 : // TODO
1530 24 : }
1531 :
1532 : #[tokio::test]
1533 4 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
1534 16 : for i in 14..=16 {
1535 12 : dispatch_pgversion!(i, {
1536 4 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
1537 4 : });
1538 4 : }
1539 4 :
1540 4 : Ok(())
1541 4 : }
1542 :
1543 16 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1544 16 : let mut m = tline.begin_modification(Lsn(0x10));
1545 16 : m.put_checkpoint(dispatch_pgversion!(
1546 16 : tline.pg_version,
1547 0 : pgv::ZERO_CHECKPOINT.clone()
1548 0 : ))?;
1549 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1550 16 : m.commit(ctx).await?;
1551 16 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1552 :
1553 16 : Ok(walingest)
1554 16 : }
1555 :
1556 : #[tokio::test]
1557 4 : async fn test_relsize() -> Result<()> {
1558 4 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
1559 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1560 4 : let tline = tenant
1561 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1562 4 : .await?;
1563 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1564 4 :
1565 4 : let mut m = tline.begin_modification(Lsn(0x20));
1566 4 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1567 4 : walingest
1568 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1569 4 : .await?;
1570 4 : m.commit(&ctx).await?;
1571 4 : let mut m = tline.begin_modification(Lsn(0x30));
1572 4 : walingest
1573 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1574 4 : .await?;
1575 4 : m.commit(&ctx).await?;
1576 4 : let mut m = tline.begin_modification(Lsn(0x40));
1577 4 : walingest
1578 4 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1579 4 : .await?;
1580 4 : m.commit(&ctx).await?;
1581 4 : let mut m = tline.begin_modification(Lsn(0x50));
1582 4 : walingest
1583 4 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1584 4 : .await?;
1585 4 : m.commit(&ctx).await?;
1586 4 :
1587 4 : assert_current_logical_size(&tline, Lsn(0x50));
1588 4 :
1589 4 : let test_span = tracing::info_span!(parent: None, "test",
1590 4 : tenant_id=%tline.tenant_shard_id.tenant_id,
1591 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1592 0 : timeline_id=%tline.timeline_id);
1593 4 :
1594 4 : // The relation was created at LSN 2, not visible at LSN 1 yet.
1595 4 : assert_eq!(
1596 4 : tline
1597 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1598 4 : .await?,
1599 4 : false
1600 4 : );
1601 4 : assert!(
1602 4 : tline
1603 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1604 4 : .await
1605 4 : .is_err()
1606 4 : );
1607 4 : assert_eq!(
1608 4 : tline
1609 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1610 4 : .await?,
1611 4 : true
1612 4 : );
1613 4 : assert_eq!(
1614 4 : tline
1615 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1616 4 : .await?,
1617 4 : 1
1618 4 : );
1619 4 : assert_eq!(
1620 4 : tline
1621 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1622 4 : .await?,
1623 4 : 3
1624 4 : );
1625 4 :
1626 4 : // Check page contents at each LSN
1627 4 : assert_eq!(
1628 4 : tline
1629 4 : .get_rel_page_at_lsn(
1630 4 : TESTREL_A,
1631 4 : 0,
1632 4 : Version::Lsn(Lsn(0x20)),
1633 4 : &ctx,
1634 4 : io_concurrency.clone()
1635 4 : )
1636 4 : .instrument(test_span.clone())
1637 4 : .await?,
1638 4 : test_img("foo blk 0 at 2")
1639 4 : );
1640 4 :
1641 4 : assert_eq!(
1642 4 : tline
1643 4 : .get_rel_page_at_lsn(
1644 4 : TESTREL_A,
1645 4 : 0,
1646 4 : Version::Lsn(Lsn(0x30)),
1647 4 : &ctx,
1648 4 : io_concurrency.clone()
1649 4 : )
1650 4 : .instrument(test_span.clone())
1651 4 : .await?,
1652 4 : test_img("foo blk 0 at 3")
1653 4 : );
1654 4 :
1655 4 : assert_eq!(
1656 4 : tline
1657 4 : .get_rel_page_at_lsn(
1658 4 : TESTREL_A,
1659 4 : 0,
1660 4 : Version::Lsn(Lsn(0x40)),
1661 4 : &ctx,
1662 4 : io_concurrency.clone()
1663 4 : )
1664 4 : .instrument(test_span.clone())
1665 4 : .await?,
1666 4 : test_img("foo blk 0 at 3")
1667 4 : );
1668 4 : assert_eq!(
1669 4 : tline
1670 4 : .get_rel_page_at_lsn(
1671 4 : TESTREL_A,
1672 4 : 1,
1673 4 : Version::Lsn(Lsn(0x40)),
1674 4 : &ctx,
1675 4 : io_concurrency.clone()
1676 4 : )
1677 4 : .instrument(test_span.clone())
1678 4 : .await?,
1679 4 : test_img("foo blk 1 at 4")
1680 4 : );
1681 4 :
1682 4 : assert_eq!(
1683 4 : tline
1684 4 : .get_rel_page_at_lsn(
1685 4 : TESTREL_A,
1686 4 : 0,
1687 4 : Version::Lsn(Lsn(0x50)),
1688 4 : &ctx,
1689 4 : io_concurrency.clone()
1690 4 : )
1691 4 : .instrument(test_span.clone())
1692 4 : .await?,
1693 4 : test_img("foo blk 0 at 3")
1694 4 : );
1695 4 : assert_eq!(
1696 4 : tline
1697 4 : .get_rel_page_at_lsn(
1698 4 : TESTREL_A,
1699 4 : 1,
1700 4 : Version::Lsn(Lsn(0x50)),
1701 4 : &ctx,
1702 4 : io_concurrency.clone()
1703 4 : )
1704 4 : .instrument(test_span.clone())
1705 4 : .await?,
1706 4 : test_img("foo blk 1 at 4")
1707 4 : );
1708 4 : assert_eq!(
1709 4 : tline
1710 4 : .get_rel_page_at_lsn(
1711 4 : TESTREL_A,
1712 4 : 2,
1713 4 : Version::Lsn(Lsn(0x50)),
1714 4 : &ctx,
1715 4 : io_concurrency.clone()
1716 4 : )
1717 4 : .instrument(test_span.clone())
1718 4 : .await?,
1719 4 : test_img("foo blk 2 at 5")
1720 4 : );
1721 4 :
1722 4 : // Truncate last block
1723 4 : let mut m = tline.begin_modification(Lsn(0x60));
1724 4 : walingest
1725 4 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1726 4 : .await?;
1727 4 : m.commit(&ctx).await?;
1728 4 : assert_current_logical_size(&tline, Lsn(0x60));
1729 4 :
1730 4 : // Check reported size and contents after truncation
1731 4 : assert_eq!(
1732 4 : tline
1733 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1734 4 : .await?,
1735 4 : 2
1736 4 : );
1737 4 : assert_eq!(
1738 4 : tline
1739 4 : .get_rel_page_at_lsn(
1740 4 : TESTREL_A,
1741 4 : 0,
1742 4 : Version::Lsn(Lsn(0x60)),
1743 4 : &ctx,
1744 4 : io_concurrency.clone()
1745 4 : )
1746 4 : .instrument(test_span.clone())
1747 4 : .await?,
1748 4 : test_img("foo blk 0 at 3")
1749 4 : );
1750 4 : assert_eq!(
1751 4 : tline
1752 4 : .get_rel_page_at_lsn(
1753 4 : TESTREL_A,
1754 4 : 1,
1755 4 : Version::Lsn(Lsn(0x60)),
1756 4 : &ctx,
1757 4 : io_concurrency.clone()
1758 4 : )
1759 4 : .instrument(test_span.clone())
1760 4 : .await?,
1761 4 : test_img("foo blk 1 at 4")
1762 4 : );
1763 4 :
1764 4 : // should still see the truncated block with older LSN
1765 4 : assert_eq!(
1766 4 : tline
1767 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1768 4 : .await?,
1769 4 : 3
1770 4 : );
1771 4 : assert_eq!(
1772 4 : tline
1773 4 : .get_rel_page_at_lsn(
1774 4 : TESTREL_A,
1775 4 : 2,
1776 4 : Version::Lsn(Lsn(0x50)),
1777 4 : &ctx,
1778 4 : io_concurrency.clone()
1779 4 : )
1780 4 : .instrument(test_span.clone())
1781 4 : .await?,
1782 4 : test_img("foo blk 2 at 5")
1783 4 : );
1784 4 :
1785 4 : // Truncate to zero length
1786 4 : let mut m = tline.begin_modification(Lsn(0x68));
1787 4 : walingest
1788 4 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1789 4 : .await?;
1790 4 : m.commit(&ctx).await?;
1791 4 : assert_eq!(
1792 4 : tline
1793 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
1794 4 : .await?,
1795 4 : 0
1796 4 : );
1797 4 :
1798 4 : // Extend from 0 to 2 blocks, leaving a gap
1799 4 : let mut m = tline.begin_modification(Lsn(0x70));
1800 4 : walingest
1801 4 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1802 4 : .await?;
1803 4 : m.commit(&ctx).await?;
1804 4 : assert_eq!(
1805 4 : tline
1806 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
1807 4 : .await?,
1808 4 : 2
1809 4 : );
1810 4 : assert_eq!(
1811 4 : tline
1812 4 : .get_rel_page_at_lsn(
1813 4 : TESTREL_A,
1814 4 : 0,
1815 4 : Version::Lsn(Lsn(0x70)),
1816 4 : &ctx,
1817 4 : io_concurrency.clone()
1818 4 : )
1819 4 : .instrument(test_span.clone())
1820 4 : .await?,
1821 4 : ZERO_PAGE
1822 4 : );
1823 4 : assert_eq!(
1824 4 : tline
1825 4 : .get_rel_page_at_lsn(
1826 4 : TESTREL_A,
1827 4 : 1,
1828 4 : Version::Lsn(Lsn(0x70)),
1829 4 : &ctx,
1830 4 : io_concurrency.clone()
1831 4 : )
1832 4 : .instrument(test_span.clone())
1833 4 : .await?,
1834 4 : test_img("foo blk 1")
1835 4 : );
1836 4 :
1837 4 : // Extend a lot more, leaving a big gap that spans across segments
1838 4 : let mut m = tline.begin_modification(Lsn(0x80));
1839 4 : walingest
1840 4 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
1841 4 : .await?;
1842 4 : m.commit(&ctx).await?;
1843 4 : assert_eq!(
1844 4 : tline
1845 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1846 4 : .await?,
1847 4 : 1501
1848 4 : );
1849 5996 : for blk in 2..1500 {
1850 5992 : assert_eq!(
1851 5992 : tline
1852 5992 : .get_rel_page_at_lsn(
1853 5992 : TESTREL_A,
1854 5992 : blk,
1855 5992 : Version::Lsn(Lsn(0x80)),
1856 5992 : &ctx,
1857 5992 : io_concurrency.clone()
1858 5992 : )
1859 5992 : .instrument(test_span.clone())
1860 5992 : .await?,
1861 5992 : ZERO_PAGE
1862 4 : );
1863 4 : }
1864 4 : assert_eq!(
1865 4 : tline
1866 4 : .get_rel_page_at_lsn(
1867 4 : TESTREL_A,
1868 4 : 1500,
1869 4 : Version::Lsn(Lsn(0x80)),
1870 4 : &ctx,
1871 4 : io_concurrency.clone()
1872 4 : )
1873 4 : .instrument(test_span.clone())
1874 4 : .await?,
1875 4 : test_img("foo blk 1500")
1876 4 : );
1877 4 :
1878 4 : Ok(())
1879 4 : }
1880 :
1881 : // Test what happens if we dropped a relation
1882 : // and then created it again within the same layer.
1883 : #[tokio::test]
1884 4 : async fn test_drop_extend() -> Result<()> {
1885 4 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
1886 4 : .await?
1887 4 : .load()
1888 4 : .await;
1889 4 : let tline = tenant
1890 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1891 4 : .await?;
1892 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1893 4 :
1894 4 : let mut m = tline.begin_modification(Lsn(0x20));
1895 4 : walingest
1896 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1897 4 : .await?;
1898 4 : m.commit(&ctx).await?;
1899 4 :
1900 4 : // Check that rel exists and size is correct
1901 4 : assert_eq!(
1902 4 : tline
1903 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1904 4 : .await?,
1905 4 : true
1906 4 : );
1907 4 : assert_eq!(
1908 4 : tline
1909 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1910 4 : .await?,
1911 4 : 1
1912 4 : );
1913 4 :
1914 4 : // Drop rel
1915 4 : let mut m = tline.begin_modification(Lsn(0x30));
1916 4 : let mut rel_drops = HashMap::new();
1917 4 : rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
1918 4 : m.put_rel_drops(rel_drops, &ctx).await?;
1919 4 : m.commit(&ctx).await?;
1920 4 :
1921 4 : // Check that rel is not visible anymore
1922 4 : assert_eq!(
1923 4 : tline
1924 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
1925 4 : .await?,
1926 4 : false
1927 4 : );
1928 4 :
1929 4 : // FIXME: should fail
1930 4 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1931 4 :
1932 4 : // Re-create it
1933 4 : let mut m = tline.begin_modification(Lsn(0x40));
1934 4 : walingest
1935 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
1936 4 : .await?;
1937 4 : m.commit(&ctx).await?;
1938 4 :
1939 4 : // Check that rel exists and size is correct
1940 4 : assert_eq!(
1941 4 : tline
1942 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
1943 4 : .await?,
1944 4 : true
1945 4 : );
1946 4 : assert_eq!(
1947 4 : tline
1948 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
1949 4 : .await?,
1950 4 : 1
1951 4 : );
1952 4 :
1953 4 : Ok(())
1954 4 : }
1955 :
1956 : // Test what happens if we truncated a relation
1957 : // so that one of its segments was dropped
1958 : // and then extended it again within the same layer.
1959 : #[tokio::test]
1960 4 : async fn test_truncate_extend() -> Result<()> {
1961 4 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
1962 4 : .await?
1963 4 : .load()
1964 4 : .await;
1965 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1966 4 : let tline = tenant
1967 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1968 4 : .await?;
1969 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1970 4 :
1971 4 : // Create a 20 MB relation (the size is arbitrary)
1972 4 : let relsize = 20 * 1024 * 1024 / 8192;
1973 4 : let mut m = tline.begin_modification(Lsn(0x20));
1974 10240 : for blkno in 0..relsize {
1975 10240 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1976 10240 : walingest
1977 10240 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
1978 10240 : .await?;
1979 4 : }
1980 4 : m.commit(&ctx).await?;
1981 4 :
1982 4 : let test_span = tracing::info_span!(parent: None, "test",
1983 4 : tenant_id=%tline.tenant_shard_id.tenant_id,
1984 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1985 0 : timeline_id=%tline.timeline_id);
1986 4 :
1987 4 : // The relation was created at LSN 20, not visible at LSN 1 yet.
1988 4 : assert_eq!(
1989 4 : tline
1990 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1991 4 : .await?,
1992 4 : false
1993 4 : );
1994 4 : assert!(
1995 4 : tline
1996 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1997 4 : .await
1998 4 : .is_err()
1999 4 : );
2000 4 :
2001 4 : assert_eq!(
2002 4 : tline
2003 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2004 4 : .await?,
2005 4 : true
2006 4 : );
2007 4 : assert_eq!(
2008 4 : tline
2009 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2010 4 : .await?,
2011 4 : relsize
2012 4 : );
2013 4 :
2014 4 : // Check relation content
2015 10240 : for blkno in 0..relsize {
2016 10240 : let lsn = Lsn(0x20);
2017 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2018 10240 : assert_eq!(
2019 10240 : tline
2020 10240 : .get_rel_page_at_lsn(
2021 10240 : TESTREL_A,
2022 10240 : blkno,
2023 10240 : Version::Lsn(lsn),
2024 10240 : &ctx,
2025 10240 : io_concurrency.clone()
2026 10240 : )
2027 10240 : .instrument(test_span.clone())
2028 10240 : .await?,
2029 10240 : test_img(&data)
2030 4 : );
2031 4 : }
2032 4 :
2033 4 : // Truncate relation so that second segment was dropped
2034 4 : // - only leave one page
2035 4 : let mut m = tline.begin_modification(Lsn(0x60));
2036 4 : walingest
2037 4 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2038 4 : .await?;
2039 4 : m.commit(&ctx).await?;
2040 4 :
2041 4 : // Check reported size and contents after truncation
2042 4 : assert_eq!(
2043 4 : tline
2044 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2045 4 : .await?,
2046 4 : 1
2047 4 : );
2048 4 :
2049 8 : for blkno in 0..1 {
2050 4 : let lsn = Lsn(0x20);
2051 4 : let data = format!("foo blk {} at {}", blkno, lsn);
2052 4 : assert_eq!(
2053 4 : tline
2054 4 : .get_rel_page_at_lsn(
2055 4 : TESTREL_A,
2056 4 : blkno,
2057 4 : Version::Lsn(Lsn(0x60)),
2058 4 : &ctx,
2059 4 : io_concurrency.clone()
2060 4 : )
2061 4 : .instrument(test_span.clone())
2062 4 : .await?,
2063 4 : test_img(&data)
2064 4 : );
2065 4 : }
2066 4 :
2067 4 : // should still see all blocks with older LSN
2068 4 : assert_eq!(
2069 4 : tline
2070 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2071 4 : .await?,
2072 4 : relsize
2073 4 : );
2074 10240 : for blkno in 0..relsize {
2075 10240 : let lsn = Lsn(0x20);
2076 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2077 10240 : assert_eq!(
2078 10240 : tline
2079 10240 : .get_rel_page_at_lsn(
2080 10240 : TESTREL_A,
2081 10240 : blkno,
2082 10240 : Version::Lsn(Lsn(0x50)),
2083 10240 : &ctx,
2084 10240 : io_concurrency.clone()
2085 10240 : )
2086 10240 : .instrument(test_span.clone())
2087 10240 : .await?,
2088 10240 : test_img(&data)
2089 4 : );
2090 4 : }
2091 4 :
2092 4 : // Extend relation again.
2093 4 : // Add enough blocks to create second segment
2094 4 : let lsn = Lsn(0x80);
2095 4 : let mut m = tline.begin_modification(lsn);
2096 10240 : for blkno in 0..relsize {
2097 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2098 10240 : walingest
2099 10240 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2100 10240 : .await?;
2101 4 : }
2102 4 : m.commit(&ctx).await?;
2103 4 :
2104 4 : assert_eq!(
2105 4 : tline
2106 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2107 4 : .await?,
2108 4 : true
2109 4 : );
2110 4 : assert_eq!(
2111 4 : tline
2112 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2113 4 : .await?,
2114 4 : relsize
2115 4 : );
2116 4 : // Check relation content
2117 10240 : for blkno in 0..relsize {
2118 10240 : let lsn = Lsn(0x80);
2119 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2120 10240 : assert_eq!(
2121 10240 : tline
2122 10240 : .get_rel_page_at_lsn(
2123 10240 : TESTREL_A,
2124 10240 : blkno,
2125 10240 : Version::Lsn(Lsn(0x80)),
2126 10240 : &ctx,
2127 10240 : io_concurrency.clone()
2128 10240 : )
2129 10240 : .instrument(test_span.clone())
2130 10240 : .await?,
2131 10240 : test_img(&data)
2132 4 : );
2133 4 : }
2134 4 :
2135 4 : Ok(())
2136 4 : }
2137 :
2138 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2139 : /// split into multiple 1 GB segments in Postgres.
2140 : #[tokio::test]
2141 4 : async fn test_large_rel() -> Result<()> {
2142 4 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2143 4 : let tline = tenant
2144 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2145 4 : .await?;
2146 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2147 4 :
2148 4 : let mut lsn = 0x10;
2149 524292 : for blknum in 0..RELSEG_SIZE + 1 {
2150 524292 : lsn += 0x10;
2151 524292 : let mut m = tline.begin_modification(Lsn(lsn));
2152 524292 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2153 524292 : walingest
2154 524292 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2155 524292 : .await?;
2156 524292 : m.commit(&ctx).await?;
2157 4 : }
2158 4 :
2159 4 : assert_current_logical_size(&tline, Lsn(lsn));
2160 4 :
2161 4 : assert_eq!(
2162 4 : tline
2163 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2164 4 : .await?,
2165 4 : RELSEG_SIZE + 1
2166 4 : );
2167 4 :
2168 4 : // Truncate one block
2169 4 : lsn += 0x10;
2170 4 : let mut m = tline.begin_modification(Lsn(lsn));
2171 4 : walingest
2172 4 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2173 4 : .await?;
2174 4 : m.commit(&ctx).await?;
2175 4 : assert_eq!(
2176 4 : tline
2177 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2178 4 : .await?,
2179 4 : RELSEG_SIZE
2180 4 : );
2181 4 : assert_current_logical_size(&tline, Lsn(lsn));
2182 4 :
2183 4 : // Truncate another block
2184 4 : lsn += 0x10;
2185 4 : let mut m = tline.begin_modification(Lsn(lsn));
2186 4 : walingest
2187 4 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2188 4 : .await?;
2189 4 : m.commit(&ctx).await?;
2190 4 : assert_eq!(
2191 4 : tline
2192 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2193 4 : .await?,
2194 4 : RELSEG_SIZE - 1
2195 4 : );
2196 4 : assert_current_logical_size(&tline, Lsn(lsn));
2197 4 :
2198 4 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2199 4 : // This tests the behavior at segment boundaries
2200 4 : let mut size: i32 = 3000;
2201 12008 : while size >= 0 {
2202 12004 : lsn += 0x10;
2203 12004 : let mut m = tline.begin_modification(Lsn(lsn));
2204 12004 : walingest
2205 12004 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2206 12004 : .await?;
2207 12004 : m.commit(&ctx).await?;
2208 12004 : assert_eq!(
2209 12004 : tline
2210 12004 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2211 12004 : .await?,
2212 12004 : size as BlockNumber
2213 4 : );
2214 4 :
2215 12004 : size -= 1;
2216 4 : }
2217 4 : assert_current_logical_size(&tline, Lsn(lsn));
2218 4 :
2219 4 : Ok(())
2220 4 : }
2221 :
2222 : /// Replay a wal segment file taken directly from safekeepers.
2223 : ///
2224 : /// This test is useful for benchmarking since it allows us to profile only
2225 : /// the walingest code in a single-threaded executor, and iterate more quickly
2226 : /// without waiting for unrelated steps.
2227 : #[tokio::test]
2228 4 : async fn test_ingest_real_wal() {
2229 4 : use postgres_ffi::WAL_SEGMENT_SIZE;
2230 4 : use postgres_ffi::waldecoder::WalStreamDecoder;
2231 4 :
2232 4 : use crate::tenant::harness::*;
2233 4 :
2234 4 : // Define test data path and constants.
2235 4 : //
2236 4 : // Steps to reconstruct the data, if needed:
2237 4 : // 1. Run the pgbench python test
2238 4 : // 2. Take the first wal segment file from safekeeper
2239 4 : // 3. Compress it using `zstd --long input_file`
2240 4 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2241 4 : // 5. Grep sk logs for "restart decoder" to get startpoint
2242 4 : // 6. Run just the decoder from this test to get the endpoint.
2243 4 : // It's the last LSN the decoder will output.
2244 4 : let pg_version = 15; // The test data was generated by pg15
2245 4 : let path = "test_data/sk_wal_segment_from_pgbench";
2246 4 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2247 4 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2248 4 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2249 4 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2250 4 :
2251 4 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2252 4 : let span = harness
2253 4 : .span()
2254 4 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2255 4 : let (tenant, ctx) = harness.load().await;
2256 4 :
2257 4 : let remote_initdb_path =
2258 4 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2259 4 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2260 4 :
2261 4 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2262 4 : .expect("creating test dir should work");
2263 4 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2264 4 :
2265 4 : // Bootstrap a real timeline. We can't use create_test_timeline because
2266 4 : // it doesn't create a real checkpoint, and Walingest::new tries to parse
2267 4 : // the garbage data.
2268 4 : let tline = tenant
2269 4 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2270 4 : .await
2271 4 : .unwrap();
2272 4 :
2273 4 : // We fully read and decompress this into memory before decoding
2274 4 : // to get a more accurate perf profile of the decoder.
2275 4 : let bytes = {
2276 4 : use async_compression::tokio::bufread::ZstdDecoder;
2277 4 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2278 4 : let reader = tokio::io::BufReader::new(file);
2279 4 : let decoder = ZstdDecoder::new(reader);
2280 4 : let mut reader = tokio::io::BufReader::new(decoder);
2281 4 : let mut buffer = Vec::new();
2282 4 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2283 4 : buffer
2284 4 : };
2285 4 :
2286 4 : // TODO start a profiler too
2287 4 : let started_at = std::time::Instant::now();
2288 4 :
2289 4 : // Initialize walingest
2290 4 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2291 4 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2292 4 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2293 4 : .await
2294 4 : .unwrap();
2295 4 : let mut modification = tline.begin_modification(startpoint);
2296 4 : println!("decoding {} bytes", bytes.len() - xlogoff);
2297 4 :
2298 4 : // Decode and ingest wal. We process the wal in chunks because
2299 4 : // that's what happens when we get bytes from safekeepers.
2300 949372 : for chunk in bytes[xlogoff..].chunks(50) {
2301 949372 : decoder.feed_bytes(chunk);
2302 1241072 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2303 291700 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
2304 291700 : recdata,
2305 291700 : &[*modification.tline.get_shard_identity()],
2306 291700 : lsn,
2307 291700 : modification.tline.pg_version,
2308 291700 : )
2309 291700 : .unwrap()
2310 291700 : .remove(modification.tline.get_shard_identity())
2311 291700 : .unwrap();
2312 291700 :
2313 291700 : walingest
2314 291700 : .ingest_record(interpreted, &mut modification, &ctx)
2315 291700 : .instrument(span.clone())
2316 291700 : .await
2317 291700 : .unwrap();
2318 4 : }
2319 949372 : modification.commit(&ctx).await.unwrap();
2320 4 : }
2321 4 :
2322 4 : let duration = started_at.elapsed();
2323 4 : println!("done in {:?}", duration);
2324 4 : }
2325 : }
|