Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply given WAL records over an old page image and return the new
4 : //! page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in a special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the page image back. Communication with the
11 : //! postgres process happens via stdin/stdout.
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves this by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, they cannot escape from it.
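//!
//! A minimal usage sketch (not an authoritative call site; `conf`, `tenant_shard_id`,
//! `key`, `lsn`, `base_img`, `records`, and `pg_version` are placeholders supplied by the
//! real caller — see the tests at the bottom of this file for a concrete example):
//!
//! ```ignore
//! // The manager is created once per tenant shard; the walredo process itself is
//! // launched lazily, on the first request that needs it.
//! let manager = PostgresRedoManager::new(conf, tenant_shard_id);
//!
//! // Reconstruct a page image by applying `records` on top of `base_img`.
//! let page: Bytes = manager
//!     .request_redo(key, lsn, base_img, records, pg_version)
//!     .await?;
//! ```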
20 :
21 : /// Process lifecycle and abstraction for the IPC protocol.
22 : mod process;
23 :
24 : /// Code to apply [`NeonWalRecord`]s.
25 : pub(crate) mod apply_neon;
26 :
27 : use std::future::Future;
28 : use std::sync::Arc;
29 : use std::time::{Duration, Instant};
30 :
31 : use anyhow::Context;
32 : use bytes::{Bytes, BytesMut};
33 : use pageserver_api::key::Key;
34 : use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
35 : use pageserver_api::record::NeonWalRecord;
36 : use pageserver_api::shard::TenantShardId;
37 : use tracing::*;
38 : use utils::lsn::Lsn;
39 : use utils::sync::gate::GateError;
40 : use utils::sync::heavier_once_cell;
41 :
42 : use crate::config::PageServerConf;
43 : use crate::metrics::{
44 : WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
45 : WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
46 : };
47 :
48 : /// The real implementation that uses a Postgres process to
49 : /// perform WAL replay.
50 : ///
51 : /// Only one thread can use the process at a time; that is controlled by the
52 : /// Mutex. In the future, we might want to launch a pool of processes to allow
53 : /// concurrent replay of multiple records.
54 : pub struct PostgresRedoManager {
55 : tenant_shard_id: TenantShardId,
56 : conf: &'static PageServerConf,
57 : last_redo_at: std::sync::Mutex<Option<Instant>>,
58 : /// We use [`heavier_once_cell`] for
59 : ///
60 : /// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
61 : /// 2. preventing new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
62 : ///
63 : /// # Spawning
64 : ///
65 : /// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`].
66 : ///
67 : /// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep hold of
68 : /// their process object; we use [`Arc::clone`] for that.
69 : ///
70 : /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
71 : /// had that behavior; it's probably unnecessary.
72 : /// The only merit of it is that if one walredo process encounters an error,
73 : /// the failing request can take it out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`])
74 : /// and retry redo, thereby starting a new process, while other redo tasks might
75 : /// still be using the old redo process. But, those other tasks will most likely
76 : /// encounter an error as well, and errors are an unexpected condition anyway.
77 : /// So, probably we could get rid of the `Arc` in the future.
78 : ///
79 : /// # Shutdown
80 : ///
81 : /// See [`Self::launched_processes`].
82 : redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
83 :
84 : /// Gate that is entered when launching a walredo process and held open
85 : /// until the process has been `kill()`ed and `wait()`ed upon.
86 : ///
87 : /// Manager shutdown waits for this gate to close after setting the
88 : /// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
89 : ///
90 : /// This type of usage is a bit unusual because gates usually keep track of
91 : /// concurrent operations, e.g., every [`Self::request_redo`] that is inflight.
92 : /// But we use it here to keep track of the _processes_ that we have launched,
93 : /// which may outlive any individual redo request because
94 : /// - we keep the walredo process around until it's quiesced, to amortize spawn cost, and
95 : /// - the Arc may be held by multiple concurrent redo requests, so, just because
96 : /// you replace the [`Self::redo_process`] cell's content doesn't mean the
97 : /// process gets killed immediately.
98 : ///
99 : /// We could simplify this by getting rid of the [`Arc`].
100 : /// See the comment on [`Self::redo_process`] for more details.
101 : launched_processes: utils::sync::gate::Gate,
102 : }
103 :
104 : /// See [`PostgresRedoManager::redo_process`].
105 : enum ProcessOnceCell {
106 : Spawned(Arc<Process>),
107 : ManagerShutDown,
108 : }
109 :
110 : struct Process {
111 : process: process::WalRedoProcess,
112 : /// This field is last in this struct so the guard gets dropped _after_ [`Self::process`].
113 : /// (Reminder: dropping [`Self::process`] synchronously sends SIGKILL and then `wait()`s for it to exit).
114 : _launched_processes_guard: utils::sync::gate::GateGuard,
115 : }
116 :
117 : impl std::ops::Deref for Process {
118 : type Target = process::WalRedoProcess;
119 :
120 40 : fn deref(&self) -> &Self::Target {
121 40 : &self.process
122 40 : }
123 : }
124 :
125 : #[derive(Debug, thiserror::Error)]
126 : pub enum Error {
127 : #[error("cancelled")]
128 : Cancelled,
129 : #[error(transparent)]
130 : Other(#[from] anyhow::Error),
131 : }
132 :
133 : macro_rules! bail {
134 : ($($arg:tt)*) => {
135 : return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
136 : }
137 : }
138 :
139 : ///
140 : /// Public interface of WAL redo manager
141 : ///
142 : impl PostgresRedoManager {
143 : ///
144 : /// Request the WAL redo manager to apply some WAL records
145 : ///
146 : /// The WAL redo is handled by a separate thread, so this just sends a request
147 : /// to the thread and waits for the response.
148 : ///
149 : /// # Cancel-Safety
150 : ///
151 : /// This method is cancellation-safe.
152 12 : pub async fn request_redo(
153 12 : &self,
154 12 : key: Key,
155 12 : lsn: Lsn,
156 12 : base_img: Option<(Lsn, Bytes)>,
157 12 : records: Vec<(Lsn, NeonWalRecord)>,
158 12 : pg_version: u32,
159 12 : ) -> Result<Bytes, Error> {
160 12 : if records.is_empty() {
161 0 : bail!("invalid WAL redo request with no records");
162 12 : }
163 12 :
164 12 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
165 12 : let mut img = base_img.map(|p| p.1);
166 12 : let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
167 12 : let mut batch_start = 0;
168 12 : for (i, record) in records.iter().enumerate().skip(1) {
169 12 : let rec_neon = apply_neon::can_apply_in_neon(&record.1);
170 12 :
171 12 : if rec_neon != batch_neon {
172 0 : let result = if batch_neon {
173 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
174 : } else {
175 0 : self.apply_batch_postgres(
176 0 : key,
177 0 : lsn,
178 0 : img,
179 0 : base_img_lsn,
180 0 : &records[batch_start..i],
181 0 : self.conf.wal_redo_timeout,
182 0 : pg_version,
183 0 : )
184 0 : .await
185 : };
186 0 : img = Some(result?);
187 :
188 0 : batch_neon = rec_neon;
189 0 : batch_start = i;
190 12 : }
191 : }
192 : // last batch
193 12 : if batch_neon {
194 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
195 : } else {
196 12 : self.apply_batch_postgres(
197 12 : key,
198 12 : lsn,
199 12 : img,
200 12 : base_img_lsn,
201 12 : &records[batch_start..],
202 12 : self.conf.wal_redo_timeout,
203 12 : pg_version,
204 12 : )
205 12 : .await
206 : }
207 12 : }
208 :
209 : /// Do a ping request-response roundtrip.
210 : ///
211 : /// Not used in production; only used by Rust benchmarks.
212 : ///
213 : /// # Cancel-Safety
214 : ///
215 : /// This method is cancellation-safe.
216 4 : pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
217 4 : self.do_with_walredo_process(pg_version, |proc| async move {
218 4 : proc.ping(Duration::from_secs(1))
219 4 : .await
220 4 : .map_err(Error::Other)
221 4 : })
222 4 : .await
223 4 : }
224 :
225 0 : pub fn status(&self) -> WalRedoManagerStatus {
226 0 : WalRedoManagerStatus {
227 0 : last_redo_at: {
228 0 : let at = *self.last_redo_at.lock().unwrap();
229 0 : at.and_then(|at| {
230 0 : let age = at.elapsed();
231 0 : // map any chrono errors silently to None here
232 0 : chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
233 0 : })
234 0 : },
235 0 : process: self.redo_process.get().and_then(|p| match &*p {
236 0 : ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
237 0 : ProcessOnceCell::ManagerShutDown => None,
238 0 : }),
239 0 : }
240 0 : }
241 : }
242 :
243 : impl PostgresRedoManager {
244 : ///
245 : /// Create a new PostgresRedoManager.
246 : ///
247 16 : pub fn new(
248 16 : conf: &'static PageServerConf,
249 16 : tenant_shard_id: TenantShardId,
250 16 : ) -> PostgresRedoManager {
251 16 : // The actual process is launched lazily, on first request.
252 16 : PostgresRedoManager {
253 16 : tenant_shard_id,
254 16 : conf,
255 16 : last_redo_at: std::sync::Mutex::default(),
256 16 : redo_process: heavier_once_cell::OnceCell::default(),
257 16 : launched_processes: utils::sync::gate::Gate::default(),
258 16 : }
259 16 : }
260 :
261 : /// Shut down the WAL redo manager.
262 : ///
263 : /// Returns `true` if this call was the one that initiated shutdown.
264 : /// If the first caller stops polling, `true` may never be observed by any caller.
265 : ///
266 : /// After this future completes
267 : /// - no redo process is running
268 : /// - no new redo process will be spawned
269 : /// - redo requests that need walredo process will fail with [`Error::Cancelled`]
270 : /// - [`apply_neon`]-only redo requests may still work, but this may change in the future
271 : ///
272 : /// # Cancel-Safety
273 : ///
274 : /// This method is cancellation-safe.
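///
/// A sketch of the intended call pattern (hypothetical caller code; the real owner is
/// whatever drives the tenant shard's lifecycle):
///
/// ```ignore
/// // Initiate shutdown. Once this future completes, no walredo process is running,
/// // none will be spawned, and redo requests that need one fail with `Error::Cancelled`.
/// let initiated_by_this_call = manager.shutdown().await;
/// ```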
275 0 : pub async fn shutdown(&self) -> bool {
276 : // prevent new processes from being spawned
277 0 : let maybe_permit = match self.redo_process.get_or_init_detached().await {
278 0 : Ok(guard) => {
279 0 : if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
280 0 : None
281 : } else {
282 0 : let (proc, permit) = guard.take_and_deinit();
283 0 : drop(proc); // this just drops the Arc, its refcount may not be zero yet
284 0 : Some(permit)
285 : }
286 : }
287 0 : Err(permit) => Some(permit),
288 : };
289 0 : let it_was_us = if let Some(permit) = maybe_permit {
290 0 : self.redo_process
291 0 : .set(ProcessOnceCell::ManagerShutDown, permit);
292 0 : true
293 : } else {
294 0 : false
295 : };
296 : // Wait for ongoing requests to drain and for the refcounts of all Arc<WalRedoProcess> that
297 : // we ever launched to drop to zero; when that happens, the drop synchronously kill()s & wait()s
298 : // for the underlying process.
299 0 : self.launched_processes.close().await;
300 0 : it_was_us
301 0 : }
302 :
303 : /// This type doesn't have its own background task to check for idleness: we
304 : /// rely on our owner calling this function periodically in its own housekeeping
305 : /// loops.
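///
/// A sketch of such a housekeeping loop (hypothetical owner code; the tick interval and
/// the `idle_timeout` value are illustrative assumptions, not taken from this module):
///
/// ```ignore
/// let mut ticker = tokio::time::interval(Duration::from_secs(10));
/// loop {
///     ticker.tick().await;
///     // Kills the walredo process if it has been idle longer than `idle_timeout`;
///     // the next request that needs it will lazily launch a new one.
///     redo_manager.maybe_quiesce(Duration::from_secs(60));
/// }
/// ```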
306 0 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
307 0 : if let Ok(g) = self.last_redo_at.try_lock() {
308 0 : if let Some(last_redo_at) = *g {
309 0 : if last_redo_at.elapsed() >= idle_timeout {
310 0 : drop(g);
311 0 : drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
312 0 : }
313 0 : }
314 0 : }
315 0 : }
316 :
317 : /// # Cancel-Safety
318 : ///
319 : /// This method is cancel-safe iff `closure` is cancel-safe.
320 20 : async fn do_with_walredo_process<
321 20 : F: FnOnce(Arc<Process>) -> Fut,
322 20 : Fut: Future<Output = Result<O, Error>>,
323 20 : O,
324 20 : >(
325 20 : &self,
326 20 : pg_version: u32,
327 20 : closure: F,
328 20 : ) -> Result<O, Error> {
329 20 : let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
330 0 : Ok(guard) => match &*guard {
331 0 : ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
332 : ProcessOnceCell::ManagerShutDown => {
333 0 : return Err(Error::Cancelled);
334 : }
335 : },
336 20 : Err(permit) => {
337 20 : let start = Instant::now();
338 : // acquire guard before spawning process, so that we don't spawn new processes
339 : // if the gate is already closed.
340 20 : let _launched_processes_guard = match self.launched_processes.enter() {
341 20 : Ok(guard) => guard,
342 0 : Err(GateError::GateClosed) => unreachable!(
343 0 : "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
344 0 : ),
345 : };
346 20 : let proc = Arc::new(Process {
347 20 : process: process::WalRedoProcess::launch(
348 20 : self.conf,
349 20 : self.tenant_shard_id,
350 20 : pg_version,
351 20 : )
352 20 : .context("launch walredo process")?,
353 20 : _launched_processes_guard,
354 20 : });
355 20 : let duration = start.elapsed();
356 20 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
357 20 : info!(
358 0 : elapsed_ms = duration.as_millis(),
359 0 : pid = proc.id(),
360 0 : "launched walredo process"
361 : );
362 20 : self.redo_process
363 20 : .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
364 20 : proc
365 : }
366 : };
367 :
368 : // async closures are unstable; with them, the closure could borrow `&Process` instead of cloning the `Arc`.
369 20 : let result = closure(proc.clone()).await;
370 :
371 20 : if result.is_err() {
372 : // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
373 : // Note that there may be other tasks concurrent with us that also hold `proc`.
374 : // We have to deal with that here.
375 : // Also read the doc comment on field `self.redo_process`.
376 : //
377 : // NB: there may still be other concurrent threads using `proc`.
378 : // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
379 : //
380 : // NB: the drop impl blocks the dropping thread with a wait() system call for
381 : // the child process. In some ways the blocking is actually good: if we
382 : // deferred the waiting into the background (e.g., to tokio, if we used `tokio::process`),
383 : // it could happen that if walredo always fails immediately, we spawn processes faster
384 : // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
385 : // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
386 : // This probably needs revisiting at some later point.
387 8 : match self.redo_process.get() {
388 0 : None => (),
389 8 : Some(guard) => {
390 8 : match &*guard {
391 0 : ProcessOnceCell::ManagerShutDown => {}
392 8 : ProcessOnceCell::Spawned(guard_proc) => {
393 8 : if Arc::ptr_eq(&proc, guard_proc) {
394 8 : // We're the first to observe an error from `proc`; it's our job to take it out of rotation.
395 8 : guard.take_and_deinit();
396 8 : } else {
397 0 : // Another task already spawned another redo process (further up in this method)
398 0 : // and put it into `redo_process`. Do nothing, our view of the world is behind.
399 0 : }
400 : }
401 : }
402 : }
403 : }
404 : // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
405 8 : drop(proc);
406 12 : }
407 :
408 20 : result
409 20 : }
410 :
411 : ///
412 : /// Process one request for WAL redo using wal-redo postgres
413 : ///
414 : /// # Cancel-Safety
415 : ///
416 : /// Cancellation safe.
417 : #[allow(clippy::too_many_arguments)]
418 12 : async fn apply_batch_postgres(
419 12 : &self,
420 12 : key: Key,
421 12 : lsn: Lsn,
422 12 : base_img: Option<Bytes>,
423 12 : base_img_lsn: Lsn,
424 12 : records: &[(Lsn, NeonWalRecord)],
425 12 : wal_redo_timeout: Duration,
426 12 : pg_version: u32,
427 12 : ) -> Result<Bytes, Error> {
428 12 : *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
429 :
430 12 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
431 : const MAX_RETRY_ATTEMPTS: u32 = 1;
432 12 : let mut n_attempts = 0u32;
433 16 : loop {
434 16 : let base_img = &base_img;
435 16 : let closure = |proc: Arc<Process>| async move {
436 16 : let started_at = std::time::Instant::now();
437 :
438 : // Relational WAL records are applied using wal-redo-postgres
439 16 : let result = proc
440 16 : .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
441 16 : .await
442 16 : .context("apply_wal_records");
443 16 :
444 16 : let duration = started_at.elapsed();
445 16 :
446 16 : let len = records.len();
447 32 : let nbytes = records.iter().fold(0, |accumulator, record| {
448 32 : accumulator
449 32 : + match &record.1 {
450 32 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
451 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
452 : }
453 32 : });
454 16 :
455 16 : WAL_REDO_TIME.observe(duration.as_secs_f64());
456 16 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
457 16 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
458 16 :
459 16 : debug!(
460 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
461 0 : len,
462 0 : nbytes,
463 0 : duration.as_micros(),
464 : lsn
465 : );
466 :
467 16 : if let Err(e) = result.as_ref() {
468 8 : error!(
469 0 : "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
470 0 : records.len(),
471 8 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
472 8 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
473 : nbytes,
474 : base_img_lsn,
475 : lsn,
476 : n_attempts,
477 : e,
478 : );
479 8 : }
480 :
481 16 : result.map_err(Error::Other)
482 16 : };
483 16 : let result = self.do_with_walredo_process(pg_version, closure).await;
484 :
485 16 : if result.is_ok() && n_attempts != 0 {
486 0 : info!(n_attempts, "retried walredo succeeded");
487 16 : }
488 16 : n_attempts += 1;
489 16 : if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
490 12 : return result;
491 4 : }
492 : }
493 12 : }
494 :
495 : ///
496 : /// Process a batch of WAL records using bespoke Neon code.
497 : ///
498 0 : fn apply_batch_neon(
499 0 : &self,
500 0 : key: Key,
501 0 : lsn: Lsn,
502 0 : base_img: Option<Bytes>,
503 0 : records: &[(Lsn, NeonWalRecord)],
504 0 : ) -> Result<Bytes, Error> {
505 0 : let start_time = Instant::now();
506 0 :
507 0 : let mut page = BytesMut::new();
508 0 : if let Some(fpi) = base_img {
509 0 : // If full-page image is provided, then use it...
510 0 : page.extend_from_slice(&fpi[..]);
511 0 : } else {
512 : // All the current WAL record types that we can handle require a base image.
513 0 : bail!("invalid neon WAL redo request with no base image");
514 : }
515 :
516 : // Apply all the WAL records in the batch
517 0 : for (record_lsn, record) in records.iter() {
518 0 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
519 : }
520 : // Success!
521 0 : let duration = start_time.elapsed();
522 0 : // FIXME: using the same metric here creates a bimodal distribution by default, and because
523 0 : // there could be multiple batch sizes this would be N+1 modal.
524 0 : WAL_REDO_TIME.observe(duration.as_secs_f64());
525 0 :
526 0 : debug!(
527 0 : "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
528 0 : records.len(),
529 0 : duration.as_micros(),
530 : lsn
531 : );
532 :
533 0 : Ok(page.freeze())
534 0 : }
535 :
536 0 : fn apply_record_neon(
537 0 : &self,
538 0 : key: Key,
539 0 : page: &mut BytesMut,
540 0 : record_lsn: Lsn,
541 0 : record: &NeonWalRecord,
542 0 : ) -> anyhow::Result<()> {
543 0 : apply_neon::apply_in_neon(record, record_lsn, key, page)?;
544 :
545 0 : Ok(())
546 0 : }
547 : }
548 :
549 : #[cfg(test)]
550 : mod tests {
551 : use std::str::FromStr;
552 :
553 : use bytes::Bytes;
554 : use pageserver_api::key::Key;
555 : use pageserver_api::record::NeonWalRecord;
556 : use pageserver_api::shard::TenantShardId;
557 : use tracing::Instrument;
558 : use utils::id::TenantId;
559 : use utils::lsn::Lsn;
560 :
561 : use super::PostgresRedoManager;
562 : use crate::config::PageServerConf;
563 :
564 : #[tokio::test]
565 4 : async fn test_ping() {
566 4 : let h = RedoHarness::new().unwrap();
567 4 :
568 4 : h.manager
569 4 : .ping(14)
570 4 : .instrument(h.span())
571 4 : .await
572 4 : .expect("ping should work");
573 4 : }
574 :
575 : #[tokio::test]
576 4 : async fn short_v14_redo() {
577 4 : let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
578 4 :
579 4 : let h = RedoHarness::new().unwrap();
580 4 :
581 4 : let page = h
582 4 : .manager
583 4 : .request_redo(
584 4 : Key {
585 4 : field1: 0,
586 4 : field2: 1663,
587 4 : field3: 13010,
588 4 : field4: 1259,
589 4 : field5: 0,
590 4 : field6: 0,
591 4 : },
592 4 : Lsn::from_str("0/16E2408").unwrap(),
593 4 : None,
594 4 : short_records(),
595 4 : 14,
596 4 : )
597 4 : .instrument(h.span())
598 4 : .await
599 4 : .unwrap();
600 4 :
601 4 : assert_eq!(&expected, &*page);
602 4 : }
603 :
604 : #[tokio::test]
605 4 : async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
606 4 : let h = RedoHarness::new().unwrap();
607 4 :
608 4 : let page = h
609 4 : .manager
610 4 : .request_redo(
611 4 : Key {
612 4 : field1: 0,
613 4 : field2: 1663,
614 4 : // key should be 13010
615 4 : field3: 13130,
616 4 : field4: 1259,
617 4 : field5: 0,
618 4 : field6: 0,
619 4 : },
620 4 : Lsn::from_str("0/16E2408").unwrap(),
621 4 : None,
622 4 : short_records(),
623 4 : 14,
624 4 : )
625 4 : .instrument(h.span())
626 4 : .await
627 4 : .unwrap();
628 4 :
629 4 : // TODO: there will be some stderr printout, which is forwarded to tracing and could
630 4 : // perhaps be captured as long as it's in the same thread.
631 4 : assert_eq!(page, crate::ZERO_PAGE);
632 4 : }
633 :
634 : #[tokio::test]
635 4 : async fn test_stderr() {
636 4 : let h = RedoHarness::new().unwrap();
637 4 : h
638 4 : .manager
639 4 : .request_redo(
640 4 : Key::from_i128(0),
641 4 : Lsn::INVALID,
642 4 : None,
643 4 : short_records(),
644 4 : 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
645 4 : )
646 4 : .instrument(h.span())
647 4 : .await
648 4 : .unwrap_err();
649 4 : }
650 :
651 : #[allow(clippy::octal_escapes)]
652 12 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
653 12 : vec![
654 12 : (
655 12 : Lsn::from_str("0/16A9388").unwrap(),
656 12 : NeonWalRecord::Postgres {
657 12 : will_init: true,
658 12 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
659 12 : }
660 12 : ),
661 12 : (
662 12 : Lsn::from_str("0/16D4080").unwrap(),
663 12 : NeonWalRecord::Postgres {
664 12 : will_init: false,
665 12 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
666 12 : }
667 12 : )
668 12 : ]
669 12 : }
670 :
671 : struct RedoHarness {
672 : // underscored because unused, except for removal at drop
673 : _repo_dir: camino_tempfile::Utf8TempDir,
674 : manager: PostgresRedoManager,
675 : tenant_shard_id: TenantShardId,
676 : }
677 :
678 : impl RedoHarness {
679 16 : fn new() -> anyhow::Result<Self> {
680 16 : crate::tenant::harness::setup_logging();
681 :
682 16 : let repo_dir = camino_tempfile::tempdir()?;
683 16 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
684 16 : let conf = Box::leak(Box::new(conf));
685 16 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
686 16 :
687 16 : let manager = PostgresRedoManager::new(conf, tenant_shard_id);
688 16 :
689 16 : Ok(RedoHarness {
690 16 : _repo_dir: repo_dir,
691 16 : manager,
692 16 : tenant_shard_id,
693 16 : })
694 16 : }
695 16 : fn span(&self) -> tracing::Span {
696 16 : tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
697 16 : }
698 : }
699 : }