Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or a WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but most it stores as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! will call Repository::put_wal_record or put_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoken Rust code.
23 :
24 : use std::time::Duration;
25 : use std::time::SystemTime;
26 :
27 : use pageserver_api::shard::ShardIdentity;
28 : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
29 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
30 :
31 : use anyhow::{bail, Context, Result};
32 : use bytes::{Buf, Bytes, BytesMut};
33 : use tracing::*;
34 : use utils::failpoint_support;
35 : use utils::rate_limit::RateLimit;
36 :
37 : use crate::context::RequestContext;
38 : use crate::metrics::WAL_INGEST;
39 : use crate::pgdatadir_mapping::{DatadirModification, Version};
40 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
41 : use crate::tenant::PageReconstructError;
42 : use crate::tenant::Timeline;
43 : use crate::walrecord::*;
44 : use crate::ZERO_PAGE;
45 : use pageserver_api::key::rel_block_to_key;
46 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
47 : use postgres_ffi::pg_constants;
48 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
49 : use postgres_ffi::TransactionId;
50 : use postgres_ffi::BLCKSZ;
51 : use utils::bin_ser::SerializeError;
52 : use utils::lsn::Lsn;
53 :
54 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
55 :
56 : impl CheckPoint {
57 18 : fn encode(&self) -> Result<Bytes, SerializeError> {
58 18 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
59 18 : }
60 :
61 437502 : fn update_next_xid(&mut self, xid: u32) -> bool {
62 437502 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
63 437502 : }
64 :
65 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
66 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
67 0 : cp.update_next_multixid(multi_xid, multi_offset)
68 : })
69 0 : }
70 : }
71 :
72 : pub struct WalIngest {
73 : shard: ShardIdentity,
74 : checkpoint: CheckPoint,
75 : checkpoint_modified: bool,
76 : warn_ingest_lag: WarnIngestLag,
77 : }
78 :
79 : struct WarnIngestLag {
80 : lag_msg_ratelimit: RateLimit,
81 : future_lsn_msg_ratelimit: RateLimit,
82 : timestamp_invalid_msg_ratelimit: RateLimit,
83 : }
84 :
85 : impl WalIngest {
86 36 : pub async fn new(
87 36 : timeline: &Timeline,
88 36 : startpoint: Lsn,
89 36 : ctx: &RequestContext,
90 36 : ) -> anyhow::Result<WalIngest> {
91 : // Fetch the latest checkpoint into memory, so that we can compare with it
92 : // quickly in `ingest_record` and update it when it changes.
93 36 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
94 36 : let pgversion = timeline.pg_version;
95 :
96 36 : let checkpoint = dispatch_pgversion!(pgversion, {
97 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
98 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
99 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
100 : });
101 :
102 36 : Ok(WalIngest {
103 36 : shard: *timeline.get_shard_identity(),
104 36 : checkpoint,
105 36 : checkpoint_modified: false,
106 36 : warn_ingest_lag: WarnIngestLag {
107 36 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
108 36 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
109 36 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
110 36 : },
111 36 : })
112 36 : }
113 :
114 : ///
115 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
116 : ///
117 : /// This function updates `lsn` field of `DatadirModification`
118 : ///
119 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
120 : /// relations/pages that the record affects.
121 : ///
122 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
123 : ///
124 437556 : pub async fn ingest_record(
125 437556 : &mut self,
126 437556 : decoded: DecodedWALRecord,
127 437556 : lsn: Lsn,
128 437556 : modification: &mut DatadirModification<'_>,
129 437556 : ctx: &RequestContext,
130 437556 : ) -> anyhow::Result<bool> {
131 437556 : WAL_INGEST.records_received.inc();
132 437556 : let pg_version = modification.tline.pg_version;
133 437556 : let prev_len = modification.len();
134 437556 :
135 437556 : modification.set_lsn(lsn)?;
136 :
137 437556 : if decoded.is_dbase_create_copy(pg_version) {
138 : // Records of this type should always be preceded by a commit(), as they
139 : // rely on reading data pages back from the Timeline.
140 0 : assert!(!modification.has_dirty_data_pages());
141 437556 : }
142 :
143 437556 : let mut buf = decoded.record.clone();
144 437556 : buf.advance(decoded.main_data_offset);
145 437556 :
146 437556 : assert!(!self.checkpoint_modified);
147 437556 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
148 437502 : && self.checkpoint.update_next_xid(decoded.xl_xid)
149 6 : {
150 6 : self.checkpoint_modified = true;
151 437550 : }
152 :
153 437556 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
154 :
155 437556 : match decoded.xl_rmid {
156 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
157 : // Heap AM records need some special handling, because they modify VM pages
158 : // without registering them with the standard mechanism.
159 436422 : self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
160 0 : .await?;
161 : }
162 : pg_constants::RM_NEON_ID => {
163 0 : self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
164 0 : .await?;
165 : }
166 : // Handle other special record types
167 : pg_constants::RM_SMGR_ID => {
168 48 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
169 48 :
170 48 : if info == pg_constants::XLOG_SMGR_CREATE {
171 48 : let create = XlSmgrCreate::decode(&mut buf);
172 48 : self.ingest_xlog_smgr_create(modification, &create, ctx)
173 36 : .await?;
174 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
175 0 : let truncate = XlSmgrTruncate::decode(&mut buf);
176 0 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
177 0 : .await?;
178 0 : }
179 : }
180 : pg_constants::RM_DBASE_ID => {
181 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
182 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
183 :
184 0 : if pg_version == 14 {
185 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
186 0 : let createdb = XlCreateDatabase::decode(&mut buf);
187 0 : debug!("XLOG_DBASE_CREATE v14");
188 :
189 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
190 0 : .await?;
191 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
192 0 : let dropdb = XlDropDatabase::decode(&mut buf);
193 0 : for tablespace_id in dropdb.tablespace_ids {
194 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
195 0 : modification
196 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
197 0 : .await?;
198 : }
199 0 : }
200 0 : } else if pg_version == 15 {
201 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
202 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
203 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
204 : // The XLOG record was renamed between v14 and v15,
205 : // but the record format is the same.
206 : // So we can reuse XlCreateDatabase here.
207 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
208 0 : let createdb = XlCreateDatabase::decode(&mut buf);
209 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
210 0 : .await?;
211 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
212 0 : let dropdb = XlDropDatabase::decode(&mut buf);
213 0 : for tablespace_id in dropdb.tablespace_ids {
214 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
215 0 : modification
216 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
217 0 : .await?;
218 : }
219 0 : }
220 0 : } else if pg_version == 16 {
221 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
222 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
223 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
224 : // The XLOG record was renamed between v14 and v15,
225 : // but the record format is the same.
226 : // So we can reuse XlCreateDatabase here.
227 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
228 0 : let createdb = XlCreateDatabase::decode(&mut buf);
229 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
230 0 : .await?;
231 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
232 0 : let dropdb = XlDropDatabase::decode(&mut buf);
233 0 : for tablespace_id in dropdb.tablespace_ids {
234 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
235 0 : modification
236 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
237 0 : .await?;
238 : }
239 0 : }
240 0 : } else if pg_version == 17 {
241 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
242 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
243 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
244 : // The XLOG record was renamed between v14 and v15,
245 : // but the record format is the same.
246 : // So we can reuse XlCreateDatabase here.
247 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
248 0 : let createdb = XlCreateDatabase::decode(&mut buf);
249 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
250 0 : .await?;
251 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
252 0 : let dropdb = XlDropDatabase::decode(&mut buf);
253 0 : for tablespace_id in dropdb.tablespace_ids {
254 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
255 0 : modification
256 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
257 0 : .await?;
258 : }
259 0 : }
260 0 : }
261 : }
262 : pg_constants::RM_TBLSPC_ID => {
263 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
264 : }
265 : pg_constants::RM_CLOG_ID => {
266 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
267 0 :
268 0 : if info == pg_constants::CLOG_ZEROPAGE {
269 0 : let pageno = if pg_version < 17 {
270 0 : buf.get_u32_le()
271 : } else {
272 0 : buf.get_u64_le() as u32
273 : };
274 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
275 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
276 0 : self.put_slru_page_image(
277 0 : modification,
278 0 : SlruKind::Clog,
279 0 : segno,
280 0 : rpageno,
281 0 : ZERO_PAGE.clone(),
282 0 : ctx,
283 0 : )
284 0 : .await?;
285 : } else {
286 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
287 0 : let xlrec = XlClogTruncate::decode(&mut buf, pg_version);
288 0 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
289 0 : .await?;
290 : }
291 : }
292 : pg_constants::RM_XACT_ID => {
293 72 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
294 72 :
295 72 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
296 24 : let parsed_xact =
297 24 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
298 24 : self.ingest_xact_record(
299 24 : modification,
300 24 : &parsed_xact,
301 24 : info == pg_constants::XLOG_XACT_COMMIT,
302 24 : decoded.origin_id,
303 24 : ctx,
304 24 : )
305 0 : .await?;
306 48 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
307 48 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
308 : {
309 0 : let parsed_xact =
310 0 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
311 0 : self.ingest_xact_record(
312 0 : modification,
313 0 : &parsed_xact,
314 0 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
315 0 : decoded.origin_id,
316 0 : ctx,
317 0 : )
318 0 : .await?;
319 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
320 0 : trace!(
321 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
322 : decoded.xl_xid,
323 : parsed_xact.xid,
324 : lsn,
325 : );
326 :
327 0 : let xid: u64 = if pg_version >= 17 {
328 0 : self.adjust_to_full_transaction_id(parsed_xact.xid)?
329 : } else {
330 0 : parsed_xact.xid as u64
331 : };
332 0 : modification.drop_twophase_file(xid, ctx).await?;
333 48 : } else if info == pg_constants::XLOG_XACT_PREPARE {
334 0 : let xid: u64 = if pg_version >= 17 {
335 0 : self.adjust_to_full_transaction_id(decoded.xl_xid)?
336 : } else {
337 0 : decoded.xl_xid as u64
338 : };
339 0 : modification
340 0 : .put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx)
341 0 : .await?;
342 48 : }
343 : }
344 : pg_constants::RM_MULTIXACT_ID => {
345 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
346 0 :
347 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
348 0 : let pageno = if pg_version < 17 {
349 0 : buf.get_u32_le()
350 : } else {
351 0 : buf.get_u64_le() as u32
352 : };
353 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
354 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
355 0 : self.put_slru_page_image(
356 0 : modification,
357 0 : SlruKind::MultiXactOffsets,
358 0 : segno,
359 0 : rpageno,
360 0 : ZERO_PAGE.clone(),
361 0 : ctx,
362 0 : )
363 0 : .await?;
364 0 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
365 0 : let pageno = if pg_version < 17 {
366 0 : buf.get_u32_le()
367 : } else {
368 0 : buf.get_u64_le() as u32
369 : };
370 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
371 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
372 0 : self.put_slru_page_image(
373 0 : modification,
374 0 : SlruKind::MultiXactMembers,
375 0 : segno,
376 0 : rpageno,
377 0 : ZERO_PAGE.clone(),
378 0 : ctx,
379 0 : )
380 0 : .await?;
381 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
382 0 : let xlrec = XlMultiXactCreate::decode(&mut buf);
383 0 : self.ingest_multixact_create_record(modification, &xlrec)?;
384 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
385 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
386 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
387 0 : .await?;
388 0 : }
389 : }
390 : pg_constants::RM_RELMAP_ID => {
391 0 : let xlrec = XlRelmapUpdate::decode(&mut buf);
392 0 : self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
393 0 : .await?;
394 : }
395 : pg_constants::RM_XLOG_ID => {
396 90 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
397 90 :
398 90 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
399 6 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
400 0 : let rec = v17::XlParameterChange::decode(&mut buf);
401 0 : cp.wal_level = rec.wal_level;
402 0 : self.checkpoint_modified = true;
403 6 : }
404 84 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
405 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
406 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
407 0 : cp.wal_level = rec.wal_level;
408 0 : self.checkpoint_modified = true;
409 0 : }
410 84 : }
411 :
412 90 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
413 0 : if info == pg_constants::XLOG_NEXTOID {
414 0 : let next_oid = buf.get_u32_le();
415 0 : if cp.nextOid != next_oid {
416 0 : cp.nextOid = next_oid;
417 0 : self.checkpoint_modified = true;
418 0 : }
419 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
420 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
421 : {
422 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
423 0 : buf.copy_to_slice(&mut checkpoint_bytes);
424 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
425 0 : trace!(
426 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
427 : xlog_checkpoint.oldestXid,
428 : cp.oldestXid
429 : );
430 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
431 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
432 0 : }
433 0 : trace!(
434 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
435 : xlog_checkpoint.oldestActiveXid,
436 : cp.oldestActiveXid
437 : );
438 :
439 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
440 : // because at shutdown, all in-progress transactions will implicitly
441 : // end. Postgres startup code knows that, and allows hot standby to start
442 : // immediately from a shutdown checkpoint.
443 : //
444 : // In Neon, Postgres hot standby startup always behaves as if starting from
445 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
446 : // instead of overwriting self.checkpoint.oldestActiveXid with
447 : // InvalidTransactionid from the checkpoint WAL record, update it to a
448 : // proper value, knowing that there are no in-progress transactions at this
449 : // point, except for prepared transactions.
450 : //
451 : // See also the neon code changes in the InitWalRecovery() function.
452 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
453 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
454 : {
455 0 : let oldest_active_xid = if pg_version >= 17 {
456 0 : let mut oldest_active_full_xid = cp.nextXid.value;
457 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
458 0 : if xid < oldest_active_full_xid {
459 0 : oldest_active_full_xid = xid;
460 0 : }
461 : }
462 0 : oldest_active_full_xid as u32
463 : } else {
464 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
465 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
466 0 : let narrow_xid = xid as u32;
467 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
468 0 : oldest_active_xid = narrow_xid;
469 0 : }
470 : }
471 0 : oldest_active_xid
472 : };
473 0 : cp.oldestActiveXid = oldest_active_xid;
474 0 : } else {
475 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
476 0 : }
477 :
478 : // Write a new checkpoint key-value pair on every checkpoint record, even
479 : // if nothing really changed. Not strictly required, but it seems nice to
480 : // have some trace of the checkpoint records in the layer files at the same
481 : // LSNs.
482 0 : self.checkpoint_modified = true;
483 0 : }
484 : });
485 : }
486 : pg_constants::RM_LOGICALMSG_ID => {
487 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
488 0 :
489 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
490 0 : let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
491 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
492 0 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
493 0 : if prefix == "neon-test" {
494 : // This is a convenient way to make the WAL ingestion pause at
495 : // particular point in the WAL. For more fine-grained control,
496 : // we could peek into the message and only pause if it contains
497 : // a particular string, for example, but this is enough for now.
498 0 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
499 0 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
500 0 : modification.put_file(path, message, ctx).await?;
501 0 : }
502 0 : }
503 : }
504 : pg_constants::RM_STANDBY_ID => {
505 48 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
506 48 : if info == pg_constants::XLOG_RUNNING_XACTS {
507 0 : let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
508 :
509 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
510 0 : cp.oldestActiveXid = xlrec.oldest_running_xid;
511 0 : });
512 :
513 0 : self.checkpoint_modified = true;
514 48 : }
515 : }
516 : pg_constants::RM_REPLORIGIN_ID => {
517 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
518 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
519 0 : let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
520 0 : modification
521 0 : .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
522 0 : .await?
523 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
524 0 : let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
525 0 : modification.drop_replorigin(xlrec.node_id).await?
526 0 : }
527 : }
528 876 : _x => {
529 876 : // TODO: should probably log & fail here instead of blindly
530 876 : // doing something without understanding the protocol
531 876 : }
532 : }
533 :
534 : // Iterate through all the blocks that the record modifies, and
535 : // "put" a separate copy of the record for each block.
536 437556 : for blk in decoded.blocks.iter() {
537 436926 : let rel = RelTag {
538 436926 : spcnode: blk.rnode_spcnode,
539 436926 : dbnode: blk.rnode_dbnode,
540 436926 : relnode: blk.rnode_relnode,
541 436926 : forknum: blk.forknum,
542 436926 : };
543 436926 :
544 436926 : let key = rel_block_to_key(rel, blk.blkno);
545 436926 : let key_is_local = self.shard.is_key_local(&key);
546 436926 :
547 436926 : tracing::debug!(
548 : lsn=%lsn,
549 : key=%key,
550 0 : "ingest: shard decision {} (checkpoint={})",
551 0 : if !key_is_local { "drop" } else { "keep" },
552 : self.checkpoint_modified
553 : );
554 :
555 436926 : if !key_is_local {
556 0 : if self.shard.is_shard_zero() {
557 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
558 : // its blkno in case it implicitly extends a relation.
559 0 : self.observe_decoded_block(modification, blk, ctx).await?;
560 0 : }
561 :
562 0 : continue;
563 436926 : }
564 436926 : self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
565 852 : .await?;
566 : }
567 :
568 : // If checkpoint data was updated, store the new version in the repository
569 437556 : if self.checkpoint_modified {
570 18 : let new_checkpoint_bytes = self.checkpoint.encode()?;
571 :
572 18 : modification.put_checkpoint(new_checkpoint_bytes)?;
573 18 : self.checkpoint_modified = false;
574 437538 : }
575 :
576 : // Note that at this point this record is only cached in the modification
577 : // until commit() is called to flush the data into the repository and update
578 : // the latest LSN.
579 :
580 437556 : modification.on_record_end();
581 437556 :
582 437556 : Ok(modification.len() > prev_len)
583 437556 : }
584 :
585 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
586 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
587 0 : let next_full_xid =
588 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
589 :
590 0 : let next_xid = (next_full_xid) as u32;
591 0 : let mut epoch = (next_full_xid >> 32) as u32;
592 0 :
593 0 : if xid > next_xid {
594 : // Wraparound occurred, must be from a prev epoch.
595 0 : if epoch == 0 {
596 0 : bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
597 0 : }
598 0 : epoch -= 1;
599 0 : }
600 :
601 0 : Ok((epoch as u64) << 32 | xid as u64)
602 0 : }
603 :
604 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
605 0 : async fn observe_decoded_block(
606 0 : &mut self,
607 0 : modification: &mut DatadirModification<'_>,
608 0 : blk: &DecodedBkpBlock,
609 0 : ctx: &RequestContext,
610 0 : ) -> Result<(), PageReconstructError> {
611 0 : let rel = RelTag {
612 0 : spcnode: blk.rnode_spcnode,
613 0 : dbnode: blk.rnode_dbnode,
614 0 : relnode: blk.rnode_relnode,
615 0 : forknum: blk.forknum,
616 0 : };
617 0 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
618 0 : .await
619 0 : }
620 :
621 436926 : async fn ingest_decoded_block(
622 436926 : &mut self,
623 436926 : modification: &mut DatadirModification<'_>,
624 436926 : lsn: Lsn,
625 436926 : decoded: &DecodedWALRecord,
626 436926 : blk: &DecodedBkpBlock,
627 436926 : ctx: &RequestContext,
628 436926 : ) -> Result<(), PageReconstructError> {
629 436926 : let rel = RelTag {
630 436926 : spcnode: blk.rnode_spcnode,
631 436926 : dbnode: blk.rnode_dbnode,
632 436926 : relnode: blk.rnode_relnode,
633 436926 : forknum: blk.forknum,
634 436926 : };
635 436926 :
636 436926 : //
637 436926 : // Instead of storing full-page-image WAL record,
638 436926 : // it is better to store extracted image: we can skip wal-redo
639 436926 : // in this case. Also some FPI records may contain multiple (up to 32) pages,
640 436926 : // so them have to be copied multiple times.
641 436926 : //
642 436926 : if blk.apply_image
643 180 : && blk.has_image
644 180 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
645 72 : && (decoded.xl_info == pg_constants::XLOG_FPI
646 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
647 : // compression of WAL is not yet supported: fall back to storing the original WAL record
648 72 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
649 : // do not materialize null pages because them most likely be soon replaced with real data
650 72 : && blk.bimg_len != 0
651 : {
652 : // Extract page image from FPI record
653 72 : let img_len = blk.bimg_len as usize;
654 72 : let img_offs = blk.bimg_offset as usize;
655 72 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
656 72 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
657 72 :
658 72 : if blk.hole_length != 0 {
659 0 : let tail = image.split_off(blk.hole_offset as usize);
660 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
661 0 : image.unsplit(tail);
662 72 : }
663 : //
664 : // Match the logic of XLogReadBufferForRedoExtended:
665 : // The page may be uninitialized. If so, we can't set the LSN because
666 : // that would corrupt the page.
667 : //
668 72 : if !page_is_new(&image) {
669 54 : page_set_lsn(&mut image, lsn)
670 18 : }
671 72 : assert_eq!(image.len(), BLCKSZ as usize);
672 :
673 72 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
674 15 : .await?;
675 : } else {
676 436854 : let rec = NeonWalRecord::Postgres {
677 436854 : will_init: blk.will_init || blk.apply_image,
678 436854 : rec: decoded.record.clone(),
679 436854 : };
680 436854 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
681 837 : .await?;
682 : }
683 436926 : Ok(())
684 436926 : }
685 :
686 436422 : async fn ingest_heapam_record(
687 436422 : &mut self,
688 436422 : buf: &mut Bytes,
689 436422 : modification: &mut DatadirModification<'_>,
690 436422 : decoded: &DecodedWALRecord,
691 436422 : ctx: &RequestContext,
692 436422 : ) -> anyhow::Result<()> {
693 436422 : // Handle VM bit updates that are implicitly part of heap records.
694 436422 :
695 436422 : // First, look at the record to determine which VM bits need
696 436422 : // to be cleared. If either of these variables is set, we
697 436422 : // need to clear the corresponding bits in the visibility map.
698 436422 : let mut new_heap_blkno: Option<u32> = None;
699 436422 : let mut old_heap_blkno: Option<u32> = None;
700 436422 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
701 436422 :
702 436422 : match modification.tline.pg_version {
703 : 14 => {
704 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
705 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
706 0 :
707 0 : if info == pg_constants::XLOG_HEAP_INSERT {
708 0 : let xlrec = v14::XlHeapInsert::decode(buf);
709 0 : assert_eq!(0, buf.remaining());
710 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
711 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
712 0 : }
713 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
714 0 : let xlrec = v14::XlHeapDelete::decode(buf);
715 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
716 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
717 0 : }
718 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
719 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
720 : {
721 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
722 0 : // the size of tuple data is inferred from the size of the record.
723 0 : // we can't validate the remaining number of bytes without parsing
724 0 : // the tuple data.
725 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
726 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
727 0 : }
728 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
729 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
730 0 : // non-HOT update where the new tuple goes to different page than
731 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
732 0 : // set.
733 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
734 0 : }
735 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
736 0 : let xlrec = v14::XlHeapLock::decode(buf);
737 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
738 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
739 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
740 0 : }
741 0 : }
742 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
743 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
744 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
745 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
746 :
747 0 : let offset_array_len =
748 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
749 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
750 0 : 0
751 : } else {
752 0 : size_of::<u16>() * xlrec.ntuples as usize
753 : };
754 0 : assert_eq!(offset_array_len, buf.remaining());
755 :
756 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
757 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
758 0 : }
759 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
760 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
761 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
762 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
763 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
764 0 : }
765 0 : }
766 : } else {
767 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
768 : }
769 : }
770 : 15 => {
771 436422 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
772 435858 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
773 435858 :
774 435858 : if info == pg_constants::XLOG_HEAP_INSERT {
775 435828 : let xlrec = v15::XlHeapInsert::decode(buf);
776 435828 : assert_eq!(0, buf.remaining());
777 435828 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
778 12 : new_heap_blkno = Some(decoded.blocks[0].blkno);
779 435816 : }
780 30 : } else if info == pg_constants::XLOG_HEAP_DELETE {
781 0 : let xlrec = v15::XlHeapDelete::decode(buf);
782 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
783 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
784 0 : }
785 30 : } else if info == pg_constants::XLOG_HEAP_UPDATE
786 6 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
787 : {
788 24 : let xlrec = v15::XlHeapUpdate::decode(buf);
789 24 : // the size of tuple data is inferred from the size of the record.
790 24 : // we can't validate the remaining number of bytes without parsing
791 24 : // the tuple data.
792 24 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
793 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
794 24 : }
795 24 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
796 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
797 0 : // non-HOT update where the new tuple goes to different page than
798 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
799 0 : // set.
800 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
801 24 : }
802 6 : } else if info == pg_constants::XLOG_HEAP_LOCK {
803 0 : let xlrec = v15::XlHeapLock::decode(buf);
804 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
805 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
806 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
807 0 : }
808 6 : }
809 564 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
810 564 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
811 564 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
812 126 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
813 :
814 126 : let offset_array_len =
815 126 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
816 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
817 6 : 0
818 : } else {
819 120 : size_of::<u16>() * xlrec.ntuples as usize
820 : };
821 126 : assert_eq!(offset_array_len, buf.remaining());
822 :
823 126 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
824 24 : new_heap_blkno = Some(decoded.blocks[0].blkno);
825 102 : }
826 438 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
827 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
828 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
829 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
830 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
831 0 : }
832 438 : }
833 : } else {
834 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
835 : }
836 : }
837 : 16 => {
838 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
839 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
840 0 :
841 0 : if info == pg_constants::XLOG_HEAP_INSERT {
842 0 : let xlrec = v16::XlHeapInsert::decode(buf);
843 0 : assert_eq!(0, buf.remaining());
844 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
845 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
846 0 : }
847 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
848 0 : let xlrec = v16::XlHeapDelete::decode(buf);
849 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
850 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
851 0 : }
852 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
853 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
854 : {
855 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
856 0 : // the size of tuple data is inferred from the size of the record.
857 0 : // we can't validate the remaining number of bytes without parsing
858 0 : // the tuple data.
859 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
860 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
861 0 : }
862 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
863 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
864 0 : // non-HOT update where the new tuple goes to different page than
865 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
866 0 : // set.
867 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
868 0 : }
869 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
870 0 : let xlrec = v16::XlHeapLock::decode(buf);
871 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
872 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
873 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
874 0 : }
875 0 : }
876 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
877 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
878 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
879 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
880 :
881 0 : let offset_array_len =
882 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
883 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
884 0 : 0
885 : } else {
886 0 : size_of::<u16>() * xlrec.ntuples as usize
887 : };
888 0 : assert_eq!(offset_array_len, buf.remaining());
889 :
890 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
891 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
892 0 : }
893 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
894 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
895 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
896 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
897 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
898 0 : }
899 0 : }
900 : } else {
901 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
902 : }
903 : }
904 : 17 => {
905 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
906 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
907 0 :
908 0 : if info == pg_constants::XLOG_HEAP_INSERT {
909 0 : let xlrec = v17::XlHeapInsert::decode(buf);
910 0 : assert_eq!(0, buf.remaining());
911 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
912 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
913 0 : }
914 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
915 0 : let xlrec = v17::XlHeapDelete::decode(buf);
916 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
917 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
918 0 : }
919 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
920 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
921 : {
922 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
923 0 : // the size of tuple data is inferred from the size of the record.
924 0 : // we can't validate the remaining number of bytes without parsing
925 0 : // the tuple data.
926 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
927 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
928 0 : }
929 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
930 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
931 0 : // non-HOT update where the new tuple goes to different page than
932 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
933 0 : // set.
934 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
935 0 : }
936 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
937 0 : let xlrec = v17::XlHeapLock::decode(buf);
938 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
939 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
940 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
941 0 : }
942 0 : }
943 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
944 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
945 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
946 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
947 :
948 0 : let offset_array_len =
949 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
950 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
951 0 : 0
952 : } else {
953 0 : size_of::<u16>() * xlrec.ntuples as usize
954 : };
955 0 : assert_eq!(offset_array_len, buf.remaining());
956 :
957 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
958 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
959 0 : }
960 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
961 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
962 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
963 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
964 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
965 0 : }
966 0 : }
967 : } else {
968 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
969 : }
970 : }
971 0 : _ => {}
972 : }
973 :
974 : // Clear the VM bits if required.
975 436422 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
976 36 : let vm_rel = RelTag {
977 36 : forknum: VISIBILITYMAP_FORKNUM,
978 36 : spcnode: decoded.blocks[0].rnode_spcnode,
979 36 : dbnode: decoded.blocks[0].rnode_dbnode,
980 36 : relnode: decoded.blocks[0].rnode_relnode,
981 36 : };
982 36 :
983 36 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
984 36 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
985 :
986 : // Sometimes, Postgres seems to create heap WAL records with the
987 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
988 : // not set. In fact, it's possible that the VM page does not exist at all.
989 : // In that case, we don't want to store a record to clear the VM bit;
990 : // replaying it would fail to find the previous image of the page, because
991 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
992 : // record if it doesn't.
993 36 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
994 36 : if let Some(blknum) = new_vm_blk {
995 36 : if blknum >= vm_size {
996 0 : new_vm_blk = None;
997 36 : }
998 0 : }
999 36 : if let Some(blknum) = old_vm_blk {
1000 0 : if blknum >= vm_size {
1001 0 : old_vm_blk = None;
1002 0 : }
1003 36 : }
1004 :
1005 36 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
1006 36 : if new_vm_blk == old_vm_blk {
1007 : // An UPDATE record that needs to clear the bits for both old and the
1008 : // new page, both of which reside on the same VM page.
1009 0 : self.put_rel_wal_record(
1010 0 : modification,
1011 0 : vm_rel,
1012 0 : new_vm_blk.unwrap(),
1013 0 : NeonWalRecord::ClearVisibilityMapFlags {
1014 0 : new_heap_blkno,
1015 0 : old_heap_blkno,
1016 0 : flags,
1017 0 : },
1018 0 : ctx,
1019 0 : )
1020 0 : .await?;
1021 : } else {
1022 : // Clear VM bits for one heap page, or for two pages that reside on
1023 : // different VM pages.
1024 36 : if let Some(new_vm_blk) = new_vm_blk {
1025 36 : self.put_rel_wal_record(
1026 36 : modification,
1027 36 : vm_rel,
1028 36 : new_vm_blk,
1029 36 : NeonWalRecord::ClearVisibilityMapFlags {
1030 36 : new_heap_blkno,
1031 36 : old_heap_blkno: None,
1032 36 : flags,
1033 36 : },
1034 36 : ctx,
1035 36 : )
1036 0 : .await?;
1037 0 : }
1038 36 : if let Some(old_vm_blk) = old_vm_blk {
1039 0 : self.put_rel_wal_record(
1040 0 : modification,
1041 0 : vm_rel,
1042 0 : old_vm_blk,
1043 0 : NeonWalRecord::ClearVisibilityMapFlags {
1044 0 : new_heap_blkno: None,
1045 0 : old_heap_blkno,
1046 0 : flags,
1047 0 : },
1048 0 : ctx,
1049 0 : )
1050 0 : .await?;
1051 36 : }
1052 : }
1053 0 : }
1054 436386 : }
1055 :
1056 436422 : Ok(())
1057 436422 : }
1058 :
1059 0 : async fn ingest_neonrmgr_record(
1060 0 : &mut self,
1061 0 : buf: &mut Bytes,
1062 0 : modification: &mut DatadirModification<'_>,
1063 0 : decoded: &DecodedWALRecord,
1064 0 : ctx: &RequestContext,
1065 0 : ) -> anyhow::Result<()> {
1066 0 : // Handle VM bit updates that are implicitly part of heap records.
1067 0 :
1068 0 : // First, look at the record to determine which VM bits need
1069 0 : // to be cleared. If either of these variables is set, we
1070 0 : // need to clear the corresponding bits in the visibility map.
1071 0 : let mut new_heap_blkno: Option<u32> = None;
1072 0 : let mut old_heap_blkno: Option<u32> = None;
1073 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
1074 0 : let pg_version = modification.tline.pg_version;
1075 0 :
1076 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
1077 :
1078 0 : match pg_version {
1079 : 16 | 17 => {
1080 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1081 0 :
1082 0 : match info {
1083 : pg_constants::XLOG_NEON_HEAP_INSERT => {
1084 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
1085 0 : assert_eq!(0, buf.remaining());
1086 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1087 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1088 0 : }
1089 : }
1090 : pg_constants::XLOG_NEON_HEAP_DELETE => {
1091 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
1092 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
1093 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1094 0 : }
1095 : }
1096 : pg_constants::XLOG_NEON_HEAP_UPDATE
1097 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
1098 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
1099 0 : // the size of tuple data is inferred from the size of the record.
1100 0 : // we can't validate the remaining number of bytes without parsing
1101 0 : // the tuple data.
1102 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
1103 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
1104 0 : }
1105 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
1106 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
1107 0 : // non-HOT update where the new tuple goes to different page than
1108 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
1109 0 : // set.
1110 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1111 0 : }
1112 : }
1113 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
1114 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
1115 :
1116 0 : let offset_array_len =
1117 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
1118 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
1119 0 : 0
1120 : } else {
1121 0 : size_of::<u16>() * xlrec.ntuples as usize
1122 : };
1123 0 : assert_eq!(offset_array_len, buf.remaining());
1124 :
1125 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1126 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1127 0 : }
1128 : }
1129 : pg_constants::XLOG_NEON_HEAP_LOCK => {
1130 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
1131 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
1132 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
1133 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
1134 0 : }
1135 : }
1136 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
1137 : }
1138 : }
1139 0 : _ => bail!(
1140 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
1141 0 : pg_version
1142 0 : ),
1143 : }
1144 :
1145 : // Clear the VM bits if required.
1146 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
1147 0 : let vm_rel = RelTag {
1148 0 : forknum: VISIBILITYMAP_FORKNUM,
1149 0 : spcnode: decoded.blocks[0].rnode_spcnode,
1150 0 : dbnode: decoded.blocks[0].rnode_dbnode,
1151 0 : relnode: decoded.blocks[0].rnode_relnode,
1152 0 : };
1153 0 :
1154 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
1155 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
1156 :
1157 : // Sometimes, Postgres seems to create heap WAL records with the
1158 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
1159 : // not set. In fact, it's possible that the VM page does not exist at all.
1160 : // In that case, we don't want to store a record to clear the VM bit;
1161 : // replaying it would fail to find the previous image of the page, because
1162 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
1163 : // record if it doesn't.
1164 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
1165 0 : if let Some(blknum) = new_vm_blk {
1166 0 : if blknum >= vm_size {
1167 0 : new_vm_blk = None;
1168 0 : }
1169 0 : }
1170 0 : if let Some(blknum) = old_vm_blk {
1171 0 : if blknum >= vm_size {
1172 0 : old_vm_blk = None;
1173 0 : }
1174 0 : }
1175 :
1176 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
1177 0 : if new_vm_blk == old_vm_blk {
1178 : // An UPDATE record that needs to clear the bits for both old and the
1179 : // new page, both of which reside on the same VM page.
1180 0 : self.put_rel_wal_record(
1181 0 : modification,
1182 0 : vm_rel,
1183 0 : new_vm_blk.unwrap(),
1184 0 : NeonWalRecord::ClearVisibilityMapFlags {
1185 0 : new_heap_blkno,
1186 0 : old_heap_blkno,
1187 0 : flags,
1188 0 : },
1189 0 : ctx,
1190 0 : )
1191 0 : .await?;
1192 : } else {
1193 : // Clear VM bits for one heap page, or for two pages that reside on
1194 : // different VM pages.
1195 0 : if let Some(new_vm_blk) = new_vm_blk {
1196 0 : self.put_rel_wal_record(
1197 0 : modification,
1198 0 : vm_rel,
1199 0 : new_vm_blk,
1200 0 : NeonWalRecord::ClearVisibilityMapFlags {
1201 0 : new_heap_blkno,
1202 0 : old_heap_blkno: None,
1203 0 : flags,
1204 0 : },
1205 0 : ctx,
1206 0 : )
1207 0 : .await?;
1208 0 : }
1209 0 : if let Some(old_vm_blk) = old_vm_blk {
1210 0 : self.put_rel_wal_record(
1211 0 : modification,
1212 0 : vm_rel,
1213 0 : old_vm_blk,
1214 0 : NeonWalRecord::ClearVisibilityMapFlags {
1215 0 : new_heap_blkno: None,
1216 0 : old_heap_blkno,
1217 0 : flags,
1218 0 : },
1219 0 : ctx,
1220 0 : )
1221 0 : .await?;
1222 0 : }
1223 : }
1224 0 : }
1225 0 : }
1226 :
1227 0 : Ok(())
1228 0 : }
1229 :
1230 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
1231 0 : async fn ingest_xlog_dbase_create(
1232 0 : &mut self,
1233 0 : modification: &mut DatadirModification<'_>,
1234 0 : rec: &XlCreateDatabase,
1235 0 : ctx: &RequestContext,
1236 0 : ) -> anyhow::Result<()> {
1237 0 : let db_id = rec.db_id;
1238 0 : let tablespace_id = rec.tablespace_id;
1239 0 : let src_db_id = rec.src_db_id;
1240 0 : let src_tablespace_id = rec.src_tablespace_id;
1241 :
1242 0 : let rels = modification
1243 0 : .tline
1244 0 : .list_rels(
1245 0 : src_tablespace_id,
1246 0 : src_db_id,
1247 0 : Version::Modified(modification),
1248 0 : ctx,
1249 0 : )
1250 0 : .await?;
1251 :
1252 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
1253 :
1254 : // Copy relfilemap
1255 0 : let filemap = modification
1256 0 : .tline
1257 0 : .get_relmap_file(
1258 0 : src_tablespace_id,
1259 0 : src_db_id,
1260 0 : Version::Modified(modification),
1261 0 : ctx,
1262 0 : )
1263 0 : .await?;
1264 0 : modification
1265 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1266 0 : .await?;
1267 :
1268 0 : let mut num_rels_copied = 0;
1269 0 : let mut num_blocks_copied = 0;
1270 0 : for src_rel in rels {
1271 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1272 0 : assert_eq!(src_rel.dbnode, src_db_id);
1273 :
1274 0 : let nblocks = modification
1275 0 : .tline
1276 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
1277 0 : .await?;
1278 0 : let dst_rel = RelTag {
1279 0 : spcnode: tablespace_id,
1280 0 : dbnode: db_id,
1281 0 : relnode: src_rel.relnode,
1282 0 : forknum: src_rel.forknum,
1283 0 : };
1284 0 :
1285 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1286 :
1287 : // Copy content
1288 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1289 0 : for blknum in 0..nblocks {
1290 : // Sharding:
1291 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1292 : // dbNode is not included in the hash inputs for sharding.
1293 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1294 : // that belong to it.
1295 0 : let src_key = rel_block_to_key(src_rel, blknum);
1296 0 : if !self.shard.is_key_local(&src_key) {
1297 0 : debug!(
1298 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1299 : src_key
1300 : );
1301 0 : continue;
1302 0 : }
1303 0 : debug!(
1304 0 : "copying block {} from {} ({}) to {}",
1305 : blknum, src_rel, src_key, dst_rel
1306 : );
1307 :
1308 0 : let content = modification
1309 0 : .tline
1310 0 : .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
1311 0 : .await?;
1312 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1313 0 : num_blocks_copied += 1;
1314 : }
1315 :
1316 0 : num_rels_copied += 1;
1317 : }
1318 :
1319 0 : info!(
1320 0 : "Created database {}/{}, copied {} blocks in {} rels",
1321 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1322 : );
1323 0 : Ok(())
1324 0 : }
1325 :
1326 48 : async fn ingest_xlog_smgr_create(
1327 48 : &mut self,
1328 48 : modification: &mut DatadirModification<'_>,
1329 48 : rec: &XlSmgrCreate,
1330 48 : ctx: &RequestContext,
1331 48 : ) -> anyhow::Result<()> {
1332 48 : let rel = RelTag {
1333 48 : spcnode: rec.rnode.spcnode,
1334 48 : dbnode: rec.rnode.dbnode,
1335 48 : relnode: rec.rnode.relnode,
1336 48 : forknum: rec.forknum,
1337 48 : };
1338 48 : self.put_rel_creation(modification, rel, ctx).await?;
1339 48 : Ok(())
1340 48 : }
1341 :
1342 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1343 : ///
1344 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1345 0 : async fn ingest_xlog_smgr_truncate(
1346 0 : &mut self,
1347 0 : modification: &mut DatadirModification<'_>,
1348 0 : rec: &XlSmgrTruncate,
1349 0 : ctx: &RequestContext,
1350 0 : ) -> anyhow::Result<()> {
1351 0 : let spcnode = rec.rnode.spcnode;
1352 0 : let dbnode = rec.rnode.dbnode;
1353 0 : let relnode = rec.rnode.relnode;
1354 0 :
1355 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1356 0 : let rel = RelTag {
1357 0 : spcnode,
1358 0 : dbnode,
1359 0 : relnode,
1360 0 : forknum: MAIN_FORKNUM,
1361 0 : };
1362 0 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1363 0 : .await?;
1364 0 : }
1365 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1366 0 : let rel = RelTag {
1367 0 : spcnode,
1368 0 : dbnode,
1369 0 : relnode,
1370 0 : forknum: FSM_FORKNUM,
1371 0 : };
1372 0 :
1373 0 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1374 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1375 0 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1376 : // Tail of last remaining FSM page has to be zeroed.
1377 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
1378 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
1379 0 : fsm_physical_page_no += 1;
1380 0 : }
1381 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1382 0 : if nblocks > fsm_physical_page_no {
1383 : // check if something to do: FSM is larger than truncate position
1384 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1385 0 : .await?;
1386 0 : }
1387 0 : }
1388 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1389 0 : let rel = RelTag {
1390 0 : spcnode,
1391 0 : dbnode,
1392 0 : relnode,
1393 0 : forknum: VISIBILITYMAP_FORKNUM,
1394 0 : };
1395 0 :
1396 0 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1397 0 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1398 : // Tail of last remaining vm page has to be zeroed.
1399 : // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
1400 0 : modification.put_rel_page_image_zero(rel, vm_page_no)?;
1401 0 : vm_page_no += 1;
1402 0 : }
1403 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1404 0 : if nblocks > vm_page_no {
1405 : // check if something to do: VM is larger than truncate position
1406 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1407 0 : .await?;
1408 0 : }
1409 0 : }
1410 0 : Ok(())
1411 0 : }
1412 :
1413 24 : fn warn_on_ingest_lag(
1414 24 : &mut self,
1415 24 : conf: &crate::config::PageServerConf,
1416 24 : wal_timestamp: TimestampTz,
1417 24 : ) {
1418 24 : debug_assert_current_span_has_tenant_and_timeline_id();
1419 24 : let now = SystemTime::now();
1420 24 : let rate_limits = &mut self.warn_ingest_lag;
1421 :
1422 24 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
1423 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
1424 : });
1425 :
1426 24 : match ts {
1427 24 : Ok(ts) => {
1428 24 : match now.duration_since(ts) {
1429 24 : Ok(lag) => {
1430 24 : if lag > conf.wait_lsn_timeout {
1431 24 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
1432 6 : let lag = humantime::format_duration(lag);
1433 6 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
1434 24 : })
1435 0 : }
1436 : }
1437 0 : Err(e) => {
1438 0 : let delta_t = e.duration();
1439 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
1440 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
1441 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
1442 0 : if delta_t > IGNORED_DRIFT {
1443 0 : let delta_t = humantime::format_duration(delta_t);
1444 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
1445 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
1446 0 : })
1447 0 : }
1448 : }
1449 : };
1450 : }
1451 0 : Err(error) => {
1452 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
1453 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
1454 0 : })
1455 : }
1456 : }
1457 24 : }
1458 :
1459 : /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
1460 : ///
1461 24 : async fn ingest_xact_record(
1462 24 : &mut self,
1463 24 : modification: &mut DatadirModification<'_>,
1464 24 : parsed: &XlXactParsedRecord,
1465 24 : is_commit: bool,
1466 24 : origin_id: u16,
1467 24 : ctx: &RequestContext,
1468 24 : ) -> anyhow::Result<()> {
1469 24 : // Record update of CLOG pages
1470 24 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1471 24 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1472 24 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1473 24 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1474 24 :
1475 24 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
1476 :
1477 24 : for subxact in &parsed.subxacts {
1478 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1479 0 : if subxact_pageno != pageno {
1480 : // This subxact goes to different page. Write the record
1481 : // for all the XIDs on the previous page, and continue
1482 : // accumulating XIDs on this new page.
1483 0 : modification.put_slru_wal_record(
1484 0 : SlruKind::Clog,
1485 0 : segno,
1486 0 : rpageno,
1487 0 : if is_commit {
1488 0 : NeonWalRecord::ClogSetCommitted {
1489 0 : xids: page_xids,
1490 0 : timestamp: parsed.xact_time,
1491 0 : }
1492 : } else {
1493 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1494 : },
1495 0 : )?;
1496 0 : page_xids = Vec::new();
1497 0 : }
1498 0 : pageno = subxact_pageno;
1499 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1500 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1501 0 : page_xids.push(*subxact);
1502 : }
1503 24 : modification.put_slru_wal_record(
1504 24 : SlruKind::Clog,
1505 24 : segno,
1506 24 : rpageno,
1507 24 : if is_commit {
1508 24 : NeonWalRecord::ClogSetCommitted {
1509 24 : xids: page_xids,
1510 24 : timestamp: parsed.xact_time,
1511 24 : }
1512 : } else {
1513 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1514 : },
1515 0 : )?;
1516 :
1517 24 : for xnode in &parsed.xnodes {
1518 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1519 0 : let rel = RelTag {
1520 0 : forknum,
1521 0 : spcnode: xnode.spcnode,
1522 0 : dbnode: xnode.dbnode,
1523 0 : relnode: xnode.relnode,
1524 0 : };
1525 0 : if modification
1526 0 : .tline
1527 0 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1528 0 : .await?
1529 : {
1530 0 : self.put_rel_drop(modification, rel, ctx).await?;
1531 0 : }
1532 : }
1533 : }
1534 24 : if origin_id != 0 {
1535 0 : modification
1536 0 : .set_replorigin(origin_id, parsed.origin_lsn)
1537 0 : .await?;
1538 24 : }
1539 24 : Ok(())
1540 24 : }
1541 :
1542 0 : async fn ingest_clog_truncate_record(
1543 0 : &mut self,
1544 0 : modification: &mut DatadirModification<'_>,
1545 0 : xlrec: &XlClogTruncate,
1546 0 : ctx: &RequestContext,
1547 0 : ) -> anyhow::Result<()> {
1548 0 : info!(
1549 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1550 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1551 : );
1552 :
1553 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
1554 : // truncated, but a checkpoint record with the updated values isn't written until
1555 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
1556 : // so we keep the oldestXid and oldestXidDB up-to-date.
1557 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1558 0 : cp.oldestXid = xlrec.oldest_xid;
1559 0 : cp.oldestXidDB = xlrec.oldest_xid_db;
1560 0 : });
1561 0 : self.checkpoint_modified = true;
1562 :
1563 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
1564 :
1565 0 : let latest_page_number =
1566 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
1567 : / pg_constants::CLOG_XACTS_PER_PAGE;
1568 :
1569 : // Now delete all segments containing pages between xlrec.pageno
1570 : // and latest_page_number.
1571 :
1572 : // First, make an important safety check:
1573 : // the current endpoint page must not be eligible for removal.
1574 : // See SimpleLruTruncate() in slru.c
1575 0 : if dispatch_pgversion!(modification.tline.pg_version, {
1576 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
1577 : }) {
1578 0 : info!("could not truncate directory pg_xact apparent wraparound");
1579 0 : return Ok(());
1580 0 : }
1581 :
1582 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1583 : //
1584 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
1585 : // will block waiting for the last valid LSN to advance up to
1586 : // it. So we use the previous record's LSN in the get calls
1587 : // instead.
1588 0 : for segno in modification
1589 0 : .tline
1590 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1591 0 : .await?
1592 : {
1593 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1594 :
1595 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
1596 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
1597 : });
1598 :
1599 0 : if may_delete {
1600 0 : modification
1601 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1602 0 : .await?;
1603 0 : trace!("Drop CLOG segment {:>04X}", segno);
1604 0 : }
1605 : }
1606 :
1607 0 : Ok(())
1608 0 : }
1609 :
1610 0 : fn ingest_multixact_create_record(
1611 0 : &mut self,
1612 0 : modification: &mut DatadirModification,
1613 0 : xlrec: &XlMultiXactCreate,
1614 0 : ) -> Result<()> {
1615 0 : // Create WAL record for updating the multixact-offsets page
1616 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1617 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1618 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1619 0 :
1620 0 : modification.put_slru_wal_record(
1621 0 : SlruKind::MultiXactOffsets,
1622 0 : segno,
1623 0 : rpageno,
1624 0 : NeonWalRecord::MultixactOffsetCreate {
1625 0 : mid: xlrec.mid,
1626 0 : moff: xlrec.moff,
1627 0 : },
1628 0 : )?;
1629 :
1630 : // Create WAL records for the update of each affected multixact-members page
1631 0 : let mut members = xlrec.members.iter();
1632 0 : let mut offset = xlrec.moff;
1633 : loop {
1634 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1635 0 :
1636 0 : // How many members fit on this page?
1637 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1638 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1639 0 :
1640 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1641 0 : for _ in 0..page_remain {
1642 0 : if let Some(m) = members.next() {
1643 0 : this_page_members.push(m.clone());
1644 0 : } else {
1645 0 : break;
1646 : }
1647 : }
1648 0 : if this_page_members.is_empty() {
1649 : // all done
1650 0 : break;
1651 0 : }
1652 0 : let n_this_page = this_page_members.len();
1653 0 :
1654 0 : modification.put_slru_wal_record(
1655 0 : SlruKind::MultiXactMembers,
1656 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1657 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1658 0 : NeonWalRecord::MultixactMembersCreate {
1659 0 : moff: offset,
1660 0 : members: this_page_members,
1661 0 : },
1662 0 : )?;
1663 :
1664 : // Note: The multixact members can wrap around, even within one WAL record.
1665 0 : offset = offset.wrapping_add(n_this_page as u32);
1666 : }
1667 0 : let next_offset = offset;
1668 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1669 :
1670 : // Update next-multi-xid and next-offset
1671 : //
1672 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1673 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1674 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1675 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
1676 : // is stored, so it's never 0 in a checkpoint.
1677 : //
1678 : // I don't know why it's done that way, it seems less error-prone to skip over 0
1679 : // when the value is stored rather than when it's read. But let's do it the same
1680 : // way here.
1681 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
1682 0 :
1683 0 : if self
1684 0 : .checkpoint
1685 0 : .update_next_multixid(next_multi_xid, next_offset)
1686 0 : {
1687 0 : self.checkpoint_modified = true;
1688 0 : }
1689 :
1690 : // Also update the next-xid with the highest member. According to the comments in
1691 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
1692 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1693 0 : if let Some(max_xid) = acc {
1694 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1695 0 : Some(mbr.xid)
1696 : } else {
1697 0 : acc
1698 : }
1699 : } else {
1700 0 : Some(mbr.xid)
1701 : }
1702 0 : });
1703 :
1704 0 : if let Some(max_xid) = max_mbr_xid {
1705 0 : if self.checkpoint.update_next_xid(max_xid) {
1706 0 : self.checkpoint_modified = true;
1707 0 : }
1708 0 : }
1709 0 : Ok(())
1710 0 : }
1711 :
1712 0 : async fn ingest_multixact_truncate_record(
1713 0 : &mut self,
1714 0 : modification: &mut DatadirModification<'_>,
1715 0 : xlrec: &XlMultiXactTruncate,
1716 0 : ctx: &RequestContext,
1717 0 : ) -> Result<()> {
1718 0 : let (maxsegment, startsegment, endsegment) =
1719 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1720 0 : cp.oldestMulti = xlrec.end_trunc_off;
1721 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1722 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1723 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1724 0 : );
1725 0 : let startsegment: i32 =
1726 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1727 0 : let endsegment: i32 =
1728 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1729 0 : (maxsegment, startsegment, endsegment)
1730 : });
1731 :
1732 0 : self.checkpoint_modified = true;
1733 0 :
1734 0 : // PerformMembersTruncation
1735 0 : let mut segment: i32 = startsegment;
1736 :
1737 : // Delete all the segments except the last one. The last segment can still
1738 : // contain, possibly partially, valid data.
1739 0 : while segment != endsegment {
1740 0 : modification
1741 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1742 0 : .await?;
1743 :
1744 : /* move to next segment, handling wraparound correctly */
1745 0 : if segment == maxsegment {
1746 0 : segment = 0;
1747 0 : } else {
1748 0 : segment += 1;
1749 0 : }
1750 : }
1751 :
1752 : // Truncate offsets
1753 : // FIXME: this did not handle wraparound correctly
1754 :
1755 0 : Ok(())
1756 0 : }
1757 :
1758 0 : async fn ingest_relmap_page(
1759 0 : &mut self,
1760 0 : modification: &mut DatadirModification<'_>,
1761 0 : xlrec: &XlRelmapUpdate,
1762 0 : decoded: &DecodedWALRecord,
1763 0 : ctx: &RequestContext,
1764 0 : ) -> Result<()> {
1765 0 : let mut buf = decoded.record.clone();
1766 0 : buf.advance(decoded.main_data_offset);
1767 0 : // skip xl_relmap_update
1768 0 : buf.advance(12);
1769 0 :
1770 0 : modification
1771 0 : .put_relmap_file(
1772 0 : xlrec.tsid,
1773 0 : xlrec.dbid,
1774 0 : Bytes::copy_from_slice(&buf[..]),
1775 0 : ctx,
1776 0 : )
1777 0 : .await
1778 0 : }
1779 :
1780 54 : async fn put_rel_creation(
1781 54 : &mut self,
1782 54 : modification: &mut DatadirModification<'_>,
1783 54 : rel: RelTag,
1784 54 : ctx: &RequestContext,
1785 54 : ) -> Result<()> {
1786 54 : modification.put_rel_creation(rel, 0, ctx).await?;
1787 54 : Ok(())
1788 54 : }
1789 :
1790 817278 : async fn put_rel_page_image(
1791 817278 : &mut self,
1792 817278 : modification: &mut DatadirModification<'_>,
1793 817278 : rel: RelTag,
1794 817278 : blknum: BlockNumber,
1795 817278 : img: Bytes,
1796 817278 : ctx: &RequestContext,
1797 817278 : ) -> Result<(), PageReconstructError> {
1798 817278 : self.handle_rel_extend(modification, rel, blknum, ctx)
1799 16275 : .await?;
1800 817278 : modification.put_rel_page_image(rel, blknum, img)?;
1801 817278 : Ok(())
1802 817278 : }
1803 :
1804 436890 : async fn put_rel_wal_record(
1805 436890 : &mut self,
1806 436890 : modification: &mut DatadirModification<'_>,
1807 436890 : rel: RelTag,
1808 436890 : blknum: BlockNumber,
1809 436890 : rec: NeonWalRecord,
1810 436890 : ctx: &RequestContext,
1811 436890 : ) -> Result<()> {
1812 436890 : self.handle_rel_extend(modification, rel, blknum, ctx)
1813 837 : .await?;
1814 436890 : modification.put_rel_wal_record(rel, blknum, rec)?;
1815 436890 : Ok(())
1816 436890 : }
1817 :
1818 18036 : async fn put_rel_truncation(
1819 18036 : &mut self,
1820 18036 : modification: &mut DatadirModification<'_>,
1821 18036 : rel: RelTag,
1822 18036 : nblocks: BlockNumber,
1823 18036 : ctx: &RequestContext,
1824 18036 : ) -> anyhow::Result<()> {
1825 18036 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1826 18036 : Ok(())
1827 18036 : }
1828 :
1829 6 : async fn put_rel_drop(
1830 6 : &mut self,
1831 6 : modification: &mut DatadirModification<'_>,
1832 6 : rel: RelTag,
1833 6 : ctx: &RequestContext,
1834 6 : ) -> Result<()> {
1835 6 : modification.put_rel_drop(rel, ctx).await?;
1836 6 : Ok(())
1837 6 : }
1838 :
1839 1254168 : async fn handle_rel_extend(
1840 1254168 : &mut self,
1841 1254168 : modification: &mut DatadirModification<'_>,
1842 1254168 : rel: RelTag,
1843 1254168 : blknum: BlockNumber,
1844 1254168 : ctx: &RequestContext,
1845 1254168 : ) -> Result<(), PageReconstructError> {
1846 1254168 : let new_nblocks = blknum + 1;
1847 : // Check if the relation exists. We implicitly create relations on first
1848 : // record.
1849 : // TODO: would be nice if to be more explicit about it
1850 :
1851 : // Get current size and put rel creation if rel doesn't exist
1852 : //
1853 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1854 : // check the cache too. This is because eagerly checking the cache results in
1855 : // less work overall and 10% better performance. It's more work on cache miss
1856 : // but cache miss is rare.
1857 1254168 : let old_nblocks = if let Some(nblocks) = modification
1858 1254168 : .tline
1859 1254168 : .get_cached_rel_size(&rel, modification.get_lsn())
1860 : {
1861 1254138 : nblocks
1862 30 : } else if !modification
1863 30 : .tline
1864 30 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1865 6 : .await?
1866 : {
1867 : // create it with 0 size initially, the logic below will extend it
1868 30 : modification
1869 30 : .put_rel_creation(rel, 0, ctx)
1870 9 : .await
1871 30 : .context("Relation Error")?;
1872 30 : 0
1873 : } else {
1874 0 : modification
1875 0 : .tline
1876 0 : .get_rel_size(rel, Version::Modified(modification), ctx)
1877 0 : .await?
1878 : };
1879 :
1880 1254168 : if new_nblocks > old_nblocks {
1881 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1882 824364 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1883 :
1884 824364 : let mut key = rel_block_to_key(rel, blknum);
1885 : // fill the gap with zeros
1886 824364 : for gap_blknum in old_nblocks..blknum {
1887 8994 : key.field6 = gap_blknum;
1888 8994 :
1889 8994 : if self.shard.get_shard_number(&key) != self.shard.number {
1890 0 : continue;
1891 8994 : }
1892 8994 :
1893 8994 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1894 : }
1895 429804 : }
1896 1254168 : Ok(())
1897 1254168 : }
1898 :
1899 0 : async fn put_slru_page_image(
1900 0 : &mut self,
1901 0 : modification: &mut DatadirModification<'_>,
1902 0 : kind: SlruKind,
1903 0 : segno: u32,
1904 0 : blknum: BlockNumber,
1905 0 : img: Bytes,
1906 0 : ctx: &RequestContext,
1907 0 : ) -> Result<()> {
1908 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1909 0 : .await?;
1910 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1911 0 : Ok(())
1912 0 : }
1913 :
1914 0 : async fn handle_slru_extend(
1915 0 : &mut self,
1916 0 : modification: &mut DatadirModification<'_>,
1917 0 : kind: SlruKind,
1918 0 : segno: u32,
1919 0 : blknum: BlockNumber,
1920 0 : ctx: &RequestContext,
1921 0 : ) -> anyhow::Result<()> {
1922 0 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
1923 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1924 0 : // a lot less frequently.
1925 0 :
1926 0 : let new_nblocks = blknum + 1;
1927 : // Check if the relation exists. We implicitly create relations on first
1928 : // record.
1929 : // TODO: would be nice if to be more explicit about it
1930 0 : let old_nblocks = if !modification
1931 0 : .tline
1932 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1933 0 : .await?
1934 : {
1935 : // create it with 0 size initially, the logic below will extend it
1936 0 : modification
1937 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1938 0 : .await?;
1939 0 : 0
1940 : } else {
1941 0 : modification
1942 0 : .tline
1943 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1944 0 : .await?
1945 : };
1946 :
1947 0 : if new_nblocks > old_nblocks {
1948 0 : trace!(
1949 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1950 : kind,
1951 : segno,
1952 : old_nblocks,
1953 : new_nblocks
1954 : );
1955 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1956 :
1957 : // fill the gap with zeros
1958 0 : for gap_blknum in old_nblocks..blknum {
1959 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1960 : }
1961 0 : }
1962 0 : Ok(())
1963 0 : }
1964 : }
1965 :
1966 36 : async fn get_relsize(
1967 36 : modification: &DatadirModification<'_>,
1968 36 : rel: RelTag,
1969 36 : ctx: &RequestContext,
1970 36 : ) -> Result<BlockNumber, PageReconstructError> {
1971 36 : let nblocks = if !modification
1972 36 : .tline
1973 36 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1974 0 : .await?
1975 : {
1976 0 : 0
1977 : } else {
1978 36 : modification
1979 36 : .tline
1980 36 : .get_rel_size(rel, Version::Modified(modification), ctx)
1981 0 : .await?
1982 : };
1983 36 : Ok(nblocks)
1984 36 : }
1985 :
1986 : #[allow(clippy::bool_assert_comparison)]
1987 : #[cfg(test)]
1988 : mod tests {
1989 : use super::*;
1990 : use crate::tenant::harness::*;
1991 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1992 : use postgres_ffi::RELSEG_SIZE;
1993 :
1994 : use crate::DEFAULT_PG_VERSION;
1995 :
1996 : /// Arbitrary relation tag, for testing.
1997 : const TESTREL_A: RelTag = RelTag {
1998 : spcnode: 0,
1999 : dbnode: 111,
2000 : relnode: 1000,
2001 : forknum: 0,
2002 : };
2003 :
2004 36 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
2005 36 : // TODO
2006 36 : }
2007 :
2008 : #[tokio::test]
2009 6 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
2010 24 : for i in 14..=16 {
2011 18 : dispatch_pgversion!(i, {
2012 6 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
2013 6 : });
2014 6 : }
2015 6 :
2016 6 : Ok(())
2017 6 : }
2018 :
2019 24 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
2020 24 : let mut m = tline.begin_modification(Lsn(0x10));
2021 24 : m.put_checkpoint(dispatch_pgversion!(
2022 24 : tline.pg_version,
2023 0 : pgv::ZERO_CHECKPOINT.clone()
2024 0 : ))?;
2025 48 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
2026 24 : m.commit(ctx).await?;
2027 24 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
2028 :
2029 24 : Ok(walingest)
2030 24 : }
2031 :
2032 : #[tokio::test]
2033 6 : async fn test_relsize() -> Result<()> {
2034 24 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
2035 6 : let tline = tenant
2036 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2037 12 : .await?;
2038 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2039 6 :
2040 6 : let mut m = tline.begin_modification(Lsn(0x20));
2041 6 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
2042 6 : walingest
2043 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2044 6 : .await?;
2045 6 : m.on_record_end();
2046 6 : m.commit(&ctx).await?;
2047 6 : let mut m = tline.begin_modification(Lsn(0x30));
2048 6 : walingest
2049 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
2050 6 : .await?;
2051 6 : m.on_record_end();
2052 6 : m.commit(&ctx).await?;
2053 6 : let mut m = tline.begin_modification(Lsn(0x40));
2054 6 : walingest
2055 6 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
2056 6 : .await?;
2057 6 : m.on_record_end();
2058 6 : m.commit(&ctx).await?;
2059 6 : let mut m = tline.begin_modification(Lsn(0x50));
2060 6 : walingest
2061 6 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
2062 6 : .await?;
2063 6 : m.on_record_end();
2064 6 : m.commit(&ctx).await?;
2065 6 :
2066 6 : assert_current_logical_size(&tline, Lsn(0x50));
2067 6 :
2068 6 : // The relation was created at LSN 2, not visible at LSN 1 yet.
2069 6 : assert_eq!(
2070 6 : tline
2071 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2072 6 : .await?,
2073 6 : false
2074 6 : );
2075 6 : assert!(tline
2076 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2077 6 : .await
2078 6 : .is_err());
2079 6 : assert_eq!(
2080 6 : tline
2081 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2082 6 : .await?,
2083 6 : true
2084 6 : );
2085 6 : assert_eq!(
2086 6 : tline
2087 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2088 6 : .await?,
2089 6 : 1
2090 6 : );
2091 6 : assert_eq!(
2092 6 : tline
2093 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2094 6 : .await?,
2095 6 : 3
2096 6 : );
2097 6 :
2098 6 : // Check page contents at each LSN
2099 6 : assert_eq!(
2100 6 : tline
2101 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
2102 6 : .await?,
2103 6 : test_img("foo blk 0 at 2")
2104 6 : );
2105 6 :
2106 6 : assert_eq!(
2107 6 : tline
2108 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
2109 6 : .await?,
2110 6 : test_img("foo blk 0 at 3")
2111 6 : );
2112 6 :
2113 6 : assert_eq!(
2114 6 : tline
2115 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
2116 6 : .await?,
2117 6 : test_img("foo blk 0 at 3")
2118 6 : );
2119 6 : assert_eq!(
2120 6 : tline
2121 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
2122 6 : .await?,
2123 6 : test_img("foo blk 1 at 4")
2124 6 : );
2125 6 :
2126 6 : assert_eq!(
2127 6 : tline
2128 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
2129 6 : .await?,
2130 6 : test_img("foo blk 0 at 3")
2131 6 : );
2132 6 : assert_eq!(
2133 6 : tline
2134 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
2135 6 : .await?,
2136 6 : test_img("foo blk 1 at 4")
2137 6 : );
2138 6 : assert_eq!(
2139 6 : tline
2140 6 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
2141 6 : .await?,
2142 6 : test_img("foo blk 2 at 5")
2143 6 : );
2144 6 :
2145 6 : // Truncate last block
2146 6 : let mut m = tline.begin_modification(Lsn(0x60));
2147 6 : walingest
2148 6 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
2149 6 : .await?;
2150 6 : m.commit(&ctx).await?;
2151 6 : assert_current_logical_size(&tline, Lsn(0x60));
2152 6 :
2153 6 : // Check reported size and contents after truncation
2154 6 : assert_eq!(
2155 6 : tline
2156 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2157 6 : .await?,
2158 6 : 2
2159 6 : );
2160 6 : assert_eq!(
2161 6 : tline
2162 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
2163 6 : .await?,
2164 6 : test_img("foo blk 0 at 3")
2165 6 : );
2166 6 : assert_eq!(
2167 6 : tline
2168 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
2169 6 : .await?,
2170 6 : test_img("foo blk 1 at 4")
2171 6 : );
2172 6 :
2173 6 : // should still see the truncated block with older LSN
2174 6 : assert_eq!(
2175 6 : tline
2176 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2177 6 : .await?,
2178 6 : 3
2179 6 : );
2180 6 : assert_eq!(
2181 6 : tline
2182 6 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
2183 6 : .await?,
2184 6 : test_img("foo blk 2 at 5")
2185 6 : );
2186 6 :
2187 6 : // Truncate to zero length
2188 6 : let mut m = tline.begin_modification(Lsn(0x68));
2189 6 : walingest
2190 6 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
2191 6 : .await?;
2192 6 : m.commit(&ctx).await?;
2193 6 : assert_eq!(
2194 6 : tline
2195 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
2196 6 : .await?,
2197 6 : 0
2198 6 : );
2199 6 :
2200 6 : // Extend from 0 to 2 blocks, leaving a gap
2201 6 : let mut m = tline.begin_modification(Lsn(0x70));
2202 6 : walingest
2203 6 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
2204 6 : .await?;
2205 6 : m.on_record_end();
2206 6 : m.commit(&ctx).await?;
2207 6 : assert_eq!(
2208 6 : tline
2209 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
2210 6 : .await?,
2211 6 : 2
2212 6 : );
2213 6 : assert_eq!(
2214 6 : tline
2215 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
2216 6 : .await?,
2217 6 : ZERO_PAGE
2218 6 : );
2219 6 : assert_eq!(
2220 6 : tline
2221 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
2222 6 : .await?,
2223 6 : test_img("foo blk 1")
2224 6 : );
2225 6 :
2226 6 : // Extend a lot more, leaving a big gap that spans across segments
2227 6 : let mut m = tline.begin_modification(Lsn(0x80));
2228 6 : walingest
2229 6 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
2230 6 : .await?;
2231 6 : m.on_record_end();
2232 570 : m.commit(&ctx).await?;
2233 6 : assert_eq!(
2234 6 : tline
2235 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2236 6 : .await?,
2237 6 : 1501
2238 6 : );
2239 8994 : for blk in 2..1500 {
2240 8988 : assert_eq!(
2241 8988 : tline
2242 8988 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
2243 4620 : .await?,
2244 8988 : ZERO_PAGE
2245 6 : );
2246 6 : }
2247 6 : assert_eq!(
2248 6 : tline
2249 6 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
2250 6 : .await?,
2251 6 : test_img("foo blk 1500")
2252 6 : );
2253 6 :
2254 6 : Ok(())
2255 6 : }
2256 :
2257 : // Test what happens if we dropped a relation
2258 : // and then created it again within the same layer.
2259 : #[tokio::test]
2260 6 : async fn test_drop_extend() -> Result<()> {
2261 6 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
2262 6 : .await?
2263 6 : .load()
2264 24 : .await;
2265 6 : let tline = tenant
2266 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2267 12 : .await?;
2268 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2269 6 :
2270 6 : let mut m = tline.begin_modification(Lsn(0x20));
2271 6 : walingest
2272 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2273 6 : .await?;
2274 6 : m.commit(&ctx).await?;
2275 6 :
2276 6 : // Check that rel exists and size is correct
2277 6 : assert_eq!(
2278 6 : tline
2279 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2280 6 : .await?,
2281 6 : true
2282 6 : );
2283 6 : assert_eq!(
2284 6 : tline
2285 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2286 6 : .await?,
2287 6 : 1
2288 6 : );
2289 6 :
2290 6 : // Drop rel
2291 6 : let mut m = tline.begin_modification(Lsn(0x30));
2292 6 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
2293 6 : m.commit(&ctx).await?;
2294 6 :
2295 6 : // Check that rel is not visible anymore
2296 6 : assert_eq!(
2297 6 : tline
2298 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
2299 6 : .await?,
2300 6 : false
2301 6 : );
2302 6 :
2303 6 : // FIXME: should fail
2304 6 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2305 6 :
2306 6 : // Re-create it
2307 6 : let mut m = tline.begin_modification(Lsn(0x40));
2308 6 : walingest
2309 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2310 6 : .await?;
2311 6 : m.commit(&ctx).await?;
2312 6 :
2313 6 : // Check that rel exists and size is correct
2314 6 : assert_eq!(
2315 6 : tline
2316 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2317 6 : .await?,
2318 6 : true
2319 6 : );
2320 6 : assert_eq!(
2321 6 : tline
2322 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2323 6 : .await?,
2324 6 : 1
2325 6 : );
2326 6 :
2327 6 : Ok(())
2328 6 : }
2329 :
2330 : // Test what happens if we truncated a relation
2331 : // so that one of its segments was dropped
2332 : // and then extended it again within the same layer.
2333 : #[tokio::test]
2334 6 : async fn test_truncate_extend() -> Result<()> {
2335 6 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
2336 6 : .await?
2337 6 : .load()
2338 24 : .await;
2339 6 : let tline = tenant
2340 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2341 12 : .await?;
2342 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2343 6 :
2344 6 : // Create a 20 MB relation (the size is arbitrary)
2345 6 : let relsize = 20 * 1024 * 1024 / 8192;
2346 6 : let mut m = tline.begin_modification(Lsn(0x20));
2347 15360 : for blkno in 0..relsize {
2348 15360 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2349 15360 : walingest
2350 15360 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2351 6 : .await?;
2352 6 : }
2353 6 : m.commit(&ctx).await?;
2354 6 :
2355 6 : // The relation was created at LSN 20, not visible at LSN 1 yet.
2356 6 : assert_eq!(
2357 6 : tline
2358 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2359 6 : .await?,
2360 6 : false
2361 6 : );
2362 6 : assert!(tline
2363 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2364 6 : .await
2365 6 : .is_err());
2366 6 :
2367 6 : assert_eq!(
2368 6 : tline
2369 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2370 6 : .await?,
2371 6 : true
2372 6 : );
2373 6 : assert_eq!(
2374 6 : tline
2375 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2376 6 : .await?,
2377 6 : relsize
2378 6 : );
2379 6 :
2380 6 : // Check relation content
2381 15360 : for blkno in 0..relsize {
2382 15360 : let lsn = Lsn(0x20);
2383 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2384 15360 : assert_eq!(
2385 15360 : tline
2386 15360 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
2387 5409 : .await?,
2388 15360 : test_img(&data)
2389 6 : );
2390 6 : }
2391 6 :
2392 6 : // Truncate relation so that second segment was dropped
2393 6 : // - only leave one page
2394 6 : let mut m = tline.begin_modification(Lsn(0x60));
2395 6 : walingest
2396 6 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2397 6 : .await?;
2398 6 : m.commit(&ctx).await?;
2399 6 :
2400 6 : // Check reported size and contents after truncation
2401 6 : assert_eq!(
2402 6 : tline
2403 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2404 6 : .await?,
2405 6 : 1
2406 6 : );
2407 6 :
2408 12 : for blkno in 0..1 {
2409 6 : let lsn = Lsn(0x20);
2410 6 : let data = format!("foo blk {} at {}", blkno, lsn);
2411 6 : assert_eq!(
2412 6 : tline
2413 6 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
2414 6 : .await?,
2415 6 : test_img(&data)
2416 6 : );
2417 6 : }
2418 6 :
2419 6 : // should still see all blocks with older LSN
2420 6 : assert_eq!(
2421 6 : tline
2422 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2423 6 : .await?,
2424 6 : relsize
2425 6 : );
2426 15360 : for blkno in 0..relsize {
2427 15360 : let lsn = Lsn(0x20);
2428 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2429 15360 : assert_eq!(
2430 15360 : tline
2431 15360 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
2432 5568 : .await?,
2433 15360 : test_img(&data)
2434 6 : );
2435 6 : }
2436 6 :
2437 6 : // Extend relation again.
2438 6 : // Add enough blocks to create second segment
2439 6 : let lsn = Lsn(0x80);
2440 6 : let mut m = tline.begin_modification(lsn);
2441 15360 : for blkno in 0..relsize {
2442 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2443 15360 : walingest
2444 15360 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2445 6 : .await?;
2446 6 : }
2447 9 : m.commit(&ctx).await?;
2448 6 :
2449 6 : assert_eq!(
2450 6 : tline
2451 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2452 6 : .await?,
2453 6 : true
2454 6 : );
2455 6 : assert_eq!(
2456 6 : tline
2457 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2458 6 : .await?,
2459 6 : relsize
2460 6 : );
2461 6 : // Check relation content
2462 15360 : for blkno in 0..relsize {
2463 15360 : let lsn = Lsn(0x80);
2464 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2465 15360 : assert_eq!(
2466 15360 : tline
2467 15360 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
2468 5484 : .await?,
2469 15360 : test_img(&data)
2470 6 : );
2471 6 : }
2472 6 :
2473 6 : Ok(())
2474 6 : }
2475 :
2476 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2477 : /// split into multiple 1 GB segments in Postgres.
2478 : #[tokio::test]
2479 6 : async fn test_large_rel() -> Result<()> {
2480 24 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2481 6 : let tline = tenant
2482 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2483 12 : .await?;
2484 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2485 6 :
2486 6 : let mut lsn = 0x10;
2487 786438 : for blknum in 0..RELSEG_SIZE + 1 {
2488 786438 : lsn += 0x10;
2489 786438 : let mut m = tline.begin_modification(Lsn(lsn));
2490 786438 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2491 786438 : walingest
2492 786438 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2493 16260 : .await?;
2494 786438 : m.commit(&ctx).await?;
2495 6 : }
2496 6 :
2497 6 : assert_current_logical_size(&tline, Lsn(lsn));
2498 6 :
2499 6 : assert_eq!(
2500 6 : tline
2501 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2502 6 : .await?,
2503 6 : RELSEG_SIZE + 1
2504 6 : );
2505 6 :
2506 6 : // Truncate one block
2507 6 : lsn += 0x10;
2508 6 : let mut m = tline.begin_modification(Lsn(lsn));
2509 6 : walingest
2510 6 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2511 6 : .await?;
2512 6 : m.commit(&ctx).await?;
2513 6 : assert_eq!(
2514 6 : tline
2515 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2516 6 : .await?,
2517 6 : RELSEG_SIZE
2518 6 : );
2519 6 : assert_current_logical_size(&tline, Lsn(lsn));
2520 6 :
2521 6 : // Truncate another block
2522 6 : lsn += 0x10;
2523 6 : let mut m = tline.begin_modification(Lsn(lsn));
2524 6 : walingest
2525 6 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2526 6 : .await?;
2527 6 : m.commit(&ctx).await?;
2528 6 : assert_eq!(
2529 6 : tline
2530 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2531 6 : .await?,
2532 6 : RELSEG_SIZE - 1
2533 6 : );
2534 6 : assert_current_logical_size(&tline, Lsn(lsn));
2535 6 :
2536 6 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2537 6 : // This tests the behavior at segment boundaries
2538 6 : let mut size: i32 = 3000;
2539 18012 : while size >= 0 {
2540 18006 : lsn += 0x10;
2541 18006 : let mut m = tline.begin_modification(Lsn(lsn));
2542 18006 : walingest
2543 18006 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2544 6 : .await?;
2545 18006 : m.commit(&ctx).await?;
2546 18006 : assert_eq!(
2547 18006 : tline
2548 18006 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2549 6 : .await?,
2550 18006 : size as BlockNumber
2551 6 : );
2552 6 :
2553 18006 : size -= 1;
2554 6 : }
2555 6 : assert_current_logical_size(&tline, Lsn(lsn));
2556 6 :
2557 6 : Ok(())
2558 6 : }
2559 :
2560 : /// Replay a wal segment file taken directly from safekeepers.
2561 : ///
2562 : /// This test is useful for benchmarking since it allows us to profile only
2563 : /// the walingest code in a single-threaded executor, and iterate more quickly
2564 : /// without waiting for unrelated steps.
2565 : #[tokio::test]
2566 6 : async fn test_ingest_real_wal() {
2567 6 : use crate::tenant::harness::*;
2568 6 : use postgres_ffi::waldecoder::WalStreamDecoder;
2569 6 : use postgres_ffi::WAL_SEGMENT_SIZE;
2570 6 :
2571 6 : // Define test data path and constants.
2572 6 : //
2573 6 : // Steps to reconstruct the data, if needed:
2574 6 : // 1. Run the pgbench python test
2575 6 : // 2. Take the first wal segment file from safekeeper
2576 6 : // 3. Compress it using `zstd --long input_file`
2577 6 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2578 6 : // 5. Grep sk logs for "restart decoder" to get startpoint
2579 6 : // 6. Run just the decoder from this test to get the endpoint.
2580 6 : // It's the last LSN the decoder will output.
2581 6 : let pg_version = 15; // The test data was generated by pg15
2582 6 : let path = "test_data/sk_wal_segment_from_pgbench";
2583 6 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2584 6 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2585 6 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2586 6 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2587 6 :
2588 6 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2589 6 : let span = harness
2590 6 : .span()
2591 6 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2592 24 : let (tenant, ctx) = harness.load().await;
2593 6 :
2594 6 : let remote_initdb_path =
2595 6 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2596 6 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2597 6 :
2598 6 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2599 6 : .expect("creating test dir should work");
2600 6 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2601 6 :
2602 6 : // Bootstrap a real timeline. We can't use create_test_timeline because
2603 6 : // it doesn't create a real checkpoint, and Walingest::new tries to parse
2604 6 : // the garbage data.
2605 6 : let tline = tenant
2606 6 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2607 60904 : .await
2608 6 : .unwrap();
2609 6 :
2610 6 : // We fully read and decompress this into memory before decoding
2611 6 : // to get a more accurate perf profile of the decoder.
2612 6 : let bytes = {
2613 6 : use async_compression::tokio::bufread::ZstdDecoder;
2614 6 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2615 6 : let reader = tokio::io::BufReader::new(file);
2616 6 : let decoder = ZstdDecoder::new(reader);
2617 6 : let mut reader = tokio::io::BufReader::new(decoder);
2618 6 : let mut buffer = Vec::new();
2619 666 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2620 6 : buffer
2621 6 : };
2622 6 :
2623 6 : // TODO start a profiler too
2624 6 : let started_at = std::time::Instant::now();
2625 6 :
2626 6 : // Initialize walingest
2627 6 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2628 6 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2629 6 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2630 15 : .await
2631 6 : .unwrap();
2632 6 : let mut modification = tline.begin_modification(startpoint);
2633 6 : println!("decoding {} bytes", bytes.len() - xlogoff);
2634 6 :
2635 6 : // Decode and ingest wal. We process the wal in chunks because
2636 6 : // that's what happens when we get bytes from safekeepers.
2637 1424058 : for chunk in bytes[xlogoff..].chunks(50) {
2638 1424058 : decoder.feed_bytes(chunk);
2639 1861608 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2640 437550 : let mut decoded = DecodedWALRecord::default();
2641 437550 : decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
2642 437550 : walingest
2643 437550 : .ingest_record(decoded, lsn, &mut modification, &ctx)
2644 437550 : .instrument(span.clone())
2645 888 : .await
2646 437550 : .unwrap();
2647 6 : }
2648 1424058 : modification.commit(&ctx).await.unwrap();
2649 6 : }
2650 6 :
2651 6 : let duration = started_at.elapsed();
2652 6 : println!("done in {:?}", duration);
2653 6 : }
2654 : }
|