TLA Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but most of them it stores as
15 : //! WAL records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_wal_record or put_page_image function
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but it can handle some records directly with
22 : //! bespoke Rust code.
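//!
//! A hypothetical driver loop (a sketch only; `wal_stream` and its
//! `next_record` method are assumptions, not part of this crate) showing how
//! the pieces above fit together:
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut decoded = DecodedWALRecord::default();
//! while let Some((lsn, recdata)) = wal_stream.next_record().await? {
//!     let mut modification = timeline.begin_modification(lsn);
//!     walingest
//!         .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
//!         .await?;
//! }
//! ```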
23 :
24 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
25 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
26 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
27 :
28 : use anyhow::{bail, Context, Result};
29 : use bytes::{Buf, Bytes, BytesMut};
30 : use tracing::*;
31 :
32 : use crate::context::RequestContext;
33 : use crate::pgdatadir_mapping::*;
34 : use crate::tenant::PageReconstructError;
35 : use crate::tenant::Timeline;
36 : use crate::walrecord::*;
37 : use crate::ZERO_PAGE;
38 : use pageserver_api::reltag::{RelTag, SlruKind};
39 : use postgres_ffi::pg_constants;
40 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
41 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
42 : use postgres_ffi::v14::xlog_utils::*;
43 : use postgres_ffi::v14::CheckPoint;
44 : use postgres_ffi::TransactionId;
45 : use postgres_ffi::BLCKSZ;
46 : use utils::lsn::Lsn;
47 :
48 : pub struct WalIngest<'a> {
49 : timeline: &'a Timeline,
50 :
51 : checkpoint: CheckPoint,
52 : checkpoint_modified: bool,
53 : }
54 :
55 : impl<'a> WalIngest<'a> {
56 CBC 1257 : pub async fn new(
57 1257 : timeline: &'a Timeline,
58 1257 : startpoint: Lsn,
59 1257 : ctx: &'_ RequestContext,
60 1257 : ) -> anyhow::Result<WalIngest<'a>> {
61 : // Fetch the latest checkpoint into memory, so that we can compare with it
62 : // quickly in `ingest_record` and update it when it changes.
63 1257 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
64 1248 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
65 UBC 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
66 :
67 CBC 1248 : Ok(WalIngest {
68 1248 : timeline,
69 1248 : checkpoint,
70 1248 : checkpoint_modified: false,
71 1248 : })
72 1257 : }
73 :
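    // A minimal sketch (hypothetical helper, not part of the real ingest
    // path) of the decode-mutate-encode round trip that `new()` performs once
    // and `ingest_record()` repeats whenever the cached checkpoint changes:
    #[allow(dead_code)]
    async fn bump_next_xid_sketch(
        timeline: &Timeline,
        modification: &mut DatadirModification<'_>,
        lsn: Lsn,
        xid: TransactionId,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let bytes = timeline.get_checkpoint(lsn, ctx).await?;
        let mut checkpoint = CheckPoint::decode(&bytes)?;
        // `update_next_xid` returns true only when the cached value advanced,
        // so the checkpoint is re-encoded and stored at most once.
        if checkpoint.update_next_xid(xid) {
            modification.put_checkpoint(checkpoint.encode()?)?;
        }
        Ok(())
    }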
74 : ///
75 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
76 : ///
77 : /// This function updates the `lsn` field of the `DatadirModification`.
78 : ///
79 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
80 : /// relations/pages that the record affects.
81 : ///
82 68588857 : pub async fn ingest_record(
83 68588857 : &mut self,
84 68588857 : recdata: Bytes,
85 68588857 : lsn: Lsn,
86 68588857 : modification: &mut DatadirModification<'_>,
87 68588857 : decoded: &mut DecodedWALRecord,
88 68588857 : ctx: &RequestContext,
89 68588857 : ) -> anyhow::Result<()> {
90 68588717 : modification.lsn = lsn;
91 68588717 : decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
92 :
93 68588717 : let mut buf = decoded.record.clone();
94 68588717 : buf.advance(decoded.main_data_offset);
95 68588717 :
96 68588717 : assert!(!self.checkpoint_modified);
97 68588717 : if self.checkpoint.update_next_xid(decoded.xl_xid) {
98 3622 : self.checkpoint_modified = true;
99 68585095 : }
100 :
101 : // Heap AM records need some special handling, because they modify VM pages
102 : // without registering them with the standard mechanism.
103 68588717 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID
104 15564608 : || decoded.xl_rmid == pg_constants::RM_HEAP2_ID
105 : {
106 55203024 : self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
107 16 : .await?;
108 13385693 : }
109 68588717 : if decoded.xl_rmid == pg_constants::RM_NEON_ID {
110 UBC 0 : self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
111 0 : .await?;
112 CBC 68588717 : }
113 : // Handle other special record types
114 68588717 : if decoded.xl_rmid == pg_constants::RM_SMGR_ID
115 21661 : && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
116 21661 : == pg_constants::XLOG_SMGR_CREATE
117 : {
118 21617 : let create = XlSmgrCreate::decode(&mut buf);
119 21617 : self.ingest_xlog_smgr_create(modification, &create, ctx)
120 1124 : .await?;
121 68567100 : } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
122 44 : && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
123 44 : == pg_constants::XLOG_SMGR_TRUNCATE
124 : {
125 44 : let truncate = XlSmgrTruncate::decode(&mut buf);
126 44 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
127 2 : .await?;
128 68567056 : } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
129 UBC 0 : debug!(
130 0 : "handle RM_DBASE_ID for Postgres version {:?}",
131 0 : self.timeline.pg_version
132 0 : );
133 CBC 16 : if self.timeline.pg_version == 14 {
134 16 : if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
135 16 : == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
136 : {
137 13 : let createdb = XlCreateDatabase::decode(&mut buf);
138 UBC 0 : debug!("XLOG_DBASE_CREATE v14");
139 :
140 CBC 13 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
141 1430 : .await?;
142 3 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
143 3 : == postgres_ffi::v14::bindings::XLOG_DBASE_DROP
144 : {
145 3 : let dropdb = XlDropDatabase::decode(&mut buf);
146 6 : for tablespace_id in dropdb.tablespace_ids {
147 UBC 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
148 CBC 3 : modification
149 3 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
150 UBC 0 : .await?;
151 : }
152 0 : }
153 0 : } else if self.timeline.pg_version == 15 {
154 0 : if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
155 0 : == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
156 : {
157 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
158 0 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
159 0 : == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
160 : {
161 : // The XLOG record was renamed between v14 and v15,
162 : // but the record format is the same.
163 : // So we can reuse XlCreateDatabase here.
164 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
165 0 : let createdb = XlCreateDatabase::decode(&mut buf);
166 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
167 0 : .await?;
168 0 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
169 0 : == postgres_ffi::v15::bindings::XLOG_DBASE_DROP
170 : {
171 0 : let dropdb = XlDropDatabase::decode(&mut buf);
172 0 : for tablespace_id in dropdb.tablespace_ids {
173 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
174 0 : modification
175 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
176 0 : .await?;
177 : }
178 0 : }
179 0 : } else if self.timeline.pg_version == 16 {
180 0 : if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
181 0 : == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG
182 : {
183 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
184 0 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
185 0 : == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY
186 : {
187 : // The XLOG record was renamed in v15, and v16 keeps
188 : // the same record format.
189 : // So we can reuse XlCreateDatabase here.
190 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
191 0 : let createdb = XlCreateDatabase::decode(&mut buf);
192 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
193 0 : .await?;
194 0 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
195 0 : == postgres_ffi::v16::bindings::XLOG_DBASE_DROP
196 : {
197 0 : let dropdb = XlDropDatabase::decode(&mut buf);
198 0 : for tablespace_id in dropdb.tablespace_ids {
199 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
200 0 : modification
201 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
202 0 : .await?;
203 : }
204 0 : }
205 0 : }
206 CBC 68567040 : } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
207 UBC 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
208 CBC 68567036 : } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID {
209 363 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
210 363 : if info == pg_constants::CLOG_ZEROPAGE {
211 362 : let pageno = buf.get_u32_le();
212 362 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
213 362 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
214 362 : self.put_slru_page_image(
215 362 : modification,
216 362 : SlruKind::Clog,
217 362 : segno,
218 362 : rpageno,
219 362 : ZERO_PAGE.clone(),
220 362 : ctx,
221 362 : )
222 28 : .await?;
223 : } else {
224 1 : assert!(info == pg_constants::CLOG_TRUNCATE);
225 1 : let xlrec = XlClogTruncate::decode(&mut buf);
226 1 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
227 UBC 0 : .await?;
228 : }
229 CBC 68566673 : } else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
230 2059356 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
231 2059356 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
232 1950128 : let parsed_xact =
233 1950128 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
234 1950128 : self.ingest_xact_record(
235 1950128 : modification,
236 1950128 : &parsed_xact,
237 1950128 : info == pg_constants::XLOG_XACT_COMMIT,
238 1950128 : ctx,
239 1950128 : )
240 4063 : .await?;
241 109228 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
242 109227 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
243 : {
244 2 : let parsed_xact =
245 2 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
246 2 : self.ingest_xact_record(
247 2 : modification,
248 2 : &parsed_xact,
249 2 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
250 2 : ctx,
251 2 : )
252 UBC 0 : .await?;
253 : // Remove the twophase file. See RemoveTwoPhaseFile() in the postgres code.
254 0 : trace!(
255 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
256 0 : decoded.xl_xid,
257 0 : parsed_xact.xid,
258 0 : lsn,
259 0 : );
260 CBC 2 : modification
261 2 : .drop_twophase_file(parsed_xact.xid, ctx)
262 UBC 0 : .await?;
263 CBC 109226 : } else if info == pg_constants::XLOG_XACT_PREPARE {
264 4 : modification
265 4 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
266 LBC (1) : .await?;
267 CBC 109222 : }
268 66507317 : } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
269 24332 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
270 24332 :
271 24332 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
272 14 : let pageno = buf.get_u32_le();
273 14 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
274 14 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
275 14 : self.put_slru_page_image(
276 14 : modification,
277 14 : SlruKind::MultiXactOffsets,
278 14 : segno,
279 14 : rpageno,
280 14 : ZERO_PAGE.clone(),
281 14 : ctx,
282 14 : )
283 UBC 0 : .await?;
284 CBC 24318 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
285 291 : let pageno = buf.get_u32_le();
286 291 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
287 291 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
288 291 : self.put_slru_page_image(
289 291 : modification,
290 291 : SlruKind::MultiXactMembers,
291 291 : segno,
292 291 : rpageno,
293 291 : ZERO_PAGE.clone(),
294 291 : ctx,
295 291 : )
296 3 : .await?;
297 24027 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
298 24027 : let xlrec = XlMultiXactCreate::decode(&mut buf);
299 24027 : self.ingest_multixact_create_record(modification, &xlrec)?;
300 UBC 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
301 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
302 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
303 0 : .await?;
304 0 : }
305 CBC 66482985 : } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
306 54 : let xlrec = XlRelmapUpdate::decode(&mut buf);
307 54 : self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
308 5 : .await?;
309 66482931 : } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
310 133074 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
311 133074 : if info == pg_constants::XLOG_NEXTOID {
312 412 : let next_oid = buf.get_u32_le();
313 412 : if self.checkpoint.nextOid != next_oid {
314 412 : self.checkpoint.nextOid = next_oid;
315 412 : self.checkpoint_modified = true;
316 412 : }
317 132662 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
318 132602 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
319 : {
320 634 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
321 634 : buf.copy_to_slice(&mut checkpoint_bytes);
322 634 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
323 UBC 0 : trace!(
324 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
325 0 : xlog_checkpoint.oldestXid,
326 0 : self.checkpoint.oldestXid
327 0 : );
328 CBC 634 : if (self
329 634 : .checkpoint
330 634 : .oldestXid
331 634 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
332 634 : < 0
333 UBC 0 : {
334 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
335 0 : self.checkpoint_modified = true;
336 CBC 634 : }
337 132028 : }
338 66349857 : } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
339 109 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
340 109 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
341 109 : let xlrec = XlLogicalMessage::decode(&mut buf);
342 109 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
343 109 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
344 109 : if prefix == "neon-test" {
345 : // This is a convenient way to make the WAL ingestion pause at
346 : // a particular point in the WAL. For more fine-grained control,
347 : // we could peek into the message and only pause if it contains
348 : // a particular string, for example, but this is enough for now.
349 18 : crate::failpoint_support::sleep_millis_async!(
350 18 : "wal-ingest-logical-message-sleep"
351 18 : );
352 97 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
353 94 : modification.put_file(path, message, ctx).await?;
354 3 : }
355 UBC 0 : }
356 CBC 66349748 : }
357 :
358 : // Iterate through all the blocks that the record modifies, and
359 : // "put" a separate copy of the record for each block.
360 70056431 : for blk in decoded.blocks.iter() {
361 70056431 : self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
362 23659 : .await?;
363 : }
364 :
365 : // If checkpoint data was updated, store the new version in the repository
366 68588715 : if self.checkpoint_modified {
367 28042 : let new_checkpoint_bytes = self.checkpoint.encode()?;
368 :
369 28042 : modification.put_checkpoint(new_checkpoint_bytes)?;
370 28042 : self.checkpoint_modified = false;
371 68560673 : }
372 :
373 : // Now that this record has been fully handled, including updating the
374 : // checkpoint data, let the repository know that it is up-to-date to this LSN
375 68588715 : modification.commit(ctx).await?;
376 :
377 68588709 : Ok(())
378 68588709 : }
379 :
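    // Transaction IDs are 32-bit and wrap around, so the oldestXid update in
    // `ingest_record` above compares them circularly via
    // `wrapping_sub(..) as i32`. A minimal sketch (hypothetical helper) of
    // that idiom:
    #[allow(dead_code)]
    fn xid_precedes_sketch(a: TransactionId, b: TransactionId) -> bool {
        // A negative wrapped difference means `a` is older than `b`, even
        // across a wraparound: 3 precedes 5, and u32::MAX - 1 precedes 3.
        (a.wrapping_sub(b) as i32) < 0
    }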
380 70056619 : async fn ingest_decoded_block(
381 70056619 : &mut self,
382 70056619 : modification: &mut DatadirModification<'_>,
383 70056619 : lsn: Lsn,
384 70056619 : decoded: &DecodedWALRecord,
385 70056619 : blk: &DecodedBkpBlock,
386 70056619 : ctx: &RequestContext,
387 70056619 : ) -> Result<(), PageReconstructError> {
388 70056433 : let rel = RelTag {
389 70056433 : spcnode: blk.rnode_spcnode,
390 70056433 : dbnode: blk.rnode_dbnode,
391 70056433 : relnode: blk.rnode_relnode,
392 70056433 : forknum: blk.forknum,
393 70056433 : };
394 70056433 :
395 70056433 : //
396 70056433 : // Instead of storing the full-page-image WAL record,
397 70056433 : // it is better to store the extracted image: we can skip wal-redo
398 70056433 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
399 70056433 : // so the record would have to be copied multiple times.
400 70056433 : //
401 70056433 : if blk.apply_image
402 172031 : && blk.has_image
403 172031 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
404 133267 : && (decoded.xl_info == pg_constants::XLOG_FPI
405 UBC 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
406 : // compression of WAL is not yet supported: fall back to storing the original WAL record
407 CBC 133267 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
408 : {
409 : // Extract page image from FPI record
410 133267 : let img_len = blk.bimg_len as usize;
411 133267 : let img_offs = blk.bimg_offset as usize;
412 133267 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
413 133267 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
414 133267 :
415 133267 : if blk.hole_length != 0 {
416 109519 : let tail = image.split_off(blk.hole_offset as usize);
417 109519 : image.resize(image.len() + blk.hole_length as usize, 0u8);
418 109519 : image.unsplit(tail);
419 109519 : }
420 : //
421 : // Match the logic of XLogReadBufferForRedoExtended:
422 : // The page may be uninitialized. If so, we can't set the LSN because
423 : // that would corrupt the page.
424 : //
425 133267 : if !page_is_new(&image) {
426 128411 : page_set_lsn(&mut image, lsn)
427 4856 : }
428 133267 : assert_eq!(image.len(), BLCKSZ as usize);
429 133267 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
430 3260 : .await?;
431 : } else {
432 69923166 : let rec = NeonWalRecord::Postgres {
433 69923166 : will_init: blk.will_init || blk.apply_image,
434 69923166 : rec: decoded.record.clone(),
435 69923166 : };
436 69923166 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
437 20399 : .await?;
438 : }
439 70056433 : Ok(())
440 70056433 : }
441 :
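    // A self-contained sketch (hypothetical helper; inputs assumed) of the
    // hole reconstruction in `ingest_decoded_block` above: a full-page image
    // omits the unused gap in the middle of the page, so `hole_length` zero
    // bytes are spliced back in at `hole_offset` to recover a full BLCKSZ page.
    #[allow(dead_code)]
    fn restore_fpi_hole_sketch(compact: &[u8], hole_offset: usize, hole_length: usize) -> BytesMut {
        let mut image = BytesMut::with_capacity(BLCKSZ as usize);
        image.extend_from_slice(compact);
        if hole_length != 0 {
            let tail = image.split_off(hole_offset); // bytes after the hole
            image.resize(image.len() + hole_length, 0u8); // zero-fill the hole
            image.unsplit(tail); // reattach the tail
        }
        assert_eq!(image.len(), BLCKSZ as usize);
        image
    }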
442 55203122 : async fn ingest_heapam_record(
443 55203122 : &mut self,
444 55203122 : buf: &mut Bytes,
445 55203122 : modification: &mut DatadirModification<'_>,
446 55203122 : decoded: &mut DecodedWALRecord,
447 55203122 : ctx: &RequestContext,
448 55203122 : ) -> anyhow::Result<()> {
449 55203025 : // Handle VM bit updates that are implicitly part of heap records.
450 55203025 :
451 55203025 : // First, look at the record to determine which VM bits need
452 55203025 : // to be cleared. If either of these variables is set, we
453 55203025 : // need to clear the corresponding bits in the visibility map.
454 55203025 : let mut new_heap_blkno: Option<u32> = None;
455 55203025 : let mut old_heap_blkno: Option<u32> = None;
456 55203025 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
457 55203025 :
458 55203025 : match self.timeline.pg_version {
459 : 14 => {
460 55203025 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
461 53024110 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
462 53024110 :
463 53024110 : if info == pg_constants::XLOG_HEAP_INSERT {
464 42667615 : let xlrec = v14::XlHeapInsert::decode(buf);
465 42667615 : assert_eq!(0, buf.remaining());
466 42667615 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
467 2586 : new_heap_blkno = Some(decoded.blocks[0].blkno);
468 42665029 : }
469 10356495 : } else if info == pg_constants::XLOG_HEAP_DELETE {
470 535608 : let xlrec = v14::XlHeapDelete::decode(buf);
471 535608 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
472 732 : new_heap_blkno = Some(decoded.blocks[0].blkno);
473 534876 : }
474 9820887 : } else if info == pg_constants::XLOG_HEAP_UPDATE
475 6341229 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
476 : {
477 5096400 : let xlrec = v14::XlHeapUpdate::decode(buf);
478 5096400 : // the size of tuple data is inferred from the size of the record.
479 5096400 : // we can't validate the remaining number of bytes without parsing
480 5096400 : // the tuple data.
481 5096400 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
482 113991 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
483 4982409 : }
484 5096400 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
485 9335 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
486 9335 : // non-HOT update where the new tuple goes to different page than
487 9335 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
488 9335 : // set.
489 9335 : new_heap_blkno = Some(decoded.blocks[0].blkno);
490 5087064 : }
491 4724487 : } else if info == pg_constants::XLOG_HEAP_LOCK {
492 4668326 : let xlrec = v14::XlHeapLock::decode(buf);
493 4668326 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
494 52 : old_heap_blkno = Some(decoded.blocks[0].blkno);
495 52 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
496 4668274 : }
497 56161 : }
498 2178915 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
499 2178915 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
500 2178915 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
501 710747 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
502 :
503 710747 : let offset_array_len =
504 710747 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
505 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
506 563808 : 0
507 : } else {
508 146939 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
509 : };
510 710747 : assert_eq!(offset_array_len, buf.remaining());
511 :
512 710747 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
513 1934 : new_heap_blkno = Some(decoded.blocks[0].blkno);
514 708813 : }
515 1468168 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
516 3898 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
517 3898 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
518 UBC 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
519 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
520 CBC 3898 : }
521 1464270 : }
522 : } else {
523 UBC 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
524 : }
525 : }
526 : 15 => {
527 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
528 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
529 0 :
530 0 : if info == pg_constants::XLOG_HEAP_INSERT {
531 0 : let xlrec = v15::XlHeapInsert::decode(buf);
532 0 : assert_eq!(0, buf.remaining());
533 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
534 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
535 0 : }
536 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
537 0 : let xlrec = v15::XlHeapDelete::decode(buf);
538 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
539 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
540 0 : }
541 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
542 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
543 : {
544 0 : let xlrec = v15::XlHeapUpdate::decode(buf);
545 0 : // the size of tuple data is inferred from the size of the record.
546 0 : // we can't validate the remaining number of bytes without parsing
547 0 : // the tuple data.
548 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
549 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
550 0 : }
551 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
552 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
553 0 : // non-HOT update where the new tuple goes to different page than
554 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
555 0 : // set.
556 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
557 0 : }
558 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
559 0 : let xlrec = v15::XlHeapLock::decode(buf);
560 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
561 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
562 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
563 0 : }
564 0 : }
565 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
566 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
567 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
568 0 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
569 :
570 0 : let offset_array_len =
571 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
572 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
573 0 : 0
574 : } else {
575 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
576 : };
577 0 : assert_eq!(offset_array_len, buf.remaining());
578 :
579 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
580 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
581 0 : }
582 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
583 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
584 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
585 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
586 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
587 0 : }
588 0 : }
589 : } else {
590 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
591 : }
592 : }
593 : 16 => {
594 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
595 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
596 0 :
597 0 : if info == pg_constants::XLOG_HEAP_INSERT {
598 0 : let xlrec = v16::XlHeapInsert::decode(buf);
599 0 : assert_eq!(0, buf.remaining());
600 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
601 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
602 0 : }
603 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
604 0 : let xlrec = v16::XlHeapDelete::decode(buf);
605 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
606 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
607 0 : }
608 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
609 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
610 : {
611 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
612 0 : // the size of tuple data is inferred from the size of the record.
613 0 : // we can't validate the remaining number of bytes without parsing
614 0 : // the tuple data.
615 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
616 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
617 0 : }
618 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
619 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
620 0 : // non-HOT update where the new tuple goes to different page than
621 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
622 0 : // set.
623 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
624 0 : }
625 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
626 0 : let xlrec = v16::XlHeapLock::decode(buf);
627 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
628 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
629 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
630 0 : }
631 0 : }
632 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
633 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
634 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
635 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
636 :
637 0 : let offset_array_len =
638 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
639 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
640 0 : 0
641 : } else {
642 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
643 : };
644 0 : assert_eq!(offset_array_len, buf.remaining());
645 :
646 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
647 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
648 0 : }
649 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
650 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
651 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
652 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
653 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
654 0 : }
655 0 : }
656 : } else {
657 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
658 : }
659 : }
660 0 : _ => {}
661 : }
662 :
663 : // Clear the VM bits if required.
664 CBC 55203024 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
665 128572 : let vm_rel = RelTag {
666 128572 : forknum: VISIBILITYMAP_FORKNUM,
667 128572 : spcnode: decoded.blocks[0].rnode_spcnode,
668 128572 : dbnode: decoded.blocks[0].rnode_dbnode,
669 128572 : relnode: decoded.blocks[0].rnode_relnode,
670 128572 : };
671 128572 :
672 128572 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
673 128572 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
674 :
675 : // Sometimes, Postgres seems to create heap WAL records with the
676 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
677 : // not set. In fact, it's possible that the VM page does not exist at all.
678 : // In that case, we don't want to store a record to clear the VM bit;
679 : // replaying it would fail to find the previous image of the page, because
680 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
681 : // record if they don't.
682 128572 : let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
683 128572 : if let Some(blknum) = new_vm_blk {
684 14587 : if blknum >= vm_size {
685 UBC 0 : new_vm_blk = None;
686 CBC 14587 : }
687 113985 : }
688 128572 : if let Some(blknum) = old_vm_blk {
689 114043 : if blknum >= vm_size {
690 UBC 0 : old_vm_blk = None;
691 CBC 114043 : }
692 14529 : }
693 :
694 128572 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
695 128572 : if new_vm_blk == old_vm_blk {
696 : // An UPDATE record that needs to clear the bits for both old and the
697 : // new page, both of which reside on the same VM page.
698 58 : self.put_rel_wal_record(
699 58 : modification,
700 58 : vm_rel,
701 58 : new_vm_blk.unwrap(),
702 58 : NeonWalRecord::ClearVisibilityMapFlags {
703 58 : new_heap_blkno,
704 58 : old_heap_blkno,
705 58 : flags,
706 58 : },
707 58 : ctx,
708 58 : )
709 UBC 0 : .await?;
710 : } else {
711 : // Clear VM bits for one heap page, or for two pages that reside on
712 : // different VM pages.
713 CBC 128514 : if let Some(new_vm_blk) = new_vm_blk {
714 14529 : self.put_rel_wal_record(
715 14529 : modification,
716 14529 : vm_rel,
717 14529 : new_vm_blk,
718 14529 : NeonWalRecord::ClearVisibilityMapFlags {
719 14529 : new_heap_blkno,
720 14529 : old_heap_blkno: None,
721 14529 : flags,
722 14529 : },
723 14529 : ctx,
724 14529 : )
725 UBC 0 : .await?;
726 CBC 113985 : }
727 128514 : if let Some(old_vm_blk) = old_vm_blk {
728 113985 : self.put_rel_wal_record(
729 113985 : modification,
730 113985 : vm_rel,
731 113985 : old_vm_blk,
732 113985 : NeonWalRecord::ClearVisibilityMapFlags {
733 113985 : new_heap_blkno: None,
734 113985 : old_heap_blkno,
735 113985 : flags,
736 113985 : },
737 113985 : ctx,
738 113985 : )
739 UBC 0 : .await?;
740 CBC 14529 : }
741 : }
742 UBC 0 : }
743 CBC 55074452 : }
744 :
745 55203024 : Ok(())
746 55203024 : }
747 :
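    // A sketch (hypothetical helper) of the VM-bit clearing decision above:
    // map a heap block to its visibility-map block, then drop the clear
    // request if the VM fork is too short for that bit to exist.
    #[allow(dead_code)]
    async fn vm_block_to_clear_sketch(
        &mut self,
        modification: &DatadirModification<'_>,
        vm_rel: RelTag,
        heap_blkno: u32,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<u32>> {
        let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
        let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
        // No page, no bit: replaying a clear-bits record would fail, because
        // there is no previous image of the VM page to apply it to.
        Ok(if vm_blk < vm_size { Some(vm_blk) } else { None })
    }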
748 UBC 0 : async fn ingest_neonrmgr_record(
749 0 : &mut self,
750 0 : buf: &mut Bytes,
751 0 : modification: &mut DatadirModification<'_>,
752 0 : decoded: &mut DecodedWALRecord,
753 0 : ctx: &RequestContext,
754 0 : ) -> anyhow::Result<()> {
755 0 : // Handle VM bit updates that are implicitly part of heap records.
756 0 :
757 0 : // First, look at the record to determine which VM bits need
758 0 : // to be cleared. If either of these variables is set, we
759 0 : // need to clear the corresponding bits in the visibility map.
760 0 : let mut new_heap_blkno: Option<u32> = None;
761 0 : let mut old_heap_blkno: Option<u32> = None;
762 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
763 0 :
764 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
765 :
766 0 : match self.timeline.pg_version {
767 : 16 => {
768 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
769 0 :
770 0 : match info {
771 : pg_constants::XLOG_NEON_HEAP_INSERT => {
772 0 : let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
773 0 : assert_eq!(0, buf.remaining());
774 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
775 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
776 0 : }
777 : }
778 : pg_constants::XLOG_NEON_HEAP_DELETE => {
779 0 : let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
780 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
781 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
782 0 : }
783 : }
784 : pg_constants::XLOG_NEON_HEAP_UPDATE
785 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
786 0 : let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
787 0 : // the size of tuple data is inferred from the size of the record.
788 0 : // we can't validate the remaining number of bytes without parsing
789 0 : // the tuple data.
790 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
791 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
792 0 : }
793 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
794 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
795 0 : // non-HOT update where the new tuple goes to different page than
796 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
797 0 : // set.
798 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
799 0 : }
800 : }
801 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
802 0 : let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
803 :
804 0 : let offset_array_len =
805 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
806 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
807 0 : 0
808 : } else {
809 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
810 : };
811 0 : assert_eq!(offset_array_len, buf.remaining());
812 :
813 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
814 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
815 0 : }
816 : }
817 : pg_constants::XLOG_NEON_HEAP_LOCK => {
818 0 : let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
819 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
820 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
821 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
822 0 : }
823 : }
824 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
825 : }
826 : }
827 0 : _ => bail!(
828 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
829 0 : self.timeline.pg_version
830 0 : ),
831 : }
832 :
833 : // Clear the VM bits if required.
834 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
835 0 : let vm_rel = RelTag {
836 0 : forknum: VISIBILITYMAP_FORKNUM,
837 0 : spcnode: decoded.blocks[0].rnode_spcnode,
838 0 : dbnode: decoded.blocks[0].rnode_dbnode,
839 0 : relnode: decoded.blocks[0].rnode_relnode,
840 0 : };
841 0 :
842 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
843 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
844 :
845 : // Sometimes, Postgres seems to create heap WAL records with the
846 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
847 : // not set. In fact, it's possible that the VM page does not exist at all.
848 : // In that case, we don't want to store a record to clear the VM bit;
849 : // replaying it would fail to find the previous image of the page, because
850 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
851 : // record if they don't.
852 0 : let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
853 0 : if let Some(blknum) = new_vm_blk {
854 0 : if blknum >= vm_size {
855 0 : new_vm_blk = None;
856 0 : }
857 0 : }
858 0 : if let Some(blknum) = old_vm_blk {
859 0 : if blknum >= vm_size {
860 0 : old_vm_blk = None;
861 0 : }
862 0 : }
863 :
864 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
865 0 : if new_vm_blk == old_vm_blk {
866 : // An UPDATE record that needs to clear the bits for both old and the
867 : // new page, both of which reside on the same VM page.
868 0 : self.put_rel_wal_record(
869 0 : modification,
870 0 : vm_rel,
871 0 : new_vm_blk.unwrap(),
872 0 : NeonWalRecord::ClearVisibilityMapFlags {
873 0 : new_heap_blkno,
874 0 : old_heap_blkno,
875 0 : flags,
876 0 : },
877 0 : ctx,
878 0 : )
879 0 : .await?;
880 : } else {
881 : // Clear VM bits for one heap page, or for two pages that reside on
882 : // different VM pages.
883 0 : if let Some(new_vm_blk) = new_vm_blk {
884 0 : self.put_rel_wal_record(
885 0 : modification,
886 0 : vm_rel,
887 0 : new_vm_blk,
888 0 : NeonWalRecord::ClearVisibilityMapFlags {
889 0 : new_heap_blkno,
890 0 : old_heap_blkno: None,
891 0 : flags,
892 0 : },
893 0 : ctx,
894 0 : )
895 0 : .await?;
896 0 : }
897 0 : if let Some(old_vm_blk) = old_vm_blk {
898 0 : self.put_rel_wal_record(
899 0 : modification,
900 0 : vm_rel,
901 0 : old_vm_blk,
902 0 : NeonWalRecord::ClearVisibilityMapFlags {
903 0 : new_heap_blkno: None,
904 0 : old_heap_blkno,
905 0 : flags,
906 0 : },
907 0 : ctx,
908 0 : )
909 0 : .await?;
910 0 : }
911 : }
912 0 : }
913 0 : }
914 :
915 0 : Ok(())
916 0 : }
917 :
918 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
919 CBC 13 : async fn ingest_xlog_dbase_create(
920 13 : &mut self,
921 13 : modification: &mut DatadirModification<'_>,
922 13 : rec: &XlCreateDatabase,
923 13 : ctx: &RequestContext,
924 13 : ) -> anyhow::Result<()> {
925 13 : let db_id = rec.db_id;
926 13 : let tablespace_id = rec.tablespace_id;
927 13 : let src_db_id = rec.src_db_id;
928 13 : let src_tablespace_id = rec.src_tablespace_id;
929 13 :
930 13 : // Creating a database is implemented by copying the template (aka. source) database.
931 13 : // To copy all the relations, we need to ask for the state as of the same LSN, but we
932 13 : // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
933 13 : // the last valid LSN to advance up to it. So we use the previous record's LSN in the
934 13 : // get calls instead.
935 13 : let req_lsn = modification.tline.get_last_record_lsn();
936 :
937 13 : let rels = modification
938 13 : .tline
939 13 : .list_rels(src_tablespace_id, src_db_id, req_lsn, ctx)
940 UBC 0 : .await?;
941 :
942 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
943 :
944 : // Copy relfilemap
945 CBC 13 : let filemap = modification
946 13 : .tline
947 13 : .get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx)
948 UBC 0 : .await?;
949 CBC 13 : modification
950 13 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
951 UBC 0 : .await?;
952 :
953 CBC 13 : let mut num_rels_copied = 0;
954 13 : let mut num_blocks_copied = 0;
955 3809 : for src_rel in rels {
956 3796 : assert_eq!(src_rel.spcnode, src_tablespace_id);
957 3796 : assert_eq!(src_rel.dbnode, src_db_id);
958 :
959 3796 : let nblocks = modification
960 3796 : .tline
961 3796 : .get_rel_size(src_rel, req_lsn, true, ctx)
962 296 : .await?;
963 3796 : let dst_rel = RelTag {
964 3796 : spcnode: tablespace_id,
965 3796 : dbnode: db_id,
966 3796 : relnode: src_rel.relnode,
967 3796 : forknum: src_rel.forknum,
968 3796 : };
969 3796 :
970 3796 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
971 :
972 : // Copy content
973 UBC 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
974 CBC 13130 : for blknum in 0..nblocks {
975 UBC 0 : debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
976 :
977 CBC 13130 : let content = modification
978 13130 : .tline
979 13130 : .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx)
980 1134 : .await?;
981 13130 : modification.put_rel_page_image(dst_rel, blknum, content)?;
982 13130 : num_blocks_copied += 1;
983 : }
984 :
985 3796 : num_rels_copied += 1;
986 : }
987 :
988 13 : info!(
989 13 : "Created database {}/{}, copied {} blocks in {} rels",
990 13 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
991 13 : );
992 13 : Ok(())
993 13 : }
994 :
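    // A sketch (hypothetical helper) of the "read as of the previous record"
    // trick used above: Timeline reads block until last_record_lsn reaches
    // the requested LSN, so lookups made while ingesting record N must use
    // the LSN that is already committed, never N's own LSN.
    #[allow(dead_code)]
    fn read_lsn_for_ingest_sketch(modification: &DatadirModification<'_>) -> Lsn {
        // Safe to read at this LSN without the ingest pipeline waiting on itself.
        modification.tline.get_last_record_lsn()
    }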
995 21617 : async fn ingest_xlog_smgr_create(
996 21617 : &mut self,
997 21617 : modification: &mut DatadirModification<'_>,
998 21617 : rec: &XlSmgrCreate,
999 21617 : ctx: &RequestContext,
1000 21617 : ) -> anyhow::Result<()> {
1001 21617 : let rel = RelTag {
1002 21617 : spcnode: rec.rnode.spcnode,
1003 21617 : dbnode: rec.rnode.dbnode,
1004 21617 : relnode: rec.rnode.relnode,
1005 21617 : forknum: rec.forknum,
1006 21617 : };
1007 21617 : self.put_rel_creation(modification, rel, ctx).await?;
1008 21617 : Ok(())
1009 21617 : }
1010 :
1011 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1012 : ///
1013 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1014 44 : async fn ingest_xlog_smgr_truncate(
1015 44 : &mut self,
1016 44 : modification: &mut DatadirModification<'_>,
1017 44 : rec: &XlSmgrTruncate,
1018 44 : ctx: &RequestContext,
1019 44 : ) -> anyhow::Result<()> {
1020 44 : let spcnode = rec.rnode.spcnode;
1021 44 : let dbnode = rec.rnode.dbnode;
1022 44 : let relnode = rec.rnode.relnode;
1023 44 :
1024 44 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1025 44 : let rel = RelTag {
1026 44 : spcnode,
1027 44 : dbnode,
1028 44 : relnode,
1029 44 : forknum: MAIN_FORKNUM,
1030 44 : };
1031 44 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1032 1 : .await?;
1033 UBC 0 : }
1034 CBC 44 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1035 44 : let rel = RelTag {
1036 44 : spcnode,
1037 44 : dbnode,
1038 44 : relnode,
1039 44 : forknum: FSM_FORKNUM,
1040 44 : };
1041 44 :
1042 44 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1043 44 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1044 44 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1045 : // The tail of the last remaining FSM page has to be zeroed.
1046 : // We are not precise here; instead of digging into the FSM bitmap format, we just clear the whole page.
1047 22 : modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
1048 22 : fsm_physical_page_no += 1;
1049 22 : }
1050 44 : let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
1051 44 : if nblocks > fsm_physical_page_no {
1052 : // Check if there is something to do: the FSM is larger than the truncate position.
1053 22 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1054 LBC (2) : .await?;
1055 CBC 22 : }
1056 UBC 0 : }
1057 CBC 44 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1058 44 : let rel = RelTag {
1059 44 : spcnode,
1060 44 : dbnode,
1061 44 : relnode,
1062 44 : forknum: VISIBILITYMAP_FORKNUM,
1063 44 : };
1064 44 :
1065 44 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1066 44 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1067 : // The tail of the last remaining VM page has to be zeroed.
1068 : // We are not precise here; instead of digging into the VM bitmap format, we just clear the whole page.
1069 22 : modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
1070 22 : vm_page_no += 1;
1071 22 : }
1072 44 : let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
1073 44 : if nblocks > vm_page_no {
1074 : // Check if there is something to do: the VM is larger than the truncate position.
1075 22 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1076 1 : .await?;
1077 22 : }
1078 UBC 0 : }
1079 CBC 44 : Ok(())
1080 44 : }
1081 :
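    // A sketch (hypothetical helper) of the FSM truncation arithmetic above:
    // find the first physical FSM page lying entirely past the truncate
    // point, and report the partially-affected page that gets zeroed
    // wholesale instead of being edited bit by bit.
    #[allow(dead_code)]
    fn fsm_truncate_pages_sketch(trunc_blkno: u32) -> (Option<u32>, u32) {
        let fsm_logical_page_no = trunc_blkno / pg_constants::SLOTS_PER_FSM_PAGE;
        let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
        let mut page_to_zero = None;
        if trunc_blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
            // The truncate point falls inside this page: zero the whole page.
            page_to_zero = Some(fsm_physical_page_no);
            fsm_physical_page_no += 1;
        }
        // Everything from fsm_physical_page_no onwards can simply be truncated.
        (page_to_zero, fsm_physical_page_no)
    }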
1082 : /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
1083 : ///
1084 1950130 : async fn ingest_xact_record(
1085 1950130 : &mut self,
1086 1950130 : modification: &mut DatadirModification<'_>,
1087 1950130 : parsed: &XlXactParsedRecord,
1088 1950130 : is_commit: bool,
1089 1950130 : ctx: &RequestContext,
1090 1950130 : ) -> anyhow::Result<()> {
1091 1950130 : // Record update of CLOG pages
1092 1950130 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1093 1950130 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1094 1950130 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1095 1950130 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1096 :
1097 2150179 : for subxact in &parsed.subxacts {
1098 200049 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1099 200049 : if subxact_pageno != pageno {
1100 : // This subxact goes to different page. Write the record
1101 : // for all the XIDs on the previous page, and continue
1102 : // accumulating XIDs on this new page.
1103 6 : modification.put_slru_wal_record(
1104 6 : SlruKind::Clog,
1105 6 : segno,
1106 6 : rpageno,
1107 6 : if is_commit {
1108 6 : NeonWalRecord::ClogSetCommitted {
1109 6 : xids: page_xids,
1110 6 : timestamp: parsed.xact_time,
1111 6 : }
1112 : } else {
1113 UBC 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1114 : },
1115 0 : )?;
1116 CBC 6 : page_xids = Vec::new();
1117 200043 : }
1118 200049 : pageno = subxact_pageno;
1119 200049 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1120 200049 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1121 200049 : page_xids.push(*subxact);
1122 : }
1123 1950130 : modification.put_slru_wal_record(
1124 1950130 : SlruKind::Clog,
1125 1950130 : segno,
1126 1950130 : rpageno,
1127 1950130 : if is_commit {
1128 1948337 : NeonWalRecord::ClogSetCommitted {
1129 1948337 : xids: page_xids,
1130 1948337 : timestamp: parsed.xact_time,
1131 1948337 : }
1132 : } else {
1133 1793 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1134 : },
1135 UBC 0 : )?;
1136 :
1137 CBC 1966894 : for xnode in &parsed.xnodes {
1138 83820 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1139 67056 : let rel = RelTag {
1140 67056 : forknum,
1141 67056 : spcnode: xnode.spcnode,
1142 67056 : dbnode: xnode.dbnode,
1143 67056 : relnode: xnode.relnode,
1144 67056 : };
1145 67056 : let last_lsn = self.timeline.get_last_record_lsn();
1146 67056 : if modification
1147 67056 : .tline
1148 67056 : .get_rel_exists(rel, last_lsn, true, ctx)
1149 2719 : .await?
1150 : {
1151 17672 : self.put_rel_drop(modification, rel, ctx).await?;
1152 49384 : }
1153 : }
1154 : }
1155 1950130 : Ok(())
1156 1950130 : }
1157 :
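    // The SLRU addressing used in `ingest_xact_record` above, as a small
    // sketch (hypothetical helper). With 8 KB pages, a CLOG page holds 32768
    // two-bit status entries and a segment holds 32 pages, so e.g. xid
    // 100_000 lands on page 3 of segment 0.
    #[allow(dead_code)]
    fn clog_slot_sketch(xid: TransactionId) -> (u32, u32) {
        let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
        let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
        let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
        (segno, rpageno)
    }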
1158 1 : async fn ingest_clog_truncate_record(
1159 1 : &mut self,
1160 1 : modification: &mut DatadirModification<'_>,
1161 1 : xlrec: &XlClogTruncate,
1162 1 : ctx: &RequestContext,
1163 1 : ) -> anyhow::Result<()> {
1164 1 : info!(
1165 1 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1166 1 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1167 1 : );
1168 :
1169 : // Here we treat oldestXid and oldestXidDB
1170 : // differently from postgres redo routines.
1171 : // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
1172 : // until checkpoint happens and updates the value.
1173 : // Here we can use the most recent value.
1174 : // It's just an optimization, though, and can be deleted.
1175 : // TODO: Figure out if there will be any issues with replicas.
1176 1 : self.checkpoint.oldestXid = xlrec.oldest_xid;
1177 1 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
1178 1 : self.checkpoint_modified = true;
1179 1 :
1180 1 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
1181 1 :
1182 1 : let latest_page_number =
1183 1 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
1184 1 :
1185 1 : // Now delete all segments containing pages between xlrec.pageno
1186 1 : // and latest_page_number.
1187 1 :
1188 1 : // First, make an important safety check:
1189 1 : // the current endpoint page must not be eligible for removal.
1190 1 : // See SimpleLruTruncate() in slru.c
1191 1 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
1192 UBC 0 : info!("could not truncate directory pg_xact apparent wraparound");
1193 0 : return Ok(());
1194 CBC 1 : }
1195 1 :
1196 1 : // Iterate over the SLRU CLOG segments and drop the ones that we're ready to truncate.
1197 1 : //
1198 1 : // We cannot pass 'lsn' to the Timeline.list_slru_segments(), or it
1199 1 : // will block waiting for the last valid LSN to advance up to
1200 1 : // it. So we use the previous record's LSN in the get calls
1201 1 : // instead.
1202 1 : let req_lsn = modification.tline.get_last_record_lsn();
1203 10 : for segno in modification
1204 1 : .tline
1205 1 : .list_slru_segments(SlruKind::Clog, req_lsn, ctx)
1206 UBC 0 : .await?
1207 : {
1208 CBC 10 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1209 10 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
1210 9 : modification
1211 9 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1212 UBC 0 : .await?;
1213 0 : trace!("Drop CLOG segment {:>04X}", segno);
1214 CBC 1 : }
1215 : }
1216 :
1217 1 : Ok(())
1218 1 : }
1219 :
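    // A sketch (hypothetical helper) of the drop check in the truncation
    // loop above: a CLOG segment may be deleted once every page in it
    // precedes the cutoff page, with the apparent-wraparound guard already
    // applied by the caller.
    #[allow(dead_code)]
    fn clog_segment_droppable_sketch(segno: u32, cutoff_pageno: u32) -> bool {
        // The segment's first page stands in for the whole segment.
        let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
        slru_may_delete_clogsegment(segpage, cutoff_pageno)
    }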
1220 24027 : fn ingest_multixact_create_record(
1221 24027 : &mut self,
1222 24027 : modification: &mut DatadirModification,
1223 24027 : xlrec: &XlMultiXactCreate,
1224 24027 : ) -> Result<()> {
1225 24027 : // Create WAL record for updating the multixact-offsets page
1226 24027 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1227 24027 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1228 24027 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1229 24027 :
1230 24027 : modification.put_slru_wal_record(
1231 24027 : SlruKind::MultiXactOffsets,
1232 24027 : segno,
1233 24027 : rpageno,
1234 24027 : NeonWalRecord::MultixactOffsetCreate {
1235 24027 : mid: xlrec.mid,
1236 24027 : moff: xlrec.moff,
1237 24027 : },
1238 24027 : )?;
1239 :
1240 : // Create WAL records for the update of each affected multixact-members page
1241 24027 : let mut members = xlrec.members.iter();
1242 24027 : let mut offset = xlrec.moff;
1243 : loop {
1244 48328 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1245 48328 :
1246 48328 : // How many members fit on this page?
1247 48328 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1248 48328 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1249 48328 :
1250 48328 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1251 48328 : for _ in 0..page_remain {
1252 520988 : if let Some(m) = members.next() {
1253 472948 : this_page_members.push(m.clone());
1254 472948 : } else {
1255 48040 : break;
1256 : }
1257 : }
1258 48328 : if this_page_members.is_empty() {
1259 : // all done
1260 24027 : break;
1261 24301 : }
1262 24301 : let n_this_page = this_page_members.len();
1263 24301 :
1264 24301 : modification.put_slru_wal_record(
1265 24301 : SlruKind::MultiXactMembers,
1266 24301 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1267 24301 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1268 24301 : NeonWalRecord::MultixactMembersCreate {
1269 24301 : moff: offset,
1270 24301 : members: this_page_members,
1271 24301 : },
1272 24301 : )?;
1273 :
1274 : // Note: The multixact members can wrap around, even within one WAL record.
1275 24301 : offset = offset.wrapping_add(n_this_page as u32);
1276 : }
1277 24027 : if xlrec.mid >= self.checkpoint.nextMulti {
1278 24027 : self.checkpoint.nextMulti = xlrec.mid + 1;
1279 24027 : self.checkpoint_modified = true;
1280 24027 : }
1281 24027 : if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
1282 24027 : self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
1283 24027 : self.checkpoint_modified = true;
1284 24027 : }
1285 472948 : let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
1286 472948 : if mbr.xid.wrapping_sub(acc) as i32 > 0 {
1287 472903 : mbr.xid
1288 : } else {
1289 45 : acc
1290 : }
1291 472948 : });
1292 24027 :
1293 24027 : if self.checkpoint.update_next_xid(max_mbr_xid) {
1294 UBC 0 : self.checkpoint_modified = true;
1295 CBC 24027 : }
1296 24027 : Ok(())
1297 24027 : }
1298 :
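    // A sketch (hypothetical helper; constants from pg_constants) of the
    // member pagination in `ingest_multixact_create_record` above: split a
    // run of members into per-page chunks, letting the offset wrap modulo
    // 2^32, which can happen even within a single WAL record.
    #[allow(dead_code)]
    fn paginate_members_sketch(mut offset: u32, total: u32) -> Vec<(u32, u32)> {
        let per_page = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
        let mut chunks = Vec::new(); // (starting offset, members on that page)
        let mut remaining = total;
        while remaining > 0 {
            // How many members still fit on the page containing `offset`?
            let page_remain = per_page - offset % per_page;
            let n = remaining.min(page_remain);
            chunks.push((offset, n));
            offset = offset.wrapping_add(n);
            remaining -= n;
        }
        chunks
    }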
1299 UBC 0 : async fn ingest_multixact_truncate_record(
1300 0 : &mut self,
1301 0 : modification: &mut DatadirModification<'_>,
1302 0 : xlrec: &XlMultiXactTruncate,
1303 0 : ctx: &RequestContext,
1304 0 : ) -> Result<()> {
1305 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
1306 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
1307 0 : self.checkpoint_modified = true;
1308 0 :
1309 0 : // PerformMembersTruncation
1310 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
1311 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
1312 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
1313 0 : let mut segment: i32 = startsegment;
1314 :
1315 : // Delete all the segments except the last one. The last segment can still
1316 : // contain, possibly partially, valid data.
1317 0 : while segment != endsegment {
1318 0 : modification
1319 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1320 0 : .await?;
1321 :
1322 : /* move to next segment, handling wraparound correctly */
1323 0 : if segment == maxsegment {
1324 0 : segment = 0;
1325 0 : } else {
1326 0 : segment += 1;
1327 0 : }
1328 : }
1329 :
1330 : // Truncate offsets
1331 : // FIXME: this did not handle wraparound correctly
1332 :
1333 0 : Ok(())
1334 0 : }
1335 :
1336 CBC 54 : async fn ingest_relmap_page(
1337 54 : &mut self,
1338 54 : modification: &mut DatadirModification<'_>,
1339 54 : xlrec: &XlRelmapUpdate,
1340 54 : decoded: &DecodedWALRecord,
1341 54 : ctx: &RequestContext,
1342 54 : ) -> Result<()> {
1343 54 : let mut buf = decoded.record.clone();
1344 54 : buf.advance(decoded.main_data_offset);
1345 54 : // skip xl_relmap_update
1346 54 : buf.advance(12);
1347 54 :
1348 54 : modification
1349 54 : .put_relmap_file(
1350 54 : xlrec.tsid,
1351 54 : xlrec.dbid,
1352 54 : Bytes::copy_from_slice(&buf[..]),
1353 54 : ctx,
1354 54 : )
1355 5 : .await
1356 54 : }
1357 :
1358 21618 : async fn put_rel_creation(
1359 21618 : &mut self,
1360 21618 : modification: &mut DatadirModification<'_>,
1361 21618 : rel: RelTag,
1362 21618 : ctx: &RequestContext,
1363 21618 : ) -> Result<()> {
1364 21618 : modification.put_rel_creation(rel, 0, ctx).await?;
1365 21618 : Ok(())
1366 21618 : }
1367 :
1368 269468 : async fn put_rel_page_image(
1369 269468 : &mut self,
1370 269468 : modification: &mut DatadirModification<'_>,
1371 269468 : rel: RelTag,
1372 269468 : blknum: BlockNumber,
1373 269468 : img: Bytes,
1374 269468 : ctx: &RequestContext,
1375 269468 : ) -> Result<(), PageReconstructError> {
1376 269468 : self.handle_rel_extend(modification, rel, blknum, ctx)
1377 6219 : .await?;
1378 269468 : modification.put_rel_page_image(rel, blknum, img)?;
1379 269468 : Ok(())
1380 269468 : }
1381 :
1382 70051924 : async fn put_rel_wal_record(
1383 70051924 : &mut self,
1384 70051924 : modification: &mut DatadirModification<'_>,
1385 70051924 : rel: RelTag,
1386 70051924 : blknum: BlockNumber,
1387 70051924 : rec: NeonWalRecord,
1388 70051924 : ctx: &RequestContext,
1389 70051924 : ) -> Result<()> {
1390 70051738 : self.handle_rel_extend(modification, rel, blknum, ctx)
1391 20399 : .await?;
1392 70051738 : modification.put_rel_wal_record(rel, blknum, rec)?;
1393 70051738 : Ok(())
1394 70051738 : }
1395 :
1396 3094 : async fn put_rel_truncation(
1397 3094 : &mut self,
1398 3094 : modification: &mut DatadirModification<'_>,
1399 3094 : rel: RelTag,
1400 3094 : nblocks: BlockNumber,
1401 3094 : ctx: &RequestContext,
1402 3094 : ) -> anyhow::Result<()> {
1403 3094 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1404 3094 : Ok(())
1405 3094 : }
1406 :
1407 17673 : async fn put_rel_drop(
1408 17673 : &mut self,
1409 17673 : modification: &mut DatadirModification<'_>,
1410 17673 : rel: RelTag,
1411 17673 : ctx: &RequestContext,
1412 17673 : ) -> Result<()> {
1413 17673 : modification.put_rel_drop(rel, ctx).await?;
1414 17673 : Ok(())
1415 17673 : }
1416 :
1417 128660 : async fn get_relsize(
1418 128660 : &mut self,
1419 128660 : rel: RelTag,
1420 128660 : lsn: Lsn,
1421 128660 : ctx: &RequestContext,
1422 128660 : ) -> anyhow::Result<BlockNumber> {
1423 128660 : let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? {
1424 LBC (4) : 0
1425 : } else {
1426 CBC 128660 : self.timeline.get_rel_size(rel, lsn, true, ctx).await?
1427 : };
1428 128660 : Ok(nblocks)
1429 128660 : }
1430 :
1431 70321392 : async fn handle_rel_extend(
1432 70321392 : &mut self,
1433 70321392 : modification: &mut DatadirModification<'_>,
1434 70321392 : rel: RelTag,
1435 70321392 : blknum: BlockNumber,
1436 70321392 : ctx: &RequestContext,
1437 70321392 : ) -> Result<(), PageReconstructError> {
1438 70321206 : let new_nblocks = blknum + 1;
1439 70321206 : // Check if the relation exists. We implicitly create relations on the
1440 70321206 : // first record that touches them.
1441 70321206 : // TODO: it would be nice to be more explicit about this.
1442 70321206 : let last_lsn = modification.lsn;
1443 70321206 : let old_nblocks = if !self
1444 70321206 : .timeline
1445 70321206 : .get_rel_exists(rel, last_lsn, true, ctx)
1446 82 : .await?
1447 : {
1448 : // Create it with 0 size initially; the logic below will extend it.
1449 2057 : modification
1450 2057 : .put_rel_creation(rel, 0, ctx)
1451 368 : .await
1452 2057 : .context("failed to create relation")?;
1453 2057 : 0
1454 : } else {
1455 70319149 : self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
1456 : };
1457 :
1458 70321206 : if new_nblocks > old_nblocks {
1459 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1460 1026813 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1461 :
1462 : // fill the gap with zeros
1463 1026813 : for gap_blknum in old_nblocks..blknum {
1464 232812 : modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
1465 : }
1466 69294393 : }
1467 70321206 : Ok(())
1468 70321206 : }
1469 :
1470 667 : async fn put_slru_page_image(
1471 667 : &mut self,
1472 667 : modification: &mut DatadirModification<'_>,
1473 667 : kind: SlruKind,
1474 667 : segno: u32,
1475 667 : blknum: BlockNumber,
1476 667 : img: Bytes,
1477 667 : ctx: &RequestContext,
1478 667 : ) -> Result<()> {
1479 667 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1480 31 : .await?;
1481 667 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1482 667 : Ok(())
1483 667 : }
1484 :
1485 667 : async fn handle_slru_extend(
1486 667 : &mut self,
1487 667 : modification: &mut DatadirModification<'_>,
1488 667 : kind: SlruKind,
1489 667 : segno: u32,
1490 667 : blknum: BlockNumber,
1491 667 : ctx: &RequestContext,
1492 667 : ) -> anyhow::Result<()> {
1493 667 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1494 667 : // extended with ZEROPAGE records, not with commit records, so it happens
1495 667 : // a lot less frequently.
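 : // (For example, PostgreSQL emits a CLOG "zero page" record whenever it adds
 : // a new page to the clog.)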
1496 667 :
1497 667 : let new_nblocks = blknum + 1;
1498 667 : // Check if the segment exists. We implicitly create SLRU segments on the
1499 667 : // first record that touches them.
1500 667 : // TODO: it would be nice to be more explicit about this.
1501 667 : let last_lsn = self.timeline.get_last_record_lsn();
1502 667 : let old_nblocks = if !self
1503 667 : .timeline
1504 667 : .get_slru_segment_exists(kind, segno, last_lsn, ctx)
1505 17 : .await?
1506 : {
1507 : // Create it with 0 size initially; the logic below will extend it.
1508 18 : modification
1509 18 : .put_slru_segment_creation(kind, segno, 0, ctx)
1510 UBC 0 : .await?;
1511 CBC 18 : 0
1512 : } else {
1513 649 : self.timeline
1514 649 : .get_slru_segment_size(kind, segno, last_lsn, ctx)
1515 14 : .await?
1516 : };
1517 :
1518 667 : if new_nblocks > old_nblocks {
1519 UBC 0 : trace!(
1520 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1521 0 : kind,
1522 0 : segno,
1523 0 : old_nblocks,
1524 0 : new_nblocks
1525 0 : );
1526 CBC 661 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1527 :
1528 : // fill the gap with zeros
1529 661 : for gap_blknum in old_nblocks..blknum {
1530 UBC 0 : modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
1531 : }
1532 CBC 6 : }
1533 667 : Ok(())
1534 667 : }
1535 : }
1536 :
1537 : #[allow(clippy::bool_assert_comparison)]
1538 : #[cfg(test)]
1539 : mod tests {
1540 : use super::*;
1541 : use crate::tenant::harness::*;
1542 : use crate::tenant::Timeline;
1543 : use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
1544 : use postgres_ffi::RELSEG_SIZE;
1545 :
1546 : use crate::DEFAULT_PG_VERSION;
1547 :
1548 : /// Arbitrary relation tag, for testing.
1549 : const TESTREL_A: RelTag = RelTag {
1550 : spcnode: 0,
1551 : dbnode: 111,
1552 : relnode: 1000,
1553 : forknum: 0,
1554 : };
1555 :
1556 6 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1557 6 : // TODO
1558 6 : }
1559 :
1560 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1561 :
1562 4 : async fn init_walingest_test<'a>(
1563 4 : tline: &'a Timeline,
1564 4 : ctx: &RequestContext,
1565 4 : ) -> Result<WalIngest<'a>> {
1566 4 : let mut m = tline.begin_modification(Lsn(0x10));
1567 4 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1568 4 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1569 4 : m.commit(ctx).await?;
1570 4 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1571 :
1572 4 : Ok(walingest)
1573 4 : }
1574 :
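 : // A minimal sketch (not part of the original test suite) of how the SLRU
 : // extension path could be exercised with the same harness. SlruKind::Clog and
 : // the segment/block numbers are arbitrary illustrative values.
 : //
 : // #[tokio::test]
 : // async fn test_slru_extend_sketch() -> Result<()> {
 : //     let (tenant, ctx) = TenantHarness::create("test_slru_extend_sketch")?.load().await;
 : //     let tline = tenant
 : //         .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
 : //         .await?;
 : //     let mut walingest = init_walingest_test(&tline, &ctx).await?;
 : //
 : //     // Writing block 2 of a fresh segment implicitly creates the segment and
 : //     // zero-fills blocks 0 and 1 (see handle_slru_extend).
 : //     let mut m = tline.begin_modification(Lsn(0x20));
 : //     walingest
 : //         .put_slru_page_image(&mut m, SlruKind::Clog, 0, 2, TEST_IMG("clog blk 2"), &ctx)
 : //         .await?;
 : //     m.commit(&ctx).await?;
 : //
 : //     assert_eq!(
 : //         tline
 : //             .get_slru_segment_size(SlruKind::Clog, 0, Lsn(0x20), &ctx)
 : //             .await?,
 : //         3
 : //     );
 : //     Ok(())
 : // }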
1575 1 : #[tokio::test]
1576 1 : async fn test_relsize() -> Result<()> {
1577 1 : let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
1578 1 : let tline = tenant
1579 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1580 2 : .await?;
1581 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1582 :
1583 1 : let mut m = tline.begin_modification(Lsn(0x20));
1584 1 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1585 1 : walingest
1586 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1587 UBC 0 : .await?;
1588 CBC 1 : m.commit(&ctx).await?;
1589 1 : let mut m = tline.begin_modification(Lsn(0x30));
1590 1 : walingest
1591 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
1592 UBC 0 : .await?;
1593 CBC 1 : m.commit(&ctx).await?;
1594 1 : let mut m = tline.begin_modification(Lsn(0x40));
1595 1 : walingest
1596 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
1597 UBC 0 : .await?;
1598 CBC 1 : m.commit(&ctx).await?;
1599 1 : let mut m = tline.begin_modification(Lsn(0x50));
1600 1 : walingest
1601 1 : .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
1602 UBC 0 : .await?;
1603 CBC 1 : m.commit(&ctx).await?;
1604 :
1605 1 : assert_current_logical_size(&tline, Lsn(0x50));
1606 :
1607 : // The relation was created at Lsn(0x20), so it is not visible at Lsn(0x10) yet.
1608 1 : assert_eq!(
1609 1 : tline
1610 1 : .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
1611 UBC 0 : .await?,
1612 : false
1613 : );
1614 CBC 1 : assert!(tline
1615 1 : .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
1616 UBC 0 : .await
1617 CBC 1 : .is_err());
1618 1 : assert_eq!(
1619 1 : tline
1620 1 : .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
1621 UBC 0 : .await?,
1622 : true
1623 : );
1624 CBC 1 : assert_eq!(
1625 1 : tline
1626 1 : .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
1627 UBC 0 : .await?,
1628 : 1
1629 : );
1630 CBC 1 : assert_eq!(
1631 1 : tline
1632 1 : .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
1633 UBC 0 : .await?,
1634 : 3
1635 : );
1636 :
1637 : // Check page contents at each LSN
1638 CBC 1 : assert_eq!(
1639 1 : tline
1640 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx)
1641 UBC 0 : .await?,
1642 CBC 1 : TEST_IMG("foo blk 0 at 2")
1643 : );
1644 :
1645 1 : assert_eq!(
1646 1 : tline
1647 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx)
1648 UBC 0 : .await?,
1649 CBC 1 : TEST_IMG("foo blk 0 at 3")
1650 : );
1651 :
1652 1 : assert_eq!(
1653 1 : tline
1654 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx)
1655 UBC 0 : .await?,
1656 CBC 1 : TEST_IMG("foo blk 0 at 3")
1657 : );
1658 1 : assert_eq!(
1659 1 : tline
1660 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx)
1661 UBC 0 : .await?,
1662 CBC 1 : TEST_IMG("foo blk 1 at 4")
1663 : );
1664 :
1665 1 : assert_eq!(
1666 1 : tline
1667 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx)
1668 UBC 0 : .await?,
1669 CBC 1 : TEST_IMG("foo blk 0 at 3")
1670 : );
1671 1 : assert_eq!(
1672 1 : tline
1673 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx)
1674 UBC 0 : .await?,
1675 CBC 1 : TEST_IMG("foo blk 1 at 4")
1676 : );
1677 1 : assert_eq!(
1678 1 : tline
1679 1 : .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
1680 UBC 0 : .await?,
1681 CBC 1 : TEST_IMG("foo blk 2 at 5")
1682 : );
1683 :
1684 : // Truncate last block
1685 1 : let mut m = tline.begin_modification(Lsn(0x60));
1686 1 : walingest
1687 1 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1688 UBC 0 : .await?;
1689 CBC 1 : m.commit(&ctx).await?;
1690 1 : assert_current_logical_size(&tline, Lsn(0x60));
1691 :
1692 : // Check reported size and contents after truncation
1693 1 : assert_eq!(
1694 1 : tline
1695 1 : .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
1696 UBC 0 : .await?,
1697 : 2
1698 : );
1699 CBC 1 : assert_eq!(
1700 1 : tline
1701 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx)
1702 UBC 0 : .await?,
1703 CBC 1 : TEST_IMG("foo blk 0 at 3")
1704 : );
1705 1 : assert_eq!(
1706 1 : tline
1707 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx)
1708 UBC 0 : .await?,
1709 CBC 1 : TEST_IMG("foo blk 1 at 4")
1710 : );
1711 :
1712 : // should still see the truncated block with older LSN
1713 1 : assert_eq!(
1714 1 : tline
1715 1 : .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
1716 UBC 0 : .await?,
1717 : 3
1718 : );
1719 CBC 1 : assert_eq!(
1720 1 : tline
1721 1 : .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
1722 UBC 0 : .await?,
1723 CBC 1 : TEST_IMG("foo blk 2 at 5")
1724 : );
1725 :
1726 : // Truncate to zero length
1727 1 : let mut m = tline.begin_modification(Lsn(0x68));
1728 1 : walingest
1729 1 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1730 UBC 0 : .await?;
1731 CBC 1 : m.commit(&ctx).await?;
1732 1 : assert_eq!(
1733 1 : tline
1734 1 : .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
1735 UBC 0 : .await?,
1736 : 0
1737 : );
1738 :
1739 : // Extend from 0 to 2 blocks, leaving a gap
1740 CBC 1 : let mut m = tline.begin_modification(Lsn(0x70));
1741 1 : walingest
1742 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
1743 1 : .await?;
1744 1 : m.commit(&ctx).await?;
1745 1 : assert_eq!(
1746 1 : tline
1747 1 : .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
1748 UBC 0 : .await?,
1749 : 2
1750 : );
1751 CBC 1 : assert_eq!(
1752 1 : tline
1753 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx)
1754 UBC 0 : .await?,
1755 CBC 1 : ZERO_PAGE
1756 : );
1757 1 : assert_eq!(
1758 1 : tline
1759 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx)
1760 UBC 0 : .await?,
1761 CBC 1 : TEST_IMG("foo blk 1")
1762 : );
1763 :
1764 : // Extend a lot more, leaving a big gap that spans across segments
1765 1 : let mut m = tline.begin_modification(Lsn(0x80));
1766 1 : walingest
1767 1 : .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
1768 UBC 0 : .await?;
1769 CBC 35 : m.commit(&ctx).await?;
1770 1 : assert_eq!(
1771 1 : tline
1772 1 : .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
1773 UBC 0 : .await?,
1774 : 1501
1775 : );
1776 CBC 1499 : for blk in 2..1500 {
1777 1498 : assert_eq!(
1778 1498 : tline
1779 1498 : .get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx)
1780 59 : .await?,
1781 1498 : ZERO_PAGE
1782 : );
1783 : }
1784 1 : assert_eq!(
1785 1 : tline
1786 1 : .get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx)
1787 UBC 0 : .await?,
1788 CBC 1 : TEST_IMG("foo blk 1500")
1789 : );
1790 :
1791 1 : Ok(())
1792 : }
1793 :
1794 : // Test what happens if we drop a relation
1795 : // and then re-create it within the same layer.
1796 1 : #[tokio::test]
1797 1 : async fn test_drop_extend() -> Result<()> {
1798 1 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
1799 1 : let tline = tenant
1800 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1801 2 : .await?;
1802 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1803 :
1804 1 : let mut m = tline.begin_modification(Lsn(0x20));
1805 1 : walingest
1806 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1807 UBC 0 : .await?;
1808 CBC 1 : m.commit(&ctx).await?;
1809 :
1810 : // Check that rel exists and size is correct
1811 1 : assert_eq!(
1812 1 : tline
1813 1 : .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
1814 UBC 0 : .await?,
1815 : true
1816 : );
1817 CBC 1 : assert_eq!(
1818 1 : tline
1819 1 : .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
1820 UBC 0 : .await?,
1821 : 1
1822 : );
1823 :
1824 : // Drop rel
1825 CBC 1 : let mut m = tline.begin_modification(Lsn(0x30));
1826 1 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
1827 1 : m.commit(&ctx).await?;
1828 :
1829 : // Check that rel is not visible anymore
1830 1 : assert_eq!(
1831 1 : tline
1832 1 : .get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx)
1833 UBC 0 : .await?,
1834 : false
1835 : );
1836 :
1837 : // FIXME: should fail
1838 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1839 :
1840 : // Re-create it
1841 CBC 1 : let mut m = tline.begin_modification(Lsn(0x40));
1842 1 : walingest
1843 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
1844 UBC 0 : .await?;
1845 CBC 1 : m.commit(&ctx).await?;
1846 :
1847 : // Check that rel exists and size is correct
1848 1 : assert_eq!(
1849 1 : tline
1850 1 : .get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx)
1851 UBC 0 : .await?,
1852 : true
1853 : );
1854 CBC 1 : assert_eq!(
1855 1 : tline
1856 1 : .get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx)
1857 UBC 0 : .await?,
1858 : 1
1859 : );
1860 :
1861 CBC 1 : Ok(())
1862 : }
1863 :
1864 : // Test what happens if we truncate a relation
1865 : // so that one of its segments is dropped,
1866 : // and then extend it again within the same layer.
1867 1 : #[tokio::test]
1868 1 : async fn test_truncate_extend() -> Result<()> {
1869 1 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
1870 1 : let tline = tenant
1871 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1872 2 : .await?;
1873 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1874 :
1875 : // Create a 20 MB relation (the size is arbitrary)
1876 1 : let relsize = 20 * 1024 * 1024 / 8192;
1877 1 : let mut m = tline.begin_modification(Lsn(0x20));
1878 2560 : for blkno in 0..relsize {
1879 2560 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1880 2560 : walingest
1881 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
1882 UBC 0 : .await?;
1883 : }
1884 CBC 40 : m.commit(&ctx).await?;
1885 :
1886 : // The relation was created at Lsn(0x20), so it is not visible at Lsn(0x10) yet.
1887 1 : assert_eq!(
1888 1 : tline
1889 1 : .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
1890 UBC 0 : .await?,
1891 : false
1892 : );
1893 CBC 1 : assert!(tline
1894 1 : .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
1895 UBC 0 : .await
1896 CBC 1 : .is_err());
1897 :
1898 1 : assert_eq!(
1899 1 : tline
1900 1 : .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
1901 UBC 0 : .await?,
1902 : true
1903 : );
1904 CBC 1 : assert_eq!(
1905 1 : tline
1906 1 : .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
1907 UBC 0 : .await?,
1908 : relsize
1909 : );
1910 :
1911 : // Check relation content
1912 CBC 2560 : for blkno in 0..relsize {
1913 2560 : let lsn = Lsn(0x20);
1914 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1915 2560 : assert_eq!(
1916 2560 : tline
1917 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx)
1918 92 : .await?,
1919 2560 : TEST_IMG(&data)
1920 : );
1921 : }
1922 :
1923 : // Truncate the relation so that its second segment is dropped,
1924 : // leaving only one page.
1925 1 : let mut m = tline.begin_modification(Lsn(0x60));
1926 1 : walingest
1927 1 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
1928 UBC 0 : .await?;
1929 CBC 1 : m.commit(&ctx).await?;
1930 :
1931 : // Check reported size and contents after truncation
1932 1 : assert_eq!(
1933 1 : tline
1934 1 : .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
1935 UBC 0 : .await?,
1936 : 1
1937 : );
1938 :
1939 CBC 2 : for blkno in 0..1 {
1940 1 : let lsn = Lsn(0x20);
1941 1 : let data = format!("foo blk {} at {}", blkno, lsn);
1942 1 : assert_eq!(
1943 1 : tline
1944 1 : .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx)
1945 UBC 0 : .await?,
1946 CBC 1 : TEST_IMG(&data)
1947 : );
1948 : }
1949 :
1950 : // should still see all blocks with older LSN
1951 1 : assert_eq!(
1952 1 : tline
1953 1 : .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
1954 UBC 0 : .await?,
1955 : relsize
1956 : );
1957 CBC 2560 : for blkno in 0..relsize {
1958 2560 : let lsn = Lsn(0x20);
1959 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1960 2560 : assert_eq!(
1961 2560 : tline
1962 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx)
1963 195 : .await?,
1964 2560 : TEST_IMG(&data)
1965 : );
1966 : }
1967 :
1968 : // Extend the relation again,
1969 : // adding enough blocks to create a second segment.
1970 1 : let lsn = Lsn(0x80);
1971 1 : let mut m = tline.begin_modification(lsn);
1972 2560 : for blkno in 0..relsize {
1973 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1974 2560 : walingest
1975 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
1976 UBC 0 : .await?;
1977 : }
1978 CBC 40 : m.commit(&ctx).await?;
1979 :
1980 1 : assert_eq!(
1981 1 : tline
1982 1 : .get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx)
1983 UBC 0 : .await?,
1984 : true
1985 : );
1986 CBC 1 : assert_eq!(
1987 1 : tline
1988 1 : .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
1989 UBC 0 : .await?,
1990 : relsize
1991 : );
1992 : // Check relation content
1993 CBC 2560 : for blkno in 0..relsize {
1994 2560 : let lsn = Lsn(0x80);
1995 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1996 2560 : assert_eq!(
1997 2560 : tline
1998 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx)
1999 98 : .await?,
2000 2560 : TEST_IMG(&data)
2001 : );
2002 : }
2003 :
2004 1 : Ok(())
2005 : }
2006 :
2007 : /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
2008 : /// split into multiple 1 GB segments in Postgres.
2009 1 : #[tokio::test]
2010 1 : async fn test_large_rel() -> Result<()> {
2011 1 : let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
2012 1 : let tline = tenant
2013 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2014 2 : .await?;
2015 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2016 :
2017 1 : let mut lsn = 0x10;
2018 131073 : for blknum in 0..RELSEG_SIZE + 1 {
2019 131073 : lsn += 0x10;
2020 131073 : let mut m = tline.begin_modification(Lsn(lsn));
2021 131073 : let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2022 131073 : walingest
2023 131073 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2024 2958 : .await?;
2025 131073 : m.commit(&ctx).await?;
2026 : }
2027 :
2028 1 : assert_current_logical_size(&tline, Lsn(lsn));
2029 :
2030 1 : assert_eq!(
2031 1 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
2032 1 : RELSEG_SIZE + 1
2033 : );
2034 :
2035 : // Truncate one block
2036 1 : lsn += 0x10;
2037 1 : let mut m = tline.begin_modification(Lsn(lsn));
2038 1 : walingest
2039 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2040 UBC 0 : .await?;
2041 CBC 1 : m.commit(&ctx).await?;
2042 1 : assert_eq!(
2043 1 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
2044 : RELSEG_SIZE
2045 : );
2046 1 : assert_current_logical_size(&tline, Lsn(lsn));
2047 1 :
2048 1 : // Truncate another block
2049 1 : lsn += 0x10;
2050 1 : let mut m = tline.begin_modification(Lsn(lsn));
2051 1 : walingest
2052 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2053 UBC 0 : .await?;
2054 CBC 1 : m.commit(&ctx).await?;
2055 1 : assert_eq!(
2056 1 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
2057 1 : RELSEG_SIZE - 1
2058 : );
2059 1 : assert_current_logical_size(&tline, Lsn(lsn));
2060 1 :
2061 1 : // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
2062 1 : // This tests the behavior at segment boundaries.
2063 1 : let mut size: i32 = 3000;
2064 3002 : while size >= 0 {
2065 3001 : lsn += 0x10;
2066 3001 : let mut m = tline.begin_modification(Lsn(lsn));
2067 3001 : walingest
2068 3001 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2069 72 : .await?;
2070 3001 : m.commit(&ctx).await?;
2071 3001 : assert_eq!(
2072 3001 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
2073 3001 : size as BlockNumber
2074 : );
2075 :
2076 3001 : size -= 1;
2077 : }
2078 1 : assert_current_logical_size(&tline, Lsn(lsn));
2079 1 :
2080 1 : Ok(())
2081 : }
2082 : }
|