LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 88.3 % 264 233
Test Date: 2024-02-07 07:37:29 Functions: 62.5 % 40 25

            Line data    Source code
       1              : use anyhow::{bail, ensure};
       2              : use camino_tempfile::{tempdir, Utf8TempDir};
       3              : use log::*;
       4              : use postgres::types::PgLsn;
       5              : use postgres::Client;
       6              : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       7              : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
       8              : use std::cmp::Ordering;
       9              : use std::path::{Path, PathBuf};
      10              : use std::process::Command;
      11              : use std::time::{Duration, Instant};
      12              : 
      13              : macro_rules! xlog_utils_test {
      14              :     ($version:ident) => {
      15              :         #[path = "."]
      16              :         mod $version {
      17              :             #[allow(unused_imports)]
      18              :             pub use postgres_ffi::$version::wal_craft_test_export::*;
      19              :             #[allow(clippy::duplicate_mod)]
      20              :             #[cfg(test)]
      21              :             mod xlog_utils_test;
      22              :         }
      23              :     };
      24              : }
      25              : 
      26              : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      27              : 
      28            0 : #[derive(Debug, Clone, PartialEq, Eq)]
      29              : pub struct Conf {
      30              :     pub pg_version: u32,
      31              :     pub pg_distrib_dir: PathBuf,
      32              :     pub datadir: PathBuf,
      33              : }
      34              : 
      35              : pub struct PostgresServer {
      36              :     process: std::process::Child,
      37              :     _unix_socket_dir: Utf8TempDir,
      38              :     client_config: postgres::Config,
      39              : }
      40              : 
      41              : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
      42              :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      43              :     "shared_preload_libraries=neon", // can only be loaded at startup
      44              :     // Disable background processes as much as possible
      45              :     "wal_writer_delay=10s",
      46              :     "autovacuum=off",
      47              : ];
      48              : 
      49              : impl Conf {
      50          162 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      51          162 :         let path = self.pg_distrib_dir.clone();
      52          162 : 
      53          162 :         #[allow(clippy::manual_range_patterns)]
      54          162 :         match self.pg_version {
      55          162 :             14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
      56            0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      57              :         }
      58          162 :     }
      59              : 
      60           54 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      61           54 :         Ok(self.pg_distrib_dir()?.join("bin"))
      62           54 :     }
      63              : 
      64          108 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      65          108 :         Ok(self.pg_distrib_dir()?.join("lib"))
      66          108 :     }
      67              : 
      68          270 :     pub fn wal_dir(&self) -> PathBuf {
      69          270 :         self.datadir.join("pg_wal")
      70          270 :     }
      71              : 
      72           54 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      73           54 :         let path = self.pg_bin_dir()?.join(command);
      74           54 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      75           54 :         let mut cmd = Command::new(path);
      76           54 :         cmd.env_clear()
      77           54 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      78           54 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      79           54 :         Ok(cmd)
      80           54 :     }
      81              : 
      82           18 :     pub fn initdb(&self) -> anyhow::Result<()> {
      83           18 :         if let Some(parent) = self.datadir.parent() {
      84           18 :             info!("Pre-creating parent directory {:?}", parent);
      85              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      86              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      87           18 :             std::fs::create_dir_all(parent)?;
      88            0 :         }
      89           18 :         info!(
      90           18 :             "Running initdb in {:?} with user \"postgres\"",
      91              :             self.datadir
      92              :         );
      93           18 :         let output = self
      94           18 :             .new_pg_command("initdb")?
      95           18 :             .arg("-D")
      96           18 :             .arg(&self.datadir)
      97           18 :             .args(["-U", "postgres", "--no-instructions", "--no-sync"])
      98           18 :             .output()?;
      99           18 :         debug!("initdb output: {:?}", output);
     100              :         ensure!(
     101           18 :             output.status.success(),
     102            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     103            0 :             String::from_utf8_lossy(&output.stdout),
     104            0 :             String::from_utf8_lossy(&output.stderr),
     105              :         );
     106           18 :         Ok(())
     107           18 :     }
     108              : 
     109           18 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     110           18 :         info!("Starting Postgres server in {:?}", self.datadir);
     111           18 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     112           18 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     113           18 :         let server_process = self
     114           18 :             .new_pg_command("postgres")?
     115           18 :             .args(["-c", "listen_addresses="])
     116           18 :             .arg("-k")
     117           18 :             .arg(&unix_socket_dir_path)
     118           18 :             .arg("-D")
     119           18 :             .arg(&self.datadir)
     120           72 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     121           18 :             .spawn()?;
     122           18 :         let server = PostgresServer {
     123           18 :             process: server_process,
     124           18 :             _unix_socket_dir: unix_socket_dir,
     125           18 :             client_config: {
     126           18 :                 let mut c = postgres::Config::new();
     127           18 :                 c.host_path(&unix_socket_dir_path);
     128           18 :                 c.user("postgres");
     129           18 :                 c.connect_timeout(Duration::from_millis(10000));
     130           18 :                 c
     131           18 :             },
     132           18 :         };
     133           18 :         Ok(server)
     134           18 :     }
     135              : 
     136           18 :     pub fn pg_waldump(
     137           18 :         &self,
     138           18 :         first_segment_name: &str,
     139           18 :         last_segment_name: &str,
     140           18 :     ) -> anyhow::Result<std::process::Output> {
     141           18 :         let first_segment_file = self.datadir.join(first_segment_name);
     142           18 :         let last_segment_file = self.datadir.join(last_segment_name);
     143           18 :         info!(
     144           18 :             "Running pg_waldump for {} .. {}",
     145           18 :             first_segment_file.display(),
     146           18 :             last_segment_file.display()
     147              :         );
     148           18 :         let output = self
     149           18 :             .new_pg_command("pg_waldump")?
     150           18 :             .args([&first_segment_file, &last_segment_file])
     151           18 :             .output()?;
     152           18 :         debug!("waldump output: {:?}", output);
     153           18 :         Ok(output)
     154           18 :     }
     155              : }
     156              : 
     157              : impl PostgresServer {
     158           18 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     159           18 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     160           37 :         while Instant::now() < retry_until {
     161           37 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     162           18 :                 return Ok(client);
     163           19 :             }
     164           19 :             std::thread::sleep(Duration::from_millis(100));
     165              :         }
     166            0 :         bail!("Connection timed out");
     167           18 :     }
     168              : 
     169           18 :     pub fn kill(mut self) {
     170           18 :         self.process.kill().unwrap();
     171           18 :         self.process.wait().unwrap();
     172           18 :     }
     173              : }
     174              : 
     175              : impl Drop for PostgresServer {
     176           18 :     fn drop(&mut self) {
     177           18 :         match self.process.try_wait() {
     178           18 :             Ok(Some(_)) => return,
     179              :             Ok(None) => {
     180            0 :                 warn!("Server was not terminated, will be killed");
     181              :             }
     182            0 :             Err(e) => {
     183            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     184              :             }
     185              :         }
     186            0 :         let _ = self.process.kill();
     187           18 :     }
     188              : }
     189              : 
     190              : pub trait PostgresClientExt: postgres::GenericClient {
     191           56 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     192           56 :         Ok(self
     193           56 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     194           56 :             .get(0))
     195           56 :     }
     196           21 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     197           21 :         Ok(self
     198           21 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     199           21 :             .get(0))
     200           21 :     }
     201              : }
     202              : 
     203              : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     204              : 
     205           23 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     206           23 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     207              : 
     208           23 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     209           23 :     ensure!(wal_keep_size == "50MB");
     210           23 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     211           23 :     ensure!(wal_writer_delay == "10s");
     212           23 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     213           23 :     ensure!(autovacuum == "off");
     214              : 
     215           23 :     let wal_segment_size = client.query_one(
     216           23 :         "select cast(setting as bigint) as setting, unit \
     217           23 :          from pg_settings where name = 'wal_segment_size'",
     218           23 :         &[],
     219           23 :     )?;
     220              :     ensure!(
     221           23 :         wal_segment_size.get::<_, String>("unit") == "B",
     222            0 :         "Unexpected wal_segment_size unit"
     223              :     );
     224              :     ensure!(
     225           23 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     226            0 :         "Unexpected wal_segment_size in bytes"
     227              :     );
     228              : 
     229           23 :     Ok(())
     230           23 : }
     231              : 
     232              : pub trait Crafter {
     233              :     const NAME: &'static str;
     234              : 
     235              :     /// Generates WAL using the client `client`. Returns a pair of:
     236              :     /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
     237              :     ///   May include or exclude Lsn(0) and the end-of-wal.
     238              :     /// * The expected end-of-wal LSN.
     239              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
     240              : }
     241              : 
     242           21 : fn craft_internal<C: postgres::GenericClient>(
     243           21 :     client: &mut C,
     244           21 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
     245           21 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     246           21 :     ensure_server_config(client)?;
     247              : 
     248           21 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     249           21 :     info!("LSN initial = {}", initial_lsn);
     250              : 
     251           21 :     let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
     252           21 :     let last_lsn = match last_lsn {
     253            7 :         None => client.pg_current_wal_insert_lsn()?,
     254           14 :         Some(last_lsn) => {
     255           14 :             let insert_lsn = client.pg_current_wal_insert_lsn()?;
     256           14 :             match last_lsn.cmp(&insert_lsn) {
     257            0 :                 Ordering::Less => bail!(
     258            0 :                     "Some records were inserted after the crafted WAL: {} vs {}",
     259            0 :                     last_lsn,
     260            0 :                     insert_lsn
     261            0 :                 ),
     262           14 :                 Ordering::Equal => last_lsn,
     263            0 :                 Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
     264              :             }
     265              :         }
     266              :     };
     267           21 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     268           21 :         intermediate_lsns.insert(0, initial_lsn);
     269           21 :     }
     270              : 
     271              :     // Some records may not be flushed yet, e.g. non-transactional logical messages.
     272           21 :     client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
     273           21 :     match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
     274            0 :         Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
     275           21 :         Ordering::Equal => {}
     276            0 :         Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
     277              :     }
     278           21 :     Ok((intermediate_lsns, last_lsn))
     279           21 : }
     280              : 
     281              : pub struct Simple;
     282              : impl Crafter for Simple {
     283              :     const NAME: &'static str = "simple";
     284            7 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     285            7 :         craft_internal(client, |client, _| {
     286            7 :             client.execute("CREATE table t(x int)", &[])?;
     287            7 :             Ok((Vec::new(), None))
     288            7 :         })
     289            7 :     }
     290              : }
     291              : 
     292              : pub struct LastWalRecordXlogSwitch;
     293              : impl Crafter for LastWalRecordXlogSwitch {
     294              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     295            1 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     296            1 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     297            1 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     298            1 :         ensure_server_config(client)?;
     299              : 
     300            1 :         client.execute("CREATE table t(x int)", &[])?;
     301            1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     302            1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     303            1 :         let next_segment = PgLsn::from(0x0200_0000);
     304            1 :         ensure!(
     305            1 :             after_xlog_switch <= next_segment,
     306            0 :             "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
     307              :             after_xlog_switch,
     308              :             next_segment
     309              :         );
     310            1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     311            1 :     }
     312              : }
     313              : 
     314              : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     315              : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     316              :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     317            1 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     318            1 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     319            1 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     320            1 :         ensure_server_config(client)?;
     321              : 
     322            1 :         client.execute("CREATE table t(x int)", &[])?;
     323              : 
     324              :         // Add padding so the XLOG_SWITCH record ends exactly on an XLOG_BLCKSZ boundary.
     325              :         // We use a logical message as the padding. We start by detecting how much WAL
     326              :         // one logical message takes, considering all alignments and headers.
     327            1 :         let base_wal_advance = {
     328            1 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     329              :             // A small non-empty message bigger than a few bytes is more likely than an empty
     330              :             // message to have the same format as the big padding message.
     331            1 :             client.execute(
     332            1 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     333            1 :                 &[],
     334            1 :             )?;
     335              :             // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     336            1 :             (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
     337              :                 + XLOG_SIZE_OF_XLOG_RECORD
     338              :         };
     339            1 :         let mut remaining_lsn =
     340            1 :             XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
     341            1 :         if remaining_lsn < base_wal_advance {
     342            0 :             remaining_lsn += XLOG_BLCKSZ;
     343            1 :         }
     344            1 :         let repeats = 10 + remaining_lsn - base_wal_advance;
     345            1 :         info!(
     346            1 :             "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
     347            1 :             client.pg_current_wal_insert_lsn()?,
     348              :             remaining_lsn,
     349              :             base_wal_advance,
     350              :             repeats
     351              :         );
     352            1 :         client.execute(
     353            1 :             "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     354            1 :             &[&(repeats as i32)],
     355            1 :         )?;
     356            1 :         info!(
     357            1 :             "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     358            1 :             client.pg_current_wal_insert_lsn()?,
     359              :             XLOG_SIZE_OF_XLOG_RECORD
     360              :         );
     361              : 
     362              :         // Emit the XLOG_SWITCH
     363            1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     364            1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     365            1 :         let next_segment = PgLsn::from(0x0200_0000);
     366            1 :         ensure!(
     367            1 :             after_xlog_switch < next_segment,
     368            0 :             "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
     369              :             after_xlog_switch,
     370              :             next_segment
     371              :         );
     372              :         ensure!(
     373            1 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
     374            0 :             "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
     375            0 :             after_xlog_switch,
     376            0 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
     377              :         );
     378            1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     379            1 :     }
     380              : }
     381              : 
     382           14 : fn craft_single_logical_message(
     383           14 :     client: &mut impl postgres::GenericClient,
     384           14 :     transactional: bool,
     385           14 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     386           14 :     craft_internal(client, |client, initial_lsn| {
     387           14 :         ensure!(
     388           14 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     389            0 :             "Initial LSN is too far in the future"
     390              :         );
     391              : 
     392           14 :         let message_lsn: PgLsn = client
     393           14 :             .query_one(
     394           14 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     395           14 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     396           14 :                 &[&transactional],
     397           14 :             )?
     398           14 :             .get("message_lsn");
     399           14 :         ensure!(
     400           14 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     401            0 :             "Logical message did not cross the segment boundary"
     402              :         );
     403              :         ensure!(
     404           14 :             message_lsn < PgLsn::from(0x0400_0000),
     405            0 :             "Logical message crossed two segments"
     406              :         );
     407              : 
     408           14 :         if transactional {
     409              :             // Transactional logical messages are part of a transaction, so the one above is
     410              :             // followed by a small COMMIT record.
     411              : 
     412            7 :             let after_message_lsn = client.pg_current_wal_insert_lsn()?;
     413              :             ensure!(
     414            7 :                 message_lsn < after_message_lsn,
     415            0 :                 "No record found after the emitted message"
     416              :             );
     417            7 :             Ok((vec![message_lsn], Some(after_message_lsn)))
     418              :         } else {
     419            7 :             Ok((Vec::new(), Some(message_lsn)))
     420              :         }
     421           14 :     })
     422           14 : }
     423              : 
     424              : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     425              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     426              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     427            7 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     428            7 :         craft_single_logical_message(client, true)
     429            7 :     }
     430              : }
     431              : 
     432              : pub struct LastWalRecordCrossingSegment;
     433              : impl Crafter for LastWalRecordCrossingSegment {
     434              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     435            7 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     436            7 :         craft_single_logical_message(client, false)
     437            7 :     }
     438              : }
        

Generated by: LCOV version 2.1-beta