Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
9 : //! Records get decoded and interpreted in the [`wal_decoder`] module
10 : //! and then stored to the Repository by WalIngest.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
14 : //! extracts page images out of some WAL records, but mostly it's WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! will call Repository::put_rel_wal_record or put_rel_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoke Rust code.
23 :
24 : use std::collections::HashMap;
25 : use std::sync::Arc;
26 : use std::sync::OnceLock;
27 : use std::time::Duration;
28 : use std::time::Instant;
29 : use std::time::SystemTime;
30 :
31 : use pageserver_api::shard::ShardIdentity;
32 : use postgres_ffi::fsm_logical_to_physical;
33 : use postgres_ffi::walrecord::*;
34 : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
35 : use wal_decoder::models::*;
36 :
37 : use anyhow::{bail, Result};
38 : use bytes::{Buf, Bytes};
39 : use tracing::*;
40 : use utils::failpoint_support;
41 : use utils::rate_limit::RateLimit;
42 :
43 : use crate::context::RequestContext;
44 : use crate::metrics::WAL_INGEST;
45 : use crate::pgdatadir_mapping::{DatadirModification, Version};
46 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
47 : use crate::tenant::PageReconstructError;
48 : use crate::tenant::Timeline;
49 : use crate::ZERO_PAGE;
50 : use pageserver_api::key::rel_block_to_key;
51 : use pageserver_api::record::NeonWalRecord;
52 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
53 : use postgres_ffi::pg_constants;
54 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
55 : use postgres_ffi::TransactionId;
56 : use utils::bin_ser::SerializeError;
57 : use utils::lsn::Lsn;
58 :
59 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
60 :
61 : impl CheckPoint {
62 6 : fn encode(&self) -> Result<Bytes, SerializeError> {
63 6 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
64 6 : }
65 :
66 145834 : fn update_next_xid(&mut self, xid: u32) -> bool {
67 145834 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
68 145834 : }
69 :
70 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
71 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
72 0 : cp.update_next_multixid(multi_xid, multi_offset)
73 : })
74 0 : }
75 : }
76 :
/// Temporary limitation of WAL lag warnings after attach.
///
/// After a tenant attach we want to limit WAL lag warnings, because the WAL
/// is not looked at until the attach is complete, which might take a while.
pub struct WalLagCooldown {
    /// Point in time after which this limitation no longer applies at all.
    active_until: Instant,
    /// The maximum lag to suppress. Lags above this limit get reported anyway.
    max_lag: Duration,
}

impl WalLagCooldown {
    /// Builds the cooldown from the time the attach started and how long it took.
    ///
    /// The suppression window ends at `attach_start + 3 * attach_duration + 120s`,
    /// and only lags of at most `2 * attach_duration + 60s` are suppressed.
    pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
        let active_until = attach_start + attach_duration * 3 + Duration::from_secs(120);
        let max_lag = attach_duration * 2 + Duration::from_secs(60);
        Self {
            active_until,
            max_lag,
        }
    }
}
97 :
98 : pub struct WalIngest {
99 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
100 : shard: ShardIdentity,
101 : checkpoint: CheckPoint,
102 : checkpoint_modified: bool,
103 : warn_ingest_lag: WarnIngestLag,
104 : }
105 :
106 : struct WarnIngestLag {
107 : lag_msg_ratelimit: RateLimit,
108 : future_lsn_msg_ratelimit: RateLimit,
109 : timestamp_invalid_msg_ratelimit: RateLimit,
110 : }
111 :
112 : impl WalIngest {
113 12 : pub async fn new(
114 12 : timeline: &Timeline,
115 12 : startpoint: Lsn,
116 12 : ctx: &RequestContext,
117 12 : ) -> anyhow::Result<WalIngest> {
118 : // Fetch the latest checkpoint into memory, so that we can compare with it
119 : // quickly in `ingest_record` and update it when it changes.
120 12 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
121 12 : let pgversion = timeline.pg_version;
122 :
123 12 : let checkpoint = dispatch_pgversion!(pgversion, {
124 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
125 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
126 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
127 : });
128 :
129 12 : Ok(WalIngest {
130 12 : shard: *timeline.get_shard_identity(),
131 12 : checkpoint,
132 12 : checkpoint_modified: false,
133 12 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
134 12 : warn_ingest_lag: WarnIngestLag {
135 12 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
136 12 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
137 12 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
138 12 : },
139 12 : })
140 12 : }
141 :
142 : /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
143 : /// storage of a given timeline.
144 : ///
145 : /// This function updates `lsn` field of `DatadirModification`
146 : ///
147 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
148 145852 : pub async fn ingest_record(
149 145852 : &mut self,
150 145852 : interpreted: InterpretedWalRecord,
151 145852 : modification: &mut DatadirModification<'_>,
152 145852 : ctx: &RequestContext,
153 145852 : ) -> anyhow::Result<bool> {
154 145852 : WAL_INGEST.records_received.inc();
155 145852 : let prev_len = modification.len();
156 145852 :
157 145852 : modification.set_lsn(interpreted.next_record_lsn)?;
158 :
159 145852 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
160 : // Records of this type should always be preceded by a commit(), as they
161 : // rely on reading data pages back from the Timeline.
162 0 : assert!(!modification.has_dirty_data());
163 145852 : }
164 :
165 145852 : assert!(!self.checkpoint_modified);
166 145852 : if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
167 145834 : && self.checkpoint.update_next_xid(interpreted.xid)
168 2 : {
169 2 : self.checkpoint_modified = true;
170 145850 : }
171 :
172 145852 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
173 :
174 66 : match interpreted.metadata_record {
175 12 : Some(MetadataRecord::Heapam(rec)) => match rec {
176 12 : HeapamRecord::ClearVmBits(clear_vm_bits) => {
177 12 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
178 0 : .await?;
179 : }
180 : },
181 0 : Some(MetadataRecord::Neonrmgr(rec)) => match rec {
182 0 : NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
183 0 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
184 0 : .await?;
185 : }
186 : },
187 16 : Some(MetadataRecord::Smgr(rec)) => match rec {
188 16 : SmgrRecord::Create(create) => {
189 16 : self.ingest_xlog_smgr_create(create, modification, ctx)
190 12 : .await?;
191 : }
192 0 : SmgrRecord::Truncate(truncate) => {
193 0 : self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
194 0 : .await?;
195 : }
196 : },
197 0 : Some(MetadataRecord::Dbase(rec)) => match rec {
198 0 : DbaseRecord::Create(create) => {
199 0 : self.ingest_xlog_dbase_create(create, modification, ctx)
200 0 : .await?;
201 : }
202 0 : DbaseRecord::Drop(drop) => {
203 0 : self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
204 : }
205 : },
206 0 : Some(MetadataRecord::Clog(rec)) => match rec {
207 0 : ClogRecord::ZeroPage(zero_page) => {
208 0 : self.ingest_clog_zero_page(zero_page, modification, ctx)
209 0 : .await?;
210 : }
211 0 : ClogRecord::Truncate(truncate) => {
212 0 : self.ingest_clog_truncate(truncate, modification, ctx)
213 0 : .await?;
214 : }
215 : },
216 8 : Some(MetadataRecord::Xact(rec)) => {
217 8 : self.ingest_xact_record(rec, modification, ctx).await?;
218 : }
219 0 : Some(MetadataRecord::MultiXact(rec)) => match rec {
220 0 : MultiXactRecord::ZeroPage(zero_page) => {
221 0 : self.ingest_multixact_zero_page(zero_page, modification, ctx)
222 0 : .await?;
223 : }
224 0 : MultiXactRecord::Create(create) => {
225 0 : self.ingest_multixact_create(modification, &create)?;
226 : }
227 0 : MultiXactRecord::Truncate(truncate) => {
228 0 : self.ingest_multixact_truncate(modification, &truncate, ctx)
229 0 : .await?;
230 : }
231 : },
232 0 : Some(MetadataRecord::Relmap(rec)) => match rec {
233 0 : RelmapRecord::Update(update) => {
234 0 : self.ingest_relmap_update(update, modification, ctx).await?;
235 : }
236 : },
237 30 : Some(MetadataRecord::Xlog(rec)) => match rec {
238 30 : XlogRecord::Raw(raw) => {
239 30 : self.ingest_raw_xlog_record(raw, modification, ctx).await?;
240 : }
241 : },
242 0 : Some(MetadataRecord::LogicalMessage(rec)) => match rec {
243 0 : LogicalMessageRecord::Put(put) => {
244 0 : self.ingest_logical_message_put(put, modification, ctx)
245 0 : .await?;
246 : }
247 : #[cfg(feature = "testing")]
248 : LogicalMessageRecord::Failpoint => {
249 : // This is a convenient way to make the WAL ingestion pause at
250 : // particular point in the WAL. For more fine-grained control,
251 : // we could peek into the message and only pause if it contains
252 : // a particular string, for example, but this is enough for now.
253 0 : failpoint_support::sleep_millis_async!(
254 0 : "pageserver-wal-ingest-logical-message-sleep"
255 0 : );
256 : }
257 : },
258 0 : Some(MetadataRecord::Standby(rec)) => {
259 0 : self.ingest_standby_record(rec).unwrap();
260 0 : }
261 0 : Some(MetadataRecord::Replorigin(rec)) => {
262 0 : self.ingest_replorigin_record(rec, modification).await?;
263 : }
264 145786 : None => {
265 145786 : // There are two cases through which we end up here:
266 145786 : // 1. The resource manager for the original PG WAL record
267 145786 : // is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
268 145786 : // record type within Neon.
269 145786 : // 2. The resource manager id was unknown to
270 145786 : // [`wal_decoder::decoder::MetadataRecord::from_decoded`].
271 145786 : // TODO(vlad): Tighten this up more once we build confidence
272 145786 : // that case (2) does not happen in the field.
273 145786 : }
274 : }
275 :
276 145852 : modification
277 145852 : .ingest_batch(interpreted.batch, &self.shard, ctx)
278 284 : .await?;
279 :
280 : // If checkpoint data was updated, store the new version in the repository
281 145852 : if self.checkpoint_modified {
282 6 : let new_checkpoint_bytes = self.checkpoint.encode()?;
283 :
284 6 : modification.put_checkpoint(new_checkpoint_bytes)?;
285 6 : self.checkpoint_modified = false;
286 145846 : }
287 :
288 : // Note that at this point this record is only cached in the modification
289 : // until commit() is called to flush the data into the repository and update
290 : // the latest LSN.
291 :
292 145852 : Ok(modification.len() > prev_len)
293 145852 : }
294 :
295 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
296 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
297 0 : let next_full_xid =
298 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
299 :
300 0 : let next_xid = (next_full_xid) as u32;
301 0 : let mut epoch = (next_full_xid >> 32) as u32;
302 0 :
303 0 : if xid > next_xid {
304 : // Wraparound occurred, must be from a prev epoch.
305 0 : if epoch == 0 {
306 0 : bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
307 0 : }
308 0 : epoch -= 1;
309 0 : }
310 :
311 0 : Ok((epoch as u64) << 32 | xid as u64)
312 0 : }
313 :
314 12 : async fn ingest_clear_vm_bits(
315 12 : &mut self,
316 12 : clear_vm_bits: ClearVmBits,
317 12 : modification: &mut DatadirModification<'_>,
318 12 : ctx: &RequestContext,
319 12 : ) -> anyhow::Result<()> {
320 12 : let ClearVmBits {
321 12 : new_heap_blkno,
322 12 : old_heap_blkno,
323 12 : flags,
324 12 : vm_rel,
325 12 : } = clear_vm_bits;
326 12 : // Clear the VM bits if required.
327 12 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
328 12 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
329 :
330 : // Sometimes, Postgres seems to create heap WAL records with the
331 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
332 : // not set. In fact, it's possible that the VM page does not exist at all.
333 : // In that case, we don't want to store a record to clear the VM bit;
334 : // replaying it would fail to find the previous image of the page, because
335 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
336 : // record if it doesn't.
337 12 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
338 12 : if let Some(blknum) = new_vm_blk {
339 12 : if blknum >= vm_size {
340 0 : new_vm_blk = None;
341 12 : }
342 0 : }
343 12 : if let Some(blknum) = old_vm_blk {
344 0 : if blknum >= vm_size {
345 0 : old_vm_blk = None;
346 0 : }
347 12 : }
348 :
349 12 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
350 12 : if new_vm_blk == old_vm_blk {
351 : // An UPDATE record that needs to clear the bits for both old and the
352 : // new page, both of which reside on the same VM page.
353 0 : self.put_rel_wal_record(
354 0 : modification,
355 0 : vm_rel,
356 0 : new_vm_blk.unwrap(),
357 0 : NeonWalRecord::ClearVisibilityMapFlags {
358 0 : new_heap_blkno,
359 0 : old_heap_blkno,
360 0 : flags,
361 0 : },
362 0 : ctx,
363 0 : )
364 0 : .await?;
365 : } else {
366 : // Clear VM bits for one heap page, or for two pages that reside on
367 : // different VM pages.
368 12 : if let Some(new_vm_blk) = new_vm_blk {
369 12 : self.put_rel_wal_record(
370 12 : modification,
371 12 : vm_rel,
372 12 : new_vm_blk,
373 12 : NeonWalRecord::ClearVisibilityMapFlags {
374 12 : new_heap_blkno,
375 12 : old_heap_blkno: None,
376 12 : flags,
377 12 : },
378 12 : ctx,
379 12 : )
380 0 : .await?;
381 0 : }
382 12 : if let Some(old_vm_blk) = old_vm_blk {
383 0 : self.put_rel_wal_record(
384 0 : modification,
385 0 : vm_rel,
386 0 : old_vm_blk,
387 0 : NeonWalRecord::ClearVisibilityMapFlags {
388 0 : new_heap_blkno: None,
389 0 : old_heap_blkno,
390 0 : flags,
391 0 : },
392 0 : ctx,
393 0 : )
394 0 : .await?;
395 12 : }
396 : }
397 0 : }
398 :
399 12 : Ok(())
400 12 : }
401 :
402 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
403 0 : async fn ingest_xlog_dbase_create(
404 0 : &mut self,
405 0 : create: DbaseCreate,
406 0 : modification: &mut DatadirModification<'_>,
407 0 : ctx: &RequestContext,
408 0 : ) -> anyhow::Result<()> {
409 0 : let DbaseCreate {
410 0 : db_id,
411 0 : tablespace_id,
412 0 : src_db_id,
413 0 : src_tablespace_id,
414 0 : } = create;
415 :
416 0 : let rels = modification
417 0 : .tline
418 0 : .list_rels(
419 0 : src_tablespace_id,
420 0 : src_db_id,
421 0 : Version::Modified(modification),
422 0 : ctx,
423 0 : )
424 0 : .await?;
425 :
426 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
427 :
428 : // Copy relfilemap
429 0 : let filemap = modification
430 0 : .tline
431 0 : .get_relmap_file(
432 0 : src_tablespace_id,
433 0 : src_db_id,
434 0 : Version::Modified(modification),
435 0 : ctx,
436 0 : )
437 0 : .await?;
438 0 : modification
439 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
440 0 : .await?;
441 :
442 0 : let mut num_rels_copied = 0;
443 0 : let mut num_blocks_copied = 0;
444 0 : for src_rel in rels {
445 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
446 0 : assert_eq!(src_rel.dbnode, src_db_id);
447 :
448 0 : let nblocks = modification
449 0 : .tline
450 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
451 0 : .await?;
452 0 : let dst_rel = RelTag {
453 0 : spcnode: tablespace_id,
454 0 : dbnode: db_id,
455 0 : relnode: src_rel.relnode,
456 0 : forknum: src_rel.forknum,
457 0 : };
458 0 :
459 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
460 :
461 : // Copy content
462 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
463 0 : for blknum in 0..nblocks {
464 : // Sharding:
465 : // - src and dst are always on the same shard, because they differ only by dbNode, and
466 : // dbNode is not included in the hash inputs for sharding.
467 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
468 : // that belong to it.
469 0 : let src_key = rel_block_to_key(src_rel, blknum);
470 0 : if !self.shard.is_key_local(&src_key) {
471 0 : debug!(
472 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
473 : src_key
474 : );
475 0 : continue;
476 0 : }
477 0 : debug!(
478 0 : "copying block {} from {} ({}) to {}",
479 : blknum, src_rel, src_key, dst_rel
480 : );
481 :
482 0 : let content = modification
483 0 : .tline
484 0 : .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
485 0 : .await?;
486 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
487 0 : num_blocks_copied += 1;
488 : }
489 :
490 0 : num_rels_copied += 1;
491 : }
492 :
493 0 : info!(
494 0 : "Created database {}/{}, copied {} blocks in {} rels",
495 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
496 : );
497 0 : Ok(())
498 0 : }
499 :
500 0 : async fn ingest_xlog_dbase_drop(
501 0 : &mut self,
502 0 : dbase_drop: DbaseDrop,
503 0 : modification: &mut DatadirModification<'_>,
504 0 : ctx: &RequestContext,
505 0 : ) -> anyhow::Result<()> {
506 0 : let DbaseDrop {
507 0 : db_id,
508 0 : tablespace_ids,
509 0 : } = dbase_drop;
510 0 : for tablespace_id in tablespace_ids {
511 0 : trace!("Drop db {}, {}", tablespace_id, db_id);
512 0 : modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
513 : }
514 :
515 0 : Ok(())
516 0 : }
517 :
518 16 : async fn ingest_xlog_smgr_create(
519 16 : &mut self,
520 16 : create: SmgrCreate,
521 16 : modification: &mut DatadirModification<'_>,
522 16 : ctx: &RequestContext,
523 16 : ) -> anyhow::Result<()> {
524 16 : let SmgrCreate { rel } = create;
525 16 : self.put_rel_creation(modification, rel, ctx).await?;
526 16 : Ok(())
527 16 : }
528 :
529 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
530 : ///
531 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
532 0 : async fn ingest_xlog_smgr_truncate(
533 0 : &mut self,
534 0 : truncate: XlSmgrTruncate,
535 0 : modification: &mut DatadirModification<'_>,
536 0 : ctx: &RequestContext,
537 0 : ) -> anyhow::Result<()> {
538 0 : let XlSmgrTruncate {
539 0 : blkno,
540 0 : rnode,
541 0 : flags,
542 0 : } = truncate;
543 0 :
544 0 : let spcnode = rnode.spcnode;
545 0 : let dbnode = rnode.dbnode;
546 0 : let relnode = rnode.relnode;
547 0 :
548 0 : if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
549 0 : let rel = RelTag {
550 0 : spcnode,
551 0 : dbnode,
552 0 : relnode,
553 0 : forknum: MAIN_FORKNUM,
554 0 : };
555 0 :
556 0 : self.put_rel_truncation(modification, rel, blkno, ctx)
557 0 : .await?;
558 0 : }
559 0 : if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
560 0 : let rel = RelTag {
561 0 : spcnode,
562 0 : dbnode,
563 0 : relnode,
564 0 : forknum: FSM_FORKNUM,
565 0 : };
566 0 :
567 0 : let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
568 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
569 0 : if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
570 : // Tail of last remaining FSM page has to be zeroed.
571 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
572 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
573 0 : fsm_physical_page_no += 1;
574 0 : }
575 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
576 0 : if nblocks > fsm_physical_page_no {
577 : // check if something to do: FSM is larger than truncate position
578 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
579 0 : .await?;
580 0 : }
581 0 : }
582 0 : if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
583 0 : let rel = RelTag {
584 0 : spcnode,
585 0 : dbnode,
586 0 : relnode,
587 0 : forknum: VISIBILITYMAP_FORKNUM,
588 0 : };
589 0 :
590 0 : // last remaining block, byte, and bit
591 0 : let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
592 0 : let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
593 0 : / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
594 0 : let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
595 0 : * pg_constants::VM_BITS_PER_HEAPBLOCK;
596 0 :
597 0 : // Unless the new size is exactly at a visibility map page boundary, the
598 0 : // tail bits in the last remaining map page, representing truncated heap
599 0 : // blocks, need to be cleared. This is not only tidy, but also necessary
600 0 : // because we don't get a chance to clear the bits if the heap is extended
601 0 : // again.
602 0 : if (trunc_byte != 0 || trunc_offs != 0)
603 0 : && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
604 : {
605 0 : modification.put_rel_wal_record(
606 0 : rel,
607 0 : vm_page_no,
608 0 : NeonWalRecord::TruncateVisibilityMap {
609 0 : trunc_byte,
610 0 : trunc_offs,
611 0 : },
612 0 : )?;
613 0 : vm_page_no += 1;
614 0 : }
615 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
616 0 : if nblocks > vm_page_no {
617 : // check if something to do: VM is larger than truncate position
618 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
619 0 : .await?;
620 0 : }
621 0 : }
622 0 : Ok(())
623 0 : }
624 :
625 8 : fn warn_on_ingest_lag(
626 8 : &mut self,
627 8 : conf: &crate::config::PageServerConf,
628 8 : wal_timestamp: TimestampTz,
629 8 : ) {
630 8 : debug_assert_current_span_has_tenant_and_timeline_id();
631 8 : let now = SystemTime::now();
632 8 : let rate_limits = &mut self.warn_ingest_lag;
633 :
634 8 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
635 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
636 : });
637 :
638 8 : match ts {
639 8 : Ok(ts) => {
640 8 : match now.duration_since(ts) {
641 8 : Ok(lag) => {
642 8 : if lag > conf.wait_lsn_timeout {
643 8 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
644 2 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
645 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
646 0 : return;
647 0 : }
648 2 : } else {
649 2 : // Still loading? We shouldn't be here
650 2 : }
651 2 : let lag = humantime::format_duration(lag);
652 2 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
653 8 : })
654 0 : }
655 : }
656 0 : Err(e) => {
657 0 : let delta_t = e.duration();
658 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
659 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
660 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
661 0 : if delta_t > IGNORED_DRIFT {
662 0 : let delta_t = humantime::format_duration(delta_t);
663 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
664 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
665 0 : })
666 0 : }
667 : }
668 : };
669 : }
670 0 : Err(error) => {
671 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
672 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
673 0 : })
674 : }
675 : }
676 8 : }
677 :
678 : /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
679 : ///
680 8 : async fn ingest_xact_record(
681 8 : &mut self,
682 8 : record: XactRecord,
683 8 : modification: &mut DatadirModification<'_>,
684 8 : ctx: &RequestContext,
685 8 : ) -> anyhow::Result<()> {
686 8 : let (xact_common, is_commit, is_prepared) = match record {
687 0 : XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
688 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
689 0 : self.adjust_to_full_transaction_id(xl_xid)?
690 : } else {
691 0 : xl_xid as u64
692 : };
693 0 : return modification.put_twophase_file(xid, data, ctx).await;
694 : }
695 8 : XactRecord::Commit(common) => (common, true, false),
696 0 : XactRecord::Abort(common) => (common, false, false),
697 0 : XactRecord::CommitPrepared(common) => (common, true, true),
698 0 : XactRecord::AbortPrepared(common) => (common, false, true),
699 : };
700 :
701 : let XactCommon {
702 8 : parsed,
703 8 : origin_id,
704 8 : xl_xid,
705 8 : lsn,
706 8 : } = xact_common;
707 8 :
708 8 : // Record update of CLOG pages
709 8 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
710 8 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
711 8 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
712 8 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
713 8 :
714 8 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
715 :
716 8 : for subxact in &parsed.subxacts {
717 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
718 0 : if subxact_pageno != pageno {
719 : // This subxact goes to different page. Write the record
720 : // for all the XIDs on the previous page, and continue
721 : // accumulating XIDs on this new page.
722 0 : modification.put_slru_wal_record(
723 0 : SlruKind::Clog,
724 0 : segno,
725 0 : rpageno,
726 0 : if is_commit {
727 0 : NeonWalRecord::ClogSetCommitted {
728 0 : xids: page_xids,
729 0 : timestamp: parsed.xact_time,
730 0 : }
731 : } else {
732 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
733 : },
734 0 : )?;
735 0 : page_xids = Vec::new();
736 0 : }
737 0 : pageno = subxact_pageno;
738 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
739 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
740 0 : page_xids.push(*subxact);
741 : }
742 8 : modification.put_slru_wal_record(
743 8 : SlruKind::Clog,
744 8 : segno,
745 8 : rpageno,
746 8 : if is_commit {
747 8 : NeonWalRecord::ClogSetCommitted {
748 8 : xids: page_xids,
749 8 : timestamp: parsed.xact_time,
750 8 : }
751 : } else {
752 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
753 : },
754 0 : )?;
755 :
756 : // Group relations to drop by dbNode. This map will contain all relations that _might_
757 : // exist, we will reduce it to which ones really exist later. This map can be huge if
758 : // the transaction touches a huge number of relations (there is no bound on this in
759 : // postgres).
760 8 : let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
761 :
762 8 : for xnode in &parsed.xnodes {
763 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
764 0 : let rel = RelTag {
765 0 : forknum,
766 0 : spcnode: xnode.spcnode,
767 0 : dbnode: xnode.dbnode,
768 0 : relnode: xnode.relnode,
769 0 : };
770 0 : drop_relations
771 0 : .entry((xnode.spcnode, xnode.dbnode))
772 0 : .or_default()
773 0 : .push(rel);
774 0 : }
775 : }
776 :
777 : // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
778 8 : modification.put_rel_drops(drop_relations, ctx).await?;
779 :
780 8 : if origin_id != 0 {
781 0 : modification
782 0 : .set_replorigin(origin_id, parsed.origin_lsn)
783 0 : .await?;
784 8 : }
785 :
786 8 : if is_prepared {
787 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
788 0 : trace!(
789 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
790 : xl_xid,
791 : parsed.xid,
792 : lsn,
793 : );
794 :
795 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
796 0 : self.adjust_to_full_transaction_id(parsed.xid)?
797 : } else {
798 0 : parsed.xid as u64
799 : };
800 0 : modification.drop_twophase_file(xid, ctx).await?;
801 8 : }
802 :
803 8 : Ok(())
804 8 : }
805 :
806 0 : async fn ingest_clog_truncate(
807 0 : &mut self,
808 0 : truncate: ClogTruncate,
809 0 : modification: &mut DatadirModification<'_>,
810 0 : ctx: &RequestContext,
811 0 : ) -> anyhow::Result<()> {
812 0 : let ClogTruncate {
813 0 : pageno,
814 0 : oldest_xid,
815 0 : oldest_xid_db,
816 0 : } = truncate;
817 0 :
818 0 : info!(
819 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
820 : pageno, oldest_xid, oldest_xid_db
821 : );
822 :
823 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
824 : // truncated, but a checkpoint record with the updated values isn't written until
825 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
826 : // so we keep the oldestXid and oldestXidDB up-to-date.
827 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
828 0 : cp.oldestXid = oldest_xid;
829 0 : cp.oldestXidDB = oldest_xid_db;
830 0 : });
831 0 : self.checkpoint_modified = true;
832 :
833 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
834 :
835 0 : let latest_page_number =
836 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
837 : / pg_constants::CLOG_XACTS_PER_PAGE;
838 :
839 : // Now delete all segments containing pages between xlrec.pageno
840 : // and latest_page_number.
841 :
842 : // First, make an important safety check:
843 : // the current endpoint page must not be eligible for removal.
844 : // See SimpleLruTruncate() in slru.c
845 0 : if dispatch_pgversion!(modification.tline.pg_version, {
846 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
847 : }) {
848 0 : info!("could not truncate directory pg_xact apparent wraparound");
849 0 : return Ok(());
850 0 : }
851 :
852 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
853 : //
854 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
855 : // will block waiting for the last valid LSN to advance up to
856 : // it. So we use the previous record's LSN in the get calls
857 : // instead.
858 0 : for segno in modification
859 0 : .tline
860 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
861 0 : .await?
862 : {
863 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
864 :
865 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
866 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
867 : });
868 :
869 0 : if may_delete {
870 0 : modification
871 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
872 0 : .await?;
873 0 : trace!("Drop CLOG segment {:>04X}", segno);
874 0 : }
875 : }
876 :
877 0 : Ok(())
878 0 : }
879 :
880 0 : async fn ingest_clog_zero_page(
881 0 : &mut self,
882 0 : zero_page: ClogZeroPage,
883 0 : modification: &mut DatadirModification<'_>,
884 0 : ctx: &RequestContext,
885 0 : ) -> anyhow::Result<()> {
886 0 : let ClogZeroPage { segno, rpageno } = zero_page;
887 0 :
888 0 : self.put_slru_page_image(
889 0 : modification,
890 0 : SlruKind::Clog,
891 0 : segno,
892 0 : rpageno,
893 0 : ZERO_PAGE.clone(),
894 0 : ctx,
895 0 : )
896 0 : .await
897 0 : }
898 :
899 0 : fn ingest_multixact_create(
900 0 : &mut self,
901 0 : modification: &mut DatadirModification,
902 0 : xlrec: &XlMultiXactCreate,
903 0 : ) -> Result<()> {
904 0 : // Create WAL record for updating the multixact-offsets page
905 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
906 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
907 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
908 0 :
909 0 : modification.put_slru_wal_record(
910 0 : SlruKind::MultiXactOffsets,
911 0 : segno,
912 0 : rpageno,
913 0 : NeonWalRecord::MultixactOffsetCreate {
914 0 : mid: xlrec.mid,
915 0 : moff: xlrec.moff,
916 0 : },
917 0 : )?;
918 :
919 : // Create WAL records for the update of each affected multixact-members page
920 0 : let mut members = xlrec.members.iter();
921 0 : let mut offset = xlrec.moff;
922 : loop {
923 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
924 0 :
925 0 : // How many members fit on this page?
926 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
927 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
928 0 :
929 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
930 0 : for _ in 0..page_remain {
931 0 : if let Some(m) = members.next() {
932 0 : this_page_members.push(m.clone());
933 0 : } else {
934 0 : break;
935 : }
936 : }
937 0 : if this_page_members.is_empty() {
938 : // all done
939 0 : break;
940 0 : }
941 0 : let n_this_page = this_page_members.len();
942 0 :
943 0 : modification.put_slru_wal_record(
944 0 : SlruKind::MultiXactMembers,
945 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
946 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
947 0 : NeonWalRecord::MultixactMembersCreate {
948 0 : moff: offset,
949 0 : members: this_page_members,
950 0 : },
951 0 : )?;
952 :
953 : // Note: The multixact members can wrap around, even within one WAL record.
954 0 : offset = offset.wrapping_add(n_this_page as u32);
955 : }
956 0 : let next_offset = offset;
957 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
958 :
959 : // Update next-multi-xid and next-offset
960 : //
961 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
962 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
963 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
964 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
965 : // is stored, so it's never 0 in a checkpoint.
966 : //
967 : // I don't know why it's done that way, it seems less error-prone to skip over 0
968 : // when the value is stored rather than when it's read. But let's do it the same
969 : // way here.
970 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
971 0 :
972 0 : if self
973 0 : .checkpoint
974 0 : .update_next_multixid(next_multi_xid, next_offset)
975 0 : {
976 0 : self.checkpoint_modified = true;
977 0 : }
978 :
979 : // Also update the next-xid with the highest member. According to the comments in
980 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
981 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
982 0 : if let Some(max_xid) = acc {
983 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
984 0 : Some(mbr.xid)
985 : } else {
986 0 : acc
987 : }
988 : } else {
989 0 : Some(mbr.xid)
990 : }
991 0 : });
992 :
993 0 : if let Some(max_xid) = max_mbr_xid {
994 0 : if self.checkpoint.update_next_xid(max_xid) {
995 0 : self.checkpoint_modified = true;
996 0 : }
997 0 : }
998 0 : Ok(())
999 0 : }
1000 :
1001 0 : async fn ingest_multixact_truncate(
1002 0 : &mut self,
1003 0 : modification: &mut DatadirModification<'_>,
1004 0 : xlrec: &XlMultiXactTruncate,
1005 0 : ctx: &RequestContext,
1006 0 : ) -> Result<()> {
1007 0 : let (maxsegment, startsegment, endsegment) =
1008 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1009 0 : cp.oldestMulti = xlrec.end_trunc_off;
1010 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1011 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1012 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1013 0 : );
1014 0 : let startsegment: i32 =
1015 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1016 0 : let endsegment: i32 =
1017 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1018 0 : (maxsegment, startsegment, endsegment)
1019 : });
1020 :
1021 0 : self.checkpoint_modified = true;
1022 0 :
1023 0 : // PerformMembersTruncation
1024 0 : let mut segment: i32 = startsegment;
1025 :
1026 : // Delete all the segments except the last one. The last segment can still
1027 : // contain, possibly partially, valid data.
1028 0 : while segment != endsegment {
1029 0 : modification
1030 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1031 0 : .await?;
1032 :
1033 : /* move to next segment, handling wraparound correctly */
1034 0 : if segment == maxsegment {
1035 0 : segment = 0;
1036 0 : } else {
1037 0 : segment += 1;
1038 0 : }
1039 : }
1040 :
1041 : // Truncate offsets
1042 : // FIXME: this did not handle wraparound correctly
1043 :
1044 0 : Ok(())
1045 0 : }
1046 :
1047 0 : async fn ingest_multixact_zero_page(
1048 0 : &mut self,
1049 0 : zero_page: MultiXactZeroPage,
1050 0 : modification: &mut DatadirModification<'_>,
1051 0 : ctx: &RequestContext,
1052 0 : ) -> Result<()> {
1053 0 : let MultiXactZeroPage {
1054 0 : slru_kind,
1055 0 : segno,
1056 0 : rpageno,
1057 0 : } = zero_page;
1058 0 : self.put_slru_page_image(
1059 0 : modification,
1060 0 : slru_kind,
1061 0 : segno,
1062 0 : rpageno,
1063 0 : ZERO_PAGE.clone(),
1064 0 : ctx,
1065 0 : )
1066 0 : .await
1067 0 : }
1068 :
1069 0 : async fn ingest_relmap_update(
1070 0 : &mut self,
1071 0 : update: RelmapUpdate,
1072 0 : modification: &mut DatadirModification<'_>,
1073 0 : ctx: &RequestContext,
1074 0 : ) -> Result<()> {
1075 0 : let RelmapUpdate { update, buf } = update;
1076 0 :
1077 0 : modification
1078 0 : .put_relmap_file(update.tsid, update.dbid, buf, ctx)
1079 0 : .await
1080 0 : }
1081 :
1082 30 : async fn ingest_raw_xlog_record(
1083 30 : &mut self,
1084 30 : raw_record: RawXlogRecord,
1085 30 : modification: &mut DatadirModification<'_>,
1086 30 : ctx: &RequestContext,
1087 30 : ) -> Result<()> {
1088 30 : let RawXlogRecord { info, lsn, mut buf } = raw_record;
1089 30 : let pg_version = modification.tline.pg_version;
1090 30 :
1091 30 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
1092 2 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1093 0 : let rec = v17::XlParameterChange::decode(&mut buf);
1094 0 : cp.wal_level = rec.wal_level;
1095 0 : self.checkpoint_modified = true;
1096 2 : }
1097 28 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
1098 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1099 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
1100 0 : cp.wal_level = rec.wal_level;
1101 0 : self.checkpoint_modified = true;
1102 0 : }
1103 28 : }
1104 :
1105 30 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1106 0 : if info == pg_constants::XLOG_NEXTOID {
1107 0 : let next_oid = buf.get_u32_le();
1108 0 : if cp.nextOid != next_oid {
1109 0 : cp.nextOid = next_oid;
1110 0 : self.checkpoint_modified = true;
1111 0 : }
1112 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
1113 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1114 : {
1115 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
1116 0 : buf.copy_to_slice(&mut checkpoint_bytes);
1117 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
1118 0 : trace!(
1119 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
1120 : xlog_checkpoint.oldestXid,
1121 : cp.oldestXid
1122 : );
1123 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
1124 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
1125 0 : }
1126 0 : trace!(
1127 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
1128 : xlog_checkpoint.oldestActiveXid,
1129 : cp.oldestActiveXid
1130 : );
1131 :
1132 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
1133 : // because at shutdown, all in-progress transactions will implicitly
1134 : // end. Postgres startup code knows that, and allows hot standby to start
1135 : // immediately from a shutdown checkpoint.
1136 : //
1137 : // In Neon, Postgres hot standby startup always behaves as if starting from
1138 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
1139 : // instead of overwriting self.checkpoint.oldestActiveXid with
1140 : // InvalidTransactionid from the checkpoint WAL record, update it to a
1141 : // proper value, knowing that there are no in-progress transactions at this
1142 : // point, except for prepared transactions.
1143 : //
1144 : // See also the neon code changes in the InitWalRecovery() function.
1145 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
1146 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1147 : {
1148 0 : let oldest_active_xid = if pg_version >= 17 {
1149 0 : let mut oldest_active_full_xid = cp.nextXid.value;
1150 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1151 0 : if xid < oldest_active_full_xid {
1152 0 : oldest_active_full_xid = xid;
1153 0 : }
1154 : }
1155 0 : oldest_active_full_xid as u32
1156 : } else {
1157 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
1158 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1159 0 : let narrow_xid = xid as u32;
1160 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
1161 0 : oldest_active_xid = narrow_xid;
1162 0 : }
1163 : }
1164 0 : oldest_active_xid
1165 : };
1166 0 : cp.oldestActiveXid = oldest_active_xid;
1167 0 : } else {
1168 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
1169 0 : }
1170 :
1171 : // Write a new checkpoint key-value pair on every checkpoint record, even
1172 : // if nothing really changed. Not strictly required, but it seems nice to
1173 : // have some trace of the checkpoint records in the layer files at the same
1174 : // LSNs.
1175 0 : self.checkpoint_modified = true;
1176 0 : }
1177 : });
1178 :
1179 30 : Ok(())
1180 30 : }
1181 :
1182 0 : async fn ingest_logical_message_put(
1183 0 : &mut self,
1184 0 : put: PutLogicalMessage,
1185 0 : modification: &mut DatadirModification<'_>,
1186 0 : ctx: &RequestContext,
1187 0 : ) -> Result<()> {
1188 0 : let PutLogicalMessage { path, buf } = put;
1189 0 : modification.put_file(path.as_str(), &buf, ctx).await
1190 0 : }
1191 :
1192 0 : fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
1193 0 : match record {
1194 0 : StandbyRecord::RunningXacts(running_xacts) => {
1195 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1196 0 : cp.oldestActiveXid = running_xacts.oldest_running_xid;
1197 0 : });
1198 :
1199 0 : self.checkpoint_modified = true;
1200 0 : }
1201 0 : }
1202 0 :
1203 0 : Ok(())
1204 0 : }
1205 :
1206 0 : async fn ingest_replorigin_record(
1207 0 : &mut self,
1208 0 : record: ReploriginRecord,
1209 0 : modification: &mut DatadirModification<'_>,
1210 0 : ) -> Result<()> {
1211 0 : match record {
1212 0 : ReploriginRecord::Set(set) => {
1213 0 : modification
1214 0 : .set_replorigin(set.node_id, set.remote_lsn)
1215 0 : .await?;
1216 : }
1217 0 : ReploriginRecord::Drop(drop) => {
1218 0 : modification.drop_replorigin(drop.node_id).await?;
1219 : }
1220 : }
1221 :
1222 0 : Ok(())
1223 0 : }
1224 :
1225 18 : async fn put_rel_creation(
1226 18 : &mut self,
1227 18 : modification: &mut DatadirModification<'_>,
1228 18 : rel: RelTag,
1229 18 : ctx: &RequestContext,
1230 18 : ) -> Result<()> {
1231 18 : modification.put_rel_creation(rel, 0, ctx).await?;
1232 18 : Ok(())
1233 18 : }
1234 :
/// Test-only helper: store a page image for block `blknum` of `rel`,
/// extending the relation first if the block lies beyond its current end.
#[cfg(test)]
async fn put_rel_page_image(
    &mut self,
    modification: &mut DatadirModification<'_>,
    rel: RelTag,
    blknum: BlockNumber,
    img: Bytes,
    ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
    // Grow the relation first so the target block is within its size.
    let extended = self.handle_rel_extend(modification, rel, blknum, ctx).await;
    extended?;

    modification.put_rel_page_image(rel, blknum, img)?;
    Ok(())
}
1249 :
1250 12 : async fn put_rel_wal_record(
1251 12 : &mut self,
1252 12 : modification: &mut DatadirModification<'_>,
1253 12 : rel: RelTag,
1254 12 : blknum: BlockNumber,
1255 12 : rec: NeonWalRecord,
1256 12 : ctx: &RequestContext,
1257 12 : ) -> Result<()> {
1258 12 : self.handle_rel_extend(modification, rel, blknum, ctx)
1259 0 : .await?;
1260 12 : modification.put_rel_wal_record(rel, blknum, rec)?;
1261 12 : Ok(())
1262 12 : }
1263 :
1264 6012 : async fn put_rel_truncation(
1265 6012 : &mut self,
1266 6012 : modification: &mut DatadirModification<'_>,
1267 6012 : rel: RelTag,
1268 6012 : nblocks: BlockNumber,
1269 6012 : ctx: &RequestContext,
1270 6012 : ) -> anyhow::Result<()> {
1271 6012 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1272 6012 : Ok(())
1273 6012 : }
1274 :
1275 272414 : async fn handle_rel_extend(
1276 272414 : &mut self,
1277 272414 : modification: &mut DatadirModification<'_>,
1278 272414 : rel: RelTag,
1279 272414 : blknum: BlockNumber,
1280 272414 : ctx: &RequestContext,
1281 272414 : ) -> Result<(), PageReconstructError> {
1282 272414 : let new_nblocks = blknum + 1;
1283 : // Check if the relation exists. We implicitly create relations on first
1284 : // record.
1285 272414 : let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
1286 :
1287 272414 : if new_nblocks > old_nblocks {
1288 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1289 272398 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1290 :
1291 272398 : let mut key = rel_block_to_key(rel, blknum);
1292 272398 :
1293 272398 : // fill the gap with zeros
1294 272398 : let mut gap_blocks_filled: u64 = 0;
1295 272398 : for gap_blknum in old_nblocks..blknum {
1296 2998 : key.field6 = gap_blknum;
1297 2998 :
1298 2998 : if self.shard.get_shard_number(&key) != self.shard.number {
1299 0 : continue;
1300 2998 : }
1301 2998 :
1302 2998 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1303 2998 : gap_blocks_filled += 1;
1304 : }
1305 :
1306 272398 : WAL_INGEST
1307 272398 : .gap_blocks_zeroed_on_rel_extend
1308 272398 : .inc_by(gap_blocks_filled);
1309 272398 :
1310 272398 : // Log something when relation extends cause use to fill gaps
1311 272398 : // with zero pages. Logging is rate limited per pg version to
1312 272398 : // avoid skewing.
1313 272398 : if gap_blocks_filled > 0 {
1314 : use once_cell::sync::Lazy;
1315 : use std::sync::Mutex;
1316 : use utils::rate_limit::RateLimit;
1317 :
1318 : struct RateLimitPerPgVersion {
1319 : rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
1320 : }
1321 :
1322 : impl RateLimitPerPgVersion {
1323 0 : const fn new() -> Self {
1324 0 : Self {
1325 0 : rate_limiters: [const {
1326 2 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
1327 0 : }; 4],
1328 0 : }
1329 0 : }
1330 :
1331 4 : const fn rate_limiter(
1332 4 : &self,
1333 4 : pg_version: u32,
1334 4 : ) -> Option<&Lazy<Mutex<RateLimit>>> {
1335 : const MIN_PG_VERSION: u32 = 14;
1336 : const MAX_PG_VERSION: u32 = 17;
1337 :
1338 4 : if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
1339 0 : return None;
1340 4 : }
1341 4 :
1342 4 : Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
1343 4 : }
1344 : }
1345 :
1346 : static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
1347 4 : if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
1348 4 : if let Ok(mut locked) = rate_limiter.try_lock() {
1349 4 : locked.call(|| {
1350 2 : info!(
1351 0 : lsn=%modification.get_lsn(),
1352 0 : pg_version=%modification.tline.pg_version,
1353 0 : rel=%rel,
1354 0 : "Filled {} gap blocks on rel extend to {} from {}",
1355 : gap_blocks_filled,
1356 : new_nblocks,
1357 : old_nblocks);
1358 4 : });
1359 4 : }
1360 0 : }
1361 272394 : }
1362 16 : }
1363 272414 : Ok(())
1364 272414 : }
1365 :
1366 0 : async fn put_slru_page_image(
1367 0 : &mut self,
1368 0 : modification: &mut DatadirModification<'_>,
1369 0 : kind: SlruKind,
1370 0 : segno: u32,
1371 0 : blknum: BlockNumber,
1372 0 : img: Bytes,
1373 0 : ctx: &RequestContext,
1374 0 : ) -> Result<()> {
1375 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1376 0 : .await?;
1377 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1378 0 : Ok(())
1379 0 : }
1380 :
1381 0 : async fn handle_slru_extend(
1382 0 : &mut self,
1383 0 : modification: &mut DatadirModification<'_>,
1384 0 : kind: SlruKind,
1385 0 : segno: u32,
1386 0 : blknum: BlockNumber,
1387 0 : ctx: &RequestContext,
1388 0 : ) -> anyhow::Result<()> {
1389 0 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
1390 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1391 0 : // a lot less frequently.
1392 0 :
1393 0 : let new_nblocks = blknum + 1;
1394 : // Check if the relation exists. We implicitly create relations on first
1395 : // record.
1396 : // TODO: would be nice if to be more explicit about it
1397 0 : let old_nblocks = if !modification
1398 0 : .tline
1399 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1400 0 : .await?
1401 : {
1402 : // create it with 0 size initially, the logic below will extend it
1403 0 : modification
1404 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1405 0 : .await?;
1406 0 : 0
1407 : } else {
1408 0 : modification
1409 0 : .tline
1410 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1411 0 : .await?
1412 : };
1413 :
1414 0 : if new_nblocks > old_nblocks {
1415 0 : trace!(
1416 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1417 : kind,
1418 : segno,
1419 : old_nblocks,
1420 : new_nblocks
1421 : );
1422 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1423 :
1424 : // fill the gap with zeros
1425 0 : for gap_blknum in old_nblocks..blknum {
1426 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1427 : }
1428 0 : }
1429 0 : Ok(())
1430 0 : }
1431 : }
1432 :
1433 12 : async fn get_relsize(
1434 12 : modification: &DatadirModification<'_>,
1435 12 : rel: RelTag,
1436 12 : ctx: &RequestContext,
1437 12 : ) -> Result<BlockNumber, PageReconstructError> {
1438 12 : let nblocks = if !modification
1439 12 : .tline
1440 12 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1441 0 : .await?
1442 : {
1443 0 : 0
1444 : } else {
1445 12 : modification
1446 12 : .tline
1447 12 : .get_rel_size(rel, Version::Modified(modification), ctx)
1448 0 : .await?
1449 : };
1450 12 : Ok(nblocks)
1451 12 : }
1452 :
1453 : #[allow(clippy::bool_assert_comparison)]
1454 : #[cfg(test)]
1455 : mod tests {
1456 : use super::*;
1457 : use crate::tenant::harness::*;
1458 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1459 : use postgres_ffi::RELSEG_SIZE;
1460 :
1461 : use crate::DEFAULT_PG_VERSION;
1462 :
/// Arbitrary relation tag, for testing.
///
/// `dbnode` is 111, the same database id that `init_walingest_test` passes
/// when it stores its dummy relmap file.
const TESTREL_A: RelTag = RelTag {
    spcnode: 0,
    dbnode: 111,
    relnode: 1000,
    forknum: 0,
};
1470 :
/// Placeholder for a check of the timeline's logical size at `_lsn`.
///
/// Currently a no-op: both arguments are ignored, so calls to it in the
/// tests below verify nothing yet.
fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    // TODO
}
1474 :
1475 : #[tokio::test]
1476 2 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
1477 8 : for i in 14..=16 {
1478 6 : dispatch_pgversion!(i, {
1479 2 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
1480 2 : });
1481 2 : }
1482 2 :
1483 2 : Ok(())
1484 2 : }
1485 :
1486 8 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1487 8 : let mut m = tline.begin_modification(Lsn(0x10));
1488 8 : m.put_checkpoint(dispatch_pgversion!(
1489 8 : tline.pg_version,
1490 0 : pgv::ZERO_CHECKPOINT.clone()
1491 0 : ))?;
1492 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1493 8 : m.commit(ctx).await?;
1494 8 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1495 :
1496 8 : Ok(walingest)
1497 8 : }
1498 :
1499 : #[tokio::test]
1500 2 : async fn test_relsize() -> Result<()> {
1501 20 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
1502 2 : let tline = tenant
1503 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1504 6 : .await?;
1505 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1506 2 :
1507 2 : let mut m = tline.begin_modification(Lsn(0x20));
1508 2 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1509 2 : walingest
1510 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1511 2 : .await?;
1512 2 : m.commit(&ctx).await?;
1513 2 : let mut m = tline.begin_modification(Lsn(0x30));
1514 2 : walingest
1515 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1516 2 : .await?;
1517 2 : m.commit(&ctx).await?;
1518 2 : let mut m = tline.begin_modification(Lsn(0x40));
1519 2 : walingest
1520 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1521 2 : .await?;
1522 2 : m.commit(&ctx).await?;
1523 2 : let mut m = tline.begin_modification(Lsn(0x50));
1524 2 : walingest
1525 2 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1526 2 : .await?;
1527 2 : m.commit(&ctx).await?;
1528 2 :
1529 2 : assert_current_logical_size(&tline, Lsn(0x50));
1530 2 :
1531 2 : let test_span = tracing::info_span!(parent: None, "test",
1532 2 : tenant_id=%tline.tenant_shard_id.tenant_id,
1533 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1534 0 : timeline_id=%tline.timeline_id);
1535 2 :
1536 2 : // The relation was created at LSN 2, not visible at LSN 1 yet.
1537 2 : assert_eq!(
1538 2 : tline
1539 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1540 2 : .await?,
1541 2 : false
1542 2 : );
1543 2 : assert!(tline
1544 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1545 2 : .await
1546 2 : .is_err());
1547 2 : assert_eq!(
1548 2 : tline
1549 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1550 2 : .await?,
1551 2 : true
1552 2 : );
1553 2 : assert_eq!(
1554 2 : tline
1555 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1556 2 : .await?,
1557 2 : 1
1558 2 : );
1559 2 : assert_eq!(
1560 2 : tline
1561 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1562 2 : .await?,
1563 2 : 3
1564 2 : );
1565 2 :
1566 2 : // Check page contents at each LSN
1567 2 : assert_eq!(
1568 2 : tline
1569 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
1570 2 : .instrument(test_span.clone())
1571 2 : .await?,
1572 2 : test_img("foo blk 0 at 2")
1573 2 : );
1574 2 :
1575 2 : assert_eq!(
1576 2 : tline
1577 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
1578 2 : .instrument(test_span.clone())
1579 2 : .await?,
1580 2 : test_img("foo blk 0 at 3")
1581 2 : );
1582 2 :
1583 2 : assert_eq!(
1584 2 : tline
1585 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
1586 2 : .instrument(test_span.clone())
1587 2 : .await?,
1588 2 : test_img("foo blk 0 at 3")
1589 2 : );
1590 2 : assert_eq!(
1591 2 : tline
1592 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
1593 2 : .instrument(test_span.clone())
1594 2 : .await?,
1595 2 : test_img("foo blk 1 at 4")
1596 2 : );
1597 2 :
1598 2 : assert_eq!(
1599 2 : tline
1600 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
1601 2 : .instrument(test_span.clone())
1602 2 : .await?,
1603 2 : test_img("foo blk 0 at 3")
1604 2 : );
1605 2 : assert_eq!(
1606 2 : tline
1607 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
1608 2 : .instrument(test_span.clone())
1609 2 : .await?,
1610 2 : test_img("foo blk 1 at 4")
1611 2 : );
1612 2 : assert_eq!(
1613 2 : tline
1614 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
1615 2 : .instrument(test_span.clone())
1616 2 : .await?,
1617 2 : test_img("foo blk 2 at 5")
1618 2 : );
1619 2 :
1620 2 : // Truncate last block
1621 2 : let mut m = tline.begin_modification(Lsn(0x60));
1622 2 : walingest
1623 2 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1624 2 : .await?;
1625 2 : m.commit(&ctx).await?;
1626 2 : assert_current_logical_size(&tline, Lsn(0x60));
1627 2 :
1628 2 : // Check reported size and contents after truncation
1629 2 : assert_eq!(
1630 2 : tline
1631 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1632 2 : .await?,
1633 2 : 2
1634 2 : );
1635 2 : assert_eq!(
1636 2 : tline
1637 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
1638 2 : .instrument(test_span.clone())
1639 2 : .await?,
1640 2 : test_img("foo blk 0 at 3")
1641 2 : );
1642 2 : assert_eq!(
1643 2 : tline
1644 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
1645 2 : .instrument(test_span.clone())
1646 2 : .await?,
1647 2 : test_img("foo blk 1 at 4")
1648 2 : );
1649 2 :
1650 2 : // should still see the truncated block with older LSN
1651 2 : assert_eq!(
1652 2 : tline
1653 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1654 2 : .await?,
1655 2 : 3
1656 2 : );
1657 2 : assert_eq!(
1658 2 : tline
1659 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
1660 2 : .instrument(test_span.clone())
1661 2 : .await?,
1662 2 : test_img("foo blk 2 at 5")
1663 2 : );
1664 2 :
1665 2 : // Truncate to zero length
1666 2 : let mut m = tline.begin_modification(Lsn(0x68));
1667 2 : walingest
1668 2 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1669 2 : .await?;
1670 2 : m.commit(&ctx).await?;
1671 2 : assert_eq!(
1672 2 : tline
1673 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
1674 2 : .await?,
1675 2 : 0
1676 2 : );
1677 2 :
1678 2 : // Extend from 0 to 2 blocks, leaving a gap
1679 2 : let mut m = tline.begin_modification(Lsn(0x70));
1680 2 : walingest
1681 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1682 2 : .await?;
1683 2 : m.commit(&ctx).await?;
1684 2 : assert_eq!(
1685 2 : tline
1686 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
1687 2 : .await?,
1688 2 : 2
1689 2 : );
1690 2 : assert_eq!(
1691 2 : tline
1692 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
1693 2 : .instrument(test_span.clone())
1694 2 : .await?,
1695 2 : ZERO_PAGE
1696 2 : );
1697 2 : assert_eq!(
1698 2 : tline
1699 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
1700 2 : .instrument(test_span.clone())
1701 2 : .await?,
1702 2 : test_img("foo blk 1")
1703 2 : );
1704 2 :
1705 2 : // Extend a lot more, leaving a big gap that spans across segments
1706 2 : let mut m = tline.begin_modification(Lsn(0x80));
1707 2 : walingest
1708 2 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
1709 2 : .await?;
1710 190 : m.commit(&ctx).await?;
1711 2 : assert_eq!(
1712 2 : tline
1713 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1714 2 : .await?,
1715 2 : 1501
1716 2 : );
1717 2998 : for blk in 2..1500 {
1718 2996 : assert_eq!(
1719 2996 : tline
1720 2996 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
1721 2996 : .instrument(test_span.clone())
1722 1540 : .await?,
1723 2996 : ZERO_PAGE
1724 2 : );
1725 2 : }
1726 2 : assert_eq!(
1727 2 : tline
1728 2 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
1729 2 : .instrument(test_span.clone())
1730 2 : .await?,
1731 2 : test_img("foo blk 1500")
1732 2 : );
1733 2 :
1734 2 : Ok(())
1735 2 : }
1736 :
1737 : // Test what happens if we dropped a relation
1738 : // and then created it again within the same layer.
1739 : #[tokio::test]
1740 2 : async fn test_drop_extend() -> Result<()> {
1741 2 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
1742 2 : .await?
1743 2 : .load()
1744 20 : .await;
1745 2 : let tline = tenant
1746 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1747 6 : .await?;
1748 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1749 2 :
1750 2 : let mut m = tline.begin_modification(Lsn(0x20));
1751 2 : walingest
1752 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1753 2 : .await?;
1754 2 : m.commit(&ctx).await?;
1755 2 :
1756 2 : // Check that rel exists and size is correct
1757 2 : assert_eq!(
1758 2 : tline
1759 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1760 2 : .await?,
1761 2 : true
1762 2 : );
1763 2 : assert_eq!(
1764 2 : tline
1765 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1766 2 : .await?,
1767 2 : 1
1768 2 : );
1769 2 :
1770 2 : // Drop rel
1771 2 : let mut m = tline.begin_modification(Lsn(0x30));
1772 2 : let mut rel_drops = HashMap::new();
1773 2 : rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
1774 2 : m.put_rel_drops(rel_drops, &ctx).await?;
1775 2 : m.commit(&ctx).await?;
1776 2 :
1777 2 : // Check that rel is not visible anymore
1778 2 : assert_eq!(
1779 2 : tline
1780 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
1781 2 : .await?,
1782 2 : false
1783 2 : );
1784 2 :
1785 2 : // FIXME: should fail
1786 2 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1787 2 :
1788 2 : // Re-create it
1789 2 : let mut m = tline.begin_modification(Lsn(0x40));
1790 2 : walingest
1791 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
1792 2 : .await?;
1793 2 : m.commit(&ctx).await?;
1794 2 :
1795 2 : // Check that rel exists and size is correct
1796 2 : assert_eq!(
1797 2 : tline
1798 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
1799 2 : .await?,
1800 2 : true
1801 2 : );
1802 2 : assert_eq!(
1803 2 : tline
1804 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
1805 2 : .await?,
1806 2 : 1
1807 2 : );
1808 2 :
1809 2 : Ok(())
1810 2 : }
1811 :
1812 : // Test what happens if we truncated a relation
1813 : // so that one of its segments was dropped
1814 : // and then extended it again within the same layer.
1815 : #[tokio::test]
1816 2 : async fn test_truncate_extend() -> Result<()> {
1817 2 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
1818 2 : .await?
1819 2 : .load()
1820 20 : .await;
1821 2 : let tline = tenant
1822 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1823 6 : .await?;
1824 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1825 2 :
1826 2 : // Create a 20 MB relation (the size is arbitrary)
1827 2 : let relsize = 20 * 1024 * 1024 / 8192;
1828 2 : let mut m = tline.begin_modification(Lsn(0x20));
1829 5120 : for blkno in 0..relsize {
1830 5120 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1831 5120 : walingest
1832 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
1833 2 : .await?;
1834 2 : }
1835 2 : m.commit(&ctx).await?;
1836 2 :
1837 2 : let test_span = tracing::info_span!(parent: None, "test",
1838 2 : tenant_id=%tline.tenant_shard_id.tenant_id,
1839 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1840 0 : timeline_id=%tline.timeline_id);
1841 2 :
1842 2 : // The relation was created at LSN 20, not visible at LSN 1 yet.
1843 2 : assert_eq!(
1844 2 : tline
1845 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1846 2 : .await?,
1847 2 : false
1848 2 : );
1849 2 : assert!(tline
1850 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1851 2 : .await
1852 2 : .is_err());
1853 2 :
1854 2 : assert_eq!(
1855 2 : tline
1856 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1857 2 : .await?,
1858 2 : true
1859 2 : );
1860 2 : assert_eq!(
1861 2 : tline
1862 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1863 2 : .await?,
1864 2 : relsize
1865 2 : );
1866 2 :
1867 2 : // Check relation content
1868 5120 : for blkno in 0..relsize {
1869 5120 : let lsn = Lsn(0x20);
1870 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
1871 5120 : assert_eq!(
1872 5120 : tline
1873 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
1874 5120 : .instrument(test_span.clone())
1875 1803 : .await?,
1876 5120 : test_img(&data)
1877 2 : );
1878 2 : }
1879 2 :
1880 2 : // Truncate relation so that second segment was dropped
1881 2 : // - only leave one page
1882 2 : let mut m = tline.begin_modification(Lsn(0x60));
1883 2 : walingest
1884 2 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
1885 2 : .await?;
1886 2 : m.commit(&ctx).await?;
1887 2 :
1888 2 : // Check reported size and contents after truncation
1889 2 : assert_eq!(
1890 2 : tline
1891 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1892 2 : .await?,
1893 2 : 1
1894 2 : );
1895 2 :
1896 4 : for blkno in 0..1 {
1897 2 : let lsn = Lsn(0x20);
1898 2 : let data = format!("foo blk {} at {}", blkno, lsn);
1899 2 : assert_eq!(
1900 2 : tline
1901 2 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
1902 2 : .instrument(test_span.clone())
1903 2 : .await?,
1904 2 : test_img(&data)
1905 2 : );
1906 2 : }
1907 2 :
1908 2 : // should still see all blocks with older LSN
1909 2 : assert_eq!(
1910 2 : tline
1911 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1912 2 : .await?,
1913 2 : relsize
1914 2 : );
1915 5120 : for blkno in 0..relsize {
1916 5120 : let lsn = Lsn(0x20);
1917 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
1918 5120 : assert_eq!(
1919 5120 : tline
1920 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
1921 5120 : .instrument(test_span.clone())
1922 1856 : .await?,
1923 5120 : test_img(&data)
1924 2 : );
1925 2 : }
1926 2 :
1927 2 : // Extend relation again.
1928 2 : // Add enough blocks to create second segment
1929 2 : let lsn = Lsn(0x80);
1930 2 : let mut m = tline.begin_modification(lsn);
1931 5120 : for blkno in 0..relsize {
1932 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
1933 5120 : walingest
1934 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
1935 2 : .await?;
1936 2 : }
1937 3 : m.commit(&ctx).await?;
1938 2 :
1939 2 : assert_eq!(
1940 2 : tline
1941 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1942 2 : .await?,
1943 2 : true
1944 2 : );
1945 2 : assert_eq!(
1946 2 : tline
1947 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1948 2 : .await?,
1949 2 : relsize
1950 2 : );
1951 2 : // Check relation content
1952 5120 : for blkno in 0..relsize {
1953 5120 : let lsn = Lsn(0x80);
1954 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
1955 5120 : assert_eq!(
1956 5120 : tline
1957 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
1958 5120 : .instrument(test_span.clone())
1959 1828 : .await?,
1960 5120 : test_img(&data)
1961 2 : );
1962 2 : }
1963 2 :
1964 2 : Ok(())
1965 2 : }
1966 :
1967 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
1968 : /// split into multiple 1 GB segments in Postgres.
1969 : #[tokio::test]
1970 2 : async fn test_large_rel() -> Result<()> {
1971 20 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
1972 2 : let tline = tenant
1973 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1974 6 : .await?;
1975 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1976 2 :
1977 2 : let mut lsn = 0x10;
1978 262146 : for blknum in 0..RELSEG_SIZE + 1 {
1979 262146 : lsn += 0x10;
1980 262146 : let mut m = tline.begin_modification(Lsn(lsn));
1981 262146 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
1982 262146 : walingest
1983 262146 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
1984 5420 : .await?;
1985 262146 : m.commit(&ctx).await?;
1986 2 : }
1987 2 :
1988 2 : assert_current_logical_size(&tline, Lsn(lsn));
1989 2 :
1990 2 : assert_eq!(
1991 2 : tline
1992 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
1993 2 : .await?,
1994 2 : RELSEG_SIZE + 1
1995 2 : );
1996 2 :
1997 2 : // Truncate one block
1998 2 : lsn += 0x10;
1999 2 : let mut m = tline.begin_modification(Lsn(lsn));
2000 2 : walingest
2001 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2002 2 : .await?;
2003 2 : m.commit(&ctx).await?;
2004 2 : assert_eq!(
2005 2 : tline
2006 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2007 2 : .await?,
2008 2 : RELSEG_SIZE
2009 2 : );
2010 2 : assert_current_logical_size(&tline, Lsn(lsn));
2011 2 :
2012 2 : // Truncate another block
2013 2 : lsn += 0x10;
2014 2 : let mut m = tline.begin_modification(Lsn(lsn));
2015 2 : walingest
2016 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2017 2 : .await?;
2018 2 : m.commit(&ctx).await?;
2019 2 : assert_eq!(
2020 2 : tline
2021 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2022 2 : .await?,
2023 2 : RELSEG_SIZE - 1
2024 2 : );
2025 2 : assert_current_logical_size(&tline, Lsn(lsn));
2026 2 :
2027 2 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2028 2 : // This tests the behavior at segment boundaries
2029 2 : let mut size: i32 = 3000;
2030 6004 : while size >= 0 {
2031 6002 : lsn += 0x10;
2032 6002 : let mut m = tline.begin_modification(Lsn(lsn));
2033 6002 : walingest
2034 6002 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2035 2 : .await?;
2036 6002 : m.commit(&ctx).await?;
2037 6002 : assert_eq!(
2038 6002 : tline
2039 6002 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2040 2 : .await?,
2041 6002 : size as BlockNumber
2042 2 : );
2043 2 :
2044 6002 : size -= 1;
2045 2 : }
2046 2 : assert_current_logical_size(&tline, Lsn(lsn));
2047 2 :
2048 2 : Ok(())
2049 2 : }
2050 :
2051 : /// Replay a wal segment file taken directly from safekeepers.
2052 : ///
2053 : /// This test is useful for benchmarking since it allows us to profile only
2054 : /// the walingest code in a single-threaded executor, and iterate more quickly
2055 : /// without waiting for unrelated steps.
2056 : #[tokio::test]
2057 2 : async fn test_ingest_real_wal() {
2058 2 : use crate::tenant::harness::*;
2059 2 : use postgres_ffi::waldecoder::WalStreamDecoder;
2060 2 : use postgres_ffi::WAL_SEGMENT_SIZE;
2061 2 :
2062 2 : // Define test data path and constants.
2063 2 : //
2064 2 : // Steps to reconstruct the data, if needed:
2065 2 : // 1. Run the pgbench python test
2066 2 : // 2. Take the first wal segment file from safekeeper
2067 2 : // 3. Compress it using `zstd --long input_file`
2068 2 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2069 2 : // 5. Grep sk logs for "restart decoder" to get startpoint
2070 2 : // 6. Run just the decoder from this test to get the endpoint.
2071 2 : // It's the last LSN the decoder will output.
2072 2 : let pg_version = 15; // The test data was generated by pg15
2073 2 : let path = "test_data/sk_wal_segment_from_pgbench";
2074 2 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2075 2 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2076 2 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2077 2 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2078 2 :
2079 2 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2080 2 : let span = harness
2081 2 : .span()
2082 2 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2083 20 : let (tenant, ctx) = harness.load().await;
2084 2 :
2085 2 : let remote_initdb_path =
2086 2 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2087 2 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2088 2 :
2089 2 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2090 2 : .expect("creating test dir should work");
2091 2 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2092 2 :
2093 2 : // Bootstrap a real timeline. We can't use create_test_timeline because
2094 2 : // it doesn't create a real checkpoint, and Walingest::new tries to parse
2095 2 : // the garbage data.
2096 2 : let tline = tenant
2097 2 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2098 20357 : .await
2099 2 : .unwrap();
2100 2 :
2101 2 : // We fully read and decompress this into memory before decoding
2102 2 : // to get a more accurate perf profile of the decoder.
2103 2 : let bytes = {
2104 2 : use async_compression::tokio::bufread::ZstdDecoder;
2105 2 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2106 2 : let reader = tokio::io::BufReader::new(file);
2107 2 : let decoder = ZstdDecoder::new(reader);
2108 2 : let mut reader = tokio::io::BufReader::new(decoder);
2109 2 : let mut buffer = Vec::new();
2110 224 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2111 2 : buffer
2112 2 : };
2113 2 :
2114 2 : // TODO start a profiler too
2115 2 : let started_at = std::time::Instant::now();
2116 2 :
2117 2 : // Initialize walingest
2118 2 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2119 2 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2120 2 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2121 5 : .await
2122 2 : .unwrap();
2123 2 : let mut modification = tline.begin_modification(startpoint);
2124 2 : println!("decoding {} bytes", bytes.len() - xlogoff);
2125 2 :
2126 2 : // Decode and ingest wal. We process the wal in chunks because
2127 2 : // that's what happens when we get bytes from safekeepers.
2128 474686 : for chunk in bytes[xlogoff..].chunks(50) {
2129 474686 : decoder.feed_bytes(chunk);
2130 620536 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2131 145850 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
2132 145850 : recdata,
2133 145850 : modification.tline.get_shard_identity(),
2134 145850 : lsn,
2135 145850 : modification.tline.pg_version,
2136 145850 : )
2137 145850 : .unwrap();
2138 145850 :
2139 145850 : walingest
2140 145850 : .ingest_record(interpreted, &mut modification, &ctx)
2141 145850 : .instrument(span.clone())
2142 296 : .await
2143 145850 : .unwrap();
2144 2 : }
2145 474686 : modification.commit(&ctx).await.unwrap();
2146 2 : }
2147 2 :
2148 2 : let duration = started_at.elapsed();
2149 2 : println!("done in {:?}", duration);
2150 2 : }
2151 : }
|