LCOV - differential code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 88.0 % 258 227 31 227
Current Date: 2023-10-19 02:04:12 Functions: 83.3 % 30 25 5 25
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use anyhow::{bail, ensure};
       2                 : use camino_tempfile::{tempdir, Utf8TempDir};
       3                 : use log::*;
       4                 : use postgres::types::PgLsn;
       5                 : use postgres::Client;
       6                 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       7                 : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
       8                 : use std::cmp::Ordering;
       9                 : use std::path::{Path, PathBuf};
      10                 : use std::process::Command;
      11                 : use std::time::{Duration, Instant};
      12                 : 
      13                 : macro_rules! xlog_utils_test {
      14                 :     ($version:ident) => {
      15                 :         #[path = "."]
      16                 :         mod $version {
      17                 :             pub use postgres_ffi::$version::wal_craft_test_export::*;
      18                 :             #[allow(clippy::duplicate_mod)]
      19                 :             #[cfg(test)]
      20                 :             mod xlog_utils_test;
      21                 :         }
      22                 :     };
      23                 : }
      24                 : 
      25                 : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      26                 : 
      27 UBC           0 : #[derive(Debug, Clone, PartialEq, Eq)]
      28                 : pub struct Conf {
      29                 :     pub pg_version: u32,
      30                 :     pub pg_distrib_dir: PathBuf,
      31                 :     pub datadir: PathBuf,
      32                 : }
      33                 : 
      34                 : pub struct PostgresServer {
      35                 :     process: std::process::Child,
      36                 :     _unix_socket_dir: Utf8TempDir,
      37                 :     client_config: postgres::Config,
      38                 : }
      39                 : 
      40                 : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
      41                 :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      42                 :     "shared_preload_libraries=neon", // can only be loaded at startup
      43                 :     // Disable background processes as much as possible
      44                 :     "wal_writer_delay=10s",
      45                 :     "autovacuum=off",
      46                 : ];
      47                 : 
      48                 : impl Conf {
      49 CBC          81 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      50              81 :         let path = self.pg_distrib_dir.clone();
      51              81 : 
      52              81 :         #[allow(clippy::manual_range_patterns)]
      53              81 :         match self.pg_version {
      54              81 :             14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
      55 UBC           0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      56                 :         }
      57 CBC          81 :     }
      58                 : 
      59              27 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      60              27 :         Ok(self.pg_distrib_dir()?.join("bin"))
      61              27 :     }
      62                 : 
      63              54 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      64              54 :         Ok(self.pg_distrib_dir()?.join("lib"))
      65              54 :     }
      66                 : 
      67             135 :     pub fn wal_dir(&self) -> PathBuf {
      68             135 :         self.datadir.join("pg_wal")
      69             135 :     }
      70                 : 
      71              27 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      72              27 :         let path = self.pg_bin_dir()?.join(command);
      73              27 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      74              27 :         let mut cmd = Command::new(path);
      75              27 :         cmd.env_clear()
      76              27 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      77              27 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      78              27 :         Ok(cmd)
      79              27 :     }
      80                 : 
      81                 :     pub fn initdb(&self) -> anyhow::Result<()> {
      82               9 :         if let Some(parent) = self.datadir.parent() {
      83               9 :             info!("Pre-creating parent directory {:?}", parent);
      84                 :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      85                 :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      86               9 :             std::fs::create_dir_all(parent)?;
      87 UBC           0 :         }
      88 CBC           9 :         info!(
      89               9 :             "Running initdb in {:?} with user \"postgres\"",
      90                 :             self.datadir
      91                 :         );
      92               9 :         let output = self
      93               9 :             .new_pg_command("initdb")?
      94               9 :             .arg("-D")
      95               9 :             .arg(&self.datadir)
      96               9 :             .args(["-U", "postgres", "--no-instructions", "--no-sync"])
      97               9 :             .output()?;
      98               9 :         debug!("initdb output: {:?}", output);
      99               9 :         ensure!(
     100               9 :             output.status.success(),
     101 UBC           0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     102               0 :             String::from_utf8_lossy(&output.stdout),
     103               0 :             String::from_utf8_lossy(&output.stderr),
     104                 :         );
     105 CBC           9 :         Ok(())
     106               9 :     }
     107                 : 
     108               9 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     109               9 :         info!("Starting Postgres server in {:?}", self.datadir);
     110               9 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     111               9 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     112               9 :         let server_process = self
     113               9 :             .new_pg_command("postgres")?
     114               9 :             .args(["-c", "listen_addresses="])
     115               9 :             .arg("-k")
     116               9 :             .arg(&unix_socket_dir_path)
     117               9 :             .arg("-D")
     118               9 :             .arg(&self.datadir)
     119              36 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     120               9 :             .spawn()?;
     121               9 :         let server = PostgresServer {
     122               9 :             process: server_process,
     123               9 :             _unix_socket_dir: unix_socket_dir,
     124               9 :             client_config: {
     125               9 :                 let mut c = postgres::Config::new();
     126               9 :                 c.host_path(&unix_socket_dir_path);
     127               9 :                 c.user("postgres");
     128               9 :                 c.connect_timeout(Duration::from_millis(10000));
     129               9 :                 c
     130               9 :             },
     131               9 :         };
     132               9 :         Ok(server)
     133               9 :     }
     134                 : 
     135               9 :     pub fn pg_waldump(
     136               9 :         &self,
     137               9 :         first_segment_name: &str,
     138               9 :         last_segment_name: &str,
     139               9 :     ) -> anyhow::Result<std::process::Output> {
     140               9 :         let first_segment_file = self.datadir.join(first_segment_name);
     141               9 :         let last_segment_file = self.datadir.join(last_segment_name);
     142               9 :         info!(
     143               9 :             "Running pg_waldump for {} .. {}",
     144               9 :             first_segment_file.display(),
     145               9 :             last_segment_file.display()
     146                 :         );
     147               9 :         let output = self
     148               9 :             .new_pg_command("pg_waldump")?
     149               9 :             .args([&first_segment_file, &last_segment_file])
     150               9 :             .output()?;
     151               9 :         debug!("waldump output: {:?}", output);
     152               9 :         Ok(output)
     153               9 :     }
     154                 : }
     155                 : 
     156                 : impl PostgresServer {
     157               9 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     158               9 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     159              18 :         while Instant::now() < retry_until {
     160              18 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     161               9 :                 return Ok(client);
     162               9 :             }
     163               9 :             std::thread::sleep(Duration::from_millis(100));
     164                 :         }
     165 UBC           0 :         bail!("Connection timed out");
     166 CBC           9 :     }
     167                 : 
     168               9 :     pub fn kill(mut self) {
     169               9 :         self.process.kill().unwrap();
     170               9 :         self.process.wait().unwrap();
     171               9 :     }
     172                 : }
     173                 : 
     174                 : impl Drop for PostgresServer {
     175               9 :     fn drop(&mut self) {
     176               9 :         match self.process.try_wait() {
     177               9 :             Ok(Some(_)) => return,
     178                 :             Ok(None) => {
     179 UBC           0 :                 warn!("Server was not terminated, will be killed");
     180                 :             }
     181               0 :             Err(e) => {
     182               0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     183                 :             }
     184                 :         }
     185               0 :         let _ = self.process.kill();
     186 CBC           9 :     }
     187                 : }
     188                 : 
     189                 : pub trait PostgresClientExt: postgres::GenericClient {
     190              35 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     191              35 :         Ok(self
     192              35 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     193              35 :             .get(0))
     194              35 :     }
     195              12 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     196              12 :         Ok(self
     197              12 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     198              12 :             .get(0))
     199              12 :     }
     200                 : }
     201                 : 
     202                 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     203                 : 
     204                 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     205              14 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     206                 : 
     207              14 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     208              14 :     ensure!(wal_keep_size == "50MB");
     209              14 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     210              14 :     ensure!(wal_writer_delay == "10s");
     211              14 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     212              14 :     ensure!(autovacuum == "off");
     213                 : 
     214              14 :     let wal_segment_size = client.query_one(
     215              14 :         "select cast(setting as bigint) as setting, unit \
     216              14 :          from pg_settings where name = 'wal_segment_size'",
     217              14 :         &[],
     218              14 :     )?;
     219              14 :     ensure!(
     220              14 :         wal_segment_size.get::<_, String>("unit") == "B",
     221 UBC           0 :         "Unexpected wal_segment_size unit"
     222                 :     );
     223 CBC          14 :     ensure!(
     224              14 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     225 UBC           0 :         "Unexpected wal_segment_size in bytes"
     226                 :     );
     227                 : 
     228 CBC          14 :     Ok(())
     229              14 : }
     230                 : 
     231                 : pub trait Crafter {
     232                 :     const NAME: &'static str;
     233                 : 
     234                 :     /// Generates WAL using the client `client`. Returns a pair of:
     235                 :     /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
     236                 :     ///   May include or exclude Lsn(0) and the end-of-wal.
     237                 :     /// * The expected end-of-wal LSN.
     238                 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
     239                 : }
     240                 : 
     241                 : fn craft_internal<C: postgres::GenericClient>(
     242                 :     client: &mut C,
     243                 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
     244                 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     245              12 :     ensure_server_config(client)?;
     246                 : 
     247              12 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     248              12 :     info!("LSN initial = {}", initial_lsn);
     249                 : 
     250              12 :     let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
     251              12 :     let last_lsn = match last_lsn {
     252               4 :         None => client.pg_current_wal_insert_lsn()?,
     253               8 :         Some(last_lsn) => {
     254               8 :             let insert_lsn = client.pg_current_wal_insert_lsn()?;
     255               8 :             match last_lsn.cmp(&insert_lsn) {
     256 UBC           0 :                 Ordering::Less => bail!(
     257               0 :                     "Some records were inserted after the crafted WAL: {} vs {}",
     258               0 :                     last_lsn,
     259               0 :                     insert_lsn
     260               0 :                 ),
     261 CBC           8 :                 Ordering::Equal => last_lsn,
     262 UBC           0 :                 Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
     263                 :             }
     264                 :         }
     265                 :     };
     266 CBC          12 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     267              12 :         intermediate_lsns.insert(0, initial_lsn);
     268              12 :     }
     269                 : 
     270                 :     // Some records may be not flushed, e.g. non-transactional logical messages.
     271              12 :     client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
     272              12 :     match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
     273 UBC           0 :         Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
     274 CBC          12 :         Ordering::Equal => {}
     275 UBC           0 :         Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
     276                 :     }
     277 CBC          12 :     Ok((intermediate_lsns, last_lsn))
     278              12 : }
     279                 : 
     280                 : pub struct Simple;
     281                 : impl Crafter for Simple {
     282                 :     const NAME: &'static str = "simple";
     283               4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     284               4 :         craft_internal(client, |client, _| {
     285               4 :             client.execute("CREATE table t(x int)", &[])?;
     286               4 :             Ok((Vec::new(), None))
     287               4 :         })
     288               4 :     }
     289                 : }
     290                 : 
     291                 : pub struct LastWalRecordXlogSwitch;
     292                 : impl Crafter for LastWalRecordXlogSwitch {
     293                 :     const NAME: &'static str = "last_wal_record_xlog_switch";
     294                 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     295                 :         // Do not use generate_internal because here we end up with flush_lsn exactly on
     296                 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     297               1 :         ensure_server_config(client)?;
     298                 : 
     299               1 :         client.execute("CREATE table t(x int)", &[])?;
     300               1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     301               1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     302               1 :         let next_segment = PgLsn::from(0x0200_0000);
     303               1 :         ensure!(
     304               1 :             after_xlog_switch <= next_segment,
     305 UBC           0 :             "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
     306                 :             after_xlog_switch,
     307                 :             next_segment
     308                 :         );
     309 CBC           1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     310               1 :     }
     311                 : }
     312                 : 
     313                 : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     314                 : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     315                 :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     316                 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     317                 :         // Do not use generate_internal because here we end up with flush_lsn exactly on
     318                 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     319               1 :         ensure_server_config(client)?;
     320                 : 
     321               1 :         client.execute("CREATE table t(x int)", &[])?;
     322                 : 
     323                 :         // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
     324                 :         // We will use logical message as the padding. We start with detecting how much WAL
     325                 :         // it takes for one logical message, considering all alignments and headers.
     326               1 :         let base_wal_advance = {
     327               1 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     328                 :             // Small non-empty message bigger than few bytes is more likely than an empty
     329                 :             // message to have the same format as the big padding message.
     330               1 :             client.execute(
     331               1 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     332               1 :                 &[],
     333               1 :             )?;
     334                 :             // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     335               1 :             (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
     336                 :                 + XLOG_SIZE_OF_XLOG_RECORD
     337                 :         };
     338               1 :         let mut remaining_lsn =
     339               1 :             XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
     340               1 :         if remaining_lsn < base_wal_advance {
     341 UBC           0 :             remaining_lsn += XLOG_BLCKSZ;
     342 CBC           1 :         }
     343               1 :         let repeats = 10 + remaining_lsn - base_wal_advance;
     344               1 :         info!(
     345               1 :             "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
     346               1 :             client.pg_current_wal_insert_lsn()?,
     347                 :             remaining_lsn,
     348                 :             base_wal_advance,
     349                 :             repeats
     350                 :         );
     351               1 :         client.execute(
     352               1 :             "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     353               1 :             &[&(repeats as i32)],
     354               1 :         )?;
     355               1 :         info!(
     356               1 :             "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     357               1 :             client.pg_current_wal_insert_lsn()?,
     358                 :             XLOG_SIZE_OF_XLOG_RECORD
     359                 :         );
     360                 : 
     361                 :         // Emit the XLOG_SWITCH
     362               1 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     363               1 :         let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     364               1 :         let next_segment = PgLsn::from(0x0200_0000);
     365               1 :         ensure!(
     366               1 :             after_xlog_switch < next_segment,
     367 UBC           0 :             "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
     368                 :             after_xlog_switch,
     369                 :             next_segment
     370                 :         );
     371 CBC           1 :         ensure!(
     372               1 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
     373 UBC           0 :             "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
     374               0 :             after_xlog_switch,
     375               0 :             u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
     376                 :         );
     377 CBC           1 :         Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
     378               1 :     }
     379                 : }
     380                 : 
     381               8 : fn craft_single_logical_message(
     382               8 :     client: &mut impl postgres::GenericClient,
     383               8 :     transactional: bool,
     384               8 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     385               8 :     craft_internal(client, |client, initial_lsn| {
     386               8 :         ensure!(
     387               8 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     388 UBC           0 :             "Initial LSN is too far in the future"
     389                 :         );
     390                 : 
     391 CBC           8 :         let message_lsn: PgLsn = client
     392               8 :             .query_one(
     393               8 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     394               8 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     395               8 :                 &[&transactional],
     396               8 :             )?
     397               8 :             .get("message_lsn");
     398               8 :         ensure!(
     399               8 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     400 UBC           0 :             "Logical message did not cross the segment boundary"
     401                 :         );
     402 CBC           8 :         ensure!(
     403               8 :             message_lsn < PgLsn::from(0x0400_0000),
     404 UBC           0 :             "Logical message crossed two segments"
     405                 :         );
     406                 : 
     407 CBC           8 :         if transactional {
     408                 :             // Transactional logical messages are part of a transaction, so the one above is
     409                 :             // followed by a small COMMIT record.
     410                 : 
     411               4 :             let after_message_lsn = client.pg_current_wal_insert_lsn()?;
     412               4 :             ensure!(
     413               4 :                 message_lsn < after_message_lsn,
     414 UBC           0 :                 "No record found after the emitted message"
     415                 :             );
     416 CBC           4 :             Ok((vec![message_lsn], Some(after_message_lsn)))
     417                 :         } else {
     418               4 :             Ok((Vec::new(), Some(message_lsn)))
     419                 :         }
     420               8 :     })
     421               8 : }
     422                 : 
     423                 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     424                 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     425                 :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     426               4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     427               4 :         craft_single_logical_message(client, true)
     428               4 :     }
     429                 : }
     430                 : 
     431                 : pub struct LastWalRecordCrossingSegment;
     432                 : impl Crafter for LastWalRecordCrossingSegment {
     433                 :     const NAME: &'static str = "last_wal_record_crossing_segment";
     434               4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
     435               4 :         craft_single_logical_message(client, false)
     436               4 :     }
     437                 : }
        

Generated by: LCOV version 2.1-beta