Line data Source code
1 : use anyhow::{bail, ensure};
2 : use camino_tempfile::{tempdir, Utf8TempDir};
3 : use log::*;
4 : use postgres::types::PgLsn;
5 : use postgres::Client;
6 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
7 : use postgres_ffi::{
8 : XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
9 : };
10 : use std::ffi::OsStr;
11 : use std::path::{Path, PathBuf};
12 : use std::process::Command;
13 : use std::time::{Duration, Instant};
14 :
// Generates one module per Postgres version. Each generated module `$version`
// re-exports that version's `postgres_ffi::$version::wal_craft_test_export` items
// and, in test builds only, declares the file module `xlog_utils_test`, so the same
// test source is compiled once per version.
macro_rules! xlog_utils_test {
    ($version:ident) => {
        // NOTE(review): presumably makes the nested `mod xlog_utils_test;` resolve to
        // a file in this directory rather than a `$version/` subdirectory — confirm.
        #[path = "."]
        mod $version {
            // The glob re-export is not necessarily used by every version's tests.
            #[allow(unused_imports)]
            pub use postgres_ffi::$version::wal_craft_test_export::*;
            // The same file is intentionally included as a module once per version.
            #[allow(clippy::duplicate_mod)]
            #[cfg(test)]
            mod xlog_utils_test;
        }
    };
}

// Expand `xlog_utils_test` through `postgres_ffi`'s version-iteration macro
// (presumably once per supported Postgres version, per its name).
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
29 :
/// Location of a Postgres installation plus the data directory to operate on.
pub struct Conf {
    /// Postgres major version; selects the `v<version>` subdirectory of
    /// `pg_distrib_dir`.
    pub pg_version: u32,
    /// Root directory containing one `v<version>` distribution per major version.
    pub pg_distrib_dir: PathBuf,
    /// Postgres data directory (what `initdb --pgdata` and `postgres -D` receive).
    pub datadir: PathBuf,
}
35 :
36 : pub struct PostgresServer {
37 : process: std::process::Child,
38 : _unix_socket_dir: Utf8TempDir,
39 : client_config: postgres::Config,
40 : }
41 :
/// Server settings that `Conf::start_server` passes as `-c name=value` options.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    // Ensure old WAL is not removed.
    "wal_keep_size=50MB",
    // The `neon` library can only be loaded at server startup.
    "shared_preload_libraries=neon",
    // The two settings below disable background processes as much as possible.
    "wal_writer_delay=10s",
    "autovacuum=off",
];
49 :
50 : impl Conf {
51 108 : pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
52 108 : let path = self.pg_distrib_dir.clone();
53 108 :
54 108 : #[allow(clippy::manual_range_patterns)]
55 108 : match self.pg_version {
56 108 : 14 | 15 | 16 | 17 => Ok(path.join(format!("v{}", self.pg_version))),
57 0 : _ => bail!("Unsupported postgres version: {}", self.pg_version),
58 : }
59 108 : }
60 :
61 36 : fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
62 36 : Ok(self.pg_distrib_dir()?.join("bin"))
63 36 : }
64 :
65 72 : fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
66 72 : Ok(self.pg_distrib_dir()?.join("lib"))
67 72 : }
68 :
69 204 : pub fn wal_dir(&self) -> PathBuf {
70 204 : self.datadir.join("pg_wal")
71 204 : }
72 :
73 36 : fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
74 36 : let path = self.pg_bin_dir()?.join(command);
75 36 : ensure!(path.exists(), "Command {:?} does not exist", path);
76 36 : let mut cmd = Command::new(path);
77 36 : cmd.env_clear()
78 36 : .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
79 36 : .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?)
80 36 : .env(
81 36 : "ASAN_OPTIONS",
82 36 : std::env::var("ASAN_OPTIONS").unwrap_or_default(),
83 36 : )
84 36 : .env(
85 36 : "UBSAN_OPTIONS",
86 36 : std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
87 36 : );
88 36 : Ok(cmd)
89 36 : }
90 :
91 12 : pub fn initdb(&self) -> anyhow::Result<()> {
92 12 : if let Some(parent) = self.datadir.parent() {
93 12 : info!("Pre-creating parent directory {:?}", parent);
94 : // Tests may be run concurrently and there may be a race to create `test_output/`.
95 : // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
96 12 : std::fs::create_dir_all(parent)?;
97 0 : }
98 12 : info!(
99 12 : "Running initdb in {:?} with user \"postgres\"",
100 : self.datadir
101 : );
102 12 : let output = self
103 12 : .new_pg_command("initdb")?
104 12 : .arg("--pgdata")
105 12 : .arg(&self.datadir)
106 12 : .args(["--username", "postgres", "--no-instructions", "--no-sync"])
107 12 : .output()?;
108 12 : debug!("initdb output: {:?}", output);
109 12 : ensure!(
110 12 : output.status.success(),
111 0 : "initdb failed, stdout and stderr follow:\n{}{}",
112 0 : String::from_utf8_lossy(&output.stdout),
113 0 : String::from_utf8_lossy(&output.stderr),
114 : );
115 12 : Ok(())
116 12 : }
117 :
118 12 : pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
119 12 : info!("Starting Postgres server in {:?}", self.datadir);
120 12 : let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
121 12 : let unix_socket_dir_path = unix_socket_dir.path().to_owned();
122 12 : let server_process = self
123 12 : .new_pg_command("postgres")?
124 12 : .args(["-c", "listen_addresses="])
125 12 : .arg("-k")
126 12 : .arg(&unix_socket_dir_path)
127 12 : .arg("-D")
128 12 : .arg(&self.datadir)
129 48 : .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
130 12 : .spawn()?;
131 12 : let server = PostgresServer {
132 12 : process: server_process,
133 12 : _unix_socket_dir: unix_socket_dir,
134 12 : client_config: {
135 12 : let mut c = postgres::Config::new();
136 12 : c.host_path(&unix_socket_dir_path);
137 12 : c.user("postgres");
138 12 : c.connect_timeout(Duration::from_millis(10000));
139 12 : c
140 12 : },
141 12 : };
142 12 : Ok(server)
143 12 : }
144 :
145 12 : pub fn pg_waldump(
146 12 : &self,
147 12 : first_segment_name: &OsStr,
148 12 : last_segment_name: &OsStr,
149 12 : ) -> anyhow::Result<std::process::Output> {
150 12 : let first_segment_file = self.datadir.join(first_segment_name);
151 12 : let last_segment_file = self.datadir.join(last_segment_name);
152 12 : info!(
153 12 : "Running pg_waldump for {} .. {}",
154 12 : first_segment_file.display(),
155 12 : last_segment_file.display()
156 : );
157 12 : let output = self
158 12 : .new_pg_command("pg_waldump")?
159 12 : .args([&first_segment_file, &last_segment_file])
160 12 : .output()?;
161 12 : debug!("waldump output: {:?}", output);
162 12 : Ok(output)
163 12 : }
164 : }
165 :
166 : impl PostgresServer {
167 12 : pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
168 12 : let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
169 24 : while Instant::now() < retry_until {
170 24 : if let Ok(client) = self.client_config.connect(postgres::NoTls) {
171 12 : return Ok(client);
172 12 : }
173 12 : std::thread::sleep(Duration::from_millis(100));
174 : }
175 0 : bail!("Connection timed out");
176 12 : }
177 :
178 12 : pub fn kill(mut self) {
179 12 : self.process.kill().unwrap();
180 12 : self.process.wait().unwrap();
181 12 : }
182 : }
183 :
184 : impl Drop for PostgresServer {
185 12 : fn drop(&mut self) {
186 12 : match self.process.try_wait() {
187 12 : Ok(Some(_)) => return,
188 : Ok(None) => {
189 0 : warn!("Server was not terminated, will be killed");
190 : }
191 0 : Err(e) => {
192 0 : error!("Unable to get status of the server: {}, will be killed", e);
193 : }
194 : }
195 0 : let _ = self.process.kill();
196 12 : }
197 : }
198 :
199 : pub trait PostgresClientExt: postgres::GenericClient {
200 24 : fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
201 24 : Ok(self
202 24 : .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
203 24 : .get(0))
204 24 : }
205 0 : fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
206 0 : Ok(self
207 0 : .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
208 0 : .get(0))
209 0 : }
210 : }
211 :
212 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
213 :
214 12 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
215 12 : client.execute("create extension if not exists neon_test_utils", &[])?;
216 :
217 12 : let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
218 12 : ensure!(wal_keep_size == "50MB");
219 12 : let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
220 12 : ensure!(wal_writer_delay == "10s");
221 12 : let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
222 12 : ensure!(autovacuum == "off");
223 :
224 12 : let wal_segment_size = client.query_one(
225 12 : "select cast(setting as bigint) as setting, unit \
226 12 : from pg_settings where name = 'wal_segment_size'",
227 12 : &[],
228 12 : )?;
229 12 : ensure!(
230 12 : wal_segment_size.get::<_, String>("unit") == "B",
231 0 : "Unexpected wal_segment_size unit"
232 : );
233 12 : ensure!(
234 12 : wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
235 0 : "Unexpected wal_segment_size in bytes"
236 : );
237 :
238 12 : Ok(())
239 12 : }
240 :
241 : pub trait Crafter {
242 : const NAME: &'static str;
243 :
244 : /// Generates WAL using the client `client`. Returns a vector of some valid
245 : /// "interesting" intermediate LSNs which one may start reading from.
246 : /// test_end_of_wal uses this to check various starting points.
247 : ///
248 : /// Note that postgres is generally keen about writing some WAL. While we
249 : /// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
250 : /// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
251 : /// stable WAL end would be flaky unless postgres is shut down. For this
252 : /// reason returning potential end of WAL here is pointless. Most of the
253 : /// time this doesn't happen though, so it is reasonable to create needed
254 : /// WAL structure and immediately kill postgres like test_end_of_wal does.
255 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
256 : }
257 :
258 : /// Wraps some WAL craft function, providing current LSN to it before the
259 : /// insertion and flushing WAL afterwards. Also pushes initial LSN to the
260 : /// result.
261 12 : fn craft_internal<C: postgres::GenericClient>(
262 12 : client: &mut C,
263 12 : f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
264 12 : ) -> anyhow::Result<Vec<PgLsn>> {
265 12 : ensure_server_config(client)?;
266 :
267 12 : let initial_lsn = client.pg_current_wal_insert_lsn()?;
268 12 : info!("LSN initial = {}", initial_lsn);
269 :
270 12 : let mut intermediate_lsns = f(client, initial_lsn)?;
271 12 : if !intermediate_lsns.starts_with(&[initial_lsn]) {
272 12 : intermediate_lsns.insert(0, initial_lsn);
273 12 : }
274 :
275 : // Some records may be not flushed, e.g. non-transactional logical messages. Flush now.
276 : //
277 : // If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
278 : // returns the position just after the page header on the next page. That's where the next
279 : // record will be inserted. But the page header hasn't actually been written to the WAL
280 : // yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
281 : // error. Because of that, if the insert location is just after a page header, back off to
282 : // previous page boundary.
283 12 : let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
284 12 : if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 {
285 0 : lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
286 12 : } else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 {
287 0 : lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
288 12 : }
289 12 : client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?;
290 12 : Ok(intermediate_lsns)
291 12 : }
292 :
293 : pub struct Simple;
294 : impl Crafter for Simple {
295 : const NAME: &'static str = "simple";
296 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
297 4 : craft_internal(client, |client, _| {
298 4 : client.execute("CREATE table t(x int)", &[])?;
299 4 : Ok(Vec::new())
300 4 : })
301 4 : }
302 : }
303 :
304 : pub struct LastWalRecordXlogSwitch;
305 : impl Crafter for LastWalRecordXlogSwitch {
306 : const NAME: &'static str = "last_wal_record_xlog_switch";
307 0 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
308 0 : // Do not use craft_internal because here we end up with flush_lsn exactly on
309 0 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
310 0 : ensure_server_config(client)?;
311 :
312 0 : client.execute("CREATE table t(x int)", &[])?;
313 0 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
314 : // pg_switch_wal returns end of last record of the switched segment,
315 : // i.e. end of SWITCH itself.
316 0 : let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
317 0 : let before_xlog_switch_u64 = u64::from(before_xlog_switch);
318 0 : let next_segment = PgLsn::from(
319 0 : before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
320 0 : + WAL_SEGMENT_SIZE as u64,
321 0 : );
322 0 : ensure!(
323 0 : xlog_switch_record_end <= next_segment,
324 0 : "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
325 : xlog_switch_record_end,
326 : next_segment
327 : );
328 0 : Ok(vec![before_xlog_switch, xlog_switch_record_end])
329 0 : }
330 : }
331 :
/// Craft xlog SWITCH record ending at page boundary.
///
/// Returns the insert LSN just before the switch and the LSN returned by
/// `pg_switch_wal()`.
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
    const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
    fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
        // Do not use craft_internal because here we end up with flush_lsn exactly on
        // the segment boundary and insert_lsn after the initial page header, which is unusual.
        ensure_server_config(client)?;

        client.execute("CREATE table t(x int)", &[])?;

        // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary. We
        // will use carefully-sized logical messages to advance WAL insert location such
        // that there is just enough space on the page for the XLOG_SWITCH record.
        //
        // NOTE(review): there is no retry limit; if the sizing never lines up, this
        // loops indefinitely. Each iteration writes more WAL, and a failed attempt
        // (after the switch) starts over in the next segment.
        loop {
            // We start with measuring how much WAL it takes for one logical message,
            // considering all alignments and headers.
            let before_lsn = client.pg_current_wal_insert_lsn()?;
            client.execute(
                "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
                &[],
            )?;
            let after_lsn = client.pg_current_wal_insert_lsn()?;

            // Did the record cross a page boundary? If it did, start over. Crossing a
            // page boundary adds to the apparent size of the record because of the page
            // header, which throws off the calculation.
            if u64::from(before_lsn) / XLOG_BLCKSZ as u64
                != u64::from(after_lsn) / XLOG_BLCKSZ as u64
            {
                continue;
            }
            // base_size is the size of a logical message without the payload
            // (the 10-byte payload written above is subtracted back out).
            let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10;

            // Is there enough space on the page for another logical message and an
            // XLOG_SWITCH? If not, start over.
            let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
            if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 {
                continue;
            }

            // We will write another logical message, such that after the logical message
            // record, there will be space for exactly one XLOG_SWITCH. How large should
            // the logical message's payload be? An XLOG_SWITCH record has no data => its
            // size is exactly XLOG_SIZE_OF_XLOG_RECORD.
            let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;

            // `repeats` is below XLOG_BLCKSZ here (it is derived from `page_remain`),
            // so the cast to i32 does not truncate for ordinary page sizes.
            client.execute(
                "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
                &[&(repeats as i32)],
            )?;
            info!(
                "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
                client.pg_current_wal_insert_lsn()?,
                XLOG_SIZE_OF_XLOG_RECORD
            );

            // Emit the XLOG_SWITCH
            let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
            let xlog_switch_record_end: PgLsn =
                client.query_one("SELECT pg_switch_wal()", &[])?.get(0);

            // Success is recognized by the returned LSN sitting exactly one short page
            // header past a page boundary. Anything else means the sizing was off: the
            // switch has already happened, so just retry from the new position.
            if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
                != XLOG_SIZE_OF_XLOG_SHORT_PHD
            {
                warn!(
                    "XLOG_SWITCH message ended not on page boundary: {}, offset = {}, repeating",
                    xlog_switch_record_end,
                    u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
                );
                continue;
            }
            return Ok(vec![before_xlog_switch, xlog_switch_record_end]);
        }
    }
}
409 :
410 : /// Write ~16MB logical message; it should cross WAL segment.
411 8 : fn craft_seg_size_logical_message(
412 8 : client: &mut impl postgres::GenericClient,
413 8 : transactional: bool,
414 8 : ) -> anyhow::Result<Vec<PgLsn>> {
415 8 : craft_internal(client, |client, initial_lsn| {
416 8 : ensure!(
417 8 : initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
418 0 : "Initial LSN is too far in the future"
419 : );
420 :
421 8 : let message_lsn: PgLsn = client
422 8 : .query_one(
423 8 : "select pg_logical_emit_message($1, 'big-16mb-msg', \
424 8 : concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
425 8 : &[&transactional],
426 8 : )?
427 8 : .get("message_lsn");
428 8 : ensure!(
429 8 : message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
430 0 : "Logical message did not cross the segment boundary"
431 : );
432 8 : ensure!(
433 8 : message_lsn < PgLsn::from(0x0400_0000),
434 0 : "Logical message crossed two segments"
435 : );
436 :
437 8 : Ok(vec![message_lsn])
438 8 : })
439 8 : }
440 :
441 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
442 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
443 : const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
444 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
445 4 : // Transactional message crossing WAL segment will be followed by small
446 4 : // commit record.
447 4 : craft_seg_size_logical_message(client, true)
448 4 : }
449 : }
450 :
451 : pub struct LastWalRecordCrossingSegment;
452 : impl Crafter for LastWalRecordCrossingSegment {
453 : const NAME: &'static str = "last_wal_record_crossing_segment";
454 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
455 4 : craft_seg_size_logical_message(client, false)
456 4 : }
457 : }
|