LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 67.3 % 278 187
Test Date: 2025-03-12 16:10:49 Functions: 61.1 % 36 22

            Line data    Source code
       1              : use std::ffi::OsStr;
       2              : use std::path::{Path, PathBuf};
       3              : use std::process::Command;
       4              : use std::time::{Duration, Instant};
       5              : 
       6              : use anyhow::{bail, ensure};
       7              : use camino_tempfile::{Utf8TempDir, tempdir};
       8              : use log::*;
       9              : use postgres::Client;
      10              : use postgres::types::PgLsn;
      11              : use postgres_ffi::{
      12              :     WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD,
      13              :     XLOG_SIZE_OF_XLOG_SHORT_PHD,
      14              : };
      15              : 
/// Declares a module named after the given Postgres version identifier.
///
/// The generated module re-exports everything from
/// `postgres_ffi::<version>::wal_craft_test_export` and, in test builds only,
/// declares the `xlog_utils_test` submodule, so the same test source is
/// compiled against each version's exports.
macro_rules! xlog_utils_test {
    ($version:ident) => {
        // `#[path = "."]` points the generated module at the current directory,
        // so the nested `mod xlog_utils_test;` resolves to the same file no
        // matter which version identifier names the outer module.
        #[path = "."]
        mod $version {
            #[allow(unused_imports)]
            pub use postgres_ffi::$version::wal_craft_test_export::*;
            // The same file is declared as a module once per version on purpose.
            #[allow(clippy::duplicate_mod)]
            #[cfg(test)]
            mod xlog_utils_test;
        }
    };
}

// NOTE(review): presumably expands `xlog_utils_test` once per supported
// Postgres version — confirm against `postgres_ffi`.
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      30              : 
/// Locations and version of the Postgres installation and data directory that
/// the helpers in this file operate on.
pub struct Conf {
    /// Postgres major version; `Conf::pg_distrib_dir` accepts 14 through 17.
    pub pg_version: u32,
    /// Root of the Postgres distributions; the per-version `v<N>` subdirectory
    /// is appended by `Conf::pg_distrib_dir`.
    pub pg_distrib_dir: PathBuf,
    /// Data directory handed to `initdb --pgdata` and `postgres -D`.
    pub datadir: PathBuf,
}
      36              : 
/// Handle to a Postgres server process started by `Conf::start_server`.
pub struct PostgresServer {
    /// The spawned `postgres` child process.
    process: std::process::Child,
    /// Temporary directory passed to the server with `-k` for its Unix socket.
    /// It is stored here only to keep it alive alongside the process.
    /// NOTE(review): presumably the directory is deleted when this value is
    /// dropped — confirm against `camino_tempfile`.
    _unix_socket_dir: Utf8TempDir,
    /// Client settings (socket path, user, connect timeout) used by
    /// `connect_with_timeout`.
    client_config: postgres::Config,
}
      42              : 
/// Settings that `Conf::start_server` passes to the server, each as its own
/// `-c` option. `ensure_server_config` later verifies `wal_keep_size`,
/// `wal_writer_delay` and `autovacuum` against the values listed here.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    "wal_keep_size=50MB",            // Ensure old WAL is not removed
    "shared_preload_libraries=neon", // can only be loaded at startup
    // Disable background processes as much as possible
    "wal_writer_delay=10s",
    "autovacuum=off",
];
      50              : 
      51              : impl Conf {
      52          108 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      53          108 :         let path = self.pg_distrib_dir.clone();
      54          108 : 
      55          108 :         #[allow(clippy::manual_range_patterns)]
      56          108 :         match self.pg_version {
      57          108 :             14 | 15 | 16 | 17 => Ok(path.join(format!("v{}", self.pg_version))),
      58            0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      59              :         }
      60          108 :     }
      61              : 
      62           36 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      63           36 :         Ok(self.pg_distrib_dir()?.join("bin"))
      64           36 :     }
      65              : 
      66           72 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      67           72 :         Ok(self.pg_distrib_dir()?.join("lib"))
      68           72 :     }
      69              : 
      70          204 :     pub fn wal_dir(&self) -> PathBuf {
      71          204 :         self.datadir.join("pg_wal")
      72          204 :     }
      73              : 
      74           36 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      75           36 :         let path = self.pg_bin_dir()?.join(command);
      76           36 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      77           36 :         let mut cmd = Command::new(path);
      78           36 :         cmd.env_clear()
      79           36 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      80           36 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?)
      81           36 :             .env(
      82           36 :                 "ASAN_OPTIONS",
      83           36 :                 std::env::var("ASAN_OPTIONS").unwrap_or_default(),
      84           36 :             )
      85           36 :             .env(
      86           36 :                 "UBSAN_OPTIONS",
      87           36 :                 std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
      88           36 :             );
      89           36 :         Ok(cmd)
      90           36 :     }
      91              : 
      92           12 :     pub fn initdb(&self) -> anyhow::Result<()> {
      93           12 :         if let Some(parent) = self.datadir.parent() {
      94           12 :             info!("Pre-creating parent directory {:?}", parent);
      95              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      96              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      97           12 :             std::fs::create_dir_all(parent)?;
      98            0 :         }
      99           12 :         info!(
     100           12 :             "Running initdb in {:?} with user \"postgres\"",
     101              :             self.datadir
     102              :         );
     103           12 :         let output = self
     104           12 :             .new_pg_command("initdb")?
     105           12 :             .arg("--pgdata")
     106           12 :             .arg(&self.datadir)
     107           12 :             .args(["--username", "postgres", "--no-instructions", "--no-sync"])
     108           12 :             .output()?;
     109           12 :         debug!("initdb output: {:?}", output);
     110           12 :         ensure!(
     111           12 :             output.status.success(),
     112            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     113            0 :             String::from_utf8_lossy(&output.stdout),
     114            0 :             String::from_utf8_lossy(&output.stderr),
     115              :         );
     116           12 :         Ok(())
     117           12 :     }
     118              : 
     119           12 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     120           12 :         info!("Starting Postgres server in {:?}", self.datadir);
     121           12 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     122           12 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     123           12 :         let server_process = self
     124           12 :             .new_pg_command("postgres")?
     125           12 :             .args(["-c", "listen_addresses="])
     126           12 :             .arg("-k")
     127           12 :             .arg(&unix_socket_dir_path)
     128           12 :             .arg("-D")
     129           12 :             .arg(&self.datadir)
     130           48 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     131           12 :             .spawn()?;
     132           12 :         let server = PostgresServer {
     133           12 :             process: server_process,
     134           12 :             _unix_socket_dir: unix_socket_dir,
     135           12 :             client_config: {
     136           12 :                 let mut c = postgres::Config::new();
     137           12 :                 c.host_path(&unix_socket_dir_path);
     138           12 :                 c.user("postgres");
     139           12 :                 c.connect_timeout(Duration::from_millis(10000));
     140           12 :                 c
     141           12 :             },
     142           12 :         };
     143           12 :         Ok(server)
     144           12 :     }
     145              : 
     146           12 :     pub fn pg_waldump(
     147           12 :         &self,
     148           12 :         first_segment_name: &OsStr,
     149           12 :         last_segment_name: &OsStr,
     150           12 :     ) -> anyhow::Result<std::process::Output> {
     151           12 :         let first_segment_file = self.datadir.join(first_segment_name);
     152           12 :         let last_segment_file = self.datadir.join(last_segment_name);
     153           12 :         info!(
     154           12 :             "Running pg_waldump for {} .. {}",
     155           12 :             first_segment_file.display(),
     156           12 :             last_segment_file.display()
     157              :         );
     158           12 :         let output = self
     159           12 :             .new_pg_command("pg_waldump")?
     160           12 :             .args([&first_segment_file, &last_segment_file])
     161           12 :             .output()?;
     162           12 :         debug!("waldump output: {:?}", output);
     163           12 :         Ok(output)
     164           12 :     }
     165              : }
     166              : 
     167              : impl PostgresServer {
     168           12 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     169           12 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     170           25 :         while Instant::now() < retry_until {
     171           25 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     172           12 :                 return Ok(client);
     173           13 :             }
     174           13 :             std::thread::sleep(Duration::from_millis(100));
     175              :         }
     176            0 :         bail!("Connection timed out");
     177           12 :     }
     178              : 
     179           12 :     pub fn kill(mut self) {
     180           12 :         self.process.kill().unwrap();
     181           12 :         self.process.wait().unwrap();
     182           12 :     }
     183              : }
     184              : 
     185              : impl Drop for PostgresServer {
     186           12 :     fn drop(&mut self) {
     187           12 :         match self.process.try_wait() {
     188           12 :             Ok(Some(_)) => return,
     189              :             Ok(None) => {
     190            0 :                 warn!("Server was not terminated, will be killed");
     191              :             }
     192            0 :             Err(e) => {
     193            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     194              :             }
     195              :         }
     196            0 :         let _ = self.process.kill();
     197           12 :     }
     198              : }
     199              : 
     200              : pub trait PostgresClientExt: postgres::GenericClient {
     201           24 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     202           24 :         Ok(self
     203           24 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     204           24 :             .get(0))
     205           24 :     }
     206            0 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     207            0 :         Ok(self
     208            0 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     209            0 :             .get(0))
     210            0 :     }
     211              : }
     212              : 
     213              : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     214              : 
     215           12 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     216           12 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     217              : 
     218           12 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     219           12 :     ensure!(wal_keep_size == "50MB");
     220           12 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     221           12 :     ensure!(wal_writer_delay == "10s");
     222           12 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     223           12 :     ensure!(autovacuum == "off");
     224              : 
     225           12 :     let wal_segment_size = client.query_one(
     226           12 :         "select cast(setting as bigint) as setting, unit \
     227           12 :          from pg_settings where name = 'wal_segment_size'",
     228           12 :         &[],
     229           12 :     )?;
     230           12 :     ensure!(
     231           12 :         wal_segment_size.get::<_, String>("unit") == "B",
     232            0 :         "Unexpected wal_segment_size unit"
     233              :     );
     234           12 :     ensure!(
     235           12 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     236            0 :         "Unexpected wal_segment_size in bytes"
     237              :     );
     238              : 
     239           12 :     Ok(())
     240           12 : }
     241              : 
/// A recipe that produces a particular WAL layout on a running server.
pub trait Crafter {
    /// Name identifying this crafter.
    const NAME: &'static str;

    /// Generates WAL using the client `client`. Returns a vector of some valid
    /// "interesting" intermediate LSNs which one may start reading from.
    /// test_end_of_wal uses this to check various starting points.
    ///
    /// Note that Postgres is generally keen on writing some WAL. While we
    /// try to disable that (autovacuum, big wal_writer_delay, etc.), it is
    /// always possible, e.g. xl_running_xacts are dumped every 15s. So checks
    /// for a stable WAL end would be flaky unless Postgres is shut down. For
    /// this reason, returning a potential end of WAL here is pointless. Most of
    /// the time this doesn't happen though, so it is reasonable to create the
    /// needed WAL structure and immediately kill Postgres, like
    /// test_end_of_wal does.
    fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
}
     258              : 
     259              : /// Wraps some WAL craft function, providing current LSN to it before the
     260              : /// insertion and flushing WAL afterwards. Also pushes initial LSN to the
     261              : /// result.
     262           12 : fn craft_internal<C: postgres::GenericClient>(
     263           12 :     client: &mut C,
     264           12 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
     265           12 : ) -> anyhow::Result<Vec<PgLsn>> {
     266           12 :     ensure_server_config(client)?;
     267              : 
     268           12 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     269           12 :     info!("LSN initial = {}", initial_lsn);
     270              : 
     271           12 :     let mut intermediate_lsns = f(client, initial_lsn)?;
     272           12 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     273           12 :         intermediate_lsns.insert(0, initial_lsn);
     274           12 :     }
     275              : 
     276              :     // Some records may be not flushed, e.g. non-transactional logical messages. Flush now.
     277              :     //
     278              :     // If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
     279              :     // returns the position just after the page header on the next page. That's where the next
     280              :     // record will be inserted. But the page header hasn't actually been written to the WAL
     281              :     // yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
     282              :     // error. Because of that, if the insert location is just after a page header, back off to
     283              :     // previous page boundary.
     284           12 :     let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
     285           12 :     if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 {
     286            0 :         lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     287           12 :     } else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 {
     288            0 :         lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     289           12 :     }
     290           12 :     client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?;
     291           12 :     Ok(intermediate_lsns)
     292           12 : }
     293              : 
     294              : pub struct Simple;
     295              : impl Crafter for Simple {
     296              :     const NAME: &'static str = "simple";
     297            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     298            4 :         craft_internal(client, |client, _| {
     299            4 :             client.execute("CREATE table t(x int)", &[])?;
     300            4 :             Ok(Vec::new())
     301            4 :         })
     302            4 :     }
     303              : }
     304              : 
     305              : pub struct LastWalRecordXlogSwitch;
     306              : impl Crafter for LastWalRecordXlogSwitch {
     307              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     308            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     309            0 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     310            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     311            0 :         ensure_server_config(client)?;
     312              : 
     313            0 :         client.execute("CREATE table t(x int)", &[])?;
     314            0 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     315              :         // pg_switch_wal returns end of last record of the switched segment,
     316              :         // i.e. end of SWITCH itself.
     317            0 :         let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     318            0 :         let before_xlog_switch_u64 = u64::from(before_xlog_switch);
     319            0 :         let next_segment = PgLsn::from(
     320            0 :             before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
     321            0 :                 + WAL_SEGMENT_SIZE as u64,
     322            0 :         );
     323            0 :         ensure!(
     324            0 :             xlog_switch_record_end <= next_segment,
     325            0 :             "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
     326              :             xlog_switch_record_end,
     327              :             next_segment
     328              :         );
     329            0 :         Ok(vec![before_xlog_switch, xlog_switch_record_end])
     330            0 :     }
     331              : }
     332              : 
/// Crafter whose last WAL record is an XLOG_SWITCH positioned so that it ends
/// at a page boundary.
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
/// Craft xlog SWITCH record ending at page boundary.
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
    const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
    /// Pads the WAL with logical messages until an XLOG_SWITCH fits exactly
    /// into the rest of the current page, then emits the switch. Returns the
    /// insert LSN taken just before the switch followed by the LSN reported
    /// by `pg_switch_wal`. The whole procedure is retried until the reported
    /// end has the expected offset within its page.
    fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
        // Do not use craft_internal because here we end up with flush_lsn exactly on
        // the segment boundary and insert_lsn after the initial page header, which is unusual.
        ensure_server_config(client)?;

        client.execute("CREATE table t(x int)", &[])?;

        // Add padding so the XLOG_SWITCH record ends exactly on an XLOG_BLCKSZ boundary. We
        // will use carefully-sized logical messages to advance the WAL insert location such
        // that there is just enough space on the page for the XLOG_SWITCH record.
        loop {
            // We start by measuring how much WAL one logical message takes,
            // considering all alignments and headers.
            let before_lsn = client.pg_current_wal_insert_lsn()?;
            client.execute(
                "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
                &[],
            )?;
            let after_lsn = client.pg_current_wal_insert_lsn()?;

            // Did the record cross a page boundary? If it did, start over. Crossing a
            // page boundary adds to the apparent size of the record because of the page
            // header, which throws off the calculation.
            if u64::from(before_lsn) / XLOG_BLCKSZ as u64
                != u64::from(after_lsn) / XLOG_BLCKSZ as u64
            {
                continue;
            }
            // base_size is the size of a logical message without the payload
            // (the probe message above carried a 10-byte payload).
            let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10;

            // Is there enough space on the page for another logical message and an
            // XLOG_SWITCH? If not, start over.
            let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
            if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 {
                continue;
            }

            // We will write another logical message, such that after the logical message
            // record, there will be space for exactly one XLOG_SWITCH. How large should
            // the logical message's payload be? An XLOG_SWITCH record has no data => its
            // size is exactly XLOG_SIZE_OF_XLOG_RECORD.
            let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;

            // `repeats` is smaller than page_remain, which is at most XLOG_BLCKSZ.
            client.execute(
                "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
                &[&(repeats as i32)],
            )?;
            info!(
                "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
                client.pg_current_wal_insert_lsn()?,
                XLOG_SIZE_OF_XLOG_RECORD
            );

            // Emit the XLOG_SWITCH
            let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
            let xlog_switch_record_end: PgLsn =
                client.query_one("SELECT pg_switch_wal()", &[])?.get(0);

            // The reported end is compared against the short page header size, not 0.
            // NOTE(review): presumably a record ending exactly on the page boundary is
            // reported just past the next page's header, as craft_internal describes
            // for pg_current_wal_insert_lsn — confirm.
            if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
                != XLOG_SIZE_OF_XLOG_SHORT_PHD
            {
                warn!(
                    "XLOG_SWITCH message ended not on page boundary: {}, offset = {}, repeating",
                    xlog_switch_record_end,
                    u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
                );
                continue;
            }
            return Ok(vec![before_xlog_switch, xlog_switch_record_end]);
        }
    }
}
     410              : 
     411              : /// Write ~16MB logical message; it should cross WAL segment.
     412            8 : fn craft_seg_size_logical_message(
     413            8 :     client: &mut impl postgres::GenericClient,
     414            8 :     transactional: bool,
     415            8 : ) -> anyhow::Result<Vec<PgLsn>> {
     416            8 :     craft_internal(client, |client, initial_lsn| {
     417            8 :         ensure!(
     418            8 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     419            0 :             "Initial LSN is too far in the future"
     420              :         );
     421              : 
     422            8 :         let message_lsn: PgLsn = client
     423            8 :             .query_one(
     424            8 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     425            8 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     426            8 :                 &[&transactional],
     427            8 :             )?
     428            8 :             .get("message_lsn");
     429            8 :         ensure!(
     430            8 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     431            0 :             "Logical message did not cross the segment boundary"
     432              :         );
     433            8 :         ensure!(
     434            8 :             message_lsn < PgLsn::from(0x0400_0000),
     435            0 :             "Logical message crossed two segments"
     436              :         );
     437              : 
     438            8 :         Ok(vec![message_lsn])
     439            8 :     })
     440            8 : }
     441              : 
     442              : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     443              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     444              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     445            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     446            4 :         // Transactional message crossing WAL segment will be followed by small
     447            4 :         // commit record.
     448            4 :         craft_seg_size_logical_message(client, true)
     449            4 :     }
     450              : }
     451              : 
     452              : pub struct LastWalRecordCrossingSegment;
     453              : impl Crafter for LastWalRecordCrossingSegment {
     454              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     455            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     456            4 :         craft_seg_size_logical_message(client, false)
     457            4 :     }
     458              : }
        

Generated by: LCOV version 2.1-beta