LCOV - code coverage report
Current view: top level - pageserver/src/walredo - apply_neon.rs (source / functions) Coverage Total Hit
Test: 53437f7e869ac68c86c7d3e4c20964c0156f158c.info Lines: 37.8 % 196 74
Test Date: 2024-09-20 16:14:12 Functions: 100.0 % 3 3

            Line data    Source code
       1              : use crate::pgdatadir_mapping::AuxFilesDirectory;
       2              : use crate::walrecord::NeonWalRecord;
       3              : use anyhow::Context;
       4              : use byteorder::{ByteOrder, LittleEndian};
       5              : use bytes::{BufMut, BytesMut};
       6              : use pageserver_api::key::Key;
       7              : use pageserver_api::reltag::SlruKind;
       8              : use postgres_ffi::pg_constants;
       9              : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      10              : use postgres_ffi::v14::nonrelfile_utils::{
      11              :     mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
      12              :     transaction_id_set_status,
      13              : };
      14              : use postgres_ffi::BLCKSZ;
      15              : use tracing::*;
      16              : use utils::bin_ser::BeSer;
      17              : use utils::lsn::Lsn;
      18              : 
      19              : /// Can this request be served by neon redo functions
      20              : /// or we need to pass it to wal-redo postgres process?
      21         1422 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
      22         1422 :     // Currently, we don't have bespoken Rust code to replay any
      23         1422 :     // Postgres WAL records. But everything else is handled in neon.
      24         1422 :     #[allow(clippy::match_like_matches_macro)]
      25         1422 :     match rec {
      26              :         NeonWalRecord::Postgres {
      27              :             will_init: _,
      28              :             rec: _,
      29           36 :         } => false,
      30         1386 :         _ => true,
      31              :     }
      32         1422 : }
      33              : 
      34         1404 : pub(crate) fn apply_in_neon(
      35         1404 :     record: &NeonWalRecord,
      36         1404 :     lsn: Lsn,
      37         1404 :     key: Key,
      38         1404 :     page: &mut BytesMut,
      39         1404 : ) -> Result<(), anyhow::Error> {
      40         1404 :     match record {
      41              :         NeonWalRecord::Postgres {
      42              :             will_init: _,
      43              :             rec: _,
      44              :         } => {
      45            0 :             anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
      46              :         }
      47              :         NeonWalRecord::ClearVisibilityMapFlags {
      48            0 :             new_heap_blkno,
      49            0 :             old_heap_blkno,
      50            0 :             flags,
      51              :         } => {
      52              :             // sanity check that this is modifying the correct relation
      53            0 :             let (rel, blknum) = key.to_rel_block().context("invalid record")?;
      54            0 :             assert!(
      55            0 :                 rel.forknum == VISIBILITYMAP_FORKNUM,
      56            0 :                 "ClearVisibilityMapFlags record on unexpected rel {}",
      57              :                 rel
      58              :             );
      59            0 :             if let Some(heap_blkno) = *new_heap_blkno {
      60              :                 // Calculate the VM block and offset that corresponds to the heap block.
      61            0 :                 let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
      62            0 :                 let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
      63            0 :                 let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
      64            0 : 
      65            0 :                 // Check that we're modifying the correct VM block.
      66            0 :                 assert!(map_block == blknum);
      67              : 
      68              :                 // equivalent to PageGetContents(page)
      69            0 :                 let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
      70            0 : 
      71            0 :                 map[map_byte as usize] &= !(flags << map_offset);
      72            0 :                 postgres_ffi::page_set_lsn(page, lsn);
      73            0 :             }
      74              : 
      75              :             // Repeat for 'old_heap_blkno', if any
      76            0 :             if let Some(heap_blkno) = *old_heap_blkno {
      77            0 :                 let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
      78            0 :                 let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
      79            0 :                 let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
      80            0 : 
      81            0 :                 assert!(map_block == blknum);
      82              : 
      83            0 :                 let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
      84            0 : 
      85            0 :                 map[map_byte as usize] &= !(flags << map_offset);
      86            0 :                 postgres_ffi::page_set_lsn(page, lsn);
      87            0 :             }
      88              :         }
      89              :         // Non-relational WAL records are handled here, with custom code that has the
      90              :         // same effects as the corresponding Postgres WAL redo function.
      91            0 :         NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
      92            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
      93            0 :             assert_eq!(
      94              :                 slru_kind,
      95              :                 SlruKind::Clog,
      96            0 :                 "ClogSetCommitted record with unexpected key {}",
      97              :                 key
      98              :             );
      99            0 :             for &xid in xids {
     100            0 :                 let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     101            0 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     102            0 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     103            0 : 
     104            0 :                 // Check that we're modifying the correct CLOG block.
     105            0 :                 assert!(
     106            0 :                     segno == expected_segno,
     107            0 :                     "ClogSetCommitted record for XID {} with unexpected key {}",
     108              :                     xid,
     109              :                     key
     110              :                 );
     111            0 :                 assert!(
     112            0 :                     blknum == expected_blknum,
     113            0 :                     "ClogSetCommitted record for XID {} with unexpected key {}",
     114              :                     xid,
     115              :                     key
     116              :                 );
     117              : 
     118            0 :                 transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
     119              :             }
     120              : 
     121              :             // Append the timestamp
     122            0 :             if page.len() == BLCKSZ as usize + 8 {
     123            0 :                 page.truncate(BLCKSZ as usize);
     124            0 :             }
     125            0 :             if page.len() == BLCKSZ as usize {
     126            0 :                 page.extend_from_slice(&timestamp.to_be_bytes());
     127            0 :             } else {
     128            0 :                 warn!(
     129            0 :                     "CLOG blk {} in seg {} has invalid size {}",
     130            0 :                     blknum,
     131            0 :                     segno,
     132            0 :                     page.len()
     133              :                 );
     134              :             }
     135              :         }
     136            0 :         NeonWalRecord::ClogSetAborted { xids } => {
     137            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     138            0 :             assert_eq!(
     139              :                 slru_kind,
     140              :                 SlruKind::Clog,
     141            0 :                 "ClogSetAborted record with unexpected key {}",
     142              :                 key
     143              :             );
     144            0 :             for &xid in xids {
     145            0 :                 let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     146            0 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     147            0 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     148            0 : 
     149            0 :                 // Check that we're modifying the correct CLOG block.
     150            0 :                 assert!(
     151            0 :                     segno == expected_segno,
     152            0 :                     "ClogSetAborted record for XID {} with unexpected key {}",
     153              :                     xid,
     154              :                     key
     155              :                 );
     156            0 :                 assert!(
     157            0 :                     blknum == expected_blknum,
     158            0 :                     "ClogSetAborted record for XID {} with unexpected key {}",
     159              :                     xid,
     160              :                     key
     161              :                 );
     162              : 
     163            0 :                 transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
     164              :             }
     165              :         }
     166            0 :         NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
     167            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     168            0 :             assert_eq!(
     169              :                 slru_kind,
     170              :                 SlruKind::MultiXactOffsets,
     171            0 :                 "MultixactOffsetCreate record with unexpected key {}",
     172              :                 key
     173              :             );
     174              :             // Compute the block and offset to modify.
     175              :             // See RecordNewMultiXact in PostgreSQL sources.
     176            0 :             let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     177            0 :             let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     178            0 :             let offset = (entryno * 4) as usize;
     179            0 : 
     180            0 :             // Check that we're modifying the correct multixact-offsets block.
     181            0 :             let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     182            0 :             let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     183            0 :             assert!(
     184            0 :                 segno == expected_segno,
     185            0 :                 "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     186              :                 mid,
     187              :                 key
     188              :             );
     189            0 :             assert!(
     190            0 :                 blknum == expected_blknum,
     191            0 :                 "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     192              :                 mid,
     193              :                 key
     194              :             );
     195              : 
     196            0 :             LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
     197              :         }
     198            0 :         NeonWalRecord::MultixactMembersCreate { moff, members } => {
     199            0 :             let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
     200            0 :             assert_eq!(
     201              :                 slru_kind,
     202              :                 SlruKind::MultiXactMembers,
     203            0 :                 "MultixactMembersCreate record with unexpected key {}",
     204              :                 key
     205              :             );
     206            0 :             for (i, member) in members.iter().enumerate() {
     207            0 :                 let offset = moff + i as u32;
     208            0 : 
     209            0 :                 // Compute the block and offset to modify.
     210            0 :                 // See RecordNewMultiXact in PostgreSQL sources.
     211            0 :                 let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     212            0 :                 let memberoff = mx_offset_to_member_offset(offset);
     213            0 :                 let flagsoff = mx_offset_to_flags_offset(offset);
     214            0 :                 let bshift = mx_offset_to_flags_bitshift(offset);
     215            0 : 
     216            0 :                 // Check that we're modifying the correct multixact-members block.
     217            0 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     218            0 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     219            0 :                 assert!(
     220            0 :                     segno == expected_segno,
     221            0 :                     "MultiXactMembersCreate record for offset {} with unexpected key {}",
     222              :                     moff,
     223              :                     key
     224              :                 );
     225            0 :                 assert!(
     226            0 :                     blknum == expected_blknum,
     227            0 :                     "MultiXactMembersCreate record for offset {} with unexpected key {}",
     228              :                     moff,
     229              :                     key
     230              :                 );
     231              : 
     232            0 :                 let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
     233            0 :                 flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
     234            0 :                 flagsval |= member.status << bshift;
     235            0 :                 LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
     236            0 :                 LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
     237              :             }
     238              :         }
     239           30 :         NeonWalRecord::AuxFile { file_path, content } => {
     240           30 :             let mut dir = AuxFilesDirectory::des(page)?;
     241           30 :             dir.upsert(file_path.clone(), content.clone());
     242           30 : 
     243           30 :             page.clear();
     244           30 :             let mut writer = page.writer();
     245           30 :             dir.ser_into(&mut writer)?;
     246              :         }
     247              :         #[cfg(test)]
     248              :         NeonWalRecord::Test {
     249         1374 :             append,
     250         1374 :             clear,
     251         1374 :             will_init,
     252         1374 :         } => {
     253         1374 :             if *will_init {
     254           12 :                 assert!(*clear, "init record must be clear to ensure correctness");
     255         1362 :             }
     256         1374 :             if *clear {
     257           12 :                 page.clear();
     258         1362 :             }
     259         1374 :             page.put_slice(append.as_bytes());
     260              :         }
     261              :     }
     262         1404 :     Ok(())
     263         1404 : }
     264              : 
     265              : #[cfg(test)]
     266              : mod test {
     267              :     use bytes::Bytes;
     268              :     use pageserver_api::key::AUX_FILES_KEY;
     269              : 
     270              :     use super::*;
     271              :     use std::collections::HashMap;
     272              : 
     273              :     /// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
     274              :     #[test]
     275            6 :     fn apply_aux_file_deltas() -> anyhow::Result<()> {
     276            6 :         let base_dir = AuxFilesDirectory {
     277            6 :             files: HashMap::from([
     278            6 :                 ("two".to_string(), Bytes::from_static(b"content0")),
     279            6 :                 ("three".to_string(), Bytes::from_static(b"contentX")),
     280            6 :             ]),
     281            6 :         };
     282            6 :         let base_image = AuxFilesDirectory::ser(&base_dir)?;
     283              : 
     284            6 :         let deltas = vec![
     285            6 :             // Insert
     286            6 :             NeonWalRecord::AuxFile {
     287            6 :                 file_path: "one".to_string(),
     288            6 :                 content: Some(Bytes::from_static(b"content1")),
     289            6 :             },
     290            6 :             // Update
     291            6 :             NeonWalRecord::AuxFile {
     292            6 :                 file_path: "two".to_string(),
     293            6 :                 content: Some(Bytes::from_static(b"content99")),
     294            6 :             },
     295            6 :             // Delete
     296            6 :             NeonWalRecord::AuxFile {
     297            6 :                 file_path: "three".to_string(),
     298            6 :                 content: None,
     299            6 :             },
     300            6 :         ];
     301            6 : 
     302            6 :         let file_path = AUX_FILES_KEY;
     303            6 :         let mut page = BytesMut::from_iter(base_image);
     304              : 
     305           24 :         for record in deltas {
     306           18 :             apply_in_neon(&record, Lsn(8), file_path, &mut page)?;
     307              :         }
     308              : 
     309            6 :         let reconstructed = AuxFilesDirectory::des(&page)?;
     310            6 :         let expect = HashMap::from([
     311            6 :             ("one".to_string(), Bytes::from_static(b"content1")),
     312            6 :             ("two".to_string(), Bytes::from_static(b"content99")),
     313            6 :         ]);
     314            6 : 
     315            6 :         assert_eq!(reconstructed.files, expect);
     316              : 
     317            6 :         Ok(())
     318            6 :     }
     319              : }
        

Generated by: LCOV version 2.1-beta