Line data Source code
1 : use crate::walrecord::NeonWalRecord;
2 : use anyhow::Context;
3 : use byteorder::{ByteOrder, LittleEndian};
4 : use bytes::BytesMut;
5 : use pageserver_api::key::Key;
6 : use pageserver_api::reltag::SlruKind;
7 : use postgres_ffi::pg_constants;
8 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
9 : use postgres_ffi::v14::nonrelfile_utils::{
10 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
11 : transaction_id_set_status,
12 : };
13 : use postgres_ffi::BLCKSZ;
14 : use tracing::*;
15 : use utils::lsn::Lsn;
16 :
17 : /// Can this request be served by neon redo functions
18 : /// or we need to pass it to wal-redo postgres process?
19 470 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
20 470 : // Currently, we don't have bespoken Rust code to replay any
21 470 : // Postgres WAL records. But everything else is handled in neon.
22 470 : #[allow(clippy::match_like_matches_macro)]
23 470 : match rec {
24 : NeonWalRecord::Postgres {
25 : will_init: _,
26 : rec: _,
27 12 : } => false,
28 458 : _ => true,
29 : }
30 470 : }
31 :
32 458 : pub(crate) fn apply_in_neon(
33 458 : record: &NeonWalRecord,
34 458 : lsn: Lsn,
35 458 : key: Key,
36 458 : page: &mut BytesMut,
37 458 : ) -> Result<(), anyhow::Error> {
38 458 : match record {
39 : NeonWalRecord::Postgres {
40 : will_init: _,
41 : rec: _,
42 : } => {
43 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
44 : }
45 : NeonWalRecord::ClearVisibilityMapFlags {
46 0 : new_heap_blkno,
47 0 : old_heap_blkno,
48 0 : flags,
49 : } => {
50 : // sanity check that this is modifying the correct relation
51 0 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
52 0 : assert!(
53 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
54 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
55 : rel
56 : );
57 0 : if let Some(heap_blkno) = *new_heap_blkno {
58 : // Calculate the VM block and offset that corresponds to the heap block.
59 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
60 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
61 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
62 0 :
63 0 : // Check that we're modifying the correct VM block.
64 0 : assert!(map_block == blknum);
65 :
66 : // equivalent to PageGetContents(page)
67 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
68 0 :
69 0 : map[map_byte as usize] &= !(flags << map_offset);
70 0 : postgres_ffi::page_set_lsn(page, lsn);
71 0 : }
72 :
73 : // Repeat for 'old_heap_blkno', if any
74 0 : if let Some(heap_blkno) = *old_heap_blkno {
75 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
76 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
77 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
78 0 :
79 0 : assert!(map_block == blknum);
80 :
81 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
82 0 :
83 0 : map[map_byte as usize] &= !(flags << map_offset);
84 0 : postgres_ffi::page_set_lsn(page, lsn);
85 0 : }
86 : }
87 : // Non-relational WAL records are handled here, with custom code that has the
88 : // same effects as the corresponding Postgres WAL redo function.
89 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
90 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
91 0 : assert_eq!(
92 : slru_kind,
93 : SlruKind::Clog,
94 0 : "ClogSetCommitted record with unexpected key {}",
95 : key
96 : );
97 0 : for &xid in xids {
98 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
99 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
100 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
101 0 :
102 0 : // Check that we're modifying the correct CLOG block.
103 0 : assert!(
104 0 : segno == expected_segno,
105 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
106 : xid,
107 : key
108 : );
109 0 : assert!(
110 0 : blknum == expected_blknum,
111 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
112 : xid,
113 : key
114 : );
115 :
116 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
117 : }
118 :
119 : // Append the timestamp
120 0 : if page.len() == BLCKSZ as usize + 8 {
121 0 : page.truncate(BLCKSZ as usize);
122 0 : }
123 0 : if page.len() == BLCKSZ as usize {
124 0 : page.extend_from_slice(×tamp.to_be_bytes());
125 0 : } else {
126 0 : warn!(
127 0 : "CLOG blk {} in seg {} has invalid size {}",
128 0 : blknum,
129 0 : segno,
130 0 : page.len()
131 : );
132 : }
133 : }
134 0 : NeonWalRecord::ClogSetAborted { xids } => {
135 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
136 0 : assert_eq!(
137 : slru_kind,
138 : SlruKind::Clog,
139 0 : "ClogSetAborted record with unexpected key {}",
140 : key
141 : );
142 0 : for &xid in xids {
143 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
144 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
145 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
146 0 :
147 0 : // Check that we're modifying the correct CLOG block.
148 0 : assert!(
149 0 : segno == expected_segno,
150 0 : "ClogSetAborted record for XID {} with unexpected key {}",
151 : xid,
152 : key
153 : );
154 0 : assert!(
155 0 : blknum == expected_blknum,
156 0 : "ClogSetAborted record for XID {} with unexpected key {}",
157 : xid,
158 : key
159 : );
160 :
161 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
162 : }
163 : }
164 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
165 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
166 0 : assert_eq!(
167 : slru_kind,
168 : SlruKind::MultiXactOffsets,
169 0 : "MultixactOffsetCreate record with unexpected key {}",
170 : key
171 : );
172 : // Compute the block and offset to modify.
173 : // See RecordNewMultiXact in PostgreSQL sources.
174 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
175 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
176 0 : let offset = (entryno * 4) as usize;
177 0 :
178 0 : // Check that we're modifying the correct multixact-offsets block.
179 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
180 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
181 0 : assert!(
182 0 : segno == expected_segno,
183 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
184 : mid,
185 : key
186 : );
187 0 : assert!(
188 0 : blknum == expected_blknum,
189 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
190 : mid,
191 : key
192 : );
193 :
194 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
195 : }
196 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
197 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
198 0 : assert_eq!(
199 : slru_kind,
200 : SlruKind::MultiXactMembers,
201 0 : "MultixactMembersCreate record with unexpected key {}",
202 : key
203 : );
204 0 : for (i, member) in members.iter().enumerate() {
205 0 : let offset = moff + i as u32;
206 0 :
207 0 : // Compute the block and offset to modify.
208 0 : // See RecordNewMultiXact in PostgreSQL sources.
209 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
210 0 : let memberoff = mx_offset_to_member_offset(offset);
211 0 : let flagsoff = mx_offset_to_flags_offset(offset);
212 0 : let bshift = mx_offset_to_flags_bitshift(offset);
213 0 :
214 0 : // Check that we're modifying the correct multixact-members block.
215 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
216 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
217 0 : assert!(
218 0 : segno == expected_segno,
219 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
220 : moff,
221 : key
222 : );
223 0 : assert!(
224 0 : blknum == expected_blknum,
225 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
226 : moff,
227 : key
228 : );
229 :
230 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
231 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
232 0 : flagsval |= member.status << bshift;
233 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
234 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
235 : }
236 : }
237 : NeonWalRecord::AuxFile { .. } => {
238 : // No-op: this record will never be created in aux v2.
239 0 : warn!("AuxFile record should not be created in aux v2");
240 : }
241 : #[cfg(test)]
242 : NeonWalRecord::Test {
243 458 : append,
244 458 : clear,
245 458 : will_init,
246 458 : } => {
247 458 : use bytes::BufMut;
248 458 : if *will_init {
249 4 : assert!(*clear, "init record must be clear to ensure correctness");
250 454 : }
251 458 : if *clear {
252 4 : page.clear();
253 454 : }
254 458 : page.put_slice(append.as_bytes());
255 : }
256 : }
257 458 : Ok(())
258 458 : }
|