Line data Source code
1 : //
2 : // This file contains common utilities for dealing with PostgreSQL WAL files and
3 : // LSNs.
4 : //
5 : // Many of these functions have been copied from PostgreSQL, and rewritten in
6 : // Rust. That's why they don't follow the usual Rust naming conventions, they
7 : // have been named the same as the corresponding PostgreSQL functions instead.
8 : //
9 :
10 : use crc32c::crc32c_append;
11 :
12 : use super::super::waldecoder::WalStreamDecoder;
13 : use super::bindings::{
14 : CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
15 : XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
16 : };
17 : use super::PG_MAJORVERSION;
18 : use crate::pg_constants;
19 : use crate::PG_TLI;
20 : use crate::{uint32, uint64, Oid};
21 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
22 :
23 : use bytes::BytesMut;
24 : use bytes::{Buf, Bytes};
25 :
26 : use log::*;
27 :
28 : use serde::Serialize;
29 : use std::fs::File;
30 : use std::io::prelude::*;
31 : use std::io::ErrorKind;
32 : use std::io::SeekFrom;
33 : use std::path::{Path, PathBuf};
34 : use std::time::SystemTime;
35 : use utils::bin_ser::DeserializeError;
36 : use utils::bin_ser::SerializeError;
37 :
38 : use utils::lsn::Lsn;
39 :
/// Length of a WAL segment file name: three 8-digit hexadecimal fields
/// (see `XLogFileName`).
pub const XLOG_FNAME_LEN: usize = 24;
/// Page header flag bit; presumably mirrors PostgreSQL's XLP_FIRST_IS_CONTRECORD
/// (first record on the page is a continuation) -- TODO confirm against pg_constants.
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
/// Byte offset of the field following a u16, u16, u32 and u64; by its name this is
/// `xlp_rem_len` in the WAL page header -- confirm against the bindings.
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
/// Number of serialized `XLogRecord` header bytes that precede `xl_crc`
/// (u32 + u32 + u64 + u8 + u8 + 2 padding bytes). `encode_logical_message`
/// checksums exactly this prefix of the header.
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;

/// In-memory size of the short WAL page header struct.
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
/// In-memory size of the long WAL page header struct.
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
/// In-memory size of the WAL record header struct.
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
// `1 * 2` is kept spelled out on purpose, hence the lint exemption.
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
50 :
/// Interval between checkpoints of the metadata file. The metadata file has to be
/// stored to enforce the invariant that checkpoint.nextXid is larger than any XID in WAL.
/// Flushing the checkpoint file for each transaction would be too expensive,
/// so XID_CHECKPOINT_INTERVAL is used to forward-align nextXid, and the
/// metadata checkpoint is performed only once per XID_CHECKPOINT_INTERVAL transactions.
/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
/// in order to let the CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
59 :
60 11482 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
61 11482 : (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
62 11482 : }
63 :
64 741 : pub fn XLogSegNoOffsetToRecPtr(
65 741 : segno: XLogSegNo,
66 741 : offset: u32,
67 741 : wal_segsz_bytes: usize,
68 741 : ) -> XLogRecPtr {
69 741 : segno * (wal_segsz_bytes as u64) + (offset as u64)
70 741 : }
71 :
72 5613 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
73 5613 : format!(
74 5613 : "{:>08X}{:>08X}{:>08X}",
75 5613 : tli,
76 5613 : logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
77 5613 : logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
78 5613 : )
79 5613 : }
80 :
81 256 : pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
82 256 : let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
83 256 : let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
84 256 : let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
85 256 : (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
86 256 : }
87 :
88 1006 : pub fn IsXLogFileName(fname: &str) -> bool {
89 6384 : return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
90 1006 : }
91 :
92 720 : pub fn IsPartialXLogFileName(fname: &str) -> bool {
93 720 : fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
94 720 : }
95 :
96 : /// If LSN points to the beginning of the page, then shift it to first record,
97 : /// otherwise align on 8-bytes boundary (required for WAL records)
98 1412 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
99 1412 : if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
100 11 : let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
101 6 : XLOG_SIZE_OF_XLOG_LONG_PHD
102 : } else {
103 5 : XLOG_SIZE_OF_XLOG_SHORT_PHD
104 : };
105 11 : lsn + hdr_size as u64
106 : } else {
107 1401 : lsn.align()
108 : }
109 1412 : }
110 :
111 660 : pub fn generate_pg_control(
112 660 : pg_control_bytes: &[u8],
113 660 : checkpoint_bytes: &[u8],
114 660 : lsn: Lsn,
115 660 : ) -> anyhow::Result<(Bytes, u64)> {
116 660 : let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
117 660 : let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
118 :
119 : // Generate new pg_control needed for bootstrap
120 660 : checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
121 660 :
122 660 : //reset some fields we don't want to preserve
123 660 : //TODO Check this.
124 660 : //We may need to determine the value from twophase data.
125 660 : checkpoint.oldestActiveXid = 0;
126 660 :
127 660 : //save new values in pg_control
128 660 : pg_control.checkPoint = 0;
129 660 : pg_control.checkPointCopy = checkpoint;
130 660 : pg_control.state = DBState_DB_SHUTDOWNED;
131 660 :
132 660 : Ok((pg_control.encode(), pg_control.system_identifier))
133 660 : }
134 :
135 742245 : pub fn get_current_timestamp() -> TimestampTz {
136 742245 : to_pg_timestamp(SystemTime::now())
137 742245 : }
138 :
139 742448 : pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
140 742448 : const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */
141 742448 : const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* == date2j(2000, 1, 1) */
142 742448 : const SECS_PER_DAY: u64 = 86400;
143 742448 : const USECS_PER_SEC: u64 = 1000000;
144 742448 : match time.duration_since(SystemTime::UNIX_EPOCH) {
145 742448 : Ok(n) => {
146 742448 : ((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
147 742448 : * USECS_PER_SEC
148 742448 : + n.subsec_micros() as u64) as i64
149 : }
150 0 : Err(_) => panic!("SystemTime before UNIX EPOCH!"),
151 : }
152 742448 : }
153 :
154 : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
155 : // start_lsn must point to some previously known record boundary (beginning of
156 : // the next record). If no valid record after is found, start_lsn is returned
157 : // back.
158 105 : pub fn find_end_of_wal(
159 105 : data_dir: &Path,
160 105 : wal_seg_size: usize,
161 105 : start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
162 105 : ) -> anyhow::Result<Lsn> {
163 105 : let mut result = start_lsn;
164 105 : let mut curr_lsn = start_lsn;
165 105 : let mut buf = [0u8; XLOG_BLCKSZ];
166 105 : let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
167 105 : debug!("find_end_of_wal PG_VERSION: {}", pg_version);
168 :
169 105 : let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
170 :
171 : // loop over segments
172 134 : loop {
173 134 : let segno = curr_lsn.segment_number(wal_seg_size);
174 134 : let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
175 134 : let seg_file_path = data_dir.join(seg_file_name);
176 134 : match open_wal_segment(&seg_file_path)? {
177 : None => {
178 : // no more segments
179 10 : debug!(
180 0 : "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
181 : result, seg_file_path
182 : );
183 10 : return Ok(result);
184 : }
185 124 : Some(mut segment) => {
186 124 : let seg_offs = curr_lsn.segment_offset(wal_seg_size);
187 124 : segment.seek(SeekFrom::Start(seg_offs as u64))?;
188 : // loop inside segment
189 : loop {
190 61403 : let bytes_read = segment.read(&mut buf)?;
191 61403 : if bytes_read == 0 {
192 29 : break; // EOF
193 61374 : }
194 61374 : curr_lsn += bytes_read as u64;
195 61374 : decoder.feed_bytes(&buf[0..bytes_read]);
196 :
197 : // advance result past all completely read records
198 : loop {
199 5443270 : match decoder.poll_decode() {
200 5381896 : Ok(Some(record)) => result = record.0,
201 95 : Err(e) => {
202 95 : debug!(
203 14 : "find_end_of_wal reached end at {:?}, decode error: {:?}",
204 : result, e
205 : );
206 95 : return Ok(result);
207 : }
208 61279 : Ok(None) => break, // need more data
209 : }
210 : }
211 : }
212 : }
213 : }
214 : }
215 105 : }
216 :
217 : // Open .partial or full WAL segment file, if present.
218 134 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
219 134 : let mut partial_path = seg_file_path.to_owned();
220 134 : partial_path.set_extension("partial");
221 134 : match File::open(partial_path) {
222 95 : Ok(file) => Ok(Some(file)),
223 39 : Err(e) => match e.kind() {
224 : ErrorKind::NotFound => {
225 : // .partial not found, try full
226 39 : match File::open(seg_file_path) {
227 29 : Ok(file) => Ok(Some(file)),
228 10 : Err(e) => match e.kind() {
229 10 : ErrorKind::NotFound => Ok(None),
230 0 : _ => Err(e.into()),
231 : },
232 : }
233 : }
234 0 : _ => Err(e.into()),
235 : },
236 : }
237 134 : }
238 :
239 0 : pub fn main() {
240 0 : let mut data_dir = PathBuf::new();
241 0 : data_dir.push(".");
242 0 : let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
243 0 : println!("wal_end={:?}", wal_end);
244 0 : }
245 :
246 : impl XLogRecord {
247 180464954 : pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
248 180464954 : use utils::bin_ser::LeSer;
249 180464954 : XLogRecord::des(buf)
250 180464954 : }
251 :
252 73526894 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
253 73526894 : use utils::bin_ser::LeSer;
254 73526894 : XLogRecord::des_from(&mut buf.reader())
255 73526894 : }
256 :
257 10 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
258 10 : use utils::bin_ser::LeSer;
259 10 : Ok(self.ser()?.into())
260 10 : }
261 :
262 : // Is this record an XLOG_SWITCH record? They need some special processing,
263 180464954 : pub fn is_xlog_switch_record(&self) -> bool {
264 180464954 : self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
265 180464954 : }
266 : }
267 :
268 : impl XLogPageHeaderData {
269 3231357 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
270 3231357 : use utils::bin_ser::LeSer;
271 3231357 : XLogPageHeaderData::des_from(&mut buf.reader())
272 3231357 : }
273 :
274 658 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
275 658 : use utils::bin_ser::LeSer;
276 658 : self.ser().map(|b| b.into())
277 658 : }
278 : }
279 :
280 : impl XLogLongPageHeaderData {
281 1533 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
282 1533 : use utils::bin_ser::LeSer;
283 1533 : XLogLongPageHeaderData::des_from(&mut buf.reader())
284 1533 : }
285 :
286 661 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
287 661 : use utils::bin_ser::LeSer;
288 661 : self.ser().map(|b| b.into())
289 661 : }
290 : }
291 :
/// In-memory size of the `CheckPoint` struct.
pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
293 :
294 : impl CheckPoint {
295 29040 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
296 29040 : use utils::bin_ser::LeSer;
297 29040 : Ok(self.ser()?.into())
298 29040 : }
299 :
300 2747 : pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
301 2747 : use utils::bin_ser::LeSer;
302 2747 : CheckPoint::des(buf)
303 2747 : }
304 :
305 : /// Update next XID based on provided new_xid and stored epoch.
306 : /// Next XID should be greater than new_xid. This handles 32-bit
307 : /// XID wraparound correctly.
308 : ///
309 : /// Returns 'true' if the XID was updated.
310 73551426 : pub fn update_next_xid(&mut self, xid: u32) -> bool {
311 73551426 : // nextXid should nw greater than any XID in WAL, so increment provided XID and check for wraparround.
312 73551426 : let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
313 73551426 : // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
314 73551426 : // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
315 73551426 : new_xid =
316 73551426 : new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
317 73551426 : let full_xid = self.nextXid.value;
318 73551426 : let old_xid = full_xid as u32;
319 73551426 : if new_xid.wrapping_sub(old_xid) as i32 > 0 {
320 3940 : let mut epoch = full_xid >> 32;
321 3940 : if new_xid < old_xid {
322 0 : // wrap-around
323 0 : epoch += 1;
324 3940 : }
325 3940 : let nextXid = (epoch << 32) | new_xid as u64;
326 3940 :
327 3940 : if nextXid != self.nextXid.value {
328 3940 : self.nextXid = FullTransactionId { value: nextXid };
329 3940 : return true;
330 0 : }
331 73547486 : }
332 73547486 : false
333 73551426 : }
334 : }
335 :
/// Generate new, empty WAL segment, with correct block headers at the first
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
///
/// * `segno` - number of the segment to generate; determines the page address
///   stored in the first page's header.
/// * `system_id` - value stored in the long header's `xlp_sysid`.
/// * `lsn` - position whose page receives a header in addition to the first page.
///
/// Returns a zero-filled buffer of `WAL_SEGMENT_SIZE` bytes containing only the
/// header(s) described above.
///
/// # Errors
/// Fails if a page header cannot be serialized.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
    let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);

    // Address of the first page of the segment.
    let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);

    let page_off = lsn.block_offset();
    let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);

    // When `lsn` lies within the first page, only the long header is written
    // and it is the one marked as starting with a continuation record.
    // NOTE(review): for seg_off == 0 this sets XLP_FIRST_IS_CONTRECORD with
    // xlp_rem_len == 0, whereas the branch for later pages only sets the flag
    // when page_off >= SIZE_OF_PAGE_HEADER -- confirm this asymmetry is intended.
    let first_page_only = seg_off < XLOG_BLCKSZ;
    let (shdr_rem_len, infoflags) = if first_page_only {
        (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
    } else {
        (0, 0)
    };

    let hdr = XLogLongPageHeaderData {
        std: {
            XLogPageHeaderData {
                xlp_magic: XLOG_PAGE_MAGIC as u16,
                xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
                xlp_tli: PG_TLI,
                xlp_pageaddr: pageaddr,
                xlp_rem_len: shdr_rem_len as u32,
                ..Default::default() // Put 0 in padding fields.
            }
        },
        xlp_sysid: system_id,
        xlp_seg_size: WAL_SEGMENT_SIZE as u32,
        xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
    };

    let hdr_bytes = hdr.encode()?;
    seg_buf.extend_from_slice(&hdr_bytes);

    // Zero out the rest of the file.
    seg_buf.resize(WAL_SEGMENT_SIZE, 0);

    if !first_page_only {
        // `lsn` is on a later page: give that page a short header as well.
        let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
        let header = XLogPageHeaderData {
            xlp_magic: XLOG_PAGE_MAGIC as u16,
            // Mark the page as starting with a continuation record only if
            // `lsn` is at or past the end of the page header.
            xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
                pg_constants::XLP_FIRST_IS_CONTRECORD
            } else {
                0
            },
            xlp_tli: PG_TLI,
            xlp_pageaddr: lsn.page_lsn().0,
            // NOTE(review): the remaining length is set to the full in-page
            // offset, i.e. it includes the page header bytes -- confirm that
            // consumers expect this.
            xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
                page_off as u32
            } else {
                0u32
            },
            ..Default::default() // Put 0 in padding fields.
        };
        let hdr_bytes = header.encode()?;

        // The header must fit inside the buffer and must not overwrite the
        // long header at offset 0.
        debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
        debug_assert_ne!(block_offset, 0);

        seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
    }

    Ok(seg_buf.freeze())
}
404 :
/// Fixed-size header placed at the start of the logical message payload built
/// by `encode_logical_message`. It is serialized field by field, so the field
/// order below is the wire order and must not be changed.
#[repr(C)]
#[derive(Serialize)]
struct XlLogicalMessage {
    // `encode_logical_message` always writes 0 here.
    db_id: Oid,
    transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
    // Length of the prefix in bytes, including its trailing NUL byte.
    prefix_size: uint64,
    // Length of the message body in bytes.
    message_size: uint64,
}
413 :
414 : impl XlLogicalMessage {
415 5 : pub fn encode(&self) -> Bytes {
416 5 : use utils::bin_ser::LeSer;
417 5 : self.ser().unwrap().into()
418 5 : }
419 : }
420 :
421 : /// Create new WAL record for non-transactional logical message.
422 : /// Used for creating artificial WAL for tests, as LogicalMessage
423 : /// record is basically no-op.
424 : ///
425 : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
426 : /// pageserver tolerate that, but PostgreSQL does not.
427 5 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
428 5 : let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
429 5 : prefix_bytes.write_all(prefix.as_bytes()).unwrap();
430 5 : prefix_bytes.push(0);
431 5 :
432 5 : let message_bytes = message.as_bytes();
433 5 :
434 5 : let logical_message = XlLogicalMessage {
435 5 : db_id: 0,
436 5 : transactional: 0,
437 5 : prefix_size: prefix_bytes.len() as u64,
438 5 : message_size: message_bytes.len() as u64,
439 5 : };
440 5 :
441 5 : let mainrdata = logical_message.encode();
442 5 : let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
443 5 : // only short mainrdata is supported for now
444 5 : assert!(mainrdata_len <= 255);
445 5 : let mainrdata_len = mainrdata_len as u8;
446 5 :
447 5 : let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
448 5 : data.extend_from_slice(&mainrdata);
449 5 : data.extend_from_slice(&prefix_bytes);
450 5 : data.extend_from_slice(message_bytes);
451 5 :
452 5 : let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
453 5 :
454 5 : let mut header = XLogRecord {
455 5 : xl_tot_len: total_len as u32,
456 5 : xl_xid: 0,
457 5 : xl_prev: 0,
458 5 : xl_info: 0,
459 5 : xl_rmid: 21,
460 5 : __bindgen_padding_0: [0u8; 2usize],
461 5 : xl_crc: 0, // crc will be calculated later
462 5 : };
463 5 :
464 5 : let header_bytes = header.encode().expect("failed to encode header");
465 5 : let crc = crc32c_append(0, &data);
466 5 : let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
467 5 : header.xl_crc = crc;
468 5 :
469 5 : let mut wal: Vec<u8> = Vec::new();
470 5 : wal.extend_from_slice(&header.encode().expect("failed to encode header"));
471 5 : wal.extend_from_slice(&data);
472 5 :
473 5 : // WAL start position must be aligned at 8 bytes,
474 5 : // this will add padding for the next WAL record.
475 5 : const PADDING: usize = 8;
476 5 : let padding_rem = wal.len() % PADDING;
477 5 : if padding_rem != 0 {
478 0 : wal.resize(wal.len() + PADDING - padding_rem, 0);
479 5 : }
480 :
481 5 : wal
482 5 : }
483 :
// If you need to craft WAL and write tests for this module, put them in the wal_craft crate.
|