Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply given WAL records over an old page image and return new
4 : //! page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the page image back. Communication with the
11 : //! postgres process happens via stdin/stdout
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves it by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, he cannot escape out of it.
20 :
21 : /// Process lifecycle and abstraction for the IPC protocol.
22 : mod process;
23 :
24 : /// Code to apply [`NeonWalRecord`]s.
25 : mod apply_neon;
26 :
27 : use crate::config::PageServerConf;
28 : use crate::metrics::{
29 : WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
30 : WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
31 : };
32 : use crate::repository::Key;
33 : use crate::walrecord::NeonWalRecord;
34 : use anyhow::Context;
35 : use bytes::{Bytes, BytesMut};
36 : use pageserver_api::key::key_to_rel_block;
37 : use pageserver_api::models::WalRedoManagerStatus;
38 : use pageserver_api::shard::TenantShardId;
39 : use std::sync::{Arc, RwLock};
40 : use std::time::Duration;
41 : use std::time::Instant;
42 : use tracing::*;
43 : use utils::lsn::Lsn;
44 :
45 : ///
46 : /// This is the real implementation that uses a Postgres process to
47 : /// perform WAL replay. The process handle is kept behind an `RwLock`
48 : /// and handed out to callers as an `Arc` clone, so several callers can
49 : /// hold the same process at once. Whether requests are serialized inside
50 : /// the process wrapper is decided by the `process` module, not here.
51 : ///
52 : pub struct PostgresRedoManager {
/// Tenant shard this manager serves; passed to the walredo process at launch.
53 : tenant_shard_id: TenantShardId,
/// Pageserver configuration; supplies `wal_redo_timeout` and is passed to the
/// process launcher.
54 : conf: &'static PageServerConf,
/// Start time of the most recent Postgres-backed redo batch; `None` until the
/// first one. Read by `status` and `maybe_quiesce`.
55 : last_redo_at: std::sync::Mutex<Option<Instant>>,
/// Lazily launched walredo process. `None` before first use, after an idle
/// shutdown, and after a failed redo took the process out of rotation.
56 : redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
57 : }
58 :
59 : ///
60 : /// Public interface of WAL redo manager
61 : ///
62 : impl PostgresRedoManager {
63 : ///
64 : /// Request the WAL redo manager to apply some WAL records
65 : ///
66 : /// The WAL redo is handled by a separate thread, so this just sends a request
67 : /// to the thread and waits for response.
68 : ///
69 : /// # Cancel-Safety
70 : ///
71 : /// This method is cancellation-safe.
72 2860923 : pub async fn request_redo(
73 2860923 : &self,
74 2860923 : key: Key,
75 2860923 : lsn: Lsn,
76 2860923 : base_img: Option<(Lsn, Bytes)>,
77 2860923 : records: Vec<(Lsn, NeonWalRecord)>,
78 2860923 : pg_version: u32,
79 2860923 : ) -> anyhow::Result<Bytes> {
80 2860923 : if records.is_empty() {
81 0 : anyhow::bail!("invalid WAL redo request with no records");
82 2860923 : }
83 2860923 :
84 2860923 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
85 2860923 : let mut img = base_img.map(|p| p.1);
86 2860923 : let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
87 2860923 : let mut batch_start = 0;
88 78959566 : for (i, record) in records.iter().enumerate().skip(1) {
89 78959566 : let rec_neon = apply_neon::can_apply_in_neon(&record.1);
90 78959566 :
91 78959566 : if rec_neon != batch_neon {
92 14 : let result = if batch_neon {
93 13 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
94 : } else {
95 1 : self.apply_batch_postgres(
96 1 : key,
97 1 : lsn,
98 1 : img,
99 1 : base_img_lsn,
100 1 : &records[batch_start..i],
101 1 : self.conf.wal_redo_timeout,
102 1 : pg_version,
103 1 : )
104 : };
105 14 : img = Some(result?);
106 :
107 14 : batch_neon = rec_neon;
108 14 : batch_start = i;
109 78959552 : }
110 : }
111 : // last batch
112 2860923 : if batch_neon {
113 4300 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
114 : } else {
115 2856623 : self.apply_batch_postgres(
116 2856623 : key,
117 2856623 : lsn,
118 2856623 : img,
119 2856623 : base_img_lsn,
120 2856623 : &records[batch_start..],
121 2856623 : self.conf.wal_redo_timeout,
122 2856623 : pg_version,
123 2856623 : )
124 : }
125 2860923 : }
126 :
127 486 : pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
128 486 : Some(WalRedoManagerStatus {
129 486 : last_redo_at: {
130 486 : let at = *self.last_redo_at.lock().unwrap();
131 486 : at.and_then(|at| {
132 190 : let age = at.elapsed();
133 190 : // map any chrono errors silently to None here
134 190 : chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
135 486 : })
136 486 : },
137 486 : pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
138 486 : })
139 486 : }
140 : }
141 :
142 : impl PostgresRedoManager {
143 : ///
144 : /// Create a new PostgresRedoManager.
145 : ///
146 864 : pub fn new(
147 864 : conf: &'static PageServerConf,
148 864 : tenant_shard_id: TenantShardId,
149 864 : ) -> PostgresRedoManager {
150 864 : // The actual process is launched lazily, on first request.
151 864 : PostgresRedoManager {
152 864 : tenant_shard_id,
153 864 : conf,
154 864 : last_redo_at: std::sync::Mutex::default(),
155 864 : redo_process: RwLock::new(None),
156 864 : }
157 864 : }
158 :
159 : /// This type doesn't have its own background task to check for idleness: we
160 : /// rely on our owner calling this function periodically in its own housekeeping
161 : /// loops.
162 775 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
163 775 : if let Ok(g) = self.last_redo_at.try_lock() {
164 775 : if let Some(last_redo_at) = *g {
165 395 : if last_redo_at.elapsed() >= idle_timeout {
166 203 : drop(g);
167 203 : let mut guard = self.redo_process.write().unwrap();
168 203 : *guard = None;
169 203 : }
170 380 : }
171 0 : }
172 775 : }
173 :
174 : ///
175 : /// Process one request for WAL redo using wal-redo postgres
176 : ///
/// Stores `Instant::now()` in `last_redo_at`, resolves `key` to a relation
/// and block number, then hands `base_img` and `records` to the walredo
/// process, launching it first if none is installed.
///
/// `lsn` and `base_img_lsn` are only used in log messages. Every record in
/// `records` must be `NeonWalRecord::Postgres`; any other variant hits an
/// `unreachable!` while the byte count for the metrics is computed.
///
/// # Errors
///
/// Fails if `key` cannot be mapped to a relation block, if the process
/// cannot be launched (not retried), or if applying the records fails on
/// the first attempt and again on the single retry.
177 : #[allow(clippy::too_many_arguments)]
178 2856624 : fn apply_batch_postgres(
179 2856624 : &self,
180 2856624 : key: Key,
181 2856624 : lsn: Lsn,
182 2856624 : base_img: Option<Bytes>,
183 2856624 : base_img_lsn: Lsn,
184 2856624 : records: &[(Lsn, NeonWalRecord)],
185 2856624 : wal_redo_timeout: Duration,
186 2856624 : pg_version: u32,
187 2856624 : ) -> anyhow::Result<Bytes> {
188 2856624 : *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
189 :
190 2856624 : let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
// One retry, i.e. at most two attempts in total.
191 : const MAX_RETRY_ATTEMPTS: u32 = 1;
192 2856624 : let mut n_attempts = 0u32;
193 : loop {
194 : // launch the WAL redo process on first use
195 2856626 : let proc: Arc<process::WalRedoProcess> = {
196 2856626 : let proc_guard = self.redo_process.read().unwrap();
197 2856626 : match &*proc_guard {
198 : None => {
199 : // "upgrade" to write lock to launch the process
200 609 : drop(proc_guard);
201 609 : let mut proc_guard = self.redo_process.write().unwrap();
// Re-check under the write lock: another caller may have installed a
// process between dropping the read lock and acquiring the write lock.
202 609 : match &*proc_guard {
203 : None => {
204 604 : let start = Instant::now();
205 604 : let proc = Arc::new(
206 604 : process::WalRedoProcess::launch(
207 604 : self.conf,
208 604 : self.tenant_shard_id,
209 604 : pg_version,
210 604 : )
211 604 : .context("launch walredo process")?,
212 : );
213 604 : let duration = start.elapsed();
214 604 : WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
215 604 : .observe(duration.as_secs_f64());
216 604 : info!(
217 604 : duration_ms = duration.as_millis(),
218 604 : pid = proc.id(),
219 604 : "launched walredo process"
220 604 : );
221 604 : *proc_guard = Some(Arc::clone(&proc));
222 604 : proc
223 : }
224 5 : Some(proc) => Arc::clone(proc),
225 : }
226 : }
227 2856017 : Some(proc) => Arc::clone(proc),
228 : }
229 : };
230 :
231 2856626 : let started_at = std::time::Instant::now();
232 2856626 :
233 2856626 : // Relational WAL records are applied using wal-redo-postgres
234 2856626 : let result = proc
235 2856626 : .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
236 2856626 : .context("apply_wal_records");
237 2856626 :
238 2856626 : let duration = started_at.elapsed();
239 2856626 :
240 2856626 : let len = records.len();
241 62894968 : let nbytes = records.iter().fold(0, |acumulator, record| {
242 62894968 : acumulator
243 62894968 : + match &record.1 {
244 62894968 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
245 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
246 : }
247 62894968 : });
248 2856626 :
// The metrics below are recorded for every attempt, successful or not.
249 2856626 : WAL_REDO_TIME.observe(duration.as_secs_f64());
250 2856626 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
251 2856626 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
252 2856626 :
253 2856626 : debug!(
254 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
255 0 : len,
256 0 : nbytes,
257 0 : duration.as_micros(),
258 0 : lsn
259 0 : );
260 :
261 : // If something went wrong, don't try to reuse the process. Kill it, and
262 : // next request will launch a new one.
263 2856626 : if let Err(e) = result.as_ref() {
264 4 : error!(
265 4 : "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
266 4 : records.len(),
267 4 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
268 4 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
269 4 : nbytes,
270 4 : base_img_lsn,
271 4 : lsn,
272 4 : n_attempts,
273 4 : e,
274 4 : );
275 : // Avoid concurrent callers hitting the same issue.
276 : // We can't prevent it from happening because we want to enable parallelism.
277 : {
278 4 : let mut guard = self.redo_process.write().unwrap();
279 4 : match &*guard {
280 4 : Some(current_field_value) => {
// Only clear the slot if it still holds the process that failed for us;
// a different `Arc` means a replacement has already been installed.
281 4 : if Arc::ptr_eq(current_field_value, &proc) {
282 4 : // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
283 4 : *guard = None;
284 4 : }
285 : }
286 0 : None => {
287 0 : // Another thread was faster to observe the error, and already took the process out of rotation.
288 0 : }
289 : }
290 : }
291 : // NB: there may still be other concurrent threads using `proc`.
292 : // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
293 : // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
294 : // holding the lock while waiting for the process to exit.
295 : // NB: the drop impl blocks the current threads with a wait() system call for
296 : // the child process. We dropped the `guard` above so that other threads aren't
297 : // affected. But, it's good that the current thread _does_ block to wait.
298 : // If we instead deferred the waiting into the background / to tokio, it could
299 : // happen that if walredo always fails immediately, we spawn processes faster
300 : // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
301 : // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
302 : // This probably needs revisiting at some later point.
303 4 : drop(proc);
304 2856622 : } else if n_attempts != 0 {
305 0 : info!(n_attempts, "retried walredo succeeded");
306 2856622 : }
307 2856626 : n_attempts += 1;
// Stop on success or once the retry budget is used up; otherwise loop
// around, which launches a fresh process if the slot was cleared above.
308 2856626 : if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
309 2856624 : return result;
310 2 : }
311 : }
312 2856624 : }
313 :
314 : ///
315 : /// Process a batch of WAL records using bespoken Neon code.
316 : ///
317 4313 : fn apply_batch_neon(
318 4313 : &self,
319 4313 : key: Key,
320 4313 : lsn: Lsn,
321 4313 : base_img: Option<Bytes>,
322 4313 : records: &[(Lsn, NeonWalRecord)],
323 4313 : ) -> anyhow::Result<Bytes> {
324 4313 : let start_time = Instant::now();
325 4313 :
326 4313 : let mut page = BytesMut::new();
327 4313 : if let Some(fpi) = base_img {
328 4313 : // If full-page image is provided, then use it...
329 4313 : page.extend_from_slice(&fpi[..]);
330 4313 : } else {
331 : // All the current WAL record types that we can handle require a base image.
332 0 : anyhow::bail!("invalid neon WAL redo request with no base image");
333 : }
334 :
335 : // Apply all the WAL records in the batch
336 18925525 : for (record_lsn, record) in records.iter() {
337 18925525 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
338 : }
339 : // Success!
340 4313 : let duration = start_time.elapsed();
341 4313 : // FIXME: using the same metric here creates a bimodal distribution by default, and because
342 4313 : // there could be multiple batch sizes this would be N+1 modal.
343 4313 : WAL_REDO_TIME.observe(duration.as_secs_f64());
344 4313 :
345 4313 : debug!(
346 0 : "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
347 0 : records.len(),
348 0 : duration.as_micros(),
349 0 : lsn
350 0 : );
351 :
352 4313 : Ok(page.freeze())
353 4313 : }
354 :
355 18925525 : fn apply_record_neon(
356 18925525 : &self,
357 18925525 : key: Key,
358 18925525 : page: &mut BytesMut,
359 18925525 : _record_lsn: Lsn,
360 18925525 : record: &NeonWalRecord,
361 18925525 : ) -> anyhow::Result<()> {
362 18925525 : apply_neon::apply_in_neon(record, key, page)?;
363 :
364 18925525 : Ok(())
365 18925525 : }
366 : }
367 :
368 : #[cfg(test)]
369 : mod tests {
370 : use super::PostgresRedoManager;
371 : use crate::repository::Key;
372 : use crate::{config::PageServerConf, walrecord::NeonWalRecord};
373 : use bytes::Bytes;
374 : use pageserver_api::shard::TenantShardId;
375 : use std::str::FromStr;
376 : use tracing::Instrument;
377 : use utils::{id::TenantId, lsn::Lsn};
378 :
379 2 : #[tokio::test]
380 2 : async fn short_v14_redo() {
381 2 : let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
382 2 :
383 2 : let h = RedoHarness::new().unwrap();
384 :
385 2 : let page = h
386 2 : .manager
387 2 : .request_redo(
388 2 : Key {
389 2 : field1: 0,
390 2 : field2: 1663,
391 2 : field3: 13010,
392 2 : field4: 1259,
393 2 : field5: 0,
394 2 : field6: 0,
395 2 : },
396 2 : Lsn::from_str("0/16E2408").unwrap(),
397 2 : None,
398 2 : short_records(),
399 2 : 14,
400 2 : )
401 2 : .instrument(h.span())
402 0 : .await
403 2 : .unwrap();
404 2 :
405 2 : assert_eq!(&expected, &*page);
406 : }
407 :
408 2 : #[tokio::test]
409 2 : async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
410 2 : let h = RedoHarness::new().unwrap();
411 :
412 2 : let page = h
413 2 : .manager
414 2 : .request_redo(
415 2 : Key {
416 2 : field1: 0,
417 2 : field2: 1663,
418 2 : // key should be 13010
419 2 : field3: 13130,
420 2 : field4: 1259,
421 2 : field5: 0,
422 2 : field6: 0,
423 2 : },
424 2 : Lsn::from_str("0/16E2408").unwrap(),
425 2 : None,
426 2 : short_records(),
427 2 : 14,
428 2 : )
429 2 : .instrument(h.span())
430 0 : .await
431 2 : .unwrap();
432 2 :
433 2 : // TODO: there will be some stderr printout, which is forwarded to tracing that could
434 2 : // perhaps be captured as long as it's in the same thread.
435 2 : assert_eq!(page, crate::ZERO_PAGE);
436 : }
437 :
438 2 : #[tokio::test]
439 2 : async fn test_stderr() {
440 2 : let h = RedoHarness::new().unwrap();
441 2 : h
442 2 : .manager
443 2 : .request_redo(
444 2 : Key::from_i128(0),
445 2 : Lsn::INVALID,
446 2 : None,
447 2 : short_records(),
448 2 : 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
449 2 : )
450 2 : .instrument(h.span())
451 0 : .await
452 2 : .unwrap_err();
453 : }
454 :
/// Two `NeonWalRecord::Postgres` records shared by the redo tests in this
/// module: the first has `will_init: true`, the second `will_init: false`.
///
/// The `rec` payloads are opaque WAL bytes and must not be reformatted.
/// `clippy::octal_escapes` is allowed because the literals contain `\0`
/// followed directly by ASCII digits.
455 : #[allow(clippy::octal_escapes)]
456 6 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
457 6 : vec![
458 6 : (
459 6 : Lsn::from_str("0/16A9388").unwrap(),
460 6 : NeonWalRecord::Postgres {
461 6 : will_init: true,
462 6 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
463 6 : }
464 6 : ),
465 6 : (
466 6 : Lsn::from_str("0/16D4080").unwrap(),
467 6 : NeonWalRecord::Postgres {
468 6 : will_init: false,
469 6 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
470 6 : }
471 6 : )
472 6 : ]
473 6 : }
474 :
475 : struct RedoHarness {
476 : // underscored because unused, except for removal at drop
477 : _repo_dir: camino_tempfile::Utf8TempDir,
478 : manager: PostgresRedoManager,
479 : tenant_shard_id: TenantShardId,
480 : }
481 :
482 : impl RedoHarness {
483 6 : fn new() -> anyhow::Result<Self> {
484 6 : crate::tenant::harness::setup_logging();
485 :
486 6 : let repo_dir = camino_tempfile::tempdir()?;
487 6 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
488 6 : let conf = Box::leak(Box::new(conf));
489 6 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
490 6 :
491 6 : let manager = PostgresRedoManager::new(conf, tenant_shard_id);
492 6 :
493 6 : Ok(RedoHarness {
494 6 : _repo_dir: repo_dir,
495 6 : manager,
496 6 : tenant_shard_id,
497 6 : })
498 6 : }
499 6 : fn span(&self) -> tracing::Span {
500 6 : tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
501 6 : }
502 : }
503 : }
|