TLA Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13              : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14              : //! page images out of some WAL records, but most of them it stores as WAL
15              : //! records. If a WAL record modifies multiple pages, WalIngest
16              : //! will call the Repository's put_wal_record or put_page_image function
17              : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22              : //! bespoke Rust code.
23 :
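// Illustrative sketch (not part of the original file): the calling convention
// implied by the pipeline described above. A WAL receiver decodes the stream
// into (LSN, record bytes) pairs and feeds them through WalIngest; the
// `records` vector and the commit-per-record loop here are simplifying
// assumptions for illustration.
#[allow(dead_code)]
async fn ingest_stream_sketch(
    timeline: &Timeline,
    startpoint: Lsn,
    records: Vec<(Lsn, Bytes)>,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut walingest = WalIngest::new(timeline, startpoint, ctx).await?;
    let mut decoded = DecodedWALRecord::default();
    for (lsn, recdata) in records {
        let mut modification = timeline.begin_modification(lsn);
        walingest
            .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
            .await?;
        // Nothing is durable until the accumulated changes are committed.
        modification.commit(ctx).await?;
    }
    Ok(())
}
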
24 : use pageserver_api::shard::ShardIdentity;
25 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
26 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
27 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
28 :
29 : use anyhow::{bail, Context, Result};
30 : use bytes::{Buf, Bytes, BytesMut};
31 : use tracing::*;
32 : use utils::failpoint_support;
33 :
34 : use crate::context::RequestContext;
35 : use crate::metrics::WAL_INGEST;
36 : use crate::pgdatadir_mapping::*;
37 : use crate::tenant::PageReconstructError;
38 : use crate::tenant::Timeline;
39 : use crate::walrecord::*;
40 : use crate::ZERO_PAGE;
41 : use pageserver_api::reltag::{RelTag, SlruKind};
42 : use postgres_ffi::pg_constants;
43 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
44 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
45 : use postgres_ffi::v14::xlog_utils::*;
46 : use postgres_ffi::v14::CheckPoint;
47 : use postgres_ffi::TransactionId;
48 : use postgres_ffi::BLCKSZ;
49 : use utils::lsn::Lsn;
50 :
51 : pub struct WalIngest {
52 : shard: ShardIdentity,
53 : checkpoint: CheckPoint,
54 : checkpoint_modified: bool,
55 : }
56 :
57 : impl WalIngest {
58 CBC 1241 : pub async fn new(
59 1241 : timeline: &Timeline,
60 1241 : startpoint: Lsn,
61 1241 : ctx: &RequestContext,
62 1241 : ) -> anyhow::Result<WalIngest> {
63 : // Fetch the latest checkpoint into memory, so that we can compare with it
64 : // quickly in `ingest_record` and update it when it changes.
65 1241 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
66 1232 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
67 UBC 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
68 :
69 CBC 1232 : Ok(WalIngest {
70 1232 : shard: *timeline.get_shard_identity(),
71 1232 : checkpoint,
72 1232 : checkpoint_modified: false,
73 1232 : })
74 1241 : }
75 :
76 : ///
77 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
78 : ///
79              : /// This function updates the `lsn` field of the `DatadirModification`.
80 : ///
81 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
82 : /// relations/pages that the record affects.
83 : ///
84 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
85 : ///
86 47422616 : pub async fn ingest_record(
87 47422616 : &mut self,
88 47422616 : recdata: Bytes,
89 47422616 : lsn: Lsn,
90 47422616 : modification: &mut DatadirModification<'_>,
91 47422616 : decoded: &mut DecodedWALRecord,
92 47422616 : ctx: &RequestContext,
93 47422616 : ) -> anyhow::Result<bool> {
94 47422616 : WAL_INGEST.records_received.inc();
95 47422616 : let pg_version = modification.tline.pg_version;
96 47422616 : let prev_len = modification.len();
97 47422616 :
98 47422616 : modification.set_lsn(lsn)?;
99 47422616 : decode_wal_record(recdata, decoded, pg_version)?;
100 :
101 47422616 : let mut buf = decoded.record.clone();
102 47422616 : buf.advance(decoded.main_data_offset);
103 47422616 :
104 47422616 : assert!(!self.checkpoint_modified);
105 47422616 : if self.checkpoint.update_next_xid(decoded.xl_xid) {
106 3403 : self.checkpoint_modified = true;
107 47419213 : }
108 :
109 47422616 : match decoded.xl_rmid {
110 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
111 : // Heap AM records need some special handling, because they modify VM pages
112 : // without registering them with the standard mechanism.
113 37638804 : self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
114 7 : .await?;
115 : }
116 : pg_constants::RM_NEON_ID => {
117 UBC 0 : self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
118 0 : .await?;
119 : }
120 : // Handle other special record types
121 : pg_constants::RM_SMGR_ID => {
122 CBC 21472 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
123 21472 :
124 21472 : if info == pg_constants::XLOG_SMGR_CREATE {
125 21428 : let create = XlSmgrCreate::decode(&mut buf);
126 21428 : self.ingest_xlog_smgr_create(modification, &create, ctx)
127 656 : .await?;
128 44 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
129 44 : let truncate = XlSmgrTruncate::decode(&mut buf);
130 44 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
131 GBC 1 : .await?;
132 UBC 0 : }
133 : }
134 : pg_constants::RM_DBASE_ID => {
135 CBC 12 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
136 UBC 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
137 :
138 CBC 12 : if pg_version == 14 {
139 12 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
140 9 : let createdb = XlCreateDatabase::decode(&mut buf);
141 UBC 0 : debug!("XLOG_DBASE_CREATE v14");
142 :
143 CBC 9 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
144 926 : .await?;
145 3 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
146 3 : let dropdb = XlDropDatabase::decode(&mut buf);
147 6 : for tablespace_id in dropdb.tablespace_ids {
148 UBC 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
149 CBC 3 : modification
150 3 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
151 UBC 0 : .await?;
152 : }
153 0 : }
154 0 : } else if pg_version == 15 {
155 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
156 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
157 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
158 : // The XLOG record was renamed between v14 and v15,
159 : // but the record format is the same.
160 : // So we can reuse XlCreateDatabase here.
161 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
162 0 : let createdb = XlCreateDatabase::decode(&mut buf);
163 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
164 0 : .await?;
165 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
166 0 : let dropdb = XlDropDatabase::decode(&mut buf);
167 0 : for tablespace_id in dropdb.tablespace_ids {
168 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
169 0 : modification
170 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
171 0 : .await?;
172 : }
173 0 : }
174 0 : } else if pg_version == 16 {
175 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
176 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
177 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
178 : // The XLOG record was renamed between v14 and v15,
179 : // but the record format is the same.
180 : // So we can reuse XlCreateDatabase here.
181 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
182 0 : let createdb = XlCreateDatabase::decode(&mut buf);
183 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
184 0 : .await?;
185 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
186 0 : let dropdb = XlDropDatabase::decode(&mut buf);
187 0 : for tablespace_id in dropdb.tablespace_ids {
188 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
189 0 : modification
190 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
191 0 : .await?;
192 : }
193 0 : }
194 0 : }
195 : }
196 : pg_constants::RM_TBLSPC_ID => {
197 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
198 : }
199 : pg_constants::RM_CLOG_ID => {
200 CBC 360 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
201 360 :
202 360 : if info == pg_constants::CLOG_ZEROPAGE {
203 359 : let pageno = buf.get_u32_le();
204 359 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
205 359 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
206 359 : self.put_slru_page_image(
207 359 : modification,
208 359 : SlruKind::Clog,
209 359 : segno,
210 359 : rpageno,
211 359 : ZERO_PAGE.clone(),
212 359 : ctx,
213 359 : )
214 7 : .await?;
215 : } else {
216 1 : assert!(info == pg_constants::CLOG_TRUNCATE);
217 1 : let xlrec = XlClogTruncate::decode(&mut buf);
218 1 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
219 UBC 0 : .await?;
220 : }
221 : }
222 : pg_constants::RM_XACT_ID => {
223 CBC 1910750 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
224 1910750 :
225 1910750 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
226 1801602 : let parsed_xact =
227 1801602 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
228 1801602 : self.ingest_xact_record(
229 1801602 : modification,
230 1801602 : &parsed_xact,
231 1801602 : info == pg_constants::XLOG_XACT_COMMIT,
232 1801602 : ctx,
233 1801602 : )
234 785 : .await?;
235 109148 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
236 109147 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
237 : {
238 2 : let parsed_xact =
239 2 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
240 2 : self.ingest_xact_record(
241 2 : modification,
242 2 : &parsed_xact,
243 2 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
244 2 : ctx,
245 2 : )
246 UBC 0 : .await?;
247 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
248 0 : trace!(
249 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
250 0 : decoded.xl_xid,
251 0 : parsed_xact.xid,
252 0 : lsn,
253 0 : );
254 CBC 2 : modification
255 2 : .drop_twophase_file(parsed_xact.xid, ctx)
256 UBC 0 : .await?;
257 CBC 109146 : } else if info == pg_constants::XLOG_XACT_PREPARE {
258 4 : modification
259 4 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
260 UBC 0 : .await?;
261 CBC 109142 : }
262 : }
263 : pg_constants::RM_MULTIXACT_ID => {
264 24332 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
265 24332 :
266 24332 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
267 14 : let pageno = buf.get_u32_le();
268 14 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
269 14 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
270 14 : self.put_slru_page_image(
271 14 : modification,
272 14 : SlruKind::MultiXactOffsets,
273 14 : segno,
274 14 : rpageno,
275 14 : ZERO_PAGE.clone(),
276 14 : ctx,
277 14 : )
278 UBC 0 : .await?;
279 CBC 24318 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
280 291 : let pageno = buf.get_u32_le();
281 291 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
282 291 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
283 291 : self.put_slru_page_image(
284 291 : modification,
285 291 : SlruKind::MultiXactMembers,
286 291 : segno,
287 291 : rpageno,
288 291 : ZERO_PAGE.clone(),
289 291 : ctx,
290 291 : )
291 UBC 0 : .await?;
292 CBC 24027 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
293 24027 : let xlrec = XlMultiXactCreate::decode(&mut buf);
294 24027 : self.ingest_multixact_create_record(modification, &xlrec)?;
295 UBC 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
296 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
297 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
298 0 : .await?;
299 0 : }
300 : }
301 : pg_constants::RM_RELMAP_ID => {
302 CBC 54 : let xlrec = XlRelmapUpdate::decode(&mut buf);
303 54 : self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
304 2 : .await?;
305 : }
306 : pg_constants::RM_XLOG_ID => {
307 129668 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
308 129668 :
309 129668 : if info == pg_constants::XLOG_NEXTOID {
310 383 : let next_oid = buf.get_u32_le();
311 383 : if self.checkpoint.nextOid != next_oid {
312 383 : self.checkpoint.nextOid = next_oid;
313 383 : self.checkpoint_modified = true;
314 383 : }
315 129285 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
316 129230 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
317 : {
318 579 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
319 579 : buf.copy_to_slice(&mut checkpoint_bytes);
320 579 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
321 UBC 0 : trace!(
322 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
323 0 : xlog_checkpoint.oldestXid,
324 0 : self.checkpoint.oldestXid
325 0 : );
326 CBC 579 : if (self
327 579 : .checkpoint
328 579 : .oldestXid
329 579 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
330 579 : < 0
331 UBC 0 : {
332 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
333 0 : self.checkpoint_modified = true;
334 CBC 579 : }
335 128706 : }
336 : }
337 : pg_constants::RM_LOGICALMSG_ID => {
338 116 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
339 116 :
340 116 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
341 116 : let xlrec = XlLogicalMessage::decode(&mut buf);
342 116 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
343 116 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
344 116 : if prefix == "neon-test" {
345 : // This is a convenient way to make the WAL ingestion pause at
346              : // a particular point in the WAL. For more fine-grained control,
347 : // we could peek into the message and only pause if it contains
348 : // a particular string, for example, but this is enough for now.
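                    // (For illustration: such a message can be emitted from SQL with
                    // SELECT pg_logical_emit_message(false, 'neon-test', 'payload');
                    // it is the prefix that is matched here.)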
349 6 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
350 112 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
351 105 : modification.put_file(path, message, ctx).await?;
352 7 : }
353 UBC 0 : }
354 : }
355 CBC 7697044 : _x => {
356 7697044 : // TODO: should probably log & fail here instead of blindly
357 7697044 : // doing something without understanding the protocol
358 7697044 : }
359 : }
360 :
361 : // Iterate through all the blocks that the record modifies, and
362 : // "put" a separate copy of the record for each block.
363 47422615 : for blk in decoded.blocks.iter() {
364 47001894 : let rel = RelTag {
365 47001894 : spcnode: blk.rnode_spcnode,
366 47001894 : dbnode: blk.rnode_dbnode,
367 47001894 : relnode: blk.rnode_relnode,
368 47001894 : forknum: blk.forknum,
369 47001894 : };
370 47001894 :
371 47001894 : let key = rel_block_to_key(rel, blk.blkno);
372 47001894 : let key_is_local = self.shard.is_key_local(&key);
373 :
374 UBC 0 : tracing::debug!(
375 0 : lsn=%lsn,
376 0 : key=%key,
377 0 : "ingest: shard decision {} (checkpoint={})",
378 0 : if !key_is_local { "drop" } else { "keep" },
379 0 : self.checkpoint_modified
380 0 : );
381 :
382 CBC 47001894 : if !key_is_local {
383 UBC 0 : if self.shard.is_zero() {
384 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
385 : // its blkno in case it implicitly extends a relation.
386 0 : self.observe_decoded_block(modification, blk, ctx).await?;
387 0 : }
388 :
389 0 : continue;
390 CBC 47001894 : }
391 47001894 : self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
392 5776 : .await?;
393 : }
394 :
395 : // If checkpoint data was updated, store the new version in the repository
396 47422611 : if self.checkpoint_modified {
397 27795 : let new_checkpoint_bytes = self.checkpoint.encode()?;
398 :
399 27795 : modification.put_checkpoint(new_checkpoint_bytes)?;
400 27795 : self.checkpoint_modified = false;
401 47394816 : }
402 :
403 : // Note that at this point this record is only cached in the modification
404 : // until commit() is called to flush the data into the repository and update
405 : // the latest LSN.
406 :
407 47422611 : Ok(modification.len() > prev_len)
408 47422616 : }
409 :
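    // Illustrative note (not part of the original file): with sharding, every
    // shard ingests the full WAL stream but persists only the blocks whose keys
    // map to it; shard 0 additionally tracks relation sizes, which is why it
    // observes (but does not store) blocks belonging to other shards above.
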
410 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
411 UBC 0 : async fn observe_decoded_block(
412 0 : &mut self,
413 0 : modification: &mut DatadirModification<'_>,
414 0 : blk: &DecodedBkpBlock,
415 0 : ctx: &RequestContext,
416 0 : ) -> Result<(), PageReconstructError> {
417 0 : let rel = RelTag {
418 0 : spcnode: blk.rnode_spcnode,
419 0 : dbnode: blk.rnode_dbnode,
420 0 : relnode: blk.rnode_relnode,
421 0 : forknum: blk.forknum,
422 0 : };
423 0 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
424 0 : .await
425 0 : }
426 :
427 CBC 47001894 : async fn ingest_decoded_block(
428 47001894 : &mut self,
429 47001894 : modification: &mut DatadirModification<'_>,
430 47001894 : lsn: Lsn,
431 47001894 : decoded: &DecodedWALRecord,
432 47001894 : blk: &DecodedBkpBlock,
433 47001894 : ctx: &RequestContext,
434 47001894 : ) -> Result<(), PageReconstructError> {
435 47001894 : let rel = RelTag {
436 47001894 : spcnode: blk.rnode_spcnode,
437 47001894 : dbnode: blk.rnode_dbnode,
438 47001894 : relnode: blk.rnode_relnode,
439 47001894 : forknum: blk.forknum,
440 47001894 : };
441 47001894 :
442 47001894 : //
443     47001894 : // Instead of storing the full-page-image WAL record,
444     47001894 : // it is better to store the extracted image: we can skip wal-redo
445     47001894 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
446     47001894 : // so they would have to be copied multiple times.
447 47001894 : //
448 47001894 : if blk.apply_image
449 148064 : && blk.has_image
450 148064 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
451 129995 : && (decoded.xl_info == pg_constants::XLOG_FPI
452 UBC 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
453 : // compression of WAL is not yet supported: fall back to storing the original WAL record
454 CBC 129995 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
455              : // do not materialize null pages because they will most likely soon be replaced with real data
456 129995 : && blk.bimg_len != 0
457 : {
458 : // Extract page image from FPI record
459 129995 : let img_len = blk.bimg_len as usize;
460 129995 : let img_offs = blk.bimg_offset as usize;
461 129995 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
462 129995 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
463 129995 :
464 129995 : if blk.hole_length != 0 {
465 107511 : let tail = image.split_off(blk.hole_offset as usize);
466 107511 : image.resize(image.len() + blk.hole_length as usize, 0u8);
467 107511 : image.unsplit(tail);
468 107511 : }
469 : //
470 : // Match the logic of XLogReadBufferForRedoExtended:
471 : // The page may be uninitialized. If so, we can't set the LSN because
472 : // that would corrupt the page.
473 : //
474 129995 : if !page_is_new(&image) {
475 125364 : page_set_lsn(&mut image, lsn)
476 4631 : }
477 129995 : assert_eq!(image.len(), BLCKSZ as usize);
478 129995 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
479 340 : .await?;
480 : } else {
481 46871899 : let rec = NeonWalRecord::Postgres {
482 46871899 : will_init: blk.will_init || blk.apply_image,
483 46871899 : rec: decoded.record.clone(),
484 46871899 : };
485 46871899 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
486 5436 : .await?;
487 : }
488 47001890 : Ok(())
489 47001894 : }
490 :
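    // Illustrative sketch (not part of the original file): the hole-restore step
    // performed above with split_off/unsplit, as a standalone helper. Postgres
    // elides the all-zeros "hole" between pd_lower and pd_upper from a full-page
    // image; reconstructing the BLCKSZ-byte page means re-inserting hole_length
    // zero bytes at hole_offset.
    #[allow(dead_code)]
    fn restore_fpi_hole_sketch(img: &[u8], hole_offset: usize, hole_length: usize) -> BytesMut {
        let mut page = BytesMut::with_capacity(img.len() + hole_length);
        page.extend_from_slice(&img[..hole_offset]); // bytes before the hole
        page.resize(hole_offset + hole_length, 0u8); // zero-filled hole
        page.extend_from_slice(&img[hole_offset..]); // bytes after the hole
        page
    }
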
491 37638804 : async fn ingest_heapam_record(
492 37638804 : &mut self,
493 37638804 : buf: &mut Bytes,
494 37638804 : modification: &mut DatadirModification<'_>,
495 37638804 : decoded: &DecodedWALRecord,
496 37638804 : ctx: &RequestContext,
497 37638804 : ) -> anyhow::Result<()> {
498 37638804 : // Handle VM bit updates that are implicitly part of heap records.
499 37638804 :
500 37638804 : // First, look at the record to determine which VM bits need
501 37638804 : // to be cleared. If either of these variables is set, we
502 37638804 : // need to clear the corresponding bits in the visibility map.
503 37638804 : let mut new_heap_blkno: Option<u32> = None;
504 37638804 : let mut old_heap_blkno: Option<u32> = None;
505 37638804 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
506 37638804 :
507 37638804 : match modification.tline.pg_version {
508 : 14 => {
509 37566067 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
510 35337961 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
511 35337961 :
512 35337961 : if info == pg_constants::XLOG_HEAP_INSERT {
513 29256546 : let xlrec = v14::XlHeapInsert::decode(buf);
514 29256546 : assert_eq!(0, buf.remaining());
515 29256546 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
516 2562 : new_heap_blkno = Some(decoded.blocks[0].blkno);
517 29253984 : }
518 6081415 : } else if info == pg_constants::XLOG_HEAP_DELETE {
519 535744 : let xlrec = v14::XlHeapDelete::decode(buf);
520 535744 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
521 730 : new_heap_blkno = Some(decoded.blocks[0].blkno);
522 535014 : }
523 5545671 : } else if info == pg_constants::XLOG_HEAP_UPDATE
524 4075052 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
525 : {
526 2801135 : let xlrec = v14::XlHeapUpdate::decode(buf);
527 2801135 : // the size of tuple data is inferred from the size of the record.
528 2801135 : // we can't validate the remaining number of bytes without parsing
529 2801135 : // the tuple data.
530 2801135 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
531 83421 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
532 2717714 : }
533 2801135 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
534 1701 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
535         1701 : // non-HOT update where the new tuple goes to a different page than
536 1701 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
537 1701 : // set.
538 1701 : new_heap_blkno = Some(decoded.blocks[0].blkno);
539 2799434 : }
540 2744536 : } else if info == pg_constants::XLOG_HEAP_LOCK {
541 2688546 : let xlrec = v14::XlHeapLock::decode(buf);
542 2688546 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
543 53 : old_heap_blkno = Some(decoded.blocks[0].blkno);
544 53 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
545 2688493 : }
546 55990 : }
547 2228106 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
548 2228106 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
549 2228106 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
550 780174 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
551 :
552 780174 : let offset_array_len =
553 780174 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
554 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
555 629382 : 0
556 : } else {
557 150792 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
558 : };
559 780174 : assert_eq!(offset_array_len, buf.remaining());
560 :
561 780174 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
562 1852 : new_heap_blkno = Some(decoded.blocks[0].blkno);
563 778322 : }
564 1447932 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
565 3898 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
566 3898 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
567 UBC 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
568 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
569 CBC 3898 : }
570 1444034 : }
571 : } else {
572 UBC 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
573 : }
574 : }
575 : 15 => {
576 CBC 72737 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
577 72643 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
578 72643 :
579 72643 : if info == pg_constants::XLOG_HEAP_INSERT {
580 72638 : let xlrec = v15::XlHeapInsert::decode(buf);
581 72638 : assert_eq!(0, buf.remaining());
582 72638 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
583 2 : new_heap_blkno = Some(decoded.blocks[0].blkno);
584 72636 : }
585 5 : } else if info == pg_constants::XLOG_HEAP_DELETE {
586 UBC 0 : let xlrec = v15::XlHeapDelete::decode(buf);
587 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
588 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
589 0 : }
590 CBC 5 : } else if info == pg_constants::XLOG_HEAP_UPDATE
591 1 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
592 : {
593 4 : let xlrec = v15::XlHeapUpdate::decode(buf);
594 4 : // the size of tuple data is inferred from the size of the record.
595 4 : // we can't validate the remaining number of bytes without parsing
596 4 : // the tuple data.
597 4 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
598 UBC 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
599 CBC 4 : }
600 4 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
601 UBC 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
602            0 : // non-HOT update where the new tuple goes to a different page than
603 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
604 0 : // set.
605 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
606 CBC 4 : }
607 1 : } else if info == pg_constants::XLOG_HEAP_LOCK {
608 UBC 0 : let xlrec = v15::XlHeapLock::decode(buf);
609 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
610 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
611 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
612 0 : }
613 CBC 1 : }
614 94 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
615 94 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
616 94 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
617 21 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
618 :
619 21 : let offset_array_len =
620 21 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
621 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
622 1 : 0
623 : } else {
624 20 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
625 : };
626 21 : assert_eq!(offset_array_len, buf.remaining());
627 :
628 21 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
629 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
630 17 : }
631 73 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
632 UBC 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
633 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
634 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
635 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
636 0 : }
637 CBC 73 : }
638 : } else {
639 UBC 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
640 : }
641 : }
642 : 16 => {
643 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
644 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
645 0 :
646 0 : if info == pg_constants::XLOG_HEAP_INSERT {
647 0 : let xlrec = v16::XlHeapInsert::decode(buf);
648 0 : assert_eq!(0, buf.remaining());
649 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
650 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
651 0 : }
652 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
653 0 : let xlrec = v16::XlHeapDelete::decode(buf);
654 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
655 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
656 0 : }
657 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
658 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
659 : {
660 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
661 0 : // the size of tuple data is inferred from the size of the record.
662 0 : // we can't validate the remaining number of bytes without parsing
663 0 : // the tuple data.
664 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
665 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
666 0 : }
667 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
668 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
669            0 : // non-HOT update where the new tuple goes to a different page than
670 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
671 0 : // set.
672 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
673 0 : }
674 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
675 0 : let xlrec = v16::XlHeapLock::decode(buf);
676 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
677 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
678 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
679 0 : }
680 0 : }
681 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
682 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
683 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
684 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
685 :
686 0 : let offset_array_len =
687 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
688 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
689 0 : 0
690 : } else {
691 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
692 : };
693 0 : assert_eq!(offset_array_len, buf.remaining());
694 :
695 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
696 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
697 0 : }
698 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
699 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
700 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
701 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
702 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
703 0 : }
704 0 : }
705 : } else {
706 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
707 : }
708 : }
709 0 : _ => {}
710 : }
711 :
712 : // Clear the VM bits if required.
713 CBC 37638804 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
714 90274 : let vm_rel = RelTag {
715 90274 : forknum: VISIBILITYMAP_FORKNUM,
716 90274 : spcnode: decoded.blocks[0].rnode_spcnode,
717 90274 : dbnode: decoded.blocks[0].rnode_dbnode,
718 90274 : relnode: decoded.blocks[0].rnode_relnode,
719 90274 : };
720 90274 :
721 90274 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
722 90274 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
723 :
724 : // Sometimes, Postgres seems to create heap WAL records with the
725 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
726 : // not set. In fact, it's possible that the VM page does not exist at all.
727 : // In that case, we don't want to store a record to clear the VM bit;
728 : // replaying it would fail to find the previous image of the page, because
729 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
730 : // record if it doesn't.
731 90274 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
732 90274 : if let Some(blknum) = new_vm_blk {
733 6851 : if blknum >= vm_size {
734 UBC 0 : new_vm_blk = None;
735 CBC 6851 : }
736 83423 : }
737 90274 : if let Some(blknum) = old_vm_blk {
738 83474 : if blknum >= vm_size {
739 UBC 0 : old_vm_blk = None;
740 CBC 83474 : }
741 6800 : }
742 :
743 90274 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
744 90274 : if new_vm_blk == old_vm_blk {
745 : // An UPDATE record that needs to clear the bits for both old and the
746 : // new page, both of which reside on the same VM page.
747 51 : self.put_rel_wal_record(
748 51 : modification,
749 51 : vm_rel,
750 51 : new_vm_blk.unwrap(),
751 51 : NeonWalRecord::ClearVisibilityMapFlags {
752 51 : new_heap_blkno,
753 51 : old_heap_blkno,
754 51 : flags,
755 51 : },
756 51 : ctx,
757 51 : )
758 UBC 0 : .await?;
759 : } else {
760 : // Clear VM bits for one heap page, or for two pages that reside on
761 : // different VM pages.
762 CBC 90223 : if let Some(new_vm_blk) = new_vm_blk {
763 6800 : self.put_rel_wal_record(
764 6800 : modification,
765 6800 : vm_rel,
766 6800 : new_vm_blk,
767 6800 : NeonWalRecord::ClearVisibilityMapFlags {
768 6800 : new_heap_blkno,
769 6800 : old_heap_blkno: None,
770 6800 : flags,
771 6800 : },
772 6800 : ctx,
773 6800 : )
774 UBC 0 : .await?;
775 CBC 83423 : }
776 90223 : if let Some(old_vm_blk) = old_vm_blk {
777 83423 : self.put_rel_wal_record(
778 83423 : modification,
779 83423 : vm_rel,
780 83423 : old_vm_blk,
781 83423 : NeonWalRecord::ClearVisibilityMapFlags {
782 83423 : new_heap_blkno: None,
783 83423 : old_heap_blkno,
784 83423 : flags,
785 83423 : },
786 83423 : ctx,
787 83423 : )
788 UBC 0 : .await?;
789 CBC 6800 : }
790 : }
791 UBC 0 : }
792 CBC 37548530 : }
793 :
794 37638804 : Ok(())
795 37638804 : }
796 :
797 UBC 0 : async fn ingest_neonrmgr_record(
798 0 : &mut self,
799 0 : buf: &mut Bytes,
800 0 : modification: &mut DatadirModification<'_>,
801 0 : decoded: &DecodedWALRecord,
802 0 : ctx: &RequestContext,
803 0 : ) -> anyhow::Result<()> {
804 0 : // Handle VM bit updates that are implicitly part of heap records.
805 0 :
806 0 : // First, look at the record to determine which VM bits need
807 0 : // to be cleared. If either of these variables is set, we
808 0 : // need to clear the corresponding bits in the visibility map.
809 0 : let mut new_heap_blkno: Option<u32> = None;
810 0 : let mut old_heap_blkno: Option<u32> = None;
811 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
812 0 : let pg_version = modification.tline.pg_version;
813 0 :
814 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
815 :
816 0 : match pg_version {
817 : 16 => {
818 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
819 0 :
820 0 : match info {
821 : pg_constants::XLOG_NEON_HEAP_INSERT => {
822 0 : let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
823 0 : assert_eq!(0, buf.remaining());
824 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
825 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
826 0 : }
827 : }
828 : pg_constants::XLOG_NEON_HEAP_DELETE => {
829 0 : let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
830 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
831 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
832 0 : }
833 : }
834 : pg_constants::XLOG_NEON_HEAP_UPDATE
835 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
836 0 : let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
837 0 : // the size of tuple data is inferred from the size of the record.
838 0 : // we can't validate the remaining number of bytes without parsing
839 0 : // the tuple data.
840 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
841 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
842 0 : }
843 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
844 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
845            0 : // non-HOT update where the new tuple goes to a different page than
846 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
847 0 : // set.
848 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
849 0 : }
850 : }
851 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
852 0 : let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
853 :
854 0 : let offset_array_len =
855 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
856 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
857 0 : 0
858 : } else {
859 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
860 : };
861 0 : assert_eq!(offset_array_len, buf.remaining());
862 :
863 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
864 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
865 0 : }
866 : }
867 : pg_constants::XLOG_NEON_HEAP_LOCK => {
868 0 : let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
869 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
870 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
871 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
872 0 : }
873 : }
874 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
875 : }
876 : }
877 0 : _ => bail!(
878 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
879 0 : pg_version
880 0 : ),
881 : }
882 :
883 : // Clear the VM bits if required.
884 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
885 0 : let vm_rel = RelTag {
886 0 : forknum: VISIBILITYMAP_FORKNUM,
887 0 : spcnode: decoded.blocks[0].rnode_spcnode,
888 0 : dbnode: decoded.blocks[0].rnode_dbnode,
889 0 : relnode: decoded.blocks[0].rnode_relnode,
890 0 : };
891 0 :
892 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
893 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
894 :
895 : // Sometimes, Postgres seems to create heap WAL records with the
896 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
897 : // not set. In fact, it's possible that the VM page does not exist at all.
898 : // In that case, we don't want to store a record to clear the VM bit;
899 : // replaying it would fail to find the previous image of the page, because
900 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
901 : // record if it doesn't.
902 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
903 0 : if let Some(blknum) = new_vm_blk {
904 0 : if blknum >= vm_size {
905 0 : new_vm_blk = None;
906 0 : }
907 0 : }
908 0 : if let Some(blknum) = old_vm_blk {
909 0 : if blknum >= vm_size {
910 0 : old_vm_blk = None;
911 0 : }
912 0 : }
913 :
914 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
915 0 : if new_vm_blk == old_vm_blk {
916 : // An UPDATE record that needs to clear the bits for both old and the
917 : // new page, both of which reside on the same VM page.
918 0 : self.put_rel_wal_record(
919 0 : modification,
920 0 : vm_rel,
921 0 : new_vm_blk.unwrap(),
922 0 : NeonWalRecord::ClearVisibilityMapFlags {
923 0 : new_heap_blkno,
924 0 : old_heap_blkno,
925 0 : flags,
926 0 : },
927 0 : ctx,
928 0 : )
929 0 : .await?;
930 : } else {
931 : // Clear VM bits for one heap page, or for two pages that reside on
932 : // different VM pages.
933 0 : if let Some(new_vm_blk) = new_vm_blk {
934 0 : self.put_rel_wal_record(
935 0 : modification,
936 0 : vm_rel,
937 0 : new_vm_blk,
938 0 : NeonWalRecord::ClearVisibilityMapFlags {
939 0 : new_heap_blkno,
940 0 : old_heap_blkno: None,
941 0 : flags,
942 0 : },
943 0 : ctx,
944 0 : )
945 0 : .await?;
946 0 : }
947 0 : if let Some(old_vm_blk) = old_vm_blk {
948 0 : self.put_rel_wal_record(
949 0 : modification,
950 0 : vm_rel,
951 0 : old_vm_blk,
952 0 : NeonWalRecord::ClearVisibilityMapFlags {
953 0 : new_heap_blkno: None,
954 0 : old_heap_blkno,
955 0 : flags,
956 0 : },
957 0 : ctx,
958 0 : )
959 0 : .await?;
960 0 : }
961 : }
962 0 : }
963 0 : }
964 :
965 0 : Ok(())
966 0 : }
967 :
968 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
969 CBC 9 : async fn ingest_xlog_dbase_create(
970 9 : &mut self,
971 9 : modification: &mut DatadirModification<'_>,
972 9 : rec: &XlCreateDatabase,
973 9 : ctx: &RequestContext,
974 9 : ) -> anyhow::Result<()> {
975 9 : let db_id = rec.db_id;
976 9 : let tablespace_id = rec.tablespace_id;
977 9 : let src_db_id = rec.src_db_id;
978 9 : let src_tablespace_id = rec.src_tablespace_id;
979 :
980 9 : let rels = modification
981 9 : .tline
982 9 : .list_rels(
983 9 : src_tablespace_id,
984 9 : src_db_id,
985 9 : Version::Modified(modification),
986 9 : ctx,
987 9 : )
988 UBC 0 : .await?;
989 :
990 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
991 :
992 : // Copy relfilemap
993 CBC 9 : let filemap = modification
994 9 : .tline
995 9 : .get_relmap_file(
996 9 : src_tablespace_id,
997 9 : src_db_id,
998 9 : Version::Modified(modification),
999 9 : ctx,
1000 9 : )
1001 UBC 0 : .await?;
1002 CBC 9 : modification
1003 9 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1004 UBC 0 : .await?;
1005 :
1006 CBC 9 : let mut num_rels_copied = 0;
1007 9 : let mut num_blocks_copied = 0;
1008 2637 : for src_rel in rels {
1009 2628 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1010 2628 : assert_eq!(src_rel.dbnode, src_db_id);
1011 :
1012 2628 : let nblocks = modification
1013 2628 : .tline
1014 2628 : .get_rel_size(src_rel, Version::Modified(modification), true, ctx)
1015 211 : .await?;
1016 2628 : let dst_rel = RelTag {
1017 2628 : spcnode: tablespace_id,
1018 2628 : dbnode: db_id,
1019 2628 : relnode: src_rel.relnode,
1020 2628 : forknum: src_rel.forknum,
1021 2628 : };
1022 2628 :
1023 2628 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1024 :
1025 : // Copy content
1026 UBC 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1027 CBC 9099 : for blknum in 0..nblocks {
1028 UBC 0 : debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
1029 :
1030 CBC 9099 : let content = modification
1031 9099 : .tline
1032 9099 : .get_rel_page_at_lsn(
1033 9099 : src_rel,
1034 9099 : blknum,
1035 9099 : Version::Modified(modification),
1036 9099 : true,
1037 9099 : ctx,
1038 9099 : )
1039 715 : .await?;
1040 9099 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1041 9099 : num_blocks_copied += 1;
1042 : }
1043 :
1044 2628 : num_rels_copied += 1;
1045 : }
1046 :
1047 9 : info!(
1048 9 : "Created database {}/{}, copied {} blocks in {} rels",
1049 9 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1050 9 : );
1051 9 : Ok(())
1052 9 : }
1053 :
1054 21428 : async fn ingest_xlog_smgr_create(
1055 21428 : &mut self,
1056 21428 : modification: &mut DatadirModification<'_>,
1057 21428 : rec: &XlSmgrCreate,
1058 21428 : ctx: &RequestContext,
1059 21428 : ) -> anyhow::Result<()> {
1060 21428 : let rel = RelTag {
1061 21428 : spcnode: rec.rnode.spcnode,
1062 21428 : dbnode: rec.rnode.dbnode,
1063 21428 : relnode: rec.rnode.relnode,
1064 21428 : forknum: rec.forknum,
1065 21428 : };
1066 21428 : self.put_rel_creation(modification, rel, ctx).await?;
1067 21427 : Ok(())
1068 21428 : }
1069 :
1070 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1071 : ///
1072 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1073 44 : async fn ingest_xlog_smgr_truncate(
1074 44 : &mut self,
1075 44 : modification: &mut DatadirModification<'_>,
1076 44 : rec: &XlSmgrTruncate,
1077 44 : ctx: &RequestContext,
1078 44 : ) -> anyhow::Result<()> {
1079 44 : let spcnode = rec.rnode.spcnode;
1080 44 : let dbnode = rec.rnode.dbnode;
1081 44 : let relnode = rec.rnode.relnode;
1082 44 :
1083 44 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1084 44 : let rel = RelTag {
1085 44 : spcnode,
1086 44 : dbnode,
1087 44 : relnode,
1088 44 : forknum: MAIN_FORKNUM,
1089 44 : };
1090 44 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1091 GBC 1 : .await?;
1092 UBC 0 : }
1093 CBC 44 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1094 44 : let rel = RelTag {
1095 44 : spcnode,
1096 44 : dbnode,
1097 44 : relnode,
1098 44 : forknum: FSM_FORKNUM,
1099 44 : };
1100 44 :
1101 44 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1102 44 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1103 44 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1104              : // The tail of the last remaining FSM page has to be zeroed.
1105              : // We are not precise here; instead of digging into the FSM bitmap format, we just clear the whole page.
1106 21 : modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
1107 21 : fsm_physical_page_no += 1;
1108 23 : }
1109 44 : let nblocks = get_relsize(modification, rel, ctx).await?;
1110 44 : if nblocks > fsm_physical_page_no {
1111 : // check if something to do: FSM is larger than truncate position
1112 21 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1113 UBC 0 : .await?;
1114 CBC 23 : }
1115 UBC 0 : }
1116 CBC 44 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1117 44 : let rel = RelTag {
1118 44 : spcnode,
1119 44 : dbnode,
1120 44 : relnode,
1121 44 : forknum: VISIBILITYMAP_FORKNUM,
1122 44 : };
1123 44 :
1124 44 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1125 44 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1126              : // The tail of the last remaining VM page has to be zeroed.
1127              : // We are not precise here; instead of digging into the VM bitmap format, we just clear the whole page.
1128 21 : modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
1129 21 : vm_page_no += 1;
1130 23 : }
1131 44 : let nblocks = get_relsize(modification, rel, ctx).await?;
1132 44 : if nblocks > vm_page_no {
1133 : // check if something to do: VM is larger than truncate position
1134 21 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1135 UBC 0 : .await?;
1136 CBC 23 : }
1137 UBC 0 : }
1138 CBC 44 : Ok(())
1139 44 : }
1140 :
1141              : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
1142 : ///
1143 1801604 : async fn ingest_xact_record(
1144 1801604 : &mut self,
1145 1801604 : modification: &mut DatadirModification<'_>,
1146 1801604 : parsed: &XlXactParsedRecord,
1147 1801604 : is_commit: bool,
1148 1801604 : ctx: &RequestContext,
1149 1801604 : ) -> anyhow::Result<()> {
1150 1801604 : // Record update of CLOG pages
1151 1801604 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1152 1801604 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1153 1801604 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1154 1801604 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1155 :
1156 2001653 : for subxact in &parsed.subxacts {
1157 200049 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1158 200049 : if subxact_pageno != pageno {
1159              : // This subxact goes to a different page. Write the record
1160 : // for all the XIDs on the previous page, and continue
1161 : // accumulating XIDs on this new page.
1162 6 : modification.put_slru_wal_record(
1163 6 : SlruKind::Clog,
1164 6 : segno,
1165 6 : rpageno,
1166 6 : if is_commit {
1167 6 : NeonWalRecord::ClogSetCommitted {
1168 6 : xids: page_xids,
1169 6 : timestamp: parsed.xact_time,
1170 6 : }
1171 : } else {
1172 UBC 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1173 : },
1174 0 : )?;
1175 CBC 6 : page_xids = Vec::new();
1176 200043 : }
1177 200049 : pageno = subxact_pageno;
1178 200049 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1179 200049 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1180 200049 : page_xids.push(*subxact);
1181 : }
1182 1801604 : modification.put_slru_wal_record(
1183 1801604 : SlruKind::Clog,
1184 1801604 : segno,
1185 1801604 : rpageno,
1186 1801604 : if is_commit {
1187 1799812 : NeonWalRecord::ClogSetCommitted {
1188 1799812 : xids: page_xids,
1189 1799812 : timestamp: parsed.xact_time,
1190 1799812 : }
1191 : } else {
1192 1792 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1193 : },
1194 UBC 0 : )?;
1195 :
1196 CBC 1818296 : for xnode in &parsed.xnodes {
1197 83460 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1198 66768 : let rel = RelTag {
1199 66768 : forknum,
1200 66768 : spcnode: xnode.spcnode,
1201 66768 : dbnode: xnode.dbnode,
1202 66768 : relnode: xnode.relnode,
1203 66768 : };
1204 66768 : if modification
1205 66768 : .tline
1206 66768 : .get_rel_exists(rel, Version::Modified(modification), true, ctx)
1207 LBC (1) : .await?
1208 : {
1209 CBC 17593 : self.put_rel_drop(modification, rel, ctx).await?;
1210 49175 : }
1211 : }
1212 : }
1213 1801604 : Ok(())
1214 1801604 : }
1215 :
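    // Illustrative sketch (not part of the original file): the CLOG addressing
    // arithmetic used by ingest_xact_record above, with the stock 8 KB block
    // size (two status bits per XID, so 8192 * 4 = 32768 XIDs per page, and 32
    // SLRU pages per segment).
    #[allow(dead_code)]
    fn clog_addr_sketch(xid: TransactionId) -> (u32, u32) {
        const CLOG_XACTS_PER_PAGE: u32 = 32768;
        const SLRU_PAGES_PER_SEGMENT: u32 = 32;
        let pageno = xid / CLOG_XACTS_PER_PAGE;
        let segno = pageno / SLRU_PAGES_PER_SEGMENT; // which SLRU segment file
        let rpageno = pageno % SLRU_PAGES_PER_SEGMENT; // page within that segment
        (segno, rpageno)
    }
    // For example, xid 1_000_000 maps to page 30, i.e. segment 0, relative page 30.
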
1216 1 : async fn ingest_clog_truncate_record(
1217 1 : &mut self,
1218 1 : modification: &mut DatadirModification<'_>,
1219 1 : xlrec: &XlClogTruncate,
1220 1 : ctx: &RequestContext,
1221 1 : ) -> anyhow::Result<()> {
1222 1 : info!(
1223 1 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1224 1 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1225 1 : );
1226 :
1227 : // Here we treat oldestXid and oldestXidDB
1228 : // differently from postgres redo routines.
1229              : // In postgres, checkpoint.oldestXid lags behind xlrec.oldest_xid
1230              : // until a checkpoint happens and updates the value.
1231              : // Here we can use the most recent value.
1232              : // It's just an optimization, though, and can be deleted.
1233              : // TODO Figure out if there will be any issues with replicas.
1234 1 : self.checkpoint.oldestXid = xlrec.oldest_xid;
1235 1 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
1236 1 : self.checkpoint_modified = true;
1237 1 :
1238            1 : // TODO Handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
1239 1 :
1240 1 : let latest_page_number =
1241 1 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
1242 1 :
1243 1 : // Now delete all segments containing pages between xlrec.pageno
1244 1 : // and latest_page_number.
1245 1 :
1246 1 : // First, make an important safety check:
1247 1 : // the current endpoint page must not be eligible for removal.
1248 1 : // See SimpleLruTruncate() in slru.c
1249 1 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
1250 UBC           0 : info!("could not truncate directory pg_xact: apparent wraparound");
1251 0 : return Ok(());
1252 CBC 1 : }
1253 :
1254 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1255 : //
1256              : // We cannot pass 'lsn' to Timeline::list_slru_segments(), or it
1257 : // will block waiting for the last valid LSN to advance up to
1258 : // it. So we use the previous record's LSN in the get calls
1259 : // instead.
1260 10 : for segno in modification
1261 1 : .tline
1262 1 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1263 UBC 0 : .await?
1264 : {
1265 CBC 10 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1266 10 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
1267 9 : modification
1268 9 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1269 UBC 0 : .await?;
1270 0 : trace!("Drop CLOG segment {:>04X}", segno);
1271 CBC 1 : }
1272 : }
1273 :
1274 1 : Ok(())
1275 1 : }
1276 :
1277 24027 : fn ingest_multixact_create_record(
1278 24027 : &mut self,
1279 24027 : modification: &mut DatadirModification,
1280 24027 : xlrec: &XlMultiXactCreate,
1281 24027 : ) -> Result<()> {
1282 24027 : // Create WAL record for updating the multixact-offsets page
1283 24027 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1284 24027 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1285 24027 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1286 24027 :
1287 24027 : modification.put_slru_wal_record(
1288 24027 : SlruKind::MultiXactOffsets,
1289 24027 : segno,
1290 24027 : rpageno,
1291 24027 : NeonWalRecord::MultixactOffsetCreate {
1292 24027 : mid: xlrec.mid,
1293 24027 : moff: xlrec.moff,
1294 24027 : },
1295 24027 : )?;
1296 :
1297 : // Create WAL records for the update of each affected multixact-members page
1298 24027 : let mut members = xlrec.members.iter();
1299 24027 : let mut offset = xlrec.moff;
1300 : loop {
1301 48328 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1302 48328 :
1303 48328 : // How many members fit on this page?
1304 48328 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1305 48328 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1306 48328 :
1307 48328 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1308 48328 : for _ in 0..page_remain {
1309 520988 : if let Some(m) = members.next() {
1310 472948 : this_page_members.push(m.clone());
1311 472948 : } else {
1312 48040 : break;
1313 : }
1314 : }
1315 48328 : if this_page_members.is_empty() {
1316 : // all done
1317 24027 : break;
1318 24301 : }
1319 24301 : let n_this_page = this_page_members.len();
1320 24301 :
1321 24301 : modification.put_slru_wal_record(
1322 24301 : SlruKind::MultiXactMembers,
1323 24301 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1324 24301 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1325 24301 : NeonWalRecord::MultixactMembersCreate {
1326 24301 : moff: offset,
1327 24301 : members: this_page_members,
1328 24301 : },
1329 24301 : )?;
1330 :
1331 : // Note: The multixact members can wrap around, even within one WAL record.
1332 24301 : offset = offset.wrapping_add(n_this_page as u32);
1333 : }
1334 24027 : if xlrec.mid >= self.checkpoint.nextMulti {
1335 24027 : self.checkpoint.nextMulti = xlrec.mid + 1;
1336 24027 : self.checkpoint_modified = true;
1337 24027 : }
1338 24027 : if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
1339 24027 : self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
1340 24027 : self.checkpoint_modified = true;
1341 24027 : }
1342 472948 : let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
1343 472948 : if mbr.xid.wrapping_sub(acc) as i32 > 0 {
1344 472903 : mbr.xid
1345 : } else {
1346 45 : acc
1347 : }
1348 472948 : });
1349 24027 :
1350 24027 : if self.checkpoint.update_next_xid(max_mbr_xid) {
1351 UBC 0 : self.checkpoint_modified = true;
1352 CBC 24027 : }
1353 24027 : Ok(())
1354 24027 : }
1355 :
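    // Illustrative note (not part of the original file): the member-pagination
    // loop above splits xlrec.members into per-page chunks. The first chunk may
    // be shorter than a full page, because the starting position within the
    // page is moff % MULTIXACT_MEMBERS_PER_PAGE; and since multixact offsets
    // are 32-bit values that wrap around, the running offset is advanced with
    // wrapping_add rather than `+`.
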
1356 UBC 0 : async fn ingest_multixact_truncate_record(
1357 0 : &mut self,
1358 0 : modification: &mut DatadirModification<'_>,
1359 0 : xlrec: &XlMultiXactTruncate,
1360 0 : ctx: &RequestContext,
1361 0 : ) -> Result<()> {
1362 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
1363 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
1364 0 : self.checkpoint_modified = true;
1365 0 :
1366 0 : // PerformMembersTruncation
1367 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
1368 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
1369 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
1370 0 : let mut segment: i32 = startsegment;
1371 :
1372 : // Delete all the segments except the last one. The last segment can still
1373 : // contain, possibly partially, valid data.
1374 0 : while segment != endsegment {
1375 0 : modification
1376 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1377 0 : .await?;
1378 :
1379 : /* move to next segment, handling wraparound correctly */
1380 0 : if segment == maxsegment {
1381 0 : segment = 0;
1382 0 : } else {
1383 0 : segment += 1;
1384 0 : }
1385 : }
1386 :
1387 : // Truncate offsets
1388 : // FIXME: this did not handle wraparound correctly
1389 :
1390 0 : Ok(())
1391 0 : }
1392 :
1393 CBC 54 : async fn ingest_relmap_page(
1394 54 : &mut self,
1395 54 : modification: &mut DatadirModification<'_>,
1396 54 : xlrec: &XlRelmapUpdate,
1397 54 : decoded: &DecodedWALRecord,
1398 54 : ctx: &RequestContext,
1399 54 : ) -> Result<()> {
1400 54 : let mut buf = decoded.record.clone();
1401 54 : buf.advance(decoded.main_data_offset);
1402 54 : // skip xl_relmap_update
1403 54 : buf.advance(12);
1404 54 :
1405 54 : modification
1406 54 : .put_relmap_file(
1407 54 : xlrec.tsid,
1408 54 : xlrec.dbid,
1409 54 : Bytes::copy_from_slice(&buf[..]),
1410 54 : ctx,
1411 54 : )
1412 2 : .await
1413 54 : }
1414 :
1415 21429 : async fn put_rel_creation(
1416 21429 : &mut self,
1417 21429 : modification: &mut DatadirModification<'_>,
1418 21429 : rel: RelTag,
1419 21429 : ctx: &RequestContext,
1420 21429 : ) -> Result<()> {
1421 21429 : modification.put_rel_creation(rel, 0, ctx).await?;
1422 21428 : Ok(())
1423 21429 : }
1424 :
1425 266196 : async fn put_rel_page_image(
1426 266196 : &mut self,
1427 266196 : modification: &mut DatadirModification<'_>,
1428 266196 : rel: RelTag,
1429 266196 : blknum: BlockNumber,
1430 266196 : img: Bytes,
1431 266196 : ctx: &RequestContext,
1432 266196 : ) -> Result<(), PageReconstructError> {
1433 266196 : self.handle_rel_extend(modification, rel, blknum, ctx)
1434 3439 : .await?;
1435 266196 : modification.put_rel_page_image(rel, blknum, img)?;
1436 266196 : Ok(())
1437 266196 : }
1438 :
1439 46962173 : async fn put_rel_wal_record(
1440 46962173 : &mut self,
1441 46962173 : modification: &mut DatadirModification<'_>,
1442 46962173 : rel: RelTag,
1443 46962173 : blknum: BlockNumber,
1444 46962173 : rec: NeonWalRecord,
1445 46962173 : ctx: &RequestContext,
1446 46962173 : ) -> Result<()> {
1447 46962173 : self.handle_rel_extend(modification, rel, blknum, ctx)
1448 5436 : .await?;
1449 46962169 : modification.put_rel_wal_record(rel, blknum, rec)?;
1450 46962169 : Ok(())
1451 46962173 : }
1452 :
1453 3092 : async fn put_rel_truncation(
1454 3092 : &mut self,
1455 3092 : modification: &mut DatadirModification<'_>,
1456 3092 : rel: RelTag,
1457 3092 : nblocks: BlockNumber,
1458 3092 : ctx: &RequestContext,
1459 3092 : ) -> anyhow::Result<()> {
1460 3092 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1461 3092 : Ok(())
1462 3092 : }
1463 :
1464 17594 : async fn put_rel_drop(
1465 17594 : &mut self,
1466 17594 : modification: &mut DatadirModification<'_>,
1467 17594 : rel: RelTag,
1468 17594 : ctx: &RequestContext,
1469 17594 : ) -> Result<()> {
1470 17594 : modification.put_rel_drop(rel, ctx).await?;
1471 17594 : Ok(())
1472 17594 : }
1473 :
1474 47228369 : async fn handle_rel_extend(
1475 47228369 : &mut self,
1476 47228369 : modification: &mut DatadirModification<'_>,
1477 47228369 : rel: RelTag,
1478 47228369 : blknum: BlockNumber,
1479 47228369 : ctx: &RequestContext,
1480 47228369 : ) -> Result<(), PageReconstructError> {
1481 47228369 : let new_nblocks = blknum + 1;
1482 : // Check if the relation exists. We implicitly create relations on the
1483 : // first record that touches them.
1484 : // TODO: it would be nice to be more explicit about this.
1485 :
1486 : // Get the current size, creating the relation first if it doesn't exist.
1487 : //
1488 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1489 : // check the cache too. This is because eagerly checking the cache results in
1490 : // less work overall and about 10% better performance. It's more work on a cache
1491 : // miss, but cache misses are rare.
1492 47228369 : let old_nblocks = if let Some(nblocks) = modification
1493 47228369 : .tline
1494 47228369 : .get_cached_rel_size(&rel, modification.get_lsn())
1495 : {
1496 47225596 : nblocks
1497 2773 : } else if !modification
1498 2773 : .tline
1499 2773 : .get_rel_exists(rel, Version::Modified(modification), true, ctx)
1500 71 : .await?
1501 : {
1502 : // Create it with 0 size initially; the logic below will extend it.
1503 1959 : modification
1504 1959 : .put_rel_creation(rel, 0, ctx)
1505 200 : .await
1506 1959 : .context("Relation Error")?;
1507 1959 : 0
1508 : } else {
1509 814 : modification
1510 814 : .tline
1511 814 : .get_rel_size(rel, Version::Modified(modification), true, ctx)
1512 250 : .await?
1513 : };
1514 :
1515 47228369 : if new_nblocks > old_nblocks {
1516 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1517 963140 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1518 :
1519 963136 : let mut key = rel_block_to_key(rel, blknum);
1520 : // fill the gap with zeros
1521 963136 : for gap_blknum in old_nblocks..blknum {
1522 228405 : key.field6 = gap_blknum;
1523 228405 :
1524 228405 : if self.shard.get_shard_number(&key) != self.shard.number {
1525 UBC 0 : continue;
1526 CBC 228405 : }
1527 228405 :
1528 228405 : modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
1529 : }
1530 46265229 : }
1531 47228365 : Ok(())
1532 47228369 : }
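// Illustrative model (assumed names, not the pageserver API) of the extend
// logic above, on a plain in-memory relation: writing block `blknum` past the
// current end grows the relation and materializes zero pages for the skipped
// blocks, which PostgreSQL expects to read back as all-zeros. The shard check
// above additionally skips gap blocks that belong to other shards.
fn extend_in_memory(pages: &mut Vec<Vec<u8>>, blknum: usize, img: Vec<u8>) {
    if blknum >= pages.len() {
        // Zero-fill the gap (old end .. blknum); `blknum` itself gets `img`.
        pages.resize(blknum + 1, vec![0u8; BLCKSZ as usize]);
    }
    pages[blknum] = img;
}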
1533 :
1534 664 : async fn put_slru_page_image(
1535 664 : &mut self,
1536 664 : modification: &mut DatadirModification<'_>,
1537 664 : kind: SlruKind,
1538 664 : segno: u32,
1539 664 : blknum: BlockNumber,
1540 664 : img: Bytes,
1541 664 : ctx: &RequestContext,
1542 664 : ) -> Result<()> {
1543 664 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1544 7 : .await?;
1545 664 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1546 664 : Ok(())
1547 664 : }
1548 :
1549 664 : async fn handle_slru_extend(
1550 664 : &mut self,
1551 664 : modification: &mut DatadirModification<'_>,
1552 664 : kind: SlruKind,
1553 664 : segno: u32,
1554 664 : blknum: BlockNumber,
1555 664 : ctx: &RequestContext,
1556 664 : ) -> anyhow::Result<()> {
1557 664 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1558 664 : // extended with ZEROPAGE records, not with commit records, so extension happens
1559 664 : // much less frequently.
1560 664 :
1561 664 : let new_nblocks = blknum + 1;
1562 : // Check if the segment exists. We implicitly create SLRU segments on the
1563 : // first record that touches them.
1564 : // TODO: it would be nice to be more explicit about this.
1565 664 : let old_nblocks = if !modification
1566 664 : .tline
1567 664 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1568 7 : .await?
1569 : {
1570 : // Create it with 0 size initially; the logic below will extend it.
1571 18 : modification
1572 18 : .put_slru_segment_creation(kind, segno, 0, ctx)
1573 LBC (1) : .await?;
1574 CBC 18 : 0
1575 : } else {
1576 646 : modification
1577 646 : .tline
1578 646 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1579 UBC 0 : .await?
1580 : };
1581 :
1582 CBC 664 : if new_nblocks > old_nblocks {
1583 UBC 0 : trace!(
1584 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1585 0 : kind,
1586 0 : segno,
1587 0 : old_nblocks,
1588 0 : new_nblocks
1589 0 : );
1590 CBC 658 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1591 :
1592 : // fill the gap with zeros
1593 658 : for gap_blknum in old_nblocks..blknum {
1594 UBC 0 : modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
1595 : }
1596 CBC 6 : }
1597 664 : Ok(())
1598 664 : }
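// Background sketch (illustrative, not part of this module): SLRUs are
// addressed by (segment, block). In PostgreSQL each SLRU segment file holds
// SLRU_PAGES_PER_SEGMENT = 32 pages, so a global SLRU page number splits as:
fn slru_page_to_seg_and_block(pageno: u32) -> (u32, u32) {
    const SLRU_PAGES_PER_SEGMENT: u32 = 32; // from PostgreSQL's slru.h
    (pageno / SLRU_PAGES_PER_SEGMENT, pageno % SLRU_PAGES_PER_SEGMENT)
}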
1599 : }
1600 :
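/// Returns the current size of the relation in blocks, or 0 if the relation
/// does not exist at the modification's LSN.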
1601 90362 : async fn get_relsize(
1602 90362 : modification: &DatadirModification<'_>,
1603 90362 : rel: RelTag,
1604 90362 : ctx: &RequestContext,
1605 90362 : ) -> anyhow::Result<BlockNumber> {
1606 90362 : let nblocks = if !modification
1607 90362 : .tline
1608 90362 : .get_rel_exists(rel, Version::Modified(modification), true, ctx)
1609 UBC 0 : .await?
1610 : {
1611 CBC 4 : 0
1612 : } else {
1613 90358 : modification
1614 90358 : .tline
1615 90358 : .get_rel_size(rel, Version::Modified(modification), true, ctx)
1616 7 : .await?
1617 : };
1618 90362 : Ok(nblocks)
1619 90362 : }
1620 :
1621 : #[allow(clippy::bool_assert_comparison)]
1622 : #[cfg(test)]
1623 : mod tests {
1624 : use super::*;
1625 : use crate::tenant::harness::*;
1626 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1627 : use crate::tenant::Timeline;
1628 : use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
1629 : use postgres_ffi::RELSEG_SIZE;
1630 :
1631 : use crate::DEFAULT_PG_VERSION;
1632 :
1633 : /// Arbitrary relation tag, for testing.
1634 : const TESTREL_A: RelTag = RelTag {
1635 : spcnode: 0,
1636 : dbnode: 111,
1637 : relnode: 1000,
1638 : forknum: 0,
1639 : };
1640 :
1641 6 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1642 6 : // TODO
1643 6 : }
1644 :
1645 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1646 :
1647 4 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1648 4 : let mut m = tline.begin_modification(Lsn(0x10));
1649 4 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1650 4 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1651 4 : m.commit(ctx).await?;
1652 4 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1653 :
1654 4 : Ok(walingest)
1655 4 : }
1656 :
1657 1 : #[tokio::test]
1658 1 : async fn test_relsize() -> Result<()> {
1659 1 : let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
1660 1 : let tline = tenant
1661 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1662 1 : .await?;
1663 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1664 :
1665 1 : let mut m = tline.begin_modification(Lsn(0x20));
1666 1 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1667 1 : walingest
1668 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1669 UBC 0 : .await?;
1670 CBC 1 : m.commit(&ctx).await?;
1671 1 : let mut m = tline.begin_modification(Lsn(0x30));
1672 1 : walingest
1673 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
1674 UBC 0 : .await?;
1675 CBC 1 : m.commit(&ctx).await?;
1676 1 : let mut m = tline.begin_modification(Lsn(0x40));
1677 1 : walingest
1678 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
1679 UBC 0 : .await?;
1680 CBC 1 : m.commit(&ctx).await?;
1681 1 : let mut m = tline.begin_modification(Lsn(0x50));
1682 1 : walingest
1683 1 : .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
1684 UBC 0 : .await?;
1685 CBC 1 : m.commit(&ctx).await?;
1686 :
1687 1 : assert_current_logical_size(&tline, Lsn(0x50));
1688 :
1689 : // The relation was created at LSN 0x20, so it is not yet visible at LSN 0x10.
1690 1 : assert_eq!(
1691 1 : tline
1692 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
1693 UBC 0 : .await?,
1694 : false
1695 : );
1696 CBC 1 : assert!(tline
1697 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
1698 UBC 0 : .await
1699 CBC 1 : .is_err());
1700 1 : assert_eq!(
1701 1 : tline
1702 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1703 UBC 0 : .await?,
1704 : true
1705 : );
1706 CBC 1 : assert_eq!(
1707 1 : tline
1708 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1709 UBC 0 : .await?,
1710 : 1
1711 : );
1712 CBC 1 : assert_eq!(
1713 1 : tline
1714 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
1715 UBC 0 : .await?,
1716 : 3
1717 : );
1718 :
1719 : // Check page contents at each LSN
1720 CBC 1 : assert_eq!(
1721 1 : tline
1722 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
1723 UBC 0 : .await?,
1724 CBC 1 : TEST_IMG("foo blk 0 at 2")
1725 : );
1726 :
1727 1 : assert_eq!(
1728 1 : tline
1729 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
1730 UBC 0 : .await?,
1731 CBC 1 : TEST_IMG("foo blk 0 at 3")
1732 : );
1733 :
1734 1 : assert_eq!(
1735 1 : tline
1736 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
1737 UBC 0 : .await?,
1738 CBC 1 : TEST_IMG("foo blk 0 at 3")
1739 : );
1740 1 : assert_eq!(
1741 1 : tline
1742 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
1743 UBC 0 : .await?,
1744 CBC 1 : TEST_IMG("foo blk 1 at 4")
1745 : );
1746 :
1747 1 : assert_eq!(
1748 1 : tline
1749 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
1750 UBC 0 : .await?,
1751 CBC 1 : TEST_IMG("foo blk 0 at 3")
1752 : );
1753 1 : assert_eq!(
1754 1 : tline
1755 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
1756 UBC 0 : .await?,
1757 CBC 1 : TEST_IMG("foo blk 1 at 4")
1758 : );
1759 1 : assert_eq!(
1760 1 : tline
1761 1 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
1762 UBC 0 : .await?,
1763 CBC 1 : TEST_IMG("foo blk 2 at 5")
1764 : );
1765 :
1766 : // Truncate last block
1767 1 : let mut m = tline.begin_modification(Lsn(0x60));
1768 1 : walingest
1769 1 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1770 UBC 0 : .await?;
1771 CBC 1 : m.commit(&ctx).await?;
1772 1 : assert_current_logical_size(&tline, Lsn(0x60));
1773 :
1774 : // Check reported size and contents after truncation
1775 1 : assert_eq!(
1776 1 : tline
1777 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
1778 UBC 0 : .await?,
1779 : 2
1780 : );
1781 CBC 1 : assert_eq!(
1782 1 : tline
1783 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
1784 UBC 0 : .await?,
1785 CBC 1 : TEST_IMG("foo blk 0 at 3")
1786 : );
1787 1 : assert_eq!(
1788 1 : tline
1789 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
1790 UBC 0 : .await?,
1791 CBC 1 : TEST_IMG("foo blk 1 at 4")
1792 : );
1793 :
1794 : // should still see the truncated block with older LSN
1795 1 : assert_eq!(
1796 1 : tline
1797 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
1798 UBC 0 : .await?,
1799 : 3
1800 : );
1801 CBC 1 : assert_eq!(
1802 1 : tline
1803 1 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
1804 UBC 0 : .await?,
1805 CBC 1 : TEST_IMG("foo blk 2 at 5")
1806 : );
1807 :
1808 : // Truncate to zero length
1809 1 : let mut m = tline.begin_modification(Lsn(0x68));
1810 1 : walingest
1811 1 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1812 UBC 0 : .await?;
1813 CBC 1 : m.commit(&ctx).await?;
1814 1 : assert_eq!(
1815 1 : tline
1816 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
1817 UBC 0 : .await?,
1818 : 0
1819 : );
1820 :
1821 : // Extend from 0 to 2 blocks, leaving a gap
1822 CBC 1 : let mut m = tline.begin_modification(Lsn(0x70));
1823 1 : walingest
1824 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
1825 UBC 0 : .await?;
1826 CBC 1 : m.commit(&ctx).await?;
1827 1 : assert_eq!(
1828 1 : tline
1829 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
1830 UBC 0 : .await?,
1831 : 2
1832 : );
1833 CBC 1 : assert_eq!(
1834 1 : tline
1835 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
1836 UBC 0 : .await?,
1837 CBC 1 : ZERO_PAGE
1838 : );
1839 1 : assert_eq!(
1840 1 : tline
1841 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
1842 UBC 0 : .await?,
1843 CBC 1 : TEST_IMG("foo blk 1")
1844 : );
1845 :
1846 : // Extend a lot more, leaving a big gap that spans segment boundaries
1847 1 : let mut m = tline.begin_modification(Lsn(0x80));
1848 1 : walingest
1849 1 : .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
1850 1 : .await?;
1851 11 : m.commit(&ctx).await?;
1852 1 : assert_eq!(
1853 1 : tline
1854 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
1855 UBC 0 : .await?,
1856 : 1501
1857 : );
1858 CBC 1499 : for blk in 2..1500 {
1859 1498 : assert_eq!(
1860 1498 : tline
1861 1498 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
1862 59 : .await?,
1863 1498 : ZERO_PAGE
1864 : );
1865 : }
1866 1 : assert_eq!(
1867 1 : tline
1868 1 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
1869 UBC 0 : .await?,
1870 CBC 1 : TEST_IMG("foo blk 1500")
1871 : );
1872 :
1873 1 : Ok(())
1874 : }
1875 :
1876 : // Test what happens if we dropped a relation
1877 : // and then created it again within the same layer.
1878 1 : #[tokio::test]
1879 1 : async fn test_drop_extend() -> Result<()> {
1880 1 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
1881 1 : let tline = tenant
1882 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1883 2 : .await?;
1884 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1885 :
1886 1 : let mut m = tline.begin_modification(Lsn(0x20));
1887 1 : walingest
1888 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1889 UBC 0 : .await?;
1890 CBC 1 : m.commit(&ctx).await?;
1891 :
1892 : // Check that rel exists and size is correct
1893 1 : assert_eq!(
1894 1 : tline
1895 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1896 UBC 0 : .await?,
1897 : true
1898 : );
1899 CBC 1 : assert_eq!(
1900 1 : tline
1901 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1902 UBC 0 : .await?,
1903 : 1
1904 : );
1905 :
1906 : // Drop rel
1907 CBC 1 : let mut m = tline.begin_modification(Lsn(0x30));
1908 1 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
1909 1 : m.commit(&ctx).await?;
1910 :
1911 : // Check that rel is not visible anymore
1912 1 : assert_eq!(
1913 1 : tline
1914 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
1915 UBC 0 : .await?,
1916 : false
1917 : );
1918 :
1919 : // FIXME: should fail
1920 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1921 :
1922 : // Re-create it
1923 CBC 1 : let mut m = tline.begin_modification(Lsn(0x40));
1924 1 : walingest
1925 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
1926 UBC 0 : .await?;
1927 CBC 1 : m.commit(&ctx).await?;
1928 :
1929 : // Check that rel exists and size is correct
1930 1 : assert_eq!(
1931 1 : tline
1932 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
1933 UBC 0 : .await?,
1934 : true
1935 : );
1936 CBC 1 : assert_eq!(
1937 1 : tline
1938 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
1939 UBC 0 : .await?,
1940 : 1
1941 : );
1942 :
1943 CBC 1 : Ok(())
1944 : }
1945 :
1946 : // Test what happens if we truncated a relation
1947 : // so that one of its segments was dropped
1948 : // and then extended it again within the same layer.
1949 1 : #[tokio::test]
1950 1 : async fn test_truncate_extend() -> Result<()> {
1951 1 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
1952 1 : let tline = tenant
1953 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1954 1 : .await?;
1955 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1956 :
1957 : // Create a 20 MB relation (the size is arbitrary)
1958 1 : let relsize = 20 * 1024 * 1024 / 8192;
1959 1 : let mut m = tline.begin_modification(Lsn(0x20));
1960 2560 : for blkno in 0..relsize {
1961 2560 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1962 2560 : walingest
1963 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
1964 UBC 0 : .await?;
1965 : }
1966 CBC 1 : m.commit(&ctx).await?;
1967 :
1968 : // The relation was created at LSN 0x20, so it is not yet visible at LSN 0x10.
1969 1 : assert_eq!(
1970 1 : tline
1971 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
1972 UBC 0 : .await?,
1973 : false
1974 : );
1975 CBC 1 : assert!(tline
1976 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
1977 UBC 0 : .await
1978 CBC 1 : .is_err());
1979 :
1980 1 : assert_eq!(
1981 1 : tline
1982 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1983 UBC 0 : .await?,
1984 : true
1985 : );
1986 CBC 1 : assert_eq!(
1987 1 : tline
1988 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
1989 UBC 0 : .await?,
1990 : relsize
1991 : );
1992 :
1993 : // Check relation content
1994 CBC 2560 : for blkno in 0..relsize {
1995 2560 : let lsn = Lsn(0x20);
1996 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1997 2560 : assert_eq!(
1998 2560 : tline
1999 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
2000 100 : .await?,
2001 2560 : TEST_IMG(&data)
2002 : );
2003 : }
2004 :
2005 : // Truncate the relation so that the second segment is dropped,
2006 : // leaving only one page.
2007 1 : let mut m = tline.begin_modification(Lsn(0x60));
2008 1 : walingest
2009 1 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2010 UBC 0 : .await?;
2011 CBC 1 : m.commit(&ctx).await?;
2012 :
2013 : // Check reported size and contents after truncation
2014 1 : assert_eq!(
2015 1 : tline
2016 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
2017 UBC 0 : .await?,
2018 : 1
2019 : );
2020 :
2021 CBC 2 : for blkno in 0..1 {
2022 1 : let lsn = Lsn(0x20);
2023 1 : let data = format!("foo blk {} at {}", blkno, lsn);
2024 1 : assert_eq!(
2025 1 : tline
2026 1 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
2027 UBC 0 : .await?,
2028 CBC 1 : TEST_IMG(&data)
2029 : );
2030 : }
2031 :
2032 : // should still see all blocks with older LSN
2033 1 : assert_eq!(
2034 1 : tline
2035 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
2036 UBC 0 : .await?,
2037 : relsize
2038 : );
2039 CBC 2560 : for blkno in 0..relsize {
2040 2560 : let lsn = Lsn(0x20);
2041 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
2042 2560 : assert_eq!(
2043 2560 : tline
2044 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
2045 201 : .await?,
2046 2560 : TEST_IMG(&data)
2047 : );
2048 : }
2049 :
2050 : // Extend the relation again, adding enough blocks
2051 : // to re-create the second segment.
2052 1 : let lsn = Lsn(0x80);
2053 1 : let mut m = tline.begin_modification(lsn);
2054 2560 : for blkno in 0..relsize {
2055 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
2056 2560 : walingest
2057 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
2058 UBC 0 : .await?;
2059 : }
2060 CBC 1 : m.commit(&ctx).await?;
2061 :
2062 1 : assert_eq!(
2063 1 : tline
2064 1 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
2065 UBC 0 : .await?,
2066 : true
2067 : );
2068 CBC 1 : assert_eq!(
2069 1 : tline
2070 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
2071 UBC 0 : .await?,
2072 : relsize
2073 : );
2074 : // Check relation content
2075 CBC 2560 : for blkno in 0..relsize {
2076 2560 : let lsn = Lsn(0x80);
2077 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
2078 2560 : assert_eq!(
2079 2560 : tline
2080 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
2081 100 : .await?,
2082 2560 : TEST_IMG(&data)
2083 : );
2084 : }
2085 :
2086 1 : Ok(())
2087 : }
2088 :
2089 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2090 : /// split into multiple 1 GB segments in Postgres.
2091 1 : #[tokio::test]
2092 1 : async fn test_large_rel() -> Result<()> {
2093 1 : let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
2094 1 : let tline = tenant
2095 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2096 2 : .await?;
2097 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2098 :
2099 1 : let mut lsn = 0x10;
2100 131073 : for blknum in 0..RELSEG_SIZE + 1 {
2101 131073 : lsn += 0x10;
2102 131073 : let mut m = tline.begin_modification(Lsn(lsn));
2103 131073 : let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2104 131073 : walingest
2105 131073 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2106 3098 : .await?;
2107 131073 : m.commit(&ctx).await?;
2108 : }
2109 :
2110 1 : assert_current_logical_size(&tline, Lsn(lsn));
2111 :
2112 1 : assert_eq!(
2113 1 : tline
2114 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2115 UBC 0 : .await?,
2116 CBC 1 : RELSEG_SIZE + 1
2117 : );
2118 :
2119 : // Truncate one block
2120 1 : lsn += 0x10;
2121 1 : let mut m = tline.begin_modification(Lsn(lsn));
2122 1 : walingest
2123 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2124 UBC 0 : .await?;
2125 CBC 1 : m.commit(&ctx).await?;
2126 1 : assert_eq!(
2127 1 : tline
2128 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2129 UBC 0 : .await?,
2130 : RELSEG_SIZE
2131 : );
2132 CBC 1 : assert_current_logical_size(&tline, Lsn(lsn));
2133 1 :
2134 1 : // Truncate another block
2135 1 : lsn += 0x10;
2136 1 : let mut m = tline.begin_modification(Lsn(lsn));
2137 1 : walingest
2138 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2139 UBC 0 : .await?;
2140 CBC 1 : m.commit(&ctx).await?;
2141 1 : assert_eq!(
2142 1 : tline
2143 1 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2144 UBC 0 : .await?,
2145 CBC 1 : RELSEG_SIZE - 1
2146 : );
2147 1 : assert_current_logical_size(&tline, Lsn(lsn));
2148 1 :
2149 1 : // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
2150 1 : // This tests the behavior at segment boundaries
2151 1 : let mut size: i32 = 3000;
2152 3002 : while size >= 0 {
2153 3001 : lsn += 0x10;
2154 3001 : let mut m = tline.begin_modification(Lsn(lsn));
2155 3001 : walingest
2156 3001 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2157 72 : .await?;
2158 3001 : m.commit(&ctx).await?;
2159 3001 : assert_eq!(
2160 3001 : tline
2161 3001 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
2162 UBC 0 : .await?,
2163 CBC 3001 : size as BlockNumber
2164 : );
2165 :
2166 3001 : size -= 1;
2167 : }
2168 1 : assert_current_logical_size(&tline, Lsn(lsn));
2169 1 :
2170 1 : Ok(())
2171 : }
2172 :
2173 : /// Replay a wal segment file taken directly from safekeepers.
2174 : ///
2175 : /// This test is useful for benchmarking since it allows us to profile only
2176 : /// the walingest code in a single-threaded executor, and iterate more quickly
2177 : /// without waiting for unrelated steps.
2178 1 : #[tokio::test]
2179 1 : async fn test_ingest_real_wal() {
2180 1 : use crate::tenant::harness::*;
2181 1 : use postgres_ffi::waldecoder::WalStreamDecoder;
2182 1 : use postgres_ffi::WAL_SEGMENT_SIZE;
2183 1 :
2184 1 : // Define test data path and constants.
2185 1 : //
2186 1 : // Steps to reconstruct the data, if needed:
2187 1 : // 1. Run the pgbench Python test
2188 1 : // 2. Take the first WAL segment file from a safekeeper
2189 1 : // 3. Compress it using `zstd --long input_file`
2190 1 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2191 1 : // 5. Grep sk logs for "restart decoder" to get startpoint
2192 1 : // 6. Run just the decoder from this test to get the endpoint.
2193 1 : // It's the last LSN the decoder will output.
2194 1 : let pg_version = 15; // The test data was generated by pg15
2195 1 : let path = "test_data/sk_wal_segment_from_pgbench";
2196 1 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2197 1 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2198 1 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2199 1 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2200 1 :
2201 1 : let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
2202 1 : let (tenant, ctx) = harness.load().await;
2203 :
2204 1 : let remote_initdb_path = remote_initdb_archive_path(&tenant.tenant_id(), &TIMELINE_ID);
2205 1 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2206 1 :
2207 1 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2208 1 : .expect("creating test dir should work");
2209 1 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2210 :
2211 : // Bootstrap a real timeline. We can't use create_test_timeline because
2212 : // it doesn't create a real checkpoint, and WalIngest::new tries to parse
2213 : // the garbage data.
2214 1 : let tline = tenant
2215 1 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2216 10237 : .await
2217 1 : .unwrap();
2218 :
2219 : // We fully read and decompress this into memory before decoding
2220 : // to get a more accurate perf profile of the decoder.
2221 1 : let bytes = {
2222 : use async_compression::tokio::bufread::ZstdDecoder;
2223 1 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2224 1 : let reader = tokio::io::BufReader::new(file);
2225 1 : let decoder = ZstdDecoder::new(reader);
2226 1 : let mut reader = tokio::io::BufReader::new(decoder);
2227 1 : let mut buffer = Vec::new();
2228 112 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2229 1 : buffer
2230 1 : };
2231 1 :
2232 1 : // TODO start a profiler too
2233 1 : let started_at = std::time::Instant::now();
2234 1 :
2235 1 : // Initialize walingest
2236 1 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2237 1 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2238 1 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2239 UBC 0 : .await
2240 CBC 1 : .unwrap();
2241 1 : let mut modification = tline.begin_modification(startpoint);
2242 1 : let mut decoded = DecodedWALRecord::default();
2243 1 : println!("decoding {} bytes", bytes.len() - xlogoff);
2244 :
2245 : // Decode and ingest WAL. We process the WAL in chunks because
2246 : // that's how the bytes arrive from the safekeepers.
2247 237343 : for chunk in bytes[xlogoff..].chunks(50) {
2248 237343 : decoder.feed_bytes(chunk);
2249 310268 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2250 72925 : walingest
2251 72925 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
2252 56 : .await
2253 72925 : .unwrap();
2254 : }
2255 237343 : modification.commit(&ctx).await.unwrap();
2256 : }
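// Note: feeding 50-byte chunks deliberately exercises the decoder's
// partial-record buffering. poll_decode() returns None until a complete
// record has accumulated, so the inner loop drains zero or more whole
// records per chunk before the chunk's modifications are committed.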
2257 :
2258 1 : let duration = started_at.elapsed();
2259 1 : println!("done in {:?}", duration);
2260 : }
2261 : }
|