LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: 8ff8efadb0253cf618c612650348666c0c564111.info Lines: 66.3 % 270 179
Test Date: 2024-11-20 17:53:50 Functions: 61.1 % 36 22

            Line data    Source code
       1              : use anyhow::{bail, ensure};
       2              : use camino_tempfile::{tempdir, Utf8TempDir};
       3              : use log::*;
       4              : use postgres::types::PgLsn;
       5              : use postgres::Client;
       6              : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       7              : use postgres_ffi::{
       8              :     XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
       9              : };
      10              : use std::ffi::OsStr;
      11              : use std::path::{Path, PathBuf};
      12              : use std::process::Command;
      13              : use std::time::{Duration, Instant};
      14              : 
      15              : macro_rules! xlog_utils_test { // Declares a module named after the given Postgres version identifier.
      16              :     ($version:ident) => {
      17              :         #[path = "."] // File submodules of this inline module are looked up in this directory.
      18              :         mod $version {
      19              :             #[allow(unused_imports)]
      20              :             pub use postgres_ffi::$version::wal_craft_test_export::*; // Re-export that version's test helpers.
      21              :             #[allow(clippy::duplicate_mod)] // The same xlog_utils_test file is included once per version module.
      22              :             #[cfg(test)]
      23              :             mod xlog_utils_test;
      24              :         }
      25              :     };
      26              : }
      27              : 
      28              : postgres_ffi::for_all_postgres_versions! { xlog_utils_test } // Presumably expands xlog_utils_test once per supported version; macro is defined in postgres_ffi.
      29              : 
      30              : pub struct Conf { // Which Postgres installation to run and which data directory to run it on.
      31              :     pub pg_version: u32, // Major version; pg_distrib_dir() accepts only 14, 15, 16 and 17.
      32              :     pub pg_distrib_dir: PathBuf, // Root directory containing one `v<pg_version>` subdirectory per version.
      33              :     pub datadir: PathBuf, // Passed to initdb (--pgdata) and postgres (-D); WAL lives in its `pg_wal` subdirectory.
      34              : }
      35              : 
      36              : pub struct PostgresServer { // Handle to a spawned `postgres` process together with the settings needed to connect to it.
      37              :     process: std::process::Child, // Terminated by kill(), or by Drop if it is still running.
      38              :     _unix_socket_dir: Utf8TempDir, // Never read; presumably held so the temporary socket directory outlives the server.
      39              :     client_config: postgres::Config, // Unix-socket connection settings built in Conf::start_server.
      40              : }
      41              : 
      42              : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [ // Each entry is passed to postgres as `-c <entry>` by Conf::start_server.
      43              :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      44              :     "shared_preload_libraries=neon", // can only be loaded at startup
      45              :     // Disable background processes as much as possible
      46              :     "wal_writer_delay=10s",
      47              :     "autovacuum=off",
      48              : ];
      49              : 
      50              : impl Conf {
      51          108 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> { // Version-specific install root: <pg_distrib_dir>/v<pg_version>.
      52          108 :         let path = self.pg_distrib_dir.clone();
      53          108 : 
      54          108 :         #[allow(clippy::manual_range_patterns)]
      55          108 :         match self.pg_version {
      56          108 :             14 | 15 | 16 | 17 => Ok(path.join(format!("v{}", self.pg_version))),
      57            0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version), // Any other version is an error.
      58              :         }
      59          108 :     }
      60              : 
      61           36 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> { // <install root>/bin
      62           36 :         Ok(self.pg_distrib_dir()?.join("bin"))
      63           36 :     }
      64              : 
      65           72 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> { // <install root>/lib
      66           72 :         Ok(self.pg_distrib_dir()?.join("lib"))
      67           72 :     }
      68              : 
      69          204 :     pub fn wal_dir(&self) -> PathBuf { // <datadir>/pg_wal
      70          204 :         self.datadir.join("pg_wal")
      71          204 :     }
      72              : 
      73           36 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> { // Command for a binary in the bin dir; errors if the binary does not exist.
      74           36 :         let path = self.pg_bin_dir()?.join(command);
      75           36 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      76           36 :         let mut cmd = Command::new(path);
      77           36 :         cmd.env_clear() // Start from an empty environment; only the two library search paths below are set.
      78           36 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      79           36 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      80           36 :         Ok(cmd)
      81           36 :     }
      82              : 
      83           12 :     pub fn initdb(&self) -> anyhow::Result<()> { // Runs initdb on datadir as user "postgres"; errors if it cannot be run or exits unsuccessfully.
      84           12 :         if let Some(parent) = self.datadir.parent() {
      85           12 :             info!("Pre-creating parent directory {:?}", parent);
      86              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      87              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      88           12 :             std::fs::create_dir_all(parent)?;
      89            0 :         }
      90           12 :         info!(
      91           12 :             "Running initdb in {:?} with user \"postgres\"",
      92              :             self.datadir
      93              :         );
      94           12 :         let output = self
      95           12 :             .new_pg_command("initdb")?
      96           12 :             .arg("--pgdata")
      97           12 :             .arg(&self.datadir)
      98           12 :             .args(["--username", "postgres", "--no-instructions", "--no-sync"])
      99           12 :             .output()?; // Waits for initdb to finish and captures stdout/stderr.
     100           12 :         debug!("initdb output: {:?}", output);
     101           12 :         ensure!(
     102           12 :             output.status.success(),
     103            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     104            0 :             String::from_utf8_lossy(&output.stdout),
     105            0 :             String::from_utf8_lossy(&output.stderr),
     106              :         );
     107           12 :         Ok(())
     108           12 :     }
     109              : 
     110           12 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> { // Spawns postgres on datadir and returns without waiting for it to accept connections.
     111           12 :         info!("Starting Postgres server in {:?}", self.datadir);
     112           12 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     113           12 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     114           12 :         let server_process = self
     115           12 :             .new_pg_command("postgres")?
     116           12 :             .args(["-c", "listen_addresses="]) // Empty value: the server is reached only through the Unix socket below.
     117           12 :             .arg("-k")
     118           12 :             .arg(&unix_socket_dir_path)
     119           12 :             .arg("-D")
     120           12 :             .arg(&self.datadir)
     121           48 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg])) // One `-c <setting>` pair per required setting.
     122           12 :             .spawn()?;
     123           12 :         let server = PostgresServer {
     124           12 :             process: server_process,
     125           12 :             _unix_socket_dir: unix_socket_dir,
     126           12 :             client_config: {
     127           12 :                 let mut c = postgres::Config::new();
     128           12 :                 c.host_path(&unix_socket_dir_path);
     129           12 :                 c.user("postgres");
     130           12 :                 c.connect_timeout(Duration::from_millis(10000)); // Also the total retry budget in connect_with_timeout.
     131           12 :                 c
     132           12 :             },
     133           12 :         };
     134           12 :         Ok(server)
     135           12 :     }
     136              : 
     137           12 :     pub fn pg_waldump( // Runs pg_waldump on a first/last pair of segment files and returns its captured output.
     138           12 :         &self,
     139           12 :         first_segment_name: &OsStr,
     140           12 :         last_segment_name: &OsStr,
     141           12 :     ) -> anyhow::Result<std::process::Output> {
     142           12 :         let first_segment_file = self.datadir.join(first_segment_name); // NOTE(review): joined onto datadir, not wal_dir() - confirm what callers pass.
     143           12 :         let last_segment_file = self.datadir.join(last_segment_name);
     144           12 :         info!(
     145           12 :             "Running pg_waldump for {} .. {}",
     146           12 :             first_segment_file.display(),
     147           12 :             last_segment_file.display()
     148              :         );
     149           12 :         let output = self
     150           12 :             .new_pg_command("pg_waldump")?
     151           12 :             .args([&first_segment_file, &last_segment_file])
     152           12 :             .output()?;
     153           12 :         debug!("waldump output: {:?}", output);
     154           12 :         Ok(output) // Exit status is not checked here; the caller must inspect output.status.
     155           12 :     }
     156              : }
     157              : 
     158              : impl PostgresServer {
     159           12 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> { // Retries connecting every 100 ms until the configured connect_timeout has elapsed.
     160           12 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap(); // Panics if no timeout is configured; Conf::start_server always sets one.
     161           25 :         while Instant::now() < retry_until {
     162           25 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) { // NOTE(review): the connection error is discarded, so a timeout reports no cause.
     163           12 :                 return Ok(client);
     164           13 :             }
     165           13 :             std::thread::sleep(Duration::from_millis(100));
     166              :         }
     167            0 :         bail!("Connection timed out");
     168           12 :     }
     169              : 
     170           12 :     pub fn kill(mut self) { // Kills the server and waits for it to exit; panics if either step fails.
     171           12 :         self.process.kill().unwrap();
     172           12 :         self.process.wait().unwrap();
     173           12 :     }
     174              : }
     175              : 
     176              : impl Drop for PostgresServer {
     177           12 :     fn drop(&mut self) { // Best-effort cleanup: kill the server unless it has already exited.
     178           12 :         match self.process.try_wait() {
     179           12 :             Ok(Some(_)) => return, // Already exited (e.g. after kill()); nothing to do.
     180              :             Ok(None) => {
     181            0 :                 warn!("Server was not terminated, will be killed");
     182              :             }
     183            0 :             Err(e) => {
     184            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     185              :             }
     186              :         }
     187            0 :         let _ = self.process.kill(); // NOTE(review): kill error is ignored and the child is not wait()ed afterwards.
     188           12 :     }
     189              : }
     190              : 
     191              : pub trait PostgresClientExt: postgres::GenericClient { // Convenience queries returning WAL positions as PgLsn.
     192           24 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> { // First column of `SELECT pg_current_wal_insert_lsn()`.
     193           24 :         Ok(self
     194           24 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     195           24 :             .get(0))
     196           24 :     }
     197            0 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> { // First column of `SELECT pg_current_wal_flush_lsn()`.
     198            0 :         Ok(self
     199            0 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     200            0 :             .get(0))
     201            0 :     }
     202              : }
     203              : 
     204              : impl<C: postgres::GenericClient> PostgresClientExt for C {} // Blanket impl: every GenericClient gets the methods above.
     205              : 
     206           12 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> { // Creates neon_test_utils if missing, then errors unless the expected settings are in effect.
     207           12 :     client.execute("create extension if not exists neon_test_utils", &[])?; // Presumably provides neon_xlogflush, which craft_internal calls - confirm.
     208              : 
     209           12 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     210           12 :     ensure!(wal_keep_size == "50MB"); // Same value as in REQUIRED_POSTGRES_CONFIG.
     211           12 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     212           12 :     ensure!(wal_writer_delay == "10s"); // Same value as in REQUIRED_POSTGRES_CONFIG.
     213           12 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     214           12 :     ensure!(autovacuum == "off"); // Same value as in REQUIRED_POSTGRES_CONFIG.
     215              : 
     216           12 :     let wal_segment_size = client.query_one( // One row: the numeric setting and its unit.
     217           12 :         "select cast(setting as bigint) as setting, unit \
     218           12 :          from pg_settings where name = 'wal_segment_size'",
     219           12 :         &[],
     220           12 :     )?;
     221           12 :     ensure!(
     222           12 :         wal_segment_size.get::<_, String>("unit") == "B", // The comparison below is only valid if the setting is in bytes.
     223            0 :         "Unexpected wal_segment_size unit"
     224              :     );
     225           12 :     ensure!(
     226           12 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64, // Server must use the segment size this crate was built for.
     227            0 :         "Unexpected wal_segment_size in bytes"
     228              :     );
     229              : 
     230           12 :     Ok(())
     231           12 : }
     232              : 
     233              : pub trait Crafter { // A named scenario that produces a particular WAL layout.
     234              :     const NAME: &'static str; // Identifier of the scenario; its consumers are not visible in this file.
     235              : 
     236              :     /// Generates WAL using the client `client`. Returns a vector of some valid
     237              :     /// "interesting" intermediate LSNs which one may start reading from.
     238              :     /// test_end_of_wal uses this to check various starting points.
     239              :     ///
     240              :     /// Note that postgres is generally keen on writing some WAL. While we
     241              :     /// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
     242              :     /// possible, e.g. xl_running_xacts are dumped every 15s. So checks for a
     243              :     /// stable WAL end would be flaky unless postgres is shut down. For this
     244              :     /// reason returning a potential end of WAL here is pointless. Most of the
     245              :     /// time this doesn't happen though, so it is reasonable to create the needed
     246              :     /// WAL structure and immediately kill postgres like test_end_of_wal does.
     247              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
     248              : }
     249              : 
     250              : /// Wraps a WAL craft function `f`: checks the server config, passes `f` the
     251              : /// current insert LSN, and flushes WAL afterwards. The initial LSN is
     252              : /// prepended to the result unless `f` already returned it first.
     253           12 : fn craft_internal<C: postgres::GenericClient>(
     254           12 :     client: &mut C,
     255           12 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
     256           12 : ) -> anyhow::Result<Vec<PgLsn>> {
     257           12 :     ensure_server_config(client)?;
     258              : 
     259           12 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     260           12 :     info!("LSN initial = {}", initial_lsn);
     261              : 
     262           12 :     let mut intermediate_lsns = f(client, initial_lsn)?;
     263           12 :     if !intermediate_lsns.starts_with(&[initial_lsn]) { // Avoid a duplicate when f already put initial_lsn first.
     264           12 :         intermediate_lsns.insert(0, initial_lsn);
     265           12 :     }
     266              : 
     267              :     // Some records may not be flushed, e.g. non-transactional logical messages. Flush now.
     268              :     //
     269              :     // If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
     270              :     // returns the position just after the page header on the next page. That's where the next
     271              :     // record will be inserted. But the page header hasn't actually been written to the WAL
     272              :     // yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
     273              :     // error. Because of that, if the insert location is just after a page header, back off to
     274              :     // previous page boundary.
     275           12 :     let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
     276           12 :     if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 { // Just after the long header at the start of a segment.
     277            0 :         lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     278           12 :     } else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 { // Just after the short header of any other page.
     279            0 :         lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     280           12 :     }
     281           12 :     client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?; // Flush WAL up to the (possibly backed-off) position.
     282           12 :     Ok(intermediate_lsns)
     283           12 : }
     284              : 
     285              : pub struct Simple; // Scenario: a single CREATE TABLE, wrapped by craft_internal.
     286              : impl Crafter for Simple {
     287              :     const NAME: &'static str = "simple";
     288            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     289            4 :         craft_internal(client, |client, _| {
     290            4 :             client.execute("CREATE table t(x int)", &[])?;
     291            4 :             Ok(Vec::new()) // No extra LSNs; craft_internal adds the initial one.
     292            4 :         })
     293            4 :     }
     294              : }
     295              : 
     296              : pub struct LastWalRecordXlogSwitch; // Scenario: the last WAL record is an XLOG_SWITCH.
     297              : impl Crafter for LastWalRecordXlogSwitch {
     298              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     299            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> { // Returns [insert LSN before the switch, end of the SWITCH record].
     300            0 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     301            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     302            0 :         ensure_server_config(client)?;
     303              : 
     304            0 :         client.execute("CREATE table t(x int)", &[])?;
     305            0 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     306              :         // pg_switch_wal returns end of last record of the switched segment,
     307              :         // i.e. end of SWITCH itself.
     308            0 :         let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     309            0 :         let before_xlog_switch_u64 = u64::from(before_xlog_switch);
     310            0 :         let next_segment = PgLsn::from( // Start of the segment after the one containing before_xlog_switch.
     311            0 :             before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
     312            0 :                 + WAL_SEGMENT_SIZE as u64,
     313            0 :         );
     314            0 :         ensure!(
     315            0 :             xlog_switch_record_end <= next_segment,
     316            0 :             "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
     317              :             xlog_switch_record_end,
     318              :             next_segment
     319              :         );
     320            0 :         Ok(vec![before_xlog_switch, xlog_switch_record_end])
     321            0 :     }
     322              : }
     323              : 
     324              : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     325              : /// Craft xlog SWITCH record ending at page boundary.
     326              : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     327              :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     328            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> { // Returns [insert LSN before the switch, end of the SWITCH record].
     329            0 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     330            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     331            0 :         ensure_server_config(client)?;
     332              : 
     333            0 :         client.execute("CREATE table t(x int)", &[])?;
     334              : 
     335              :         // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.  We
     336              :         // will use carefully-sized logical messages to advance WAL insert location such
     337              :         // that there is just enough space on the page for the XLOG_SWITCH record.
     338              :         loop { // NOTE(review): retries without an upper bound until the layout works out.
     339              :             // We start with measuring how much WAL it takes for one logical message,
     340              :             // considering all alignments and headers.
     341            0 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     342            0 :             client.execute(
     343            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     344            0 :                 &[],
     345            0 :             )?;
     346            0 :             let after_lsn = client.pg_current_wal_insert_lsn()?;
     347              : 
     348              :             // Did the record cross a page boundary? If it did, start over. Crossing a
     349              :             // page boundary adds to the apparent size of the record because of the page
     350              :             // header, which throws off the calculation.
     351            0 :             if u64::from(before_lsn) / XLOG_BLCKSZ as u64
     352            0 :                 != u64::from(after_lsn) / XLOG_BLCKSZ as u64
     353              :             {
     354            0 :                 continue;
     355            0 :             }
     356            0 :             // base_size is the size of a logical message without the payload
     357            0 :             let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10; // 10 = payload length of the probe message above.
     358            0 : 
     359            0 :             // Is there enough space on the page for another logical message and an
     360            0 :             // XLOG_SWITCH? If not, start over.
     361            0 :             let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
     362            0 :             if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 { // Also keeps the subtraction below from underflowing.
     363            0 :                 continue;
     364            0 :             }
     365            0 : 
     366            0 :             // We will write another logical message, such that after the logical message
     367            0 :             // record, there will be space for exactly one XLOG_SWITCH. How large should
     368            0 :             // the logical message's payload be? An XLOG_SWITCH record has no data => its
     369            0 :             // size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     370            0 :             let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;
     371            0 : 
     372            0 :             client.execute(
     373            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     374            0 :                 &[&(repeats as i32)], // repeats <= page_remain <= XLOG_BLCKSZ, so this cast presumably cannot truncate.
     375            0 :             )?;
     376            0 :             info!(
     377            0 :                 "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     378            0 :                 client.pg_current_wal_insert_lsn()?,
     379              :                 XLOG_SIZE_OF_XLOG_RECORD
     380              :             );
     381              : 
     382              :             // Emit the XLOG_SWITCH
     383            0 :             let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     384            0 :             let xlog_switch_record_end: PgLsn =
     385            0 :                 client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     386            0 : 
     387            0 :             if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ // Success is an offset equal to the short page header size; presumably a record ending on a page boundary is reported just past the next header.
     388            0 :                 != XLOG_SIZE_OF_XLOG_SHORT_PHD
     389              :             {
     390            0 :                 warn!(
     391            0 :                     "XLOG_SWITCH message ended not on page boundary: {}, offset = {}, repeating",
     392            0 :                     xlog_switch_record_end,
     393            0 :                     u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
     394              :                 );
     395            0 :                 continue;
     396            0 :             }
     397            0 :             return Ok(vec![before_xlog_switch, xlog_switch_record_end]);
     398              :         }
     399            0 :     }
     400              : }
     401              : 
     402              : /// Writes a ~16MB logical message, which is expected to cross a WAL segment boundary.
     403            8 : fn craft_seg_size_logical_message(
     404            8 :     client: &mut impl postgres::GenericClient,
     405            8 :     transactional: bool, // Forwarded as the first argument of pg_logical_emit_message.
     406            8 : ) -> anyhow::Result<Vec<PgLsn>> {
     407            8 :     craft_internal(client, |client, initial_lsn| {
     408            8 :         ensure!(
     409            8 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024), // Must start at least 1 MiB before LSN 0x0200_0000.
     410            0 :             "Initial LSN is too far in the future"
     411              :         );
     412              : 
     413            8 :         let message_lsn: PgLsn = client
     414            8 :             .query_one(
     415            8 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     416            8 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     417            8 :                 &[&transactional],
     418            8 :             )?
     419            8 :             .get("message_lsn");
     420            8 :         ensure!(
     421            8 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192), // Returned LSN must lie past 0x0200_0000 by more than 4 * 8192 bytes.
     422            0 :             "Logical message did not cross the segment boundary"
     423              :         );
     424            8 :         ensure!(
     425            8 :             message_lsn < PgLsn::from(0x0400_0000), // Upper bound on how far the message may extend.
     426            0 :             "Logical message crossed two segments"
     427              :         );
     428              : 
     429            8 :         Ok(vec![message_lsn]) // craft_internal prepends the initial LSN.
     430            8 :     })
     431            8 : }
     432              : 
     433              : pub struct WalRecordCrossingSegmentFollowedBySmallOne; // Scenario: transactional segment-crossing logical message.
     434              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     435              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     436            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     437            4 :         // Transactional message crossing WAL segment will be followed by small
     438            4 :         // commit record.
     439            4 :         craft_seg_size_logical_message(client, true)
     440            4 :     }
     441              : }
     442              : 
     443              : pub struct LastWalRecordCrossingSegment; // Scenario: non-transactional segment-crossing logical message.
     444              : impl Crafter for LastWalRecordCrossingSegment {
     445              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     446            4 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     447            4 :         craft_seg_size_logical_message(client, false) // false = non-transactional message.
     448            4 :     }
     449              : }
        

Generated by: LCOV version 2.1-beta