LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 69.2 % 253 175
Test Date: 2024-04-18 15:32:49 Functions: 61.1 % 36 22

            Line data    Source code
       1              : use anyhow::{bail, ensure};
       2              : use camino_tempfile::{tempdir, Utf8TempDir};
       3              : use log::*;
       4              : use postgres::types::PgLsn;
       5              : use postgres::Client;
       6              : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       7              : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
       8              : use std::path::{Path, PathBuf};
       9              : use std::process::Command;
      10              : use std::time::{Duration, Instant};
      11              : 
/// Declares a module named after `$version` that re-exports
/// `postgres_ffi::$version::wal_craft_test_export::*` and, in test builds,
/// includes the `xlog_utils_test` source file from this directory.
macro_rules! xlog_utils_test {
    ($version:ident) => {
        // `path = "."` makes the nested `mod xlog_utils_test;` resolve relative
        // to this directory, so every version module compiles the same file.
        #[path = "."]
        mod $version {
            #[allow(unused_imports)]
            pub use postgres_ffi::$version::wal_craft_test_export::*;
            // The same file is included once per version module on purpose,
            // which clippy would otherwise flag as a duplicate module.
            #[allow(clippy::duplicate_mod)]
            #[cfg(test)]
            mod xlog_utils_test;
        }
    };
}

// Expand `xlog_utils_test!` once for each version enumerated by postgres_ffi.
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      26              : 
/// Settings needed to run a local Postgres instance for crafting WAL.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
    /// Postgres major version; `pg_distrib_dir()` accepts 14, 15 and 16.
    pub pg_version: u32,
    /// Root directory holding one `v<N>` distribution per Postgres version.
    pub pg_distrib_dir: PathBuf,
    /// Postgres data directory, passed as `-D` to `initdb` and `postgres`.
    pub datadir: PathBuf,
}
      33              : 
/// A Postgres server process spawned by [`Conf::start_server`].
///
/// If the process is still running when this value is dropped, it is killed.
pub struct PostgresServer {
    process: std::process::Child,
    // Never read; held so the temporary Unix-socket directory lives as long
    // as the server does.
    _unix_socket_dir: Utf8TempDir,
    // Connection settings targeting the Unix socket inside `_unix_socket_dir`.
    client_config: postgres::Config,
}
      39              : 
/// Settings passed as `-c <setting>` to every server started by
/// [`Conf::start_server`]; [`ensure_server_config`] checks several of them
/// on the running server.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    "wal_keep_size=50MB",            // Ensure old WAL is not removed
    "shared_preload_libraries=neon", // can only be loaded at startup
    // Disable background processes as much as possible
    "wal_writer_delay=10s",
    "autovacuum=off",
];
      47              : 
      48              : impl Conf {
      49          162 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      50          162 :         let path = self.pg_distrib_dir.clone();
      51          162 : 
      52          162 :         #[allow(clippy::manual_range_patterns)]
      53          162 :         match self.pg_version {
      54          162 :             14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
      55            0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      56              :         }
      57          162 :     }
      58              : 
      59           54 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      60           54 :         Ok(self.pg_distrib_dir()?.join("bin"))
      61           54 :     }
      62              : 
      63          108 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      64          108 :         Ok(self.pg_distrib_dir()?.join("lib"))
      65          108 :     }
      66              : 
      67          306 :     pub fn wal_dir(&self) -> PathBuf {
      68          306 :         self.datadir.join("pg_wal")
      69          306 :     }
      70              : 
      71           54 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      72           54 :         let path = self.pg_bin_dir()?.join(command);
      73           54 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      74           54 :         let mut cmd = Command::new(path);
      75           54 :         cmd.env_clear()
      76           54 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      77           54 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      78           54 :         Ok(cmd)
      79           54 :     }
      80              : 
      81           18 :     pub fn initdb(&self) -> anyhow::Result<()> {
      82           18 :         if let Some(parent) = self.datadir.parent() {
      83           18 :             info!("Pre-creating parent directory {:?}", parent);
      84              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      85              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      86           18 :             std::fs::create_dir_all(parent)?;
      87            0 :         }
      88           18 :         info!(
      89           18 :             "Running initdb in {:?} with user \"postgres\"",
      90              :             self.datadir
      91              :         );
      92           18 :         let output = self
      93           18 :             .new_pg_command("initdb")?
      94           18 :             .arg("-D")
      95           18 :             .arg(&self.datadir)
      96           18 :             .args(["-U", "postgres", "--no-instructions", "--no-sync"])
      97           18 :             .output()?;
      98           18 :         debug!("initdb output: {:?}", output);
      99           18 :         ensure!(
     100           18 :             output.status.success(),
     101            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     102            0 :             String::from_utf8_lossy(&output.stdout),
     103            0 :             String::from_utf8_lossy(&output.stderr),
     104              :         );
     105           18 :         Ok(())
     106           18 :     }
     107              : 
     108           18 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     109           18 :         info!("Starting Postgres server in {:?}", self.datadir);
     110           18 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     111           18 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     112           18 :         let server_process = self
     113           18 :             .new_pg_command("postgres")?
     114           18 :             .args(["-c", "listen_addresses="])
     115           18 :             .arg("-k")
     116           18 :             .arg(&unix_socket_dir_path)
     117           18 :             .arg("-D")
     118           18 :             .arg(&self.datadir)
     119           72 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     120           18 :             .spawn()?;
     121           18 :         let server = PostgresServer {
     122           18 :             process: server_process,
     123           18 :             _unix_socket_dir: unix_socket_dir,
     124           18 :             client_config: {
     125           18 :                 let mut c = postgres::Config::new();
     126           18 :                 c.host_path(&unix_socket_dir_path);
     127           18 :                 c.user("postgres");
     128           18 :                 c.connect_timeout(Duration::from_millis(10000));
     129           18 :                 c
     130           18 :             },
     131           18 :         };
     132           18 :         Ok(server)
     133           18 :     }
     134              : 
     135           18 :     pub fn pg_waldump(
     136           18 :         &self,
     137           18 :         first_segment_name: &str,
     138           18 :         last_segment_name: &str,
     139           18 :     ) -> anyhow::Result<std::process::Output> {
     140           18 :         let first_segment_file = self.datadir.join(first_segment_name);
     141           18 :         let last_segment_file = self.datadir.join(last_segment_name);
     142           18 :         info!(
     143           18 :             "Running pg_waldump for {} .. {}",
     144           18 :             first_segment_file.display(),
     145           18 :             last_segment_file.display()
     146              :         );
     147           18 :         let output = self
     148           18 :             .new_pg_command("pg_waldump")?
     149           18 :             .args([&first_segment_file, &last_segment_file])
     150           18 :             .output()?;
     151           18 :         debug!("waldump output: {:?}", output);
     152           18 :         Ok(output)
     153           18 :     }
     154              : }
     155              : 
     156              : impl PostgresServer {
     157           18 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     158           18 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     159           36 :         while Instant::now() < retry_until {
     160           36 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     161           18 :                 return Ok(client);
     162           18 :             }
     163           18 :             std::thread::sleep(Duration::from_millis(100));
     164              :         }
     165            0 :         bail!("Connection timed out");
     166           18 :     }
     167              : 
     168           18 :     pub fn kill(mut self) {
     169           18 :         self.process.kill().unwrap();
     170           18 :         self.process.wait().unwrap();
     171           18 :     }
     172              : }
     173              : 
     174              : impl Drop for PostgresServer {
     175           18 :     fn drop(&mut self) {
     176           18 :         match self.process.try_wait() {
     177           18 :             Ok(Some(_)) => return,
     178              :             Ok(None) => {
     179            0 :                 warn!("Server was not terminated, will be killed");
     180              :             }
     181            0 :             Err(e) => {
     182            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     183              :             }
     184              :         }
     185            0 :         let _ = self.process.kill();
     186           18 :     }
     187              : }
     188              : 
     189              : pub trait PostgresClientExt: postgres::GenericClient {
     190           18 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     191           18 :         Ok(self
     192           18 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     193           18 :             .get(0))
     194           18 :     }
     195            0 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     196            0 :         Ok(self
     197            0 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     198            0 :             .get(0))
     199            0 :     }
     200              : }
     201              : 
     202              : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     203              : 
     204           18 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     205           18 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     206              : 
     207           18 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     208           18 :     ensure!(wal_keep_size == "50MB");
     209           18 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     210           18 :     ensure!(wal_writer_delay == "10s");
     211           18 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     212           18 :     ensure!(autovacuum == "off");
     213              : 
     214           18 :     let wal_segment_size = client.query_one(
     215           18 :         "select cast(setting as bigint) as setting, unit \
     216           18 :          from pg_settings where name = 'wal_segment_size'",
     217           18 :         &[],
     218           18 :     )?;
     219           18 :     ensure!(
     220           18 :         wal_segment_size.get::<_, String>("unit") == "B",
     221            0 :         "Unexpected wal_segment_size unit"
     222              :     );
     223           18 :     ensure!(
     224           18 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     225            0 :         "Unexpected wal_segment_size in bytes"
     226              :     );
     227              : 
     228           18 :     Ok(())
     229           18 : }
     230              : 
/// A recipe for producing one particular shape of WAL on a running server.
pub trait Crafter {
    /// Identifier of the WAL shape this crafter produces.
    const NAME: &'static str;

    /// Generates WAL using the client `client`. Returns a vector of some valid
    /// "interesting" intermediate LSNs which one may start reading from.
    /// test_end_of_wal uses this to check various starting points.
    ///
    /// Note that postgres is generally keen on writing some WAL. While we
    /// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
    /// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
    /// stable WAL end would be flaky unless postgres is shut down. For this
    /// reason returning potential end of WAL here is pointless. Most of the
    /// time this doesn't happen though, so it is reasonable to create needed
    /// WAL structure and immediately kill postgres like test_end_of_wal does.
    fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
}
     247              : 
     248              : /// Wraps some WAL craft function, providing current LSN to it before the
     249              : /// insertion and flushing WAL afterwards. Also pushes initial LSN to the
     250              : /// result.
     251           18 : fn craft_internal<C: postgres::GenericClient>(
     252           18 :     client: &mut C,
     253           18 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
     254           18 : ) -> anyhow::Result<Vec<PgLsn>> {
     255           18 :     ensure_server_config(client)?;
     256              : 
     257           18 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     258           18 :     info!("LSN initial = {}", initial_lsn);
     259              : 
     260           18 :     let mut intermediate_lsns = f(client, initial_lsn)?;
     261           18 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     262           18 :         intermediate_lsns.insert(0, initial_lsn);
     263           18 :     }
     264              : 
     265              :     // Some records may be not flushed, e.g. non-transactional logical messages.
     266              :     //
     267              :     // Note: this is broken if pg_current_wal_insert_lsn is at page boundary
     268              :     // because pg_current_wal_insert_lsn skips page headers.
     269           18 :     client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
     270           18 :     Ok(intermediate_lsns)
     271           18 : }
     272              : 
     273              : pub struct Simple;
     274              : impl Crafter for Simple {
     275              :     const NAME: &'static str = "simple";
     276            6 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     277            6 :         craft_internal(client, |client, _| {
     278            6 :             client.execute("CREATE table t(x int)", &[])?;
     279            6 :             Ok(Vec::new())
     280            6 :         })
     281            6 :     }
     282              : }
     283              : 
     284              : pub struct LastWalRecordXlogSwitch;
     285              : impl Crafter for LastWalRecordXlogSwitch {
     286              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     287            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     288            0 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     289            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     290            0 :         ensure_server_config(client)?;
     291              : 
     292            0 :         client.execute("CREATE table t(x int)", &[])?;
     293            0 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     294              :         // pg_switch_wal returns end of last record of the switched segment,
     295              :         // i.e. end of SWITCH itself.
     296            0 :         let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     297            0 :         let before_xlog_switch_u64 = u64::from(before_xlog_switch);
     298            0 :         let next_segment = PgLsn::from(
     299            0 :             before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
     300            0 :                 + WAL_SEGMENT_SIZE as u64,
     301            0 :         );
     302            0 :         ensure!(
     303            0 :             xlog_switch_record_end <= next_segment,
     304            0 :             "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
     305              :             xlog_switch_record_end,
     306              :             next_segment
     307              :         );
     308            0 :         Ok(vec![before_xlog_switch, xlog_switch_record_end])
     309            0 :     }
     310              : }
     311              : 
     312              : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     313              : /// Craft xlog SWITCH record ending at page boundary.
     314              : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     315              :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     316            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     317            0 :         // Do not use generate_internal because here we end up with flush_lsn exactly on
     318            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     319            0 :         ensure_server_config(client)?;
     320              : 
     321            0 :         client.execute("CREATE table t(x int)", &[])?;
     322              : 
     323              :         // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
     324              :         // We will use logical message as the padding. We start with detecting how much WAL
     325              :         // it takes for one logical message, considering all alignments and headers.
     326            0 :         let base_wal_advance = {
     327            0 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     328              :             // Small non-empty message bigger than few bytes is more likely than an empty
     329              :             // message to have the same format as the big padding message.
     330            0 :             client.execute(
     331            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     332            0 :                 &[],
     333            0 :             )?;
     334              :             // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     335            0 :             (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
     336              :                 + XLOG_SIZE_OF_XLOG_RECORD
     337              :         };
     338            0 :         let mut remaining_lsn =
     339            0 :             XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
     340            0 :         if remaining_lsn < base_wal_advance {
     341            0 :             remaining_lsn += XLOG_BLCKSZ;
     342            0 :         }
     343            0 :         let repeats = 10 + remaining_lsn - base_wal_advance;
     344            0 :         info!(
     345            0 :             "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
     346            0 :             client.pg_current_wal_insert_lsn()?,
     347              :             remaining_lsn,
     348              :             base_wal_advance,
     349              :             repeats
     350              :         );
     351            0 :         client.execute(
     352            0 :             "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     353            0 :             &[&(repeats as i32)],
     354            0 :         )?;
     355            0 :         info!(
     356            0 :             "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     357            0 :             client.pg_current_wal_insert_lsn()?,
     358              :             XLOG_SIZE_OF_XLOG_RECORD
     359              :         );
     360              : 
     361              :         // Emit the XLOG_SWITCH
     362            0 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     363            0 :         let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     364            0 :         let next_segment = PgLsn::from(0x0200_0000);
     365            0 :         ensure!(
     366            0 :             xlog_switch_record_end < next_segment,
     367            0 :             "XLOG_SWITCH record ended on or after the expected segment boundary: {} > {}",
     368              :             xlog_switch_record_end,
     369              :             next_segment
     370              :         );
     371            0 :         ensure!(
     372            0 :             u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
     373            0 :             "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
     374            0 :             xlog_switch_record_end,
     375            0 :             u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
     376              :         );
     377            0 :         Ok(vec![before_xlog_switch, xlog_switch_record_end])
     378            0 :     }
     379              : }
     380              : 
     381              : /// Write ~16MB logical message; it should cross WAL segment.
     382           12 : fn craft_seg_size_logical_message(
     383           12 :     client: &mut impl postgres::GenericClient,
     384           12 :     transactional: bool,
     385           12 : ) -> anyhow::Result<Vec<PgLsn>> {
     386           12 :     craft_internal(client, |client, initial_lsn| {
     387           12 :         ensure!(
     388           12 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     389            0 :             "Initial LSN is too far in the future"
     390              :         );
     391              : 
     392           12 :         let message_lsn: PgLsn = client
     393           12 :             .query_one(
     394           12 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     395           12 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     396           12 :                 &[&transactional],
     397           12 :             )?
     398           12 :             .get("message_lsn");
     399           12 :         ensure!(
     400           12 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     401            0 :             "Logical message did not cross the segment boundary"
     402              :         );
     403           12 :         ensure!(
     404           12 :             message_lsn < PgLsn::from(0x0400_0000),
     405            0 :             "Logical message crossed two segments"
     406              :         );
     407              : 
     408           12 :         Ok(vec![message_lsn])
     409           12 :     })
     410           12 : }
     411              : 
     412              : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     413              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     414              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     415            6 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     416            6 :         // Transactional message crossing WAL segment will be followed by small
     417            6 :         // commit record.
     418            6 :         craft_seg_size_logical_message(client, true)
     419            6 :     }
     420              : }
     421              : 
     422              : pub struct LastWalRecordCrossingSegment;
     423              : impl Crafter for LastWalRecordCrossingSegment {
     424              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     425            6 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     426            6 :         craft_seg_size_logical_message(client, false)
     427            6 :     }
     428              : }
        

Generated by: LCOV version 2.1-beta