|             Line data    Source code 
       1              : use anyhow::Context;
       2              : use byteorder::{ByteOrder, LittleEndian};
       3              : use bytes::BytesMut;
       4              : use pageserver_api::key::Key;
       5              : use pageserver_api::reltag::SlruKind;
       6              : use postgres_ffi::v14::nonrelfile_utils::{
       7              :     mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
       8              :     transaction_id_set_status,
       9              : };
      10              : use postgres_ffi::{BLCKSZ, pg_constants};
      11              : use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
      12              : use tracing::*;
      13              : use utils::lsn::Lsn;
      14              : use wal_decoder::models::record::NeonWalRecord;
      15              : 
      16              : /// Can this request be served by neon redo functions
      17              : /// or we need to pass it to wal-redo postgres process?
      18      1401850 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
      19              :     // Currently, we don't have bespoken Rust code to replay any
      20              :     // Postgres WAL records. But everything else is handled in neon.
      21              :     #[allow(clippy::match_like_matches_macro)]
      22      1401850 :     match rec {
      23              :         NeonWalRecord::Postgres {
      24              :             will_init: _,
      25              :             rec: _,
      26            6 :         } => false,
      27      1401844 :         _ => true,
      28              :     }
      29      1401850 : }
      30              : 
      31      1401844 : pub(crate) fn apply_in_neon(
      32      1401844 :     record: &NeonWalRecord,
      33      1401844 :     lsn: Lsn,
      34      1401844 :     key: Key,
      35      1401844 :     page: &mut BytesMut,
      36      1401844 : ) -> Result<(), anyhow::Error> {
      37      1401844 :     match record {
      38              :         NeonWalRecord::Postgres {
      39              :             will_init: _,
      40              :             rec: _,
      41              :         } => {
      42            0 :             anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
      43              :         }
      44              :         //
      45              :         // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c`
      46              :         //
      47              :         NeonWalRecord::TruncateVisibilityMap {
      48            0 :             trunc_byte,
      49            0 :             trunc_offs,
      50              :         } => {
      51              :             // sanity check that this is modifying the correct relation
      52            0 :             let (rel, _) = key.to_rel_block().context("invalid record")?;
      53            0 :             assert!(
      54            0 :                 rel.forknum == VISIBILITYMAP_FORKNUM,
      55            0 :                 "TruncateVisibilityMap record on unexpected rel {rel}"
      56              :             );
      57            0 :             let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
      58            0 :             map[*trunc_byte + 1..].fill(0u8);
      59              :             /*----
      60              :              * Mask out the unwanted bits of the last remaining byte.
      61              :              *
      62              :              * ((1 << 0) - 1) = 00000000
      63              :              * ((1 << 1) - 1) = 00000001
      64              :              * ...
      65              :              * ((1 << 6) - 1) = 00111111
      66              :              * ((1 << 7) - 1) = 01111111
      67              :              *----
      68              :              */
      69            0 :             map[*trunc_byte] &= (1 << *trunc_offs) - 1;
      70              :         }
      71              :         NeonWalRecord::ClearVisibilityMapFlags {
      72            0 :             new_heap_blkno,
      73            0 :             old_heap_blkno,
      74            0 :             flags,
      75              :         } => {
      76              :             // sanity check that this is modifying the correct relation
      77            0 :             let (rel, blknum) = key.to_rel_block().context("invalid record")?;
      78            0 :             assert!(
      79            0 :                 rel.forknum == VISIBILITYMAP_FORKNUM,
      80            0 :                 "ClearVisibilityMapFlags record on unexpected rel {rel}"
      81              :             );
      82            0 :             if let Some(heap_blkno) = *new_heap_blkno {
      83              :                 // Calculate the VM block and offset that corresponds to the heap block.
      84            0 :                 let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
      85            0 :                 let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
      86            0 :                 let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
      87              : 
      88              :                 // Check that we're modifying the correct VM block.
      89            0 :                 assert!(map_block == blknum);
      90              : 
      91              :                 // equivalent to PageGetContents(page)
      92            0 :                 let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
      93              : 
      94            0 :                 map[map_byte as usize] &= !(flags << map_offset);
      95              :                 // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
      96            0 :                 if !postgres_ffi::page_is_new(page) {
      97            0 :                     postgres_ffi::page_set_lsn(page, lsn);
      98            0 :                 }
      99            0 :             }
     100              : 
     101              :             // Repeat for 'old_heap_blkno', if any
     102            0 :             if let Some(heap_blkno) = *old_heap_blkno {
     103            0 :                 let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     104            0 :                 let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     105            0 :                 let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     106              : 
     107            0 :                 assert!(map_block == blknum);
     108              : 
     109            0 :                 let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     110              : 
     111            0 :                 map[map_byte as usize] &= !(flags << map_offset);
     112              :                 // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
     113            0 :                 if !postgres_ffi::page_is_new(page) {
     114            0 :                     postgres_ffi::page_set_lsn(page, lsn);
     115            0 :                 }
     116            0 :             }
     117              :         }
     118              :         // Non-relational WAL records are handled here, with custom code that has the
     119              :         // same effects as the corresponding Postgres WAL redo function.
     120            0 :         NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
     121            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     122            0 :             assert_eq!(
     123              :                 slru_kind,
     124              :                 SlruKind::Clog,
     125            0 :                 "ClogSetCommitted record with unexpected key {key}"
     126              :             );
     127            0 :             for &xid in xids {
     128            0 :                 let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     129            0 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     130            0 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     131              : 
     132              :                 // Check that we're modifying the correct CLOG block.
     133            0 :                 assert!(
     134            0 :                     segno == expected_segno,
     135            0 :                     "ClogSetCommitted record for XID {xid} with unexpected key {key}"
     136              :                 );
     137            0 :                 assert!(
     138            0 :                     blknum == expected_blknum,
     139            0 :                     "ClogSetCommitted record for XID {xid} with unexpected key {key}"
     140              :                 );
     141              : 
     142            0 :                 transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
     143              :             }
     144              : 
     145              :             // Append the timestamp
     146            0 :             if page.len() == BLCKSZ as usize + 8 {
     147            0 :                 page.truncate(BLCKSZ as usize);
     148            0 :             }
     149            0 :             if page.len() == BLCKSZ as usize {
     150            0 :                 page.extend_from_slice(×tamp.to_be_bytes());
     151            0 :             } else {
     152            0 :                 warn!(
     153            0 :                     "CLOG blk {} in seg {} has invalid size {}",
     154              :                     blknum,
     155              :                     segno,
     156            0 :                     page.len()
     157              :                 );
     158              :             }
     159              :         }
     160            0 :         NeonWalRecord::ClogSetAborted { xids } => {
     161            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     162            0 :             assert_eq!(
     163              :                 slru_kind,
     164              :                 SlruKind::Clog,
     165            0 :                 "ClogSetAborted record with unexpected key {key}"
     166              :             );
     167            0 :             for &xid in xids {
     168            0 :                 let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     169            0 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     170            0 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     171              : 
     172              :                 // Check that we're modifying the correct CLOG block.
     173            0 :                 assert!(
     174            0 :                     segno == expected_segno,
     175            0 :                     "ClogSetAborted record for XID {xid} with unexpected key {key}"
     176              :                 );
     177            0 :                 assert!(
     178            0 :                     blknum == expected_blknum,
     179            0 :                     "ClogSetAborted record for XID {xid} with unexpected key {key}"
     180              :                 );
     181              : 
     182            0 :                 transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
     183              :             }
     184              :         }
     185            0 :         NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
     186            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     187            0 :             assert_eq!(
     188              :                 slru_kind,
     189              :                 SlruKind::MultiXactOffsets,
     190            0 :                 "MultixactOffsetCreate record with unexpected key {key}"
     191              :             );
     192              :             // Compute the block and offset to modify.
     193              :             // See RecordNewMultiXact in PostgreSQL sources.
     194            0 :             let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     195            0 :             let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     196            0 :             let offset = (entryno * 4) as usize;
     197              : 
     198              :             // Check that we're modifying the correct multixact-offsets block.
     199            0 :             let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     200            0 :             let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     201            0 :             assert!(
     202            0 :                 segno == expected_segno,
     203            0 :                 "MultiXactOffsetsCreate record for multi-xid {mid} with unexpected key {key}"
     204              :             );
     205            0 :             assert!(
     206            0 :                 blknum == expected_blknum,
     207            0 :                 "MultiXactOffsetsCreate record for multi-xid {mid} with unexpected key {key}"
     208              :             );
     209              : 
     210            0 :             LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
     211              :         }
     212            0 :         NeonWalRecord::MultixactMembersCreate { moff, members } => {
     213            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     214            0 :             assert_eq!(
     215              :                 slru_kind,
     216              :                 SlruKind::MultiXactMembers,
     217            0 :                 "MultixactMembersCreate record with unexpected key {key}"
     218              :             );
     219            0 :             for (i, member) in members.iter().enumerate() {
     220            0 :                 let offset = moff + i as u32;
     221              : 
     222              :                 // Compute the block and offset to modify.
     223              :                 // See RecordNewMultiXact in PostgreSQL sources.
     224            0 :                 let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     225            0 :                 let memberoff = mx_offset_to_member_offset(offset);
     226            0 :                 let flagsoff = mx_offset_to_flags_offset(offset);
     227            0 :                 let bshift = mx_offset_to_flags_bitshift(offset);
     228              : 
     229              :                 // Check that we're modifying the correct multixact-members block.
     230            0 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     231            0 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     232            0 :                 assert!(
     233            0 :                     segno == expected_segno,
     234            0 :                     "MultiXactMembersCreate record for offset {moff} with unexpected key {key}"
     235              :                 );
     236            0 :                 assert!(
     237            0 :                     blknum == expected_blknum,
     238            0 :                     "MultiXactMembersCreate record for offset {moff} with unexpected key {key}"
     239              :                 );
     240              : 
     241            0 :                 let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
     242            0 :                 flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
     243            0 :                 flagsval |= member.status << bshift;
     244            0 :                 LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
     245            0 :                 LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
     246              :             }
     247              :         }
     248              :         NeonWalRecord::AuxFile { .. } => {
     249              :             // No-op: this record will never be created in aux v2.
     250            0 :             warn!("AuxFile record should not be created in aux v2");
     251              :         }
     252              :         #[cfg(feature = "testing")]
     253              :         NeonWalRecord::Test {
     254      1401844 :             append,
     255      1401844 :             clear,
     256      1401844 :             will_init,
     257      1401844 :             only_if,
     258              :         } => {
     259              :             use bytes::BufMut;
     260      1401844 :             if *will_init {
     261        13868 :                 assert!(*clear, "init record must be clear to ensure correctness");
     262        13868 :                 assert!(
     263        13868 :                     page.is_empty(),
     264            0 :                     "init record must be the first entry to ensure correctness"
     265              :                 );
     266      1387976 :             }
     267      1401844 :             if *clear {
     268        13869 :                 page.clear();
     269      1387975 :             }
     270      1401844 :             if let Some(only_if) = only_if {
     271            2 :                 if page != only_if.as_bytes() {
     272            1 :                     return Err(anyhow::anyhow!(
     273            1 :                         "the current image does not match the expected image, cannot append"
     274            1 :                     ));
     275            1 :                 }
     276      1401842 :             }
     277      1401843 :             page.put_slice(append.as_bytes());
     278              :         }
     279              :     }
     280      1401843 :     Ok(())
     281      1401844 : }
         |