Line data Source code
1 : use anyhow::{bail, ensure};
2 : use log::*;
3 : use postgres::types::PgLsn;
4 : use postgres::Client;
5 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
6 : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
7 : use std::cmp::Ordering;
8 : use std::path::{Path, PathBuf};
9 : use std::process::Command;
10 : use std::time::{Duration, Instant};
11 : use tempfile::{tempdir, TempDir};
12 :
// Declares a module named `$version` that re-exports
// `postgres_ffi::$version::wal_craft_test_export::*` and, in test builds only, includes the
// `xlog_utils_test` module file from this directory (`#[path = "."]`).
// It is expanded through `postgres_ffi::for_all_postgres_versions!`, presumably once per
// supported Postgres version, so the same test file is compiled against each version's exports.
macro_rules! xlog_utils_test {
    ($version:ident) => {
        #[path = "."]
        mod $version {
            pub use postgres_ffi::$version::wal_craft_test_export::*;
            // The same module file is deliberately included once per version module.
            #[allow(clippy::duplicate_mod)]
            #[cfg(test)]
            mod xlog_utils_test;
        }
    };
}

postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
26 :
/// Location and version of a local Postgres installation plus the data directory to use.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
    /// Major Postgres version; `Conf::pg_distrib_dir()` accepts only 14 and 15.
    pub pg_version: u32,
    /// Root directory containing per-version distributions (`v<pg_version>/bin`, `.../lib`).
    pub pg_distrib_dir: PathBuf,
    /// Data directory passed to `initdb` and `postgres` via `-D`.
    pub datadir: PathBuf,
}
33 :
/// Handle to a `postgres` server process spawned by `Conf::start_server`.
pub struct PostgresServer {
    /// The spawned `postgres` child process.
    process: std::process::Child,
    /// Temporary directory holding the server's Unix socket; owned here so it stays alive
    /// for as long as the handle does.
    _unix_socket_dir: TempDir,
    /// Client settings (socket directory, user `postgres`, connect timeout) for this server.
    client_config: postgres::Config,
}
39 :
/// Server settings passed as `-c name=value` options by `Conf::start_server`.
/// `ensure_server_config` verifies `wal_keep_size`, `wal_writer_delay` and `autovacuum`
/// on a live connection.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    "wal_keep_size=50MB",            // Ensure old WAL is not removed
    "shared_preload_libraries=neon", // can only be loaded at startup
    // Disable background processes as much as possible
    "wal_writer_delay=10s",
    "autovacuum=off",
];
47 :
48 : impl Conf {
49 54 : pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
50 54 : let path = self.pg_distrib_dir.clone();
51 54 :
52 54 : match self.pg_version {
53 27 : 14 => Ok(path.join(format!("v{}", self.pg_version))),
54 27 : 15 => Ok(path.join(format!("v{}", self.pg_version))),
55 0 : _ => bail!("Unsupported postgres version: {}", self.pg_version),
56 : }
57 54 : }
58 :
59 18 : fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
60 18 : Ok(self.pg_distrib_dir()?.join("bin"))
61 18 : }
62 :
63 36 : fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
64 36 : Ok(self.pg_distrib_dir()?.join("lib"))
65 36 : }
66 :
67 90 : pub fn wal_dir(&self) -> PathBuf {
68 90 : self.datadir.join("pg_wal")
69 90 : }
70 :
71 18 : fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
72 18 : let path = self.pg_bin_dir()?.join(command);
73 18 : ensure!(path.exists(), "Command {:?} does not exist", path);
74 18 : let mut cmd = Command::new(path);
75 18 : cmd.env_clear()
76 18 : .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
77 18 : .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
78 18 : Ok(cmd)
79 18 : }
80 :
81 : pub fn initdb(&self) -> anyhow::Result<()> {
82 6 : if let Some(parent) = self.datadir.parent() {
83 6 : info!("Pre-creating parent directory {:?}", parent);
84 : // Tests may be run concurrently and there may be a race to create `test_output/`.
85 : // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
86 6 : std::fs::create_dir_all(parent)?;
87 0 : }
88 6 : info!(
89 6 : "Running initdb in {:?} with user \"postgres\"",
90 : self.datadir
91 : );
92 6 : let output = self
93 6 : .new_pg_command("initdb")?
94 6 : .arg("-D")
95 6 : .arg(&self.datadir)
96 6 : .args(["-U", "postgres", "--no-instructions", "--no-sync"])
97 6 : .output()?;
98 6 : debug!("initdb output: {:?}", output);
99 6 : ensure!(
100 6 : output.status.success(),
101 0 : "initdb failed, stdout and stderr follow:\n{}{}",
102 0 : String::from_utf8_lossy(&output.stdout),
103 0 : String::from_utf8_lossy(&output.stderr),
104 : );
105 6 : Ok(())
106 6 : }
107 :
108 6 : pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
109 6 : info!("Starting Postgres server in {:?}", self.datadir);
110 6 : let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
111 6 : let unix_socket_dir_path = unix_socket_dir.path().to_owned();
112 6 : let server_process = self
113 6 : .new_pg_command("postgres")?
114 6 : .args(["-c", "listen_addresses="])
115 6 : .arg("-k")
116 6 : .arg(&unix_socket_dir_path)
117 6 : .arg("-D")
118 6 : .arg(&self.datadir)
119 24 : .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
120 6 : .spawn()?;
121 6 : let server = PostgresServer {
122 6 : process: server_process,
123 6 : _unix_socket_dir: unix_socket_dir,
124 6 : client_config: {
125 6 : let mut c = postgres::Config::new();
126 6 : c.host_path(&unix_socket_dir_path);
127 6 : c.user("postgres");
128 6 : c.connect_timeout(Duration::from_millis(10000));
129 6 : c
130 6 : },
131 6 : };
132 6 : Ok(server)
133 6 : }
134 :
135 6 : pub fn pg_waldump(
136 6 : &self,
137 6 : first_segment_name: &str,
138 6 : last_segment_name: &str,
139 6 : ) -> anyhow::Result<std::process::Output> {
140 6 : let first_segment_file = self.datadir.join(first_segment_name);
141 6 : let last_segment_file = self.datadir.join(last_segment_name);
142 6 : info!(
143 6 : "Running pg_waldump for {} .. {}",
144 6 : first_segment_file.display(),
145 6 : last_segment_file.display()
146 : );
147 6 : let output = self
148 6 : .new_pg_command("pg_waldump")?
149 6 : .args([&first_segment_file, &last_segment_file])
150 6 : .output()?;
151 6 : debug!("waldump output: {:?}", output);
152 6 : Ok(output)
153 6 : }
154 : }
155 :
156 : impl PostgresServer {
157 6 : pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
158 6 : let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
159 12 : while Instant::now() < retry_until {
160 12 : if let Ok(client) = self.client_config.connect(postgres::NoTls) {
161 6 : return Ok(client);
162 6 : }
163 6 : std::thread::sleep(Duration::from_millis(100));
164 : }
165 0 : bail!("Connection timed out");
166 6 : }
167 :
168 6 : pub fn kill(mut self) {
169 6 : self.process.kill().unwrap();
170 6 : self.process.wait().unwrap();
171 6 : }
172 : }
173 :
174 : impl Drop for PostgresServer {
175 6 : fn drop(&mut self) {
176 6 : match self.process.try_wait() {
177 6 : Ok(Some(_)) => return,
178 : Ok(None) => {
179 0 : warn!("Server was not terminated, will be killed");
180 : }
181 0 : Err(e) => {
182 0 : error!("Unable to get status of the server: {}, will be killed", e);
183 : }
184 : }
185 0 : let _ = self.process.kill();
186 6 : }
187 : }
188 :
189 : pub trait PostgresClientExt: postgres::GenericClient {
190 28 : fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
191 28 : Ok(self
192 28 : .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
193 28 : .get(0))
194 28 : }
195 9 : fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
196 9 : Ok(self
197 9 : .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
198 9 : .get(0))
199 9 : }
200 : }
201 :
202 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
203 :
204 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
205 11 : client.execute("create extension if not exists neon_test_utils", &[])?;
206 :
207 11 : let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
208 11 : ensure!(wal_keep_size == "50MB");
209 11 : let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
210 11 : ensure!(wal_writer_delay == "10s");
211 11 : let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
212 11 : ensure!(autovacuum == "off");
213 :
214 11 : let wal_segment_size = client.query_one(
215 11 : "select cast(setting as bigint) as setting, unit \
216 11 : from pg_settings where name = 'wal_segment_size'",
217 11 : &[],
218 11 : )?;
219 11 : ensure!(
220 11 : wal_segment_size.get::<_, String>("unit") == "B",
221 0 : "Unexpected wal_segment_size unit"
222 : );
223 11 : ensure!(
224 11 : wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
225 0 : "Unexpected wal_segment_size in bytes"
226 : );
227 :
228 11 : Ok(())
229 11 : }
230 :
/// A recipe that drives a live server into producing a particular WAL layout.
pub trait Crafter {
    /// Identifier of the recipe (e.g. `"simple"`).
    const NAME: &'static str;

    /// Generates WAL using the client `client`. Returns a pair of:
    /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
    ///   May include or exclude Lsn(0) and the end-of-wal.
    /// * The expected end-of-wal LSN.
    fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
}
240 :
241 : fn craft_internal<C: postgres::GenericClient>(
242 : client: &mut C,
243 : f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
244 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
245 9 : ensure_server_config(client)?;
246 :
247 9 : let initial_lsn = client.pg_current_wal_insert_lsn()?;
248 9 : info!("LSN initial = {}", initial_lsn);
249 :
250 9 : let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
251 9 : let last_lsn = match last_lsn {
252 3 : None => client.pg_current_wal_insert_lsn()?,
253 6 : Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) {
254 0 : Ordering::Less => bail!("Some records were inserted after the crafted WAL"),
255 6 : Ordering::Equal => last_lsn,
256 0 : Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
257 : },
258 : };
259 9 : if !intermediate_lsns.starts_with(&[initial_lsn]) {
260 9 : intermediate_lsns.insert(0, initial_lsn);
261 9 : }
262 :
263 : // Some records may be not flushed, e.g. non-transactional logical messages.
264 9 : client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
265 9 : match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
266 0 : Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
267 9 : Ordering::Equal => {}
268 0 : Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
269 : }
270 9 : Ok((intermediate_lsns, last_lsn))
271 9 : }
272 :
273 : pub struct Simple;
274 : impl Crafter for Simple {
275 : const NAME: &'static str = "simple";
276 3 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
277 3 : craft_internal(client, |client, _| {
278 3 : client.execute("CREATE table t(x int)", &[])?;
279 3 : Ok((Vec::new(), None))
280 3 : })
281 3 : }
282 : }
283 :
284 : pub struct LastWalRecordXlogSwitch;
285 : impl Crafter for LastWalRecordXlogSwitch {
286 : const NAME: &'static str = "last_wal_record_xlog_switch";
287 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
288 : // Do not use generate_internal because here we end up with flush_lsn exactly on
289 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
290 1 : ensure_server_config(client)?;
291 :
292 1 : client.execute("CREATE table t(x int)", &[])?;
293 1 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
294 1 : let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
295 1 : let next_segment = PgLsn::from(0x0200_0000);
296 1 : ensure!(
297 1 : after_xlog_switch <= next_segment,
298 0 : "XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
299 : after_xlog_switch,
300 : next_segment
301 : );
302 1 : Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
303 1 : }
304 : }
305 :
306 : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
307 : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
308 : const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
309 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
310 : // Do not use generate_internal because here we end up with flush_lsn exactly on
311 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
312 1 : ensure_server_config(client)?;
313 :
314 1 : client.execute("CREATE table t(x int)", &[])?;
315 :
316 : // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
317 : // We will use logical message as the padding. We start with detecting how much WAL
318 : // it takes for one logical message, considering all alignments and headers.
319 1 : let base_wal_advance = {
320 1 : let before_lsn = client.pg_current_wal_insert_lsn()?;
321 : // Small non-empty message bigger than few bytes is more likely than an empty
322 : // message to have the same format as the big padding message.
323 1 : client.execute(
324 1 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
325 1 : &[],
326 1 : )?;
327 : // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
328 1 : (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
329 : + XLOG_SIZE_OF_XLOG_RECORD
330 : };
331 1 : let mut remaining_lsn =
332 1 : XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
333 1 : if remaining_lsn < base_wal_advance {
334 0 : remaining_lsn += XLOG_BLCKSZ;
335 1 : }
336 1 : let repeats = 10 + remaining_lsn - base_wal_advance;
337 1 : info!(
338 1 : "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
339 1 : client.pg_current_wal_insert_lsn()?,
340 : remaining_lsn,
341 : base_wal_advance,
342 : repeats
343 : );
344 1 : client.execute(
345 1 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
346 1 : &[&(repeats as i32)],
347 1 : )?;
348 1 : info!(
349 1 : "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
350 1 : client.pg_current_wal_insert_lsn()?,
351 : XLOG_SIZE_OF_XLOG_RECORD
352 : );
353 :
354 : // Emit the XLOG_SWITCH
355 1 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
356 1 : let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
357 1 : let next_segment = PgLsn::from(0x0200_0000);
358 1 : ensure!(
359 1 : after_xlog_switch < next_segment,
360 0 : "XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
361 : after_xlog_switch,
362 : next_segment
363 : );
364 1 : ensure!(
365 1 : u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
366 0 : "XLOG_SWITCH message ended not on page boundary: {}",
367 : after_xlog_switch
368 : );
369 1 : Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
370 1 : }
371 : }
372 :
373 6 : fn craft_single_logical_message(
374 6 : client: &mut impl postgres::GenericClient,
375 6 : transactional: bool,
376 6 : ) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
377 6 : craft_internal(client, |client, initial_lsn| {
378 6 : ensure!(
379 6 : initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
380 0 : "Initial LSN is too far in the future"
381 : );
382 :
383 6 : let message_lsn: PgLsn = client
384 6 : .query_one(
385 6 : "select pg_logical_emit_message($1, 'big-16mb-msg', \
386 6 : concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
387 6 : &[&transactional],
388 6 : )?
389 6 : .get("message_lsn");
390 6 : ensure!(
391 6 : message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
392 0 : "Logical message did not cross the segment boundary"
393 : );
394 6 : ensure!(
395 6 : message_lsn < PgLsn::from(0x0400_0000),
396 0 : "Logical message crossed two segments"
397 : );
398 :
399 6 : if transactional {
400 : // Transactional logical messages are part of a transaction, so the one above is
401 : // followed by a small COMMIT record.
402 :
403 3 : let after_message_lsn = client.pg_current_wal_insert_lsn()?;
404 3 : ensure!(
405 3 : message_lsn < after_message_lsn,
406 0 : "No record found after the emitted message"
407 : );
408 3 : Ok((vec![message_lsn], Some(after_message_lsn)))
409 : } else {
410 3 : Ok((Vec::new(), Some(message_lsn)))
411 : }
412 6 : })
413 6 : }
414 :
415 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
416 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
417 : const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
418 3 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
419 3 : craft_single_logical_message(client, true)
420 3 : }
421 : }
422 :
423 : pub struct LastWalRecordCrossingSegment;
424 : impl Crafter for LastWalRecordCrossingSegment {
425 : const NAME: &'static str = "last_wal_record_crossing_segment";
426 3 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
427 3 : craft_single_logical_message(client, false)
428 3 : }
429 : }
|