Line data Source code
1 : use anyhow::Context;
2 : use byteorder::{ByteOrder, LittleEndian};
3 : use bytes::BytesMut;
4 : use pageserver_api::key::Key;
5 : use pageserver_api::record::NeonWalRecord;
6 : use pageserver_api::reltag::SlruKind;
7 : use postgres_ffi::pg_constants;
8 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
9 : use postgres_ffi::v14::nonrelfile_utils::{
10 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
11 : transaction_id_set_status,
12 : };
13 : use postgres_ffi::BLCKSZ;
14 : use tracing::*;
15 : use utils::lsn::Lsn;
16 :
17 : /// Can this request be served by neon redo functions
18 : /// or we need to pass it to wal-redo postgres process?
19 582 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
20 582 : // Currently, we don't have bespoken Rust code to replay any
21 582 : // Postgres WAL records. But everything else is handled in neon.
22 582 : #[allow(clippy::match_like_matches_macro)]
23 582 : match rec {
24 : NeonWalRecord::Postgres {
25 : will_init: _,
26 : rec: _,
27 12 : } => false,
28 570 : _ => true,
29 : }
30 582 : }
31 :
32 570 : pub(crate) fn apply_in_neon(
33 570 : record: &NeonWalRecord,
34 570 : lsn: Lsn,
35 570 : key: Key,
36 570 : page: &mut BytesMut,
37 570 : ) -> Result<(), anyhow::Error> {
38 570 : match record {
39 : NeonWalRecord::Postgres {
40 : will_init: _,
41 : rec: _,
42 : } => {
43 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
44 : }
45 : NeonWalRecord::ClearVisibilityMapFlags {
46 0 : new_heap_blkno,
47 0 : old_heap_blkno,
48 0 : flags,
49 : } => {
50 : // sanity check that this is modifying the correct relation
51 0 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
52 0 : assert!(
53 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
54 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
55 : rel
56 : );
57 0 : if let Some(heap_blkno) = *new_heap_blkno {
58 : // Calculate the VM block and offset that corresponds to the heap block.
59 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
60 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
61 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
62 0 :
63 0 : // Check that we're modifying the correct VM block.
64 0 : assert!(map_block == blknum);
65 :
66 : // equivalent to PageGetContents(page)
67 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
68 0 :
69 0 : map[map_byte as usize] &= !(flags << map_offset);
70 0 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
71 0 : if !postgres_ffi::page_is_new(page) {
72 0 : postgres_ffi::page_set_lsn(page, lsn);
73 0 : }
74 0 : }
75 :
76 : // Repeat for 'old_heap_blkno', if any
77 0 : if let Some(heap_blkno) = *old_heap_blkno {
78 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
79 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
80 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
81 0 :
82 0 : assert!(map_block == blknum);
83 :
84 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
85 0 :
86 0 : map[map_byte as usize] &= !(flags << map_offset);
87 0 : // The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
88 0 : if !postgres_ffi::page_is_new(page) {
89 0 : postgres_ffi::page_set_lsn(page, lsn);
90 0 : }
91 0 : }
92 : }
93 : // Non-relational WAL records are handled here, with custom code that has the
94 : // same effects as the corresponding Postgres WAL redo function.
95 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
96 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
97 0 : assert_eq!(
98 : slru_kind,
99 : SlruKind::Clog,
100 0 : "ClogSetCommitted record with unexpected key {}",
101 : key
102 : );
103 0 : for &xid in xids {
104 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
105 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
106 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
107 0 :
108 0 : // Check that we're modifying the correct CLOG block.
109 0 : assert!(
110 0 : segno == expected_segno,
111 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
112 : xid,
113 : key
114 : );
115 0 : assert!(
116 0 : blknum == expected_blknum,
117 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
118 : xid,
119 : key
120 : );
121 :
122 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
123 : }
124 :
125 : // Append the timestamp
126 0 : if page.len() == BLCKSZ as usize + 8 {
127 0 : page.truncate(BLCKSZ as usize);
128 0 : }
129 0 : if page.len() == BLCKSZ as usize {
130 0 : page.extend_from_slice(×tamp.to_be_bytes());
131 0 : } else {
132 0 : warn!(
133 0 : "CLOG blk {} in seg {} has invalid size {}",
134 0 : blknum,
135 0 : segno,
136 0 : page.len()
137 : );
138 : }
139 : }
140 0 : NeonWalRecord::ClogSetAborted { xids } => {
141 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
142 0 : assert_eq!(
143 : slru_kind,
144 : SlruKind::Clog,
145 0 : "ClogSetAborted record with unexpected key {}",
146 : key
147 : );
148 0 : for &xid in xids {
149 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
150 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
151 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
152 0 :
153 0 : // Check that we're modifying the correct CLOG block.
154 0 : assert!(
155 0 : segno == expected_segno,
156 0 : "ClogSetAborted record for XID {} with unexpected key {}",
157 : xid,
158 : key
159 : );
160 0 : assert!(
161 0 : blknum == expected_blknum,
162 0 : "ClogSetAborted record for XID {} with unexpected key {}",
163 : xid,
164 : key
165 : );
166 :
167 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
168 : }
169 : }
170 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
171 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
172 0 : assert_eq!(
173 : slru_kind,
174 : SlruKind::MultiXactOffsets,
175 0 : "MultixactOffsetCreate record with unexpected key {}",
176 : key
177 : );
178 : // Compute the block and offset to modify.
179 : // See RecordNewMultiXact in PostgreSQL sources.
180 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
181 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
182 0 : let offset = (entryno * 4) as usize;
183 0 :
184 0 : // Check that we're modifying the correct multixact-offsets block.
185 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
186 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
187 0 : assert!(
188 0 : segno == expected_segno,
189 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
190 : mid,
191 : key
192 : );
193 0 : assert!(
194 0 : blknum == expected_blknum,
195 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
196 : mid,
197 : key
198 : );
199 :
200 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
201 : }
202 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
203 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
204 0 : assert_eq!(
205 : slru_kind,
206 : SlruKind::MultiXactMembers,
207 0 : "MultixactMembersCreate record with unexpected key {}",
208 : key
209 : );
210 0 : for (i, member) in members.iter().enumerate() {
211 0 : let offset = moff + i as u32;
212 0 :
213 0 : // Compute the block and offset to modify.
214 0 : // See RecordNewMultiXact in PostgreSQL sources.
215 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
216 0 : let memberoff = mx_offset_to_member_offset(offset);
217 0 : let flagsoff = mx_offset_to_flags_offset(offset);
218 0 : let bshift = mx_offset_to_flags_bitshift(offset);
219 0 :
220 0 : // Check that we're modifying the correct multixact-members block.
221 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
222 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
223 0 : assert!(
224 0 : segno == expected_segno,
225 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
226 : moff,
227 : key
228 : );
229 0 : assert!(
230 0 : blknum == expected_blknum,
231 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
232 : moff,
233 : key
234 : );
235 :
236 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
237 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
238 0 : flagsval |= member.status << bshift;
239 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
240 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
241 : }
242 : }
243 : NeonWalRecord::AuxFile { .. } => {
244 : // No-op: this record will never be created in aux v2.
245 0 : warn!("AuxFile record should not be created in aux v2");
246 : }
247 : #[cfg(feature = "testing")]
248 : NeonWalRecord::Test {
249 570 : append,
250 570 : clear,
251 570 : will_init,
252 570 : } => {
253 570 : use bytes::BufMut;
254 570 : if *will_init {
255 66 : assert!(*clear, "init record must be clear to ensure correctness");
256 66 : assert!(
257 66 : page.is_empty(),
258 0 : "init record must be the first entry to ensure correctness"
259 : );
260 504 : }
261 570 : if *clear {
262 66 : page.clear();
263 504 : }
264 570 : page.put_slice(append.as_bytes());
265 : }
266 : }
267 570 : Ok(())
268 570 : }
|