//!
//! WAL redo. This service runs PostgreSQL in a special wal_redo mode
//! to apply given WAL records over an old page image and return the new
//! page image.
//!
//! We rely on Postgres to perform WAL redo for us. We launch a
//! postgres process in a special "wal redo" mode that's similar to
//! single-user mode. We then pass the previous page image, if any,
//! and all the WAL records we want to apply, to the postgres
//! process. Then we get the page image back. Communication with the
//! postgres process happens via stdin/stdout.
//!
//! See pgxn/neon_walredo/walredoproc.c for the other side of
//! this communication.
//!
//! The Postgres process is assumed to be secure against malicious WAL
//! records. It achieves this by dropping privileges before replaying
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, they cannot escape out of it.
//!
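//! For illustration, one redo request is framed on the child's stdin as a
//! short message sequence (a sketch of the protocol implemented by the
//! `build_*_msg` functions at the bottom of this file; walredoproc.c is
//! the authoritative definition):
//!
//! ```text
//! B <len> <BufferTag>                 begin redo for one block
//! P <len> <BufferTag> <page image>    push base page image (if any)
//! A <len> <end LSN> <record bytes>    apply one WAL record (repeated)
//! G <len> <BufferTag>                 get the reconstructed page
//! ```
//!
//! The child then writes the resulting BLCKSZ-byte page image to its stdout.
//!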
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::collections::VecDeque;
use std::io;
use std::io::prelude::*;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};

#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::config::PageServerConf;
use crate::metrics::{
    WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
    WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::walrecord::NeonWalRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{
    mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
    transaction_id_set_status,
};
use postgres_ffi::BLCKSZ;

///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
/// In Postgres, the `BufferTag` structure is used for exactly the same purpose.
/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
///
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
pub(crate) struct BufferTag {
    pub rel: RelTag,
    pub blknum: u32,
}

struct ProcessInput {
    stdin: ChildStdin,
    stderr_fd: RawFd,
    stdout_fd: RawFd,
    n_requests: usize,
}

struct ProcessOutput {
    stdout: ChildStdout,
    pending_responses: VecDeque<Option<Bytes>>,
    n_processed_responses: usize,
}

///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// which is enforced by a Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
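/// # Example
///
/// A minimal usage sketch (assuming a leaked `'static` config and a tokio
/// runtime, as in the tests at the bottom of this file):
///
/// ```ignore
/// let manager = PostgresRedoManager::new(conf, tenant_id);
/// let page = manager
///     .request_redo(key, lsn, base_img, records, pg_version)
///     .await?;
/// ```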
pub struct PostgresRedoManager {
    tenant_id: TenantId,
    conf: &'static PageServerConf,
    redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
}

/// Can this request be served by Neon's own redo functions,
/// or does it need to be passed to the wal-redo postgres process?
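///
/// For example (a sketch; `rec` stands for some raw WAL record bytes):
///
/// ```ignore
/// assert!(!can_apply_in_neon(&NeonWalRecord::Postgres { will_init: false, rec }));
/// assert!(can_apply_in_neon(&NeonWalRecord::ClogSetAborted { xids: vec![] }));
/// ```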
fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
    // Currently, we don't have bespoke Rust code to replay any
    // Postgres WAL records. But everything else is handled in Neon.
    #[allow(clippy::match_like_matches_macro)]
    match rec {
        NeonWalRecord::Postgres {
            will_init: _,
            rec: _,
        } => false,
        _ => true,
    }
}

///
/// Public interface of WAL redo manager
///
impl PostgresRedoManager {
    ///
    /// Request the WAL redo manager to apply some WAL records
    ///
    /// The WAL redo is handled by a separate process, so this just sends a request
    /// to the process and waits for the response.
    ///
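    /// For example (a sketch): given records `[P1, P2, N1, P3]`, where `P*`
    /// are `NeonWalRecord::Postgres` records and `N1` is a Neon-special
    /// record, the loop below forms three batches: `[P1, P2]` goes to the
    /// walredo process, `[N1]` to `apply_batch_neon`, and `[P3]` back to the
    /// walredo process, with each batch's result serving as the base image
    /// for the next.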
    pub async fn request_redo(
        &self,
        key: Key,
        lsn: Lsn,
        base_img: Option<(Lsn, Bytes)>,
        records: Vec<(Lsn, NeonWalRecord)>,
        pg_version: u32,
    ) -> anyhow::Result<Bytes> {
        if records.is_empty() {
            anyhow::bail!("invalid WAL redo request with no records");
        }

        let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
        let mut img = base_img.map(|p| p.1);
        let mut batch_neon = can_apply_in_neon(&records[0].1);
        let mut batch_start = 0;
        for (i, record) in records.iter().enumerate().skip(1) {
            let rec_neon = can_apply_in_neon(&record.1);

            if rec_neon != batch_neon {
                let result = if batch_neon {
                    self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
                } else {
                    self.apply_batch_postgres(
                        key,
                        lsn,
                        img,
                        base_img_lsn,
                        &records[batch_start..i],
                        self.conf.wal_redo_timeout,
                        pg_version,
                    )
                };
                img = Some(result?);

                batch_neon = rec_neon;
                batch_start = i;
            }
        }
        // last batch
        if batch_neon {
            self.apply_batch_neon(key, lsn, img, &records[batch_start..])
        } else {
            self.apply_batch_postgres(
                key,
                lsn,
                img,
                base_img_lsn,
                &records[batch_start..],
                self.conf.wal_redo_timeout,
                pg_version,
            )
        }
    }
}

impl PostgresRedoManager {
    ///
    /// Create a new PostgresRedoManager.
    ///
    pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
        // The actual process is launched lazily, on first request.
        PostgresRedoManager {
            tenant_id,
            conf,
            redo_process: RwLock::new(None),
        }
    }

    ///
    /// Process one request for WAL redo using wal-redo postgres
    ///
    #[allow(clippy::too_many_arguments)]
    fn apply_batch_postgres(
        &self,
        key: Key,
        lsn: Lsn,
        base_img: Option<Bytes>,
        base_img_lsn: Lsn,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
        pg_version: u32,
    ) -> anyhow::Result<Bytes> {
        let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
        const MAX_RETRY_ATTEMPTS: u32 = 1;
        let start_time = Instant::now();
        let mut n_attempts = 0u32;
        loop {
            let lock_time = Instant::now();

            // launch the WAL redo process on first use
            let proc: Arc<WalRedoProcess> = {
                let proc_guard = self.redo_process.read().unwrap();
                match &*proc_guard {
                    None => {
                        // "upgrade" to write lock to launch the process
                        drop(proc_guard);
                        let mut proc_guard = self.redo_process.write().unwrap();
                        match &*proc_guard {
                            None => {
                                let proc = Arc::new(
                                    WalRedoProcess::launch(self.conf, self.tenant_id, pg_version)
                                        .context("launch walredo process")?,
                                );
                                *proc_guard = Some(Arc::clone(&proc));
                                proc
                            }
                            Some(proc) => Arc::clone(proc),
                        }
                    }
                    Some(proc) => Arc::clone(proc),
                }
            };

            WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());

            // Relational WAL records are applied using wal-redo-postgres
            let buf_tag = BufferTag { rel, blknum };
            let result = proc
                .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
                .context("apply_wal_records");

            let end_time = Instant::now();
            let duration = end_time.duration_since(lock_time);

            let len = records.len();
            let nbytes = records.iter().fold(0, |accumulator, record| {
                accumulator
                    + match &record.1 {
                        NeonWalRecord::Postgres { rec, .. } => rec.len(),
                        _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
                    }
            });

            WAL_REDO_TIME.observe(duration.as_secs_f64());
            WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
            WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);

            debug!(
                "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
                len,
                nbytes,
                duration.as_micros(),
                lsn
            );

            // If something went wrong, don't try to reuse the process. Kill it, and
            // next request will launch a new one.
            if let Err(e) = result.as_ref() {
                error!(
                    "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
                    records.len(),
                    records.first().map(|p| p.0).unwrap_or(Lsn(0)),
                    records.last().map(|p| p.0).unwrap_or(Lsn(0)),
                    nbytes,
                    base_img_lsn,
                    lsn,
                    n_attempts,
                    e,
                );
                // Avoid concurrent callers hitting the same issue.
                // We can't prevent it from happening because we want to enable parallelism.
                let mut guard = self.redo_process.write().unwrap();
                match &*guard {
                    Some(current_field_value) => {
                        if Arc::ptr_eq(current_field_value, &proc) {
                            // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
                            *guard = None;
                        }
                    }
                    None => {
                        // Another thread was faster to observe the error, and already took the process out of rotation.
                    }
                }
                drop(guard);
                // NB: there may still be other concurrent threads using `proc`.
                // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
                // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
                // holding the lock while waiting for the process to exit.
                // NB: the drop impl blocks the current thread with a wait() system call for
                // the child process. We dropped the `guard` above so that other threads aren't
                // affected. But, it's good that the current thread _does_ block to wait.
                // If we instead deferred the waiting into the background / to tokio, it could
                // happen that if walredo always fails immediately, we spawn processes faster
                // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
                // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
                // This probably needs revisiting at some later point.
                drop(proc);
            } else if n_attempts != 0 {
                info!(n_attempts, "retried walredo succeeded");
            }
            n_attempts += 1;
            if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
                return result;
            }
        }
    }

    ///
    /// Process a batch of WAL records using bespoke Neon code.
    ///
    fn apply_batch_neon(
        &self,
        key: Key,
        lsn: Lsn,
        base_img: Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
    ) -> anyhow::Result<Bytes> {
        let start_time = Instant::now();

        let mut page = BytesMut::new();
        if let Some(fpi) = base_img {
            // If a full-page image is provided, then use it...
            page.extend_from_slice(&fpi[..]);
        } else {
            // All the current WAL record types that we can handle require a base image.
            anyhow::bail!("invalid neon WAL redo request with no base image");
        }

        // Apply all the WAL records in the batch
        for (record_lsn, record) in records.iter() {
            self.apply_record_neon(key, &mut page, *record_lsn, record)?;
        }
        // Success!
        let end_time = Instant::now();
        let duration = end_time.duration_since(start_time);
        WAL_REDO_TIME.observe(duration.as_secs_f64());

        debug!(
            "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
            records.len(),
            duration.as_micros(),
            lsn
        );

        Ok(page.freeze())
    }

    fn apply_record_neon(
        &self,
        key: Key,
        page: &mut BytesMut,
        _record_lsn: Lsn,
        record: &NeonWalRecord,
    ) -> anyhow::Result<()> {
        match record {
            NeonWalRecord::Postgres {
                will_init: _,
                rec: _,
            } => {
                anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
            }
            NeonWalRecord::ClearVisibilityMapFlags {
                new_heap_blkno,
                old_heap_blkno,
                flags,
            } => {
                // sanity check that this is modifying the correct relation
                let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
                assert!(
                    rel.forknum == VISIBILITYMAP_FORKNUM,
                    "ClearVisibilityMapFlags record on unexpected rel {}",
                    rel
                );
                if let Some(heap_blkno) = *new_heap_blkno {
                    // Calculate the VM block and offset that corresponds to the heap block.
                    let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
                    let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
                    let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);

                    // Check that we're modifying the correct VM block.
                    assert!(map_block == blknum);

                    // equivalent to PageGetContents(page)
                    let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];

                    map[map_byte as usize] &= !(flags << map_offset);
                }

                // Repeat for 'old_heap_blkno', if any
                if let Some(heap_blkno) = *old_heap_blkno {
                    let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
                    let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
                    let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);

                    assert!(map_block == blknum);

                    let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];

                    map[map_byte as usize] &= !(flags << map_offset);
                }
            }
            // Non-relational WAL records are handled here, with custom code that has the
            // same effects as the corresponding Postgres WAL redo function.
            NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
                let (slru_kind, segno, blknum) =
                    key_to_slru_block(key).context("invalid record")?;
                assert_eq!(
                    slru_kind,
                    SlruKind::Clog,
                    "ClogSetCommitted record with unexpected key {}",
                    key
                );
                for &xid in xids {
                    let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
                    let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                    let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

                    // Check that we're modifying the correct CLOG block.
                    assert!(
                        segno == expected_segno,
                        "ClogSetCommitted record for XID {} with unexpected key {}",
                        xid,
                        key
                    );
                    assert!(
                        blknum == expected_blknum,
                        "ClogSetCommitted record for XID {} with unexpected key {}",
                        xid,
                        key
                    );

                    transaction_id_set_status(
                        xid,
                        pg_constants::TRANSACTION_STATUS_COMMITTED,
                        page,
                    );
                }

                // Append the timestamp
                if page.len() == BLCKSZ as usize + 8 {
                    page.truncate(BLCKSZ as usize);
                }
                if page.len() == BLCKSZ as usize {
                    page.extend_from_slice(&timestamp.to_be_bytes());
                } else {
                    warn!(
                        "CLOG blk {} in seg {} has invalid size {}",
                        blknum,
                        segno,
                        page.len()
                    );
                }
            }
            NeonWalRecord::ClogSetAborted { xids } => {
                let (slru_kind, segno, blknum) =
                    key_to_slru_block(key).context("invalid record")?;
                assert_eq!(
                    slru_kind,
                    SlruKind::Clog,
                    "ClogSetAborted record with unexpected key {}",
                    key
                );
                for &xid in xids {
                    let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
                    let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                    let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

                    // Check that we're modifying the correct CLOG block.
                    assert!(
                        segno == expected_segno,
                        "ClogSetAborted record for XID {} with unexpected key {}",
                        xid,
                        key
                    );
                    assert!(
                        blknum == expected_blknum,
                        "ClogSetAborted record for XID {} with unexpected key {}",
                        xid,
                        key
                    );

                    transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
                }
            }
            NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
                let (slru_kind, segno, blknum) =
                    key_to_slru_block(key).context("invalid record")?;
                assert_eq!(
                    slru_kind,
                    SlruKind::MultiXactOffsets,
                    "MultixactOffsetCreate record with unexpected key {}",
                    key
                );
                // Compute the block and offset to modify.
                // See RecordNewMultiXact in PostgreSQL sources.
                let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
                let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
                let offset = (entryno * 4) as usize;

                // Check that we're modifying the correct multixact-offsets block.
                let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
                assert!(
                    segno == expected_segno,
                    "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
                    mid,
                    key
                );
                assert!(
                    blknum == expected_blknum,
                    "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
                    mid,
                    key
                );

                LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
            }
            NeonWalRecord::MultixactMembersCreate { moff, members } => {
                let (slru_kind, segno, blknum) =
                    key_to_slru_block(key).context("invalid record")?;
                assert_eq!(
                    slru_kind,
                    SlruKind::MultiXactMembers,
                    "MultixactMembersCreate record with unexpected key {}",
                    key
                );
                for (i, member) in members.iter().enumerate() {
                    let offset = moff + i as u32;

                    // Compute the block and offset to modify.
                    // See RecordNewMultiXact in PostgreSQL sources.
                    let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
                    let memberoff = mx_offset_to_member_offset(offset);
                    let flagsoff = mx_offset_to_flags_offset(offset);
                    let bshift = mx_offset_to_flags_bitshift(offset);

                    // Check that we're modifying the correct multixact-members block.
                    let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                    let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
                    assert!(
                        segno == expected_segno,
                        "MultiXactMembersCreate record for offset {} with unexpected key {}",
                        moff,
                        key
                    );
                    assert!(
                        blknum == expected_blknum,
                        "MultiXactMembersCreate record for offset {} with unexpected key {}",
                        moff,
                        key
                    );

                    let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
                    flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
                    flagsval |= member.status << bshift;
                    LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
                    LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
                }
            }
        }

        Ok(())
    }
}

///
/// Extension of `Command` that allows the parent to avoid handing all of its
/// file descriptors to the child process
///
trait CloseFileDescriptors: CommandExt {
    ///
    /// Close file descriptors (other than stdin, stdout, stderr) in the child process
    ///
    fn close_fds(&mut self) -> &mut Command;
}

impl<C: CommandExt> CloseFileDescriptors for C {
    fn close_fds(&mut self) -> &mut Command {
        unsafe {
            self.pre_exec(move || {
                // SAFETY: Code executed inside pre_exec should have async-signal-safety,
                // which means it should be safe to execute inside a signal handler.
                // The precise meaning depends on platform. See `man signal-safety`
                // for the linux definition.
                //
                // The set_fds_cloexec_threadsafe function is documented to be
                // async-signal-safe.
                //
                // Aside from this function, the rest of the code is re-entrant and
                // doesn't make any syscalls. We're just passing constants.
                //
                // NOTE: It's easy to indirectly cause a malloc or lock a mutex,
                // which is not async-signal-safe. Be careful.
                close_fds::set_fds_cloexec_threadsafe(3, &[]);
                Ok(())
            })
        }
    }
}

struct WalRedoProcess {
    #[allow(dead_code)]
    conf: &'static PageServerConf,
    tenant_id: TenantId,
    // Some() on construction, only becomes None on Drop.
    child: Option<NoLeakChild>,
    stdout: Mutex<ProcessOutput>,
    stdin: Mutex<ProcessInput>,
    stderr: Mutex<ChildStderr>,
    /// Counter to separate same-sized walredo inputs failing at the same millisecond.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}

impl WalRedoProcess {
    //
    // Start postgres binary in special WAL redo mode.
    //
    #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
    fn launch(
        conf: &'static PageServerConf,
        tenant_id: TenantId,
        pg_version: u32,
    ) -> anyhow::Result<Self> {
        let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
        let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

        // Start postgres itself
        let child = Command::new(pg_bin_dir_path.join("postgres"))
            .arg("--wal-redo")
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .env_clear()
            .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
            .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
            // The redo process is not trusted, and runs in seccomp mode that
            // doesn't allow it to open any files. We have to also make sure it
            // doesn't inherit any file descriptors from the pageserver, that
            // would allow an attacker to read any files that happen to be open
            // in the pageserver.
            //
            // The Rust standard library makes sure to mark any file descriptors
            // as close-on-exec by default, but that's not enough, since we use
            // libraries that directly call libc open without setting that flag.
            .close_fds()
            .spawn_no_leak_child(tenant_id)
            .context("spawn process")?;

        let mut child = scopeguard::guard(child, |child| {
            error!("killing wal-redo-postgres process due to a problem during launch");
            child.kill_and_wait();
        });

        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();

        macro_rules! set_nonblock_or_log_err {
            ($file:ident) => {{
                let res = set_nonblock($file.as_raw_fd());
                if let Err(e) = &res {
                    error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
                }
                res
            }};
        }
        set_nonblock_or_log_err!(stdin)?;
        set_nonblock_or_log_err!(stdout)?;
        set_nonblock_or_log_err!(stderr)?;

        // all fallible operations post-spawn are complete, so get rid of the guard
        let child = scopeguard::ScopeGuard::into_inner(child);

        Ok(Self {
            conf,
            tenant_id,
            child: Some(child),
            stdin: Mutex::new(ProcessInput {
                stdout_fd: stdout.as_raw_fd(),
                stderr_fd: stderr.as_raw_fd(),
                stdin,
                n_requests: 0,
            }),
            stdout: Mutex::new(ProcessOutput {
                stdout,
                pending_responses: VecDeque::new(),
                n_processed_responses: 0,
            }),
            stderr: Mutex::new(stderr),
            #[cfg(feature = "testing")]
            dump_sequence: AtomicUsize::default(),
        })
    }

    fn id(&self) -> u32 {
        self.child
            .as_ref()
            .expect("must not call this during Drop")
            .id()
    }

    // Apply given WAL records ('records') over an old page image. Returns
    // new page image.
    //
    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))]
    fn apply_wal_records(
        &self,
        tag: BufferTag,
        base_img: &Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let input = self.stdin.lock().unwrap();

        // Serialize all the messages to send the WAL redo process first.
        //
        // This could be problematic if there are millions of records to replay,
        // but in practice the number of records is usually so small that it doesn't
        // matter, and it's better to keep this code simple.
        //
        // Most requests start with a before-image of BLCKSZ bytes, followed
        // by some other WAL records. Start with a buffer that can hold that
        // comfortably.
        let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
        build_begin_redo_for_block_msg(tag, &mut writebuf);
        if let Some(img) = base_img {
            build_push_page_msg(tag, img, &mut writebuf);
        }
        for (lsn, rec) in records.iter() {
            if let NeonWalRecord::Postgres {
                will_init: _,
                rec: postgres_rec,
            } = rec
            {
                build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
            } else {
                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
            }
        }
        build_get_page_msg(tag, &mut writebuf);
        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

        let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);

        if res.is_err() {
            // Not all of these failures can be caused by this particular input, but
            // they are so rare in tests that we capture all of them.
            self.record_and_log(&writebuf);
        }

        res
    }

    fn apply_wal_records0(
        &self,
        writebuf: &[u8],
        input: MutexGuard<ProcessInput>,
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let mut proc = { input }; // TODO: remove this legacy rename, but this keeps the patch small.
        let mut nwrite = 0usize;

        // Prepare for calling poll()
        let mut pollfds = [
            PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
            PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
            PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
        ];

        // We do two things simultaneously: send the old base image and WAL records to
        // the child process's stdin, and forward any logging
        // information that the child writes to its stderr to the page server's log.
        while nwrite < writebuf.len() {
            let n = loop {
                match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
                    Err(nix::errno::Errno::EINTR) => continue,
                    res => break res,
                }
            }?;

            if n == 0 {
                anyhow::bail!("WAL redo timed out");
            }

            // If we have some messages in stderr, forward them to the log.
            let err_revents = pollfds[1].revents().unwrap();
            if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                let mut errbuf: [u8; 16384] = [0; 16384];
                let mut stderr = self.stderr.lock().unwrap();
                let len = stderr.read(&mut errbuf)?;

                // The message might not be split correctly into lines here. But this is
                // good enough, the important thing is to get the message to the log.
                if len > 0 {
                    error!(
                        "wal-redo-postgres: {}",
                        String::from_utf8_lossy(&errbuf[0..len])
                    );

                    // To make sure we capture all log from the process if it fails, keep
                    // reading from the stderr, before checking the stdout.
                    continue;
                }
            } else if err_revents.contains(PollFlags::POLLHUP) {
                anyhow::bail!("WAL redo process closed its stderr unexpectedly");
            }

            // If 'stdin' is writeable, do write.
            let in_revents = pollfds[0].revents().unwrap();
            if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
                nwrite += proc.stdin.write(&writebuf[nwrite..])?;
            } else if in_revents.contains(PollFlags::POLLHUP) {
                // We still have more data to write, but the process closed the pipe.
                anyhow::bail!("WAL redo process closed its stdin unexpectedly");
            }
        }
        let request_no = proc.n_requests;
        proc.n_requests += 1;
        drop(proc);

        // To improve walredo performance, we separate sending requests from receiving
        // responses. They are protected by different mutexes (input and output).
        // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process,
        // there is no guarantee that T1 will be granted the output mutex first.
        // To address this, we maintain the number of sent requests, the number of
        // processed responses, and a ring buffer of pending responses. After sending
        // a request (under the input mutex), a thread remembers its request number.
        // It then releases the input mutex, locks the output mutex, and fetches into
        // the ring buffer all responses up to its stored request number. Then it takes
        // the corresponding element from the pending-responses ring buffer and truncates
        // all empty elements from the front, advancing the processed-responses counter.

        let mut output = self.stdout.lock().unwrap();
        let n_processed_responses = output.n_processed_responses;
        while n_processed_responses + output.pending_responses.len() <= request_no {
            // We expect the WAL redo process to respond with an 8k page image. We read it
            // into this buffer.
            let mut resultbuf = vec![0; BLCKSZ.into()];
            let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
            while nresult < BLCKSZ.into() {
                // We do two things simultaneously: read the response from stdout, and
                // forward any logging information that the child writes to its stderr
                // to the page server's log.
                let n = loop {
                    match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
                        Err(nix::errno::Errno::EINTR) => continue,
                        res => break res,
                    }
                }?;

                if n == 0 {
                    anyhow::bail!("WAL redo timed out");
                }

                // If we have some messages in stderr, forward them to the log.
                let err_revents = pollfds[1].revents().unwrap();
                if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                    let mut errbuf: [u8; 16384] = [0; 16384];
                    let mut stderr = self.stderr.lock().unwrap();
                    let len = stderr.read(&mut errbuf)?;

                    // The message might not be split correctly into lines here. But this is
                    // good enough, the important thing is to get the message to the log.
                    if len > 0 {
                        error!(
                            "wal-redo-postgres: {}",
                            String::from_utf8_lossy(&errbuf[0..len])
                        );

                        // To make sure we capture all log from the process if it fails, keep
                        // reading from the stderr, before checking the stdout.
                        continue;
                    }
                } else if err_revents.contains(PollFlags::POLLHUP) {
                    anyhow::bail!("WAL redo process closed its stderr unexpectedly");
                }

                // If we have some data in stdout, read it to the result buffer.
                let out_revents = pollfds[2].revents().unwrap();
                if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                    nresult += output.stdout.read(&mut resultbuf[nresult..])?;
                } else if out_revents.contains(PollFlags::POLLHUP) {
                    anyhow::bail!("WAL redo process closed its stdout unexpectedly");
                }
            }
            output
                .pending_responses
                .push_back(Some(Bytes::from(resultbuf)));
        }
        // Replace our request's response with None in `pending_responses`.
        // Then make space in the ring buffer by clearing out any sequence of contiguous
        // `None`'s from the front of `pending_responses`.
        // NB: We can't pop_front() other requests' responses, because another
        // requester might have grabbed the output mutex before us:
        // T1: grab input mutex
        // T1: send request_no 23
        // T1: release input mutex
        // T2: grab input mutex
        // T2: send request_no 24
        // T2: release input mutex
        // T2: grab output mutex
        // T2: n_processed_responses + output.pending_responses.len() <= request_no
        //                 23        +               0                <=     24
        // T2: enters poll loop that reads stdout
        // T2: put response for 23 into pending_responses
        // T2: put response for 24 into pending_responses
        //     pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
        // T2: takes its response_24
        //     pending_responses now looks like this: Front Some(response_23) None Back
        // T2: does the while loop below
        //     pending_responses now looks like this: Front Some(response_23) None Back
        // T2: releases output mutex
        // T1: grabs output mutex
        // T1: n_processed_responses + output.pending_responses.len() > request_no
        //                 23        +               2                 >     23
        // T1: skips poll loop that reads stdout
        // T1: takes its response_23
        //     pending_responses now looks like this: Front None None Back
        // T1: does the while loop below
        //     pending_responses now looks like this: Front Back
        //     n_processed_responses now has value 25
934 : // n_processed_responses now has value 25
935 2251813 : let res = output.pending_responses[request_no - n_processed_responses]
936 2251813 : .take()
937 2251813 : .expect("we own this request_no, nobody else is supposed to take it");
938 4503626 : while let Some(front) = output.pending_responses.front() {
939 2261161 : if front.is_none() {
940 2251813 : output.pending_responses.pop_front();
941 2251813 : output.n_processed_responses += 1;
942 2251813 : } else {
943 9348 : break;
944 : }
945 : }
946 2251813 : Ok(res)
947 2251813 : }

    #[cfg(feature = "testing")]
    fn record_and_log(&self, writebuf: &[u8]) {
        let millis = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

        // these files will be collected to an allure report
        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

        let path = self.conf.tenant_path(&self.tenant_id).join(&filename);

        let res = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .read(true)
            .open(path)
            .and_then(|mut f| f.write_all(writebuf));

        // trip up allowed_errors
        if let Err(e) = res {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        } else {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }

    #[cfg(not(feature = "testing"))]
    fn record_and_log(&self, _: &[u8]) {}
}

impl Drop for WalRedoProcess {
    fn drop(&mut self) {
        self.child
            .take()
            .expect("we only do this once")
            .kill_and_wait();
    }
}

/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
struct NoLeakChild {
    tenant_id: TenantId,
    child: Option<Child>,
}

impl Deref for NoLeakChild {
    type Target = Child;

    fn deref(&self) -> &Self::Target {
        self.child.as_ref().expect("must not use from drop")
    }
}

impl DerefMut for NoLeakChild {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.child.as_mut().expect("must not use from drop")
    }
}

impl NoLeakChild {
    fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result<Self> {
        let child = command.spawn()?;
        Ok(NoLeakChild {
            tenant_id,
            child: Some(child),
        })
    }

    fn kill_and_wait(mut self) {
        let child = match self.child.take() {
            Some(child) => child,
            None => return,
        };
        Self::kill_and_wait_impl(child);
    }

    #[instrument(skip_all, fields(pid=child.id()))]
    fn kill_and_wait_impl(mut child: Child) {
        let res = child.kill();
        if let Err(e) = res {
            // This branch is very unlikely because:
            // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
            // - This is the only place that calls .kill()
            // - We consume `self`, so, .kill() can't be called twice.
            // - If the process exited by itself or was killed by someone else,
            //   .kill() will still succeed because we haven't wait()'ed yet.
            //
            // So, if we arrive here, we have really no idea what happened,
            // whether the PID stored in self.child is still valid, etc.
            // If this function were fallible, we'd return an error, but
            // since it isn't, all we can do is log an error and proceed
            // with the wait().
            error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
        }

        match child.wait() {
            Ok(exit_status) => {
                info!(exit_status = %exit_status, "wait successful");
            }
            Err(e) => {
                error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
            }
        }
    }
}

impl Drop for NoLeakChild {
    fn drop(&mut self) {
        let child = match self.child.take() {
            Some(child) => child,
            None => return,
        };
        let tenant_id = self.tenant_id;
        // Offload the kill+wait of the child process into the background.
        // If someone stops the runtime, we'll leak the child process.
        // We can ignore that case because we only stop the runtime on pageserver exit.
        BACKGROUND_RUNTIME.spawn(async move {
            tokio::task::spawn_blocking(move || {
                // Intentionally don't inherit the tracing context from whoever is dropping us.
                // This thread here is going to outlive our dropper.
                let span = tracing::info_span!("walredo", %tenant_id);
                let _entered = span.enter();
                Self::kill_and_wait_impl(child);
            })
            .await
        });
    }
}

trait NoLeakChildCommandExt {
    fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild>;
}

impl NoLeakChildCommandExt for Command {
    fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild> {
        NoLeakChild::spawn(tenant_id, self)
    }
}

// Functions for constructing messages to send to the postgres WAL redo
// process. See pgxn/neon_walredo/walredoproc.c for an
// explanation of the protocol.

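// For reference, each message starts with a 1-byte tag followed by a
// big-endian u32 length; the length values below simply mirror the builders
// that follow (a sketch; walredoproc.c is the authoritative definition):
//
//   'B': len = 4 + 1 + 4*4              length field + tag + BufferTag
//   'P': len = 4 + 1 + 4*4 + 8192       as 'B', plus the 8192-byte page image
//   'A': len = 4 + 8 + rec.len()        length field + end LSN + record bytes
//   'G': len = 4 + 1 + 4*4              same framing as 'B'
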
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    let len = 4 + 1 + 4 * 4;

    buf.put_u8(b'B');
    buf.put_u32(len as u32);

    tag.ser_into(buf)
        .expect("serialize BufferTag should always succeed");
}

fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
    assert!(base_img.len() == 8192);

    let len = 4 + 1 + 4 * 4 + base_img.len();

    buf.put_u8(b'P');
    buf.put_u32(len as u32);
    tag.ser_into(buf)
        .expect("serialize BufferTag should always succeed");
    buf.put(base_img);
}

fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
    let len = 4 + 8 + rec.len();

    buf.put_u8(b'A');
    buf.put_u32(len as u32);
    buf.put_u64(endlsn.0);
    buf.put(rec);
}

fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    let len = 4 + 1 + 4 * 4;

    buf.put_u8(b'G');
    buf.put_u32(len as u32);
    tag.ser_into(buf)
        .expect("serialize BufferTag should always succeed");
}

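// For illustration, a complete request for one record could be assembled like
// this (a sketch; `tag`, `img`, `lsn`, and `rec` are hypothetical placeholders,
// mirroring what apply_wal_records() does above):
//
//     let mut writebuf: Vec<u8> = Vec::new();
//     build_begin_redo_for_block_msg(tag, &mut writebuf);
//     build_push_page_msg(tag, &img, &mut writebuf); // img must be 8192 bytes
//     build_apply_record_msg(lsn, &rec, &mut writebuf);
//     build_get_page_msg(tag, &mut writebuf);
//     // writebuf is then fed to the child's stdin by apply_wal_records0().
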
#[cfg(test)]
mod tests {
    use super::PostgresRedoManager;
    use crate::repository::Key;
    use crate::{config::PageServerConf, walrecord::NeonWalRecord};
    use bytes::Bytes;
    use std::str::FromStr;
    use utils::{id::TenantId, lsn::Lsn};

    #[tokio::test]
    async fn short_v14_redo() {
        let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();

        let h = RedoHarness::new().unwrap();

        let page = h
            .manager
            .request_redo(
                Key {
                    field1: 0,
                    field2: 1663,
                    field3: 13010,
                    field4: 1259,
                    field5: 0,
                    field6: 0,
                },
                Lsn::from_str("0/16E2408").unwrap(),
                None,
                short_records(),
                14,
            )
            .await
            .unwrap();

        assert_eq!(&expected, &*page);
    }

    #[tokio::test]
    async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
        let h = RedoHarness::new().unwrap();

        let page = h
            .manager
            .request_redo(
                Key {
                    field1: 0,
                    field2: 1663,
                    // key should be 13010
                    field3: 13130,
                    field4: 1259,
                    field5: 0,
                    field6: 0,
                },
                Lsn::from_str("0/16E2408").unwrap(),
                None,
                short_records(),
                14,
            )
            .await
            .unwrap();

        // TODO: there will be some stderr printout, which is forwarded to tracing, that could
        // perhaps be captured as long as it's in the same thread.
        assert_eq!(page, crate::ZERO_PAGE);
    }

    #[allow(clippy::octal_escapes)]
    fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
        vec![
            (
                Lsn::from_str("0/16A9388").unwrap(),
                NeonWalRecord::Postgres {
                    will_init: true,
                    rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
                }
            ),
            (
                Lsn::from_str("0/16D4080").unwrap(),
                NeonWalRecord::Postgres {
                    will_init: false,
                    rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
                }
            )
        ]
    }

    struct RedoHarness {
        // underscored because unused, except for removal at drop
        _repo_dir: camino_tempfile::Utf8TempDir,
        manager: PostgresRedoManager,
    }

    impl RedoHarness {
        fn new() -> anyhow::Result<Self> {
            let repo_dir = camino_tempfile::tempdir()?;
            let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
            let conf = Box::leak(Box::new(conf));
            let tenant_id = TenantId::generate();

            let manager = PostgresRedoManager::new(conf, tenant_id);

            Ok(RedoHarness {
                _repo_dir: repo_dir,
                manager,
            })
        }
    }
}