Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or a WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but most it stores as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! will call Repository::put_wal_record or put_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoken Rust code.
23 :
24 : use std::sync::Arc;
25 : use std::sync::OnceLock;
26 : use std::time::Duration;
27 : use std::time::Instant;
28 : use std::time::SystemTime;
29 :
30 : use pageserver_api::shard::ShardIdentity;
31 : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
32 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
33 :
34 : use anyhow::{bail, Context, Result};
35 : use bytes::{Buf, Bytes, BytesMut};
36 : use tracing::*;
37 : use utils::failpoint_support;
38 : use utils::rate_limit::RateLimit;
39 :
40 : use crate::context::RequestContext;
41 : use crate::metrics::WAL_INGEST;
42 : use crate::pgdatadir_mapping::{DatadirModification, Version};
43 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
44 : use crate::tenant::PageReconstructError;
45 : use crate::tenant::Timeline;
46 : use crate::walrecord::*;
47 : use crate::ZERO_PAGE;
48 : use pageserver_api::key::rel_block_to_key;
49 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
50 : use postgres_ffi::pg_constants;
51 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
52 : use postgres_ffi::TransactionId;
53 : use postgres_ffi::BLCKSZ;
54 : use utils::bin_ser::SerializeError;
55 : use utils::lsn::Lsn;
56 :
57 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
58 :
59 : impl CheckPoint {
60 6 : fn encode(&self) -> Result<Bytes, SerializeError> {
61 6 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
62 6 : }
63 :
64 145834 : fn update_next_xid(&mut self, xid: u32) -> bool {
65 145834 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
66 145834 : }
67 :
68 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
69 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
70 0 : cp.update_next_multixid(multi_xid, multi_offset)
71 : })
72 0 : }
73 : }
74 :
75 : /// Temporary limitation of WAL lag warnings after attach
76 : ///
77 : /// After tenant attach, we want to limit WAL lag warnings because
78 : /// we don't look at the WAL until the attach is complete, which
79 : /// might take a while.
80 : pub struct WalLagCooldown {
81 : /// Until when should this limitation apply at all
82 : active_until: std::time::Instant,
83 : /// The maximum lag to suppress. Lags above this limit get reported anyways.
84 : max_lag: Duration,
85 : }
86 :
87 : impl WalLagCooldown {
88 0 : pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
89 0 : Self {
90 0 : active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
91 0 : max_lag: attach_duration * 2 + Duration::from_secs(60),
92 0 : }
93 0 : }
94 : }
95 :
96 : pub struct WalIngest {
97 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
98 : shard: ShardIdentity,
99 : checkpoint: CheckPoint,
100 : checkpoint_modified: bool,
101 : warn_ingest_lag: WarnIngestLag,
102 : }
103 :
104 : struct WarnIngestLag {
105 : lag_msg_ratelimit: RateLimit,
106 : future_lsn_msg_ratelimit: RateLimit,
107 : timestamp_invalid_msg_ratelimit: RateLimit,
108 : }
109 :
110 : // These structs are an intermediary representation of the PostgreSQL WAL records.
111 : // The ones prefixed with `Xl` are lower level, while the ones that are not have
112 : // all the required context to be acted upon by the pageserver.
113 :
114 : enum HeapamRecord {
115 : ClearVmBits(ClearVmBits),
116 : }
117 :
118 : struct ClearVmBits {
119 : new_heap_blkno: Option<u32>,
120 : old_heap_blkno: Option<u32>,
121 : vm_rel: RelTag,
122 : flags: u8,
123 : }
124 :
125 : enum NeonrmgrRecord {
126 : ClearVmBits(ClearVmBits),
127 : }
128 :
129 : enum SmgrRecord {
130 : Create(SmgrCreate),
131 : Truncate(XlSmgrTruncate),
132 : }
133 :
134 : struct SmgrCreate {
135 : rel: RelTag,
136 : }
137 :
138 : enum DbaseRecord {
139 : Create(DbaseCreate),
140 : Drop(DbaseDrop),
141 : }
142 :
143 : struct DbaseCreate {
144 : db_id: u32,
145 : tablespace_id: u32,
146 : src_db_id: u32,
147 : src_tablespace_id: u32,
148 : }
149 :
150 : struct DbaseDrop {
151 : db_id: u32,
152 : tablespace_ids: Vec<u32>,
153 : }
154 :
155 : enum ClogRecord {
156 : ZeroPage(ClogZeroPage),
157 : Truncate(ClogTruncate),
158 : }
159 :
160 : struct ClogZeroPage {
161 : segno: u32,
162 : rpageno: u32,
163 : }
164 :
165 : struct ClogTruncate {
166 : pageno: u32,
167 : oldest_xid: u32,
168 : oldest_xid_db: u32,
169 : }
170 :
171 : enum XactRecord {
172 : Commit(XactCommon),
173 : Abort(XactCommon),
174 : CommitPrepared(XactCommon),
175 : AbortPrepared(XactCommon),
176 : Prepare(XactPrepare),
177 : }
178 :
179 : struct XactCommon {
180 : parsed: XlXactParsedRecord,
181 : origin_id: u16,
182 : // Fields below are only used for logging
183 : xl_xid: u32,
184 : lsn: Lsn,
185 : }
186 :
187 : struct XactPrepare {
188 : xl_xid: u32,
189 : data: Bytes,
190 : }
191 :
192 : enum MultiXactRecord {
193 : ZeroPage(MultiXactZeroPage),
194 : Create(XlMultiXactCreate),
195 : Truncate(XlMultiXactTruncate),
196 : }
197 :
198 : struct MultiXactZeroPage {
199 : slru_kind: SlruKind,
200 : segno: u32,
201 : rpageno: u32,
202 : }
203 :
204 : enum RelmapRecord {
205 : Update(RelmapUpdate),
206 : }
207 :
208 : struct RelmapUpdate {
209 : update: XlRelmapUpdate,
210 : buf: Bytes,
211 : }
212 :
213 : enum XlogRecord {
214 : Raw(RawXlogRecord),
215 : }
216 :
217 : struct RawXlogRecord {
218 : info: u8,
219 : lsn: Lsn,
220 : buf: Bytes,
221 : }
222 :
223 : enum LogicalMessageRecord {
224 : Put(PutLogicalMessage),
225 : #[cfg(feature = "testing")]
226 : Failpoint,
227 : }
228 :
229 : struct PutLogicalMessage {
230 : path: String,
231 : buf: Bytes,
232 : }
233 :
234 : enum StandbyRecord {
235 : RunningXacts(StandbyRunningXacts),
236 : }
237 :
238 : struct StandbyRunningXacts {
239 : oldest_running_xid: u32,
240 : }
241 :
242 : enum ReploriginRecord {
243 : Set(XlReploriginSet),
244 : Drop(XlReploriginDrop),
245 : }
246 :
247 : impl WalIngest {
248 12 : pub async fn new(
249 12 : timeline: &Timeline,
250 12 : startpoint: Lsn,
251 12 : ctx: &RequestContext,
252 12 : ) -> anyhow::Result<WalIngest> {
253 : // Fetch the latest checkpoint into memory, so that we can compare with it
254 : // quickly in `ingest_record` and update it when it changes.
255 12 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
256 12 : let pgversion = timeline.pg_version;
257 :
258 12 : let checkpoint = dispatch_pgversion!(pgversion, {
259 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
260 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
261 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
262 : });
263 :
264 12 : Ok(WalIngest {
265 12 : shard: *timeline.get_shard_identity(),
266 12 : checkpoint,
267 12 : checkpoint_modified: false,
268 12 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
269 12 : warn_ingest_lag: WarnIngestLag {
270 12 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
271 12 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
272 12 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
273 12 : },
274 12 : })
275 12 : }
276 :
277 : ///
278 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
279 : ///
280 : /// This function updates `lsn` field of `DatadirModification`
281 : ///
282 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
283 : /// relations/pages that the record affects.
284 : ///
285 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
286 : ///
287 145852 : pub async fn ingest_record(
288 145852 : &mut self,
289 145852 : decoded: DecodedWALRecord,
290 145852 : lsn: Lsn,
291 145852 : modification: &mut DatadirModification<'_>,
292 145852 : ctx: &RequestContext,
293 145852 : ) -> anyhow::Result<bool> {
294 145852 : WAL_INGEST.records_received.inc();
295 145852 : let pg_version = modification.tline.pg_version;
296 145852 : let prev_len = modification.len();
297 145852 :
298 145852 : modification.set_lsn(lsn)?;
299 :
300 145852 : if decoded.is_dbase_create_copy(pg_version) {
301 : // Records of this type should always be preceded by a commit(), as they
302 : // rely on reading data pages back from the Timeline.
303 0 : assert!(!modification.has_dirty_data_pages());
304 145852 : }
305 :
306 145852 : let mut buf = decoded.record.clone();
307 145852 : buf.advance(decoded.main_data_offset);
308 145852 :
309 145852 : assert!(!self.checkpoint_modified);
310 145852 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
311 145834 : && self.checkpoint.update_next_xid(decoded.xl_xid)
312 2 : {
313 2 : self.checkpoint_modified = true;
314 145850 : }
315 :
316 145852 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
317 :
318 145852 : match decoded.xl_rmid {
319 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
320 : // Heap AM records need some special handling, because they modify VM pages
321 : // without registering them with the standard mechanism.
322 145474 : let maybe_heapam_record =
323 145474 : Self::decode_heapam_record(&mut buf, &decoded, pg_version)?;
324 145474 : if let Some(heapam_record) = maybe_heapam_record {
325 12 : match heapam_record {
326 12 : HeapamRecord::ClearVmBits(clear_vm_bits) => {
327 12 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
328 0 : .await?;
329 : }
330 : }
331 145462 : }
332 : }
333 : pg_constants::RM_NEON_ID => {
334 0 : let maybe_nenonrmgr_record =
335 0 : Self::decode_neonmgr_record(&mut buf, &decoded, pg_version)?;
336 0 : if let Some(neonrmgr_record) = maybe_nenonrmgr_record {
337 0 : match neonrmgr_record {
338 0 : NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
339 0 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
340 0 : .await?;
341 : }
342 : }
343 0 : }
344 : }
345 : // Handle other special record types
346 : pg_constants::RM_SMGR_ID => {
347 16 : let maybe_smgr_record =
348 16 : Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap();
349 16 : if let Some(smgr_record) = maybe_smgr_record {
350 16 : match smgr_record {
351 16 : SmgrRecord::Create(create) => {
352 16 : self.ingest_xlog_smgr_create(create, modification, ctx)
353 12 : .await?;
354 : }
355 0 : SmgrRecord::Truncate(truncate) => {
356 0 : self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
357 0 : .await?;
358 : }
359 : }
360 0 : }
361 : }
362 : pg_constants::RM_DBASE_ID => {
363 0 : let maybe_dbase_record =
364 0 : Self::decode_dbase_record(&mut buf, &decoded, pg_version).unwrap();
365 :
366 0 : if let Some(dbase_record) = maybe_dbase_record {
367 0 : match dbase_record {
368 0 : DbaseRecord::Create(create) => {
369 0 : self.ingest_xlog_dbase_create(create, modification, ctx)
370 0 : .await?;
371 : }
372 0 : DbaseRecord::Drop(drop) => {
373 0 : self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
374 : }
375 : }
376 0 : }
377 : }
378 : pg_constants::RM_TBLSPC_ID => {
379 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
380 : }
381 : pg_constants::RM_CLOG_ID => {
382 : // [`Self::decode_clog_record`] may never fail and always returns.
383 : // It has this interface to match all the other decoding methods.
384 0 : let clog_record = Self::decode_clog_record(&mut buf, &decoded, pg_version)
385 0 : .unwrap()
386 0 : .unwrap();
387 0 :
388 0 : match clog_record {
389 0 : ClogRecord::ZeroPage(zero_page) => {
390 0 : self.ingest_clog_zero_page(zero_page, modification, ctx)
391 0 : .await?;
392 : }
393 0 : ClogRecord::Truncate(truncate) => {
394 0 : self.ingest_clog_truncate(truncate, modification, ctx)
395 0 : .await?;
396 : }
397 : }
398 : }
399 : pg_constants::RM_XACT_ID => {
400 24 : let maybe_xact_record =
401 24 : Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version).unwrap();
402 24 : if let Some(xact_record) = maybe_xact_record {
403 8 : self.ingest_xact_record(xact_record, modification, ctx)
404 0 : .await?;
405 16 : }
406 : }
407 : pg_constants::RM_MULTIXACT_ID => {
408 0 : let maybe_multixact_record =
409 0 : Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap();
410 0 : if let Some(multixact_record) = maybe_multixact_record {
411 0 : match multixact_record {
412 0 : MultiXactRecord::ZeroPage(zero_page) => {
413 0 : self.ingest_multixact_zero_page(zero_page, modification, ctx)
414 0 : .await?;
415 : }
416 0 : MultiXactRecord::Create(create) => {
417 0 : self.ingest_multixact_create(modification, &create)?;
418 : }
419 0 : MultiXactRecord::Truncate(truncate) => {
420 0 : self.ingest_multixact_truncate(modification, &truncate, ctx)
421 0 : .await?;
422 : }
423 : }
424 0 : }
425 : }
426 : pg_constants::RM_RELMAP_ID => {
427 0 : let relmap_record = Self::decode_relmap_record(&mut buf, &decoded, pg_version)
428 0 : .unwrap()
429 0 : .unwrap();
430 0 : match relmap_record {
431 0 : RelmapRecord::Update(update) => {
432 0 : self.ingest_relmap_update(update, modification, ctx).await?;
433 : }
434 : }
435 : }
436 : // This is an odd duck. It needs to go to all shards.
437 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
438 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
439 : // the pageserver and decode it there.
440 : //
441 : // Alternatively, one can make the checkpoint part of the subscription protocol
442 : // to the pageserver. This should work fine, but can be done at a later point.
443 : pg_constants::RM_XLOG_ID => {
444 30 : let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version)
445 30 : .unwrap()
446 30 : .unwrap();
447 30 :
448 30 : match xlog_record {
449 30 : XlogRecord::Raw(raw) => {
450 30 : self.ingest_raw_xlog_record(raw, modification, ctx).await?;
451 : }
452 : }
453 : }
454 : pg_constants::RM_LOGICALMSG_ID => {
455 0 : let maybe_logical_message_record =
456 0 : Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap();
457 0 : if let Some(logical_message_record) = maybe_logical_message_record {
458 0 : match logical_message_record {
459 0 : LogicalMessageRecord::Put(put) => {
460 0 : self.ingest_logical_message_put(put, modification, ctx)
461 0 : .await?;
462 : }
463 : #[cfg(feature = "testing")]
464 : LogicalMessageRecord::Failpoint => {
465 : // This is a convenient way to make the WAL ingestion pause at
466 : // particular point in the WAL. For more fine-grained control,
467 : // we could peek into the message and only pause if it contains
468 : // a particular string, for example, but this is enough for now.
469 0 : failpoint_support::sleep_millis_async!(
470 0 : "pageserver-wal-ingest-logical-message-sleep"
471 0 : );
472 : }
473 : }
474 0 : }
475 : }
476 : pg_constants::RM_STANDBY_ID => {
477 16 : let maybe_standby_record =
478 16 : Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap();
479 16 : if let Some(standby_record) = maybe_standby_record {
480 0 : self.ingest_standby_record(standby_record).unwrap();
481 16 : }
482 : }
483 : pg_constants::RM_REPLORIGIN_ID => {
484 0 : let maybe_replorigin_record =
485 0 : Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap();
486 0 : if let Some(replorigin_record) = maybe_replorigin_record {
487 0 : self.ingest_replorigin_record(replorigin_record, modification)
488 0 : .await?;
489 0 : }
490 : }
491 292 : _x => {
492 292 : // TODO: should probably log & fail here instead of blindly
493 292 : // doing something without understanding the protocol
494 292 : }
495 : }
496 :
497 : // Iterate through all the blocks that the record modifies, and
498 : // "put" a separate copy of the record for each block.
499 145852 : for blk in decoded.blocks.iter() {
500 145642 : let rel = RelTag {
501 145642 : spcnode: blk.rnode_spcnode,
502 145642 : dbnode: blk.rnode_dbnode,
503 145642 : relnode: blk.rnode_relnode,
504 145642 : forknum: blk.forknum,
505 145642 : };
506 145642 :
507 145642 : let key = rel_block_to_key(rel, blk.blkno);
508 145642 : let key_is_local = self.shard.is_key_local(&key);
509 145642 :
510 145642 : tracing::debug!(
511 : lsn=%lsn,
512 : key=%key,
513 0 : "ingest: shard decision {} (checkpoint={})",
514 0 : if !key_is_local { "drop" } else { "keep" },
515 : self.checkpoint_modified
516 : );
517 :
518 145642 : if !key_is_local {
519 0 : if self.shard.is_shard_zero() {
520 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
521 : // its blkno in case it implicitly extends a relation.
522 0 : self.observe_decoded_block(modification, blk, ctx).await?;
523 0 : }
524 :
525 0 : continue;
526 145642 : }
527 145642 : self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
528 284 : .await?;
529 : }
530 :
531 : // If checkpoint data was updated, store the new version in the repository
532 145852 : if self.checkpoint_modified {
533 6 : let new_checkpoint_bytes = self.checkpoint.encode()?;
534 :
535 6 : modification.put_checkpoint(new_checkpoint_bytes)?;
536 6 : self.checkpoint_modified = false;
537 145846 : }
538 :
539 : // Note that at this point this record is only cached in the modification
540 : // until commit() is called to flush the data into the repository and update
541 : // the latest LSN.
542 :
543 145852 : modification.on_record_end();
544 145852 :
545 145852 : Ok(modification.len() > prev_len)
546 145852 : }
547 :
548 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
549 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
550 0 : let next_full_xid =
551 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
552 :
553 0 : let next_xid = (next_full_xid) as u32;
554 0 : let mut epoch = (next_full_xid >> 32) as u32;
555 0 :
556 0 : if xid > next_xid {
557 : // Wraparound occurred, must be from a prev epoch.
558 0 : if epoch == 0 {
559 0 : bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
560 0 : }
561 0 : epoch -= 1;
562 0 : }
563 :
564 0 : Ok((epoch as u64) << 32 | xid as u64)
565 0 : }
566 :
567 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
568 0 : async fn observe_decoded_block(
569 0 : &mut self,
570 0 : modification: &mut DatadirModification<'_>,
571 0 : blk: &DecodedBkpBlock,
572 0 : ctx: &RequestContext,
573 0 : ) -> Result<(), PageReconstructError> {
574 0 : let rel = RelTag {
575 0 : spcnode: blk.rnode_spcnode,
576 0 : dbnode: blk.rnode_dbnode,
577 0 : relnode: blk.rnode_relnode,
578 0 : forknum: blk.forknum,
579 0 : };
580 0 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
581 0 : .await
582 0 : }
583 :
584 145642 : async fn ingest_decoded_block(
585 145642 : &mut self,
586 145642 : modification: &mut DatadirModification<'_>,
587 145642 : lsn: Lsn,
588 145642 : decoded: &DecodedWALRecord,
589 145642 : blk: &DecodedBkpBlock,
590 145642 : ctx: &RequestContext,
591 145642 : ) -> Result<(), PageReconstructError> {
592 145642 : let rel = RelTag {
593 145642 : spcnode: blk.rnode_spcnode,
594 145642 : dbnode: blk.rnode_dbnode,
595 145642 : relnode: blk.rnode_relnode,
596 145642 : forknum: blk.forknum,
597 145642 : };
598 145642 :
599 145642 : //
600 145642 : // Instead of storing full-page-image WAL record,
601 145642 : // it is better to store extracted image: we can skip wal-redo
602 145642 : // in this case. Also some FPI records may contain multiple (up to 32) pages,
603 145642 : // so them have to be copied multiple times.
604 145642 : //
605 145642 : if blk.apply_image
606 60 : && blk.has_image
607 60 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
608 24 : && (decoded.xl_info == pg_constants::XLOG_FPI
609 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
610 : // compression of WAL is not yet supported: fall back to storing the original WAL record
611 24 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
612 : // do not materialize null pages because them most likely be soon replaced with real data
613 24 : && blk.bimg_len != 0
614 : {
615 : // Extract page image from FPI record
616 24 : let img_len = blk.bimg_len as usize;
617 24 : let img_offs = blk.bimg_offset as usize;
618 24 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
619 24 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
620 24 :
621 24 : if blk.hole_length != 0 {
622 0 : let tail = image.split_off(blk.hole_offset as usize);
623 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
624 0 : image.unsplit(tail);
625 24 : }
626 : //
627 : // Match the logic of XLogReadBufferForRedoExtended:
628 : // The page may be uninitialized. If so, we can't set the LSN because
629 : // that would corrupt the page.
630 : //
631 24 : if !page_is_new(&image) {
632 18 : page_set_lsn(&mut image, lsn)
633 6 : }
634 24 : assert_eq!(image.len(), BLCKSZ as usize);
635 :
636 24 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
637 5 : .await?;
638 : } else {
639 145618 : let rec = NeonWalRecord::Postgres {
640 145618 : will_init: blk.will_init || blk.apply_image,
641 145618 : rec: decoded.record.clone(),
642 145618 : };
643 145618 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
644 279 : .await?;
645 : }
646 145642 : Ok(())
647 145642 : }
648 :
649 12 : async fn ingest_clear_vm_bits(
650 12 : &mut self,
651 12 : clear_vm_bits: ClearVmBits,
652 12 : modification: &mut DatadirModification<'_>,
653 12 : ctx: &RequestContext,
654 12 : ) -> anyhow::Result<()> {
655 12 : let ClearVmBits {
656 12 : new_heap_blkno,
657 12 : old_heap_blkno,
658 12 : flags,
659 12 : vm_rel,
660 12 : } = clear_vm_bits;
661 12 : // Clear the VM bits if required.
662 12 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
663 12 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
664 :
665 : // Sometimes, Postgres seems to create heap WAL records with the
666 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
667 : // not set. In fact, it's possible that the VM page does not exist at all.
668 : // In that case, we don't want to store a record to clear the VM bit;
669 : // replaying it would fail to find the previous image of the page, because
670 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
671 : // record if it doesn't.
672 12 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
673 12 : if let Some(blknum) = new_vm_blk {
674 12 : if blknum >= vm_size {
675 0 : new_vm_blk = None;
676 12 : }
677 0 : }
678 12 : if let Some(blknum) = old_vm_blk {
679 0 : if blknum >= vm_size {
680 0 : old_vm_blk = None;
681 0 : }
682 12 : }
683 :
684 12 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
685 12 : if new_vm_blk == old_vm_blk {
686 : // An UPDATE record that needs to clear the bits for both old and the
687 : // new page, both of which reside on the same VM page.
688 0 : self.put_rel_wal_record(
689 0 : modification,
690 0 : vm_rel,
691 0 : new_vm_blk.unwrap(),
692 0 : NeonWalRecord::ClearVisibilityMapFlags {
693 0 : new_heap_blkno,
694 0 : old_heap_blkno,
695 0 : flags,
696 0 : },
697 0 : ctx,
698 0 : )
699 0 : .await?;
700 : } else {
701 : // Clear VM bits for one heap page, or for two pages that reside on
702 : // different VM pages.
703 12 : if let Some(new_vm_blk) = new_vm_blk {
704 12 : self.put_rel_wal_record(
705 12 : modification,
706 12 : vm_rel,
707 12 : new_vm_blk,
708 12 : NeonWalRecord::ClearVisibilityMapFlags {
709 12 : new_heap_blkno,
710 12 : old_heap_blkno: None,
711 12 : flags,
712 12 : },
713 12 : ctx,
714 12 : )
715 0 : .await?;
716 0 : }
717 12 : if let Some(old_vm_blk) = old_vm_blk {
718 0 : self.put_rel_wal_record(
719 0 : modification,
720 0 : vm_rel,
721 0 : old_vm_blk,
722 0 : NeonWalRecord::ClearVisibilityMapFlags {
723 0 : new_heap_blkno: None,
724 0 : old_heap_blkno,
725 0 : flags,
726 0 : },
727 0 : ctx,
728 0 : )
729 0 : .await?;
730 12 : }
731 : }
732 0 : }
733 :
734 12 : Ok(())
735 12 : }
736 :
737 145474 : fn decode_heapam_record(
738 145474 : buf: &mut Bytes,
739 145474 : decoded: &DecodedWALRecord,
740 145474 : pg_version: u32,
741 145474 : ) -> anyhow::Result<Option<HeapamRecord>> {
742 145474 : // Handle VM bit updates that are implicitly part of heap records.
743 145474 :
744 145474 : // First, look at the record to determine which VM bits need
745 145474 : // to be cleared. If either of these variables is set, we
746 145474 : // need to clear the corresponding bits in the visibility map.
747 145474 : let mut new_heap_blkno: Option<u32> = None;
748 145474 : let mut old_heap_blkno: Option<u32> = None;
749 145474 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
750 145474 :
751 145474 : match pg_version {
752 : 14 => {
753 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
754 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
755 0 :
756 0 : if info == pg_constants::XLOG_HEAP_INSERT {
757 0 : let xlrec = v14::XlHeapInsert::decode(buf);
758 0 : assert_eq!(0, buf.remaining());
759 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
760 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
761 0 : }
762 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
763 0 : let xlrec = v14::XlHeapDelete::decode(buf);
764 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
765 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
766 0 : }
767 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
768 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
769 : {
770 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
771 0 : // the size of tuple data is inferred from the size of the record.
772 0 : // we can't validate the remaining number of bytes without parsing
773 0 : // the tuple data.
774 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
775 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
776 0 : }
777 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
778 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
779 0 : // non-HOT update where the new tuple goes to different page than
780 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
781 0 : // set.
782 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
783 0 : }
784 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
785 0 : let xlrec = v14::XlHeapLock::decode(buf);
786 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
787 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
788 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
789 0 : }
790 0 : }
791 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
792 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
793 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
794 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
795 :
796 0 : let offset_array_len =
797 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
798 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
799 0 : 0
800 : } else {
801 0 : size_of::<u16>() * xlrec.ntuples as usize
802 : };
803 0 : assert_eq!(offset_array_len, buf.remaining());
804 :
805 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
806 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
807 0 : }
808 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
809 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
810 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
811 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
812 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
813 0 : }
814 0 : }
815 : } else {
816 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
817 : }
818 : }
819 : 15 => {
820 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
821 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
822 145286 :
823 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
824 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
825 145276 : assert_eq!(0, buf.remaining());
826 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
827 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
828 145272 : }
829 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
830 0 : let xlrec = v15::XlHeapDelete::decode(buf);
831 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
832 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
833 0 : }
834 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
835 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
836 : {
837 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
838 8 : // the size of tuple data is inferred from the size of the record.
839 8 : // we can't validate the remaining number of bytes without parsing
840 8 : // the tuple data.
841 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
842 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
843 8 : }
844 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
845 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
846 0 : // non-HOT update where the new tuple goes to different page than
847 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
848 0 : // set.
849 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
850 8 : }
851 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
852 0 : let xlrec = v15::XlHeapLock::decode(buf);
853 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
854 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
855 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
856 0 : }
857 2 : }
858 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
859 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
860 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
861 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
862 :
863 42 : let offset_array_len =
864 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
865 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
866 2 : 0
867 : } else {
868 40 : size_of::<u16>() * xlrec.ntuples as usize
869 : };
870 42 : assert_eq!(offset_array_len, buf.remaining());
871 :
872 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
873 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
874 34 : }
875 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
876 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
877 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
878 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
879 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
880 0 : }
881 146 : }
882 : } else {
883 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
884 : }
885 : }
886 : 16 => {
887 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
888 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
889 0 :
890 0 : if info == pg_constants::XLOG_HEAP_INSERT {
891 0 : let xlrec = v16::XlHeapInsert::decode(buf);
892 0 : assert_eq!(0, buf.remaining());
893 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
894 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
895 0 : }
896 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
897 0 : let xlrec = v16::XlHeapDelete::decode(buf);
898 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
899 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
900 0 : }
901 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
902 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
903 : {
904 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
905 0 : // the size of tuple data is inferred from the size of the record.
906 0 : // we can't validate the remaining number of bytes without parsing
907 0 : // the tuple data.
908 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
909 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
910 0 : }
911 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
912 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
913 0 : // non-HOT update where the new tuple goes to different page than
914 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
915 0 : // set.
916 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
917 0 : }
918 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
919 0 : let xlrec = v16::XlHeapLock::decode(buf);
920 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
921 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
922 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
923 0 : }
924 0 : }
925 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
926 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
927 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
928 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
929 :
930 0 : let offset_array_len =
931 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
932 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
933 0 : 0
934 : } else {
935 0 : size_of::<u16>() * xlrec.ntuples as usize
936 : };
937 0 : assert_eq!(offset_array_len, buf.remaining());
938 :
939 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
940 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
941 0 : }
942 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
943 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
944 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
945 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
946 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
947 0 : }
948 0 : }
949 : } else {
950 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
951 : }
952 : }
953 : 17 => {
954 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
955 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
956 0 :
957 0 : if info == pg_constants::XLOG_HEAP_INSERT {
958 0 : let xlrec = v17::XlHeapInsert::decode(buf);
959 0 : assert_eq!(0, buf.remaining());
960 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
961 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
962 0 : }
963 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
964 0 : let xlrec = v17::XlHeapDelete::decode(buf);
965 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
966 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
967 0 : }
968 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
969 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
970 : {
971 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
972 0 : // the size of tuple data is inferred from the size of the record.
973 0 : // we can't validate the remaining number of bytes without parsing
974 0 : // the tuple data.
975 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
976 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
977 0 : }
978 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
979 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
980 0 : // non-HOT update where the new tuple goes to different page than
981 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
982 0 : // set.
983 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
984 0 : }
985 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
986 0 : let xlrec = v17::XlHeapLock::decode(buf);
987 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
988 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
989 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
990 0 : }
991 0 : }
992 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
993 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
994 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
995 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
996 :
997 0 : let offset_array_len =
998 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
999 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
1000 0 : 0
1001 : } else {
1002 0 : size_of::<u16>() * xlrec.ntuples as usize
1003 : };
1004 0 : assert_eq!(offset_array_len, buf.remaining());
1005 :
1006 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1007 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1008 0 : }
1009 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
1010 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
1011 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
1012 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
1013 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
1014 0 : }
1015 0 : }
1016 : } else {
1017 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
1018 : }
1019 : }
1020 0 : _ => {}
1021 : }
1022 :
1023 145474 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
1024 12 : let vm_rel = RelTag {
1025 12 : forknum: VISIBILITYMAP_FORKNUM,
1026 12 : spcnode: decoded.blocks[0].rnode_spcnode,
1027 12 : dbnode: decoded.blocks[0].rnode_dbnode,
1028 12 : relnode: decoded.blocks[0].rnode_relnode,
1029 12 : };
1030 12 :
1031 12 : Ok(Some(HeapamRecord::ClearVmBits(ClearVmBits {
1032 12 : new_heap_blkno,
1033 12 : old_heap_blkno,
1034 12 : vm_rel,
1035 12 : flags,
1036 12 : })))
1037 : } else {
1038 145462 : Ok(None)
1039 : }
1040 145474 : }
1041 :
1042 0 : fn decode_neonmgr_record(
1043 0 : buf: &mut Bytes,
1044 0 : decoded: &DecodedWALRecord,
1045 0 : pg_version: u32,
1046 0 : ) -> anyhow::Result<Option<NeonrmgrRecord>> {
1047 0 : // Handle VM bit updates that are implicitly part of heap records.
1048 0 :
1049 0 : // First, look at the record to determine which VM bits need
1050 0 : // to be cleared. If either of these variables is set, we
1051 0 : // need to clear the corresponding bits in the visibility map.
1052 0 : let mut new_heap_blkno: Option<u32> = None;
1053 0 : let mut old_heap_blkno: Option<u32> = None;
1054 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
1055 0 :
1056 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
1057 :
1058 0 : match pg_version {
1059 : 16 | 17 => {
1060 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1061 0 :
1062 0 : match info {
1063 : pg_constants::XLOG_NEON_HEAP_INSERT => {
1064 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
1065 0 : assert_eq!(0, buf.remaining());
1066 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1067 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1068 0 : }
1069 : }
1070 : pg_constants::XLOG_NEON_HEAP_DELETE => {
1071 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
1072 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
1073 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1074 0 : }
1075 : }
1076 : pg_constants::XLOG_NEON_HEAP_UPDATE
1077 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
1078 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
1079 0 : // the size of tuple data is inferred from the size of the record.
1080 0 : // we can't validate the remaining number of bytes without parsing
1081 0 : // the tuple data.
1082 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
1083 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
1084 0 : }
1085 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
1086 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
1087 0 : // non-HOT update where the new tuple goes to different page than
1088 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
1089 0 : // set.
1090 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1091 0 : }
1092 : }
1093 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
1094 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
1095 :
1096 0 : let offset_array_len =
1097 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
1098 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
1099 0 : 0
1100 : } else {
1101 0 : size_of::<u16>() * xlrec.ntuples as usize
1102 : };
1103 0 : assert_eq!(offset_array_len, buf.remaining());
1104 :
1105 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1106 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1107 0 : }
1108 : }
1109 : pg_constants::XLOG_NEON_HEAP_LOCK => {
1110 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
1111 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
1112 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
1113 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
1114 0 : }
1115 : }
1116 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
1117 : }
1118 : }
1119 0 : _ => bail!(
1120 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
1121 0 : pg_version
1122 0 : ),
1123 : }
1124 :
1125 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
1126 0 : let vm_rel = RelTag {
1127 0 : forknum: VISIBILITYMAP_FORKNUM,
1128 0 : spcnode: decoded.blocks[0].rnode_spcnode,
1129 0 : dbnode: decoded.blocks[0].rnode_dbnode,
1130 0 : relnode: decoded.blocks[0].rnode_relnode,
1131 0 : };
1132 0 :
1133 0 : Ok(Some(NeonrmgrRecord::ClearVmBits(ClearVmBits {
1134 0 : new_heap_blkno,
1135 0 : old_heap_blkno,
1136 0 : vm_rel,
1137 0 : flags,
1138 0 : })))
1139 : } else {
1140 0 : Ok(None)
1141 : }
1142 0 : }
1143 :
1144 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
1145 0 : async fn ingest_xlog_dbase_create(
1146 0 : &mut self,
1147 0 : create: DbaseCreate,
1148 0 : modification: &mut DatadirModification<'_>,
1149 0 : ctx: &RequestContext,
1150 0 : ) -> anyhow::Result<()> {
1151 0 : let DbaseCreate {
1152 0 : db_id,
1153 0 : tablespace_id,
1154 0 : src_db_id,
1155 0 : src_tablespace_id,
1156 0 : } = create;
1157 :
1158 0 : let rels = modification
1159 0 : .tline
1160 0 : .list_rels(
1161 0 : src_tablespace_id,
1162 0 : src_db_id,
1163 0 : Version::Modified(modification),
1164 0 : ctx,
1165 0 : )
1166 0 : .await?;
1167 :
1168 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
1169 :
1170 : // Copy relfilemap
1171 0 : let filemap = modification
1172 0 : .tline
1173 0 : .get_relmap_file(
1174 0 : src_tablespace_id,
1175 0 : src_db_id,
1176 0 : Version::Modified(modification),
1177 0 : ctx,
1178 0 : )
1179 0 : .await?;
1180 0 : modification
1181 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1182 0 : .await?;
1183 :
1184 0 : let mut num_rels_copied = 0;
1185 0 : let mut num_blocks_copied = 0;
1186 0 : for src_rel in rels {
1187 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1188 0 : assert_eq!(src_rel.dbnode, src_db_id);
1189 :
1190 0 : let nblocks = modification
1191 0 : .tline
1192 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
1193 0 : .await?;
1194 0 : let dst_rel = RelTag {
1195 0 : spcnode: tablespace_id,
1196 0 : dbnode: db_id,
1197 0 : relnode: src_rel.relnode,
1198 0 : forknum: src_rel.forknum,
1199 0 : };
1200 0 :
1201 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1202 :
1203 : // Copy content
1204 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1205 0 : for blknum in 0..nblocks {
1206 : // Sharding:
1207 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1208 : // dbNode is not included in the hash inputs for sharding.
1209 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1210 : // that belong to it.
1211 0 : let src_key = rel_block_to_key(src_rel, blknum);
1212 0 : if !self.shard.is_key_local(&src_key) {
1213 0 : debug!(
1214 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1215 : src_key
1216 : );
1217 0 : continue;
1218 0 : }
1219 0 : debug!(
1220 0 : "copying block {} from {} ({}) to {}",
1221 : blknum, src_rel, src_key, dst_rel
1222 : );
1223 :
1224 0 : let content = modification
1225 0 : .tline
1226 0 : .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
1227 0 : .await?;
1228 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1229 0 : num_blocks_copied += 1;
1230 : }
1231 :
1232 0 : num_rels_copied += 1;
1233 : }
1234 :
1235 0 : info!(
1236 0 : "Created database {}/{}, copied {} blocks in {} rels",
1237 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1238 : );
1239 0 : Ok(())
1240 0 : }
1241 :
1242 0 : async fn ingest_xlog_dbase_drop(
1243 0 : &mut self,
1244 0 : dbase_drop: DbaseDrop,
1245 0 : modification: &mut DatadirModification<'_>,
1246 0 : ctx: &RequestContext,
1247 0 : ) -> anyhow::Result<()> {
1248 0 : let DbaseDrop {
1249 0 : db_id,
1250 0 : tablespace_ids,
1251 0 : } = dbase_drop;
1252 0 : for tablespace_id in tablespace_ids {
1253 0 : trace!("Drop db {}, {}", tablespace_id, db_id);
1254 0 : modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
1255 : }
1256 :
1257 0 : Ok(())
1258 0 : }
1259 :
1260 0 : fn decode_dbase_record(
1261 0 : buf: &mut Bytes,
1262 0 : decoded: &DecodedWALRecord,
1263 0 : pg_version: u32,
1264 0 : ) -> anyhow::Result<Option<DbaseRecord>> {
1265 0 : // TODO: Refactor this to avoid the duplication between postgres versions.
1266 0 :
1267 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1268 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
1269 :
1270 0 : if pg_version == 14 {
1271 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
1272 0 : let createdb = XlCreateDatabase::decode(buf);
1273 0 : debug!("XLOG_DBASE_CREATE v14");
1274 :
1275 0 : let record = DbaseRecord::Create(DbaseCreate {
1276 0 : db_id: createdb.db_id,
1277 0 : tablespace_id: createdb.tablespace_id,
1278 0 : src_db_id: createdb.src_db_id,
1279 0 : src_tablespace_id: createdb.src_tablespace_id,
1280 0 : });
1281 0 :
1282 0 : return Ok(Some(record));
1283 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
1284 0 : let dropdb = XlDropDatabase::decode(buf);
1285 0 :
1286 0 : let record = DbaseRecord::Drop(DbaseDrop {
1287 0 : db_id: dropdb.db_id,
1288 0 : tablespace_ids: dropdb.tablespace_ids,
1289 0 : });
1290 0 :
1291 0 : return Ok(Some(record));
1292 0 : }
1293 0 : } else if pg_version == 15 {
1294 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
1295 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
1296 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
1297 : // The XLOG record was renamed between v14 and v15,
1298 : // but the record format is the same.
1299 : // So we can reuse XlCreateDatabase here.
1300 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
1301 :
1302 0 : let createdb = XlCreateDatabase::decode(buf);
1303 0 : let record = DbaseRecord::Create(DbaseCreate {
1304 0 : db_id: createdb.db_id,
1305 0 : tablespace_id: createdb.tablespace_id,
1306 0 : src_db_id: createdb.src_db_id,
1307 0 : src_tablespace_id: createdb.src_tablespace_id,
1308 0 : });
1309 0 :
1310 0 : return Ok(Some(record));
1311 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
1312 0 : let dropdb = XlDropDatabase::decode(buf);
1313 0 : let record = DbaseRecord::Drop(DbaseDrop {
1314 0 : db_id: dropdb.db_id,
1315 0 : tablespace_ids: dropdb.tablespace_ids,
1316 0 : });
1317 0 :
1318 0 : return Ok(Some(record));
1319 0 : }
1320 0 : } else if pg_version == 16 {
1321 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
1322 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
1323 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
1324 : // The XLOG record was renamed between v14 and v15,
1325 : // but the record format is the same.
1326 : // So we can reuse XlCreateDatabase here.
1327 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
1328 :
1329 0 : let createdb = XlCreateDatabase::decode(buf);
1330 0 : let record = DbaseRecord::Create(DbaseCreate {
1331 0 : db_id: createdb.db_id,
1332 0 : tablespace_id: createdb.tablespace_id,
1333 0 : src_db_id: createdb.src_db_id,
1334 0 : src_tablespace_id: createdb.src_tablespace_id,
1335 0 : });
1336 0 :
1337 0 : return Ok(Some(record));
1338 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
1339 0 : let dropdb = XlDropDatabase::decode(buf);
1340 0 : let record = DbaseRecord::Drop(DbaseDrop {
1341 0 : db_id: dropdb.db_id,
1342 0 : tablespace_ids: dropdb.tablespace_ids,
1343 0 : });
1344 0 :
1345 0 : return Ok(Some(record));
1346 0 : }
1347 0 : } else if pg_version == 17 {
1348 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
1349 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
1350 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
1351 : // The XLOG record was renamed between v14 and v15,
1352 : // but the record format is the same.
1353 : // So we can reuse XlCreateDatabase here.
1354 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
1355 :
1356 0 : let createdb = XlCreateDatabase::decode(buf);
1357 0 : let record = DbaseRecord::Create(DbaseCreate {
1358 0 : db_id: createdb.db_id,
1359 0 : tablespace_id: createdb.tablespace_id,
1360 0 : src_db_id: createdb.src_db_id,
1361 0 : src_tablespace_id: createdb.src_tablespace_id,
1362 0 : });
1363 0 :
1364 0 : return Ok(Some(record));
1365 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
1366 0 : let dropdb = XlDropDatabase::decode(buf);
1367 0 : let record = DbaseRecord::Drop(DbaseDrop {
1368 0 : db_id: dropdb.db_id,
1369 0 : tablespace_ids: dropdb.tablespace_ids,
1370 0 : });
1371 0 :
1372 0 : return Ok(Some(record));
1373 0 : }
1374 0 : }
1375 :
1376 0 : Ok(None)
1377 0 : }
1378 :
1379 16 : async fn ingest_xlog_smgr_create(
1380 16 : &mut self,
1381 16 : create: SmgrCreate,
1382 16 : modification: &mut DatadirModification<'_>,
1383 16 : ctx: &RequestContext,
1384 16 : ) -> anyhow::Result<()> {
1385 16 : let SmgrCreate { rel } = create;
1386 16 : self.put_rel_creation(modification, rel, ctx).await?;
1387 16 : Ok(())
1388 16 : }
1389 :
1390 16 : fn decode_smgr_record(
1391 16 : buf: &mut Bytes,
1392 16 : decoded: &DecodedWALRecord,
1393 16 : _pg_version: u32,
1394 16 : ) -> anyhow::Result<Option<SmgrRecord>> {
1395 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1396 16 : if info == pg_constants::XLOG_SMGR_CREATE {
1397 16 : let create = XlSmgrCreate::decode(buf);
1398 16 : let rel = RelTag {
1399 16 : spcnode: create.rnode.spcnode,
1400 16 : dbnode: create.rnode.dbnode,
1401 16 : relnode: create.rnode.relnode,
1402 16 : forknum: create.forknum,
1403 16 : };
1404 16 :
1405 16 : return Ok(Some(SmgrRecord::Create(SmgrCreate { rel })));
1406 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
1407 0 : let truncate = XlSmgrTruncate::decode(buf);
1408 0 : return Ok(Some(SmgrRecord::Truncate(truncate)));
1409 0 : }
1410 0 :
1411 0 : Ok(None)
1412 16 : }
1413 :
1414 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1415 : ///
1416 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1417 0 : async fn ingest_xlog_smgr_truncate(
1418 0 : &mut self,
1419 0 : truncate: XlSmgrTruncate,
1420 0 : modification: &mut DatadirModification<'_>,
1421 0 : ctx: &RequestContext,
1422 0 : ) -> anyhow::Result<()> {
1423 0 : let XlSmgrTruncate {
1424 0 : blkno,
1425 0 : rnode,
1426 0 : flags,
1427 0 : } = truncate;
1428 0 :
1429 0 : let spcnode = rnode.spcnode;
1430 0 : let dbnode = rnode.dbnode;
1431 0 : let relnode = rnode.relnode;
1432 0 :
1433 0 : if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
1434 0 : let rel = RelTag {
1435 0 : spcnode,
1436 0 : dbnode,
1437 0 : relnode,
1438 0 : forknum: MAIN_FORKNUM,
1439 0 : };
1440 0 :
1441 0 : self.put_rel_truncation(modification, rel, blkno, ctx)
1442 0 : .await?;
1443 0 : }
1444 0 : if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
1445 0 : let rel = RelTag {
1446 0 : spcnode,
1447 0 : dbnode,
1448 0 : relnode,
1449 0 : forknum: FSM_FORKNUM,
1450 0 : };
1451 0 :
1452 0 : let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1453 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1454 0 : if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1455 : // Tail of last remaining FSM page has to be zeroed.
1456 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
1457 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
1458 0 : fsm_physical_page_no += 1;
1459 0 : }
1460 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1461 0 : if nblocks > fsm_physical_page_no {
1462 : // check if something to do: FSM is larger than truncate position
1463 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1464 0 : .await?;
1465 0 : }
1466 0 : }
1467 0 : if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
1468 0 : let rel = RelTag {
1469 0 : spcnode,
1470 0 : dbnode,
1471 0 : relnode,
1472 0 : forknum: VISIBILITYMAP_FORKNUM,
1473 0 : };
1474 0 :
1475 0 : let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1476 0 : if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1477 : // Tail of last remaining vm page has to be zeroed.
1478 : // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
1479 0 : modification.put_rel_page_image_zero(rel, vm_page_no)?;
1480 0 : vm_page_no += 1;
1481 0 : }
1482 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1483 0 : if nblocks > vm_page_no {
1484 : // check if something to do: VM is larger than truncate position
1485 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1486 0 : .await?;
1487 0 : }
1488 0 : }
1489 0 : Ok(())
1490 0 : }
1491 :
1492 8 : fn warn_on_ingest_lag(
1493 8 : &mut self,
1494 8 : conf: &crate::config::PageServerConf,
1495 8 : wal_timestamp: TimestampTz,
1496 8 : ) {
1497 8 : debug_assert_current_span_has_tenant_and_timeline_id();
1498 8 : let now = SystemTime::now();
1499 8 : let rate_limits = &mut self.warn_ingest_lag;
1500 :
1501 8 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
1502 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
1503 : });
1504 :
1505 8 : match ts {
1506 8 : Ok(ts) => {
1507 8 : match now.duration_since(ts) {
1508 8 : Ok(lag) => {
1509 8 : if lag > conf.wait_lsn_timeout {
1510 8 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
1511 2 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
1512 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
1513 0 : return;
1514 0 : }
1515 2 : } else {
1516 2 : // Still loading? We shouldn't be here
1517 2 : }
1518 2 : let lag = humantime::format_duration(lag);
1519 2 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
1520 8 : })
1521 0 : }
1522 : }
1523 0 : Err(e) => {
1524 0 : let delta_t = e.duration();
1525 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
1526 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
1527 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
1528 0 : if delta_t > IGNORED_DRIFT {
1529 0 : let delta_t = humantime::format_duration(delta_t);
1530 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
1531 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
1532 0 : })
1533 0 : }
1534 : }
1535 : };
1536 : }
1537 0 : Err(error) => {
1538 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
1539 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
1540 0 : })
1541 : }
1542 : }
1543 8 : }
1544 :
1545 : /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
1546 : ///
1547 8 : async fn ingest_xact_record(
1548 8 : &mut self,
1549 8 : record: XactRecord,
1550 8 : modification: &mut DatadirModification<'_>,
1551 8 : ctx: &RequestContext,
1552 8 : ) -> anyhow::Result<()> {
1553 8 : let (xact_common, is_commit, is_prepared) = match record {
1554 0 : XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
1555 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
1556 0 : self.adjust_to_full_transaction_id(xl_xid)?
1557 : } else {
1558 0 : xl_xid as u64
1559 : };
1560 0 : return modification.put_twophase_file(xid, data, ctx).await;
1561 : }
1562 8 : XactRecord::Commit(common) => (common, true, false),
1563 0 : XactRecord::Abort(common) => (common, false, false),
1564 0 : XactRecord::CommitPrepared(common) => (common, true, true),
1565 0 : XactRecord::AbortPrepared(common) => (common, false, true),
1566 : };
1567 :
1568 : let XactCommon {
1569 8 : parsed,
1570 8 : origin_id,
1571 8 : xl_xid,
1572 8 : lsn,
1573 8 : } = xact_common;
1574 8 :
1575 8 : // Record update of CLOG pages
1576 8 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1577 8 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1578 8 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1579 8 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1580 8 :
1581 8 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
1582 :
1583 8 : for subxact in &parsed.subxacts {
1584 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1585 0 : if subxact_pageno != pageno {
1586 : // This subxact goes to different page. Write the record
1587 : // for all the XIDs on the previous page, and continue
1588 : // accumulating XIDs on this new page.
1589 0 : modification.put_slru_wal_record(
1590 0 : SlruKind::Clog,
1591 0 : segno,
1592 0 : rpageno,
1593 0 : if is_commit {
1594 0 : NeonWalRecord::ClogSetCommitted {
1595 0 : xids: page_xids,
1596 0 : timestamp: parsed.xact_time,
1597 0 : }
1598 : } else {
1599 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1600 : },
1601 0 : )?;
1602 0 : page_xids = Vec::new();
1603 0 : }
1604 0 : pageno = subxact_pageno;
1605 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1606 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1607 0 : page_xids.push(*subxact);
1608 : }
1609 8 : modification.put_slru_wal_record(
1610 8 : SlruKind::Clog,
1611 8 : segno,
1612 8 : rpageno,
1613 8 : if is_commit {
1614 8 : NeonWalRecord::ClogSetCommitted {
1615 8 : xids: page_xids,
1616 8 : timestamp: parsed.xact_time,
1617 8 : }
1618 : } else {
1619 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1620 : },
1621 0 : )?;
1622 :
1623 8 : for xnode in &parsed.xnodes {
1624 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1625 0 : let rel = RelTag {
1626 0 : forknum,
1627 0 : spcnode: xnode.spcnode,
1628 0 : dbnode: xnode.dbnode,
1629 0 : relnode: xnode.relnode,
1630 0 : };
1631 0 : if modification
1632 0 : .tline
1633 0 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1634 0 : .await?
1635 : {
1636 0 : self.put_rel_drop(modification, rel, ctx).await?;
1637 0 : }
1638 : }
1639 : }
1640 8 : if origin_id != 0 {
1641 0 : modification
1642 0 : .set_replorigin(origin_id, parsed.origin_lsn)
1643 0 : .await?;
1644 8 : }
1645 :
1646 8 : if is_prepared {
1647 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
1648 0 : trace!(
1649 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
1650 : xl_xid,
1651 : parsed.xid,
1652 : lsn,
1653 : );
1654 :
1655 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
1656 0 : self.adjust_to_full_transaction_id(parsed.xid)?
1657 : } else {
1658 0 : parsed.xid as u64
1659 : };
1660 0 : modification.drop_twophase_file(xid, ctx).await?;
1661 8 : }
1662 :
1663 8 : Ok(())
1664 8 : }
1665 :
1666 : // TODO(vlad): Standardise interface for `decode_...`
1667 24 : fn decode_xact_record(
1668 24 : buf: &mut Bytes,
1669 24 : decoded: &DecodedWALRecord,
1670 24 : lsn: Lsn,
1671 24 : _pg_version: u32,
1672 24 : ) -> anyhow::Result<Option<XactRecord>> {
1673 24 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
1674 24 : let origin_id = decoded.origin_id;
1675 24 : let xl_xid = decoded.xl_xid;
1676 24 :
1677 24 : if info == pg_constants::XLOG_XACT_COMMIT {
1678 8 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
1679 8 : return Ok(Some(XactRecord::Commit(XactCommon {
1680 8 : parsed,
1681 8 : origin_id,
1682 8 : xl_xid,
1683 8 : lsn,
1684 8 : })));
1685 16 : } else if info == pg_constants::XLOG_XACT_ABORT {
1686 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
1687 0 : return Ok(Some(XactRecord::Abort(XactCommon {
1688 0 : parsed,
1689 0 : origin_id,
1690 0 : xl_xid,
1691 0 : lsn,
1692 0 : })));
1693 16 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
1694 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
1695 0 : return Ok(Some(XactRecord::CommitPrepared(XactCommon {
1696 0 : parsed,
1697 0 : origin_id,
1698 0 : xl_xid,
1699 0 : lsn,
1700 0 : })));
1701 16 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
1702 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
1703 0 : return Ok(Some(XactRecord::AbortPrepared(XactCommon {
1704 0 : parsed,
1705 0 : origin_id,
1706 0 : xl_xid,
1707 0 : lsn,
1708 0 : })));
1709 16 : } else if info == pg_constants::XLOG_XACT_PREPARE {
1710 0 : return Ok(Some(XactRecord::Prepare(XactPrepare {
1711 0 : xl_xid: decoded.xl_xid,
1712 0 : data: Bytes::copy_from_slice(&buf[..]),
1713 0 : })));
1714 16 : }
1715 16 :
1716 16 : Ok(None)
1717 24 : }
1718 :
1719 0 : async fn ingest_clog_truncate(
1720 0 : &mut self,
1721 0 : truncate: ClogTruncate,
1722 0 : modification: &mut DatadirModification<'_>,
1723 0 : ctx: &RequestContext,
1724 0 : ) -> anyhow::Result<()> {
1725 0 : let ClogTruncate {
1726 0 : pageno,
1727 0 : oldest_xid,
1728 0 : oldest_xid_db,
1729 0 : } = truncate;
1730 0 :
1731 0 : info!(
1732 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1733 : pageno, oldest_xid, oldest_xid_db
1734 : );
1735 :
1736 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
1737 : // truncated, but a checkpoint record with the updated values isn't written until
1738 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
1739 : // so we keep the oldestXid and oldestXidDB up-to-date.
1740 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1741 0 : cp.oldestXid = oldest_xid;
1742 0 : cp.oldestXidDB = oldest_xid_db;
1743 0 : });
1744 0 : self.checkpoint_modified = true;
1745 :
1746 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
1747 :
1748 0 : let latest_page_number =
1749 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
1750 : / pg_constants::CLOG_XACTS_PER_PAGE;
1751 :
1752 : // Now delete all segments containing pages between xlrec.pageno
1753 : // and latest_page_number.
1754 :
1755 : // First, make an important safety check:
1756 : // the current endpoint page must not be eligible for removal.
1757 : // See SimpleLruTruncate() in slru.c
1758 0 : if dispatch_pgversion!(modification.tline.pg_version, {
1759 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
1760 : }) {
1761 0 : info!("could not truncate directory pg_xact apparent wraparound");
1762 0 : return Ok(());
1763 0 : }
1764 :
1765 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1766 : //
1767 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
1768 : // will block waiting for the last valid LSN to advance up to
1769 : // it. So we use the previous record's LSN in the get calls
1770 : // instead.
1771 0 : for segno in modification
1772 0 : .tline
1773 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1774 0 : .await?
1775 : {
1776 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1777 :
1778 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
1779 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
1780 : });
1781 :
1782 0 : if may_delete {
1783 0 : modification
1784 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1785 0 : .await?;
1786 0 : trace!("Drop CLOG segment {:>04X}", segno);
1787 0 : }
1788 : }
1789 :
1790 0 : Ok(())
1791 0 : }
1792 :
1793 0 : async fn ingest_clog_zero_page(
1794 0 : &mut self,
1795 0 : zero_page: ClogZeroPage,
1796 0 : modification: &mut DatadirModification<'_>,
1797 0 : ctx: &RequestContext,
1798 0 : ) -> anyhow::Result<()> {
1799 0 : let ClogZeroPage { segno, rpageno } = zero_page;
1800 0 :
1801 0 : self.put_slru_page_image(
1802 0 : modification,
1803 0 : SlruKind::Clog,
1804 0 : segno,
1805 0 : rpageno,
1806 0 : ZERO_PAGE.clone(),
1807 0 : ctx,
1808 0 : )
1809 0 : .await
1810 0 : }
1811 :
1812 0 : fn decode_clog_record(
1813 0 : buf: &mut Bytes,
1814 0 : decoded: &DecodedWALRecord,
1815 0 : pg_version: u32,
1816 0 : ) -> anyhow::Result<Option<ClogRecord>> {
1817 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
1818 0 :
1819 0 : if info == pg_constants::CLOG_ZEROPAGE {
1820 0 : let pageno = if pg_version < 17 {
1821 0 : buf.get_u32_le()
1822 : } else {
1823 0 : buf.get_u64_le() as u32
1824 : };
1825 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1826 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1827 0 :
1828 0 : Ok(Some(ClogRecord::ZeroPage(ClogZeroPage { segno, rpageno })))
1829 : } else {
1830 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
1831 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
1832 0 :
1833 0 : Ok(Some(ClogRecord::Truncate(ClogTruncate {
1834 0 : pageno: xlrec.pageno,
1835 0 : oldest_xid: xlrec.oldest_xid,
1836 0 : oldest_xid_db: xlrec.oldest_xid_db,
1837 0 : })))
1838 : }
1839 0 : }
1840 :
1841 0 : fn ingest_multixact_create(
1842 0 : &mut self,
1843 0 : modification: &mut DatadirModification,
1844 0 : xlrec: &XlMultiXactCreate,
1845 0 : ) -> Result<()> {
1846 0 : // Create WAL record for updating the multixact-offsets page
1847 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1848 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1849 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1850 0 :
1851 0 : modification.put_slru_wal_record(
1852 0 : SlruKind::MultiXactOffsets,
1853 0 : segno,
1854 0 : rpageno,
1855 0 : NeonWalRecord::MultixactOffsetCreate {
1856 0 : mid: xlrec.mid,
1857 0 : moff: xlrec.moff,
1858 0 : },
1859 0 : )?;
1860 :
1861 : // Create WAL records for the update of each affected multixact-members page
1862 0 : let mut members = xlrec.members.iter();
1863 0 : let mut offset = xlrec.moff;
1864 : loop {
1865 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1866 0 :
1867 0 : // How many members fit on this page?
1868 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1869 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1870 0 :
1871 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1872 0 : for _ in 0..page_remain {
1873 0 : if let Some(m) = members.next() {
1874 0 : this_page_members.push(m.clone());
1875 0 : } else {
1876 0 : break;
1877 : }
1878 : }
1879 0 : if this_page_members.is_empty() {
1880 : // all done
1881 0 : break;
1882 0 : }
1883 0 : let n_this_page = this_page_members.len();
1884 0 :
1885 0 : modification.put_slru_wal_record(
1886 0 : SlruKind::MultiXactMembers,
1887 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1888 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1889 0 : NeonWalRecord::MultixactMembersCreate {
1890 0 : moff: offset,
1891 0 : members: this_page_members,
1892 0 : },
1893 0 : )?;
1894 :
1895 : // Note: The multixact members can wrap around, even within one WAL record.
1896 0 : offset = offset.wrapping_add(n_this_page as u32);
1897 : }
1898 0 : let next_offset = offset;
1899 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1900 :
1901 : // Update next-multi-xid and next-offset
1902 : //
1903 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1904 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1905 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1906 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
1907 : // is stored, so it's never 0 in a checkpoint.
1908 : //
1909 : // I don't know why it's done that way, it seems less error-prone to skip over 0
1910 : // when the value is stored rather than when it's read. But let's do it the same
1911 : // way here.
1912 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
1913 0 :
1914 0 : if self
1915 0 : .checkpoint
1916 0 : .update_next_multixid(next_multi_xid, next_offset)
1917 0 : {
1918 0 : self.checkpoint_modified = true;
1919 0 : }
1920 :
1921 : // Also update the next-xid with the highest member. According to the comments in
1922 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
1923 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1924 0 : if let Some(max_xid) = acc {
1925 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1926 0 : Some(mbr.xid)
1927 : } else {
1928 0 : acc
1929 : }
1930 : } else {
1931 0 : Some(mbr.xid)
1932 : }
1933 0 : });
1934 :
1935 0 : if let Some(max_xid) = max_mbr_xid {
1936 0 : if self.checkpoint.update_next_xid(max_xid) {
1937 0 : self.checkpoint_modified = true;
1938 0 : }
1939 0 : }
1940 0 : Ok(())
1941 0 : }
1942 :
1943 0 : async fn ingest_multixact_truncate(
1944 0 : &mut self,
1945 0 : modification: &mut DatadirModification<'_>,
1946 0 : xlrec: &XlMultiXactTruncate,
1947 0 : ctx: &RequestContext,
1948 0 : ) -> Result<()> {
1949 0 : let (maxsegment, startsegment, endsegment) =
1950 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1951 0 : cp.oldestMulti = xlrec.end_trunc_off;
1952 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1953 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1954 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1955 0 : );
1956 0 : let startsegment: i32 =
1957 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1958 0 : let endsegment: i32 =
1959 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1960 0 : (maxsegment, startsegment, endsegment)
1961 : });
1962 :
1963 0 : self.checkpoint_modified = true;
1964 0 :
1965 0 : // PerformMembersTruncation
1966 0 : let mut segment: i32 = startsegment;
1967 :
1968 : // Delete all the segments except the last one. The last segment can still
1969 : // contain, possibly partially, valid data.
1970 0 : while segment != endsegment {
1971 0 : modification
1972 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1973 0 : .await?;
1974 :
1975 : /* move to next segment, handling wraparound correctly */
1976 0 : if segment == maxsegment {
1977 0 : segment = 0;
1978 0 : } else {
1979 0 : segment += 1;
1980 0 : }
1981 : }
1982 :
1983 : // Truncate offsets
1984 : // FIXME: this did not handle wraparound correctly
1985 :
1986 0 : Ok(())
1987 0 : }
1988 :
1989 0 : async fn ingest_multixact_zero_page(
1990 0 : &mut self,
1991 0 : zero_page: MultiXactZeroPage,
1992 0 : modification: &mut DatadirModification<'_>,
1993 0 : ctx: &RequestContext,
1994 0 : ) -> Result<()> {
1995 0 : let MultiXactZeroPage {
1996 0 : slru_kind,
1997 0 : segno,
1998 0 : rpageno,
1999 0 : } = zero_page;
2000 0 : self.put_slru_page_image(
2001 0 : modification,
2002 0 : slru_kind,
2003 0 : segno,
2004 0 : rpageno,
2005 0 : ZERO_PAGE.clone(),
2006 0 : ctx,
2007 0 : )
2008 0 : .await
2009 0 : }
2010 :
2011 0 : fn decode_multixact_record(
2012 0 : buf: &mut Bytes,
2013 0 : decoded: &DecodedWALRecord,
2014 0 : pg_version: u32,
2015 0 : ) -> anyhow::Result<Option<MultiXactRecord>> {
2016 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
2017 0 :
2018 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
2019 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
2020 : {
2021 0 : let pageno = if pg_version < 17 {
2022 0 : buf.get_u32_le()
2023 : } else {
2024 0 : buf.get_u64_le() as u32
2025 : };
2026 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
2027 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
2028 :
2029 0 : let slru_kind = match info {
2030 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
2031 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
2032 0 : _ => unreachable!(),
2033 : };
2034 :
2035 0 : return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage {
2036 0 : slru_kind,
2037 0 : segno,
2038 0 : rpageno,
2039 0 : })));
2040 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
2041 0 : let xlrec = XlMultiXactCreate::decode(buf);
2042 0 : return Ok(Some(MultiXactRecord::Create(xlrec)));
2043 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
2044 0 : let xlrec = XlMultiXactTruncate::decode(buf);
2045 0 : return Ok(Some(MultiXactRecord::Truncate(xlrec)));
2046 0 : }
2047 0 :
2048 0 : Ok(None)
2049 0 : }
2050 :
2051 0 : async fn ingest_relmap_update(
2052 0 : &mut self,
2053 0 : update: RelmapUpdate,
2054 0 : modification: &mut DatadirModification<'_>,
2055 0 : ctx: &RequestContext,
2056 0 : ) -> Result<()> {
2057 0 : let RelmapUpdate { update, buf } = update;
2058 0 :
2059 0 : modification
2060 0 : .put_relmap_file(update.tsid, update.dbid, buf, ctx)
2061 0 : .await
2062 0 : }
2063 :
2064 0 : fn decode_relmap_record(
2065 0 : buf: &mut Bytes,
2066 0 : decoded: &DecodedWALRecord,
2067 0 : _pg_version: u32,
2068 0 : ) -> anyhow::Result<Option<RelmapRecord>> {
2069 0 : let update = XlRelmapUpdate::decode(buf);
2070 0 :
2071 0 : let mut buf = decoded.record.clone();
2072 0 : buf.advance(decoded.main_data_offset);
2073 0 : // skip xl_relmap_update
2074 0 : buf.advance(12);
2075 0 :
2076 0 : Ok(Some(RelmapRecord::Update(RelmapUpdate {
2077 0 : update,
2078 0 : buf: Bytes::copy_from_slice(&buf[..]),
2079 0 : })))
2080 0 : }
2081 :
    /// Apply an XLOG resource-manager record that was wrapped into a `RawXlogRecord`.
    ///
    /// Updates the in-memory checkpoint (`self.checkpoint`) and sets
    /// `self.checkpoint_modified` accordingly:
    ///
    /// * `XLOG_PARAMETER_CHANGE` / `XLOG_END_OF_RECOVERY`: only for a V17 checkpoint, copy
    ///   `wal_level` from the record.
    /// * `XLOG_NEXTOID`: take the record's `nextOid` if it differs from the current one.
    /// * `XLOG_CHECKPOINT_ONLINE` / `XLOG_CHECKPOINT_SHUTDOWN`:
    ///   - Advance `oldestXid` if the record's value is newer (modulo-2^32 comparison).
    ///   - Refresh `oldestActiveXid`; see the shutdown-checkpoint note below.
    ///   - Always mark the checkpoint modified.
    ///
    /// Other info values leave the checkpoint untouched.
    ///
    /// Errors come from decoding the checkpoint body or from listing two-phase state files.
    async fn ingest_raw_xlog_record(
        &mut self,
        raw_record: RawXlogRecord,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<()> {
        let RawXlogRecord { info, lsn, mut buf } = raw_record;
        let pg_version = modification.tline.pg_version;

        // wal_level is only tracked in the V17 checkpoint representation.
        if info == pg_constants::XLOG_PARAMETER_CHANGE {
            if let CheckPoint::V17(cp) = &mut self.checkpoint {
                let rec = v17::XlParameterChange::decode(&mut buf);
                cp.wal_level = rec.wal_level;
                self.checkpoint_modified = true;
            }
        } else if info == pg_constants::XLOG_END_OF_RECOVERY {
            if let CheckPoint::V17(cp) = &mut self.checkpoint {
                let rec = v17::XlEndOfRecovery::decode(&mut buf);
                cp.wal_level = rec.wal_level;
                self.checkpoint_modified = true;
            }
        }

        enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
            if info == pg_constants::XLOG_NEXTOID {
                let next_oid = buf.get_u32_le();
                if cp.nextOid != next_oid {
                    cp.nextOid = next_oid;
                    self.checkpoint_modified = true;
                }
            } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
                || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
            {
                // The record body is a version-specific CheckPoint struct of fixed size.
                let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
                buf.copy_to_slice(&mut checkpoint_bytes);
                let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
                trace!(
                    "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
                    xlog_checkpoint.oldestXid,
                    cp.oldestXid
                );
                // Only move oldestXid forward: take the record's value if ours precedes it
                // in modulo-2^32 XID order.
                if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
                    cp.oldestXid = xlog_checkpoint.oldestXid;
                }
                trace!(
                    "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
                    xlog_checkpoint.oldestActiveXid,
                    cp.oldestActiveXid
                );

                // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
                // because at shutdown, all in-progress transactions will implicitly
                // end. Postgres startup code knows that, and allows hot standby to start
                // immediately from a shutdown checkpoint.
                //
                // In Neon, Postgres hot standby startup always behaves as if starting from
                // an online checkpoint. It needs a valid `oldestActiveXid` value, so
                // instead of overwriting self.checkpoint.oldestActiveXid with
                // InvalidTransactionid from the checkpoint WAL record, update it to a
                // proper value, knowing that there are no in-progress transactions at this
                // point, except for prepared transactions.
                //
                // See also the neon code changes in the InitWalRecovery() function.
                if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
                    && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
                {
                    // Start from nextXid and lower it to the oldest prepared-transaction XID
                    // found among the two-phase files. For PG 17 the listed values are
                    // compared directly as 64-bit XIDs. For older versions they are narrowed
                    // to 32 bits and compared modulo 2^32.
                    let oldest_active_xid = if pg_version >= 17 {
                        let mut oldest_active_full_xid = cp.nextXid.value;
                        for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
                            if xid < oldest_active_full_xid {
                                oldest_active_full_xid = xid;
                            }
                        }
                        oldest_active_full_xid as u32
                    } else {
                        let mut oldest_active_xid = cp.nextXid.value as u32;
                        for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
                            let narrow_xid = xid as u32;
                            if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
                                oldest_active_xid = narrow_xid;
                            }
                        }
                        oldest_active_xid
                    };
                    cp.oldestActiveXid = oldest_active_xid;
                } else {
                    cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
                }

                // Write a new checkpoint key-value pair on every checkpoint record, even
                // if nothing really changed. Not strictly required, but it seems nice to
                // have some trace of the checkpoint records in the layer files at the same
                // LSNs.
                self.checkpoint_modified = true;
            }
        });

        Ok(())
    }
2181 :
2182 30 : fn decode_xlog_record(
2183 30 : buf: &mut Bytes,
2184 30 : decoded: &DecodedWALRecord,
2185 30 : lsn: Lsn,
2186 30 : _pg_version: u32,
2187 30 : ) -> anyhow::Result<Option<XlogRecord>> {
2188 30 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
2189 30 : Ok(Some(XlogRecord::Raw(RawXlogRecord {
2190 30 : info,
2191 30 : lsn,
2192 30 : buf: buf.clone(),
2193 30 : })))
2194 30 : }
2195 :
2196 0 : async fn ingest_logical_message_put(
2197 0 : &mut self,
2198 0 : put: PutLogicalMessage,
2199 0 : modification: &mut DatadirModification<'_>,
2200 0 : ctx: &RequestContext,
2201 0 : ) -> Result<()> {
2202 0 : let PutLogicalMessage { path, buf } = put;
2203 0 : modification.put_file(path.as_str(), &buf, ctx).await
2204 0 : }
2205 :
2206 0 : fn decode_logical_message_record(
2207 0 : buf: &mut Bytes,
2208 0 : decoded: &DecodedWALRecord,
2209 0 : _pg_version: u32,
2210 0 : ) -> anyhow::Result<Option<LogicalMessageRecord>> {
2211 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
2212 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
2213 0 : let xlrec = crate::walrecord::XlLogicalMessage::decode(buf);
2214 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
2215 :
2216 : #[cfg(feature = "testing")]
2217 0 : if prefix == "neon-test" {
2218 0 : return Ok(Some(LogicalMessageRecord::Failpoint));
2219 0 : }
2220 :
2221 0 : if let Some(path) = prefix.strip_prefix("neon-file:") {
2222 0 : let buf_size = xlrec.prefix_size + xlrec.message_size;
2223 0 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
2224 0 : return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage {
2225 0 : path: path.to_string(),
2226 0 : buf,
2227 0 : })));
2228 0 : }
2229 0 : }
2230 :
2231 0 : Ok(None)
2232 0 : }
2233 :
2234 16 : fn decode_standby_record(
2235 16 : buf: &mut Bytes,
2236 16 : decoded: &DecodedWALRecord,
2237 16 : _pg_version: u32,
2238 16 : ) -> anyhow::Result<Option<StandbyRecord>> {
2239 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
2240 16 : if info == pg_constants::XLOG_RUNNING_XACTS {
2241 0 : let xlrec = crate::walrecord::XlRunningXacts::decode(buf);
2242 0 : return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts {
2243 0 : oldest_running_xid: xlrec.oldest_running_xid,
2244 0 : })));
2245 16 : }
2246 16 :
2247 16 : Ok(None)
2248 16 : }
2249 :
2250 0 : fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
2251 0 : match record {
2252 0 : StandbyRecord::RunningXacts(running_xacts) => {
2253 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
2254 0 : cp.oldestActiveXid = running_xacts.oldest_running_xid;
2255 0 : });
2256 :
2257 0 : self.checkpoint_modified = true;
2258 0 : }
2259 0 : }
2260 0 :
2261 0 : Ok(())
2262 0 : }
2263 :
2264 0 : fn decode_replorigin_record(
2265 0 : buf: &mut Bytes,
2266 0 : decoded: &DecodedWALRecord,
2267 0 : _pg_version: u32,
2268 0 : ) -> anyhow::Result<Option<ReploriginRecord>> {
2269 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
2270 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
2271 0 : let xlrec = crate::walrecord::XlReploriginSet::decode(buf);
2272 0 : return Ok(Some(ReploriginRecord::Set(xlrec)));
2273 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
2274 0 : let xlrec = crate::walrecord::XlReploriginDrop::decode(buf);
2275 0 : return Ok(Some(ReploriginRecord::Drop(xlrec)));
2276 0 : }
2277 0 :
2278 0 : Ok(None)
2279 0 : }
2280 :
2281 0 : async fn ingest_replorigin_record(
2282 0 : &mut self,
2283 0 : record: ReploriginRecord,
2284 0 : modification: &mut DatadirModification<'_>,
2285 0 : ) -> Result<()> {
2286 0 : match record {
2287 0 : ReploriginRecord::Set(set) => {
2288 0 : modification
2289 0 : .set_replorigin(set.node_id, set.remote_lsn)
2290 0 : .await?;
2291 : }
2292 0 : ReploriginRecord::Drop(drop) => {
2293 0 : modification.drop_replorigin(drop.node_id).await?;
2294 : }
2295 : }
2296 :
2297 0 : Ok(())
2298 0 : }
2299 :
2300 18 : async fn put_rel_creation(
2301 18 : &mut self,
2302 18 : modification: &mut DatadirModification<'_>,
2303 18 : rel: RelTag,
2304 18 : ctx: &RequestContext,
2305 18 : ) -> Result<()> {
2306 18 : modification.put_rel_creation(rel, 0, ctx).await?;
2307 18 : Ok(())
2308 18 : }
2309 :
2310 272426 : async fn put_rel_page_image(
2311 272426 : &mut self,
2312 272426 : modification: &mut DatadirModification<'_>,
2313 272426 : rel: RelTag,
2314 272426 : blknum: BlockNumber,
2315 272426 : img: Bytes,
2316 272426 : ctx: &RequestContext,
2317 272426 : ) -> Result<(), PageReconstructError> {
2318 272426 : self.handle_rel_extend(modification, rel, blknum, ctx)
2319 5425 : .await?;
2320 272426 : modification.put_rel_page_image(rel, blknum, img)?;
2321 272426 : Ok(())
2322 272426 : }
2323 :
2324 145630 : async fn put_rel_wal_record(
2325 145630 : &mut self,
2326 145630 : modification: &mut DatadirModification<'_>,
2327 145630 : rel: RelTag,
2328 145630 : blknum: BlockNumber,
2329 145630 : rec: NeonWalRecord,
2330 145630 : ctx: &RequestContext,
2331 145630 : ) -> Result<()> {
2332 145630 : self.handle_rel_extend(modification, rel, blknum, ctx)
2333 279 : .await?;
2334 145630 : modification.put_rel_wal_record(rel, blknum, rec)?;
2335 145630 : Ok(())
2336 145630 : }
2337 :
2338 6012 : async fn put_rel_truncation(
2339 6012 : &mut self,
2340 6012 : modification: &mut DatadirModification<'_>,
2341 6012 : rel: RelTag,
2342 6012 : nblocks: BlockNumber,
2343 6012 : ctx: &RequestContext,
2344 6012 : ) -> anyhow::Result<()> {
2345 6012 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
2346 6012 : Ok(())
2347 6012 : }
2348 :
2349 2 : async fn put_rel_drop(
2350 2 : &mut self,
2351 2 : modification: &mut DatadirModification<'_>,
2352 2 : rel: RelTag,
2353 2 : ctx: &RequestContext,
2354 2 : ) -> Result<()> {
2355 2 : modification.put_rel_drop(rel, ctx).await?;
2356 2 : Ok(())
2357 2 : }
2358 :
2359 418056 : async fn handle_rel_extend(
2360 418056 : &mut self,
2361 418056 : modification: &mut DatadirModification<'_>,
2362 418056 : rel: RelTag,
2363 418056 : blknum: BlockNumber,
2364 418056 : ctx: &RequestContext,
2365 418056 : ) -> Result<(), PageReconstructError> {
2366 418056 : let new_nblocks = blknum + 1;
2367 : // Check if the relation exists. We implicitly create relations on first
2368 : // record.
2369 : // TODO: would be nice if to be more explicit about it
2370 :
2371 : // Get current size and put rel creation if rel doesn't exist
2372 : //
2373 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
2374 : // check the cache too. This is because eagerly checking the cache results in
2375 : // less work overall and 10% better performance. It's more work on cache miss
2376 : // but cache miss is rare.
2377 418056 : let old_nblocks = if let Some(nblocks) = modification
2378 418056 : .tline
2379 418056 : .get_cached_rel_size(&rel, modification.get_lsn())
2380 : {
2381 418046 : nblocks
2382 10 : } else if !modification
2383 10 : .tline
2384 10 : .get_rel_exists(rel, Version::Modified(modification), ctx)
2385 2 : .await?
2386 : {
2387 : // create it with 0 size initially, the logic below will extend it
2388 10 : modification
2389 10 : .put_rel_creation(rel, 0, ctx)
2390 3 : .await
2391 10 : .context("Relation Error")?;
2392 10 : 0
2393 : } else {
2394 0 : modification
2395 0 : .tline
2396 0 : .get_rel_size(rel, Version::Modified(modification), ctx)
2397 0 : .await?
2398 : };
2399 :
2400 418056 : if new_nblocks > old_nblocks {
2401 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
2402 274788 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
2403 :
2404 274788 : let mut key = rel_block_to_key(rel, blknum);
2405 274788 :
2406 274788 : // fill the gap with zeros
2407 274788 : let mut gap_blocks_filled: u64 = 0;
2408 274788 : for gap_blknum in old_nblocks..blknum {
2409 2998 : key.field6 = gap_blknum;
2410 2998 :
2411 2998 : if self.shard.get_shard_number(&key) != self.shard.number {
2412 0 : continue;
2413 2998 : }
2414 2998 :
2415 2998 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
2416 2998 : gap_blocks_filled += 1;
2417 : }
2418 :
2419 274788 : WAL_INGEST
2420 274788 : .gap_blocks_zeroed_on_rel_extend
2421 274788 : .inc_by(gap_blocks_filled);
2422 143268 : }
2423 418056 : Ok(())
2424 418056 : }
2425 :
2426 0 : async fn put_slru_page_image(
2427 0 : &mut self,
2428 0 : modification: &mut DatadirModification<'_>,
2429 0 : kind: SlruKind,
2430 0 : segno: u32,
2431 0 : blknum: BlockNumber,
2432 0 : img: Bytes,
2433 0 : ctx: &RequestContext,
2434 0 : ) -> Result<()> {
2435 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
2436 0 : .await?;
2437 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
2438 0 : Ok(())
2439 0 : }
2440 :
2441 0 : async fn handle_slru_extend(
2442 0 : &mut self,
2443 0 : modification: &mut DatadirModification<'_>,
2444 0 : kind: SlruKind,
2445 0 : segno: u32,
2446 0 : blknum: BlockNumber,
2447 0 : ctx: &RequestContext,
2448 0 : ) -> anyhow::Result<()> {
2449 0 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
2450 0 : // extended with ZEROPAGE records, not with commit records, so it happens
2451 0 : // a lot less frequently.
2452 0 :
2453 0 : let new_nblocks = blknum + 1;
2454 : // Check if the relation exists. We implicitly create relations on first
2455 : // record.
2456 : // TODO: would be nice if to be more explicit about it
2457 0 : let old_nblocks = if !modification
2458 0 : .tline
2459 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
2460 0 : .await?
2461 : {
2462 : // create it with 0 size initially, the logic below will extend it
2463 0 : modification
2464 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
2465 0 : .await?;
2466 0 : 0
2467 : } else {
2468 0 : modification
2469 0 : .tline
2470 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
2471 0 : .await?
2472 : };
2473 :
2474 0 : if new_nblocks > old_nblocks {
2475 0 : trace!(
2476 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
2477 : kind,
2478 : segno,
2479 : old_nblocks,
2480 : new_nblocks
2481 : );
2482 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
2483 :
2484 : // fill the gap with zeros
2485 0 : for gap_blknum in old_nblocks..blknum {
2486 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
2487 : }
2488 0 : }
2489 0 : Ok(())
2490 0 : }
2491 : }
2492 :
2493 12 : async fn get_relsize(
2494 12 : modification: &DatadirModification<'_>,
2495 12 : rel: RelTag,
2496 12 : ctx: &RequestContext,
2497 12 : ) -> Result<BlockNumber, PageReconstructError> {
2498 12 : let nblocks = if !modification
2499 12 : .tline
2500 12 : .get_rel_exists(rel, Version::Modified(modification), ctx)
2501 0 : .await?
2502 : {
2503 0 : 0
2504 : } else {
2505 12 : modification
2506 12 : .tline
2507 12 : .get_rel_size(rel, Version::Modified(modification), ctx)
2508 0 : .await?
2509 : };
2510 12 : Ok(nblocks)
2511 12 : }
2512 :
2513 : #[allow(clippy::bool_assert_comparison)]
2514 : #[cfg(test)]
2515 : mod tests {
2516 : use super::*;
2517 : use crate::tenant::harness::*;
2518 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
2519 : use postgres_ffi::RELSEG_SIZE;
2520 :
2521 : use crate::DEFAULT_PG_VERSION;
2522 :
2523 : /// Arbitrary relation tag, for testing.
2524 : const TESTREL_A: RelTag = RelTag {
2525 : spcnode: 0,
2526 : dbnode: 111,
2527 : relnode: 1000,
2528 : forknum: 0,
2529 : };
2530 :
2531 12 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
2532 12 : // TODO
2533 12 : }
2534 :
2535 : #[tokio::test]
2536 2 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
2537 8 : for i in 14..=16 {
2538 6 : dispatch_pgversion!(i, {
2539 2 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
2540 2 : });
2541 2 : }
2542 2 :
2543 2 : Ok(())
2544 2 : }
2545 :
2546 8 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
2547 8 : let mut m = tline.begin_modification(Lsn(0x10));
2548 8 : m.put_checkpoint(dispatch_pgversion!(
2549 8 : tline.pg_version,
2550 0 : pgv::ZERO_CHECKPOINT.clone()
2551 0 : ))?;
2552 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
2553 8 : m.commit(ctx).await?;
2554 8 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
2555 :
2556 8 : Ok(walingest)
2557 8 : }
2558 :
2559 : #[tokio::test]
2560 2 : async fn test_relsize() -> Result<()> {
2561 10 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
2562 2 : let tline = tenant
2563 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2564 4 : .await?;
2565 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2566 2 :
2567 2 : let mut m = tline.begin_modification(Lsn(0x20));
2568 2 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
2569 2 : walingest
2570 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2571 2 : .await?;
2572 2 : m.on_record_end();
2573 2 : m.commit(&ctx).await?;
2574 2 : let mut m = tline.begin_modification(Lsn(0x30));
2575 2 : walingest
2576 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
2577 2 : .await?;
2578 2 : m.on_record_end();
2579 2 : m.commit(&ctx).await?;
2580 2 : let mut m = tline.begin_modification(Lsn(0x40));
2581 2 : walingest
2582 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
2583 2 : .await?;
2584 2 : m.on_record_end();
2585 2 : m.commit(&ctx).await?;
2586 2 : let mut m = tline.begin_modification(Lsn(0x50));
2587 2 : walingest
2588 2 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
2589 2 : .await?;
2590 2 : m.on_record_end();
2591 2 : m.commit(&ctx).await?;
2592 2 :
2593 2 : assert_current_logical_size(&tline, Lsn(0x50));
2594 2 :
2595 2 : // The relation was created at LSN 2, not visible at LSN 1 yet.
2596 2 : assert_eq!(
2597 2 : tline
2598 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2599 2 : .await?,
2600 2 : false
2601 2 : );
2602 2 : assert!(tline
2603 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2604 2 : .await
2605 2 : .is_err());
2606 2 : assert_eq!(
2607 2 : tline
2608 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2609 2 : .await?,
2610 2 : true
2611 2 : );
2612 2 : assert_eq!(
2613 2 : tline
2614 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2615 2 : .await?,
2616 2 : 1
2617 2 : );
2618 2 : assert_eq!(
2619 2 : tline
2620 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2621 2 : .await?,
2622 2 : 3
2623 2 : );
2624 2 :
2625 2 : // Check page contents at each LSN
2626 2 : assert_eq!(
2627 2 : tline
2628 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
2629 2 : .await?,
2630 2 : test_img("foo blk 0 at 2")
2631 2 : );
2632 2 :
2633 2 : assert_eq!(
2634 2 : tline
2635 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
2636 2 : .await?,
2637 2 : test_img("foo blk 0 at 3")
2638 2 : );
2639 2 :
2640 2 : assert_eq!(
2641 2 : tline
2642 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
2643 2 : .await?,
2644 2 : test_img("foo blk 0 at 3")
2645 2 : );
2646 2 : assert_eq!(
2647 2 : tline
2648 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
2649 2 : .await?,
2650 2 : test_img("foo blk 1 at 4")
2651 2 : );
2652 2 :
2653 2 : assert_eq!(
2654 2 : tline
2655 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
2656 2 : .await?,
2657 2 : test_img("foo blk 0 at 3")
2658 2 : );
2659 2 : assert_eq!(
2660 2 : tline
2661 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
2662 2 : .await?,
2663 2 : test_img("foo blk 1 at 4")
2664 2 : );
2665 2 : assert_eq!(
2666 2 : tline
2667 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
2668 2 : .await?,
2669 2 : test_img("foo blk 2 at 5")
2670 2 : );
2671 2 :
2672 2 : // Truncate last block
2673 2 : let mut m = tline.begin_modification(Lsn(0x60));
2674 2 : walingest
2675 2 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
2676 2 : .await?;
2677 2 : m.commit(&ctx).await?;
2678 2 : assert_current_logical_size(&tline, Lsn(0x60));
2679 2 :
2680 2 : // Check reported size and contents after truncation
2681 2 : assert_eq!(
2682 2 : tline
2683 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2684 2 : .await?,
2685 2 : 2
2686 2 : );
2687 2 : assert_eq!(
2688 2 : tline
2689 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
2690 2 : .await?,
2691 2 : test_img("foo blk 0 at 3")
2692 2 : );
2693 2 : assert_eq!(
2694 2 : tline
2695 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
2696 2 : .await?,
2697 2 : test_img("foo blk 1 at 4")
2698 2 : );
2699 2 :
2700 2 : // should still see the truncated block with older LSN
2701 2 : assert_eq!(
2702 2 : tline
2703 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2704 2 : .await?,
2705 2 : 3
2706 2 : );
2707 2 : assert_eq!(
2708 2 : tline
2709 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
2710 2 : .await?,
2711 2 : test_img("foo blk 2 at 5")
2712 2 : );
2713 2 :
2714 2 : // Truncate to zero length
2715 2 : let mut m = tline.begin_modification(Lsn(0x68));
2716 2 : walingest
2717 2 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
2718 2 : .await?;
2719 2 : m.commit(&ctx).await?;
2720 2 : assert_eq!(
2721 2 : tline
2722 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
2723 2 : .await?,
2724 2 : 0
2725 2 : );
2726 2 :
2727 2 : // Extend from 0 to 2 blocks, leaving a gap
2728 2 : let mut m = tline.begin_modification(Lsn(0x70));
2729 2 : walingest
2730 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
2731 2 : .await?;
2732 2 : m.on_record_end();
2733 2 : m.commit(&ctx).await?;
2734 2 : assert_eq!(
2735 2 : tline
2736 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
2737 2 : .await?,
2738 2 : 2
2739 2 : );
2740 2 : assert_eq!(
2741 2 : tline
2742 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
2743 2 : .await?,
2744 2 : ZERO_PAGE
2745 2 : );
2746 2 : assert_eq!(
2747 2 : tline
2748 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
2749 2 : .await?,
2750 2 : test_img("foo blk 1")
2751 2 : );
2752 2 :
2753 2 : // Extend a lot more, leaving a big gap that spans across segments
2754 2 : let mut m = tline.begin_modification(Lsn(0x80));
2755 2 : walingest
2756 2 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
2757 2 : .await?;
2758 2 : m.on_record_end();
2759 190 : m.commit(&ctx).await?;
2760 2 : assert_eq!(
2761 2 : tline
2762 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2763 2 : .await?,
2764 2 : 1501
2765 2 : );
2766 2998 : for blk in 2..1500 {
2767 2996 : assert_eq!(
2768 2996 : tline
2769 2996 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
2770 1540 : .await?,
2771 2996 : ZERO_PAGE
2772 2 : );
2773 2 : }
2774 2 : assert_eq!(
2775 2 : tline
2776 2 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
2777 2 : .await?,
2778 2 : test_img("foo blk 1500")
2779 2 : );
2780 2 :
2781 2 : Ok(())
2782 2 : }
2783 :
2784 : // Test what happens if we dropped a relation
2785 : // and then created it again within the same layer.
2786 : #[tokio::test]
2787 2 : async fn test_drop_extend() -> Result<()> {
2788 2 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
2789 2 : .await?
2790 2 : .load()
2791 10 : .await;
2792 2 : let tline = tenant
2793 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2794 4 : .await?;
2795 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2796 2 :
2797 2 : let mut m = tline.begin_modification(Lsn(0x20));
2798 2 : walingest
2799 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2800 2 : .await?;
2801 2 : m.commit(&ctx).await?;
2802 2 :
2803 2 : // Check that rel exists and size is correct
2804 2 : assert_eq!(
2805 2 : tline
2806 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2807 2 : .await?,
2808 2 : true
2809 2 : );
2810 2 : assert_eq!(
2811 2 : tline
2812 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2813 2 : .await?,
2814 2 : 1
2815 2 : );
2816 2 :
2817 2 : // Drop rel
2818 2 : let mut m = tline.begin_modification(Lsn(0x30));
2819 2 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
2820 2 : m.commit(&ctx).await?;
2821 2 :
2822 2 : // Check that rel is not visible anymore
2823 2 : assert_eq!(
2824 2 : tline
2825 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
2826 2 : .await?,
2827 2 : false
2828 2 : );
2829 2 :
2830 2 : // FIXME: should fail
2831 2 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2832 2 :
2833 2 : // Re-create it
2834 2 : let mut m = tline.begin_modification(Lsn(0x40));
2835 2 : walingest
2836 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2837 2 : .await?;
2838 2 : m.commit(&ctx).await?;
2839 2 :
2840 2 : // Check that rel exists and size is correct
2841 2 : assert_eq!(
2842 2 : tline
2843 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2844 2 : .await?,
2845 2 : true
2846 2 : );
2847 2 : assert_eq!(
2848 2 : tline
2849 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2850 2 : .await?,
2851 2 : 1
2852 2 : );
2853 2 :
2854 2 : Ok(())
2855 2 : }
2856 :
2857 : // Test what happens if we truncated a relation
2858 : // so that one of its segments was dropped
2859 : // and then extended it again within the same layer.
2860 : #[tokio::test]
2861 2 : async fn test_truncate_extend() -> Result<()> {
2862 2 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
2863 2 : .await?
2864 2 : .load()
2865 10 : .await;
2866 2 : let tline = tenant
2867 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2868 4 : .await?;
2869 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2870 2 :
2871 2 : // Create a 20 MB relation (the size is arbitrary)
2872 2 : let relsize = 20 * 1024 * 1024 / 8192;
2873 2 : let mut m = tline.begin_modification(Lsn(0x20));
2874 5120 : for blkno in 0..relsize {
2875 5120 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2876 5120 : walingest
2877 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2878 2 : .await?;
2879 2 : }
2880 2 : m.commit(&ctx).await?;
2881 2 :
2882 2 : // The relation was created at LSN 20, not visible at LSN 1 yet.
2883 2 : assert_eq!(
2884 2 : tline
2885 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2886 2 : .await?,
2887 2 : false
2888 2 : );
2889 2 : assert!(tline
2890 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2891 2 : .await
2892 2 : .is_err());
2893 2 :
2894 2 : assert_eq!(
2895 2 : tline
2896 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2897 2 : .await?,
2898 2 : true
2899 2 : );
2900 2 : assert_eq!(
2901 2 : tline
2902 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2903 2 : .await?,
2904 2 : relsize
2905 2 : );
2906 2 :
2907 2 : // Check relation content
2908 5120 : for blkno in 0..relsize {
2909 5120 : let lsn = Lsn(0x20);
2910 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2911 5120 : assert_eq!(
2912 5120 : tline
2913 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
2914 1803 : .await?,
2915 5120 : test_img(&data)
2916 2 : );
2917 2 : }
2918 2 :
2919 2 : // Truncate relation so that second segment was dropped
2920 2 : // - only leave one page
2921 2 : let mut m = tline.begin_modification(Lsn(0x60));
2922 2 : walingest
2923 2 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2924 2 : .await?;
2925 2 : m.commit(&ctx).await?;
2926 2 :
2927 2 : // Check reported size and contents after truncation
2928 2 : assert_eq!(
2929 2 : tline
2930 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2931 2 : .await?,
2932 2 : 1
2933 2 : );
2934 2 :
2935 4 : for blkno in 0..1 {
2936 2 : let lsn = Lsn(0x20);
2937 2 : let data = format!("foo blk {} at {}", blkno, lsn);
2938 2 : assert_eq!(
2939 2 : tline
2940 2 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
2941 2 : .await?,
2942 2 : test_img(&data)
2943 2 : );
2944 2 : }
2945 2 :
2946 2 : // should still see all blocks with older LSN
2947 2 : assert_eq!(
2948 2 : tline
2949 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2950 2 : .await?,
2951 2 : relsize
2952 2 : );
2953 5120 : for blkno in 0..relsize {
2954 5120 : let lsn = Lsn(0x20);
2955 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2956 5120 : assert_eq!(
2957 5120 : tline
2958 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
2959 1856 : .await?,
2960 5120 : test_img(&data)
2961 2 : );
2962 2 : }
2963 2 :
2964 2 : // Extend relation again.
2965 2 : // Add enough blocks to create second segment
2966 2 : let lsn = Lsn(0x80);
2967 2 : let mut m = tline.begin_modification(lsn);
2968 5120 : for blkno in 0..relsize {
2969 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2970 5120 : walingest
2971 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2972 2 : .await?;
2973 2 : }
2974 3 : m.commit(&ctx).await?;
2975 2 :
2976 2 : assert_eq!(
2977 2 : tline
2978 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2979 2 : .await?,
2980 2 : true
2981 2 : );
2982 2 : assert_eq!(
2983 2 : tline
2984 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2985 2 : .await?,
2986 2 : relsize
2987 2 : );
2988 2 : // Check relation content
2989 5120 : for blkno in 0..relsize {
2990 5120 : let lsn = Lsn(0x80);
2991 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2992 5120 : assert_eq!(
2993 5120 : tline
2994 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
2995 1828 : .await?,
2996 5120 : test_img(&data)
2997 2 : );
2998 2 : }
2999 2 :
3000 2 : Ok(())
3001 2 : }
3002 :
3003 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
3004 : /// split into multiple 1 GB segments in Postgres.
3005 : #[tokio::test]
3006 2 : async fn test_large_rel() -> Result<()> {
3007 10 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
3008 2 : let tline = tenant
3009 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
3010 4 : .await?;
3011 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
3012 2 :
3013 2 : let mut lsn = 0x10;
3014 262146 : for blknum in 0..RELSEG_SIZE + 1 {
3015 262146 : lsn += 0x10;
3016 262146 : let mut m = tline.begin_modification(Lsn(lsn));
3017 262146 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
3018 262146 : walingest
3019 262146 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
3020 5420 : .await?;
3021 262146 : m.commit(&ctx).await?;
3022 2 : }
3023 2 :
3024 2 : assert_current_logical_size(&tline, Lsn(lsn));
3025 2 :
3026 2 : assert_eq!(
3027 2 : tline
3028 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
3029 2 : .await?,
3030 2 : RELSEG_SIZE + 1
3031 2 : );
3032 2 :
3033 2 : // Truncate one block
3034 2 : lsn += 0x10;
3035 2 : let mut m = tline.begin_modification(Lsn(lsn));
3036 2 : walingest
3037 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
3038 2 : .await?;
3039 2 : m.commit(&ctx).await?;
3040 2 : assert_eq!(
3041 2 : tline
3042 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
3043 2 : .await?,
3044 2 : RELSEG_SIZE
3045 2 : );
3046 2 : assert_current_logical_size(&tline, Lsn(lsn));
3047 2 :
3048 2 : // Truncate another block
3049 2 : lsn += 0x10;
3050 2 : let mut m = tline.begin_modification(Lsn(lsn));
3051 2 : walingest
3052 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
3053 2 : .await?;
3054 2 : m.commit(&ctx).await?;
3055 2 : assert_eq!(
3056 2 : tline
3057 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
3058 2 : .await?,
3059 2 : RELSEG_SIZE - 1
3060 2 : );
3061 2 : assert_current_logical_size(&tline, Lsn(lsn));
3062 2 :
3063 2 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
3064 2 : // This tests the behavior at segment boundaries
3065 2 : let mut size: i32 = 3000;
3066 6004 : while size >= 0 {
3067 6002 : lsn += 0x10;
3068 6002 : let mut m = tline.begin_modification(Lsn(lsn));
3069 6002 : walingest
3070 6002 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
3071 2 : .await?;
3072 6002 : m.commit(&ctx).await?;
3073 6002 : assert_eq!(
3074 6002 : tline
3075 6002 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
3076 2 : .await?,
3077 6002 : size as BlockNumber
3078 2 : );
3079 2 :
3080 6002 : size -= 1;
3081 2 : }
3082 2 : assert_current_logical_size(&tline, Lsn(lsn));
3083 2 :
3084 2 : Ok(())
3085 2 : }
3086 :
3087 : /// Replay a wal segment file taken directly from safekeepers.
3088 : ///
3089 : /// This test is useful for benchmarking since it allows us to profile only
3090 : /// the walingest code in a single-threaded executor, and iterate more quickly
3091 : /// without waiting for unrelated steps.
3092 : #[tokio::test]
3093 2 : async fn test_ingest_real_wal() {
3094 2 : use crate::tenant::harness::*;
3095 2 : use postgres_ffi::waldecoder::WalStreamDecoder;
3096 2 : use postgres_ffi::WAL_SEGMENT_SIZE;
3097 2 :
3098 2 : // Define test data path and constants.
3099 2 : //
3100 2 : // Steps to reconstruct the data, if needed:
3101 2 : // 1. Run the pgbench python test
3102 2 : // 2. Take the first wal segment file from safekeeper
3103 2 : // 3. Compress it using `zstd --long input_file`
3104 2 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
3105 2 : // 5. Grep sk logs for "restart decoder" to get startpoint
3106 2 : // 6. Run just the decoder from this test to get the endpoint.
3107 2 : // It's the last LSN the decoder will output.
3108 2 : let pg_version = 15; // The test data was generated by pg15
3109 2 : let path = "test_data/sk_wal_segment_from_pgbench";
3110 2 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
3111 2 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
3112 2 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
3113 2 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
3114 2 :
3115 2 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
3116 2 : let span = harness
3117 2 : .span()
3118 2 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
3119 10 : let (tenant, ctx) = harness.load().await;
3120 2 :
3121 2 : let remote_initdb_path =
3122 2 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
3123 2 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
3124 2 :
3125 2 : std::fs::create_dir_all(initdb_path.parent().unwrap())
3126 2 : .expect("creating test dir should work");
3127 2 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
3128 2 :
3129 2 : // Bootstrap a real timeline. We can't use create_test_timeline because
3130 2 : // it doesn't create a real checkpoint, and Walingest::new tries to parse
3131 2 : // the garbage data.
3132 2 : let tline = tenant
3133 2 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
3134 16662 : .await
3135 2 : .unwrap();
3136 2 :
3137 2 : // We fully read and decompress this into memory before decoding
3138 2 : // to get a more accurate perf profile of the decoder.
3139 2 : let bytes = {
3140 2 : use async_compression::tokio::bufread::ZstdDecoder;
3141 2 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
3142 2 : let reader = tokio::io::BufReader::new(file);
3143 2 : let decoder = ZstdDecoder::new(reader);
3144 2 : let mut reader = tokio::io::BufReader::new(decoder);
3145 2 : let mut buffer = Vec::new();
3146 224 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
3147 2 : buffer
3148 2 : };
3149 2 :
3150 2 : // TODO start a profiler too
3151 2 : let started_at = std::time::Instant::now();
3152 2 :
3153 2 : // Initialize walingest
3154 2 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
3155 2 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
3156 2 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
3157 5 : .await
3158 2 : .unwrap();
3159 2 : let mut modification = tline.begin_modification(startpoint);
3160 2 : println!("decoding {} bytes", bytes.len() - xlogoff);
3161 2 :
3162 2 : // Decode and ingest wal. We process the wal in chunks because
3163 2 : // that's what happens when we get bytes from safekeepers.
3164 474686 : for chunk in bytes[xlogoff..].chunks(50) {
3165 474686 : decoder.feed_bytes(chunk);
3166 620536 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
3167 145850 : let mut decoded = DecodedWALRecord::default();
3168 145850 : decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
3169 145850 : walingest
3170 145850 : .ingest_record(decoded, lsn, &mut modification, &ctx)
3171 145850 : .instrument(span.clone())
3172 296 : .await
3173 145850 : .unwrap();
3174 2 : }
3175 474686 : modification.commit(&ctx).await.unwrap();
3176 2 : }
3177 2 :
3178 2 : let duration = started_at.elapsed();
3179 2 : println!("done in {:?}", duration);
3180 2 : }
3181 : }
|