Line data Source code
1 : #![allow(non_upper_case_globals)]
2 : #![allow(non_camel_case_types)]
3 : #![allow(non_snake_case)]
4 : // bindgen creates some unsafe code with no doc comments.
5 : #![allow(clippy::missing_safety_doc)]
6 : // noted at 1.63 that in many cases there's u32 -> u32 transmutes in bindgen code.
7 : #![allow(clippy::useless_transmute)]
8 : // modules included with the postgres_ffi macro depend on the types of the specific version's
9 : // types, and trigger a too eager lint.
10 : #![allow(clippy::duplicate_mod)]
11 : #![deny(clippy::undocumented_unsafe_blocks)]
12 :
13 : use bytes::Bytes;
14 : use utils::bin_ser::SerializeError;
15 : use utils::lsn::Lsn;
16 :
17 : pub use postgres_versioninfo::PgMajorVersion;
18 :
// Declares the public module tree for one supported PostgreSQL major version.
//
// `postgres_ffi!(v14)` expands to `pub mod v14 { ... }`. Thanks to
// `#[path = "."]` the version-specific submodules are loaded from the same
// source files for every version, while `bindings` pulls in the per-version
// generated bindings from `OUT_DIR` plus the matching `pg_constants_<ver>.rs`.
macro_rules! postgres_ffi {
    ($pgver:ident) => {
        #[path = "."]
        pub mod $pgver {
            pub mod bindings {
                // bindgen emits bindings for far more than we actually use,
                // and its output does not follow our unsafe-code conventions.
                #![allow(dead_code)]
                #![allow(unsafe_op_in_unsafe_fn)]
                #![allow(clippy::undocumented_unsafe_blocks)]
                #![allow(clippy::ptr_offset_with_cast)]

                use serde::{Deserialize, Serialize};
                include!(concat!(
                    env!("OUT_DIR"),
                    "/bindings_",
                    stringify!($pgver),
                    ".rs"
                ));

                include!(concat!("pg_constants_", stringify!($pgver), ".rs"));
            }
            pub mod controlfile_utils;
            pub mod nonrelfile_utils;
            pub mod wal_craft_test_export;
            pub mod wal_generator;
            pub mod waldecoder_handler;
            pub mod xlog_utils;

            // The module name itself (e.g. "v14"), as a string.
            pub const PG_MAJORVERSION: &str = stringify!($pgver);

            // Re-export some symbols from bindings
            pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord};

            // An all-zero buffer exactly `SIZEOF_CHECKPOINT` bytes long.
            pub const ZERO_CHECKPOINT: bytes::Bytes =
                bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
        }
    };
}
57 :
58 : #[macro_export]
59 : macro_rules! for_all_postgres_versions {
60 : ($macro:tt) => {
61 : $macro!(v14);
62 : $macro!(v15);
63 : $macro!(v16);
64 : $macro!(v17);
65 : };
66 : }
67 :
68 : for_all_postgres_versions! { postgres_ffi }
69 :
/// dispatch_pgversion
///
/// Run a code block in a context where the postgres_ffi bindings for a
/// specific (supported) PostgreSQL version are `use`-ed in scope under the
/// `pgv` identifier.
///
/// The version expression is cloned and converted with `.into()` before it is
/// matched against the `PgMajorVersion` variants.
///
/// If the provided pg_version is not supported, we `panic!()`, unless the
/// optional third argument was provided (in which case that expression
/// provides the default handling instead).
///
/// Use like
///
/// ```ignore
/// dispatch_pgversion!(my_pgversion, { pgv::xlog_utils::SIZEOF_CHECKPOINT })
/// dispatch_pgversion!(my_pgversion, pgv::xlog_utils::SIZEOF_CHECKPOINT)
/// dispatch_pgversion!(my_pgversion, pgv::xlog_utils::SIZEOF_CHECKPOINT, 0)
/// ```
///
/// Other uses are for macro-internal purposes only and strictly unsupported.
///
#[macro_export]
macro_rules! dispatch_pgversion {
    // Public form without a fallback: unknown versions panic.
    ($version:expr, $code:expr) => {
        // Recurse through `$crate::` so the expansion also resolves when the
        // caller invoked the macro by path without importing it by name.
        $crate::dispatch_pgversion!(
            $version,
            $code,
            panic!("Unknown PostgreSQL version {}", $version)
        )
    };
    // Public form with caller-supplied handling for unknown versions.
    ($version:expr, $code:expr, $invalid_pgver_handling:expr) => {
        $crate::dispatch_pgversion!(
            $version => $code,
            default = $invalid_pgver_handling,
            pgversions = [
                $crate::PgMajorVersion::PG14 => v14,
                $crate::PgMajorVersion::PG15 => v15,
                $crate::PgMajorVersion::PG16 => v16,
                $crate::PgMajorVersion::PG17 => v17,
            ]
        )
    };
    // Internal form: one match arm per (pattern => version module) pair.
    ($pgversion:expr => $code:expr,
     default = $default:expr,
     pgversions = [$($sv:pat => $vsv:ident),+ $(,)?]) => {
        match ($pgversion.clone().into()) {
            $($sv => {
                use $crate::$vsv as pgv;
                $code
            },)+
            // The listed patterns may already be exhaustive; keep the
            // fallback arm without tripping the lint in that case.
            #[allow(unreachable_patterns)]
            _ => {
                $default
            }
        }
    };
}
118 :
/// Matches on a value of an enum generated by `enum_pgversion!` and runs
/// `$code` for whichever version variant is present.
///
/// Arguments, in order: the expression to match on, the enum's type name
/// (looked up as `self::<name>` in the calling module), the identifier the
/// variant payload is bound to, and the block to run. Inside the block the
/// matching version module is available as `pgv`.
///
/// The `name = ...` form is macro-internal.
#[macro_export]
macro_rules! enum_pgversion_dispatch {
    ($name:expr, $typ:ident, $bind:ident, $code:block) => {
        // Recurse through `$crate::` so the expansion also resolves when the
        // caller invoked the macro by path without importing it by name.
        $crate::enum_pgversion_dispatch!(
            name = $name,
            bind = $bind,
            typ = $typ,
            code = $code,
            pgversions = [
                V14 : v14,
                V15 : v15,
                V16 : v16,
                V17 : v17,
            ]
        )
    };
    (name = $name:expr,
     bind = $bind:ident,
     typ = $typ:ident,
     code = $code:block,
     pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
        match $name {
            $(
                self::$typ::$variant($bind) => {
                    use $crate::$md as pgv;
                    $code
                }
            ),+,
        }
    };
}
150 :
/// Generates a public enum with one variant per supported PostgreSQL version
/// (`V14` .. `V17`), each wrapping that version's flavour of a type.
///
/// Supported invocations:
///
/// ```ignore
/// enum_pgversion! {MyEnum, pgv::SomeType}            // wraps <crate>::vNN::SomeType
/// enum_pgversion! {MyEnum, pgv::some_mod::SomeType}  // wraps <crate>::vNN::some_mod::SomeType
/// ```
///
/// Besides the enum itself, the expansion provides:
/// * `fn pg_version(&self) -> PgMajorVersion`, which returns the
///   `bindings::MY_PGVERSION` constant of the variant's version module;
/// * `From<versioned type>` for the enum, so `.into()` keeps working.
///
/// The `name = ...` forms are macro-internal.
#[macro_export]
macro_rules! enum_pgversion {
    {$name:ident, pgv :: $t:ident} => {
        $crate::enum_pgversion!{
            name = $name,
            typ = $t,
            pgversions = [
                V14 : v14,
                V15 : v15,
                V16 : v16,
                V17 : v17,
            ]
        }
    };
    {$name:ident, pgv :: $p:ident :: $t:ident} => {
        $crate::enum_pgversion!{
            name = $name,
            path = $p,
            typ = $t,
            pgversions = [
                V14 : v14,
                V15 : v15,
                V16 : v16,
                V17 : v17,
            ]
        }
    };
    {name = $name:ident,
     typ = $t:ident,
     pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
        pub enum $name {
            $($variant ( $crate::$md::$t )),+
        }
        impl self::$name {
            pub fn pg_version(&self) -> $crate::PgMajorVersion {
                $crate::enum_pgversion_dispatch!(self, $name, _ign, {
                    pgv::bindings::MY_PGVERSION
                })
            }
        }
        $(
            impl From<$crate::$md::$t> for self::$name {
                fn from(inner: $crate::$md::$t) -> Self {
                    self::$name::$variant(inner)
                }
            }
        )+
    };
    // `typ` is mandatory here: capturing it inside an optional group and then
    // using it inside the `pgversions` repetition cannot be transcribed,
    // because the two repetition counts never agree.
    {name = $name:ident,
     path = $p:ident,
     typ = $t:ident,
     pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
        pub enum $name {
            $($variant ( $crate::$md::$p::$t )),+
        }
        impl self::$name {
            pub fn pg_version(&self) -> $crate::PgMajorVersion {
                $crate::enum_pgversion_dispatch!(self, $name, _ign, {
                    pgv::bindings::MY_PGVERSION
                })
            }
        }
        $(
            impl From<$crate::$md::$p::$t> for self::$name {
                fn from(inner: $crate::$md::$p::$t) -> Self {
                    self::$name::$variant(inner)
                }
            }
        )+
    };
}
222 :
223 : pub mod pg_constants;
224 : pub mod relfile_utils;
225 : pub mod walrecord;
226 :
227 : // Export some widely used datatypes that are unlikely to change across Postgres versions
228 : pub use v14::bindings::{
229 : BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
230 : RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
231 : uint64,
232 : };
233 : // Likewise for these, although the assumption that these don't change is a little more iffy.
234 : pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
235 : pub use v14::xlog_utils::{
236 : XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
237 : };
238 :
// Sizes taken from pg_config.h. PostgreSQL lets them be changed at build time
// with the configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE; we assume the defaults for now.

/// Block size in bytes.
pub const BLCKSZ: u16 = 8192;
/// Number of `BLCKSZ`-sized blocks that fit into 1 GiB.
pub const RELSEG_SIZE: u32 = (1 << 30) / (BLCKSZ as u32);
/// XLOG block size in bytes.
pub const XLOG_BLCKSZ: usize = 8 * 1024;
/// WAL segment size in bytes (16 MiB).
pub const WAL_SEGMENT_SIZE: usize = 16 << 20;

/// Sixteen XLOG blocks' worth of bytes.
pub const MAX_SEND_SIZE: usize = 16 * XLOG_BLCKSZ;
247 :
248 : // Export some version independent functions that are used outside of this mod
249 : pub use v14::bindings::DBState_DB_SHUTDOWNED;
250 : pub use v14::xlog_utils::{
251 : XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp,
252 : try_from_pg_timestamp,
253 : };
254 :
255 54 : pub fn bkpimage_is_compressed(bimg_info: u8, version: PgMajorVersion) -> bool {
256 54 : dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
257 54 : }
258 :
259 5 : pub fn generate_wal_segment(
260 5 : segno: u64,
261 5 : system_id: u64,
262 5 : pg_version: PgMajorVersion,
263 5 : lsn: Lsn,
264 5 : ) -> Result<Bytes, SerializeError> {
265 5 : assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
266 :
267 5 : dispatch_pgversion!(
268 5 : pg_version,
269 0 : pgv::xlog_utils::generate_wal_segment(segno, system_id, lsn)
270 : )
271 5 : }
272 :
273 0 : pub fn generate_pg_control(
274 0 : pg_control_bytes: &[u8],
275 0 : checkpoint_bytes: &[u8],
276 0 : lsn: Lsn,
277 0 : pg_version: PgMajorVersion,
278 0 : ) -> anyhow::Result<(Bytes, u64, bool)> {
279 0 : dispatch_pgversion!(
280 0 : pg_version,
281 0 : pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
282 0 : anyhow::bail!("Unknown version {}", pg_version)
283 : )
284 0 : }
285 :
/// The PostgreSQL timeline ID. It is always 1; changing it has no useful
/// meaning in Neon.
///
/// NOTE: not to be confused with Neon timelines, which are a different
/// concept.
///
/// Assuming it is always 1 is shaky: we might, for example, import a
/// PostgreSQL data directory that has gone through timeline bumps.
/// FIXME later.
pub const PG_TLI: u32 = 1u32;
294 :
295 : // See TransactionIdIsNormal in transam.h
296 0 : pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
297 0 : id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
298 0 : }
299 :
300 : // See TransactionIdPrecedes in transam.c
301 0 : pub const fn transaction_id_precedes(id1: TransactionId, id2: TransactionId) -> bool {
302 : /*
303 : * If either ID is a permanent XID then we can just do unsigned
304 : * comparison. If both are normal, do a modulo-2^32 comparison.
305 : */
306 :
307 0 : if !(transaction_id_is_normal(id1)) || !transaction_id_is_normal(id2) {
308 0 : return id1 < id2;
309 0 : }
310 :
311 0 : let diff = id1.wrapping_sub(id2) as i32;
312 0 : diff < 0
313 0 : }
314 :
/// Checks whether a page has not been initialized yet (port of the
/// PostgreSQL `PageIsNew()` macro).
///
/// A page counts as new when bytes 14 and 15 of its header, which hold
/// `pd_upper`, are both zero.
///
/// # Panics
///
/// Panics if `pg` is too short for the bytes that get inspected.
pub fn page_is_new(pg: &[u8]) -> bool {
    if pg[14] != 0 {
        return false;
    }
    pg[15] == 0
}
319 :
320 : // ExtractLSN from page header
321 0 : pub fn page_get_lsn(pg: &[u8]) -> Lsn {
322 0 : Lsn(
323 0 : ((u32::from_le_bytes(pg[0..4].try_into().unwrap()) as u64) << 32)
324 0 : | u32::from_le_bytes(pg[4..8].try_into().unwrap()) as u64,
325 0 : )
326 0 : }
327 :
328 9 : pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
329 9 : pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
330 9 : pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
331 9 : }
332 :
333 : // This is port of function with the same name from freespace.c.
334 : // The only difference is that it does not have "level" parameter because XLogRecordPageWithFreeSpace
335 : // always call it with level=FSM_BOTTOM_LEVEL
336 0 : pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
337 0 : let mut leafno = addr;
338 : const FSM_TREE_DEPTH: u32 = if pg_constants::SLOTS_PER_FSM_PAGE >= 1626 {
339 : 3
340 : } else {
341 : 4
342 : };
343 :
344 : /* Count upper level nodes required to address the leaf page */
345 0 : let mut pages: BlockNumber = 0;
346 0 : for _l in 0..FSM_TREE_DEPTH {
347 0 : pages += leafno + 1;
348 0 : leafno /= pg_constants::SLOTS_PER_FSM_PAGE;
349 0 : }
350 : /* Turn the page count into 0-based block number */
351 0 : pages - 1
352 0 : }
353 :
354 : pub mod waldecoder {
355 : use std::num::NonZeroU32;
356 :
357 : use crate::PgMajorVersion;
358 : use bytes::{Buf, Bytes, BytesMut};
359 : use thiserror::Error;
360 : use utils::lsn::Lsn;
361 :
362 : pub enum State {
363 : WaitingForRecord,
364 : ReassemblingRecord {
365 : recordbuf: BytesMut,
366 : contlen: NonZeroU32,
367 : },
368 : SkippingEverything {
369 : skip_until_lsn: Lsn,
370 : },
371 : }
372 :
373 : pub struct WalStreamDecoder {
374 : pub lsn: Lsn,
375 : pub pg_version: PgMajorVersion,
376 : pub inputbuf: BytesMut,
377 : pub state: State,
378 : }
379 :
380 : #[derive(Error, Debug, Clone)]
381 : #[error("{msg} at {lsn}")]
382 : pub struct WalDecodeError {
383 : pub msg: String,
384 : pub lsn: Lsn,
385 : }
386 :
387 : impl WalStreamDecoder {
388 9973 : pub fn new(lsn: Lsn, pg_version: PgMajorVersion) -> WalStreamDecoder {
389 9973 : WalStreamDecoder {
390 9973 : lsn,
391 9973 : pg_version,
392 9973 : inputbuf: BytesMut::new(),
393 9973 : state: State::WaitingForRecord,
394 9973 : }
395 9973 : }
396 :
397 : // The latest LSN position fed to the decoder.
398 1394 : pub fn available(&self) -> Lsn {
399 1394 : self.lsn + self.inputbuf.remaining() as u64
400 1394 : }
401 :
402 : /// Returns the LSN up to which the WAL decoder has processed.
403 : ///
404 : /// If [`Self::poll_decode`] returned a record, then this will return
405 : /// the end LSN of said record.
406 606 : pub fn lsn(&self) -> Lsn {
407 606 : self.lsn
408 606 : }
409 :
410 255739 : pub fn feed_bytes(&mut self, buf: &[u8]) {
411 255739 : self.inputbuf.extend_from_slice(buf);
412 255739 : }
413 :
414 353679 : pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
415 353679 : dispatch_pgversion!(
416 353679 : self.pg_version,
417 : {
418 : use pgv::waldecoder_handler::WalStreamDecoderHandler;
419 4167 : self.poll_decode_internal()
420 : },
421 0 : Err(WalDecodeError {
422 0 : msg: format!("Unknown version {}", self.pg_version),
423 0 : lsn: self.lsn,
424 0 : })
425 : )
426 353679 : }
427 : }
428 : }
|