Line data Source code
1 : use crate::pgdatadir_mapping::AuxFilesDirectory;
2 : use crate::walrecord::NeonWalRecord;
3 : use anyhow::Context;
4 : use byteorder::{ByteOrder, LittleEndian};
5 : use bytes::{BufMut, BytesMut};
6 : use pageserver_api::key::Key;
7 : use pageserver_api::reltag::SlruKind;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
10 : use postgres_ffi::v14::nonrelfile_utils::{
11 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
12 : transaction_id_set_status,
13 : };
14 : use postgres_ffi::BLCKSZ;
15 : use tracing::*;
16 : use utils::bin_ser::BeSer;
17 : use utils::lsn::Lsn;
18 :
19 : /// Can this request be served by neon redo functions
20 : /// or we need to pass it to wal-redo postgres process?
21 1362 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
22 1362 : // Currently, we don't have bespoken Rust code to replay any
23 1362 : // Postgres WAL records. But everything else is handled in neon.
24 1362 : #[allow(clippy::match_like_matches_macro)]
25 1362 : match rec {
26 : NeonWalRecord::Postgres {
27 : will_init: _,
28 : rec: _,
29 36 : } => false,
30 1326 : _ => true,
31 : }
32 1362 : }
33 :
34 1344 : pub(crate) fn apply_in_neon(
35 1344 : record: &NeonWalRecord,
36 1344 : lsn: Lsn,
37 1344 : key: Key,
38 1344 : page: &mut BytesMut,
39 1344 : ) -> Result<(), anyhow::Error> {
40 1344 : match record {
41 : NeonWalRecord::Postgres {
42 : will_init: _,
43 : rec: _,
44 : } => {
45 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
46 : }
47 : NeonWalRecord::ClearVisibilityMapFlags {
48 0 : new_heap_blkno,
49 0 : old_heap_blkno,
50 0 : flags,
51 : } => {
52 : // sanity check that this is modifying the correct relation
53 0 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
54 0 : assert!(
55 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
56 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
57 : rel
58 : );
59 0 : if let Some(heap_blkno) = *new_heap_blkno {
60 : // Calculate the VM block and offset that corresponds to the heap block.
61 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
62 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
63 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
64 0 :
65 0 : // Check that we're modifying the correct VM block.
66 0 : assert!(map_block == blknum);
67 :
68 : // equivalent to PageGetContents(page)
69 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
70 0 :
71 0 : map[map_byte as usize] &= !(flags << map_offset);
72 0 : postgres_ffi::page_set_lsn(page, lsn);
73 0 : }
74 :
75 : // Repeat for 'old_heap_blkno', if any
76 0 : if let Some(heap_blkno) = *old_heap_blkno {
77 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
78 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
79 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
80 0 :
81 0 : assert!(map_block == blknum);
82 :
83 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
84 0 :
85 0 : map[map_byte as usize] &= !(flags << map_offset);
86 0 : postgres_ffi::page_set_lsn(page, lsn);
87 0 : }
88 : }
89 : // Non-relational WAL records are handled here, with custom code that has the
90 : // same effects as the corresponding Postgres WAL redo function.
91 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
92 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
93 0 : assert_eq!(
94 : slru_kind,
95 : SlruKind::Clog,
96 0 : "ClogSetCommitted record with unexpected key {}",
97 : key
98 : );
99 0 : for &xid in xids {
100 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
101 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
102 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
103 0 :
104 0 : // Check that we're modifying the correct CLOG block.
105 0 : assert!(
106 0 : segno == expected_segno,
107 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
108 : xid,
109 : key
110 : );
111 0 : assert!(
112 0 : blknum == expected_blknum,
113 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
114 : xid,
115 : key
116 : );
117 :
118 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
119 : }
120 :
121 : // Append the timestamp
122 0 : if page.len() == BLCKSZ as usize + 8 {
123 0 : page.truncate(BLCKSZ as usize);
124 0 : }
125 0 : if page.len() == BLCKSZ as usize {
126 0 : page.extend_from_slice(×tamp.to_be_bytes());
127 0 : } else {
128 0 : warn!(
129 0 : "CLOG blk {} in seg {} has invalid size {}",
130 0 : blknum,
131 0 : segno,
132 0 : page.len()
133 : );
134 : }
135 : }
136 0 : NeonWalRecord::ClogSetAborted { xids } => {
137 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
138 0 : assert_eq!(
139 : slru_kind,
140 : SlruKind::Clog,
141 0 : "ClogSetAborted record with unexpected key {}",
142 : key
143 : );
144 0 : for &xid in xids {
145 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
146 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
147 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
148 0 :
149 0 : // Check that we're modifying the correct CLOG block.
150 0 : assert!(
151 0 : segno == expected_segno,
152 0 : "ClogSetAborted record for XID {} with unexpected key {}",
153 : xid,
154 : key
155 : );
156 0 : assert!(
157 0 : blknum == expected_blknum,
158 0 : "ClogSetAborted record for XID {} with unexpected key {}",
159 : xid,
160 : key
161 : );
162 :
163 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
164 : }
165 : }
166 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
167 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
168 0 : assert_eq!(
169 : slru_kind,
170 : SlruKind::MultiXactOffsets,
171 0 : "MultixactOffsetCreate record with unexpected key {}",
172 : key
173 : );
174 : // Compute the block and offset to modify.
175 : // See RecordNewMultiXact in PostgreSQL sources.
176 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
177 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
178 0 : let offset = (entryno * 4) as usize;
179 0 :
180 0 : // Check that we're modifying the correct multixact-offsets block.
181 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
182 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
183 0 : assert!(
184 0 : segno == expected_segno,
185 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
186 : mid,
187 : key
188 : );
189 0 : assert!(
190 0 : blknum == expected_blknum,
191 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
192 : mid,
193 : key
194 : );
195 :
196 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
197 : }
198 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
199 0 : let (slru_kind, segno, blknum) = key.to_slru_block().context("invalid record")?;
200 0 : assert_eq!(
201 : slru_kind,
202 : SlruKind::MultiXactMembers,
203 0 : "MultixactMembersCreate record with unexpected key {}",
204 : key
205 : );
206 0 : for (i, member) in members.iter().enumerate() {
207 0 : let offset = moff + i as u32;
208 0 :
209 0 : // Compute the block and offset to modify.
210 0 : // See RecordNewMultiXact in PostgreSQL sources.
211 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
212 0 : let memberoff = mx_offset_to_member_offset(offset);
213 0 : let flagsoff = mx_offset_to_flags_offset(offset);
214 0 : let bshift = mx_offset_to_flags_bitshift(offset);
215 0 :
216 0 : // Check that we're modifying the correct multixact-members block.
217 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
218 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
219 0 : assert!(
220 0 : segno == expected_segno,
221 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
222 : moff,
223 : key
224 : );
225 0 : assert!(
226 0 : blknum == expected_blknum,
227 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
228 : moff,
229 : key
230 : );
231 :
232 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
233 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
234 0 : flagsval |= member.status << bshift;
235 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
236 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
237 : }
238 : }
239 30 : NeonWalRecord::AuxFile { file_path, content } => {
240 30 : let mut dir = AuxFilesDirectory::des(page)?;
241 30 : dir.upsert(file_path.clone(), content.clone());
242 30 :
243 30 : page.clear();
244 30 : let mut writer = page.writer();
245 30 : dir.ser_into(&mut writer)?;
246 : }
247 : #[cfg(test)]
248 : NeonWalRecord::Test {
249 1314 : append,
250 1314 : clear,
251 1314 : will_init,
252 1314 : } => {
253 1314 : if *will_init {
254 0 : assert!(*clear, "init record must be clear to ensure correctness");
255 1314 : }
256 1314 : if *clear {
257 0 : page.clear();
258 1314 : }
259 1314 : page.put_slice(append.as_bytes());
260 : }
261 : }
262 1344 : Ok(())
263 1344 : }
264 :
#[cfg(test)]
mod test {
    use bytes::Bytes;
    use pageserver_api::key::AUX_FILES_KEY;

    use super::*;
    use std::collections::HashMap;

    /// Checks that [`apply_in_neon`] handles a sequence of
    /// `NeonWalRecord::AuxFile` records (insert, update, delete) on top of a
    /// serialized base directory.
    #[test]
    fn apply_aux_file_deltas() -> anyhow::Result<()> {
        // Base image: a directory holding two files.
        let initial_dir = AuxFilesDirectory {
            files: HashMap::from([
                ("two".to_string(), Bytes::from_static(b"content0")),
                ("three".to_string(), Bytes::from_static(b"contentX")),
            ]),
        };
        let initial_image = AuxFilesDirectory::ser(&initial_dir)?;
        let mut page = BytesMut::from_iter(initial_image);

        let records = [
            // Insert a file that is not present yet.
            NeonWalRecord::AuxFile {
                file_path: "one".to_string(),
                content: Some(Bytes::from_static(b"content1")),
            },
            // Overwrite the content of an existing file.
            NeonWalRecord::AuxFile {
                file_path: "two".to_string(),
                content: Some(Bytes::from_static(b"content99")),
            },
            // Remove an existing file.
            NeonWalRecord::AuxFile {
                file_path: "three".to_string(),
                content: None,
            },
        ];

        // Replay every record, in order, against the same page.
        let key = AUX_FILES_KEY;
        for record in &records {
            apply_in_neon(record, Lsn(8), key, &mut page)?;
        }

        let expected_files = HashMap::from([
            ("one".to_string(), Bytes::from_static(b"content1")),
            ("two".to_string(), Bytes::from_static(b"content99")),
        ]);
        let actual = AuxFilesDirectory::des(&page)?;

        assert_eq!(actual.files, expected_files);

        Ok(())
    }
}
|