TLA Line data Source code
1 : //! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency.
2 :
3 : use super::*;
4 : use crate::{error, info};
5 : use regex::Regex;
6 : use std::cmp::min;
7 : use std::fs::{self, File};
8 : use std::io::Write;
9 : use std::{env, str::FromStr};
10 : use utils::const_assert;
11 : use utils::lsn::Lsn;
12 :
13 CBC 9 : fn init_logging() {
14 9 : let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(
15 9 : format!("crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"),
16 9 : ))
17 9 : .is_test(true)
18 9 : .try_init();
19 9 : }
20 :
21 9 : fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
22 9 : use crate::*;
23 9 :
24 9 : let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
25 9 :
26 9 : // Craft some WAL
27 9 : let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
28 9 : .join("..")
29 9 : .join("..")
30 9 : .join("..");
31 9 : let cfg = Conf {
32 9 : pg_version,
33 9 : pg_distrib_dir: top_path.join("pg_install"),
34 9 : datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
35 9 : };
36 9 : if cfg.datadir.exists() {
37 UBC 0 : fs::remove_dir_all(&cfg.datadir).unwrap();
38 CBC 9 : }
39 9 : cfg.initdb().unwrap();
40 9 : let srv = cfg.start_server().unwrap();
41 9 : let (intermediate_lsns, expected_end_of_wal_partial) =
42 9 : C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
43 9 : let intermediate_lsns: Vec<Lsn> = intermediate_lsns
44 9 : .iter()
45 12 : .map(|&lsn| u64::from(lsn).into())
46 9 : .collect();
47 9 : let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into();
48 9 : srv.kill();
49 9 :
50 9 : // Check find_end_of_wal on the initial WAL
51 9 : let last_segment = cfg
52 9 : .wal_dir()
53 9 : .read_dir()
54 9 : .unwrap()
55 24 : .map(|f| f.unwrap().file_name().into_string().unwrap())
56 24 : .filter(|fname| IsXLogFileName(fname))
57 9 : .max()
58 9 : .unwrap();
59 9 : check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal);
60 21 : for start_lsn in intermediate_lsns
61 9 : .iter()
62 9 : .chain(std::iter::once(&expected_end_of_wal))
63 : {
64 : // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
65 : // We assume that `start_lsn` is non-decreasing.
66 21 : info!(
67 21 : "Checking with start_lsn={}, erasing WAL before it",
68 : start_lsn
69 : );
70 57 : for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
71 57 : let fname = file.file_name().into_string().unwrap();
72 57 : if !IsXLogFileName(&fname) {
73 21 : continue;
74 36 : }
75 36 : let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
76 36 : let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
77 36 : if seg_start_lsn > u64::from(*start_lsn) {
78 6 : continue;
79 30 : }
80 30 : let mut f = File::options().write(true).open(file.path()).unwrap();
81 30 : const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
82 30 : f.write_all(
83 30 : &ZEROS[0..min(
84 30 : WAL_SEGMENT_SIZE,
85 30 : (u64::from(*start_lsn) - seg_start_lsn) as usize,
86 30 : )],
87 30 : )
88 30 : .unwrap();
89 : }
90 21 : check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
91 : }
92 9 : }
93 :
94 9 : fn check_pg_waldump_end_of_wal(
95 9 : cfg: &crate::Conf,
96 9 : last_segment: &str,
97 9 : expected_end_of_wal: Lsn,
98 9 : ) {
99 9 : // Get the actual end of WAL by pg_waldump
100 9 : let waldump_output = cfg
101 9 : .pg_waldump("000000010000000000000001", last_segment)
102 9 : .unwrap()
103 9 : .stderr;
104 9 : let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
105 9 : let caps = match Regex::new(r"invalid record length at (.+):")
106 9 : .unwrap()
107 9 : .captures(waldump_output)
108 : {
109 9 : Some(caps) => caps,
110 : None => {
111 UBC 0 : error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
112 0 : panic!();
113 : }
114 : };
115 CBC 9 : let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
116 9 : info!(
117 9 : "waldump erred on {}, expected wal end at {}",
118 : waldump_wal_end, expected_end_of_wal
119 : );
120 9 : assert_eq!(waldump_wal_end, expected_end_of_wal);
121 9 : }
122 :
123 21 : fn check_end_of_wal(
124 21 : cfg: &crate::Conf,
125 21 : last_segment: &str,
126 21 : start_lsn: Lsn,
127 21 : expected_end_of_wal: Lsn,
128 21 : ) {
129 21 : // Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
130 21 : // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
131 21 : // info!(
132 21 : // "find_end_of_wal returned wal_end={} with non-partial WAL segment",
133 21 : // wal_end
134 21 : // );
135 21 : // assert_eq!(wal_end, expected_end_of_wal_non_partial);
136 21 :
137 21 : // Rename file to partial to actually find last valid lsn, then rename it back.
138 21 : fs::rename(
139 21 : cfg.wal_dir().join(last_segment),
140 21 : cfg.wal_dir().join(format!("{}.partial", last_segment)),
141 21 : )
142 21 : .unwrap();
143 21 : let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
144 21 : info!(
145 21 : "find_end_of_wal returned wal_end={} with partial WAL segment",
146 : wal_end
147 : );
148 21 : assert_eq!(wal_end, expected_end_of_wal);
149 21 : fs::rename(
150 21 : cfg.wal_dir().join(format!("{}.partial", last_segment)),
151 21 : cfg.wal_dir().join(last_segment),
152 21 : )
153 21 : .unwrap();
154 21 : }
155 :
156 : const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
157 :
158 3 : #[test]
159 3 : pub fn test_find_end_of_wal_simple() {
160 3 : init_logging();
161 3 : test_end_of_wal::<crate::Simple>("test_find_end_of_wal_simple");
162 3 : }
163 :
164 3 : #[test]
165 3 : pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
166 3 : init_logging();
167 3 : test_end_of_wal::<crate::WalRecordCrossingSegmentFollowedBySmallOne>(
168 3 : "test_find_end_of_wal_crossing_segment_followed_by_small_one",
169 3 : );
170 3 : }
171 :
172 3 : #[test]
173 3 : pub fn test_find_end_of_wal_last_crossing_segment() {
174 3 : init_logging();
175 3 : test_end_of_wal::<crate::LastWalRecordCrossingSegment>(
176 3 : "test_find_end_of_wal_last_crossing_segment",
177 3 : );
178 3 : }
179 :
180 : /// Check the math in update_next_xid
181 : ///
182 : /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
183 : /// currently 1024.
184 3 : #[test]
185 3 : pub fn test_update_next_xid() {
186 3 : let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
187 3 : let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
188 3 :
189 3 : checkpoint.nextXid = FullTransactionId { value: 10 };
190 3 : assert_eq!(checkpoint.nextXid.value, 10);
191 :
192 : // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
193 : // boundary
194 3 : checkpoint.update_next_xid(100);
195 3 : assert_eq!(checkpoint.nextXid.value, 1024);
196 :
197 : // No change
198 3 : checkpoint.update_next_xid(500);
199 3 : assert_eq!(checkpoint.nextXid.value, 1024);
200 3 : checkpoint.update_next_xid(1023);
201 3 : assert_eq!(checkpoint.nextXid.value, 1024);
202 :
203 : // The function returns the *next* XID, given the highest XID seen so
204 : // far. So when we pass 1024, the nextXid gets bumped up to the next
205 : // XID_CHECKPOINT_INTERVAL boundary.
206 3 : checkpoint.update_next_xid(1024);
207 3 : assert_eq!(checkpoint.nextXid.value, 2048);
208 3 : }
209 :
210 3 : #[test]
211 3 : pub fn test_encode_logical_message() {
212 3 : let expected = [
213 3 : 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
214 3 : 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
215 3 : 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
216 3 : ];
217 3 : let actual = encode_logical_message("prefix", "message");
218 3 : assert_eq!(expected, actual[..]);
219 3 : }
|