Line data Source code
1 : use crate::walrecord::NeonWalRecord;
2 : use anyhow::Context;
3 : use byteorder::{ByteOrder, LittleEndian};
4 : use bytes::BytesMut;
5 : use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
6 : use pageserver_api::reltag::SlruKind;
7 : use postgres_ffi::pg_constants;
8 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
9 : use postgres_ffi::v14::nonrelfile_utils::{
10 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
11 : transaction_id_set_status,
12 : };
13 : use postgres_ffi::BLCKSZ;
14 : use tracing::*;
15 :
16 : /// Can this request be served by neon redo functions
17 : /// or we need to pass it to wal-redo postgres process?
18 81820489 : pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
19 81820489 : // Currently, we don't have bespoken Rust code to replay any
20 81820489 : // Postgres WAL records. But everything else is handled in neon.
21 81820489 : #[allow(clippy::match_like_matches_macro)]
22 81820489 : match rec {
23 : NeonWalRecord::Postgres {
24 : will_init: _,
25 : rec: _,
26 62894964 : } => false,
27 18925525 : _ => true,
28 : }
29 81820489 : }
30 :
31 18925525 : pub(crate) fn apply_in_neon(
32 18925525 : record: &NeonWalRecord,
33 18925525 : key: Key,
34 18925525 : page: &mut BytesMut,
35 18925525 : ) -> Result<(), anyhow::Error> {
36 18925525 : match record {
37 : NeonWalRecord::Postgres {
38 : will_init: _,
39 : rec: _,
40 : } => {
41 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
42 : }
43 : NeonWalRecord::ClearVisibilityMapFlags {
44 4574 : new_heap_blkno,
45 4574 : old_heap_blkno,
46 4574 : flags,
47 : } => {
48 : // sanity check that this is modifying the correct relation
49 4574 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
50 4574 : assert!(
51 4574 : rel.forknum == VISIBILITYMAP_FORKNUM,
52 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
53 : rel
54 : );
55 4574 : if let Some(heap_blkno) = *new_heap_blkno {
56 : // Calculate the VM block and offset that corresponds to the heap block.
57 410 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
58 410 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
59 410 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
60 :
61 : // Check that we're modifying the correct VM block.
62 410 : assert!(map_block == blknum);
63 :
64 : // equivalent to PageGetContents(page)
65 410 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
66 410 :
67 410 : map[map_byte as usize] &= !(flags << map_offset);
68 4164 : }
69 :
70 : // Repeat for 'old_heap_blkno', if any
71 4574 : if let Some(heap_blkno) = *old_heap_blkno {
72 4170 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
73 4170 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
74 4170 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
75 :
76 4170 : assert!(map_block == blknum);
77 :
78 4170 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
79 4170 :
80 4170 : map[map_byte as usize] &= !(flags << map_offset);
81 404 : }
82 : }
83 : // Non-relational WAL records are handled here, with custom code that has the
84 : // same effects as the corresponding Postgres WAL redo function.
85 18819679 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
86 18819679 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
87 18819679 : assert_eq!(
88 : slru_kind,
89 : SlruKind::Clog,
90 0 : "ClogSetCommitted record with unexpected key {}",
91 : key
92 : );
93 37749399 : for &xid in xids {
94 18929720 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
95 18929720 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
96 18929720 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
97 :
98 : // Check that we're modifying the correct CLOG block.
99 18929720 : assert!(
100 18929720 : segno == expected_segno,
101 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
102 : xid,
103 : key
104 : );
105 18929720 : assert!(
106 18929720 : blknum == expected_blknum,
107 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
108 : xid,
109 : key
110 : );
111 :
112 18929720 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
113 : }
114 :
115 : // Append the timestamp
116 18819679 : if page.len() == BLCKSZ as usize + 8 {
117 18816230 : page.truncate(BLCKSZ as usize);
118 18816230 : }
119 18819679 : if page.len() == BLCKSZ as usize {
120 18819679 : page.extend_from_slice(×tamp.to_be_bytes());
121 18819679 : } else {
122 0 : warn!(
123 0 : "CLOG blk {} in seg {} has invalid size {}",
124 0 : blknum,
125 0 : segno,
126 0 : page.len()
127 0 : );
128 : }
129 : }
130 5394 : NeonWalRecord::ClogSetAborted { xids } => {
131 5394 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
132 5394 : assert_eq!(
133 : slru_kind,
134 : SlruKind::Clog,
135 0 : "ClogSetAborted record with unexpected key {}",
136 : key
137 : );
138 10809 : for &xid in xids {
139 5415 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
140 5415 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
141 5415 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
142 :
143 : // Check that we're modifying the correct CLOG block.
144 5415 : assert!(
145 5415 : segno == expected_segno,
146 0 : "ClogSetAborted record for XID {} with unexpected key {}",
147 : xid,
148 : key
149 : );
150 5415 : assert!(
151 5415 : blknum == expected_blknum,
152 0 : "ClogSetAborted record for XID {} with unexpected key {}",
153 : xid,
154 : key
155 : );
156 :
157 5415 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
158 : }
159 : }
160 47665 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
161 47665 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
162 47665 : assert_eq!(
163 : slru_kind,
164 : SlruKind::MultiXactOffsets,
165 0 : "MultixactOffsetCreate record with unexpected key {}",
166 : key
167 : );
168 : // Compute the block and offset to modify.
169 : // See RecordNewMultiXact in PostgreSQL sources.
170 47665 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
171 47665 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
172 47665 : let offset = (entryno * 4) as usize;
173 47665 :
174 47665 : // Check that we're modifying the correct multixact-offsets block.
175 47665 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
176 47665 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
177 47665 : assert!(
178 47665 : segno == expected_segno,
179 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
180 : mid,
181 : key
182 : );
183 47665 : assert!(
184 47665 : blknum == expected_blknum,
185 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
186 : mid,
187 : key
188 : );
189 :
190 47665 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
191 : }
192 48213 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
193 48213 : let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
194 48213 : assert_eq!(
195 : slru_kind,
196 : SlruKind::MultiXactMembers,
197 0 : "MultixactMembersCreate record with unexpected key {}",
198 : key
199 : );
200 945044 : for (i, member) in members.iter().enumerate() {
201 945044 : let offset = moff + i as u32;
202 945044 :
203 945044 : // Compute the block and offset to modify.
204 945044 : // See RecordNewMultiXact in PostgreSQL sources.
205 945044 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
206 945044 : let memberoff = mx_offset_to_member_offset(offset);
207 945044 : let flagsoff = mx_offset_to_flags_offset(offset);
208 945044 : let bshift = mx_offset_to_flags_bitshift(offset);
209 945044 :
210 945044 : // Check that we're modifying the correct multixact-members block.
211 945044 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
212 945044 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
213 945044 : assert!(
214 945044 : segno == expected_segno,
215 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
216 : moff,
217 : key
218 : );
219 945044 : assert!(
220 945044 : blknum == expected_blknum,
221 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
222 : moff,
223 : key
224 : );
225 :
226 945044 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
227 945044 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
228 945044 : flagsval |= member.status << bshift;
229 945044 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
230 945044 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
231 : }
232 : }
233 : }
234 18925525 : Ok(())
235 18925525 : }
|