Line data Source code
1 : //!
2 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
3 : //!
4 : //! The pipeline for ingesting WAL looks like this:
5 : //!
6 : //! WAL receiver -> WalIngest -> Repository
7 : //!
8 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
9 : //! and decodes it to individual WAL records. It feeds the WAL records
10 : //! to WalIngest, which parses them and stores them in the Repository.
11 : //!
12 : //! The neon Repository can store page versions in two formats: as
13 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
14 : //! page images out of some WAL records, but stores most of them as WAL
15 : //! records. If a WAL record modifies multiple pages, WalIngest
16 : //! calls the Repository::put_wal_record or put_page_image functions
17 : //! separately for each modified page.
18 : //!
19 : //! To reconstruct a page using a WAL record, the Repository calls the
20 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
21 : //! redo Postgres process, but it can handle some records directly with
22 : //! bespoke Rust code.
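//!
//! A rough sketch of how this module is driven (simplified; the real wiring
//! lives in the WAL receiver, and the `decoded_records` iterator here is a
//! hypothetical stand-in for the decoding step):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut modification = timeline.begin_modification(startpoint);
//! for (lsn, decoded) in decoded_records {
//!     // Parses the record and issues put_wal_record/put_page_image calls;
//!     // returns false if the record was filtered out (e.g. all of its
//!     // blocks belong to other shards).
//!     walingest.ingest_record(decoded, lsn, &mut modification, &ctx).await?;
//! }
//! // Nothing is durable until the accumulated modification is committed.
//! modification.commit(&ctx).await?;
//! ```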
23 :
24 : use std::sync::Arc;
25 : use std::sync::OnceLock;
26 : use std::time::Duration;
27 : use std::time::Instant;
28 : use std::time::SystemTime;
29 :
30 : use pageserver_api::shard::ShardIdentity;
31 : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
32 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
33 :
34 : use anyhow::{bail, Context, Result};
35 : use bytes::{Buf, Bytes, BytesMut};
36 : use tracing::*;
37 : use utils::failpoint_support;
38 : use utils::rate_limit::RateLimit;
39 :
40 : use crate::context::RequestContext;
41 : use crate::metrics::WAL_INGEST;
42 : use crate::pgdatadir_mapping::{DatadirModification, Version};
43 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
44 : use crate::tenant::PageReconstructError;
45 : use crate::tenant::Timeline;
46 : use crate::walrecord::*;
47 : use crate::ZERO_PAGE;
48 : use pageserver_api::key::rel_block_to_key;
49 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
50 : use postgres_ffi::pg_constants;
51 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
52 : use postgres_ffi::TransactionId;
53 : use postgres_ffi::BLCKSZ;
54 : use utils::bin_ser::SerializeError;
55 : use utils::lsn::Lsn;
56 :
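// `enum_pgversion!` below generates a version-tagged wrapper, roughly like
// this illustrative sketch (the real expansion lives in postgres_ffi):
//
// enum CheckPoint {
//     V14(v14::CheckPoint),
//     V15(v15::CheckPoint),
//     V16(v16::CheckPoint),
//     V17(v17::CheckPoint),
// }
//
// `enum_pgversion_dispatch!` then matches on the variant, binding the inner
// value to the given name with `pgv` aliased to the matching version module.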
57 : enum_pgversion! {CheckPoint, pgv::CheckPoint}
58 :
59 : impl CheckPoint {
60 6 : fn encode(&self) -> Result<Bytes, SerializeError> {
61 6 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
62 6 : }
63 :
64 145834 : fn update_next_xid(&mut self, xid: u32) -> bool {
65 145834 : enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
66 145834 : }
67 :
68 0 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
69 0 : enum_pgversion_dispatch!(self, CheckPoint, cp, {
70 0 : cp.update_next_multixid(multi_xid, multi_offset)
71 : })
72 0 : }
73 : }
74 :
75 : /// Temporary suppression of WAL lag warnings after tenant attach
76 : ///
77 : /// After tenant attach, we want to limit WAL lag warnings because
78 : /// we don't look at the WAL until the attach is complete, which
79 : /// might take a while.
80 : pub struct WalLagCooldown {
81 : /// Until when should this limitation apply at all
82 : active_until: std::time::Instant,
83 : /// The maximum lag to suppress. Lags above this limit get reported anyways.
84 : max_lag: Duration,
85 : }
86 :
87 : impl WalLagCooldown {
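/// Compute the cooldown window from the observed attach duration.
///
/// The margins are heuristic: e.g. if the attach took 30s, lag warnings
/// are suppressed until 3 * 30s + 120s = 210s after attach start, and only
/// lags up to 2 * 30s + 60s = 120s are suppressed; larger lags are still
/// reported.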
88 0 : pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
89 0 : Self {
90 0 : active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
91 0 : max_lag: attach_duration * 2 + Duration::from_secs(60),
92 0 : }
93 0 : }
94 : }
95 :
96 : pub struct WalIngest {
97 : attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
98 : shard: ShardIdentity,
99 : checkpoint: CheckPoint,
100 : checkpoint_modified: bool,
101 : warn_ingest_lag: WarnIngestLag,
102 : }
103 :
104 : struct WarnIngestLag {
105 : lag_msg_ratelimit: RateLimit,
106 : future_lsn_msg_ratelimit: RateLimit,
107 : timestamp_invalid_msg_ratelimit: RateLimit,
108 : }
109 :
110 : impl WalIngest {
111 12 : pub async fn new(
112 12 : timeline: &Timeline,
113 12 : startpoint: Lsn,
114 12 : ctx: &RequestContext,
115 12 : ) -> anyhow::Result<WalIngest> {
116 : // Fetch the latest checkpoint into memory, so that we can compare with it
117 : // quickly in `ingest_record` and update it when it changes.
118 12 : let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
119 12 : let pgversion = timeline.pg_version;
120 :
121 12 : let checkpoint = dispatch_pgversion!(pgversion, {
122 0 : let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
123 0 : trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
124 0 : <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
125 : });
126 :
127 12 : Ok(WalIngest {
128 12 : shard: *timeline.get_shard_identity(),
129 12 : checkpoint,
130 12 : checkpoint_modified: false,
131 12 : attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
132 12 : warn_ingest_lag: WarnIngestLag {
133 12 : lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
134 12 : future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
135 12 : timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
136 12 : },
137 12 : })
138 12 : }
139 :
140 : ///
141 : /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
142 : ///
143 : /// This function updates the `lsn` field of the `DatadirModification`.
144 : ///
145 : /// It parses the record and calls the Timeline's PUT functions for all the
146 : /// relations/pages that the record affects.
147 : ///
148 : /// Returns `true` if the record was ingested, and `false` if it was filtered out.
149 : ///
150 145852 : pub async fn ingest_record(
151 145852 : &mut self,
152 145852 : decoded: DecodedWALRecord,
153 145852 : lsn: Lsn,
154 145852 : modification: &mut DatadirModification<'_>,
155 145852 : ctx: &RequestContext,
156 145852 : ) -> anyhow::Result<bool> {
157 145852 : WAL_INGEST.records_received.inc();
158 145852 : let pg_version = modification.tline.pg_version;
159 145852 : let prev_len = modification.len();
160 145852 :
161 145852 : modification.set_lsn(lsn)?;
162 :
163 145852 : if decoded.is_dbase_create_copy(pg_version) {
164 : // Records of this type should always be preceded by a commit(), as they
165 : // rely on reading data pages back from the Timeline.
166 0 : assert!(!modification.has_dirty_data_pages());
167 145852 : }
168 :
169 145852 : let mut buf = decoded.record.clone();
170 145852 : buf.advance(decoded.main_data_offset);
171 145852 :
172 145852 : assert!(!self.checkpoint_modified);
173 145852 : if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
174 145834 : && self.checkpoint.update_next_xid(decoded.xl_xid)
175 2 : {
176 2 : self.checkpoint_modified = true;
177 145850 : }
178 :
179 145852 : failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
180 :
181 145852 : match decoded.xl_rmid {
182 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
183 : // Heap AM records need some special handling, because they modify VM pages
184 : // without registering them with the standard mechanism.
185 145474 : self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
186 0 : .await?;
187 : }
188 : pg_constants::RM_NEON_ID => {
189 0 : self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
190 0 : .await?;
191 : }
192 : // Handle other special record types
193 : pg_constants::RM_SMGR_ID => {
194 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
195 16 :
196 16 : if info == pg_constants::XLOG_SMGR_CREATE {
197 16 : let create = XlSmgrCreate::decode(&mut buf);
198 16 : self.ingest_xlog_smgr_create(modification, &create, ctx)
199 12 : .await?;
200 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
201 0 : let truncate = XlSmgrTruncate::decode(&mut buf);
202 0 : self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
203 0 : .await?;
204 0 : }
205 : }
206 : pg_constants::RM_DBASE_ID => {
207 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
208 0 : debug!(%info, %pg_version, "handle RM_DBASE_ID");
209 :
210 0 : if pg_version == 14 {
211 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
212 0 : let createdb = XlCreateDatabase::decode(&mut buf);
213 0 : debug!("XLOG_DBASE_CREATE v14");
214 :
215 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
216 0 : .await?;
217 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
218 0 : let dropdb = XlDropDatabase::decode(&mut buf);
219 0 : for tablespace_id in dropdb.tablespace_ids {
220 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
221 0 : modification
222 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
223 0 : .await?;
224 : }
225 0 : }
226 0 : } else if pg_version == 15 {
227 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
228 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
229 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
230 : // The XLOG record was renamed between v14 and v15,
231 : // but the record format is the same.
232 : // So we can reuse XlCreateDatabase here.
233 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
234 0 : let createdb = XlCreateDatabase::decode(&mut buf);
235 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
236 0 : .await?;
237 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
238 0 : let dropdb = XlDropDatabase::decode(&mut buf);
239 0 : for tablespace_id in dropdb.tablespace_ids {
240 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
241 0 : modification
242 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
243 0 : .await?;
244 : }
245 0 : }
246 0 : } else if pg_version == 16 {
247 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
248 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
249 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
250 : // The XLOG record was renamed between v14 and v15,
251 : // but the record format is the same.
252 : // So we can reuse XlCreateDatabase here.
253 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
254 0 : let createdb = XlCreateDatabase::decode(&mut buf);
255 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
256 0 : .await?;
257 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
258 0 : let dropdb = XlDropDatabase::decode(&mut buf);
259 0 : for tablespace_id in dropdb.tablespace_ids {
260 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
261 0 : modification
262 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
263 0 : .await?;
264 : }
265 0 : }
266 0 : } else if pg_version == 17 {
267 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
268 0 : debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
269 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
270 : // The XLOG record was renamed between v14 and v15,
271 : // but the record format is the same.
272 : // So we can reuse XlCreateDatabase here.
273 0 : debug!("XLOG_DBASE_CREATE_FILE_COPY");
274 0 : let createdb = XlCreateDatabase::decode(&mut buf);
275 0 : self.ingest_xlog_dbase_create(modification, &createdb, ctx)
276 0 : .await?;
277 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
278 0 : let dropdb = XlDropDatabase::decode(&mut buf);
279 0 : for tablespace_id in dropdb.tablespace_ids {
280 0 : trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
281 0 : modification
282 0 : .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
283 0 : .await?;
284 : }
285 0 : }
286 0 : }
287 : }
288 : pg_constants::RM_TBLSPC_ID => {
289 0 : trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
290 : }
291 : pg_constants::RM_CLOG_ID => {
292 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
293 0 :
294 0 : if info == pg_constants::CLOG_ZEROPAGE {
295 0 : let pageno = if pg_version < 17 {
296 0 : buf.get_u32_le()
297 : } else {
298 0 : buf.get_u64_le() as u32
299 : };
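// SLRU files are divided into fixed-size segments; map the global page
// number to a (segment, page-within-segment) pair. For example, with 32
// pages per segment, pageno 70 maps to segno 2, rpageno 6.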
300 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
301 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
302 0 : self.put_slru_page_image(
303 0 : modification,
304 0 : SlruKind::Clog,
305 0 : segno,
306 0 : rpageno,
307 0 : ZERO_PAGE.clone(),
308 0 : ctx,
309 0 : )
310 0 : .await?;
311 : } else {
312 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
313 0 : let xlrec = XlClogTruncate::decode(&mut buf, pg_version);
314 0 : self.ingest_clog_truncate_record(modification, &xlrec, ctx)
315 0 : .await?;
316 : }
317 : }
318 : pg_constants::RM_XACT_ID => {
319 24 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
320 24 :
321 24 : if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
322 8 : let parsed_xact =
323 8 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
324 8 : self.ingest_xact_record(
325 8 : modification,
326 8 : &parsed_xact,
327 8 : info == pg_constants::XLOG_XACT_COMMIT,
328 8 : decoded.origin_id,
329 8 : ctx,
330 8 : )
331 0 : .await?;
332 16 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
333 16 : || info == pg_constants::XLOG_XACT_ABORT_PREPARED
334 : {
335 0 : let parsed_xact =
336 0 : XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
337 0 : self.ingest_xact_record(
338 0 : modification,
339 0 : &parsed_xact,
340 0 : info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
341 0 : decoded.origin_id,
342 0 : ctx,
343 0 : )
344 0 : .await?;
345 : // Remove the twophase file. See RemoveTwoPhaseFile() in the PostgreSQL code.
346 0 : trace!(
347 0 : "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
348 : decoded.xl_xid,
349 : parsed_xact.xid,
350 : lsn,
351 : );
352 :
353 0 : let xid: u64 = if pg_version >= 17 {
354 0 : self.adjust_to_full_transaction_id(parsed_xact.xid)?
355 : } else {
356 0 : parsed_xact.xid as u64
357 : };
358 0 : modification.drop_twophase_file(xid, ctx).await?;
359 16 : } else if info == pg_constants::XLOG_XACT_PREPARE {
360 0 : let xid: u64 = if pg_version >= 17 {
361 0 : self.adjust_to_full_transaction_id(decoded.xl_xid)?
362 : } else {
363 0 : decoded.xl_xid as u64
364 : };
365 0 : modification
366 0 : .put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx)
367 0 : .await?;
368 16 : }
369 : }
370 : pg_constants::RM_MULTIXACT_ID => {
371 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
372 0 :
373 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
374 0 : let pageno = if pg_version < 17 {
375 0 : buf.get_u32_le()
376 : } else {
377 0 : buf.get_u64_le() as u32
378 : };
379 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
380 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
381 0 : self.put_slru_page_image(
382 0 : modification,
383 0 : SlruKind::MultiXactOffsets,
384 0 : segno,
385 0 : rpageno,
386 0 : ZERO_PAGE.clone(),
387 0 : ctx,
388 0 : )
389 0 : .await?;
390 0 : } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
391 0 : let pageno = if pg_version < 17 {
392 0 : buf.get_u32_le()
393 : } else {
394 0 : buf.get_u64_le() as u32
395 : };
396 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
397 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
398 0 : self.put_slru_page_image(
399 0 : modification,
400 0 : SlruKind::MultiXactMembers,
401 0 : segno,
402 0 : rpageno,
403 0 : ZERO_PAGE.clone(),
404 0 : ctx,
405 0 : )
406 0 : .await?;
407 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
408 0 : let xlrec = XlMultiXactCreate::decode(&mut buf);
409 0 : self.ingest_multixact_create_record(modification, &xlrec)?;
410 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
411 0 : let xlrec = XlMultiXactTruncate::decode(&mut buf);
412 0 : self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
413 0 : .await?;
414 0 : }
415 : }
416 : pg_constants::RM_RELMAP_ID => {
417 0 : let xlrec = XlRelmapUpdate::decode(&mut buf);
418 0 : self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
419 0 : .await?;
420 : }
421 : pg_constants::RM_XLOG_ID => {
422 30 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
423 30 :
424 30 : if info == pg_constants::XLOG_PARAMETER_CHANGE {
425 2 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
426 0 : let rec = v17::XlParameterChange::decode(&mut buf);
427 0 : cp.wal_level = rec.wal_level;
428 0 : self.checkpoint_modified = true;
429 2 : }
430 28 : } else if info == pg_constants::XLOG_END_OF_RECOVERY {
431 0 : if let CheckPoint::V17(cp) = &mut self.checkpoint {
432 0 : let rec = v17::XlEndOfRecovery::decode(&mut buf);
433 0 : cp.wal_level = rec.wal_level;
434 0 : self.checkpoint_modified = true;
435 0 : }
436 28 : }
437 :
438 30 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
439 0 : if info == pg_constants::XLOG_NEXTOID {
440 0 : let next_oid = buf.get_u32_le();
441 0 : if cp.nextOid != next_oid {
442 0 : cp.nextOid = next_oid;
443 0 : self.checkpoint_modified = true;
444 0 : }
445 0 : } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
446 0 : || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
447 : {
448 0 : let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
449 0 : buf.copy_to_slice(&mut checkpoint_bytes);
450 0 : let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
451 0 : trace!(
452 0 : "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
453 : xlog_checkpoint.oldestXid,
454 : cp.oldestXid
455 : );
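// XIDs are compared modulo 2^32 (cf. TransactionIdPrecedes in
// PostgreSQL): advance our oldestXid only if it precedes the value
// in the checkpoint record.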
456 0 : if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
457 0 : cp.oldestXid = xlog_checkpoint.oldestXid;
458 0 : }
459 0 : trace!(
460 0 : "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
461 : xlog_checkpoint.oldestActiveXid,
462 : cp.oldestActiveXid
463 : );
464 :
465 : // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
466 : // because at shutdown, all in-progress transactions will implicitly
467 : // end. Postgres startup code knows that, and allows hot standby to start
468 : // immediately from a shutdown checkpoint.
469 : //
470 : // In Neon, Postgres hot standby startup always behaves as if starting from
471 : // an online checkpoint. It needs a valid `oldestActiveXid` value, so
472 : // instead of overwriting self.checkpoint.oldestActiveXid with
473 : // InvalidTransactionId from the checkpoint WAL record, update it to a
474 : // proper value, knowing that there are no in-progress transactions at this
475 : // point, except for prepared transactions.
476 : //
477 : // See also the neon code changes in the InitWalRecovery() function.
478 0 : if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
479 0 : && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
480 : {
481 0 : let oldest_active_xid = if pg_version >= 17 {
482 0 : let mut oldest_active_full_xid = cp.nextXid.value;
483 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
484 0 : if xid < oldest_active_full_xid {
485 0 : oldest_active_full_xid = xid;
486 0 : }
487 : }
488 0 : oldest_active_full_xid as u32
489 : } else {
490 0 : let mut oldest_active_xid = cp.nextXid.value as u32;
491 0 : for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
492 0 : let narrow_xid = xid as u32;
493 0 : if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
494 0 : oldest_active_xid = narrow_xid;
495 0 : }
496 : }
497 0 : oldest_active_xid
498 : };
499 0 : cp.oldestActiveXid = oldest_active_xid;
500 0 : } else {
501 0 : cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
502 0 : }
503 :
504 : // Write a new checkpoint key-value pair on every checkpoint record, even
505 : // if nothing really changed. Not strictly required, but it seems nice to
506 : // have some trace of the checkpoint records in the layer files at the same
507 : // LSNs.
508 0 : self.checkpoint_modified = true;
509 0 : }
510 : });
511 : }
512 : pg_constants::RM_LOGICALMSG_ID => {
513 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
514 0 :
515 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
516 0 : let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
517 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
518 0 : let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
519 0 : if prefix == "neon-test" {
520 : // This is a convenient way to make the WAL ingestion pause at
521 : // a particular point in the WAL. For more fine-grained control,
522 : // we could peek into the message and only pause if it contains
523 : // a particular string, for example, but this is enough for now.
524 0 : failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
525 0 : } else if let Some(path) = prefix.strip_prefix("neon-file:") {
526 0 : modification.put_file(path, message, ctx).await?;
527 0 : }
528 0 : }
529 : }
530 : pg_constants::RM_STANDBY_ID => {
531 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
532 16 : if info == pg_constants::XLOG_RUNNING_XACTS {
533 0 : let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
534 :
535 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
536 0 : cp.oldestActiveXid = xlrec.oldest_running_xid;
537 0 : });
538 :
539 0 : self.checkpoint_modified = true;
540 16 : }
541 : }
542 : pg_constants::RM_REPLORIGIN_ID => {
543 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
544 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
545 0 : let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
546 0 : modification
547 0 : .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
548 0 : .await?
549 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
550 0 : let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
551 0 : modification.drop_replorigin(xlrec.node_id).await?
552 0 : }
553 : }
554 292 : _x => {
555 292 : // TODO: should probably log & fail here instead of blindly
556 292 : // doing something without understanding the protocol
557 292 : }
558 : }
559 :
560 : // Iterate through all the blocks that the record modifies, and
561 : // "put" a separate copy of the record for each block.
562 145852 : for blk in decoded.blocks.iter() {
563 145642 : let rel = RelTag {
564 145642 : spcnode: blk.rnode_spcnode,
565 145642 : dbnode: blk.rnode_dbnode,
566 145642 : relnode: blk.rnode_relnode,
567 145642 : forknum: blk.forknum,
568 145642 : };
569 145642 :
570 145642 : let key = rel_block_to_key(rel, blk.blkno);
571 145642 : let key_is_local = self.shard.is_key_local(&key);
572 145642 :
573 145642 : tracing::debug!(
574 : lsn=%lsn,
575 : key=%key,
576 0 : "ingest: shard decision {} (checkpoint={})",
577 0 : if !key_is_local { "drop" } else { "keep" },
578 : self.checkpoint_modified
579 : );
580 :
581 145642 : if !key_is_local {
582 0 : if self.shard.is_shard_zero() {
583 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
584 : // its blkno in case it implicitly extends a relation.
585 0 : self.observe_decoded_block(modification, blk, ctx).await?;
586 0 : }
587 :
588 0 : continue;
589 145642 : }
590 145642 : self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
591 284 : .await?;
592 : }
593 :
594 : // If checkpoint data was updated, store the new version in the repository
595 145852 : if self.checkpoint_modified {
596 6 : let new_checkpoint_bytes = self.checkpoint.encode()?;
597 :
598 6 : modification.put_checkpoint(new_checkpoint_bytes)?;
599 6 : self.checkpoint_modified = false;
600 145846 : }
601 :
602 : // Note that at this point this record is only cached in the modification
603 : // until commit() is called to flush the data into the repository and update
604 : // the latest LSN.
605 :
606 145852 : modification.on_record_end();
607 145852 :
608 145852 : Ok(modification.len() > prev_len)
609 145852 : }
610 :
611 : /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
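///
/// A 32-bit XID from a WAL record is widened to a 64-bit
/// FullTransactionId by borrowing the epoch from `checkpoint.nextXid`.
/// If the narrow XID is greater than the low 32 bits of nextXid, it must
/// have been assigned before the most recent wraparound, i.e. in the
/// previous epoch. For example, with nextXid = (epoch 3, xid 0x100), the
/// xid 0xFFFF_FF00 yields (3 - 1) << 32 | 0xFFFF_FF00.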
612 0 : fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
613 0 : let next_full_xid =
614 0 : enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
615 :
616 0 : let next_xid = (next_full_xid) as u32;
617 0 : let mut epoch = (next_full_xid >> 32) as u32;
618 0 :
619 0 : if xid > next_xid {
620 : // Wraparound occurred, must be from a prev epoch.
621 0 : if epoch == 0 {
622 0 : bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
623 0 : }
624 0 : epoch -= 1;
625 0 : }
626 :
627 0 : Ok((epoch as u64) << 32 | xid as u64)
628 0 : }
629 :
630 : /// Do not store this block, but observe it for the purposes of updating our relation size state.
631 0 : async fn observe_decoded_block(
632 0 : &mut self,
633 0 : modification: &mut DatadirModification<'_>,
634 0 : blk: &DecodedBkpBlock,
635 0 : ctx: &RequestContext,
636 0 : ) -> Result<(), PageReconstructError> {
637 0 : let rel = RelTag {
638 0 : spcnode: blk.rnode_spcnode,
639 0 : dbnode: blk.rnode_dbnode,
640 0 : relnode: blk.rnode_relnode,
641 0 : forknum: blk.forknum,
642 0 : };
643 0 : self.handle_rel_extend(modification, rel, blk.blkno, ctx)
644 0 : .await
645 0 : }
646 :
647 145642 : async fn ingest_decoded_block(
648 145642 : &mut self,
649 145642 : modification: &mut DatadirModification<'_>,
650 145642 : lsn: Lsn,
651 145642 : decoded: &DecodedWALRecord,
652 145642 : blk: &DecodedBkpBlock,
653 145642 : ctx: &RequestContext,
654 145642 : ) -> Result<(), PageReconstructError> {
655 145642 : let rel = RelTag {
656 145642 : spcnode: blk.rnode_spcnode,
657 145642 : dbnode: blk.rnode_dbnode,
658 145642 : relnode: blk.rnode_relnode,
659 145642 : forknum: blk.forknum,
660 145642 : };
661 145642 :
662 145642 : //
663 145642 : // Instead of storing a full-page-image WAL record, it is better to
664 145642 : // store the extracted image: we can skip wal-redo in this case.
665 145642 : // Also, an FPI record may contain multiple (up to 32) pages, so it
666 145642 : // would otherwise have to be copied once for each of them.
667 145642 : //
668 145642 : if blk.apply_image
669 60 : && blk.has_image
670 60 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
671 24 : && (decoded.xl_info == pg_constants::XLOG_FPI
672 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
673 : // compression of WAL is not yet supported: fall back to storing the original WAL record
674 24 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
675 : // do not materialize null pages because they will most likely soon be replaced with real data
676 24 : && blk.bimg_len != 0
677 : {
678 : // Extract page image from FPI record
679 24 : let img_len = blk.bimg_len as usize;
680 24 : let img_offs = blk.bimg_offset as usize;
681 24 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
682 24 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
683 24 :
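// The stored FPI may have had its all-zeros "hole" elided (PostgreSQL's
// BKPIMAGE_HAS_HOLE optimization); splice a zeroed gap back in at
// hole_offset so the reconstructed page is a full BLCKSZ bytes.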
684 24 : if blk.hole_length != 0 {
685 0 : let tail = image.split_off(blk.hole_offset as usize);
686 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
687 0 : image.unsplit(tail);
688 24 : }
689 : //
690 : // Match the logic of XLogReadBufferForRedoExtended:
691 : // The page may be uninitialized. If so, we can't set the LSN because
692 : // that would corrupt the page.
693 : //
694 24 : if !page_is_new(&image) {
695 18 : page_set_lsn(&mut image, lsn)
696 6 : }
697 24 : assert_eq!(image.len(), BLCKSZ as usize);
698 :
699 24 : self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
700 5 : .await?;
701 : } else {
702 145618 : let rec = NeonWalRecord::Postgres {
703 145618 : will_init: blk.will_init || blk.apply_image,
704 145618 : rec: decoded.record.clone(),
705 145618 : };
706 145618 : self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
707 279 : .await?;
708 : }
709 145642 : Ok(())
710 145642 : }
711 :
712 145474 : async fn ingest_heapam_record(
713 145474 : &mut self,
714 145474 : buf: &mut Bytes,
715 145474 : modification: &mut DatadirModification<'_>,
716 145474 : decoded: &DecodedWALRecord,
717 145474 : ctx: &RequestContext,
718 145474 : ) -> anyhow::Result<()> {
719 145474 : // Handle VM bit updates that are implicitly part of heap records.
720 145474 :
721 145474 : // First, look at the record to determine which VM bits need
722 145474 : // to be cleared. If either of these variables is set, we
723 145474 : // need to clear the corresponding bits in the visibility map.
724 145474 : let mut new_heap_blkno: Option<u32> = None;
725 145474 : let mut old_heap_blkno: Option<u32> = None;
726 145474 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
727 145474 :
728 145474 : match modification.tline.pg_version {
729 : 14 => {
730 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
731 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
732 0 :
733 0 : if info == pg_constants::XLOG_HEAP_INSERT {
734 0 : let xlrec = v14::XlHeapInsert::decode(buf);
735 0 : assert_eq!(0, buf.remaining());
736 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
737 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
738 0 : }
739 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
740 0 : let xlrec = v14::XlHeapDelete::decode(buf);
741 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
742 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
743 0 : }
744 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
745 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
746 : {
747 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
748 0 : // the size of tuple data is inferred from the size of the record.
749 0 : // we can't validate the remaining number of bytes without parsing
750 0 : // the tuple data.
751 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
752 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
753 0 : }
754 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
755 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
756 0 : // non-HOT update where the new tuple goes to different page than
757 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
758 0 : // set.
759 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
760 0 : }
761 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
762 0 : let xlrec = v14::XlHeapLock::decode(buf);
763 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
764 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
765 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
766 0 : }
767 0 : }
768 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
769 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
770 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
771 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
772 :
773 0 : let offset_array_len =
774 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
775 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
776 0 : 0
777 : } else {
778 0 : size_of::<u16>() * xlrec.ntuples as usize
779 : };
780 0 : assert_eq!(offset_array_len, buf.remaining());
781 :
782 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
783 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
784 0 : }
785 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
786 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
787 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
788 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
789 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
790 0 : }
791 0 : }
792 : } else {
793 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
794 : }
795 : }
796 : 15 => {
797 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
798 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
799 145286 :
800 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
801 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
802 145276 : assert_eq!(0, buf.remaining());
803 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
804 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
805 145272 : }
806 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
807 0 : let xlrec = v15::XlHeapDelete::decode(buf);
808 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
809 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
810 0 : }
811 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
812 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
813 : {
814 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
815 8 : // the size of tuple data is inferred from the size of the record.
816 8 : // we can't validate the remaining number of bytes without parsing
817 8 : // the tuple data.
818 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
819 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
820 8 : }
821 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
822 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
823 0 : // non-HOT update where the new tuple goes to different page than
824 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
825 0 : // set.
826 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
827 8 : }
828 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
829 0 : let xlrec = v15::XlHeapLock::decode(buf);
830 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
831 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
832 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
833 0 : }
834 2 : }
835 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
836 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
837 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
838 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
839 :
840 42 : let offset_array_len =
841 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
842 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
843 2 : 0
844 : } else {
845 40 : size_of::<u16>() * xlrec.ntuples as usize
846 : };
847 42 : assert_eq!(offset_array_len, buf.remaining());
848 :
849 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
850 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
851 34 : }
852 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
853 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
854 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
855 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
856 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
857 0 : }
858 146 : }
859 : } else {
860 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
861 : }
862 : }
863 : 16 => {
864 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
865 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
866 0 :
867 0 : if info == pg_constants::XLOG_HEAP_INSERT {
868 0 : let xlrec = v16::XlHeapInsert::decode(buf);
869 0 : assert_eq!(0, buf.remaining());
870 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
871 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
872 0 : }
873 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
874 0 : let xlrec = v16::XlHeapDelete::decode(buf);
875 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
876 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
877 0 : }
878 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
879 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
880 : {
881 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
882 0 : // the size of tuple data is inferred from the size of the record.
883 0 : // we can't validate the remaining number of bytes without parsing
884 0 : // the tuple data.
885 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
886 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
887 0 : }
888 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
889 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
890 0 : // non-HOT update where the new tuple goes to different page than
891 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
892 0 : // set.
893 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
894 0 : }
895 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
896 0 : let xlrec = v16::XlHeapLock::decode(buf);
897 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
898 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
899 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
900 0 : }
901 0 : }
902 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
903 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
904 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
905 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
906 :
907 0 : let offset_array_len =
908 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
909 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
910 0 : 0
911 : } else {
912 0 : size_of::<u16>() * xlrec.ntuples as usize
913 : };
914 0 : assert_eq!(offset_array_len, buf.remaining());
915 :
916 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
917 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
918 0 : }
919 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
920 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
921 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
922 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
923 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
924 0 : }
925 0 : }
926 : } else {
927 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
928 : }
929 : }
930 : 17 => {
931 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
932 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
933 0 :
934 0 : if info == pg_constants::XLOG_HEAP_INSERT {
935 0 : let xlrec = v17::XlHeapInsert::decode(buf);
936 0 : assert_eq!(0, buf.remaining());
937 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
938 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
939 0 : }
940 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
941 0 : let xlrec = v17::XlHeapDelete::decode(buf);
942 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
943 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
944 0 : }
945 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
946 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
947 : {
948 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
949 0 : // the size of tuple data is inferred from the size of the record.
950 0 : // we can't validate the remaining number of bytes without parsing
951 0 : // the tuple data.
952 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
953 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
954 0 : }
955 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
956 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
957 0 : // non-HOT update where the new tuple goes to different page than
958 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
959 0 : // set.
960 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
961 0 : }
962 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
963 0 : let xlrec = v17::XlHeapLock::decode(buf);
964 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
965 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
966 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
967 0 : }
968 0 : }
969 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
970 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
971 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
972 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
973 :
974 0 : let offset_array_len =
975 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
976 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
977 0 : 0
978 : } else {
979 0 : size_of::<u16>() * xlrec.ntuples as usize
980 : };
981 0 : assert_eq!(offset_array_len, buf.remaining());
982 :
983 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
984 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
985 0 : }
986 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
987 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
988 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
989 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
990 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
991 0 : }
992 0 : }
993 : } else {
994 0 : bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
995 : }
996 : }
997 0 : _ => {}
998 : }
999 :
1000 : // Clear the VM bits if required.
1001 145474 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
1002 12 : let vm_rel = RelTag {
1003 12 : forknum: VISIBILITYMAP_FORKNUM,
1004 12 : spcnode: decoded.blocks[0].rnode_spcnode,
1005 12 : dbnode: decoded.blocks[0].rnode_dbnode,
1006 12 : relnode: decoded.blocks[0].rnode_relnode,
1007 12 : };
1008 12 :
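// Each visibility-map page covers many heap pages (HEAPBLK_TO_MAPBLOCK
// is a plain division by the number of heap blocks per VM page), so the
// old and new heap blocks may well land on the same VM block; that case
// is handled separately below.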
1009 12 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
1010 12 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
1011 :
1012 : // Sometimes, Postgres seems to create heap WAL records with the
1013 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
1014 : // not set. In fact, it's possible that the VM page does not exist at all.
1015 : // In that case, we don't want to store a record to clear the VM bit;
1016 : // replaying it would fail to find the previous image of the page, because
1017 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
1018 : // record if they don't.
1019 12 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
1020 12 : if let Some(blknum) = new_vm_blk {
1021 12 : if blknum >= vm_size {
1022 0 : new_vm_blk = None;
1023 12 : }
1024 0 : }
1025 12 : if let Some(blknum) = old_vm_blk {
1026 0 : if blknum >= vm_size {
1027 0 : old_vm_blk = None;
1028 0 : }
1029 12 : }
1030 :
1031 12 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
1032 12 : if new_vm_blk == old_vm_blk {
1033 : // An UPDATE record that needs to clear the bits for both old and the
1034 : // new page, both of which reside on the same VM page.
1035 0 : self.put_rel_wal_record(
1036 0 : modification,
1037 0 : vm_rel,
1038 0 : new_vm_blk.unwrap(),
1039 0 : NeonWalRecord::ClearVisibilityMapFlags {
1040 0 : new_heap_blkno,
1041 0 : old_heap_blkno,
1042 0 : flags,
1043 0 : },
1044 0 : ctx,
1045 0 : )
1046 0 : .await?;
1047 : } else {
1048 : // Clear VM bits for one heap page, or for two pages that reside on
1049 : // different VM pages.
1050 12 : if let Some(new_vm_blk) = new_vm_blk {
1051 12 : self.put_rel_wal_record(
1052 12 : modification,
1053 12 : vm_rel,
1054 12 : new_vm_blk,
1055 12 : NeonWalRecord::ClearVisibilityMapFlags {
1056 12 : new_heap_blkno,
1057 12 : old_heap_blkno: None,
1058 12 : flags,
1059 12 : },
1060 12 : ctx,
1061 12 : )
1062 0 : .await?;
1063 0 : }
1064 12 : if let Some(old_vm_blk) = old_vm_blk {
1065 0 : self.put_rel_wal_record(
1066 0 : modification,
1067 0 : vm_rel,
1068 0 : old_vm_blk,
1069 0 : NeonWalRecord::ClearVisibilityMapFlags {
1070 0 : new_heap_blkno: None,
1071 0 : old_heap_blkno,
1072 0 : flags,
1073 0 : },
1074 0 : ctx,
1075 0 : )
1076 0 : .await?;
1077 12 : }
1078 : }
1079 0 : }
1080 145462 : }
1081 :
1082 145474 : Ok(())
1083 145474 : }
1084 :
1085 0 : async fn ingest_neonrmgr_record(
1086 0 : &mut self,
1087 0 : buf: &mut Bytes,
1088 0 : modification: &mut DatadirModification<'_>,
1089 0 : decoded: &DecodedWALRecord,
1090 0 : ctx: &RequestContext,
1091 0 : ) -> anyhow::Result<()> {
1092 0 : // Handle VM bit updates that are implicitly part of heap records.
1093 0 :
1094 0 : // First, look at the record to determine which VM bits need
1095 0 : // to be cleared. If either of these variables is set, we
1096 0 : // need to clear the corresponding bits in the visibility map.
1097 0 : let mut new_heap_blkno: Option<u32> = None;
1098 0 : let mut old_heap_blkno: Option<u32> = None;
1099 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
1100 0 : let pg_version = modification.tline.pg_version;
1101 0 :
1102 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
1103 :
1104 0 : match pg_version {
1105 : 16 | 17 => {
1106 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1107 0 :
1108 0 : match info {
1109 : pg_constants::XLOG_NEON_HEAP_INSERT => {
1110 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
1111 0 : assert_eq!(0, buf.remaining());
1112 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1113 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1114 0 : }
1115 : }
1116 : pg_constants::XLOG_NEON_HEAP_DELETE => {
1117 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
1118 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
1119 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1120 0 : }
1121 : }
1122 : pg_constants::XLOG_NEON_HEAP_UPDATE
1123 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
1124 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
1125 0 : // the size of tuple data is inferred from the size of the record.
1126 0 : // we can't validate the remaining number of bytes without parsing
1127 0 : // the tuple data.
1128 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
1129 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
1130 0 : }
1131 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
1132 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
1133 0 : // non-HOT update where the new tuple goes to different page than
1134 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
1135 0 : // set.
1136 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1137 0 : }
1138 : }
1139 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
1140 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
1141 :
1142 0 : let offset_array_len =
1143 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
1144 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
1145 0 : 0
1146 : } else {
1147 0 : size_of::<u16>() * xlrec.ntuples as usize
1148 : };
1149 0 : assert_eq!(offset_array_len, buf.remaining());
1150 :
1151 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
1152 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
1153 0 : }
1154 : }
1155 : pg_constants::XLOG_NEON_HEAP_LOCK => {
1156 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
1157 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
1158 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
1159 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
1160 0 : }
1161 : }
1162 0 : info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
1163 : }
1164 : }
1165 0 : _ => bail!(
1166 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
1167 0 : pg_version
1168 0 : ),
1169 : }
1170 :
1171 : // Clear the VM bits if required.
1172 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
1173 0 : let vm_rel = RelTag {
1174 0 : forknum: VISIBILITYMAP_FORKNUM,
1175 0 : spcnode: decoded.blocks[0].rnode_spcnode,
1176 0 : dbnode: decoded.blocks[0].rnode_dbnode,
1177 0 : relnode: decoded.blocks[0].rnode_relnode,
1178 0 : };
1179 0 :
1180 0 : let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
1181 0 : let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
1182 :
1183 : // Sometimes, Postgres seems to create heap WAL records with the
1184 : // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
1185 : // not set. In fact, it's possible that the VM page does not exist at all.
1186 : // In that case, we don't want to store a record to clear the VM bit;
1187 : // replaying it would fail to find the previous image of the page, because
1188 : // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
1189 : // record if they don't.
1190 0 : let vm_size = get_relsize(modification, vm_rel, ctx).await?;
1191 0 : if let Some(blknum) = new_vm_blk {
1192 0 : if blknum >= vm_size {
1193 0 : new_vm_blk = None;
1194 0 : }
1195 0 : }
1196 0 : if let Some(blknum) = old_vm_blk {
1197 0 : if blknum >= vm_size {
1198 0 : old_vm_blk = None;
1199 0 : }
1200 0 : }
1201 :
1202 0 : if new_vm_blk.is_some() || old_vm_blk.is_some() {
1203 0 : if new_vm_blk == old_vm_blk {
1204 : // An UPDATE record that needs to clear the bits for both old and the
1205 : // new page, both of which reside on the same VM page.
1206 0 : self.put_rel_wal_record(
1207 0 : modification,
1208 0 : vm_rel,
1209 0 : new_vm_blk.unwrap(),
1210 0 : NeonWalRecord::ClearVisibilityMapFlags {
1211 0 : new_heap_blkno,
1212 0 : old_heap_blkno,
1213 0 : flags,
1214 0 : },
1215 0 : ctx,
1216 0 : )
1217 0 : .await?;
1218 : } else {
1219 : // Clear VM bits for one heap page, or for two pages that reside on
1220 : // different VM pages.
1221 0 : if let Some(new_vm_blk) = new_vm_blk {
1222 0 : self.put_rel_wal_record(
1223 0 : modification,
1224 0 : vm_rel,
1225 0 : new_vm_blk,
1226 0 : NeonWalRecord::ClearVisibilityMapFlags {
1227 0 : new_heap_blkno,
1228 0 : old_heap_blkno: None,
1229 0 : flags,
1230 0 : },
1231 0 : ctx,
1232 0 : )
1233 0 : .await?;
1234 0 : }
1235 0 : if let Some(old_vm_blk) = old_vm_blk {
1236 0 : self.put_rel_wal_record(
1237 0 : modification,
1238 0 : vm_rel,
1239 0 : old_vm_blk,
1240 0 : NeonWalRecord::ClearVisibilityMapFlags {
1241 0 : new_heap_blkno: None,
1242 0 : old_heap_blkno,
1243 0 : flags,
1244 0 : },
1245 0 : ctx,
1246 0 : )
1247 0 : .await?;
1248 0 : }
1249 : }
1250 0 : }
1251 0 : }
1252 :
1253 0 : Ok(())
1254 0 : }
1255 :
1256 : /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
1257 0 : async fn ingest_xlog_dbase_create(
1258 0 : &mut self,
1259 0 : modification: &mut DatadirModification<'_>,
1260 0 : rec: &XlCreateDatabase,
1261 0 : ctx: &RequestContext,
1262 0 : ) -> anyhow::Result<()> {
1263 0 : let db_id = rec.db_id;
1264 0 : let tablespace_id = rec.tablespace_id;
1265 0 : let src_db_id = rec.src_db_id;
1266 0 : let src_tablespace_id = rec.src_tablespace_id;
1267 :
1268 0 : let rels = modification
1269 0 : .tline
1270 0 : .list_rels(
1271 0 : src_tablespace_id,
1272 0 : src_db_id,
1273 0 : Version::Modified(modification),
1274 0 : ctx,
1275 0 : )
1276 0 : .await?;
1277 :
1278 0 : debug!("ingest_xlog_dbase_create: {} rels", rels.len());
1279 :
1280 : // Copy relfilemap
1281 0 : let filemap = modification
1282 0 : .tline
1283 0 : .get_relmap_file(
1284 0 : src_tablespace_id,
1285 0 : src_db_id,
1286 0 : Version::Modified(modification),
1287 0 : ctx,
1288 0 : )
1289 0 : .await?;
1290 0 : modification
1291 0 : .put_relmap_file(tablespace_id, db_id, filemap, ctx)
1292 0 : .await?;
1293 :
1294 0 : let mut num_rels_copied = 0;
1295 0 : let mut num_blocks_copied = 0;
1296 0 : for src_rel in rels {
1297 0 : assert_eq!(src_rel.spcnode, src_tablespace_id);
1298 0 : assert_eq!(src_rel.dbnode, src_db_id);
1299 :
1300 0 : let nblocks = modification
1301 0 : .tline
1302 0 : .get_rel_size(src_rel, Version::Modified(modification), ctx)
1303 0 : .await?;
1304 0 : let dst_rel = RelTag {
1305 0 : spcnode: tablespace_id,
1306 0 : dbnode: db_id,
1307 0 : relnode: src_rel.relnode,
1308 0 : forknum: src_rel.forknum,
1309 0 : };
1310 0 :
1311 0 : modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
1312 :
1313 : // Copy content
1314 0 : debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
1315 0 : for blknum in 0..nblocks {
1316 : // Sharding:
1317 : // - src and dst are always on the same shard, because they differ only by dbNode, and
1318 : // dbNode is not included in the hash inputs for sharding.
1319 : // - This WAL command is replayed on all shards, but each shard only copies the blocks
1320 : // that belong to it.
1321 0 : let src_key = rel_block_to_key(src_rel, blknum);
1322 0 : if !self.shard.is_key_local(&src_key) {
1323 0 : debug!(
1324 0 : "Skipping non-local key {} during XLOG_DBASE_CREATE",
1325 : src_key
1326 : );
1327 0 : continue;
1328 0 : }
1329 0 : debug!(
1330 0 : "copying block {} from {} ({}) to {}",
1331 : blknum, src_rel, src_key, dst_rel
1332 : );
1333 :
1334 0 : let content = modification
1335 0 : .tline
1336 0 : .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
1337 0 : .await?;
1338 0 : modification.put_rel_page_image(dst_rel, blknum, content)?;
1339 0 : num_blocks_copied += 1;
1340 : }
1341 :
1342 0 : num_rels_copied += 1;
1343 : }
1344 :
1345 0 : info!(
1346 0 : "Created database {}/{}, copied {} blocks in {} rels",
1347 : tablespace_id, db_id, num_blocks_copied, num_rels_copied
1348 : );
1349 0 : Ok(())
1350 0 : }
1351 :
1352 16 : async fn ingest_xlog_smgr_create(
1353 16 : &mut self,
1354 16 : modification: &mut DatadirModification<'_>,
1355 16 : rec: &XlSmgrCreate,
1356 16 : ctx: &RequestContext,
1357 16 : ) -> anyhow::Result<()> {
1358 16 : let rel = RelTag {
1359 16 : spcnode: rec.rnode.spcnode,
1360 16 : dbnode: rec.rnode.dbnode,
1361 16 : relnode: rec.rnode.relnode,
1362 16 : forknum: rec.forknum,
1363 16 : };
1364 16 : self.put_rel_creation(modification, rel, ctx).await?;
1365 16 : Ok(())
1366 16 : }
1367 :
1368 : /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
1369 : ///
1370 : /// This is the same logic as in PostgreSQL's smgr_redo() function.
1371 0 : async fn ingest_xlog_smgr_truncate(
1372 0 : &mut self,
1373 0 : modification: &mut DatadirModification<'_>,
1374 0 : rec: &XlSmgrTruncate,
1375 0 : ctx: &RequestContext,
1376 0 : ) -> anyhow::Result<()> {
1377 0 : let spcnode = rec.rnode.spcnode;
1378 0 : let dbnode = rec.rnode.dbnode;
1379 0 : let relnode = rec.rnode.relnode;
1380 0 :
1381 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
1382 0 : let rel = RelTag {
1383 0 : spcnode,
1384 0 : dbnode,
1385 0 : relnode,
1386 0 : forknum: MAIN_FORKNUM,
1387 0 : };
1388 0 : self.put_rel_truncation(modification, rel, rec.blkno, ctx)
1389 0 : .await?;
1390 0 : }
1391 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
1392 0 : let rel = RelTag {
1393 0 : spcnode,
1394 0 : dbnode,
1395 0 : relnode,
1396 0 : forknum: FSM_FORKNUM,
1397 0 : };
1398 0 :
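// The FSM keeps one slot per heap block, SLOTS_PER_FSM_PAGE slots per
// leaf page. fsm_logical_to_physical() then accounts for the interior
// pages of the FSM's tree structure when turning a logical leaf page
// number into a physical block number.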
1399 0 : let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
1400 0 : let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
1401 0 : if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
1402 : // The tail of the last remaining FSM page has to be zeroed.
1403 : // We are not precise here: instead of digging into the FSM bitmap format, we just clear the whole page.
1404 0 : modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
1405 0 : fsm_physical_page_no += 1;
1406 0 : }
1407 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1408 0 : if nblocks > fsm_physical_page_no {
1409 : // check if there is something to do: the FSM is larger than the truncate position
1410 0 : self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
1411 0 : .await?;
1412 0 : }
1413 0 : }
1414 0 : if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
1415 0 : let rel = RelTag {
1416 0 : spcnode,
1417 0 : dbnode,
1418 0 : relnode,
1419 0 : forknum: VISIBILITYMAP_FORKNUM,
1420 0 : };
1421 0 :
1422 0 : let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
1423 0 : if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
1424 : // The tail of the last remaining VM page has to be zeroed.
1425 : // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
1426 0 : modification.put_rel_page_image_zero(rel, vm_page_no)?;
1427 0 : vm_page_no += 1;
1428 0 : }
1429 0 : let nblocks = get_relsize(modification, rel, ctx).await?;
1430 0 : if nblocks > vm_page_no {
1431 : // check if there is something to do: the VM is larger than the truncate position
1432 0 : self.put_rel_truncation(modification, rel, vm_page_no, ctx)
1433 0 : .await?;
1434 0 : }
1435 0 : }
1436 0 : Ok(())
1437 0 : }
1438 :
1439 8 : fn warn_on_ingest_lag(
1440 8 : &mut self,
1441 8 : conf: &crate::config::PageServerConf,
1442 8 : wal_timestamp: TimestampTz,
1443 8 : ) {
1444 8 : debug_assert_current_span_has_tenant_and_timeline_id();
1445 8 : let now = SystemTime::now();
1446 8 : let rate_limits = &mut self.warn_ingest_lag;
1447 :
1448 8 : let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
1449 0 : pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
1450 : });
1451 :
1452 8 : match ts {
1453 8 : Ok(ts) => {
1454 8 : match now.duration_since(ts) {
1455 8 : Ok(lag) => {
1456 8 : if lag > conf.wait_lsn_timeout {
1457 8 : rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
1458 2 : if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
1459 0 : if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
1460 0 : return;
1461 0 : }
1462 2 : } else {
1463 2 : // Cooldown not initialized yet; we shouldn't normally get here, so don't suppress the warning
1464 2 : }
1465 2 : let lag = humantime::format_duration(lag);
1466 2 : warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
1467 8 : })
1468 0 : }
1469 : }
1470 0 : Err(e) => {
1471 0 : let delta_t = e.duration();
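: // SystemTime::duration_since() fails when the WAL timestamp is ahead of
: // our clock; e.duration() is then the apparent (positive) clock drift.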
1472 : // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
1473 : // => https://www.robustperception.io/time-metric-from-the-node-exporter/
1474 : const IGNORED_DRIFT: Duration = Duration::from_millis(100);
1475 0 : if delta_t > IGNORED_DRIFT {
1476 0 : let delta_t = humantime::format_duration(delta_t);
1477 0 : rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
1478 0 : warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
1479 0 : })
1480 0 : }
1481 : }
1482 : };
1483 : }
1484 0 : Err(error) => {
1485 0 : rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
1486 0 : warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
1487 0 : })
1488 : }
1489 : }
1490 8 : }
1491 :
1492 : /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
1493 : ///
1494 8 : async fn ingest_xact_record(
1495 8 : &mut self,
1496 8 : modification: &mut DatadirModification<'_>,
1497 8 : parsed: &XlXactParsedRecord,
1498 8 : is_commit: bool,
1499 8 : origin_id: u16,
1500 8 : ctx: &RequestContext,
1501 8 : ) -> anyhow::Result<()> {
1502 8 : // Record update of CLOG pages
1503 8 : let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
1504 8 : let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1505 8 : let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1506 8 : let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
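: // Worked example (illustrative; with the standard 8 KB block size,
: // CLOG_XACTS_PER_PAGE = 32768 and SLRU_PAGES_PER_SEGMENT = 32):
: // xid 1_000_000 maps to pageno 30, segno 0, rpageno 30.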
1507 8 :
1508 8 : self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
1509 :
1510 8 : for subxact in &parsed.subxacts {
1511 0 : let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
1512 0 : if subxact_pageno != pageno {
1513 : // This subxact goes to a different page. Write the record
1514 : // for all the XIDs on the previous page, and continue
1515 : // accumulating XIDs on this new page.
1516 0 : modification.put_slru_wal_record(
1517 0 : SlruKind::Clog,
1518 0 : segno,
1519 0 : rpageno,
1520 0 : if is_commit {
1521 0 : NeonWalRecord::ClogSetCommitted {
1522 0 : xids: page_xids,
1523 0 : timestamp: parsed.xact_time,
1524 0 : }
1525 : } else {
1526 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1527 : },
1528 0 : )?;
1529 0 : page_xids = Vec::new();
1530 0 : }
1531 0 : pageno = subxact_pageno;
1532 0 : segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1533 0 : rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1534 0 : page_xids.push(*subxact);
1535 : }
1536 8 : modification.put_slru_wal_record(
1537 8 : SlruKind::Clog,
1538 8 : segno,
1539 8 : rpageno,
1540 8 : if is_commit {
1541 8 : NeonWalRecord::ClogSetCommitted {
1542 8 : xids: page_xids,
1543 8 : timestamp: parsed.xact_time,
1544 8 : }
1545 : } else {
1546 0 : NeonWalRecord::ClogSetAborted { xids: page_xids }
1547 : },
1548 0 : )?;
1549 :
1550 8 : for xnode in &parsed.xnodes {
1551 0 : for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
1552 0 : let rel = RelTag {
1553 0 : forknum,
1554 0 : spcnode: xnode.spcnode,
1555 0 : dbnode: xnode.dbnode,
1556 0 : relnode: xnode.relnode,
1557 0 : };
1558 0 : if modification
1559 0 : .tline
1560 0 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1561 0 : .await?
1562 : {
1563 0 : self.put_rel_drop(modification, rel, ctx).await?;
1564 0 : }
1565 : }
1566 : }
1567 8 : if origin_id != 0 {
1568 0 : modification
1569 0 : .set_replorigin(origin_id, parsed.origin_lsn)
1570 0 : .await?;
1571 8 : }
1572 8 : Ok(())
1573 8 : }
1574 :
1575 0 : async fn ingest_clog_truncate_record(
1576 0 : &mut self,
1577 0 : modification: &mut DatadirModification<'_>,
1578 0 : xlrec: &XlClogTruncate,
1579 0 : ctx: &RequestContext,
1580 0 : ) -> anyhow::Result<()> {
1581 0 : info!(
1582 0 : "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
1583 : xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
1584 : );
1585 :
1586 : // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
1587 : // truncated, but a checkpoint record with the updated values isn't written until
1588 : // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
1589 : // so we keep the oldestXid and oldestXidDB up-to-date.
1590 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1591 0 : cp.oldestXid = xlrec.oldest_xid;
1592 0 : cp.oldestXidDB = xlrec.oldest_xid_db;
1593 0 : });
1594 0 : self.checkpoint_modified = true;
1595 :
1596 : // TODO: handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
1597 :
1598 0 : let latest_page_number =
1599 0 : enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
1600 : / pg_constants::CLOG_XACTS_PER_PAGE;
1601 :
1602 : // Now delete all segments containing pages between xlrec.pageno
1603 : // and latest_page_number.
1604 :
1605 : // First, make an important safety check:
1606 : // the current endpoint page must not be eligible for removal.
1607 : // See SimpleLruTruncate() in slru.c
1608 0 : if dispatch_pgversion!(modification.tline.pg_version, {
1609 0 : pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
1610 : }) {
1611 0 : info!("could not truncate directory pg_xact apparent wraparound");
1612 0 : return Ok(());
1613 0 : }
1614 :
1615 : // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
1616 : //
1617 : // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
1618 : // will block waiting for the last valid LSN to advance up to
1619 : // it. So we use the previous record's LSN in the get calls
1620 : // instead.
1621 0 : for segno in modification
1622 0 : .tline
1623 0 : .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
1624 0 : .await?
1625 : {
1626 0 : let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
1627 :
1628 0 : let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
1629 0 : pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
1630 : });
1631 :
1632 0 : if may_delete {
1633 0 : modification
1634 0 : .drop_slru_segment(SlruKind::Clog, segno, ctx)
1635 0 : .await?;
1636 0 : trace!("Drop CLOG segment {:>04X}", segno);
1637 0 : }
1638 : }
1639 :
1640 0 : Ok(())
1641 0 : }
1642 :
1643 0 : fn ingest_multixact_create_record(
1644 0 : &mut self,
1645 0 : modification: &mut DatadirModification,
1646 0 : xlrec: &XlMultiXactCreate,
1647 0 : ) -> Result<()> {
1648 0 : // Create WAL record for updating the multixact-offsets page
1649 0 : let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
1650 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
1651 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
1652 0 :
1653 0 : modification.put_slru_wal_record(
1654 0 : SlruKind::MultiXactOffsets,
1655 0 : segno,
1656 0 : rpageno,
1657 0 : NeonWalRecord::MultixactOffsetCreate {
1658 0 : mid: xlrec.mid,
1659 0 : moff: xlrec.moff,
1660 0 : },
1661 0 : )?;
1662 :
1663 : // Create WAL records for the update of each affected multixact-members page
1664 0 : let mut members = xlrec.members.iter();
1665 0 : let mut offset = xlrec.moff;
1666 : loop {
1667 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
1668 0 :
1669 0 : // How many members fit on this page?
1670 0 : let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
1671 0 : - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
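: // Illustrative: if MULTIXACT_MEMBERS_PER_PAGE were 1636 and offset = 1630,
: // page_remain is 6; the remaining members spill onto the following page(s).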
1672 0 :
1673 0 : let mut this_page_members: Vec<MultiXactMember> = Vec::new();
1674 0 : for _ in 0..page_remain {
1675 0 : if let Some(m) = members.next() {
1676 0 : this_page_members.push(m.clone());
1677 0 : } else {
1678 0 : break;
1679 : }
1680 : }
1681 0 : if this_page_members.is_empty() {
1682 : // all done
1683 0 : break;
1684 0 : }
1685 0 : let n_this_page = this_page_members.len();
1686 0 :
1687 0 : modification.put_slru_wal_record(
1688 0 : SlruKind::MultiXactMembers,
1689 0 : pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
1690 0 : pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
1691 0 : NeonWalRecord::MultixactMembersCreate {
1692 0 : moff: offset,
1693 0 : members: this_page_members,
1694 0 : },
1695 0 : )?;
1696 :
1697 : // Note: The multixact members can wrap around, even within one WAL record.
1698 0 : offset = offset.wrapping_add(n_this_page as u32);
1699 : }
1700 0 : let next_offset = offset;
1701 0 : assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
1702 :
1703 : // Update next-multi-xid and next-offset
1704 : //
1705 : // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
1706 : // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
1707 : // read it, like GetNewMultiXactId(). This is different from how nextXid is
1708 : // incremented! nextXid skips over < FirstNormalTransactionId when the the value
1709 : // is stored, so it's never 0 in a checkpoint.
1710 : //
1711 : // I don't know why it's done that way, it seems less error-prone to skip over 0
1712 : // when the value is stored rather than when it's read. But let's do it the same
1713 : // way here.
1714 0 : let next_multi_xid = xlrec.mid.wrapping_add(1);
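: // Illustrative: if xlrec.mid == u32::MAX, next_multi_xid wraps to 0 here;
: // per the note above, readers later fix that up to FirstMultiXactId.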
1715 0 :
1716 0 : if self
1717 0 : .checkpoint
1718 0 : .update_next_multixid(next_multi_xid, next_offset)
1719 0 : {
1720 0 : self.checkpoint_modified = true;
1721 0 : }
1722 :
1723 : // Also update the next-xid with the highest member. According to the comments in
1724 : // multixact_redo(), this shouldn't be necessary, but let's do the same here.
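: // (The wrapping_sub(..) as i32 > 0 test in the fold below is the usual
: // circular XID comparison, equivalent to PostgreSQL's TransactionIdFollows.)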
1725 0 : let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
1726 0 : if let Some(max_xid) = acc {
1727 0 : if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
1728 0 : Some(mbr.xid)
1729 : } else {
1730 0 : acc
1731 : }
1732 : } else {
1733 0 : Some(mbr.xid)
1734 : }
1735 0 : });
1736 :
1737 0 : if let Some(max_xid) = max_mbr_xid {
1738 0 : if self.checkpoint.update_next_xid(max_xid) {
1739 0 : self.checkpoint_modified = true;
1740 0 : }
1741 0 : }
1742 0 : Ok(())
1743 0 : }
1744 :
1745 0 : async fn ingest_multixact_truncate_record(
1746 0 : &mut self,
1747 0 : modification: &mut DatadirModification<'_>,
1748 0 : xlrec: &XlMultiXactTruncate,
1749 0 : ctx: &RequestContext,
1750 0 : ) -> Result<()> {
1751 0 : let (maxsegment, startsegment, endsegment) =
1752 0 : enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
1753 0 : cp.oldestMulti = xlrec.end_trunc_off;
1754 0 : cp.oldestMultiDB = xlrec.oldest_multi_db;
1755 0 : let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
1756 0 : pg_constants::MAX_MULTIXACT_OFFSET,
1757 0 : );
1758 0 : let startsegment: i32 =
1759 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
1760 0 : let endsegment: i32 =
1761 0 : pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
1762 0 : (maxsegment, startsegment, endsegment)
1763 : });
1764 :
1765 0 : self.checkpoint_modified = true;
1766 0 :
1767 0 : // PerformMembersTruncation
1768 0 : let mut segment: i32 = startsegment;
1769 :
1770 : // Delete all the segments except the last one. The last segment can still
1771 : // contain, possibly partially, valid data.
1772 0 : while segment != endsegment {
1773 0 : modification
1774 0 : .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
1775 0 : .await?;
1776 :
1777 : /* move to next segment, handling wraparound correctly */
1778 0 : if segment == maxsegment {
1779 0 : segment = 0;
1780 0 : } else {
1781 0 : segment += 1;
1782 0 : }
1783 : }
1784 :
1785 : // Truncate offsets
1786 : // FIXME: this did not handle wraparound correctly
1787 :
1788 0 : Ok(())
1789 0 : }
1790 :
1791 0 : async fn ingest_relmap_page(
1792 0 : &mut self,
1793 0 : modification: &mut DatadirModification<'_>,
1794 0 : xlrec: &XlRelmapUpdate,
1795 0 : decoded: &DecodedWALRecord,
1796 0 : ctx: &RequestContext,
1797 0 : ) -> Result<()> {
1798 0 : let mut buf = decoded.record.clone();
1799 0 : buf.advance(decoded.main_data_offset);
1800 0 : // skip xl_relmap_update
1801 0 : buf.advance(12);
1802 0 :
1803 0 : modification
1804 0 : .put_relmap_file(
1805 0 : xlrec.tsid,
1806 0 : xlrec.dbid,
1807 0 : Bytes::copy_from_slice(&buf[..]),
1808 0 : ctx,
1809 0 : )
1810 0 : .await
1811 0 : }
1812 :
1813 18 : async fn put_rel_creation(
1814 18 : &mut self,
1815 18 : modification: &mut DatadirModification<'_>,
1816 18 : rel: RelTag,
1817 18 : ctx: &RequestContext,
1818 18 : ) -> Result<()> {
1819 18 : modification.put_rel_creation(rel, 0, ctx).await?;
1820 18 : Ok(())
1821 18 : }
1822 :
1823 272426 : async fn put_rel_page_image(
1824 272426 : &mut self,
1825 272426 : modification: &mut DatadirModification<'_>,
1826 272426 : rel: RelTag,
1827 272426 : blknum: BlockNumber,
1828 272426 : img: Bytes,
1829 272426 : ctx: &RequestContext,
1830 272426 : ) -> Result<(), PageReconstructError> {
1831 272426 : self.handle_rel_extend(modification, rel, blknum, ctx)
1832 5425 : .await?;
1833 272426 : modification.put_rel_page_image(rel, blknum, img)?;
1834 272426 : Ok(())
1835 272426 : }
1836 :
1837 145630 : async fn put_rel_wal_record(
1838 145630 : &mut self,
1839 145630 : modification: &mut DatadirModification<'_>,
1840 145630 : rel: RelTag,
1841 145630 : blknum: BlockNumber,
1842 145630 : rec: NeonWalRecord,
1843 145630 : ctx: &RequestContext,
1844 145630 : ) -> Result<()> {
1845 145630 : self.handle_rel_extend(modification, rel, blknum, ctx)
1846 279 : .await?;
1847 145630 : modification.put_rel_wal_record(rel, blknum, rec)?;
1848 145630 : Ok(())
1849 145630 : }
1850 :
1851 6012 : async fn put_rel_truncation(
1852 6012 : &mut self,
1853 6012 : modification: &mut DatadirModification<'_>,
1854 6012 : rel: RelTag,
1855 6012 : nblocks: BlockNumber,
1856 6012 : ctx: &RequestContext,
1857 6012 : ) -> anyhow::Result<()> {
1858 6012 : modification.put_rel_truncation(rel, nblocks, ctx).await?;
1859 6012 : Ok(())
1860 6012 : }
1861 :
1862 2 : async fn put_rel_drop(
1863 2 : &mut self,
1864 2 : modification: &mut DatadirModification<'_>,
1865 2 : rel: RelTag,
1866 2 : ctx: &RequestContext,
1867 2 : ) -> Result<()> {
1868 2 : modification.put_rel_drop(rel, ctx).await?;
1869 2 : Ok(())
1870 2 : }
1871 :
1872 418056 : async fn handle_rel_extend(
1873 418056 : &mut self,
1874 418056 : modification: &mut DatadirModification<'_>,
1875 418056 : rel: RelTag,
1876 418056 : blknum: BlockNumber,
1877 418056 : ctx: &RequestContext,
1878 418056 : ) -> Result<(), PageReconstructError> {
1879 418056 : let new_nblocks = blknum + 1;
1880 : // Check if the relation exists. We implicitly create relations on the first
1881 : // record.
1882 : // TODO: it would be nice to be more explicit about it
1883 :
1884 : // Get current size and put rel creation if rel doesn't exist
1885 : //
1886 : // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
1887 : // check the cache too. This is because eagerly checking the cache results in
1888 : // less work overall and 10% better performance. It's more work on cache miss
1889 : // but cache miss is rare.
1890 418056 : let old_nblocks = if let Some(nblocks) = modification
1891 418056 : .tline
1892 418056 : .get_cached_rel_size(&rel, modification.get_lsn())
1893 : {
1894 418046 : nblocks
1895 10 : } else if !modification
1896 10 : .tline
1897 10 : .get_rel_exists(rel, Version::Modified(modification), ctx)
1898 2 : .await?
1899 : {
1900 : // Create it with 0 size initially; the logic below will extend it.
1901 10 : modification
1902 10 : .put_rel_creation(rel, 0, ctx)
1903 3 : .await
1904 10 : .context("Relation Error")?;
1905 10 : 0
1906 : } else {
1907 0 : modification
1908 0 : .tline
1909 0 : .get_rel_size(rel, Version::Modified(modification), ctx)
1910 0 : .await?
1911 : };
1912 :
1913 418056 : if new_nblocks > old_nblocks {
1914 : //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
1915 274788 : modification.put_rel_extend(rel, new_nblocks, ctx).await?;
1916 :
1917 274788 : let mut key = rel_block_to_key(rel, blknum);
1918 : // fill the gap with zeros
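: // Illustrative: with old_nblocks = 2 and blknum = 5, pages 2..=4 are
: // zero-filled here and page 5 itself is written by the caller; blocks
: // owned by other shards are skipped.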
1919 274788 : for gap_blknum in old_nblocks..blknum {
1920 2998 : key.field6 = gap_blknum;
1921 2998 :
1922 2998 : if self.shard.get_shard_number(&key) != self.shard.number {
1923 0 : continue;
1924 2998 : }
1925 2998 :
1926 2998 : modification.put_rel_page_image_zero(rel, gap_blknum)?;
1927 : }
1928 143268 : }
1929 418056 : Ok(())
1930 418056 : }
1931 :
1932 0 : async fn put_slru_page_image(
1933 0 : &mut self,
1934 0 : modification: &mut DatadirModification<'_>,
1935 0 : kind: SlruKind,
1936 0 : segno: u32,
1937 0 : blknum: BlockNumber,
1938 0 : img: Bytes,
1939 0 : ctx: &RequestContext,
1940 0 : ) -> Result<()> {
1941 0 : self.handle_slru_extend(modification, kind, segno, blknum, ctx)
1942 0 : .await?;
1943 0 : modification.put_slru_page_image(kind, segno, blknum, img)?;
1944 0 : Ok(())
1945 0 : }
1946 :
1947 0 : async fn handle_slru_extend(
1948 0 : &mut self,
1949 0 : modification: &mut DatadirModification<'_>,
1950 0 : kind: SlruKind,
1951 0 : segno: u32,
1952 0 : blknum: BlockNumber,
1953 0 : ctx: &RequestContext,
1954 0 : ) -> anyhow::Result<()> {
1955 0 : // we don't use a cache for this like we do for relations. SLRUS are explcitly
1956 0 : // extended with ZEROPAGE records, not with commit records, so it happens
1957 0 : // a lot less frequently.
1958 0 :
1959 0 : let new_nblocks = blknum + 1;
1960 : // Check if the segment exists. We implicitly create SLRU segments on the first
1961 : // record.
1962 : // TODO: it would be nice to be more explicit about it
1963 0 : let old_nblocks = if !modification
1964 0 : .tline
1965 0 : .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
1966 0 : .await?
1967 : {
1968 : // Create it with 0 size initially; the logic below will extend it.
1969 0 : modification
1970 0 : .put_slru_segment_creation(kind, segno, 0, ctx)
1971 0 : .await?;
1972 0 : 0
1973 : } else {
1974 0 : modification
1975 0 : .tline
1976 0 : .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
1977 0 : .await?
1978 : };
1979 :
1980 0 : if new_nblocks > old_nblocks {
1981 0 : trace!(
1982 0 : "extending SLRU {:?} seg {} from {} to {} blocks",
1983 : kind,
1984 : segno,
1985 : old_nblocks,
1986 : new_nblocks
1987 : );
1988 0 : modification.put_slru_extend(kind, segno, new_nblocks)?;
1989 :
1990 : // fill the gap with zeros
1991 0 : for gap_blknum in old_nblocks..blknum {
1992 0 : modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
1993 : }
1994 0 : }
1995 0 : Ok(())
1996 0 : }
1997 : }
1998 :
1999 12 : async fn get_relsize(
2000 12 : modification: &DatadirModification<'_>,
2001 12 : rel: RelTag,
2002 12 : ctx: &RequestContext,
2003 12 : ) -> Result<BlockNumber, PageReconstructError> {
2004 12 : let nblocks = if !modification
2005 12 : .tline
2006 12 : .get_rel_exists(rel, Version::Modified(modification), ctx)
2007 0 : .await?
2008 : {
2009 0 : 0
2010 : } else {
2011 12 : modification
2012 12 : .tline
2013 12 : .get_rel_size(rel, Version::Modified(modification), ctx)
2014 0 : .await?
2015 : };
2016 12 : Ok(nblocks)
2017 12 : }
2018 :
2019 : #[allow(clippy::bool_assert_comparison)]
2020 : #[cfg(test)]
2021 : mod tests {
2022 : use super::*;
2023 : use crate::tenant::harness::*;
2024 : use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
2025 : use postgres_ffi::RELSEG_SIZE;
2026 :
2027 : use crate::DEFAULT_PG_VERSION;
2028 :
2029 : /// Arbitrary relation tag, for testing.
2030 : const TESTREL_A: RelTag = RelTag {
2031 : spcnode: 0,
2032 : dbnode: 111,
2033 : relnode: 1000,
2034 : forknum: 0,
2035 : };
2036 :
2037 12 : fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
2038 12 : // TODO
2039 12 : }
2040 :
2041 : #[tokio::test]
2042 2 : async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
2043 8 : for i in 14..=16 {
2044 6 : dispatch_pgversion!(i, {
2045 2 : pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
2046 2 : });
2047 2 : }
2048 2 :
2049 2 : Ok(())
2050 2 : }
2051 :
2052 8 : async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
2053 8 : let mut m = tline.begin_modification(Lsn(0x10));
2054 8 : m.put_checkpoint(dispatch_pgversion!(
2055 8 : tline.pg_version,
2056 0 : pgv::ZERO_CHECKPOINT.clone()
2057 0 : ))?;
2058 16 : m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
2059 8 : m.commit(ctx).await?;
2060 8 : let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
2061 :
2062 8 : Ok(walingest)
2063 8 : }
2064 :
2065 : #[tokio::test]
2066 2 : async fn test_relsize() -> Result<()> {
2067 8 : let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
2068 2 : let tline = tenant
2069 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2070 4 : .await?;
2071 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2072 2 :
2073 2 : let mut m = tline.begin_modification(Lsn(0x20));
2074 2 : walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
2075 2 : walingest
2076 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2077 2 : .await?;
2078 2 : m.on_record_end();
2079 2 : m.commit(&ctx).await?;
2080 2 : let mut m = tline.begin_modification(Lsn(0x30));
2081 2 : walingest
2082 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
2083 2 : .await?;
2084 2 : m.on_record_end();
2085 2 : m.commit(&ctx).await?;
2086 2 : let mut m = tline.begin_modification(Lsn(0x40));
2087 2 : walingest
2088 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
2089 2 : .await?;
2090 2 : m.on_record_end();
2091 2 : m.commit(&ctx).await?;
2092 2 : let mut m = tline.begin_modification(Lsn(0x50));
2093 2 : walingest
2094 2 : .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
2095 2 : .await?;
2096 2 : m.on_record_end();
2097 2 : m.commit(&ctx).await?;
2098 2 :
2099 2 : assert_current_logical_size(&tline, Lsn(0x50));
2100 2 :
2101 2 : // The relation was created at LSN 2, not visible at LSN 1 yet.
2102 2 : assert_eq!(
2103 2 : tline
2104 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2105 2 : .await?,
2106 2 : false
2107 2 : );
2108 2 : assert!(tline
2109 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2110 2 : .await
2111 2 : .is_err());
2112 2 : assert_eq!(
2113 2 : tline
2114 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2115 2 : .await?,
2116 2 : true
2117 2 : );
2118 2 : assert_eq!(
2119 2 : tline
2120 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2121 2 : .await?,
2122 2 : 1
2123 2 : );
2124 2 : assert_eq!(
2125 2 : tline
2126 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2127 2 : .await?,
2128 2 : 3
2129 2 : );
2130 2 :
2131 2 : // Check page contents at each LSN
2132 2 : assert_eq!(
2133 2 : tline
2134 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
2135 2 : .await?,
2136 2 : test_img("foo blk 0 at 2")
2137 2 : );
2138 2 :
2139 2 : assert_eq!(
2140 2 : tline
2141 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
2142 2 : .await?,
2143 2 : test_img("foo blk 0 at 3")
2144 2 : );
2145 2 :
2146 2 : assert_eq!(
2147 2 : tline
2148 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
2149 2 : .await?,
2150 2 : test_img("foo blk 0 at 3")
2151 2 : );
2152 2 : assert_eq!(
2153 2 : tline
2154 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
2155 2 : .await?,
2156 2 : test_img("foo blk 1 at 4")
2157 2 : );
2158 2 :
2159 2 : assert_eq!(
2160 2 : tline
2161 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
2162 2 : .await?,
2163 2 : test_img("foo blk 0 at 3")
2164 2 : );
2165 2 : assert_eq!(
2166 2 : tline
2167 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
2168 2 : .await?,
2169 2 : test_img("foo blk 1 at 4")
2170 2 : );
2171 2 : assert_eq!(
2172 2 : tline
2173 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
2174 2 : .await?,
2175 2 : test_img("foo blk 2 at 5")
2176 2 : );
2177 2 :
2178 2 : // Truncate last block
2179 2 : let mut m = tline.begin_modification(Lsn(0x60));
2180 2 : walingest
2181 2 : .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
2182 2 : .await?;
2183 2 : m.commit(&ctx).await?;
2184 2 : assert_current_logical_size(&tline, Lsn(0x60));
2185 2 :
2186 2 : // Check reported size and contents after truncation
2187 2 : assert_eq!(
2188 2 : tline
2189 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2190 2 : .await?,
2191 2 : 2
2192 2 : );
2193 2 : assert_eq!(
2194 2 : tline
2195 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
2196 2 : .await?,
2197 2 : test_img("foo blk 0 at 3")
2198 2 : );
2199 2 : assert_eq!(
2200 2 : tline
2201 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
2202 2 : .await?,
2203 2 : test_img("foo blk 1 at 4")
2204 2 : );
2205 2 :
2206 2 : // should still see the truncated block with older LSN
2207 2 : assert_eq!(
2208 2 : tline
2209 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2210 2 : .await?,
2211 2 : 3
2212 2 : );
2213 2 : assert_eq!(
2214 2 : tline
2215 2 : .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
2216 2 : .await?,
2217 2 : test_img("foo blk 2 at 5")
2218 2 : );
2219 2 :
2220 2 : // Truncate to zero length
2221 2 : let mut m = tline.begin_modification(Lsn(0x68));
2222 2 : walingest
2223 2 : .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
2224 2 : .await?;
2225 2 : m.commit(&ctx).await?;
2226 2 : assert_eq!(
2227 2 : tline
2228 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
2229 2 : .await?,
2230 2 : 0
2231 2 : );
2232 2 :
2233 2 : // Extend from 0 to 2 blocks, leaving a gap
2234 2 : let mut m = tline.begin_modification(Lsn(0x70));
2235 2 : walingest
2236 2 : .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
2237 2 : .await?;
2238 2 : m.on_record_end();
2239 2 : m.commit(&ctx).await?;
2240 2 : assert_eq!(
2241 2 : tline
2242 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
2243 2 : .await?,
2244 2 : 2
2245 2 : );
2246 2 : assert_eq!(
2247 2 : tline
2248 2 : .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
2249 2 : .await?,
2250 2 : ZERO_PAGE
2251 2 : );
2252 2 : assert_eq!(
2253 2 : tline
2254 2 : .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
2255 2 : .await?,
2256 2 : test_img("foo blk 1")
2257 2 : );
2258 2 :
2259 2 : // Extend a lot more, leaving a big gap that spans across segments
2260 2 : let mut m = tline.begin_modification(Lsn(0x80));
2261 2 : walingest
2262 2 : .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
2263 2 : .await?;
2264 2 : m.on_record_end();
2265 190 : m.commit(&ctx).await?;
2266 2 : assert_eq!(
2267 2 : tline
2268 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2269 2 : .await?,
2270 2 : 1501
2271 2 : );
2272 2998 : for blk in 2..1500 {
2273 2996 : assert_eq!(
2274 2996 : tline
2275 2996 : .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
2276 1540 : .await?,
2277 2996 : ZERO_PAGE
2278 2 : );
2279 2 : }
2280 2 : assert_eq!(
2281 2 : tline
2282 2 : .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
2283 2 : .await?,
2284 2 : test_img("foo blk 1500")
2285 2 : );
2286 2 :
2287 2 : Ok(())
2288 2 : }
2289 :
2290 : // Test what happens if we dropped a relation
2291 : // and then created it again within the same layer.
2292 : #[tokio::test]
2293 2 : async fn test_drop_extend() -> Result<()> {
2294 2 : let (tenant, ctx) = TenantHarness::create("test_drop_extend")
2295 2 : .await?
2296 2 : .load()
2297 8 : .await;
2298 2 : let tline = tenant
2299 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2300 4 : .await?;
2301 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2302 2 :
2303 2 : let mut m = tline.begin_modification(Lsn(0x20));
2304 2 : walingest
2305 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
2306 2 : .await?;
2307 2 : m.commit(&ctx).await?;
2308 2 :
2309 2 : // Check that rel exists and size is correct
2310 2 : assert_eq!(
2311 2 : tline
2312 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2313 2 : .await?,
2314 2 : true
2315 2 : );
2316 2 : assert_eq!(
2317 2 : tline
2318 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2319 2 : .await?,
2320 2 : 1
2321 2 : );
2322 2 :
2323 2 : // Drop rel
2324 2 : let mut m = tline.begin_modification(Lsn(0x30));
2325 2 : walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
2326 2 : m.commit(&ctx).await?;
2327 2 :
2328 2 : // Check that rel is not visible anymore
2329 2 : assert_eq!(
2330 2 : tline
2331 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
2332 2 : .await?,
2333 2 : false
2334 2 : );
2335 2 :
2336 2 : // FIXME: should fail
2337 2 : //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
2338 2 :
2339 2 : // Re-create it
2340 2 : let mut m = tline.begin_modification(Lsn(0x40));
2341 2 : walingest
2342 2 : .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
2343 2 : .await?;
2344 2 : m.commit(&ctx).await?;
2345 2 :
2346 2 : // Check that rel exists and size is correct
2347 2 : assert_eq!(
2348 2 : tline
2349 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2350 2 : .await?,
2351 2 : true
2352 2 : );
2353 2 : assert_eq!(
2354 2 : tline
2355 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
2356 2 : .await?,
2357 2 : 1
2358 2 : );
2359 2 :
2360 2 : Ok(())
2361 2 : }
2362 :
2363 : // Test what happens if we truncated a relation
2364 : // so that one of its segments was dropped
2365 : // and then extended it again within the same layer.
2366 : #[tokio::test]
2367 2 : async fn test_truncate_extend() -> Result<()> {
2368 2 : let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
2369 2 : .await?
2370 2 : .load()
2371 5 : .await;
2372 2 : let tline = tenant
2373 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2374 4 : .await?;
2375 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2376 2 :
2377 2 : // Create a 20 MB relation (the size is arbitrary)
2378 2 : let relsize = 20 * 1024 * 1024 / 8192;
2379 2 : let mut m = tline.begin_modification(Lsn(0x20));
2380 5120 : for blkno in 0..relsize {
2381 5120 : let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
2382 5120 : walingest
2383 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2384 2 : .await?;
2385 2 : }
2386 2 : m.commit(&ctx).await?;
2387 2 :
2388 2 : // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
2389 2 : assert_eq!(
2390 2 : tline
2391 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2392 2 : .await?,
2393 2 : false
2394 2 : );
2395 2 : assert!(tline
2396 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
2397 2 : .await
2398 2 : .is_err());
2399 2 :
2400 2 : assert_eq!(
2401 2 : tline
2402 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2403 2 : .await?,
2404 2 : true
2405 2 : );
2406 2 : assert_eq!(
2407 2 : tline
2408 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
2409 2 : .await?,
2410 2 : relsize
2411 2 : );
2412 2 :
2413 2 : // Check relation content
2414 5120 : for blkno in 0..relsize {
2415 5120 : let lsn = Lsn(0x20);
2416 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2417 5120 : assert_eq!(
2418 5120 : tline
2419 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
2420 1803 : .await?,
2421 5120 : test_img(&data)
2422 2 : );
2423 2 : }
2424 2 :
2425 2 : // Truncate the relation so that its second segment is dropped,
2426 2 : // leaving only one page
2427 2 : let mut m = tline.begin_modification(Lsn(0x60));
2428 2 : walingest
2429 2 : .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
2430 2 : .await?;
2431 2 : m.commit(&ctx).await?;
2432 2 :
2433 2 : // Check reported size and contents after truncation
2434 2 : assert_eq!(
2435 2 : tline
2436 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
2437 2 : .await?,
2438 2 : 1
2439 2 : );
2440 2 :
2441 4 : for blkno in 0..1 {
2442 2 : let lsn = Lsn(0x20);
2443 2 : let data = format!("foo blk {} at {}", blkno, lsn);
2444 2 : assert_eq!(
2445 2 : tline
2446 2 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
2447 2 : .await?,
2448 2 : test_img(&data)
2449 2 : );
2450 2 : }
2451 2 :
2452 2 : // should still see all blocks with older LSN
2453 2 : assert_eq!(
2454 2 : tline
2455 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
2456 2 : .await?,
2457 2 : relsize
2458 2 : );
2459 5120 : for blkno in 0..relsize {
2460 5120 : let lsn = Lsn(0x20);
2461 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2462 5120 : assert_eq!(
2463 5120 : tline
2464 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
2465 1856 : .await?,
2466 5120 : test_img(&data)
2467 2 : );
2468 2 : }
2469 2 :
2470 2 : // Extend relation again.
2471 2 : // Add enough blocks to create second segment
2472 2 : let lsn = Lsn(0x80);
2473 2 : let mut m = tline.begin_modification(lsn);
2474 5120 : for blkno in 0..relsize {
2475 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2476 5120 : walingest
2477 5120 : .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
2478 2 : .await?;
2479 2 : }
2480 3 : m.commit(&ctx).await?;
2481 2 :
2482 2 : assert_eq!(
2483 2 : tline
2484 2 : .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2485 2 : .await?,
2486 2 : true
2487 2 : );
2488 2 : assert_eq!(
2489 2 : tline
2490 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
2491 2 : .await?,
2492 2 : relsize
2493 2 : );
2494 2 : // Check relation content
2495 5120 : for blkno in 0..relsize {
2496 5120 : let lsn = Lsn(0x80);
2497 5120 : let data = format!("foo blk {} at {}", blkno, lsn);
2498 5120 : assert_eq!(
2499 5120 : tline
2500 5120 : .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
2501 1828 : .await?,
2502 5120 : test_img(&data)
2503 2 : );
2504 2 : }
2505 2 :
2506 2 : Ok(())
2507 2 : }
2508 :
2509 : /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
2510 : /// split into multiple 1 GB segments in Postgres.
2511 : #[tokio::test]
2512 2 : async fn test_large_rel() -> Result<()> {
2513 8 : let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
2514 2 : let tline = tenant
2515 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2516 4 : .await?;
2517 5 : let mut walingest = init_walingest_test(&tline, &ctx).await?;
2518 2 :
2519 2 : let mut lsn = 0x10;
2520 262146 : for blknum in 0..RELSEG_SIZE + 1 {
2521 262146 : lsn += 0x10;
2522 262146 : let mut m = tline.begin_modification(Lsn(lsn));
2523 262146 : let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
2524 262146 : walingest
2525 262146 : .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
2526 5420 : .await?;
2527 262146 : m.commit(&ctx).await?;
2528 2 : }
2529 2 :
2530 2 : assert_current_logical_size(&tline, Lsn(lsn));
2531 2 :
2532 2 : assert_eq!(
2533 2 : tline
2534 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2535 2 : .await?,
2536 2 : RELSEG_SIZE + 1
2537 2 : );
2538 2 :
2539 2 : // Truncate one block
2540 2 : lsn += 0x10;
2541 2 : let mut m = tline.begin_modification(Lsn(lsn));
2542 2 : walingest
2543 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
2544 2 : .await?;
2545 2 : m.commit(&ctx).await?;
2546 2 : assert_eq!(
2547 2 : tline
2548 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2549 2 : .await?,
2550 2 : RELSEG_SIZE
2551 2 : );
2552 2 : assert_current_logical_size(&tline, Lsn(lsn));
2553 2 :
2554 2 : // Truncate another block
2555 2 : lsn += 0x10;
2556 2 : let mut m = tline.begin_modification(Lsn(lsn));
2557 2 : walingest
2558 2 : .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
2559 2 : .await?;
2560 2 : m.commit(&ctx).await?;
2561 2 : assert_eq!(
2562 2 : tline
2563 2 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2564 2 : .await?,
2565 2 : RELSEG_SIZE - 1
2566 2 : );
2567 2 : assert_current_logical_size(&tline, Lsn(lsn));
2568 2 :
2569 2 : // Truncate to 3000, and then truncate all the way down to 0, one block at a time
2570 2 : // This tests the behavior at segment boundaries
2571 2 : let mut size: i32 = 3000;
2572 6004 : while size >= 0 {
2573 6002 : lsn += 0x10;
2574 6002 : let mut m = tline.begin_modification(Lsn(lsn));
2575 6002 : walingest
2576 6002 : .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
2577 2 : .await?;
2578 6002 : m.commit(&ctx).await?;
2579 6002 : assert_eq!(
2580 6002 : tline
2581 6002 : .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
2582 2 : .await?,
2583 6002 : size as BlockNumber
2584 2 : );
2585 2 :
2586 6002 : size -= 1;
2587 2 : }
2588 2 : assert_current_logical_size(&tline, Lsn(lsn));
2589 2 :
2590 2 : Ok(())
2591 2 : }
2592 :
2593 : /// Replay a WAL segment file taken directly from safekeepers.
2594 : ///
2595 : /// This test is useful for benchmarking since it allows us to profile only
2596 : /// the walingest code in a single-threaded executor, and iterate more quickly
2597 : /// without waiting for unrelated steps.
2598 : #[tokio::test]
2599 2 : async fn test_ingest_real_wal() {
2600 2 : use crate::tenant::harness::*;
2601 2 : use postgres_ffi::waldecoder::WalStreamDecoder;
2602 2 : use postgres_ffi::WAL_SEGMENT_SIZE;
2603 2 :
2604 2 : // Define test data path and constants.
2605 2 : //
2606 2 : // Steps to reconstruct the data, if needed:
2607 2 : // 1. Run the pgbench python test
2608 2 : // 2. Take the first WAL segment file from the safekeeper
2609 2 : // 3. Compress it using `zstd --long input_file`
2610 2 : // 4. Copy initdb.tar.zst from local_fs_remote_storage
2611 2 : // 5. Grep sk logs for "restart decoder" to get startpoint
2612 2 : // 6. Run just the decoder from this test to get the endpoint.
2613 2 : // It's the last LSN the decoder will output.
2614 2 : let pg_version = 15; // The test data was generated by pg15
2615 2 : let path = "test_data/sk_wal_segment_from_pgbench";
2616 2 : let wal_segment_path = format!("{path}/000000010000000000000001.zst");
2617 2 : let source_initdb_path = format!("{path}/{INITDB_PATH}");
2618 2 : let startpoint = Lsn::from_hex("14AEC08").unwrap();
2619 2 : let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
2620 2 :
2621 2 : let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
2622 2 : let span = harness
2623 2 : .span()
2624 2 : .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
2625 8 : let (tenant, ctx) = harness.load().await;
2626 2 :
2627 2 : let remote_initdb_path =
2628 2 : remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
2629 2 : let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
2630 2 :
2631 2 : std::fs::create_dir_all(initdb_path.parent().unwrap())
2632 2 : .expect("creating test dir should work");
2633 2 : std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
2634 2 :
2635 2 : // Bootstrap a real timeline. We can't use create_test_timeline because
2636 2 : // it doesn't create a real checkpoint, and WalIngest::new tries to parse
2637 2 : // the garbage data.
2638 2 : let tline = tenant
2639 2 : .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
2640 19957 : .await
2641 2 : .unwrap();
2642 2 :
2643 2 : // We fully read and decompress this into memory before decoding
2644 2 : // to get a more accurate perf profile of the decoder.
2645 2 : let bytes = {
2646 2 : use async_compression::tokio::bufread::ZstdDecoder;
2647 2 : let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
2648 2 : let reader = tokio::io::BufReader::new(file);
2649 2 : let decoder = ZstdDecoder::new(reader);
2650 2 : let mut reader = tokio::io::BufReader::new(decoder);
2651 2 : let mut buffer = Vec::new();
2652 192 : tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
2653 2 : buffer
2654 2 : };
2655 2 :
2656 2 : // TODO start a profiler too
2657 2 : let started_at = std::time::Instant::now();
2658 2 :
2659 2 : // Initialize walingest
2660 2 : let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
2661 2 : let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
2662 2 : let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
2663 5 : .await
2664 2 : .unwrap();
2665 2 : let mut modification = tline.begin_modification(startpoint);
2666 2 : println!("decoding {} bytes", bytes.len() - xlogoff);
2667 2 :
2668 2 : // Decode and ingest WAL. We process the WAL in chunks because
2669 2 : // that's what happens when we get bytes from safekeepers.
2670 474686 : for chunk in bytes[xlogoff..].chunks(50) {
2671 474686 : decoder.feed_bytes(chunk);
2672 620536 : while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
2673 145850 : let mut decoded = DecodedWALRecord::default();
2674 145850 : decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
2675 145850 : walingest
2676 145850 : .ingest_record(decoded, lsn, &mut modification, &ctx)
2677 145850 : .instrument(span.clone())
2678 296 : .await
2679 145850 : .unwrap();
2680 2 : }
2681 474686 : modification.commit(&ctx).await.unwrap();
2682 2 : }
2683 2 :
2684 2 : let duration = started_at.elapsed();
2685 2 : println!("done in {:?}", duration);
2686 2 : }
2687 : }