LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 70.1 % 251 176
Test Date: 2025-07-16 12:29:03 Functions: 61.1 % 36 22

            Line data    Source code
       1              : use std::ffi::OsStr;
       2              : use std::path::{Path, PathBuf};
       3              : use std::process::Command;
       4              : use std::time::{Duration, Instant};
       5              : 
       6              : use anyhow::{bail, ensure};
       7              : use camino_tempfile::{Utf8TempDir, tempdir};
       8              : use log::*;
       9              : use postgres::Client;
      10              : use postgres::types::PgLsn;
      11              : use postgres_ffi::{
      12              :     PgMajorVersion, WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD,
      13              :     XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
      14              : };
      15              : 
      16              : macro_rules! xlog_utils_test {
      17              :     ($version:ident) => {
      18              :         #[path = "."]
      19              :         mod $version {
      20              :             #[allow(unused_imports)]
      21              :             pub use postgres_ffi::$version::wal_craft_test_export::*;
      22              :             #[allow(clippy::duplicate_mod)]
      23              :             #[cfg(test)]
      24              :             mod xlog_utils_test;
      25              :         }
      26              :     };
      27              : }
      28              : 
      29              : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      30              : 
      31              : pub struct Conf {
      32              :     pub pg_version: PgMajorVersion,
      33              :     pub pg_distrib_dir: PathBuf,
      34              :     pub datadir: PathBuf,
      35              : }
      36              : 
      37              : pub struct PostgresServer {
      38              :     process: std::process::Child,
      39              :     _unix_socket_dir: Utf8TempDir,
      40              :     client_config: postgres::Config,
      41              : }
      42              : 
      43              : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
      44              :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      45              :     "shared_preload_libraries=neon", // can only be loaded at startup
      46              :     // Disable background processes as much as possible
      47              :     "wal_writer_delay=10s",
      48              :     "autovacuum=off",
      49              : ];
      50              : 
      51              : impl Conf {
      52          108 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      53          108 :         let path = self.pg_distrib_dir.clone();
      54              : 
      55          108 :         Ok(path.join(self.pg_version.v_str()))
      56          108 :     }
      57              : 
      58           36 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      59           36 :         Ok(self.pg_distrib_dir()?.join("bin"))
      60           36 :     }
      61              : 
      62           72 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      63           72 :         Ok(self.pg_distrib_dir()?.join("lib"))
      64           72 :     }
      65              : 
      66          204 :     pub fn wal_dir(&self) -> PathBuf {
      67          204 :         self.datadir.join("pg_wal")
      68          204 :     }
      69              : 
      70           36 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      71           36 :         let path = self.pg_bin_dir()?.join(command);
      72           36 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      73           36 :         let mut cmd = Command::new(path);
      74           36 :         cmd.env_clear()
      75           36 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      76           36 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?)
      77           36 :             .env(
      78              :                 "ASAN_OPTIONS",
      79           36 :                 std::env::var("ASAN_OPTIONS").unwrap_or_default(),
      80              :             )
      81           36 :             .env(
      82              :                 "UBSAN_OPTIONS",
      83           36 :                 std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
      84              :             );
      85           36 :         Ok(cmd)
      86           36 :     }
      87              : 
      88           12 :     pub fn initdb(&self) -> anyhow::Result<()> {
      89           12 :         if let Some(parent) = self.datadir.parent() {
      90           12 :             info!("Pre-creating parent directory {:?}", parent);
      91              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      92              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      93           12 :             std::fs::create_dir_all(parent)?;
      94            0 :         }
      95           12 :         info!(
      96           12 :             "Running initdb in {:?} with user \"postgres\"",
      97              :             self.datadir
      98              :         );
      99           12 :         let output = self
     100           12 :             .new_pg_command("initdb")?
     101           12 :             .arg("--pgdata")
     102           12 :             .arg(&self.datadir)
     103           12 :             .args(["--username", "postgres", "--no-instructions", "--no-sync"])
     104           12 :             .output()?;
     105           12 :         debug!("initdb output: {:?}", output);
     106           12 :         ensure!(
     107           12 :             output.status.success(),
     108            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     109            0 :             String::from_utf8_lossy(&output.stdout),
     110            0 :             String::from_utf8_lossy(&output.stderr),
     111              :         );
     112           12 :         Ok(())
     113           12 :     }
     114              : 
     115           12 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     116           12 :         info!("Starting Postgres server in {:?}", self.datadir);
     117           12 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     118           12 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     119           12 :         let server_process = self
     120           12 :             .new_pg_command("postgres")?
     121           12 :             .args(["-c", "listen_addresses="])
     122           12 :             .arg("-k")
     123           12 :             .arg(&unix_socket_dir_path)
     124           12 :             .arg("-D")
     125           12 :             .arg(&self.datadir)
     126           48 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     127           12 :             .spawn()?;
     128           12 :         let server = PostgresServer {
     129           12 :             process: server_process,
     130           12 :             _unix_socket_dir: unix_socket_dir,
     131           12 :             client_config: {
     132           12 :                 let mut c = postgres::Config::new();
     133           12 :                 c.host_path(&unix_socket_dir_path);
     134           12 :                 c.user("postgres");
     135           12 :                 c.connect_timeout(Duration::from_millis(10000));
     136           12 :                 c
     137           12 :             },
     138           12 :         };
     139           12 :         Ok(server)
     140           12 :     }
     141              : 
     142           12 :     pub fn pg_waldump(
     143           12 :         &self,
     144           12 :         first_segment_name: &OsStr,
     145           12 :         last_segment_name: &OsStr,
     146           12 :     ) -> anyhow::Result<std::process::Output> {
     147           12 :         let first_segment_file = self.datadir.join(first_segment_name);
     148           12 :         let last_segment_file = self.datadir.join(last_segment_name);
     149           12 :         info!(
     150           12 :             "Running pg_waldump for {} .. {}",
     151           12 :             first_segment_file.display(),
     152           12 :             last_segment_file.display()
     153              :         );
     154           12 :         let output = self
     155           12 :             .new_pg_command("pg_waldump")?
     156           12 :             .args([&first_segment_file, &last_segment_file])
     157           12 :             .output()?;
     158           12 :         debug!("waldump output: {:?}", output);
     159           12 :         Ok(output)
     160           12 :     }
     161              : }
     162              : 
     163              : impl PostgresServer {
     164           12 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     165           12 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     166           33 :         while Instant::now() < retry_until {
     167           33 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     168           12 :                 return Ok(client);
     169           21 :             }
     170           21 :             std::thread::sleep(Duration::from_millis(100));
     171              :         }
     172            0 :         bail!("Connection timed out");
     173           12 :     }
     174              : 
     175           12 :     pub fn kill(mut self) {
     176           12 :         self.process.kill().unwrap();
     177           12 :         self.process.wait().unwrap();
     178           12 :     }
     179              : }
     180              : 
     181              : impl Drop for PostgresServer {
     182           12 :     fn drop(&mut self) {
     183           12 :         match self.process.try_wait() {
     184           12 :             Ok(Some(_)) => return,
     185              :             Ok(None) => {
     186            0 :                 warn!("Server was not terminated, will be killed");
     187              :             }
     188            0 :             Err(e) => {
     189            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     190              :             }
     191              :         }
     192            0 :         let _ = self.process.kill();
     193           12 :     }
     194              : }
     195              : 
     196              : pub trait PostgresClientExt: postgres::GenericClient {
     197           24 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     198           24 :         Ok(self
     199           24 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     200           24 :             .get(0))
     201           24 :     }
     202            0 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     203            0 :         Ok(self
     204            0 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     205            0 :             .get(0))
     206            0 :     }
     207              : }
     208              : 
     209              : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     210              : 
     211           12 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     212           12 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     213              : 
     214           12 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     215           12 :     ensure!(wal_keep_size == "50MB");
     216           12 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     217           12 :     ensure!(wal_writer_delay == "10s");
     218           12 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     219           12 :     ensure!(autovacuum == "off");
     220              : 
     221           12 :     let wal_segment_size = client.query_one(
     222           12 :         "select cast(setting as bigint) as setting, unit \
     223           12 :          from pg_settings where name = 'wal_segment_size'",
     224           12 :         &[],
     225            0 :     )?;
     226           12 :     ensure!(
     227           12 :         wal_segment_size.get::<_, String>("unit") == "B",
     228            0 :         "Unexpected wal_segment_size unit"
     229              :     );
     230           12 :     ensure!(
     231           12 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     232            0 :         "Unexpected wal_segment_size in bytes"
     233              :     );
     234              : 
     235           12 :     Ok(())
     236           12 : }
     237              : 
     238              : pub trait Crafter {
     239              :     const NAME: &'static str;
     240              : 
     241              :     /// Generates WAL using the client `client`. Returns a vector of some valid
     242              :     /// "interesting" intermediate LSNs which one may start reading from.
     243              :     /// test_end_of_wal uses this to check various starting points.
     244              :     ///
     245              :     /// Note that postgres is generally eager to write some WAL on its own. While
     246              :     /// we try to suppress that (autovacuum off, big wal_writer_delay, etc.), it can
     247              :     /// always happen, e.g. xl_running_xacts records are written every 15s. So checks
     248              :     /// for a stable end of WAL would be flaky unless postgres is shut down. For this
     249              :     /// reason, returning a potential end of WAL here is pointless. Most of the time
     250              :     /// no such WAL is written, though, so it is reasonable to create the needed
     251              :     /// WAL structure and immediately kill postgres, as test_end_of_wal does.
     252              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
     253              : }
     254              : 
     255              : /// Wraps a WAL crafting function: passes it the current insert LSN before it
     256              : /// generates WAL, and flushes WAL afterwards. Also prepends that initial LSN to
     257              : /// the result unless the function already returned it as the first element.
     258           12 : fn craft_internal<C: postgres::GenericClient>(
     259           12 :     client: &mut C,
     260           12 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
     261           12 : ) -> anyhow::Result<Vec<PgLsn>> {
     262           12 :     ensure_server_config(client)?;
     263              : 
     264           12 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     265           12 :     info!("LSN initial = {}", initial_lsn);
     266              : 
     267           12 :     let mut intermediate_lsns = f(client, initial_lsn)?;
     268           12 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     269           12 :         intermediate_lsns.insert(0, initial_lsn);
     270           12 :     }
     271              : 
     272              :     // Some records may not be flushed yet, e.g. non-transactional logical messages. Flush now.
     273              :     //
     274              :     // If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
     275              :     // returns the position just after the page header on the next page. That's where the next
     276              :     // record will be inserted. But the page header hasn't actually been written to the WAL
     277              :     // yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
     278              :     // error. Because of that, if the insert location is just after a page header, back off to
     279              :     // previous page boundary.
     280           12 :     let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
     281           12 :     if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 {
     282            0 :         lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     283           12 :     } else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 {
     284            0 :         lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     285           12 :     }
     286           12 :     client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?;
     287           12 :     Ok(intermediate_lsns)
     288           12 : }
     289              : 
     290              : pub struct Simple;
     291              : impl Crafter for Simple {
     292              :     const NAME: &'static str = "simple";
     293            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     294            4 :         craft_internal(client, |client, _| {
     295            4 :             client.execute("CREATE table t(x int)", &[])?;
     296            4 :             Ok(Vec::new())
     297            4 :         })
     298            4 :     }
     299              : }
     300              : 
     301              : pub struct LastWalRecordXlogSwitch;
     302              : impl Crafter for LastWalRecordXlogSwitch {
     303              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     304            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     305              :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     306              :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     307            0 :         ensure_server_config(client)?;
     308              : 
     309            0 :         client.execute("CREATE table t(x int)", &[])?;
     310            0 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     311              :         // pg_switch_wal returns end of last record of the switched segment,
     312              :         // i.e. end of SWITCH itself.
     313            0 :         let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     314            0 :         let before_xlog_switch_u64 = u64::from(before_xlog_switch);
     315            0 :         let next_segment = PgLsn::from(
     316            0 :             before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
     317            0 :                 + WAL_SEGMENT_SIZE as u64,
     318              :         );
     319            0 :         ensure!(
     320            0 :             xlog_switch_record_end <= next_segment,
     321            0 :             "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
     322              :             xlog_switch_record_end,
     323              :             next_segment
     324              :         );
     325            0 :         Ok(vec![before_xlog_switch, xlog_switch_record_end])
     326            0 :     }
     327              : }
     328              : 
     329              : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     330              : /// Craft xlog SWITCH record ending at page boundary.
     331              : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     332              :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     333            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     334              :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     335              :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     336            0 :         ensure_server_config(client)?;
     337              : 
     338            0 :         client.execute("CREATE table t(x int)", &[])?;
     339              : 
     340              :         // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.  We
     341              :         // will use carefully-sized logical messages to advance WAL insert location such
     342              :         // that there is just enough space on the page for the XLOG_SWITCH record.
     343              :         loop {
     344              :             // We start with measuring how much WAL it takes for one logical message,
     345              :             // considering all alignments and headers.
     346            0 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     347            0 :             client.execute(
     348            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     349            0 :                 &[],
     350            0 :             )?;
     351            0 :             let after_lsn = client.pg_current_wal_insert_lsn()?;
     352              : 
     353              :             // Did the record cross a page boundary? If it did, start over. Crossing a
     354              :             // page boundary adds to the apparent size of the record because of the page
     355              :             // header, which throws off the calculation.
     356            0 :             if u64::from(before_lsn) / XLOG_BLCKSZ as u64
     357            0 :                 != u64::from(after_lsn) / XLOG_BLCKSZ as u64
     358              :             {
     359            0 :                 continue;
     360            0 :             }
     361              :             // base_size is the size of a logical message without the payload
     362            0 :             let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10;
     363              : 
     364              :             // Is there enough space on the page for another logical message and an
     365              :             // XLOG_SWITCH? If not, start over.
     366            0 :             let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
     367            0 :             if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 {
     368            0 :                 continue;
     369            0 :             }
     370              : 
     371              :             // We will write another logical message, such that after the logical message
     372              :             // record, there will be space for exactly one XLOG_SWITCH. How large should
     373              :             // the logical message's payload be? An XLOG_SWITCH record has no data => its
     374              :             // size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     375            0 :             let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;
     376              : 
     377            0 :             client.execute(
     378            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     379            0 :                 &[&(repeats as i32)],
     380            0 :             )?;
     381            0 :             info!(
     382            0 :                 "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     383            0 :                 client.pg_current_wal_insert_lsn()?,
     384              :                 XLOG_SIZE_OF_XLOG_RECORD
     385              :             );
     386              : 
     387              :             // Emit the XLOG_SWITCH
     388            0 :             let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     389            0 :             let xlog_switch_record_end: PgLsn =
     390            0 :                 client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     391              : 
     392            0 :             if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
     393            0 :                 != XLOG_SIZE_OF_XLOG_SHORT_PHD
     394              :             {
     395            0 :                 warn!(
     396            0 :                     "XLOG_SWITCH message ended not on page boundary: {}, offset = {}, repeating",
     397              :                     xlog_switch_record_end,
     398            0 :                     u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
     399              :                 );
     400            0 :                 continue;
     401            0 :             }
     402            0 :             return Ok(vec![before_xlog_switch, xlog_switch_record_end]);
     403              :         }
     404            0 :     }
     405              : }
     406              : 
     407              : /// Write a ~16MB logical message; it should cross a WAL segment boundary.
     408            8 : fn craft_seg_size_logical_message(
     409            8 :     client: &mut impl postgres::GenericClient,
     410            8 :     transactional: bool,
     411            8 : ) -> anyhow::Result<Vec<PgLsn>> {
     412            8 :     craft_internal(client, |client, initial_lsn| {
     413            8 :         ensure!(
     414            8 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     415            0 :             "Initial LSN is too far in the future"
     416              :         );
     417              : 
     418            8 :         let message_lsn: PgLsn = client
     419            8 :             .query_one(
     420            8 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     421            8 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     422            8 :                 &[&transactional],
     423            0 :             )?
     424            8 :             .get("message_lsn");
     425            8 :         ensure!(
     426            8 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     427            0 :             "Logical message did not cross the segment boundary"
     428              :         );
     429            8 :         ensure!(
     430            8 :             message_lsn < PgLsn::from(0x0400_0000),
     431            0 :             "Logical message crossed two segments"
     432              :         );
     433              : 
     434            8 :         Ok(vec![message_lsn])
     435            8 :     })
     436            8 : }
     437              : 
     438              : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     439              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     440              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     441            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     442              :         // Transactional message crossing WAL segment will be followed by small
     443              :         // commit record.
     444            4 :         craft_seg_size_logical_message(client, true)
     445            4 :     }
     446              : }
     447              : 
     448              : pub struct LastWalRecordCrossingSegment;
     449              : impl Crafter for LastWalRecordCrossingSegment {
     450              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     451            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     452            4 :         craft_seg_size_logical_message(client, false)
     453            4 :     }
     454              : }
        

Generated by: LCOV version 2.1-beta