LCOV - code coverage report
Current view: top level - libs/postgres_ffi/wal_craft/src - lib.rs (source / functions) Coverage Total Hit
Test: f081ec316c96fa98335efd15ef501745aa4f015d.info Lines: 66.3 % 270 179
Test Date: 2024-06-25 15:11:17 Functions: 61.1 % 36 22

            Line data    Source code
       1              : use anyhow::{bail, ensure};
       2              : use camino_tempfile::{tempdir, Utf8TempDir};
       3              : use log::*;
       4              : use postgres::types::PgLsn;
       5              : use postgres::Client;
       6              : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
       7              : use postgres_ffi::{
       8              :     XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
       9              : };
      10              : use std::path::{Path, PathBuf};
      11              : use std::process::Command;
      12              : use std::time::{Duration, Instant};
      13              : 
      14              : macro_rules! xlog_utils_test {
      15              :     ($version:ident) => {
      16              :         #[path = "."]
      17              :         mod $version {
      18              :             #[allow(unused_imports)]
      19              :             pub use postgres_ffi::$version::wal_craft_test_export::*;
      20              :             #[allow(clippy::duplicate_mod)]
      21              :             #[cfg(test)]
      22              :             mod xlog_utils_test;
      23              :         }
      24              :     };
      25              : }
      26              : 
      27              : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
      28              : 
      29              : #[derive(Debug, Clone, PartialEq, Eq)]
      30              : pub struct Conf {
      31              :     pub pg_version: u32,
      32              :     pub pg_distrib_dir: PathBuf,
      33              :     pub datadir: PathBuf,
      34              : }
      35              : 
      36              : pub struct PostgresServer {
      37              :     process: std::process::Child,
      38              :     _unix_socket_dir: Utf8TempDir,
      39              :     client_config: postgres::Config,
      40              : }
      41              : 
      42              : pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
      43              :     "wal_keep_size=50MB",            // Ensure old WAL is not removed
      44              :     "shared_preload_libraries=neon", // can only be loaded at startup
      45              :     // Disable background processes as much as possible
      46              :     "wal_writer_delay=10s",
      47              :     "autovacuum=off",
      48              : ];
      49              : 
      50              : impl Conf {
      51          162 :     pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
      52          162 :         let path = self.pg_distrib_dir.clone();
      53          162 : 
      54          162 :         #[allow(clippy::manual_range_patterns)]
      55          162 :         match self.pg_version {
      56          162 :             14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
      57            0 :             _ => bail!("Unsupported postgres version: {}", self.pg_version),
      58              :         }
      59          162 :     }
      60              : 
      61           54 :     fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
      62           54 :         Ok(self.pg_distrib_dir()?.join("bin"))
      63           54 :     }
      64              : 
      65          108 :     fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
      66          108 :         Ok(self.pg_distrib_dir()?.join("lib"))
      67          108 :     }
      68              : 
      69          306 :     pub fn wal_dir(&self) -> PathBuf {
      70          306 :         self.datadir.join("pg_wal")
      71          306 :     }
      72              : 
      73           54 :     fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
      74           54 :         let path = self.pg_bin_dir()?.join(command);
      75           54 :         ensure!(path.exists(), "Command {:?} does not exist", path);
      76           54 :         let mut cmd = Command::new(path);
      77           54 :         cmd.env_clear()
      78           54 :             .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
      79           54 :             .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
      80           54 :         Ok(cmd)
      81           54 :     }
      82              : 
      83           18 :     pub fn initdb(&self) -> anyhow::Result<()> {
      84           18 :         if let Some(parent) = self.datadir.parent() {
      85           18 :             info!("Pre-creating parent directory {:?}", parent);
      86              :             // Tests may be run concurrently and there may be a race to create `test_output/`.
      87              :             // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
      88           18 :             std::fs::create_dir_all(parent)?;
      89            0 :         }
      90           18 :         info!(
      91           18 :             "Running initdb in {:?} with user \"postgres\"",
      92              :             self.datadir
      93              :         );
      94           18 :         let output = self
      95           18 :             .new_pg_command("initdb")?
      96           18 :             .arg("-D")
      97           18 :             .arg(&self.datadir)
      98           18 :             .args(["-U", "postgres", "--no-instructions", "--no-sync"])
      99           18 :             .output()?;
     100           18 :         debug!("initdb output: {:?}", output);
     101           18 :         ensure!(
     102           18 :             output.status.success(),
     103            0 :             "initdb failed, stdout and stderr follow:\n{}{}",
     104            0 :             String::from_utf8_lossy(&output.stdout),
     105            0 :             String::from_utf8_lossy(&output.stderr),
     106              :         );
     107           18 :         Ok(())
     108           18 :     }
     109              : 
     110           18 :     pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
     111           18 :         info!("Starting Postgres server in {:?}", self.datadir);
     112           18 :         let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
     113           18 :         let unix_socket_dir_path = unix_socket_dir.path().to_owned();
     114           18 :         let server_process = self
     115           18 :             .new_pg_command("postgres")?
     116           18 :             .args(["-c", "listen_addresses="])
     117           18 :             .arg("-k")
     118           18 :             .arg(&unix_socket_dir_path)
     119           18 :             .arg("-D")
     120           18 :             .arg(&self.datadir)
     121           72 :             .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
     122           18 :             .spawn()?;
     123           18 :         let server = PostgresServer {
     124           18 :             process: server_process,
     125           18 :             _unix_socket_dir: unix_socket_dir,
     126           18 :             client_config: {
     127           18 :                 let mut c = postgres::Config::new();
     128           18 :                 c.host_path(&unix_socket_dir_path);
     129           18 :                 c.user("postgres");
     130           18 :                 c.connect_timeout(Duration::from_millis(10000));
     131           18 :                 c
     132           18 :             },
     133           18 :         };
     134           18 :         Ok(server)
     135           18 :     }
     136              : 
     137           18 :     pub fn pg_waldump(
     138           18 :         &self,
     139           18 :         first_segment_name: &str,
     140           18 :         last_segment_name: &str,
     141           18 :     ) -> anyhow::Result<std::process::Output> {
     142           18 :         let first_segment_file = self.datadir.join(first_segment_name);
     143           18 :         let last_segment_file = self.datadir.join(last_segment_name);
     144           18 :         info!(
     145           18 :             "Running pg_waldump for {} .. {}",
     146           18 :             first_segment_file.display(),
     147           18 :             last_segment_file.display()
     148              :         );
     149           18 :         let output = self
     150           18 :             .new_pg_command("pg_waldump")?
     151           18 :             .args([&first_segment_file, &last_segment_file])
     152           18 :             .output()?;
     153           18 :         debug!("waldump output: {:?}", output);
     154           18 :         Ok(output)
     155           18 :     }
     156              : }
     157              : 
     158              : impl PostgresServer {
     159           18 :     pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
     160           18 :         let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
     161           37 :         while Instant::now() < retry_until {
     162           37 :             if let Ok(client) = self.client_config.connect(postgres::NoTls) {
     163           18 :                 return Ok(client);
     164           19 :             }
     165           19 :             std::thread::sleep(Duration::from_millis(100));
     166              :         }
     167            0 :         bail!("Connection timed out");
     168           18 :     }
     169              : 
     170           18 :     pub fn kill(mut self) {
     171           18 :         self.process.kill().unwrap();
     172           18 :         self.process.wait().unwrap();
     173           18 :     }
     174              : }
     175              : 
     176              : impl Drop for PostgresServer {
     177           18 :     fn drop(&mut self) {
     178           18 :         match self.process.try_wait() {
     179           18 :             Ok(Some(_)) => return,
     180              :             Ok(None) => {
     181            0 :                 warn!("Server was not terminated, will be killed");
     182              :             }
     183            0 :             Err(e) => {
     184            0 :                 error!("Unable to get status of the server: {}, will be killed", e);
     185              :             }
     186              :         }
     187            0 :         let _ = self.process.kill();
     188           18 :     }
     189              : }
     190              : 
     191              : pub trait PostgresClientExt: postgres::GenericClient {
     192           36 :     fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
     193           36 :         Ok(self
     194           36 :             .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
     195           36 :             .get(0))
     196           36 :     }
     197            0 :     fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
     198            0 :         Ok(self
     199            0 :             .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
     200            0 :             .get(0))
     201            0 :     }
     202              : }
     203              : 
     204              : impl<C: postgres::GenericClient> PostgresClientExt for C {}
     205              : 
     206           18 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
     207           18 :     client.execute("create extension if not exists neon_test_utils", &[])?;
     208              : 
     209           18 :     let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
     210           18 :     ensure!(wal_keep_size == "50MB");
     211           18 :     let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
     212           18 :     ensure!(wal_writer_delay == "10s");
     213           18 :     let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
     214           18 :     ensure!(autovacuum == "off");
     215              : 
     216           18 :     let wal_segment_size = client.query_one(
     217           18 :         "select cast(setting as bigint) as setting, unit \
     218           18 :          from pg_settings where name = 'wal_segment_size'",
     219           18 :         &[],
     220           18 :     )?;
     221           18 :     ensure!(
     222           18 :         wal_segment_size.get::<_, String>("unit") == "B",
     223            0 :         "Unexpected wal_segment_size unit"
     224              :     );
     225           18 :     ensure!(
     226           18 :         wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
     227            0 :         "Unexpected wal_segment_size in bytes"
     228              :     );
     229              : 
     230           18 :     Ok(())
     231           18 : }
     232              : 
     233              : pub trait Crafter {
     234              :     const NAME: &'static str;
     235              : 
     236              :     /// Generates WAL using the client `client`. Returns a vector of some valid
     237              :     /// "interesting" intermediate LSNs which one may start reading from.
     238              :     /// test_end_of_wal uses this to check various starting points.
     239              :     ///
     240              :     /// Note that postgres is generally keen about writing some WAL. While we
     241              :     /// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
     242              :     /// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
     243              :     /// stable WAL end would be flaky unless postgres is shut down. For this
     244              :     /// reason returning potential end of WAL here is pointless. Most of the
     245              :     /// time this doesn't happen though, so it is reasonable to create needed
     246              :     /// WAL structure and immediately kill postgres like test_end_of_wal does.
     247              :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
     248              : }
     249              : 
     250              : /// Wraps some WAL craft function, providing current LSN to it before the
     251              : /// insertion and flushing WAL afterwards. Also pushes initial LSN to the
     252              : /// result.
     253           18 : fn craft_internal<C: postgres::GenericClient>(
     254           18 :     client: &mut C,
     255           18 :     f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
     256           18 : ) -> anyhow::Result<Vec<PgLsn>> {
     257           18 :     ensure_server_config(client)?;
     258              : 
     259           18 :     let initial_lsn = client.pg_current_wal_insert_lsn()?;
     260           18 :     info!("LSN initial = {}", initial_lsn);
     261              : 
     262           18 :     let mut intermediate_lsns = f(client, initial_lsn)?;
     263           18 :     if !intermediate_lsns.starts_with(&[initial_lsn]) {
     264           18 :         intermediate_lsns.insert(0, initial_lsn);
     265           18 :     }
     266              : 
     267              :     // Some records may be not flushed, e.g. non-transactional logical messages. Flush now.
     268              :     //
     269              :     // If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
     270              :     // returns the position just after the page header on the next page. That's where the next
     271              :     // record will be inserted. But the page header hasn't actually been written to the WAL
     272              :     // yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
     273              :     // error. Because of that, if the insert location is just after a page header, back off to
     274              :     // previous page boundary.
     275           18 :     let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
     276           18 :     if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 {
     277            0 :         lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     278           18 :     } else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 {
     279            0 :         lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     280           18 :     }
     281           18 :     client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?;
     282           18 :     Ok(intermediate_lsns)
     283           18 : }
     284              : 
     285              : pub struct Simple;
     286              : impl Crafter for Simple {
     287              :     const NAME: &'static str = "simple";
     288            6 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     289            6 :         craft_internal(client, |client, _| {
     290            6 :             client.execute("CREATE table t(x int)", &[])?;
     291            6 :             Ok(Vec::new())
     292            6 :         })
     293            6 :     }
     294              : }
     295              : 
     296              : pub struct LastWalRecordXlogSwitch;
     297              : impl Crafter for LastWalRecordXlogSwitch {
     298              :     const NAME: &'static str = "last_wal_record_xlog_switch";
     299            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     300            0 :         // Do not use craft_internal because here we end up with flush_lsn exactly on
     301            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     302            0 :         ensure_server_config(client)?;
     303              : 
     304            0 :         client.execute("CREATE table t(x int)", &[])?;
     305            0 :         let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     306              :         // pg_switch_wal returns end of last record of the switched segment,
     307              :         // i.e. end of SWITCH itself.
     308            0 :         let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     309            0 :         let before_xlog_switch_u64 = u64::from(before_xlog_switch);
     310            0 :         let next_segment = PgLsn::from(
     311            0 :             before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
     312            0 :                 + WAL_SEGMENT_SIZE as u64,
     313            0 :         );
     314            0 :         ensure!(
     315            0 :             xlog_switch_record_end <= next_segment,
     316            0 :             "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
     317              :             xlog_switch_record_end,
     318              :             next_segment
     319              :         );
     320            0 :         Ok(vec![before_xlog_switch, xlog_switch_record_end])
     321            0 :     }
     322              : }
     323              : 
     324              : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
     325              : /// Craft xlog SWITCH record ending at page boundary.
     326              : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
     327              :     const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
     328            0 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     329            0 :         // Do not use generate_internal because here we end up with flush_lsn exactly on
     330            0 :         // the segment boundary and insert_lsn after the initial page header, which is unusual.
     331            0 :         ensure_server_config(client)?;
     332              : 
     333            0 :         client.execute("CREATE table t(x int)", &[])?;
     334              : 
     335              :         // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.  We
     336              :         // will use carefully-sized logical messages to advance WAL insert location such
     337              :         // that there is just enough space on the page for the XLOG_SWITCH record.
     338              :         loop {
     339              :             // We start with measuring how much WAL it takes for one logical message,
     340              :             // considering all alignments and headers.
     341            0 :             let before_lsn = client.pg_current_wal_insert_lsn()?;
     342            0 :             client.execute(
     343            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
     344            0 :                 &[],
     345            0 :             )?;
     346            0 :             let after_lsn = client.pg_current_wal_insert_lsn()?;
     347              : 
     348              :             // Did the record cross a page boundary? If it did, start over. Crossing a
     349              :             // page boundary adds to the apparent size of the record because of the page
     350              :             // header, which throws off the calculation.
     351            0 :             if u64::from(before_lsn) / XLOG_BLCKSZ as u64
     352            0 :                 != u64::from(after_lsn) / XLOG_BLCKSZ as u64
     353              :             {
     354            0 :                 continue;
     355            0 :             }
     356            0 :             // base_size is the size of a logical message without the payload
     357            0 :             let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10;
     358            0 : 
     359            0 :             // Is there enough space on the page for another logical message and an
     360            0 :             // XLOG_SWITCH? If not, start over.
     361            0 :             let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
     362            0 :             if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 {
     363            0 :                 continue;
     364            0 :             }
     365            0 : 
     366            0 :             // We will write another logical message, such that after the logical message
     367            0 :             // record, there will be space for exactly one XLOG_SWITCH. How large should
     368            0 :             // the logical message's payload be? An XLOG_SWITCH record has no data => its
     369            0 :             // size is exactly XLOG_SIZE_OF_XLOG_RECORD.
     370            0 :             let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;
     371            0 : 
     372            0 :             client.execute(
     373            0 :                 "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
     374            0 :                 &[&(repeats as i32)],
     375            0 :             )?;
     376            0 :             info!(
     377            0 :                 "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
     378            0 :                 client.pg_current_wal_insert_lsn()?,
     379              :                 XLOG_SIZE_OF_XLOG_RECORD
     380              :             );
     381              : 
     382              :             // Emit the XLOG_SWITCH
     383            0 :             let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
     384            0 :             let xlog_switch_record_end: PgLsn =
     385            0 :                 client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
     386            0 : 
     387            0 :             if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
     388            0 :                 != XLOG_SIZE_OF_XLOG_SHORT_PHD
     389              :             {
     390            0 :                 warn!(
     391            0 :                     "XLOG_SWITCH message ended not on page boundary: {}, offset = {}, repeating",
     392            0 :                     xlog_switch_record_end,
     393            0 :                     u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
     394              :                 );
     395            0 :                 continue;
     396            0 :             }
     397            0 :             return Ok(vec![before_xlog_switch, xlog_switch_record_end]);
     398              :         }
     399            0 :     }
     400              : }
     401              : 
     402              : /// Write ~16MB logical message; it should cross WAL segment.
     403           12 : fn craft_seg_size_logical_message(
     404           12 :     client: &mut impl postgres::GenericClient,
     405           12 :     transactional: bool,
     406           12 : ) -> anyhow::Result<Vec<PgLsn>> {
     407           12 :     craft_internal(client, |client, initial_lsn| {
     408           12 :         ensure!(
     409           12 :             initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
     410            0 :             "Initial LSN is too far in the future"
     411              :         );
     412              : 
     413           12 :         let message_lsn: PgLsn = client
     414           12 :             .query_one(
     415           12 :                 "select pg_logical_emit_message($1, 'big-16mb-msg', \
     416           12 :                  concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
     417           12 :                 &[&transactional],
     418           12 :             )?
     419           12 :             .get("message_lsn");
     420           12 :         ensure!(
     421           12 :             message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
     422            0 :             "Logical message did not cross the segment boundary"
     423              :         );
     424           12 :         ensure!(
     425           12 :             message_lsn < PgLsn::from(0x0400_0000),
     426            0 :             "Logical message crossed two segments"
     427              :         );
     428              : 
     429           12 :         Ok(vec![message_lsn])
     430           12 :     })
     431           12 : }
     432              : 
     433              : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
     434              : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
     435              :     const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
     436            6 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     437            6 :         // Transactional message crossing WAL segment will be followed by small
     438            6 :         // commit record.
     439            6 :         craft_seg_size_logical_message(client, true)
     440            6 :     }
     441              : }
     442              : 
     443              : pub struct LastWalRecordCrossingSegment;
     444              : impl Crafter for LastWalRecordCrossingSegment {
     445              :     const NAME: &'static str = "last_wal_record_crossing_segment";
     446            6 :     fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
     447            6 :         craft_seg_size_logical_message(client, false)
     448            6 :     }
     449              : }
        

Generated by: LCOV version 2.1-beta