Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply the given WAL records over an old page image and return the new
4 : //! page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in a special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the page image back. Communication with the
11 : //! postgres process happens via stdin/stdout.
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves this by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, they cannot escape from it.
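//!
//! A minimal usage sketch (it mirrors the tests at the bottom of this file;
//! `conf`, `tenant_shard_id`, `key`, `lsn`, `base_img`, `records`, and
//! `pg_version` are placeholders supplied by the caller):
//!
//! ```ignore
//! let manager = PostgresRedoManager::new(conf, tenant_shard_id);
//! // Apply `records` on top of `base_img` (if any) to reconstruct the page at `lsn`.
//! let page: bytes::Bytes = manager
//!     .request_redo(key, lsn, base_img, records, pg_version)
//!     .await?;
//! ```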
20 :
21 : /// Process lifecycle and abstraction for the IPC protocol.
22 : mod process;
23 :
24 : /// Code to apply [`NeonWalRecord`]s.
25 : pub(crate) mod apply_neon;
26 :
27 : use crate::config::PageServerConf;
28 : use crate::metrics::{
29 : WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
30 : WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
31 : };
32 : use crate::repository::Key;
33 : use crate::walrecord::NeonWalRecord;
34 : use anyhow::Context;
35 : use bytes::{Bytes, BytesMut};
36 : use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
37 : use pageserver_api::shard::TenantShardId;
38 : use std::sync::Arc;
39 : use std::time::Duration;
40 : use std::time::Instant;
41 : use tracing::*;
42 : use utils::lsn::Lsn;
43 : use utils::sync::gate::GateError;
44 : use utils::sync::heavier_once_cell;
45 :
46 : /// The real implementation that uses a Postgres process to
47 : /// perform WAL replay.
48 : ///
49 : /// Only one thread can use the process at a time; that is controlled by the
50 : /// Mutex. In the future, we might want to launch a pool of processes to allow
51 : /// concurrent replay of multiple records.
52 : pub struct PostgresRedoManager {
53 : tenant_shard_id: TenantShardId,
54 : conf: &'static PageServerConf,
55 : last_redo_at: std::sync::Mutex<Option<Instant>>,
56 : /// We use [`heavier_once_cell`] for
57 : ///
58 : /// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
59 : /// 2. preventing new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
60 : ///
61 : /// # Spawning
62 : ///
63 : /// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`].
64 : ///
65 : /// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep hold of
66 : /// their process object; we use [`Arc::clone`] for that.
67 : ///
68 : /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
69 : /// had that behavior; it's probably unnecessary.
70 : /// The only merit is that if one walredo process encounters an error, the request
71 : /// that observes it can take the process out of rotation (using [`heavier_once_cell::Guard::take_and_deinit`])
72 : /// and retry redo, thereby starting a new process, while other redo tasks might
73 : /// still be using the old redo process. But those other tasks will most likely
74 : /// encounter an error as well, and errors are an unexpected condition anyway.
75 : /// So, probably we could get rid of the `Arc` in the future.
76 : ///
77 : /// # Shutdown
78 : ///
79 : /// See [`Self::launched_processes`].
80 : redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
81 :
82 : /// Gate that is entered when launching a walredo process and held open
83 : /// until the process has been `kill()`ed and `wait()`ed upon.
84 : ///
85 : /// Manager shutdown waits for this gate to close after setting the
86 : /// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
87 : ///
88 : /// This type of usage is a bit unusual because gates usually keep track of
89 : /// concurrent operations, e.g., every [`Self::request_redo`] that is inflight.
90 : /// But we use it here to keep track of the _processes_ that we have launched,
91 : /// which may outlive any individual redo request because
92 : /// - we keep the walredo process around until it's quiesced, to amortize spawn cost, and
93 : /// - the Arc may be held by multiple concurrent redo requests, so replacing the
94 : /// [`Self::redo_process`] cell's content doesn't mean the process gets
95 : /// killed immediately.
96 : ///
97 : /// We could simplify this by getting rid of the [`Arc`].
98 : /// See the comment on [`Self::redo_process`] for more details.
99 : launched_processes: utils::sync::gate::Gate,
100 : }
101 :
102 : /// See [`PostgresRedoManager::redo_process`].
103 : enum ProcessOnceCell {
104 : Spawned(Arc<Process>),
105 : ManagerShutDown,
106 : }
107 :
108 : struct Process {
109 : process: process::WalRedoProcess,
110 : /// This field is last in this struct so the guard gets dropped _after_ [`Self::process`].
111 : /// (Reminder: dropping [`Self::process`] synchronously sends SIGKILL and then `wait()`s for it to exit).
112 : _launched_processes_guard: utils::sync::gate::GateGuard,
113 : }
114 :
115 : impl std::ops::Deref for Process {
116 : type Target = process::WalRedoProcess;
117 :
118 48 : fn deref(&self) -> &Self::Target {
119 48 : &self.process
120 48 : }
121 : }
122 :
123 0 : #[derive(Debug, thiserror::Error)]
124 : pub enum Error {
125 : #[error("cancelled")]
126 : Cancelled,
127 : #[error(transparent)]
128 : Other(#[from] anyhow::Error),
129 : }
130 :
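/// Shorthand used in this module to return early with an [`Error::Other`] built
/// from an [`anyhow::anyhow!`] format message.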
131 : macro_rules! bail {
132 : ($($arg:tt)*) => {
133 : return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
134 : }
135 : }
136 :
137 : ///
138 : /// Public interface of WAL redo manager
139 : ///
140 : impl PostgresRedoManager {
141 : ///
142 : /// Request the WAL redo manager to apply some WAL records
143 : ///
144 : /// The WAL redo is handled by a separate thread, so this just sends a request
145 : /// to the thread and waits for response.
146 : ///
147 : /// # Cancel-Safety
148 : ///
149 : /// This method is cancellation-safe.
150 18 : pub async fn request_redo(
151 18 : &self,
152 18 : key: Key,
153 18 : lsn: Lsn,
154 18 : base_img: Option<(Lsn, Bytes)>,
155 18 : records: Vec<(Lsn, NeonWalRecord)>,
156 18 : pg_version: u32,
157 18 : ) -> Result<Bytes, Error> {
158 18 : if records.is_empty() {
159 0 : bail!("invalid WAL redo request with no records");
160 18 : }
161 18 :
162 18 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
163 18 : let mut img = base_img.map(|p| p.1);
164 18 : let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
165 18 : let mut batch_start = 0;
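// Batch consecutive records by whether they can be applied directly in Neon
// (`apply_neon`) or need the walredo Postgres process: apply the current batch
// whenever the record kind flips, and handle the final batch after the loop.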
166 18 : for (i, record) in records.iter().enumerate().skip(1) {
167 18 : let rec_neon = apply_neon::can_apply_in_neon(&record.1);
168 18 :
169 18 : if rec_neon != batch_neon {
170 0 : let result = if batch_neon {
171 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
172 : } else {
173 0 : self.apply_batch_postgres(
174 0 : key,
175 0 : lsn,
176 0 : img,
177 0 : base_img_lsn,
178 0 : &records[batch_start..i],
179 0 : self.conf.wal_redo_timeout,
180 0 : pg_version,
181 0 : )
182 0 : .await
183 : };
184 0 : img = Some(result?);
185 :
186 0 : batch_neon = rec_neon;
187 0 : batch_start = i;
188 18 : }
189 : }
190 : // last batch
191 18 : if batch_neon {
192 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
193 : } else {
194 18 : self.apply_batch_postgres(
195 18 : key,
196 18 : lsn,
197 18 : img,
198 18 : base_img_lsn,
199 18 : &records[batch_start..],
200 18 : self.conf.wal_redo_timeout,
201 18 : pg_version,
202 18 : )
203 48 : .await
204 : }
205 18 : }
206 :
207 0 : pub fn status(&self) -> WalRedoManagerStatus {
208 0 : WalRedoManagerStatus {
209 0 : last_redo_at: {
210 0 : let at = *self.last_redo_at.lock().unwrap();
211 0 : at.and_then(|at| {
212 0 : let age = at.elapsed();
213 0 : // map any chrono errors silently to None here
214 0 : chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
215 0 : })
216 0 : },
217 0 : process: self.redo_process.get().and_then(|p| match &*p {
218 0 : ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
219 0 : ProcessOnceCell::ManagerShutDown => None,
220 0 : }),
221 0 : }
222 0 : }
223 : }
224 :
225 : impl PostgresRedoManager {
226 : ///
227 : /// Create a new PostgresRedoManager.
228 : ///
229 18 : pub fn new(
230 18 : conf: &'static PageServerConf,
231 18 : tenant_shard_id: TenantShardId,
232 18 : ) -> PostgresRedoManager {
233 18 : // The actual process is launched lazily, on first request.
234 18 : PostgresRedoManager {
235 18 : tenant_shard_id,
236 18 : conf,
237 18 : last_redo_at: std::sync::Mutex::default(),
238 18 : redo_process: heavier_once_cell::OnceCell::default(),
239 18 : launched_processes: utils::sync::gate::Gate::default(),
240 18 : }
241 18 : }
242 :
243 : /// Shut down the WAL redo manager.
244 : ///
245 : /// Returns `true` if this call was the one that initiated shutdown.
246 : /// `true` may be observed by no caller if the first caller stops polling.
247 : ///
248 : /// After this future completes
249 : /// - no redo process is running
250 : /// - no new redo process will be spawned
251 : /// - redo requests that need a walredo process will fail with [`Error::Cancelled`]
252 : /// - [`apply_neon`]-only redo requests may still work, but this may change in the future
253 : ///
254 : /// # Cancel-Safety
255 : ///
256 : /// This method is cancellation-safe.
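///
/// A minimal sketch of the intended call sequence (the async context and the
/// `manager` binding are illustrative, not part of this module):
///
/// ```ignore
/// // Initiate shutdown; `true` means this call was the one that initiated it.
/// let initiated_by_us = manager.shutdown().await;
/// // From here on, redo requests that need a walredo process fail with Error::Cancelled.
/// ```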
257 0 : pub async fn shutdown(&self) -> bool {
258 : // prevent new processes from being spawned
259 0 : let maybe_permit = match self.redo_process.get_or_init_detached().await {
260 0 : Ok(guard) => {
261 0 : if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
262 0 : None
263 : } else {
264 0 : let (proc, permit) = guard.take_and_deinit();
265 0 : drop(proc); // this just drops the Arc, its refcount may not be zero yet
266 0 : Some(permit)
267 : }
268 : }
269 0 : Err(permit) => Some(permit),
270 : };
271 0 : let it_was_us = if let Some(permit) = maybe_permit {
272 0 : self.redo_process
273 0 : .set(ProcessOnceCell::ManagerShutDown, permit);
274 0 : true
275 : } else {
276 0 : false
277 : };
278 : // wait for ongoing requests to drain and for the refcounts of all Arc<WalRedoProcess> that
279 : // we ever launched to drop to zero; when that happens, the drop synchronously kill()s & wait()s
280 : // for the underlying process.
281 0 : self.launched_processes.close().await;
282 0 : it_was_us
283 0 : }
284 :
285 : /// This type doesn't have its own background task to check for idleness: we
286 : /// rely on our owner calling this function periodically in its own housekeeping
287 : /// loops.
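///
/// A sketch of such a housekeeping loop (the `tokio::time::interval` driver and
/// the concrete durations are illustrative, not what the owner actually uses):
///
/// ```ignore
/// let mut ticker = tokio::time::interval(std::time::Duration::from_secs(10));
/// loop {
///     ticker.tick().await;
///     manager.maybe_quiesce(std::time::Duration::from_secs(60));
/// }
/// ```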
288 0 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
289 0 : if let Ok(g) = self.last_redo_at.try_lock() {
290 0 : if let Some(last_redo_at) = *g {
291 0 : if last_redo_at.elapsed() >= idle_timeout {
292 0 : drop(g);
293 0 : drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
294 0 : }
295 0 : }
296 0 : }
297 0 : }
298 :
299 : ///
300 : /// Process one request for WAL redo using wal-redo postgres
301 : ///
302 : /// # Cancel-Safety
303 : ///
304 : /// Cancellation safe.
305 : #[allow(clippy::too_many_arguments)]
306 18 : async fn apply_batch_postgres(
307 18 : &self,
308 18 : key: Key,
309 18 : lsn: Lsn,
310 18 : base_img: Option<Bytes>,
311 18 : base_img_lsn: Lsn,
312 18 : records: &[(Lsn, NeonWalRecord)],
313 18 : wal_redo_timeout: Duration,
314 18 : pg_version: u32,
315 18 : ) -> Result<Bytes, Error> {
316 18 : *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
317 :
318 18 : let (rel, blknum) = key.to_rel_block().context("invalid record")?;
319 : const MAX_RETRY_ATTEMPTS: u32 = 1;
320 18 : let mut n_attempts = 0u32;
321 : loop {
322 24 : let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
323 0 : Ok(guard) => match &*guard {
324 0 : ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
325 : ProcessOnceCell::ManagerShutDown => {
326 0 : return Err(Error::Cancelled);
327 : }
328 : },
329 24 : Err(permit) => {
330 24 : let start = Instant::now();
331 : // acquire guard before spawning process, so that we don't spawn new processes
332 : // if the gate is already closed.
333 24 : let _launched_processes_guard = match self.launched_processes.enter() {
334 24 : Ok(guard) => guard,
335 0 : Err(GateError::GateClosed) => unreachable!(
336 0 : "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
337 0 : ),
338 : };
339 24 : let proc = Arc::new(Process {
340 24 : process: process::WalRedoProcess::launch(
341 24 : self.conf,
342 24 : self.tenant_shard_id,
343 24 : pg_version,
344 24 : )
345 24 : .context("launch walredo process")?,
346 24 : _launched_processes_guard,
347 24 : });
348 24 : let duration = start.elapsed();
349 24 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
350 24 : info!(
351 0 : duration_ms = duration.as_millis(),
352 0 : pid = proc.id(),
353 0 : "launched walredo process"
354 : );
355 24 : self.redo_process
356 24 : .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
357 24 : proc
358 : }
359 : };
360 :
361 24 : let started_at = std::time::Instant::now();
362 :
363 : // Relational WAL records are applied using wal-redo-postgres
364 24 : let result = proc
365 24 : .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
366 48 : .await
367 24 : .context("apply_wal_records");
368 24 :
369 24 : let duration = started_at.elapsed();
370 24 :
371 24 : let len = records.len();
372 48 : let nbytes = records.iter().fold(0, |accumulator, record| {
373 48 : accumulator
374 48 : + match &record.1 {
375 48 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
376 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
377 : }
378 48 : });
379 24 :
380 24 : WAL_REDO_TIME.observe(duration.as_secs_f64());
381 24 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
382 24 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
383 24 :
384 24 : debug!(
385 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
386 0 : len,
387 0 : nbytes,
388 0 : duration.as_micros(),
389 : lsn
390 : );
391 :
392 : // If something went wrong, don't try to reuse the process. Kill it, and
393 : // the next request will launch a new one.
394 24 : if let Err(e) = result.as_ref() {
395 12 : error!(
396 0 : "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
397 0 : records.len(),
398 12 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
399 12 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
400 : nbytes,
401 : base_img_lsn,
402 : lsn,
403 : n_attempts,
404 : e,
405 : );
406 : // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
407 : // Note that there may be other tasks concurrent with us that also hold `proc`.
408 : // We have to deal with that here.
409 : // Also read the doc comment on field `self.redo_process`.
410 : //
411 : // NB: there may still be other concurrent threads using `proc`.
412 : // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
413 : //
414 : // NB: the drop impl blocks the dropping thread with a wait() system call for
415 : // the child process. In some ways the blocking is actually good: if we
416 : // deferred the waiting to the background (e.g. to tokio, if we used `tokio::process`),
417 : // then, if walredo always fails immediately, we could spawn processes faster
418 : // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
419 : // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
420 : // This probably needs revisiting at some later point.
421 12 : match self.redo_process.get() {
422 0 : None => (),
423 12 : Some(guard) => {
424 12 : match &*guard {
425 0 : ProcessOnceCell::ManagerShutDown => {}
426 12 : ProcessOnceCell::Spawned(guard_proc) => {
427 12 : if Arc::ptr_eq(&proc, guard_proc) {
428 12 : // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
429 12 : guard.take_and_deinit();
430 12 : } else {
431 0 : // Another task already spawned another redo process (further up in this method)
432 0 : // and put it into `redo_process`. Do nothing, our view of the world is behind.
433 0 : }
434 : }
435 : }
436 : }
437 : }
438 : // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
439 12 : drop(proc);
440 12 : } else if n_attempts != 0 {
441 0 : info!(n_attempts, "retried walredo succeeded");
442 12 : }
443 24 : n_attempts += 1;
444 24 : if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
445 18 : return result.map_err(Error::Other);
446 6 : }
447 : }
448 18 : }
449 :
450 : ///
451 : /// Process a batch of WAL records using bespoke Neon code.
452 : ///
453 0 : fn apply_batch_neon(
454 0 : &self,
455 0 : key: Key,
456 0 : lsn: Lsn,
457 0 : base_img: Option<Bytes>,
458 0 : records: &[(Lsn, NeonWalRecord)],
459 0 : ) -> Result<Bytes, Error> {
460 0 : let start_time = Instant::now();
461 0 :
462 0 : let mut page = BytesMut::new();
463 0 : if let Some(fpi) = base_img {
464 0 : // If full-page image is provided, then use it...
465 0 : page.extend_from_slice(&fpi[..]);
466 0 : } else {
467 : // All the current WAL record types that we can handle require a base image.
468 0 : bail!("invalid neon WAL redo request with no base image");
469 : }
470 :
471 : // Apply all the WAL records in the batch
472 0 : for (record_lsn, record) in records.iter() {
473 0 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
474 : }
475 : // Success!
476 0 : let duration = start_time.elapsed();
477 0 : // FIXME: using the same metric here creates a bimodal distribution by default, and because
478 0 : // there could be multiple batch sizes this would be N+1 modal.
479 0 : WAL_REDO_TIME.observe(duration.as_secs_f64());
480 0 :
481 0 : debug!(
482 0 : "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
483 0 : records.len(),
484 0 : duration.as_micros(),
485 : lsn
486 : );
487 :
488 0 : Ok(page.freeze())
489 0 : }
490 :
491 0 : fn apply_record_neon(
492 0 : &self,
493 0 : key: Key,
494 0 : page: &mut BytesMut,
495 0 : record_lsn: Lsn,
496 0 : record: &NeonWalRecord,
497 0 : ) -> anyhow::Result<()> {
498 0 : apply_neon::apply_in_neon(record, record_lsn, key, page)?;
499 :
500 0 : Ok(())
501 0 : }
502 : }
503 :
504 : #[cfg(test)]
505 : mod tests {
506 : use super::PostgresRedoManager;
507 : use crate::repository::Key;
508 : use crate::{config::PageServerConf, walrecord::NeonWalRecord};
509 : use bytes::Bytes;
510 : use pageserver_api::shard::TenantShardId;
511 : use std::str::FromStr;
512 : use tracing::Instrument;
513 : use utils::{id::TenantId, lsn::Lsn};
514 :
515 : #[tokio::test]
516 6 : async fn short_v14_redo() {
517 6 : let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
518 6 :
519 6 : let h = RedoHarness::new().unwrap();
520 6 :
521 6 : let page = h
522 6 : .manager
523 6 : .request_redo(
524 6 : Key {
525 6 : field1: 0,
526 6 : field2: 1663,
527 6 : field3: 13010,
528 6 : field4: 1259,
529 6 : field5: 0,
530 6 : field6: 0,
531 6 : },
532 6 : Lsn::from_str("0/16E2408").unwrap(),
533 6 : None,
534 6 : short_records(),
535 6 : 14,
536 6 : )
537 6 : .instrument(h.span())
538 12 : .await
539 6 : .unwrap();
540 6 :
541 6 : assert_eq!(&expected, &*page);
542 6 : }
543 :
544 : #[tokio::test]
545 6 : async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
546 6 : let h = RedoHarness::new().unwrap();
547 6 :
548 6 : let page = h
549 6 : .manager
550 6 : .request_redo(
551 6 : Key {
552 6 : field1: 0,
553 6 : field2: 1663,
554 6 : // key should be 13010
555 6 : field3: 13130,
556 6 : field4: 1259,
557 6 : field5: 0,
558 6 : field6: 0,
559 6 : },
560 6 : Lsn::from_str("0/16E2408").unwrap(),
561 6 : None,
562 6 : short_records(),
563 6 : 14,
564 6 : )
565 6 : .instrument(h.span())
566 12 : .await
567 6 : .unwrap();
568 6 :
569 6 : // TODO: there will be some stderr printout, which is forwarded to tracing that could
570 6 : // perhaps be captured as long as it's in the same thread.
571 6 : assert_eq!(page, crate::ZERO_PAGE);
572 6 : }
573 :
574 : #[tokio::test]
575 6 : async fn test_stderr() {
576 6 : let h = RedoHarness::new().unwrap();
577 6 : h
578 6 : .manager
579 6 : .request_redo(
580 6 : Key::from_i128(0),
581 6 : Lsn::INVALID,
582 6 : None,
583 6 : short_records(),
584 6 : 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
585 6 : )
586 6 : .instrument(h.span())
587 24 : .await
588 6 : .unwrap_err();
589 6 : }
590 :
591 : #[allow(clippy::octal_escapes)]
592 18 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
593 18 : vec![
594 18 : (
595 18 : Lsn::from_str("0/16A9388").unwrap(),
596 18 : NeonWalRecord::Postgres {
597 18 : will_init: true,
598 18 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
599 18 : }
600 18 : ),
601 18 : (
602 18 : Lsn::from_str("0/16D4080").unwrap(),
603 18 : NeonWalRecord::Postgres {
604 18 : will_init: false,
605 18 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
606 18 : }
607 18 : )
608 18 : ]
609 18 : }
610 :
611 : struct RedoHarness {
612 : // underscored because unused, except for removal at drop
613 : _repo_dir: camino_tempfile::Utf8TempDir,
614 : manager: PostgresRedoManager,
615 : tenant_shard_id: TenantShardId,
616 : }
617 :
618 : impl RedoHarness {
619 18 : fn new() -> anyhow::Result<Self> {
620 18 : crate::tenant::harness::setup_logging();
621 :
622 18 : let repo_dir = camino_tempfile::tempdir()?;
623 18 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
624 18 : let conf = Box::leak(Box::new(conf));
625 18 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
626 18 :
627 18 : let manager = PostgresRedoManager::new(conf, tenant_shard_id);
628 18 :
629 18 : Ok(RedoHarness {
630 18 : _repo_dir: repo_dir,
631 18 : manager,
632 18 : tenant_shard_id,
633 18 : })
634 18 : }
635 18 : fn span(&self) -> tracing::Span {
636 18 : tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
637 18 : }
638 : }
639 : }
|