Line data Source code
1 : //! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency.
2 :
3 : use super::*;
4 : use crate::{error, info};
5 : use regex::Regex;
6 : use std::cmp::min;
7 : use std::fs::{self, File};
8 : use std::io::Write;
9 : use std::{env, str::FromStr};
10 : use utils::const_assert;
11 : use utils::lsn::Lsn;
12 :
13 9 : fn init_logging() {
14 9 : let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(format!(
15 9 : "crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"
16 9 : )))
17 9 : .is_test(true)
18 9 : .try_init();
19 9 : }
20 :
21 : /// Test that find_end_of_wal returns the same results as pg_dump on various
22 : /// WALs created by Crafter.
23 9 : fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
24 9 : use crate::*;
25 9 :
26 9 : let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
27 9 :
28 9 : // Craft some WAL
29 9 : let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
30 9 : .join("..")
31 9 : .join("..")
32 9 : .join("..");
33 9 : let cfg = Conf {
34 9 : pg_version,
35 9 : pg_distrib_dir: top_path.join("pg_install"),
36 9 : datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
37 9 : };
38 9 : if cfg.datadir.exists() {
39 0 : fs::remove_dir_all(&cfg.datadir).unwrap();
40 9 : }
41 9 : cfg.initdb().unwrap();
42 9 : let srv = cfg.start_server().unwrap();
43 9 : let intermediate_lsns = C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
44 9 : let intermediate_lsns: Vec<Lsn> = intermediate_lsns
45 9 : .iter()
46 15 : .map(|&lsn| u64::from(lsn).into())
47 9 : .collect();
48 9 : // Kill postgres. Note that it might have inserted to WAL something after
49 9 : // 'craft' did its job.
50 9 : srv.kill();
51 9 :
52 9 : // Check find_end_of_wal on the initial WAL
53 9 : let last_segment = cfg
54 9 : .wal_dir()
55 9 : .read_dir()
56 9 : .unwrap()
57 24 : .map(|f| f.unwrap().file_name().into_string().unwrap())
58 24 : .filter(|fname| IsXLogFileName(fname))
59 9 : .max()
60 9 : .unwrap();
61 9 : let expected_end_of_wal = find_pg_waldump_end_of_wal(&cfg, &last_segment);
62 24 : for start_lsn in intermediate_lsns
63 9 : .iter()
64 9 : .chain(std::iter::once(&expected_end_of_wal))
65 : {
66 : // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
67 : // We assume that `start_lsn` is non-decreasing.
68 24 : info!(
69 24 : "Checking with start_lsn={}, erasing WAL before it",
70 : start_lsn
71 : );
72 66 : for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
73 66 : let fname = file.file_name().into_string().unwrap();
74 66 : if !IsXLogFileName(&fname) {
75 24 : continue;
76 42 : }
77 42 : let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
78 42 : let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
79 42 : if seg_start_lsn > u64::from(*start_lsn) {
80 6 : continue;
81 36 : }
82 36 : let mut f = File::options().write(true).open(file.path()).unwrap();
83 36 : const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
84 36 : f.write_all(
85 36 : &ZEROS[0..min(
86 36 : WAL_SEGMENT_SIZE,
87 36 : (u64::from(*start_lsn) - seg_start_lsn) as usize,
88 36 : )],
89 36 : )
90 36 : .unwrap();
91 : }
92 24 : check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
93 : }
94 9 : }
95 :
96 9 : fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
97 9 : // Get the actual end of WAL by pg_waldump
98 9 : let waldump_output = cfg
99 9 : .pg_waldump("000000010000000000000001", last_segment)
100 9 : .unwrap()
101 9 : .stderr;
102 9 : let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
103 9 : let caps = match Regex::new(r"invalid record length at (.+):")
104 9 : .unwrap()
105 9 : .captures(waldump_output)
106 : {
107 9 : Some(caps) => caps,
108 : None => {
109 0 : error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
110 0 : panic!();
111 : }
112 : };
113 9 : let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
114 9 : info!("waldump erred on {}", waldump_wal_end);
115 9 : waldump_wal_end
116 9 : }
117 :
118 24 : fn check_end_of_wal(
119 24 : cfg: &crate::Conf,
120 24 : last_segment: &str,
121 24 : start_lsn: Lsn,
122 24 : expected_end_of_wal: Lsn,
123 24 : ) {
124 24 : // Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
125 24 : // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
126 24 : // info!(
127 24 : // "find_end_of_wal returned wal_end={} with non-partial WAL segment",
128 24 : // wal_end
129 24 : // );
130 24 : // assert_eq!(wal_end, expected_end_of_wal_non_partial);
131 24 :
132 24 : // Rename file to partial to actually find last valid lsn, then rename it back.
133 24 : fs::rename(
134 24 : cfg.wal_dir().join(last_segment),
135 24 : cfg.wal_dir().join(format!("{}.partial", last_segment)),
136 24 : )
137 24 : .unwrap();
138 24 : let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
139 24 : info!(
140 24 : "find_end_of_wal returned wal_end={} with partial WAL segment",
141 : wal_end
142 : );
143 24 : assert_eq!(wal_end, expected_end_of_wal);
144 24 : fs::rename(
145 24 : cfg.wal_dir().join(format!("{}.partial", last_segment)),
146 24 : cfg.wal_dir().join(last_segment),
147 24 : )
148 24 : .unwrap();
149 24 : }
150 :
// Compile-time guard: the WAL segment size must be exactly 16 MiB.
// NOTE(review): presumably the tests in this file (and the WAL crafters they
// use) rely on this segment size — confirm before changing it.
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
152 :
/// Runs the end-of-WAL check against WAL produced by the `Simple` crafter.
#[test]
pub fn test_find_end_of_wal_simple() {
    init_logging();
    // The test name also names the data directory used by the check.
    let test_name = "test_find_end_of_wal_simple";
    test_end_of_wal::<crate::Simple>(test_name);
}
158 :
/// Runs the end-of-WAL check against WAL produced by the
/// `WalRecordCrossingSegmentFollowedBySmallOne` crafter.
#[test]
pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
    init_logging();
    // The test name also names the data directory used by the check.
    let test_name = "test_find_end_of_wal_crossing_segment_followed_by_small_one";
    test_end_of_wal::<crate::WalRecordCrossingSegmentFollowedBySmallOne>(test_name);
}
166 :
/// Runs the end-of-WAL check against WAL produced by the
/// `LastWalRecordCrossingSegment` crafter.
#[test]
pub fn test_find_end_of_wal_last_crossing_segment() {
    init_logging();
    // The test name also names the data directory used by the check.
    let test_name = "test_find_end_of_wal_last_crossing_segment";
    test_end_of_wal::<crate::LastWalRecordCrossingSegment>(test_name);
}
174 :
/// Check the math in update_next_xid
///
/// Starting from `nextXid = 10`, feeds a sequence of XIDs to
/// `update_next_xid` and checks the resulting `nextXid` after each call.
///
/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
/// currently 1024.
#[test]
pub fn test_update_next_xid() {
    let zeroed = [0u8; size_of::<CheckPoint>()];
    let mut checkpoint = CheckPoint::decode(&zeroed).unwrap();

    checkpoint.nextXid = FullTransactionId { value: 10 };
    assert_eq!(checkpoint.nextXid.value, 10);

    // (XID passed to update_next_xid, nextXid expected afterwards)
    let cases = [
        // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
        // boundary
        (100, 1024),
        // No change
        (500, 1024),
        (1023, 1024),
        // The function returns the *next* XID, given the highest XID seen so
        // far. So when we pass 1024, the nextXid gets bumped up to the next
        // XID_CHECKPOINT_INTERVAL boundary.
        (1024, 2048),
    ];
    for (seen_xid, expected_next) in cases {
        checkpoint.update_next_xid(seen_xid);
        assert_eq!(checkpoint.nextXid.value, expected_next);
    }
}
204 :
/// Checks `update_next_multixid`: advancing, ignoring older values, and the
/// wraparound cases for both `nextMulti` and `nextMultiOffset`.
#[test]
pub fn test_update_next_multixid() {
    let zeroed = [0u8; size_of::<CheckPoint>()];
    let mut checkpoint = CheckPoint::decode(&zeroed).unwrap();

    // simple case
    checkpoint.nextMulti = 20;
    checkpoint.nextMultiOffset = 20;
    checkpoint.update_next_multixid(1000, 2000);
    assert_eq!(
        (checkpoint.nextMulti, checkpoint.nextMultiOffset),
        (1000, 2000)
    );

    // No change
    checkpoint.update_next_multixid(500, 900);
    assert_eq!(
        (checkpoint.nextMulti, checkpoint.nextMultiOffset),
        (1000, 2000)
    );

    // Close to wraparound, but not wrapped around yet
    checkpoint.nextMulti = 0xffff0000;
    checkpoint.nextMultiOffset = 0xfffe0000;
    checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
    assert_eq!(
        (checkpoint.nextMulti, checkpoint.nextMultiOffset),
        (0xffff00ff, 0xfffe00ff)
    );

    // Wraparound
    checkpoint.update_next_multixid(1, 900);
    assert_eq!((checkpoint.nextMulti, checkpoint.nextMultiOffset), (1, 900));

    // Wraparound nextMulti to 0.
    //
    // It's a bit surprising that nextMulti can be 0, because that's a special value
    // (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
    // nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
    // the 0 and the next multi-xid actually assigned is 1.
    checkpoint.nextMulti = 0xffff0000;
    checkpoint.nextMultiOffset = 0xfffe0000;
    checkpoint.update_next_multixid(0, 0xfffe00ff);
    assert_eq!(
        (checkpoint.nextMulti, checkpoint.nextMultiOffset),
        (0, 0xfffe00ff)
    );

    // Wraparound nextMultiOffset to 0
    checkpoint.update_next_multixid(0, 0);
    assert_eq!((checkpoint.nextMulti, checkpoint.nextMultiOffset), (0, 0));
}
251 :
/// Checks that `encode_logical_message("prefix", "message")` produces the
/// exact expected byte sequence.
///
/// The last 14 bytes of the expectation are the ASCII text `prefix`, a NUL
/// byte, and `message`; the preceding bytes are presumably the WAL record
/// and logical-message headers (not verifiable from this file).
#[test]
pub fn test_encode_logical_message() {
    let expected: &[u8] = &[
        64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
        0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
        105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
    ];
    let encoded = encode_logical_message("prefix", "message");
    assert_eq!(expected, &encoded[..]);
}
|