LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - xlog_utils_test.rs (source / functions) Coverage Total Hit
Test: 2a9d99866121f170b43760bd62e1e2431e597707.info Lines: 98.4 % 189 186
Test Date: 2024-09-02 14:10:37 Functions: 100.0 % 63 63

            Line data    Source code
       1              : //! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency.
       2              : 
       3              : use super::*;
       4              : use crate::{error, info};
       5              : use regex::Regex;
       6              : use std::cmp::min;
       7              : use std::fs::{self, File};
       8              : use std::io::Write;
       9              : use std::{env, str::FromStr};
      10              : use utils::const_assert;
      11              : use utils::lsn::Lsn;
      12              : 
      13            9 : fn init_logging() {
      14            9 :     let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(format!(
      15            9 :         "crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"
      16            9 :     )))
      17            9 :     .is_test(true)
      18            9 :     .try_init();
      19            9 : }
      20              : 
      21              : /// Test that find_end_of_wal returns the same results as pg_dump on various
      22              : /// WALs created by Crafter.
      23            9 : fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
      24            9 :     use crate::*;
      25            9 : 
      26            9 :     let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
      27            9 : 
      28            9 :     // Craft some WAL
      29            9 :     let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
      30            9 :         .join("..")
      31            9 :         .join("..")
      32            9 :         .join("..");
      33            9 :     let cfg = Conf {
      34            9 :         pg_version,
      35            9 :         pg_distrib_dir: top_path.join("pg_install"),
      36            9 :         datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
      37            9 :     };
      38            9 :     if cfg.datadir.exists() {
      39            0 :         fs::remove_dir_all(&cfg.datadir).unwrap();
      40            9 :     }
      41            9 :     cfg.initdb().unwrap();
      42            9 :     let srv = cfg.start_server().unwrap();
      43            9 :     let intermediate_lsns = C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
      44            9 :     let intermediate_lsns: Vec<Lsn> = intermediate_lsns
      45            9 :         .iter()
      46           15 :         .map(|&lsn| u64::from(lsn).into())
      47            9 :         .collect();
      48            9 :     // Kill postgres. Note that it might have inserted to WAL something after
      49            9 :     // 'craft' did its job.
      50            9 :     srv.kill();
      51            9 : 
      52            9 :     // Check find_end_of_wal on the initial WAL
      53            9 :     let last_segment = cfg
      54            9 :         .wal_dir()
      55            9 :         .read_dir()
      56            9 :         .unwrap()
      57           24 :         .map(|f| f.unwrap().file_name().into_string().unwrap())
      58           24 :         .filter(|fname| IsXLogFileName(fname))
      59            9 :         .max()
      60            9 :         .unwrap();
      61            9 :     let expected_end_of_wal = find_pg_waldump_end_of_wal(&cfg, &last_segment);
      62           24 :     for start_lsn in intermediate_lsns
      63            9 :         .iter()
      64            9 :         .chain(std::iter::once(&expected_end_of_wal))
      65              :     {
      66              :         // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
      67              :         // We assume that `start_lsn` is non-decreasing.
      68           24 :         info!(
      69           24 :             "Checking with start_lsn={}, erasing WAL before it",
      70              :             start_lsn
      71              :         );
      72           66 :         for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
      73           66 :             let fname = file.file_name().into_string().unwrap();
      74           66 :             if !IsXLogFileName(&fname) {
      75           24 :                 continue;
      76           42 :             }
      77           42 :             let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
      78           42 :             let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
      79           42 :             if seg_start_lsn > u64::from(*start_lsn) {
      80            6 :                 continue;
      81           36 :             }
      82           36 :             let mut f = File::options().write(true).open(file.path()).unwrap();
      83           36 :             const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
      84           36 :             f.write_all(
      85           36 :                 &ZEROS[0..min(
      86           36 :                     WAL_SEGMENT_SIZE,
      87           36 :                     (u64::from(*start_lsn) - seg_start_lsn) as usize,
      88           36 :                 )],
      89           36 :             )
      90           36 :             .unwrap();
      91              :         }
      92           24 :         check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
      93              :     }
      94            9 : }
      95              : 
      96            9 : fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
      97            9 :     // Get the actual end of WAL by pg_waldump
      98            9 :     let waldump_output = cfg
      99            9 :         .pg_waldump("000000010000000000000001", last_segment)
     100            9 :         .unwrap()
     101            9 :         .stderr;
     102            9 :     let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
     103            9 :     let caps = match Regex::new(r"invalid record length at (.+):")
     104            9 :         .unwrap()
     105            9 :         .captures(waldump_output)
     106              :     {
     107            9 :         Some(caps) => caps,
     108              :         None => {
     109            0 :             error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
     110            0 :             panic!();
     111              :         }
     112              :     };
     113            9 :     let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
     114            9 :     info!("waldump erred on {}", waldump_wal_end);
     115            9 :     waldump_wal_end
     116            9 : }
     117              : 
     118           24 : fn check_end_of_wal(
     119           24 :     cfg: &crate::Conf,
     120           24 :     last_segment: &str,
     121           24 :     start_lsn: Lsn,
     122           24 :     expected_end_of_wal: Lsn,
     123           24 : ) {
     124           24 :     // Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
     125           24 :     // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
     126           24 :     // info!(
     127           24 :     //     "find_end_of_wal returned wal_end={} with non-partial WAL segment",
     128           24 :     //     wal_end
     129           24 :     // );
     130           24 :     // assert_eq!(wal_end, expected_end_of_wal_non_partial);
     131           24 : 
     132           24 :     // Rename file to partial to actually find last valid lsn, then rename it back.
     133           24 :     fs::rename(
     134           24 :         cfg.wal_dir().join(last_segment),
     135           24 :         cfg.wal_dir().join(format!("{}.partial", last_segment)),
     136           24 :     )
     137           24 :     .unwrap();
     138           24 :     let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
     139           24 :     info!(
     140           24 :         "find_end_of_wal returned wal_end={} with partial WAL segment",
     141              :         wal_end
     142              :     );
     143           24 :     assert_eq!(wal_end, expected_end_of_wal);
     144           24 :     fs::rename(
     145           24 :         cfg.wal_dir().join(format!("{}.partial", last_segment)),
     146           24 :         cfg.wal_dir().join(last_segment),
     147           24 :     )
     148           24 :     .unwrap();
     149           24 : }
     150              : 
     151              : const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
     152              : 
     153              : #[test]
     154            3 : pub fn test_find_end_of_wal_simple() {
     155            3 :     init_logging();
     156            3 :     test_end_of_wal::<crate::Simple>("test_find_end_of_wal_simple");
     157            3 : }
     158              : 
     159              : #[test]
     160            3 : pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
     161            3 :     init_logging();
     162            3 :     test_end_of_wal::<crate::WalRecordCrossingSegmentFollowedBySmallOne>(
     163            3 :         "test_find_end_of_wal_crossing_segment_followed_by_small_one",
     164            3 :     );
     165            3 : }
     166              : 
     167              : #[test]
     168            3 : pub fn test_find_end_of_wal_last_crossing_segment() {
     169            3 :     init_logging();
     170            3 :     test_end_of_wal::<crate::LastWalRecordCrossingSegment>(
     171            3 :         "test_find_end_of_wal_last_crossing_segment",
     172            3 :     );
     173            3 : }
     174              : 
     175              : /// Check the math in update_next_xid
     176              : ///
     177              : /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
     178              : /// currently 1024.
     179              : #[test]
     180            3 : pub fn test_update_next_xid() {
     181            3 :     let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
     182            3 :     let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
     183            3 : 
     184            3 :     checkpoint.nextXid = FullTransactionId { value: 10 };
     185            3 :     assert_eq!(checkpoint.nextXid.value, 10);
     186              : 
     187              :     // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
     188              :     // boundary
     189            3 :     checkpoint.update_next_xid(100);
     190            3 :     assert_eq!(checkpoint.nextXid.value, 1024);
     191              : 
     192              :     // No change
     193            3 :     checkpoint.update_next_xid(500);
     194            3 :     assert_eq!(checkpoint.nextXid.value, 1024);
     195            3 :     checkpoint.update_next_xid(1023);
     196            3 :     assert_eq!(checkpoint.nextXid.value, 1024);
     197              : 
     198              :     // The function returns the *next* XID, given the highest XID seen so
     199              :     // far. So when we pass 1024, the nextXid gets bumped up to the next
     200              :     // XID_CHECKPOINT_INTERVAL boundary.
     201            3 :     checkpoint.update_next_xid(1024);
     202            3 :     assert_eq!(checkpoint.nextXid.value, 2048);
     203            3 : }
     204              : 
     205              : #[test]
     206            3 : pub fn test_update_next_multixid() {
     207            3 :     let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
     208            3 :     let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
     209            3 : 
     210            3 :     // simple case
     211            3 :     checkpoint.nextMulti = 20;
     212            3 :     checkpoint.nextMultiOffset = 20;
     213            3 :     checkpoint.update_next_multixid(1000, 2000);
     214            3 :     assert_eq!(checkpoint.nextMulti, 1000);
     215            3 :     assert_eq!(checkpoint.nextMultiOffset, 2000);
     216              : 
     217              :     // No change
     218            3 :     checkpoint.update_next_multixid(500, 900);
     219            3 :     assert_eq!(checkpoint.nextMulti, 1000);
     220            3 :     assert_eq!(checkpoint.nextMultiOffset, 2000);
     221              : 
     222              :     // Close to wraparound, but not wrapped around yet
     223            3 :     checkpoint.nextMulti = 0xffff0000;
     224            3 :     checkpoint.nextMultiOffset = 0xfffe0000;
     225            3 :     checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
     226            3 :     assert_eq!(checkpoint.nextMulti, 0xffff00ff);
     227            3 :     assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
     228              : 
     229              :     // Wraparound
     230            3 :     checkpoint.update_next_multixid(1, 900);
     231            3 :     assert_eq!(checkpoint.nextMulti, 1);
     232            3 :     assert_eq!(checkpoint.nextMultiOffset, 900);
     233              : 
     234              :     // Wraparound nextMulti to 0.
     235              :     //
     236              :     // It's a bit surprising that nextMulti can be 0, because that's a special value
     237              :     // (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
     238              :     // nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
     239              :     // the 0 and the next multi-xid actually assigned is 1.
     240            3 :     checkpoint.nextMulti = 0xffff0000;
     241            3 :     checkpoint.nextMultiOffset = 0xfffe0000;
     242            3 :     checkpoint.update_next_multixid(0, 0xfffe00ff);
     243            3 :     assert_eq!(checkpoint.nextMulti, 0);
     244            3 :     assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
     245              : 
     246              :     // Wraparound nextMultiOffset to 0
     247            3 :     checkpoint.update_next_multixid(0, 0);
     248            3 :     assert_eq!(checkpoint.nextMulti, 0);
     249            3 :     assert_eq!(checkpoint.nextMultiOffset, 0);
     250            3 : }
     251              : 
     252              : #[test]
     253            3 : pub fn test_encode_logical_message() {
     254            3 :     let expected = [
     255            3 :         64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
     256            3 :         0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
     257            3 :         105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
     258            3 :     ];
     259            3 :     let actual = encode_logical_message("prefix", "message");
     260            3 :     assert_eq!(expected, actual[..]);
     261            3 : }
        

Generated by: LCOV version 2.1-beta