Line data Source code
1 : //! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency.
2 :
3 : use super::*;
4 : use crate::{error, info};
5 : use regex::Regex;
6 : use std::cmp::min;
7 : use std::ffi::OsStr;
8 : use std::fs::{self, File};
9 : use std::io::Write;
10 : use std::{env, str::FromStr};
11 : use utils::const_assert;
12 : use utils::lsn::Lsn;
13 :
14 12 : fn init_logging() {
15 12 : let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(format!(
16 12 : "crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"
17 12 : )))
18 12 : .is_test(true)
19 12 : .try_init();
20 12 : }
21 :
22 : /// Test that find_end_of_wal returns the same results as pg_dump on various
23 : /// WALs created by Crafter.
24 12 : fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
25 : use crate::*;
26 :
27 12 : let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
28 12 :
29 12 : // Craft some WAL
30 12 : let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
31 12 : .join("..")
32 12 : .join("..")
33 12 : .join("..");
34 12 : let cfg = Conf {
35 12 : pg_version,
36 12 : pg_distrib_dir: top_path.join("pg_install"),
37 12 : datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
38 12 : };
39 12 : if cfg.datadir.exists() {
40 0 : fs::remove_dir_all(&cfg.datadir).unwrap();
41 12 : }
42 12 : cfg.initdb().unwrap();
43 12 : let srv = cfg.start_server().unwrap();
44 12 : let intermediate_lsns = C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
45 12 : let intermediate_lsns: Vec<Lsn> = intermediate_lsns
46 12 : .iter()
47 20 : .map(|&lsn| u64::from(lsn).into())
48 12 : .collect();
49 12 : // Kill postgres. Note that it might have inserted to WAL something after
50 12 : // 'craft' did its job.
51 12 : srv.kill();
52 12 :
53 12 : // Check find_end_of_wal on the initial WAL
54 12 : let last_segment = cfg
55 12 : .wal_dir()
56 12 : .read_dir()
57 12 : .unwrap()
58 35 : .map(|f| f.unwrap().file_name())
59 35 : .filter(|fname| IsXLogFileName(fname))
60 12 : .max()
61 12 : .unwrap();
62 12 : let expected_end_of_wal = find_pg_waldump_end_of_wal(&cfg, &last_segment);
63 32 : for start_lsn in intermediate_lsns
64 12 : .iter()
65 12 : .chain(std::iter::once(&expected_end_of_wal))
66 : {
67 : // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
68 : // We assume that `start_lsn` is non-decreasing.
69 32 : info!(
70 32 : "Checking with start_lsn={}, erasing WAL before it",
71 : start_lsn
72 : );
73 96 : for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
74 96 : let fname = file.file_name();
75 96 : if !IsXLogFileName(&fname) {
76 40 : continue;
77 56 : }
78 56 : let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
79 56 : let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
80 56 : if seg_start_lsn > u64::from(*start_lsn) {
81 8 : continue;
82 48 : }
83 48 : let mut f = File::options().write(true).open(file.path()).unwrap();
84 48 : const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
85 48 : f.write_all(
86 48 : &ZEROS[0..min(
87 48 : WAL_SEGMENT_SIZE,
88 48 : (u64::from(*start_lsn) - seg_start_lsn) as usize,
89 48 : )],
90 48 : )
91 48 : .unwrap();
92 48 : }
93 32 : check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
94 : }
95 12 : }
96 :
97 12 : fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
98 12 : // Get the actual end of WAL by pg_waldump
99 12 : let waldump_output = cfg
100 12 : .pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
101 12 : .unwrap()
102 12 : .stderr;
103 12 : let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
104 12 : let caps = match Regex::new(r"invalid record length at (.+):")
105 12 : .unwrap()
106 12 : .captures(waldump_output)
107 : {
108 12 : Some(caps) => caps,
109 : None => {
110 0 : error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
111 0 : panic!();
112 : }
113 : };
114 12 : let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
115 12 : info!("waldump erred on {}", waldump_wal_end);
116 12 : waldump_wal_end
117 12 : }
118 :
119 32 : fn check_end_of_wal(
120 32 : cfg: &crate::Conf,
121 32 : last_segment: &OsStr,
122 32 : start_lsn: Lsn,
123 32 : expected_end_of_wal: Lsn,
124 32 : ) {
125 32 : // Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
126 32 : // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
127 32 : // info!(
128 32 : // "find_end_of_wal returned wal_end={} with non-partial WAL segment",
129 32 : // wal_end
130 32 : // );
131 32 : // assert_eq!(wal_end, expected_end_of_wal_non_partial);
132 32 :
133 32 : // Rename file to partial to actually find last valid lsn, then rename it back.
134 32 : fs::rename(
135 32 : cfg.wal_dir().join(last_segment),
136 32 : cfg.wal_dir()
137 32 : .join(format!("{}.partial", last_segment.to_str().unwrap())),
138 32 : )
139 32 : .unwrap();
140 32 : let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
141 32 : info!(
142 32 : "find_end_of_wal returned wal_end={} with partial WAL segment",
143 : wal_end
144 : );
145 32 : assert_eq!(wal_end, expected_end_of_wal);
146 32 : fs::rename(
147 32 : cfg.wal_dir()
148 32 : .join(format!("{}.partial", last_segment.to_str().unwrap())),
149 32 : cfg.wal_dir().join(last_segment),
150 32 : )
151 32 : .unwrap();
152 32 : }
153 :
154 : const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
155 :
156 : #[test]
157 4 : pub fn test_find_end_of_wal_simple() {
158 4 : init_logging();
159 4 : test_end_of_wal::<crate::Simple>("test_find_end_of_wal_simple");
160 4 : }
161 :
162 : #[test]
163 4 : pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
164 4 : init_logging();
165 4 : test_end_of_wal::<crate::WalRecordCrossingSegmentFollowedBySmallOne>(
166 4 : "test_find_end_of_wal_crossing_segment_followed_by_small_one",
167 4 : );
168 4 : }
169 :
170 : #[test]
171 4 : pub fn test_find_end_of_wal_last_crossing_segment() {
172 4 : init_logging();
173 4 : test_end_of_wal::<crate::LastWalRecordCrossingSegment>(
174 4 : "test_find_end_of_wal_last_crossing_segment",
175 4 : );
176 4 : }
177 :
178 : /// Check the math in update_next_xid
179 : ///
180 : /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
181 : /// currently 1024.
182 : #[test]
183 4 : pub fn test_update_next_xid() {
184 4 : let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
185 4 : let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
186 4 :
187 4 : checkpoint.nextXid = FullTransactionId { value: 10 };
188 4 : assert_eq!(checkpoint.nextXid.value, 10);
189 :
190 : // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
191 : // boundary
192 4 : checkpoint.update_next_xid(100);
193 4 : assert_eq!(checkpoint.nextXid.value, 1024);
194 :
195 : // No change
196 4 : checkpoint.update_next_xid(500);
197 4 : assert_eq!(checkpoint.nextXid.value, 1024);
198 4 : checkpoint.update_next_xid(1023);
199 4 : assert_eq!(checkpoint.nextXid.value, 1024);
200 :
201 : // The function returns the *next* XID, given the highest XID seen so
202 : // far. So when we pass 1024, the nextXid gets bumped up to the next
203 : // XID_CHECKPOINT_INTERVAL boundary.
204 4 : checkpoint.update_next_xid(1024);
205 4 : assert_eq!(checkpoint.nextXid.value, 2048);
206 4 : }
207 :
208 : #[test]
209 4 : pub fn test_update_next_multixid() {
210 4 : let checkpoint_buf = [0u8; size_of::<CheckPoint>()];
211 4 : let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
212 4 :
213 4 : // simple case
214 4 : checkpoint.nextMulti = 20;
215 4 : checkpoint.nextMultiOffset = 20;
216 4 : checkpoint.update_next_multixid(1000, 2000);
217 4 : assert_eq!(checkpoint.nextMulti, 1000);
218 4 : assert_eq!(checkpoint.nextMultiOffset, 2000);
219 :
220 : // No change
221 4 : checkpoint.update_next_multixid(500, 900);
222 4 : assert_eq!(checkpoint.nextMulti, 1000);
223 4 : assert_eq!(checkpoint.nextMultiOffset, 2000);
224 :
225 : // Close to wraparound, but not wrapped around yet
226 4 : checkpoint.nextMulti = 0xffff0000;
227 4 : checkpoint.nextMultiOffset = 0xfffe0000;
228 4 : checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
229 4 : assert_eq!(checkpoint.nextMulti, 0xffff00ff);
230 4 : assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
231 :
232 : // Wraparound
233 4 : checkpoint.update_next_multixid(1, 900);
234 4 : assert_eq!(checkpoint.nextMulti, 1);
235 4 : assert_eq!(checkpoint.nextMultiOffset, 900);
236 :
237 : // Wraparound nextMulti to 0.
238 : //
239 : // It's a bit surprising that nextMulti can be 0, because that's a special value
240 : // (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
241 : // nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
242 : // the 0 and the next multi-xid actually assigned is 1.
243 4 : checkpoint.nextMulti = 0xffff0000;
244 4 : checkpoint.nextMultiOffset = 0xfffe0000;
245 4 : checkpoint.update_next_multixid(0, 0xfffe00ff);
246 4 : assert_eq!(checkpoint.nextMulti, 0);
247 4 : assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
248 :
249 : // Wraparound nextMultiOffset to 0
250 4 : checkpoint.update_next_multixid(0, 0);
251 4 : assert_eq!(checkpoint.nextMulti, 0);
252 4 : assert_eq!(checkpoint.nextMultiOffset, 0);
253 4 : }
254 :
255 : #[test]
256 4 : pub fn test_encode_logical_message() {
257 4 : let expected = [
258 4 : 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
259 4 : 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
260 4 : 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
261 4 : ];
262 4 : let actual = encode_logical_message("prefix", "message");
263 4 : assert_eq!(expected, actual[..]);
264 4 : }
|