Line data Source code
1 : use anyhow::{bail, ensure};
2 : use camino_tempfile::{tempdir, Utf8TempDir};
3 : use log::*;
4 : use postgres::types::PgLsn;
5 : use postgres::Client;
6 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
7 : use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
8 : use std::path::{Path, PathBuf};
9 : use std::process::Command;
10 : use std::time::{Duration, Instant};
11 :
12 : macro_rules! xlog_utils_test {
13 : ($version:ident) => {
14 : #[path = "."]
15 : mod $version {
16 : #[allow(unused_imports)]
17 : pub use postgres_ffi::$version::wal_craft_test_export::*;
18 : #[allow(clippy::duplicate_mod)]
19 : #[cfg(test)]
20 : mod xlog_utils_test;
21 : }
22 : };
23 : }
24 :
25 : postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
26 :
/// Locations and version needed to run Postgres binaries against a data directory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
    /// Postgres major version; selects the `v<version>` subdirectory of `pg_distrib_dir`.
    pub pg_version: u32,
    /// Root directory that contains the per-version `v<version>` installations.
    pub pg_distrib_dir: PathBuf,
    /// Data directory handed to `initdb` and `postgres` through `-D`.
    pub datadir: PathBuf,
}
33 :
34 : pub struct PostgresServer {
35 : process: std::process::Child,
36 : _unix_socket_dir: Utf8TempDir,
37 : client_config: postgres::Config,
38 : }
39 :
/// Settings that `Conf::start_server` passes to `postgres`, each as a `-c name=value` option.
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
    // Ensure old WAL is not removed.
    "wal_keep_size=50MB",
    // Can only be loaded at startup.
    "shared_preload_libraries=neon",
    // Disable background processes as much as possible.
    "wal_writer_delay=10s",
    "autovacuum=off",
];
47 :
48 : impl Conf {
49 162 : pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
50 162 : let path = self.pg_distrib_dir.clone();
51 162 :
52 162 : #[allow(clippy::manual_range_patterns)]
53 162 : match self.pg_version {
54 162 : 14 | 15 | 16 => Ok(path.join(format!("v{}", self.pg_version))),
55 0 : _ => bail!("Unsupported postgres version: {}", self.pg_version),
56 : }
57 162 : }
58 :
59 54 : fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
60 54 : Ok(self.pg_distrib_dir()?.join("bin"))
61 54 : }
62 :
63 108 : fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
64 108 : Ok(self.pg_distrib_dir()?.join("lib"))
65 108 : }
66 :
67 306 : pub fn wal_dir(&self) -> PathBuf {
68 306 : self.datadir.join("pg_wal")
69 306 : }
70 :
71 54 : fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
72 54 : let path = self.pg_bin_dir()?.join(command);
73 54 : ensure!(path.exists(), "Command {:?} does not exist", path);
74 54 : let mut cmd = Command::new(path);
75 54 : cmd.env_clear()
76 54 : .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
77 54 : .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
78 54 : Ok(cmd)
79 54 : }
80 :
81 18 : pub fn initdb(&self) -> anyhow::Result<()> {
82 18 : if let Some(parent) = self.datadir.parent() {
83 18 : info!("Pre-creating parent directory {:?}", parent);
84 : // Tests may be run concurrently and there may be a race to create `test_output/`.
85 : // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
86 18 : std::fs::create_dir_all(parent)?;
87 0 : }
88 18 : info!(
89 18 : "Running initdb in {:?} with user \"postgres\"",
90 : self.datadir
91 : );
92 18 : let output = self
93 18 : .new_pg_command("initdb")?
94 18 : .arg("-D")
95 18 : .arg(&self.datadir)
96 18 : .args(["-U", "postgres", "--no-instructions", "--no-sync"])
97 18 : .output()?;
98 18 : debug!("initdb output: {:?}", output);
99 18 : ensure!(
100 18 : output.status.success(),
101 0 : "initdb failed, stdout and stderr follow:\n{}{}",
102 0 : String::from_utf8_lossy(&output.stdout),
103 0 : String::from_utf8_lossy(&output.stderr),
104 : );
105 18 : Ok(())
106 18 : }
107 :
108 18 : pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
109 18 : info!("Starting Postgres server in {:?}", self.datadir);
110 18 : let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
111 18 : let unix_socket_dir_path = unix_socket_dir.path().to_owned();
112 18 : let server_process = self
113 18 : .new_pg_command("postgres")?
114 18 : .args(["-c", "listen_addresses="])
115 18 : .arg("-k")
116 18 : .arg(&unix_socket_dir_path)
117 18 : .arg("-D")
118 18 : .arg(&self.datadir)
119 72 : .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
120 18 : .spawn()?;
121 18 : let server = PostgresServer {
122 18 : process: server_process,
123 18 : _unix_socket_dir: unix_socket_dir,
124 18 : client_config: {
125 18 : let mut c = postgres::Config::new();
126 18 : c.host_path(&unix_socket_dir_path);
127 18 : c.user("postgres");
128 18 : c.connect_timeout(Duration::from_millis(10000));
129 18 : c
130 18 : },
131 18 : };
132 18 : Ok(server)
133 18 : }
134 :
135 18 : pub fn pg_waldump(
136 18 : &self,
137 18 : first_segment_name: &str,
138 18 : last_segment_name: &str,
139 18 : ) -> anyhow::Result<std::process::Output> {
140 18 : let first_segment_file = self.datadir.join(first_segment_name);
141 18 : let last_segment_file = self.datadir.join(last_segment_name);
142 18 : info!(
143 18 : "Running pg_waldump for {} .. {}",
144 18 : first_segment_file.display(),
145 18 : last_segment_file.display()
146 : );
147 18 : let output = self
148 18 : .new_pg_command("pg_waldump")?
149 18 : .args([&first_segment_file, &last_segment_file])
150 18 : .output()?;
151 18 : debug!("waldump output: {:?}", output);
152 18 : Ok(output)
153 18 : }
154 : }
155 :
156 : impl PostgresServer {
157 18 : pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
158 18 : let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
159 36 : while Instant::now() < retry_until {
160 36 : if let Ok(client) = self.client_config.connect(postgres::NoTls) {
161 18 : return Ok(client);
162 18 : }
163 18 : std::thread::sleep(Duration::from_millis(100));
164 : }
165 0 : bail!("Connection timed out");
166 18 : }
167 :
168 18 : pub fn kill(mut self) {
169 18 : self.process.kill().unwrap();
170 18 : self.process.wait().unwrap();
171 18 : }
172 : }
173 :
174 : impl Drop for PostgresServer {
175 18 : fn drop(&mut self) {
176 18 : match self.process.try_wait() {
177 18 : Ok(Some(_)) => return,
178 : Ok(None) => {
179 0 : warn!("Server was not terminated, will be killed");
180 : }
181 0 : Err(e) => {
182 0 : error!("Unable to get status of the server: {}, will be killed", e);
183 : }
184 : }
185 0 : let _ = self.process.kill();
186 18 : }
187 : }
188 :
189 : pub trait PostgresClientExt: postgres::GenericClient {
190 18 : fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
191 18 : Ok(self
192 18 : .query_one("SELECT pg_current_wal_insert_lsn()", &[])?
193 18 : .get(0))
194 18 : }
195 0 : fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
196 0 : Ok(self
197 0 : .query_one("SELECT pg_current_wal_flush_lsn()", &[])?
198 0 : .get(0))
199 0 : }
200 : }
201 :
202 : impl<C: postgres::GenericClient> PostgresClientExt for C {}
203 :
204 18 : pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
205 18 : client.execute("create extension if not exists neon_test_utils", &[])?;
206 :
207 18 : let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
208 18 : ensure!(wal_keep_size == "50MB");
209 18 : let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
210 18 : ensure!(wal_writer_delay == "10s");
211 18 : let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
212 18 : ensure!(autovacuum == "off");
213 :
214 18 : let wal_segment_size = client.query_one(
215 18 : "select cast(setting as bigint) as setting, unit \
216 18 : from pg_settings where name = 'wal_segment_size'",
217 18 : &[],
218 18 : )?;
219 18 : ensure!(
220 18 : wal_segment_size.get::<_, String>("unit") == "B",
221 0 : "Unexpected wal_segment_size unit"
222 : );
223 18 : ensure!(
224 18 : wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
225 0 : "Unexpected wal_segment_size in bytes"
226 : );
227 :
228 18 : Ok(())
229 18 : }
230 :
231 : pub trait Crafter {
232 : const NAME: &'static str;
233 :
234 : /// Generates WAL using the client `client`. Returns a vector of some valid
235 : /// "interesting" intermediate LSNs which one may start reading from.
236 : /// test_end_of_wal uses this to check various starting points.
237 : ///
238 : /// Note that postgres is generally keen about writing some WAL. While we
239 : /// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
240 : /// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
241 : /// stable WAL end would be flaky unless postgres is shut down. For this
242 : /// reason returning potential end of WAL here is pointless. Most of the
243 : /// time this doesn't happen though, so it is reasonable to create needed
244 : /// WAL structure and immediately kill postgres like test_end_of_wal does.
245 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
246 : }
247 :
248 : /// Wraps some WAL craft function, providing current LSN to it before the
249 : /// insertion and flushing WAL afterwards. Also pushes initial LSN to the
250 : /// result.
251 18 : fn craft_internal<C: postgres::GenericClient>(
252 18 : client: &mut C,
253 18 : f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
254 18 : ) -> anyhow::Result<Vec<PgLsn>> {
255 18 : ensure_server_config(client)?;
256 :
257 18 : let initial_lsn = client.pg_current_wal_insert_lsn()?;
258 18 : info!("LSN initial = {}", initial_lsn);
259 :
260 18 : let mut intermediate_lsns = f(client, initial_lsn)?;
261 18 : if !intermediate_lsns.starts_with(&[initial_lsn]) {
262 18 : intermediate_lsns.insert(0, initial_lsn);
263 18 : }
264 :
265 : // Some records may be not flushed, e.g. non-transactional logical messages.
266 : //
267 : // Note: this is broken if pg_current_wal_insert_lsn is at page boundary
268 : // because pg_current_wal_insert_lsn skips page headers.
269 18 : client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
270 18 : Ok(intermediate_lsns)
271 18 : }
272 :
273 : pub struct Simple;
274 : impl Crafter for Simple {
275 : const NAME: &'static str = "simple";
276 6 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
277 6 : craft_internal(client, |client, _| {
278 6 : client.execute("CREATE table t(x int)", &[])?;
279 6 : Ok(Vec::new())
280 6 : })
281 6 : }
282 : }
283 :
284 : pub struct LastWalRecordXlogSwitch;
285 : impl Crafter for LastWalRecordXlogSwitch {
286 : const NAME: &'static str = "last_wal_record_xlog_switch";
287 0 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
288 0 : // Do not use craft_internal because here we end up with flush_lsn exactly on
289 0 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
290 0 : ensure_server_config(client)?;
291 :
292 0 : client.execute("CREATE table t(x int)", &[])?;
293 0 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
294 : // pg_switch_wal returns end of last record of the switched segment,
295 : // i.e. end of SWITCH itself.
296 0 : let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
297 0 : let before_xlog_switch_u64 = u64::from(before_xlog_switch);
298 0 : let next_segment = PgLsn::from(
299 0 : before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
300 0 : + WAL_SEGMENT_SIZE as u64,
301 0 : );
302 0 : ensure!(
303 0 : xlog_switch_record_end <= next_segment,
304 0 : "XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
305 : xlog_switch_record_end,
306 : next_segment
307 : );
308 0 : Ok(vec![before_xlog_switch, xlog_switch_record_end])
309 0 : }
310 : }
311 :
312 : pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
313 : /// Craft xlog SWITCH record ending at page boundary.
314 : impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
315 : const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
316 0 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
317 0 : // Do not use generate_internal because here we end up with flush_lsn exactly on
318 0 : // the segment boundary and insert_lsn after the initial page header, which is unusual.
319 0 : ensure_server_config(client)?;
320 :
321 0 : client.execute("CREATE table t(x int)", &[])?;
322 :
323 : // Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
324 : // We will use logical message as the padding. We start with detecting how much WAL
325 : // it takes for one logical message, considering all alignments and headers.
326 0 : let base_wal_advance = {
327 0 : let before_lsn = client.pg_current_wal_insert_lsn()?;
328 : // Small non-empty message bigger than few bytes is more likely than an empty
329 : // message to have the same format as the big padding message.
330 0 : client.execute(
331 0 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
332 0 : &[],
333 0 : )?;
334 : // The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
335 0 : (u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
336 : + XLOG_SIZE_OF_XLOG_RECORD
337 : };
338 0 : let mut remaining_lsn =
339 0 : XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
340 0 : if remaining_lsn < base_wal_advance {
341 0 : remaining_lsn += XLOG_BLCKSZ;
342 0 : }
343 0 : let repeats = 10 + remaining_lsn - base_wal_advance;
344 0 : info!(
345 0 : "current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
346 0 : client.pg_current_wal_insert_lsn()?,
347 : remaining_lsn,
348 : base_wal_advance,
349 : repeats
350 : );
351 0 : client.execute(
352 0 : "SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
353 0 : &[&(repeats as i32)],
354 0 : )?;
355 0 : info!(
356 0 : "current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
357 0 : client.pg_current_wal_insert_lsn()?,
358 : XLOG_SIZE_OF_XLOG_RECORD
359 : );
360 :
361 : // Emit the XLOG_SWITCH
362 0 : let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
363 0 : let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
364 0 : let next_segment = PgLsn::from(0x0200_0000);
365 0 : ensure!(
366 0 : xlog_switch_record_end < next_segment,
367 0 : "XLOG_SWITCH record ended on or after the expected segment boundary: {} > {}",
368 : xlog_switch_record_end,
369 : next_segment
370 : );
371 0 : ensure!(
372 0 : u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
373 0 : "XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
374 0 : xlog_switch_record_end,
375 0 : u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
376 : );
377 0 : Ok(vec![before_xlog_switch, xlog_switch_record_end])
378 0 : }
379 : }
380 :
381 : /// Write ~16MB logical message; it should cross WAL segment.
382 12 : fn craft_seg_size_logical_message(
383 12 : client: &mut impl postgres::GenericClient,
384 12 : transactional: bool,
385 12 : ) -> anyhow::Result<Vec<PgLsn>> {
386 12 : craft_internal(client, |client, initial_lsn| {
387 12 : ensure!(
388 12 : initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
389 0 : "Initial LSN is too far in the future"
390 : );
391 :
392 12 : let message_lsn: PgLsn = client
393 12 : .query_one(
394 12 : "select pg_logical_emit_message($1, 'big-16mb-msg', \
395 12 : concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
396 12 : &[&transactional],
397 12 : )?
398 12 : .get("message_lsn");
399 12 : ensure!(
400 12 : message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
401 0 : "Logical message did not cross the segment boundary"
402 : );
403 12 : ensure!(
404 12 : message_lsn < PgLsn::from(0x0400_0000),
405 0 : "Logical message crossed two segments"
406 : );
407 :
408 12 : Ok(vec![message_lsn])
409 12 : })
410 12 : }
411 :
412 : pub struct WalRecordCrossingSegmentFollowedBySmallOne;
413 : impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
414 : const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
415 6 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
416 6 : // Transactional message crossing WAL segment will be followed by small
417 6 : // commit record.
418 6 : craft_seg_size_logical_message(client, true)
419 6 : }
420 : }
421 :
422 : pub struct LastWalRecordCrossingSegment;
423 : impl Crafter for LastWalRecordCrossingSegment {
424 : const NAME: &'static str = "last_wal_record_crossing_segment";
425 6 : fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
426 6 : craft_seg_size_logical_message(client, false)
427 6 : }
428 : }
|