LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 90.0 % 250 225
Test Date: 2023-09-06 10:18:01 Functions: 62.5 % 40 25

            Line data    Source code
       1              : use anyhow::{bail, ensure};
       2              : use log::*;
       3              : use postgres::types::PgLsn;
       4              : use postgres::Client;
       5              : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       6              : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
       7              : use std::cmp::Ordering;
       8              : use std::path::{Path, PathBuf};
       9              : use std::process::Command;
      10              : use std::time::{Duration, Instant};
      11              : use tempfile::{tempdir, TempDir};
      12              : 
      13              : macro_rules! xlog_utils_test {
      14              :     ($version:ident) => {
      15              :         #[path = "."]
      16              :         mod $version {
      17              :             pub use postgres_ffi::$version::wal_craft_test_export::*;
      18              :             #[allow(clippy::duplicate_mod)]
      19              :             #[cfg(test)]
      20              :             mod xlog_utils_test;
      21              :         }
      22              :     };
      23              : }
      24              : 
      25              : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      26              : 
      27            0 : #[derive(Debug, Clone, PartialEq, Eq)]
      28              : pub struct Conf {
      29              :     pub pg_version: u32,
      30              :     pub pg_distrib_dir: PathBuf,
      31              :     pub datadir: PathBuf,
      32              : }
      33              : 
      34              : pub struct PostgresServer {
      35              :     process: std::process::Child,
      36              :     _unix_socket_dir: TempDir,
      37              :     client_config: postgres::Config,
      38              : }
      39              : 
      40              : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
      41              :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      42              :     "shared_preload_libraries=neon", // can only be loaded at startup
      43              :     // Disable background processes as much as possible
      44              :     "wal_writer_delay=10s",
      45              :     "autovacuum=off",
      46              : ];
      47              : 
      48              : impl Conf {
      49           54 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      50           54 :         let path = self.pg_distrib_dir.clone();
      51           54 : 
      52           54 :         match self.pg_version {
      53           27 :             14 => Ok(path.join(format!("v{}", self.pg_version))),
      54           27 :             15 => Ok(path.join(format!("v{}", self.pg_version))),
      55            0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      56              :         }
      57           54 :     }
      58              : 
      59           18 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      60           18 :         Ok(self.pg_distrib_dir()?.join("bin"))
      61           18 :     }
      62              : 
      63           36 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      64           36 :         Ok(self.pg_distrib_dir()?.join("lib"))
      65           36 :     }
      66              : 
      67           90 :     pub fn wal_dir(&self) -> PathBuf {
      68           90 :         self.datadir.join("pg_wal")
      69           90 :     }
      70              : 
      71           18 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      72           18 :         let path = self.pg_bin_dir()?.join(command);
      73           18 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      74           18 :         let mut cmd = Command::new(path);
      75           18 :         cmd.env_clear()
      76           18 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      77           18 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      78           18 :         Ok(cmd)
      79           18 :     }
      80              : 
      81              :     pub fn initdb(&self) -> anyhow::Result<()> {
      82            6 :         if let Some(parent) = self.datadir.parent() {
      83            6 :             info!("Pre-creating parent directory {:?}", parent);
      84              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      85              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      86            6 :             std::fs::create_dir_all(parent)?;
      87            0 :         }
      88            6 :         info!(
      89            6 :             "Running initdb in {:?} with user \"postgres\"",
      90              :             self.datadir
      91              :         );
      92            6 :         let output = self
      93            6 :             .new_pg_command("initdb")?
      94            6 :             .arg("-D")
      95            6 :             .arg(&self.datadir)
      96            6 :             .args(["-U", "postgres", "--no-instructions", "--no-sync"])
      97            6 :             .output()?;
      98            6 :         debug!("initdb output: {:?}", output);
      99            6 :         ensure!(
     100            6 :             output.status.success(),
     101            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     102            0 :             String::from_utf8_lossy(&output.stdout),
     103            0 :             String::from_utf8_lossy(&output.stderr),
     104              :         );
     105            6 :         Ok(())
     106            6 :     }
     107              : 
     108            6 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     109            6 :         info!("Starting Postgres server in {:?}", self.datadir);
     110            6 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     111            6 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     112            6 :         let server_process = self
     113            6 :             .new_pg_command("postgres")?
     114            6 :             .args(["-c", "listen_addresses="])
     115            6 :             .arg("-k")
     116            6 :             .arg(&unix_socket_dir_path)
     117            6 :             .arg("-D")
     118            6 :             .arg(&self.datadir)
     119           24 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     120            6 :             .spawn()?;
     121            6 :         let server = PostgresServer {
     122            6 :             process: server_process,
     123            6 :             _unix_socket_dir: unix_socket_dir,
     124            6 :             client_config: {
     125            6 :                 let mut c = postgres::Config::new();
     126            6 :                 c.host_path(&unix_socket_dir_path);
     127            6 :                 c.user("postgres");
     128            6 :                 c.connect_timeout(Duration::from_millis(10000));
     129            6 :                 c
     130            6 :             },
     131            6 :         };
     132            6 :         Ok(server)
     133            6 :     }
     134              : 
     135            6 :     pub fn pg_waldump(
     136            6 :         &self,
     137            6 :         first_segment_name: &str,
     138            6 :         last_segment_name: &str,
     139            6 :     ) -> anyhow::Result<std::process::Output> {
     140            6 :         let first_segment_file = self.datadir.join(first_segment_name);
     141            6 :         let last_segment_file = self.datadir.join(last_segment_name);
     142            6 :         info!(
     143            6 :             "Running pg_waldump for {} .. {}",
     144            6 :             first_segment_file.display(),
     145            6 :             last_segment_file.display()
     146              :         );
     147            6 :         let output = self
     148            6 :             .new_pg_command("pg_waldump")?
     149            6 :             .args([&first_segment_file, &last_segment_file])
     150            6 :             .output()?;
     151            6 :         debug!("waldump output: {:?}", output);
     152            6 :         Ok(output)
     153            6 :     }
     154              : }
     155              : 
     156              : impl PostgresServer {
     157            6 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     158            6 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     159           12 :         while Instant::now() < retry_until {
     160           12 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     161            6 :                 return Ok(client);
     162            6 :             }
     163            6 :             std::thread::sleep(Duration::from_millis(100));
     164              :         }
     165            0 :         bail!("Connection timed out");
     166            6 :     }
     167              : 
     168            6 :     pub fn kill(mut self) {
     169            6 :         self.process.kill().unwrap();
     170            6 :         self.process.wait().unwrap();
     171            6 :     }
     172              : }
     173              : 
     174              : impl Drop for PostgresServer {
     175            6 :     fn drop(&mut self) {
     176            6 :         match self.process.try_wait() {
     177            6 :             Ok(Some(_)) => return,
     178              :             Ok(None) => {
     179            0 :                 warn!("Server was not terminated, will be killed");
     180              :             }
     181            0 :             Err(e) => {
     182            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     183              :             }
     184              :         }
     185            0 :         let _ = self.process.kill();
     186            6 :     }
     187              : }
     188              : 
     189              : pub trait PostgresClientExt: postgres::GenericClient {
     190           28 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     191           28 :         Ok(self
     192           28 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     193           28 :             .get(0))
     194           28 :     }
     195            9 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     196            9 :         Ok(self
     197            9 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     198            9 :             .get(0))
     199            9 :     }
     200              : }
     201              : 
     202              : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     203              : 
     204              : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     205           11 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     206              : 
     207           11 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     208           11 :     ensure!(wal_keep_size == "50MB");
     209           11 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     210           11 :     ensure!(wal_writer_delay == "10s");
     211           11 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     212           11 :     ensure!(autovacuum == "off");
     213              : 
     214           11 :     let wal_segment_size = client.query_one(
     215           11 :         "select cast(setting as bigint) as setting, unit \
     216           11 :          from pg_settings where name = 'wal_segment_size'",
     217           11 :         &[],
     218           11 :     )?;
     219           11 :     ensure!(
     220           11 :         wal_segment_size.get::<_, String>("unit") == "B",
     221            0 :         "Unexpected wal_segment_size unit"
     222              :     );
     223           11 :     ensure!(
     224           11 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     225            0 :         "Unexpected wal_segment_size in bytes"
     226              :     );
     227              : 
     228           11 :     Ok(())
     229           11 : }
     230              : 
     231              : pub trait Crafter {
     232              :     const NAME: &'static str;
     233              : 
     234              :     /// Generates WAL using the client `client`. Returns a pair of:
     235              :     /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
     236              :     ///   May include or exclude Lsn(0) and the end-of-wal.
     237              :     /// * The expected end-of-wal LSN.
     238              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
     239              : }
     240              : 
     241              : fn craft_internal<C: postgres::GenericClient>(
     242              :     client: &mut C,
     243              :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
     244              : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     245            9 :     ensure_server_config(client)?;
     246              : 
     247            9 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     248            9 :     info!("LSN initial = {}", initial_lsn);
     249              : 
     250            9 :     let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
     251            9 :     let last_lsn = match last_lsn {
     252            3 :         None => client.pg_current_wal_insert_lsn()?,
     253            6 :         Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) {
     254            0 :             Ordering::Less => bail!("Some records were inserted after the crafted WAL"),
     255            6 :             Ordering::Equal => last_lsn,
     256            0 :             Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
     257              :         },
     258              :     };
     259            9 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     260            9 :         intermediate_lsns.insert(0, initial_lsn);
     261            9 :     }
     262              : 
     263              :     // Some records may not be flushed, e.g. non-transactional logical messages.
     264            9 :     client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
     265            9 :     match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
     266            0 :         Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
     267            9 :         Ordering::Equal => {}
     268            0 :         Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
     269              :     }
     270            9 :     Ok((intermediate_lsns, last_lsn))
     271            9 : }
     272              : 
     273              : pub struct Simple;
     274              : impl Crafter for Simple {
     275              :     const NAME: &'static str = "simple";
     276            3 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     277            3 :         craft_internal(client, |client, _| {
     278            3 :             client.execute("CREATE table t(x int)", &[])?;
     279            3 :             Ok((Vec::new(), None))
     280            3 :         })
     281            3 :     }
     282              : }
     283              : 
     284              : pub struct LastWalRecordXlogSwitch;
     285              : impl Crafter for LastWalRecordXlogSwitch {
     286              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     287              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     288              :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     289              :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     290            1 :         ensure_server_config(client)?;
     291              : 
     292            1 :         client.execute("CREATE table t(x int)", &[])?;
     293            1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     294            1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     295            1 :         let next_segment = PgLsn::from(0x0200_0000);
     296            1 :         ensure!(
     297            1 :             after_xlog_switch <= next_segment,
     298            0 :             "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
     299              :             after_xlog_switch,
     300              :             next_segment
     301              :         );
     302            1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     303            1 :     }
     304              : }
     305              : 
     306              : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     307              : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     308              :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     309              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     310              :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     311              :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     312            1 :         ensure_server_config(client)?;
     313              : 
     314            1 :         client.execute("CREATE table t(x int)", &[])?;
     315              : 
     316              :         // Add padding so the XLOG_SWITCH record ends exactly on an XLOG_BLCKSZ boundary.
     317              :         // We use a logical message as the padding. We start by detecting how much WAL
     318              :         // one logical message takes, considering all alignments and headers.
     319            1 :         let base_wal_advance = {
     320            1 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     321              :             // A small non-empty message bigger than a few bytes is more likely than an
     322              :             // empty message to have the same format as the big padding message.
     323            1 :             client.execute(
     324            1 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     325            1 :                 &[],
     326            1 :             )?;
     327              :             // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     328            1 :             (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
     329              :                 + XLOG_SIZE_OF_XLOG_RECORD
     330              :         };
     331            1 :         let mut remaining_lsn =
     332            1 :             XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
     333            1 :         if remaining_lsn < base_wal_advance {
     334            0 :             remaining_lsn += XLOG_BLCKSZ;
     335            1 :         }
     336            1 :         let repeats = 10 + remaining_lsn - base_wal_advance;
     337            1 :         info!(
     338            1 :             "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
     339            1 :             client.pg_current_wal_insert_lsn()?,
     340              :             remaining_lsn,
     341              :             base_wal_advance,
     342              :             repeats
     343              :         );
     344            1 :         client.execute(
     345            1 :             "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     346            1 :             &[&(repeats as i32)],
     347            1 :         )?;
     348            1 :         info!(
     349            1 :             "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     350            1 :             client.pg_current_wal_insert_lsn()?,
     351              :             XLOG_SIZE_OF_XLOG_RECORD
     352              :         );
     353              : 
     354              :         // Emit the XLOG_SWITCH
     355            1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     356            1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     357            1 :         let next_segment = PgLsn::from(0x0200_0000);
     358            1 :         ensure!(
     359            1 :             after_xlog_switch < next_segment,
     360            0 :             "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
     361              :             after_xlog_switch,
     362              :             next_segment
     363              :         );
     364            1 :         ensure!(
     365            1 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
     366            0 :             "XLOG_SWITCH message ended not on page boundary: {}",
     367              :             after_xlog_switch
     368              :         );
     369            1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     370            1 :     }
     371              : }
     372              : 
     373            6 : fn craft_single_logical_message(
     374            6 :     client: &mut impl postgres::GenericClient,
     375            6 :     transactional: bool,
     376            6 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     377            6 :     craft_internal(client, |client, initial_lsn| {
     378            6 :         ensure!(
     379            6 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     380            0 :             "Initial LSN is too far in the future"
     381              :         );
     382              : 
     383            6 :         let message_lsn: PgLsn = client
     384            6 :             .query_one(
     385            6 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     386            6 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     387            6 :                 &[&transactional],
     388            6 :             )?
     389            6 :             .get("message_lsn");
     390            6 :         ensure!(
     391            6 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     392            0 :             "Logical message did not cross the segment boundary"
     393              :         );
     394            6 :         ensure!(
     395            6 :             message_lsn < PgLsn::from(0x0400_0000),
     396            0 :             "Logical message crossed two segments"
     397              :         );
     398              : 
     399            6 :         if transactional {
     400              :             // Transactional logical messages are part of a transaction, so the one above is
     401              :             // followed by a small COMMIT record.
     402              : 
     403            3 :             let after_message_lsn = client.pg_current_wal_insert_lsn()?;
     404            3 :             ensure!(
     405            3 :                 message_lsn < after_message_lsn,
     406            0 :                 "No record found after the emitted message"
     407              :             );
     408            3 :             Ok((vec![message_lsn], Some(after_message_lsn)))
     409              :         } else {
     410            3 :             Ok((Vec::new(), Some(message_lsn)))
     411              :         }
     412            6 :     })
     413            6 : }
     414              : 
     415              : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     416              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     417              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     418            3 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     419            3 :         craft_single_logical_message(client, true)
     420            3 :     }
     421              : }
     422              : 
     423              : pub struct LastWalRecordCrossingSegment;
     424              : impl Crafter for LastWalRecordCrossingSegment {
     425              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     426            3 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     427            3 :         craft_single_logical_message(client, false)
     428            3 :     }
     429              : }
        

Generated by: LCOV version 2.1-beta