Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but stores most of them as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_wal_record or put_page_image function
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but it can handle some records directly with
22 : //! bespoke Rust code.
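//!
//! As a rough illustration only (not a definitive driver loop: `wal_stream` and
//! its `next_record()` method are assumptions standing in for the WAL receiver,
//! and `DecodedWALRecord` is assumed to implement `Default`), the calling code
//! uses this module roughly like so:
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut decoded = DecodedWALRecord::default(); // reusable decode buffer
//! while let Some((lsn, recdata)) = wal_stream.next_record().await? {
//!     // Each record is applied through a DatadirModification; ingest_record()
//!     // commits it once the record has been fully handled.
//!     let mut modification = timeline.begin_modification(lsn);
//!     walingest
//!         .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
//!         .await?;
//! }
//! ```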
23 :
24 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
25 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
26 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
27 :
28 : use anyhow::{Context, Result};
29 : use bytes::{Buf, Bytes, BytesMut};
30 : use tracing::*;
31 :
32 : use crate::context::RequestContext;
33 : use crate::pgdatadir_mapping::*;
34 : use crate::tenant::PageReconstructError;
35 : use crate::tenant::Timeline;
36 : use crate::walrecord::*;
37 : use crate::ZERO_PAGE;
38 : use pageserver_api::reltag::{RelTag, SlruKind};
39 : use postgres_ffi::pg_constants;
40 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
41 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
42 : use postgres_ffi::v14::xlog_utils::*;
43 : use postgres_ffi::v14::CheckPoint;
44 : use postgres_ffi::TransactionId;
45 : use postgres_ffi::BLCKSZ;
46 : use utils::lsn::Lsn;
47 :
48 : pub struct WalIngest<'a> {
49 : timeline: &'a Timeline,
50 :
51 : checkpoint: CheckPoint,
52 : checkpoint_modified: bool,
53 : }
54 :
55 : impl<'a> WalIngest<'a> {
56 1399 : pub async fn new(
57 1399 : timeline: &'a Timeline,
58 1399 : startpoint: Lsn,
59 1399 : ctx: &'_ RequestContext,
60 1399 : ) -> anyhow::Result<WalIngest<'a>> {
61 : // Fetch the latest checkpoint into memory, so that we can compare with it
62 : // quickly in `ingest_record` and update it when it changes.
63 1399 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
64 1389 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
65 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
66 :
67 1389 : Ok(WalIngest {
68 1389 : timeline,
69 1389 : checkpoint,
70 1389 : checkpoint_modified: false,
71 1389 : })
72 1399 : }
73 :
74 : ///
75 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
76 : ///
77 : /// This function updates the `lsn` field of the `DatadirModification`.
78 : ///
79 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
80 : /// relations/pages that the record affects.
81 : ///
82 73526651 : pub async fn ingest_record(
83 73526651 : &mut self,
84 73526651 : recdata: Bytes,
85 73526651 : lsn: Lsn,
86 73526651 : modification: &mut DatadirModification<'_>,
87 73526651 : decoded: &mut DecodedWALRecord,
88 73526651 : ctx: &RequestContext,
89 73526768 : ) -> anyhow::Result<()> {
90 73526768 : modification.lsn = lsn;
91 73526768 : decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
92 :
93 73526768 : let mut buf = decoded.record.clone();
94 73526768 : buf.advance(decoded.main_data_offset);
95 73526768 :
96 73526768 : assert!(!self.checkpoint_modified);
97 73526768 : if self.checkpoint.update_next_xid(decoded.xl_xid) {
98 3936 : self.checkpoint_modified = true;
99 73522832 : }
100 :
101 : // Heap AM records need some special handling, because they modify VM pages
102 : // without registering them with the standard mechanism.
103 73526768 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID
104 15667578 : || decoded.xl_rmid == pg_constants::RM_HEAP2_ID
105 : {
106 59861282 : self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
107 21 : .await?;
108 13665486 : }
109 : // Handle other special record types
110 73526768 : if decoded.xl_rmid == pg_constants::RM_SMGR_ID
111 21995 : && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
112 21995 : == pg_constants::XLOG_SMGR_CREATE
113 : {
114 21950 : let create = XlSmgrCreate::decode(&mut buf);
115 21950 : self.ingest_xlog_smgr_create(modification, &create, ctx)
116 1011 : .await?;
117 73504818 : } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
118 45 : && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
119 45 : == pg_constants::XLOG_SMGR_TRUNCATE
120 : {
121 45 : let truncate = XlSmgrTruncate::decode(&mut buf);
122 45 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
123 0 : .await?;
124 73504773 : } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
125 0 : debug!(
126 0 : "handle RM_DBASE_ID for Postgres version {:?}",
127 0 : self.timeline.pg_version
128 0 : );
129 14 : if self.timeline.pg_version == 14 {
130 14 : if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
131 14 : == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
132 : {
133 12 : let createdb = XlCreateDatabase::decode(&mut buf);
134 0 : debug!("XLOG_DBASE_CREATE v14");
135 :
136 12 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
137 816 : .await?;
138 2 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
139 2 : == postgres_ffi::v14::bindings::XLOG_DBASE_DROP
140 : {
141 2 : let dropdb = XlDropDatabase::decode(&mut buf);
142 4 : for tablespace_id in dropdb.tablespace_ids {
143 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
144 2 : modification
145 2 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
146 0 : .await?;
147 : }
148 0 : }
149 0 : } else if self.timeline.pg_version == 15 {
150 0 : if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
151 0 : == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
152 : {
153 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
154 0 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
155 0 : == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
156 : {
157 : // The XLOG record was renamed between v14 and v15,
158 : // but the record format is the same.
159 : // So we can reuse XlCreateDatabase here.
160 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
161 0 : let createdb = XlCreateDatabase::decode(&mut buf);
162 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
163 0 : .await?;
164 0 : } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
165 0 : == postgres_ffi::v15::bindings::XLOG_DBASE_DROP
166 : {
167 0 : let dropdb = XlDropDatabase::decode(&mut buf);
168 0 : for tablespace_id in dropdb.tablespace_ids {
169 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
170 0 : modification
171 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
172 0 : .await?;
173 : }
174 0 : }
175 0 : }
176 73504759 : } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
177 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
178 73504755 : } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID {
179 370 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
180 370 : if info == pg_constants::CLOG_ZEROPAGE {
181 369 : let pageno = buf.get_u32_le();
182 369 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
183 369 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
184 369 : self.put_slru_page_image(
185 369 : modification,
186 369 : SlruKind::Clog,
187 369 : segno,
188 369 : rpageno,
189 369 : ZERO_PAGE.clone(),
190 369 : ctx,
191 369 : )
192 24 : .await?;
193 : } else {
194 1 : assert!(info == pg_constants::CLOG_TRUNCATE);
195 1 : let xlrec = XlClogTruncate::decode(&mut buf);
196 1 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
197 0 : .await?;
198 : }
199 73504385 : } else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
200 2266604 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
201 2266604 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
202 2265097 : let parsed_xact =
203 2265097 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
204 2265097 : self.ingest_xact_record(
205 2265097 : modification,
206 2265097 : &parsed_xact,
207 2265097 : info == pg_constants::XLOG_XACT_COMMIT,
208 2265097 : ctx,
209 2265097 : )
210 2436 : .await?;
211 1507 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
212 1506 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
213 : {
214 2 : let parsed_xact =
215 2 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
216 2 : self.ingest_xact_record(
217 2 : modification,
218 2 : &parsed_xact,
219 2 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
220 2 : ctx,
221 2 : )
222 0 : .await?;
223 : // Remove the twophase file. See RemoveTwoPhaseFile() in the postgres code.
224 0 : trace!(
225 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
226 0 : decoded.xl_xid,
227 0 : parsed_xact.xid,
228 0 : lsn,
229 0 : );
230 2 : modification
231 2 : .drop_twophase_file(parsed_xact.xid, ctx)
232 0 : .await?;
233 1505 : } else if info == pg_constants::XLOG_XACT_PREPARE {
234 4 : modification
235 4 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
236 0 : .await?;
237 1501 : }
238 71237781 : } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
239 24332 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
240 24332 :
241 24332 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
242 14 : let pageno = buf.get_u32_le();
243 14 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
244 14 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
245 14 : self.put_slru_page_image(
246 14 : modification,
247 14 : SlruKind::MultiXactOffsets,
248 14 : segno,
249 14 : rpageno,
250 14 : ZERO_PAGE.clone(),
251 14 : ctx,
252 14 : )
253 0 : .await?;
254 24318 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
255 291 : let pageno = buf.get_u32_le();
256 291 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
257 291 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
258 291 : self.put_slru_page_image(
259 291 : modification,
260 291 : SlruKind::MultiXactMembers,
261 291 : segno,
262 291 : rpageno,
263 291 : ZERO_PAGE.clone(),
264 291 : ctx,
265 291 : )
266 1 : .await?;
267 24027 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
268 24027 : let xlrec = XlMultiXactCreate::decode(&mut buf);
269 24027 : self.ingest_multixact_create_record(modification, &xlrec)?;
270 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
271 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
272 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
273 0 : .await?;
274 0 : }
275 71213449 : } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
276 45 : let xlrec = XlRelmapUpdate::decode(&mut buf);
277 45 : self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
278 1 : .await?;
279 71213404 : } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
280 142663 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
281 142663 : if info == pg_constants::XLOG_NEXTOID {
282 449 : let next_oid = buf.get_u32_le();
283 449 : if self.checkpoint.nextOid != next_oid {
284 449 : self.checkpoint.nextOid = next_oid;
285 449 : self.checkpoint_modified = true;
286 449 : }
287 142214 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
288 142161 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
289 : {
290 696 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
291 696 : buf.copy_to_slice(&mut checkpoint_bytes);
292 696 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
293 0 : trace!(
294 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
295 0 : xlog_checkpoint.oldestXid,
296 0 : self.checkpoint.oldestXid
297 0 : );
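                // Advance our in-memory oldestXid if the checkpoint record carries a
                // newer value. The wrapping subtraction cast to i32 is a circular
                // (modulo-2^32) XID comparison, analogous to TransactionIdPrecedes()
                // in Postgres: e.g. oldestXid 0xFFFF_FFF0 precedes 0x0000_0010.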
298 696 : if (self
299 696 : .checkpoint
300 696 : .oldestXid
301 696 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
302 696 : < 0
303 0 : {
304 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
305 0 : self.checkpoint_modified = true;
306 696 : }
307 141518 : }
308 71070741 : } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
309 15 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
310 15 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
311 : // This is a convenient way to make the WAL ingestion pause at
312 : // a particular point in the WAL. For more fine-grained control,
313 : // we could peek into the message and only pause if it contains
314 : // a particular string, for example, but this is enough for now.
315 18 : crate::failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
316 0 : }
317 71070726 : }
318 :
319 : // Iterate through all the blocks that the record modifies, and
320 : // "put" a separate copy of the record for each block.
321 75336655 : for blk in decoded.blocks.iter() {
322 75336655 : self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
323 14044 : .await?;
324 : }
325 :
326 : // If checkpoint data was updated, store the new version in the repository
327 73526767 : if self.checkpoint_modified {
328 28394 : let new_checkpoint_bytes = self.checkpoint.encode()?;
329 :
330 28394 : modification.put_checkpoint(new_checkpoint_bytes)?;
331 28394 : self.checkpoint_modified = false;
332 73498373 : }
333 :
334 : // Now that this record has been fully handled, including updating the
335 : // checkpoint data, let the repository know that it is up-to-date to this LSN
336 73526767 : modification.commit().await?;
337 :
338 73526755 : Ok(())
339 73526755 : }
340 :
341 75336498 : async fn ingest_decoded_block(
342 75336498 : &mut self,
343 75336498 : modification: &mut DatadirModification<'_>,
344 75336498 : lsn: Lsn,
345 75336498 : decoded: &DecodedWALRecord,
346 75336498 : blk: &DecodedBkpBlock,
347 75336498 : ctx: &RequestContext,
348 75336655 : ) -> Result<(), PageReconstructError> {
349 75336655 : let rel = RelTag {
350 75336655 : spcnode: blk.rnode_spcnode,
351 75336655 : dbnode: blk.rnode_dbnode,
352 75336655 : relnode: blk.rnode_relnode,
353 75336655 : forknum: blk.forknum,
354 75336655 : };
355 75336655 :
356 75336655 : //
357 75336655 : // Instead of storing a full-page-image WAL record,
358 75336655 : // it is better to store the extracted image: we can skip wal-redo
359 75336655 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
360 75336655 : // so they would have to be copied multiple times.
361 75336655 : //
362 75336655 : if blk.apply_image
363 170593 : && blk.has_image
364 170593 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
365 143829 : && (decoded.xl_info == pg_constants::XLOG_FPI
366 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
367 : // compression of WAL is not yet supported: fall back to storing the original WAL record
368 143829 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
369 : {
370 : // Extract page image from FPI record
371 143829 : let img_len = blk.bimg_len as usize;
372 143829 : let img_offs = blk.bimg_offset as usize;
373 143829 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
374 143829 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
375 143829 :
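            // What the next few lines reconstruct: the full-page image in the WAL may
            // have a "hole" elided (typically the unused gap between pd_lower and
            // pd_upper). We splice `hole_length` zero bytes back in at `hole_offset`
            // so that `image` becomes a full BLCKSZ-byte page before it is stored.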
376 143829 : if blk.hole_length != 0 {
377 117283 : let tail = image.split_off(blk.hole_offset as usize);
378 117283 : image.resize(image.len() + blk.hole_length as usize, 0u8);
379 117283 : image.unsplit(tail);
380 117283 : }
381 : //
382 : // Match the logic of XLogReadBufferForRedoExtended:
383 : // The page may be uninitialized. If so, we can't set the LSN because
384 : // that would corrupt the page.
385 : //
386 143829 : if !page_is_new(&image) {
387 138540 : page_set_lsn(&mut image, lsn)
388 5289 : }
389 143829 : assert_eq!(image.len(), BLCKSZ as usize);
390 143829 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
391 1958 : .await?;
392 : } else {
393 75192826 : let rec = NeonWalRecord::Postgres {
394 75192826 : will_init: blk.will_init || blk.apply_image,
395 75192826 : rec: decoded.record.clone(),
396 75192826 : };
397 75192826 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
398 12086 : .await?;
399 : }
400 75336654 : Ok(())
401 75336654 : }
402 :
403 59861204 : async fn ingest_heapam_record(
404 59861204 : &mut self,
405 59861204 : buf: &mut Bytes,
406 59861204 : modification: &mut DatadirModification<'_>,
407 59861204 : decoded: &mut DecodedWALRecord,
408 59861204 : ctx: &RequestContext,
409 59861282 : ) -> anyhow::Result<()> {
410 59861282 : // Handle VM bit updates that are implicitly part of heap records.
411 59861282 :
412 59861282 : // First, look at the record to determine which VM bits need
413 59861282 : // to be cleared. If either of these variables is set, we
414 59861282 : // need to clear the corresponding bits in the visibility map.
415 59861282 : let mut new_heap_blkno: Option<u32> = None;
416 59861282 : let mut old_heap_blkno: Option<u32> = None;
417 59861282 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
418 57859190 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
419 57859190 : if info == pg_constants::XLOG_HEAP_INSERT {
420 47303205 : let xlrec = XlHeapInsert::decode(buf);
421 47303205 : assert_eq!(0, buf.remaining());
422 47303205 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
423 2943 : new_heap_blkno = Some(decoded.blocks[0].blkno);
424 47300262 : }
425 10555985 : } else if info == pg_constants::XLOG_HEAP_DELETE {
426 535296 : let xlrec = XlHeapDelete::decode(buf);
427 535296 : assert_eq!(0, buf.remaining());
428 535296 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
429 732 : new_heap_blkno = Some(decoded.blocks[0].blkno);
430 534564 : }
431 10020689 : } else if info == pg_constants::XLOG_HEAP_UPDATE
432 6590864 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
433 : {
434 5355809 : let xlrec = XlHeapUpdate::decode(buf);
435 5355809 : // the size of tuple data is inferred from the size of the record.
436 5355809 : // we can't validate the remaining number of bytes without parsing
437 5355809 : // the tuple data.
438 5355809 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
439 104268 : old_heap_blkno = Some(decoded.blocks[0].blkno);
440 5251541 : }
441 5355809 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
442 9268 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
443 9268 : // non-HOT update where the new tuple goes to a different page than
444 9268 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
445 9268 : // set.
446 9268 : new_heap_blkno = Some(decoded.blocks[1].blkno);
447 5346541 : }
448 4664880 : }
449 2002092 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
450 2002092 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
451 2002092 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
452 692488 : let xlrec = XlHeapMultiInsert::decode(buf);
453 :
454 692488 : let offset_array_len = if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
455 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
456 545769 : 0
457 : } else {
458 146719 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
459 : };
460 692488 : assert_eq!(offset_array_len, buf.remaining());
461 :
462 692488 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
463 2045 : new_heap_blkno = Some(decoded.blocks[0].blkno);
464 690443 : }
465 1309604 : }
466 0 : }
467 : // FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED?
468 :
469 : // Clear the VM bits if required.
470 59861282 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
471 119084 : let vm_rel = RelTag {
472 119084 : forknum: VISIBILITYMAP_FORKNUM,
473 119084 : spcnode: decoded.blocks[0].rnode_spcnode,
474 119084 : dbnode: decoded.blocks[0].rnode_dbnode,
475 119084 : relnode: decoded.blocks[0].rnode_relnode,
476 119084 : };
477 119084 :
478 119084 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
479 119084 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
480 :
481 : // Sometimes, Postgres seems to create heap WAL records with the
482 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
483 : // not set. In fact, it's possible that the VM page does not exist at all.
484 : // In that case, we don't want to store a record to clear the VM bit;
485 : // replaying it would fail to find the previous image of the page, because
486 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
487 : // record if they don't.
488 119084 : let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
489 119084 : if let Some(blknum) = new_vm_blk {
490 14988 : if blknum >= vm_size {
491 0 : new_vm_blk = None;
492 14988 : }
493 104096 : }
494 119084 : if let Some(blknum) = old_vm_blk {
495 104268 : if blknum >= vm_size {
496 0 : old_vm_blk = None;
497 104268 : }
498 14816 : }
499 :
500 119084 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
501 119084 : if new_vm_blk == old_vm_blk {
502 : // An UPDATE record that needs to clear the bits for both the old and the
503 : // new page, both of which reside on the same VM page.
504 172 : self.put_rel_wal_record(
505 172 : modification,
506 172 : vm_rel,
507 172 : new_vm_blk.unwrap(),
508 172 : NeonWalRecord::ClearVisibilityMapFlags {
509 172 : new_heap_blkno,
510 172 : old_heap_blkno,
511 172 : flags: pg_constants::VISIBILITYMAP_VALID_BITS,
512 172 : },
513 172 : ctx,
514 172 : )
515 0 : .await?;
516 : } else {
517 : // Clear VM bits for one heap page, or for two pages that reside on
518 : // different VM pages.
519 118912 : if let Some(new_vm_blk) = new_vm_blk {
520 14816 : self.put_rel_wal_record(
521 14816 : modification,
522 14816 : vm_rel,
523 14816 : new_vm_blk,
524 14816 : NeonWalRecord::ClearVisibilityMapFlags {
525 14816 : new_heap_blkno,
526 14816 : old_heap_blkno: None,
527 14816 : flags: pg_constants::VISIBILITYMAP_VALID_BITS,
528 14816 : },
529 14816 : ctx,
530 14816 : )
531 0 : .await?;
532 104096 : }
533 118912 : if let Some(old_vm_blk) = old_vm_blk {
534 104096 : self.put_rel_wal_record(
535 104096 : modification,
536 104096 : vm_rel,
537 104096 : old_vm_blk,
538 104096 : NeonWalRecord::ClearVisibilityMapFlags {
539 104096 : new_heap_blkno: None,
540 104096 : old_heap_blkno,
541 104096 : flags: pg_constants::VISIBILITYMAP_VALID_BITS,
542 104096 : },
543 104096 : ctx,
544 104096 : )
545 0 : .await?;
546 14816 : }
547 : }
548 0 : }
549 59742198 : }
550 :
551 59861282 : Ok(())
552 59861282 : }
553 :
554 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
555 12 : async fn ingest_xlog_dbase_create(
556 12 : &mut self,
557 12 : modification: &mut DatadirModification<'_>,
558 12 : rec: &XlCreateDatabase,
559 12 : ctx: &RequestContext,
560 12 : ) -> anyhow::Result<()> {
561 12 : let db_id = rec.db_id;
562 12 : let tablespace_id = rec.tablespace_id;
563 12 : let src_db_id = rec.src_db_id;
564 12 : let src_tablespace_id = rec.src_tablespace_id;
565 12 :
566 12 : // Creating a database is implemented by copying the template (aka. source) database.
567 12 : // To copy all the relations, we need to ask for the state as of the same LSN, but we
568 12 : // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
569 12 : // the last valid LSN to advance up to it. So we use the previous record's LSN in the
570 12 : // get calls instead.
571 12 : let req_lsn = modification.tline.get_last_record_lsn();
572 :
573 12 : let rels = modification
574 12 : .tline
575 12 : .list_rels(src_tablespace_id, src_db_id, req_lsn, ctx)
576 0 : .await?;
577 :
578 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
579 :
580 : // Copy relfilemap
581 12 : let filemap = modification
582 12 : .tline
583 12 : .get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx)
584 0 : .await?;
585 12 : modification
586 12 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
587 0 : .await?;
588 :
589 12 : let mut num_rels_copied = 0;
590 12 : let mut num_blocks_copied = 0;
591 3516 : for src_rel in rels {
592 3504 : assert_eq!(src_rel.spcnode, src_tablespace_id);
593 3504 : assert_eq!(src_rel.dbnode, src_db_id);
594 :
595 3504 : let nblocks = modification
596 3504 : .tline
597 3504 : .get_rel_size(src_rel, req_lsn, true, ctx)
598 148 : .await?;
599 3504 : let dst_rel = RelTag {
600 3504 : spcnode: tablespace_id,
601 3504 : dbnode: db_id,
602 3504 : relnode: src_rel.relnode,
603 3504 : forknum: src_rel.forknum,
604 3504 : };
605 3504 :
606 3504 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
607 :
608 : // Copy content
609 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
610 12120 : for blknum in 0..nblocks {
611 0 : debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
612 :
613 12120 : let content = modification
614 12120 : .tline
615 12120 : .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx)
616 668 : .await?;
617 12120 : modification.put_rel_page_image(dst_rel, blknum, content)?;
618 12120 : num_blocks_copied += 1;
619 : }
620 :
621 3504 : num_rels_copied += 1;
622 : }
623 :
624 12 : info!(
625 12 : "Created database {}/{}, copied {} blocks in {} rels",
626 12 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
627 12 : );
628 12 : Ok(())
629 12 : }
630 :
631 21950 : async fn ingest_xlog_smgr_create(
632 21950 : &mut self,
633 21950 : modification: &mut DatadirModification<'_>,
634 21950 : rec: &XlSmgrCreate,
635 21950 : ctx: &RequestContext,
636 21950 : ) -> anyhow::Result<()> {
637 21950 : let rel = RelTag {
638 21950 : spcnode: rec.rnode.spcnode,
639 21950 : dbnode: rec.rnode.dbnode,
640 21950 : relnode: rec.rnode.relnode,
641 21950 : forknum: rec.forknum,
642 21950 : };
643 21950 : self.put_rel_creation(modification, rel, ctx).await?;
644 21950 : Ok(())
645 21950 : }
646 :
647 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
648 : ///
649 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
650 45 : async fn ingest_xlog_smgr_truncate(
651 45 : &mut self,
652 45 : modification: &mut DatadirModification<'_>,
653 45 : rec: &XlSmgrTruncate,
654 45 : ctx: &RequestContext,
655 45 : ) -> anyhow::Result<()> {
656 45 : let spcnode = rec.rnode.spcnode;
657 45 : let dbnode = rec.rnode.dbnode;
658 45 : let relnode = rec.rnode.relnode;
659 45 :
660 45 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
661 45 : let rel = RelTag {
662 45 : spcnode,
663 45 : dbnode,
664 45 : relnode,
665 45 : forknum: MAIN_FORKNUM,
666 45 : };
667 45 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
668 0 : .await?;
669 0 : }
670 45 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
671 45 : let rel = RelTag {
672 45 : spcnode,
673 45 : dbnode,
674 45 : relnode,
675 45 : forknum: FSM_FORKNUM,
676 45 : };
677 45 :
678 45 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
679 45 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
680 45 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
681 : // Tail of last remaining FSM page has to be zeroed.
682 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
683 21 : modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
684 21 : fsm_physical_page_no += 1;
685 24 : }
686 45 : let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
687 45 : if nblocks > fsm_physical_page_no {
688 : // Only truncate if there is something to do: the FSM is larger than the truncate position.
689 22 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
690 0 : .await?;
691 23 : }
692 0 : }
693 45 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
694 45 : let rel = RelTag {
695 45 : spcnode,
696 45 : dbnode,
697 45 : relnode,
698 45 : forknum: VISIBILITYMAP_FORKNUM,
699 45 : };
700 45 :
701 45 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
702 45 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
703 : // Tail of last remaining vm page has to be zeroed.
704 : // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
705 21 : modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
706 21 : vm_page_no += 1;
707 24 : }
708 45 : let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
709 45 : if nblocks > vm_page_no {
710 : // check if something to do: VM is larger than truncate position
711 22 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
712 0 : .await?;
713 23 : }
714 0 : }
715 45 : Ok(())
716 45 : }
717 :
718 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
719 : ///
720 2265099 : async fn ingest_xact_record(
721 2265099 : &mut self,
722 2265099 : modification: &mut DatadirModification<'_>,
723 2265099 : parsed: &XlXactParsedRecord,
724 2265099 : is_commit: bool,
725 2265099 : ctx: &RequestContext,
726 2265099 : ) -> anyhow::Result<()> {
727 2265099 : // Record update of CLOG pages
728 2265099 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
729 2265099 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
730 2265099 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
731 2265099 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
732 :
733 2365148 : for subxact in &parsed.subxacts {
734 100049 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
735 100049 : if subxact_pageno != pageno {
736 : // This subxact goes to a different page. Write the record
737 : // for all the XIDs on the previous page, and continue
738 : // accumulating XIDs on this new page.
739 : modification.put_slru_wal_record(
740 3 : SlruKind::Clog,
741 3 : segno,
742 3 : rpageno,
743 3 : if is_commit {
744 3 : NeonWalRecord::ClogSetCommitted {
745 3 : xids: page_xids,
746 3 : timestamp: parsed.xact_time,
747 3 : }
748 : } else {
749 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
750 : },
751 0 : )?;
752 3 : page_xids = Vec::new();
753 100046 : }
754 100049 : pageno = subxact_pageno;
755 100049 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
756 100049 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
757 100049 : page_xids.push(*subxact);
758 : }
759 : modification.put_slru_wal_record(
760 2265099 : SlruKind::Clog,
761 2265099 : segno,
762 2265099 : rpageno,
763 2265099 : if is_commit {
764 2263310 : NeonWalRecord::ClogSetCommitted {
765 2263310 : xids: page_xids,
766 2263310 : timestamp: parsed.xact_time,
767 2263310 : }
768 : } else {
769 1789 : NeonWalRecord::ClogSetAborted { xids: page_xids }
770 : },
771 0 : )?;
772 :
773 2281971 : for xnode in &parsed.xnodes {
774 84360 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
775 67488 : let rel = RelTag {
776 67488 : forknum,
777 67488 : spcnode: xnode.spcnode,
778 67488 : dbnode: xnode.dbnode,
779 67488 : relnode: xnode.relnode,
780 67488 : };
781 67488 : let last_lsn = self.timeline.get_last_record_lsn();
782 67488 : if modification
783 67488 : .tline
784 67488 : .get_rel_exists(rel, last_lsn, true, ctx)
785 1491 : .await?
786 : {
787 17780 : self.put_rel_drop(modification, rel, ctx).await?;
788 49708 : }
789 : }
790 : }
791 2265099 : Ok(())
792 2265099 : }
793 :
794 1 : async fn ingest_clog_truncate_record(
795 1 : &mut self,
796 1 : modification: &mut DatadirModification<'_>,
797 1 : xlrec: &XlClogTruncate,
798 1 : ctx: &RequestContext,
799 1 : ) -> anyhow::Result<()> {
800 1 : info!(
801 1 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
802 1 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
803 1 : );
804 :
805 : // Here we treat oldestXid and oldestXidDB
806 : // differently from the postgres redo routines.
807 : // In postgres, checkpoint.oldestXid lags behind xlrec.oldest_xid
808 : // until a checkpoint happens and updates the value.
809 : // Here we can use the most recent value.
810 : // It's just an optimization, though, and can be deleted.
811 : // TODO Figure out if there will be any issues with replicas.
812 1 : self.checkpoint.oldestXid = xlrec.oldest_xid;
813 1 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
814 1 : self.checkpoint_modified = true;
815 1 :
816 1 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
817 1 :
818 1 : let latest_page_number =
819 1 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
820 1 :
821 1 : // Now delete all segments containing pages between xlrec.pageno
822 1 : // and latest_page_number.
823 1 :
824 1 : // First, make an important safety check:
825 1 : // the current endpoint page must not be eligible for removal.
826 1 : // See SimpleLruTruncate() in slru.c
827 1 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
828 0 : info!("could not truncate directory pg_xact apparent wraparound");
829 0 : return Ok(());
830 1 : }
831 1 :
832 1 : // Iterate over SLRU CLOG segments and drop the segments that we're ready to truncate
833 1 : //
834 1 : // We cannot pass 'lsn' to Timeline.list_slru_segments(), or it
835 1 : // will block waiting for the last valid LSN to advance up to
836 1 : // it. So we use the previous record's LSN in the get calls
837 1 : // instead.
838 1 : let req_lsn = modification.tline.get_last_record_lsn();
839 10 : for segno in modification
840 1 : .tline
841 1 : .list_slru_segments(SlruKind::Clog, req_lsn, ctx)
842 0 : .await?
843 : {
844 10 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
845 10 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
846 9 : modification
847 9 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
848 0 : .await?;
849 0 : trace!("Drop CLOG segment {:>04X}", segno);
850 1 : }
851 : }
852 :
853 1 : Ok(())
854 1 : }
855 :
856 24027 : fn ingest_multixact_create_record(
857 24027 : &mut self,
858 24027 : modification: &mut DatadirModification,
859 24027 : xlrec: &XlMultiXactCreate,
860 24027 : ) -> Result<()> {
861 24027 : // Create WAL record for updating the multixact-offsets page
862 24027 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
863 24027 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
864 24027 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
865 24027 :
866 24027 : modification.put_slru_wal_record(
867 24027 : SlruKind::MultiXactOffsets,
868 24027 : segno,
869 24027 : rpageno,
870 24027 : NeonWalRecord::MultixactOffsetCreate {
871 24027 : mid: xlrec.mid,
872 24027 : moff: xlrec.moff,
873 24027 : },
874 24027 : )?;
875 :
876 : // Create WAL records for the update of each affected multixact-members page
877 24027 : let mut members = xlrec.members.iter();
878 24027 : let mut offset = xlrec.moff;
879 : loop {
880 48328 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
881 48328 :
882 48328 : // How many members fit on this page?
883 48328 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
884 48328 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
885 48328 :
886 48328 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
887 48328 : for _ in 0..page_remain {
888 520988 : if let Some(m) = members.next() {
889 472948 : this_page_members.push(m.clone());
890 472948 : } else {
891 48040 : break;
892 : }
893 : }
894 48328 : if this_page_members.is_empty() {
895 : // all done
896 24027 : break;
897 24301 : }
898 24301 : let n_this_page = this_page_members.len();
899 24301 :
900 24301 : modification.put_slru_wal_record(
901 24301 : SlruKind::MultiXactMembers,
902 24301 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
903 24301 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
904 24301 : NeonWalRecord::MultixactMembersCreate {
905 24301 : moff: offset,
906 24301 : members: this_page_members,
907 24301 : },
908 24301 : )?;
909 :
910 : // Note: The multixact members can wrap around, even within one WAL record.
911 24301 : offset = offset.wrapping_add(n_this_page as u32);
912 : }
913 24027 : if xlrec.mid >= self.checkpoint.nextMulti {
914 24027 : self.checkpoint.nextMulti = xlrec.mid + 1;
915 24027 : self.checkpoint_modified = true;
916 24027 : }
917 24027 : if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
918 24027 : self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
919 24027 : self.checkpoint_modified = true;
920 24027 : }
921 472948 : let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
922 472948 : if mbr.xid.wrapping_sub(acc) as i32 > 0 {
923 472903 : mbr.xid
924 : } else {
925 45 : acc
926 : }
927 472948 : });
928 24027 :
929 24027 : if self.checkpoint.update_next_xid(max_mbr_xid) {
930 0 : self.checkpoint_modified = true;
931 24027 : }
932 24027 : Ok(())
933 24027 : }
934 :
935 0 : async fn ingest_multixact_truncate_record(
936 0 : &mut self,
937 0 : modification: &mut DatadirModification<'_>,
938 0 : xlrec: &XlMultiXactTruncate,
939 0 : ctx: &RequestContext,
940 0 : ) -> Result<()> {
941 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
942 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
943 0 : self.checkpoint_modified = true;
944 0 :
945 0 : // PerformMembersTruncation
946 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
947 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
948 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
949 0 : let mut segment: i32 = startsegment;
950 :
951 : // Delete all the segments except the last one. The last segment can still
952 : // contain, possibly partially, valid data.
953 0 : while segment != endsegment {
954 0 : modification
955 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
956 0 : .await?;
957 :
958 : /* move to next segment, handling wraparound correctly */
959 0 : if segment == maxsegment {
960 0 : segment = 0;
961 0 : } else {
962 0 : segment += 1;
963 0 : }
964 : }
965 :
966 : // Truncate offsets
967 : // FIXME: this did not handle wraparound correctly
968 :
969 0 : Ok(())
970 0 : }
971 :
972 45 : async fn ingest_relmap_page(
973 45 : &mut self,
974 45 : modification: &mut DatadirModification<'_>,
975 45 : xlrec: &XlRelmapUpdate,
976 45 : decoded: &DecodedWALRecord,
977 45 : ctx: &RequestContext,
978 45 : ) -> Result<()> {
979 45 : let mut buf = decoded.record.clone();
980 45 : buf.advance(decoded.main_data_offset);
981 45 : // skip xl_relmap_update
982 45 : buf.advance(12);
983 45 :
984 45 : modification
985 45 : .put_relmap_file(
986 45 : xlrec.tsid,
987 45 : xlrec.dbid,
988 45 : Bytes::copy_from_slice(&buf[..]),
989 45 : ctx,
990 45 : )
991 1 : .await
992 45 : }
993 :
994 21951 : async fn put_rel_creation(
995 21951 : &mut self,
996 21951 : modification: &mut DatadirModification<'_>,
997 21951 : rel: RelTag,
998 21951 : ctx: &RequestContext,
999 21951 : ) -> Result<()> {
1000 21951 : modification.put_rel_creation(rel, 0, ctx).await?;
1001 21951 : Ok(())
1002 21951 : }
1003 :
1004 280030 : async fn put_rel_page_image(
1005 280030 : &mut self,
1006 280030 : modification: &mut DatadirModification<'_>,
1007 280030 : rel: RelTag,
1008 280030 : blknum: BlockNumber,
1009 280030 : img: Bytes,
1010 280030 : ctx: &RequestContext,
1011 280030 : ) -> Result<(), PageReconstructError> {
1012 280030 : self.handle_rel_extend(modification, rel, blknum, ctx)
1013 4005 : .await?;
1014 280030 : modification.put_rel_page_image(rel, blknum, img)?;
1015 280030 : Ok(())
1016 280030 : }
1017 :
1018 75311752 : async fn put_rel_wal_record(
1019 75311752 : &mut self,
1020 75311752 : modification: &mut DatadirModification<'_>,
1021 75311752 : rel: RelTag,
1022 75311752 : blknum: BlockNumber,
1023 75311752 : rec: NeonWalRecord,
1024 75311752 : ctx: &RequestContext,
1025 75311752 : ) -> Result<()> {
1026 75311910 : self.handle_rel_extend(modification, rel, blknum, ctx)
1027 12086 : .await?;
1028 75311909 : modification.put_rel_wal_record(rel, blknum, rec)?;
1029 75311909 : Ok(())
1030 75311909 : }
1031 :
1032 3095 : async fn put_rel_truncation(
1033 3095 : &mut self,
1034 3095 : modification: &mut DatadirModification<'_>,
1035 3095 : rel: RelTag,
1036 3095 : nblocks: BlockNumber,
1037 3095 : ctx: &RequestContext,
1038 3095 : ) -> anyhow::Result<()> {
1039 3095 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1040 3095 : Ok(())
1041 3095 : }
1042 :
1043 17781 : async fn put_rel_drop(
1044 17781 : &mut self,
1045 17781 : modification: &mut DatadirModification<'_>,
1046 17781 : rel: RelTag,
1047 17781 : ctx: &RequestContext,
1048 17781 : ) -> Result<()> {
1049 17781 : modification.put_rel_drop(rel, ctx).await?;
1050 17781 : Ok(())
1051 17781 : }
1052 :
1053 119174 : async fn get_relsize(
1054 119174 : &mut self,
1055 119174 : rel: RelTag,
1056 119174 : lsn: Lsn,
1057 119174 : ctx: &RequestContext,
1058 119174 : ) -> anyhow::Result<BlockNumber> {
1059 119174 : let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? {
1060 4 : 0
1061 : } else {
1062 119170 : self.timeline.get_rel_size(rel, lsn, true, ctx).await?
1063 : };
1064 119174 : Ok(nblocks)
1065 119174 : }
1066 :
1067 75591782 : async fn handle_rel_extend(
1068 75591782 : &mut self,
1069 75591782 : modification: &mut DatadirModification<'_>,
1070 75591782 : rel: RelTag,
1071 75591782 : blknum: BlockNumber,
1072 75591782 : ctx: &RequestContext,
1073 75591939 : ) -> Result<(), PageReconstructError> {
1074 75591939 : let new_nblocks = blknum + 1;
1075 75591939 : // Check if the relation exists. We implicitly create relations on the
1076 75591939 : // first record that touches them.
1077 75591939 : // TODO: it would be nice to be more explicit about it
1078 75591939 : let last_lsn = modification.lsn;
1079 75591939 : let old_nblocks = if !self
1080 75591939 : .timeline
1081 75591939 : .get_rel_exists(rel, last_lsn, true, ctx)
1082 24 : .await?
1083 : {
1084 : // create it with 0 size initially, the logic below will extend it
1085 2270 : modification
1086 2270 : .put_rel_creation(rel, 0, ctx)
1087 205 : .await
1088 2270 : .context("Relation Error")?;
1089 2270 : 0
1090 : } else {
1091 75589669 : self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
1092 : };
1093 :
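        // If the record touches a block past the current end of the relation, extend
        // the relation and back-fill the gap with zero pages. For example (hypothetical
        // numbers): if old_nblocks is 10 and blknum is 14, the relation is extended to
        // 15 blocks and blocks 10..=13 are written as ZERO_PAGE.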
1094 75591939 : if new_nblocks > old_nblocks {
1095 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1096 1092069 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1097 :
1098 : // fill the gap with zeros
1099 1092069 : for gap_blknum in old_nblocks..blknum {
1100 227386 : modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
1101 : }
1102 74499870 : }
1103 75591939 : Ok(())
1104 75591939 : }
1105 :
1106 674 : async fn put_slru_page_image(
1107 674 : &mut self,
1108 674 : modification: &mut DatadirModification<'_>,
1109 674 : kind: SlruKind,
1110 674 : segno: u32,
1111 674 : blknum: BlockNumber,
1112 674 : img: Bytes,
1113 674 : ctx: &RequestContext,
1114 674 : ) -> Result<()> {
1115 674 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1116 25 : .await?;
1117 674 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1118 674 : Ok(())
1119 674 : }
1120 :
1121 674 : async fn handle_slru_extend(
1122 674 : &mut self,
1123 674 : modification: &mut DatadirModification<'_>,
1124 674 : kind: SlruKind,
1125 674 : segno: u32,
1126 674 : blknum: BlockNumber,
1127 674 : ctx: &RequestContext,
1128 674 : ) -> anyhow::Result<()> {
1129 674 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1130 674 : // extended with ZEROPAGE records, not with commit records, so it happens
1131 674 : // a lot less frequently.
1132 674 :
1133 674 : let new_nblocks = blknum + 1;
1134 674 : // Check if the segment exists. We implicitly create SLRU segments on the
1135 674 : // first record that touches them.
1136 674 : // TODO: it would be nice to be more explicit about it
1137 674 : let last_lsn = self.timeline.get_last_record_lsn();
1138 674 : let old_nblocks = if !self
1139 674 : .timeline
1140 674 : .get_slru_segment_exists(kind, segno, last_lsn, ctx)
1141 16 : .await?
1142 : {
1143 : // create it with 0 size initially, the logic below will extend it
1144 18 : modification
1145 18 : .put_slru_segment_creation(kind, segno, 0, ctx)
1146 0 : .await?;
1147 18 : 0
1148 : } else {
1149 656 : self.timeline
1150 656 : .get_slru_segment_size(kind, segno, last_lsn, ctx)
1151 9 : .await?
1152 : };
1153 :
1154 674 : if new_nblocks > old_nblocks {
1155 0 : trace!(
1156 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1157 0 : kind,
1158 0 : segno,
1159 0 : old_nblocks,
1160 0 : new_nblocks
1161 0 : );
1162 668 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1163 :
1164 : // fill the gap with zeros
1165 668 : for gap_blknum in old_nblocks..blknum {
1166 0 : modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
1167 : }
1168 6 : }
1169 674 : Ok(())
1170 674 : }
1171 : }
1172 :
1173 : #[allow(clippy::bool_assert_comparison)]
1174 : #[cfg(test)]
1175 : mod tests {
1176 : use super::*;
1177 : use crate::tenant::harness::*;
1178 : use crate::tenant::Timeline;
1179 : use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
1180 : use postgres_ffi::RELSEG_SIZE;
1181 :
1182 : use crate::DEFAULT_PG_VERSION;
1183 :
1184 : /// Arbitrary relation tag, for testing.
1185 : const TESTREL_A: RelTag = RelTag {
1186 : spcnode: 0,
1187 : dbnode: 111,
1188 : relnode: 1000,
1189 : forknum: 0,
1190 : };
1191 :
1192 6 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1193 6 : // TODO
1194 6 : }
1195 :
1196 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1197 :
1198 4 : async fn init_walingest_test<'a>(
1199 4 : tline: &'a Timeline,
1200 4 : ctx: &RequestContext,
1201 4 : ) -> Result<WalIngest<'a>> {
1202 4 : let mut m = tline.begin_modification(Lsn(0x10));
1203 4 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1204 4 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1205 4 : m.commit().await?;
1206 4 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1207 :
1208 4 : Ok(walingest)
1209 4 : }
1210 :
1211 1 : #[tokio::test]
1212 1 : async fn test_relsize() -> Result<()> {
1213 1 : let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
1214 1 : let tline = tenant
1215 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1216 2 : .await?;
1217 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1218 :
1219 1 : let mut m = tline.begin_modification(Lsn(0x20));
1220 1 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1221 1 : walingest
1222 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1223 0 : .await?;
1224 1 : m.commit().await?;
1225 1 : let mut m = tline.begin_modification(Lsn(0x30));
1226 1 : walingest
1227 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
1228 0 : .await?;
1229 1 : m.commit().await?;
1230 1 : let mut m = tline.begin_modification(Lsn(0x40));
1231 1 : walingest
1232 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
1233 0 : .await?;
1234 1 : m.commit().await?;
1235 1 : let mut m = tline.begin_modification(Lsn(0x50));
1236 1 : walingest
1237 1 : .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
1238 0 : .await?;
1239 1 : m.commit().await?;
1240 :
1241 1 : assert_current_logical_size(&tline, Lsn(0x50));
1242 :
1243 : // The relation was created at LSN 2, not visible at LSN 1 yet.
1244 1 : assert_eq!(
1245 1 : tline
1246 1 : .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
1247 0 : .await?,
1248 : false
1249 : );
1250 1 : assert!(tline
1251 1 : .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
1252 0 : .await
1253 1 : .is_err());
1254 1 : assert_eq!(
1255 1 : tline
1256 1 : .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
1257 0 : .await?,
1258 : true
1259 : );
1260 1 : assert_eq!(
1261 1 : tline
1262 1 : .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
1263 0 : .await?,
1264 : 1
1265 : );
1266 1 : assert_eq!(
1267 1 : tline
1268 1 : .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
1269 0 : .await?,
1270 : 3
1271 : );
1272 :
1273 : // Check page contents at each LSN
1274 1 : assert_eq!(
1275 1 : tline
1276 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx)
1277 0 : .await?,
1278 1 : TEST_IMG("foo blk 0 at 2")
1279 : );
1280 :
1281 1 : assert_eq!(
1282 1 : tline
1283 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx)
1284 0 : .await?,
1285 1 : TEST_IMG("foo blk 0 at 3")
1286 : );
1287 :
1288 1 : assert_eq!(
1289 1 : tline
1290 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx)
1291 0 : .await?,
1292 1 : TEST_IMG("foo blk 0 at 3")
1293 : );
1294 1 : assert_eq!(
1295 1 : tline
1296 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx)
1297 0 : .await?,
1298 1 : TEST_IMG("foo blk 1 at 4")
1299 : );
1300 :
1301 1 : assert_eq!(
1302 1 : tline
1303 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx)
1304 0 : .await?,
1305 1 : TEST_IMG("foo blk 0 at 3")
1306 : );
1307 1 : assert_eq!(
1308 1 : tline
1309 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx)
1310 0 : .await?,
1311 1 : TEST_IMG("foo blk 1 at 4")
1312 : );
1313 1 : assert_eq!(
1314 1 : tline
1315 1 : .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
1316 0 : .await?,
1317 1 : TEST_IMG("foo blk 2 at 5")
1318 : );
1319 :
1320 : // Truncate last block
1321 1 : let mut m = tline.begin_modification(Lsn(0x60));
1322 1 : walingest
1323 1 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1324 0 : .await?;
1325 1 : m.commit().await?;
1326 1 : assert_current_logical_size(&tline, Lsn(0x60));
1327 :
1328 : // Check reported size and contents after truncation
1329 1 : assert_eq!(
1330 1 : tline
1331 1 : .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
1332 0 : .await?,
1333 : 2
1334 : );
1335 1 : assert_eq!(
1336 1 : tline
1337 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx)
1338 0 : .await?,
1339 1 : TEST_IMG("foo blk 0 at 3")
1340 : );
1341 1 : assert_eq!(
1342 1 : tline
1343 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx)
1344 0 : .await?,
1345 1 : TEST_IMG("foo blk 1 at 4")
1346 : );
1347 :
1348 : // should still see the truncated block with older LSN
1349 1 : assert_eq!(
1350 1 : tline
1351 1 : .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
1352 0 : .await?,
1353 : 3
1354 : );
1355 1 : assert_eq!(
1356 1 : tline
1357 1 : .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
1358 0 : .await?,
1359 1 : TEST_IMG("foo blk 2 at 5")
1360 : );
1361 :
1362 : // Truncate to zero length
1363 1 : let mut m = tline.begin_modification(Lsn(0x68));
1364 1 : walingest
1365 1 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1366 0 : .await?;
1367 1 : m.commit().await?;
1368 1 : assert_eq!(
1369 1 : tline
1370 1 : .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
1371 0 : .await?,
1372 : 0
1373 : );
1374 :
1375 : // Extend from 0 to 2 blocks, leaving a gap
1376 1 : let mut m = tline.begin_modification(Lsn(0x70));
1377 1 : walingest
1378 1 : .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
1379 0 : .await?;
1380 1 : m.commit().await?;
1381 1 : assert_eq!(
1382 1 : tline
1383 1 : .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
1384 0 : .await?,
1385 : 2
1386 : );
1387 1 : assert_eq!(
1388 1 : tline
1389 1 : .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx)
1390 0 : .await?,
1391 1 : ZERO_PAGE
1392 : );
1393 1 : assert_eq!(
1394 1 : tline
1395 1 : .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx)
1396 0 : .await?,
1397 1 : TEST_IMG("foo blk 1")
1398 : );
1399 :
1400 : // Extend a lot more, leaving a big gap that spans across segments
1401 1 : let mut m = tline.begin_modification(Lsn(0x80));
1402 1 : walingest
1403 1 : .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
1404 0 : .await?;
1405 24 : m.commit().await?;
1406 1 : assert_eq!(
1407 1 : tline
1408 1 : .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
1409 0 : .await?,
1410 : 1501
1411 : );
1412 1499 : for blk in 2..1500 {
1413 1498 : assert_eq!(
1414 1498 : tline
1415 1498 : .get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx)
1416 47 : .await?,
1417 1498 : ZERO_PAGE
1418 : );
1419 : }
1420 1 : assert_eq!(
1421 1 : tline
1422 1 : .get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx)
1423 0 : .await?,
1424 1 : TEST_IMG("foo blk 1500")
1425 : );
1426 :
1427 1 : Ok(())
1428 : }
1429 :
1430 : // Test what happens if we drop a relation
1431 : // and then create it again within the same layer.
1432 1 : #[tokio::test]
1433 1 : async fn test_drop_extend() -> Result<()> {
1434 1 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
1435 1 : let tline = tenant
1436 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1437 2 : .await?;
1438 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1439 :
1440 1 : let mut m = tline.begin_modification(Lsn(0x20));
1441 1 : walingest
1442 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
1443 0 : .await?;
1444 1 : m.commit().await?;
1445 :
1446 : // Check that rel exists and size is correct
1447 1 : assert_eq!(
1448 1 : tline
1449 1 : .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
1450 0 : .await?,
1451 : true
1452 : );
1453 1 : assert_eq!(
1454 1 : tline
1455 1 : .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
1456 0 : .await?,
1457 : 1
1458 : );
1459 :
1460 : // Drop rel
1461 1 : let mut m = tline.begin_modification(Lsn(0x30));
1462 1 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
1463 1 : m.commit().await?;
1464 :
1465 : // Check that rel is not visible anymore
1466 1 : assert_eq!(
1467 1 : tline
1468 1 : .get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx)
1469 0 : .await?,
1470 : false
1471 : );
1472 :
1473 : // FIXME: should fail
1474 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
1475 :
1476 : // Re-create it
1477 1 : let mut m = tline.begin_modification(Lsn(0x40));
1478 1 : walingest
1479 1 : .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
1480 0 : .await?;
1481 1 : m.commit().await?;
1482 :
1483 : // Check that rel exists and size is correct
1484 1 : assert_eq!(
1485 1 : tline
1486 1 : .get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx)
1487 0 : .await?,
1488 : true
1489 : );
1490 1 : assert_eq!(
1491 1 : tline
1492 1 : .get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx)
1493 0 : .await?,
1494 : 1
1495 : );
1496 :
1497 1 : Ok(())
1498 : }
1499 :
1500 : // Test what happens if we truncate a relation
1501 : // so that one of its segments is dropped
1502 : // and then extend it again within the same layer.
1503 1 : #[tokio::test]
1504 1 : async fn test_truncate_extend() -> Result<()> {
1505 1 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
1506 1 : let tline = tenant
1507 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1508 2 : .await?;
1509 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1510 :
1511 : // Create a 20 MB relation (the size is arbitrary)
1512 1 : let relsize = 20 * 1024 * 1024 / 8192;
1513 1 : let mut m = tline.begin_modification(Lsn(0x20));
1514 2560 : for blkno in 0..relsize {
1515 2560 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
1516 2560 : walingest
1517 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
1518 0 : .await?;
1519 : }
1520 40 : m.commit().await?;
1521 :
1522 : // The relation was created at LSN 20, not visible at LSN 1 yet.
1523 1 : assert_eq!(
1524 1 : tline
1525 1 : .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
1526 0 : .await?,
1527 : false
1528 : );
1529 1 : assert!(tline
1530 1 : .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
1531 0 : .await
1532 1 : .is_err());
1533 :
1534 1 : assert_eq!(
1535 1 : tline
1536 1 : .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
1537 0 : .await?,
1538 : true
1539 : );
1540 1 : assert_eq!(
1541 1 : tline
1542 1 : .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
1543 0 : .await?,
1544 : relsize
1545 : );
1546 :
1547 : // Check relation content
1548 2560 : for blkno in 0..relsize {
1549 2560 : let lsn = Lsn(0x20);
1550 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1551 2560 : assert_eq!(
1552 2560 : tline
1553 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx)
1554 60 : .await?,
1555 2560 : TEST_IMG(&data)
1556 : );
1557 : }
1558 :
1559 : // Truncate relation so that second segment was dropped
1560 : // - only leave one page
1561 1 : let mut m = tline.begin_modification(Lsn(0x60));
1562 1 : walingest
1563 1 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
1564 0 : .await?;
1565 1 : m.commit().await?;
1566 :
1567 : // Check reported size and contents after truncation
1568 1 : assert_eq!(
1569 1 : tline
1570 1 : .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
1571 0 : .await?,
1572 : 1
1573 : );
1574 :
1575 2 : for blkno in 0..1 {
1576 1 : let lsn = Lsn(0x20);
1577 1 : let data = format!("foo blk {} at {}", blkno, lsn);
1578 1 : assert_eq!(
1579 1 : tline
1580 1 : .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx)
1581 0 : .await?,
1582 1 : TEST_IMG(&data)
1583 : );
1584 : }
1585 :
1586 : // should still see all blocks with older LSN
1587 1 : assert_eq!(
1588 1 : tline
1589 1 : .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
1590 0 : .await?,
1591 : relsize
1592 : );
1593 2560 : for blkno in 0..relsize {
1594 2560 : let lsn = Lsn(0x20);
1595 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1596 2560 : assert_eq!(
1597 2560 : tline
1598 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx)
1599 122 : .await?,
1600 2560 : TEST_IMG(&data)
1601 : );
1602 : }
1603 :
1604 : // Extend relation again.
1605 : // Add enough blocks to create second segment
1606 1 : let lsn = Lsn(0x80);
1607 1 : let mut m = tline.begin_modification(lsn);
1608 2560 : for blkno in 0..relsize {
1609 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1610 2560 : walingest
1611 2560 : .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
1612 0 : .await?;
1613 : }
1614 40 : m.commit().await?;
1615 :
1616 1 : assert_eq!(
1617 1 : tline
1618 1 : .get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx)
1619 0 : .await?,
1620 : true
1621 : );
1622 1 : assert_eq!(
1623 1 : tline
1624 1 : .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
1625 0 : .await?,
1626 : relsize
1627 : );
1628 : // Check relation content
1629 2560 : for blkno in 0..relsize {
1630 2560 : let lsn = Lsn(0x80);
1631 2560 : let data = format!("foo blk {} at {}", blkno, lsn);
1632 2560 : assert_eq!(
1633 2560 : tline
1634 2560 : .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx)
1635 60 : .await?,
1636 2560 : TEST_IMG(&data)
1637 : );
1638 : }
1639 :
1640 1 : Ok(())
1641 : }
1642 :
1643 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
1644 : /// split into multiple 1 GB segments in Postgres.
1645 1 : #[tokio::test]
1646 1 : async fn test_large_rel() -> Result<()> {
1647 1 : let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
1648 1 : let tline = tenant
1649 1 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1650 2 : .await?;
1651 1 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1652 :
1653 1 : let mut lsn = 0x10;
1654 131073 : for blknum in 0..RELSEG_SIZE + 1 {
1655 131073 : lsn += 0x10;
1656 131073 : let mut m = tline.begin_modification(Lsn(lsn));
1657 131073 : let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
1658 131073 : walingest
1659 131073 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
1660 2047 : .await?;
1661 131073 : m.commit().await?;
1662 : }
1663 :
1664 1 : assert_current_logical_size(&tline, Lsn(lsn));
1665 :
1666 1 : assert_eq!(
1667 1 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
1668 1 : RELSEG_SIZE + 1
1669 : );
1670 :
1671 : // Truncate one block
1672 1 : lsn += 0x10;
1673 1 : let mut m = tline.begin_modification(Lsn(lsn));
1674 1 : walingest
1675 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
1676 0 : .await?;
1677 1 : m.commit().await?;
1678 1 : assert_eq!(
1679 1 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
1680 : RELSEG_SIZE
1681 : );
1682 1 : assert_current_logical_size(&tline, Lsn(lsn));
1683 1 :
1684 1 : // Truncate another block
1685 1 : lsn += 0x10;
1686 1 : let mut m = tline.begin_modification(Lsn(lsn));
1687 1 : walingest
1688 1 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
1689 0 : .await?;
1690 1 : m.commit().await?;
1691 1 : assert_eq!(
1692 1 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
1693 1 : RELSEG_SIZE - 1
1694 : );
1695 1 : assert_current_logical_size(&tline, Lsn(lsn));
1696 1 :
1697 1 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
1698 1 : // This tests the behavior at segment boundaries
1699 1 : let mut size: i32 = 3000;
1700 3002 : while size >= 0 {
1701 3001 : lsn += 0x10;
1702 3001 : let mut m = tline.begin_modification(Lsn(lsn));
1703 3001 : walingest
1704 3001 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
1705 47 : .await?;
1706 3001 : m.commit().await?;
1707 3001 : assert_eq!(
1708 3001 : tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
1709 3001 : size as BlockNumber
1710 : );
1711 :
1712 3001 : size -= 1;
1713 : }
1714 1 : assert_current_logical_size(&tline, Lsn(lsn));
1715 1 :
1716 1 : Ok(())
1717 : }
1718 : }
|