Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but stores most of them as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! will call Repository::put_wal_record or put_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoke Rust code.
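//!
//! A minimal sketch of how these pieces fit together (hypothetical driver
//! code; the real WAL receiver loop and its error handling live elsewhere,
//! and `wal_stream_next` is a stand-in for the decoded WAL stream):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut decoded = DecodedWALRecord::default();
//! while let Some((lsn, recdata)) = wal_stream_next().await? {
//!     let mut modification = timeline.begin_modification(lsn);
//!     walingest
//!         .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
//!         .await?;
//!     modification.commit(&ctx).await?;
//! }
//! ```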
23 :
24 : use pageserver_api::shard::ShardIdentity;
25 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
26 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
27 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
28 :
29 : use anyhow::{bail, Context, Result};
30 : use bytes::{Buf, Bytes, BytesMut};
31 : use tracing::*;
32 : use utils::failpoint_support;
33 :
34 : use crate::context::RequestContext;
35 : use crate::metrics::WAL_INGEST;
36 : use crate::pgdatadir_mapping::{DatadirModification, Version};
37 : use crate::tenant::PageReconstructError;
38 : use crate::tenant::Timeline;
39 : use crate::walrecord::*;
40 : use crate::ZERO_PAGE;
41 : use pageserver_api::key::rel_block_to_key;
42 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
43 : use postgres_ffi::pg_constants;
44 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
45 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
46 : use postgres_ffi::v14::xlog_utils::*;
47 : use postgres_ffi::v14::CheckPoint;
48 : use postgres_ffi::TransactionId;
49 : use postgres_ffi::BLCKSZ;
50 : use utils::lsn::Lsn;
51 :
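/// Per-timeline WAL ingestion state: the shard identity used to filter out
/// blocks that belong to other shards, plus an in-memory copy of the latest
/// CheckPoint and a flag tracking whether it has changes that still need to
/// be written back to the repository.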
52 : pub struct WalIngest {
53 : shard: ShardIdentity,
54 : checkpoint: CheckPoint,
55 : checkpoint_modified: bool,
56 : }
57 :
58 : impl WalIngest {
59 1338 : pub async fn new(
60 1338 : timeline: &Timeline,
61 1338 : startpoint: Lsn,
62 1338 : ctx: &RequestContext,
63 1338 : ) -> anyhow::Result<WalIngest> {
64 : // Fetch the latest checkpoint into memory, so that we can compare with it
65 : // quickly in `ingest_record` and update it when it changes.
66 1338 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
67 1324 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
68 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
69 :
70 1324 : Ok(WalIngest {
71 1324 : shard: *timeline.get_shard_identity(),
72 1324 : checkpoint,
73 1324 : checkpoint_modified: false,
74 1324 : })
75 1338 : }
76 :
77 : ///
78 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
79 : ///
80 : /// This function updates the `lsn` field of the `DatadirModification`.
81 : ///
82 : /// It is a helper that parses the WAL record and calls the Timeline's PUT functions for
83 : /// all the relations/pages that the record affects.
84 : ///
85 : /// Returns `true` if the record was ingested, and `false` if it was filtered out.
86 : ///
87 73047338 : pub async fn ingest_record(
88 73047338 : &mut self,
89 73047338 : recdata: Bytes,
90 73047338 : lsn: Lsn,
91 73047338 : modification: &mut DatadirModification<'_>,
92 73047338 : decoded: &mut DecodedWALRecord,
93 73047338 : ctx: &RequestContext,
94 73047349 : ) -> anyhow::Result<bool> {
95 73047349 : WAL_INGEST.records_received.inc();
96 73047349 : let pg_version = modification.tline.pg_version;
97 73047349 : let prev_len = modification.len();
98 73047349 :
99 73047349 : modification.set_lsn(lsn)?;
100 73047349 : decode_wal_record(recdata, decoded, pg_version)?;
101 :
102 73047349 : let mut buf = decoded.record.clone();
103 73047349 : buf.advance(decoded.main_data_offset);
104 73047349 :
105 73047349 : assert!(!self.checkpoint_modified);
106 73047349 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
107 71794365 : && self.checkpoint.update_next_xid(decoded.xl_xid)
108 8455 : {
109 8455 : self.checkpoint_modified = true;
110 73038894 : }
111 :
112 73047349 : match decoded.xl_rmid {
113 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
114 : // Heap AM records need some special handling, because they modify VM pages
115 : // without registering them with the standard mechanism.
116 54762178 : self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
117 53 : .await?;
118 : }
119 : pg_constants::RM_NEON_ID => {
120 0 : self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
121 0 : .await?;
122 : }
123 : // Handle other special record types
124 : pg_constants::RM_SMGR_ID => {
125 75603 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
126 75603 :
127 75603 : if info == pg_constants::XLOG_SMGR_CREATE {
128 75378 : let create = XlSmgrCreate::decode(&mut buf);
129 75378 : self.ingest_xlog_smgr_create(modification, &create, ctx)
130 1240 : .await?;
131 225 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
132 225 : let truncate = XlSmgrTruncate::decode(&mut buf);
133 225 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
134 2 : .await?;
135 0 : }
136 : }
137 : pg_constants::RM_DBASE_ID => {
138 27 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
139 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
140 :
141 27 : if pg_version == 14 {
142 27 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
143 24 : let createdb = XlCreateDatabase::decode(&mut buf);
144 0 : debug!("XLOG_DBASE_CREATE v14");
145 :
146 24 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
147 1547 : .await?;
148 3 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
149 3 : let dropdb = XlDropDatabase::decode(&mut buf);
150 6 : for tablespace_id in dropdb.tablespace_ids {
151 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
152 3 : modification
153 3 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
154 0 : .await?;
155 : }
156 0 : }
157 0 : } else if pg_version == 15 {
158 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
159 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
160 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
161 : // The XLOG record was renamed between v14 and v15,
162 : // but the record format is the same.
163 : // So we can reuse XlCreateDatabase here.
164 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
165 0 : let createdb = XlCreateDatabase::decode(&mut buf);
166 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
167 0 : .await?;
168 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
169 0 : let dropdb = XlDropDatabase::decode(&mut buf);
170 0 : for tablespace_id in dropdb.tablespace_ids {
171 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
172 0 : modification
173 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
174 0 : .await?;
175 : }
176 0 : }
177 0 : } else if pg_version == 16 {
178 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
179 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
180 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
181 : // The XLOG record was renamed between v14 and v15,
182 : // but the record format is the same.
183 : // So we can reuse XlCreateDatabase here.
184 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
185 0 : let createdb = XlCreateDatabase::decode(&mut buf);
186 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
187 0 : .await?;
188 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
189 0 : let dropdb = XlDropDatabase::decode(&mut buf);
190 0 : for tablespace_id in dropdb.tablespace_ids {
191 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
192 0 : modification
193 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
194 0 : .await?;
195 : }
196 0 : }
197 0 : }
198 : }
199 : pg_constants::RM_TBLSPC_ID => {
200 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
201 : }
202 : pg_constants::RM_CLOG_ID => {
203 1326 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
204 1326 :
205 1326 : if info == pg_constants::CLOG_ZEROPAGE {
206 1323 : let pageno = buf.get_u32_le();
207 1323 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
208 1323 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
209 1323 : self.put_slru_page_image(
210 1323 : modification,
211 1323 : SlruKind::Clog,
212 1323 : segno,
213 1323 : rpageno,
214 1323 : ZERO_PAGE.clone(),
215 1323 : ctx,
216 1323 : )
217 115 : .await?;
218 : } else {
219 3 : assert!(info == pg_constants::CLOG_TRUNCATE);
220 3 : let xlrec = XlClogTruncate::decode(&mut buf);
221 3 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
222 0 : .await?;
223 : }
224 : }
225 : pg_constants::RM_XACT_ID => {
226 6538173 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
227 6538173 :
228 6538173 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
229 6132815 : let parsed_xact =
230 6132815 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
231 6132815 : self.ingest_xact_record(
232 6132815 : modification,
233 6132815 : &parsed_xact,
234 6132815 : info == pg_constants::XLOG_XACT_COMMIT,
235 6132815 : ctx,
236 6132815 : )
237 1361 : .await?;
238 405358 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
239 405357 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
240 : {
241 2 : let parsed_xact =
242 2 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
243 2 : self.ingest_xact_record(
244 2 : modification,
245 2 : &parsed_xact,
246 2 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
247 2 : ctx,
248 2 : )
249 0 : .await?;
250 : // Remove the twophase file. See RemoveTwoPhaseFile() in the PostgreSQL code.
251 0 : trace!(
252 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
253 0 : decoded.xl_xid,
254 0 : parsed_xact.xid,
255 0 : lsn,
256 0 : );
257 2 : modification
258 2 : .drop_twophase_file(parsed_xact.xid, ctx)
259 0 : .await?;
260 405356 : } else if info == pg_constants::XLOG_XACT_PREPARE {
261 4 : modification
262 4 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
263 0 : .await?;
264 405352 : }
265 : }
266 : pg_constants::RM_MULTIXACT_ID => {
267 25139 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
268 25139 :
269 25139 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
270 23 : let pageno = buf.get_u32_le();
271 23 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
272 23 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
273 23 : self.put_slru_page_image(
274 23 : modification,
275 23 : SlruKind::MultiXactOffsets,
276 23 : segno,
277 23 : rpageno,
278 23 : ZERO_PAGE.clone(),
279 23 : ctx,
280 23 : )
281 0 : .await?;
282 25116 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
283 300 : let pageno = buf.get_u32_le();
284 300 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
285 300 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
286 300 : self.put_slru_page_image(
287 300 : modification,
288 300 : SlruKind::MultiXactMembers,
289 300 : segno,
290 300 : rpageno,
291 300 : ZERO_PAGE.clone(),
292 300 : ctx,
293 300 : )
294 0 : .await?;
295 24816 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
296 24816 : let xlrec = XlMultiXactCreate::decode(&mut buf);
297 24816 : self.ingest_multixact_create_record(modification, &xlrec)?;
298 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
299 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
300 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
301 0 : .await?;
302 0 : }
303 : }
304 : pg_constants::RM_RELMAP_ID => {
305 62 : let xlrec = XlRelmapUpdate::decode(&mut buf);
306 62 : self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
307 2 : .await?;
308 : }
309 : pg_constants::RM_XLOG_ID => {
310 240343 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
311 240343 :
312 240343 : if info == pg_constants::XLOG_NEXTOID {
313 445 : let next_oid = buf.get_u32_le();
314 445 : if self.checkpoint.nextOid != next_oid {
315 445 : self.checkpoint.nextOid = next_oid;
316 445 : self.checkpoint_modified = true;
317 445 : }
318 239898 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
319 239817 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
320 : {
321 677 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
322 677 : buf.copy_to_slice(&mut checkpoint_bytes);
323 677 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
324 0 : trace!(
325 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
326 0 : xlog_checkpoint.oldestXid,
327 0 : self.checkpoint.oldestXid
328 0 : );
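// Advance our oldestXid if it lags behind the one in the checkpoint
// record, using a wrapping (modulo-2^32) comparison in the spirit of
// TransactionIdPrecedes() in PostgreSQL, since XIDs wrap around.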
329 677 : if (self
330 677 : .checkpoint
331 677 : .oldestXid
332 677 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
333 677 : < 0
334 0 : {
335 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
336 677 : }
337 :
338 : // Write a new checkpoint key-value pair on every checkpoint record, even
339 : // if nothing really changed. Not strictly required, but it seems nice to
340 : // have some trace of the checkpoint records in the layer files at the same
341 : // LSNs.
342 677 : self.checkpoint_modified = true;
343 239221 : }
344 : }
345 : pg_constants::RM_LOGICALMSG_ID => {
346 129 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
347 129 :
348 129 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
349 129 : let xlrec = XlLogicalMessage::decode(&mut buf);
350 129 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
351 129 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
352 129 : if prefix == "neon-test" {
353 : // This is a convenient way to make the WAL ingestion pause at a
354 : // particular point in the WAL. For more fine-grained control,
355 : // we could peek into the message and only pause if it contains
356 : // a particular string, for example, but this is enough for now.
357 6 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
358 125 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
359 118 : modification.put_file(path, message, ctx).await?;
360 7 : }
361 0 : }
362 : }
363 11404349 : _x => {
364 11404349 : // TODO: should probably log & fail here instead of blindly
365 11404349 : // doing something without understanding the protocol
366 11404349 : }
367 : }
368 :
369 : // Iterate through all the blocks that the record modifies, and
370 : // "put" a separate copy of the record for each block.
371 73047349 : for blk in decoded.blocks.iter() {
372 66986380 : let rel = RelTag {
373 66986380 : spcnode: blk.rnode_spcnode,
374 66986380 : dbnode: blk.rnode_dbnode,
375 66986380 : relnode: blk.rnode_relnode,
376 66986380 : forknum: blk.forknum,
377 66986380 : };
378 66986380 :
379 66986380 : let key = rel_block_to_key(rel, blk.blkno);
380 66986380 : let key_is_local = self.shard.is_key_local(&key);
381 :
382 0 : tracing::debug!(
383 0 : lsn=%lsn,
384 0 : key=%key,
385 0 : "ingest: shard decision {} (checkpoint={})",
386 0 : if !key_is_local { "drop" } else { "keep" },
387 0 : self.checkpoint_modified
388 0 : );
389 :
390 66986380 : if !key_is_local {
391 13903887 : if self.shard.is_zero() {
392 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
393 : // its blkno in case it implicitly extends a relation.
394 3520036 : self.observe_decoded_block(modification, blk, ctx).await?;
395 10383851 : }
396 :
397 13903887 : continue;
398 53082493 : }
399 53082493 : self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
400 7535 : .await?;
401 : }
402 :
403 : // If checkpoint data was updated, store the new version in the repository
404 73047346 : if self.checkpoint_modified {
405 34376 : let new_checkpoint_bytes = self.checkpoint.encode()?;
406 :
407 34376 : modification.put_checkpoint(new_checkpoint_bytes)?;
408 34376 : self.checkpoint_modified = false;
409 73012970 : }
410 :
411 : // Note that at this point this record is only cached in the modification
412 : // until commit() is called to flush the data into the repository and update
413 : // the latest LSN.
414 :
415 73047346 : Ok(modification.len() > prev_len)
416 73047349 : }
417 :
418 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
419 3520036 : async fn observe_decoded_block(
420 3520036 : &mut self,
421 3520036 : modification: &mut DatadirModification<'_>,
422 3520036 : blk: &DecodedBkpBlock,
423 3520036 : ctx: &RequestContext,
424 3520036 : ) -> Result<(), PageReconstructError> {
425 3520036 : let rel = RelTag {
426 3520036 : spcnode: blk.rnode_spcnode,
427 3520036 : dbnode: blk.rnode_dbnode,
428 3520036 : relnode: blk.rnode_relnode,
429 3520036 : forknum: blk.forknum,
430 3520036 : };
431 3520036 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
432 449 : .await
433 3520036 : }
434 :
435 53082477 : async fn ingest_decoded_block(
436 53082477 : &mut self,
437 53082477 : modification: &mut DatadirModification<'_>,
438 53082477 : lsn: Lsn,
439 53082477 : decoded: &DecodedWALRecord,
440 53082477 : blk: &DecodedBkpBlock,
441 53082477 : ctx: &RequestContext,
442 53082493 : ) -> Result<(), PageReconstructError> {
443 53082493 : let rel = RelTag {
444 53082493 : spcnode: blk.rnode_spcnode,
445 53082493 : dbnode: blk.rnode_dbnode,
446 53082493 : relnode: blk.rnode_relnode,
447 53082493 : forknum: blk.forknum,
448 53082493 : };
449 53082493 :
450 53082493 : //
451 53082493 : // Instead of storing the full-page-image WAL record, it is better to
452 53082493 : // store the extracted image: we can skip WAL redo in that case. Also,
453 53082493 : // some FPI records may contain multiple (up to 32) pages, so they would
454 53082493 : // have to be copied multiple times.
455 53082493 : //
456 53082493 : if blk.apply_image
457 189852 : && blk.has_image
458 189852 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
459 167676 : && (decoded.xl_info == pg_constants::XLOG_FPI
460 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
461 : // compression of WAL is not yet supported: fall back to storing the original WAL record
462 167676 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
463 : // do not materialize null pages, because they will most likely soon be replaced with real data
464 167676 : && blk.bimg_len != 0
465 : {
466 : // Extract page image from FPI record
467 167676 : let img_len = blk.bimg_len as usize;
468 167676 : let img_offs = blk.bimg_offset as usize;
469 167676 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
470 167676 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
471 167676 :
472 167676 : if blk.hole_length != 0 {
473 134535 : let tail = image.split_off(blk.hole_offset as usize);
474 134535 : image.resize(image.len() + blk.hole_length as usize, 0u8);
475 134535 : image.unsplit(tail);
476 134535 : }
477 : //
478 : // Match the logic of XLogReadBufferForRedoExtended:
479 : // The page may be uninitialized. If so, we can't set the LSN because
480 : // that would corrupt the page.
481 : //
482 167676 : if !page_is_new(&image) {
483 160496 : page_set_lsn(&mut image, lsn)
484 7180 : }
485 167676 : assert_eq!(image.len(), BLCKSZ as usize);
486 167676 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
487 565 : .await?;
488 : } else {
489 52914817 : let rec = NeonWalRecord::Postgres {
490 52914817 : will_init: blk.will_init || blk.apply_image,
491 52914817 : rec: decoded.record.clone(),
492 52914817 : };
493 52914817 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
494 6970 : .await?;
495 : }
496 53082490 : Ok(())
497 53082493 : }
498 :
499 54762167 : async fn ingest_heapam_record(
500 54762167 : &mut self,
501 54762167 : buf: &mut Bytes,
502 54762167 : modification: &mut DatadirModification<'_>,
503 54762167 : decoded: &DecodedWALRecord,
504 54762167 : ctx: &RequestContext,
505 54762178 : ) -> anyhow::Result<()> {
506 54762178 : // Handle VM bit updates that are implicitly part of heap records.
507 54762178 :
508 54762178 : // First, look at the record to determine which VM bits need
509 54762178 : // to be cleared. If either of these variables is set, we
510 54762178 : // need to clear the corresponding bits in the visibility map.
511 54762178 : let mut new_heap_blkno: Option<u32> = None;
512 54762178 : let mut old_heap_blkno: Option<u32> = None;
513 54762178 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
514 54762178 :
515 54762178 : match modification.tline.pg_version {
516 : 14 => {
517 54616704 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
518 50292894 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
519 50292894 :
520 50292894 : if info == pg_constants::XLOG_HEAP_INSERT {
521 41676372 : let xlrec = v14::XlHeapInsert::decode(buf);
522 41676372 : assert_eq!(0, buf.remaining());
523 41676372 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
524 4871 : new_heap_blkno = Some(decoded.blocks[0].blkno);
525 41671501 : }
526 8616522 : } else if info == pg_constants::XLOG_HEAP_DELETE {
527 2303981 : let xlrec = v14::XlHeapDelete::decode(buf);
528 2303981 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
529 2777 : new_heap_blkno = Some(decoded.blocks[0].blkno);
530 2301204 : }
531 6312541 : } else if info == pg_constants::XLOG_HEAP_UPDATE
532 4806456 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
533 : {
534 3450011 : let xlrec = v14::XlHeapUpdate::decode(buf);
535 3450011 : // the size of tuple data is inferred from the size of the record.
536 3450011 : // we can't validate the remaining number of bytes without parsing
537 3450011 : // the tuple data.
538 3450011 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
539 81531 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
540 3368480 : }
541 3450011 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
542 2087 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
543 2087 : // non-HOT update where the new tuple goes to a different page than
544 2087 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
545 2087 : // set.
546 2087 : new_heap_blkno = Some(decoded.blocks[0].blkno);
547 3447924 : }
548 2862530 : } else if info == pg_constants::XLOG_HEAP_LOCK {
549 2760102 : let xlrec = v14::XlHeapLock::decode(buf);
550 2760102 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
551 210 : old_heap_blkno = Some(decoded.blocks[0].blkno);
552 210 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
553 2759892 : }
554 102428 : }
555 4323810 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
556 4323810 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
557 4323810 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
558 1126139 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
559 :
560 1126139 : let offset_array_len =
561 1126139 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
562 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
563 689082 : 0
564 : } else {
565 437057 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
566 : };
567 1126139 : assert_eq!(offset_array_len, buf.remaining());
568 :
569 1126139 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
570 3730 : new_heap_blkno = Some(decoded.blocks[0].blkno);
571 1122409 : }
572 3197671 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
573 4146 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
574 4146 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
575 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
576 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
577 4146 : }
578 3193525 : }
579 : } else {
580 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
581 : }
582 : }
583 : 15 => {
584 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
585 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
586 145286 :
587 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
588 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
589 145276 : assert_eq!(0, buf.remaining());
590 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
591 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
592 145272 : }
593 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
594 0 : let xlrec = v15::XlHeapDelete::decode(buf);
595 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
596 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
597 0 : }
598 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
599 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
600 : {
601 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
602 8 : // the size of tuple data is inferred from the size of the record.
603 8 : // we can't validate the remaining number of bytes without parsing
604 8 : // the tuple data.
605 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
606 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
607 8 : }
608 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
609 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
610 0 : // non-HOT update where the new tuple goes to a different page than
611 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
612 0 : // set.
613 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
614 8 : }
615 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
616 0 : let xlrec = v15::XlHeapLock::decode(buf);
617 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
618 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
619 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
620 0 : }
621 2 : }
622 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
623 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
624 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
625 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
626 :
627 42 : let offset_array_len =
628 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
629 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
630 2 : 0
631 : } else {
632 40 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
633 : };
634 42 : assert_eq!(offset_array_len, buf.remaining());
635 :
636 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
637 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
638 34 : }
639 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
640 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
641 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
642 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
643 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
644 0 : }
645 146 : }
646 : } else {
647 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
648 : }
649 : }
650 : 16 => {
651 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
652 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
653 0 :
654 0 : if info == pg_constants::XLOG_HEAP_INSERT {
655 0 : let xlrec = v16::XlHeapInsert::decode(buf);
656 0 : assert_eq!(0, buf.remaining());
657 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
658 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
659 0 : }
660 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
661 0 : let xlrec = v16::XlHeapDelete::decode(buf);
662 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
663 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
664 0 : }
665 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
666 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
667 : {
668 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
669 0 : // the size of tuple data is inferred from the size of the record.
670 0 : // we can't validate the remaining number of bytes without parsing
671 0 : // the tuple data.
672 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
673 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
674 0 : }
675 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
676 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
677 0 : // non-HOT update where the new tuple goes to a different page than
678 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
679 0 : // set.
680 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
681 0 : }
682 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
683 0 : let xlrec = v16::XlHeapLock::decode(buf);
684 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
685 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
686 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
687 0 : }
688 0 : }
689 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
690 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
691 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
692 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
693 :
694 0 : let offset_array_len =
695 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
696 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
697 0 : 0
698 : } else {
699 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
700 : };
701 0 : assert_eq!(offset_array_len, buf.remaining());
702 :
703 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
704 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
705 0 : }
706 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
707 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
708 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
709 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
710 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
711 0 : }
712 0 : }
713 : } else {
714 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
715 : }
716 : }
717 0 : _ => {}
718 : }
719 :
720 : // Clear the VM bits if required.
721 54762178 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
722 95121 : let vm_rel = RelTag {
723 95121 : forknum: VISIBILITYMAP_FORKNUM,
724 95121 : spcnode: decoded.blocks[0].rnode_spcnode,
725 95121 : dbnode: decoded.blocks[0].rnode_dbnode,
726 95121 : relnode: decoded.blocks[0].rnode_relnode,
727 95121 : };
728 95121 :
729 95121 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
730 95121 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
731 :
732 : // Sometimes, Postgres seems to create heap WAL records with the
733 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
734 : // not set. In fact, it's possible that the VM page does not exist at all.
735 : // In that case, we don't want to store a record to clear the VM bit;
736 : // replaying it would fail to find the previous image of the page, because
737 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
738 : // record if they don't.
739 95121 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
740 95121 : if let Some(blknum) = new_vm_blk {
741 13477 : if blknum >= vm_size {
742 1947 : new_vm_blk = None;
743 11530 : }
744 81644 : }
745 95121 : if let Some(blknum) = old_vm_blk {
746 81741 : if blknum >= vm_size {
747 589 : old_vm_blk = None;
748 81152 : }
749 13380 : }
750 :
751 95121 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
752 92599 : if new_vm_blk == old_vm_blk {
753 : // An UPDATE record that needs to clear the bits for both the old and the
754 : // new page, both of which reside on the same VM page.
755 83 : self.put_rel_wal_record(
756 83 : modification,
757 83 : vm_rel,
758 83 : new_vm_blk.unwrap(),
759 83 : NeonWalRecord::ClearVisibilityMapFlags {
760 83 : new_heap_blkno,
761 83 : old_heap_blkno,
762 83 : flags,
763 83 : },
764 83 : ctx,
765 83 : )
766 0 : .await?;
767 : } else {
768 : // Clear VM bits for one heap page, or for two pages that reside on
769 : // different VM pages.
770 92516 : if let Some(new_vm_blk) = new_vm_blk {
771 11447 : self.put_rel_wal_record(
772 11447 : modification,
773 11447 : vm_rel,
774 11447 : new_vm_blk,
775 11447 : NeonWalRecord::ClearVisibilityMapFlags {
776 11447 : new_heap_blkno,
777 11447 : old_heap_blkno: None,
778 11447 : flags,
779 11447 : },
780 11447 : ctx,
781 11447 : )
782 0 : .await?;
783 81069 : }
784 92516 : if let Some(old_vm_blk) = old_vm_blk {
785 81069 : self.put_rel_wal_record(
786 81069 : modification,
787 81069 : vm_rel,
788 81069 : old_vm_blk,
789 81069 : NeonWalRecord::ClearVisibilityMapFlags {
790 81069 : new_heap_blkno: None,
791 81069 : old_heap_blkno,
792 81069 : flags,
793 81069 : },
794 81069 : ctx,
795 81069 : )
796 0 : .await?;
797 11447 : }
798 : }
799 2522 : }
800 54667057 : }
801 :
802 54762178 : Ok(())
803 54762178 : }
804 :
805 0 : async fn ingest_neonrmgr_record(
806 0 : &mut self,
807 0 : buf: &mut Bytes,
808 0 : modification: &mut DatadirModification<'_>,
809 0 : decoded: &DecodedWALRecord,
810 0 : ctx: &RequestContext,
811 0 : ) -> anyhow::Result<()> {
812 0 : // Handle VM bit updates that are implicitly part of heap records.
813 0 :
814 0 : // First, look at the record to determine which VM bits need
815 0 : // to be cleared. If either of these variables is set, we
816 0 : // need to clear the corresponding bits in the visibility map.
817 0 : let mut new_heap_blkno: Option<u32> = None;
818 0 : let mut old_heap_blkno: Option<u32> = None;
819 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
820 0 : let pg_version = modification.tline.pg_version;
821 0 :
822 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
823 :
824 0 : match pg_version {
825 : 16 => {
826 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
827 0 :
828 0 : match info {
829 : pg_constants::XLOG_NEON_HEAP_INSERT => {
830 0 : let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
831 0 : assert_eq!(0, buf.remaining());
832 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
833 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
834 0 : }
835 : }
836 : pg_constants::XLOG_NEON_HEAP_DELETE => {
837 0 : let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
838 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
839 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
840 0 : }
841 : }
842 : pg_constants::XLOG_NEON_HEAP_UPDATE
843 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
844 0 : let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
845 0 : // the size of tuple data is inferred from the size of the record.
846 0 : // we can't validate the remaining number of bytes without parsing
847 0 : // the tuple data.
848 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
849 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
850 0 : }
851 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
852 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
853 0 : // non-HOT update where the new tuple goes to a different page than
854 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
855 0 : // set.
856 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
857 0 : }
858 : }
859 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
860 0 : let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
861 :
862 0 : let offset_array_len =
863 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
864 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
865 0 : 0
866 : } else {
867 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
868 : };
869 0 : assert_eq!(offset_array_len, buf.remaining());
870 :
871 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
872 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
873 0 : }
874 : }
875 : pg_constants::XLOG_NEON_HEAP_LOCK => {
876 0 : let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
877 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
878 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
879 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
880 0 : }
881 : }
882 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
883 : }
884 : }
885 0 : _ => bail!(
886 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
887 0 : pg_version
888 0 : ),
889 : }
890 :
891 : // Clear the VM bits if required.
892 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
893 0 : let vm_rel = RelTag {
894 0 : forknum: VISIBILITYMAP_FORKNUM,
895 0 : spcnode: decoded.blocks[0].rnode_spcnode,
896 0 : dbnode: decoded.blocks[0].rnode_dbnode,
897 0 : relnode: decoded.blocks[0].rnode_relnode,
898 0 : };
899 0 :
900 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
901 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
902 :
903 : // Sometimes, Postgres seems to create heap WAL records with the
904 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
905 : // not set. In fact, it's possible that the VM page does not exist at all.
906 : // In that case, we don't want to store a record to clear the VM bit;
907 : // replaying it would fail to find the previous image of the page, because
908 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
909 : // record if it doesn't.
910 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
911 0 : if let Some(blknum) = new_vm_blk {
912 0 : if blknum >= vm_size {
913 0 : new_vm_blk = None;
914 0 : }
915 0 : }
916 0 : if let Some(blknum) = old_vm_blk {
917 0 : if blknum >= vm_size {
918 0 : old_vm_blk = None;
919 0 : }
920 0 : }
921 :
922 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
923 0 : if new_vm_blk == old_vm_blk {
924 : // An UPDATE record that needs to clear the bits for both the old and the
925 : // new page, both of which reside on the same VM page.
926 0 : self.put_rel_wal_record(
927 0 : modification,
928 0 : vm_rel,
929 0 : new_vm_blk.unwrap(),
930 0 : NeonWalRecord::ClearVisibilityMapFlags {
931 0 : new_heap_blkno,
932 0 : old_heap_blkno,
933 0 : flags,
934 0 : },
935 0 : ctx,
936 0 : )
937 0 : .await?;
938 : } else {
939 : // Clear VM bits for one heap page, or for two pages that reside on
940 : // different VM pages.
941 0 : if let Some(new_vm_blk) = new_vm_blk {
942 0 : self.put_rel_wal_record(
943 0 : modification,
944 0 : vm_rel,
945 0 : new_vm_blk,
946 0 : NeonWalRecord::ClearVisibilityMapFlags {
947 0 : new_heap_blkno,
948 0 : old_heap_blkno: None,
949 0 : flags,
950 0 : },
951 0 : ctx,
952 0 : )
953 0 : .await?;
954 0 : }
955 0 : if let Some(old_vm_blk) = old_vm_blk {
956 0 : self.put_rel_wal_record(
957 0 : modification,
958 0 : vm_rel,
959 0 : old_vm_blk,
960 0 : NeonWalRecord::ClearVisibilityMapFlags {
961 0 : new_heap_blkno: None,
962 0 : old_heap_blkno,
963 0 : flags,
964 0 : },
965 0 : ctx,
966 0 : )
967 0 : .await?;
968 0 : }
969 : }
970 0 : }
971 0 : }
972 :
973 0 : Ok(())
974 0 : }
975 :
976 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
977 24 : async fn ingest_xlog_dbase_create(
978 24 : &mut self,
979 24 : modification: &mut DatadirModification<'_>,
980 24 : rec: &XlCreateDatabase,
981 24 : ctx: &RequestContext,
982 24 : ) -> anyhow::Result<()> {
983 24 : let db_id = rec.db_id;
984 24 : let tablespace_id = rec.tablespace_id;
985 24 : let src_db_id = rec.src_db_id;
986 24 : let src_tablespace_id = rec.src_tablespace_id;
987 :
988 24 : let rels = modification
989 24 : .tline
990 24 : .list_rels(
991 24 : src_tablespace_id,
992 24 : src_db_id,
993 24 : Version::Modified(modification),
994 24 : ctx,
995 24 : )
996 0 : .await?;
997 :
998 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
999 :
1000 : // Copy relfilemap
1001 24 : let filemap = modification
1002 24 : .tline
1003 24 : .get_relmap_file(
1004 24 : src_tablespace_id,
1005 24 : src_db_id,
1006 24 : Version::Modified(modification),
1007 24 : ctx,
1008 24 : )
1009 0 : .await?;
1010 24 : modification
1011 24 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1012 0 : .await?;
1013 :
1014 24 : let mut num_rels_copied = 0;
1015 24 : let mut num_blocks_copied = 0;
1016 7032 : for src_rel in rels {
1017 7008 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1018 7008 : assert_eq!(src_rel.dbnode, src_db_id);
1019 :
1020 7008 : let nblocks = modification
1021 7008 : .tline
1022 7008 : .get_rel_size(src_rel, Version::Modified(modification), true, ctx)
1023 196 : .await?;
1024 7008 : let dst_rel = RelTag {
1025 7008 : spcnode: tablespace_id,
1026 7008 : dbnode: db_id,
1027 7008 : relnode: src_rel.relnode,
1028 7008 : forknum: src_rel.forknum,
1029 7008 : };
1030 7008 :
1031 7008 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1032 :
1033 : // Copy content
1034 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1035 24264 : for blknum in 0..nblocks {
1036 : // Sharding:
1037 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1038 : // dbNode is not included in the hash inputs for sharding.
1039 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1040 : // that belong to it.
1041 24264 : let src_key = rel_block_to_key(src_rel, blknum);
1042 24264 : if !self.shard.is_key_local(&src_key) {
1043 0 : debug!(
1044 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1045 0 : src_key
1046 0 : );
1047 9099 : continue;
1048 15165 : }
1049 0 : debug!(
1050 0 : "copying block {} from {} ({}) to {}",
1051 0 : blknum, src_rel, src_key, dst_rel
1052 0 : );
1053 :
1054 15165 : let content = modification
1055 15165 : .tline
1056 15165 : .get_rel_page_at_lsn(
1057 15165 : src_rel,
1058 15165 : blknum,
1059 15165 : Version::Modified(modification),
1060 15165 : true,
1061 15165 : ctx,
1062 15165 : )
1063 1351 : .await?;
1064 15165 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1065 15165 : num_blocks_copied += 1;
1066 : }
1067 :
1068 7008 : num_rels_copied += 1;
1069 : }
1070 :
1071 24 : info!(
1072 24 : "Created database {}/{}, copied {} blocks in {} rels",
1073 24 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1074 24 : );
1075 24 : Ok(())
1076 24 : }
1077 :
1078 75378 : async fn ingest_xlog_smgr_create(
1079 75378 : &mut self,
1080 75378 : modification: &mut DatadirModification<'_>,
1081 75378 : rec: &XlSmgrCreate,
1082 75378 : ctx: &RequestContext,
1083 75378 : ) -> anyhow::Result<()> {
1084 75378 : let rel = RelTag {
1085 75378 : spcnode: rec.rnode.spcnode,
1086 75378 : dbnode: rec.rnode.dbnode,
1087 75378 : relnode: rec.rnode.relnode,
1088 75378 : forknum: rec.forknum,
1089 75378 : };
1090 75378 : self.put_rel_creation(modification, rel, ctx).await?;
1091 75378 : Ok(())
1092 75378 : }
1093 :
1094 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1095 : ///
1096 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1097 225 : async fn ingest_xlog_smgr_truncate(
1098 225 : &mut self,
1099 225 : modification: &mut DatadirModification<'_>,
1100 225 : rec: &XlSmgrTruncate,
1101 225 : ctx: &RequestContext,
1102 225 : ) -> anyhow::Result<()> {
1103 225 : let spcnode = rec.rnode.spcnode;
1104 225 : let dbnode = rec.rnode.dbnode;
1105 225 : let relnode = rec.rnode.relnode;
1106 225 :
1107 225 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1108 225 : let rel = RelTag {
1109 225 : spcnode,
1110 225 : dbnode,
1111 225 : relnode,
1112 225 : forknum: MAIN_FORKNUM,
1113 225 : };
1114 225 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1115 2 : .await?;
1116 0 : }
1117 225 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1118 225 : let rel = RelTag {
1119 225 : spcnode,
1120 225 : dbnode,
1121 225 : relnode,
1122 225 : forknum: FSM_FORKNUM,
1123 225 : };
1124 225 :
1125 225 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1126 225 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1127 225 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1128 : // The tail of the last remaining FSM page has to be zeroed.
1129 : // We are not precise here; instead of digging into the FSM bitmap format, we just clear the whole page.
1130 108 : modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
1131 108 : fsm_physical_page_no += 1;
1132 117 : }
1133 225 : let nblocks = get_relsize(modification, rel, ctx).await?;
1134 225 : if nblocks > fsm_physical_page_no {
1135 : // check if something to do: FSM is larger than truncate position
1136 61 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1137 0 : .await?;
1138 164 : }
1139 0 : }
1140 225 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1141 225 : let rel = RelTag {
1142 225 : spcnode,
1143 225 : dbnode,
1144 225 : relnode,
1145 225 : forknum: VISIBILITYMAP_FORKNUM,
1146 225 : };
1147 225 :
1148 225 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1149 225 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1150 : // The tail of the last remaining VM page has to be zeroed.
1151 : // We are not precise here; instead of digging into the VM bitmap format, we just clear the whole page.
1152 108 : modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
1153 108 : vm_page_no += 1;
1154 117 : }
1155 225 : let nblocks = get_relsize(modification, rel, ctx).await?;
1156 225 : if nblocks > vm_page_no {
1157 : // check if something to do: VM is larger than truncate position
1158 61 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1159 0 : .await?;
1160 164 : }
1161 0 : }
1162 225 : Ok(())
1163 225 : }
1164 :
1165 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
1166 : ///
1167 6132817 : async fn ingest_xact_record(
1168 6132817 : &mut self,
1169 6132817 : modification: &mut DatadirModification<'_>,
1170 6132817 : parsed: &XlXactParsedRecord,
1171 6132817 : is_commit: bool,
1172 6132817 : ctx: &RequestContext,
1173 6132817 : ) -> anyhow::Result<()> {
1174 6132817 : // Record update of CLOG pages
1175 6132817 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1176 6132817 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1177 6132817 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1178 6132817 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
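// Worked example of the page math (assuming the standard 8KB block size,
// i.e. CLOG_XACTS_PER_PAGE = 32768 and SLRU_PAGES_PER_SEGMENT = 32):
// xid 100_000 maps to CLOG page 100_000 / 32768 = 3, which is
// page 3 of segment 0.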
1179 :
1180 6243062 : for subxact in &parsed.subxacts {
1181 110245 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1182 110245 : if subxact_pageno != pageno {
1183 : // This subxact goes to a different page. Write the record
1184 : // for all the XIDs on the previous page, and continue
1185 : // accumulating XIDs on this new page.
1186 4 : modification.put_slru_wal_record(
1187 4 : SlruKind::Clog,
1188 4 : segno,
1189 4 : rpageno,
1190 4 : if is_commit {
1191 4 : NeonWalRecord::ClogSetCommitted {
1192 4 : xids: page_xids,
1193 4 : timestamp: parsed.xact_time,
1194 4 : }
1195 : } else {
1196 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1197 : },
1198 0 : )?;
1199 4 : page_xids = Vec::new();
1200 110241 : }
1201 110245 : pageno = subxact_pageno;
1202 110245 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1203 110245 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1204 110245 : page_xids.push(*subxact);
1205 : }
1206 6132817 : modification.put_slru_wal_record(
1207 6132817 : SlruKind::Clog,
1208 6132817 : segno,
1209 6132817 : rpageno,
1210 6132817 : if is_commit {
1211 6123947 : NeonWalRecord::ClogSetCommitted {
1212 6123947 : xids: page_xids,
1213 6123947 : timestamp: parsed.xact_time,
1214 6123947 : }
1215 : } else {
1216 8870 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1217 : },
1218 0 : )?;
1219 :
1220 6198253 : for xnode in &parsed.xnodes {
1221 327180 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1222 261744 : let rel = RelTag {
1223 261744 : forknum,
1224 261744 : spcnode: xnode.spcnode,
1225 261744 : dbnode: xnode.dbnode,
1226 261744 : relnode: xnode.relnode,
1227 261744 : };
1228 261744 : if modification
1229 261744 : .tline
1230 261744 : .get_rel_exists(rel, Version::Modified(modification), true, ctx)
1231 1 : .await?
1232 : {
1233 67298 : self.put_rel_drop(modification, rel, ctx).await?;
1234 194446 : }
1235 : }
1236 : }
1237 6132817 : Ok(())
1238 6132817 : }
1239 :
1240 3 : async fn ingest_clog_truncate_record(
1241 3 : &mut self,
1242 3 : modification: &mut DatadirModification<'_>,
1243 3 : xlrec: &XlClogTruncate,
1244 3 : ctx: &RequestContext,
1245 3 : ) -> anyhow::Result<()> {
1246 3 : info!(
1247 3 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1248 3 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1249 3 : );
1250 :
1251 : // Here we treat oldestXid and oldestXidDB
1252 : // differently from the postgres redo routines.
1253 : // In postgres, checkpoint.oldestXid lags behind xlrec.oldest_xid
1254 : // until a checkpoint happens and updates the value.
1255 : // Here we can use the most recent value.
1256 : // It's just an optimization, though, and can be deleted.
1257 : // TODO Figure out if there will be any issues with replicas.
1258 3 : self.checkpoint.oldestXid = xlrec.oldest_xid;
1259 3 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
1260 3 : self.checkpoint_modified = true;
1261 3 :
1262 3 : // TODO Handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
1263 3 :
1264 3 : let latest_page_number =
1265 3 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
1266 3 :
1267 3 : // Now delete all segments containing pages between xlrec.pageno
1268 3 : // and latest_page_number.
1269 3 :
1270 3 : // First, make an important safety check:
1271 3 : // the current endpoint page must not be eligible for removal.
1272 3 : // See SimpleLruTruncate() in slru.c
1273 3 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
1274 0 : info!("could not truncate directory pg_xact apparent wraparound");
1275 0 : return Ok(());
1276 3 : }
1277 :
1278 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1279 : //
1280 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
1281 : // will block waiting for the last valid LSN to advance up to
1282 : // it. So we use the previous record's LSN in the get calls
1283 : // instead.
1284 23 : for segno in modification
1285 3 : .tline
1286 3 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1287 0 : .await?
1288 : {
1289 23 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1290 23 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
1291 18 : modification
1292 18 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1293 0 : .await?;
1294 0 : trace!("Drop CLOG segment {:>04X}", segno);
1295 5 : }
1296 : }
1297 :
1298 3 : Ok(())
1299 3 : }
1300 :
1301 24816 : fn ingest_multixact_create_record(
1302 24816 : &mut self,
1303 24816 : modification: &mut DatadirModification,
1304 24816 : xlrec: &XlMultiXactCreate,
1305 24816 : ) -> Result<()> {
1306 24816 : // Create WAL record for updating the multixact-offsets page
1307 24816 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1308 24816 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1309 24816 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1310 24816 :
1311 24816 : modification.put_slru_wal_record(
1312 24816 : SlruKind::MultiXactOffsets,
1313 24816 : segno,
1314 24816 : rpageno,
1315 24816 : NeonWalRecord::MultixactOffsetCreate {
1316 24816 : mid: xlrec.mid,
1317 24816 : moff: xlrec.moff,
1318 24816 : },
1319 24816 : )?;
1320 :
1321 : // Create WAL records for the update of each affected multixact-members page
1322 24816 : let mut members = xlrec.members.iter();
1323 24816 : let mut offset = xlrec.moff;
1324 : loop {
1325 49906 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1326 49906 :
1327 49906 : // How many members fit on this page?
1328 49906 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1329 49906 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
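// Worked example (assuming the stock 8KB block size, for which
// PostgreSQL's MULTIXACT_MEMBERS_PER_PAGE works out to 1636): with
// offset = 1630 and 10 members left to store, page_remain = 6, so six
// members go on this page and the remaining four start the next page.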
1330 49906 :
1331 49906 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1332 49906 : for _ in 0..page_remain {
1333 524292 : if let Some(m) = members.next() {
1334 474674 : this_page_members.push(m.clone());
1335 474674 : } else {
1336 49618 : break;
1337 : }
1338 : }
1339 49906 : if this_page_members.is_empty() {
1340 : // all done
1341 24816 : break;
1342 25090 : }
1343 25090 : let n_this_page = this_page_members.len();
1344 25090 :
1345 25090 : modification.put_slru_wal_record(
1346 25090 : SlruKind::MultiXactMembers,
1347 25090 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1348 25090 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1349 25090 : NeonWalRecord::MultixactMembersCreate {
1350 25090 : moff: offset,
1351 25090 : members: this_page_members,
1352 25090 : },
1353 25090 : )?;
1354 :
1355 : // Note: The multixact members can wrap around, even within one WAL record.
1356 25090 : offset = offset.wrapping_add(n_this_page as u32);
1357 : }
1358 24816 : if xlrec.mid >= self.checkpoint.nextMulti {
1359 24816 : self.checkpoint.nextMulti = xlrec.mid + 1;
1360 24816 : self.checkpoint_modified = true;
1361 24816 : }
1362 24816 : if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
1363 24816 : self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
1364 24816 : self.checkpoint_modified = true;
1365 24816 : }
1366 474674 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1367 474674 : if let Some(max_xid) = acc {
1368 449858 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1369 449633 : Some(mbr.xid)
1370 : } else {
1371 225 : acc
1372 : }
1373 : } else {
1374 24816 : Some(mbr.xid)
1375 : }
1376 474674 : });
1377 :
1378 24816 : if let Some(max_xid) = max_mbr_xid {
1379 24816 : if self.checkpoint.update_next_xid(max_xid) {
1380 0 : self.checkpoint_modified = true;
1381 24816 : }
1382 0 : }
1383 24816 : Ok(())
1384 24816 : }
1385 :
1386 0 : async fn ingest_multixact_truncate_record(
1387 0 : &mut self,
1388 0 : modification: &mut DatadirModification<'_>,
1389 0 : xlrec: &XlMultiXactTruncate,
1390 0 : ctx: &RequestContext,
1391 0 : ) -> Result<()> {
1392 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
1393 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
1394 0 : self.checkpoint_modified = true;
1395 0 :
1396 0 : // PerformMembersTruncation
1397 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
1398 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
1399 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
1400 0 : let mut segment: i32 = startsegment;
1401 :
1402 : // Delete all the segments except the last one. The last segment can still
1403 : // contain, possibly partially, valid data.
1404 0 : while segment != endsegment {
1405 0 : modification
1406 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1407 0 : .await?;
1408 :
1409 : /* move to next segment, handling wraparound correctly */
1410 0 : if segment == maxsegment {
1411 0 : segment = 0;
1412 0 : } else {
1413 0 : segment += 1;
1414 0 : }
1415 : }
1416 :
1417 : // Truncate offsets
1418 : // FIXME: this does not handle wraparound correctly
1419 :
1420 0 : Ok(())
1421 0 : }
1422 :
1423 62 : async fn ingest_relmap_page(
1424 62 : &mut self,
1425 62 : modification: &mut DatadirModification<'_>,
1426 62 : xlrec: &XlRelmapUpdate,
1427 62 : decoded: &DecodedWALRecord,
1428 62 : ctx: &RequestContext,
1429 62 : ) -> Result<()> {
1430 62 : let mut buf = decoded.record.clone();
1431 62 : buf.advance(decoded.main_data_offset);
1432 62 : // skip xl_relmap_update
1433 62 : buf.advance(12);
1434 62 :
1435 62 : modification
1436 62 : .put_relmap_file(
1437 62 : xlrec.tsid,
1438 62 : xlrec.dbid,
1439 62 : Bytes::copy_from_slice(&buf[..]),
1440 62 : ctx,
1441 62 : )
1442 2 : .await
1443 62 : }
1444 :
1445 75380 : async fn put_rel_creation(
1446 75380 : &mut self,
1447 75380 : modification: &mut DatadirModification<'_>,
1448 75380 : rel: RelTag,
1449 75380 : ctx: &RequestContext,
1450 75380 : ) -> Result<()> {
1451 75380 : modification.put_rel_creation(rel, 0, ctx).await?;
1452 75380 : Ok(())
1453 75380 : }
1454 :
1455 440078 : async fn put_rel_page_image(
1456 440078 : &mut self,
1457 440078 : modification: &mut DatadirModification<'_>,
1458 440078 : rel: RelTag,
1459 440078 : blknum: BlockNumber,
1460 440078 : img: Bytes,
1461 440078 : ctx: &RequestContext,
1462 440078 : ) -> Result<(), PageReconstructError> {
1463 440078 : self.handle_rel_extend(modification, rel, blknum, ctx)
1464 6747 : .await?;
1465 440078 : modification.put_rel_page_image(rel, blknum, img)?;
1466 440078 : Ok(())
1467 440078 : }
1468 :
1469 53007400 : async fn put_rel_wal_record(
1470 53007400 : &mut self,
1471 53007400 : modification: &mut DatadirModification<'_>,
1472 53007400 : rel: RelTag,
1473 53007400 : blknum: BlockNumber,
1474 53007400 : rec: NeonWalRecord,
1475 53007400 : ctx: &RequestContext,
1476 53007416 : ) -> Result<()> {
1477 53007416 : self.handle_rel_extend(modification, rel, blknum, ctx)
1478 6970 : .await?;
1479 53007413 : modification.put_rel_wal_record(rel, blknum, rec)?;
1480 53007413 : Ok(())
1481 53007416 : }
1482 :
1483 6359 : async fn put_rel_truncation(
1484 6359 : &mut self,
1485 6359 : modification: &mut DatadirModification<'_>,
1486 6359 : rel: RelTag,
1487 6359 : nblocks: BlockNumber,
1488 6359 : ctx: &RequestContext,
1489 6359 : ) -> anyhow::Result<()> {
1490 6359 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1491 6359 : Ok(())
1492 6359 : }
1493 :
1494 67300 : async fn put_rel_drop(
1495 67300 : &mut self,
1496 67300 : modification: &mut DatadirModification<'_>,
1497 67300 : rel: RelTag,
1498 67300 : ctx: &RequestContext,
1499 67300 : ) -> Result<()> {
1500 67300 : modification.put_rel_drop(rel, ctx).await?;
1501 67300 : Ok(())
1502 67300 : }
1503 :
1504 56967514 : async fn handle_rel_extend(
1505 56967514 : &mut self,
1506 56967514 : modification: &mut DatadirModification<'_>,
1507 56967514 : rel: RelTag,
1508 56967514 : blknum: BlockNumber,
1509 56967514 : ctx: &RequestContext,
1510 56967530 : ) -> Result<(), PageReconstructError> {
1511 56967530 : let new_nblocks = blknum + 1;
1512 : // Check if the relation exists. We implicitly create relations on the
1513 : // first record.
1514 : // TODO: it would be nice to be more explicit about this.
1515 :
1516 : // Get current size and put rel creation if rel doesn't exist
1517 : //
1518 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1519 : // check the cache too. This is because eagerly checking the cache results in
1520 : // less work overall and 10% better performance. It's more work on a cache
1521 : // miss, but cache misses are rare.
1522 56967530 : let old_nblocks = if let Some(nblocks) = modification
1523 56967530 : .tline
1524 56967530 : .get_cached_rel_size(&rel, modification.get_lsn())
1525 : {
1526 56963032 : nblocks
1527 4498 : } else if !modification
1528 4498 : .tline
1529 4498 : .get_rel_exists(rel, Version::Modified(modification), true, ctx)
1530 58 : .await?
1531 : {
1532 : // Create it with 0 size initially; the logic below will extend it.
1533 3656 : modification
1534 3656 : .put_rel_creation(rel, 0, ctx)
1535 357 : .await
1536 3656 : .context("Relation Error")?;
1537 3656 : 0
1538 : } else {
1539 842 : modification
1540 842 : .tline
1541 842 : .get_rel_size(rel, Version::Modified(modification), true, ctx)
1542 152 : .await?
1543 : };
1544 :
1545 56967530 : if new_nblocks > old_nblocks {
1546 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1547 1264419 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1548 :
1549 1264416 : let mut key = rel_block_to_key(rel, blknum);
1550 : // fill the gap with zeros
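     : // For example, if the relation had 10 blocks and this record touches
     : // block 14, blocks 10..=13 are zero-filled here and block 14 itself is
     : // written by the caller. On sharded tenants, gap blocks that belong to
     : // other shards are skipped.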
1551 1264416 : for gap_blknum in old_nblocks..blknum {
1552 236715 : key.field6 = gap_blknum;
1553 236715 :
1554 236715 : if self.shard.get_shard_number(&key) != self.shard.number {
1555 1359 : continue;
1556 235356 : }
1557 235356 :
1558 235356 : modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
1559 : }
1560 55703111 : }
1561 56967527 : Ok(())
1562 56967530 : }
1563 :
1564 1646 : async fn put_slru_page_image(
1565 1646 : &mut self,
1566 1646 : modification: &mut DatadirModification<'_>,
1567 1646 : kind: SlruKind,
1568 1646 : segno: u32,
1569 1646 : blknum: BlockNumber,
1570 1646 : img: Bytes,
1571 1646 : ctx: &RequestContext,
1572 1646 : ) -> Result<()> {
1573 1646 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1574 115 : .await?;
1575 1646 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1576 1646 : Ok(())
1577 1646 : }
1578 :
1579 1646 : async fn handle_slru_extend(
1580 1646 : &mut self,
1581 1646 : modification: &mut DatadirModification<'_>,
1582 1646 : kind: SlruKind,
1583 1646 : segno: u32,
1584 1646 : blknum: BlockNumber,
1585 1646 : ctx: &RequestContext,
1586 1646 : ) -> anyhow::Result<()> {
1587 1646 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1588 1646 : // extended with ZEROPAGE records rather than implicitly by commit records, so it
1589 1646 : // happens a lot less frequently.
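     : // For example, the CLOG is extended by one ZEROPAGE record per 8 KB page,
     : // i.e. roughly once per 32K transactions (2 bits of status per xact), not
     : // on every commit.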
1590 1646 :
1591 1646 : let new_nblocks = blknum + 1;
1592 : // Check if the SLRU segment exists. We implicitly create segments on the
1593 : // first record.
1594 : // TODO: it would be nice to be more explicit about this.
1595 1646 : let old_nblocks = if !modification
1596 1646 : .tline
1597 1646 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1598 97 : .await?
1599 : {
1600 : // Create it with 0 size initially; the logic below will extend it.
1601 44 : modification
1602 44 : .put_slru_segment_creation(kind, segno, 0, ctx)
1603 3 : .await?;
1604 44 : 0
1605 : } else {
1606 1602 : modification
1607 1602 : .tline
1608 1602 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1609 15 : .await?
1610 : };
1611 :
1612 1646 : if new_nblocks > old_nblocks {
1613 0 : trace!(
1614 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1615 0 : kind,
1616 0 : segno,
1617 0 : old_nblocks,
1618 0 : new_nblocks
1619 0 : );
1620 1622 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1621 :
1622 : // fill the gap with zeros
1623 1622 : for gap_blknum in old_nblocks..blknum {
1624 0 : modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
1625 : }
1626 24 : }
1627 1646 : Ok(())
1628 1646 : }
1629 : }
1630 :
1631 95571 : async fn get_relsize(
1632 95571 : modification: &DatadirModification<'_>,
1633 95571 : rel: RelTag,
1634 95571 : ctx: &RequestContext,
1635 95571 : ) -> anyhow::Result<BlockNumber> {
1636 95571 : let nblocks = if !modification
1637 95571 : .tline
1638 95571 : .get_rel_exists(rel, Version::Modified(modification), true, ctx)
1639 50 : .await?
1640 : {
1641 2654 : 0
1642 : } else {
1643 92917 : modification
1644 92917 : .tline
1645 92917 : .get_rel_size(rel, Version::Modified(modification), true, ctx)
1646 3 : .await?
1647 : };
1648 95571 : Ok(nblocks)
1649 95571 : }
1650 :
1651 : #[allow(clippy::bool_assert_comparison)]
1652 : #[cfg(test)]
1653 : mod tests {
1654 : use super::*;
1655 : use crate::tenant::harness::*;
1656 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1657 : use crate::tenant::Timeline;
1658 : use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
1659 : use postgres_ffi::RELSEG_SIZE;
1660 :
1661 : use crate::DEFAULT_PG_VERSION;
1662 :
1663 : /// Arbitrary relation tag, for testing.
1664 : const TESTREL_A: RelTag = RelTag {
1665 : spcnode: 0,
1666 : dbnode: 111,
1667 : relnode: 1000,
1668 : forknum: 0,
1669 : };
1670 :
1671 12 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1672 12 : // TODO
1673 12 : }
1674 :
1675 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1676 :
1677 8 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1678 8 : let mut m = tline.begin_modification(Lsn(0x10));
1679 8 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1680 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1681 8 : m.commit(ctx).await?;
1682 8 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1683 :
1684 8 : Ok(walingest)
1685 8 : }
1686 :
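     : /// A minimal sketch (not part of the original suite) of the circular
     : /// (mod 2^32) XID comparison relied on in ingest_multixact_create_record
     : /// when tracking the max member XID: `a` follows `b` iff their wrapping
     : /// difference, reinterpreted as i32, is positive.
     : #[test]
     : fn xid_circular_comparison_sketch() {
     : fn follows(a: u32, b: u32) -> bool {
     : a.wrapping_sub(b) as i32 > 0
     : }
     : assert!(follows(100, 10));
     : assert!(!follows(10, 100));
     : // Near the wraparound boundary: 5 logically follows u32::MAX - 5.
     : assert!(follows(5, u32::MAX - 5));
     : assert!(!follows(u32::MAX - 5, 5));
     : }
     :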
1687 2 : #[tokio::test]
1688 2 : async fn test_relsize() -> Result<()> {
1689 2 : let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
1690 2 : let tline = tenant
1691 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1692 6 : .await?;
1693 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1694 2 :
1695 2 : let mut m = tline.begin_modification(Lsn(0x20));
1696 2 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1697 2 : walingest
1698 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1699 2 : .await?;
1700 2 : m.commit(&ctx).await?;
1701 2 : let mut m = tline.begin_modification(Lsn(0x30));
1702 2 : walingest
1703 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
1704 2 : .await?;
1705 2 : m.commit(&ctx).await?;
1706 2 : let mut m = tline.begin_modification(Lsn(0x40));
1707 2 : walingest
1708 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
1709 2 : .await?;
1710 2 : m.commit(&ctx).await?;
1711 2 : let mut m = tline.begin_modification(Lsn(0x50));
1712 2 : walingest
1713 2 : .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
1714 2 : .await?;
1715 2 : m.commit(&ctx).await?;
1716 2 :
1717 2 : assert_current_logical_size(&tline, Lsn(0x50));
1718 2 :
1719 2 : // The relation was created at LSN 2, not visible at LSN 1 yet.
1720 2 : assert_eq!(
1721 2 : tline
1722 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
1723 2 : .await?,
1724 2 : false
1725 2 : );
1726 2 : assert!(tline
1727 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
1728 2 : .await
1729 2 : .is_err());
1730 2 : assert_eq!(
1731 2 : tline
1732 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1733 2 : .await?,
1734 2 : true
1735 2 : );
1736 2 : assert_eq!(
1737 2 : tline
1738 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1739 2 : .await?,
1740 2 : 1
1741 2 : );
1742 2 : assert_eq!(
1743 2 : tline
1744 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
1745 2 : .await?,
1746 2 : 3
1747 2 : );
1748 2 :
1749 2 : // Check page contents at each LSN
1750 2 : assert_eq!(
1751 2 : tline
1752 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
1753 2 : .await?,
1754 2 : TEST_IMG("foo blk 0 at 2")
1755 2 : );
1756 2 :
1757 2 : assert_eq!(
1758 2 : tline
1759 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
1760 2 : .await?,
1761 2 : TEST_IMG("foo blk 0 at 3")
1762 2 : );
1763 2 :
1764 2 : assert_eq!(
1765 2 : tline
1766 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
1767 2 : .await?,
1768 2 : TEST_IMG("foo blk 0 at 3")
1769 2 : );
1770 2 : assert_eq!(
1771 2 : tline
1772 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
1773 2 : .await?,
1774 2 : TEST_IMG("foo blk 1 at 4")
1775 2 : );
1776 2 :
1777 2 : assert_eq!(
1778 2 : tline
1779 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
1780 2 : .await?,
1781 2 : TEST_IMG("foo blk 0 at 3")
1782 2 : );
1783 2 : assert_eq!(
1784 2 : tline
1785 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
1786 2 : .await?,
1787 2 : TEST_IMG("foo blk 1 at 4")
1788 2 : );
1789 2 : assert_eq!(
1790 2 : tline
1791 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
1792 2 : .await?,
1793 2 : TEST_IMG("foo blk 2 at 5")
1794 2 : );
1795 2 :
1796 2 : // Truncate last block
1797 2 : let mut m = tline.begin_modification(Lsn(0x60));
1798 2 : walingest
1799 2 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1800 2 : .await?;
1801 2 : m.commit(&ctx).await?;
1802 2 : assert_current_logical_size(&tline, Lsn(0x60));
1803 2 :
1804 2 : // Check reported size and contents after truncation
1805 2 : assert_eq!(
1806 2 : tline
1807 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
1808 2 : .await?,
1809 2 : 2
1810 2 : );
1811 2 : assert_eq!(
1812 2 : tline
1813 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
1814 2 : .await?,
1815 2 : TEST_IMG("foo blk 0 at 3")
1816 2 : );
1817 2 : assert_eq!(
1818 2 : tline
1819 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
1820 2 : .await?,
1821 2 : TEST_IMG("foo blk 1 at 4")
1822 2 : );
1823 2 :
1824 2 : // should still see the truncated block with older LSN
1825 2 : assert_eq!(
1826 2 : tline
1827 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
1828 2 : .await?,
1829 2 : 3
1830 2 : );
1831 2 : assert_eq!(
1832 2 : tline
1833 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
1834 2 : .await?,
1835 2 : TEST_IMG("foo blk 2 at 5")
1836 2 : );
1837 2 :
1838 2 : // Truncate to zero length
1839 2 : let mut m = tline.begin_modification(Lsn(0x68));
1840 2 : walingest
1841 2 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1842 2 : .await?;
1843 2 : m.commit(&ctx).await?;
1844 2 : assert_eq!(
1845 2 : tline
1846 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
1847 2 : .await?,
1848 2 : 0
1849 2 : );
1850 2 :
1851 2 : // Extend from 0 to 2 blocks, leaving a gap
1852 2 : let mut m = tline.begin_modification(Lsn(0x70));
1853 2 : walingest
1854 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
1855 2 : .await?;
1856 2 : m.commit(&ctx).await?;
1857 2 : assert_eq!(
1858 2 : tline
1859 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
1860 2 : .await?,
1861 2 : 2
1862 2 : );
1863 2 : assert_eq!(
1864 2 : tline
1865 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
1866 2 : .await?,
1867 2 : ZERO_PAGE
1868 2 : );
1869 2 : assert_eq!(
1870 2 : tline
1871 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
1872 2 : .await?,
1873 2 : TEST_IMG("foo blk 1")
1874 2 : );
1875 2 :
1876 2 : // Extend a lot more, leaving a big gap that spans across segments
1877 2 : let mut m = tline.begin_modification(Lsn(0x80));
1878 2 : walingest
1879 2 : .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
1880 2 : .await?;
1881 71 : m.commit(&ctx).await?;
1882 2 : assert_eq!(
1883 2 : tline
1884 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
1885 2 : .await?,
1886 2 : 1501
1887 2 : );
1888 2998 : for blk in 2..1500 {
1889 2996 : assert_eq!(
1890 2996 : tline
1891 2996 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
1892 3037 : .await?,
1893 2996 : ZERO_PAGE
1894 2 : );
1895 2 : }
1896 2 : assert_eq!(
1897 2 : tline
1898 2 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
1899 2 : .await?,
1900 2 : TEST_IMG("foo blk 1500")
1901 2 : );
1902 2 :
1903 2 : Ok(())
1904 2 : }
1905 :
1906 : // Test what happens if we drop a relation
1907 : // and then create it again within the same layer.
1908 2 : #[tokio::test]
1909 2 : async fn test_drop_extend() -> Result<()> {
1910 2 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
1911 2 : let tline = tenant
1912 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1913 6 : .await?;
1914 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1915 2 :
1916 2 : let mut m = tline.begin_modification(Lsn(0x20));
1917 2 : walingest
1918 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1919 2 : .await?;
1920 2 : m.commit(&ctx).await?;
1921 2 :
1922 2 : // Check that rel exists and size is correct
1923 2 : assert_eq!(
1924 2 : tline
1925 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1926 2 : .await?,
1927 2 : true
1928 2 : );
1929 2 : assert_eq!(
1930 2 : tline
1931 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1932 2 : .await?,
1933 2 : 1
1934 2 : );
1935 2 :
1936 2 : // Drop rel
1937 2 : let mut m = tline.begin_modification(Lsn(0x30));
1938 2 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
1939 2 : m.commit(&ctx).await?;
1940 2 :
1941 2 : // Check that rel is not visible anymore
1942 2 : assert_eq!(
1943 2 : tline
1944 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
1945 2 : .await?,
1946 2 : false
1947 2 : );
1948 2 :
1949 2 : // FIXME: should fail
1950 2 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1951 2 :
1952 2 : // Re-create it
1953 2 : let mut m = tline.begin_modification(Lsn(0x40));
1954 2 : walingest
1955 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
1956 2 : .await?;
1957 2 : m.commit(&ctx).await?;
1958 2 :
1959 2 : // Check that rel exists and size is correct
1960 2 : assert_eq!(
1961 2 : tline
1962 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
1963 2 : .await?,
1964 2 : true
1965 2 : );
1966 2 : assert_eq!(
1967 2 : tline
1968 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
1969 2 : .await?,
1970 2 : 1
1971 2 : );
1972 2 :
1973 2 : Ok(())
1974 2 : }
1975 :
1976 : // Test what happens if we truncate a relation
1977 : // so that one of its segments is dropped,
1978 : // and then extend it again within the same layer.
1979 2 : #[tokio::test]
1980 2 : async fn test_truncate_extend() -> Result<()> {
1981 2 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
1982 2 : let tline = tenant
1983 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1984 6 : .await?;
1985 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1986 2 :
1987 2 : // Create a 20 MB relation (the size is arbitrary)
1988 2 : let relsize = 20 * 1024 * 1024 / 8192;
1989 2 : let mut m = tline.begin_modification(Lsn(0x20));
1990 5120 : for blkno in 0..relsize {
1991 5120 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1992 5120 : walingest
1993 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
1994 2 : .await?;
1995 2 : }
1996 2 : m.commit(&ctx).await?;
1997 2 :
1998 2 : // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
1999 2 : assert_eq!(
2000 2 : tline
2001 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
2002 2 : .await?,
2003 2 : false
2004 2 : );
2005 2 : assert!(tline
2006 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
2007 2 : .await
2008 2 : .is_err());
2009 2 :
2010 2 : assert_eq!(
2011 2 : tline
2012 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
2013 2 : .await?,
2014 2 : true
2015 2 : );
2016 2 : assert_eq!(
2017 2 : tline
2018 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
2019 2 : .await?,
2020 2 : relsize
2021 2 : );
2022 2 :
2023 2 : // Check relation content
2024 5120 : for blkno in 0..relsize {
2025 5120 : let lsn = Lsn(0x20);
2026 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2027 5120 : assert_eq!(
2028 5120 : tline
2029 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
2030 201 : .await?,
2031 5120 : TEST_IMG(&data)
2032 2 : );
2033 2 : }
2034 2 :
2035 2 : // Truncate relation so that second segment was dropped
2036 2 : // - only leave one page
2037 2 : let mut m = tline.begin_modification(Lsn(0x60));
2038 2 : walingest
2039 2 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2040 2 : .await?;
2041 2 : m.commit(&ctx).await?;
2042 2 :
2043 2 : // Check reported size and contents after truncation
2044 2 : assert_eq!(
2045 2 : tline
2046 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
2047 2 : .await?,
2048 2 : 1
2049 2 : );
2050 2 :
2051 4 : for blkno in 0..1 {
2052 2 : let lsn = Lsn(0x20);
2053 2 : let data = format!("foo blk {} at {}", blkno, lsn);
2054 2 : assert_eq!(
2055 2 : tline
2056 2 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
2057 2 : .await?,
2058 2 : TEST_IMG(&data)
2059 2 : );
2060 2 : }
2061 2 :
2062 2 : // should still see all blocks at the older LSN
2063 2 : assert_eq!(
2064 2 : tline
2065 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
2066 2 : .await?,
2067 2 : relsize
2068 2 : );
2069 5120 : for blkno in 0..relsize {
2070 5120 : let lsn = Lsn(0x20);
2071 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2072 5120 : assert_eq!(
2073 5120 : tline
2074 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
2075 400 : .await?,
2076 5120 : TEST_IMG(&data)
2077 2 : );
2078 2 : }
2079 2 :
2080 2 : // Extend the relation again.
2081 2 : // Add enough blocks to create a second segment.
2082 2 : let lsn = Lsn(0x80);
2083 2 : let mut m = tline.begin_modification(lsn);
2084 5120 : for blkno in 0..relsize {
2085 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2086 5120 : walingest
2087 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
2088 2 : .await?;
2089 2 : }
2090 2 : m.commit(&ctx).await?;
2091 2 :
2092 2 : assert_eq!(
2093 2 : tline
2094 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
2095 2 : .await?,
2096 2 : true
2097 2 : );
2098 2 : assert_eq!(
2099 2 : tline
2100 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
2101 2 : .await?,
2102 2 : relsize
2103 2 : );
2104 2 : // Check relation content
2105 5120 : for blkno in 0..relsize {
2106 5120 : let lsn = Lsn(0x80);
2107 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2108 5120 : assert_eq!(
2109 5120 : tline
2110 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
2111 201 : .await?,
2112 5120 : TEST_IMG(&data)
2113 2 : );
2114 2 : }
2115 2 :
2116 2 : Ok(())
2117 2 : }
2118 :
2119 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2120 : /// split into multiple 1 GB segments in Postgres.
2121 2 : #[tokio::test]
2122 2 : async fn test_large_rel() -> Result<()> {
2123 2 : let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
2124 2 : let tline = tenant
2125 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2126 7 : .await?;
2127 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2128 2 :
2129 2 : let mut lsn = 0x10;
2130 262146 : for blknum in 0..RELSEG_SIZE + 1 {
2131 262146 : lsn += 0x10;
2132 262146 : let mut m = tline.begin_modification(Lsn(lsn));
2133 262146 : let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2134 262146 : walingest
2135 262146 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2136 6182 : .await?;
2137 262146 : m.commit(&ctx).await?;
2138 2 : }
2139 2 :
2140 2 : assert_current_logical_size(&tline, Lsn(lsn));
2141 2 :
2142 2 : assert_eq!(
2143 2 : tline
2144 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2145 2 : .await?,
2146 2 : RELSEG_SIZE + 1
2147 2 : );
2148 2 :
2149 2 : // Truncate one block
2150 2 : lsn += 0x10;
2151 2 : let mut m = tline.begin_modification(Lsn(lsn));
2152 2 : walingest
2153 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2154 2 : .await?;
2155 2 : m.commit(&ctx).await?;
2156 2 : assert_eq!(
2157 2 : tline
2158 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2159 2 : .await?,
2160 2 : RELSEG_SIZE
2161 2 : );
2162 2 : assert_current_logical_size(&tline, Lsn(lsn));
2163 2 :
2164 2 : // Truncate another block
2165 2 : lsn += 0x10;
2166 2 : let mut m = tline.begin_modification(Lsn(lsn));
2167 2 : walingest
2168 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2169 2 : .await?;
2170 2 : m.commit(&ctx).await?;
2171 2 : assert_eq!(
2172 2 : tline
2173 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2174 2 : .await?,
2175 2 : RELSEG_SIZE - 1
2176 2 : );
2177 2 : assert_current_logical_size(&tline, Lsn(lsn));
2178 2 :
2179 2 : // Truncate to 3000, and then truncate all the way down to 0, one block at a time
2180 2 : // This tests the behavior at segment boundaries
2181 2 : let mut size: i32 = 3000;
2182 6004 : while size >= 0 {
2183 6002 : lsn += 0x10;
2184 6002 : let mut m = tline.begin_modification(Lsn(lsn));
2185 6002 : walingest
2186 6002 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2187 142 : .await?;
2188 6002 : m.commit(&ctx).await?;
2189 6002 : assert_eq!(
2190 6002 : tline
2191 6002 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2192 2 : .await?,
2193 6002 : size as BlockNumber
2194 2 : );
2195 2 :
2196 6002 : size -= 1;
2197 2 : }
2198 2 : assert_current_logical_size(&tline, Lsn(lsn));
2199 2 :
2200 2 : Ok(())
2201 2 : }
2202 :
2203 : /// Replay a wal segment file taken directly from safekeepers.
2204 : ///
2205 : /// This test is useful for benchmarking since it allows us to profile only
2206 : /// the walingest code in a single-threaded executor, and iterate more quickly
2207 : /// without waiting for unrelated steps.
2208 2 : #[tokio::test]
2209 2 : async fn test_ingest_real_wal() {
2210 2 : use crate::tenant::harness::*;
2211 2 : use postgres_ffi::waldecoder::WalStreamDecoder;
2212 2 : use postgres_ffi::WAL_SEGMENT_SIZE;
2213 2 :
2214 2 : // Define test data path and constants.
2215 2 : //
2216 2 : // Steps to reconstruct the data, if needed:
2217 2 : // 1. Run the pgbench python test
2218 2 : // 2. Take the first wal segment file from safekeeper
2219 2 : // 3. Compress it using `zstd --long input_file`
2220 2 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2221 2 : // 5. Grep sk logs for "restart decoder" to get startpoint
2222 2 : // 6. Run just the decoder from this test to get the endpoint.
2223 2 : // It's the last LSN the decoder will output.
2224 2 : let pg_version = 15; // The test data was generated by pg15
2225 2 : let path = "test_data/sk_wal_segment_from_pgbench";
2226 2 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2227 2 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2228 2 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2229 2 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2230 2 :
2231 2 : let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
2232 2 : let (tenant, ctx) = harness.load().await;
2233 2 :
2234 2 : let remote_initdb_path =
2235 2 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2236 2 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2237 2 :
2238 2 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2239 2 : .expect("creating test dir should work");
2240 2 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2241 2 :
2242 2 : // Bootstrap a real timeline. We can't use create_test_timeline because
2243 2 : // it doesn't create a real checkpoint, and WalIngest::new would try to parse
2244 2 : // the garbage data.
2245 2 : let tline = tenant
2246 2 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2247 20533 : .await
2248 2 : .unwrap();
2249 2 :
2250 2 : // We fully read and decompress this into memory before decoding
2251 2 : // to get a more accurate perf profile of the decoder.
2252 2 : let bytes = {
2253 2 : use async_compression::tokio::bufread::ZstdDecoder;
2254 2 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2255 2 : let reader = tokio::io::BufReader::new(file);
2256 2 : let decoder = ZstdDecoder::new(reader);
2257 2 : let mut reader = tokio::io::BufReader::new(decoder);
2258 2 : let mut buffer = Vec::new();
2259 224 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2260 2 : buffer
2261 2 : };
2262 2 :
2263 2 : // TODO start a profiler too
2264 2 : let started_at = std::time::Instant::now();
2265 2 :
2266 2 : // Initialize walingest
2267 2 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2268 2 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2269 2 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2270 5 : .await
2271 2 : .unwrap();
2272 2 : let mut modification = tline.begin_modification(startpoint);
2273 2 : let mut decoded = DecodedWALRecord::default();
2274 2 : println!("decoding {} bytes", bytes.len() - xlogoff);
2275 2 :
2276 2 : // Decode and ingest wal. We process the wal in chunks because
2277 2 : // that's what happens when we get bytes from safekeepers.
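     : // The 50-byte chunk size is arbitrary and deliberately misaligned with
     : // record boundaries: feed_bytes() only buffers the input, and
     : // poll_decode() yields a record once it has arrived in full, so this also
     : // exercises the decoder's partial-record reassembly path.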
2278 474686 : for chunk in bytes[xlogoff..].chunks(50) {
2279 474686 : decoder.feed_bytes(chunk);
2280 620536 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2281 145850 : walingest
2282 145850 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
2283 100 : .await
2284 145850 : .unwrap();
2285 2 : }
2286 474686 : modification.commit(&ctx).await.unwrap();
2287 2 : }
2288 2 :
2289 2 : let duration = started_at.elapsed();
2290 2 : println!("done in {:?}", duration);
2291 2 : }
2292 : }