//!
//! WAL redo. This service runs PostgreSQL in a special wal_redo mode
//! to apply given WAL records over an old page image and return the new
//! page image.
//!
//! We rely on Postgres to perform WAL redo for us. We launch a
//! postgres process in a special "wal redo" mode that's similar to
//! single-user mode. We then pass the previous page image, if any,
//! and all the WAL records we want to apply, to the postgres
//! process. Then we get the page image back. Communication with the
//! postgres process happens via stdin/stdout.
//!
//! See pgxn/neon_walredo/walredoproc.c for the other side of
//! this communication.
//!
//! The Postgres process is assumed to be secure against malicious WAL
//! records. It achieves this by dropping privileges before replaying
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, they cannot escape out of it.
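//!
//! For orientation, a caller drives redo roughly like this (an illustrative
//! sketch, not a doctest; [`PostgresRedoManager::request_redo`] is the real
//! entry point, and `conf`, `key`, `lsn`, `base_img`, `records`, and
//! `pg_version` are placeholders):
//!
//! ```text
//! let manager = PostgresRedoManager::new(conf, tenant_shard_id);
//! let page = manager
//!     .request_redo(key, lsn, base_img, records, pg_version, RedoAttemptType::ReadPage)
//!     .await?;
//! ```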

/// Process lifecycle and abstraction for the IPC protocol.
mod process;

/// Code to apply [`NeonWalRecord`]s.
pub(crate) mod apply_neon;

use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Context;
use bytes::{Bytes, BytesMut};
use pageserver_api::key::Key;
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::PgMajorVersion;
use tracing::*;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
use wal_decoder::models::record::NeonWalRecord;

use crate::config::PageServerConf;
use crate::metrics::{
    WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
    WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
};

/// The real implementation that uses a Postgres process to
/// perform WAL replay.
///
/// Only one thread can use the process at a time; that is controlled by the
/// Mutex. In the future, we might want to launch a pool of processes to allow
/// concurrent replay of multiple records.
pub struct PostgresRedoManager {
    tenant_shard_id: TenantShardId,
    conf: &'static PageServerConf,
    last_redo_at: std::sync::Mutex<Option<Instant>>,
    /// We use [`heavier_once_cell`] for
    ///
    /// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
    /// 2. preventing new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
    ///
    /// # Spawning
    ///
    /// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`].
    ///
    /// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep hold of
    /// their process object; we use [`Arc::clone`] for that.
    ///
    /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
    /// had that behavior; it's probably unnecessary.
    /// Its only merit is that if one walredo process encounters an error,
    /// it can take it out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`])
    /// and retry redo, thereby starting a new process, while other redo tasks might
    /// still be using the old redo process. But those other tasks will most likely
    /// encounter an error as well, and errors are an unexpected condition anyway.
    /// So, we could probably get rid of the `Arc` in the future.
    ///
    /// # Shutdown
    ///
    /// See [`Self::launched_processes`].
    redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,

    /// Gate that is entered when launching a walredo process and held open
    /// until the process has been `kill()`ed and `wait()`ed upon.
    ///
    /// Manager shutdown waits for this gate to close after setting the
    /// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
    ///
    /// This type of usage is a bit unusual because gates usually keep track of
    /// concurrent operations, e.g., every [`Self::request_redo`] that is inflight.
    /// But we use it here to keep track of the _processes_ that we have launched,
    /// which may outlive any individual redo request because
    /// - we keep the walredo process around until it's quiesced to amortize spawn cost, and
    /// - the Arc may be held by multiple concurrent redo requests, so just because
    ///   you replace the [`Self::redo_process`] cell's content doesn't mean the
    ///   process gets killed immediately.
    ///
    /// We could simplify this by getting rid of the [`Arc`].
    /// See the comment on [`Self::redo_process`] for more details.
    launched_processes: utils::sync::gate::Gate,
}

/// See [`PostgresRedoManager::redo_process`].
enum ProcessOnceCell {
    Spawned(Arc<Process>),
    ManagerShutDown,
}

struct Process {
    process: process::WalRedoProcess,
    /// This field is last in this struct so the guard gets dropped _after_ [`Self::process`].
    /// (Reminder: dropping [`Self::process`] synchronously sends SIGKILL and then `wait()`s for it to exit.)
    _launched_processes_guard: utils::sync::gate::GateGuard,
}

impl std::ops::Deref for Process {
    type Target = process::WalRedoProcess;

    fn deref(&self) -> &Self::Target {
        &self.process
    }
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
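    /// The redo manager is shutting down and refuses new work; see
    /// [`PostgresRedoManager::shutdown`].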
    #[error("cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

macro_rules! bail {
    ($($arg:tt)*) => {
        return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
    }
}

#[derive(Debug, Clone, Copy)]
pub enum RedoAttemptType {
    /// Used for the read path. Fires critical errors and retries twice on failure.
    ReadPage,
    /// Used for legacy compaction (only used in image compaction). Fires critical errors and retries once on failure.
    LegacyCompaction,
    /// Used for gc compaction. Does not fire critical errors and does not retry.
    GcCompaction,
}

impl std::fmt::Display for RedoAttemptType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RedoAttemptType::ReadPage => write!(f, "read page"),
            RedoAttemptType::LegacyCompaction => write!(f, "legacy compaction"),
            RedoAttemptType::GcCompaction => write!(f, "gc compaction"),
        }
    }
}

///
/// Public interface of WAL redo manager
///
impl PostgresRedoManager {
    ///
    /// Request the WAL redo manager to apply some WAL records
    ///
    /// The WAL redo is handled by a separate process, so this just sends a request
    /// to the process and waits for the response.
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub async fn request_redo(
        &self,
        key: Key,
        lsn: Lsn,
        base_img: Option<(Lsn, Bytes)>,
        records: Vec<(Lsn, NeonWalRecord)>,
        pg_version: PgMajorVersion,
        redo_attempt_type: RedoAttemptType,
    ) -> Result<Bytes, Error> {
        if records.is_empty() {
            bail!("invalid WAL redo request with no records");
        }

        let max_retry_attempts = match redo_attempt_type {
            RedoAttemptType::ReadPage => 2,
            RedoAttemptType::LegacyCompaction => 1,
            RedoAttemptType::GcCompaction => 0,
        };

        let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
        let mut img = base_img.map(|p| p.1);
        let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
        let mut batch_start = 0;
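        // Partition the records into maximal runs ("batches") that the same engine
        // can apply: either natively in the pageserver (apply_neon) or via the
        // walredo postgres process. Whenever the applicable engine changes, the
        // accumulated batch is applied and a new batch starts at the current record.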
        for (i, record) in records.iter().enumerate().skip(1) {
            let rec_neon = apply_neon::can_apply_in_neon(&record.1);

            if rec_neon != batch_neon {
                let result = if batch_neon {
                    self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
                } else {
                    self.apply_batch_postgres(
                        key,
                        lsn,
                        img,
                        base_img_lsn,
                        &records[batch_start..i],
                        self.conf.wal_redo_timeout,
                        pg_version,
                        max_retry_attempts,
                        redo_attempt_type,
                    )
                    .await
                };
                img = Some(result?);

                batch_neon = rec_neon;
                batch_start = i;
            }
        }
        // last batch
        if batch_neon {
            self.apply_batch_neon(key, lsn, img, &records[batch_start..])
        } else {
            self.apply_batch_postgres(
                key,
                lsn,
                img,
                base_img_lsn,
                &records[batch_start..],
                self.conf.wal_redo_timeout,
                pg_version,
                max_retry_attempts,
                redo_attempt_type,
            )
            .await
        }
    }

    /// Do a ping request-response roundtrip.
    ///
    /// Not used in production, but used by Rust benchmarks.
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub async fn ping(&self, pg_version: PgMajorVersion) -> Result<(), Error> {
        self.do_with_walredo_process(pg_version, |proc| async move {
            proc.ping(Duration::from_secs(1))
                .await
                .map_err(Error::Other)
        })
        .await
    }

    pub fn status(&self) -> WalRedoManagerStatus {
        WalRedoManagerStatus {
            last_redo_at: {
                let at = *self.last_redo_at.lock().unwrap();
                at.and_then(|at| {
                    let age = at.elapsed();
                    // map any chrono errors silently to None here
                    chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
                })
            },
            process: self.redo_process.get().and_then(|p| match &*p {
                ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
                ProcessOnceCell::ManagerShutDown => None,
            }),
        }
    }
}

impl PostgresRedoManager {
    ///
    /// Create a new PostgresRedoManager.
    ///
    pub fn new(
        conf: &'static PageServerConf,
        tenant_shard_id: TenantShardId,
    ) -> PostgresRedoManager {
        // The actual process is launched lazily, on first request.
        PostgresRedoManager {
            tenant_shard_id,
            conf,
            last_redo_at: std::sync::Mutex::default(),
            redo_process: heavier_once_cell::OnceCell::default(),
            launched_processes: utils::sync::gate::Gate::default(),
        }
    }

    /// Shut down the WAL redo manager.
    ///
    /// Returns `true` if this call was the one that initiated shutdown.
    /// Note that no caller may observe `true` if the first caller stops polling.
    ///
    /// After this future completes
    /// - no redo process is running
    /// - no new redo process will be spawned
    /// - redo requests that need the walredo process will fail with [`Error::Cancelled`]
    /// - [`apply_neon`]-only redo requests may still work, but this may change in the future
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub async fn shutdown(&self) -> bool {
        // prevent new processes from being spawned
        let maybe_permit = match self.redo_process.get_or_init_detached().await {
            Ok(guard) => {
                if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
                    None
                } else {
                    let (proc, permit) = guard.take_and_deinit();
                    drop(proc); // this just drops the Arc, its refcount may not be zero yet
                    Some(permit)
                }
            }
            Err(permit) => Some(permit),
        };
        let it_was_us = if let Some(permit) = maybe_permit {
            self.redo_process
                .set(ProcessOnceCell::ManagerShutDown, permit);
            true
        } else {
            false
        };
        // Wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess>
        // that we ever launched to drop to zero; when that happens, the drop impl
        // synchronously kill()s & wait()s for the underlying process.
        self.launched_processes.close().await;
        it_was_us
    }

    /// This type doesn't have its own background task to check for idleness: we
    /// rely on our owner calling this function periodically in its own housekeeping
    /// loops.
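    ///
    /// For example, a housekeeping loop might call
    /// `maybe_quiesce(Duration::from_secs(600))` on each iteration to reap a
    /// walredo process that has been idle for ten minutes (an illustrative
    /// value; the actual timeout comes from the caller's configuration).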
    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
        if let Ok(g) = self.last_redo_at.try_lock() {
            if let Some(last_redo_at) = *g {
                if last_redo_at.elapsed() >= idle_timeout {
                    drop(g);
                    drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
                }
            }
        }
    }

    /// # Cancel-Safety
    ///
    /// This method is cancel-safe iff `closure` is cancel-safe.
    async fn do_with_walredo_process<
        F: FnOnce(Arc<Process>) -> Fut,
        Fut: Future<Output = Result<O, Error>>,
        O,
    >(
        &self,
        pg_version: PgMajorVersion,
        closure: F,
    ) -> Result<O, Error> {
        let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
            Ok(guard) => match &*guard {
                ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
                ProcessOnceCell::ManagerShutDown => {
                    return Err(Error::Cancelled);
                }
            },
            Err(permit) => {
                let start = Instant::now();
                // Acquire the guard before spawning the process, so that we don't spawn
                // new processes if the gate is already closed.
                let _launched_processes_guard = match self.launched_processes.enter() {
                    Ok(guard) => guard,
                    Err(GateError::GateClosed) => unreachable!(
                        "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
                    ),
                };
                let proc = Arc::new(Process {
                    process: process::WalRedoProcess::launch(
                        self.conf,
                        self.tenant_shard_id,
                        pg_version,
                    )
                    .context("launch walredo process")?,
                    _launched_processes_guard,
                });
                let duration = start.elapsed();
                WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
                info!(
                    elapsed_ms = duration.as_millis(),
                    pid = proc.id(),
                    "launched walredo process"
                );
                self.redo_process
                    .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
                proc
            }
        };

        // async closures are unstable; they would allow the closure to take `&Process`
        let result = closure(proc.clone()).await;

        if result.is_err() {
            // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
            // Note that there may be other tasks concurrent with us that also hold `proc`.
            // We have to deal with that here.
            // Also read the doc comment on field `self.redo_process`.
            //
            // NB: there may still be other concurrent threads using `proc`.
            // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
            //
            // NB: the drop impl blocks the dropping thread with a wait() system call for
            // the child process. In some ways the blocking is actually good: if we
            // deferred the waiting into the background / to tokio by using `tokio::process`,
            // it could happen that if walredo always fails immediately, we spawn processes faster
            // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
            // we limit this risk of runaway to at most $num_runtimes * $num_executor_threads.
            // This probably needs revisiting at some later point.
            match self.redo_process.get() {
                None => (),
                Some(guard) => {
                    match &*guard {
                        ProcessOnceCell::ManagerShutDown => {}
                        ProcessOnceCell::Spawned(guard_proc) => {
                            if Arc::ptr_eq(&proc, guard_proc) {
                                // We're the first to observe an error from `proc`; it's our job to take it out of rotation.
                                guard.take_and_deinit();
                            } else {
                                // Another task already spawned another redo process (further up in this method)
                                // and put it into `redo_process`. Do nothing; our view of the world is behind.
                            }
                        }
                    }
                }
            }
            // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
            drop(proc);
        }

        result
    }

    ///
    /// Process one request for WAL redo using wal-redo postgres
    ///
    /// # Cancel-Safety
    ///
    /// Cancellation safe.
    #[allow(clippy::too_many_arguments)]
    async fn apply_batch_postgres(
        &self,
        key: Key,
        lsn: Lsn,
        base_img: Option<Bytes>,
        base_img_lsn: Lsn,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
        pg_version: PgMajorVersion,
        max_retry_attempts: u32,
        redo_attempt_type: RedoAttemptType,
    ) -> Result<Bytes, Error> {
        *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());

        let (rel, blknum) = key.to_rel_block().context("invalid record")?;
        let mut n_attempts = 0u32;
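        // Retry loop: each iteration acquires (or lazily spawns) a walredo process
        // via do_with_walredo_process. On error, that helper takes the failed
        // process out of rotation, so a subsequent attempt spawns a fresh one.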
        loop {
            let base_img = &base_img;
            let closure = |proc: Arc<Process>| async move {
                let started_at = std::time::Instant::now();

                // Relational WAL records are applied using wal-redo-postgres
                let result = proc
                    .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
                    .await
                    .context("apply_wal_records");

                let duration = started_at.elapsed();

                let len = records.len();
                let nbytes = records.iter().fold(0, |accumulator, record| {
                    accumulator
                        + match &record.1 {
                            NeonWalRecord::Postgres { rec, .. } => rec.len(),
                            _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
                        }
                });

                WAL_REDO_TIME.observe(duration.as_secs_f64());
                WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
                WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);

                debug!(
                    "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
                    len,
                    nbytes,
                    duration.as_micros(),
                    lsn
                );

                if let Err(e) = result.as_ref() {
                    macro_rules! message {
                        ($level:tt) => {
                            $level!(
                                "error applying {} WAL records {}..{} ({} bytes) to key {} during {}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
                                records.len(),
                                records.first().map(|p| p.0).unwrap_or(Lsn(0)),
                                records.last().map(|p| p.0).unwrap_or(Lsn(0)),
                                nbytes,
                                key,
                                redo_attempt_type,
                                base_img_lsn,
                                lsn,
                                n_attempts,
                                e,
                            )
                        }
                    }
                    match redo_attempt_type {
                        RedoAttemptType::ReadPage => message!(error),
                        RedoAttemptType::LegacyCompaction => message!(error),
                        RedoAttemptType::GcCompaction => message!(warn),
                    }
                }

                result.map_err(Error::Other)
            };
            let result = self.do_with_walredo_process(pg_version, closure).await;

            if result.is_ok() && n_attempts != 0 {
                info!(n_attempts, "retried walredo succeeded");
            }
            n_attempts += 1;
            if n_attempts > max_retry_attempts || result.is_ok() {
                return result;
            }
        }
    }

    ///
    /// Process a batch of WAL records using bespoke Neon code.
    ///
    fn apply_batch_neon(
        &self,
        key: Key,
        lsn: Lsn,
        base_img: Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
    ) -> Result<Bytes, Error> {
        let start_time = Instant::now();

        let mut page = BytesMut::new();
        if let Some(fpi) = base_img {
            // If a full-page image is provided, then use it...
            page.extend_from_slice(&fpi[..]);
        } else {
            // All the current WAL record types that we can handle require a base image.
            bail!("invalid neon WAL redo request with no base image");
        }

        // Apply all the WAL records in the batch
        for (record_lsn, record) in records.iter() {
            self.apply_record_neon(key, &mut page, *record_lsn, record)?;
        }
        // Success!
        let duration = start_time.elapsed();
        // FIXME: using the same metric here creates a bimodal distribution by default, and because
        // there could be multiple batch sizes this would be N+1 modal.
        WAL_REDO_TIME.observe(duration.as_secs_f64());

        debug!(
            "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
            records.len(),
            duration.as_micros(),
            lsn
        );

        Ok(page.freeze())
    }

    fn apply_record_neon(
        &self,
        key: Key,
        page: &mut BytesMut,
        record_lsn: Lsn,
        record: &NeonWalRecord,
    ) -> anyhow::Result<()> {
        apply_neon::apply_in_neon(record, record_lsn, key, page)?;

        Ok(())
    }
}

#[cfg(test)]
pub(crate) mod harness {
    use super::PostgresRedoManager;
    use crate::config::PageServerConf;
    use utils::{id::TenantId, shard::TenantShardId};

    pub struct RedoHarness {
        // underscored because unused, except for removal at drop
        _repo_dir: camino_tempfile::Utf8TempDir,
        pub manager: PostgresRedoManager,
        tenant_shard_id: TenantShardId,
    }

    impl RedoHarness {
        pub fn new() -> anyhow::Result<Self> {
            crate::tenant::harness::setup_logging();

            let repo_dir = camino_tempfile::tempdir()?;
            let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
            let conf = Box::leak(Box::new(conf));
            let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());

            let manager = PostgresRedoManager::new(conf, tenant_shard_id);

            Ok(RedoHarness {
                _repo_dir: repo_dir,
                manager,
                tenant_shard_id,
            })
        }

        pub fn span(&self) -> tracing::Span {
            tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use bytes::Bytes;
    use pageserver_api::key::Key;
    use postgres_ffi::PgMajorVersion;
    use tracing::Instrument;
    use utils::lsn::Lsn;
    use wal_decoder::models::record::NeonWalRecord;

    use crate::walredo::RedoAttemptType;
    use crate::walredo::harness::RedoHarness;

    #[tokio::test]
    async fn test_ping() {
        let h = RedoHarness::new().unwrap();

        h.manager
            .ping(PgMajorVersion::PG14)
            .instrument(h.span())
            .await
            .expect("ping should work");
    }

    #[tokio::test]
    async fn short_v14_redo() {
        let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();

        let h = RedoHarness::new().unwrap();

        let page = h
            .manager
            .request_redo(
                Key {
                    field1: 0,
                    field2: 1663,
                    field3: 13010,
                    field4: 1259,
                    field5: 0,
                    field6: 0,
                },
                Lsn::from_str("0/16E2408").unwrap(),
                None,
                short_records(),
                PgMajorVersion::PG14,
                RedoAttemptType::ReadPage,
            )
            .instrument(h.span())
            .await
            .unwrap();

        assert_eq!(&expected, &*page);
    }

    #[tokio::test]
    async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
        let h = RedoHarness::new().unwrap();

        let page = h
            .manager
            .request_redo(
                Key {
                    field1: 0,
                    field2: 1663,
                    // key should be 13010
                    field3: 13130,
                    field4: 1259,
                    field5: 0,
                    field6: 0,
                },
                Lsn::from_str("0/16E2408").unwrap(),
                None,
                short_records(),
                PgMajorVersion::PG14,
                RedoAttemptType::ReadPage,
            )
            .instrument(h.span())
            .await
            .unwrap();

        // TODO: there will be some stderr printout, which is forwarded to tracing that could
        // perhaps be captured as long as it's in the same thread.
        assert_eq!(page, crate::ZERO_PAGE);
    }

    #[tokio::test]
    async fn test_stderr() {
        let h = RedoHarness::new().unwrap();
        h.manager
            .request_redo(
                Key::from_i128(0),
                Lsn::INVALID,
                None,
                short_records(),
                PgMajorVersion::PG16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
                RedoAttemptType::ReadPage,
            )
            .instrument(h.span())
            .await
            .unwrap_err();
    }

    #[allow(clippy::octal_escapes)]
    fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
        vec![
            (
                Lsn::from_str("0/16A9388").unwrap(),
                NeonWalRecord::Postgres {
                    will_init: true,
                    rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
                }
            ),
            (
                Lsn::from_str("0/16D4080").unwrap(),
                NeonWalRecord::Postgres {
                    will_init: false,
                    rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
                }
            )
        ]
    }
}