Line data Source code
1 : #![allow(non_upper_case_globals)]
2 : #![allow(non_camel_case_types)]
3 : #![allow(non_snake_case)]
4 : // bindgen creates some unsafe code with no doc comments.
5 : #![allow(clippy::missing_safety_doc)]
6 : // noted at 1.63 that in many cases there's u32 -> u32 transmutes in bindgen code.
7 : #![allow(clippy::useless_transmute)]
8 : // modules included with the postgres_ffi macro depend on the types of the specific version's
9 : // types, and trigger a too eager lint.
10 : #![allow(clippy::duplicate_mod)]
11 : #![deny(clippy::undocumented_unsafe_blocks)]
12 :
13 : use bytes::Bytes;
14 : use utils::bin_ser::SerializeError;
15 : use utils::lsn::Lsn;
16 :
17 : pub use postgres_versioninfo::PgMajorVersion;
18 :
/// Declares the module `$version` (e.g. `v14`) holding everything that is
/// specific to one PostgreSQL major version:
///
/// - `bindings`: the bindgen output `bindings_<version>.rs` from `OUT_DIR`,
///   followed by `pg_constants_<version>.rs`, both textually included;
/// - the helper modules (`controlfile_utils`, `xlog_utils`, ...), compiled once
///   per version so they see that version's `bindings`;
/// - a few constants and re-exports.
macro_rules! postgres_ffi {
    ($version:ident) => {
        // With `#[path = "."]`, the `pub mod foo;` declarations inside this
        // inline module are looked up next to this source file rather than in a
        // `$version/` subdirectory, so every version compiles the same helper
        // source files (hence `clippy::duplicate_mod` is allowed at crate level).
        #[path = "."]
        pub mod $version {
            pub mod bindings {
                // bindgen generates bindings for a lot of stuff we don't need
                #![allow(dead_code)]
                #![allow(unsafe_op_in_unsafe_fn)]
                #![allow(clippy::undocumented_unsafe_blocks)]
                #![allow(clippy::ptr_offset_with_cast)]

                // NOTE(review): presumably needed by derives emitted into the
                // generated bindings — confirm against the build script.
                use serde::{Deserialize, Serialize};
                // Generated bindings for this version, written into OUT_DIR.
                include!(concat!(
                    env!("OUT_DIR"),
                    "/bindings_",
                    stringify!($version),
                    ".rs"
                ));

                // Per-version constants file (path relative to this source
                // file), included so its items live inside `bindings`.
                include!(concat!("pg_constants_", stringify!($version), ".rs"));
            }
            pub mod controlfile_utils;
            pub mod nonrelfile_utils;
            pub mod wal_craft_test_export;
            pub mod wal_generator;
            pub mod waldecoder_handler;
            pub mod xlog_utils;

            /// The version identifier as a string, e.g. `"v14"`.
            pub const PG_MAJORVERSION: &str = stringify!($version);

            // Re-export some symbols from bindings
            pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord};

            /// An all-zero buffer of `xlog_utils::SIZEOF_CHECKPOINT` bytes.
            pub const ZERO_CHECKPOINT: bytes::Bytes =
                bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
        }
    };
}
57 :
/// Invokes `$macro!(vNN);` once for every supported PostgreSQL major version.
///
/// NOTE(review): the same version list is spelled out again in
/// `dispatch_pgversion!`, `enum_pgversion_dispatch!` and `enum_pgversion!`;
/// keep all of them in sync when adding or removing a version.
#[macro_export]
macro_rules! for_all_postgres_versions {
    ($macro:tt) => {
        $macro!(v14);
        $macro!(v15);
        $macro!(v16);
        $macro!(v17);
    };
}

// Instantiate the per-version modules `v14`, `v15`, ... via `postgres_ffi!`.
for_all_postgres_versions! { postgres_ffi }
69 :
/// Runs a code block with the `postgres_ffi` module of a specific (supported)
/// PostgreSQL major version imported into scope under the name `pgv`.
///
/// The version argument is cloned and converted with `.into()` to a
/// `PgMajorVersion`. If it matches none of the supported versions, the macro
/// panics with `Unknown PostgreSQL version ...`, unless the optional third
/// argument is given. In that case, that expression is evaluated instead and
/// provides the fallback handling.
///
/// Use like:
///
/// ```ignore
/// dispatch_pgversion!(version, { pgv::bindings::bkpimg_is_compressed(bimg_info) })
/// dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
/// dispatch_pgversion!(
///     pg_version,
///     pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
///     anyhow::bail!("Unknown version {}", pg_version)
/// )
/// ```
///
/// Other uses are for macro-internal purposes only and strictly unsupported.
#[macro_export]
macro_rules! dispatch_pgversion {
    ($version:expr, $code:expr) => {
        // Recurse through `$crate::` so the expansion also works for callers
        // that invoke this macro by path without importing it by name.
        $crate::dispatch_pgversion!(
            $version,
            $code,
            panic!("Unknown PostgreSQL version {}", $version)
        )
    };
    ($version:expr, $code:expr, $invalid_pgver_handling:expr) => {
        $crate::dispatch_pgversion!(
            $version => $code,
            default = $invalid_pgver_handling,
            pgversions = [
                $crate::PgMajorVersion::PG14 => v14,
                $crate::PgMajorVersion::PG15 => v15,
                $crate::PgMajorVersion::PG16 => v16,
                $crate::PgMajorVersion::PG17 => v17,
            ]
        )
    };
    ($pgversion:expr => $code:expr,
     default = $default:expr,
     pgversions = [$($sv:pat => $vsv:ident),+ $(,)?]) => {
        match $pgversion.clone().into() {
            $($sv => {
                use $crate::$vsv as pgv;
                $code
            },)+
            // Taken only when the version matches none of the listed patterns
            // (e.g. a newer `PgMajorVersion` variant without bindings here).
            #[allow(unreachable_patterns)]
            _ => {
                $default
            }
        }
    };
}
118 :
/// Matches on a value of a per-version enum (as declared by `enum_pgversion!`).
/// It binds the wrapped version-specific value to `$bind` and imports that
/// version's `postgres_ffi` module as `pgv` while `$code` runs.
///
/// ```ignore
/// enum_pgversion_dispatch!(self, MyEnum, inner, { inner.some_method() })
/// ```
///
/// The `name = ..., bind = ..., ...` form is for macro-internal use only.
#[macro_export]
macro_rules! enum_pgversion_dispatch {
    ($name:expr, $typ:ident, $bind:ident, $code:block) => {
        // `$crate::` keeps the recursion working for callers that did not
        // import this macro by name.
        $crate::enum_pgversion_dispatch!(
            name = $name,
            bind = $bind,
            typ = $typ,
            code = $code,
            pgversions = [
                V14 : v14,
                V15 : v15,
                V16 : v16,
                V17 : v17,
            ]
        )
    };
    (name = $name:expr,
     bind = $bind:ident,
     typ = $typ:ident,
     code = $code:block,
     pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
        match $name {
            $(
                self::$typ::$variant($bind) => {
                    use $crate::$md as pgv;
                    $code
                }
            ),+,
        }
    };
}
150 :
/// Declares `pub enum $name` with one tuple variant per supported PostgreSQL
/// version (`V14`, `V15`, ...), each wrapping that version's type `$t`.
///
/// ```ignore
/// enum_pgversion!(MyEnum, pgv::SomeType);           // wraps `vNN::SomeType`
/// enum_pgversion!(MyEnum, pgv::some_mod::SomeType); // wraps `vNN::some_mod::SomeType`
/// ```
///
/// The generated enum gets a `pg_version()` method. It also gets a `From`
/// conversion from every wrapped per-version type; `Into` follows from the
/// standard blanket impl. The `name = ...` forms are for macro-internal use.
#[macro_export]
macro_rules! enum_pgversion {
    {$name:ident, pgv :: $t:ident} => {
        $crate::enum_pgversion!{
            name = $name,
            typ = $t,
            pgversions = [
                V14 : v14,
                V15 : v15,
                V16 : v16,
                V17 : v17,
            ]
        }
    };
    {$name:ident, pgv :: $p:ident :: $t:ident} => {
        $crate::enum_pgversion!{
            name = $name,
            path = $p,
            typ = $t,
            pgversions = [
                V14 : v14,
                V15 : v15,
                V16 : v16,
                V17 : v17,
            ]
        }
    };
    {name = $name:ident,
     typ = $t:ident,
     pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
        pub enum $name {
            $($variant ( $crate::$md::$t )),+
        }
        impl self::$name {
            /// The PostgreSQL major version of the wrapped value.
            pub fn pg_version(&self) -> $crate::PgMajorVersion {
                // Fully qualified paths: callers need not import these names.
                $crate::enum_pgversion_dispatch!(self, $name, _ign, {
                    pgv::bindings::MY_PGVERSION
                })
            }
        }
        $(
            impl From<$crate::$md::$t> for self::$name {
                fn from(value: $crate::$md::$t) -> Self {
                    Self::$variant(value)
                }
            }
        )+
    };
    // `typ` is required: without it the variants would carry no payload, which
    // neither the tuple patterns of `enum_pgversion_dispatch!` nor the `From`
    // impls below could handle.
    {name = $name:ident,
     path = $p:ident,
     typ = $t:ident,
     pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
        pub enum $name {
            $($variant ( $crate::$md::$p::$t )),+
        }
        impl $name {
            /// The PostgreSQL major version of the wrapped value.
            pub fn pg_version(&self) -> $crate::PgMajorVersion {
                $crate::enum_pgversion_dispatch!(self, $name, _ign, {
                    pgv::bindings::MY_PGVERSION
                })
            }
        }
        $(
            impl From<$crate::$md::$p::$t> for $name {
                fn from(value: $crate::$md::$p::$t) -> Self {
                    Self::$variant(value)
                }
            }
        )+
    };
}
222 :
223 : pub mod pg_constants;
224 : pub mod relfile_utils;
225 : pub mod walrecord;
226 :
227 : // Export some widely used datatypes that are unlikely to change across Postgres versions
228 : pub use v14::bindings::{
229 : BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
230 : RepOriginId, TimeLineID, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, uint64,
231 : };
232 : // Likewise for these, although the assumption that these don't change is a little more iffy.
233 : pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
234 : pub use v14::xlog_utils::{
235 : XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
236 : };
237 :
// Build-time sizes taken from PostgreSQL's `pg_config.h`. Upstream they can be
// changed with the configure options `--with-blocksize=BLOCKSIZE` and
// `--with-segsize=SEGSIZE`; for now we assume the default build settings.

/// Size of a relation data block in bytes (default 8 KiB).
pub const BLCKSZ: u16 = 8 * 1024;
/// Number of `BLCKSZ`-sized blocks in one 1 GiB relation segment.
pub const RELSEG_SIZE: u32 = (1 << 30) / BLCKSZ as u32;
/// Size of a WAL block (page) in bytes (default 8 KiB).
pub const XLOG_BLCKSZ: usize = 8 * 1024;
/// Size of one WAL segment file in bytes (16 MiB).
pub const WAL_SEGMENT_SIZE: usize = 16 << 20;

/// Sixteen WAL blocks' worth of bytes; judging by the name, a per-send size
/// limit (confirm at call sites).
pub const MAX_SEND_SIZE: usize = 16 * XLOG_BLCKSZ;
246 :
247 : // Export some version independent functions that are used outside of this mod
248 : pub use v14::bindings::DBState_DB_SHUTDOWNED;
249 : pub use v14::xlog_utils::{
250 : XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp,
251 : try_from_pg_timestamp,
252 : };
253 :
254 54 : pub fn bkpimage_is_compressed(bimg_info: u8, version: PgMajorVersion) -> bool {
255 54 : dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
256 54 : }
257 :
258 5 : pub fn generate_wal_segment(
259 5 : segno: u64,
260 5 : system_id: u64,
261 5 : pg_version: PgMajorVersion,
262 5 : lsn: Lsn,
263 5 : ) -> Result<Bytes, SerializeError> {
264 5 : assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
265 :
266 5 : dispatch_pgversion!(
267 5 : pg_version,
268 0 : pgv::xlog_utils::generate_wal_segment(segno, system_id, lsn)
269 : )
270 5 : }
271 :
272 0 : pub fn generate_pg_control(
273 0 : pg_control_bytes: &[u8],
274 0 : checkpoint_bytes: &[u8],
275 0 : lsn: Lsn,
276 0 : pg_version: PgMajorVersion,
277 0 : ) -> anyhow::Result<(Bytes, u64, bool)> {
278 0 : dispatch_pgversion!(
279 0 : pg_version,
280 0 : pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
281 0 : anyhow::bail!("Unknown version {}", pg_version)
282 : )
283 0 : }
284 :
/// PostgreSQL timeline ID. The PG timeline is always 1; changing it doesn't
/// have any useful meaning in Neon.
///
/// NOTE: this is not to be confused with Neon timelines; they are a different
/// concept!
///
/// The assumption that it is always 1 is shaky: we might, for example, import
/// a PostgreSQL data directory that has gone through timeline bumps.
/// FIXME later.
pub const PG_TLI: u32 = 1;
293 :
294 : // See TransactionIdIsNormal in transam.h
295 0 : pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
296 0 : id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
297 0 : }
298 :
299 : // See TransactionIdPrecedes in transam.c
300 0 : pub const fn transaction_id_precedes(id1: TransactionId, id2: TransactionId) -> bool {
301 : /*
302 : * If either ID is a permanent XID then we can just do unsigned
303 : * comparison. If both are normal, do a modulo-2^32 comparison.
304 : */
305 :
306 0 : if !(transaction_id_is_normal(id1)) || !transaction_id_is_normal(id2) {
307 0 : return id1 < id2;
308 0 : }
309 :
310 0 : let diff = id1.wrapping_sub(id2) as i32;
311 0 : diff < 0
312 0 : }
313 :
/// Returns `true` if the page has not been initialized yet.
///
/// Port of PostgreSQL's `PageIsNew()` macro, which tests whether the page
/// header's `pd_upper` field is zero. Here that field is bytes 14 and 15 of
/// the page.
///
/// The slice is indexed directly, so a slice too short to reach those bytes
/// can panic.
pub fn page_is_new(pg: &[u8]) -> bool {
    // pd_upper == 0: check the first byte, then the second.
    if pg[14] != 0 {
        return false;
    }
    pg[15] == 0
}
318 :
319 : // ExtractLSN from page header
320 0 : pub fn page_get_lsn(pg: &[u8]) -> Lsn {
321 0 : Lsn(
322 0 : ((u32::from_le_bytes(pg[0..4].try_into().unwrap()) as u64) << 32)
323 0 : | u32::from_le_bytes(pg[4..8].try_into().unwrap()) as u64,
324 0 : )
325 0 : }
326 :
327 9 : pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
328 9 : pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
329 9 : pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
330 9 : }
331 :
332 : // This is port of function with the same name from freespace.c.
333 : // The only difference is that it does not have "level" parameter because XLogRecordPageWithFreeSpace
334 : // always call it with level=FSM_BOTTOM_LEVEL
335 0 : pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
336 0 : let mut leafno = addr;
337 : const FSM_TREE_DEPTH: u32 = if pg_constants::SLOTS_PER_FSM_PAGE >= 1626 {
338 : 3
339 : } else {
340 : 4
341 : };
342 :
343 : /* Count upper level nodes required to address the leaf page */
344 0 : let mut pages: BlockNumber = 0;
345 0 : for _l in 0..FSM_TREE_DEPTH {
346 0 : pages += leafno + 1;
347 0 : leafno /= pg_constants::SLOTS_PER_FSM_PAGE;
348 0 : }
349 : /* Turn the page count into 0-based block number */
350 0 : pages - 1
351 0 : }
352 :
353 : pub mod waldecoder {
354 : use std::num::NonZeroU32;
355 :
356 : use crate::PgMajorVersion;
357 : use bytes::{Buf, Bytes, BytesMut};
358 : use thiserror::Error;
359 : use utils::lsn::Lsn;
360 :
361 : pub enum State {
362 : WaitingForRecord,
363 : ReassemblingRecord {
364 : recordbuf: BytesMut,
365 : contlen: NonZeroU32,
366 : },
367 : SkippingEverything {
368 : skip_until_lsn: Lsn,
369 : },
370 : }
371 :
372 : pub struct WalStreamDecoder {
373 : pub lsn: Lsn,
374 : pub pg_version: PgMajorVersion,
375 : pub inputbuf: BytesMut,
376 : pub state: State,
377 : }
378 :
379 : #[derive(Error, Debug, Clone)]
380 : #[error("{msg} at {lsn}")]
381 : pub struct WalDecodeError {
382 : pub msg: String,
383 : pub lsn: Lsn,
384 : }
385 :
386 : impl WalStreamDecoder {
387 10274 : pub fn new(lsn: Lsn, pg_version: PgMajorVersion) -> WalStreamDecoder {
388 10274 : WalStreamDecoder {
389 10274 : lsn,
390 10274 : pg_version,
391 10274 : inputbuf: BytesMut::new(),
392 10274 : state: State::WaitingForRecord,
393 10274 : }
394 10274 : }
395 :
396 : // The latest LSN position fed to the decoder.
397 1353 : pub fn available(&self) -> Lsn {
398 1353 : self.lsn + self.inputbuf.remaining() as u64
399 1353 : }
400 :
401 : /// Returns the LSN up to which the WAL decoder has processed.
402 : ///
403 : /// If [`Self::poll_decode`] returned a record, then this will return
404 : /// the end LSN of said record.
405 606 : pub fn lsn(&self) -> Lsn {
406 606 : self.lsn
407 606 : }
408 :
409 255683 : pub fn feed_bytes(&mut self, buf: &[u8]) {
410 255683 : self.inputbuf.extend_from_slice(buf);
411 255683 : }
412 :
413 353660 : pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
414 353660 : dispatch_pgversion!(
415 353660 : self.pg_version,
416 : {
417 : use pgv::waldecoder_handler::WalStreamDecoderHandler;
418 4167 : self.poll_decode_internal()
419 : },
420 0 : Err(WalDecodeError {
421 0 : msg: format!("Unknown version {}", self.pg_version),
422 0 : lsn: self.lsn,
423 0 : })
424 : )
425 353660 : }
426 : }
427 : }
|