Line data Source code
1 : use crate::pgdatadir_mapping::AuxFilesDirectory;
2 : use crate::walrecord::NeonWalRecord;
3 : use anyhow::Context;
4 : use byteorder::{ByteOrder, LittleEndian};
5 : use bytes::{BufMut, BytesMut};
6 : use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
7 : use pageserver_api::reltag::SlruKind;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
10 : use postgres_ffi::v14::nonrelfile_utils::{
11 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
12 : transaction_id_set_status,
13 : };
14 : use postgres_ffi::BLCKSZ;
15 : use tracing::*;
16 : use utils::bin_ser::BeSer;
17 :
18 : /// Can this request be served by neon redo functions
19 : /// or we need to pass it to wal-redo postgres process?
20 81815748 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
21 81815748 : // Currently, we don't have bespoken Rust code to replay any
22 81815748 : // Postgres WAL records. But everything else is handled in neon.
23 81815748 : #[allow(clippy::match_like_matches_macro)]
24 81815748 : match rec {
25 : NeonWalRecord::Postgres {
26 : will_init: _,
27 : rec: _,
28 62092277 : } => false,
29 19723471 : _ => true,
30 : }
31 81815748 : }
32 :
33 19723477 : pub(crate) fn apply_in_neon(
34 19723477 : record: &NeonWalRecord,
35 19723477 : key: Key,
36 19723477 : page: &mut BytesMut,
37 19723477 : ) -> Result<(), anyhow::Error> {
38 19723477 : match record {
39 : NeonWalRecord::Postgres {
40 : will_init: _,
41 : rec: _,
42 : } => {
43 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
44 : }
45 : NeonWalRecord::ClearVisibilityMapFlags {
46 984 : new_heap_blkno,
47 984 : old_heap_blkno,
48 984 : flags,
49 : } => {
50 : // sanity check that this is modifying the correct relation
51 984 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
52 984 : assert!(
53 984 : rel.forknum == VISIBILITYMAP_FORKNUM,
54 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
55 : rel
56 : );
57 984 : if let Some(heap_blkno) = *new_heap_blkno {
58 : // Calculate the VM block and offset that corresponds to the heap block.
59 413 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
60 413 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
61 413 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
62 413 :
63 413 : // Check that we're modifying the correct VM block.
64 413 : assert!(map_block == blknum);
65 :
66 : // equivalent to PageGetContents(page)
67 413 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
68 413 :
69 413 : map[map_byte as usize] &= !(flags << map_offset);
70 571 : }
71 :
72 : // Repeat for 'old_heap_blkno', if any
73 984 : if let Some(heap_blkno) = *old_heap_blkno {
74 574 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
75 574 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
76 574 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
77 574 :
78 574 : assert!(map_block == blknum);
79 :
80 574 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
81 574 :
82 574 : map[map_byte as usize] &= !(flags << map_offset);
83 410 : }
84 : }
85 : // Non-relational WAL records are handled here, with custom code that has the
86 : // same effects as the corresponding Postgres WAL redo function.
87 19622669 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
88 19622669 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
89 19622669 : assert_eq!(
90 : slru_kind,
91 : SlruKind::Clog,
92 0 : "ClogSetCommitted record with unexpected key {}",
93 : key
94 : );
95 39355366 : for &xid in xids {
96 19732697 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
97 19732697 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
98 19732697 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
99 19732697 :
100 19732697 : // Check that we're modifying the correct CLOG block.
101 19732697 : assert!(
102 19732697 : segno == expected_segno,
103 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
104 : xid,
105 : key
106 : );
107 19732697 : assert!(
108 19732697 : blknum == expected_blknum,
109 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
110 : xid,
111 : key
112 : );
113 :
114 19732697 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
115 : }
116 :
117 : // Append the timestamp
118 19622669 : if page.len() == BLCKSZ as usize + 8 {
119 19619194 : page.truncate(BLCKSZ as usize);
120 19619194 : }
121 19622669 : if page.len() == BLCKSZ as usize {
122 19622669 : page.extend_from_slice(×tamp.to_be_bytes());
123 19622669 : } else {
124 0 : warn!(
125 0 : "CLOG blk {} in seg {} has invalid size {}",
126 0 : blknum,
127 0 : segno,
128 0 : page.len()
129 0 : );
130 : }
131 : }
132 2577 : NeonWalRecord::ClogSetAborted { xids } => {
133 2577 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
134 2577 : assert_eq!(
135 : slru_kind,
136 : SlruKind::Clog,
137 0 : "ClogSetAborted record with unexpected key {}",
138 : key
139 : );
140 5168 : for &xid in xids {
141 2591 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
142 2591 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
143 2591 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
144 2591 :
145 2591 : // Check that we're modifying the correct CLOG block.
146 2591 : assert!(
147 2591 : segno == expected_segno,
148 0 : "ClogSetAborted record for XID {} with unexpected key {}",
149 : xid,
150 : key
151 : );
152 2591 : assert!(
153 2591 : blknum == expected_blknum,
154 0 : "ClogSetAborted record for XID {} with unexpected key {}",
155 : xid,
156 : key
157 : );
158 :
159 2591 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
160 : }
161 : }
162 47665 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
163 47665 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
164 47665 : assert_eq!(
165 : slru_kind,
166 : SlruKind::MultiXactOffsets,
167 0 : "MultixactOffsetCreate record with unexpected key {}",
168 : key
169 : );
170 : // Compute the block and offset to modify.
171 : // See RecordNewMultiXact in PostgreSQL sources.
172 47665 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
173 47665 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
174 47665 : let offset = (entryno * 4) as usize;
175 47665 :
176 47665 : // Check that we're modifying the correct multixact-offsets block.
177 47665 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
178 47665 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
179 47665 : assert!(
180 47665 : segno == expected_segno,
181 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
182 : mid,
183 : key
184 : );
185 47665 : assert!(
186 47665 : blknum == expected_blknum,
187 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
188 : mid,
189 : key
190 : );
191 :
192 47665 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
193 : }
194 48213 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
195 48213 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
196 48213 : assert_eq!(
197 : slru_kind,
198 : SlruKind::MultiXactMembers,
199 0 : "MultixactMembersCreate record with unexpected key {}",
200 : key
201 : );
202 945044 : for (i, member) in members.iter().enumerate() {
203 945044 : let offset = moff + i as u32;
204 945044 :
205 945044 : // Compute the block and offset to modify.
206 945044 : // See RecordNewMultiXact in PostgreSQL sources.
207 945044 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
208 945044 : let memberoff = mx_offset_to_member_offset(offset);
209 945044 : let flagsoff = mx_offset_to_flags_offset(offset);
210 945044 : let bshift = mx_offset_to_flags_bitshift(offset);
211 945044 :
212 945044 : // Check that we're modifying the correct multixact-members block.
213 945044 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
214 945044 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
215 945044 : assert!(
216 945044 : segno == expected_segno,
217 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
218 : moff,
219 : key
220 : );
221 945044 : assert!(
222 945044 : blknum == expected_blknum,
223 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
224 : moff,
225 : key
226 : );
227 :
228 945044 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
229 945044 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
230 945044 : flagsval |= member.status << bshift;
231 945044 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
232 945044 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
233 : }
234 : }
235 1369 : NeonWalRecord::AuxFile { file_path, content } => {
236 1369 : let mut dir = AuxFilesDirectory::des(page)?;
237 1369 : dir.upsert(file_path.clone(), content.clone());
238 1369 :
239 1369 : page.clear();
240 1369 : let mut writer = page.writer();
241 1369 : dir.ser_into(&mut writer)?;
242 : }
243 : }
244 19723477 : Ok(())
245 19723477 : }
246 :
#[cfg(test)]
mod test {
    use bytes::Bytes;
    use pageserver_api::key::AUX_FILES_KEY;

    use super::*;
    use std::collections::HashMap;

    use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};

    /// Build an `AuxFile` record for `path`; `content` is the new file body,
    /// or `None` for the deletion case.
    fn aux_file(path: &str, content: Option<&'static [u8]>) -> NeonWalRecord {
        NeonWalRecord::AuxFile {
            file_path: path.to_string(),
            content: content.map(Bytes::from_static),
        }
    }

    /// Exercise [`apply_in_neon`] with NeonWalRecord::AuxFile records covering
    /// insert, update and delete on top of a serialized base directory.
    #[test]
    fn apply_aux_file_deltas() -> anyhow::Result<()> {
        // Base image: a directory that already contains "two" and "three".
        let mut initial_files = HashMap::new();
        initial_files.insert("two".to_string(), Bytes::from_static(b"content0"));
        initial_files.insert("three".to_string(), Bytes::from_static(b"contentX"));
        let initial_dir = AuxFilesDirectory {
            files: initial_files,
        };
        let mut page = BytesMut::from_iter(AuxFilesDirectory::ser(&initial_dir)?);

        let records = [
            // Insert
            aux_file("one", Some(&b"content1"[..])),
            // Update
            aux_file("two", Some(&b"content99"[..])),
            // Delete
            aux_file("three", None),
        ];

        // Apply the records in order, stopping at the first failure.
        records
            .iter()
            .try_for_each(|rec| apply_in_neon(rec, AUX_FILES_KEY, &mut page))?;

        let reconstructed = AuxFilesDirectory::des(&page)?;
        let expect: HashMap<String, Bytes> = [
            ("one".to_string(), Bytes::from_static(b"content1")),
            ("two".to_string(), Bytes::from_static(b"content99")),
        ]
        .into_iter()
        .collect();

        assert_eq!(reconstructed.files, expect);

        Ok(())
    }
}
|