Line data Source code
1 : use std::{ffi::CString, sync::Arc};
2 :
3 : use byteorder::{LittleEndian, WriteBytesExt};
4 : use crc32c::crc32c_append;
5 : use parking_lot::{Mutex, MutexGuard};
6 : use postgres_ffi::{
7 : pg_constants::{
8 : RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
9 : XLR_BLOCK_ID_DATA_SHORT,
10 : },
11 : v16::{
12 : wal_craft_test_export::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC},
13 : xlog_utils::{
14 : XLogSegNoOffsetToRecPtr, XlLogicalMessage, XLOG_RECORD_CRC_OFFS,
15 : XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
16 : XLP_FIRST_IS_CONTRECORD,
17 : },
18 : XLogRecord,
19 : },
20 : WAL_SEGMENT_SIZE, XLOG_BLCKSZ,
21 : };
22 : use utils::lsn::Lsn;
23 :
24 : use super::block_storage::BlockStorage;
25 :
26 : /// Simulation implementation of walproposer WAL storage.
27 : pub struct DiskWalProposer {
28 : state: Mutex<State>,
29 : }
30 :
31 : impl DiskWalProposer {
32 74318 : pub fn new() -> Arc<DiskWalProposer> {
33 74318 : Arc::new(DiskWalProposer {
34 74318 : state: Mutex::new(State {
35 74318 : internal_available_lsn: Lsn(0),
36 74318 : prev_lsn: Lsn(0),
37 74318 : disk: BlockStorage::new(),
38 74318 : }),
39 74318 : })
40 74318 : }
41 :
42 76686 : pub fn lock(&self) -> MutexGuard<State> {
43 76686 : self.state.lock()
44 76686 : }
45 : }
46 :
47 : pub struct State {
48 : // flush_lsn
49 : internal_available_lsn: Lsn,
50 : // needed for WAL generation
51 : prev_lsn: Lsn,
52 : // actual WAL storage
53 : disk: BlockStorage,
54 : }
55 :
56 : impl State {
57 27128 : pub fn read(&self, pos: u64, buf: &mut [u8]) {
58 27128 : self.disk.read(pos, buf);
59 27128 : // TODO: fail on reading uninitialized data
60 27128 : }
61 :
62 136681 : pub fn write(&mut self, pos: u64, buf: &[u8]) {
63 136681 : self.disk.write(pos, buf);
64 136681 : }
65 :
66 : /// Update the internal available LSN to the given value.
67 2817 : pub fn reset_to(&mut self, lsn: Lsn) {
68 2817 : self.internal_available_lsn = lsn;
69 2817 : }
70 :
71 : /// Get current LSN.
72 11472 : pub fn flush_rec_ptr(&self) -> Lsn {
73 11472 : self.internal_available_lsn
74 11472 : }
75 :
76 : /// Generate a new WAL record at the current LSN.
77 33682 : pub fn insert_logical_message(&mut self, prefix: &str, msg: &[u8]) -> anyhow::Result<()> {
78 33682 : let prefix_cstr = CString::new(prefix)?;
79 33682 : let prefix_bytes = prefix_cstr.as_bytes_with_nul();
80 33682 :
81 33682 : let lm = XlLogicalMessage {
82 33682 : db_id: 0,
83 33682 : transactional: 0,
84 33682 : prefix_size: prefix_bytes.len() as ::std::os::raw::c_ulong,
85 33682 : message_size: msg.len() as ::std::os::raw::c_ulong,
86 33682 : };
87 33682 :
88 33682 : let record_bytes = lm.encode();
89 33682 : let rdatas: Vec<&[u8]> = vec![&record_bytes, prefix_bytes, msg];
90 33682 : insert_wal_record(self, rdatas, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE)
91 33682 : }
92 : }
93 :
94 33682 : fn insert_wal_record(
95 33682 : state: &mut State,
96 33682 : rdatas: Vec<&[u8]>,
97 33682 : rmid: u8,
98 33682 : info: u8,
99 33682 : ) -> anyhow::Result<()> {
100 33682 : // bytes right after the header, in the same rdata block
101 33682 : let mut scratch = Vec::new();
102 101046 : let mainrdata_len: usize = rdatas.iter().map(|rdata| rdata.len()).sum();
103 33682 :
104 33682 : if mainrdata_len > 0 {
105 33682 : if mainrdata_len > 255 {
106 0 : scratch.push(XLR_BLOCK_ID_DATA_LONG);
107 0 : // TODO: verify endiness
108 0 : let _ = scratch.write_u32::<LittleEndian>(mainrdata_len as u32);
109 33682 : } else {
110 33682 : scratch.push(XLR_BLOCK_ID_DATA_SHORT);
111 33682 : scratch.push(mainrdata_len as u8);
112 33682 : }
113 0 : }
114 :
115 33682 : let total_len: u32 = (XLOG_SIZE_OF_XLOG_RECORD + scratch.len() + mainrdata_len) as u32;
116 33682 : let size = maxalign(total_len);
117 33682 : assert!(size as usize > XLOG_SIZE_OF_XLOG_RECORD);
118 :
119 33682 : let start_bytepos = recptr_to_bytepos(state.internal_available_lsn);
120 33682 : let end_bytepos = start_bytepos + size as u64;
121 33682 :
122 33682 : let start_recptr = bytepos_to_recptr(start_bytepos);
123 33682 : let end_recptr = bytepos_to_recptr(end_bytepos);
124 33682 :
125 33682 : assert!(recptr_to_bytepos(start_recptr) == start_bytepos);
126 33682 : assert!(recptr_to_bytepos(end_recptr) == end_bytepos);
127 :
128 33682 : let mut crc = crc32c_append(0, &scratch);
129 134728 : for rdata in &rdatas {
130 101046 : crc = crc32c_append(crc, rdata);
131 101046 : }
132 :
133 33682 : let mut header = XLogRecord {
134 33682 : xl_tot_len: total_len,
135 33682 : xl_xid: 0,
136 33682 : xl_prev: state.prev_lsn.0,
137 33682 : xl_info: info,
138 33682 : xl_rmid: rmid,
139 33682 : __bindgen_padding_0: [0u8; 2usize],
140 33682 : xl_crc: crc,
141 33682 : };
142 :
143 : // now we have the header and can finish the crc
144 33682 : let header_bytes = header.encode()?;
145 33682 : let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
146 33682 : header.xl_crc = crc;
147 :
148 33682 : let mut header_bytes = header.encode()?.to_vec();
149 33682 : assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_RECORD);
150 :
151 33682 : header_bytes.extend_from_slice(&scratch);
152 33682 :
153 33682 : // finish rdatas
154 33682 : let mut rdatas = rdatas;
155 33682 : rdatas.insert(0, &header_bytes);
156 33682 :
157 33682 : write_walrecord_to_disk(state, total_len as u64, rdatas, start_recptr, end_recptr)?;
158 :
159 33682 : state.internal_available_lsn = end_recptr;
160 33682 : state.prev_lsn = start_recptr;
161 33682 : Ok(())
162 33682 : }
163 :
164 33682 : fn write_walrecord_to_disk(
165 33682 : state: &mut State,
166 33682 : total_len: u64,
167 33682 : rdatas: Vec<&[u8]>,
168 33682 : start: Lsn,
169 33682 : end: Lsn,
170 33682 : ) -> anyhow::Result<()> {
171 33682 : let mut curr_ptr = start;
172 33682 : let mut freespace = insert_freespace(curr_ptr);
173 33682 : let mut written: usize = 0;
174 33682 :
175 33682 : assert!(freespace >= std::mem::size_of::<u32>());
176 :
177 168410 : for mut rdata in rdatas {
178 134911 : while rdata.len() >= freespace {
179 183 : assert!(
180 183 : curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD
181 0 : || freespace == 0
182 : );
183 :
184 183 : state.write(curr_ptr.0, &rdata[..freespace]);
185 183 : rdata = &rdata[freespace..];
186 183 : written += freespace;
187 183 : curr_ptr = Lsn(curr_ptr.0 + freespace as u64);
188 183 :
189 183 : let mut new_page = XLogPageHeaderData {
190 183 : xlp_magic: XLOG_PAGE_MAGIC as u16,
191 183 : xlp_info: XLP_BKP_REMOVABLE,
192 183 : xlp_tli: 1,
193 183 : xlp_pageaddr: curr_ptr.0,
194 183 : xlp_rem_len: (total_len - written as u64) as u32,
195 183 : ..Default::default() // Put 0 in padding fields.
196 183 : };
197 183 : if new_page.xlp_rem_len > 0 {
198 165 : new_page.xlp_info |= XLP_FIRST_IS_CONTRECORD;
199 165 : }
200 :
201 183 : if curr_ptr.segment_offset(WAL_SEGMENT_SIZE) == 0 {
202 0 : new_page.xlp_info |= XLP_LONG_HEADER;
203 0 : let long_page = XLogLongPageHeaderData {
204 0 : std: new_page,
205 0 : xlp_sysid: 0,
206 0 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
207 0 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
208 0 : };
209 0 : let header_bytes = long_page.encode()?;
210 0 : assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_LONG_PHD);
211 0 : state.write(curr_ptr.0, &header_bytes);
212 0 : curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64);
213 : } else {
214 183 : let header_bytes = new_page.encode()?;
215 183 : assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_SHORT_PHD);
216 183 : state.write(curr_ptr.0, &header_bytes);
217 183 : curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64);
218 : }
219 183 : freespace = insert_freespace(curr_ptr);
220 : }
221 :
222 134728 : assert!(
223 134728 : curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD
224 0 : || rdata.is_empty()
225 : );
226 134728 : state.write(curr_ptr.0, rdata);
227 134728 : curr_ptr = Lsn(curr_ptr.0 + rdata.len() as u64);
228 134728 : written += rdata.len();
229 134728 : freespace -= rdata.len();
230 : }
231 :
232 33682 : assert!(written == total_len as usize);
233 33682 : curr_ptr.0 = maxalign(curr_ptr.0);
234 33682 : assert!(curr_ptr == end);
235 33682 : Ok(())
236 33682 : }
237 :
/// Rounds `size` up to the nearest multiple of 8.
///
/// Works for any integer type that can be built from a `u8` and supports
/// addition and bitwise AND/NOT. Values within 7 of the type's maximum
/// overflow in the addition.
fn maxalign<T>(size: T) -> T
where
    T: std::ops::BitAnd<Output = T>
        + std::ops::Add<Output = T>
        + std::ops::Not<Output = T>
        + From<u8>,
{
    // Add 7, then clear the three low bits.
    let bumped = size + T::from(7);
    let keep_high_bits = !T::from(7);
    bumped & keep_high_bits
}
247 :
248 33865 : fn insert_freespace(ptr: Lsn) -> usize {
249 33865 : if ptr.block_offset() == 0 {
250 0 : 0
251 : } else {
252 33865 : (XLOG_BLCKSZ as u64 - ptr.block_offset()) as usize
253 : }
254 33865 : }
255 :
256 : const XLP_BKP_REMOVABLE: u16 = 0x0004;
257 : const USABLE_BYTES_IN_PAGE: u64 = (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
258 : const USABLE_BYTES_IN_SEGMENT: u64 = ((WAL_SEGMENT_SIZE / XLOG_BLCKSZ) as u64
259 : * USABLE_BYTES_IN_PAGE)
260 : - (XLOG_SIZE_OF_XLOG_RECORD - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
261 :
262 67364 : fn bytepos_to_recptr(bytepos: u64) -> Lsn {
263 67364 : let fullsegs = bytepos / USABLE_BYTES_IN_SEGMENT;
264 67364 : let mut bytesleft = bytepos % USABLE_BYTES_IN_SEGMENT;
265 :
266 67364 : let seg_offset = if bytesleft < (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 {
267 : // fits on first page of segment
268 0 : bytesleft + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
269 : } else {
270 : // account for the first page on segment with long header
271 67364 : bytesleft -= (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
272 67364 : let fullpages = bytesleft / USABLE_BYTES_IN_PAGE;
273 67364 : bytesleft %= USABLE_BYTES_IN_PAGE;
274 67364 :
275 67364 : XLOG_BLCKSZ as u64
276 67364 : + fullpages * XLOG_BLCKSZ as u64
277 67364 : + bytesleft
278 67364 : + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
279 : };
280 :
281 67364 : Lsn(XLogSegNoOffsetToRecPtr(
282 67364 : fullsegs,
283 67364 : seg_offset as u32,
284 67364 : WAL_SEGMENT_SIZE,
285 67364 : ))
286 67364 : }
287 :
288 101046 : fn recptr_to_bytepos(ptr: Lsn) -> u64 {
289 101046 : let fullsegs = ptr.segment_number(WAL_SEGMENT_SIZE);
290 101046 : let offset = ptr.segment_offset(WAL_SEGMENT_SIZE) as u64;
291 101046 :
292 101046 : let fullpages = offset / XLOG_BLCKSZ as u64;
293 101046 : let offset = offset % XLOG_BLCKSZ as u64;
294 101046 :
295 101046 : if fullpages == 0 {
296 0 : fullsegs * USABLE_BYTES_IN_SEGMENT
297 0 : + if offset > 0 {
298 0 : assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
299 0 : offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
300 : } else {
301 0 : 0
302 : }
303 : } else {
304 101046 : fullsegs * USABLE_BYTES_IN_SEGMENT
305 101046 : + (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64
306 101046 : + (fullpages - 1) * USABLE_BYTES_IN_PAGE
307 101046 : + if offset > 0 {
308 101046 : assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
309 101046 : offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
310 : } else {
311 0 : 0
312 : }
313 : }
314 101046 : }
|