Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but stores most of them as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest calls the
16 : //! Repository's put_wal_record or put_page_image function separately for
17 : //! each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but it can handle some records directly with
22 : //! bespoke Rust code.
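//!
//! A rough sketch of how a caller drives this module (simplified; the real
//! WAL receiver code differs, and `next_record` is a hypothetical stand-in
//! for the decode step):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! while let Some((lsn, decoded)) = next_record(&mut wal_stream) {
//!     let mut modification = timeline.begin_modification(lsn);
//!     walingest
//!         .ingest_record(decoded, lsn, &mut modification, &ctx)
//!         .await?;
//!     modification.commit(&ctx).await?;
//! }
//! ```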
23 :
24 : use std::time::Duration;
25 : use std::time::SystemTime;
26 :
27 : use pageserver_api::shard::ShardIdentity;
28 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
29 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
30 : use postgres_ffi::TimestampTz;
31 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
32 :
33 : use anyhow::{bail, Context, Result};
34 : use bytes::{Buf, Bytes, BytesMut};
35 : use tracing::*;
36 : use utils::failpoint_support;
37 : use utils::rate_limit::RateLimit;
38 :
39 : use crate::context::RequestContext;
40 : use crate::metrics::WAL_INGEST;
41 : use crate::pgdatadir_mapping::{DatadirModification, Version};
42 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
43 : use crate::tenant::PageReconstructError;
44 : use crate::tenant::Timeline;
45 : use crate::walrecord::*;
46 : use crate::ZERO_PAGE;
47 : use pageserver_api::key::rel_block_to_key;
48 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
49 : use postgres_ffi::pg_constants;
50 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
51 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
52 : use postgres_ffi::v14::xlog_utils::*;
53 : use postgres_ffi::v14::CheckPoint;
54 : use postgres_ffi::TransactionId;
55 : use postgres_ffi::BLCKSZ;
56 : use utils::lsn::Lsn;
57 :
58 : pub struct WalIngest {
59 : shard: ShardIdentity,
60 : pg_version: u32,
61 : checkpoint: CheckPoint,
62 : checkpoint_modified: bool,
63 : warn_ingest_lag: WarnIngestLag,
64 : }
65 :
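/// Rate limiters for warnings about ingest lag and suspicious WAL record
/// timestamps, so that a persistently lagging pipeline does not spam the
/// logs. Used by `warn_on_ingest_lag` below.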
66 : struct WarnIngestLag {
67 : lag_msg_ratelimit: RateLimit,
68 : future_lsn_msg_ratelimit: RateLimit,
69 : timestamp_invalid_msg_ratelimit: RateLimit,
70 : }
71 :
72 : impl WalIngest {
73 36 : pub async fn new(
74 36 : timeline: &Timeline,
75 36 : startpoint: Lsn,
76 36 : ctx: &RequestContext,
77 36 : ) -> anyhow::Result<WalIngest> {
78 : // Fetch the latest checkpoint into memory, so that we can compare with it
79 : // quickly in `ingest_record` and update it when it changes.
80 36 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
81 36 : let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
82 36 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
83 :
84 36 : Ok(WalIngest {
85 36 : shard: *timeline.get_shard_identity(),
86 36 : pg_version: timeline.pg_version,
87 36 : checkpoint,
88 36 : checkpoint_modified: false,
89 36 : warn_ingest_lag: WarnIngestLag {
90 36 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
91 36 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
92 36 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
93 36 : },
94 36 : })
95 36 : }
96 :
97 : ///
98 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
99 : ///
100 : /// This function updates the `lsn` field of the `DatadirModification`.
101 : ///
102 : /// It parses the WAL record and calls the Timeline's PUT functions for all the
103 : /// relations/pages that the record affects.
104 : ///
105 : /// Returns `true` if the record was ingested, and `false` if it was filtered out
106 : /// (i.e. it added nothing to the `DatadirModification`).
107 437556 : pub async fn ingest_record(
108 437556 : &mut self,
109 437556 : decoded: DecodedWALRecord,
110 437556 : lsn: Lsn,
111 437556 : modification: &mut DatadirModification<'_>,
112 437556 : ctx: &RequestContext,
113 437556 : ) -> anyhow::Result<bool> {
114 437556 : WAL_INGEST.records_received.inc();
115 437556 : let pg_version = modification.tline.pg_version;
116 437556 : let prev_len = modification.len();
117 437556 :
118 437556 : modification.set_lsn(lsn)?;
119 :
120 437556 : if decoded.is_dbase_create_copy(self.pg_version) {
121 : // Records of this type should always be preceded by a commit(), as they
122 : // rely on reading data pages back from the Timeline.
123 0 : assert!(!modification.has_dirty_data_pages());
124 437556 : }
125 :
126 437556 : let mut buf = decoded.record.clone();
127 437556 : buf.advance(decoded.main_data_offset);
128 437556 :
129 437556 : assert!(!self.checkpoint_modified);
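        // Keep the cached checkpoint's nextXid ahead of every XID we see in
        // the WAL, mirroring how Postgres advances nextXid during replay.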
130 437556 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
131 437502 : && self.checkpoint.update_next_xid(decoded.xl_xid)
132 6 : {
133 6 : self.checkpoint_modified = true;
134 437550 : }
135 :
136 437556 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
137 :
138 437556 : match decoded.xl_rmid {
139 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
140 : // Heap AM records need some special handling, because they modify VM pages
141 : // without registering them with the standard mechanism.
142 436422 : self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
143 0 : .await?;
144 : }
145 : pg_constants::RM_NEON_ID => {
146 0 : self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
147 0 : .await?;
148 : }
149 : // Handle other special record types
150 : pg_constants::RM_SMGR_ID => {
151 48 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
152 48 :
153 48 : if info == pg_constants::XLOG_SMGR_CREATE {
154 48 : let create = XlSmgrCreate::decode(&mut buf);
155 48 : self.ingest_xlog_smgr_create(modification, &create, ctx)
156 36 : .await?;
157 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
158 0 : let truncate = XlSmgrTruncate::decode(&mut buf);
159 0 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
160 0 : .await?;
161 0 : }
162 : }
163 : pg_constants::RM_DBASE_ID => {
164 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
165 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
166 :
167 0 : if pg_version == 14 {
168 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
169 0 : let createdb = XlCreateDatabase::decode(&mut buf);
170 0 : debug!("XLOG_DBASE_CREATE v14");
171 :
172 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
173 0 : .await?;
174 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
175 0 : let dropdb = XlDropDatabase::decode(&mut buf);
176 0 : for tablespace_id in dropdb.tablespace_ids {
177 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
178 0 : modification
179 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
180 0 : .await?;
181 : }
182 0 : }
183 0 : } else if pg_version == 15 {
184 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
185 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
186 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
187 : // The XLOG record was renamed between v14 and v15,
188 : // but the record format is the same.
189 : // So we can reuse XlCreateDatabase here.
190 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
191 0 : let createdb = XlCreateDatabase::decode(&mut buf);
192 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
193 0 : .await?;
194 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
195 0 : let dropdb = XlDropDatabase::decode(&mut buf);
196 0 : for tablespace_id in dropdb.tablespace_ids {
197 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
198 0 : modification
199 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
200 0 : .await?;
201 : }
202 0 : }
203 0 : } else if pg_version == 16 {
204 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
205 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
206 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
207 : // The XLOG record was renamed between v14 and v15,
208 : // but the record format is the same.
209 : // So we can reuse XlCreateDatabase here.
210 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
211 0 : let createdb = XlCreateDatabase::decode(&mut buf);
212 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
213 0 : .await?;
214 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
215 0 : let dropdb = XlDropDatabase::decode(&mut buf);
216 0 : for tablespace_id in dropdb.tablespace_ids {
217 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
218 0 : modification
219 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
220 0 : .await?;
221 : }
222 0 : }
223 0 : }
224 : }
225 : pg_constants::RM_TBLSPC_ID => {
226 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
227 : }
228 : pg_constants::RM_CLOG_ID => {
229 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
230 0 :
231 0 : if info == pg_constants::CLOG_ZEROPAGE {
232 0 : let pageno = buf.get_u32_le();
233 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
234 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
235 0 : self.put_slru_page_image(
236 0 : modification,
237 0 : SlruKind::Clog,
238 0 : segno,
239 0 : rpageno,
240 0 : ZERO_PAGE.clone(),
241 0 : ctx,
242 0 : )
243 0 : .await?;
244 : } else {
245 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
246 0 : let xlrec = XlClogTruncate::decode(&mut buf);
247 0 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
248 0 : .await?;
249 : }
250 : }
251 : pg_constants::RM_XACT_ID => {
252 72 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
253 72 :
254 72 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
255 24 : let parsed_xact =
256 24 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
257 24 : self.ingest_xact_record(
258 24 : modification,
259 24 : &parsed_xact,
260 24 : info == pg_constants::XLOG_XACT_COMMIT,
261 24 : decoded.origin_id,
262 24 : ctx,
263 24 : )
264 0 : .await?;
265 48 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
266 48 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
267 : {
268 0 : let parsed_xact =
269 0 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
270 0 : self.ingest_xact_record(
271 0 : modification,
272 0 : &parsed_xact,
273 0 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
274 0 : decoded.origin_id,
275 0 : ctx,
276 0 : )
277 0 : .await?;
278 : // Remove the twophase file. See RemoveTwoPhaseFile() in the Postgres code.
279 0 : trace!(
280 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
281 : decoded.xl_xid,
282 : parsed_xact.xid,
283 : lsn,
284 : );
285 0 : modification
286 0 : .drop_twophase_file(parsed_xact.xid, ctx)
287 0 : .await?;
288 48 : } else if info == pg_constants::XLOG_XACT_PREPARE {
289 0 : modification
290 0 : .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
291 0 : .await?;
292 48 : }
293 : }
294 : pg_constants::RM_MULTIXACT_ID => {
295 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
296 0 :
297 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
298 0 : let pageno = buf.get_u32_le();
299 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
300 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
301 0 : self.put_slru_page_image(
302 0 : modification,
303 0 : SlruKind::MultiXactOffsets,
304 0 : segno,
305 0 : rpageno,
306 0 : ZERO_PAGE.clone(),
307 0 : ctx,
308 0 : )
309 0 : .await?;
310 0 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
311 0 : let pageno = buf.get_u32_le();
312 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
313 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
314 0 : self.put_slru_page_image(
315 0 : modification,
316 0 : SlruKind::MultiXactMembers,
317 0 : segno,
318 0 : rpageno,
319 0 : ZERO_PAGE.clone(),
320 0 : ctx,
321 0 : )
322 0 : .await?;
323 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
324 0 : let xlrec = XlMultiXactCreate::decode(&mut buf);
325 0 : self.ingest_multixact_create_record(modification, &xlrec)?;
326 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
327 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
328 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
329 0 : .await?;
330 0 : }
331 : }
332 : pg_constants::RM_RELMAP_ID => {
333 0 : let xlrec = XlRelmapUpdate::decode(&mut buf);
334 0 : self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
335 0 : .await?;
336 : }
337 : pg_constants::RM_XLOG_ID => {
338 90 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
339 90 :
340 90 : if info == pg_constants::XLOG_NEXTOID {
341 6 : let next_oid = buf.get_u32_le();
342 6 : if self.checkpoint.nextOid != next_oid {
343 6 : self.checkpoint.nextOid = next_oid;
344 6 : self.checkpoint_modified = true;
345 6 : }
346 84 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
347 84 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
348 : {
349 6 : let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
350 6 : buf.copy_to_slice(&mut checkpoint_bytes);
351 6 : let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
352 6 : trace!(
353 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
354 : xlog_checkpoint.oldestXid,
355 : self.checkpoint.oldestXid
356 : );
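                    // XIDs are compared with wraparound-aware arithmetic:
                    // `a.wrapping_sub(b) as i32 < 0` means `a` logically
                    // precedes `b`, like TransactionIdPrecedes() in Postgres.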
357 6 : if (self
358 6 : .checkpoint
359 6 : .oldestXid
360 6 : .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
361 6 : < 0
362 0 : {
363 0 : self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
364 6 : }
365 6 : trace!(
366 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
367 : xlog_checkpoint.oldestActiveXid,
368 : self.checkpoint.oldestActiveXid
369 : );
370 :
371 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
372 : // because at shutdown, all in-progress transactions will implicitly
373 : // end. Postgres startup code knows that, and allows hot standby to start
374 : // immediately from a shutdown checkpoint.
375 : //
376 : // In Neon, Postgres hot standby startup always behaves as if starting from
377 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
378 : // instead of overwriting self.checkpoint.oldestActiveXid with
379 : // InvalidTransactionId from the checkpoint WAL record, update it to a
380 : // proper value, knowing that there are no in-progress transactions at this
381 : // point, except for prepared transactions.
382 : //
383 : // See also the neon code changes in the InitWalRecovery() function.
384 6 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
385 6 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
386 : {
387 6 : let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
388 6 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
389 0 : if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
390 0 : oldest_active_xid = xid;
391 0 : }
392 : }
393 6 : self.checkpoint.oldestActiveXid = oldest_active_xid;
394 0 : } else {
395 0 : self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
396 0 : }
397 :
398 : // Write a new checkpoint key-value pair on every checkpoint record, even
399 : // if nothing really changed. Not strictly required, but it seems nice to
400 : // have some trace of the checkpoint records in the layer files at the same
401 : // LSNs.
402 6 : self.checkpoint_modified = true;
403 78 : }
404 : }
405 : pg_constants::RM_LOGICALMSG_ID => {
406 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
407 0 :
408 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
409 0 : let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
410 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
411 0 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
412 0 : if prefix == "neon-test" {
413 : // This is a convenient way to make the WAL ingestion pause at
414 : // a particular point in the WAL. For more fine-grained control,
415 : // we could peek into the message and only pause if it contains
416 : // a particular string, for example, but this is enough for now.
417 0 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
418 0 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
419 0 : modification.put_file(path, message, ctx).await?;
420 0 : }
421 0 : }
422 : }
423 : pg_constants::RM_STANDBY_ID => {
424 48 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
425 48 : if info == pg_constants::XLOG_RUNNING_XACTS {
426 0 : let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
427 0 : self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
428 0 : self.checkpoint_modified = true;
429 48 : }
430 : }
431 : pg_constants::RM_REPLORIGIN_ID => {
432 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
433 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
434 0 : let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
435 0 : modification
436 0 : .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
437 0 : .await?
438 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
439 0 : let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
440 0 : modification.drop_replorigin(xlrec.node_id).await?
441 0 : }
442 : }
443 876 : _x => {
444 876 : // TODO: should probably log & fail here instead of silently
445 876 : // ignoring record types we don't understand
446 876 : }
447 : }
448 :
449 : // Iterate through all the blocks that the record modifies, and
450 : // "put" a separate copy of the record for each block.
451 437556 : for blk in decoded.blocks.iter() {
452 436926 : let rel = RelTag {
453 436926 : spcnode: blk.rnode_spcnode,
454 436926 : dbnode: blk.rnode_dbnode,
455 436926 : relnode: blk.rnode_relnode,
456 436926 : forknum: blk.forknum,
457 436926 : };
458 436926 :
459 436926 : let key = rel_block_to_key(rel, blk.blkno);
460 436926 : let key_is_local = self.shard.is_key_local(&key);
461 436926 :
462 436926 : tracing::debug!(
463 : lsn=%lsn,
464 : key=%key,
465 0 : "ingest: shard decision {} (checkpoint={})",
466 0 : if !key_is_local { "drop" } else { "keep" },
467 : self.checkpoint_modified
468 : );
469 :
470 436926 : if !key_is_local {
471 0 : if self.shard.is_shard_zero() {
472 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
473 : // its blkno in case it implicitly extends a relation.
474 0 : self.observe_decoded_block(modification, blk, ctx).await?;
475 0 : }
476 :
477 0 : continue;
478 436926 : }
479 436926 : self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
480 852 : .await?;
481 : }
482 :
483 : // If checkpoint data was updated, store the new version in the repository
484 437556 : if self.checkpoint_modified {
485 18 : let new_checkpoint_bytes = self.checkpoint.encode()?;
486 :
487 18 : modification.put_checkpoint(new_checkpoint_bytes)?;
488 18 : self.checkpoint_modified = false;
489 437538 : }
490 :
491 : // Note that at this point this record is only cached in the modification
492 : // until commit() is called to flush the data into the repository and update
493 : // the latest LSN.
494 :
495 437556 : modification.on_record_end();
496 437556 :
497 437556 : Ok(modification.len() > prev_len)
498 437556 : }
499 :
500 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
501 0 : async fn observe_decoded_block(
502 0 : &mut self,
503 0 : modification: &mut DatadirModification<'_>,
504 0 : blk: &DecodedBkpBlock,
505 0 : ctx: &RequestContext,
506 0 : ) -> Result<(), PageReconstructError> {
507 0 : let rel = RelTag {
508 0 : spcnode: blk.rnode_spcnode,
509 0 : dbnode: blk.rnode_dbnode,
510 0 : relnode: blk.rnode_relnode,
511 0 : forknum: blk.forknum,
512 0 : };
513 0 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
514 0 : .await
515 0 : }
516 :
517 436926 : async fn ingest_decoded_block(
518 436926 : &mut self,
519 436926 : modification: &mut DatadirModification<'_>,
520 436926 : lsn: Lsn,
521 436926 : decoded: &DecodedWALRecord,
522 436926 : blk: &DecodedBkpBlock,
523 436926 : ctx: &RequestContext,
524 436926 : ) -> Result<(), PageReconstructError> {
525 436926 : let rel = RelTag {
526 436926 : spcnode: blk.rnode_spcnode,
527 436926 : dbnode: blk.rnode_dbnode,
528 436926 : relnode: blk.rnode_relnode,
529 436926 : forknum: blk.forknum,
530 436926 : };
531 436926 :
532 436926 : //
533 436926 : // Instead of storing the full-page-image WAL record, it is better
534 436926 : // to store the extracted image: we can then skip wal-redo for this
535 436926 : // page. Also, some FPI records may contain multiple (up to 32) pages,
536 436926 : // which would otherwise have to be copied once per page.
537 436926 : //
538 436926 : if blk.apply_image
539 180 : && blk.has_image
540 180 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
541 72 : && (decoded.xl_info == pg_constants::XLOG_FPI
542 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
543 : // compression of WAL is not yet supported: fall back to storing the original WAL record
544 72 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
545 : // do not materialize all-zero pages, because they will most likely soon be replaced with real data
546 72 : && blk.bimg_len != 0
547 : {
548 : // Extract page image from FPI record
549 72 : let img_len = blk.bimg_len as usize;
550 72 : let img_offs = blk.bimg_offset as usize;
551 72 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
552 72 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
553 72 :
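            // The FPI may omit the unused "hole" in the middle of the page
            // (between pd_lower and pd_upper); re-expand it with zeros so the
            // materialized image is a full BLCKSZ bytes.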
554 72 : if blk.hole_length != 0 {
555 0 : let tail = image.split_off(blk.hole_offset as usize);
556 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
557 0 : image.unsplit(tail);
558 72 : }
559 : //
560 : // Match the logic of XLogReadBufferForRedoExtended:
561 : // The page may be uninitialized. If so, we can't set the LSN because
562 : // that would corrupt the page.
563 : //
564 72 : if !page_is_new(&image) {
565 54 : page_set_lsn(&mut image, lsn)
566 18 : }
567 72 : assert_eq!(image.len(), BLCKSZ as usize);
568 :
569 72 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
570 15 : .await?;
571 : } else {
572 436854 : let rec = NeonWalRecord::Postgres {
573 436854 : will_init: blk.will_init || blk.apply_image,
574 436854 : rec: decoded.record.clone(),
575 436854 : };
576 436854 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
577 837 : .await?;
578 : }
579 436926 : Ok(())
580 436926 : }
581 :
582 436422 : async fn ingest_heapam_record(
583 436422 : &mut self,
584 436422 : buf: &mut Bytes,
585 436422 : modification: &mut DatadirModification<'_>,
586 436422 : decoded: &DecodedWALRecord,
587 436422 : ctx: &RequestContext,
588 436422 : ) -> anyhow::Result<()> {
589 436422 : // Handle VM bit updates that are implicitly part of heap records.
590 436422 :
591 436422 : // First, look at the record to determine which VM bits need
592 436422 : // to be cleared. If either of these variables is set, we
593 436422 : // need to clear the corresponding bits in the visibility map.
594 436422 : let mut new_heap_blkno: Option<u32> = None;
595 436422 : let mut old_heap_blkno: Option<u32> = None;
596 436422 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
597 436422 :
598 436422 : match modification.tline.pg_version {
599 : 14 => {
600 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
601 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
602 0 :
603 0 : if info == pg_constants::XLOG_HEAP_INSERT {
604 0 : let xlrec = v14::XlHeapInsert::decode(buf);
605 0 : assert_eq!(0, buf.remaining());
606 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
607 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
608 0 : }
609 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
610 0 : let xlrec = v14::XlHeapDelete::decode(buf);
611 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
612 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
613 0 : }
614 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
615 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
616 : {
617 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
618 0 : // the size of tuple data is inferred from the size of the record.
619 0 : // we can't validate the remaining number of bytes without parsing
620 0 : // the tuple data.
621 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
622 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
623 0 : }
624 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
625 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
626 0 : // non-HOT update where the new tuple goes to a different page than
627 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
628 0 : // set.
629 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
630 0 : }
631 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
632 0 : let xlrec = v14::XlHeapLock::decode(buf);
633 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
634 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
635 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
636 0 : }
637 0 : }
638 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
639 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
640 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
641 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
642 :
643 0 : let offset_array_len =
644 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
645 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
646 0 : 0
647 : } else {
648 0 : size_of::<u16>() * xlrec.ntuples as usize
649 : };
650 0 : assert_eq!(offset_array_len, buf.remaining());
651 :
652 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
653 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
654 0 : }
655 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
656 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
657 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
658 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
659 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
660 0 : }
661 0 : }
662 : } else {
663 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
664 : }
665 : }
666 : 15 => {
667 436422 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
668 435858 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
669 435858 :
670 435858 : if info == pg_constants::XLOG_HEAP_INSERT {
671 435828 : let xlrec = v15::XlHeapInsert::decode(buf);
672 435828 : assert_eq!(0, buf.remaining());
673 435828 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
674 12 : new_heap_blkno = Some(decoded.blocks[0].blkno);
675 435816 : }
676 30 : } else if info == pg_constants::XLOG_HEAP_DELETE {
677 0 : let xlrec = v15::XlHeapDelete::decode(buf);
678 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
679 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
680 0 : }
681 30 : } else if info == pg_constants::XLOG_HEAP_UPDATE
682 6 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
683 : {
684 24 : let xlrec = v15::XlHeapUpdate::decode(buf);
685 24 : // the size of tuple data is inferred from the size of the record.
686 24 : // we can't validate the remaining number of bytes without parsing
687 24 : // the tuple data.
688 24 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
689 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
690 24 : }
691 24 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
692 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
693 0 : // non-HOT update where the new tuple goes to a different page than
694 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
695 0 : // set.
696 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
697 24 : }
698 6 : } else if info == pg_constants::XLOG_HEAP_LOCK {
699 0 : let xlrec = v15::XlHeapLock::decode(buf);
700 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
701 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
702 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
703 0 : }
704 6 : }
705 564 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
706 564 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
707 564 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
708 126 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
709 :
710 126 : let offset_array_len =
711 126 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
712 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
713 6 : 0
714 : } else {
715 120 : size_of::<u16>() * xlrec.ntuples as usize
716 : };
717 126 : assert_eq!(offset_array_len, buf.remaining());
718 :
719 126 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
720 24 : new_heap_blkno = Some(decoded.blocks[0].blkno);
721 102 : }
722 438 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
723 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
724 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
725 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
726 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
727 0 : }
728 438 : }
729 : } else {
730 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
731 : }
732 : }
733 : 16 => {
734 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
735 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
736 0 :
737 0 : if info == pg_constants::XLOG_HEAP_INSERT {
738 0 : let xlrec = v16::XlHeapInsert::decode(buf);
739 0 : assert_eq!(0, buf.remaining());
740 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
741 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
742 0 : }
743 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
744 0 : let xlrec = v16::XlHeapDelete::decode(buf);
745 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
746 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
747 0 : }
748 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
749 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
750 : {
751 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
752 0 : // the size of tuple data is inferred from the size of the record.
753 0 : // we can't validate the remaining number of bytes without parsing
754 0 : // the tuple data.
755 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
756 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
757 0 : }
758 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
759 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
760 0 : // non-HOT update where the new tuple goes to a different page than
761 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
762 0 : // set.
763 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
764 0 : }
765 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
766 0 : let xlrec = v16::XlHeapLock::decode(buf);
767 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
768 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
769 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
770 0 : }
771 0 : }
772 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
773 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
774 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
775 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
776 :
777 0 : let offset_array_len =
778 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
779 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
780 0 : 0
781 : } else {
782 0 : size_of::<u16>() * xlrec.ntuples as usize
783 : };
784 0 : assert_eq!(offset_array_len, buf.remaining());
785 :
786 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
787 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
788 0 : }
789 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
790 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
791 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
792 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
793 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
794 0 : }
795 0 : }
796 : } else {
797 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
798 : }
799 : }
800 0 : _ => {}
801 : }
802 :
803 : // Clear the VM bits if required.
804 436422 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
805 36 : let vm_rel = RelTag {
806 36 : forknum: VISIBILITYMAP_FORKNUM,
807 36 : spcnode: decoded.blocks[0].rnode_spcnode,
808 36 : dbnode: decoded.blocks[0].rnode_dbnode,
809 36 : relnode: decoded.blocks[0].rnode_relnode,
810 36 : };
811 36 :
812 36 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
813 36 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
814 :
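        // Each visibility map page covers many heap pages (two bits per heap
        // block), so HEAPBLK_TO_MAPBLOCK maps a heap block number to the VM
        // block that holds its bits.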
815 : // Sometimes, Postgres seems to create heap WAL records with the
816 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
817 : // not set. In fact, it's possible that the VM page does not exist at all.
818 : // In that case, we don't want to store a record to clear the VM bit;
819 : // replaying it would fail to find the previous image of the page, because
820 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
821 : // record if they don't.
822 36 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
823 36 : if let Some(blknum) = new_vm_blk {
824 36 : if blknum >= vm_size {
825 0 : new_vm_blk = None;
826 36 : }
827 0 : }
828 36 : if let Some(blknum) = old_vm_blk {
829 0 : if blknum >= vm_size {
830 0 : old_vm_blk = None;
831 0 : }
832 36 : }
833 :
834 36 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
835 36 : if new_vm_blk == old_vm_blk {
836 : // An UPDATE record that needs to clear the bits for both the old and
837 : // the new page, both of which reside on the same VM page.
838 0 : self.put_rel_wal_record(
839 0 : modification,
840 0 : vm_rel,
841 0 : new_vm_blk.unwrap(),
842 0 : NeonWalRecord::ClearVisibilityMapFlags {
843 0 : new_heap_blkno,
844 0 : old_heap_blkno,
845 0 : flags,
846 0 : },
847 0 : ctx,
848 0 : )
849 0 : .await?;
850 : } else {
851 : // Clear VM bits for one heap page, or for two pages that reside on
852 : // different VM pages.
853 36 : if let Some(new_vm_blk) = new_vm_blk {
854 36 : self.put_rel_wal_record(
855 36 : modification,
856 36 : vm_rel,
857 36 : new_vm_blk,
858 36 : NeonWalRecord::ClearVisibilityMapFlags {
859 36 : new_heap_blkno,
860 36 : old_heap_blkno: None,
861 36 : flags,
862 36 : },
863 36 : ctx,
864 36 : )
865 0 : .await?;
866 0 : }
867 36 : if let Some(old_vm_blk) = old_vm_blk {
868 0 : self.put_rel_wal_record(
869 0 : modification,
870 0 : vm_rel,
871 0 : old_vm_blk,
872 0 : NeonWalRecord::ClearVisibilityMapFlags {
873 0 : new_heap_blkno: None,
874 0 : old_heap_blkno,
875 0 : flags,
876 0 : },
877 0 : ctx,
878 0 : )
879 0 : .await?;
880 36 : }
881 : }
882 0 : }
883 436386 : }
884 :
885 436422 : Ok(())
886 436422 : }
887 :
888 0 : async fn ingest_neonrmgr_record(
889 0 : &mut self,
890 0 : buf: &mut Bytes,
891 0 : modification: &mut DatadirModification<'_>,
892 0 : decoded: &DecodedWALRecord,
893 0 : ctx: &RequestContext,
894 0 : ) -> anyhow::Result<()> {
895 0 : // Handle VM bit updates that are implicitly part of heap records.
896 0 :
897 0 : // First, look at the record to determine which VM bits need
898 0 : // to be cleared. If either of these variables is set, we
899 0 : // need to clear the corresponding bits in the visibility map.
900 0 : let mut new_heap_blkno: Option<u32> = None;
901 0 : let mut old_heap_blkno: Option<u32> = None;
902 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
903 0 : let pg_version = modification.tline.pg_version;
904 0 :
905 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
906 :
907 0 : match pg_version {
908 : 16 => {
909 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
910 0 :
911 0 : match info {
912 : pg_constants::XLOG_NEON_HEAP_INSERT => {
913 0 : let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
914 0 : assert_eq!(0, buf.remaining());
915 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
916 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
917 0 : }
918 : }
919 : pg_constants::XLOG_NEON_HEAP_DELETE => {
920 0 : let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
921 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
922 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
923 0 : }
924 : }
925 : pg_constants::XLOG_NEON_HEAP_UPDATE
926 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
927 0 : let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
928 0 : // the size of tuple data is inferred from the size of the record.
929 0 : // we can't validate the remaining number of bytes without parsing
930 0 : // the tuple data.
931 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
932 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
933 0 : }
934 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
935 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
936 0 : // non-HOT update where the new tuple goes to a different page than
937 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
938 0 : // set.
939 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
940 0 : }
941 : }
942 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
943 0 : let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
944 :
945 0 : let offset_array_len =
946 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
947 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
948 0 : 0
949 : } else {
950 0 : size_of::<u16>() * xlrec.ntuples as usize
951 : };
952 0 : assert_eq!(offset_array_len, buf.remaining());
953 :
954 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
955 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
956 0 : }
957 : }
958 : pg_constants::XLOG_NEON_HEAP_LOCK => {
959 0 : let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
960 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
961 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
962 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
963 0 : }
964 : }
965 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
966 : }
967 : }
968 0 : _ => bail!(
969 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
970 0 : pg_version
971 0 : ),
972 : }
973 :
974 : // Clear the VM bits if required.
975 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
976 0 : let vm_rel = RelTag {
977 0 : forknum: VISIBILITYMAP_FORKNUM,
978 0 : spcnode: decoded.blocks[0].rnode_spcnode,
979 0 : dbnode: decoded.blocks[0].rnode_dbnode,
980 0 : relnode: decoded.blocks[0].rnode_relnode,
981 0 : };
982 0 :
983 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
984 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
985 :
986 : // Sometimes, Postgres seems to create heap WAL records with the
987 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
988 : // not set. In fact, it's possible that the VM page does not exist at all.
989 : // In that case, we don't want to store a record to clear the VM bit;
990 : // replaying it would fail to find the previous image of the page, because
991 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
992 : // record if they don't.
993 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
994 0 : if let Some(blknum) = new_vm_blk {
995 0 : if blknum >= vm_size {
996 0 : new_vm_blk = None;
997 0 : }
998 0 : }
999 0 : if let Some(blknum) = old_vm_blk {
1000 0 : if blknum >= vm_size {
1001 0 : old_vm_blk = None;
1002 0 : }
1003 0 : }
1004 :
1005 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
1006 0 : if new_vm_blk == old_vm_blk {
1007 : // An UPDATE record that needs to clear the bits for both the old and
1008 : // the new page, both of which reside on the same VM page.
1009 0 : self.put_rel_wal_record(
1010 0 : modification,
1011 0 : vm_rel,
1012 0 : new_vm_blk.unwrap(),
1013 0 : NeonWalRecord::ClearVisibilityMapFlags {
1014 0 : new_heap_blkno,
1015 0 : old_heap_blkno,
1016 0 : flags,
1017 0 : },
1018 0 : ctx,
1019 0 : )
1020 0 : .await?;
1021 : } else {
1022 : // Clear VM bits for one heap page, or for two pages that reside on
1023 : // different VM pages.
1024 0 : if let Some(new_vm_blk) = new_vm_blk {
1025 0 : self.put_rel_wal_record(
1026 0 : modification,
1027 0 : vm_rel,
1028 0 : new_vm_blk,
1029 0 : NeonWalRecord::ClearVisibilityMapFlags {
1030 0 : new_heap_blkno,
1031 0 : old_heap_blkno: None,
1032 0 : flags,
1033 0 : },
1034 0 : ctx,
1035 0 : )
1036 0 : .await?;
1037 0 : }
1038 0 : if let Some(old_vm_blk) = old_vm_blk {
1039 0 : self.put_rel_wal_record(
1040 0 : modification,
1041 0 : vm_rel,
1042 0 : old_vm_blk,
1043 0 : NeonWalRecord::ClearVisibilityMapFlags {
1044 0 : new_heap_blkno: None,
1045 0 : old_heap_blkno,
1046 0 : flags,
1047 0 : },
1048 0 : ctx,
1049 0 : )
1050 0 : .await?;
1051 0 : }
1052 : }
1053 0 : }
1054 0 : }
1055 :
1056 0 : Ok(())
1057 0 : }
1058 :
1059 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
1060 0 : async fn ingest_xlog_dbase_create(
1061 0 : &mut self,
1062 0 : modification: &mut DatadirModification<'_>,
1063 0 : rec: &XlCreateDatabase,
1064 0 : ctx: &RequestContext,
1065 0 : ) -> anyhow::Result<()> {
1066 0 : let db_id = rec.db_id;
1067 0 : let tablespace_id = rec.tablespace_id;
1068 0 : let src_db_id = rec.src_db_id;
1069 0 : let src_tablespace_id = rec.src_tablespace_id;
1070 :
1071 0 : let rels = modification
1072 0 : .tline
1073 0 : .list_rels(
1074 0 : src_tablespace_id,
1075 0 : src_db_id,
1076 0 : Version::Modified(modification),
1077 0 : ctx,
1078 0 : )
1079 0 : .await?;
1080 :
1081 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
1082 :
1083 : // Copy relfilemap
1084 0 : let filemap = modification
1085 0 : .tline
1086 0 : .get_relmap_file(
1087 0 : src_tablespace_id,
1088 0 : src_db_id,
1089 0 : Version::Modified(modification),
1090 0 : ctx,
1091 0 : )
1092 0 : .await?;
1093 0 : modification
1094 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1095 0 : .await?;
1096 :
1097 0 : let mut num_rels_copied = 0;
1098 0 : let mut num_blocks_copied = 0;
1099 0 : for src_rel in rels {
1100 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1101 0 : assert_eq!(src_rel.dbnode, src_db_id);
1102 :
1103 0 : let nblocks = modification
1104 0 : .tline
1105 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
1106 0 : .await?;
1107 0 : let dst_rel = RelTag {
1108 0 : spcnode: tablespace_id,
1109 0 : dbnode: db_id,
1110 0 : relnode: src_rel.relnode,
1111 0 : forknum: src_rel.forknum,
1112 0 : };
1113 0 :
1114 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1115 :
1116 : // Copy content
1117 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1118 0 : for blknum in 0..nblocks {
1119 : // Sharding:
1120 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1121 : // dbNode is not included in the hash inputs for sharding.
1122 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1123 : // that belong to it.
1124 0 : let src_key = rel_block_to_key(src_rel, blknum);
1125 0 : if !self.shard.is_key_local(&src_key) {
1126 0 : debug!(
1127 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1128 : src_key
1129 : );
1130 0 : continue;
1131 0 : }
1132 0 : debug!(
1133 0 : "copying block {} from {} ({}) to {}",
1134 : blknum, src_rel, src_key, dst_rel
1135 : );
1136 :
1137 0 : let content = modification
1138 0 : .tline
1139 0 : .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
1140 0 : .await?;
1141 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1142 0 : num_blocks_copied += 1;
1143 : }
1144 :
1145 0 : num_rels_copied += 1;
1146 : }
1147 :
1148 0 : info!(
1149 0 : "Created database {}/{}, copied {} blocks in {} rels",
1150 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1151 : );
1152 0 : Ok(())
1153 0 : }
1154 :
1155 48 : async fn ingest_xlog_smgr_create(
1156 48 : &mut self,
1157 48 : modification: &mut DatadirModification<'_>,
1158 48 : rec: &XlSmgrCreate,
1159 48 : ctx: &RequestContext,
1160 48 : ) -> anyhow::Result<()> {
1161 48 : let rel = RelTag {
1162 48 : spcnode: rec.rnode.spcnode,
1163 48 : dbnode: rec.rnode.dbnode,
1164 48 : relnode: rec.rnode.relnode,
1165 48 : forknum: rec.forknum,
1166 48 : };
1167 48 : self.put_rel_creation(modification, rel, ctx).await?;
1168 48 : Ok(())
1169 48 : }
1170 :
1171 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1172 : ///
1173 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1174 0 : async fn ingest_xlog_smgr_truncate(
1175 0 : &mut self,
1176 0 : modification: &mut DatadirModification<'_>,
1177 0 : rec: &XlSmgrTruncate,
1178 0 : ctx: &RequestContext,
1179 0 : ) -> anyhow::Result<()> {
1180 0 : let spcnode = rec.rnode.spcnode;
1181 0 : let dbnode = rec.rnode.dbnode;
1182 0 : let relnode = rec.rnode.relnode;
1183 0 :
1184 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1185 0 : let rel = RelTag {
1186 0 : spcnode,
1187 0 : dbnode,
1188 0 : relnode,
1189 0 : forknum: MAIN_FORKNUM,
1190 0 : };
1191 0 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1192 0 : .await?;
1193 0 : }
1194 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1195 0 : let rel = RelTag {
1196 0 : spcnode,
1197 0 : dbnode,
1198 0 : relnode,
1199 0 : forknum: FSM_FORKNUM,
1200 0 : };
1201 0 :
1202 0 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1203 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
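            // The FSM fork is laid out as a tree; fsm_logical_to_physical()
            // maps a logical page of leaf slots to its physical block number,
            // accounting for the interleaved upper-level pages.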
1204 0 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1205 0 : // Tail of last remaining FSM page has to be zeroed.
1206 0 : // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
1207 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
1208 0 : fsm_physical_page_no += 1;
1209 0 : }
1210 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1211 0 : if nblocks > fsm_physical_page_no {
1212 : // Only truncate if the FSM is larger than the truncate position.
1213 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1214 0 : .await?;
1215 0 : }
1216 0 : }
1217 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1218 0 : let rel = RelTag {
1219 0 : spcnode,
1220 0 : dbnode,
1221 0 : relnode,
1222 0 : forknum: VISIBILITYMAP_FORKNUM,
1223 0 : };
1224 0 :
1225 0 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1226 0 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1227 0 : // Tail of last remaining vm page has to be zeroed.
1228 0 : // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
1229 0 : modification.put_rel_page_image_zero(rel, vm_page_no);
1230 0 : vm_page_no += 1;
1231 0 : }
1232 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1233 0 : if nblocks > vm_page_no {
1234 : // Only truncate if the VM is larger than the truncate position.
1235 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1236 0 : .await?;
1237 0 : }
1238 0 : }
1239 0 : Ok(())
1240 0 : }
1241 :
1242 24 : fn warn_on_ingest_lag(
1243 24 : &mut self,
1244 24 : conf: &crate::config::PageServerConf,
1245 24 : wal_timestmap: TimestampTz,
1246 24 : ) {
1247 24 : debug_assert_current_span_has_tenant_and_timeline_id();
1248 24 : let now = SystemTime::now();
1249 24 : let rate_limits = &mut self.warn_ingest_lag;
1250 24 : match try_from_pg_timestamp(wal_timestamp) {
1251 24 : Ok(ts) => {
1252 24 : match now.duration_since(ts) {
1253 24 : Ok(lag) => {
1254 24 : if lag > conf.wait_lsn_timeout {
1255 24 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
1256 6 : let lag = humantime::format_duration(lag);
1257 6 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
1258 24 : })
1259 0 : }
1260 : },
1261 0 : Err(e) => {
1262 0 : let delta_t = e.duration();
1263 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
1264 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
1265 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
1266 0 : if delta_t > IGNORED_DRIFT {
1267 0 : let delta_t = humantime::format_duration(delta_t);
1268 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
1269 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
1270 0 : })
1271 0 : }
1272 : }
1273 : };
1274 :
1275 : }
1276 0 : Err(error) => {
1277 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
1278 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
1279 0 : })
1280 : }
1281 : }
1282 24 : }
1283 :
1284 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
1285 : ///
1286 24 : async fn ingest_xact_record(
1287 24 : &mut self,
1288 24 : modification: &mut DatadirModification<'_>,
1289 24 : parsed: &XlXactParsedRecord,
1290 24 : is_commit: bool,
1291 24 : origin_id: u16,
1292 24 : ctx: &RequestContext,
1293 24 : ) -> anyhow::Result<()> {
1294 24 : // Record update of CLOG pages
1295 24 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1296 24 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1297 24 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
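        // Worked example, assuming the usual constants (CLOG_XACTS_PER_PAGE =
        // 32768 and SLRU_PAGES_PER_SEGMENT = 32): xid 1_000_000 lands on CLOG
        // page 30 (1_000_000 / 32768), i.e. page 30 within segment 0.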
1298 24 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
1299 24 :
1300 24 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
1301 :
1302 24 : for subxact in &parsed.subxacts {
1303 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1304 0 : if subxact_pageno != pageno {
1305 : // This subxact goes to a different page. Write the record
1306 : // for all the XIDs on the previous page, and continue
1307 : // accumulating XIDs on this new page.
1308 0 : modification.put_slru_wal_record(
1309 0 : SlruKind::Clog,
1310 0 : segno,
1311 0 : rpageno,
1312 0 : if is_commit {
1313 0 : NeonWalRecord::ClogSetCommitted {
1314 0 : xids: page_xids,
1315 0 : timestamp: parsed.xact_time,
1316 0 : }
1317 : } else {
1318 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1319 : },
1320 0 : )?;
1321 0 : page_xids = Vec::new();
1322 0 : }
1323 0 : pageno = subxact_pageno;
1324 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1325 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1326 0 : page_xids.push(*subxact);
1327 : }
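        // Write the CLOG update for the XIDs accumulated on the last
        // (or only) page.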
1328 24 : modification.put_slru_wal_record(
1329 24 : SlruKind::Clog,
1330 24 : segno,
1331 24 : rpageno,
1332 24 : if is_commit {
1333 24 : NeonWalRecord::ClogSetCommitted {
1334 24 : xids: page_xids,
1335 24 : timestamp: parsed.xact_time,
1336 24 : }
1337 : } else {
1338 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1339 : },
1340 0 : )?;
1341 :
1342 24 : for xnode in &parsed.xnodes {
1343 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1344 0 : let rel = RelTag {
1345 0 : forknum,
1346 0 : spcnode: xnode.spcnode,
1347 0 : dbnode: xnode.dbnode,
1348 0 : relnode: xnode.relnode,
1349 0 : };
1350 0 : if modification
1351 0 : .tline
1352 0 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1353 0 : .await?
1354 : {
1355 0 : self.put_rel_drop(modification, rel, ctx).await?;
1356 0 : }
1357 : }
1358 : }
1359 24 : if origin_id != 0 {
1360 0 : modification
1361 0 : .set_replorigin(origin_id, parsed.origin_lsn)
1362 0 : .await?;
1363 24 : }
1364 24 : Ok(())
1365 24 : }
1366 :
1367 0 : async fn ingest_clog_truncate_record(
1368 0 : &mut self,
1369 0 : modification: &mut DatadirModification<'_>,
1370 0 : xlrec: &XlClogTruncate,
1371 0 : ctx: &RequestContext,
1372 0 : ) -> anyhow::Result<()> {
1373 0 : info!(
1374 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1375 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1376 : );
1377 :
1378 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
1379 : // truncated, but a checkpoint record with the updated values isn't written until
1380 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
1381 : // so we keep the oldestXid and oldestXidDB up-to-date.
1382 0 : self.checkpoint.oldestXid = xlrec.oldest_xid;
1383 0 : self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
1384 0 : self.checkpoint_modified = true;
1385 0 :
1386 0 : // TODO: Handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
1387 0 :
1388 0 : let latest_page_number =
1389 0 : self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
1390 0 :
1391 0 : // Now delete all segments containing pages between xlrec.pageno
1392 0 : // and latest_page_number.
1393 0 :
1394 0 : // First, make an important safety check:
1395 0 : // the current endpoint page must not be eligible for removal.
1396 0 : // See SimpleLruTruncate() in slru.c
1397 0 : if clogpage_precedes(latest_page_number, xlrec.pageno) {
1398 0 : info!("could not truncate directory pg_xact: apparent wraparound");
1399 0 : return Ok(());
1400 0 : }
1401 :
1402 : // Iterate over the SLRU CLOG segments and drop the ones that we're ready to truncate
1403 : //
1404 : // We cannot pass 'lsn' to Timeline.list_slru_segments(), or it
1405 : // will block waiting for the last valid LSN to advance up to
1406 : // it. So we use the previous record's LSN in the get calls
1407 : // instead.
1408 0 : for segno in modification
1409 0 : .tline
1410 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1411 0 : .await?
1412 : {
1413 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1414 0 : if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
1415 0 : modification
1416 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1417 0 : .await?;
1418 0 : trace!("Drop CLOG segment {:>04X}", segno);
1419 0 : }
1420 : }
1421 :
1422 0 : Ok(())
1423 0 : }
1424 :
1425 0 : fn ingest_multixact_create_record(
1426 0 : &mut self,
1427 0 : modification: &mut DatadirModification,
1428 0 : xlrec: &XlMultiXactCreate,
1429 0 : ) -> Result<()> {
1430 0 : // Create WAL record for updating the multixact-offsets page
1431 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1432 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1433 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1434 0 :
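        // Same SLRU addressing scheme as the CLOG above: offsets pages are
        // grouped into segments of SLRU_PAGES_PER_SEGMENT pages each.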
1435 0 : modification.put_slru_wal_record(
1436 0 : SlruKind::MultiXactOffsets,
1437 0 : segno,
1438 0 : rpageno,
1439 0 : NeonWalRecord::MultixactOffsetCreate {
1440 0 : mid: xlrec.mid,
1441 0 : moff: xlrec.moff,
1442 0 : },
1443 0 : )?;
1444 :
1445 : // Create WAL records for the update of each affected multixact-members page
1446 0 : let mut members = xlrec.members.iter();
1447 0 : let mut offset = xlrec.moff;
1448 : loop {
1449 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1450 0 :
1451 0 : // How many members fit on this page?
1452 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1453 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1454 0 :
1455 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1456 0 : for _ in 0..page_remain {
1457 0 : if let Some(m) = members.next() {
1458 0 : this_page_members.push(m.clone());
1459 0 : } else {
1460 0 : break;
1461 : }
1462 : }
1463 0 : if this_page_members.is_empty() {
1464 : // all done
1465 0 : break;
1466 0 : }
1467 0 : let n_this_page = this_page_members.len();
1468 0 :
1469 0 : modification.put_slru_wal_record(
1470 0 : SlruKind::MultiXactMembers,
1471 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1472 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1473 0 : NeonWalRecord::MultixactMembersCreate {
1474 0 : moff: offset,
1475 0 : members: this_page_members,
1476 0 : },
1477 0 : )?;
1478 :
1479 : // Note: The multixact members can wrap around, even within one WAL record.
1480 0 : offset = offset.wrapping_add(n_this_page as u32);
1481 : }
1482 0 : let next_offset = offset;
1483 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1484 :
1485 : // Update next-multi-xid and next-offset
1486 : //
1487 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1488 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1489 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1490 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
1491 : // is stored, so it's never 0 in a checkpoint.
1492 : //
1493 : // I don't know why it's done that way, it seems less error-prone to skip over 0
1494 : // when the value is stored rather than when it's read. But let's do it the same
1495 : // way here.
1496 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
1497 0 :
1498 0 : if self
1499 0 : .checkpoint
1500 0 : .update_next_multixid(next_multi_xid, next_offset)
1501 0 : {
1502 0 : self.checkpoint_modified = true;
1503 0 : }
1504 :
1505 : // Also update the next-xid with the highest member. According to the comments in
1506 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
1507 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1508 0 : if let Some(max_xid) = acc {
1509 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1510 0 : Some(mbr.xid)
1511 : } else {
1512 0 : acc
1513 : }
1514 : } else {
1515 0 : Some(mbr.xid)
1516 : }
1517 0 : });
1518 :
1519 0 : if let Some(max_xid) = max_mbr_xid {
1520 0 : if self.checkpoint.update_next_xid(max_xid) {
1521 0 : self.checkpoint_modified = true;
1522 0 : }
1523 0 : }
1524 0 : Ok(())
1525 0 : }
1526 :
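// A minimal sketch of the pagination in ingest_multixact_create_record, under
// the same assumptions: members are appended starting at `offset`, a members
// page holds MULTIXACT_MEMBERS_PER_PAGE entries, and the offset may wrap
// around u32 within a single record. The helper name and shape are
// hypothetical, for illustration only.
#[cfg(test)]
#[allow(dead_code)]
fn member_page_chunks(mut offset: u32, mut nmembers: u32) -> Vec<(u32, u32)> {
    let per_page = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    let mut chunks = Vec::new(); // (pageno, number of members on that page)
    while nmembers > 0 {
        let pageno = offset / per_page;
        let page_remain = per_page - offset % per_page;
        let n = nmembers.min(page_remain);
        chunks.push((pageno, n));
        // Like the loop above, let the offset wrap within one WAL record.
        offset = offset.wrapping_add(n);
        nmembers -= n;
    }
    chunks
}
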
1527 0 : async fn ingest_multixact_truncate_record(
1528 0 : &mut self,
1529 0 : modification: &mut DatadirModification<'_>,
1530 0 : xlrec: &XlMultiXactTruncate,
1531 0 : ctx: &RequestContext,
1532 0 : ) -> Result<()> {
1533 0 : self.checkpoint.oldestMulti = xlrec.end_trunc_off;
1534 0 : self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
1535 0 : self.checkpoint_modified = true;
1536 0 :
1537 0 : // PerformMembersTruncation
1538 0 : let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
1539 0 : let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
1540 0 : let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
1541 0 : let mut segment: i32 = startsegment;
1542 :
1543 : // Delete all the segments except the last one. The last segment may still
1544 : // contain valid data, possibly only partially.
1545 0 : while segment != endsegment {
1546 0 : modification
1547 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1548 0 : .await?;
1549 :
1550 : /* move to next segment, handling wraparound correctly */
1551 0 : if segment == maxsegment {
1552 0 : segment = 0;
1553 0 : } else {
1554 0 : segment += 1;
1555 0 : }
1556 : }
1557 :
1558 : // Truncate offsets
1559 : // FIXME: this does not handle wraparound correctly
1560 :
1561 0 : Ok(())
1562 0 : }
1563 :
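// Hedged sketch of the wrapping walk in ingest_multixact_truncate_record:
// visit every member segment from `start` up to (but excluding) `end`,
// wrapping past `max` back to 0. Hypothetical helper, for illustration only.
#[cfg(test)]
#[allow(dead_code)]
fn member_segments_to_drop(start: i32, end: i32, max: i32) -> Vec<i32> {
    let mut segs = Vec::new();
    let mut seg = start;
    while seg != end {
        segs.push(seg);
        // move to the next segment, handling wraparound like the loop above
        seg = if seg == max { 0 } else { seg + 1 };
    }
    segs
}
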
1564 0 : async fn ingest_relmap_page(
1565 0 : &mut self,
1566 0 : modification: &mut DatadirModification<'_>,
1567 0 : xlrec: &XlRelmapUpdate,
1568 0 : decoded: &DecodedWALRecord,
1569 0 : ctx: &RequestContext,
1570 0 : ) -> Result<()> {
1571 0 : let mut buf = decoded.record.clone();
1572 0 : buf.advance(decoded.main_data_offset);
1573 0 : // skip xl_relmap_update
1574 0 : buf.advance(12);
1575 0 :
1576 0 : modification
1577 0 : .put_relmap_file(
1578 0 : xlrec.tsid,
1579 0 : xlrec.dbid,
1580 0 : Bytes::copy_from_slice(&buf[..]),
1581 0 : ctx,
1582 0 : )
1583 0 : .await
1584 0 : }
1585 :
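// Why the magic `12` in ingest_relmap_page: xl_relmap_update begins with three
// 4-byte fields (dbid: Oid, tsid: Oid, nbytes: int32), so advancing 12 bytes
// positions the buffer at the start of the raw relmap payload. A hedged,
// hypothetical parse of that header, assuming the WAL bytes use the server's
// little-endian native byte order:
#[cfg(test)]
#[allow(dead_code)]
fn parse_xl_relmap_update_header(buf: &mut Bytes) -> (u32, u32, i32) {
    let dbid = buf.get_u32_le(); // database OID, or 0 for the shared map
    let tsid = buf.get_u32_le(); // tablespace OID the map file lives in
    let nbytes = buf.get_i32_le(); // size of the relmap data that follows
    (dbid, tsid, nbytes)
}
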
1586 54 : async fn put_rel_creation(
1587 54 : &mut self,
1588 54 : modification: &mut DatadirModification<'_>,
1589 54 : rel: RelTag,
1590 54 : ctx: &RequestContext,
1591 54 : ) -> Result<()> {
1592 54 : modification.put_rel_creation(rel, 0, ctx).await?;
1593 54 : Ok(())
1594 54 : }
1595 :
1596 817278 : async fn put_rel_page_image(
1597 817278 : &mut self,
1598 817278 : modification: &mut DatadirModification<'_>,
1599 817278 : rel: RelTag,
1600 817278 : blknum: BlockNumber,
1601 817278 : img: Bytes,
1602 817278 : ctx: &RequestContext,
1603 817278 : ) -> Result<(), PageReconstructError> {
1604 817278 : self.handle_rel_extend(modification, rel, blknum, ctx)
1605 16275 : .await?;
1606 817278 : modification.put_rel_page_image(rel, blknum, img)?;
1607 817278 : Ok(())
1608 817278 : }
1609 :
1610 436890 : async fn put_rel_wal_record(
1611 436890 : &mut self,
1612 436890 : modification: &mut DatadirModification<'_>,
1613 436890 : rel: RelTag,
1614 436890 : blknum: BlockNumber,
1615 436890 : rec: NeonWalRecord,
1616 436890 : ctx: &RequestContext,
1617 436890 : ) -> Result<()> {
1618 436890 : self.handle_rel_extend(modification, rel, blknum, ctx)
1619 837 : .await?;
1620 436890 : modification.put_rel_wal_record(rel, blknum, rec)?;
1621 436890 : Ok(())
1622 436890 : }
1623 :
1624 18036 : async fn put_rel_truncation(
1625 18036 : &mut self,
1626 18036 : modification: &mut DatadirModification<'_>,
1627 18036 : rel: RelTag,
1628 18036 : nblocks: BlockNumber,
1629 18036 : ctx: &RequestContext,
1630 18036 : ) -> anyhow::Result<()> {
1631 18036 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1632 18036 : Ok(())
1633 18036 : }
1634 :
1635 6 : async fn put_rel_drop(
1636 6 : &mut self,
1637 6 : modification: &mut DatadirModification<'_>,
1638 6 : rel: RelTag,
1639 6 : ctx: &RequestContext,
1640 6 : ) -> Result<()> {
1641 6 : modification.put_rel_drop(rel, ctx).await?;
1642 6 : Ok(())
1643 6 : }
1644 :
1645 1254168 : async fn handle_rel_extend(
1646 1254168 : &mut self,
1647 1254168 : modification: &mut DatadirModification<'_>,
1648 1254168 : rel: RelTag,
1649 1254168 : blknum: BlockNumber,
1650 1254168 : ctx: &RequestContext,
1651 1254168 : ) -> Result<(), PageReconstructError> {
1652 1254168 : let new_nblocks = blknum + 1;
1653 : // Check if the relation exists. We implicitly create relations on first
1654 : // record.
1655 : // TODO: it would be nice to be more explicit about it
1656 :
1657 : // Get current size and put rel creation if rel doesn't exist
1658 : //
1659 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1660 : // check the cache too. This is because eagerly checking the cache results in
1661 : // less work overall and 10% better performance. It's more work on cache miss
1662 : // but cache miss is rare.
1663 1254168 : let old_nblocks = if let Some(nblocks) = modification
1664 1254168 : .tline
1665 1254168 : .get_cached_rel_size(&rel, modification.get_lsn())
1666 : {
1667 1254138 : nblocks
1668 30 : } else if !modification
1669 30 : .tline
1670 30 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1671 6 : .await?
1672 : {
1673 : // create it with 0 size initially, the logic below will extend it
1674 30 : modification
1675 30 : .put_rel_creation(rel, 0, ctx)
1676 9 : .await
1677 30 : .context("Relation Error")?;
1678 30 : 0
1679 : } else {
1680 0 : modification
1681 0 : .tline
1682 0 : .get_rel_size(rel, Version::Modified(modification), ctx)
1683 0 : .await?
1684 : };
1685 :
1686 1254168 : if new_nblocks > old_nblocks {
1687 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1688 824364 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1689 :
1690 824364 : let mut key = rel_block_to_key(rel, blknum);
1691 : // fill the gap with zeros
1692 824364 : for gap_blknum in old_nblocks..blknum {
1693 8994 : key.field6 = gap_blknum;
1694 8994 :
1695 8994 : if self.shard.get_shard_number(&key) != self.shard.number {
1696 0 : continue;
1697 8994 : }
1698 8994 :
1699 8994 : modification.put_rel_page_image_zero(rel, gap_blknum);
1700 : }
1701 429804 : }
1702 1254168 : Ok(())
1703 1254168 : }
1704 :
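// A small worked example of the gap-filling rule in handle_rel_extend, with
// hypothetical numbers: writing block `blknum` into a relation of size
// `old_nblocks` zero-fills exactly the blocks in old_nblocks..blknum, and the
// caller then writes the target block itself. (The real loop additionally
// skips gap blocks owned by other shards.)
#[cfg(test)]
#[allow(dead_code)]
fn gap_blocks_to_zero_fill(old_nblocks: BlockNumber, blknum: BlockNumber) -> Vec<BlockNumber> {
    (old_nblocks..blknum).collect()
}
// For example, old_nblocks = 2 and blknum = 5 zero-fills blocks 2, 3 and 4,
// while block 5 receives the incoming page image or WAL record.
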
1705 0 : async fn put_slru_page_image(
1706 0 : &mut self,
1707 0 : modification: &mut DatadirModification<'_>,
1708 0 : kind: SlruKind,
1709 0 : segno: u32,
1710 0 : blknum: BlockNumber,
1711 0 : img: Bytes,
1712 0 : ctx: &RequestContext,
1713 0 : ) -> Result<()> {
1714 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1715 0 : .await?;
1716 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1717 0 : Ok(())
1718 0 : }
1719 :
1720 0 : async fn handle_slru_extend(
1721 0 : &mut self,
1722 0 : modification: &mut DatadirModification<'_>,
1723 0 : kind: SlruKind,
1724 0 : segno: u32,
1725 0 : blknum: BlockNumber,
1726 0 : ctx: &RequestContext,
1727 0 : ) -> anyhow::Result<()> {
1728 0 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
1729 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1730 0 : // a lot less frequently.
1731 0 :
1732 0 : let new_nblocks = blknum + 1;
1733 : // Check if the segment exists. We implicitly create SLRU segments on
1734 : // first record.
1735 : // TODO: it would be nice to be more explicit about it
1736 0 : let old_nblocks = if !modification
1737 0 : .tline
1738 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1739 0 : .await?
1740 : {
1741 : // create it with 0 size initially, the logic below will extend it
1742 0 : modification
1743 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1744 0 : .await?;
1745 0 : 0
1746 : } else {
1747 0 : modification
1748 0 : .tline
1749 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1750 0 : .await?
1751 : };
1752 :
1753 0 : if new_nblocks > old_nblocks {
1754 0 : trace!(
1755 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1756 : kind,
1757 : segno,
1758 : old_nblocks,
1759 : new_nblocks
1760 : );
1761 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1762 :
1763 : // fill the gap with zeros
1764 0 : for gap_blknum in old_nblocks..blknum {
1765 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum);
1766 0 : }
1767 0 : }
1768 0 : Ok(())
1769 0 : }
1770 : }
1771 :
1772 36 : async fn get_relsize(
1773 36 : modification: &DatadirModification<'_>,
1774 36 : rel: RelTag,
1775 36 : ctx: &RequestContext,
1776 36 : ) -> Result<BlockNumber, PageReconstructError> {
1777 36 : let nblocks = if !modification
1778 36 : .tline
1779 36 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1780 0 : .await?
1781 : {
1782 0 : 0
1783 : } else {
1784 36 : modification
1785 36 : .tline
1786 36 : .get_rel_size(rel, Version::Modified(modification), ctx)
1787 0 : .await?
1788 : };
1789 36 : Ok(nblocks)
1790 36 : }
1791 :
1792 : #[allow(clippy::bool_assert_comparison)]
1793 : #[cfg(test)]
1794 : mod tests {
1795 : use super::*;
1796 : use crate::tenant::harness::*;
1797 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
1798 : use postgres_ffi::RELSEG_SIZE;
1799 :
1800 : use crate::DEFAULT_PG_VERSION;
1801 :
1802 : /// Arbitrary relation tag, for testing.
1803 : const TESTREL_A: RelTag = RelTag {
1804 : spcnode: 0,
1805 : dbnode: 111,
1806 : relnode: 1000,
1807 : forknum: 0,
1808 : };
1809 :
1810 36 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
1811 36 : // TODO
1812 36 : }
1813 :
1814 : static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
1815 :
1816 24 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
1817 24 : let mut m = tline.begin_modification(Lsn(0x10));
1818 24 : m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
1819 48 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
1820 24 : m.commit(ctx).await?;
1821 24 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
1822 :
1823 24 : Ok(walingest)
1824 24 : }
1825 :
1826 : #[tokio::test]
1827 6 : async fn test_relsize() -> Result<()> {
1828 24 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
1829 6 : let tline = tenant
1830 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1831 12 : .await?;
1832 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
1833 6 :
1834 6 : let mut m = tline.begin_modification(Lsn(0x20));
1835 6 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
1836 6 : walingest
1837 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
1838 6 : .await?;
1839 6 : m.on_record_end();
1840 6 : m.commit(&ctx).await?;
1841 6 : let mut m = tline.begin_modification(Lsn(0x30));
1842 6 : walingest
1843 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
1844 6 : .await?;
1845 6 : m.on_record_end();
1846 6 : m.commit(&ctx).await?;
1847 6 : let mut m = tline.begin_modification(Lsn(0x40));
1848 6 : walingest
1849 6 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
1850 6 : .await?;
1851 6 : m.on_record_end();
1852 6 : m.commit(&ctx).await?;
1853 6 : let mut m = tline.begin_modification(Lsn(0x50));
1854 6 : walingest
1855 6 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
1856 6 : .await?;
1857 6 : m.on_record_end();
1858 6 : m.commit(&ctx).await?;
1859 6 :
1860 6 : assert_current_logical_size(&tline, Lsn(0x50));
1861 6 :
1862 6 : // The relation was created at LSN 0x20, so it's not visible at LSN 0x10 yet.
1863 6 : assert_eq!(
1864 6 : tline
1865 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1866 6 : .await?,
1867 6 : false
1868 6 : );
1869 6 : assert!(tline
1870 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
1871 6 : .await
1872 6 : .is_err());
1873 6 : assert_eq!(
1874 6 : tline
1875 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1876 6 : .await?,
1877 6 : true
1878 6 : );
1879 6 : assert_eq!(
1880 6 : tline
1881 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
1882 6 : .await?,
1883 6 : 1
1884 6 : );
1885 6 : assert_eq!(
1886 6 : tline
1887 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1888 6 : .await?,
1889 6 : 3
1890 6 : );
1891 6 :
1892 6 : // Check page contents at each LSN
1893 6 : assert_eq!(
1894 6 : tline
1895 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
1896 6 : .await?,
1897 6 : test_img("foo blk 0 at 2")
1898 6 : );
1899 6 :
1900 6 : assert_eq!(
1901 6 : tline
1902 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
1903 6 : .await?,
1904 6 : test_img("foo blk 0 at 3")
1905 6 : );
1906 6 :
1907 6 : assert_eq!(
1908 6 : tline
1909 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
1910 6 : .await?,
1911 6 : test_img("foo blk 0 at 3")
1912 6 : );
1913 6 : assert_eq!(
1914 6 : tline
1915 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
1916 6 : .await?,
1917 6 : test_img("foo blk 1 at 4")
1918 6 : );
1919 6 :
1920 6 : assert_eq!(
1921 6 : tline
1922 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
1923 6 : .await?,
1924 6 : test_img("foo blk 0 at 3")
1925 6 : );
1926 6 : assert_eq!(
1927 6 : tline
1928 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
1929 6 : .await?,
1930 6 : test_img("foo blk 1 at 4")
1931 6 : );
1932 6 : assert_eq!(
1933 6 : tline
1934 6 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
1935 6 : .await?,
1936 6 : test_img("foo blk 2 at 5")
1937 6 : );
1938 6 :
1939 6 : // Truncate last block
1940 6 : let mut m = tline.begin_modification(Lsn(0x60));
1941 6 : walingest
1942 6 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
1943 6 : .await?;
1944 6 : m.commit(&ctx).await?;
1945 6 : assert_current_logical_size(&tline, Lsn(0x60));
1946 6 :
1947 6 : // Check reported size and contents after truncation
1948 6 : assert_eq!(
1949 6 : tline
1950 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
1951 6 : .await?,
1952 6 : 2
1953 6 : );
1954 6 : assert_eq!(
1955 6 : tline
1956 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
1957 6 : .await?,
1958 6 : test_img("foo blk 0 at 3")
1959 6 : );
1960 6 : assert_eq!(
1961 6 : tline
1962 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
1963 6 : .await?,
1964 6 : test_img("foo blk 1 at 4")
1965 6 : );
1966 6 :
1967 6 : // should still see the truncated block with older LSN
1968 6 : assert_eq!(
1969 6 : tline
1970 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
1971 6 : .await?,
1972 6 : 3
1973 6 : );
1974 6 : assert_eq!(
1975 6 : tline
1976 6 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
1977 6 : .await?,
1978 6 : test_img("foo blk 2 at 5")
1979 6 : );
1980 6 :
1981 6 : // Truncate to zero length
1982 6 : let mut m = tline.begin_modification(Lsn(0x68));
1983 6 : walingest
1984 6 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
1985 6 : .await?;
1986 6 : m.commit(&ctx).await?;
1987 6 : assert_eq!(
1988 6 : tline
1989 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
1990 6 : .await?,
1991 6 : 0
1992 6 : );
1993 6 :
1994 6 : // Extend from 0 to 2 blocks, leaving a gap
1995 6 : let mut m = tline.begin_modification(Lsn(0x70));
1996 6 : walingest
1997 6 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
1998 6 : .await?;
1999 6 : m.on_record_end();
2000 6 : m.commit(&ctx).await?;
2001 6 : assert_eq!(
2002 6 : tline
2003 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
2004 6 : .await?,
2005 6 : 2
2006 6 : );
2007 6 : assert_eq!(
2008 6 : tline
2009 6 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
2010 6 : .await?,
2011 6 : ZERO_PAGE
2012 6 : );
2013 6 : assert_eq!(
2014 6 : tline
2015 6 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
2016 6 : .await?,
2017 6 : test_img("foo blk 1")
2018 6 : );
2019 6 :
2020 6 : // Extend a lot more, leaving a big gap that spans across segments
2021 6 : let mut m = tline.begin_modification(Lsn(0x80));
2022 6 : walingest
2023 6 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
2024 6 : .await?;
2025 6 : m.on_record_end();
2026 570 : m.commit(&ctx).await?;
2027 6 : assert_eq!(
2028 6 : tline
2029 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2030 6 : .await?,
2031 6 : 1501
2032 6 : );
2033 8994 : for blk in 2..1500 {
2034 8988 : assert_eq!(
2035 8988 : tline
2036 8988 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
2037 4620 : .await?,
2038 8988 : ZERO_PAGE
2039 6 : );
2040 6 : }
2041 6 : assert_eq!(
2042 6 : tline
2043 6 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
2044 6 : .await?,
2045 6 : test_img("foo blk 1500")
2046 6 : );
2047 6 :
2048 6 : Ok(())
2049 6 : }
2050 :
2051 : // Test what happens if we drop a relation
2052 : // and then create it again within the same layer.
2053 : #[tokio::test]
2054 6 : async fn test_drop_extend() -> Result<()> {
2055 6 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
2056 6 : .await?
2057 6 : .load()
2058 24 : .await;
2059 6 : let tline = tenant
2060 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2061 12 : .await?;
2062 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2063 6 :
2064 6 : let mut m = tline.begin_modification(Lsn(0x20));
2065 6 : walingest
2066 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2067 6 : .await?;
2068 6 : m.commit(&ctx).await?;
2069 6 :
2070 6 : // Check that rel exists and size is correct
2071 6 : assert_eq!(
2072 6 : tline
2073 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2074 6 : .await?,
2075 6 : true
2076 6 : );
2077 6 : assert_eq!(
2078 6 : tline
2079 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2080 6 : .await?,
2081 6 : 1
2082 6 : );
2083 6 :
2084 6 : // Drop rel
2085 6 : let mut m = tline.begin_modification(Lsn(0x30));
2086 6 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
2087 6 : m.commit(&ctx).await?;
2088 6 :
2089 6 : // Check that rel is not visible anymore
2090 6 : assert_eq!(
2091 6 : tline
2092 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
2093 6 : .await?,
2094 6 : false
2095 6 : );
2096 6 :
2097 6 : // FIXME: should fail
2098 6 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2099 6 :
2100 6 : // Re-create it
2101 6 : let mut m = tline.begin_modification(Lsn(0x40));
2102 6 : walingest
2103 6 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2104 6 : .await?;
2105 6 : m.commit(&ctx).await?;
2106 6 :
2107 6 : // Check that rel exists and size is correct
2108 6 : assert_eq!(
2109 6 : tline
2110 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2111 6 : .await?,
2112 6 : true
2113 6 : );
2114 6 : assert_eq!(
2115 6 : tline
2116 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2117 6 : .await?,
2118 6 : 1
2119 6 : );
2120 6 :
2121 6 : Ok(())
2122 6 : }
2123 :
2124 : // Test what happens if we truncate a relation
2125 : // so that one of its segments is dropped
2126 : // and then extend it again within the same layer.
2127 : #[tokio::test]
2128 6 : async fn test_truncate_extend() -> Result<()> {
2129 6 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
2130 6 : .await?
2131 6 : .load()
2132 24 : .await;
2133 6 : let tline = tenant
2134 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2135 12 : .await?;
2136 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2137 6 :
2138 6 : // Create a 20 MB relation (the size is arbitrary)
2139 6 : let relsize = 20 * 1024 * 1024 / 8192;
2140 6 : let mut m = tline.begin_modification(Lsn(0x20));
2141 15360 : for blkno in 0..relsize {
2142 15360 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2143 15360 : walingest
2144 15360 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2145 6 : .await?;
2146 6 : }
2147 6 : m.commit(&ctx).await?;
2148 6 :
2149 6 : // The relation was created at LSN 0x20, so it's not visible at LSN 0x10 yet.
2150 6 : assert_eq!(
2151 6 : tline
2152 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2153 6 : .await?,
2154 6 : false
2155 6 : );
2156 6 : assert!(tline
2157 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2158 6 : .await
2159 6 : .is_err());
2160 6 :
2161 6 : assert_eq!(
2162 6 : tline
2163 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2164 6 : .await?,
2165 6 : true
2166 6 : );
2167 6 : assert_eq!(
2168 6 : tline
2169 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2170 6 : .await?,
2171 6 : relsize
2172 6 : );
2173 6 :
2174 6 : // Check relation content
2175 15360 : for blkno in 0..relsize {
2176 15360 : let lsn = Lsn(0x20);
2177 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2178 15360 : assert_eq!(
2179 15360 : tline
2180 15360 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
2181 5409 : .await?,
2182 15360 : test_img(&data)
2183 6 : );
2184 6 : }
2185 6 :
2186 6 : // Truncate the relation so that the second segment is dropped,
2187 6 : // leaving only one page.
2188 6 : let mut m = tline.begin_modification(Lsn(0x60));
2189 6 : walingest
2190 6 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2191 6 : .await?;
2192 6 : m.commit(&ctx).await?;
2193 6 :
2194 6 : // Check reported size and contents after truncation
2195 6 : assert_eq!(
2196 6 : tline
2197 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2198 6 : .await?,
2199 6 : 1
2200 6 : );
2201 6 :
2202 12 : for blkno in 0..1 {
2203 6 : let lsn = Lsn(0x20);
2204 6 : let data = format!("foo blk {} at {}", blkno, lsn);
2205 6 : assert_eq!(
2206 6 : tline
2207 6 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
2208 6 : .await?,
2209 6 : test_img(&data)
2210 6 : );
2211 6 : }
2212 6 :
2213 6 : // should still see all blocks with older LSN
2214 6 : assert_eq!(
2215 6 : tline
2216 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2217 6 : .await?,
2218 6 : relsize
2219 6 : );
2220 15360 : for blkno in 0..relsize {
2221 15360 : let lsn = Lsn(0x20);
2222 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2223 15360 : assert_eq!(
2224 15360 : tline
2225 15360 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
2226 5568 : .await?,
2227 15360 : test_img(&data)
2228 6 : );
2229 6 : }
2230 6 :
2231 6 : // Extend the relation again.
2232 6 : // Add enough blocks to create a second segment.
2233 6 : let lsn = Lsn(0x80);
2234 6 : let mut m = tline.begin_modification(lsn);
2235 15360 : for blkno in 0..relsize {
2236 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2237 15360 : walingest
2238 15360 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2239 6 : .await?;
2240 6 : }
2241 9 : m.commit(&ctx).await?;
2242 6 :
2243 6 : assert_eq!(
2244 6 : tline
2245 6 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2246 6 : .await?,
2247 6 : true
2248 6 : );
2249 6 : assert_eq!(
2250 6 : tline
2251 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2252 6 : .await?,
2253 6 : relsize
2254 6 : );
2255 6 : // Check relation content
2256 15360 : for blkno in 0..relsize {
2257 15360 : let lsn = Lsn(0x80);
2258 15360 : let data = format!("foo blk {} at {}", blkno, lsn);
2259 15360 : assert_eq!(
2260 15360 : tline
2261 15360 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
2262 5484 : .await?,
2263 15360 : test_img(&data)
2264 6 : );
2265 6 : }
2266 6 :
2267 6 : Ok(())
2268 6 : }
2269 :
2270 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2271 : /// split into multiple 1 GB segments in Postgres.
2272 : #[tokio::test]
2273 6 : async fn test_large_rel() -> Result<()> {
2274 21 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2275 6 : let tline = tenant
2276 6 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2277 12 : .await?;
2278 15 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2279 6 :
2280 6 : let mut lsn = 0x10;
2281 786438 : for blknum in 0..RELSEG_SIZE + 1 {
2282 786438 : lsn += 0x10;
2283 786438 : let mut m = tline.begin_modification(Lsn(lsn));
2284 786438 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2285 786438 : walingest
2286 786438 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2287 16260 : .await?;
2288 786438 : m.commit(&ctx).await?;
2289 6 : }
2290 6 :
2291 6 : assert_current_logical_size(&tline, Lsn(lsn));
2292 6 :
2293 6 : assert_eq!(
2294 6 : tline
2295 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2296 6 : .await?,
2297 6 : RELSEG_SIZE + 1
2298 6 : );
2299 6 :
2300 6 : // Truncate one block
2301 6 : lsn += 0x10;
2302 6 : let mut m = tline.begin_modification(Lsn(lsn));
2303 6 : walingest
2304 6 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2305 6 : .await?;
2306 6 : m.commit(&ctx).await?;
2307 6 : assert_eq!(
2308 6 : tline
2309 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2310 6 : .await?,
2311 6 : RELSEG_SIZE
2312 6 : );
2313 6 : assert_current_logical_size(&tline, Lsn(lsn));
2314 6 :
2315 6 : // Truncate another block
2316 6 : lsn += 0x10;
2317 6 : let mut m = tline.begin_modification(Lsn(lsn));
2318 6 : walingest
2319 6 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2320 6 : .await?;
2321 6 : m.commit(&ctx).await?;
2322 6 : assert_eq!(
2323 6 : tline
2324 6 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2325 6 : .await?,
2326 6 : RELSEG_SIZE - 1
2327 6 : );
2328 6 : assert_current_logical_size(&tline, Lsn(lsn));
2329 6 :
2330 6 : // Truncate to 1500, and then truncate all the way down to 0, one block at a time
2331 6 : // This tests the behavior at segment boundaries
2332 6 : let mut size: i32 = 3000;
2333 18012 : while size >= 0 {
2334 18006 : lsn += 0x10;
2335 18006 : let mut m = tline.begin_modification(Lsn(lsn));
2336 18006 : walingest
2337 18006 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2338 6 : .await?;
2339 18006 : m.commit(&ctx).await?;
2340 18006 : assert_eq!(
2341 18006 : tline
2342 18006 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2343 6 : .await?,
2344 18006 : size as BlockNumber
2345 6 : );
2346 6 :
2347 18006 : size -= 1;
2348 6 : }
2349 6 : assert_current_logical_size(&tline, Lsn(lsn));
2350 6 :
2351 6 : Ok(())
2352 6 : }
2353 :
2354 : /// Replay a wal segment file taken directly from safekeepers.
2355 : ///
2356 : /// This test is useful for benchmarking since it allows us to profile only
2357 : /// the walingest code in a single-threaded executor, and iterate more quickly
2358 : /// without waiting for unrelated steps.
2359 : #[tokio::test]
2360 6 : async fn test_ingest_real_wal() {
2361 6 : use crate::tenant::harness::*;
2362 6 : use postgres_ffi::waldecoder::WalStreamDecoder;
2363 6 : use postgres_ffi::WAL_SEGMENT_SIZE;
2364 6 :
2365 6 : // Define test data path and constants.
2366 6 : //
2367 6 : // Steps to reconstruct the data, if needed:
2368 6 : // 1. Run the pgbench python test
2369 6 : // 2. Take the first wal segment file from safekeeper
2370 6 : // 3. Compress it using `zstd --long input_file`
2371 6 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2372 6 : // 5. Grep sk logs for "restart decoder" to get startpoint
2373 6 : // 6. Run just the decoder from this test to get the endpoint.
2374 6 : // It's the last LSN the decoder will output.
2375 6 : let pg_version = 15; // The test data was generated by pg15
2376 6 : let path = "test_data/sk_wal_segment_from_pgbench";
2377 6 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2378 6 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2379 6 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2380 6 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2381 6 :
2382 6 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2383 6 : let span = harness
2384 6 : .span()
2385 6 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2386 21 : let (tenant, ctx) = harness.load().await;
2387 6 :
2388 6 : let remote_initdb_path =
2389 6 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2390 6 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2391 6 :
2392 6 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2393 6 : .expect("creating test dir should work");
2394 6 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2395 6 :
2396 6 : // Bootstrap a real timeline. We can't use create_test_timeline because
2397 6 : // it doesn't create a real checkpoint, and WalIngest::new tries to parse
2398 6 : // the garbage data.
2399 6 : let tline = tenant
2400 6 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2401 60641 : .await
2402 6 : .unwrap();
2403 6 :
2404 6 : // We fully read and decompress this into memory before decoding
2405 6 : // to get a more accurate perf profile of the decoder.
2406 6 : let bytes = {
2407 6 : use async_compression::tokio::bufread::ZstdDecoder;
2408 6 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2409 6 : let reader = tokio::io::BufReader::new(file);
2410 6 : let decoder = ZstdDecoder::new(reader);
2411 6 : let mut reader = tokio::io::BufReader::new(decoder);
2412 6 : let mut buffer = Vec::new();
2413 663 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2414 6 : buffer
2415 6 : };
2416 6 :
2417 6 : // TODO start a profiler too
2418 6 : let started_at = std::time::Instant::now();
2419 6 :
2420 6 : // Initialize walingest
2421 6 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2422 6 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2423 6 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2424 15 : .await
2425 6 : .unwrap();
2426 6 : let mut modification = tline.begin_modification(startpoint);
2427 6 : println!("decoding {} bytes", bytes.len() - xlogoff);
2428 6 :
2429 6 : // Decode and ingest WAL. We process the WAL in chunks because
2430 6 : // that's what happens when we get bytes from safekeepers.
2431 1424058 : for chunk in bytes[xlogoff..].chunks(50) {
2432 1424058 : decoder.feed_bytes(chunk);
2433 1861608 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2434 437550 : let mut decoded = DecodedWALRecord::default();
2435 437550 : decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
2436 437550 : walingest
2437 437550 : .ingest_record(decoded, lsn, &mut modification, &ctx)
2438 437550 : .instrument(span.clone())
2439 888 : .await
2440 437550 : .unwrap();
2441 6 : }
2442 1424058 : modification.commit(&ctx).await.unwrap();
2443 6 : }
2444 6 :
2445 6 : let duration = started_at.elapsed();
2446 6 : println!("done in {:?}", duration);
2447 6 : }
2448 : }