Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply the given WAL records over an old page image and return the
4 : //! new page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in a special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the new page image back. Communication with the
11 : //! postgres process happens via stdin/stdout.
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves this by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, they cannot escape from it.
20 :
21 : /// Process lifecycle and abstraction for the IPC protocol.
22 : mod process;
23 :
24 : /// Code to apply [`NeonWalRecord`]s.
25 : pub(crate) mod apply_neon;
26 :
27 : use std::future::Future;
28 : use std::sync::Arc;
29 : use std::time::{Duration, Instant};
30 :
31 : use anyhow::Context;
32 : use bytes::{Bytes, BytesMut};
33 : use pageserver_api::key::Key;
34 : use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
35 : use pageserver_api::shard::TenantShardId;
36 : use postgres_ffi::PgMajorVersion;
37 : use tracing::*;
38 : use utils::lsn::Lsn;
39 : use utils::sync::gate::GateError;
40 : use utils::sync::heavier_once_cell;
41 : use wal_decoder::models::record::NeonWalRecord;
42 :
43 : use crate::config::PageServerConf;
44 : use crate::metrics::{
45 : WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
46 : WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
47 : };
48 :
49 : /// The real implementation that uses a Postgres process to
50 : /// perform WAL replay.
51 : ///
52 : /// Only one thread can use the process at a time; that is controlled by the
53 : /// Mutex. In the future, we might want to launch a pool of processes to allow
54 : /// concurrent replay of multiple records.
55 : pub struct PostgresRedoManager {
56 : tenant_shard_id: TenantShardId,
57 : conf: &'static PageServerConf,
58 : last_redo_at: std::sync::Mutex<Option<Instant>>,
59 : /// We use [`heavier_once_cell`] for
60 : ///
61 : /// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
62 : /// 2. preventing new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
63 : ///
64 : /// # Spawning
65 : ///
66 : /// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`].
67 : ///
68 : /// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep hold of
69 : /// their process object; we use [`Arc::clone`] for that.
70 : ///
71 : /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
72 : /// had that behavior; it's probably unnecessary.
73 : /// The only merit is that if a redo request observes an error from its walredo process,
74 : /// it can take that process out of rotation (using [`heavier_once_cell::Guard::take_and_deinit`])
75 : /// and retry redo, thereby starting a new process, while other redo tasks might
76 : /// still be using the old redo process. But, those other tasks will most likely
77 : /// encounter an error as well, and errors are an unexpected condition anyway.
78 : /// So, probably we could get rid of the `Arc` in the future.
79 : ///
80 : /// # Shutdown
81 : ///
82 : /// See [`Self::launched_processes`].
83 : redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
84 :
85 : /// Gate that is entered when launching a walredo process and held open
86 : /// until the process has been `kill()`ed and `wait()`ed upon.
87 : ///
88 : /// Manager shutdown waits for this gate to close after setting the
89 : /// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
90 : ///
91 : /// This type of usage is a bit unusual because gates usually keep track of
92 : /// concurrent operations, e.g., every [`Self::request_redo`] that is inflight.
93 : /// But we use it here to keep track of the _processes_ that we have launched,
94 : /// which may outlive any individual redo request because
95 : /// - we keep the walredo process around until it's quiesced, to amortize spawn cost, and
96 : /// - the Arc may be held by multiple concurrent redo requests, so, just because
97 : /// you replace the [`Self::redo_process`] cell's content doesn't mean the
98 : /// process gets killed immediately.
99 : ///
100 : /// We could simplify this by getting rid of the [`Arc`].
101 : /// See the comment on [`Self::redo_process`] for more details.
102 : launched_processes: utils::sync::gate::Gate,
103 : }
104 :
105 : /// See [`PostgresRedoManager::redo_process`].
106 : enum ProcessOnceCell {
107 : Spawned(Arc<Process>),
108 : ManagerShutDown,
109 : }
110 :
111 : struct Process {
112 : process: process::WalRedoProcess,
113 : /// This field is last in this struct so the guard gets dropped _after_ [`Self::process`].
114 : /// (Reminder: dropping [`Self::process`] synchronously sends SIGKILL and then `wait()`s for it to exit).
115 : _launched_processes_guard: utils::sync::gate::GateGuard,
116 : }
117 :
118 : impl std::ops::Deref for Process {
119 : type Target = process::WalRedoProcess;
120 :
121 12 : fn deref(&self) -> &Self::Target {
122 12 : &self.process
123 12 : }
124 : }
125 :
126 : #[derive(Debug, thiserror::Error)]
127 : pub enum Error {
128 : #[error("cancelled")]
129 : Cancelled,
130 : #[error(transparent)]
131 : Other(#[from] anyhow::Error),
132 : }
133 :
134 : macro_rules! bail {
135 : ($($arg:tt)*) => {
136 : return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
137 : }
138 : }
139 :
140 : #[derive(Debug, Clone, Copy)]
141 : pub enum RedoAttemptType {
142 : /// Used for the read path. Will fire critical errors and retry twice on failure.
143 : ReadPage,
144 : /// Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once on failure.
145 : LegacyCompaction,
146 : /// Used for gc compaction. Will not fire critical errors and will not retry.
147 : GcCompaction,
148 : }
149 :
150 : ///
151 : /// Public interface of WAL redo manager
152 : ///
153 : impl PostgresRedoManager {
154 : ///
155 : /// Request the WAL redo manager to apply some WAL records
156 : ///
157 : /// The WAL redo is handled by a separate walredo process, so this just sends a request
158 : /// to the process and waits for the response.
159 : ///
160 : /// # Cancel-Safety
161 : ///
162 : /// This method is cancellation-safe.
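 : ///
 : /// # Example (illustrative)
 : ///
 : /// A minimal sketch of a call site, assuming a `manager: PostgresRedoManager` and the
 : /// inputs shown; see `mod tests` below for real, runnable invocations.
 : ///
 : /// ```ignore
 : /// let page: Bytes = manager
 : ///     .request_redo(
 : ///         key,                        // which page to reconstruct
 : ///         lsn,                        // reconstruct the page as of this LSN
 : ///         Some((base_lsn, base_img)), // optional base image to start from
 : ///         records,                    // Vec<(Lsn, NeonWalRecord)> to apply on top
 : ///         PgMajorVersion::PG16,
 : ///         RedoAttemptType::ReadPage,  // read path: retries twice on failure
 : ///     )
 : ///     .await?;
 : /// ```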
163 3 : pub async fn request_redo(
164 3 : &self,
165 3 : key: Key,
166 3 : lsn: Lsn,
167 3 : base_img: Option<(Lsn, Bytes)>,
168 3 : records: Vec<(Lsn, NeonWalRecord)>,
169 3 : pg_version: PgMajorVersion,
170 3 : redo_attempt_type: RedoAttemptType,
171 3 : ) -> Result<Bytes, Error> {
172 3 : if records.is_empty() {
173 0 : bail!("invalid WAL redo request with no records");
174 3 : }
175 :
176 3 : let max_retry_attempts = match redo_attempt_type {
177 3 : RedoAttemptType::ReadPage => 2,
178 0 : RedoAttemptType::LegacyCompaction => 1,
179 0 : RedoAttemptType::GcCompaction => 0,
180 : };
181 :
182 3 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
183 3 : let mut img = base_img.map(|p| p.1);
184 3 : let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
185 3 : let mut batch_start = 0;
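 : // Split the records into consecutive batches that can be applied by the same engine
 : // (bespoke Neon code vs. the walredo process). Each completed batch is applied and its
 : // resulting image becomes the base image for the next batch; the final batch is applied
 : // after the loop.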
186 3 : for (i, record) in records.iter().enumerate().skip(1) {
187 3 : let rec_neon = apply_neon::can_apply_in_neon(&record.1);
188 :
189 3 : if rec_neon != batch_neon {
190 0 : let result = if batch_neon {
191 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
192 : } else {
193 0 : self.apply_batch_postgres(
194 0 : key,
195 0 : lsn,
196 0 : img,
197 0 : base_img_lsn,
198 0 : &records[batch_start..i],
199 0 : self.conf.wal_redo_timeout,
200 0 : pg_version,
201 0 : max_retry_attempts,
202 0 : )
203 0 : .await
204 : };
205 0 : img = Some(result?);
206 :
207 0 : batch_neon = rec_neon;
208 0 : batch_start = i;
209 3 : }
210 : }
211 : // last batch
212 3 : if batch_neon {
213 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
214 : } else {
215 3 : self.apply_batch_postgres(
216 3 : key,
217 3 : lsn,
218 3 : img,
219 3 : base_img_lsn,
220 3 : &records[batch_start..],
221 3 : self.conf.wal_redo_timeout,
222 3 : pg_version,
223 3 : max_retry_attempts,
224 3 : )
225 3 : .await
226 : }
227 3 : }
228 :
229 : /// Do a ping request-response roundtrip.
230 : ///
231 : /// Not used in production, but used by Rust benchmarks.
232 : ///
233 : /// # Cancel-Safety
234 : ///
235 : /// This method is cancellation-safe.
236 1 : pub async fn ping(&self, pg_version: PgMajorVersion) -> Result<(), Error> {
237 1 : self.do_with_walredo_process(pg_version, |proc| async move {
238 1 : proc.ping(Duration::from_secs(1))
239 1 : .await
240 1 : .map_err(Error::Other)
241 2 : })
242 1 : .await
243 1 : }
244 :
245 0 : pub fn status(&self) -> WalRedoManagerStatus {
246 : WalRedoManagerStatus {
247 : last_redo_at: {
248 0 : let at = *self.last_redo_at.lock().unwrap();
249 0 : at.and_then(|at| {
250 0 : let age = at.elapsed();
251 : // map any chrono errors silently to None here
252 0 : chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
253 0 : })
254 : },
255 0 : process: self.redo_process.get().and_then(|p| match &*p {
256 0 : ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
257 0 : ProcessOnceCell::ManagerShutDown => None,
258 0 : }),
259 : }
260 0 : }
261 : }
262 :
263 : impl PostgresRedoManager {
264 : ///
265 : /// Create a new PostgresRedoManager.
266 : ///
267 4 : pub fn new(
268 4 : conf: &'static PageServerConf,
269 4 : tenant_shard_id: TenantShardId,
270 4 : ) -> PostgresRedoManager {
271 : // The actual process is launched lazily, on first request.
272 4 : PostgresRedoManager {
273 4 : tenant_shard_id,
274 4 : conf,
275 4 : last_redo_at: std::sync::Mutex::default(),
276 4 : redo_process: heavier_once_cell::OnceCell::default(),
277 4 : launched_processes: utils::sync::gate::Gate::default(),
278 4 : }
279 4 : }
280 :
281 : /// Shut down the WAL redo manager.
282 : ///
283 : /// Returns `true` if this call was the one that initiated shutdown.
284 : /// Note that no caller may ever observe `true` if the first caller stops polling.
285 : ///
286 : /// After this future completes
287 : /// - no redo process is running
288 : /// - no new redo process will be spawned
289 : /// - redo requests that need walredo process will fail with [`Error::Cancelled`]
290 : /// - [`apply_neon`]-only redo requests may still work, but this may change in the future
291 : ///
292 : /// # Cancel-Safety
293 : ///
294 : /// This method is cancellation-safe.
295 0 : pub async fn shutdown(&self) -> bool {
296 : // prevent new processes from being spawned
297 0 : let maybe_permit = match self.redo_process.get_or_init_detached().await {
298 0 : Ok(guard) => {
299 0 : if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
300 0 : None
301 : } else {
302 0 : let (proc, permit) = guard.take_and_deinit();
303 0 : drop(proc); // this just drops the Arc, its refcount may not be zero yet
304 0 : Some(permit)
305 : }
306 : }
307 0 : Err(permit) => Some(permit),
308 : };
309 0 : let it_was_us = if let Some(permit) = maybe_permit {
310 0 : self.redo_process
311 0 : .set(ProcessOnceCell::ManagerShutDown, permit);
312 0 : true
313 : } else {
314 0 : false
315 : };
316 : // Wait for ongoing requests to drain and for the refcounts of all the Arc<Process>s that
317 : // we ever launched to drop to zero; when that happens, the drop synchronously kill()s &
318 : // wait()s for the underlying process.
319 0 : self.launched_processes.close().await;
320 0 : it_was_us
321 0 : }
322 :
323 : /// This type doesn't have its own background task to check for idleness: we
324 : /// rely on our owner calling this function periodically in its own housekeeping
325 : /// loops.
326 0 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
327 0 : if let Ok(g) = self.last_redo_at.try_lock() {
328 0 : if let Some(last_redo_at) = *g {
329 0 : if last_redo_at.elapsed() >= idle_timeout {
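 : // Release the `last_redo_at` lock before tearing down the process: dropping the
 : // cell contents below drops our Arc<Process>, and if that was the last reference,
 : // the drop impl synchronously kill()s and wait()s for the child process, which we
 : // don't want to do while holding the lock.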
330 0 : drop(g);
331 0 : drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
332 0 : }
333 0 : }
334 0 : }
335 0 : }
336 :
337 : /// # Cancel-Safety
338 : ///
339 : /// This method is cancel-safe iff `closure` is cancel-safe.
340 6 : async fn do_with_walredo_process<
341 6 : F: FnOnce(Arc<Process>) -> Fut,
342 6 : Fut: Future<Output = Result<O, Error>>,
343 6 : O,
344 6 : >(
345 6 : &self,
346 6 : pg_version: PgMajorVersion,
347 6 : closure: F,
348 6 : ) -> Result<O, Error> {
349 6 : let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
350 0 : Ok(guard) => match &*guard {
351 0 : ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
352 : ProcessOnceCell::ManagerShutDown => {
353 0 : return Err(Error::Cancelled);
354 : }
355 : },
356 6 : Err(permit) => {
357 6 : let start = Instant::now();
358 : // acquire guard before spawning process, so that we don't spawn new processes
359 : // if the gate is already closed.
360 6 : let _launched_processes_guard = match self.launched_processes.enter() {
361 6 : Ok(guard) => guard,
362 0 : Err(GateError::GateClosed) => unreachable!(
363 : "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
364 : ),
365 : };
366 6 : let proc = Arc::new(Process {
367 6 : process: process::WalRedoProcess::launch(
368 6 : self.conf,
369 6 : self.tenant_shard_id,
370 6 : pg_version,
371 : )
372 6 : .context("launch walredo process")?,
373 6 : _launched_processes_guard,
374 : });
375 6 : let duration = start.elapsed();
376 6 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
377 6 : info!(
378 0 : elapsed_ms = duration.as_millis(),
379 0 : pid = proc.id(),
380 0 : "launched walredo process"
381 : );
382 6 : self.redo_process
383 6 : .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
384 6 : proc
385 : }
386 : };
387 :
388 : // async closures are unstable; once stable, the closure could borrow &Process instead of cloning the Arc
389 6 : let result = closure(proc.clone()).await;
390 :
391 6 : if result.is_err() {
392 : // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
393 : // Note that there may be other tasks concurrent with us that also hold `proc`.
394 : // We have to deal with that here.
395 : // Also read the doc comment on field `self.redo_process`.
396 : //
397 : // NB: there may still be other concurrent threads using `proc`.
398 : // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
399 : //
400 : // NB: the drop impl blocks the dropping thread with a wait() system call for
401 : // the child process. In some ways the blocking is actually good: if we
402 : // deferred the waiting into the background / to tokio if we used `tokio::process`,
403 : // it could happen that if walredo always fails immediately, we spawn processes faster
404 : // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
405 : // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
406 : // This probably needs revisiting at some later point.
407 3 : match self.redo_process.get() {
408 0 : None => (),
409 3 : Some(guard) => {
410 3 : match &*guard {
411 0 : ProcessOnceCell::ManagerShutDown => {}
412 3 : ProcessOnceCell::Spawned(guard_proc) => {
413 3 : if Arc::ptr_eq(&proc, guard_proc) {
414 3 : // We're the first to observe an error from `proc`; it's our job to take it out of rotation.
415 3 : guard.take_and_deinit();
416 3 : } else {
417 0 : // Another task already spawned another redo process (further up in this method)
418 0 : // and put it into `redo_process`. Do nothing, our view of the world is behind.
419 0 : }
420 : }
421 : }
422 : }
423 : }
424 : // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
425 3 : drop(proc);
426 3 : }
427 :
428 6 : result
429 6 : }
430 :
431 : ///
432 : /// Process one request for WAL redo using wal-redo postgres
433 : ///
434 : /// # Cancel-Safety
435 : ///
436 : /// Cancellation safe.
437 : #[allow(clippy::too_many_arguments)]
438 3 : async fn apply_batch_postgres(
439 3 : &self,
440 3 : key: Key,
441 3 : lsn: Lsn,
442 3 : base_img: Option<Bytes>,
443 3 : base_img_lsn: Lsn,
444 3 : records: &[(Lsn, NeonWalRecord)],
445 3 : wal_redo_timeout: Duration,
446 3 : pg_version: PgMajorVersion,
447 3 : max_retry_attempts: u32,
448 3 : ) -> Result<Bytes, Error> {
449 3 : *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
450 :
451 3 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
452 3 : let mut n_attempts = 0u32;
453 : loop {
454 5 : let base_img = &base_img;
455 5 : let closure = |proc: Arc<Process>| async move {
456 5 : let started_at = std::time::Instant::now();
457 :
458 : // Relational WAL records are applied using wal-redo-postgres
459 5 : let result = proc
460 5 : .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
461 5 : .await
462 5 : .context("apply_wal_records");
463 :
464 5 : let duration = started_at.elapsed();
465 :
466 5 : let len = records.len();
467 10 : let nbytes = records.iter().fold(0, |accumulator, record| {
468 10 : accumulator
469 10 : + match &record.1 {
470 10 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
471 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
472 : }
473 10 : });
474 :
475 5 : WAL_REDO_TIME.observe(duration.as_secs_f64());
476 5 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
477 5 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
478 :
479 5 : debug!(
480 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
481 : len,
482 : nbytes,
483 0 : duration.as_micros(),
484 : lsn
485 : );
486 :
487 5 : if let Err(e) = result.as_ref() {
488 3 : error!(
489 0 : "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
490 0 : records.len(),
491 0 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
492 0 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
493 : nbytes,
494 : base_img_lsn,
495 : lsn,
496 : n_attempts,
497 : e,
498 : );
499 2 : }
500 :
501 5 : result.map_err(Error::Other)
502 10 : };
503 5 : let result = self.do_with_walredo_process(pg_version, closure).await;
504 :
505 5 : if result.is_ok() && n_attempts != 0 {
506 0 : info!(n_attempts, "retried walredo succeeded");
507 5 : }
508 5 : n_attempts += 1;
509 5 : if n_attempts > max_retry_attempts || result.is_ok() {
510 3 : return result;
511 2 : }
512 : }
513 3 : }
514 :
515 : ///
516 : /// Process a batch of WAL records using bespoke Neon code.
517 : ///
518 0 : fn apply_batch_neon(
519 0 : &self,
520 0 : key: Key,
521 0 : lsn: Lsn,
522 0 : base_img: Option<Bytes>,
523 0 : records: &[(Lsn, NeonWalRecord)],
524 0 : ) -> Result<Bytes, Error> {
525 0 : let start_time = Instant::now();
526 :
527 0 : let mut page = BytesMut::new();
528 0 : if let Some(fpi) = base_img {
529 0 : // If full-page image is provided, then use it...
530 0 : page.extend_from_slice(&fpi[..]);
531 0 : } else {
532 : // All the current WAL record types that we can handle require a base image.
533 0 : bail!("invalid neon WAL redo request with no base image");
534 : }
535 :
536 : // Apply all the WAL records in the batch
537 0 : for (record_lsn, record) in records.iter() {
538 0 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
539 : }
540 : // Success!
541 0 : let duration = start_time.elapsed();
542 : // FIXME: using the same metric here creates a bimodal distribution by default, and because
543 : // there could be multiple batch sizes this would be N+1 modal.
544 0 : WAL_REDO_TIME.observe(duration.as_secs_f64());
545 :
546 0 : debug!(
547 0 : "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
548 0 : records.len(),
549 0 : duration.as_micros(),
550 : lsn
551 : );
552 :
553 0 : Ok(page.freeze())
554 0 : }
555 :
556 0 : fn apply_record_neon(
557 0 : &self,
558 0 : key: Key,
559 0 : page: &mut BytesMut,
560 0 : record_lsn: Lsn,
561 0 : record: &NeonWalRecord,
562 0 : ) -> anyhow::Result<()> {
563 0 : apply_neon::apply_in_neon(record, record_lsn, key, page)?;
564 :
565 0 : Ok(())
566 0 : }
567 : }
568 :
569 : #[cfg(test)]
570 : mod tests {
571 : use std::str::FromStr;
572 :
573 : use bytes::Bytes;
574 : use pageserver_api::key::Key;
575 : use pageserver_api::shard::TenantShardId;
576 : use postgres_ffi::PgMajorVersion;
577 : use tracing::Instrument;
578 : use utils::id::TenantId;
579 : use utils::lsn::Lsn;
580 : use wal_decoder::models::record::NeonWalRecord;
581 :
582 : use super::PostgresRedoManager;
583 : use crate::config::PageServerConf;
584 : use crate::walredo::RedoAttemptType;
585 :
586 : #[tokio::test]
587 1 : async fn test_ping() {
588 1 : let h = RedoHarness::new().unwrap();
589 :
590 1 : h.manager
591 1 : .ping(PgMajorVersion::PG14)
592 1 : .instrument(h.span())
593 1 : .await
594 1 : .expect("ping should work");
595 1 : }
596 :
597 : #[tokio::test]
598 1 : async fn short_v14_redo() {
599 1 : let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
600 :
601 1 : let h = RedoHarness::new().unwrap();
602 :
603 1 : let page = h
604 1 : .manager
605 1 : .request_redo(
606 1 : Key {
607 1 : field1: 0,
608 1 : field2: 1663,
609 1 : field3: 13010,
610 1 : field4: 1259,
611 1 : field5: 0,
612 1 : field6: 0,
613 1 : },
614 1 : Lsn::from_str("0/16E2408").unwrap(),
615 1 : None,
616 1 : short_records(),
617 1 : PgMajorVersion::PG14,
618 1 : RedoAttemptType::ReadPage,
619 1 : )
620 1 : .instrument(h.span())
621 1 : .await
622 1 : .unwrap();
623 :
624 1 : assert_eq!(&expected, &*page);
625 1 : }
626 :
627 : #[tokio::test]
628 1 : async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
629 1 : let h = RedoHarness::new().unwrap();
630 :
631 1 : let page = h
632 1 : .manager
633 1 : .request_redo(
634 1 : Key {
635 1 : field1: 0,
636 1 : field2: 1663,
637 1 : // key should be 13010
638 1 : field3: 13130,
639 1 : field4: 1259,
640 1 : field5: 0,
641 1 : field6: 0,
642 1 : },
643 1 : Lsn::from_str("0/16E2408").unwrap(),
644 1 : None,
645 1 : short_records(),
646 1 : PgMajorVersion::PG14,
647 1 : RedoAttemptType::ReadPage,
648 1 : )
649 1 : .instrument(h.span())
650 1 : .await
651 1 : .unwrap();
652 :
653 : // TODO: there will be some stderr printout, which is forwarded to tracing; it could
654 : // perhaps be captured as long as it's in the same thread.
655 1 : assert_eq!(page, crate::ZERO_PAGE);
656 1 : }
657 :
658 : #[tokio::test]
659 1 : async fn test_stderr() {
660 1 : let h = RedoHarness::new().unwrap();
661 1 : h
662 1 : .manager
663 1 : .request_redo(
664 1 : Key::from_i128(0),
665 1 : Lsn::INVALID,
666 1 : None,
667 1 : short_records(),
668 1 : PgMajorVersion::PG16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
669 1 : RedoAttemptType::ReadPage,
670 1 : )
671 1 : .instrument(h.span())
672 1 : .await
673 1 : .unwrap_err();
674 1 : }
675 :
676 : #[allow(clippy::octal_escapes)]
677 3 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
678 3 : vec![
679 3 : (
680 3 : Lsn::from_str("0/16A9388").unwrap(),
681 3 : NeonWalRecord::Postgres {
682 3 : will_init: true,
683 3 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
684 3 : }
685 3 : ),
686 3 : (
687 3 : Lsn::from_str("0/16D4080").unwrap(),
688 3 : NeonWalRecord::Postgres {
689 3 : will_init: false,
690 3 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
691 3 : }
692 3 : )
693 : ]
694 3 : }
695 :
696 : struct RedoHarness {
697 : // underscored because unused, except for removal at drop
698 : _repo_dir: camino_tempfile::Utf8TempDir,
699 : manager: PostgresRedoManager,
700 : tenant_shard_id: TenantShardId,
701 : }
702 :
703 : impl RedoHarness {
704 4 : fn new() -> anyhow::Result<Self> {
705 4 : crate::tenant::harness::setup_logging();
706 :
707 4 : let repo_dir = camino_tempfile::tempdir()?;
708 4 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
709 4 : let conf = Box::leak(Box::new(conf));
710 4 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
711 :
712 4 : let manager = PostgresRedoManager::new(conf, tenant_shard_id);
713 :
714 4 : Ok(RedoHarness {
715 4 : _repo_dir: repo_dir,
716 4 : manager,
717 4 : tenant_shard_id,
718 4 : })
719 4 : }
720 4 : fn span(&self) -> tracing::Span {
721 4 : tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
722 4 : }
723 : }
724 : }