Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but stores most of them as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_wal_record or put_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but some records it can handle directly with
22 : //! bespoke Rust code.
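//!
//! A rough sketch of how a caller might drive this module (illustrative only:
//! the `wal_stream` receiver below is an assumption, not the actual
//! walreceiver API):
//!
//! ```ignore
//! // Set up ingest state starting at the LSN we want to resume from.
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut decoded = DecodedWALRecord::default();
//!
//! // For each WAL record received from the safekeepers...
//! while let Some((lsn, recdata)) = wal_stream.next_record().await? {
//!     // ...accumulate the resulting key/value changes in a DatadirModification...
//!     let mut modification = timeline.begin_modification(lsn);
//!     walingest
//!         .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
//!         .await?;
//!     // ...and commit them to the Repository, advancing the last record LSN.
//!     modification.commit(&ctx).await?;
//! }
//! ```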
23 :
24 : use pageserver_api::shard::ShardIdentity;
25 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
26 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
27 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
28 :
29 : use anyhow::{bail, Context, Result};
30 : use bytes::{Buf, Bytes, BytesMut};
31 : use tracing::*;
32 : use utils::failpoint_support;
33 :
34 : use crate::context::RequestContext;
35 : use crate::metrics::WAL_INGEST;
36 : use crate::pgdatadir_mapping::{DatadirModification, Version};
37 : use crate::tenant::PageReconstructError;
38 : use crate::tenant::Timeline;
39 : use crate::walrecord::*;
40 : use crate::ZERO_PAGE;
41 : use pageserver_api::key::rel_block_to_key;
42 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
43 : use postgres_ffi::pg_constants;
44 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
45 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
46 : use postgres_ffi::v14::xlog_utils::*;
47 : use postgres_ffi::v14::CheckPoint;
48 : use postgres_ffi::TransactionId;
49 : use postgres_ffi::BLCKSZ;
50 : use utils::lsn::Lsn;
51 :
52 : pub struct WalIngest {
53 : shard: ShardIdentity,
54 : checkpoint: CheckPoint,
55 : checkpoint_modified: bool,
56 : }
57 :
58 : impl WalIngest {
59 12 : pub async fn new(
60 12 : timeline: &Timeline,
61 12 : startpoint: Lsn,
62 12 : ctx: &RequestContext,
63 12 : ) -> anyhow::Result<WalIngest> {
64 : // Fetch the latest checkpoint into memory, so that we can compare with it
65 : // quickly in `ingest_record` and update it when it changes.
66 12 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
67 12 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
68 12 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
69 :
70 12 : Ok(WalIngest {
71 12 : shard: *timeline.get_shard_identity(),
72 12 : checkpoint,
73 12 : checkpoint_modified: false,
74 12 : })
75 12 : }
76 :
77 : ///
78 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
79 : ///
80 : /// This function updates the `lsn` field of `DatadirModification`.
81 : ///
82 : /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
83 : /// relations/pages that the record affects.
84 : ///
85 : /// This function returns `true` if the record was ingested, and `false` if it was filtered out
86 : ///
87 145852 : pub async fn ingest_record(
88 145852 : &mut self,
89 145852 : recdata: Bytes,
90 145852 : lsn: Lsn,
91 145852 : modification: &mut DatadirModification<'_>,
92 145852 : decoded: &mut DecodedWALRecord,
93 145852 : ctx: &RequestContext,
94 145852 : ) -> anyhow::Result<bool> {
95 145852 : WAL_INGEST.records_received.inc();
96 145852 : let pg_version = modification.tline.pg_version;
97 145852 : let prev_len = modification.len();
98 145852 :
99 145852 : modification.set_lsn(lsn)?;
100 145852 : decode_wal_record(recdata, decoded, pg_version)?;
101 :
102 145852 : let mut buf = decoded.record.clone();
103 145852 : buf.advance(decoded.main_data_offset);
104 145852 :
105 145852 : assert!(!self.checkpoint_modified);
106 145852 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
107 145834 : && self.checkpoint.update_next_xid(decoded.xl_xid)
108 2 : {
109 2 : self.checkpoint_modified = true;
110 145850 : }
111 :
112 145852 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
113 :
114 145852 : match decoded.xl_rmid {
115 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
116 : // Heap AM records need some special handling, because they modify VM pages
117 : // without registering them with the standard mechanism.
118 145474 : self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
119 0 : .await?;
120 : }
121 : pg_constants::RM_NEON_ID => {
122 0 : self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
123 0 : .await?;
124 : }
125 : // Handle other special record types
126 : pg_constants::RM_SMGR_ID => {
127 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
128 16 :
129 16 : if info == pg_constants::XLOG_SMGR_CREATE {
130 16 : let create = XlSmgrCreate::decode(&mut buf);
131 16 : self.ingest_xlog_smgr_create(modification, &create, ctx)
132 5 : .await?;
133 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
134 0 : let truncate = XlSmgrTruncate::decode(&mut buf);
135 0 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
136 0 : .await?;
137 0 : }
138 : }
139 : pg_constants::RM_DBASE_ID => {
140 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
141 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
142 :
143 0 : if pg_version == 14 {
144 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
145 0 : let createdb = XlCreateDatabase::decode(&mut buf);
146 0 : debug!("XLOG_DBASE_CREATE v14");
147 :
148 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
149 0 : .await?;
150 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
151 0 : let dropdb = XlDropDatabase::decode(&mut buf);
152 0 : for tablespace_id in dropdb.tablespace_ids {
153 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
154 0 : modification
155 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
156 0 : .await?;
157 : }
158 0 : }
159 0 : } else if pg_version == 15 {
160 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
161 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
162 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
163 : // The XLOG record was renamed between v14 and v15,
164 : // but the record format is the same.
165 : // So we can reuse XlCreateDatabase here.
166 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
167 0 : let createdb = XlCreateDatabase::decode(&mut buf);
168 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
169 0 : .await?;
170 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
171 0 : let dropdb = XlDropDatabase::decode(&mut buf);
172 0 : for tablespace_id in dropdb.tablespace_ids {
173 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
174 0 : modification
175 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
176 0 : .await?;
177 : }
178 0 : }
179 0 : } else if pg_version == 16 {
180 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
181 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
182 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
183 : // The XLOG record was renamed between v14 and v15,
184 : // but the record format is the same.
185 : // So we can reuse XlCreateDatabase here.
186 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
187 0 : let createdb = XlCreateDatabase::decode(&mut buf);
188 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
189 0 : .await?;
190 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
191 0 : let dropdb = XlDropDatabase::decode(&mut buf);
192 0 : for tablespace_id in dropdb.tablespace_ids {
193 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
194 0 : modification
195 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
196 0 : .await?;
197 : }
198 0 : }
199 0 : }
200 : }
201 : pg_constants::RM_TBLSPC_ID => {
202 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
203 : }
204 : pg_constants::RM_CLOG_ID => {
205 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
206 0 :
207 0 : if info == pg_constants::CLOG_ZEROPAGE {
208 0 : let pageno = buf.get_u32_le();
209 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
210 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
211 0 : self.put_slru_page_image(
212 0 : modification,
213 0 : SlruKind::Clog,
214 0 : segno,
215 0 : rpageno,
216 0 : ZERO_PAGE.clone(),
217 0 : ctx,
218 0 : )
219 0 : .await?;
220 : } else {
221 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
222 0 : let xlrec = XlClogTruncate::decode(&mut buf);
223 0 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
224 0 : .await?;
225 : }
226 : }
227 : pg_constants::RM_XACT_ID => {
228 24 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
229 24 :
230 24 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
231 8 : let parsed_xact =
232 8 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
233 8 : self.ingest_xact_record(
234 8 : modification,
235 8 : &parsed_xact,
236 8 : info == pg_constants::XLOG_XACT_COMMIT,
237 8 : decoded.origin_id,
238 8 : ctx,
239 8 : )
240 0 : .await?;
241 16 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
242 16 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
243 : {
244 0 : let parsed_xact =
245 0 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
246 0 : self.ingest_xact_record(
247 0 : modification,
248 0 : &parsed_xact,
249 0 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
250 0 : decoded.origin_id,
251 0 : ctx,
252 0 : )
253 0 : .await?;
254 : // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
255 0 : trace!(
256 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
257 : decoded.xl_xid,
258 : parsed_xact.xid,
259 : lsn,
260 : );
261 0 : modification
262 0 : .drop_twophase_file(parsed_xact.xid, ctx)
263 0 : .await?;
264 16 : } else if info == pg_constants::XLOG_XACT_PREPARE {
265 0 : modification
266 0 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
267 0 : .await?;
268 16 : }
269 : }
270 : pg_constants::RM_MULTIXACT_ID => {
271 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
272 0 :
273 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
274 0 : let pageno = buf.get_u32_le();
275 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
276 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
277 0 : self.put_slru_page_image(
278 0 : modification,
279 0 : SlruKind::MultiXactOffsets,
280 0 : segno,
281 0 : rpageno,
282 0 : ZERO_PAGE.clone(),
283 0 : ctx,
284 0 : )
285 0 : .await?;
286 0 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
287 0 : let pageno = buf.get_u32_le();
288 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
289 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
290 0 : self.put_slru_page_image(
291 0 : modification,
292 0 : SlruKind::MultiXactMembers,
293 0 : segno,
294 0 : rpageno,
295 0 : ZERO_PAGE.clone(),
296 0 : ctx,
297 0 : )
298 0 : .await?;
299 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
300 0 : let xlrec = XlMultiXactCreate::decode(&mut buf);
301 0 : self.ingest_multixact_create_record(modification, &xlrec)?;
302 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
303 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
304 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
305 0 : .await?;
306 0 : }
307 : }
308 : pg_constants::RM_RELMAP_ID => {
309 0 : let xlrec = XlRelmapUpdate::decode(&mut buf);
310 0 : self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
311 0 : .await?;
312 : }
313 : pg_constants::RM_XLOG_ID => {
314 30 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
315 30 :
316 30 : if info == pg_constants::XLOG_NEXTOID {
317 2 : let next_oid = buf.get_u32_le();
318 2 : if self.checkpoint.nextOid != next_oid {
319 2 : self.checkpoint.nextOid = next_oid;
320 2 : self.checkpoint_modified = true;
321 2 : }
322 28 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
323 28 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
324 : {
325 2 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
326 2 : buf.copy_to_slice(&mut checkpoint_bytes);
327 2 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
328 2 : trace!(
329 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
330 : xlog_checkpoint.oldestXid,
331 : self.checkpoint.oldestXid
332 : );
333 2 : if (self
334 2 : .checkpoint
335 2 : .oldestXid
336 2 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
337 2 : < 0
338 0 : {
339 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
340 2 : }
341 2 : trace!(
342 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
343 : xlog_checkpoint.oldestActiveXid,
344 : self.checkpoint.oldestActiveXid
345 : );
346 :
347 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
348 : // because at shutdown, all in-progress transactions will implicitly
349 : // end. Postgres startup code knows that, and allows hot standby to start
350 : // immediately from a shutdown checkpoint.
351 : //
352 : // In Neon, Postgres hot standby startup always behaves as if starting from
353 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
354 : // instead of overwriting self.checkpoint.oldestActiveXid with
355 : // InvalidTransactionId from the checkpoint WAL record, update it to a
356 : // proper value, knowing that there are no in-progress transactions at this
357 : // point, except for prepared transactions.
358 : //
359 : // See also the neon code changes in the InitWalRecovery() function.
360 2 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
361 2 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
362 : {
363 2 : let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
364 2 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
365 0 : if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
366 0 : oldest_active_xid = xid;
367 0 : }
368 : }
369 2 : self.checkpoint.oldestActiveXid = oldest_active_xid;
370 0 : } else {
371 0 : self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
372 0 : }
373 :
374 : // Write a new checkpoint key-value pair on every checkpoint record, even
375 : // if nothing really changed. Not strictly required, but it seems nice to
376 : // have some trace of the checkpoint records in the layer files at the same
377 : // LSNs.
378 2 : self.checkpoint_modified = true;
379 26 : }
380 : }
381 : pg_constants::RM_LOGICALMSG_ID => {
382 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
383 0 :
384 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
385 0 : let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
386 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
387 0 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
388 0 : if prefix == "neon-test" {
389 : // This is a convenient way to make the WAL ingestion pause at
390 : // a particular point in the WAL. For more fine-grained control,
391 : // we could peek into the message and only pause if it contains
392 : // a particular string, for example, but this is enough for now.
393 0 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
394 0 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
395 0 : modification.put_file(path, message, ctx).await?;
396 0 : }
397 0 : }
398 : }
399 : pg_constants::RM_STANDBY_ID => {
400 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
401 16 : if info == pg_constants::XLOG_RUNNING_XACTS {
402 0 : let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
403 0 : self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
404 0 : self.checkpoint_modified = true;
405 16 : }
406 : }
407 : pg_constants::RM_REPLORIGIN_ID => {
408 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
409 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
410 0 : let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
411 0 : modification
412 0 : .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
413 0 : .await?
414 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
415 0 : let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
416 0 : modification.drop_replorigin(xlrec.node_id).await?
417 0 : }
418 : }
419 292 : _x => {
420 292 : // TODO: should probably log & fail here instead of blindly
421 292 : // doing something without understanding the protocol
422 292 : }
423 : }
424 :
425 : // Iterate through all the blocks that the record modifies, and
426 : // "put" a separate copy of the record for each block.
427 145852 : for blk in decoded.blocks.iter() {
428 145642 : let rel = RelTag {
429 145642 : spcnode: blk.rnode_spcnode,
430 145642 : dbnode: blk.rnode_dbnode,
431 145642 : relnode: blk.rnode_relnode,
432 145642 : forknum: blk.forknum,
433 145642 : };
434 145642 :
435 145642 : let key = rel_block_to_key(rel, blk.blkno);
436 145642 : let key_is_local = self.shard.is_key_local(&key);
437 145642 :
438 145642 : tracing::debug!(
439 : lsn=%lsn,
440 : key=%key,
441 0 : "ingest: shard decision {} (checkpoint={})",
442 0 : if !key_is_local { "drop" } else { "keep" },
443 : self.checkpoint_modified
444 : );
445 :
446 145642 : if !key_is_local {
447 0 : if self.shard.is_shard_zero() {
448 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
449 : // its blkno in case it implicitly extends a relation.
450 0 : self.observe_decoded_block(modification, blk, ctx).await?;
451 0 : }
452 :
453 0 : continue;
454 145642 : }
455 145642 : self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
456 43 : .await?;
457 : }
458 :
459 : // If checkpoint data was updated, store the new version in the repository
460 145852 : if self.checkpoint_modified {
461 6 : let new_checkpoint_bytes = self.checkpoint.encode()?;
462 :
463 6 : modification.put_checkpoint(new_checkpoint_bytes)?;
464 6 : self.checkpoint_modified = false;
465 145846 : }
466 :
467 : // Note that at this point this record is only cached in the modification
468 : // until commit() is called to flush the data into the repository and update
469 : // the latest LSN.
470 :
471 145852 : Ok(modification.len() > prev_len)
472 145852 : }
473 :
474 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
475 0 : async fn observe_decoded_block(
476 0 : &mut self,
477 0 : modification: &mut DatadirModification<'_>,
478 0 : blk: &DecodedBkpBlock,
479 0 : ctx: &RequestContext,
480 0 : ) -> Result<(), PageReconstructError> {
481 0 : let rel = RelTag {
482 0 : spcnode: blk.rnode_spcnode,
483 0 : dbnode: blk.rnode_dbnode,
484 0 : relnode: blk.rnode_relnode,
485 0 : forknum: blk.forknum,
486 0 : };
487 0 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
488 0 : .await
489 0 : }
490 :
491 145642 : async fn ingest_decoded_block(
492 145642 : &mut self,
493 145642 : modification: &mut DatadirModification<'_>,
494 145642 : lsn: Lsn,
495 145642 : decoded: &DecodedWALRecord,
496 145642 : blk: &DecodedBkpBlock,
497 145642 : ctx: &RequestContext,
498 145642 : ) -> Result<(), PageReconstructError> {
499 145642 : let rel = RelTag {
500 145642 : spcnode: blk.rnode_spcnode,
501 145642 : dbnode: blk.rnode_dbnode,
502 145642 : relnode: blk.rnode_relnode,
503 145642 : forknum: blk.forknum,
504 145642 : };
505 145642 :
506 145642 : //
507 145642 : // Instead of storing the full-page-image WAL record,
508 145642 : // it is better to store the extracted image: we can skip wal-redo
509 145642 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
510 145642 : // so they would have to be copied multiple times.
511 145642 : //
512 145642 : if blk.apply_image
513 60 : && blk.has_image
514 60 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
515 24 : && (decoded.xl_info == pg_constants::XLOG_FPI
516 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
517 : // compression of WAL is not yet supported: fall back to storing the original WAL record
518 24 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
519 : // do not materialize null pages because they will most likely soon be replaced with real data
520 24 : && blk.bimg_len != 0
521 : {
522 : // Extract page image from FPI record
523 24 : let img_len = blk.bimg_len as usize;
524 24 : let img_offs = blk.bimg_offset as usize;
525 24 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
526 24 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
527 24 :
528 24 : if blk.hole_length != 0 {
529 0 : let tail = image.split_off(blk.hole_offset as usize);
530 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
531 0 : image.unsplit(tail);
532 24 : }
533 : //
534 : // Match the logic of XLogReadBufferForRedoExtended:
535 : // The page may be uninitialized. If so, we can't set the LSN because
536 : // that would corrupt the page.
537 : //
538 24 : if !page_is_new(&image) {
539 18 : page_set_lsn(&mut image, lsn)
540 6 : }
541 24 : assert_eq!(image.len(), BLCKSZ as usize);
542 24 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
543 0 : .await?;
544 : } else {
545 145618 : let rec = NeonWalRecord::Postgres {
546 145618 : will_init: blk.will_init || blk.apply_image,
547 145618 : rec: decoded.record.clone(),
548 145618 : };
549 145618 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
550 43 : .await?;
551 : }
552 145642 : Ok(())
553 145642 : }
554 :
555 145474 : async fn ingest_heapam_record(
556 145474 : &mut self,
557 145474 : buf: &mut Bytes,
558 145474 : modification: &mut DatadirModification<'_>,
559 145474 : decoded: &DecodedWALRecord,
560 145474 : ctx: &RequestContext,
561 145474 : ) -> anyhow::Result<()> {
562 145474 : // Handle VM bit updates that are implicitly part of heap records.
563 145474 :
564 145474 : // First, look at the record to determine which VM bits need
565 145474 : // to be cleared. If either of these variables is set, we
566 145474 : // need to clear the corresponding bits in the visibility map.
567 145474 : let mut new_heap_blkno: Option<u32> = None;
568 145474 : let mut old_heap_blkno: Option<u32> = None;
569 145474 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
570 145474 :
571 145474 : match modification.tline.pg_version {
572 : 14 => {
573 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
574 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
575 0 :
576 0 : if info == pg_constants::XLOG_HEAP_INSERT {
577 0 : let xlrec = v14::XlHeapInsert::decode(buf);
578 0 : assert_eq!(0, buf.remaining());
579 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
580 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
581 0 : }
582 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
583 0 : let xlrec = v14::XlHeapDelete::decode(buf);
584 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
585 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
586 0 : }
587 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
588 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
589 : {
590 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
591 0 : // the size of tuple data is inferred from the size of the record.
592 0 : // we can't validate the remaining number of bytes without parsing
593 0 : // the tuple data.
594 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
595 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
596 0 : }
597 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
598 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
599 0 : // non-HOT update where the new tuple goes to different page than
600 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
601 0 : // set.
602 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
603 0 : }
604 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
605 0 : let xlrec = v14::XlHeapLock::decode(buf);
606 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
607 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
608 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
609 0 : }
610 0 : }
611 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
612 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
613 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
614 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
615 :
616 0 : let offset_array_len =
617 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
618 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
619 0 : 0
620 : } else {
621 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
622 : };
623 0 : assert_eq!(offset_array_len, buf.remaining());
624 :
625 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
626 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
627 0 : }
628 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
629 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
630 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
631 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
632 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
633 0 : }
634 0 : }
635 : } else {
636 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
637 : }
638 : }
639 : 15 => {
640 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
641 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
642 145286 :
643 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
644 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
645 145276 : assert_eq!(0, buf.remaining());
646 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
647 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
648 145272 : }
649 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
650 0 : let xlrec = v15::XlHeapDelete::decode(buf);
651 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
652 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
653 0 : }
654 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
655 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
656 : {
657 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
658 8 : // the size of tuple data is inferred from the size of the record.
659 8 : // we can't validate the remaining number of bytes without parsing
660 8 : // the tuple data.
661 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
662 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
663 8 : }
664 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
665 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
666 0 : // non-HOT update where the new tuple goes to a different page than
667 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
668 0 : // set.
669 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
670 8 : }
671 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
672 0 : let xlrec = v15::XlHeapLock::decode(buf);
673 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
674 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
675 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
676 0 : }
677 2 : }
678 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
679 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
680 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
681 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
682 :
683 42 : let offset_array_len =
684 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
685 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
686 2 : 0
687 : } else {
688 40 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
689 : };
690 42 : assert_eq!(offset_array_len, buf.remaining());
691 :
692 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
693 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
694 34 : }
695 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
696 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
697 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
698 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
699 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
700 0 : }
701 146 : }
702 : } else {
703 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
704 : }
705 : }
706 : 16 => {
707 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
708 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
709 0 :
710 0 : if info == pg_constants::XLOG_HEAP_INSERT {
711 0 : let xlrec = v16::XlHeapInsert::decode(buf);
712 0 : assert_eq!(0, buf.remaining());
713 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
714 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
715 0 : }
716 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
717 0 : let xlrec = v16::XlHeapDelete::decode(buf);
718 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
719 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
720 0 : }
721 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
722 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
723 : {
724 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
725 0 : // the size of tuple data is inferred from the size of the record.
726 0 : // we can't validate the remaining number of bytes without parsing
727 0 : // the tuple data.
728 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
729 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
730 0 : }
731 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
732 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
733 0 : // non-HOT update where the new tuple goes to a different page than
734 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
735 0 : // set.
736 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
737 0 : }
738 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
739 0 : let xlrec = v16::XlHeapLock::decode(buf);
740 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
741 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
742 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
743 0 : }
744 0 : }
745 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
746 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
747 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
748 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
749 :
750 0 : let offset_array_len =
751 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
752 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
753 0 : 0
754 : } else {
755 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
756 : };
757 0 : assert_eq!(offset_array_len, buf.remaining());
758 :
759 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
760 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
761 0 : }
762 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
763 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
764 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
765 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
766 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
767 0 : }
768 0 : }
769 : } else {
770 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
771 : }
772 : }
773 0 : _ => {}
774 : }
775 :
776 : // Clear the VM bits if required.
777 145474 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
778 12 : let vm_rel = RelTag {
779 12 : forknum: VISIBILITYMAP_FORKNUM,
780 12 : spcnode: decoded.blocks[0].rnode_spcnode,
781 12 : dbnode: decoded.blocks[0].rnode_dbnode,
782 12 : relnode: decoded.blocks[0].rnode_relnode,
783 12 : };
784 12 :
785 12 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
786 12 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
787 :
788 : // Sometimes, Postgres seems to create heap WAL records with the
789 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
790 : // not set. In fact, it's possible that the VM page does not exist at all.
791 : // In that case, we don't want to store a record to clear the VM bit;
792 : // replaying it would fail to find the previous image of the page, because
793 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
794 : // record if they don't.
795 12 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
796 12 : if let Some(blknum) = new_vm_blk {
797 12 : if blknum >= vm_size {
798 0 : new_vm_blk = None;
799 12 : }
800 0 : }
801 12 : if let Some(blknum) = old_vm_blk {
802 0 : if blknum >= vm_size {
803 0 : old_vm_blk = None;
804 0 : }
805 12 : }
806 :
807 12 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
808 12 : if new_vm_blk == old_vm_blk {
809 : // An UPDATE record that needs to clear the bits for both old and the
810 : // new page, both of which reside on the same VM page.
811 0 : self.put_rel_wal_record(
812 0 : modification,
813 0 : vm_rel,
814 0 : new_vm_blk.unwrap(),
815 0 : NeonWalRecord::ClearVisibilityMapFlags {
816 0 : new_heap_blkno,
817 0 : old_heap_blkno,
818 0 : flags,
819 0 : },
820 0 : ctx,
821 0 : )
822 0 : .await?;
823 : } else {
824 : // Clear VM bits for one heap page, or for two pages that reside on
825 : // different VM pages.
826 12 : if let Some(new_vm_blk) = new_vm_blk {
827 12 : self.put_rel_wal_record(
828 12 : modification,
829 12 : vm_rel,
830 12 : new_vm_blk,
831 12 : NeonWalRecord::ClearVisibilityMapFlags {
832 12 : new_heap_blkno,
833 12 : old_heap_blkno: None,
834 12 : flags,
835 12 : },
836 12 : ctx,
837 12 : )
838 0 : .await?;
839 0 : }
840 12 : if let Some(old_vm_blk) = old_vm_blk {
841 0 : self.put_rel_wal_record(
842 0 : modification,
843 0 : vm_rel,
844 0 : old_vm_blk,
845 0 : NeonWalRecord::ClearVisibilityMapFlags {
846 0 : new_heap_blkno: None,
847 0 : old_heap_blkno,
848 0 : flags,
849 0 : },
850 0 : ctx,
851 0 : )
852 0 : .await?;
853 12 : }
854 : }
855 0 : }
856 145462 : }
857 :
858 145474 : Ok(())
859 145474 : }
860 :
861 0 : async fn ingest_neonrmgr_record(
862 0 : &mut self,
863 0 : buf: &mut Bytes,
864 0 : modification: &mut DatadirModification<'_>,
865 0 : decoded: &DecodedWALRecord,
866 0 : ctx: &RequestContext,
867 0 : ) -> anyhow::Result<()> {
868 0 : // Handle VM bit updates that are implicitly part of heap records.
869 0 :
870 0 : // First, look at the record to determine which VM bits need
871 0 : // to be cleared. If either of these variables is set, we
872 0 : // need to clear the corresponding bits in the visibility map.
873 0 : let mut new_heap_blkno: Option<u32> = None;
874 0 : let mut old_heap_blkno: Option<u32> = None;
875 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
876 0 : let pg_version = modification.tline.pg_version;
877 0 :
878 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
879 :
880 0 : match pg_version {
881 : 16 => {
882 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
883 0 :
884 0 : match info {
885 : pg_constants::XLOG_NEON_HEAP_INSERT => {
886 0 : let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
887 0 : assert_eq!(0, buf.remaining());
888 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
889 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
890 0 : }
891 : }
892 : pg_constants::XLOG_NEON_HEAP_DELETE => {
893 0 : let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
894 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
895 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
896 0 : }
897 : }
898 : pg_constants::XLOG_NEON_HEAP_UPDATE
899 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
900 0 : let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
901 0 : // the size of tuple data is inferred from the size of the record.
902 0 : // we can't validate the remaining number of bytes without parsing
903 0 : // the tuple data.
904 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
905 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
906 0 : }
907 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
908 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
909 0 : // non-HOT update where the new tuple goes to a different page than
910 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
911 0 : // set.
912 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
913 0 : }
914 : }
915 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
916 0 : let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
917 :
918 0 : let offset_array_len =
919 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
920 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
921 0 : 0
922 : } else {
923 0 : std::mem::size_of::<u16>() * xlrec.ntuples as usize
924 : };
925 0 : assert_eq!(offset_array_len, buf.remaining());
926 :
927 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
928 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
929 0 : }
930 : }
931 : pg_constants::XLOG_NEON_HEAP_LOCK => {
932 0 : let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
933 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
934 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
935 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
936 0 : }
937 : }
938 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
939 : }
940 : }
941 0 : _ => bail!(
942 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
943 0 : pg_version
944 0 : ),
945 : }
946 :
947 : // Clear the VM bits if required.
948 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
949 0 : let vm_rel = RelTag {
950 0 : forknum: VISIBILITYMAP_FORKNUM,
951 0 : spcnode: decoded.blocks[0].rnode_spcnode,
952 0 : dbnode: decoded.blocks[0].rnode_dbnode,
953 0 : relnode: decoded.blocks[0].rnode_relnode,
954 0 : };
955 0 :
956 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
957 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
958 :
959 : // Sometimes, Postgres seems to create heap WAL records with the
960 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
961 : // not set. In fact, it's possible that the VM page does not exist at all.
962 : // In that case, we don't want to store a record to clear the VM bit;
963 : // replaying it would fail to find the previous image of the page, because
964 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
965 : // record if they don't.
966 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
967 0 : if let Some(blknum) = new_vm_blk {
968 0 : if blknum >= vm_size {
969 0 : new_vm_blk = None;
970 0 : }
971 0 : }
972 0 : if let Some(blknum) = old_vm_blk {
973 0 : if blknum >= vm_size {
974 0 : old_vm_blk = None;
975 0 : }
976 0 : }
977 :
978 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
979 0 : if new_vm_blk == old_vm_blk {
980 : // An UPDATE record that needs to clear the bits for both old and the
981 : // new page, both of which reside on the same VM page.
982 0 : self.put_rel_wal_record(
983 0 : modification,
984 0 : vm_rel,
985 0 : new_vm_blk.unwrap(),
986 0 : NeonWalRecord::ClearVisibilityMapFlags {
987 0 : new_heap_blkno,
988 0 : old_heap_blkno,
989 0 : flags,
990 0 : },
991 0 : ctx,
992 0 : )
993 0 : .await?;
994 : } else {
995 : // Clear VM bits for one heap page, or for two pages that reside on
996 : // different VM pages.
997 0 : if let Some(new_vm_blk) = new_vm_blk {
998 0 : self.put_rel_wal_record(
999 0 : modification,
1000 0 : vm_rel,
1001 0 : new_vm_blk,
1002 0 : NeonWalRecord::ClearVisibilityMapFlags {
1003 0 : new_heap_blkno,
1004 0 : old_heap_blkno: None,
1005 0 : flags,
1006 0 : },
1007 0 : ctx,
1008 0 : )
1009 0 : .await?;
1010 0 : }
1011 0 : if let Some(old_vm_blk) = old_vm_blk {
1012 0 : self.put_rel_wal_record(
1013 0 : modification,
1014 0 : vm_rel,
1015 0 : old_vm_blk,
1016 0 : NeonWalRecord::ClearVisibilityMapFlags {
1017 0 : new_heap_blkno: None,
1018 0 : old_heap_blkno,
1019 0 : flags,
1020 0 : },
1021 0 : ctx,
1022 0 : )
1023 0 : .await?;
1024 0 : }
1025 : }
1026 0 : }
1027 0 : }
1028 :
1029 0 : Ok(())
1030 0 : }
1031 :
1032 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
1033 0 : async fn ingest_xlog_dbase_create(
1034 0 : &mut self,
1035 0 : modification: &mut DatadirModification<'_>,
1036 0 : rec: &XlCreateDatabase,
1037 0 : ctx: &RequestContext,
1038 0 : ) -> anyhow::Result<()> {
1039 0 : let db_id = rec.db_id;
1040 0 : let tablespace_id = rec.tablespace_id;
1041 0 : let src_db_id = rec.src_db_id;
1042 0 : let src_tablespace_id = rec.src_tablespace_id;
1043 :
1044 0 : let rels = modification
1045 0 : .tline
1046 0 : .list_rels(
1047 0 : src_tablespace_id,
1048 0 : src_db_id,
1049 0 : Version::Modified(modification),
1050 0 : ctx,
1051 0 : )
1052 0 : .await?;
1053 :
1054 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
1055 :
1056 : // Copy relfilemap
1057 0 : let filemap = modification
1058 0 : .tline
1059 0 : .get_relmap_file(
1060 0 : src_tablespace_id,
1061 0 : src_db_id,
1062 0 : Version::Modified(modification),
1063 0 : ctx,
1064 0 : )
1065 0 : .await?;
1066 0 : modification
1067 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1068 0 : .await?;
1069 :
1070 0 : let mut num_rels_copied = 0;
1071 0 : let mut num_blocks_copied = 0;
1072 0 : for src_rel in rels {
1073 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1074 0 : assert_eq!(src_rel.dbnode, src_db_id);
1075 :
1076 0 : let nblocks = modification
1077 0 : .tline
1078 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
1079 0 : .await?;
1080 0 : let dst_rel = RelTag {
1081 0 : spcnode: tablespace_id,
1082 0 : dbnode: db_id,
1083 0 : relnode: src_rel.relnode,
1084 0 : forknum: src_rel.forknum,
1085 0 : };
1086 0 :
1087 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1088 :
1089 : // Copy content
1090 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1091 0 : for blknum in 0..nblocks {
1092 : // Sharding:
1093 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1094 : // dbNode is not included in the hash inputs for sharding.
1095 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1096 : // that belong to it.
1097 0 : let src_key = rel_block_to_key(src_rel, blknum);
1098 0 : if !self.shard.is_key_local(&src_key) {
1099 0 : debug!(
1100 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1101 : src_key
1102 : );
1103 0 : continue;
1104 0 : }
1105 0 : debug!(
1106 0 : "copying block {} from {} ({}) to {}",
1107 : blknum, src_rel, src_key, dst_rel
1108 : );
1109 :
1110 0 : let content = modification
1111 0 : .tline
1112 0 : .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
1113 0 : .await?;
1114 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1115 0 : num_blocks_copied += 1;
1116 : }
1117 :
1118 0 : num_rels_copied += 1;
1119 : }
1120 :
1121 0 : info!(
1122 0 : "Created database {}/{}, copied {} blocks in {} rels",
1123 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1124 : );
1125 0 : Ok(())
1126 0 : }
1127 :
1128 16 : async fn ingest_xlog_smgr_create(
1129 16 : &mut self,
1130 16 : modification: &mut DatadirModification<'_>,
1131 16 : rec: &XlSmgrCreate,
1132 16 : ctx: &RequestContext,
1133 16 : ) -> anyhow::Result<()> {
1134 16 : let rel = RelTag {
1135 16 : spcnode: rec.rnode.spcnode,
1136 16 : dbnode: rec.rnode.dbnode,
1137 16 : relnode: rec.rnode.relnode,
1138 16 : forknum: rec.forknum,
1139 16 : };
1140 16 : self.put_rel_creation(modification, rel, ctx).await?;
1141 16 : Ok(())
1142 16 : }
1143 :
1144 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1145 : ///
1146 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1147 0 : async fn ingest_xlog_smgr_truncate(
1148 0 : &mut self,
1149 0 : modification: &mut DatadirModification<'_>,
1150 0 : rec: &XlSmgrTruncate,
1151 0 : ctx: &RequestContext,
1152 0 : ) -> anyhow::Result<()> {
1153 0 : let spcnode = rec.rnode.spcnode;
1154 0 : let dbnode = rec.rnode.dbnode;
1155 0 : let relnode = rec.rnode.relnode;
1156 0 :
1157 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1158 0 : let rel = RelTag {
1159 0 : spcnode,
1160 0 : dbnode,
1161 0 : relnode,
1162 0 : forknum: MAIN_FORKNUM,
1163 0 : };
1164 0 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1165 0 : .await?;
1166 0 : }
1167 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1168 0 : let rel = RelTag {
1169 0 : spcnode,
1170 0 : dbnode,
1171 0 : relnode,
1172 0 : forknum: FSM_FORKNUM,
1173 0 : };
1174 0 :
1175 0 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1176 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1177 0 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1178 : // Tail of last remaining FSM page has to be zeroed.
1179 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
1180 0 : modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
1181 0 : fsm_physical_page_no += 1;
1182 0 : }
1183 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1184 0 : if nblocks > fsm_physical_page_no {
1185 : // check if something to do: FSM is larger than truncate position
1186 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1187 0 : .await?;
1188 0 : }
1189 0 : }
1190 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1191 0 : let rel = RelTag {
1192 0 : spcnode,
1193 0 : dbnode,
1194 0 : relnode,
1195 0 : forknum: VISIBILITYMAP_FORKNUM,
1196 0 : };
1197 0 :
1198 0 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1199 0 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1200 : // The tail of the last remaining VM page has to be zeroed.
1201 : // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
1202 0 : modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
1203 0 : vm_page_no += 1;
1204 0 : }
1205 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1206 0 : if nblocks > vm_page_no {
1207 : // check if something to do: VM is larger than truncate position
1208 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1209 0 : .await?;
1210 0 : }
1211 0 : }
1212 0 : Ok(())
1213 0 : }
1214 :
1215 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
1216 : ///
1217 8 : async fn ingest_xact_record(
1218 8 : &mut self,
1219 8 : modification: &mut DatadirModification<'_>,
1220 8 : parsed: &XlXactParsedRecord,
1221 8 : is_commit: bool,
1222 8 : origin_id: u16,
1223 8 : ctx: &RequestContext,
1224 8 : ) -> anyhow::Result<()> {
1225 8 : // Record update of CLOG pages
1226 8 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1227 8 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1228 8 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1229 8 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1230 :
1231 8 : for subxact in &parsed.subxacts {
1232 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1233 0 : if subxact_pageno != pageno {
1234 : // This subxact goes to a different page. Write the record
1235 : // for all the XIDs on the previous page, and continue
1236 : // accumulating XIDs on this new page.
1237 0 : modification.put_slru_wal_record(
1238 0 : SlruKind::Clog,
1239 0 : segno,
1240 0 : rpageno,
1241 0 : if is_commit {
1242 0 : NeonWalRecord::ClogSetCommitted {
1243 0 : xids: page_xids,
1244 0 : timestamp: parsed.xact_time,
1245 0 : }
1246 : } else {
1247 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1248 : },
1249 0 : )?;
1250 0 : page_xids = Vec::new();
1251 0 : }
1252 0 : pageno = subxact_pageno;
1253 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1254 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1255 0 : page_xids.push(*subxact);
1256 : }
1257 8 : modification.put_slru_wal_record(
1258 8 : SlruKind::Clog,
1259 8 : segno,
1260 8 : rpageno,
1261 8 : if is_commit {
1262 8 : NeonWalRecord::ClogSetCommitted {
1263 8 : xids: page_xids,
1264 8 : timestamp: parsed.xact_time,
1265 8 : }
1266 : } else {
1267 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1268 : },
1269 0 : )?;
1270 :
1271 8 : for xnode in &parsed.xnodes {
1272 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1273 0 : let rel = RelTag {
1274 0 : forknum,
1275 0 : spcnode: xnode.spcnode,
1276 0 : dbnode: xnode.dbnode,
1277 0 : relnode: xnode.relnode,
1278 0 : };
1279 0 : if modification
1280 0 : .tline
1281 0 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1282 0 : .await?
1283 : {
1284 0 : self.put_rel_drop(modification, rel, ctx).await?;
1285 0 : }
1286 : }
1287 : }
1288 8 : if origin_id != 0 {
1289 0 : modification
1290 0 : .set_replorigin(origin_id, parsed.origin_lsn)
1291 0 : .await?;
1292 8 : }
1293 8 : Ok(())
1294 8 : }
1295 :
1296 0 : async fn ingest_clog_truncate_record(
1297 0 : &mut self,
1298 0 : modification: &mut DatadirModification<'_>,
1299 0 : xlrec: &XlClogTruncate,
1300 0 : ctx: &RequestContext,
1301 0 : ) -> anyhow::Result<()> {
1302 0 : info!(
1303 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1304 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1305 : );
1306 :
1307 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
1308 : // truncated, but a checkpoint record with the updated values isn't written until
1309 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
1310 : // so we keep the oldestXid and oldestXidDB up-to-date.
1311 0 : self.checkpoint.oldestXid = xlrec.oldest_xid;
1312 0 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
1313 0 : self.checkpoint_modified = true;
1314 0 :
1315 0 : // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
1316 0 :
1317 0 : let latest_page_number =
1318 0 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
1319 0 :
1320 0 : // Now delete all segments containing pages between xlrec.pageno
1321 0 : // and latest_page_number.
1322 0 :
1323 0 : // First, make an important safety check:
1324 0 : // the current endpoint page must not be eligible for removal.
1325 0 : // See SimpleLruTruncate() in slru.c
1326 0 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
1327 0 : info!("could not truncate directory pg_xact apparent wraparound");
1328 0 : return Ok(());
1329 0 : }
1330 :
1331 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1332 : //
1333 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
1334 : // will block waiting for the last valid LSN to advance up to
1335 : // it. So we use the previous record's LSN in the get calls
1336 : // instead.
1337 0 : for segno in modification
1338 0 : .tline
1339 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1340 0 : .await?
1341 : {
1342 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1343 0 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
1344 0 : modification
1345 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1346 0 : .await?;
1347 0 : trace!("Drop CLOG segment {:>04X}", segno);
1348 0 : }
1349 : }
1350 :
1351 0 : Ok(())
1352 0 : }
1353 :
1354 0 : fn ingest_multixact_create_record(
1355 0 : &mut self,
1356 0 : modification: &mut DatadirModification,
1357 0 : xlrec: &XlMultiXactCreate,
1358 0 : ) -> Result<()> {
1359 0 : // Create WAL record for updating the multixact-offsets page
1360 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1361 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1362 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1363 0 :
1364 0 : modification.put_slru_wal_record(
1365 0 : SlruKind::MultiXactOffsets,
1366 0 : segno,
1367 0 : rpageno,
1368 0 : NeonWalRecord::MultixactOffsetCreate {
1369 0 : mid: xlrec.mid,
1370 0 : moff: xlrec.moff,
1371 0 : },
1372 0 : )?;
1373 :
1374 : // Create WAL records for the update of each affected multixact-members page
1375 0 : let mut members = xlrec.members.iter();
1376 0 : let mut offset = xlrec.moff;
1377 : loop {
1378 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1379 0 :
1380 0 : // How many members fit on this page?
1381 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1382 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1383 0 :
1384 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1385 0 : for _ in 0..page_remain {
1386 0 : if let Some(m) = members.next() {
1387 0 : this_page_members.push(m.clone());
1388 0 : } else {
1389 0 : break;
1390 : }
1391 : }
1392 0 : if this_page_members.is_empty() {
1393 : // all done
1394 0 : break;
1395 0 : }
1396 0 : let n_this_page = this_page_members.len();
1397 0 :
1398 0 : modification.put_slru_wal_record(
1399 0 : SlruKind::MultiXactMembers,
1400 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1401 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1402 0 : NeonWalRecord::MultixactMembersCreate {
1403 0 : moff: offset,
1404 0 : members: this_page_members,
1405 0 : },
1406 0 : )?;
1407 :
1408 : // Note: The multixact members can wrap around, even within one WAL record.
1409 0 : offset = offset.wrapping_add(n_this_page as u32);
1410 : }
1411 0 : let next_offset = offset;
1412 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1413 :
1414 : // Update next-multi-xid and next-offset
1415 : //
1416 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1417 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1418 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1419 : // incremented! nextXid skips over values < FirstNormalTransactionId when the value
1420 : // is stored, so it's never 0 in a checkpoint.
1421 : //
1422 : // I don't know why it's done that way, it seems less error-prone to skip over 0
1423 : // when the value is stored rather than when it's read. But let's do it the same
1424 : // way here.
1425 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
1426 0 :
1427 0 : if self
1428 0 : .checkpoint
1429 0 : .update_next_multixid(next_multi_xid, next_offset)
1430 0 : {
1431 0 : self.checkpoint_modified = true;
1432 0 : }
1433 :
1434 : // Also update the next-xid with the highest member. According to the comments in
1435 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
1436 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1437 0 : if let Some(max_xid) = acc {
1438 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1439 0 : Some(mbr.xid)
1440 : } else {
1441 0 : acc
1442 : }
1443 : } else {
1444 0 : Some(mbr.xid)
1445 : }
1446 0 : });
1447 :
1448 0 : if let Some(max_xid) = max_mbr_xid {
1449 0 : if self.checkpoint.update_next_xid(max_xid) {
1450 0 : self.checkpoint_modified = true;
1451 0 : }
1452 0 : }
1453 0 : Ok(())
1454 0 : }
1455 :
1456 0 : async fn ingest_multixact_truncate_record(
1457 0 : &mut self,
1458 0 : modification: &mut DatadirModification<'_>,
1459 0 : xlrec: &XlMultiXactTruncate,
1460 0 : ctx: &RequestContext,
1461 0 : ) -> Result<()> {
1462 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
1463 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
1464 0 : self.checkpoint_modified = true;
1465 0 :
1466 0 : // PerformMembersTruncation
1467 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
1468 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
1469 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
1470 0 : let mut segment: i32 = startsegment;
1471 :
1472 : // Delete all the segments except the last one. The last segment may still
1473 : // contain valid data, possibly only partially.
1474 0 : while segment != endsegment {
1475 0 : modification
1476 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1477 0 : .await?;
1478 :
1479 : /* move to next segment, handling wraparound correctly */
1480 0 : if segment == maxsegment {
1481 0 : segment = 0;
1482 0 : } else {
1483 0 : segment += 1;
1484 0 : }
1485 : }
1486 :
1487 : // Truncate offsets
1488 : // FIXME: this did not handle wraparound correctly
1489 :
1490 0 : Ok(())
1491 0 : }
1492 :
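     : /// Handle a relation-mapper update record: the payload after the fixed-size
     : /// xl_relmap_update header is the new relmapper file (pg_filenode.map) contents
     : /// for the given tablespace/database, which is stored via `put_relmap_file`.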
1493 0 : async fn ingest_relmap_page(
1494 0 : &mut self,
1495 0 : modification: &mut DatadirModification<'_>,
1496 0 : xlrec: &XlRelmapUpdate,
1497 0 : decoded: &DecodedWALRecord,
1498 0 : ctx: &RequestContext,
1499 0 : ) -> Result<()> {
1500 0 : let mut buf = decoded.record.clone();
1501 0 : buf.advance(decoded.main_data_offset);
1502 0 : // skip xl_relmap_update
1503 0 : buf.advance(12);
1504 0 :
1505 0 : modification
1506 0 : .put_relmap_file(
1507 0 : xlrec.tsid,
1508 0 : xlrec.dbid,
1509 0 : Bytes::copy_from_slice(&buf[..]),
1510 0 : ctx,
1511 0 : )
1512 0 : .await
1513 0 : }
1514 :
1515 18 : async fn put_rel_creation(
1516 18 : &mut self,
1517 18 : modification: &mut DatadirModification<'_>,
1518 18 : rel: RelTag,
1519 18 : ctx: &RequestContext,
1520 18 : ) -> Result<()> {
1521 18 : modification.put_rel_creation(rel, 0, ctx).await?;
1522 18 : Ok(())
1523 18 : }
1524 :
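     : /// Store a full page image for `blknum` of `rel`, extending the relation first
     : /// if the block is beyond its current size.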
1525 272426 : async fn put_rel_page_image(
1526 272426 : &mut self,
1527 272426 : modification: &mut DatadirModification<'_>,
1528 272426 : rel: RelTag,
1529 272426 : blknum: BlockNumber,
1530 272426 : img: Bytes,
1531 272426 : ctx: &RequestContext,
1532 272426 : ) -> Result<(), PageReconstructError> {
1533 272426 : self.handle_rel_extend(modification, rel, blknum, ctx)
1534 4073 : .await?;
1535 272426 : modification.put_rel_page_image(rel, blknum, img)?;
1536 272426 : Ok(())
1537 272426 : }
1538 :
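     : /// Store a WAL record for `blknum` of `rel`, extending the relation first
     : /// if the block is beyond its current size.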
1539 145630 : async fn put_rel_wal_record(
1540 145630 : &mut self,
1541 145630 : modification: &mut DatadirModification<'_>,
1542 145630 : rel: RelTag,
1543 145630 : blknum: BlockNumber,
1544 145630 : rec: NeonWalRecord,
1545 145630 : ctx: &RequestContext,
1546 145630 : ) -> Result<()> {
1547 145630 : self.handle_rel_extend(modification, rel, blknum, ctx)
1548 43 : .await?;
1549 145630 : modification.put_rel_wal_record(rel, blknum, rec)?;
1550 145630 : Ok(())
1551 145630 : }
1552 :
1553 6012 : async fn put_rel_truncation(
1554 6012 : &mut self,
1555 6012 : modification: &mut DatadirModification<'_>,
1556 6012 : rel: RelTag,
1557 6012 : nblocks: BlockNumber,
1558 6012 : ctx: &RequestContext,
1559 6012 : ) -> anyhow::Result<()> {
1560 6012 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1561 6012 : Ok(())
1562 6012 : }
1563 :
1564 2 : async fn put_rel_drop(
1565 2 : &mut self,
1566 2 : modification: &mut DatadirModification<'_>,
1567 2 : rel: RelTag,
1568 2 : ctx: &RequestContext,
1569 2 : ) -> Result<()> {
1570 2 : modification.put_rel_drop(rel, ctx).await?;
1571 2 : Ok(())
1572 2 : }
1573 :
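     : /// Ensure that `rel` is at least `blknum + 1` blocks long: create the relation
     : /// if it doesn't exist yet, extend it if needed, and fill any gap between the
     : /// old size and `blknum` with zero pages (only for blocks owned by this shard).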
1574 418056 : async fn handle_rel_extend(
1575 418056 : &mut self,
1576 418056 : modification: &mut DatadirModification<'_>,
1577 418056 : rel: RelTag,
1578 418056 : blknum: BlockNumber,
1579 418056 : ctx: &RequestContext,
1580 418056 : ) -> Result<(), PageReconstructError> {
1581 418056 : let new_nblocks = blknum + 1;
1582 : // Check if the relation exists. We implicitly create relations on first
1583 : // record.
1584 : // TODO: would be nice if to be more explicit about it
1585 :
1586 : // Get current size and put rel creation if rel doesn't exist
1587 : //
1588 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1589 : // check the cache too. This is because eagerly checking the cache results in
1590 : // less work overall and 10% better performance. It's more work on a cache miss,
1591 : // but cache misses are rare.
1592 418056 : let old_nblocks = if let Some(nblocks) = modification
1593 418056 : .tline
1594 418056 : .get_cached_rel_size(&rel, modification.get_lsn())
1595 : {
1596 418046 : nblocks
1597 10 : } else if !modification
1598 10 : .tline
1599 10 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1600 0 : .await?
1601 : {
1602 : // create it with 0 size initially, the logic below will extend it
1603 10 : modification
1604 10 : .put_rel_creation(rel, 0, ctx)
1605 0 : .await
1606 10 : .context("Relation Error")?;
1607 10 : 0
1608 : } else {
1609 0 : modification
1610 0 : .tline
1611 0 : .get_rel_size(rel, Version::Modified(modification), ctx)
1612 0 : .await?
1613 : };
1614 :
1615 418056 : if new_nblocks > old_nblocks {
1616 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1617 274788 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1618 :
1619 274788 : let mut key = rel_block_to_key(rel, blknum);
1620 : // fill the gap with zeros
1621 274788 : for gap_blknum in old_nblocks..blknum {
1622 2998 : key.field6 = gap_blknum;
1623 2998 :
1624 2998 : if self.shard.get_shard_number(&key) != self.shard.number {
1625 0 : continue;
1626 2998 : }
1627 2998 :
1628 2998 : modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
1629 : }
1630 143268 : }
1631 418056 : Ok(())
1632 418056 : }
1633 :
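     : /// Store a full page image for an SLRU block, extending the segment first
     : /// if the block is beyond its current size.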
1634 0 : async fn put_slru_page_image(
1635 0 : &mut self,
1636 0 : modification: &mut DatadirModification<'_>,
1637 0 : kind: SlruKind,
1638 0 : segno: u32,
1639 0 : blknum: BlockNumber,
1640 0 : img: Bytes,
1641 0 : ctx: &RequestContext,
1642 0 : ) -> Result<()> {
1643 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1644 0 : .await?;
1645 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1646 0 : Ok(())
1647 0 : }
1648 :
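     : /// Ensure that the SLRU segment is at least `blknum + 1` blocks long, creating
     : /// it if it doesn't exist yet and filling any gap with zero pages.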
1649 0 : async fn handle_slru_extend(
1650 0 : &mut self,
1651 0 : modification: &mut DatadirModification<'_>,
1652 0 : kind: SlruKind,
1653 0 : segno: u32,
1654 0 : blknum: BlockNumber,
1655 0 : ctx: &RequestContext,
1656 0 : ) -> anyhow::Result<()> {
1657 0 : // We don't use a cache for this like we do for relations. SLRUs are explicitly
1658 0 : // extended with ZEROPAGE records, not with commit records, so extension happens
1659 0 : // a lot less frequently.
1660 0 :
1661 0 : let new_nblocks = blknum + 1;
1662 : // Check if the segment exists. We implicitly create SLRU segments on the first
1663 : // record that touches them.
1664 : // TODO: it would be nice to be more explicit about it
1665 0 : let old_nblocks = if !modification
1666 0 : .tline
1667 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1668 0 : .await?
1669 : {
1670 : // create it with 0 size initially, the logic below will extend it
1671 0 : modification
1672 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1673 0 : .await?;
1674 0 : 0
1675 : } else {
1676 0 : modification
1677 0 : .tline
1678 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1679 0 : .await?
1680 : };
1681 :
1682 0 : if new_nblocks > old_nblocks {
1683 0 : trace!(
1684 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1685 : kind,
1686 : segno,
1687 : old_nblocks,
1688 : new_nblocks
1689 : );
1690 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1691 :
1692 : // fill the gap with zeros
1693 0 : for gap_blknum in old_nblocks..blknum {
1694 0 : modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
1695 : }
1696 0 : }
1697 0 : Ok(())
1698 0 : }
1699 : }
1700 :
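     : /// Size of the relation in blocks at the modification's LSN, or 0 if the
     : /// relation doesn't exist yet.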
1701 12 : async fn get_relsize(
1702 12 : modification: &DatadirModification<'_>,
1703 12 : rel: RelTag,
1704 12 : ctx: &RequestContext,
1705 12 : ) -> anyhow::Result<BlockNumber> {
1706 12 : let nblocks = if !modification
1707 12 : .tline
1708 12 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1709 0 : .await?
1710 : {
1711 0 : 0
1712 : } else {
1713 12 : modification
1714 12 : .tline
1715 12 : .get_rel_size(rel, Version::Modified(modification), ctx)
1716 0 : .await?
1717 : };
1718 12 : Ok(nblocks)
1719 12 : }
1720 :
1721 : #[allow(clippy::bool_assert_comparison)]
1722 : #[cfg(test)]
1723 : mod tests {
1724 : use super::*;
1725 : use crate::tenant::harness::*;
1726 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1727 : use postgres_ffi::RELSEG_SIZE;
1728 :
1729 : use crate::DEFAULT_PG_VERSION;
1730 :
1731 : /// Arbitrary relation tag, for testing.
1732 : const TESTREL_A: RelTag = RelTag {
1733 : spcnode: 0,
1734 : dbnode: 111,
1735 : relnode: 1000,
1736 : forknum: 0,
1737 : };
1738 :
1739 12 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1740 12 : // TODO
1741 12 : }
1742 :
1743 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1744 :
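     : /// Set up the minimal timeline state (a zeroed checkpoint and a dummy relmapper
     : /// file) needed so that `WalIngest::new` can be called in tests.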
1745 8 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1746 8 : let mut m = tline.begin_modification(Lsn(0x10));
1747 8 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1748 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1749 8 : m.commit(ctx).await?;
1750 8 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1751 :
1752 8 : Ok(walingest)
1753 8 : }
1754 :
1755 : #[tokio::test]
1756 2 : async fn test_relsize() -> Result<()> {
1757 8 : let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
1758 2 : let tline = tenant
1759 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1760 6 : .await?;
1761 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1762 2 :
1763 2 : let mut m = tline.begin_modification(Lsn(0x20));
1764 2 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1765 2 : walingest
1766 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1767 2 : .await?;
1768 2 : m.commit(&ctx).await?;
1769 2 : let mut m = tline.begin_modification(Lsn(0x30));
1770 2 : walingest
1771 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1772 2 : .await?;
1773 2 : m.commit(&ctx).await?;
1774 2 : let mut m = tline.begin_modification(Lsn(0x40));
1775 2 : walingest
1776 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1777 2 : .await?;
1778 2 : m.commit(&ctx).await?;
1779 2 : let mut m = tline.begin_modification(Lsn(0x50));
1780 2 : walingest
1781 2 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1782 2 : .await?;
1783 2 : m.commit(&ctx).await?;
1784 2 :
1785 2 : assert_current_logical_size(&tline, Lsn(0x50));
1786 2 :
1787 2 : // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
1788 2 : assert_eq!(
1789 2 : tline
1790 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1791 2 : .await?,
1792 2 : false
1793 2 : );
1794 2 : assert!(tline
1795 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1796 2 : .await
1797 2 : .is_err());
1798 2 : assert_eq!(
1799 2 : tline
1800 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1801 2 : .await?,
1802 2 : true
1803 2 : );
1804 2 : assert_eq!(
1805 2 : tline
1806 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1807 2 : .await?,
1808 2 : 1
1809 2 : );
1810 2 : assert_eq!(
1811 2 : tline
1812 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1813 2 : .await?,
1814 2 : 3
1815 2 : );
1816 2 :
1817 2 : // Check page contents at each LSN
1818 2 : assert_eq!(
1819 2 : tline
1820 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
1821 2 : .await?,
1822 2 : test_img("foo blk 0 at 2")
1823 2 : );
1824 2 :
1825 2 : assert_eq!(
1826 2 : tline
1827 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
1828 2 : .await?,
1829 2 : test_img("foo blk 0 at 3")
1830 2 : );
1831 2 :
1832 2 : assert_eq!(
1833 2 : tline
1834 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
1835 2 : .await?,
1836 2 : test_img("foo blk 0 at 3")
1837 2 : );
1838 2 : assert_eq!(
1839 2 : tline
1840 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
1841 2 : .await?,
1842 2 : test_img("foo blk 1 at 4")
1843 2 : );
1844 2 :
1845 2 : assert_eq!(
1846 2 : tline
1847 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
1848 2 : .await?,
1849 2 : test_img("foo blk 0 at 3")
1850 2 : );
1851 2 : assert_eq!(
1852 2 : tline
1853 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
1854 2 : .await?,
1855 2 : test_img("foo blk 1 at 4")
1856 2 : );
1857 2 : assert_eq!(
1858 2 : tline
1859 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
1860 2 : .await?,
1861 2 : test_img("foo blk 2 at 5")
1862 2 : );
1863 2 :
1864 2 : // Truncate last block
1865 2 : let mut m = tline.begin_modification(Lsn(0x60));
1866 2 : walingest
1867 2 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1868 2 : .await?;
1869 2 : m.commit(&ctx).await?;
1870 2 : assert_current_logical_size(&tline, Lsn(0x60));
1871 2 :
1872 2 : // Check reported size and contents after truncation
1873 2 : assert_eq!(
1874 2 : tline
1875 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1876 2 : .await?,
1877 2 : 2
1878 2 : );
1879 2 : assert_eq!(
1880 2 : tline
1881 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
1882 2 : .await?,
1883 2 : test_img("foo blk 0 at 3")
1884 2 : );
1885 2 : assert_eq!(
1886 2 : tline
1887 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
1888 2 : .await?,
1889 2 : test_img("foo blk 1 at 4")
1890 2 : );
1891 2 :
1892 2 : // should still see the truncated block with older LSN
1893 2 : assert_eq!(
1894 2 : tline
1895 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1896 2 : .await?,
1897 2 : 3
1898 2 : );
1899 2 : assert_eq!(
1900 2 : tline
1901 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
1902 2 : .await?,
1903 2 : test_img("foo blk 2 at 5")
1904 2 : );
1905 2 :
1906 2 : // Truncate to zero length
1907 2 : let mut m = tline.begin_modification(Lsn(0x68));
1908 2 : walingest
1909 2 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1910 2 : .await?;
1911 2 : m.commit(&ctx).await?;
1912 2 : assert_eq!(
1913 2 : tline
1914 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
1915 2 : .await?,
1916 2 : 0
1917 2 : );
1918 2 :
1919 2 : // Extend from 0 to 2 blocks, leaving a gap
1920 2 : let mut m = tline.begin_modification(Lsn(0x70));
1921 2 : walingest
1922 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1923 2 : .await?;
1924 2 : m.commit(&ctx).await?;
1925 2 : assert_eq!(
1926 2 : tline
1927 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
1928 2 : .await?,
1929 2 : 2
1930 2 : );
1931 2 : assert_eq!(
1932 2 : tline
1933 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
1934 2 : .await?,
1935 2 : ZERO_PAGE
1936 2 : );
1937 2 : assert_eq!(
1938 2 : tline
1939 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
1940 2 : .await?,
1941 2 : test_img("foo blk 1")
1942 2 : );
1943 2 :
1944 2 : // Extend a lot more, leaving a big gap that spans across segments
1945 2 : let mut m = tline.begin_modification(Lsn(0x80));
1946 2 : walingest
1947 2 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
1948 2 : .await?;
1949 214 : m.commit(&ctx).await?;
1950 2 : assert_eq!(
1951 2 : tline
1952 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
1953 2 : .await?,
1954 2 : 1501
1955 2 : );
1956 2998 : for blk in 2..1500 {
1957 2996 : assert_eq!(
1958 2996 : tline
1959 2996 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
1960 3016 : .await?,
1961 2996 : ZERO_PAGE
1962 2 : );
1963 2 : }
1964 2 : assert_eq!(
1965 2 : tline
1966 2 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
1967 2 : .await?,
1968 2 : test_img("foo blk 1500")
1969 2 : );
1970 2 :
1971 2 : Ok(())
1972 2 : }
1973 :
1974 : // Test what happens if we drop a relation
1975 : // and then create it again within the same layer.
1976 : #[tokio::test]
1977 2 : async fn test_drop_extend() -> Result<()> {
1978 8 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
1979 2 : let tline = tenant
1980 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1981 6 : .await?;
1982 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1983 2 :
1984 2 : let mut m = tline.begin_modification(Lsn(0x20));
1985 2 : walingest
1986 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1987 2 : .await?;
1988 2 : m.commit(&ctx).await?;
1989 2 :
1990 2 : // Check that rel exists and size is correct
1991 2 : assert_eq!(
1992 2 : tline
1993 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1994 2 : .await?,
1995 2 : true
1996 2 : );
1997 2 : assert_eq!(
1998 2 : tline
1999 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2000 2 : .await?,
2001 2 : 1
2002 2 : );
2003 2 :
2004 2 : // Drop rel
2005 2 : let mut m = tline.begin_modification(Lsn(0x30));
2006 2 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
2007 2 : m.commit(&ctx).await?;
2008 2 :
2009 2 : // Check that rel is not visible anymore
2010 2 : assert_eq!(
2011 2 : tline
2012 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
2013 2 : .await?,
2014 2 : false
2015 2 : );
2016 2 :
2017 2 : // FIXME: should fail
2018 2 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2019 2 :
2020 2 : // Re-create it
2021 2 : let mut m = tline.begin_modification(Lsn(0x40));
2022 2 : walingest
2023 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2024 2 : .await?;
2025 2 : m.commit(&ctx).await?;
2026 2 :
2027 2 : // Check that rel exists and size is correct
2028 2 : assert_eq!(
2029 2 : tline
2030 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2031 2 : .await?,
2032 2 : true
2033 2 : );
2034 2 : assert_eq!(
2035 2 : tline
2036 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2037 2 : .await?,
2038 2 : 1
2039 2 : );
2040 2 :
2041 2 : Ok(())
2042 2 : }
2043 :
2044 : // Test what happens if we truncate a relation
2045 : // so that one of its segments is dropped
2046 : // and then extend it again within the same layer.
2047 : #[tokio::test]
2048 2 : async fn test_truncate_extend() -> Result<()> {
2049 8 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
2050 2 : let tline = tenant
2051 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2052 6 : .await?;
2053 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2054 2 :
2055 2 : // Create a 20 MB relation (the size is arbitrary)
2056 2 : let relsize = 20 * 1024 * 1024 / 8192;
2057 2 : let mut m = tline.begin_modification(Lsn(0x20));
2058 5120 : for blkno in 0..relsize {
2059 5120 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2060 5120 : walingest
2061 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2062 2 : .await?;
2063 2 : }
2064 41 : m.commit(&ctx).await?;
2065 2 :
2066 2 : // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
2067 2 : assert_eq!(
2068 2 : tline
2069 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2070 2 : .await?,
2071 2 : false
2072 2 : );
2073 2 : assert!(tline
2074 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2075 2 : .await
2076 2 : .is_err());
2077 2 :
2078 2 : assert_eq!(
2079 2 : tline
2080 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2081 2 : .await?,
2082 2 : true
2083 2 : );
2084 2 : assert_eq!(
2085 2 : tline
2086 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2087 2 : .await?,
2088 2 : relsize
2089 2 : );
2090 2 :
2091 2 : // Check relation content
2092 5120 : for blkno in 0..relsize {
2093 5120 : let lsn = Lsn(0x20);
2094 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2095 5120 : assert_eq!(
2096 5120 : tline
2097 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
2098 160 : .await?,
2099 5120 : test_img(&data)
2100 2 : );
2101 2 : }
2102 2 :
2103 2 : // Truncate the relation so that its second segment is dropped
2104 2 : // - only leave one page
2105 2 : let mut m = tline.begin_modification(Lsn(0x60));
2106 2 : walingest
2107 2 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2108 2 : .await?;
2109 2 : m.commit(&ctx).await?;
2110 2 :
2111 2 : // Check reported size and contents after truncation
2112 2 : assert_eq!(
2113 2 : tline
2114 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2115 2 : .await?,
2116 2 : 1
2117 2 : );
2118 2 :
2119 4 : for blkno in 0..1 {
2120 2 : let lsn = Lsn(0x20);
2121 2 : let data = format!("foo blk {} at {}", blkno, lsn);
2122 2 : assert_eq!(
2123 2 : tline
2124 2 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
2125 2 : .await?,
2126 2 : test_img(&data)
2127 2 : );
2128 2 : }
2129 2 :
2130 2 : // should still see all blocks with older LSN
2131 2 : assert_eq!(
2132 2 : tline
2133 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2134 2 : .await?,
2135 2 : relsize
2136 2 : );
2137 5120 : for blkno in 0..relsize {
2138 5120 : let lsn = Lsn(0x20);
2139 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2140 5120 : assert_eq!(
2141 5120 : tline
2142 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
2143 321 : .await?,
2144 5120 : test_img(&data)
2145 2 : );
2146 2 : }
2147 2 :
2148 2 : // Extend relation again.
2149 2 : // Add enough blocks to create second segment
2150 2 : let lsn = Lsn(0x80);
2151 2 : let mut m = tline.begin_modification(lsn);
2152 5120 : for blkno in 0..relsize {
2153 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2154 5120 : walingest
2155 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2156 2 : .await?;
2157 2 : }
2158 42 : m.commit(&ctx).await?;
2159 2 :
2160 2 : assert_eq!(
2161 2 : tline
2162 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2163 2 : .await?,
2164 2 : true
2165 2 : );
2166 2 : assert_eq!(
2167 2 : tline
2168 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2169 2 : .await?,
2170 2 : relsize
2171 2 : );
2172 2 : // Check relation content
2173 5120 : for blkno in 0..relsize {
2174 5120 : let lsn = Lsn(0x80);
2175 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2176 5120 : assert_eq!(
2177 5120 : tline
2178 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
2179 160 : .await?,
2180 5120 : test_img(&data)
2181 2 : );
2182 2 : }
2183 2 :
2184 2 : Ok(())
2185 2 : }
2186 :
2187 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2188 : /// split into multiple 1 GB segments in Postgres.
2189 : #[tokio::test]
2190 2 : async fn test_large_rel() -> Result<()> {
2191 8 : let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
2192 2 : let tline = tenant
2193 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2194 6 : .await?;
2195 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2196 2 :
2197 2 : let mut lsn = 0x10;
2198 262146 : for blknum in 0..RELSEG_SIZE + 1 {
2199 262146 : lsn += 0x10;
2200 262146 : let mut m = tline.begin_modification(Lsn(lsn));
2201 262146 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2202 262146 : walingest
2203 262146 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2204 4073 : .await?;
2205 262146 : m.commit(&ctx).await?;
2206 2 : }
2207 2 :
2208 2 : assert_current_logical_size(&tline, Lsn(lsn));
2209 2 :
2210 2 : assert_eq!(
2211 2 : tline
2212 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2213 2 : .await?,
2214 2 : RELSEG_SIZE + 1
2215 2 : );
2216 2 :
2217 2 : // Truncate one block
2218 2 : lsn += 0x10;
2219 2 : let mut m = tline.begin_modification(Lsn(lsn));
2220 2 : walingest
2221 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2222 2 : .await?;
2223 2 : m.commit(&ctx).await?;
2224 2 : assert_eq!(
2225 2 : tline
2226 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2227 2 : .await?,
2228 2 : RELSEG_SIZE
2229 2 : );
2230 2 : assert_current_logical_size(&tline, Lsn(lsn));
2231 2 :
2232 2 : // Truncate another block
2233 2 : lsn += 0x10;
2234 2 : let mut m = tline.begin_modification(Lsn(lsn));
2235 2 : walingest
2236 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2237 2 : .await?;
2238 2 : m.commit(&ctx).await?;
2239 2 : assert_eq!(
2240 2 : tline
2241 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2242 2 : .await?,
2243 2 : RELSEG_SIZE - 1
2244 2 : );
2245 2 : assert_current_logical_size(&tline, Lsn(lsn));
2246 2 :
2247 2 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2248 2 : // This tests the behavior at segment boundaries
2249 2 : let mut size: i32 = 3000;
2250 6004 : while size >= 0 {
2251 6002 : lsn += 0x10;
2252 6002 : let mut m = tline.begin_modification(Lsn(lsn));
2253 6002 : walingest
2254 6002 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2255 94 : .await?;
2256 6002 : m.commit(&ctx).await?;
2257 6002 : assert_eq!(
2258 6002 : tline
2259 6002 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2260 2 : .await?,
2261 6002 : size as BlockNumber
2262 2 : );
2263 2 :
2264 6002 : size -= 1;
2265 2 : }
2266 2 : assert_current_logical_size(&tline, Lsn(lsn));
2267 2 :
2268 2 : Ok(())
2269 2 : }
2270 :
2271 : /// Replay a wal segment file taken directly from safekeepers.
2272 : ///
2273 : /// This test is useful for benchmarking since it allows us to profile only
2274 : /// the walingest code in a single-threaded executor, and iterate more quickly
2275 : /// without waiting for unrelated steps.
2276 : #[tokio::test]
2277 2 : async fn test_ingest_real_wal() {
2278 2 : use crate::tenant::harness::*;
2279 2 : use postgres_ffi::waldecoder::WalStreamDecoder;
2280 2 : use postgres_ffi::WAL_SEGMENT_SIZE;
2281 2 :
2282 2 : // Define test data path and constants.
2283 2 : //
2284 2 : // Steps to reconstruct the data, if needed:
2285 2 : // 1. Run the pgbench python test
2286 2 : // 2. Take the first wal segment file from safekeeper
2287 2 : // 3. Compress it using `zstd --long input_file`
2288 2 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2289 2 : // 5. Grep sk logs for "restart decoder" to get startpoint
2290 2 : // 6. Run just the decoder from this test to get the endpoint.
2291 2 : // It's the last LSN the decoder will output.
2292 2 : let pg_version = 15; // The test data was generated by pg15
2293 2 : let path = "test_data/sk_wal_segment_from_pgbench";
2294 2 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2295 2 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2296 2 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2297 2 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2298 2 :
2299 2 : let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
2300 8 : let (tenant, ctx) = harness.load().await;
2301 2 :
2302 2 : let remote_initdb_path =
2303 2 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2304 2 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2305 2 :
2306 2 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2307 2 : .expect("creating test dir should work");
2308 2 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2309 2 :
2310 2 : // Bootstrap a real timeline. We can't use create_test_timeline because
2311 2 : // it doesn't create a real checkpoint, and WalIngest::new would try to parse
2312 2 : // the garbage data.
2313 2 : let tline = tenant
2314 2 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2315 21895 : .await
2316 2 : .unwrap();
2317 2 :
2318 2 : // We fully read and decompress this into memory before decoding
2319 2 : // to get a more accurate perf profile of the decoder.
2320 2 : let bytes = {
2321 2 : use async_compression::tokio::bufread::ZstdDecoder;
2322 2 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2323 2 : let reader = tokio::io::BufReader::new(file);
2324 2 : let decoder = ZstdDecoder::new(reader);
2325 2 : let mut reader = tokio::io::BufReader::new(decoder);
2326 2 : let mut buffer = Vec::new();
2327 220 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2328 2 : buffer
2329 2 : };
2330 2 :
2331 2 : // TODO start a profiler too
2332 2 : let started_at = std::time::Instant::now();
2333 2 :
2334 2 : // Initialize walingest
2335 2 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2336 2 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2337 2 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2338 5 : .await
2339 2 : .unwrap();
2340 2 : let mut modification = tline.begin_modification(startpoint);
2341 2 : let mut decoded = DecodedWALRecord::default();
2342 2 : println!("decoding {} bytes", bytes.len() - xlogoff);
2343 2 :
2344 2 : // Decode and ingest WAL. We process the WAL in chunks because
2345 2 : // that's what happens when we get bytes from the safekeepers.
2346 474686 : for chunk in bytes[xlogoff..].chunks(50) {
2347 474686 : decoder.feed_bytes(chunk);
2348 620536 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2349 145850 : walingest
2350 145850 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
2351 48 : .await
2352 145850 : .unwrap();
2353 2 : }
2354 474686 : modification.commit(&ctx).await.unwrap();
2355 2 : }
2356 2 :
2357 2 : let duration = started_at.elapsed();
2358 2 : println!("done in {:?}", duration);
2359 2 : }
2360 : }