TLA Line data Source code
1 : use anyhow::{bail, ensure};
2 : use camino_tempfile::{tempdir, Utf8TempDir};
3 : use log::*;
4 : use postgres::types::PgLsn;
5 : use postgres::Client;
6 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
7 : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
8 : use std::cmp::Ordering;
9 : use std::path::{Path, PathBuf};
10 : use std::process::Command;
11 : use std::time::{Duration, Instant};
12 :
13 : macro_rules! xlog_utils_test {
14 : ($version:ident) => {
15 : #[path = "."]
16 : mod $version {
17 : pub use postgres_ffi::$version::wal_craft_test_export::*;
18 : #[allow(clippy::duplicate_mod)]
19 : #[cfg(test)]
20 : mod xlog_utils_test;
21 : }
22 : };
23 : }
24 :
25 : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
26 :
/// Location and version of the Postgres installation and data directory used
/// by the helpers in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
    /// Postgres major version; selects the `v<pg_version>` subdirectory of
    /// `pg_distrib_dir`.
    pub pg_version: u32,
    /// Directory that contains the per-version Postgres installations.
    pub pg_distrib_dir: PathBuf,
    /// Data directory passed to `initdb` and `postgres` via `-D`.
    pub datadir: PathBuf,
}
33 :
34 : pub struct PostgresServer {
35 : process: std::process::Child,
36 : _unix_socket_dir: Utf8TempDir,
37 : client_config: postgres::Config,
38 : }
39 :
/// Settings in `name=value` form that the server must be started with.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    // Ensure old WAL is not removed.
    "wal_keep_size=50MB",
    // Can only be loaded at startup.
    "shared_preload_libraries=neon",
    // Disable background processes as much as possible.
    "wal_writer_delay=10s",
    "autovacuum=off",
];
47 :
48 : impl Conf {
49 CBC 81 : pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
50 81 : let path = self.pg_distrib_dir.clone();
51 81 :
52 81 : #[allow(clippy::manual_range_patterns)]
53 81 : match self.pg_version {
54 81 : 14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
55 UBC 0 : _ => bail!("Unsupported postgres version: {}", self.pg_version),
56 : }
57 CBC 81 : }
58 :
59 27 : fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
60 27 : Ok(self.pg_distrib_dir()?.join("bin"))
61 27 : }
62 :
63 54 : fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
64 54 : Ok(self.pg_distrib_dir()?.join("lib"))
65 54 : }
66 :
67 135 : pub fn wal_dir(&self) -> PathBuf {
68 135 : self.datadir.join("pg_wal")
69 135 : }
70 :
71 27 : fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
72 27 : let path = self.pg_bin_dir()?.join(command);
73 27 : ensure!(path.exists(), "Command {:?} does not exist", path);
74 27 : let mut cmd = Command::new(path);
75 27 : cmd.env_clear()
76 27 : .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
77 27 : .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
78 27 : Ok(cmd)
79 27 : }
80 :
81 : pub fn initdb(&self) -> anyhow::Result<()> {
82 9 : if let Some(parent) = self.datadir.parent() {
83 9 : info!("Pre-creating parent directory {:?}", parent);
84 : // Tests may be run concurrently and there may be a race to create `test_output/`.
85 : // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
86 9 : std::fs::create_dir_all(parent)?;
87 UBC 0 : }
88 CBC 9 : info!(
89 9 : "Running initdb in {:?} with user \"postgres\"",
90 : self.datadir
91 : );
92 9 : let output = self
93 9 : .new_pg_command("initdb")?
94 9 : .arg("-D")
95 9 : .arg(&self.datadir)
96 9 : .args(["-U", "postgres", "--no-instructions", "--no-sync"])
97 9 : .output()?;
98 9 : debug!("initdb output: {:?}", output);
99 9 : ensure!(
100 9 : output.status.success(),
101 UBC 0 : "initdb failed, stdout and stderr follow:\n{}{}",
102 0 : String::from_utf8_lossy(&output.stdout),
103 0 : String::from_utf8_lossy(&output.stderr),
104 : );
105 CBC 9 : Ok(())
106 9 : }
107 :
108 9 : pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
109 9 : info!("Starting Postgres server in {:?}", self.datadir);
110 9 : let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
111 9 : let unix_socket_dir_path = unix_socket_dir.path().to_owned();
112 9 : let server_process = self
113 9 : .new_pg_command("postgres")?
114 9 : .args(["-c", "listen_addresses="])
115 9 : .arg("-k")
116 9 : .arg(&unix_socket_dir_path)
117 9 : .arg("-D")
118 9 : .arg(&self.datadir)
119 36 : .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
120 9 : .spawn()?;
121 9 : let server = PostgresServer {
122 9 : process: server_process,
123 9 : _unix_socket_dir: unix_socket_dir,
124 9 : client_config: {
125 9 : let mut c = postgres::Config::new();
126 9 : c.host_path(&unix_socket_dir_path);
127 9 : c.user("postgres");
128 9 : c.connect_timeout(Duration::from_millis(10000));
129 9 : c
130 9 : },
131 9 : };
132 9 : Ok(server)
133 9 : }
134 :
135 9 : pub fn pg_waldump(
136 9 : &self,
137 9 : first_segment_name: &str,
138 9 : last_segment_name: &str,
139 9 : ) -> anyhow::Result<std::process::Output> {
140 9 : let first_segment_file = self.datadir.join(first_segment_name);
141 9 : let last_segment_file = self.datadir.join(last_segment_name);
142 9 : info!(
143 9 : "Running pg_waldump for {} .. {}",
144 9 : first_segment_file.display(),
145 9 : last_segment_file.display()
146 : );
147 9 : let output = self
148 9 : .new_pg_command("pg_waldump")?
149 9 : .args([&first_segment_file, &last_segment_file])
150 9 : .output()?;
151 9 : debug!("waldump output: {:?}", output);
152 9 : Ok(output)
153 9 : }
154 : }
155 :
156 : impl PostgresServer {
157 9 : pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
158 9 : let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
159 18 : while Instant::now() < retry_until {
160 18 : if let Ok(client) = self.client_config.connect(postgres::NoTls) {
161 9 : return Ok(client);
162 9 : }
163 9 : std::thread::sleep(Duration::from_millis(100));
164 : }
165 UBC 0 : bail!("Connection timed out");
166 CBC 9 : }
167 :
168 9 : pub fn kill(mut self) {
169 9 : self.process.kill().unwrap();
170 9 : self.process.wait().unwrap();
171 9 : }
172 : }
173 :
174 : impl Drop for PostgresServer {
175 9 : fn drop(&mut self) {
176 9 : match self.process.try_wait() {
177 9 : Ok(Some(_)) => return,
178 : Ok(None) => {
179 UBC 0 : warn!("Server was not terminated, will be killed");
180 : }
181 0 : Err(e) => {
182 0 : error!("Unable to get status of the server: {}, will be killed", e);
183 : }
184 : }
185 0 : let _ = self.process.kill();
186 CBC 9 : }
187 : }
188 :
189 : pub trait PostgresClientExt: postgres::GenericClient {
190 35 : fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
191 35 : Ok(self
192 35 : .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
193 35 : .get(0))
194 35 : }
195 12 : fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
196 12 : Ok(self
197 12 : .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
198 12 : .get(0))
199 12 : }
200 : }
201 :
202 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
203 :
204 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
205 14 : client.execute("create extension if not exists neon_test_utils", &[])?;
206 :
207 14 : let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
208 14 : ensure!(wal_keep_size == "50MB");
209 14 : let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
210 14 : ensure!(wal_writer_delay == "10s");
211 14 : let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
212 14 : ensure!(autovacuum == "off");
213 :
214 14 : let wal_segment_size = client.query_one(
215 14 : "select cast(setting as bigint) as setting, unit \
216 14 : from pg_settings where name = 'wal_segment_size'",
217 14 : &[],
218 14 : )?;
219 14 : ensure!(
220 14 : wal_segment_size.get::<_, String>("unit") == "B",
221 UBC 0 : "Unexpected wal_segment_size unit"
222 : );
223 CBC 14 : ensure!(
224 14 : wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
225 UBC 0 : "Unexpected wal_segment_size in bytes"
226 : );
227 :
228 CBC 14 : Ok(())
229 14 : }
230 :
231 : pub trait Crafter {
232 : const NAME: &'static str;
233 :
234 : /// Generates WAL using the client `client`. Returns a pair of:
235 : /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
236 : /// May include or exclude Lsn(0) and the end-of-wal.
237 : /// * The expected end-of-wal LSN.
238 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
239 : }
240 :
241 : fn craft_internal<C: postgres::GenericClient>(
242 : client: &mut C,
243 : f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
244 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
245 12 : ensure_server_config(client)?;
246 :
247 12 : let initial_lsn = client.pg_current_wal_insert_lsn()?;
248 12 : info!("LSN initial = {}", initial_lsn);
249 :
250 12 : let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
251 12 : let last_lsn = match last_lsn {
252 4 : None => client.pg_current_wal_insert_lsn()?,
253 8 : Some(last_lsn) => {
254 8 : let insert_lsn = client.pg_current_wal_insert_lsn()?;
255 8 : match last_lsn.cmp(&insert_lsn) {
256 UBC 0 : Ordering::Less => bail!(
257 0 : "Some records were inserted after the crafted WAL: {} vs {}",
258 0 : last_lsn,
259 0 : insert_lsn
260 0 : ),
261 CBC 8 : Ordering::Equal => last_lsn,
262 UBC 0 : Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
263 : }
264 : }
265 : };
266 CBC 12 : if !intermediate_lsns.starts_with(&[initial_lsn]) {
267 12 : intermediate_lsns.insert(0, initial_lsn);
268 12 : }
269 :
270 : // Some records may be not flushed, e.g. non-transactional logical messages.
271 12 : client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
272 12 : match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
273 UBC 0 : Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
274 CBC 12 : Ordering::Equal => {}
275 UBC 0 : Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
276 : }
277 CBC 12 : Ok((intermediate_lsns, last_lsn))
278 12 : }
279 :
280 : pub struct Simple;
281 : impl Crafter for Simple {
282 : const NAME: &'static str = "simple";
283 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
284 4 : craft_internal(client, |client, _| {
285 4 : client.execute("CREATE table t(x int)", &[])?;
286 4 : Ok((Vec::new(), None))
287 4 : })
288 4 : }
289 : }
290 :
291 : pub struct LastWalRecordXlogSwitch;
292 : impl Crafter for LastWalRecordXlogSwitch {
293 : const NAME: &'static str = "last_wal_record_xlog_switch";
294 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
295 : // Do not use generate_internal because here we end up with flush_lsn exactly on
296 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
297 1 : ensure_server_config(client)?;
298 :
299 1 : client.execute("CREATE table t(x int)", &[])?;
300 1 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
301 1 : let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
302 1 : let next_segment = PgLsn::from(0x0200_0000);
303 1 : ensure!(
304 1 : after_xlog_switch <= next_segment,
305 UBC 0 : "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
306 : after_xlog_switch,
307 : next_segment
308 : );
309 CBC 1 : Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
310 1 : }
311 : }
312 :
313 : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
314 : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
315 : const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
316 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
317 : // Do not use generate_internal because here we end up with flush_lsn exactly on
318 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
319 1 : ensure_server_config(client)?;
320 :
321 1 : client.execute("CREATE table t(x int)", &[])?;
322 :
323 : // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
324 : // We will use logical message as the padding. We start with detecting how much WAL
325 : // it takes for one logical message, considering all alignments and headers.
326 1 : let base_wal_advance = {
327 1 : let before_lsn = client.pg_current_wal_insert_lsn()?;
328 : // Small non-empty message bigger than few bytes is more likely than an empty
329 : // message to have the same format as the big padding message.
330 1 : client.execute(
331 1 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
332 1 : &[],
333 1 : )?;
334 : // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
335 1 : (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
336 : + XLOG_SIZE_OF_XLOG_RECORD
337 : };
338 1 : let mut remaining_lsn =
339 1 : XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
340 1 : if remaining_lsn < base_wal_advance {
341 UBC 0 : remaining_lsn += XLOG_BLCKSZ;
342 CBC 1 : }
343 1 : let repeats = 10 + remaining_lsn - base_wal_advance;
344 1 : info!(
345 1 : "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
346 1 : client.pg_current_wal_insert_lsn()?,
347 : remaining_lsn,
348 : base_wal_advance,
349 : repeats
350 : );
351 1 : client.execute(
352 1 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
353 1 : &[&(repeats as i32)],
354 1 : )?;
355 1 : info!(
356 1 : "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
357 1 : client.pg_current_wal_insert_lsn()?,
358 : XLOG_SIZE_OF_XLOG_RECORD
359 : );
360 :
361 : // Emit the XLOG_SWITCH
362 1 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
363 1 : let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
364 1 : let next_segment = PgLsn::from(0x0200_0000);
365 1 : ensure!(
366 1 : after_xlog_switch < next_segment,
367 UBC 0 : "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
368 : after_xlog_switch,
369 : next_segment
370 : );
371 CBC 1 : ensure!(
372 1 : u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
373 UBC 0 : "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
374 0 : after_xlog_switch,
375 0 : u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
376 : );
377 CBC 1 : Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
378 1 : }
379 : }
380 :
381 8 : fn craft_single_logical_message(
382 8 : client: &mut impl postgres::GenericClient,
383 8 : transactional: bool,
384 8 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
385 8 : craft_internal(client, |client, initial_lsn| {
386 8 : ensure!(
387 8 : initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
388 UBC 0 : "Initial LSN is too far in the future"
389 : );
390 :
391 CBC 8 : let message_lsn: PgLsn = client
392 8 : .query_one(
393 8 : "select pg_logical_emit_message($1, 'big-16mb-msg', \
394 8 : concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
395 8 : &[&transactional],
396 8 : )?
397 8 : .get("message_lsn");
398 8 : ensure!(
399 8 : message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
400 UBC 0 : "Logical message did not cross the segment boundary"
401 : );
402 CBC 8 : ensure!(
403 8 : message_lsn < PgLsn::from(0x0400_0000),
404 UBC 0 : "Logical message crossed two segments"
405 : );
406 :
407 CBC 8 : if transactional {
408 : // Transactional logical messages are part of a transaction, so the one above is
409 : // followed by a small COMMIT record.
410 :
411 4 : let after_message_lsn = client.pg_current_wal_insert_lsn()?;
412 4 : ensure!(
413 4 : message_lsn < after_message_lsn,
414 UBC 0 : "No record found after the emitted message"
415 : );
416 CBC 4 : Ok((vec![message_lsn], Some(after_message_lsn)))
417 : } else {
418 4 : Ok((Vec::new(), Some(message_lsn)))
419 : }
420 8 : })
421 8 : }
422 :
423 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
424 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
425 : const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
426 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
427 4 : craft_single_logical_message(client, true)
428 4 : }
429 : }
430 :
431 : pub struct LastWalRecordCrossingSegment;
432 : impl Crafter for LastWalRecordCrossingSegment {
433 : const NAME: &'static str = "last_wal_record_crossing_segment";
434 4 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
435 4 : craft_single_logical_message(client, false)
436 4 : }
437 : }
|