Line data Source code
1 : use anyhow::Context;
2 : use byteorder::{ByteOrder, LittleEndian};
3 : use bytes::BytesMut;
4 : use pageserver_api::key::Key;
5 : use pageserver_api::record::NeonWalRecord;
6 : use pageserver_api::reltag::SlruKind;
7 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
8 : use postgres_ffi::v14::nonrelfile_utils::{
9 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
10 : transaction_id_set_status,
11 : };
12 : use postgres_ffi::{BLCKSZ, pg_constants};
13 : use tracing::*;
14 : use utils::lsn::Lsn;
15 :
16 : /// Can this request be served by neon redo functions
17 : /// or we need to pass it to wal-redo postgres process?
18 2496 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
19 2496 : // Currently, we don't have bespoken Rust code to replay any
20 2496 : // Postgres WAL records. But everything else is handled in neon.
21 2496 : #[allow(clippy::match_like_matches_macro)]
22 2496 : match rec {
23 : NeonWalRecord::Postgres {
24 : will_init: _,
25 : rec: _,
26 24 : } => false,
27 2472 : _ => true,
28 : }
29 2496 : }
30 :
31 2472 : pub(crate) fn apply_in_neon(
32 2472 : record: &NeonWalRecord,
33 2472 : lsn: Lsn,
34 2472 : key: Key,
35 2472 : page: &mut BytesMut,
36 2472 : ) -> Result<(), anyhow::Error> {
37 2472 : match record {
38 : NeonWalRecord::Postgres {
39 : will_init: _,
40 : rec: _,
41 : } => {
42 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
43 : }
44 : //
45 : // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c`
46 : //
47 : NeonWalRecord::TruncateVisibilityMap {
48 0 : trunc_byte,
49 0 : trunc_offs,
50 : } => {
51 : // sanity check that this is modifying the correct relation
52 0 : let (rel, _) = key.to_rel_block().context("invalid record")?;
53 0 : assert!(
54 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
55 0 : "TruncateVisibilityMap record on unexpected rel {}",
56 : rel
57 : );
58 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
59 0 : map[*trunc_byte + 1..].fill(0u8);
60 0 : /*----
61 0 : * Mask out the unwanted bits of the last remaining byte.
62 0 : *
63 0 : * ((1 << 0) - 1) = 00000000
64 0 : * ((1 << 1) - 1) = 00000001
65 0 : * ...
66 0 : * ((1 << 6) - 1) = 00111111
67 0 : * ((1 << 7) - 1) = 01111111
68 0 : *----
69 0 : */
70 0 : map[*trunc_byte] &= (1 << *trunc_offs) - 1;
71 : }
72 : NeonWalRecord::ClearVisibilityMapFlags {
73 0 : new_heap_blkno,
74 0 : old_heap_blkno,
75 0 : flags,
76 : } => {
77 : // sanity check that this is modifying the correct relation
78 0 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
79 0 : assert!(
80 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
81 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
82 : rel
83 : );
84 0 : if let Some(heap_blkno) = *new_heap_blkno {
85 : // Calculate the VM block and offset that corresponds to the heap block.
86 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
87 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
88 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
89 0 :
90 0 : // Check that we're modifying the correct VM block.
91 0 : assert!(map_block == blknum);
92 :
93 : // equivalent to PageGetContents(page)
94 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
95 0 :
96 0 : map[map_byte as usize] &= !(flags << map_offset);
97 0 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
98 0 : if !postgres_ffi::page_is_new(page) {
99 0 : postgres_ffi::page_set_lsn(page, lsn);
100 0 : }
101 0 : }
102 :
103 : // Repeat for 'old_heap_blkno', if any
104 0 : if let Some(heap_blkno) = *old_heap_blkno {
105 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
106 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
107 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
108 0 :
109 0 : assert!(map_block == blknum);
110 :
111 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
112 0 :
113 0 : map[map_byte as usize] &= !(flags << map_offset);
114 0 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
115 0 : if !postgres_ffi::page_is_new(page) {
116 0 : postgres_ffi::page_set_lsn(page, lsn);
117 0 : }
118 0 : }
119 : }
120 : // Non-relational WAL records are handled here, with custom code that has the
121 : // same effects as the corresponding Postgres WAL redo function.
122 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
123 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
124 0 : assert_eq!(
125 : slru_kind,
126 : SlruKind::Clog,
127 0 : "ClogSetCommitted record with unexpected key {}",
128 : key
129 : );
130 0 : for &xid in xids {
131 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
132 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
133 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
134 0 :
135 0 : // Check that we're modifying the correct CLOG block.
136 0 : assert!(
137 0 : segno == expected_segno,
138 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
139 : xid,
140 : key
141 : );
142 0 : assert!(
143 0 : blknum == expected_blknum,
144 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
145 : xid,
146 : key
147 : );
148 :
149 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
150 : }
151 :
152 : // Append the timestamp
153 0 : if page.len() == BLCKSZ as usize + 8 {
154 0 : page.truncate(BLCKSZ as usize);
155 0 : }
156 0 : if page.len() == BLCKSZ as usize {
157 0 : page.extend_from_slice(×tamp.to_be_bytes());
158 0 : } else {
159 0 : warn!(
160 0 : "CLOG blk {} in seg {} has invalid size {}",
161 0 : blknum,
162 0 : segno,
163 0 : page.len()
164 : );
165 : }
166 : }
167 0 : NeonWalRecord::ClogSetAborted { xids } => {
168 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
169 0 : assert_eq!(
170 : slru_kind,
171 : SlruKind::Clog,
172 0 : "ClogSetAborted record with unexpected key {}",
173 : key
174 : );
175 0 : for &xid in xids {
176 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
177 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
178 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
179 0 :
180 0 : // Check that we're modifying the correct CLOG block.
181 0 : assert!(
182 0 : segno == expected_segno,
183 0 : "ClogSetAborted record for XID {} with unexpected key {}",
184 : xid,
185 : key
186 : );
187 0 : assert!(
188 0 : blknum == expected_blknum,
189 0 : "ClogSetAborted record for XID {} with unexpected key {}",
190 : xid,
191 : key
192 : );
193 :
194 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
195 : }
196 : }
197 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
198 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
199 0 : assert_eq!(
200 : slru_kind,
201 : SlruKind::MultiXactOffsets,
202 0 : "MultixactOffsetCreate record with unexpected key {}",
203 : key
204 : );
205 : // Compute the block and offset to modify.
206 : // See RecordNewMultiXact in PostgreSQL sources.
207 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
208 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
209 0 : let offset = (entryno * 4) as usize;
210 0 :
211 0 : // Check that we're modifying the correct multixact-offsets block.
212 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
213 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
214 0 : assert!(
215 0 : segno == expected_segno,
216 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
217 : mid,
218 : key
219 : );
220 0 : assert!(
221 0 : blknum == expected_blknum,
222 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
223 : mid,
224 : key
225 : );
226 :
227 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
228 : }
229 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
230 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
231 0 : assert_eq!(
232 : slru_kind,
233 : SlruKind::MultiXactMembers,
234 0 : "MultixactMembersCreate record with unexpected key {}",
235 : key
236 : );
237 0 : for (i, member) in members.iter().enumerate() {
238 0 : let offset = moff + i as u32;
239 0 :
240 0 : // Compute the block and offset to modify.
241 0 : // See RecordNewMultiXact in PostgreSQL sources.
242 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
243 0 : let memberoff = mx_offset_to_member_offset(offset);
244 0 : let flagsoff = mx_offset_to_flags_offset(offset);
245 0 : let bshift = mx_offset_to_flags_bitshift(offset);
246 0 :
247 0 : // Check that we're modifying the correct multixact-members block.
248 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
249 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
250 0 : assert!(
251 0 : segno == expected_segno,
252 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
253 : moff,
254 : key
255 : );
256 0 : assert!(
257 0 : blknum == expected_blknum,
258 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
259 : moff,
260 : key
261 : );
262 :
263 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
264 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
265 0 : flagsval |= member.status << bshift;
266 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
267 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
268 : }
269 : }
270 : NeonWalRecord::AuxFile { .. } => {
271 : // No-op: this record will never be created in aux v2.
272 0 : warn!("AuxFile record should not be created in aux v2");
273 : }
274 : #[cfg(feature = "testing")]
275 : NeonWalRecord::Test {
276 2472 : append,
277 2472 : clear,
278 2472 : will_init,
279 2472 : } => {
280 2472 : use bytes::BufMut;
281 2472 : if *will_init {
282 140 : assert!(*clear, "init record must be clear to ensure correctness");
283 140 : assert!(
284 140 : page.is_empty(),
285 0 : "init record must be the first entry to ensure correctness"
286 : );
287 2332 : }
288 2472 : if *clear {
289 140 : page.clear();
290 2332 : }
291 2472 : page.put_slice(append.as_bytes());
292 : }
293 : }
294 2472 : Ok(())
295 2472 : }
|