Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
//! The neon Repository can store page versions in two formats: as
//! page images, or as WAL records. WalIngest::ingest_record() extracts
//! page images out of some WAL records, but stores most of them as WAL
//! records. If a WAL record modifies multiple pages, WalIngest
//! will call Repository::put_wal_record or put_page_image functions
//! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
//! redo Postgres process, but it handles some records directly with
//! bespoke Rust code.
23 :
24 : use pageserver_api::shard::ShardIdentity;
25 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
26 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
27 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
28 :
29 : use anyhow::{bail, Context, Result};
30 : use bytes::{Buf, Bytes, BytesMut};
31 : use tracing::*;
32 : use utils::failpoint_support;
33 :
34 : use crate::context::RequestContext;
35 : use crate::metrics::WAL_INGEST;
36 : use crate::pgdatadir_mapping::{DatadirModification, Version};
37 : use crate::tenant::PageReconstructError;
38 : use crate::tenant::Timeline;
39 : use crate::walrecord::*;
40 : use crate::ZERO_PAGE;
41 : use pageserver_api::key::rel_block_to_key;
42 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
43 : use postgres_ffi::pg_constants;
44 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
45 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
46 : use postgres_ffi::v14::xlog_utils::*;
47 : use postgres_ffi::v14::CheckPoint;
48 : use postgres_ffi::TransactionId;
49 : use postgres_ffi::BLCKSZ;
50 : use utils::lsn::Lsn;
51 :
52 : pub struct WalIngest {
53 : shard: ShardIdentity,
54 : checkpoint: CheckPoint,
55 : checkpoint_modified: bool,
56 : }
57 :
58 : impl WalIngest {
59 1346 : pub async fn new(
60 1346 : timeline: &Timeline,
61 1346 : startpoint: Lsn,
62 1346 : ctx: &RequestContext,
63 1346 : ) -> anyhow::Result<WalIngest> {
64 : // Fetch the latest checkpoint into memory, so that we can compare with it
65 : // quickly in `ingest_record` and update it when it changes.
66 1346 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
67 1333 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
68 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
69 :
70 1333 : Ok(WalIngest {
71 1333 : shard: *timeline.get_shard_identity(),
72 1333 : checkpoint,
73 1333 : checkpoint_modified: false,
74 1333 : })
75 1346 : }
76 :
77 : ///
78 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
79 : ///
80 : /// This function updates `lsn` field of `DatadirModification`
81 : ///
82 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
83 : /// relations/pages that the record affects.
84 : ///
85 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
86 : ///
87 73215430 : pub async fn ingest_record(
88 73215430 : &mut self,
89 73215430 : recdata: Bytes,
90 73215430 : lsn: Lsn,
91 73215430 : modification: &mut DatadirModification<'_>,
92 73215430 : decoded: &mut DecodedWALRecord,
93 73215430 : ctx: &RequestContext,
94 73215430 : ) -> anyhow::Result<bool> {
95 73215430 : WAL_INGEST.records_received.inc();
96 73215430 : let pg_version = modification.tline.pg_version;
97 73215430 : let prev_len = modification.len();
98 73215430 :
99 73215430 : modification.set_lsn(lsn)?;
100 73215430 : decode_wal_record(recdata, decoded, pg_version)?;
101 :
102 73215430 : let mut buf = decoded.record.clone();
103 73215430 : buf.advance(decoded.main_data_offset);
104 73215430 :
105 73215430 : assert!(!self.checkpoint_modified);
106 73215430 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
107 71949132 : && self.checkpoint.update_next_xid(decoded.xl_xid)
108 7437 : {
109 7437 : self.checkpoint_modified = true;
110 73207993 : }
111 :
112 73215430 : match decoded.xl_rmid {
113 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
114 : // Heap AM records need some special handling, because they modify VM pages
115 : // without registering them with the standard mechanism.
116 54886628 : self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
117 55 : .await?;
118 : }
119 : pg_constants::RM_NEON_ID => {
120 0 : self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
121 0 : .await?;
122 : }
123 : // Handle other special record types
124 : pg_constants::RM_SMGR_ID => {
125 75492 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
126 75492 :
127 75492 : if info == pg_constants::XLOG_SMGR_CREATE {
128 75290 : let create = XlSmgrCreate::decode(&mut buf);
129 75290 : self.ingest_xlog_smgr_create(modification, &create, ctx)
130 1544 : .await?;
131 202 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
132 202 : let truncate = XlSmgrTruncate::decode(&mut buf);
133 202 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
134 2 : .await?;
135 0 : }
136 : }
137 : pg_constants::RM_DBASE_ID => {
138 27 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
139 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
140 :
141 27 : if pg_version == 14 {
142 27 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
143 24 : let createdb = XlCreateDatabase::decode(&mut buf);
144 0 : debug!("XLOG_DBASE_CREATE v14");
145 :
146 24 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
147 1693 : .await?;
148 3 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
149 3 : let dropdb = XlDropDatabase::decode(&mut buf);
150 6 : for tablespace_id in dropdb.tablespace_ids {
151 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
152 3 : modification
153 3 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
154 0 : .await?;
155 : }
156 0 : }
157 0 : } else if pg_version == 15 {
158 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
159 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
160 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
161 : // The XLOG record was renamed between v14 and v15,
162 : // but the record format is the same.
163 : // So we can reuse XlCreateDatabase here.
164 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
165 0 : let createdb = XlCreateDatabase::decode(&mut buf);
166 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
167 0 : .await?;
168 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
169 0 : let dropdb = XlDropDatabase::decode(&mut buf);
170 0 : for tablespace_id in dropdb.tablespace_ids {
171 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
172 0 : modification
173 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
174 0 : .await?;
175 : }
176 0 : }
177 0 : } else if pg_version == 16 {
178 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
179 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
180 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
181 : // The XLOG record was renamed between v14 and v15,
182 : // but the record format is the same.
183 : // So we can reuse XlCreateDatabase here.
184 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
185 0 : let createdb = XlCreateDatabase::decode(&mut buf);
186 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
187 0 : .await?;
188 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
189 0 : let dropdb = XlDropDatabase::decode(&mut buf);
190 0 : for tablespace_id in dropdb.tablespace_ids {
191 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
192 0 : modification
193 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
194 0 : .await?;
195 : }
196 0 : }
197 0 : }
198 : }
199 : pg_constants::RM_TBLSPC_ID => {
200 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
201 : }
202 : pg_constants::RM_CLOG_ID => {
203 1018 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
204 1018 :
205 1018 : if info == pg_constants::CLOG_ZEROPAGE {
206 1016 : let pageno = buf.get_u32_le();
207 1016 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
208 1016 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
209 1016 : self.put_slru_page_image(
210 1016 : modification,
211 1016 : SlruKind::Clog,
212 1016 : segno,
213 1016 : rpageno,
214 1016 : ZERO_PAGE.clone(),
215 1016 : ctx,
216 1016 : )
217 27 : .await?;
218 : } else {
219 2 : assert!(info == pg_constants::CLOG_TRUNCATE);
220 2 : let xlrec = XlClogTruncate::decode(&mut buf);
221 2 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
222 0 : .await?;
223 : }
224 : }
225 : pg_constants::RM_XACT_ID => {
226 6517839 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
227 6517839 :
228 6517839 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
229 6113870 : let parsed_xact =
230 6113870 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
231 6113870 : self.ingest_xact_record(
232 6113870 : modification,
233 6113870 : &parsed_xact,
234 6113870 : info == pg_constants::XLOG_XACT_COMMIT,
235 6113870 : ctx,
236 6113870 : )
237 1512 : .await?;
238 403969 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
239 403968 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
240 : {
241 2 : let parsed_xact =
242 2 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
243 2 : self.ingest_xact_record(
244 2 : modification,
245 2 : &parsed_xact,
246 2 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
247 2 : ctx,
248 2 : )
249 0 : .await?;
250 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
251 0 : trace!(
252 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
253 0 : decoded.xl_xid,
254 0 : parsed_xact.xid,
255 0 : lsn,
256 0 : );
257 2 : modification
258 2 : .drop_twophase_file(parsed_xact.xid, ctx)
259 0 : .await?;
260 403967 : } else if info == pg_constants::XLOG_XACT_PREPARE {
261 4 : modification
262 4 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
263 0 : .await?;
264 403963 : }
265 : }
266 : pg_constants::RM_MULTIXACT_ID => {
267 25139 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
268 25139 :
269 25139 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
270 23 : let pageno = buf.get_u32_le();
271 23 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
272 23 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
273 23 : self.put_slru_page_image(
274 23 : modification,
275 23 : SlruKind::MultiXactOffsets,
276 23 : segno,
277 23 : rpageno,
278 23 : ZERO_PAGE.clone(),
279 23 : ctx,
280 23 : )
281 1 : .await?;
282 25116 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
283 300 : let pageno = buf.get_u32_le();
284 300 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
285 300 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
286 300 : self.put_slru_page_image(
287 300 : modification,
288 300 : SlruKind::MultiXactMembers,
289 300 : segno,
290 300 : rpageno,
291 300 : ZERO_PAGE.clone(),
292 300 : ctx,
293 300 : )
294 0 : .await?;
295 24816 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
296 24816 : let xlrec = XlMultiXactCreate::decode(&mut buf);
297 24816 : self.ingest_multixact_create_record(modification, &xlrec)?;
298 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
299 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
300 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
301 0 : .await?;
302 0 : }
303 : }
304 : pg_constants::RM_RELMAP_ID => {
305 62 : let xlrec = XlRelmapUpdate::decode(&mut buf);
306 62 : self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
307 0 : .await?;
308 : }
309 : pg_constants::RM_XLOG_ID => {
310 244240 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
311 244240 :
312 244240 : if info == pg_constants::XLOG_NEXTOID {
313 442 : let next_oid = buf.get_u32_le();
314 442 : if self.checkpoint.nextOid != next_oid {
315 442 : self.checkpoint.nextOid = next_oid;
316 442 : self.checkpoint_modified = true;
317 442 : }
318 243798 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
319 243703 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
320 : {
321 688 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
322 688 : buf.copy_to_slice(&mut checkpoint_bytes);
323 688 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
324 0 : trace!(
325 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
326 0 : xlog_checkpoint.oldestXid,
327 0 : self.checkpoint.oldestXid
328 0 : );
329 688 : if (self
330 688 : .checkpoint
331 688 : .oldestXid
332 688 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
333 688 : < 0
334 0 : {
335 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
336 688 : }
337 :
338 : // Write a new checkpoint key-value pair on every checkpoint record, even
339 : // if nothing really changed. Not strictly required, but it seems nice to
340 : // have some trace of the checkpoint records in the layer files at the same
341 : // LSNs.
342 688 : self.checkpoint_modified = true;
343 243110 : }
344 : }
345 : pg_constants::RM_LOGICALMSG_ID => {
346 128 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
347 128 :
348 128 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
349 128 : let xlrec = XlLogicalMessage::decode(&mut buf);
350 128 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
351 128 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
352 128 : if prefix == "neon-test" {
353 : // This is a convenient way to make the WAL ingestion pause at
354 : // particular point in the WAL. For more fine-grained control,
355 : // we could peek into the message and only pause if it contains
356 : // a particular string, for example, but this is enough for now.
357 6 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
358 124 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
359 117 : modification.put_file(path, message, ctx).await?;
360 7 : }
361 0 : }
362 : }
363 11464837 : _x => {
364 11464837 : // TODO: should probably log & fail here instead of blindly
365 11464837 : // doing something without understanding the protocol
366 11464837 : }
367 : }
368 :
369 : // Iterate through all the blocks that the record modifies, and
370 : // "put" a separate copy of the record for each block.
371 73215430 : for blk in decoded.blocks.iter() {
372 67216973 : let rel = RelTag {
373 67216973 : spcnode: blk.rnode_spcnode,
374 67216973 : dbnode: blk.rnode_dbnode,
375 67216973 : relnode: blk.rnode_relnode,
376 67216973 : forknum: blk.forknum,
377 67216973 : };
378 67216973 :
379 67216973 : let key = rel_block_to_key(rel, blk.blkno);
380 67216973 : let key_is_local = self.shard.is_key_local(&key);
381 :
382 0 : tracing::debug!(
383 0 : lsn=%lsn,
384 0 : key=%key,
385 0 : "ingest: shard decision {} (checkpoint={})",
386 0 : if !key_is_local { "drop" } else { "keep" },
387 0 : self.checkpoint_modified
388 0 : );
389 :
390 67216973 : if !key_is_local {
391 13873356 : if self.shard.is_zero() {
392 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
393 : // its blkno in case it implicitly extends a relation.
394 3495228 : self.observe_decoded_block(modification, blk, ctx).await?;
395 10378128 : }
396 :
397 13873356 : continue;
398 53343617 : }
399 53343617 : self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
400 7753 : .await?;
401 : }
402 :
403 : // If checkpoint data was updated, store the new version in the repository
404 73215427 : if self.checkpoint_modified {
405 33365 : let new_checkpoint_bytes = self.checkpoint.encode()?;
406 :
407 33365 : modification.put_checkpoint(new_checkpoint_bytes)?;
408 33365 : self.checkpoint_modified = false;
409 73182062 : }
410 :
411 : // Note that at this point this record is only cached in the modification
412 : // until commit() is called to flush the data into the repository and update
413 : // the latest LSN.
414 :
415 73215427 : Ok(modification.len() > prev_len)
416 73215430 : }
417 :
418 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
419 3495228 : async fn observe_decoded_block(
420 3495228 : &mut self,
421 3495228 : modification: &mut DatadirModification<'_>,
422 3495228 : blk: &DecodedBkpBlock,
423 3495228 : ctx: &RequestContext,
424 3495228 : ) -> Result<(), PageReconstructError> {
425 3495228 : let rel = RelTag {
426 3495228 : spcnode: blk.rnode_spcnode,
427 3495228 : dbnode: blk.rnode_dbnode,
428 3495228 : relnode: blk.rnode_relnode,
429 3495228 : forknum: blk.forknum,
430 3495228 : };
431 3495228 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
432 468 : .await
433 3495228 : }
434 :
435 53343617 : async fn ingest_decoded_block(
436 53343617 : &mut self,
437 53343617 : modification: &mut DatadirModification<'_>,
438 53343617 : lsn: Lsn,
439 53343617 : decoded: &DecodedWALRecord,
440 53343617 : blk: &DecodedBkpBlock,
441 53343617 : ctx: &RequestContext,
442 53343617 : ) -> Result<(), PageReconstructError> {
443 53343617 : let rel = RelTag {
444 53343617 : spcnode: blk.rnode_spcnode,
445 53343617 : dbnode: blk.rnode_dbnode,
446 53343617 : relnode: blk.rnode_relnode,
447 53343617 : forknum: blk.forknum,
448 53343617 : };
449 53343617 :
450 53343617 : //
451 53343617 : // Instead of storing full-page-image WAL record,
452 53343617 : // it is better to store extracted image: we can skip wal-redo
453 53343617 : // in this case. Also some FPI records may contain multiple (up to 32) pages,
454 53343617 : // so them have to be copied multiple times.
455 53343617 : //
456 53343617 : if blk.apply_image
457 195498 : && blk.has_image
458 195498 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
459 173023 : && (decoded.xl_info == pg_constants::XLOG_FPI
460 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
461 : // compression of WAL is not yet supported: fall back to storing the original WAL record
462 173023 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
463 : // do not materialize null pages because them most likely be soon replaced with real data
464 173023 : && blk.bimg_len != 0
465 : {
466 : // Extract page image from FPI record
467 173023 : let img_len = blk.bimg_len as usize;
468 173023 : let img_offs = blk.bimg_offset as usize;
469 173023 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
470 173023 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
471 173023 :
472 173023 : if blk.hole_length != 0 {
473 139994 : let tail = image.split_off(blk.hole_offset as usize);
474 139994 : image.resize(image.len() + blk.hole_length as usize, 0u8);
475 139994 : image.unsplit(tail);
476 139994 : }
477 : //
478 : // Match the logic of XLogReadBufferForRedoExtended:
479 : // The page may be uninitialized. If so, we can't set the LSN because
480 : // that would corrupt the page.
481 : //
482 173023 : if !page_is_new(&image) {
483 165898 : page_set_lsn(&mut image, lsn)
484 7125 : }
485 173023 : assert_eq!(image.len(), BLCKSZ as usize);
486 173023 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
487 813 : .await?;
488 : } else {
489 53170594 : let rec = NeonWalRecord::Postgres {
490 53170594 : will_init: blk.will_init || blk.apply_image,
491 53170594 : rec: decoded.record.clone(),
492 53170594 : };
493 53170594 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
494 6940 : .await?;
495 : }
496 53343614 : Ok(())
497 53343617 : }
498 :
499 54886628 : async fn ingest_heapam_record(
500 54886628 : &mut self,
501 54886628 : buf: &mut Bytes,
502 54886628 : modification: &mut DatadirModification<'_>,
503 54886628 : decoded: &DecodedWALRecord,
504 54886628 : ctx: &RequestContext,
505 54886628 : ) -> anyhow::Result<()> {
506 54886628 : // Handle VM bit updates that are implicitly part of heap records.
507 54886628 :
508 54886628 : // First, look at the record to determine which VM bits need
509 54886628 : // to be cleared. If either of these variables is set, we
510 54886628 : // need to clear the corresponding bits in the visibility map.
511 54886628 : let mut new_heap_blkno: Option<u32> = None;
512 54886628 : let mut old_heap_blkno: Option<u32> = None;
513 54886628 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
514 54886628 :
515 54886628 : match modification.tline.pg_version {
516 : 14 => {
517 54741154 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
518 50371586 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
519 50371586 :
520 50371586 : if info == pg_constants::XLOG_HEAP_INSERT {
521 41788436 : let xlrec = v14::XlHeapInsert::decode(buf);
522 41788436 : assert_eq!(0, buf.remaining());
523 41788436 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
524 4572 : new_heap_blkno = Some(decoded.blocks[0].blkno);
525 41783864 : }
526 8583150 : } else if info == pg_constants::XLOG_HEAP_DELETE {
527 2302613 : let xlrec = v14::XlHeapDelete::decode(buf);
528 2302613 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
529 2824 : new_heap_blkno = Some(decoded.blocks[0].blkno);
530 2299789 : }
531 6280537 : } else if info == pg_constants::XLOG_HEAP_UPDATE
532 4766689 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
533 : {
534 3413359 : let xlrec = v14::XlHeapUpdate::decode(buf);
535 3413359 : // the size of tuple data is inferred from the size of the record.
536 3413359 : // we can't validate the remaining number of bytes without parsing
537 3413359 : // the tuple data.
538 3413359 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
539 92309 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
540 3321050 : }
541 3413359 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
542 2003 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
543 2003 : // non-HOT update where the new tuple goes to different page than
544 2003 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
545 2003 : // set.
546 2003 : new_heap_blkno = Some(decoded.blocks[0].blkno);
547 3411356 : }
548 2867178 : } else if info == pg_constants::XLOG_HEAP_LOCK {
549 2765675 : let xlrec = v14::XlHeapLock::decode(buf);
550 2765675 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
551 198 : old_heap_blkno = Some(decoded.blocks[0].blkno);
552 198 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
553 2765477 : }
554 101503 : }
555 4369568 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
556 4369568 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
557 4369568 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
558 1160055 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
559 :
560 1160055 : let offset_array_len =
561 1160055 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
562 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
563 721733 : 0
564 : } else {
565 438322 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
566 : };
567 1160055 : assert_eq!(offset_array_len, buf.remaining());
568 :
569 1160055 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
570 3721 : new_heap_blkno = Some(decoded.blocks[0].blkno);
571 1156334 : }
572 3209513 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
573 4146 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
574 4146 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
575 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
576 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
577 4146 : }
578 3205367 : }
579 : } else {
580 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
581 : }
582 : }
583 : 15 => {
584 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
585 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
586 145286 :
587 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
588 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
589 145276 : assert_eq!(0, buf.remaining());
590 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
591 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
592 145272 : }
593 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
594 0 : let xlrec = v15::XlHeapDelete::decode(buf);
595 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
596 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
597 0 : }
598 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
599 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
600 : {
601 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
602 8 : // the size of tuple data is inferred from the size of the record.
603 8 : // we can't validate the remaining number of bytes without parsing
604 8 : // the tuple data.
605 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
606 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
607 8 : }
608 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
609 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
610 0 : // non-HOT update where the new tuple goes to different page than
611 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
612 0 : // set.
613 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
614 8 : }
615 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
616 0 : let xlrec = v15::XlHeapLock::decode(buf);
617 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
618 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
619 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
620 0 : }
621 2 : }
622 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
623 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
624 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
625 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
626 :
627 42 : let offset_array_len =
628 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
629 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
630 2 : 0
631 : } else {
632 40 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
633 : };
634 42 : assert_eq!(offset_array_len, buf.remaining());
635 :
636 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
637 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
638 34 : }
639 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
640 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
641 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
642 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
643 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
644 0 : }
645 146 : }
646 : } else {
647 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
648 : }
649 : }
650 : 16 => {
651 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
652 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
653 0 :
654 0 : if info == pg_constants::XLOG_HEAP_INSERT {
655 0 : let xlrec = v16::XlHeapInsert::decode(buf);
656 0 : assert_eq!(0, buf.remaining());
657 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
658 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
659 0 : }
660 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
661 0 : let xlrec = v16::XlHeapDelete::decode(buf);
662 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
663 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
664 0 : }
665 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
666 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
667 : {
668 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
669 0 : // the size of tuple data is inferred from the size of the record.
670 0 : // we can't validate the remaining number of bytes without parsing
671 0 : // the tuple data.
672 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
673 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
674 0 : }
675 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
676 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
677 0 : // non-HOT update where the new tuple goes to different page than
678 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
679 0 : // set.
680 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
681 0 : }
682 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
683 0 : let xlrec = v16::XlHeapLock::decode(buf);
684 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
685 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
686 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
687 0 : }
688 0 : }
689 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
690 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
691 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
692 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
693 :
694 0 : let offset_array_len =
695 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
696 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
697 0 : 0
698 : } else {
699 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
700 : };
701 0 : assert_eq!(offset_array_len, buf.remaining());
702 :
703 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
704 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
705 0 : }
706 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
707 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
708 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
709 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
710 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
711 0 : }
712 0 : }
713 : } else {
714 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
715 : }
716 : }
717 0 : _ => {}
718 : }
719 :
720 : // Clear the VM bits if required.
721 54886628 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
722 105534 : let vm_rel = RelTag {
723 105534 : forknum: VISIBILITYMAP_FORKNUM,
724 105534 : spcnode: decoded.blocks[0].rnode_spcnode,
725 105534 : dbnode: decoded.blocks[0].rnode_dbnode,
726 105534 : relnode: decoded.blocks[0].rnode_relnode,
727 105534 : };
728 105534 :
729 105534 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
730 105534 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
731 :
732 : // Sometimes, Postgres seems to create heap WAL records with the
733 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
734 : // not set. In fact, it's possible that the VM page does not exist at all.
735 : // In that case, we don't want to store a record to clear the VM bit;
736 : // replaying it would fail to find the previous image of the page, because
737 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
738 : // record if it doesn't.
739 105534 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
740 105534 : if let Some(blknum) = new_vm_blk {
741 13132 : if blknum >= vm_size {
742 1788 : new_vm_blk = None;
743 11344 : }
744 92402 : }
745 105534 : if let Some(blknum) = old_vm_blk {
746 92507 : if blknum >= vm_size {
747 658 : old_vm_blk = None;
748 91849 : }
749 13027 : }
750 :
751 105534 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
752 103102 : if new_vm_blk == old_vm_blk {
753 : // An UPDATE record that needs to clear the bits for both old and the
754 : // new page, both of which reside on the same VM page.
755 91 : self.put_rel_wal_record(
756 91 : modification,
757 91 : vm_rel,
758 91 : new_vm_blk.unwrap(),
759 91 : NeonWalRecord::ClearVisibilityMapFlags {
760 91 : new_heap_blkno,
761 91 : old_heap_blkno,
762 91 : flags,
763 91 : },
764 91 : ctx,
765 91 : )
766 0 : .await?;
767 : } else {
768 : // Clear VM bits for one heap page, or for two pages that reside on
769 : // different VM pages.
770 103011 : if let Some(new_vm_blk) = new_vm_blk {
771 11253 : self.put_rel_wal_record(
772 11253 : modification,
773 11253 : vm_rel,
774 11253 : new_vm_blk,
775 11253 : NeonWalRecord::ClearVisibilityMapFlags {
776 11253 : new_heap_blkno,
777 11253 : old_heap_blkno: None,
778 11253 : flags,
779 11253 : },
780 11253 : ctx,
781 11253 : )
782 0 : .await?;
783 91758 : }
784 103011 : if let Some(old_vm_blk) = old_vm_blk {
785 91758 : self.put_rel_wal_record(
786 91758 : modification,
787 91758 : vm_rel,
788 91758 : old_vm_blk,
789 91758 : NeonWalRecord::ClearVisibilityMapFlags {
790 91758 : new_heap_blkno: None,
791 91758 : old_heap_blkno,
792 91758 : flags,
793 91758 : },
794 91758 : ctx,
795 91758 : )
796 0 : .await?;
797 11253 : }
798 : }
799 2432 : }
800 54781094 : }
801 :
802 54886628 : Ok(())
803 54886628 : }
804 :
805 0 : async fn ingest_neonrmgr_record(
806 0 : &mut self,
807 0 : buf: &mut Bytes,
808 0 : modification: &mut DatadirModification<'_>,
809 0 : decoded: &DecodedWALRecord,
810 0 : ctx: &RequestContext,
811 0 : ) -> anyhow::Result<()> {
812 0 : // Handle VM bit updates that are implicitly part of heap records.
813 0 :
814 0 : // First, look at the record to determine which VM bits need
815 0 : // to be cleared. If either of these variables is set, we
816 0 : // need to clear the corresponding bits in the visibility map.
817 0 : let mut new_heap_blkno: Option<u32> = None;
818 0 : let mut old_heap_blkno: Option<u32> = None;
819 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
820 0 : let pg_version = modification.tline.pg_version;
821 0 :
822 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
823 :
824 0 : match pg_version {
825 : 16 => {
826 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
827 0 :
828 0 : match info {
829 : pg_constants::XLOG_NEON_HEAP_INSERT => {
830 0 : let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
831 0 : assert_eq!(0, buf.remaining());
832 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
833 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
834 0 : }
835 : }
836 : pg_constants::XLOG_NEON_HEAP_DELETE => {
837 0 : let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
838 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
839 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
840 0 : }
841 : }
842 : pg_constants::XLOG_NEON_HEAP_UPDATE
843 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
844 0 : let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
845 0 : // the size of tuple data is inferred from the size of the record.
846 0 : // we can't validate the remaining number of bytes without parsing
847 0 : // the tuple data.
848 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
849 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
850 0 : }
851 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
852 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
853 0 : // non-HOT update where the new tuple goes to different page than
854 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
855 0 : // set.
856 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
857 0 : }
858 : }
859 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
860 0 : let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
861 :
862 0 : let offset_array_len =
863 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
864 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
865 0 : 0
866 : } else {
867 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
868 : };
869 0 : assert_eq!(offset_array_len, buf.remaining());
870 :
871 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
872 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
873 0 : }
874 : }
875 : pg_constants::XLOG_NEON_HEAP_LOCK => {
876 0 : let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
877 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
878 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
879 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
880 0 : }
881 : }
882 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
883 : }
884 : }
885 0 : _ => bail!(
886 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
887 0 : pg_version
888 0 : ),
889 : }
890 :
891 : // Clear the VM bits if required.
892 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
893 0 : let vm_rel = RelTag {
894 0 : forknum: VISIBILITYMAP_FORKNUM,
895 0 : spcnode: decoded.blocks[0].rnode_spcnode,
896 0 : dbnode: decoded.blocks[0].rnode_dbnode,
897 0 : relnode: decoded.blocks[0].rnode_relnode,
898 0 : };
899 0 :
900 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
901 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
902 :
903 : // Sometimes, Postgres seems to create heap WAL records with the
904 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
905 : // not set. In fact, it's possible that the VM page does not exist at all.
906 : // In that case, we don't want to store a record to clear the VM bit;
907 : // replaying it would fail to find the previous image of the page, because
908 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
909 : // record if it doesn't.
910 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
911 0 : if let Some(blknum) = new_vm_blk {
912 0 : if blknum >= vm_size {
913 0 : new_vm_blk = None;
914 0 : }
915 0 : }
916 0 : if let Some(blknum) = old_vm_blk {
917 0 : if blknum >= vm_size {
918 0 : old_vm_blk = None;
919 0 : }
920 0 : }
921 :
922 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
923 0 : if new_vm_blk == old_vm_blk {
924 : // An UPDATE record that needs to clear the bits for both old and the
925 : // new page, both of which reside on the same VM page.
926 0 : self.put_rel_wal_record(
927 0 : modification,
928 0 : vm_rel,
929 0 : new_vm_blk.unwrap(),
930 0 : NeonWalRecord::ClearVisibilityMapFlags {
931 0 : new_heap_blkno,
932 0 : old_heap_blkno,
933 0 : flags,
934 0 : },
935 0 : ctx,
936 0 : )
937 0 : .await?;
938 : } else {
939 : // Clear VM bits for one heap page, or for two pages that reside on
940 : // different VM pages.
941 0 : if let Some(new_vm_blk) = new_vm_blk {
942 0 : self.put_rel_wal_record(
943 0 : modification,
944 0 : vm_rel,
945 0 : new_vm_blk,
946 0 : NeonWalRecord::ClearVisibilityMapFlags {
947 0 : new_heap_blkno,
948 0 : old_heap_blkno: None,
949 0 : flags,
950 0 : },
951 0 : ctx,
952 0 : )
953 0 : .await?;
954 0 : }
955 0 : if let Some(old_vm_blk) = old_vm_blk {
956 0 : self.put_rel_wal_record(
957 0 : modification,
958 0 : vm_rel,
959 0 : old_vm_blk,
960 0 : NeonWalRecord::ClearVisibilityMapFlags {
961 0 : new_heap_blkno: None,
962 0 : old_heap_blkno,
963 0 : flags,
964 0 : },
965 0 : ctx,
966 0 : )
967 0 : .await?;
968 0 : }
969 : }
970 0 : }
971 0 : }
972 :
973 0 : Ok(())
974 0 : }
975 :
976 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
977 24 : async fn ingest_xlog_dbase_create(
978 24 : &mut self,
979 24 : modification: &mut DatadirModification<'_>,
980 24 : rec: &XlCreateDatabase,
981 24 : ctx: &RequestContext,
982 24 : ) -> anyhow::Result<()> {
983 24 : let db_id = rec.db_id;
984 24 : let tablespace_id = rec.tablespace_id;
985 24 : let src_db_id = rec.src_db_id;
986 24 : let src_tablespace_id = rec.src_tablespace_id;
987 :
988 24 : let rels = modification
989 24 : .tline
990 24 : .list_rels(
991 24 : src_tablespace_id,
992 24 : src_db_id,
993 24 : Version::Modified(modification),
994 24 : ctx,
995 24 : )
996 0 : .await?;
997 :
998 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
999 :
1000 : // Copy relfilemap
1001 24 : let filemap = modification
1002 24 : .tline
1003 24 : .get_relmap_file(
1004 24 : src_tablespace_id,
1005 24 : src_db_id,
1006 24 : Version::Modified(modification),
1007 24 : ctx,
1008 24 : )
1009 1 : .await?;
1010 24 : modification
1011 24 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1012 0 : .await?;
1013 :
1014 24 : let mut num_rels_copied = 0;
1015 24 : let mut num_blocks_copied = 0;
1016 7032 : for src_rel in rels {
1017 7008 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1018 7008 : assert_eq!(src_rel.dbnode, src_db_id);
1019 :
1020 7008 : let nblocks = modification
1021 7008 : .tline
1022 7008 : .get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx)
1023 218 : .await?;
1024 7008 : let dst_rel = RelTag {
1025 7008 : spcnode: tablespace_id,
1026 7008 : dbnode: db_id,
1027 7008 : relnode: src_rel.relnode,
1028 7008 : forknum: src_rel.forknum,
1029 7008 : };
1030 7008 :
1031 7008 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1032 :
1033 : // Copy content
1034 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1035 24264 : for blknum in 0..nblocks {
1036 : // Sharding:
1037 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1038 : // dbNode is not included in the hash inputs for sharding.
1039 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1040 : // that belong to it.
1041 24264 : let src_key = rel_block_to_key(src_rel, blknum);
1042 24264 : if !self.shard.is_key_local(&src_key) {
1043 0 : debug!(
1044 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1045 0 : src_key
1046 0 : );
1047 9099 : continue;
1048 15165 : }
1049 0 : debug!(
1050 0 : "copying block {} from {} ({}) to {}",
1051 0 : blknum, src_rel, src_key, dst_rel
1052 0 : );
1053 :
1054 15165 : let content = modification
1055 15165 : .tline
1056 15165 : .get_rel_page_at_lsn(
1057 15165 : src_rel,
1058 15165 : blknum,
1059 15165 : Version::Modified(modification),
1060 15165 : Lsn::MAX,
1061 15165 : ctx,
1062 15165 : )
1063 1474 : .await?;
1064 15165 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1065 15165 : num_blocks_copied += 1;
1066 : }
1067 :
1068 7008 : num_rels_copied += 1;
1069 : }
1070 :
1071 24 : info!(
1072 24 : "Created database {}/{}, copied {} blocks in {} rels",
1073 24 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1074 24 : );
1075 24 : Ok(())
1076 24 : }
1077 :
1078 75290 : async fn ingest_xlog_smgr_create(
1079 75290 : &mut self,
1080 75290 : modification: &mut DatadirModification<'_>,
1081 75290 : rec: &XlSmgrCreate,
1082 75290 : ctx: &RequestContext,
1083 75290 : ) -> anyhow::Result<()> {
1084 75290 : let rel = RelTag {
1085 75290 : spcnode: rec.rnode.spcnode,
1086 75290 : dbnode: rec.rnode.dbnode,
1087 75290 : relnode: rec.rnode.relnode,
1088 75290 : forknum: rec.forknum,
1089 75290 : };
1090 75290 : self.put_rel_creation(modification, rel, ctx).await?;
1091 75290 : Ok(())
1092 75290 : }
1093 :
1094 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1095 : ///
1096 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1097 202 : async fn ingest_xlog_smgr_truncate(
1098 202 : &mut self,
1099 202 : modification: &mut DatadirModification<'_>,
1100 202 : rec: &XlSmgrTruncate,
1101 202 : ctx: &RequestContext,
1102 202 : ) -> anyhow::Result<()> {
1103 202 : let spcnode = rec.rnode.spcnode;
1104 202 : let dbnode = rec.rnode.dbnode;
1105 202 : let relnode = rec.rnode.relnode;
1106 202 :
1107 202 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1108 202 : let rel = RelTag {
1109 202 : spcnode,
1110 202 : dbnode,
1111 202 : relnode,
1112 202 : forknum: MAIN_FORKNUM,
1113 202 : };
1114 202 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1115 1 : .await?;
1116 0 : }
1117 202 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1118 202 : let rel = RelTag {
1119 202 : spcnode,
1120 202 : dbnode,
1121 202 : relnode,
1122 202 : forknum: FSM_FORKNUM,
1123 202 : };
1124 202 :
1125 202 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1126 202 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1127 202 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1128 : // Tail of last remaining FSM page has to be zeroed.
1129 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
1130 95 : modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
1131 95 : fsm_physical_page_no += 1;
1132 107 : }
1133 202 : let nblocks = get_relsize(modification, rel, ctx).await?;
1134 202 : if nblocks > fsm_physical_page_no {
1135 : // check if something to do: FSM is larger than truncate position
1136 54 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1137 1 : .await?;
1138 148 : }
1139 0 : }
1140 202 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1141 202 : let rel = RelTag {
1142 202 : spcnode,
1143 202 : dbnode,
1144 202 : relnode,
1145 202 : forknum: VISIBILITYMAP_FORKNUM,
1146 202 : };
1147 202 :
1148 202 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1149 202 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1150 : // Tail of last remaining vm page has to be zeroed.
1151 : // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
1152 95 : modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
1153 95 : vm_page_no += 1;
1154 107 : }
1155 202 : let nblocks = get_relsize(modification, rel, ctx).await?;
1156 202 : if nblocks > vm_page_no {
1157 : // check if something to do: VM is larger than truncate position
1158 54 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1159 0 : .await?;
1160 148 : }
1161 0 : }
1162 202 : Ok(())
1163 202 : }
1164 :
1165 : /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
1166 : ///
1167 6113872 : async fn ingest_xact_record(
1168 6113872 : &mut self,
1169 6113872 : modification: &mut DatadirModification<'_>,
1170 6113872 : parsed: &XlXactParsedRecord,
1171 6113872 : is_commit: bool,
1172 6113872 : ctx: &RequestContext,
1173 6113872 : ) -> anyhow::Result<()> {
1174 6113872 : // Record update of CLOG pages
1175 6113872 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1176 6113872 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1177 6113872 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1178 6113872 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1179 :
1180 6224117 : for subxact in &parsed.subxacts {
1181 110245 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1182 110245 : if subxact_pageno != pageno {
1183 : // This subxact goes to different page. Write the record
1184 : // for all the XIDs on the previous page, and continue
1185 : // accumulating XIDs on this new page.
1186 4 : modification.put_slru_wal_record(
1187 4 : SlruKind::Clog,
1188 4 : segno,
1189 4 : rpageno,
1190 4 : if is_commit {
1191 4 : NeonWalRecord::ClogSetCommitted {
1192 4 : xids: page_xids,
1193 4 : timestamp: parsed.xact_time,
1194 4 : }
1195 : } else {
1196 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1197 : },
1198 0 : )?;
1199 4 : page_xids = Vec::new();
1200 110241 : }
1201 110245 : pageno = subxact_pageno;
1202 110245 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1203 110245 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1204 110245 : page_xids.push(*subxact);
1205 : }
1206 6113872 : modification.put_slru_wal_record(
1207 6113872 : SlruKind::Clog,
1208 6113872 : segno,
1209 6113872 : rpageno,
1210 6113872 : if is_commit {
1211 6105004 : NeonWalRecord::ClogSetCommitted {
1212 6105004 : xids: page_xids,
1213 6105004 : timestamp: parsed.xact_time,
1214 6105004 : }
1215 : } else {
1216 8868 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1217 : },
1218 0 : )?;
1219 :
1220 6179231 : for xnode in &parsed.xnodes {
1221 326795 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1222 261436 : let rel = RelTag {
1223 261436 : forknum,
1224 261436 : spcnode: xnode.spcnode,
1225 261436 : dbnode: xnode.dbnode,
1226 261436 : relnode: xnode.relnode,
1227 261436 : };
1228 261436 : if modification
1229 261436 : .tline
1230 261436 : .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
1231 3 : .await?
1232 : {
1233 67231 : self.put_rel_drop(modification, rel, ctx).await?;
1234 194205 : }
1235 : }
1236 : }
1237 6113872 : Ok(())
1238 6113872 : }
1239 :
1240 2 : async fn ingest_clog_truncate_record(
1241 2 : &mut self,
1242 2 : modification: &mut DatadirModification<'_>,
1243 2 : xlrec: &XlClogTruncate,
1244 2 : ctx: &RequestContext,
1245 2 : ) -> anyhow::Result<()> {
1246 2 : info!(
1247 2 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1248 2 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1249 2 : );
1250 :
1251 : // Here we treat oldestXid and oldestXidDB
1252 : // differently from postgres redo routines.
1253 : // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
1254 : // until checkpoint happens and updates the value.
1255 : // Here we can use the most recent value.
1256 : // It's just an optimization, though and can be deleted.
1257 : // TODO Figure out if there will be any issues with replica.
1258 2 : self.checkpoint.oldestXid = xlrec.oldest_xid;
1259 2 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
1260 2 : self.checkpoint_modified = true;
1261 2 :
1262 2 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
1263 2 :
1264 2 : let latest_page_number =
1265 2 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
1266 2 :
1267 2 : // Now delete all segments containing pages between xlrec.pageno
1268 2 : // and latest_page_number.
1269 2 :
1270 2 : // First, make an important safety check:
1271 2 : // the current endpoint page must not be eligible for removal.
1272 2 : // See SimpleLruTruncate() in slru.c
1273 2 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
1274 0 : info!("could not truncate directory pg_xact apparent wraparound");
1275 0 : return Ok(());
1276 2 : }
1277 :
1278 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1279 : //
1280 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
1281 : // will block waiting for the last valid LSN to advance up to
1282 : // it. So we use the previous record's LSN in the get calls
1283 : // instead.
1284 13 : for segno in modification
1285 2 : .tline
1286 2 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1287 0 : .await?
1288 : {
1289 13 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1290 13 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
1291 10 : modification
1292 10 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1293 0 : .await?;
1294 0 : trace!("Drop CLOG segment {:>04X}", segno);
1295 3 : }
1296 : }
1297 :
1298 2 : Ok(())
1299 2 : }
1300 :
1301 24816 : fn ingest_multixact_create_record(
1302 24816 : &mut self,
1303 24816 : modification: &mut DatadirModification,
1304 24816 : xlrec: &XlMultiXactCreate,
1305 24816 : ) -> Result<()> {
1306 24816 : // Create WAL record for updating the multixact-offsets page
1307 24816 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1308 24816 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1309 24816 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1310 24816 :
1311 24816 : modification.put_slru_wal_record(
1312 24816 : SlruKind::MultiXactOffsets,
1313 24816 : segno,
1314 24816 : rpageno,
1315 24816 : NeonWalRecord::MultixactOffsetCreate {
1316 24816 : mid: xlrec.mid,
1317 24816 : moff: xlrec.moff,
1318 24816 : },
1319 24816 : )?;
1320 :
1321 : // Create WAL records for the update of each affected multixact-members page
1322 24816 : let mut members = xlrec.members.iter();
1323 24816 : let mut offset = xlrec.moff;
1324 : loop {
1325 49906 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1326 49906 :
1327 49906 : // How many members fit on this page?
1328 49906 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1329 49906 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1330 49906 :
1331 49906 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1332 49906 : for _ in 0..page_remain {
1333 524292 : if let Some(m) = members.next() {
1334 474674 : this_page_members.push(m.clone());
1335 474674 : } else {
1336 49618 : break;
1337 : }
1338 : }
1339 49906 : if this_page_members.is_empty() {
1340 : // all done
1341 24816 : break;
1342 25090 : }
1343 25090 : let n_this_page = this_page_members.len();
1344 25090 :
1345 25090 : modification.put_slru_wal_record(
1346 25090 : SlruKind::MultiXactMembers,
1347 25090 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1348 25090 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1349 25090 : NeonWalRecord::MultixactMembersCreate {
1350 25090 : moff: offset,
1351 25090 : members: this_page_members,
1352 25090 : },
1353 25090 : )?;
1354 :
1355 : // Note: The multixact members can wrap around, even within one WAL record.
1356 25090 : offset = offset.wrapping_add(n_this_page as u32);
1357 : }
1358 24816 : if xlrec.mid >= self.checkpoint.nextMulti {
1359 24816 : self.checkpoint.nextMulti = xlrec.mid + 1;
1360 24816 : self.checkpoint_modified = true;
1361 24816 : }
1362 24816 : if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
1363 24816 : self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
1364 24816 : self.checkpoint_modified = true;
1365 24816 : }
1366 474674 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1367 474674 : if let Some(max_xid) = acc {
1368 449858 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1369 449633 : Some(mbr.xid)
1370 : } else {
1371 225 : acc
1372 : }
1373 : } else {
1374 24816 : Some(mbr.xid)
1375 : }
1376 474674 : });
1377 :
1378 24816 : if let Some(max_xid) = max_mbr_xid {
1379 24816 : if self.checkpoint.update_next_xid(max_xid) {
1380 0 : self.checkpoint_modified = true;
1381 24816 : }
1382 0 : }
1383 24816 : Ok(())
1384 24816 : }
1385 :
1386 0 : async fn ingest_multixact_truncate_record(
1387 0 : &mut self,
1388 0 : modification: &mut DatadirModification<'_>,
1389 0 : xlrec: &XlMultiXactTruncate,
1390 0 : ctx: &RequestContext,
1391 0 : ) -> Result<()> {
1392 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
1393 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
1394 0 : self.checkpoint_modified = true;
1395 0 :
1396 0 : // PerformMembersTruncation
1397 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
1398 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
1399 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
1400 0 : let mut segment: i32 = startsegment;
1401 :
1402 : // Delete all the segments except the last one. The last segment can still
1403 : // contain, possibly partially, valid data.
1404 0 : while segment != endsegment {
1405 0 : modification
1406 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1407 0 : .await?;
1408 :
1409 : /* move to next segment, handling wraparound correctly */
1410 0 : if segment == maxsegment {
1411 0 : segment = 0;
1412 0 : } else {
1413 0 : segment += 1;
1414 0 : }
1415 : }
1416 :
1417 : // Truncate offsets
1418 : // FIXME: this did not handle wraparound correctly
1419 :
1420 0 : Ok(())
1421 0 : }
1422 :
1423 62 : async fn ingest_relmap_page(
1424 62 : &mut self,
1425 62 : modification: &mut DatadirModification<'_>,
1426 62 : xlrec: &XlRelmapUpdate,
1427 62 : decoded: &DecodedWALRecord,
1428 62 : ctx: &RequestContext,
1429 62 : ) -> Result<()> {
1430 62 : let mut buf = decoded.record.clone();
1431 62 : buf.advance(decoded.main_data_offset);
1432 62 : // skip xl_relmap_update
1433 62 : buf.advance(12);
1434 62 :
1435 62 : modification
1436 62 : .put_relmap_file(
1437 62 : xlrec.tsid,
1438 62 : xlrec.dbid,
1439 62 : Bytes::copy_from_slice(&buf[..]),
1440 62 : ctx,
1441 62 : )
1442 0 : .await
1443 62 : }
1444 :
1445 75292 : async fn put_rel_creation(
1446 75292 : &mut self,
1447 75292 : modification: &mut DatadirModification<'_>,
1448 75292 : rel: RelTag,
1449 75292 : ctx: &RequestContext,
1450 75292 : ) -> Result<()> {
1451 75292 : modification.put_rel_creation(rel, 0, ctx).await?;
1452 75292 : Ok(())
1453 75292 : }
1454 :
1455 445425 : async fn put_rel_page_image(
1456 445425 : &mut self,
1457 445425 : modification: &mut DatadirModification<'_>,
1458 445425 : rel: RelTag,
1459 445425 : blknum: BlockNumber,
1460 445425 : img: Bytes,
1461 445425 : ctx: &RequestContext,
1462 445425 : ) -> Result<(), PageReconstructError> {
1463 445425 : self.handle_rel_extend(modification, rel, blknum, ctx)
1464 6976 : .await?;
1465 445425 : modification.put_rel_page_image(rel, blknum, img)?;
1466 445425 : Ok(())
1467 445425 : }
1468 :
1469 53273696 : async fn put_rel_wal_record(
1470 53273696 : &mut self,
1471 53273696 : modification: &mut DatadirModification<'_>,
1472 53273696 : rel: RelTag,
1473 53273696 : blknum: BlockNumber,
1474 53273696 : rec: NeonWalRecord,
1475 53273696 : ctx: &RequestContext,
1476 53273696 : ) -> Result<()> {
1477 53273696 : self.handle_rel_extend(modification, rel, blknum, ctx)
1478 6940 : .await?;
1479 53273693 : modification.put_rel_wal_record(rel, blknum, rec)?;
1480 53273693 : Ok(())
1481 53273696 : }
1482 :
1483 6322 : async fn put_rel_truncation(
1484 6322 : &mut self,
1485 6322 : modification: &mut DatadirModification<'_>,
1486 6322 : rel: RelTag,
1487 6322 : nblocks: BlockNumber,
1488 6322 : ctx: &RequestContext,
1489 6322 : ) -> anyhow::Result<()> {
1490 6322 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1491 6322 : Ok(())
1492 6322 : }
1493 :
1494 67233 : async fn put_rel_drop(
1495 67233 : &mut self,
1496 67233 : modification: &mut DatadirModification<'_>,
1497 67233 : rel: RelTag,
1498 67233 : ctx: &RequestContext,
1499 67233 : ) -> Result<()> {
1500 67233 : modification.put_rel_drop(rel, ctx).await?;
1501 67233 : Ok(())
1502 67233 : }
1503 :
1504 57214349 : async fn handle_rel_extend(
1505 57214349 : &mut self,
1506 57214349 : modification: &mut DatadirModification<'_>,
1507 57214349 : rel: RelTag,
1508 57214349 : blknum: BlockNumber,
1509 57214349 : ctx: &RequestContext,
1510 57214349 : ) -> Result<(), PageReconstructError> {
1511 57214349 : let new_nblocks = blknum + 1;
1512 : // Check if the relation exists. We implicitly create relations on first
1513 : // record.
1514 : // TODO: would be nice if to be more explicit about it
1515 :
1516 : // Get current size and put rel creation if rel doesn't exist
1517 : //
1518 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1519 : // check the cache too. This is because eagerly checking the cache results in
1520 : // less work overall and 10% better performance. It's more work on cache miss
1521 : // but cache miss is rare.
1522 57214349 : let old_nblocks = if let Some(nblocks) = modification
1523 57214349 : .tline
1524 57214349 : .get_cached_rel_size(&rel, modification.get_lsn())
1525 : {
1526 57209877 : nblocks
1527 4472 : } else if !modification
1528 4472 : .tline
1529 4472 : .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
1530 81 : .await?
1531 : {
1532 : // create it with 0 size initially, the logic below will extend it
1533 3639 : modification
1534 3639 : .put_rel_creation(rel, 0, ctx)
1535 531 : .await
1536 3639 : .context("Relation Error")?;
1537 3639 : 0
1538 : } else {
1539 833 : modification
1540 833 : .tline
1541 833 : .get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
1542 224 : .await?
1543 : };
1544 :
1545 57214349 : if new_nblocks > old_nblocks {
1546 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1547 1301198 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1548 :
1549 1301195 : let mut key = rel_block_to_key(rel, blknum);
1550 : // fill the gap with zeros
1551 1301195 : for gap_blknum in old_nblocks..blknum {
1552 239870 : key.field6 = gap_blknum;
1553 239870 :
1554 239870 : if self.shard.get_shard_number(&key) != self.shard.number {
1555 1120 : continue;
1556 238750 : }
1557 238750 :
1558 238750 : modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
1559 : }
1560 55913151 : }
1561 57214346 : Ok(())
1562 57214349 : }
1563 :
1564 1339 : async fn put_slru_page_image(
1565 1339 : &mut self,
1566 1339 : modification: &mut DatadirModification<'_>,
1567 1339 : kind: SlruKind,
1568 1339 : segno: u32,
1569 1339 : blknum: BlockNumber,
1570 1339 : img: Bytes,
1571 1339 : ctx: &RequestContext,
1572 1339 : ) -> Result<()> {
1573 1339 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1574 28 : .await?;
1575 1339 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1576 1339 : Ok(())
1577 1339 : }
1578 :
1579 1339 : async fn handle_slru_extend(
1580 1339 : &mut self,
1581 1339 : modification: &mut DatadirModification<'_>,
1582 1339 : kind: SlruKind,
1583 1339 : segno: u32,
1584 1339 : blknum: BlockNumber,
1585 1339 : ctx: &RequestContext,
1586 1339 : ) -> anyhow::Result<()> {
1587 1339 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
1588 1339 : // extended with ZEROPAGE records, not with commit records, so it happens
1589 1339 : // a lot less frequently.
1590 1339 :
1591 1339 : let new_nblocks = blknum + 1;
1592 : // Check if the relation exists. We implicitly create relations on first
1593 : // record.
1594 : // TODO: would be nice if to be more explicit about it
1595 1339 : let old_nblocks = if !modification
1596 1339 : .tline
1597 1339 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1598 14 : .await?
1599 : {
1600 : // create it with 0 size initially, the logic below will extend it
1601 35 : modification
1602 35 : .put_slru_segment_creation(kind, segno, 0, ctx)
1603 0 : .await?;
1604 35 : 0
1605 : } else {
1606 1304 : modification
1607 1304 : .tline
1608 1304 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1609 14 : .await?
1610 : };
1611 :
1612 1339 : if new_nblocks > old_nblocks {
1613 0 : trace!(
1614 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1615 0 : kind,
1616 0 : segno,
1617 0 : old_nblocks,
1618 0 : new_nblocks
1619 0 : );
1620 1315 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1621 :
1622 : // fill the gap with zeros
1623 1315 : for gap_blknum in old_nblocks..blknum {
1624 0 : modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
1625 : }
1626 24 : }
1627 1339 : Ok(())
1628 1339 : }
1629 : }
1630 :
1631 105938 : async fn get_relsize(
1632 105938 : modification: &DatadirModification<'_>,
1633 105938 : rel: RelTag,
1634 105938 : ctx: &RequestContext,
1635 105938 : ) -> anyhow::Result<BlockNumber> {
1636 105938 : let nblocks = if !modification
1637 105938 : .tline
1638 105938 : .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
1639 55 : .await?
1640 : {
1641 2570 : 0
1642 : } else {
1643 103368 : modification
1644 103368 : .tline
1645 103368 : .get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
1646 0 : .await?
1647 : };
1648 105938 : Ok(nblocks)
1649 105938 : }
1650 :
1651 : #[allow(clippy::bool_assert_comparison)]
1652 : #[cfg(test)]
1653 : mod tests {
1654 : use super::*;
1655 : use crate::tenant::harness::*;
1656 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1657 : use crate::tenant::Timeline;
1658 : use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
1659 : use postgres_ffi::RELSEG_SIZE;
1660 :
1661 : use crate::DEFAULT_PG_VERSION;
1662 :
1663 : /// Arbitrary relation tag, for testing.
1664 : const TESTREL_A: RelTag = RelTag {
1665 : spcnode: 0,
1666 : dbnode: 111,
1667 : relnode: 1000,
1668 : forknum: 0,
1669 : };
1670 :
1671 12 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1672 12 : // TODO
1673 12 : }
1674 :
1675 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1676 :
1677 8 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1678 8 : let mut m = tline.begin_modification(Lsn(0x10));
1679 8 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1680 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1681 8 : m.commit(ctx).await?;
1682 8 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1683 :
1684 8 : Ok(walingest)
1685 8 : }
1686 :
1687 2 : #[tokio::test]
1688 2 : async fn test_relsize() -> Result<()> {
1689 2 : let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
1690 2 : let tline = tenant
1691 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1692 6 : .await?;
1693 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1694 :
1695 2 : let mut m = tline.begin_modification(Lsn(0x20));
1696 2 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1697 2 : walingest
1698 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1699 0 : .await?;
1700 2 : m.commit(&ctx).await?;
1701 2 : let mut m = tline.begin_modification(Lsn(0x30));
1702 2 : walingest
1703 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
1704 0 : .await?;
1705 2 : m.commit(&ctx).await?;
1706 2 : let mut m = tline.begin_modification(Lsn(0x40));
1707 2 : walingest
1708 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
1709 0 : .await?;
1710 2 : m.commit(&ctx).await?;
1711 2 : let mut m = tline.begin_modification(Lsn(0x50));
1712 2 : walingest
1713 2 : .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
1714 0 : .await?;
1715 2 : m.commit(&ctx).await?;
1716 :
1717 2 : assert_current_logical_size(&tline, Lsn(0x50));
1718 :
1719 : // The relation was created at LSN 2, not visible at LSN 1 yet.
1720 2 : assert_eq!(
1721 2 : tline
1722 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
1723 0 : .await?,
1724 : false
1725 : );
1726 2 : assert!(tline
1727 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
1728 0 : .await
1729 2 : .is_err());
1730 2 : assert_eq!(
1731 2 : tline
1732 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
1733 0 : .await?,
1734 : true
1735 : );
1736 2 : assert_eq!(
1737 2 : tline
1738 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
1739 0 : .await?,
1740 : 1
1741 : );
1742 2 : assert_eq!(
1743 2 : tline
1744 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
1745 0 : .await?,
1746 : 3
1747 : );
1748 :
1749 : // Check page contents at each LSN
1750 2 : assert_eq!(
1751 2 : tline
1752 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
1753 0 : .await?,
1754 2 : TEST_IMG("foo blk 0 at 2")
1755 : );
1756 :
1757 2 : assert_eq!(
1758 2 : tline
1759 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
1760 0 : .await?,
1761 2 : TEST_IMG("foo blk 0 at 3")
1762 : );
1763 :
1764 2 : assert_eq!(
1765 2 : tline
1766 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
1767 0 : .await?,
1768 2 : TEST_IMG("foo blk 0 at 3")
1769 : );
1770 2 : assert_eq!(
1771 2 : tline
1772 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
1773 0 : .await?,
1774 2 : TEST_IMG("foo blk 1 at 4")
1775 : );
1776 :
1777 2 : assert_eq!(
1778 2 : tline
1779 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
1780 0 : .await?,
1781 2 : TEST_IMG("foo blk 0 at 3")
1782 : );
1783 2 : assert_eq!(
1784 2 : tline
1785 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
1786 0 : .await?,
1787 2 : TEST_IMG("foo blk 1 at 4")
1788 : );
1789 2 : assert_eq!(
1790 2 : tline
1791 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
1792 0 : .await?,
1793 2 : TEST_IMG("foo blk 2 at 5")
1794 : );
1795 :
1796 : // Truncate last block
1797 2 : let mut m = tline.begin_modification(Lsn(0x60));
1798 2 : walingest
1799 2 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1800 0 : .await?;
1801 2 : m.commit(&ctx).await?;
1802 2 : assert_current_logical_size(&tline, Lsn(0x60));
1803 :
1804 : // Check reported size and contents after truncation
1805 2 : assert_eq!(
1806 2 : tline
1807 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
1808 0 : .await?,
1809 : 2
1810 : );
1811 2 : assert_eq!(
1812 2 : tline
1813 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
1814 0 : .await?,
1815 2 : TEST_IMG("foo blk 0 at 3")
1816 : );
1817 2 : assert_eq!(
1818 2 : tline
1819 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
1820 0 : .await?,
1821 2 : TEST_IMG("foo blk 1 at 4")
1822 : );
1823 :
1824 : // should still see the truncated block with older LSN
1825 2 : assert_eq!(
1826 2 : tline
1827 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
1828 0 : .await?,
1829 : 3
1830 : );
1831 2 : assert_eq!(
1832 2 : tline
1833 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
1834 0 : .await?,
1835 2 : TEST_IMG("foo blk 2 at 5")
1836 : );
1837 :
1838 : // Truncate to zero length
1839 2 : let mut m = tline.begin_modification(Lsn(0x68));
1840 2 : walingest
1841 2 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1842 0 : .await?;
1843 2 : m.commit(&ctx).await?;
1844 2 : assert_eq!(
1845 2 : tline
1846 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), Lsn::INVALID, &ctx)
1847 0 : .await?,
1848 : 0
1849 : );
1850 :
1851 : // Extend from 0 to 2 blocks, leaving a gap
1852 2 : let mut m = tline.begin_modification(Lsn(0x70));
1853 2 : walingest
1854 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
1855 0 : .await?;
1856 2 : m.commit(&ctx).await?;
1857 2 : assert_eq!(
1858 2 : tline
1859 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
1860 0 : .await?,
1861 : 2
1862 : );
1863 2 : assert_eq!(
1864 2 : tline
1865 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
1866 1 : .await?,
1867 2 : ZERO_PAGE
1868 : );
1869 2 : assert_eq!(
1870 2 : tline
1871 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
1872 0 : .await?,
1873 2 : TEST_IMG("foo blk 1")
1874 : );
1875 :
1876 : // Extend a lot more, leaving a big gap that spans across segments
1877 2 : let mut m = tline.begin_modification(Lsn(0x80));
1878 2 : walingest
1879 2 : .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
1880 0 : .await?;
1881 71 : m.commit(&ctx).await?;
1882 2 : assert_eq!(
1883 2 : tline
1884 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
1885 0 : .await?,
1886 : 1501
1887 : );
1888 2998 : for blk in 2..1500 {
1889 2996 : assert_eq!(
1890 2996 : tline
1891 2996 : .get_rel_page_at_lsn(
1892 2996 : TESTREL_A,
1893 2996 : blk,
1894 2996 : Version::Lsn(Lsn(0x80)),
1895 2996 : Lsn::INVALID,
1896 2996 : &ctx
1897 2996 : )
1898 3019 : .await?,
1899 2996 : ZERO_PAGE
1900 : );
1901 : }
1902 2 : assert_eq!(
1903 2 : tline
1904 2 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
1905 1 : .await?,
1906 2 : TEST_IMG("foo blk 1500")
1907 : );
1908 :
1909 2 : Ok(())
1910 : }
1911 :
1912 : // Test what happens if we dropped a relation
1913 : // and then created it again within the same layer.
1914 2 : #[tokio::test]
1915 2 : async fn test_drop_extend() -> Result<()> {
1916 2 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
1917 2 : let tline = tenant
1918 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1919 6 : .await?;
1920 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1921 :
1922 2 : let mut m = tline.begin_modification(Lsn(0x20));
1923 2 : walingest
1924 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1925 0 : .await?;
1926 2 : m.commit(&ctx).await?;
1927 :
1928 : // Check that rel exists and size is correct
1929 2 : assert_eq!(
1930 2 : tline
1931 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
1932 0 : .await?,
1933 : true
1934 : );
1935 2 : assert_eq!(
1936 2 : tline
1937 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
1938 0 : .await?,
1939 : 1
1940 : );
1941 :
1942 : // Drop rel
1943 2 : let mut m = tline.begin_modification(Lsn(0x30));
1944 2 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
1945 2 : m.commit(&ctx).await?;
1946 :
1947 : // Check that rel is not visible anymore
1948 2 : assert_eq!(
1949 2 : tline
1950 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
1951 0 : .await?,
1952 : false
1953 : );
1954 :
1955 : // FIXME: should fail
1956 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1957 :
1958 : // Re-create it
1959 2 : let mut m = tline.begin_modification(Lsn(0x40));
1960 2 : walingest
1961 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
1962 0 : .await?;
1963 2 : m.commit(&ctx).await?;
1964 :
1965 : // Check that rel exists and size is correct
1966 2 : assert_eq!(
1967 2 : tline
1968 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
1969 0 : .await?,
1970 : true
1971 : );
1972 2 : assert_eq!(
1973 2 : tline
1974 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
1975 0 : .await?,
1976 : 1
1977 : );
1978 :
1979 2 : Ok(())
1980 : }
1981 :
1982 : // Test what happens if we truncated a relation
1983 : // so that one of its segments was dropped
1984 : // and then extended it again within the same layer.
1985 2 : #[tokio::test]
1986 2 : async fn test_truncate_extend() -> Result<()> {
1987 2 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
1988 2 : let tline = tenant
1989 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1990 7 : .await?;
1991 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1992 :
1993 : // Create a 20 MB relation (the size is arbitrary)
1994 2 : let relsize = 20 * 1024 * 1024 / 8192;
1995 2 : let mut m = tline.begin_modification(Lsn(0x20));
1996 5120 : for blkno in 0..relsize {
1997 5120 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1998 5120 : walingest
1999 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
2000 0 : .await?;
2001 : }
2002 2 : m.commit(&ctx).await?;
2003 :
2004 : // The relation was created at LSN 20, not visible at LSN 1 yet.
2005 2 : assert_eq!(
2006 2 : tline
2007 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
2008 0 : .await?,
2009 : false
2010 : );
2011 2 : assert!(tline
2012 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
2013 0 : .await
2014 2 : .is_err());
2015 :
2016 2 : assert_eq!(
2017 2 : tline
2018 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
2019 0 : .await?,
2020 : true
2021 : );
2022 2 : assert_eq!(
2023 2 : tline
2024 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
2025 0 : .await?,
2026 : relsize
2027 : );
2028 :
2029 : // Check relation content
2030 5120 : for blkno in 0..relsize {
2031 5120 : let lsn = Lsn(0x20);
2032 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2033 5120 : assert_eq!(
2034 5120 : tline
2035 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), Lsn::INVALID, &ctx)
2036 201 : .await?,
2037 5120 : TEST_IMG(&data)
2038 : );
2039 : }
2040 :
2041 : // Truncate relation so that second segment was dropped
2042 : // - only leave one page
2043 2 : let mut m = tline.begin_modification(Lsn(0x60));
2044 2 : walingest
2045 2 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2046 1 : .await?;
2047 2 : m.commit(&ctx).await?;
2048 :
2049 : // Check reported size and contents after truncation
2050 2 : assert_eq!(
2051 2 : tline
2052 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
2053 0 : .await?,
2054 : 1
2055 : );
2056 :
2057 4 : for blkno in 0..1 {
2058 2 : let lsn = Lsn(0x20);
2059 2 : let data = format!("foo blk {} at {}", blkno, lsn);
2060 2 : assert_eq!(
2061 2 : tline
2062 2 : .get_rel_page_at_lsn(
2063 2 : TESTREL_A,
2064 2 : blkno,
2065 2 : Version::Lsn(Lsn(0x60)),
2066 2 : Lsn::INVALID,
2067 2 : &ctx
2068 2 : )
2069 0 : .await?,
2070 2 : TEST_IMG(&data)
2071 : );
2072 : }
2073 :
2074 : // should still see all blocks with older LSN
2075 2 : assert_eq!(
2076 2 : tline
2077 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
2078 0 : .await?,
2079 : relsize
2080 : );
2081 5120 : for blkno in 0..relsize {
2082 5120 : let lsn = Lsn(0x20);
2083 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2084 5120 : assert_eq!(
2085 5120 : tline
2086 5120 : .get_rel_page_at_lsn(
2087 5120 : TESTREL_A,
2088 5120 : blkno,
2089 5120 : Version::Lsn(Lsn(0x50)),
2090 5120 : Lsn::INVALID,
2091 5120 : &ctx
2092 5120 : )
2093 400 : .await?,
2094 5120 : TEST_IMG(&data)
2095 : );
2096 : }
2097 :
2098 : // Extend relation again.
2099 : // Add enough blocks to create second segment
2100 2 : let lsn = Lsn(0x80);
2101 2 : let mut m = tline.begin_modification(lsn);
2102 5120 : for blkno in 0..relsize {
2103 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2104 5120 : walingest
2105 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
2106 0 : .await?;
2107 : }
2108 2 : m.commit(&ctx).await?;
2109 :
2110 2 : assert_eq!(
2111 2 : tline
2112 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
2113 0 : .await?,
2114 : true
2115 : );
2116 2 : assert_eq!(
2117 2 : tline
2118 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
2119 0 : .await?,
2120 : relsize
2121 : );
2122 : // Check relation content
2123 5120 : for blkno in 0..relsize {
2124 5120 : let lsn = Lsn(0x80);
2125 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2126 5120 : assert_eq!(
2127 5120 : tline
2128 5120 : .get_rel_page_at_lsn(
2129 5120 : TESTREL_A,
2130 5120 : blkno,
2131 5120 : Version::Lsn(Lsn(0x80)),
2132 5120 : Lsn::INVALID,
2133 5120 : &ctx
2134 5120 : )
2135 201 : .await?,
2136 5120 : TEST_IMG(&data)
2137 : );
2138 : }
2139 :
2140 2 : Ok(())
2141 : }
2142 :
2143 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2144 : /// split into multiple 1 GB segments in Postgres.
2145 2 : #[tokio::test]
2146 2 : async fn test_large_rel() -> Result<()> {
2147 2 : let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
2148 2 : let tline = tenant
2149 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2150 7 : .await?;
2151 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2152 :
2153 2 : let mut lsn = 0x10;
2154 262146 : for blknum in 0..RELSEG_SIZE + 1 {
2155 262146 : lsn += 0x10;
2156 262146 : let mut m = tline.begin_modification(Lsn(lsn));
2157 262146 : let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2158 262146 : walingest
2159 262146 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2160 6163 : .await?;
2161 262146 : m.commit(&ctx).await?;
2162 : }
2163 :
2164 2 : assert_current_logical_size(&tline, Lsn(lsn));
2165 :
2166 2 : assert_eq!(
2167 2 : tline
2168 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
2169 0 : .await?,
2170 2 : RELSEG_SIZE + 1
2171 : );
2172 :
2173 : // Truncate one block
2174 2 : lsn += 0x10;
2175 2 : let mut m = tline.begin_modification(Lsn(lsn));
2176 2 : walingest
2177 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2178 0 : .await?;
2179 2 : m.commit(&ctx).await?;
2180 2 : assert_eq!(
2181 2 : tline
2182 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
2183 0 : .await?,
2184 : RELSEG_SIZE
2185 : );
2186 2 : assert_current_logical_size(&tline, Lsn(lsn));
2187 2 :
2188 2 : // Truncate another block
2189 2 : lsn += 0x10;
2190 2 : let mut m = tline.begin_modification(Lsn(lsn));
2191 2 : walingest
2192 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2193 0 : .await?;
2194 2 : m.commit(&ctx).await?;
2195 2 : assert_eq!(
2196 2 : tline
2197 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
2198 0 : .await?,
2199 2 : RELSEG_SIZE - 1
2200 : );
2201 2 : assert_current_logical_size(&tline, Lsn(lsn));
2202 2 :
2203 2 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2204 2 : // This tests the behavior at segment boundaries
2205 2 : let mut size: i32 = 3000;
2206 6004 : while size >= 0 {
2207 6002 : lsn += 0x10;
2208 6002 : let mut m = tline.begin_modification(Lsn(lsn));
2209 6002 : walingest
2210 6002 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2211 140 : .await?;
2212 6002 : m.commit(&ctx).await?;
2213 6002 : assert_eq!(
2214 6002 : tline
2215 6002 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
2216 0 : .await?,
2217 6002 : size as BlockNumber
2218 : );
2219 :
2220 6002 : size -= 1;
2221 : }
2222 2 : assert_current_logical_size(&tline, Lsn(lsn));
2223 2 :
2224 2 : Ok(())
2225 : }
2226 :
2227 : /// Replay a wal segment file taken directly from safekeepers.
2228 : ///
2229 : /// This test is useful for benchmarking since it allows us to profile only
2230 : /// the walingest code in a single-threaded executor, and iterate more quickly
2231 : /// without waiting for unrelated steps.
2232 2 : #[tokio::test]
2233 2 : async fn test_ingest_real_wal() {
2234 2 : use crate::tenant::harness::*;
2235 2 : use postgres_ffi::waldecoder::WalStreamDecoder;
2236 2 : use postgres_ffi::WAL_SEGMENT_SIZE;
2237 2 :
2238 2 : // Define test data path and constants.
2239 2 : //
2240 2 : // Steps to reconstruct the data, if needed:
2241 2 : // 1. Run the pgbench python test
2242 2 : // 2. Take the first wal segment file from safekeeper
2243 2 : // 3. Compress it using `zstd --long input_file`
2244 2 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2245 2 : // 5. Grep sk logs for "restart decoder" to get startpoint
2246 2 : // 6. Run just the decoder from this test to get the endpoint.
2247 2 : // It's the last LSN the decoder will output.
2248 2 : let pg_version = 15; // The test data was generated by pg15
2249 2 : let path = "test_data/sk_wal_segment_from_pgbench";
2250 2 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2251 2 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2252 2 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2253 2 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2254 2 :
2255 2 : let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
2256 2 : let (tenant, ctx) = harness.load().await;
2257 :
2258 2 : let remote_initdb_path =
2259 2 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2260 2 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2261 2 :
2262 2 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2263 2 : .expect("creating test dir should work");
2264 2 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2265 :
2266 : // Bootstrap a real timeline. We can't use create_test_timeline because
2267 : // it doesn't create a real checkpoint, and Walingest::new tries to parse
2268 : // the garbage data.
2269 2 : let tline = tenant
2270 2 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2271 20423 : .await
2272 2 : .unwrap();
2273 :
2274 : // We fully read and decompress this into memory before decoding
2275 : // to get a more accurate perf profile of the decoder.
2276 2 : let bytes = {
2277 : use async_compression::tokio::bufread::ZstdDecoder;
2278 2 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2279 2 : let reader = tokio::io::BufReader::new(file);
2280 2 : let decoder = ZstdDecoder::new(reader);
2281 2 : let mut reader = tokio::io::BufReader::new(decoder);
2282 2 : let mut buffer = Vec::new();
2283 224 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2284 2 : buffer
2285 2 : };
2286 2 :
2287 2 : // TODO start a profiler too
2288 2 : let started_at = std::time::Instant::now();
2289 2 :
2290 2 : // Initialize walingest
2291 2 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2292 2 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2293 2 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2294 5 : .await
2295 2 : .unwrap();
2296 2 : let mut modification = tline.begin_modification(startpoint);
2297 2 : let mut decoded = DecodedWALRecord::default();
2298 2 : println!("decoding {} bytes", bytes.len() - xlogoff);
2299 :
2300 : // Decode and ingest wal. We process the wal in chunks because
2301 : // that's what happens when we get bytes from safekeepers.
2302 474686 : for chunk in bytes[xlogoff..].chunks(50) {
2303 474686 : decoder.feed_bytes(chunk);
2304 620536 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2305 145850 : walingest
2306 145850 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
2307 105 : .await
2308 145850 : .unwrap();
2309 : }
2310 474686 : modification.commit(&ctx).await.unwrap();
2311 : }
2312 :
2313 2 : let duration = started_at.elapsed();
2314 2 : println!("done in {:?}", duration);
2315 : }
2316 : }
|