LCOV - differential code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 88.3 % 264 233 31 233
Current Date: 2024-01-09 02:06:09 Functions: 62.5 % 40 25 15 25
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use anyhow::{bail, ensure};
       2                 : use camino_tempfile::{tempdir, Utf8TempDir};
       3                 : use log::*;
       4                 : use postgres::types::PgLsn;
       5                 : use postgres::Client;
       6                 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       7                 : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
       8                 : use std::cmp::Ordering;
       9                 : use std::path::{Path, PathBuf};
      10                 : use std::process::Command;
      11                 : use std::time::{Duration, Instant};
      12                 : 
      13                 : macro_rules! xlog_utils_test {
      14                 :     ($version:ident) => {
      15                 :         #[path = "."]
      16                 :         mod $version {
      17                 :             #[allow(unused_imports)]
      18                 :             pub use postgres_ffi::$version::wal_craft_test_export::*;
      19                 :             #[allow(clippy::duplicate_mod)]
      20                 :             #[cfg(test)]
      21                 :             mod xlog_utils_test;
      22                 :         }
      23                 :     };
      24                 : }
      25                 : 
      26                 : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      27                 : 
      28 UBC           0 : #[derive(Debug, Clone, PartialEq, Eq)]
      29                 : pub struct Conf {
      30                 :     pub pg_version: u32,
      31                 :     pub pg_distrib_dir: PathBuf,
      32                 :     pub datadir: PathBuf,
      33                 : }
      34                 : 
      35                 : pub struct PostgresServer {
      36                 :     process: std::process::Child,
      37                 :     _unix_socket_dir: Utf8TempDir,
      38                 :     client_config: postgres::Config,
      39                 : }
      40                 : 
      41                 : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
      42                 :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      43                 :     "shared_preload_libraries=neon", // can only be loaded at startup
      44                 :     // Disable background processes as much as possible
      45                 :     "wal_writer_delay=10s",
      46                 :     "autovacuum=off",
      47                 : ];
      48                 : 
      49                 : impl Conf {
      50 CBC          81 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      51              81 :         let path = self.pg_distrib_dir.clone();
      52              81 : 
      53              81 :         #[allow(clippy::manual_range_patterns)]
      54              81 :         match self.pg_version {
      55              81 :             14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
      56 UBC           0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      57                 :         }
      58 CBC          81 :     }
      59                 : 
      60              27 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      61              27 :         Ok(self.pg_distrib_dir()?.join("bin"))
      62              27 :     }
      63                 : 
      64              54 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      65              54 :         Ok(self.pg_distrib_dir()?.join("lib"))
      66              54 :     }
      67                 : 
      68             135 :     pub fn wal_dir(&self) -> PathBuf {
      69             135 :         self.datadir.join("pg_wal")
      70             135 :     }
      71                 : 
      72              27 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      73              27 :         let path = self.pg_bin_dir()?.join(command);
      74              27 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      75              27 :         let mut cmd = Command::new(path);
      76              27 :         cmd.env_clear()
      77              27 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      78              27 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      79              27 :         Ok(cmd)
      80              27 :     }
      81                 : 
      82               9 :     pub fn initdb(&self) -> anyhow::Result<()> {
      83               9 :         if let Some(parent) = self.datadir.parent() {
      84               9 :             info!("Pre-creating parent directory {:?}", parent);
      85                 :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      86                 :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      87               9 :             std::fs::create_dir_all(parent)?;
      88 UBC           0 :         }
      89 CBC           9 :         info!(
      90               9 :             "Running initdb in {:?} with user \"postgres\"",
      91                 :             self.datadir
      92                 :         );
      93               9 :         let output = self
      94               9 :             .new_pg_command("initdb")?
      95               9 :             .arg("-D")
      96               9 :             .arg(&self.datadir)
      97               9 :             .args(["-U", "postgres", "--no-instructions", "--no-sync"])
      98               9 :             .output()?;
      99               9 :         debug!("initdb output: {:?}", output);
     100                 :         ensure!(
     101               9 :             output.status.success(),
     102 UBC           0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     103               0 :             String::from_utf8_lossy(&output.stdout),
     104               0 :             String::from_utf8_lossy(&output.stderr),
     105                 :         );
     106 CBC           9 :         Ok(())
     107               9 :     }
     108                 : 
     109               9 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     110               9 :         info!("Starting Postgres server in {:?}", self.datadir);
     111               9 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     112               9 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     113               9 :         let server_process = self
     114               9 :             .new_pg_command("postgres")?
     115               9 :             .args(["-c", "listen_addresses="])
     116               9 :             .arg("-k")
     117               9 :             .arg(&unix_socket_dir_path)
     118               9 :             .arg("-D")
     119               9 :             .arg(&self.datadir)
     120              36 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     121               9 :             .spawn()?;
     122               9 :         let server = PostgresServer {
     123               9 :             process: server_process,
     124               9 :             _unix_socket_dir: unix_socket_dir,
     125               9 :             client_config: {
     126               9 :                 let mut c = postgres::Config::new();
     127               9 :                 c.host_path(&unix_socket_dir_path);
     128               9 :                 c.user("postgres");
     129               9 :                 c.connect_timeout(Duration::from_millis(10000));
     130               9 :                 c
     131               9 :             },
     132               9 :         };
     133               9 :         Ok(server)
     134               9 :     }
     135                 : 
     136               9 :     pub fn pg_waldump(
     137               9 :         &self,
     138               9 :         first_segment_name: &str,
     139               9 :         last_segment_name: &str,
     140               9 :     ) -> anyhow::Result<std::process::Output> {
     141               9 :         let first_segment_file = self.datadir.join(first_segment_name);
     142               9 :         let last_segment_file = self.datadir.join(last_segment_name);
     143               9 :         info!(
     144               9 :             "Running pg_waldump for {} .. {}",
     145               9 :             first_segment_file.display(),
     146               9 :             last_segment_file.display()
     147                 :         );
     148               9 :         let output = self
     149               9 :             .new_pg_command("pg_waldump")?
     150               9 :             .args([&first_segment_file, &last_segment_file])
     151               9 :             .output()?;
     152               9 :         debug!("waldump output: {:?}", output);
     153               9 :         Ok(output)
     154               9 :     }
     155                 : }
     156                 : 
     157                 : impl PostgresServer {
     158               9 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     159               9 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     160              18 :         while Instant::now() < retry_until {
     161              18 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     162               9 :                 return Ok(client);
     163               9 :             }
     164               9 :             std::thread::sleep(Duration::from_millis(100));
     165                 :         }
     166 UBC           0 :         bail!("Connection timed out");
     167 CBC           9 :     }
     168                 : 
     169               9 :     pub fn kill(mut self) {
     170               9 :         self.process.kill().unwrap();
     171               9 :         self.process.wait().unwrap();
     172               9 :     }
     173                 : }
     174                 : 
     175                 : impl Drop for PostgresServer {
     176               9 :     fn drop(&mut self) {
     177               9 :         match self.process.try_wait() {
     178               9 :             Ok(Some(_)) => return,
     179                 :             Ok(None) => {
     180 UBC           0 :                 warn!("Server was not terminated, will be killed");
     181                 :             }
     182               0 :             Err(e) => {
     183               0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     184                 :             }
     185                 :         }
     186               0 :         let _ = self.process.kill();
     187 CBC           9 :     }
     188                 : }
     189                 : 
     190                 : pub trait PostgresClientExt: postgres::GenericClient {
     191              35 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     192              35 :         Ok(self
     193              35 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     194              35 :             .get(0))
     195              35 :     }
     196              12 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     197              12 :         Ok(self
     198              12 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     199              12 :             .get(0))
     200              12 :     }
     201                 : }
     202                 : 
     203                 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     204                 : 
     205              14 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     206              14 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     207                 : 
     208              14 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     209              14 :     ensure!(wal_keep_size == "50MB");
     210              14 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     211              14 :     ensure!(wal_writer_delay == "10s");
     212              14 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     213              14 :     ensure!(autovacuum == "off");
     214                 : 
     215              14 :     let wal_segment_size = client.query_one(
     216              14 :         "select cast(setting as bigint) as setting, unit \
     217              14 :          from pg_settings where name = 'wal_segment_size'",
     218              14 :         &[],
     219              14 :     )?;
     220                 :     ensure!(
     221              14 :         wal_segment_size.get::<_, String>("unit") == "B",
     222 UBC           0 :         "Unexpected wal_segment_size unit"
     223                 :     );
     224                 :     ensure!(
     225 CBC          14 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     226 UBC           0 :         "Unexpected wal_segment_size in bytes"
     227                 :     );
     228                 : 
     229 CBC          14 :     Ok(())
     230              14 : }
     231                 : 
     232                 : pub trait Crafter {
     233                 :     const NAME: &'static str;
     234                 : 
     235                 :     /// Generates WAL using the client `client`. Returns a pair of:
     236                 :     /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
     237                 :     ///   May include or exclude Lsn(0) and the end-of-wal.
     238                 :     /// * The expected end-of-wal LSN.
     239                 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
     240                 : }
     241                 : 
     242              12 : fn craft_internal<C: postgres::GenericClient>(
     243              12 :     client: &mut C,
     244              12 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
     245              12 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     246              12 :     ensure_server_config(client)?;
     247                 : 
     248              12 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     249              12 :     info!("LSN initial = {}", initial_lsn);
     250                 : 
     251              12 :     let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
     252              12 :     let last_lsn = match last_lsn {
     253               4 :         None => client.pg_current_wal_insert_lsn()?,
     254               8 :         Some(last_lsn) => {
     255               8 :             let insert_lsn = client.pg_current_wal_insert_lsn()?;
     256               8 :             match last_lsn.cmp(&insert_lsn) {
     257 UBC           0 :                 Ordering::Less => bail!(
     258               0 :                     "Some records were inserted after the crafted WAL: {} vs {}",
     259               0 :                     last_lsn,
     260               0 :                     insert_lsn
     261               0 :                 ),
     262 CBC           8 :                 Ordering::Equal => last_lsn,
     263 UBC           0 :                 Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
     264                 :             }
     265                 :         }
     266                 :     };
     267 CBC          12 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     268              12 :         intermediate_lsns.insert(0, initial_lsn);
     269              12 :     }
     270                 : 
     271                 :     // Some records may not be flushed yet, e.g. non-transactional logical messages.
     272              12 :     client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
     273              12 :     match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
     274 UBC           0 :         Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
     275 CBC          12 :         Ordering::Equal => {}
     276 UBC           0 :         Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
     277                 :     }
     278 CBC          12 :     Ok((intermediate_lsns, last_lsn))
     279              12 : }
     280                 : 
     281                 : pub struct Simple;
     282                 : impl Crafter for Simple {
     283                 :     const NAME: &'static str = "simple";
     284               4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     285               4 :         craft_internal(client, |client, _| {
     286               4 :             client.execute("CREATE table t(x int)", &[])?;
     287               4 :             Ok((Vec::new(), None))
     288               4 :         })
     289               4 :     }
     290                 : }
     291                 : 
     292                 : pub struct LastWalRecordXlogSwitch;
     293                 : impl Crafter for LastWalRecordXlogSwitch {
     294                 :     const NAME: &'static str = "last_wal_record_xlog_switch";
     295               1 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     296               1 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     297               1 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     298               1 :         ensure_server_config(client)?;
     299                 : 
     300               1 :         client.execute("CREATE table t(x int)", &[])?;
     301               1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     302               1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     303               1 :         let next_segment = PgLsn::from(0x0200_0000);
     304               1 :         ensure!(
     305               1 :             after_xlog_switch <= next_segment,
     306 UBC           0 :             "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
     307                 :             after_xlog_switch,
     308                 :             next_segment
     309                 :         );
     310 CBC           1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     311               1 :     }
     312                 : }
     313                 : 
     314                 : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     315                 : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     316                 :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     317               1 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     318               1 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     319               1 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     320               1 :         ensure_server_config(client)?;
     321                 : 
     322               1 :         client.execute("CREATE table t(x int)", &[])?;
     323                 : 
     324                 :         // Add padding so the XLOG_SWITCH record ends exactly on an XLOG_BLCKSZ boundary.
     325                 :         // We will use a logical message as the padding. We start by detecting how much WAL
     326                 :         // it takes for one logical message, considering all alignments and headers.
     327               1 :         let base_wal_advance = {
     328               1 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     329                 :             // A small non-empty message bigger than a few bytes is more likely than an empty
     330                 :             // message to have the same format as the big padding message.
     331               1 :             client.execute(
     332               1 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     333               1 :                 &[],
     334               1 :             )?;
     335                 :             // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     336               1 :             (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
     337                 :                 + XLOG_SIZE_OF_XLOG_RECORD
     338                 :         };
     339               1 :         let mut remaining_lsn =
     340               1 :             XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
     341               1 :         if remaining_lsn < base_wal_advance {
     342 UBC           0 :             remaining_lsn += XLOG_BLCKSZ;
     343 CBC           1 :         }
     344               1 :         let repeats = 10 + remaining_lsn - base_wal_advance;
     345               1 :         info!(
     346               1 :             "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
     347               1 :             client.pg_current_wal_insert_lsn()?,
     348                 :             remaining_lsn,
     349                 :             base_wal_advance,
     350                 :             repeats
     351                 :         );
     352               1 :         client.execute(
     353               1 :             "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     354               1 :             &[&(repeats as i32)],
     355               1 :         )?;
     356               1 :         info!(
     357               1 :             "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     358               1 :             client.pg_current_wal_insert_lsn()?,
     359                 :             XLOG_SIZE_OF_XLOG_RECORD
     360                 :         );
     361                 : 
     362                 :         // Emit the XLOG_SWITCH
     363               1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     364               1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     365               1 :         let next_segment = PgLsn::from(0x0200_0000);
     366               1 :         ensure!(
     367               1 :             after_xlog_switch < next_segment,
     368 UBC           0 :             "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
     369                 :             after_xlog_switch,
     370                 :             next_segment
     371                 :         );
     372                 :         ensure!(
     373 CBC           1 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
     374 UBC           0 :             "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
     375               0 :             after_xlog_switch,
     376               0 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
     377                 :         );
     378 CBC           1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     379               1 :     }
     380                 : }
     381                 : 
     382               8 : fn craft_single_logical_message(
     383               8 :     client: &mut impl postgres::GenericClient,
     384               8 :     transactional: bool,
     385               8 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     386               8 :     craft_internal(client, |client, initial_lsn| {
     387               8 :         ensure!(
     388               8 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     389 UBC           0 :             "Initial LSN is too far in the future"
     390                 :         );
     391                 : 
     392 CBC           8 :         let message_lsn: PgLsn = client
     393               8 :             .query_one(
     394               8 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     395               8 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     396               8 :                 &[&transactional],
     397               8 :             )?
     398               8 :             .get("message_lsn");
     399               8 :         ensure!(
     400               8 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     401 UBC           0 :             "Logical message did not cross the segment boundary"
     402                 :         );
     403                 :         ensure!(
     404 CBC           8 :             message_lsn < PgLsn::from(0x0400_0000),
     405 UBC           0 :             "Logical message crossed two segments"
     406                 :         );
     407                 : 
     408 CBC           8 :         if transactional {
     409                 :             // Transactional logical messages are part of a transaction, so the one above is
     410                 :             // followed by a small COMMIT record.
     411                 : 
     412               4 :             let after_message_lsn = client.pg_current_wal_insert_lsn()?;
     413                 :             ensure!(
     414               4 :                 message_lsn < after_message_lsn,
     415 UBC           0 :                 "No record found after the emitted message"
     416                 :             );
     417 CBC           4 :             Ok((vec![message_lsn], Some(after_message_lsn)))
     418                 :         } else {
     419               4 :             Ok((Vec::new(), Some(message_lsn)))
     420                 :         }
     421               8 :     })
     422               8 : }
     423                 : 
     424                 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     425                 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     426                 :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     427               4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     428               4 :         craft_single_logical_message(client, true)
     429               4 :     }
     430                 : }
     431                 : 
     432                 : pub struct LastWalRecordCrossingSegment;
     433                 : impl Crafter for LastWalRecordCrossingSegment {
     434                 :     const NAME: &'static str = "last_wal_record_crossing_segment";
     435               4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     436               4 :         craft_single_logical_message(client, false)
     437               4 :     }
     438                 : }
        

Generated by: LCOV version 2.1-beta