Line data Source code
1 : use anyhow::Context;
2 : use byteorder::{ByteOrder, LittleEndian};
3 : use bytes::BytesMut;
4 : use pageserver_api::key::Key;
5 : use pageserver_api::record::NeonWalRecord;
6 : use pageserver_api::reltag::SlruKind;
7 : use postgres_ffi::pg_constants;
8 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
9 : use postgres_ffi::v14::nonrelfile_utils::{
10 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
11 : transaction_id_set_status,
12 : };
13 : use postgres_ffi::BLCKSZ;
14 : use tracing::*;
15 : use utils::lsn::Lsn;
16 :
17 : /// Can this request be served by neon redo functions
18 : /// or we need to pass it to wal-redo postgres process?
19 582 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
20 582 : // Currently, we don't have bespoken Rust code to replay any
21 582 : // Postgres WAL records. But everything else is handled in neon.
22 582 : #[allow(clippy::match_like_matches_macro)]
23 582 : match rec {
24 : NeonWalRecord::Postgres {
25 : will_init: _,
26 : rec: _,
27 12 : } => false,
28 570 : _ => true,
29 : }
30 582 : }
31 :
32 570 : pub(crate) fn apply_in_neon(
33 570 : record: &NeonWalRecord,
34 570 : lsn: Lsn,
35 570 : key: Key,
36 570 : page: &mut BytesMut,
37 570 : ) -> Result<(), anyhow::Error> {
38 570 : match record {
39 : NeonWalRecord::Postgres {
40 : will_init: _,
41 : rec: _,
42 : } => {
43 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
44 : }
45 : //
46 : // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c`
47 : //
48 : NeonWalRecord::TruncateVisibilityMap {
49 0 : trunc_byte,
50 0 : trunc_offs,
51 : } => {
52 : // sanity check that this is modifying the correct relation
53 0 : let (rel, _) = key.to_rel_block().context("invalid record")?;
54 0 : assert!(
55 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
56 0 : "TruncateVisibilityMap record on unexpected rel {}",
57 : rel
58 : );
59 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
60 0 : map[*trunc_byte + 1..].fill(0u8);
61 0 : /*----
62 0 : * Mask out the unwanted bits of the last remaining byte.
63 0 : *
64 0 : * ((1 << 0) - 1) = 00000000
65 0 : * ((1 << 1) - 1) = 00000001
66 0 : * ...
67 0 : * ((1 << 6) - 1) = 00111111
68 0 : * ((1 << 7) - 1) = 01111111
69 0 : *----
70 0 : */
71 0 : map[*trunc_byte] &= (1 << *trunc_offs) - 1;
72 : }
73 : NeonWalRecord::ClearVisibilityMapFlags {
74 0 : new_heap_blkno,
75 0 : old_heap_blkno,
76 0 : flags,
77 : } => {
78 : // sanity check that this is modifying the correct relation
79 0 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
80 0 : assert!(
81 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
82 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
83 : rel
84 : );
85 0 : if let Some(heap_blkno) = *new_heap_blkno {
86 : // Calculate the VM block and offset that corresponds to the heap block.
87 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
88 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
89 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
90 0 :
91 0 : // Check that we're modifying the correct VM block.
92 0 : assert!(map_block == blknum);
93 :
94 : // equivalent to PageGetContents(page)
95 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
96 0 :
97 0 : map[map_byte as usize] &= !(flags << map_offset);
98 0 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
99 0 : if !postgres_ffi::page_is_new(page) {
100 0 : postgres_ffi::page_set_lsn(page, lsn);
101 0 : }
102 0 : }
103 :
104 : // Repeat for 'old_heap_blkno', if any
105 0 : if let Some(heap_blkno) = *old_heap_blkno {
106 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
107 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
108 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
109 0 :
110 0 : assert!(map_block == blknum);
111 :
112 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
113 0 :
114 0 : map[map_byte as usize] &= !(flags << map_offset);
115 0 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
116 0 : if !postgres_ffi::page_is_new(page) {
117 0 : postgres_ffi::page_set_lsn(page, lsn);
118 0 : }
119 0 : }
120 : }
121 : // Non-relational WAL records are handled here, with custom code that has the
122 : // same effects as the corresponding Postgres WAL redo function.
123 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
124 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
125 0 : assert_eq!(
126 : slru_kind,
127 : SlruKind::Clog,
128 0 : "ClogSetCommitted record with unexpected key {}",
129 : key
130 : );
131 0 : for &xid in xids {
132 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
133 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
134 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
135 0 :
136 0 : // Check that we're modifying the correct CLOG block.
137 0 : assert!(
138 0 : segno == expected_segno,
139 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
140 : xid,
141 : key
142 : );
143 0 : assert!(
144 0 : blknum == expected_blknum,
145 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
146 : xid,
147 : key
148 : );
149 :
150 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
151 : }
152 :
153 : // Append the timestamp
154 0 : if page.len() == BLCKSZ as usize + 8 {
155 0 : page.truncate(BLCKSZ as usize);
156 0 : }
157 0 : if page.len() == BLCKSZ as usize {
158 0 : page.extend_from_slice(×tamp.to_be_bytes());
159 0 : } else {
160 0 : warn!(
161 0 : "CLOG blk {} in seg {} has invalid size {}",
162 0 : blknum,
163 0 : segno,
164 0 : page.len()
165 : );
166 : }
167 : }
168 0 : NeonWalRecord::ClogSetAborted { xids } => {
169 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
170 0 : assert_eq!(
171 : slru_kind,
172 : SlruKind::Clog,
173 0 : "ClogSetAborted record with unexpected key {}",
174 : key
175 : );
176 0 : for &xid in xids {
177 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
178 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
179 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
180 0 :
181 0 : // Check that we're modifying the correct CLOG block.
182 0 : assert!(
183 0 : segno == expected_segno,
184 0 : "ClogSetAborted record for XID {} with unexpected key {}",
185 : xid,
186 : key
187 : );
188 0 : assert!(
189 0 : blknum == expected_blknum,
190 0 : "ClogSetAborted record for XID {} with unexpected key {}",
191 : xid,
192 : key
193 : );
194 :
195 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
196 : }
197 : }
198 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
199 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
200 0 : assert_eq!(
201 : slru_kind,
202 : SlruKind::MultiXactOffsets,
203 0 : "MultixactOffsetCreate record with unexpected key {}",
204 : key
205 : );
206 : // Compute the block and offset to modify.
207 : // See RecordNewMultiXact in PostgreSQL sources.
208 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
209 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
210 0 : let offset = (entryno * 4) as usize;
211 0 :
212 0 : // Check that we're modifying the correct multixact-offsets block.
213 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
214 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
215 0 : assert!(
216 0 : segno == expected_segno,
217 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
218 : mid,
219 : key
220 : );
221 0 : assert!(
222 0 : blknum == expected_blknum,
223 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
224 : mid,
225 : key
226 : );
227 :
228 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
229 : }
230 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
231 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
232 0 : assert_eq!(
233 : slru_kind,
234 : SlruKind::MultiXactMembers,
235 0 : "MultixactMembersCreate record with unexpected key {}",
236 : key
237 : );
238 0 : for (i, member) in members.iter().enumerate() {
239 0 : let offset = moff + i as u32;
240 0 :
241 0 : // Compute the block and offset to modify.
242 0 : // See RecordNewMultiXact in PostgreSQL sources.
243 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
244 0 : let memberoff = mx_offset_to_member_offset(offset);
245 0 : let flagsoff = mx_offset_to_flags_offset(offset);
246 0 : let bshift = mx_offset_to_flags_bitshift(offset);
247 0 :
248 0 : // Check that we're modifying the correct multixact-members block.
249 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
250 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
251 0 : assert!(
252 0 : segno == expected_segno,
253 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
254 : moff,
255 : key
256 : );
257 0 : assert!(
258 0 : blknum == expected_blknum,
259 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
260 : moff,
261 : key
262 : );
263 :
264 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
265 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
266 0 : flagsval |= member.status << bshift;
267 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
268 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
269 : }
270 : }
271 : NeonWalRecord::AuxFile { .. } => {
272 : // No-op: this record will never be created in aux v2.
273 0 : warn!("AuxFile record should not be created in aux v2");
274 : }
275 : #[cfg(feature = "testing")]
276 : NeonWalRecord::Test {
277 570 : append,
278 570 : clear,
279 570 : will_init,
280 570 : } => {
281 570 : use bytes::BufMut;
282 570 : if *will_init {
283 66 : assert!(*clear, "init record must be clear to ensure correctness");
284 66 : assert!(
285 66 : page.is_empty(),
286 0 : "init record must be the first entry to ensure correctness"
287 : );
288 504 : }
289 570 : if *clear {
290 66 : page.clear();
291 504 : }
292 570 : page.put_slice(append.as_bytes());
293 : }
294 : }
295 570 : Ok(())
296 570 : }
|