Line data Source code
1 : use anyhow::Context;
2 : use byteorder::{ByteOrder, LittleEndian};
3 : use bytes::BytesMut;
4 : use pageserver_api::key::Key;
5 : use pageserver_api::reltag::SlruKind;
6 : use postgres_ffi::v14::nonrelfile_utils::{
7 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
8 : transaction_id_set_status,
9 : };
10 : use postgres_ffi::{BLCKSZ, pg_constants};
11 : use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
12 : use tracing::*;
13 : use utils::lsn::Lsn;
14 : use wal_decoder::models::record::NeonWalRecord;
15 :
16 : /// Can this request be served by neon redo functions
17 : /// or we need to pass it to wal-redo postgres process?
18 1403516 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
19 : // Currently, we don't have bespoken Rust code to replay any
20 : // Postgres WAL records. But everything else is handled in neon.
21 : #[allow(clippy::match_like_matches_macro)]
22 1403516 : match rec {
23 : NeonWalRecord::Postgres {
24 : will_init: _,
25 : rec: _,
26 6 : } => false,
27 1403510 : _ => true,
28 : }
29 1403516 : }
30 :
31 1403510 : pub(crate) fn apply_in_neon(
32 1403510 : record: &NeonWalRecord,
33 1403510 : lsn: Lsn,
34 1403510 : key: Key,
35 1403510 : page: &mut BytesMut,
36 1403510 : ) -> Result<(), anyhow::Error> {
37 1403510 : match record {
38 : NeonWalRecord::Postgres {
39 : will_init: _,
40 : rec: _,
41 : } => {
42 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
43 : }
44 : //
45 : // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c`
46 : //
47 : NeonWalRecord::TruncateVisibilityMap {
48 0 : trunc_byte,
49 0 : trunc_offs,
50 : } => {
51 : // sanity check that this is modifying the correct relation
52 0 : let (rel, _) = key.to_rel_block().context("invalid record")?;
53 0 : assert!(
54 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
55 0 : "TruncateVisibilityMap record on unexpected rel {rel}"
56 : );
57 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
58 0 : map[*trunc_byte + 1..].fill(0u8);
59 : /*----
60 : * Mask out the unwanted bits of the last remaining byte.
61 : *
62 : * ((1 << 0) - 1) = 00000000
63 : * ((1 << 1) - 1) = 00000001
64 : * ...
65 : * ((1 << 6) - 1) = 00111111
66 : * ((1 << 7) - 1) = 01111111
67 : *----
68 : */
69 0 : map[*trunc_byte] &= (1 << *trunc_offs) - 1;
70 : }
71 : NeonWalRecord::ClearVisibilityMapFlags {
72 0 : new_heap_blkno,
73 0 : old_heap_blkno,
74 0 : flags,
75 : } => {
76 : // sanity check that this is modifying the correct relation
77 0 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
78 0 : assert!(
79 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
80 0 : "ClearVisibilityMapFlags record on unexpected rel {rel}"
81 : );
82 0 : if let Some(heap_blkno) = *new_heap_blkno {
83 : // Calculate the VM block and offset that corresponds to the heap block.
84 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
85 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
86 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
87 :
88 : // Check that we're modifying the correct VM block.
89 0 : assert!(map_block == blknum);
90 :
91 : // equivalent to PageGetContents(page)
92 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
93 :
94 0 : map[map_byte as usize] &= !(flags << map_offset);
95 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
96 0 : if !postgres_ffi::page_is_new(page) {
97 0 : postgres_ffi::page_set_lsn(page, lsn);
98 0 : }
99 0 : }
100 :
101 : // Repeat for 'old_heap_blkno', if any
102 0 : if let Some(heap_blkno) = *old_heap_blkno {
103 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
104 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
105 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
106 :
107 0 : assert!(map_block == blknum);
108 :
109 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
110 :
111 0 : map[map_byte as usize] &= !(flags << map_offset);
112 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
113 0 : if !postgres_ffi::page_is_new(page) {
114 0 : postgres_ffi::page_set_lsn(page, lsn);
115 0 : }
116 0 : }
117 : }
118 : // Non-relational WAL records are handled here, with custom code that has the
119 : // same effects as the corresponding Postgres WAL redo function.
120 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
121 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
122 0 : assert_eq!(
123 : slru_kind,
124 : SlruKind::Clog,
125 0 : "ClogSetCommitted record with unexpected key {key}"
126 : );
127 0 : for &xid in xids {
128 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
129 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
130 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
131 :
132 : // Check that we're modifying the correct CLOG block.
133 0 : assert!(
134 0 : segno == expected_segno,
135 0 : "ClogSetCommitted record for XID {xid} with unexpected key {key}"
136 : );
137 0 : assert!(
138 0 : blknum == expected_blknum,
139 0 : "ClogSetCommitted record for XID {xid} with unexpected key {key}"
140 : );
141 :
142 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
143 : }
144 :
145 : // Append the timestamp
146 0 : if page.len() == BLCKSZ as usize + 8 {
147 0 : page.truncate(BLCKSZ as usize);
148 0 : }
149 0 : if page.len() == BLCKSZ as usize {
150 0 : page.extend_from_slice(×tamp.to_be_bytes());
151 0 : } else {
152 0 : warn!(
153 0 : "CLOG blk {} in seg {} has invalid size {}",
154 : blknum,
155 : segno,
156 0 : page.len()
157 : );
158 : }
159 : }
160 0 : NeonWalRecord::ClogSetAborted { xids } => {
161 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
162 0 : assert_eq!(
163 : slru_kind,
164 : SlruKind::Clog,
165 0 : "ClogSetAborted record with unexpected key {key}"
166 : );
167 0 : for &xid in xids {
168 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
169 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
170 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
171 :
172 : // Check that we're modifying the correct CLOG block.
173 0 : assert!(
174 0 : segno == expected_segno,
175 0 : "ClogSetAborted record for XID {xid} with unexpected key {key}"
176 : );
177 0 : assert!(
178 0 : blknum == expected_blknum,
179 0 : "ClogSetAborted record for XID {xid} with unexpected key {key}"
180 : );
181 :
182 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
183 : }
184 : }
185 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
186 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
187 0 : assert_eq!(
188 : slru_kind,
189 : SlruKind::MultiXactOffsets,
190 0 : "MultixactOffsetCreate record with unexpected key {key}"
191 : );
192 : // Compute the block and offset to modify.
193 : // See RecordNewMultiXact in PostgreSQL sources.
194 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
195 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
196 0 : let offset = (entryno * 4) as usize;
197 :
198 : // Check that we're modifying the correct multixact-offsets block.
199 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
200 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
201 0 : assert!(
202 0 : segno == expected_segno,
203 0 : "MultiXactOffsetsCreate record for multi-xid {mid} with unexpected key {key}"
204 : );
205 0 : assert!(
206 0 : blknum == expected_blknum,
207 0 : "MultiXactOffsetsCreate record for multi-xid {mid} with unexpected key {key}"
208 : );
209 :
210 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
211 : }
212 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
213 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
214 0 : assert_eq!(
215 : slru_kind,
216 : SlruKind::MultiXactMembers,
217 0 : "MultixactMembersCreate record with unexpected key {key}"
218 : );
219 0 : for (i, member) in members.iter().enumerate() {
220 0 : let offset = moff + i as u32;
221 :
222 : // Compute the block and offset to modify.
223 : // See RecordNewMultiXact in PostgreSQL sources.
224 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
225 0 : let memberoff = mx_offset_to_member_offset(offset);
226 0 : let flagsoff = mx_offset_to_flags_offset(offset);
227 0 : let bshift = mx_offset_to_flags_bitshift(offset);
228 :
229 : // Check that we're modifying the correct multixact-members block.
230 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
231 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
232 0 : assert!(
233 0 : segno == expected_segno,
234 0 : "MultiXactMembersCreate record for offset {moff} with unexpected key {key}"
235 : );
236 0 : assert!(
237 0 : blknum == expected_blknum,
238 0 : "MultiXactMembersCreate record for offset {moff} with unexpected key {key}"
239 : );
240 :
241 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
242 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
243 0 : flagsval |= member.status << bshift;
244 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
245 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
246 : }
247 : }
248 : NeonWalRecord::AuxFile { .. } => {
249 : // No-op: this record will never be created in aux v2.
250 0 : warn!("AuxFile record should not be created in aux v2");
251 : }
252 : #[cfg(feature = "testing")]
253 : NeonWalRecord::Test {
254 1403510 : append,
255 1403510 : clear,
256 1403510 : will_init,
257 1403510 : only_if,
258 : } => {
259 : use bytes::BufMut;
260 1403510 : if *will_init {
261 13745 : assert!(*clear, "init record must be clear to ensure correctness");
262 13745 : assert!(
263 13745 : page.is_empty(),
264 0 : "init record must be the first entry to ensure correctness"
265 : );
266 1389765 : }
267 1403510 : if *clear {
268 13746 : page.clear();
269 1389764 : }
270 1403510 : if let Some(only_if) = only_if {
271 2 : if page != only_if.as_bytes() {
272 1 : return Err(anyhow::anyhow!(
273 1 : "the current image does not match the expected image, cannot append"
274 1 : ));
275 1 : }
276 1403508 : }
277 1403509 : page.put_slice(append.as_bytes());
278 : }
279 : }
280 1403509 : Ok(())
281 1403510 : }
|