TLA Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply given WAL records over an old page image and return new
4 : //! page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the page image back. Communication with the
11 : //! postgres process happens via stdin/stdout
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves it by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, he cannot escape out of it.
20 : //!
21 : use anyhow::Context;
22 : use byteorder::{ByteOrder, LittleEndian};
23 : use bytes::{BufMut, Bytes, BytesMut};
24 : use nix::poll::*;
25 : use pageserver_api::shard::TenantShardId;
26 : use serde::Serialize;
27 : use std::collections::VecDeque;
28 : use std::io;
29 : use std::io::prelude::*;
30 : use std::ops::{Deref, DerefMut};
31 : use std::os::unix::io::AsRawFd;
32 : use std::os::unix::prelude::CommandExt;
33 : use std::process::Stdio;
34 : use std::process::{Child, ChildStdin, ChildStdout, Command};
35 : use std::sync::{Arc, Mutex, MutexGuard, RwLock};
36 : use std::time::Duration;
37 : use std::time::Instant;
38 : use tracing::*;
39 : use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
40 :
41 : #[cfg(feature = "testing")]
42 : use std::sync::atomic::{AtomicUsize, Ordering};
43 :
44 : use crate::config::PageServerConf;
45 : use crate::metrics::{
46 : WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS,
47 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM,
48 : WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
49 : };
50 : use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
51 : use crate::repository::Key;
52 : use crate::walrecord::NeonWalRecord;
53 : use pageserver_api::reltag::{RelTag, SlruKind};
54 : use postgres_ffi::pg_constants;
55 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
56 : use postgres_ffi::v14::nonrelfile_utils::{
57 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
58 : transaction_id_set_status,
59 : };
60 : use postgres_ffi::BLCKSZ;
61 :
62 : ///
63 : /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
64 : ///
65 : /// In Postgres `BufferTag` structure is used for exactly the same purpose.
66 : /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
67 : ///
68 CBC 4544070 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
69 : pub(crate) struct BufferTag {
70 : pub rel: RelTag,
71 : pub blknum: u32,
72 : }
73 :
/// Request side of the pipe pair connected to the wal-redo postgres process.
struct ProcessInput {
    /// Handle to the child's stdin; requests are written here.
    stdin: ChildStdin,
    /// Number of requests fully written so far. The value read just before
    /// the increment is used as the request's sequence number.
    n_requests: usize,
}
78 :
79 : struct ProcessOutput {
80 : stdout: ChildStdout,
81 : pending_responses: VecDeque<Option<Bytes>>,
82 : n_processed_responses: usize,
83 : }
84 :
85 : ///
86 : /// This is the real implementation that uses a Postgres process to
87 : /// perform WAL replay. Only one thread can use the process at a time,
88 : /// that is controlled by the Mutex. In the future, we might want to
89 : /// launch a pool of processes to allow concurrent replay of multiple
90 : /// records.
91 : ///
92 : pub struct PostgresRedoManager {
93 : tenant_shard_id: TenantShardId,
94 : conf: &'static PageServerConf,
95 : last_redo_at: std::sync::Mutex<Option<Instant>>,
96 : redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
97 : }
98 :
99 : /// Can this request be served by neon redo functions
100 : /// or we need to pass it to wal-redo postgres process?
101 72749603 : fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
102 72749603 : // Currently, we don't have bespoken Rust code to replay any
103 72749603 : // Postgres WAL records. But everything else is handled in neon.
104 72749603 : #[allow(clippy::match_like_matches_macro)]
105 72749603 : match rec {
106 : NeonWalRecord::Postgres {
107 : will_init: _,
108 : rec: _,
109 56589789 : } => false,
110 16159814 : _ => true,
111 : }
112 72749603 : }
113 :
114 : ///
115 : /// Public interface of WAL redo manager
116 : ///
117 : impl PostgresRedoManager {
118 : ///
119 : /// Request the WAL redo manager to apply some WAL records
120 : ///
121 : /// The WAL redo is handled by a separate thread, so this just sends a request
122 : /// to the thread and waits for response.
123 : ///
124 : /// # Cancel-Safety
125 : ///
126 : /// This method is cancellation-safe.
127 2060221 : pub async fn request_redo(
128 2060221 : &self,
129 2060221 : key: Key,
130 2060221 : lsn: Lsn,
131 2060221 : base_img: Option<(Lsn, Bytes)>,
132 2060221 : records: Vec<(Lsn, NeonWalRecord)>,
133 2060221 : pg_version: u32,
134 2060221 : ) -> anyhow::Result<Bytes> {
135 2060221 : if records.is_empty() {
136 UBC 0 : anyhow::bail!("invalid WAL redo request with no records");
137 CBC 2060221 : }
138 2060221 :
139 2060221 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
140 2060221 : let mut img = base_img.map(|p| p.1);
141 2060221 : let mut batch_neon = can_apply_in_neon(&records[0].1);
142 2060221 : let mut batch_start = 0;
143 70689382 : for (i, record) in records.iter().enumerate().skip(1) {
144 70689382 : let rec_neon = can_apply_in_neon(&record.1);
145 70689382 :
146 70689382 : if rec_neon != batch_neon {
147 13 : let result = if batch_neon {
148 10 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
149 : } else {
150 3 : self.apply_batch_postgres(
151 3 : key,
152 3 : lsn,
153 3 : img,
154 3 : base_img_lsn,
155 3 : &records[batch_start..i],
156 3 : self.conf.wal_redo_timeout,
157 3 : pg_version,
158 3 : )
159 : };
160 13 : img = Some(result?);
161 :
162 13 : batch_neon = rec_neon;
163 13 : batch_start = i;
164 70689369 : }
165 : }
166 : // last batch
167 2060221 : if batch_neon {
168 3974 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
169 : } else {
170 2056247 : self.apply_batch_postgres(
171 2056247 : key,
172 2056247 : lsn,
173 2056247 : img,
174 2056247 : base_img_lsn,
175 2056247 : &records[batch_start..],
176 2056247 : self.conf.wal_redo_timeout,
177 2056247 : pg_version,
178 2056247 : )
179 : }
180 2060221 : }
181 : }
182 :
183 : impl PostgresRedoManager {
184 : ///
185 : /// Create a new PostgresRedoManager.
186 : ///
187 739 : pub fn new(
188 739 : conf: &'static PageServerConf,
189 739 : tenant_shard_id: TenantShardId,
190 739 : ) -> PostgresRedoManager {
191 739 : // The actual process is launched lazily, on first request.
192 739 : PostgresRedoManager {
193 739 : tenant_shard_id,
194 739 : conf,
195 739 : last_redo_at: std::sync::Mutex::default(),
196 739 : redo_process: RwLock::new(None),
197 739 : }
198 739 : }
199 :
200 : /// This type doesn't have its own background task to check for idleness: we
201 : /// rely on our owner calling this function periodically in its own housekeeping
202 : /// loops.
203 727 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
204 727 : if let Ok(g) = self.last_redo_at.try_lock() {
205 727 : if let Some(last_redo_at) = *g {
206 344 : if last_redo_at.elapsed() >= idle_timeout {
207 192 : drop(g);
208 192 : let mut guard = self.redo_process.write().unwrap();
209 192 : *guard = None;
210 192 : }
211 383 : }
212 UBC 0 : }
213 CBC 727 : }
214 :
215 : ///
216 : /// Process one request for WAL redo using wal-redo postgres
217 : ///
218 : #[allow(clippy::too_many_arguments)]
219 2056250 : fn apply_batch_postgres(
220 2056250 : &self,
221 2056250 : key: Key,
222 2056250 : lsn: Lsn,
223 2056250 : base_img: Option<Bytes>,
224 2056250 : base_img_lsn: Lsn,
225 2056250 : records: &[(Lsn, NeonWalRecord)],
226 2056250 : wal_redo_timeout: Duration,
227 2056250 : pg_version: u32,
228 2056250 : ) -> anyhow::Result<Bytes> {
229 2056250 : *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
230 :
231 2056250 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
232 : const MAX_RETRY_ATTEMPTS: u32 = 1;
233 2056250 : let mut n_attempts = 0u32;
234 : loop {
235 : // launch the WAL redo process on first use
236 2056251 : let proc: Arc<WalRedoProcess> = {
237 2056251 : let proc_guard = self.redo_process.read().unwrap();
238 2056251 : match &*proc_guard {
239 : None => {
240 : // "upgrade" to write lock to launch the process
241 547 : drop(proc_guard);
242 547 : let mut proc_guard = self.redo_process.write().unwrap();
243 547 : match &*proc_guard {
244 : None => {
245 544 : let timer =
246 544 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.start_timer();
247 544 : let proc = Arc::new(
248 544 : WalRedoProcess::launch(
249 544 : self.conf,
250 544 : self.tenant_shard_id,
251 544 : pg_version,
252 544 : )
253 544 : .context("launch walredo process")?,
254 : );
255 544 : timer.observe_duration();
256 544 : *proc_guard = Some(Arc::clone(&proc));
257 544 : proc
258 : }
259 3 : Some(proc) => Arc::clone(proc),
260 : }
261 : }
262 2055704 : Some(proc) => Arc::clone(proc),
263 : }
264 : };
265 :
266 2056251 : let started_at = std::time::Instant::now();
267 2056251 :
268 2056251 : // Relational WAL records are applied using wal-redo-postgres
269 2056251 : let buf_tag = BufferTag { rel, blknum };
270 2056251 : let result = proc
271 2056251 : .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
272 2056251 : .context("apply_wal_records");
273 2056251 :
274 2056251 : let duration = started_at.elapsed();
275 2056251 :
276 2056251 : let len = records.len();
277 56589791 : let nbytes = records.iter().fold(0, |acumulator, record| {
278 56589791 : acumulator
279 56589791 : + match &record.1 {
280 56589791 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
281 UBC 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
282 : }
283 CBC 56589791 : });
284 2056251 :
285 2056251 : WAL_REDO_TIME.observe(duration.as_secs_f64());
286 2056251 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
287 2056251 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
288 2056251 :
289 2056251 : debug!(
290 UBC 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
291 0 : len,
292 0 : nbytes,
293 0 : duration.as_micros(),
294 0 : lsn
295 0 : );
296 :
297 : // If something went wrong, don't try to reuse the process. Kill it, and
298 : // next request will launch a new one.
299 CBC 2056251 : if let Err(e) = result.as_ref() {
300 2 : error!(
301 2 : "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
302 2 : records.len(),
303 2 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
304 2 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
305 2 : nbytes,
306 2 : base_img_lsn,
307 2 : lsn,
308 2 : n_attempts,
309 2 : e,
310 2 : );
311 : // Avoid concurrent callers hitting the same issue.
312 : // We can't prevent it from happening because we want to enable parallelism.
313 : {
314 2 : let mut guard = self.redo_process.write().unwrap();
315 2 : match &*guard {
316 2 : Some(current_field_value) => {
317 2 : if Arc::ptr_eq(current_field_value, &proc) {
318 2 : // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
319 2 : *guard = None;
320 2 : }
321 : }
322 UBC 0 : None => {
323 0 : // Another thread was faster to observe the error, and already took the process out of rotation.
324 0 : }
325 : }
326 : }
327 : // NB: there may still be other concurrent threads using `proc`.
328 : // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
329 : // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
330 : // holding the lock while waiting for the process to exit.
331 : // NB: the drop impl blocks the current threads with a wait() system call for
332 : // the child process. We dropped the `guard` above so that other threads aren't
333 : // affected. But, it's good that the current thread _does_ block to wait.
334 : // If we instead deferred the waiting into the background / to tokio, it could
335 : // happen that if walredo always fails immediately, we spawn processes faster
336 : // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
337 : // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
338 : // This probably needs revisiting at some later point.
339 CBC 2 : drop(proc);
340 2056249 : } else if n_attempts != 0 {
341 UBC 0 : info!(n_attempts, "retried walredo succeeded");
342 CBC 2056249 : }
343 2056251 : n_attempts += 1;
344 2056251 : if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
345 2056250 : return result;
346 1 : }
347 : }
348 2056250 : }
349 :
350 : ///
351 : /// Process a batch of WAL records using bespoken Neon code.
352 : ///
353 3984 : fn apply_batch_neon(
354 3984 : &self,
355 3984 : key: Key,
356 3984 : lsn: Lsn,
357 3984 : base_img: Option<Bytes>,
358 3984 : records: &[(Lsn, NeonWalRecord)],
359 3984 : ) -> anyhow::Result<Bytes> {
360 3984 : let start_time = Instant::now();
361 3984 :
362 3984 : let mut page = BytesMut::new();
363 3984 : if let Some(fpi) = base_img {
364 3984 : // If full-page image is provided, then use it...
365 3984 : page.extend_from_slice(&fpi[..]);
366 3984 : } else {
367 : // All the current WAL record types that we can handle require a base image.
368 UBC 0 : anyhow::bail!("invalid neon WAL redo request with no base image");
369 : }
370 :
371 : // Apply all the WAL records in the batch
372 CBC 16159814 : for (record_lsn, record) in records.iter() {
373 16159814 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
374 : }
375 : // Success!
376 3984 : let duration = start_time.elapsed();
377 3984 : // FIXME: using the same metric here creates a bimodal distribution by default, and because
378 3984 : // there could be multiple batch sizes this would be N+1 modal.
379 3984 : WAL_REDO_TIME.observe(duration.as_secs_f64());
380 3984 :
381 3984 : debug!(
382 UBC 0 : "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
383 0 : records.len(),
384 0 : duration.as_micros(),
385 0 : lsn
386 0 : );
387 :
388 CBC 3984 : Ok(page.freeze())
389 3984 : }
390 :
391 16159814 : fn apply_record_neon(
392 16159814 : &self,
393 16159814 : key: Key,
394 16159814 : page: &mut BytesMut,
395 16159814 : _record_lsn: Lsn,
396 16159814 : record: &NeonWalRecord,
397 16159814 : ) -> anyhow::Result<()> {
398 16159814 : match record {
399 : NeonWalRecord::Postgres {
400 : will_init: _,
401 : rec: _,
402 : } => {
403 UBC 0 : anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
404 : }
405 : NeonWalRecord::ClearVisibilityMapFlags {
406 CBC 870 : new_heap_blkno,
407 870 : old_heap_blkno,
408 870 : flags,
409 : } => {
410 : // sanity check that this is modifying the correct relation
411 870 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
412 870 : assert!(
413 870 : rel.forknum == VISIBILITYMAP_FORKNUM,
414 UBC 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
415 : rel
416 : );
417 CBC 870 : if let Some(heap_blkno) = *new_heap_blkno {
418 : // Calculate the VM block and offset that corresponds to the heap block.
419 398 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
420 398 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
421 398 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
422 :
423 : // Check that we're modifying the correct VM block.
424 398 : assert!(map_block == blknum);
425 :
426 : // equivalent to PageGetContents(page)
427 398 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
428 398 :
429 398 : map[map_byte as usize] &= !(flags << map_offset);
430 472 : }
431 :
432 : // Repeat for 'old_heap_blkno', if any
433 870 : if let Some(heap_blkno) = *old_heap_blkno {
434 474 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
435 474 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
436 474 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
437 :
438 474 : assert!(map_block == blknum);
439 :
440 474 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
441 474 :
442 474 : map[map_byte as usize] &= !(flags << map_offset);
443 396 : }
444 : }
445 : // Non-relational WAL records are handled here, with custom code that has the
446 : // same effects as the corresponding Postgres WAL redo function.
447 16061763 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
448 16061763 : let (slru_kind, segno, blknum) =
449 16061763 : key_to_slru_block(key).context("invalid record")?;
450 16061763 : assert_eq!(
451 : slru_kind,
452 : SlruKind::Clog,
453 UBC 0 : "ClogSetCommitted record with unexpected key {}",
454 : key
455 : );
456 CBC 32223539 : for &xid in xids {
457 16161776 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
458 16161776 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
459 16161776 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
460 :
461 : // Check that we're modifying the correct CLOG block.
462 16161776 : assert!(
463 16161776 : segno == expected_segno,
464 UBC 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
465 : xid,
466 : key
467 : );
468 CBC 16161776 : assert!(
469 16161776 : blknum == expected_blknum,
470 UBC 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
471 : xid,
472 : key
473 : );
474 :
475 CBC 16161776 : transaction_id_set_status(
476 16161776 : xid,
477 16161776 : pg_constants::TRANSACTION_STATUS_COMMITTED,
478 16161776 : page,
479 16161776 : );
480 : }
481 :
482 : // Append the timestamp
483 16061763 : if page.len() == BLCKSZ as usize + 8 {
484 16058614 : page.truncate(BLCKSZ as usize);
485 16058614 : }
486 16061763 : if page.len() == BLCKSZ as usize {
487 16061763 : page.extend_from_slice(×tamp.to_be_bytes());
488 16061763 : } else {
489 UBC 0 : warn!(
490 0 : "CLOG blk {} in seg {} has invalid size {}",
491 0 : blknum,
492 0 : segno,
493 0 : page.len()
494 0 : );
495 : }
496 : }
497 CBC 1309 : NeonWalRecord::ClogSetAborted { xids } => {
498 1309 : let (slru_kind, segno, blknum) =
499 1309 : key_to_slru_block(key).context("invalid record")?;
500 1309 : assert_eq!(
501 : slru_kind,
502 : SlruKind::Clog,
503 UBC 0 : "ClogSetAborted record with unexpected key {}",
504 : key
505 : );
506 CBC 2625 : for &xid in xids {
507 1316 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
508 1316 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
509 1316 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
510 :
511 : // Check that we're modifying the correct CLOG block.
512 1316 : assert!(
513 1316 : segno == expected_segno,
514 UBC 0 : "ClogSetAborted record for XID {} with unexpected key {}",
515 : xid,
516 : key
517 : );
518 CBC 1316 : assert!(
519 1316 : blknum == expected_blknum,
520 UBC 0 : "ClogSetAborted record for XID {} with unexpected key {}",
521 : xid,
522 : key
523 : );
524 :
525 CBC 1316 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
526 : }
527 : }
528 47662 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
529 47662 : let (slru_kind, segno, blknum) =
530 47662 : key_to_slru_block(key).context("invalid record")?;
531 47662 : assert_eq!(
532 : slru_kind,
533 : SlruKind::MultiXactOffsets,
534 UBC 0 : "MultixactOffsetCreate record with unexpected key {}",
535 : key
536 : );
537 : // Compute the block and offset to modify.
538 : // See RecordNewMultiXact in PostgreSQL sources.
539 CBC 47662 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
540 47662 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
541 47662 : let offset = (entryno * 4) as usize;
542 47662 :
543 47662 : // Check that we're modifying the correct multixact-offsets block.
544 47662 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
545 47662 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
546 47662 : assert!(
547 47662 : segno == expected_segno,
548 UBC 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
549 : mid,
550 : key
551 : );
552 CBC 47662 : assert!(
553 47662 : blknum == expected_blknum,
554 UBC 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
555 : mid,
556 : key
557 : );
558 :
559 CBC 47662 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
560 : }
561 48210 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
562 48210 : let (slru_kind, segno, blknum) =
563 48210 : key_to_slru_block(key).context("invalid record")?;
564 48210 : assert_eq!(
565 : slru_kind,
566 : SlruKind::MultiXactMembers,
567 UBC 0 : "MultixactMembersCreate record with unexpected key {}",
568 : key
569 : );
570 CBC 945038 : for (i, member) in members.iter().enumerate() {
571 945038 : let offset = moff + i as u32;
572 945038 :
573 945038 : // Compute the block and offset to modify.
574 945038 : // See RecordNewMultiXact in PostgreSQL sources.
575 945038 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
576 945038 : let memberoff = mx_offset_to_member_offset(offset);
577 945038 : let flagsoff = mx_offset_to_flags_offset(offset);
578 945038 : let bshift = mx_offset_to_flags_bitshift(offset);
579 945038 :
580 945038 : // Check that we're modifying the correct multixact-members block.
581 945038 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
582 945038 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
583 945038 : assert!(
584 945038 : segno == expected_segno,
585 UBC 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
586 : moff,
587 : key
588 : );
589 CBC 945038 : assert!(
590 945038 : blknum == expected_blknum,
591 UBC 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
592 : moff,
593 : key
594 : );
595 :
596 CBC 945038 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
597 945038 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
598 945038 : flagsval |= member.status << bshift;
599 945038 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
600 945038 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
601 : }
602 : }
603 : }
604 :
605 16159814 : Ok(())
606 16159814 : }
607 : }
608 :
///
/// Extension of `CommandExt` that lets a command be configured so that the
/// child process is not handed all of the parent's file descriptors.
///
trait CloseFileDescriptors: CommandExt {
    ///
    /// Arrange for file descriptors other than stdin, stdout and stderr to be
    /// closed in the child process. Returns the command for further chaining.
    ///
    fn close_fds(&mut self) -> &mut Command;
}
618 :
619 : impl<C: CommandExt> CloseFileDescriptors for C {
620 544 : fn close_fds(&mut self) -> &mut Command {
621 544 : // SAFETY: Code executed inside pre_exec should have async-signal-safety,
622 544 : // which means it should be safe to execute inside a signal handler.
623 544 : // The precise meaning depends on platform. See `man signal-safety`
624 544 : // for the linux definition.
625 544 : //
626 544 : // The set_fds_cloexec_threadsafe function is documented to be
627 544 : // async-signal-safe.
628 544 : //
629 544 : // Aside from this function, the rest of the code is re-entrant and
630 544 : // doesn't make any syscalls. We're just passing constants.
631 544 : //
632 544 : // NOTE: It's easy to indirectly cause a malloc or lock a mutex,
633 544 : // which is not async-signal-safe. Be careful.
634 544 : unsafe {
635 544 : self.pre_exec(move || {
636 UBC 0 : close_fds::set_fds_cloexec_threadsafe(3, &[]);
637 0 : Ok(())
638 CBC 544 : })
639 544 : }
640 544 : }
641 : }
642 :
643 : struct WalRedoProcess {
644 : #[allow(dead_code)]
645 : conf: &'static PageServerConf,
646 : tenant_shard_id: TenantShardId,
647 : // Some() on construction, only becomes None on Drop.
648 : child: Option<NoLeakChild>,
649 : stdout: Mutex<ProcessOutput>,
650 : stdin: Mutex<ProcessInput>,
651 : /// Counter to separate same sized walredo inputs failing at the same millisecond.
652 : #[cfg(feature = "testing")]
653 : dump_sequence: AtomicUsize,
654 : }
655 :
656 : impl WalRedoProcess {
657 : //
658 : // Start postgres binary in special WAL redo mode.
659 : //
660 544 : #[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))]
661 : fn launch(
662 : conf: &'static PageServerConf,
663 : tenant_shard_id: TenantShardId,
664 : pg_version: u32,
665 : ) -> anyhow::Result<Self> {
666 : let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
667 : let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
668 :
669 : // Start postgres itself
670 : let child = Command::new(pg_bin_dir_path.join("postgres"))
671 : .arg("--wal-redo")
672 : .stdin(Stdio::piped())
673 : .stderr(Stdio::piped())
674 : .stdout(Stdio::piped())
675 : .env_clear()
676 : .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
677 : .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
678 : // The redo process is not trusted, and runs in seccomp mode that
679 : // doesn't allow it to open any files. We have to also make sure it
680 : // doesn't inherit any file descriptors from the pageserver, that
681 : // would allow an attacker to read any files that happen to be open
682 : // in the pageserver.
683 : //
684 : // The Rust standard library makes sure to mark any file descriptors with
685 : // as close-on-exec by default, but that's not enough, since we use
686 : // libraries that directly call libc open without setting that flag.
687 : .close_fds()
688 : .spawn_no_leak_child(tenant_shard_id)
689 : .context("spawn process")?;
690 : WAL_REDO_PROCESS_COUNTERS.started.inc();
691 UBC 0 : let mut child = scopeguard::guard(child, |child| {
692 0 : error!("killing wal-redo-postgres process due to a problem during launch");
693 0 : child.kill_and_wait(WalRedoKillCause::Startup);
694 0 : });
695 :
696 : let stdin = child.stdin.take().unwrap();
697 : let stdout = child.stdout.take().unwrap();
698 : let stderr = child.stderr.take().unwrap();
699 : let stderr = tokio::process::ChildStderr::from_std(stderr)
700 : .context("convert to tokio::ChildStderr")?;
701 : macro_rules! set_nonblock_or_log_err {
702 : ($file:ident) => {{
703 : let res = set_nonblock($file.as_raw_fd());
704 : if let Err(e) = &res {
705 : error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
706 : }
707 : res
708 : }};
709 : }
710 0 : set_nonblock_or_log_err!(stdin)?;
711 0 : set_nonblock_or_log_err!(stdout)?;
712 :
713 : // all fallible operations post-spawn are complete, so get rid of the guard
714 : let child = scopeguard::ScopeGuard::into_inner(child);
715 :
716 : tokio::spawn(
717 CBC 540 : async move {
718 265 : scopeguard::defer! {
719 265 : debug!("wal-redo-postgres stderr_logger_task finished");
720 265 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
721 : }
722 540 : debug!("wal-redo-postgres stderr_logger_task started");
723 540 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
724 540 :
725 540 : use tokio::io::AsyncBufReadExt;
726 540 : let mut stderr_lines = tokio::io::BufReader::new(stderr);
727 540 : let mut buf = Vec::new();
728 540 : let res = loop {
729 540 : buf.clear();
730 540 : // TODO we don't trust the process to cap its stderr length.
731 540 : // Currently it can do unbounded Vec allocation.
732 540 : match stderr_lines.read_until(b'\n', &mut buf).await {
733 265 : Ok(0) => break Ok(()), // eof
734 UBC 0 : Ok(num_bytes) => {
735 0 : let output = String::from_utf8_lossy(&buf[..num_bytes]);
736 0 : error!(%output, "received output");
737 : }
738 0 : Err(e) => {
739 0 : break Err(e);
740 : }
741 : }
742 : };
743 CBC 265 : match res {
744 265 : Ok(()) => (),
745 UBC 0 : Err(e) => {
746 0 : error!(error=?e, "failed to read from walredo stderr");
747 : }
748 : }
749 CBC 265 : }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
750 : );
751 :
752 : Ok(Self {
753 : conf,
754 : tenant_shard_id,
755 : child: Some(child),
756 : stdin: Mutex::new(ProcessInput {
757 : stdin,
758 : n_requests: 0,
759 : }),
760 : stdout: Mutex::new(ProcessOutput {
761 : stdout,
762 : pending_responses: VecDeque::new(),
763 : n_processed_responses: 0,
764 : }),
765 : #[cfg(feature = "testing")]
766 : dump_sequence: AtomicUsize::default(),
767 : })
768 : }
769 :
770 2056251 : fn id(&self) -> u32 {
771 2056251 : self.child
772 2056251 : .as_ref()
773 2056251 : .expect("must not call this during Drop")
774 2056251 : .id()
775 2056251 : }
776 :
777 : // Apply given WAL records ('records') over an old page image. Returns
778 : // new page image.
779 : //
780 2056251 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
781 : fn apply_wal_records(
782 : &self,
783 : tag: BufferTag,
784 : base_img: &Option<Bytes>,
785 : records: &[(Lsn, NeonWalRecord)],
786 : wal_redo_timeout: Duration,
787 : ) -> anyhow::Result<Bytes> {
788 : let input = self.stdin.lock().unwrap();
789 :
790 : // Serialize all the messages to send the WAL redo process first.
791 : //
792 : // This could be problematic if there are millions of records to replay,
793 : // but in practice the number of records is usually so small that it doesn't
794 : // matter, and it's better to keep this code simple.
795 : //
796 : // Most requests start with a before-image with BLCKSZ bytes, followed by
797 : // by some other WAL records. Start with a buffer that can hold that
798 : // comfortably.
799 : let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
800 : build_begin_redo_for_block_msg(tag, &mut writebuf);
801 : if let Some(img) = base_img {
802 : build_push_page_msg(tag, img, &mut writebuf);
803 : }
804 : for (lsn, rec) in records.iter() {
805 : if let NeonWalRecord::Postgres {
806 : will_init: _,
807 : rec: postgres_rec,
808 : } = rec
809 : {
810 : build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
811 : } else {
812 : anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
813 : }
814 : }
815 : build_get_page_msg(tag, &mut writebuf);
816 : WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
817 :
818 : let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
819 :
820 : if res.is_err() {
821 : // not all of these can be caused by this particular input, however these are so rare
822 : // in tests so capture all.
823 : self.record_and_log(&writebuf);
824 : }
825 :
826 : res
827 : }
828 :
829 2056251 : fn apply_wal_records0(
830 2056251 : &self,
831 2056251 : writebuf: &[u8],
832 2056251 : input: MutexGuard<ProcessInput>,
833 2056251 : wal_redo_timeout: Duration,
834 2056251 : ) -> anyhow::Result<Bytes> {
835 2056251 : let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
836 2056251 : let mut nwrite = 0usize;
837 2056251 :
838 2056251 : let mut stdin_pollfds = [PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT)];
839 :
840 4118051 : while nwrite < writebuf.len() {
841 2061800 : let n = loop {
842 2061800 : match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
843 UBC 0 : Err(nix::errno::Errno::EINTR) => continue,
844 CBC 2061800 : res => break res,
845 : }
846 UBC 0 : }?;
847 :
848 CBC 2061800 : if n == 0 {
849 UBC 0 : anyhow::bail!("WAL redo timed out");
850 CBC 2061800 : }
851 2061800 :
852 2061800 : // If 'stdin' is writeable, do write.
853 2061800 : let in_revents = stdin_pollfds[0].revents().unwrap();
854 2061800 : if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
855 2061800 : nwrite += proc.stdin.write(&writebuf[nwrite..])?;
856 UBC 0 : }
857 CBC 2061800 : if in_revents.contains(PollFlags::POLLHUP) {
858 : // We still have more data to write, but the process closed the pipe.
859 UBC 0 : anyhow::bail!("WAL redo process closed its stdin unexpectedly");
860 CBC 2061800 : }
861 : }
862 2056251 : let request_no = proc.n_requests;
863 2056251 : proc.n_requests += 1;
864 2056251 : drop(proc);
865 2056251 :
866 2056251 : // To improve walredo performance we separate sending requests and receiving
867 2056251 : // responses. Them are protected by different mutexes (output and input).
868 2056251 : // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
869 2056251 : // then there is not warranty that T1 will first granted output mutex lock.
870 2056251 : // To address this issue we maintain number of sent requests, number of processed
871 2056251 : // responses and ring buffer with pending responses. After sending response
872 2056251 : // (under input mutex), threads remembers request number. Then it releases
873 2056251 : // input mutex, locks output mutex and fetch in ring buffer all responses until
874 2056251 : // its stored request number. The it takes correspondent element from
875 2056251 : // pending responses ring buffer and truncate all empty elements from the front,
876 2056251 : // advancing processed responses number.
877 2056251 :
878 2056251 : let mut output = self.stdout.lock().unwrap();
879 2056251 : let mut stdout_pollfds = [PollFd::new(output.stdout.as_raw_fd(), PollFlags::POLLIN)];
880 2056251 : let n_processed_responses = output.n_processed_responses;
881 4112500 : while n_processed_responses + output.pending_responses.len() <= request_no {
882 : // We expect the WAL redo process to respond with an 8k page image. We read it
883 : // into this buffer.
884 2056251 : let mut resultbuf = vec![0; BLCKSZ.into()];
885 2056251 : let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
886 4112500 : while nresult < BLCKSZ.into() {
887 : // We do two things simultaneously: reading response from stdout
888 : // and forward any logging information that the child writes to its stderr to the page server's log.
889 2056251 : let n = loop {
890 : match nix::poll::poll(
891 2056251 : &mut stdout_pollfds[..],
892 2056251 : wal_redo_timeout.as_millis() as i32,
893 : ) {
894 UBC 0 : Err(nix::errno::Errno::EINTR) => continue,
895 CBC 2056251 : res => break res,
896 : }
897 UBC 0 : }?;
898 :
899 CBC 2056251 : if n == 0 {
900 UBC 0 : anyhow::bail!("WAL redo timed out");
901 CBC 2056251 : }
902 2056251 :
903 2056251 : // If we have some data in stdout, read it to the result buffer.
904 2056251 : let out_revents = stdout_pollfds[0].revents().unwrap();
905 2056251 : if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
906 2056249 : nresult += output.stdout.read(&mut resultbuf[nresult..])?;
907 2 : }
908 2056251 : if out_revents.contains(PollFlags::POLLHUP) {
909 2 : anyhow::bail!("WAL redo process closed its stdout unexpectedly");
910 2056249 : }
911 : }
912 2056249 : output
913 2056249 : .pending_responses
914 2056249 : .push_back(Some(Bytes::from(resultbuf)));
915 : }
916 : // Replace our request's response with None in `pending_responses`.
917 : // Then make space in the ring buffer by clearing out any seqence of contiguous
918 : // `None`'s from the front of `pending_responses`.
919 : // NB: We can't pop_front() because other requests' responses because another
920 : // requester might have grabbed the output mutex before us:
921 : // T1: grab input mutex
922 : // T1: send request_no 23
923 : // T1: release input mutex
924 : // T2: grab input mutex
925 : // T2: send request_no 24
926 : // T2: release input mutex
927 : // T2: grab output mutex
928 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
929 : // 23 0 24
930 : // T2: enters poll loop that reads stdout
931 : // T2: put response for 23 into pending_responses
932 : // T2: put response for 24 into pending_resposnes
933 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
934 : // T2: takes its response_24
935 : // pending_responses now looks like this: Front Some(response_23) None Back
936 : // T2: does the while loop below
937 : // pending_responses now looks like this: Front Some(response_23) None Back
938 : // T2: releases output mutex
939 : // T1: grabs output mutex
940 : // T1: n_processed_responses + output.pending_responses.len() > request_no
941 : // 23 2 23
942 : // T1: skips poll loop that reads stdout
943 : // T1: takes its response_23
944 : // pending_responses now looks like this: Front None None Back
945 : // T2: does the while loop below
946 : // pending_responses now looks like this: Front Back
947 : // n_processed_responses now has value 25
948 2056249 : let res = output.pending_responses[request_no - n_processed_responses]
949 2056249 : .take()
950 2056249 : .expect("we own this request_no, nobody else is supposed to take it");
951 4112498 : while let Some(front) = output.pending_responses.front() {
952 2088372 : if front.is_none() {
953 2056249 : output.pending_responses.pop_front();
954 2056249 : output.n_processed_responses += 1;
955 2056249 : } else {
956 32123 : break;
957 : }
958 : }
959 2056249 : Ok(res)
960 2056251 : }
961 :
962 : #[cfg(feature = "testing")]
963 2 : fn record_and_log(&self, writebuf: &[u8]) {
964 2 : let millis = std::time::SystemTime::now()
965 2 : .duration_since(std::time::SystemTime::UNIX_EPOCH)
966 2 : .unwrap()
967 2 : .as_millis();
968 2 :
969 2 : let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
970 2 :
971 2 : // these files will be collected to an allure report
972 2 : let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
973 2 :
974 2 : let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
975 2 :
976 2 : let res = std::fs::OpenOptions::new()
977 2 : .write(true)
978 2 : .create_new(true)
979 2 : .read(true)
980 2 : .open(path)
981 2 : .and_then(|mut f| f.write_all(writebuf));
982 :
983 : // trip up allowed_errors
984 2 : if let Err(e) = res {
985 2 : tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
986 : } else {
987 UBC 0 : tracing::error!(filename, "erroring walredo input saved");
988 : }
989 CBC 2 : }
990 :
991 : #[cfg(not(feature = "testing"))]
992 : fn record_and_log(&self, _: &[u8]) {}
993 : }
994 :
995 : impl Drop for WalRedoProcess {
996 269 : fn drop(&mut self) {
997 269 : self.child
998 269 : .take()
999 269 : .expect("we only do this once")
1000 269 : .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
1001 269 : // no way to wait for stderr_logger_task from Drop because that is async only
1002 269 : }
1003 : }
1004 :
1005 : /// Wrapper type around `std::process::Child` which guarantees that the child
1006 : /// will be killed and waited-for by this process before being dropped.
1007 : struct NoLeakChild {
1008 : tenant_id: TenantShardId,
1009 : child: Option<Child>,
1010 : }
1011 :
1012 : impl Deref for NoLeakChild {
1013 : type Target = Child;
1014 :
1015 2056795 : fn deref(&self) -> &Self::Target {
1016 2056795 : self.child.as_ref().expect("must not use from drop")
1017 2056795 : }
1018 : }
1019 :
1020 : impl DerefMut for NoLeakChild {
1021 1632 : fn deref_mut(&mut self) -> &mut Self::Target {
1022 1632 : self.child.as_mut().expect("must not use from drop")
1023 1632 : }
1024 : }
1025 :
1026 : impl NoLeakChild {
1027 544 : fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
1028 544 : let child = command.spawn()?;
1029 544 : Ok(NoLeakChild {
1030 544 : tenant_id,
1031 544 : child: Some(child),
1032 544 : })
1033 544 : }
1034 :
1035 269 : fn kill_and_wait(mut self, cause: WalRedoKillCause) {
1036 269 : let child = match self.child.take() {
1037 269 : Some(child) => child,
1038 UBC 0 : None => return,
1039 : };
1040 CBC 269 : Self::kill_and_wait_impl(child, cause);
1041 269 : }
1042 :
1043 269 : #[instrument(skip_all, fields(pid=child.id(), ?cause))]
1044 : fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
1045 269 : scopeguard::defer! {
1046 269 : WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
1047 269 : }
1048 : let res = child.kill();
1049 : if let Err(e) = res {
1050 : // This branch is very unlikely because:
1051 : // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
1052 : // - This is the only place that calls .kill()
1053 : // - We consume `self`, so, .kill() can't be called twice.
1054 : // - If the process exited by itself or was killed by someone else,
1055 : // .kill() will still succeed because we haven't wait()'ed yet.
1056 : //
1057 : // So, if we arrive here, we have really no idea what happened,
1058 : // whether the PID stored in self.child is still valid, etc.
1059 : // If this function were fallible, we'd return an error, but
1060 : // since it isn't, all we can do is log an error and proceed
1061 : // with the wait().
1062 UBC 0 : error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
1063 : }
1064 :
1065 : match child.wait() {
1066 : Ok(exit_status) => {
1067 CBC 269 : info!(exit_status = %exit_status, "wait successful");
1068 : }
1069 : Err(e) => {
1070 UBC 0 : error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
1071 : }
1072 : }
1073 : }
1074 : }
1075 :
1076 : impl Drop for NoLeakChild {
1077 CBC 269 : fn drop(&mut self) {
1078 269 : let child = match self.child.take() {
1079 UBC 0 : Some(child) => child,
1080 CBC 269 : None => return,
1081 : };
1082 UBC 0 : let tenant_shard_id = self.tenant_id;
1083 0 : // Offload the kill+wait of the child process into the background.
1084 0 : // If someone stops the runtime, we'll leak the child process.
1085 0 : // We can ignore that case because we only stop the runtime on pageserver exit.
1086 0 : tokio::runtime::Handle::current().spawn(async move {
1087 0 : tokio::task::spawn_blocking(move || {
1088 : // Intentionally don't inherit the tracing context from whoever is dropping us.
1089 : // This thread here is going to outlive of our dropper.
1090 0 : let span = tracing::info_span!(
1091 : "walredo",
1092 : tenant_id = %tenant_shard_id.tenant_id,
1093 0 : shard_id = %tenant_shard_id.shard_slug()
1094 : );
1095 0 : let _entered = span.enter();
1096 0 : Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
1097 0 : })
1098 0 : .await
1099 0 : });
1100 CBC 269 : }
1101 : }
1102 :
1103 : trait NoLeakChildCommandExt {
1104 : fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
1105 : }
1106 :
1107 : impl NoLeakChildCommandExt for Command {
1108 544 : fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
1109 544 : NoLeakChild::spawn(tenant_id, self)
1110 544 : }
1111 : }
1112 :
1113 : // Functions for constructing messages to send to the postgres WAL redo
1114 : // process. See pgxn/neon_walredo/walredoproc.c for
1115 : // explanation of the protocol.
1116 :
1117 2056251 : fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
1118 2056251 : let len = 4 + 1 + 4 * 4;
1119 2056251 :
1120 2056251 : buf.put_u8(b'B');
1121 2056251 : buf.put_u32(len as u32);
1122 2056251 :
1123 2056251 : tag.ser_into(buf)
1124 2056251 : .expect("serialize BufferTag should always succeed");
1125 2056251 : }
1126 :
1127 431568 : fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
1128 431568 : assert!(base_img.len() == 8192);
1129 :
1130 431568 : let len = 4 + 1 + 4 * 4 + base_img.len();
1131 431568 :
1132 431568 : buf.put_u8(b'P');
1133 431568 : buf.put_u32(len as u32);
1134 431568 : tag.ser_into(buf)
1135 431568 : .expect("serialize BufferTag should always succeed");
1136 431568 : buf.put(base_img);
1137 431568 : }
1138 :
1139 56589791 : fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
1140 56589791 : let len = 4 + 8 + rec.len();
1141 56589791 :
1142 56589791 : buf.put_u8(b'A');
1143 56589791 : buf.put_u32(len as u32);
1144 56589791 : buf.put_u64(endlsn.0);
1145 56589791 : buf.put(rec);
1146 56589791 : }
1147 :
1148 2056251 : fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
1149 2056251 : let len = 4 + 1 + 4 * 4;
1150 2056251 :
1151 2056251 : buf.put_u8(b'G');
1152 2056251 : buf.put_u32(len as u32);
1153 2056251 : tag.ser_into(buf)
1154 2056251 : .expect("serialize BufferTag should always succeed");
1155 2056251 : }
1156 :
1157 : #[cfg(test)]
1158 : mod tests {
1159 : use super::PostgresRedoManager;
1160 : use crate::repository::Key;
1161 : use crate::{config::PageServerConf, walrecord::NeonWalRecord};
1162 : use bytes::Bytes;
1163 : use pageserver_api::shard::TenantShardId;
1164 : use std::str::FromStr;
1165 : use utils::{id::TenantId, lsn::Lsn};
1166 :
1167 1 : #[tokio::test]
1168 1 : async fn short_v14_redo() {
1169 1 : let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
1170 1 :
1171 1 : let h = RedoHarness::new().unwrap();
1172 :
1173 1 : let page = h
1174 1 : .manager
1175 1 : .request_redo(
1176 1 : Key {
1177 1 : field1: 0,
1178 1 : field2: 1663,
1179 1 : field3: 13010,
1180 1 : field4: 1259,
1181 1 : field5: 0,
1182 1 : field6: 0,
1183 1 : },
1184 1 : Lsn::from_str("0/16E2408").unwrap(),
1185 1 : None,
1186 1 : short_records(),
1187 1 : 14,
1188 1 : )
1189 UBC 0 : .await
1190 CBC 1 : .unwrap();
1191 1 :
1192 1 : assert_eq!(&expected, &*page);
1193 : }
1194 :
1195 1 : #[tokio::test]
1196 1 : async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
1197 1 : let h = RedoHarness::new().unwrap();
1198 :
1199 1 : let page = h
1200 1 : .manager
1201 1 : .request_redo(
1202 1 : Key {
1203 1 : field1: 0,
1204 1 : field2: 1663,
1205 1 : // key should be 13010
1206 1 : field3: 13130,
1207 1 : field4: 1259,
1208 1 : field5: 0,
1209 1 : field6: 0,
1210 1 : },
1211 1 : Lsn::from_str("0/16E2408").unwrap(),
1212 1 : None,
1213 1 : short_records(),
1214 1 : 14,
1215 1 : )
1216 UBC 0 : .await
1217 CBC 1 : .unwrap();
1218 1 :
1219 1 : // TODO: there will be some stderr printout, which is forwarded to tracing that could
1220 1 : // perhaps be captured as long as it's in the same thread.
1221 1 : assert_eq!(page, crate::ZERO_PAGE);
1222 : }
1223 :
1224 1 : #[tokio::test]
1225 1 : async fn test_stderr() {
1226 1 : let h = RedoHarness::new().unwrap();
1227 1 : h
1228 1 : .manager
1229 1 : .request_redo(
1230 1 : Key::from_i128(0),
1231 1 : Lsn::INVALID,
1232 1 : None,
1233 1 : short_records(),
1234 1 : 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
1235 1 : )
1236 UBC 0 : .await
1237 CBC 1 : .unwrap_err();
1238 : }
1239 :
1240 : #[allow(clippy::octal_escapes)]
1241 3 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
1242 3 : vec![
1243 3 : (
1244 3 : Lsn::from_str("0/16A9388").unwrap(),
1245 3 : NeonWalRecord::Postgres {
1246 3 : will_init: true,
1247 3 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
1248 3 : }
1249 3 : ),
1250 3 : (
1251 3 : Lsn::from_str("0/16D4080").unwrap(),
1252 3 : NeonWalRecord::Postgres {
1253 3 : will_init: false,
1254 3 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
1255 3 : }
1256 3 : )
1257 3 : ]
1258 3 : }
1259 :
1260 : struct RedoHarness {
1261 : // underscored because unused, except for removal at drop
1262 : _repo_dir: camino_tempfile::Utf8TempDir,
1263 : manager: PostgresRedoManager,
1264 : }
1265 :
1266 : impl RedoHarness {
1267 3 : fn new() -> anyhow::Result<Self> {
1268 3 : crate::tenant::harness::setup_logging();
1269 :
1270 3 : let repo_dir = camino_tempfile::tempdir()?;
1271 3 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
1272 3 : let conf = Box::leak(Box::new(conf));
1273 3 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
1274 3 :
1275 3 : let manager = PostgresRedoManager::new(conf, tenant_shard_id);
1276 3 :
1277 3 : Ok(RedoHarness {
1278 3 : _repo_dir: repo_dir,
1279 3 : manager,
1280 3 : })
1281 3 : }
1282 : }
1283 : }
|