Line data Source code
1 : use crate::pgdatadir_mapping::AuxFilesDirectory;
2 : use crate::walrecord::NeonWalRecord;
3 : use anyhow::Context;
4 : use byteorder::{ByteOrder, LittleEndian};
5 : use bytes::{BufMut, BytesMut};
6 : use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
7 : use pageserver_api::reltag::SlruKind;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
10 : use postgres_ffi::v14::nonrelfile_utils::{
11 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
12 : transaction_id_set_status,
13 : };
14 : use postgres_ffi::BLCKSZ;
15 : use tracing::*;
16 : use utils::bin_ser::BeSer;
17 :
18 : /// Can this request be served by neon redo functions
19 : /// or we need to pass it to wal-redo postgres process?
20 22 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
21 22 : // Currently, we don't have bespoken Rust code to replay any
22 22 : // Postgres WAL records. But everything else is handled in neon.
23 22 : #[allow(clippy::match_like_matches_macro)]
24 22 : match rec {
25 : NeonWalRecord::Postgres {
26 : will_init: _,
27 : rec: _,
28 12 : } => false,
29 10 : _ => true,
30 : }
31 22 : }
32 :
33 16 : pub(crate) fn apply_in_neon(
34 16 : record: &NeonWalRecord,
35 16 : key: Key,
36 16 : page: &mut BytesMut,
37 16 : ) -> Result<(), anyhow::Error> {
38 16 : match record {
39 : NeonWalRecord::Postgres {
40 : will_init: _,
41 : rec: _,
42 : } => {
43 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
44 : }
45 : NeonWalRecord::ClearVisibilityMapFlags {
46 0 : new_heap_blkno,
47 0 : old_heap_blkno,
48 0 : flags,
49 : } => {
50 : // sanity check that this is modifying the correct relation
51 0 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
52 0 : assert!(
53 0 : rel.forknum == VISIBILITYMAP_FORKNUM,
54 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
55 : rel
56 : );
57 0 : if let Some(heap_blkno) = *new_heap_blkno {
58 : // Calculate the VM block and offset that corresponds to the heap block.
59 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
60 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
61 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
62 0 :
63 0 : // Check that we're modifying the correct VM block.
64 0 : assert!(map_block == blknum);
65 :
66 : // equivalent to PageGetContents(page)
67 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
68 0 :
69 0 : map[map_byte as usize] &= !(flags << map_offset);
70 0 : }
71 :
72 : // Repeat for 'old_heap_blkno', if any
73 0 : if let Some(heap_blkno) = *old_heap_blkno {
74 0 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
75 0 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
76 0 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
77 0 :
78 0 : assert!(map_block == blknum);
79 :
80 0 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
81 0 :
82 0 : map[map_byte as usize] &= !(flags << map_offset);
83 0 : }
84 : }
85 : // Non-relational WAL records are handled here, with custom code that has the
86 : // same effects as the corresponding Postgres WAL redo function.
87 0 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
88 0 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
89 0 : assert_eq!(
90 : slru_kind,
91 : SlruKind::Clog,
92 0 : "ClogSetCommitted record with unexpected key {}",
93 : key
94 : );
95 0 : for &xid in xids {
96 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
97 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
98 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
99 0 :
100 0 : // Check that we're modifying the correct CLOG block.
101 0 : assert!(
102 0 : segno == expected_segno,
103 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
104 : xid,
105 : key
106 : );
107 0 : assert!(
108 0 : blknum == expected_blknum,
109 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
110 : xid,
111 : key
112 : );
113 :
114 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
115 : }
116 :
117 : // Append the timestamp
118 0 : if page.len() == BLCKSZ as usize + 8 {
119 0 : page.truncate(BLCKSZ as usize);
120 0 : }
121 0 : if page.len() == BLCKSZ as usize {
122 0 : page.extend_from_slice(×tamp.to_be_bytes());
123 0 : } else {
124 0 : warn!(
125 0 : "CLOG blk {} in seg {} has invalid size {}",
126 0 : blknum,
127 0 : segno,
128 0 : page.len()
129 0 : );
130 : }
131 : }
132 0 : NeonWalRecord::ClogSetAborted { xids } => {
133 0 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
134 0 : assert_eq!(
135 : slru_kind,
136 : SlruKind::Clog,
137 0 : "ClogSetAborted record with unexpected key {}",
138 : key
139 : );
140 0 : for &xid in xids {
141 0 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
142 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
143 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
144 0 :
145 0 : // Check that we're modifying the correct CLOG block.
146 0 : assert!(
147 0 : segno == expected_segno,
148 0 : "ClogSetAborted record for XID {} with unexpected key {}",
149 : xid,
150 : key
151 : );
152 0 : assert!(
153 0 : blknum == expected_blknum,
154 0 : "ClogSetAborted record for XID {} with unexpected key {}",
155 : xid,
156 : key
157 : );
158 :
159 0 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
160 : }
161 : }
162 0 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
163 0 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
164 0 : assert_eq!(
165 : slru_kind,
166 : SlruKind::MultiXactOffsets,
167 0 : "MultixactOffsetCreate record with unexpected key {}",
168 : key
169 : );
170 : // Compute the block and offset to modify.
171 : // See RecordNewMultiXact in PostgreSQL sources.
172 0 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
173 0 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
174 0 : let offset = (entryno * 4) as usize;
175 0 :
176 0 : // Check that we're modifying the correct multixact-offsets block.
177 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
178 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
179 0 : assert!(
180 0 : segno == expected_segno,
181 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
182 : mid,
183 : key
184 : );
185 0 : assert!(
186 0 : blknum == expected_blknum,
187 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
188 : mid,
189 : key
190 : );
191 :
192 0 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
193 : }
194 0 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
195 0 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
196 0 : assert_eq!(
197 : slru_kind,
198 : SlruKind::MultiXactMembers,
199 0 : "MultixactMembersCreate record with unexpected key {}",
200 : key
201 : );
202 0 : for (i, member) in members.iter().enumerate() {
203 0 : let offset = moff + i as u32;
204 0 :
205 0 : // Compute the block and offset to modify.
206 0 : // See RecordNewMultiXact in PostgreSQL sources.
207 0 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
208 0 : let memberoff = mx_offset_to_member_offset(offset);
209 0 : let flagsoff = mx_offset_to_flags_offset(offset);
210 0 : let bshift = mx_offset_to_flags_bitshift(offset);
211 0 :
212 0 : // Check that we're modifying the correct multixact-members block.
213 0 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
214 0 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
215 0 : assert!(
216 0 : segno == expected_segno,
217 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
218 : moff,
219 : key
220 : );
221 0 : assert!(
222 0 : blknum == expected_blknum,
223 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
224 : moff,
225 : key
226 : );
227 :
228 0 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
229 0 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
230 0 : flagsval |= member.status << bshift;
231 0 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
232 0 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
233 : }
234 : }
235 16 : NeonWalRecord::AuxFile { file_path, content } => {
236 16 : let mut dir = AuxFilesDirectory::des(page)?;
237 16 : dir.upsert(file_path.clone(), content.clone());
238 16 :
239 16 : page.clear();
240 16 : let mut writer = page.writer();
241 16 : dir.ser_into(&mut writer)?;
242 : }
243 : }
244 16 : Ok(())
245 16 : }
246 :
#[cfg(test)]
mod test {
    use bytes::Bytes;
    use pageserver_api::key::AUX_FILES_KEY;

    use super::*;
    use std::collections::HashMap;

    use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};

    /// Exercises [`apply_in_neon`] with `NeonWalRecord::AuxFile` records:
    /// starting from a serialized directory holding two files, one insert, one
    /// update and one delete are replayed and the resulting directory is
    /// compared against the expected file map.
    #[test]
    fn apply_aux_file_deltas() -> anyhow::Result<()> {
        // Base image: a directory that already contains "two" and "three".
        let initial_files = HashMap::from([
            ("two".to_string(), Bytes::from_static(b"content0")),
            ("three".to_string(), Bytes::from_static(b"contentX")),
        ]);
        let serialized = AuxFilesDirectory::ser(&AuxFilesDirectory {
            files: initial_files,
        })?;

        let insert = NeonWalRecord::AuxFile {
            file_path: "one".to_string(),
            content: Some(Bytes::from_static(b"content1")),
        };
        let update = NeonWalRecord::AuxFile {
            file_path: "two".to_string(),
            content: Some(Bytes::from_static(b"content99")),
        };
        let delete = NeonWalRecord::AuxFile {
            file_path: "three".to_string(),
            content: None,
        };

        let mut page = BytesMut::from_iter(serialized);

        // Replay in order: insert, update, delete.
        [insert, update, delete]
            .iter()
            .try_for_each(|delta| apply_in_neon(delta, AUX_FILES_KEY, &mut page))?;

        let reconstructed = AuxFilesDirectory::des(&page)?;

        // "three" is gone, "two" carries the new content, "one" was added.
        let expect = HashMap::from([
            ("one".to_string(), Bytes::from_static(b"content1")),
            ("two".to_string(), Bytes::from_static(b"content99")),
        ]);
        assert_eq!(reconstructed.files, expect);

        Ok(())
    }
}
|