TLA Line data Source code
1 : use anyhow::{bail, ensure};
2 : use camino_tempfile::{tempdir, Utf8TempDir};
3 : use log::*;
4 : use postgres::types::PgLsn;
5 : use postgres::Client;
6 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
7 : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
8 : use std::cmp::Ordering;
9 : use std::path::{Path, PathBuf};
10 : use std::process::Command;
11 : use std::time::{Duration, Instant};
12 :
/// Declares a module named after a Postgres version identifier that re-exports
/// `postgres_ffi::<version>::wal_craft_test_export::*` and, in test builds only,
/// compiles `xlog_utils_test.rs` against those re-exports.
macro_rules! xlog_utils_test {
    ($version:ident) => {
        // `#[path = "."]` keeps file lookup for the nested `mod` declaration in this file's
        // directory, so every generated version module includes the same `xlog_utils_test.rs`.
        #[path = "."]
        mod $version {
            #[allow(unused_imports)]
            pub use postgres_ffi::$version::wal_craft_test_export::*;
            // The same source file is intentionally included once per version module.
            #[allow(clippy::duplicate_mod)]
            #[cfg(test)]
            mod xlog_utils_test;
        }
    };
}

// Presumably expands `xlog_utils_test!` once per supported Postgres version identifier;
// TODO confirm against the definition of `postgres_ffi::for_all_postgres_versions!`.
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
27 :
/// Configuration for running a local Postgres installation used to craft WAL.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
    /// Postgres major version; `Conf::pg_distrib_dir` accepts only 14, 15 and 16.
    pub pg_version: u32,
    /// Root of the Postgres distributions; binaries and libraries for a version are taken
    /// from `<pg_distrib_dir>/v<pg_version>/bin` and `<pg_distrib_dir>/v<pg_version>/lib`.
    pub pg_distrib_dir: PathBuf,
    /// Data directory passed as `-D` to `initdb` and `postgres`; its `pg_wal`
    /// subdirectory is returned by `Conf::wal_dir`.
    pub datadir: PathBuf,
}
34 :
/// Handle to a Postgres server process started by `Conf::start_server`.
///
/// If the process is still running when the handle is dropped, it is killed.
pub struct PostgresServer {
    process: std::process::Child,
    /// Temporary directory holding the server's Unix socket; kept alive for as long as
    /// the handle exists (a `Utf8TempDir` removes its directory when dropped).
    _unix_socket_dir: Utf8TempDir,
    /// Client settings for connecting over the Unix socket as user `postgres`.
    client_config: postgres::Config,
}
40 :
/// `name=value` settings passed as `-c` options to every server started by
/// `Conf::start_server`. `ensure_server_config` checks `wal_keep_size`,
/// `wal_writer_delay` and `autovacuum` against these values on the running server.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    "wal_keep_size=50MB",            // Ensure old WAL is not removed
    "shared_preload_libraries=neon", // can only be loaded at startup
    // Disable background processes as much as possible
    "wal_writer_delay=10s",
    "autovacuum=off",
];
48 :
49 : impl Conf {
50 CBC 81 : pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
51 81 : let path = self.pg_distrib_dir.clone();
52 81 :
53 81 : #[allow(clippy::manual_range_patterns)]
54 81 : match self.pg_version {
55 81 : 14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
56 UBC 0 : _ => bail!("Unsupported postgres version: {}", self.pg_version),
57 : }
58 CBC 81 : }
59 :
60 27 : fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
61 27 : Ok(self.pg_distrib_dir()?.join("bin"))
62 27 : }
63 :
64 54 : fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
65 54 : Ok(self.pg_distrib_dir()?.join("lib"))
66 54 : }
67 :
68 135 : pub fn wal_dir(&self) -> PathBuf {
69 135 : self.datadir.join("pg_wal")
70 135 : }
71 :
72 27 : fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
73 27 : let path = self.pg_bin_dir()?.join(command);
74 27 : ensure!(path.exists(), "Command {:?} does not exist", path);
75 27 : let mut cmd = Command::new(path);
76 27 : cmd.env_clear()
77 27 : .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
78 27 : .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
79 27 : Ok(cmd)
80 27 : }
81 :
82 9 : pub fn initdb(&self) -> anyhow::Result<()> {
83 9 : if let Some(parent) = self.datadir.parent() {
84 9 : info!("Pre-creating parent directory {:?}", parent);
85 : // Tests may be run concurrently and there may be a race to create `test_output/`.
86 : // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
87 9 : std::fs::create_dir_all(parent)?;
88 UBC 0 : }
89 CBC 9 : info!(
90 9 : "Running initdb in {:?} with user \"postgres\"",
91 : self.datadir
92 : );
93 9 : let output = self
94 9 : .new_pg_command("initdb")?
95 9 : .arg("-D")
96 9 : .arg(&self.datadir)
97 9 : .args(["-U", "postgres", "--no-instructions", "--no-sync"])
98 9 : .output()?;
99 9 : debug!("initdb output: {:?}", output);
100 : ensure!(
101 9 : output.status.success(),
102 UBC 0 : "initdb failed, stdout and stderr follow:\n{}{}",
103 0 : String::from_utf8_lossy(&output.stdout),
104 0 : String::from_utf8_lossy(&output.stderr),
105 : );
106 CBC 9 : Ok(())
107 9 : }
108 :
109 9 : pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
110 9 : info!("Starting Postgres server in {:?}", self.datadir);
111 9 : let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
112 9 : let unix_socket_dir_path = unix_socket_dir.path().to_owned();
113 9 : let server_process = self
114 9 : .new_pg_command("postgres")?
115 9 : .args(["-c", "listen_addresses="])
116 9 : .arg("-k")
117 9 : .arg(&unix_socket_dir_path)
118 9 : .arg("-D")
119 9 : .arg(&self.datadir)
120 36 : .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
121 9 : .spawn()?;
122 9 : let server = PostgresServer {
123 9 : process: server_process,
124 9 : _unix_socket_dir: unix_socket_dir,
125 9 : client_config: {
126 9 : let mut c = postgres::Config::new();
127 9 : c.host_path(&unix_socket_dir_path);
128 9 : c.user("postgres");
129 9 : c.connect_timeout(Duration::from_millis(10000));
130 9 : c
131 9 : },
132 9 : };
133 9 : Ok(server)
134 9 : }
135 :
136 9 : pub fn pg_waldump(
137 9 : &self,
138 9 : first_segment_name: &str,
139 9 : last_segment_name: &str,
140 9 : ) -> anyhow::Result<std::process::Output> {
141 9 : let first_segment_file = self.datadir.join(first_segment_name);
142 9 : let last_segment_file = self.datadir.join(last_segment_name);
143 9 : info!(
144 9 : "Running pg_waldump for {} .. {}",
145 9 : first_segment_file.display(),
146 9 : last_segment_file.display()
147 : );
148 9 : let output = self
149 9 : .new_pg_command("pg_waldump")?
150 9 : .args([&first_segment_file, &last_segment_file])
151 9 : .output()?;
152 9 : debug!("waldump output: {:?}", output);
153 9 : Ok(output)
154 9 : }
155 : }
156 :
157 : impl PostgresServer {
158 9 : pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
159 9 : let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
160 18 : while Instant::now() < retry_until {
161 18 : if let Ok(client) = self.client_config.connect(postgres::NoTls) {
162 9 : return Ok(client);
163 9 : }
164 9 : std::thread::sleep(Duration::from_millis(100));
165 : }
166 UBC 0 : bail!("Connection timed out");
167 CBC 9 : }
168 :
169 9 : pub fn kill(mut self) {
170 9 : self.process.kill().unwrap();
171 9 : self.process.wait().unwrap();
172 9 : }
173 : }
174 :
175 : impl Drop for PostgresServer {
176 9 : fn drop(&mut self) {
177 9 : match self.process.try_wait() {
178 9 : Ok(Some(_)) => return,
179 : Ok(None) => {
180 UBC 0 : warn!("Server was not terminated, will be killed");
181 : }
182 0 : Err(e) => {
183 0 : error!("Unable to get status of the server: {}, will be killed", e);
184 : }
185 : }
186 0 : let _ = self.process.kill();
187 CBC 9 : }
188 : }
189 :
190 : pub trait PostgresClientExt: postgres::GenericClient {
191 35 : fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
192 35 : Ok(self
193 35 : .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
194 35 : .get(0))
195 35 : }
196 12 : fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
197 12 : Ok(self
198 12 : .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
199 12 : .get(0))
200 12 : }
201 : }
202 :
203 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
204 :
205 14 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
206 14 : client.execute("create extension if not exists neon_test_utils", &[])?;
207 :
208 14 : let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
209 14 : ensure!(wal_keep_size == "50MB");
210 14 : let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
211 14 : ensure!(wal_writer_delay == "10s");
212 14 : let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
213 14 : ensure!(autovacuum == "off");
214 :
215 14 : let wal_segment_size = client.query_one(
216 14 : "select cast(setting as bigint) as setting, unit \
217 14 : from pg_settings where name = 'wal_segment_size'",
218 14 : &[],
219 14 : )?;
220 : ensure!(
221 14 : wal_segment_size.get::<_, String>("unit") == "B",
222 UBC 0 : "Unexpected wal_segment_size unit"
223 : );
224 : ensure!(
225 CBC 14 : wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
226 UBC 0 : "Unexpected wal_segment_size in bytes"
227 : );
228 :
229 CBC 14 : Ok(())
230 14 : }
231 :
/// A recipe for generating a specific WAL layout through a Postgres client.
pub trait Crafter {
    /// Short name identifying the recipe, e.g. `"simple"`.
    const NAME: &'static str;

    /// Generates WAL using the client `client`. Returns a pair of:
    /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
    ///   May include or exclude Lsn(0) and the end-of-wal.
    /// * The expected end-of-wal LSN.
    ///
    /// # Errors
    ///
    /// The implementations in this file fail on query errors, on an unexpected server
    /// configuration, or when the produced WAL does not have the intended shape.
    fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
}
241 :
242 12 : fn craft_internal<C: postgres::GenericClient>(
243 12 : client: &mut C,
244 12 : f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
245 12 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
246 12 : ensure_server_config(client)?;
247 :
248 12 : let initial_lsn = client.pg_current_wal_insert_lsn()?;
249 12 : info!("LSN initial = {}", initial_lsn);
250 :
251 12 : let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
252 12 : let last_lsn = match last_lsn {
253 4 : None => client.pg_current_wal_insert_lsn()?,
254 8 : Some(last_lsn) => {
255 8 : let insert_lsn = client.pg_current_wal_insert_lsn()?;
256 8 : match last_lsn.cmp(&insert_lsn) {
257 UBC 0 : Ordering::Less => bail!(
258 0 : "Some records were inserted after the crafted WAL: {} vs {}",
259 0 : last_lsn,
260 0 : insert_lsn
261 0 : ),
262 CBC 8 : Ordering::Equal => last_lsn,
263 UBC 0 : Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
264 : }
265 : }
266 : };
267 CBC 12 : if !intermediate_lsns.starts_with(&[initial_lsn]) {
268 12 : intermediate_lsns.insert(0, initial_lsn);
269 12 : }
270 :
271 : // Some records may be not flushed, e.g. non-transactional logical messages.
272 12 : client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
273 12 : match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
274 UBC 0 : Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
275 CBC 12 : Ordering::Equal => {}
276 UBC 0 : Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
277 : }
278 CBC 12 : Ok((intermediate_lsns, last_lsn))
279 12 : }
280 :
281 : pub struct Simple;
282 : impl Crafter for Simple {
283 : const NAME: &'static str = "simple";
284 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
285 4 : craft_internal(client, |client, _| {
286 4 : client.execute("CREATE table t(x int)", &[])?;
287 4 : Ok((Vec::new(), None))
288 4 : })
289 4 : }
290 : }
291 :
292 : pub struct LastWalRecordXlogSwitch;
293 : impl Crafter for LastWalRecordXlogSwitch {
294 : const NAME: &'static str = "last_wal_record_xlog_switch";
295 1 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
296 1 : // Do not use generate_internal because here we end up with flush_lsn exactly on
297 1 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
298 1 : ensure_server_config(client)?;
299 :
300 1 : client.execute("CREATE table t(x int)", &[])?;
301 1 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
302 1 : let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
303 1 : let next_segment = PgLsn::from(0x0200_0000);
304 1 : ensure!(
305 1 : after_xlog_switch <= next_segment,
306 UBC 0 : "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
307 : after_xlog_switch,
308 : next_segment
309 : );
310 CBC 1 : Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
311 1 : }
312 : }
313 :
314 : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
315 : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
316 : const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
317 1 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
318 1 : // Do not use generate_internal because here we end up with flush_lsn exactly on
319 1 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
320 1 : ensure_server_config(client)?;
321 :
322 1 : client.execute("CREATE table t(x int)", &[])?;
323 :
324 : // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
325 : // We will use logical message as the padding. We start with detecting how much WAL
326 : // it takes for one logical message, considering all alignments and headers.
327 1 : let base_wal_advance = {
328 1 : let before_lsn = client.pg_current_wal_insert_lsn()?;
329 : // Small non-empty message bigger than few bytes is more likely than an empty
330 : // message to have the same format as the big padding message.
331 1 : client.execute(
332 1 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
333 1 : &[],
334 1 : )?;
335 : // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
336 1 : (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
337 : + XLOG_SIZE_OF_XLOG_RECORD
338 : };
339 1 : let mut remaining_lsn =
340 1 : XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
341 1 : if remaining_lsn < base_wal_advance {
342 UBC 0 : remaining_lsn += XLOG_BLCKSZ;
343 CBC 1 : }
344 1 : let repeats = 10 + remaining_lsn - base_wal_advance;
345 1 : info!(
346 1 : "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
347 1 : client.pg_current_wal_insert_lsn()?,
348 : remaining_lsn,
349 : base_wal_advance,
350 : repeats
351 : );
352 1 : client.execute(
353 1 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
354 1 : &[&(repeats as i32)],
355 1 : )?;
356 1 : info!(
357 1 : "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
358 1 : client.pg_current_wal_insert_lsn()?,
359 : XLOG_SIZE_OF_XLOG_RECORD
360 : );
361 :
362 : // Emit the XLOG_SWITCH
363 1 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
364 1 : let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
365 1 : let next_segment = PgLsn::from(0x0200_0000);
366 1 : ensure!(
367 1 : after_xlog_switch < next_segment,
368 UBC 0 : "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
369 : after_xlog_switch,
370 : next_segment
371 : );
372 : ensure!(
373 CBC 1 : u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
374 UBC 0 : "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
375 0 : after_xlog_switch,
376 0 : u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
377 : );
378 CBC 1 : Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
379 1 : }
380 : }
381 :
382 8 : fn craft_single_logical_message(
383 8 : client: &mut impl postgres::GenericClient,
384 8 : transactional: bool,
385 8 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
386 8 : craft_internal(client, |client, initial_lsn| {
387 8 : ensure!(
388 8 : initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
389 UBC 0 : "Initial LSN is too far in the future"
390 : );
391 :
392 CBC 8 : let message_lsn: PgLsn = client
393 8 : .query_one(
394 8 : "select pg_logical_emit_message($1, 'big-16mb-msg', \
395 8 : concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
396 8 : &[&transactional],
397 8 : )?
398 8 : .get("message_lsn");
399 8 : ensure!(
400 8 : message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
401 UBC 0 : "Logical message did not cross the segment boundary"
402 : );
403 : ensure!(
404 CBC 8 : message_lsn < PgLsn::from(0x0400_0000),
405 UBC 0 : "Logical message crossed two segments"
406 : );
407 :
408 CBC 8 : if transactional {
409 : // Transactional logical messages are part of a transaction, so the one above is
410 : // followed by a small COMMIT record.
411 :
412 4 : let after_message_lsn = client.pg_current_wal_insert_lsn()?;
413 : ensure!(
414 4 : message_lsn < after_message_lsn,
415 UBC 0 : "No record found after the emitted message"
416 : );
417 CBC 4 : Ok((vec![message_lsn], Some(after_message_lsn)))
418 : } else {
419 4 : Ok((Vec::new(), Some(message_lsn)))
420 : }
421 8 : })
422 8 : }
423 :
424 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
425 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
426 : const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
427 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
428 4 : craft_single_logical_message(client, true)
429 4 : }
430 : }
431 :
432 : pub struct LastWalRecordCrossingSegment;
433 : impl Crafter for LastWalRecordCrossingSegment {
434 : const NAME: &'static str = "last_wal_record_crossing_segment";
435 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
436 4 : craft_single_logical_message(client, false)
437 4 : }
438 : }
|