Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
9 : //! Records get decoded and interpreted in the [`wal_decoder`] module
10 : //! and then stored to the Repository by WalIngest.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
14 : //! extracts page images out of some WAL records, but most page versions are
15 : //! stored as WAL records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_rel_wal_record or put_rel_page_image function
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoke Rust code.
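//!
//! A condensed sketch of how a caller might drive this pipeline (hypothetical
//! wiring, for orientation only; the real caller lives in the WAL receiver, and
//! `interpreted_records` stands in for its stream of decoded records):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! while let Some(interpreted) = interpreted_records.next().await {
//!     let mut modification = timeline.begin_modification(interpreted.next_record_lsn);
//!     walingest.ingest_record(interpreted, &mut modification, &ctx).await?;
//!     modification.commit(&ctx).await?;
//! }
//! ```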
23 :
24 : use std::collections::HashMap;
25 : use std::sync::Arc;
26 : use std::sync::OnceLock;
27 : use std::time::Duration;
28 : use std::time::Instant;
29 : use std::time::SystemTime;
30 :
31 : use anyhow::{bail, Result};
32 : use bytes::{Buf, Bytes};
33 : use tracing::*;
34 :
35 : use crate::context::RequestContext;
36 : use crate::metrics::WAL_INGEST;
37 : use crate::pgdatadir_mapping::{DatadirModification, Version};
38 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
39 : use crate::tenant::PageReconstructError;
40 : use crate::tenant::Timeline;
41 : use crate::ZERO_PAGE;
42 : use pageserver_api::key::rel_block_to_key;
43 : use pageserver_api::record::NeonWalRecord;
44 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
45 : use pageserver_api::shard::ShardIdentity;
46 : use postgres_ffi::fsm_logical_to_physical;
47 : use postgres_ffi::pg_constants;
48 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
49 : use postgres_ffi::walrecord::*;
50 : use postgres_ffi::TransactionId;
51 : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
52 : use utils::bin_ser::SerializeError;
53 : use utils::lsn::Lsn;
54 : use utils::rate_limit::RateLimit;
55 : use utils::{critical, failpoint_support};
56 : use wal_decoder::models::*;
57 :
58 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
59 :
60 : impl CheckPoint {
61 12 : fn encode(&self) -> Result<Bytes, SerializeError> {
62 12 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
63 12 : }
64 :
65 291668 : fn update_next_xid(&mut self, xid: u32) -> bool {
66 291668 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
67 291668 : }
68 :
69 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
70 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
71 0 : cp.update_next_multixid(multi_xid, multi_offset)
72 : })
73 0 : }
74 : }
75 :
76 : /// Temporary suppression of WAL lag warnings after attach
77 : ///
78 : /// After tenant attach, we want to limit WAL lag warnings because
79 : /// we don't look at the WAL until the attach is complete, which
80 : /// might take a while.
81 : pub struct WalLagCooldown {
82 : /// Until when should this limitation apply at all
83 : active_until: std::time::Instant,
84 : /// The maximum lag to suppress. Lags above this limit get reported anyway.
85 : max_lag: Duration,
86 : }
87 :
88 : impl WalLagCooldown {
89 0 : pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
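// Illustrative numbers: an attach that starts at T and takes 10 minutes
// suppresses lag warnings until T + 30 min + 120 s, and only for lags of
// at most 20 min + 60 s; larger lags are still reported immediately.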
90 0 : Self {
91 0 : active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
92 0 : max_lag: attach_duration * 2 + Duration::from_secs(60),
93 0 : }
94 0 : }
95 : }
96 :
97 : pub struct WalIngest {
98 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
99 : shard: ShardIdentity,
100 : checkpoint: CheckPoint,
101 : checkpoint_modified: bool,
102 : warn_ingest_lag: WarnIngestLag,
103 : }
104 :
105 : struct WarnIngestLag {
106 : lag_msg_ratelimit: RateLimit,
107 : future_lsn_msg_ratelimit: RateLimit,
108 : timestamp_invalid_msg_ratelimit: RateLimit,
109 : }
110 :
111 : impl WalIngest {
112 24 : pub async fn new(
113 24 : timeline: &Timeline,
114 24 : startpoint: Lsn,
115 24 : ctx: &RequestContext,
116 24 : ) -> anyhow::Result<WalIngest> {
117 : // Fetch the latest checkpoint into memory, so that we can compare with it
118 : // quickly in `ingest_record` and update it when it changes.
119 24 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
120 24 : let pgversion = timeline.pg_version;
121 :
122 24 : let checkpoint = dispatch_pgversion!(pgversion, {
123 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
124 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
125 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
126 : });
127 :
128 24 : Ok(WalIngest {
129 24 : shard: *timeline.get_shard_identity(),
130 24 : checkpoint,
131 24 : checkpoint_modified: false,
132 24 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
133 24 : warn_ingest_lag: WarnIngestLag {
134 24 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
135 24 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
136 24 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
137 24 : },
138 24 : })
139 24 : }
140 :
141 : /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
142 : /// storage of a given timeline.
143 : ///
144 : /// This function updates the `lsn` field of `DatadirModification`
145 : ///
146 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
147 291704 : pub async fn ingest_record(
148 291704 : &mut self,
149 291704 : interpreted: InterpretedWalRecord,
150 291704 : modification: &mut DatadirModification<'_>,
151 291704 : ctx: &RequestContext,
152 291704 : ) -> anyhow::Result<bool> {
153 291704 : WAL_INGEST.records_received.inc();
154 291704 : let prev_len = modification.len();
155 291704 :
156 291704 : modification.set_lsn(interpreted.next_record_lsn)?;
157 :
158 291704 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
159 : // Records of this type should always be preceded by a commit(), as they
160 : // rely on reading data pages back from the Timeline.
161 0 : assert!(!modification.has_dirty_data());
162 291704 : }
163 :
164 291704 : assert!(!self.checkpoint_modified);
165 291704 : if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
166 291668 : && self.checkpoint.update_next_xid(interpreted.xid)
167 4 : {
168 4 : self.checkpoint_modified = true;
169 291700 : }
170 :
171 291704 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
172 :
173 132 : match interpreted.metadata_record {
174 24 : Some(MetadataRecord::Heapam(rec)) => match rec {
175 24 : HeapamRecord::ClearVmBits(clear_vm_bits) => {
176 24 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
177 24 : .await?;
178 : }
179 : },
180 0 : Some(MetadataRecord::Neonrmgr(rec)) => match rec {
181 0 : NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
182 0 : self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
183 0 : .await?;
184 : }
185 : },
186 32 : Some(MetadataRecord::Smgr(rec)) => match rec {
187 32 : SmgrRecord::Create(create) => {
188 32 : self.ingest_xlog_smgr_create(create, modification, ctx)
189 32 : .await?;
190 : }
191 0 : SmgrRecord::Truncate(truncate) => {
192 0 : self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
193 0 : .await?;
194 : }
195 : },
196 0 : Some(MetadataRecord::Dbase(rec)) => match rec {
197 0 : DbaseRecord::Create(create) => {
198 0 : self.ingest_xlog_dbase_create(create, modification, ctx)
199 0 : .await?;
200 : }
201 0 : DbaseRecord::Drop(drop) => {
202 0 : self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
203 : }
204 : },
205 0 : Some(MetadataRecord::Clog(rec)) => match rec {
206 0 : ClogRecord::ZeroPage(zero_page) => {
207 0 : self.ingest_clog_zero_page(zero_page, modification, ctx)
208 0 : .await?;
209 : }
210 0 : ClogRecord::Truncate(truncate) => {
211 0 : self.ingest_clog_truncate(truncate, modification, ctx)
212 0 : .await?;
213 : }
214 : },
215 16 : Some(MetadataRecord::Xact(rec)) => {
216 16 : self.ingest_xact_record(rec, modification, ctx).await?;
217 : }
218 0 : Some(MetadataRecord::MultiXact(rec)) => match rec {
219 0 : MultiXactRecord::ZeroPage(zero_page) => {
220 0 : self.ingest_multixact_zero_page(zero_page, modification, ctx)
221 0 : .await?;
222 : }
223 0 : MultiXactRecord::Create(create) => {
224 0 : self.ingest_multixact_create(modification, &create)?;
225 : }
226 0 : MultiXactRecord::Truncate(truncate) => {
227 0 : self.ingest_multixact_truncate(modification, &truncate, ctx)
228 0 : .await?;
229 : }
230 : },
231 0 : Some(MetadataRecord::Relmap(rec)) => match rec {
232 0 : RelmapRecord::Update(update) => {
233 0 : self.ingest_relmap_update(update, modification, ctx).await?;
234 : }
235 : },
236 60 : Some(MetadataRecord::Xlog(rec)) => match rec {
237 60 : XlogRecord::Raw(raw) => {
238 60 : self.ingest_raw_xlog_record(raw, modification, ctx).await?;
239 : }
240 : },
241 0 : Some(MetadataRecord::LogicalMessage(rec)) => match rec {
242 0 : LogicalMessageRecord::Put(put) => {
243 0 : self.ingest_logical_message_put(put, modification, ctx)
244 0 : .await?;
245 : }
246 : #[cfg(feature = "testing")]
247 : LogicalMessageRecord::Failpoint => {
248 : // This is a convenient way to make the WAL ingestion pause at
249 : // a particular point in the WAL. For more fine-grained control,
250 : // we could peek into the message and only pause if it contains
251 : // a particular string, for example, but this is enough for now.
252 0 : failpoint_support::sleep_millis_async!(
253 0 : "pageserver-wal-ingest-logical-message-sleep"
254 0 : );
255 : }
256 : },
257 0 : Some(MetadataRecord::Standby(rec)) => {
258 0 : self.ingest_standby_record(rec).unwrap();
259 0 : }
260 0 : Some(MetadataRecord::Replorigin(rec)) => {
261 0 : self.ingest_replorigin_record(rec, modification).await?;
262 : }
263 291572 : None => {
264 291572 : // There are two cases through which we end up here:
265 291572 : // 1. The resource manager for the original PG WAL record
266 291572 : // is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
267 291572 : // record type within Neon.
268 291572 : // 2. The resource manager id was unknown to
269 291572 : // [`wal_decoder::decoder::MetadataRecord::from_decoded`].
270 291572 : // TODO(vlad): Tighten this up more once we build confidence
271 291572 : // that case (2) does not happen in the field.
272 291572 : }
273 : }
274 :
275 291704 : modification
276 291704 : .ingest_batch(interpreted.batch, &self.shard, ctx)
277 291704 : .await?;
278 :
279 : // If checkpoint data was updated, store the new version in the repository
280 291704 : if self.checkpoint_modified {
281 12 : let new_checkpoint_bytes = self.checkpoint.encode()?;
282 :
283 12 : modification.put_checkpoint(new_checkpoint_bytes)?;
284 12 : self.checkpoint_modified = false;
285 291692 : }
286 :
287 : // Note that at this point this record is only cached in the modification
288 : // until commit() is called to flush the data into the repository and update
289 : // the latest LSN.
290 :
291 291704 : Ok(modification.len() > prev_len)
292 291704 : }
293 :
294 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
295 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
296 0 : let next_full_xid =
297 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
298 :
299 0 : let next_xid = (next_full_xid) as u32;
300 0 : let mut epoch = (next_full_xid >> 32) as u32;
301 0 :
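// Worked example: if nextXid is epoch 5, xid 100, then a prepared xid of
// 4_000_000_000 compares greater than 100 only because it was assigned
// before the last wraparound; it belongs to epoch 4, and the full id is
// (4 << 32) | 4_000_000_000.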
302 0 : if xid > next_xid {
303 : // Wraparound occurred, must be from a prev epoch.
304 0 : if epoch == 0 {
305 0 : bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
306 0 : }
307 0 : epoch -= 1;
308 0 : }
309 :
310 0 : Ok(((epoch as u64) << 32) | xid as u64)
311 0 : }
312 :
313 24 : async fn ingest_clear_vm_bits(
314 24 : &mut self,
315 24 : clear_vm_bits: ClearVmBits,
316 24 : modification: &mut DatadirModification<'_>,
317 24 : ctx: &RequestContext,
318 24 : ) -> anyhow::Result<()> {
319 24 : let ClearVmBits {
320 24 : new_heap_blkno,
321 24 : old_heap_blkno,
322 24 : flags,
323 24 : vm_rel,
324 24 : } = clear_vm_bits;
325 24 : // Clear the VM bits if required.
326 24 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
327 24 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
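// HEAPBLK_TO_MAPBLOCK maps a heap block number to the VM page that tracks
// it; each VM page covers a fixed range of heap blocks, so the old and new
// heap blocks may map to the same VM page or to two different ones.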
328 :
329 : // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
330 : // its view of the VM relation size. Out of caution, log a critical error instead of failing
331 : // WAL ingestion, as there have historically been cases where PostgreSQL has cleared spurious VM pages. See:
332 : // https://github.com/neondatabase/neon/pull/10634.
333 24 : let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
334 0 : critical!("clear_vm_bits for unknown VM relation {vm_rel}");
335 0 : return Ok(());
336 : };
337 24 : if let Some(blknum) = new_vm_blk {
338 24 : if blknum >= vm_size {
339 0 : critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
340 0 : new_vm_blk = None;
341 24 : }
342 0 : }
343 24 : if let Some(blknum) = old_vm_blk {
344 0 : if blknum >= vm_size {
345 0 : critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
346 0 : old_vm_blk = None;
347 0 : }
348 24 : }
349 :
350 24 : if new_vm_blk.is_none() && old_vm_blk.is_none() {
351 0 : return Ok(());
352 24 : } else if new_vm_blk == old_vm_blk {
353 : // An UPDATE record that needs to clear the bits for both the old and the new page, both of
354 : // which reside on the same VM page.
355 0 : self.put_rel_wal_record(
356 0 : modification,
357 0 : vm_rel,
358 0 : new_vm_blk.unwrap(),
359 0 : NeonWalRecord::ClearVisibilityMapFlags {
360 0 : new_heap_blkno,
361 0 : old_heap_blkno,
362 0 : flags,
363 0 : },
364 0 : ctx,
365 0 : )
366 0 : .await?;
367 : } else {
368 : // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
369 24 : if let Some(new_vm_blk) = new_vm_blk {
370 24 : self.put_rel_wal_record(
371 24 : modification,
372 24 : vm_rel,
373 24 : new_vm_blk,
374 24 : NeonWalRecord::ClearVisibilityMapFlags {
375 24 : new_heap_blkno,
376 24 : old_heap_blkno: None,
377 24 : flags,
378 24 : },
379 24 : ctx,
380 24 : )
381 24 : .await?;
382 0 : }
383 24 : if let Some(old_vm_blk) = old_vm_blk {
384 0 : self.put_rel_wal_record(
385 0 : modification,
386 0 : vm_rel,
387 0 : old_vm_blk,
388 0 : NeonWalRecord::ClearVisibilityMapFlags {
389 0 : new_heap_blkno: None,
390 0 : old_heap_blkno,
391 0 : flags,
392 0 : },
393 0 : ctx,
394 0 : )
395 0 : .await?;
396 24 : }
397 : }
398 24 : Ok(())
399 24 : }
400 :
401 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
402 0 : async fn ingest_xlog_dbase_create(
403 0 : &mut self,
404 0 : create: DbaseCreate,
405 0 : modification: &mut DatadirModification<'_>,
406 0 : ctx: &RequestContext,
407 0 : ) -> anyhow::Result<()> {
408 0 : let DbaseCreate {
409 0 : db_id,
410 0 : tablespace_id,
411 0 : src_db_id,
412 0 : src_tablespace_id,
413 0 : } = create;
414 :
415 0 : let rels = modification
416 0 : .tline
417 0 : .list_rels(
418 0 : src_tablespace_id,
419 0 : src_db_id,
420 0 : Version::Modified(modification),
421 0 : ctx,
422 0 : )
423 0 : .await?;
424 :
425 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
426 :
427 : // Copy relfilemap
428 0 : let filemap = modification
429 0 : .tline
430 0 : .get_relmap_file(
431 0 : src_tablespace_id,
432 0 : src_db_id,
433 0 : Version::Modified(modification),
434 0 : ctx,
435 0 : )
436 0 : .await?;
437 0 : modification
438 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
439 0 : .await?;
440 :
441 0 : let mut num_rels_copied = 0;
442 0 : let mut num_blocks_copied = 0;
443 0 : for src_rel in rels {
444 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
445 0 : assert_eq!(src_rel.dbnode, src_db_id);
446 :
447 0 : let nblocks = modification
448 0 : .tline
449 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
450 0 : .await?;
451 0 : let dst_rel = RelTag {
452 0 : spcnode: tablespace_id,
453 0 : dbnode: db_id,
454 0 : relnode: src_rel.relnode,
455 0 : forknum: src_rel.forknum,
456 0 : };
457 0 :
458 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
459 :
460 : // Copy content
461 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
462 0 : for blknum in 0..nblocks {
463 : // Sharding:
464 : // - src and dst are always on the same shard, because they differ only by dbNode, and
465 : // dbNode is not included in the hash inputs for sharding.
466 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
467 : // that belong to it.
468 0 : let src_key = rel_block_to_key(src_rel, blknum);
469 0 : if !self.shard.is_key_local(&src_key) {
470 0 : debug!(
471 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
472 : src_key
473 : );
474 0 : continue;
475 0 : }
476 0 : debug!(
477 0 : "copying block {} from {} ({}) to {}",
478 : blknum, src_rel, src_key, dst_rel
479 : );
480 :
481 0 : let content = modification
482 0 : .tline
483 0 : .get_rel_page_at_lsn(
484 0 : src_rel,
485 0 : blknum,
486 0 : Version::Modified(modification),
487 0 : ctx,
488 0 : crate::tenant::storage_layer::IoConcurrency::sequential(),
489 0 : )
490 0 : .await?;
491 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
492 0 : num_blocks_copied += 1;
493 : }
494 :
495 0 : num_rels_copied += 1;
496 : }
497 :
498 0 : info!(
499 0 : "Created database {}/{}, copied {} blocks in {} rels",
500 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
501 : );
502 0 : Ok(())
503 0 : }
504 :
505 0 : async fn ingest_xlog_dbase_drop(
506 0 : &mut self,
507 0 : dbase_drop: DbaseDrop,
508 0 : modification: &mut DatadirModification<'_>,
509 0 : ctx: &RequestContext,
510 0 : ) -> anyhow::Result<()> {
511 0 : let DbaseDrop {
512 0 : db_id,
513 0 : tablespace_ids,
514 0 : } = dbase_drop;
515 0 : for tablespace_id in tablespace_ids {
516 0 : trace!("Drop db {}, {}", tablespace_id, db_id);
517 0 : modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
518 : }
519 :
520 0 : Ok(())
521 0 : }
522 :
523 32 : async fn ingest_xlog_smgr_create(
524 32 : &mut self,
525 32 : create: SmgrCreate,
526 32 : modification: &mut DatadirModification<'_>,
527 32 : ctx: &RequestContext,
528 32 : ) -> anyhow::Result<()> {
529 32 : let SmgrCreate { rel } = create;
530 32 : self.put_rel_creation(modification, rel, ctx).await?;
531 32 : Ok(())
532 32 : }
533 :
534 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
535 : ///
536 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
537 0 : async fn ingest_xlog_smgr_truncate(
538 0 : &mut self,
539 0 : truncate: XlSmgrTruncate,
540 0 : modification: &mut DatadirModification<'_>,
541 0 : ctx: &RequestContext,
542 0 : ) -> anyhow::Result<()> {
543 0 : let XlSmgrTruncate {
544 0 : blkno,
545 0 : rnode,
546 0 : flags,
547 0 : } = truncate;
548 0 :
549 0 : let spcnode = rnode.spcnode;
550 0 : let dbnode = rnode.dbnode;
551 0 : let relnode = rnode.relnode;
552 0 :
553 0 : if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
554 0 : let rel = RelTag {
555 0 : spcnode,
556 0 : dbnode,
557 0 : relnode,
558 0 : forknum: MAIN_FORKNUM,
559 0 : };
560 0 :
561 0 : self.put_rel_truncation(modification, rel, blkno, ctx)
562 0 : .await?;
563 0 : }
564 0 : if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
565 0 : let rel = RelTag {
566 0 : spcnode,
567 0 : dbnode,
568 0 : relnode,
569 0 : forknum: FSM_FORKNUM,
570 0 : };
571 0 :
572 0 : // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
573 0 : // and instead of digging in the FSM bitmap format we just clear the whole page.
574 0 : let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
575 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
576 0 : if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
577 0 : && self
578 0 : .shard
579 0 : .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
580 : {
581 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
582 0 : fsm_physical_page_no += 1;
583 0 : }
584 : // Truncate this shard's view of the FSM relation size, if it even has one.
585 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
586 0 : if nblocks > fsm_physical_page_no {
587 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
588 0 : .await?;
589 0 : }
590 0 : }
591 0 : if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
592 0 : let rel = RelTag {
593 0 : spcnode,
594 0 : dbnode,
595 0 : relnode,
596 0 : forknum: VISIBILITYMAP_FORKNUM,
597 0 : };
598 0 :
599 0 : // last remaining block, byte, and bit
600 0 : let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
601 0 : let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
602 0 : / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
603 0 : let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
604 0 : * pg_constants::VM_BITS_PER_HEAPBLOCK;
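// Each heap block is tracked by VM_BITS_PER_HEAPBLOCK bits (two since
// PG 9.6: all-visible and all-frozen), so trunc_byte/trunc_offs locate
// the first bit that belongs to a truncated heap block within the page.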
605 0 :
606 0 : // Unless the new size is exactly at a visibility map page boundary, the
607 0 : // tail bits in the last remaining map page, representing truncated heap
608 0 : // blocks, need to be cleared. This is not only tidy, but also necessary
609 0 : // because we don't get a chance to clear the bits if the heap is extended
610 0 : // again. Only do this on the shard that owns the page.
611 0 : if (trunc_byte != 0 || trunc_offs != 0)
612 0 : && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
613 : {
614 0 : modification.put_rel_wal_record(
615 0 : rel,
616 0 : vm_page_no,
617 0 : NeonWalRecord::TruncateVisibilityMap {
618 0 : trunc_byte,
619 0 : trunc_offs,
620 0 : },
621 0 : )?;
622 0 : vm_page_no += 1;
623 0 : }
624 : // Truncate this shard's view of the VM relation size, if it even has one.
625 0 : let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
626 0 : if nblocks > vm_page_no {
627 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
628 0 : .await?;
629 0 : }
630 0 : }
631 0 : Ok(())
632 0 : }
633 :
634 16 : fn warn_on_ingest_lag(
635 16 : &mut self,
636 16 : conf: &crate::config::PageServerConf,
637 16 : wal_timestamp: TimestampTz,
638 16 : ) {
639 16 : debug_assert_current_span_has_tenant_and_timeline_id();
640 16 : let now = SystemTime::now();
641 16 : let rate_limits = &mut self.warn_ingest_lag;
642 :
643 16 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
644 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
645 : });
646 :
647 16 : match ts {
648 16 : Ok(ts) => {
649 16 : match now.duration_since(ts) {
650 16 : Ok(lag) => {
651 16 : if lag > conf.wait_lsn_timeout {
652 16 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
653 4 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
654 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
655 0 : return;
656 0 : }
657 4 : } else {
658 4 : // Still loading? We shouldn't be here
659 4 : }
660 4 : let lag = humantime::format_duration(lag);
661 4 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
662 16 : })
663 0 : }
664 : }
665 0 : Err(e) => {
666 0 : let delta_t = e.duration();
667 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
668 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
669 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
670 0 : if delta_t > IGNORED_DRIFT {
671 0 : let delta_t = humantime::format_duration(delta_t);
672 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
673 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
674 0 : })
675 0 : }
676 : }
677 : };
678 : }
679 0 : Err(error) => {
680 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
681 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
682 0 : })
683 : }
684 : }
685 16 : }
686 :
687 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
688 : ///
689 16 : async fn ingest_xact_record(
690 16 : &mut self,
691 16 : record: XactRecord,
692 16 : modification: &mut DatadirModification<'_>,
693 16 : ctx: &RequestContext,
694 16 : ) -> anyhow::Result<()> {
695 16 : let (xact_common, is_commit, is_prepared) = match record {
696 0 : XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
697 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
698 0 : self.adjust_to_full_transaction_id(xl_xid)?
699 : } else {
700 0 : xl_xid as u64
701 : };
702 0 : return modification.put_twophase_file(xid, data, ctx).await;
703 : }
704 16 : XactRecord::Commit(common) => (common, true, false),
705 0 : XactRecord::Abort(common) => (common, false, false),
706 0 : XactRecord::CommitPrepared(common) => (common, true, true),
707 0 : XactRecord::AbortPrepared(common) => (common, false, true),
708 : };
709 :
710 : let XactCommon {
711 16 : parsed,
712 16 : origin_id,
713 16 : xl_xid,
714 16 : lsn,
715 16 : } = xact_common;
716 16 :
717 16 : // Record update of CLOG pages
718 16 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
719 16 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
720 16 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
721 16 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
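// Worked example with the standard 8 KB page (32768 two-bit xact status
// slots per page, 32 pages per segment): xid 100_000 -> pageno 3, segno 0,
// rpageno 3, i.e. page 3 of the first CLOG segment file.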
722 16 :
723 16 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
724 :
725 16 : for subxact in &parsed.subxacts {
726 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
727 0 : if subxact_pageno != pageno {
728 : // This subxact goes to different page. Write the record
729 : // for all the XIDs on the previous page, and continue
730 : // accumulating XIDs on this new page.
731 0 : modification.put_slru_wal_record(
732 0 : SlruKind::Clog,
733 0 : segno,
734 0 : rpageno,
735 0 : if is_commit {
736 0 : NeonWalRecord::ClogSetCommitted {
737 0 : xids: page_xids,
738 0 : timestamp: parsed.xact_time,
739 0 : }
740 : } else {
741 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
742 : },
743 0 : )?;
744 0 : page_xids = Vec::new();
745 0 : }
746 0 : pageno = subxact_pageno;
747 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
748 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
749 0 : page_xids.push(*subxact);
750 : }
751 16 : modification.put_slru_wal_record(
752 16 : SlruKind::Clog,
753 16 : segno,
754 16 : rpageno,
755 16 : if is_commit {
756 16 : NeonWalRecord::ClogSetCommitted {
757 16 : xids: page_xids,
758 16 : timestamp: parsed.xact_time,
759 16 : }
760 : } else {
761 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
762 : },
763 0 : )?;
764 :
765 : // Group relations to drop by dbNode. This map will contain all relations that _might_
766 : // exist; we narrow it down to the ones that actually exist later. This map can be huge if
767 : // the transaction touches a huge number of relations (there is no bound on this in
768 : // postgres).
769 16 : let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
770 :
771 16 : for xnode in &parsed.xnodes {
772 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
773 0 : let rel = RelTag {
774 0 : forknum,
775 0 : spcnode: xnode.spcnode,
776 0 : dbnode: xnode.dbnode,
777 0 : relnode: xnode.relnode,
778 0 : };
779 0 : drop_relations
780 0 : .entry((xnode.spcnode, xnode.dbnode))
781 0 : .or_default()
782 0 : .push(rel);
783 0 : }
784 : }
785 :
786 : // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
787 16 : modification.put_rel_drops(drop_relations, ctx).await?;
788 :
789 16 : if origin_id != 0 {
790 0 : modification
791 0 : .set_replorigin(origin_id, parsed.origin_lsn)
792 0 : .await?;
793 16 : }
794 :
795 16 : if is_prepared {
796 : // Remove the twophase file. See RemoveTwoPhaseFile() in postgres code.
797 0 : trace!(
798 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
799 : xl_xid,
800 : parsed.xid,
801 : lsn,
802 : );
803 :
804 0 : let xid: u64 = if modification.tline.pg_version >= 17 {
805 0 : self.adjust_to_full_transaction_id(parsed.xid)?
806 : } else {
807 0 : parsed.xid as u64
808 : };
809 0 : modification.drop_twophase_file(xid, ctx).await?;
810 16 : }
811 :
812 16 : Ok(())
813 16 : }
814 :
815 0 : async fn ingest_clog_truncate(
816 0 : &mut self,
817 0 : truncate: ClogTruncate,
818 0 : modification: &mut DatadirModification<'_>,
819 0 : ctx: &RequestContext,
820 0 : ) -> anyhow::Result<()> {
821 0 : let ClogTruncate {
822 0 : pageno,
823 0 : oldest_xid,
824 0 : oldest_xid_db,
825 0 : } = truncate;
826 0 :
827 0 : info!(
828 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
829 : pageno, oldest_xid, oldest_xid_db
830 : );
831 :
832 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
833 : // truncated, but a checkpoint record with the updated values isn't written until
834 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
835 : // so we keep the oldestXid and oldestXidDB up-to-date.
836 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
837 0 : cp.oldestXid = oldest_xid;
838 0 : cp.oldestXidDB = oldest_xid_db;
839 0 : });
840 0 : self.checkpoint_modified = true;
841 :
842 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
843 :
844 0 : let latest_page_number =
845 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
846 : / pg_constants::CLOG_XACTS_PER_PAGE;
847 :
848 : // Now delete all segments containing pages between xlrec.pageno
849 : // and latest_page_number.
850 :
851 : // First, make an important safety check:
852 : // the current endpoint page must not be eligible for removal.
853 : // See SimpleLruTruncate() in slru.c
854 0 : if dispatch_pgversion!(modification.tline.pg_version, {
855 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
856 : }) {
857 0 : info!("could not truncate directory pg_xact apparent wraparound");
858 0 : return Ok(());
859 0 : }
860 0 :
861 0 : // Iterate over the SLRU CLOG segments and drop the ones that we're ready to truncate
862 0 : //
863 0 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
864 0 : // will block waiting for the last valid LSN to advance up to
865 0 : // it. So we use the previous record's LSN in the get calls
866 0 : // instead.
867 0 : if modification.tline.get_shard_identity().is_shard_zero() {
868 0 : for segno in modification
869 0 : .tline
870 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
871 0 : .await?
872 : {
873 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
874 :
875 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
876 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
877 : });
878 :
879 0 : if may_delete {
880 0 : modification
881 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
882 0 : .await?;
883 0 : trace!("Drop CLOG segment {:>04X}", segno);
884 0 : }
885 : }
886 0 : }
887 :
888 0 : Ok(())
889 0 : }
890 :
891 0 : async fn ingest_clog_zero_page(
892 0 : &mut self,
893 0 : zero_page: ClogZeroPage,
894 0 : modification: &mut DatadirModification<'_>,
895 0 : ctx: &RequestContext,
896 0 : ) -> anyhow::Result<()> {
897 0 : let ClogZeroPage { segno, rpageno } = zero_page;
898 0 :
899 0 : self.put_slru_page_image(
900 0 : modification,
901 0 : SlruKind::Clog,
902 0 : segno,
903 0 : rpageno,
904 0 : ZERO_PAGE.clone(),
905 0 : ctx,
906 0 : )
907 0 : .await
908 0 : }
909 :
910 0 : fn ingest_multixact_create(
911 0 : &mut self,
912 0 : modification: &mut DatadirModification,
913 0 : xlrec: &XlMultiXactCreate,
914 0 : ) -> Result<()> {
915 0 : // Create WAL record for updating the multixact-offsets page
916 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
917 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
918 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
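// With the standard 8 KB page there are 2048 four-byte offsets per page
// (and 32 pages per segment), so e.g. multixact id 10_000 lands on
// pageno 4, segno 0, rpageno 4.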
919 0 :
920 0 : modification.put_slru_wal_record(
921 0 : SlruKind::MultiXactOffsets,
922 0 : segno,
923 0 : rpageno,
924 0 : NeonWalRecord::MultixactOffsetCreate {
925 0 : mid: xlrec.mid,
926 0 : moff: xlrec.moff,
927 0 : },
928 0 : )?;
929 :
930 : // Create WAL records for the update of each affected multixact-members page
931 0 : let mut members = xlrec.members.iter();
932 0 : let mut offset = xlrec.moff;
933 : loop {
934 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
935 0 :
936 0 : // How many members fit on this page?
937 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
938 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
939 0 :
940 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
941 0 : for _ in 0..page_remain {
942 0 : if let Some(m) = members.next() {
943 0 : this_page_members.push(m.clone());
944 0 : } else {
945 0 : break;
946 : }
947 : }
948 0 : if this_page_members.is_empty() {
949 : // all done
950 0 : break;
951 0 : }
952 0 : let n_this_page = this_page_members.len();
953 0 :
954 0 : modification.put_slru_wal_record(
955 0 : SlruKind::MultiXactMembers,
956 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
957 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
958 0 : NeonWalRecord::MultixactMembersCreate {
959 0 : moff: offset,
960 0 : members: this_page_members,
961 0 : },
962 0 : )?;
963 :
964 : // Note: The multixact members can wrap around, even within one WAL record.
965 0 : offset = offset.wrapping_add(n_this_page as u32);
966 : }
967 0 : let next_offset = offset;
968 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
969 :
970 : // Update next-multi-xid and next-offset
971 : //
972 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
973 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
974 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
975 : // incremented! nextXid skips over < FirstNormalTransactionId when the value
976 : // is stored, so it's never 0 in a checkpoint.
977 : //
978 : // I don't know why it's done that way, it seems less error-prone to skip over 0
979 : // when the value is stored rather than when it's read. But let's do it the same
980 : // way here.
981 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
982 0 :
983 0 : if self
984 0 : .checkpoint
985 0 : .update_next_multixid(next_multi_xid, next_offset)
986 0 : {
987 0 : self.checkpoint_modified = true;
988 0 : }
989 :
990 : // Also update the next-xid with the highest member. According to the comments in
991 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
992 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
993 0 : if let Some(max_xid) = acc {
994 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
995 0 : Some(mbr.xid)
996 : } else {
997 0 : acc
998 : }
999 : } else {
1000 0 : Some(mbr.xid)
1001 : }
1002 0 : });
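// `wrapping_sub(..) as i32 > 0` is the usual circular XID comparison:
// e.g. with max_xid = 0xFFFF_FFF0 and mbr.xid = 0x10, the wrapped
// difference is 0x20 (positive), so 0x10 counts as the newer XID.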
1003 :
1004 0 : if let Some(max_xid) = max_mbr_xid {
1005 0 : if self.checkpoint.update_next_xid(max_xid) {
1006 0 : self.checkpoint_modified = true;
1007 0 : }
1008 0 : }
1009 0 : Ok(())
1010 0 : }
1011 :
1012 0 : async fn ingest_multixact_truncate(
1013 0 : &mut self,
1014 0 : modification: &mut DatadirModification<'_>,
1015 0 : xlrec: &XlMultiXactTruncate,
1016 0 : ctx: &RequestContext,
1017 0 : ) -> Result<()> {
1018 0 : let (maxsegment, startsegment, endsegment) =
1019 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1020 0 : cp.oldestMulti = xlrec.end_trunc_off;
1021 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1022 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1023 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1024 0 : );
1025 0 : let startsegment: i32 =
1026 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1027 0 : let endsegment: i32 =
1028 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1029 0 : (maxsegment, startsegment, endsegment)
1030 : });
1031 :
1032 0 : self.checkpoint_modified = true;
1033 0 :
1034 0 : // PerformMembersTruncation
1035 0 : let mut segment: i32 = startsegment;
1036 0 :
1037 0 : // Delete all the segments except the last one. The last segment can still
1038 0 : // contain, possibly partially, valid data.
1039 0 : if modification.tline.get_shard_identity().is_shard_zero() {
1040 0 : while segment != endsegment {
1041 0 : modification
1042 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1043 0 : .await?;
1044 :
1045 : /* move to next segment, handling wraparound correctly */
1046 0 : if segment == maxsegment {
1047 0 : segment = 0;
1048 0 : } else {
1049 0 : segment += 1;
1050 0 : }
1051 : }
1052 0 : }
1053 :
1054 : // Truncate offsets
1055 : // FIXME: this did not handle wraparound correctly
1056 :
1057 0 : Ok(())
1058 0 : }
1059 :
1060 0 : async fn ingest_multixact_zero_page(
1061 0 : &mut self,
1062 0 : zero_page: MultiXactZeroPage,
1063 0 : modification: &mut DatadirModification<'_>,
1064 0 : ctx: &RequestContext,
1065 0 : ) -> Result<()> {
1066 0 : let MultiXactZeroPage {
1067 0 : slru_kind,
1068 0 : segno,
1069 0 : rpageno,
1070 0 : } = zero_page;
1071 0 : self.put_slru_page_image(
1072 0 : modification,
1073 0 : slru_kind,
1074 0 : segno,
1075 0 : rpageno,
1076 0 : ZERO_PAGE.clone(),
1077 0 : ctx,
1078 0 : )
1079 0 : .await
1080 0 : }
1081 :
1082 0 : async fn ingest_relmap_update(
1083 0 : &mut self,
1084 0 : update: RelmapUpdate,
1085 0 : modification: &mut DatadirModification<'_>,
1086 0 : ctx: &RequestContext,
1087 0 : ) -> Result<()> {
1088 0 : let RelmapUpdate { update, buf } = update;
1089 0 :
1090 0 : modification
1091 0 : .put_relmap_file(update.tsid, update.dbid, buf, ctx)
1092 0 : .await
1093 0 : }
1094 :
1095 60 : async fn ingest_raw_xlog_record(
1096 60 : &mut self,
1097 60 : raw_record: RawXlogRecord,
1098 60 : modification: &mut DatadirModification<'_>,
1099 60 : ctx: &RequestContext,
1100 60 : ) -> Result<()> {
1101 60 : let RawXlogRecord { info, lsn, mut buf } = raw_record;
1102 60 : let pg_version = modification.tline.pg_version;
1103 60 :
1104 60 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
1105 4 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1106 0 : let rec = v17::XlParameterChange::decode(&mut buf);
1107 0 : cp.wal_level = rec.wal_level;
1108 0 : self.checkpoint_modified = true;
1109 4 : }
1110 56 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
1111 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
1112 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
1113 0 : cp.wal_level = rec.wal_level;
1114 0 : self.checkpoint_modified = true;
1115 0 : }
1116 56 : }
1117 :
1118 60 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1119 0 : if info == pg_constants::XLOG_NEXTOID {
1120 0 : let next_oid = buf.get_u32_le();
1121 0 : if cp.nextOid != next_oid {
1122 0 : cp.nextOid = next_oid;
1123 0 : self.checkpoint_modified = true;
1124 0 : }
1125 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
1126 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1127 : {
1128 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
1129 0 : buf.copy_to_slice(&mut checkpoint_bytes);
1130 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
1131 0 : trace!(
1132 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
1133 : xlog_checkpoint.oldestXid,
1134 : cp.oldestXid
1135 : );
1136 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
1137 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
1138 0 : }
1139 0 : trace!(
1140 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
1141 : xlog_checkpoint.oldestActiveXid,
1142 : cp.oldestActiveXid
1143 : );
1144 :
1145 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
1146 : // because at shutdown, all in-progress transactions will implicitly
1147 : // end. Postgres startup code knows that, and allows hot standby to start
1148 : // immediately from a shutdown checkpoint.
1149 : //
1150 : // In Neon, Postgres hot standby startup always behaves as if starting from
1151 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
1152 : // instead of overwriting self.checkpoint.oldestActiveXid with
1153 : // InvalidTransactionid from the checkpoint WAL record, update it to a
1154 : // proper value, knowing that there are no in-progress transactions at this
1155 : // point, except for prepared transactions.
1156 : //
1157 : // See also the neon code changes in the InitWalRecovery() function.
1158 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
1159 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
1160 : {
1161 0 : let oldest_active_xid = if pg_version >= 17 {
1162 0 : let mut oldest_active_full_xid = cp.nextXid.value;
1163 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1164 0 : if xid < oldest_active_full_xid {
1165 0 : oldest_active_full_xid = xid;
1166 0 : }
1167 : }
1168 0 : oldest_active_full_xid as u32
1169 : } else {
1170 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
1171 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
1172 0 : let narrow_xid = xid as u32;
1173 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
1174 0 : oldest_active_xid = narrow_xid;
1175 0 : }
1176 : }
1177 0 : oldest_active_xid
1178 : };
1179 0 : cp.oldestActiveXid = oldest_active_xid;
1180 0 : } else {
1181 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
1182 0 : }
1183 :
1184 : // Write a new checkpoint key-value pair on every checkpoint record, even
1185 : // if nothing really changed. Not strictly required, but it seems nice to
1186 : // have some trace of the checkpoint records in the layer files at the same
1187 : // LSNs.
1188 0 : self.checkpoint_modified = true;
1189 0 : }
1190 : });
1191 :
1192 60 : Ok(())
1193 60 : }
1194 :
1195 0 : async fn ingest_logical_message_put(
1196 0 : &mut self,
1197 0 : put: PutLogicalMessage,
1198 0 : modification: &mut DatadirModification<'_>,
1199 0 : ctx: &RequestContext,
1200 0 : ) -> Result<()> {
1201 0 : let PutLogicalMessage { path, buf } = put;
1202 0 : modification.put_file(path.as_str(), &buf, ctx).await
1203 0 : }
1204 :
1205 0 : fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
1206 0 : match record {
1207 0 : StandbyRecord::RunningXacts(running_xacts) => {
1208 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1209 0 : cp.oldestActiveXid = running_xacts.oldest_running_xid;
1210 0 : });
1211 :
1212 0 : self.checkpoint_modified = true;
1213 0 : }
1214 0 : }
1215 0 :
1216 0 : Ok(())
1217 0 : }
1218 :
1219 0 : async fn ingest_replorigin_record(
1220 0 : &mut self,
1221 0 : record: ReploriginRecord,
1222 0 : modification: &mut DatadirModification<'_>,
1223 0 : ) -> Result<()> {
1224 0 : match record {
1225 0 : ReploriginRecord::Set(set) => {
1226 0 : modification
1227 0 : .set_replorigin(set.node_id, set.remote_lsn)
1228 0 : .await?;
1229 : }
1230 0 : ReploriginRecord::Drop(drop) => {
1231 0 : modification.drop_replorigin(drop.node_id).await?;
1232 : }
1233 : }
1234 :
1235 0 : Ok(())
1236 0 : }
1237 :
1238 36 : async fn put_rel_creation(
1239 36 : &mut self,
1240 36 : modification: &mut DatadirModification<'_>,
1241 36 : rel: RelTag,
1242 36 : ctx: &RequestContext,
1243 36 : ) -> Result<()> {
1244 36 : modification.put_rel_creation(rel, 0, ctx).await?;
1245 36 : Ok(())
1246 36 : }
1247 :
1248 : #[cfg(test)]
1249 544804 : async fn put_rel_page_image(
1250 544804 : &mut self,
1251 544804 : modification: &mut DatadirModification<'_>,
1252 544804 : rel: RelTag,
1253 544804 : blknum: BlockNumber,
1254 544804 : img: Bytes,
1255 544804 : ctx: &RequestContext,
1256 544804 : ) -> Result<(), PageReconstructError> {
1257 544804 : self.handle_rel_extend(modification, rel, blknum, ctx)
1258 544804 : .await?;
1259 544804 : modification.put_rel_page_image(rel, blknum, img)?;
1260 544804 : Ok(())
1261 544804 : }
1262 :
1263 24 : async fn put_rel_wal_record(
1264 24 : &mut self,
1265 24 : modification: &mut DatadirModification<'_>,
1266 24 : rel: RelTag,
1267 24 : blknum: BlockNumber,
1268 24 : rec: NeonWalRecord,
1269 24 : ctx: &RequestContext,
1270 24 : ) -> Result<()> {
1271 24 : self.handle_rel_extend(modification, rel, blknum, ctx)
1272 24 : .await?;
1273 24 : modification.put_rel_wal_record(rel, blknum, rec)?;
1274 24 : Ok(())
1275 24 : }
1276 :
1277 12024 : async fn put_rel_truncation(
1278 12024 : &mut self,
1279 12024 : modification: &mut DatadirModification<'_>,
1280 12024 : rel: RelTag,
1281 12024 : nblocks: BlockNumber,
1282 12024 : ctx: &RequestContext,
1283 12024 : ) -> anyhow::Result<()> {
1284 12024 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1285 12024 : Ok(())
1286 12024 : }
1287 :
1288 544828 : async fn handle_rel_extend(
1289 544828 : &mut self,
1290 544828 : modification: &mut DatadirModification<'_>,
1291 544828 : rel: RelTag,
1292 544828 : blknum: BlockNumber,
1293 544828 : ctx: &RequestContext,
1294 544828 : ) -> Result<(), PageReconstructError> {
1295 544828 : let new_nblocks = blknum + 1;
1296 : // Check if the relation exists. We implicitly create relations on first
1297 : // record.
1298 544828 : let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
1299 :
1300 544828 : if new_nblocks > old_nblocks {
1301 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1302 544796 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1303 :
1304 544796 : let mut key = rel_block_to_key(rel, blknum);
1305 544796 :
1306 544796 : // fill the gap with zeros
1307 544796 : let mut gap_blocks_filled: u64 = 0;
1308 544796 : for gap_blknum in old_nblocks..blknum {
1309 5996 : key.field6 = gap_blknum;
1310 5996 :
1311 5996 : if self.shard.get_shard_number(&key) != self.shard.number {
1312 0 : continue;
1313 5996 : }
1314 5996 :
1315 5996 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1316 5996 : gap_blocks_filled += 1;
1317 : }
1318 :
1319 544796 : WAL_INGEST
1320 544796 : .gap_blocks_zeroed_on_rel_extend
1321 544796 : .inc_by(gap_blocks_filled);
1322 544796 :
1323 544796 : // Log something when a relation extension causes us to fill gaps
1324 544796 : // with zero pages. Logging is rate limited per pg version to
1325 544796 : // avoid skewing.
1326 544796 : if gap_blocks_filled > 0 {
1327 : use once_cell::sync::Lazy;
1328 : use std::sync::Mutex;
1329 : use utils::rate_limit::RateLimit;
1330 :
1331 : struct RateLimitPerPgVersion {
1332 : rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
1333 : }
1334 :
1335 : impl RateLimitPerPgVersion {
1336 0 : const fn new() -> Self {
1337 0 : Self {
1338 0 : rate_limiters: [const {
1339 4 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
1340 0 : }; 4],
1341 0 : }
1342 0 : }
1343 :
1344 8 : const fn rate_limiter(
1345 8 : &self,
1346 8 : pg_version: u32,
1347 8 : ) -> Option<&Lazy<Mutex<RateLimit>>> {
1348 : const MIN_PG_VERSION: u32 = 14;
1349 : const MAX_PG_VERSION: u32 = 17;
1350 :
1351 8 : if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
1352 0 : return None;
1353 8 : }
1354 8 :
1355 8 : Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
1356 8 : }
1357 : }
1358 :
1359 : static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
1360 8 : if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
1361 8 : if let Ok(mut locked) = rate_limiter.try_lock() {
1362 8 : locked.call(|| {
1363 4 : info!(
1364 0 : lsn=%modification.get_lsn(),
1365 0 : pg_version=%modification.tline.pg_version,
1366 0 : rel=%rel,
1367 0 : "Filled {} gap blocks on rel extend to {} from {}",
1368 : gap_blocks_filled,
1369 : new_nblocks,
1370 : old_nblocks);
1371 8 : });
1372 8 : }
1373 0 : }
1374 544788 : }
1375 32 : }
1376 544828 : Ok(())
1377 544828 : }
1378 :
1379 0 : async fn put_slru_page_image(
1380 0 : &mut self,
1381 0 : modification: &mut DatadirModification<'_>,
1382 0 : kind: SlruKind,
1383 0 : segno: u32,
1384 0 : blknum: BlockNumber,
1385 0 : img: Bytes,
1386 0 : ctx: &RequestContext,
1387 0 : ) -> Result<()> {
1388 0 : if !self.shard.is_shard_zero() {
1389 0 : return Ok(());
1390 0 : }
1391 0 :
1392 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1393 0 : .await?;
1394 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1395 0 : Ok(())
1396 0 : }
1397 :
1398 0 : async fn handle_slru_extend(
1399 0 : &mut self,
1400 0 : modification: &mut DatadirModification<'_>,
1401 0 : kind: SlruKind,
1402 0 : segno: u32,
1403 0 : blknum: BlockNumber,
1404 0 : ctx: &RequestContext,
1405 0 : ) -> anyhow::Result<()> {
1406 0 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1407 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1408 0 : // a lot less frequently.
1409 0 :
1410 0 : let new_nblocks = blknum + 1;
1411 : // Check if the segment exists. We implicitly create SLRU segments on
1412 : // first record.
1413 : // TODO: would be nice to be more explicit about it
1414 0 : let old_nblocks = if !modification
1415 0 : .tline
1416 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1417 0 : .await?
1418 : {
1419 : // create it with 0 size initially; the logic below will extend it
1420 0 : modification
1421 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1422 0 : .await?;
1423 0 : 0
1424 : } else {
1425 0 : modification
1426 0 : .tline
1427 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1428 0 : .await?
1429 : };
1430 :
1431 0 : if new_nblocks > old_nblocks {
1432 0 : trace!(
1433 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1434 : kind,
1435 : segno,
1436 : old_nblocks,
1437 : new_nblocks
1438 : );
1439 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1440 :
1441 : // fill the gap with zeros
1442 0 : for gap_blknum in old_nblocks..blknum {
1443 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1444 : }
1445 0 : }
1446 0 : Ok(())
1447 0 : }
1448 : }
1449 :
1450 : /// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
1451 : ///
1452 : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
1453 : /// page number stored in the shard, or None if the shard does not have any pages for it.
1454 24 : async fn get_relsize(
1455 24 : modification: &DatadirModification<'_>,
1456 24 : rel: RelTag,
1457 24 : ctx: &RequestContext,
1458 24 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
1459 24 : if !modification
1460 24 : .tline
1461 24 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1462 24 : .await?
1463 : {
1464 0 : return Ok(None);
1465 24 : }
1466 24 : modification
1467 24 : .tline
1468 24 : .get_rel_size(rel, Version::Modified(modification), ctx)
1469 24 : .await
1470 24 : .map(Some)
1471 24 : }
1472 :
1473 : #[allow(clippy::bool_assert_comparison)]
1474 : #[cfg(test)]
1475 : mod tests {
1476 : use super::*;
1477 : use crate::tenant::harness::*;
1478 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1479 : use crate::tenant::storage_layer::IoConcurrency;
1480 : use postgres_ffi::RELSEG_SIZE;
1481 :
1482 : use crate::DEFAULT_PG_VERSION;
1483 :
1484 : /// Arbitrary relation tag, for testing.
1485 : const TESTREL_A: RelTag = RelTag {
1486 : spcnode: 0,
1487 : dbnode: 111,
1488 : relnode: 1000,
1489 : forknum: 0,
1490 : };
1491 :
1492 24 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1493 24 : // TODO
1494 24 : }
1495 :
1496 : #[tokio::test]
1497 4 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
1498 16 : for i in 14..=16 {
1499 12 : dispatch_pgversion!(i, {
1500 4 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
1501 4 : });
1502 4 : }
1503 4 :
1504 4 : Ok(())
1505 4 : }
1506 :
1507 16 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1508 16 : let mut m = tline.begin_modification(Lsn(0x10));
1509 16 : m.put_checkpoint(dispatch_pgversion!(
1510 16 : tline.pg_version,
1511 0 : pgv::ZERO_CHECKPOINT.clone()
1512 0 : ))?;
1513 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1514 16 : m.commit(ctx).await?;
1515 16 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1516 :
1517 16 : Ok(walingest)
1518 16 : }
1519 :
1520 : #[tokio::test]
1521 4 : async fn test_relsize() -> Result<()> {
1522 4 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
1523 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1524 4 : let tline = tenant
1525 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1526 4 : .await?;
1527 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1528 4 :
1529 4 : let mut m = tline.begin_modification(Lsn(0x20));
1530 4 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1531 4 : walingest
1532 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1533 4 : .await?;
1534 4 : m.commit(&ctx).await?;
1535 4 : let mut m = tline.begin_modification(Lsn(0x30));
1536 4 : walingest
1537 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1538 4 : .await?;
1539 4 : m.commit(&ctx).await?;
1540 4 : let mut m = tline.begin_modification(Lsn(0x40));
1541 4 : walingest
1542 4 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1543 4 : .await?;
1544 4 : m.commit(&ctx).await?;
1545 4 : let mut m = tline.begin_modification(Lsn(0x50));
1546 4 : walingest
1547 4 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1548 4 : .await?;
1549 4 : m.commit(&ctx).await?;
1550 4 :
1551 4 : assert_current_logical_size(&tline, Lsn(0x50));
1552 4 :
1553 4 : let test_span = tracing::info_span!(parent: None, "test",
1554 4 : tenant_id=%tline.tenant_shard_id.tenant_id,
1555 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1556 0 : timeline_id=%tline.timeline_id);
1557 4 :
1558 4 : // The relation was created at LSN 2, not visible at LSN 1 yet.
1559 4 : assert_eq!(
1560 4 : tline
1561 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1562 4 : .await?,
1563 4 : false
1564 4 : );
1565 4 : assert!(tline
1566 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1567 4 : .await
1568 4 : .is_err());
1569 4 : assert_eq!(
1570 4 : tline
1571 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1572 4 : .await?,
1573 4 : true
1574 4 : );
1575 4 : assert_eq!(
1576 4 : tline
1577 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1578 4 : .await?,
1579 4 : 1
1580 4 : );
1581 4 : assert_eq!(
1582 4 : tline
1583 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1584 4 : .await?,
1585 4 : 3
1586 4 : );
1587 4 :
1588 4 : // Check page contents at each LSN
1589 4 : assert_eq!(
1590 4 : tline
1591 4 : .get_rel_page_at_lsn(
1592 4 : TESTREL_A,
1593 4 : 0,
1594 4 : Version::Lsn(Lsn(0x20)),
1595 4 : &ctx,
1596 4 : io_concurrency.clone()
1597 4 : )
1598 4 : .instrument(test_span.clone())
1599 4 : .await?,
1600 4 : test_img("foo blk 0 at 2")
1601 4 : );
1602 4 :
1603 4 : assert_eq!(
1604 4 : tline
1605 4 : .get_rel_page_at_lsn(
1606 4 : TESTREL_A,
1607 4 : 0,
1608 4 : Version::Lsn(Lsn(0x30)),
1609 4 : &ctx,
1610 4 : io_concurrency.clone()
1611 4 : )
1612 4 : .instrument(test_span.clone())
1613 4 : .await?,
1614 4 : test_img("foo blk 0 at 3")
1615 4 : );
1616 4 :
1617 4 : assert_eq!(
1618 4 : tline
1619 4 : .get_rel_page_at_lsn(
1620 4 : TESTREL_A,
1621 4 : 0,
1622 4 : Version::Lsn(Lsn(0x40)),
1623 4 : &ctx,
1624 4 : io_concurrency.clone()
1625 4 : )
1626 4 : .instrument(test_span.clone())
1627 4 : .await?,
1628 4 : test_img("foo blk 0 at 3")
1629 4 : );
1630 4 : assert_eq!(
1631 4 : tline
1632 4 : .get_rel_page_at_lsn(
1633 4 : TESTREL_A,
1634 4 : 1,
1635 4 : Version::Lsn(Lsn(0x40)),
1636 4 : &ctx,
1637 4 : io_concurrency.clone()
1638 4 : )
1639 4 : .instrument(test_span.clone())
1640 4 : .await?,
1641 4 : test_img("foo blk 1 at 4")
1642 4 : );
1643 4 :
1644 4 : assert_eq!(
1645 4 : tline
1646 4 : .get_rel_page_at_lsn(
1647 4 : TESTREL_A,
1648 4 : 0,
1649 4 : Version::Lsn(Lsn(0x50)),
1650 4 : &ctx,
1651 4 : io_concurrency.clone()
1652 4 : )
1653 4 : .instrument(test_span.clone())
1654 4 : .await?,
1655 4 : test_img("foo blk 0 at 3")
1656 4 : );
1657 4 : assert_eq!(
1658 4 : tline
1659 4 : .get_rel_page_at_lsn(
1660 4 : TESTREL_A,
1661 4 : 1,
1662 4 : Version::Lsn(Lsn(0x50)),
1663 4 : &ctx,
1664 4 : io_concurrency.clone()
1665 4 : )
1666 4 : .instrument(test_span.clone())
1667 4 : .await?,
1668 4 : test_img("foo blk 1 at 4")
1669 4 : );
1670 4 : assert_eq!(
1671 4 : tline
1672 4 : .get_rel_page_at_lsn(
1673 4 : TESTREL_A,
1674 4 : 2,
1675 4 : Version::Lsn(Lsn(0x50)),
1676 4 : &ctx,
1677 4 : io_concurrency.clone()
1678 4 : )
1679 4 : .instrument(test_span.clone())
1680 4 : .await?,
1681 4 : test_img("foo blk 2 at 5")
1682 4 : );
1683 4 :
1684 4 : // Truncate last block
1685 4 : let mut m = tline.begin_modification(Lsn(0x60));
1686 4 : walingest
1687 4 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1688 4 : .await?;
1689 4 : m.commit(&ctx).await?;
1690 4 : assert_current_logical_size(&tline, Lsn(0x60));
1691 4 :
1692 4 : // Check reported size and contents after truncation
1693 4 : assert_eq!(
1694 4 : tline
1695 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1696 4 : .await?,
1697 4 : 2
1698 4 : );
1699 4 : assert_eq!(
1700 4 : tline
1701 4 : .get_rel_page_at_lsn(
1702 4 : TESTREL_A,
1703 4 : 0,
1704 4 : Version::Lsn(Lsn(0x60)),
1705 4 : &ctx,
1706 4 : io_concurrency.clone()
1707 4 : )
1708 4 : .instrument(test_span.clone())
1709 4 : .await?,
1710 4 : test_img("foo blk 0 at 3")
1711 4 : );
1712 4 : assert_eq!(
1713 4 : tline
1714 4 : .get_rel_page_at_lsn(
1715 4 : TESTREL_A,
1716 4 : 1,
1717 4 : Version::Lsn(Lsn(0x60)),
1718 4 : &ctx,
1719 4 : io_concurrency.clone()
1720 4 : )
1721 4 : .instrument(test_span.clone())
1722 4 : .await?,
1723 4 : test_img("foo blk 1 at 4")
1724 4 : );
1725 4 :
1726 4 : // Should still see the truncated block at the older LSN.
1727 4 : assert_eq!(
1728 4 : tline
1729 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1730 4 : .await?,
1731 4 : 3
1732 4 : );
1733 4 : assert_eq!(
1734 4 : tline
1735 4 : .get_rel_page_at_lsn(
1736 4 : TESTREL_A,
1737 4 : 2,
1738 4 : Version::Lsn(Lsn(0x50)),
1739 4 : &ctx,
1740 4 : io_concurrency.clone()
1741 4 : )
1742 4 : .instrument(test_span.clone())
1743 4 : .await?,
1744 4 : test_img("foo blk 2 at 5")
1745 4 : );
1746 4 :
1747 4 : // Truncate to zero length
1748 4 : let mut m = tline.begin_modification(Lsn(0x68));
1749 4 : walingest
1750 4 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1751 4 : .await?;
1752 4 : m.commit(&ctx).await?;
1753 4 : assert_eq!(
1754 4 : tline
1755 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
1756 4 : .await?,
1757 4 : 0
1758 4 : );
1759 4 :
1760 4 : // Extend from 0 to 2 blocks, leaving a gap
1761 4 : let mut m = tline.begin_modification(Lsn(0x70));
1762 4 : walingest
1763 4 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1764 4 : .await?;
1765 4 : m.commit(&ctx).await?;
1766 4 : assert_eq!(
1767 4 : tline
1768 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
1769 4 : .await?,
1770 4 : 2
1771 4 : );
1772 4 : assert_eq!(
1773 4 : tline
1774 4 : .get_rel_page_at_lsn(
1775 4 : TESTREL_A,
1776 4 : 0,
1777 4 : Version::Lsn(Lsn(0x70)),
1778 4 : &ctx,
1779 4 : io_concurrency.clone()
1780 4 : )
1781 4 : .instrument(test_span.clone())
1782 4 : .await?,
1783 4 : ZERO_PAGE
1784 4 : );
1785 4 : assert_eq!(
1786 4 : tline
1787 4 : .get_rel_page_at_lsn(
1788 4 : TESTREL_A,
1789 4 : 1,
1790 4 : Version::Lsn(Lsn(0x70)),
1791 4 : &ctx,
1792 4 : io_concurrency.clone()
1793 4 : )
1794 4 : .instrument(test_span.clone())
1795 4 : .await?,
1796 4 : test_img("foo blk 1")
1797 4 : );
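: // Block 0 was never re-written after the truncation to zero length: the gap
: // left by writing only block 1 is implicitly zero-filled, so it reads back
: // as ZERO_PAGE, matching how Postgres extends relations with zeroed pages.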
1798 4 :
1799 4 : // Extend a lot more, leaving a big gap that spans across segments
1800 4 : let mut m = tline.begin_modification(Lsn(0x80));
1801 4 : walingest
1802 4 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
1803 4 : .await?;
1804 4 : m.commit(&ctx).await?;
1805 4 : assert_eq!(
1806 4 : tline
1807 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1808 4 : .await?,
1809 4 : 1501
1810 4 : );
1811 5996 : for blk in 2..1500 {
1812 5992 : assert_eq!(
1813 5992 : tline
1814 5992 : .get_rel_page_at_lsn(
1815 5992 : TESTREL_A,
1816 5992 : blk,
1817 5992 : Version::Lsn(Lsn(0x80)),
1818 5992 : &ctx,
1819 5992 : io_concurrency.clone()
1820 5992 : )
1821 5992 : .instrument(test_span.clone())
1822 5992 : .await?,
1823 5992 : ZERO_PAGE
1824 4 : );
1825 4 : }
1826 4 : assert_eq!(
1827 4 : tline
1828 4 : .get_rel_page_at_lsn(
1829 4 : TESTREL_A,
1830 4 : 1500,
1831 4 : Version::Lsn(Lsn(0x80)),
1832 4 : &ctx,
1833 4 : io_concurrency.clone()
1834 4 : )
1835 4 : .instrument(test_span.clone())
1836 4 : .await?,
1837 4 : test_img("foo blk 1500")
1838 4 : );
1839 4 :
1840 4 : Ok(())
1841 4 : }
1842 :
1843 : // Test what happens if we drop a relation
1844 : // and then create it again within the same layer.
1845 : #[tokio::test]
1846 4 : async fn test_drop_extend() -> Result<()> {
1847 4 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
1848 4 : .await?
1849 4 : .load()
1850 4 : .await;
1851 4 : let tline = tenant
1852 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1853 4 : .await?;
1854 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1855 4 :
1856 4 : let mut m = tline.begin_modification(Lsn(0x20));
1857 4 : walingest
1858 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1859 4 : .await?;
1860 4 : m.commit(&ctx).await?;
1861 4 :
1862 4 : // Check that rel exists and size is correct
1863 4 : assert_eq!(
1864 4 : tline
1865 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1866 4 : .await?,
1867 4 : true
1868 4 : );
1869 4 : assert_eq!(
1870 4 : tline
1871 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1872 4 : .await?,
1873 4 : 1
1874 4 : );
1875 4 :
1876 4 : // Drop rel
1877 4 : let mut m = tline.begin_modification(Lsn(0x30));
1878 4 : let mut rel_drops = HashMap::new();
1879 4 : rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
1880 4 : m.put_rel_drops(rel_drops, &ctx).await?;
1881 4 : m.commit(&ctx).await?;
1882 4 :
1883 4 : // Check that rel is not visible anymore
1884 4 : assert_eq!(
1885 4 : tline
1886 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
1887 4 : .await?,
1888 4 : false
1889 4 : );
1890 4 :
1891 4 : // FIXME: should fail
1892 4 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1893 4 :
1894 4 : // Re-create it
1895 4 : let mut m = tline.begin_modification(Lsn(0x40));
1896 4 : walingest
1897 4 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
1898 4 : .await?;
1899 4 : m.commit(&ctx).await?;
1900 4 :
1901 4 : // Check that rel exists and size is correct
1902 4 : assert_eq!(
1903 4 : tline
1904 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
1905 4 : .await?,
1906 4 : true
1907 4 : );
1908 4 : assert_eq!(
1909 4 : tline
1910 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
1911 4 : .await?,
1912 4 : 1
1913 4 : );
1914 4 :
1915 4 : Ok(())
1916 4 : }
1917 :
1918 : // Test what happens if we truncate a relation
1919 : // so that one of its segments is dropped,
1920 : // and then extend it again within the same layer.
1921 : #[tokio::test]
1922 4 : async fn test_truncate_extend() -> Result<()> {
1923 4 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
1924 4 : .await?
1925 4 : .load()
1926 4 : .await;
1927 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1928 4 : let tline = tenant
1929 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1930 4 : .await?;
1931 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1932 4 :
1933 4 : // Create a 20 MB relation (the size is arbitrary)
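: // (20 * 1024 * 1024 / 8192 = 2560 pages of 8 KiB each)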
1934 4 : let relsize = 20 * 1024 * 1024 / 8192;
1935 4 : let mut m = tline.begin_modification(Lsn(0x20));
1936 10240 : for blkno in 0..relsize {
1937 10240 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1938 10240 : walingest
1939 10240 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
1940 10240 : .await?;
1941 4 : }
1942 4 : m.commit(&ctx).await?;
1943 4 :
1944 4 : let test_span = tracing::info_span!(parent: None, "test",
1945 4 : tenant_id=%tline.tenant_shard_id.tenant_id,
1946 0 : shard_id=%tline.tenant_shard_id.shard_slug(),
1947 0 : timeline_id=%tline.timeline_id);
1948 4 :
1949 4 : // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
1950 4 : assert_eq!(
1951 4 : tline
1952 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1953 4 : .await?,
1954 4 : false
1955 4 : );
1956 4 : assert!(tline
1957 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1958 4 : .await
1959 4 : .is_err());
1960 4 :
1961 4 : assert_eq!(
1962 4 : tline
1963 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1964 4 : .await?,
1965 4 : true
1966 4 : );
1967 4 : assert_eq!(
1968 4 : tline
1969 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1970 4 : .await?,
1971 4 : relsize
1972 4 : );
1973 4 :
1974 4 : // Check relation content
1975 10240 : for blkno in 0..relsize {
1976 10240 : let lsn = Lsn(0x20);
1977 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
1978 10240 : assert_eq!(
1979 10240 : tline
1980 10240 : .get_rel_page_at_lsn(
1981 10240 : TESTREL_A,
1982 10240 : blkno,
1983 10240 : Version::Lsn(lsn),
1984 10240 : &ctx,
1985 10240 : io_concurrency.clone()
1986 10240 : )
1987 10240 : .instrument(test_span.clone())
1988 10240 : .await?,
1989 10240 : test_img(&data)
1990 4 : );
1991 4 : }
1992 4 :
1993 4 : // Truncate the relation so that its second segment is dropped,
1994 4 : // leaving only one page.
1995 4 : let mut m = tline.begin_modification(Lsn(0x60));
1996 4 : walingest
1997 4 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
1998 4 : .await?;
1999 4 : m.commit(&ctx).await?;
2000 4 :
2001 4 : // Check reported size and contents after truncation
2002 4 : assert_eq!(
2003 4 : tline
2004 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2005 4 : .await?,
2006 4 : 1
2007 4 : );
2008 4 :
2009 8 : for blkno in 0..1 {
2010 4 : let lsn = Lsn(0x20);
2011 4 : let data = format!("foo blk {} at {}", blkno, lsn);
2012 4 : assert_eq!(
2013 4 : tline
2014 4 : .get_rel_page_at_lsn(
2015 4 : TESTREL_A,
2016 4 : blkno,
2017 4 : Version::Lsn(Lsn(0x60)),
2018 4 : &ctx,
2019 4 : io_concurrency.clone()
2020 4 : )
2021 4 : .instrument(test_span.clone())
2022 4 : .await?,
2023 4 : test_img(&data)
2024 4 : );
2025 4 : }
2026 4 :
2027 4 : // Should still see all blocks at the older LSN.
2028 4 : assert_eq!(
2029 4 : tline
2030 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2031 4 : .await?,
2032 4 : relsize
2033 4 : );
2034 10240 : for blkno in 0..relsize {
2035 10240 : let lsn = Lsn(0x20);
2036 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2037 10240 : assert_eq!(
2038 10240 : tline
2039 10240 : .get_rel_page_at_lsn(
2040 10240 : TESTREL_A,
2041 10240 : blkno,
2042 10240 : Version::Lsn(Lsn(0x50)),
2043 10240 : &ctx,
2044 10240 : io_concurrency.clone()
2045 10240 : )
2046 10240 : .instrument(test_span.clone())
2047 10240 : .await?,
2048 10240 : test_img(&data)
2049 4 : );
2050 4 : }
2051 4 :
2052 4 : // Extend the relation again,
2053 4 : // adding enough blocks to re-create the second segment.
2054 4 : let lsn = Lsn(0x80);
2055 4 : let mut m = tline.begin_modification(lsn);
2056 10240 : for blkno in 0..relsize {
2057 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2058 10240 : walingest
2059 10240 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2060 10240 : .await?;
2061 4 : }
2062 4 : m.commit(&ctx).await?;
2063 4 :
2064 4 : assert_eq!(
2065 4 : tline
2066 4 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2067 4 : .await?,
2068 4 : true
2069 4 : );
2070 4 : assert_eq!(
2071 4 : tline
2072 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2073 4 : .await?,
2074 4 : relsize
2075 4 : );
2076 4 : // Check relation content
2077 10240 : for blkno in 0..relsize {
2078 10240 : let lsn = Lsn(0x80);
2079 10240 : let data = format!("foo blk {} at {}", blkno, lsn);
2080 10240 : assert_eq!(
2081 10240 : tline
2082 10240 : .get_rel_page_at_lsn(
2083 10240 : TESTREL_A,
2084 10240 : blkno,
2085 10240 : Version::Lsn(Lsn(0x80)),
2086 10240 : &ctx,
2087 10240 : io_concurrency.clone()
2088 10240 : )
2089 10240 : .instrument(test_span.clone())
2090 10240 : .await?,
2091 10240 : test_img(&data)
2092 4 : );
2093 4 : }
2094 4 :
2095 4 : Ok(())
2096 4 : }
2097 :
2098 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2099 : /// split into multiple 1 GB segments in Postgres.
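: /// With the default 8 KiB block size, one segment holds
: /// RELSEG_SIZE = 1 GiB / 8 KiB = 131072 blocks, so RELSEG_SIZE + 1 blocks
: /// is the smallest relation that spills into a second segment.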
2100 : #[tokio::test]
2101 4 : async fn test_large_rel() -> Result<()> {
2102 4 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2103 4 : let tline = tenant
2104 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2105 4 : .await?;
2106 4 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2107 4 :
2108 4 : let mut lsn = 0x10;
2109 524292 : for blknum in 0..RELSEG_SIZE + 1 {
2110 524292 : lsn += 0x10;
2111 524292 : let mut m = tline.begin_modification(Lsn(lsn));
2112 524292 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2113 524292 : walingest
2114 524292 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2115 524292 : .await?;
2116 524292 : m.commit(&ctx).await?;
2117 4 : }
2118 4 :
2119 4 : assert_current_logical_size(&tline, Lsn(lsn));
2120 4 :
2121 4 : assert_eq!(
2122 4 : tline
2123 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2124 4 : .await?,
2125 4 : RELSEG_SIZE + 1
2126 4 : );
2127 4 :
2128 4 : // Truncate one block
2129 4 : lsn += 0x10;
2130 4 : let mut m = tline.begin_modification(Lsn(lsn));
2131 4 : walingest
2132 4 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2133 4 : .await?;
2134 4 : m.commit(&ctx).await?;
2135 4 : assert_eq!(
2136 4 : tline
2137 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2138 4 : .await?,
2139 4 : RELSEG_SIZE
2140 4 : );
2141 4 : assert_current_logical_size(&tline, Lsn(lsn));
2142 4 :
2143 4 : // Truncate another block
2144 4 : lsn += 0x10;
2145 4 : let mut m = tline.begin_modification(Lsn(lsn));
2146 4 : walingest
2147 4 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2148 4 : .await?;
2149 4 : m.commit(&ctx).await?;
2150 4 : assert_eq!(
2151 4 : tline
2152 4 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2153 4 : .await?,
2154 4 : RELSEG_SIZE - 1
2155 4 : );
2156 4 : assert_current_logical_size(&tline, Lsn(lsn));
2157 4 :
2158 4 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2159 4 : // This tests the behavior at segment boundaries
2160 4 : let mut size: i32 = 3000;
2161 12008 : while size >= 0 {
2162 12004 : lsn += 0x10;
2163 12004 : let mut m = tline.begin_modification(Lsn(lsn));
2164 12004 : walingest
2165 12004 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2166 12004 : .await?;
2167 12004 : m.commit(&ctx).await?;
2168 12004 : assert_eq!(
2169 12004 : tline
2170 12004 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2171 12004 : .await?,
2172 12004 : size as BlockNumber
2173 4 : );
2174 4 :
2175 12004 : size -= 1;
2176 4 : }
2177 4 : assert_current_logical_size(&tline, Lsn(lsn));
2178 4 :
2179 4 : Ok(())
2180 4 : }
2181 :
2182 : /// Replay a wal segment file taken directly from safekeepers.
2183 : ///
2184 : /// This test is useful for benchmarking since it allows us to profile only
2185 : /// the walingest code in a single-threaded executor, and iterate more quickly
2186 : /// without waiting for unrelated steps.
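: /// It can be run on its own with the usual cargo test name filter,
: /// e.g. `cargo test test_ingest_real_wal` (a release build profiles best).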
2187 : #[tokio::test]
2188 4 : async fn test_ingest_real_wal() {
2189 4 : use crate::tenant::harness::*;
2190 4 : use postgres_ffi::waldecoder::WalStreamDecoder;
2191 4 : use postgres_ffi::WAL_SEGMENT_SIZE;
2192 4 :
2193 4 : // Define test data path and constants.
2194 4 : //
2195 4 : // Steps to reconstruct the data, if needed:
2196 4 : // 1. Run the pgbench python test
2197 4 : // 2. Take the first wal segment file from safekeeper
2198 4 : // 3. Compress it using `zstd --long input_file`
2199 4 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2200 4 : // 5. Grep sk logs for "restart decoder" to get startpoint
2201 4 : // 6. Run just the decoder from this test to get the endpoint.
2202 4 : // It's the last LSN the decoder will output.
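: //
: // A minimal sketch of step 6, reusing the decoder API from the loop below
: // (feed the whole segment once and keep the last LSN that decodes):
: //
: // let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
: // decoder.feed_bytes(&bytes[xlogoff..]);
: // let mut endpoint = startpoint;
: // while let Some((lsn, _rec)) = decoder.poll_decode().unwrap() {
: // endpoint = lsn;
: // }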
2203 4 : let pg_version = 15; // The test data was generated by pg15
2204 4 : let path = "test_data/sk_wal_segment_from_pgbench";
2205 4 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2206 4 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2207 4 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2208 4 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2209 4 :
2210 4 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2211 4 : let span = harness
2212 4 : .span()
2213 4 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2214 4 : let (tenant, ctx) = harness.load().await;
2215 4 :
2216 4 : let remote_initdb_path =
2217 4 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2218 4 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2219 4 :
2220 4 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2221 4 : .expect("creating test dir should work");
2222 4 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2223 4 :
2224 4 : // Bootstrap a real timeline. We can't use create_test_timeline because
2225 4 : // it doesn't create a real checkpoint, and WalIngest::new tries to parse
2226 4 : // the garbage data.
2227 4 : let tline = tenant
2228 4 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2229 4 : .await
2230 4 : .unwrap();
2231 4 :
2232 4 : // We fully read and decompress this into memory before decoding
2233 4 : // to get a more accurate perf profile of the decoder.
2234 4 : let bytes = {
2235 4 : use async_compression::tokio::bufread::ZstdDecoder;
2236 4 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2237 4 : let reader = tokio::io::BufReader::new(file);
2238 4 : let decoder = ZstdDecoder::new(reader);
2239 4 : let mut reader = tokio::io::BufReader::new(decoder);
2240 4 : let mut buffer = Vec::new();
2241 4 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2242 4 : buffer
2243 4 : };
2244 4 :
2245 4 : // TODO start a profiler too
2246 4 : let started_at = std::time::Instant::now();
2247 4 :
2248 4 : // Initialize walingest
2249 4 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2250 4 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2251 4 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2252 4 : .await
2253 4 : .unwrap();
2254 4 : let mut modification = tline.begin_modification(startpoint);
2255 4 : println!("decoding {} bytes", bytes.len() - xlogoff);
2256 4 :
2257 4 : // Decode and ingest wal. We process the wal in chunks because
2258 4 : // that's what happens when we get bytes from safekeepers.
2259 949372 : for chunk in bytes[xlogoff..].chunks(50) {
2260 949372 : decoder.feed_bytes(chunk);
2261 1241072 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2262 291700 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
2263 291700 : recdata,
2264 291700 : &[*modification.tline.get_shard_identity()],
2265 291700 : lsn,
2266 291700 : modification.tline.pg_version,
2267 291700 : )
2268 291700 : .unwrap()
2269 291700 : .remove(modification.tline.get_shard_identity())
2270 291700 : .unwrap();
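: // from_bytes_filtered produces one interpreted record per requested shard;
: // with a single shard identity we just take our own entry back out of the map.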
2271 291700 :
2272 291700 : walingest
2273 291700 : .ingest_record(interpreted, &mut modification, &ctx)
2274 291700 : .instrument(span.clone())
2275 291700 : .await
2276 291700 : .unwrap();
2277 4 : }
2278 949372 : modification.commit(&ctx).await.unwrap();
2279 4 : }
2280 4 :
2281 4 : let duration = started_at.elapsed();
2282 4 : println!("done in {:?}", duration);
2283 4 : }
2284 : }