LCOV - code coverage report
Current view: top level - pageserver/src/walredo - apply_neon.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 89.1 % 184 164
Test Date: 2024-02-14 18:05:35 Functions: 80.0 % 5 4

            Line data    Source code
       1              : use crate::pgdatadir_mapping::AuxFilesDirectory;
       2              : use crate::walrecord::NeonWalRecord;
       3              : use anyhow::Context;
       4              : use byteorder::{ByteOrder, LittleEndian};
       5              : use bytes::{BufMut, BytesMut};
       6              : use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
       7              : use pageserver_api::reltag::SlruKind;
       8              : use postgres_ffi::pg_constants;
       9              : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      10              : use postgres_ffi::v14::nonrelfile_utils::{
      11              :     mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
      12              :     transaction_id_set_status,
      13              : };
      14              : use postgres_ffi::BLCKSZ;
      15              : use tracing::*;
      16              : use utils::bin_ser::BeSer;
      17              : 
      18              : /// Can this request be served by neon redo functions
      19              : /// or we need to pass it to wal-redo postgres process?
      20     81815748 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
      21     81815748 :     // Currently, we don't have bespoken Rust code to replay any
      22     81815748 :     // Postgres WAL records. But everything else is handled in neon.
      23     81815748 :     #[allow(clippy::match_like_matches_macro)]
      24     81815748 :     match rec {
      25              :         NeonWalRecord::Postgres {
      26              :             will_init: _,
      27              :             rec: _,
      28     62092277 :         } => false,
      29     19723471 :         _ => true,
      30              :     }
      31     81815748 : }
      32              : 
      33     19723477 : pub(crate) fn apply_in_neon(
      34     19723477 :     record: &NeonWalRecord,
      35     19723477 :     key: Key,
      36     19723477 :     page: &mut BytesMut,
      37     19723477 : ) -> Result<(), anyhow::Error> {
      38     19723477 :     match record {
      39              :         NeonWalRecord::Postgres {
      40              :             will_init: _,
      41              :             rec: _,
      42              :         } => {
      43            0 :             anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
      44              :         }
      45              :         NeonWalRecord::ClearVisibilityMapFlags {
      46          984 :             new_heap_blkno,
      47          984 :             old_heap_blkno,
      48          984 :             flags,
      49              :         } => {
      50              :             // sanity check that this is modifying the correct relation
      51          984 :             let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
      52          984 :             assert!(
      53          984 :                 rel.forknum == VISIBILITYMAP_FORKNUM,
      54            0 :                 "ClearVisibilityMapFlags record on unexpected rel {}",
      55              :                 rel
      56              :             );
      57          984 :             if let Some(heap_blkno) = *new_heap_blkno {
      58              :                 // Calculate the VM block and offset that corresponds to the heap block.
      59          413 :                 let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
      60          413 :                 let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
      61          413 :                 let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
      62          413 : 
      63          413 :                 // Check that we're modifying the correct VM block.
      64          413 :                 assert!(map_block == blknum);
      65              : 
      66              :                 // equivalent to PageGetContents(page)
      67          413 :                 let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
      68          413 : 
      69          413 :                 map[map_byte as usize] &= !(flags << map_offset);
      70          571 :             }
      71              : 
      72              :             // Repeat for 'old_heap_blkno', if any
      73          984 :             if let Some(heap_blkno) = *old_heap_blkno {
      74          574 :                 let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
      75          574 :                 let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
      76          574 :                 let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
      77          574 : 
      78          574 :                 assert!(map_block == blknum);
      79              : 
      80          574 :                 let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
      81          574 : 
      82          574 :                 map[map_byte as usize] &= !(flags << map_offset);
      83          410 :             }
      84              :         }
      85              :         // Non-relational WAL records are handled here, with custom code that has the
      86              :         // same effects as the corresponding Postgres WAL redo function.
      87     19622669 :         NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
      88     19622669 :             let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
      89     19622669 :             assert_eq!(
      90              :                 slru_kind,
      91              :                 SlruKind::Clog,
      92            0 :                 "ClogSetCommitted record with unexpected key {}",
      93              :                 key
      94              :             );
      95     39355366 :             for &xid in xids {
      96     19732697 :                 let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
      97     19732697 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
      98     19732697 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
      99     19732697 : 
     100     19732697 :                 // Check that we're modifying the correct CLOG block.
     101     19732697 :                 assert!(
     102     19732697 :                     segno == expected_segno,
     103            0 :                     "ClogSetCommitted record for XID {} with unexpected key {}",
     104              :                     xid,
     105              :                     key
     106              :                 );
     107     19732697 :                 assert!(
     108     19732697 :                     blknum == expected_blknum,
     109            0 :                     "ClogSetCommitted record for XID {} with unexpected key {}",
     110              :                     xid,
     111              :                     key
     112              :                 );
     113              : 
     114     19732697 :                 transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
     115              :             }
     116              : 
     117              :             // Append the timestamp
     118     19622669 :             if page.len() == BLCKSZ as usize + 8 {
     119     19619194 :                 page.truncate(BLCKSZ as usize);
     120     19619194 :             }
     121     19622669 :             if page.len() == BLCKSZ as usize {
     122     19622669 :                 page.extend_from_slice(&timestamp.to_be_bytes());
     123     19622669 :             } else {
     124            0 :                 warn!(
     125            0 :                     "CLOG blk {} in seg {} has invalid size {}",
     126            0 :                     blknum,
     127            0 :                     segno,
     128            0 :                     page.len()
     129            0 :                 );
     130              :             }
     131              :         }
     132         2577 :         NeonWalRecord::ClogSetAborted { xids } => {
     133         2577 :             let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
     134         2577 :             assert_eq!(
     135              :                 slru_kind,
     136              :                 SlruKind::Clog,
     137            0 :                 "ClogSetAborted record with unexpected key {}",
     138              :                 key
     139              :             );
     140         5168 :             for &xid in xids {
     141         2591 :                 let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     142         2591 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     143         2591 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     144         2591 : 
     145         2591 :                 // Check that we're modifying the correct CLOG block.
     146         2591 :                 assert!(
     147         2591 :                     segno == expected_segno,
     148            0 :                     "ClogSetAborted record for XID {} with unexpected key {}",
     149              :                     xid,
     150              :                     key
     151              :                 );
     152         2591 :                 assert!(
     153         2591 :                     blknum == expected_blknum,
     154            0 :                     "ClogSetAborted record for XID {} with unexpected key {}",
     155              :                     xid,
     156              :                     key
     157              :                 );
     158              : 
     159         2591 :                 transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
     160              :             }
     161              :         }
     162        47665 :         NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
     163        47665 :             let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
     164        47665 :             assert_eq!(
     165              :                 slru_kind,
     166              :                 SlruKind::MultiXactOffsets,
     167            0 :                 "MultixactOffsetCreate record with unexpected key {}",
     168              :                 key
     169              :             );
     170              :             // Compute the block and offset to modify.
     171              :             // See RecordNewMultiXact in PostgreSQL sources.
     172        47665 :             let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     173        47665 :             let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     174        47665 :             let offset = (entryno * 4) as usize;
     175        47665 : 
     176        47665 :             // Check that we're modifying the correct multixact-offsets block.
     177        47665 :             let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     178        47665 :             let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     179        47665 :             assert!(
     180        47665 :                 segno == expected_segno,
     181            0 :                 "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     182              :                 mid,
     183              :                 key
     184              :             );
     185        47665 :             assert!(
     186        47665 :                 blknum == expected_blknum,
     187            0 :                 "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     188              :                 mid,
     189              :                 key
     190              :             );
     191              : 
     192        47665 :             LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
     193              :         }
     194        48213 :         NeonWalRecord::MultixactMembersCreate { moff, members } => {
     195        48213 :             let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
     196        48213 :             assert_eq!(
     197              :                 slru_kind,
     198              :                 SlruKind::MultiXactMembers,
     199            0 :                 "MultixactMembersCreate record with unexpected key {}",
     200              :                 key
     201              :             );
     202       945044 :             for (i, member) in members.iter().enumerate() {
     203       945044 :                 let offset = moff + i as u32;
     204       945044 : 
     205       945044 :                 // Compute the block and offset to modify.
     206       945044 :                 // See RecordNewMultiXact in PostgreSQL sources.
     207       945044 :                 let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     208       945044 :                 let memberoff = mx_offset_to_member_offset(offset);
     209       945044 :                 let flagsoff = mx_offset_to_flags_offset(offset);
     210       945044 :                 let bshift = mx_offset_to_flags_bitshift(offset);
     211       945044 : 
     212       945044 :                 // Check that we're modifying the correct multixact-members block.
     213       945044 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     214       945044 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     215       945044 :                 assert!(
     216       945044 :                     segno == expected_segno,
     217            0 :                     "MultiXactMembersCreate record for offset {} with unexpected key {}",
     218              :                     moff,
     219              :                     key
     220              :                 );
     221       945044 :                 assert!(
     222       945044 :                     blknum == expected_blknum,
     223            0 :                     "MultiXactMembersCreate record for offset {} with unexpected key {}",
     224              :                     moff,
     225              :                     key
     226              :                 );
     227              : 
     228       945044 :                 let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
     229       945044 :                 flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
     230       945044 :                 flagsval |= member.status << bshift;
     231       945044 :                 LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
     232       945044 :                 LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
     233              :             }
     234              :         }
     235         1369 :         NeonWalRecord::AuxFile { file_path, content } => {
     236         1369 :             let mut dir = AuxFilesDirectory::des(page)?;
     237         1369 :             dir.upsert(file_path.clone(), content.clone());
     238         1369 : 
     239         1369 :             page.clear();
     240         1369 :             let mut writer = page.writer();
     241         1369 :             dir.ser_into(&mut writer)?;
     242              :         }
     243              :     }
     244     19723477 :     Ok(())
     245     19723477 : }
     246              : 
     247              : #[cfg(test)]
     248              : mod test {
     249              :     use bytes::Bytes;
     250              :     use pageserver_api::key::AUX_FILES_KEY;
     251              : 
     252              :     use super::*;
     253              :     use std::collections::HashMap;
     254              : 
     255              :     use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
     256              : 
     257              :     /// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
     258            2 :     #[test]
     259            2 :     fn apply_aux_file_deltas() -> anyhow::Result<()> {
     260            2 :         let base_dir = AuxFilesDirectory {
     261            2 :             files: HashMap::from([
     262            2 :                 ("two".to_string(), Bytes::from_static(b"content0")),
     263            2 :                 ("three".to_string(), Bytes::from_static(b"contentX")),
     264            2 :             ]),
     265            2 :         };
     266            2 :         let base_image = AuxFilesDirectory::ser(&base_dir)?;
     267              : 
     268            2 :         let deltas = vec![
     269            2 :             // Insert
     270            2 :             NeonWalRecord::AuxFile {
     271            2 :                 file_path: "one".to_string(),
     272            2 :                 content: Some(Bytes::from_static(b"content1")),
     273            2 :             },
     274            2 :             // Update
     275            2 :             NeonWalRecord::AuxFile {
     276            2 :                 file_path: "two".to_string(),
     277            2 :                 content: Some(Bytes::from_static(b"content99")),
     278            2 :             },
     279            2 :             // Delete
     280            2 :             NeonWalRecord::AuxFile {
     281            2 :                 file_path: "three".to_string(),
     282            2 :                 content: None,
     283            2 :             },
     284            2 :         ];
     285            2 : 
     286            2 :         let file_path = AUX_FILES_KEY;
     287            2 :         let mut page = BytesMut::from_iter(base_image);
     288              : 
     289            8 :         for record in deltas {
     290            6 :             apply_in_neon(&record, file_path, &mut page)?;
     291              :         }
     292              : 
     293            2 :         let reconstructed = AuxFilesDirectory::des(&page)?;
     294            2 :         let expect = HashMap::from([
     295            2 :             ("one".to_string(), Bytes::from_static(b"content1")),
     296            2 :             ("two".to_string(), Bytes::from_static(b"content99")),
     297            2 :         ]);
     298            2 : 
     299            2 :         assert_eq!(reconstructed.files, expect);
     300              : 
     301            2 :         Ok(())
     302            2 :     }
     303              : }
        

Generated by: LCOV version 2.1-beta