|             Line data    Source code 
       1              : //! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency.
       2              : 
       3              : use super::*;
       4              : use crate::{error, info};
       5              : use regex::Regex;
       6              : use std::cmp::min;
       7              : use std::ffi::OsStr;
       8              : use std::fs::{self, File};
       9              : use std::io::Write;
      10              : use std::{env, str::FromStr};
      11              : use utils::const_assert;
      12              : use utils::lsn::Lsn;
      13              : 
      14           12 : fn init_logging() {
      15           12 :     let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(format!(
      16           12 :         "crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"
      17           12 :     )))
      18           12 :     .is_test(true)
      19           12 :     .try_init();
      20           12 : }
      21              : 
      22              : /// Test that find_end_of_wal returns the same results as pg_dump on various
      23              : /// WALs created by Crafter.
      24           12 : fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
      25              :     use crate::*;
      26              : 
      27           12 :     let pg_version = MY_PGVERSION;
      28              : 
      29              :     // Craft some WAL
      30           12 :     let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
      31           12 :         .join("..")
      32           12 :         .join("..")
      33           12 :         .join("..");
      34           12 :     let cfg = Conf {
      35           12 :         pg_version,
      36           12 :         pg_distrib_dir: top_path.join("pg_install"),
      37           12 :         datadir: top_path.join(format!("test_output/{test_name}-{PG_MAJORVERSION}")),
      38           12 :     };
      39           12 :     if cfg.datadir.exists() {
      40            0 :         fs::remove_dir_all(&cfg.datadir).unwrap();
      41           12 :     }
      42           12 :     cfg.initdb().unwrap();
      43           12 :     let srv = cfg.start_server().unwrap();
      44           12 :     let intermediate_lsns = C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
      45           12 :     let intermediate_lsns: Vec<Lsn> = intermediate_lsns
      46           12 :         .iter()
      47           20 :         .map(|&lsn| u64::from(lsn).into())
      48           12 :         .collect();
      49              :     // Kill postgres. Note that it might have inserted to WAL something after
      50              :     // 'craft' did its job.
      51           12 :     srv.kill();
      52              : 
      53              :     // Check find_end_of_wal on the initial WAL
      54           12 :     let last_segment = cfg
      55           12 :         .wal_dir()
      56           12 :         .read_dir()
      57           12 :         .unwrap()
      58           35 :         .map(|f| f.unwrap().file_name())
      59           35 :         .filter(|fname| IsXLogFileName(fname))
      60           12 :         .max()
      61           12 :         .unwrap();
      62           12 :     let expected_end_of_wal = find_pg_waldump_end_of_wal(&cfg, &last_segment);
      63           32 :     for start_lsn in intermediate_lsns
      64           12 :         .iter()
      65           12 :         .chain(std::iter::once(&expected_end_of_wal))
      66              :     {
      67              :         // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
      68              :         // We assume that `start_lsn` is non-decreasing.
      69           32 :         info!(
      70           32 :             "Checking with start_lsn={}, erasing WAL before it",
      71              :             start_lsn
      72              :         );
      73           96 :         for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
      74           96 :             let fname = file.file_name();
      75           96 :             if !IsXLogFileName(&fname) {
      76           40 :                 continue;
      77           56 :             }
      78           56 :             let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
      79           56 :             let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
      80           56 :             if seg_start_lsn > u64::from(*start_lsn) {
      81            8 :                 continue;
      82           48 :             }
      83           48 :             let mut f = File::options().write(true).open(file.path()).unwrap();
      84              :             static ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
      85           48 :             f.write_all(
      86           48 :                 &ZEROS[0..min(
      87           48 :                     WAL_SEGMENT_SIZE,
      88           48 :                     (u64::from(*start_lsn) - seg_start_lsn) as usize,
      89           48 :                 )],
      90              :             )
      91           48 :             .unwrap();
      92              :         }
      93           32 :         check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
      94              :     }
      95           12 : }
      96              : 
      97           12 : fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
      98              :     // Get the actual end of WAL by pg_waldump
      99           12 :     let waldump_output = cfg
     100           12 :         .pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
     101           12 :         .unwrap()
     102           12 :         .stderr;
     103           12 :     let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
     104           12 :     let caps = match Regex::new(r"invalid record length at (.+):")
     105           12 :         .unwrap()
     106           12 :         .captures(waldump_output)
     107              :     {
     108           12 :         Some(caps) => caps,
     109              :         None => {
     110            0 :             error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
     111            0 :             panic!();
     112              :         }
     113              :     };
     114           12 :     let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
     115           12 :     info!("waldump erred on {}", waldump_wal_end);
     116           12 :     waldump_wal_end
     117           12 : }
     118              : 
     119           32 : fn check_end_of_wal(
     120           32 :     cfg: &crate::Conf,
     121           32 :     last_segment: &OsStr,
     122           32 :     start_lsn: Lsn,
     123           32 :     expected_end_of_wal: Lsn,
     124           32 : ) {
     125              :     // Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
     126              :     // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
     127              :     // info!(
     128              :     //     "find_end_of_wal returned wal_end={} with non-partial WAL segment",
     129              :     //     wal_end
     130              :     // );
     131              :     // assert_eq!(wal_end, expected_end_of_wal_non_partial);
     132              : 
     133              :     // Rename file to partial to actually find last valid lsn, then rename it back.
     134           32 :     fs::rename(
     135           32 :         cfg.wal_dir().join(last_segment),
     136           32 :         cfg.wal_dir()
     137           32 :             .join(format!("{}.partial", last_segment.to_str().unwrap())),
     138              :     )
     139           32 :     .unwrap();
     140           32 :     let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
     141           32 :     info!(
     142           32 :         "find_end_of_wal returned wal_end={} with partial WAL segment",
     143              :         wal_end
     144              :     );
     145           32 :     assert_eq!(wal_end, expected_end_of_wal);
     146           32 :     fs::rename(
     147           32 :         cfg.wal_dir()
     148           32 :             .join(format!("{}.partial", last_segment.to_str().unwrap())),
     149           32 :         cfg.wal_dir().join(last_segment),
     150              :     )
     151           32 :     .unwrap();
     152           32 : }
     153              : 
// Assert that WAL_SEGMENT_SIZE is exactly 16 MiB.
// NOTE(review): presumably the tests in this file rely on that segment size (e.g. the
// hard-coded first segment name handed to pg_waldump) — confirm.
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
     155              : 
     156              : #[test]
     157            4 : pub fn test_find_end_of_wal_simple() {
     158            4 :     init_logging();
     159            4 :     test_end_of_wal::<crate::Simple>("test_find_end_of_wal_simple");
     160            4 : }
     161              : 
     162              : #[test]
     163            4 : pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
     164            4 :     init_logging();
     165            4 :     test_end_of_wal::<crate::WalRecordCrossingSegmentFollowedBySmallOne>(
     166            4 :         "test_find_end_of_wal_crossing_segment_followed_by_small_one",
     167              :     );
     168            4 : }
     169              : 
     170              : #[test]
     171            4 : pub fn test_find_end_of_wal_last_crossing_segment() {
     172            4 :     init_logging();
     173            4 :     test_end_of_wal::<crate::LastWalRecordCrossingSegment>(
     174            4 :         "test_find_end_of_wal_last_crossing_segment",
     175              :     );
     176            4 : }
     177              : 
     178              : /// Check the math in update_next_xid
     179              : ///
     180              : /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
     181              : /// currently 1024.
     182              : #[test]
     183            4 : pub fn test_update_next_xid() {
     184            4 :     let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
     185            4 :     let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
     186              : 
     187            4 :     checkpoint.nextXid = FullTransactionId { value: 10 };
     188            4 :     assert_eq!(checkpoint.nextXid.value, 10);
     189              : 
     190              :     // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
     191              :     // boundary
     192            4 :     checkpoint.update_next_xid(100);
     193            4 :     assert_eq!(checkpoint.nextXid.value, 1024);
     194              : 
     195              :     // No change
     196            4 :     checkpoint.update_next_xid(500);
     197            4 :     assert_eq!(checkpoint.nextXid.value, 1024);
     198            4 :     checkpoint.update_next_xid(1023);
     199            4 :     assert_eq!(checkpoint.nextXid.value, 1024);
     200              : 
     201              :     // The function returns the *next* XID, given the highest XID seen so
     202              :     // far. So when we pass 1024, the nextXid gets bumped up to the next
     203              :     // XID_CHECKPOINT_INTERVAL boundary.
     204            4 :     checkpoint.update_next_xid(1024);
     205            4 :     assert_eq!(checkpoint.nextXid.value, 2048);
     206            4 : }
     207              : 
     208              : #[test]
     209            4 : pub fn test_update_next_multixid() {
     210            4 :     let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
     211            4 :     let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
     212              : 
     213              :     // simple case
     214            4 :     checkpoint.nextMulti = 20;
     215            4 :     checkpoint.nextMultiOffset = 20;
     216            4 :     checkpoint.update_next_multixid(1000, 2000);
     217            4 :     assert_eq!(checkpoint.nextMulti, 1000);
     218            4 :     assert_eq!(checkpoint.nextMultiOffset, 2000);
     219              : 
     220              :     // No change
     221            4 :     checkpoint.update_next_multixid(500, 900);
     222            4 :     assert_eq!(checkpoint.nextMulti, 1000);
     223            4 :     assert_eq!(checkpoint.nextMultiOffset, 2000);
     224              : 
     225              :     // Close to wraparound, but not wrapped around yet
     226            4 :     checkpoint.nextMulti = 0xffff0000;
     227            4 :     checkpoint.nextMultiOffset = 0xfffe0000;
     228            4 :     checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
     229            4 :     assert_eq!(checkpoint.nextMulti, 0xffff00ff);
     230            4 :     assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
     231              : 
     232              :     // Wraparound
     233            4 :     checkpoint.update_next_multixid(1, 900);
     234            4 :     assert_eq!(checkpoint.nextMulti, 1);
     235            4 :     assert_eq!(checkpoint.nextMultiOffset, 900);
     236              : 
     237              :     // Wraparound nextMulti to 0.
     238              :     //
     239              :     // It's a bit surprising that nextMulti can be 0, because that's a special value
     240              :     // (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
     241              :     // nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
     242              :     // the 0 and the next multi-xid actually assigned is 1.
     243            4 :     checkpoint.nextMulti = 0xffff0000;
     244            4 :     checkpoint.nextMultiOffset = 0xfffe0000;
     245            4 :     checkpoint.update_next_multixid(0, 0xfffe00ff);
     246            4 :     assert_eq!(checkpoint.nextMulti, 0);
     247            4 :     assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
     248              : 
     249              :     // Wraparound nextMultiOffset to 0
     250            4 :     checkpoint.update_next_multixid(0, 0);
     251            4 :     assert_eq!(checkpoint.nextMulti, 0);
     252            4 :     assert_eq!(checkpoint.nextMultiOffset, 0);
     253            4 : }
     254              : 
     255              : #[test]
     256            4 : pub fn test_encode_logical_message() {
     257            4 :     let expected = [
     258            4 :         64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
     259            4 :         0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
     260            4 :         105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
     261            4 :     ];
     262            4 :     let actual = encode_logical_message("prefix", "message");
     263            4 :     assert_eq!(expected, actual[..]);
     264            4 : }
         |