Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply given WAL records over an old page image and return new
4 : //! page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the page image back. Communication with the
11 : //! postgres process happens via stdin/stdout.
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves this by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, they cannot escape out of it.
20 :
21 : /// Process lifecycle and abstraction for the IPC protocol.
22 : mod process;
23 :
24 : /// Code to apply [`NeonWalRecord`]s.
25 : pub(crate) mod apply_neon;
26 :
27 : use crate::config::PageServerConf;
28 : use crate::metrics::{
29 : WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
30 : WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
31 : };
32 : use crate::repository::Key;
33 : use crate::walrecord::NeonWalRecord;
34 : use anyhow::Context;
35 : use bytes::{Bytes, BytesMut};
36 : use pageserver_api::key::key_to_rel_block;
37 : use pageserver_api::models::WalRedoManagerStatus;
38 : use pageserver_api::shard::TenantShardId;
39 : use std::sync::Arc;
40 : use std::time::Duration;
41 : use std::time::Instant;
42 : use tracing::*;
43 : use utils::lsn::Lsn;
44 : use utils::sync::heavier_once_cell;
45 :
46 : ///
47 : /// This is the real implementation that uses a Postgres process to
48 : /// perform WAL replay. Only one thread can use the process at a time,
49 : /// that is controlled by the Mutex. In the future, we might want to
50 : /// launch a pool of processes to allow concurrent replay of multiple
51 : /// records.
52 : ///
/// NOTE(review): the only `Mutex` field visible in this struct guards
/// `last_redo_at`, not the process; any serialization of process access
/// presumably lives inside [`process::WalRedoProcess`] -- confirm and
/// update the paragraph above accordingly.
53 : pub struct PostgresRedoManager {
// Tenant shard this manager serves; handed to the walredo process launch.
54 : tenant_shard_id: TenantShardId,
// Static pageserver configuration; supplies `wal_redo_timeout` and is
// handed to the walredo process launch.
55 : conf: &'static PageServerConf,
// When the most recent Postgres-based redo batch was started; `None` until
// the first one. Read by `status` and by the idleness check in
// `maybe_quiesce`.
56 : last_redo_at: std::sync::Mutex<Option<Instant>>,
57 : /// The current [`process::WalRedoProcess`] that is used by new redo requests.
58 : /// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
59 : /// requests don't use the [`heavier_once_cell::Guard`] to keep ahold of
60 : /// their process object; we use [`Arc::clone`] for that.
61 : /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
62 : /// had that behavior; it's probably unnecessary.
63 : /// The only merit of it is that if one walredo process encounters an error,
64 : /// it can take it out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`])
65 : /// and retry redo, thereby starting the new process, while other redo tasks might
66 : /// still be using the old redo process. But, those other tasks will most likely
67 : /// encounter an error as well, and errors are an unexpected condition anyway.
68 : /// So, probably we could get rid of the `Arc` in the future.
69 : redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
70 : }
71 :
72 : ///
73 : /// Public interface of WAL redo manager
74 : ///
75 : impl PostgresRedoManager {
76 : ///
77 : /// Request the WAL redo manager to apply some WAL records
78 : ///
79 : /// The WAL redo is handled by a separate thread, so this just sends a request
80 : /// to the thread and waits for response.
81 : ///
82 : /// # Cancel-Safety
83 : ///
84 : /// This method is cancellation-safe.
85 6 : pub async fn request_redo(
86 6 : &self,
87 6 : key: Key,
88 6 : lsn: Lsn,
89 6 : base_img: Option<(Lsn, Bytes)>,
90 6 : records: Vec<(Lsn, NeonWalRecord)>,
91 6 : pg_version: u32,
92 6 : ) -> anyhow::Result<Bytes> {
93 6 : if records.is_empty() {
94 0 : anyhow::bail!("invalid WAL redo request with no records");
95 6 : }
96 6 :
97 6 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
98 6 : let mut img = base_img.map(|p| p.1);
99 6 : let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
100 6 : let mut batch_start = 0;
101 6 : for (i, record) in records.iter().enumerate().skip(1) {
102 6 : let rec_neon = apply_neon::can_apply_in_neon(&record.1);
103 6 :
104 6 : if rec_neon != batch_neon {
105 0 : let result = if batch_neon {
106 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
107 : } else {
108 0 : self.apply_batch_postgres(
109 0 : key,
110 0 : lsn,
111 0 : img,
112 0 : base_img_lsn,
113 0 : &records[batch_start..i],
114 0 : self.conf.wal_redo_timeout,
115 0 : pg_version,
116 0 : )
117 0 : .await
118 : };
119 0 : img = Some(result?);
120 :
121 0 : batch_neon = rec_neon;
122 0 : batch_start = i;
123 6 : }
124 : }
125 : // last batch
126 6 : if batch_neon {
127 0 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
128 : } else {
129 6 : self.apply_batch_postgres(
130 6 : key,
131 6 : lsn,
132 6 : img,
133 6 : base_img_lsn,
134 6 : &records[batch_start..],
135 6 : self.conf.wal_redo_timeout,
136 6 : pg_version,
137 6 : )
138 0 : .await
139 : }
140 6 : }
141 :
142 0 : pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
143 0 : Some(WalRedoManagerStatus {
144 0 : last_redo_at: {
145 0 : let at = *self.last_redo_at.lock().unwrap();
146 0 : at.and_then(|at| {
147 0 : let age = at.elapsed();
148 0 : // map any chrono errors silently to None here
149 0 : chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
150 0 : })
151 0 : },
152 0 : pid: self.redo_process.get().map(|p| p.id()),
153 0 : })
154 0 : }
155 : }
156 :
157 : impl PostgresRedoManager {
158 : ///
159 : /// Create a new PostgresRedoManager.
160 : ///
161 6 : pub fn new(
162 6 : conf: &'static PageServerConf,
163 6 : tenant_shard_id: TenantShardId,
164 6 : ) -> PostgresRedoManager {
165 6 : // The actual process is launched lazily, on first request.
166 6 : PostgresRedoManager {
167 6 : tenant_shard_id,
168 6 : conf,
169 6 : last_redo_at: std::sync::Mutex::default(),
170 6 : redo_process: heavier_once_cell::OnceCell::default(),
171 6 : }
172 6 : }
173 :
174 : /// This type doesn't have its own background task to check for idleness: we
175 : /// rely on our owner calling this function periodically in its own housekeeping
176 : /// loops.
177 0 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
178 0 : if let Ok(g) = self.last_redo_at.try_lock() {
179 0 : if let Some(last_redo_at) = *g {
180 0 : if last_redo_at.elapsed() >= idle_timeout {
181 0 : drop(g);
182 0 : drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
183 0 : }
184 0 : }
185 0 : }
186 0 : }
187 :
188 : ///
189 : /// Process one request for WAL redo using wal-redo postgres
190 : ///
191 : /// # Cancel-Safety
192 : ///
193 : /// Cancellation safe.
194 : #[allow(clippy::too_many_arguments)]
195 6 : async fn apply_batch_postgres(
196 6 : &self,
197 6 : key: Key,
198 6 : lsn: Lsn,
199 6 : base_img: Option<Bytes>,
200 6 : base_img_lsn: Lsn,
201 6 : records: &[(Lsn, NeonWalRecord)],
202 6 : wal_redo_timeout: Duration,
203 6 : pg_version: u32,
204 6 : ) -> anyhow::Result<Bytes> {
205 6 : *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
206 :
207 6 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
208 : const MAX_RETRY_ATTEMPTS: u32 = 1;
209 6 : let mut n_attempts = 0u32;
210 : loop {
211 8 : let proc: Arc<process::WalRedoProcess> =
212 8 : match self.redo_process.get_or_init_detached().await {
213 0 : Ok(guard) => Arc::clone(&guard),
214 8 : Err(permit) => {
215 8 : // don't hold poison_guard, the launch code can bail
216 8 : let start = Instant::now();
217 8 : let proc = Arc::new(
218 8 : process::WalRedoProcess::launch(
219 8 : self.conf,
220 8 : self.tenant_shard_id,
221 8 : pg_version,
222 8 : )
223 8 : .context("launch walredo process")?,
224 : );
225 8 : let duration = start.elapsed();
226 8 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
227 8 : info!(
228 8 : duration_ms = duration.as_millis(),
229 8 : pid = proc.id(),
230 8 : "launched walredo process"
231 8 : );
232 8 : self.redo_process.set(Arc::clone(&proc), permit);
233 8 : proc
234 : }
235 : };
236 :
237 8 : let started_at = std::time::Instant::now();
238 8 :
239 8 : // Relational WAL records are applied using wal-redo-postgres
240 8 : let result = proc
241 8 : .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
242 8 : .context("apply_wal_records");
243 8 :
244 8 : let duration = started_at.elapsed();
245 8 :
246 8 : let len = records.len();
247 16 : let nbytes = records.iter().fold(0, |acumulator, record| {
248 16 : acumulator
249 16 : + match &record.1 {
250 16 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
251 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
252 : }
253 16 : });
254 8 :
255 8 : WAL_REDO_TIME.observe(duration.as_secs_f64());
256 8 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
257 8 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
258 8 :
259 8 : debug!(
260 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
261 0 : len,
262 0 : nbytes,
263 0 : duration.as_micros(),
264 0 : lsn
265 0 : );
266 :
267 : // If something went wrong, don't try to reuse the process. Kill it, and
268 : // next request will launch a new one.
269 8 : if let Err(e) = result.as_ref() {
270 4 : error!(
271 4 : "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
272 4 : records.len(),
273 4 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
274 4 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
275 4 : nbytes,
276 4 : base_img_lsn,
277 4 : lsn,
278 4 : n_attempts,
279 4 : e,
280 4 : );
281 : // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
282 : // Note that there may be other tasks concurrent with us that also hold `proc`.
283 : // We have to deal with that here.
284 : // Also read the doc comment on field `self.redo_process`.
285 : //
286 : // NB: there may still be other concurrent threads using `proc`.
287 : // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
288 : //
289 : // NB: the drop impl blocks the dropping thread with a wait() system call for
290 : // the child process. In some ways the blocking is actually good: if we
291 : // deferred the waiting into the background / to tokio if we used `tokio::process`,
292 : // it could happen that if walredo always fails immediately, we spawn processes faster
293 : // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
294 : // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
295 : // This probably needs revisiting at some later point.
296 4 : match self.redo_process.get() {
297 0 : None => (),
298 4 : Some(guard) => {
299 4 : if Arc::ptr_eq(&proc, &*guard) {
300 4 : // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
301 4 : guard.take_and_deinit();
302 4 : } else {
303 0 : // Another task already spawned another redo process (further up in this method)
304 0 : // and put it into `redo_process`. Do nothing, our view of the world is behind.
305 0 : }
306 : }
307 : }
308 : // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
309 4 : drop(proc);
310 4 : } else if n_attempts != 0 {
311 0 : info!(n_attempts, "retried walredo succeeded");
312 4 : }
313 8 : n_attempts += 1;
314 8 : if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
315 6 : return result;
316 2 : }
317 : }
318 6 : }
319 :
320 : ///
321 : /// Process a batch of WAL records using bespoken Neon code.
322 : ///
323 0 : fn apply_batch_neon(
324 0 : &self,
325 0 : key: Key,
326 0 : lsn: Lsn,
327 0 : base_img: Option<Bytes>,
328 0 : records: &[(Lsn, NeonWalRecord)],
329 0 : ) -> anyhow::Result<Bytes> {
330 0 : let start_time = Instant::now();
331 0 :
332 0 : let mut page = BytesMut::new();
333 0 : if let Some(fpi) = base_img {
334 0 : // If full-page image is provided, then use it...
335 0 : page.extend_from_slice(&fpi[..]);
336 0 : } else {
337 : // All the current WAL record types that we can handle require a base image.
338 0 : anyhow::bail!("invalid neon WAL redo request with no base image");
339 : }
340 :
341 : // Apply all the WAL records in the batch
342 0 : for (record_lsn, record) in records.iter() {
343 0 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
344 : }
345 : // Success!
346 0 : let duration = start_time.elapsed();
347 0 : // FIXME: using the same metric here creates a bimodal distribution by default, and because
348 0 : // there could be multiple batch sizes this would be N+1 modal.
349 0 : WAL_REDO_TIME.observe(duration.as_secs_f64());
350 0 :
351 0 : debug!(
352 0 : "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
353 0 : records.len(),
354 0 : duration.as_micros(),
355 0 : lsn
356 0 : );
357 :
358 0 : Ok(page.freeze())
359 0 : }
360 :
/// Apply a single WAL record to `page` in place using the Neon code in
/// [`apply_neon::apply_in_neon`].
///
/// `_record_lsn` is currently unused. Any error from `apply_in_neon` is
/// propagated to the caller.
361 0 : fn apply_record_neon(
362 0 : &self,
363 0 : key: Key,
364 0 : page: &mut BytesMut,
365 0 : _record_lsn: Lsn,
366 0 : record: &NeonWalRecord,
367 0 : ) -> anyhow::Result<()> {
368 0 : apply_neon::apply_in_neon(record, key, page)?;
369 :
370 0 : Ok(())
371 0 : }
372 : }
373 :
374 : #[cfg(test)]
375 : mod tests {
376 : use super::PostgresRedoManager;
377 : use crate::repository::Key;
378 : use crate::{config::PageServerConf, walrecord::NeonWalRecord};
379 : use bytes::Bytes;
380 : use pageserver_api::shard::TenantShardId;
381 : use std::str::FromStr;
382 : use tracing::Instrument;
383 : use utils::{id::TenantId, lsn::Lsn};
384 :
385 : #[tokio::test]
386 2 : async fn short_v14_redo() {
387 2 : let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
388 2 :
389 2 : let h = RedoHarness::new().unwrap();
390 2 :
391 2 : let page = h
392 2 : .manager
393 2 : .request_redo(
394 2 : Key {
395 2 : field1: 0,
396 2 : field2: 1663,
397 2 : field3: 13010,
398 2 : field4: 1259,
399 2 : field5: 0,
400 2 : field6: 0,
401 2 : },
402 2 : Lsn::from_str("0/16E2408").unwrap(),
403 2 : None,
404 2 : short_records(),
405 2 : 14,
406 2 : )
407 2 : .instrument(h.span())
408 2 : .await
409 2 : .unwrap();
410 2 :
411 2 : assert_eq!(&expected, &*page);
412 2 : }
413 :
414 : #[tokio::test]
415 2 : async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
416 2 : let h = RedoHarness::new().unwrap();
417 2 :
418 2 : let page = h
419 2 : .manager
420 2 : .request_redo(
421 2 : Key {
422 2 : field1: 0,
423 2 : field2: 1663,
424 2 : // key should be 13010
425 2 : field3: 13130,
426 2 : field4: 1259,
427 2 : field5: 0,
428 2 : field6: 0,
429 2 : },
430 2 : Lsn::from_str("0/16E2408").unwrap(),
431 2 : None,
432 2 : short_records(),
433 2 : 14,
434 2 : )
435 2 : .instrument(h.span())
436 2 : .await
437 2 : .unwrap();
438 2 :
439 2 : // TODO: there will be some stderr printout, which is forwarded to tracing that could
440 2 : // perhaps be captured as long as it's in the same thread.
441 2 : assert_eq!(page, crate::ZERO_PAGE);
442 2 : }
443 :
444 : #[tokio::test]
445 2 : async fn test_stderr() {
446 2 : let h = RedoHarness::new().unwrap();
447 2 : h
448 2 : .manager
449 2 : .request_redo(
450 2 : Key::from_i128(0),
451 2 : Lsn::INVALID,
452 2 : None,
453 2 : short_records(),
454 2 : 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
455 2 : )
456 2 : .instrument(h.span())
457 2 : .await
458 2 : .unwrap_err();
459 2 : }
460 :
/// Test fixture: two `NeonWalRecord::Postgres` records, at LSNs `0/16A9388`
/// and `0/16D4080`. The first has `will_init: true`, the second
/// `will_init: false`.
// The raw record bytes contain `\0` escapes directly followed by digits
// (e.g. `\04`, `\00`), which is what the `octal_escapes` lint flags.
461 : #[allow(clippy::octal_escapes)]
462 6 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
463 6 : vec![
464 6 : (
465 6 : Lsn::from_str("0/16A9388").unwrap(),
466 6 : NeonWalRecord::Postgres {
467 6 : will_init: true,
468 6 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
469 6 : }
470 6 : ),
471 6 : (
472 6 : Lsn::from_str("0/16D4080").unwrap(),
473 6 : NeonWalRecord::Postgres {
474 6 : will_init: false,
475 6 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
476 6 : }
477 6 : )
478 6 : ]
479 6 : }
480 :
481 : struct RedoHarness {
482 : // underscored because unused, except for removal at drop
483 : _repo_dir: camino_tempfile::Utf8TempDir,
484 : manager: PostgresRedoManager,
485 : tenant_shard_id: TenantShardId,
486 : }
487 :
488 : impl RedoHarness {
489 6 : fn new() -> anyhow::Result<Self> {
490 6 : crate::tenant::harness::setup_logging();
491 :
492 6 : let repo_dir = camino_tempfile::tempdir()?;
493 6 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
494 6 : let conf = Box::leak(Box::new(conf));
495 6 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
496 6 :
497 6 : let manager = PostgresRedoManager::new(conf, tenant_shard_id);
498 6 :
499 6 : Ok(RedoHarness {
500 6 : _repo_dir: repo_dir,
501 6 : manager,
502 6 : tenant_shard_id,
503 6 : })
504 6 : }
505 6 : fn span(&self) -> tracing::Span {
506 6 : tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
507 6 : }
508 : }
509 : }
|