use std::{collections::HashMap, sync::Arc};

use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
use tokio::{
    io::{AsyncWriteExt, BufWriter},
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use utils::{
    id::{TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
    shard::TenantShardId,
};

use crate::{
    basebackup::send_basebackup_tarball,
    context::{DownloadBehavior, RequestContext},
    metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
    task_mgr::TaskKind,
    tenant::{
        Timeline,
        mgr::{TenantManager, TenantSlot},
    },
};

pub struct BasebackupPrepareRequest {
    pub tenant_shard_id: TenantShardId,
    pub timeline_id: TimelineId,
    pub lsn: Lsn,
}

pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;

type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;

/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
/// The main purpose of this cache is to speed up the startup process of compute nodes
/// after scaling to zero.
/// Thus, the basebackup is stored only for the latest LSN of the timeline and with a
/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
///
/// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
/// generates a basebackup from the timeline in the background, and stores it on disk.
///
/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
/// and ~1 RPS for get requests.
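///
/// # Example
///
/// A minimal wiring sketch (hypothetical setup: `runtime`, `tenant_manager`, and the
/// tenant/timeline IDs are assumed to be in scope; error handling omitted):
///
/// ```ignore
/// let (prepare_sender, prepare_receiver) = tokio::sync::mpsc::unbounded_channel();
/// let cache = BasebackupCache::spawn(
///     runtime.handle(),
///     Utf8PathBuf::from("/data/basebackup_cache"),
///     Some(BasebackupCacheConfig::default()),
///     prepare_receiver,
///     tenant_manager,
///     CancellationToken::new(),
/// );
/// // Ask the background task to prepare an archive at `lsn`...
/// prepare_sender.send(BasebackupPrepareRequest { tenant_shard_id, timeline_id, lsn })?;
/// // ...and later serve it to a starting compute, if it is still cached.
/// if let Some(file) = cache.get(tenant_id, timeline_id, lsn).await {
///     // Stream `file` to the client.
/// }
/// ```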
pub struct BasebackupCache {
    data_dir: Utf8PathBuf,
    config: BasebackupCacheConfig,
    tenant_manager: Arc<TenantManager>,
    remove_entry_sender: BasebackupRemoveEntrySender,

    entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,

    cancel: CancellationToken,

    read_hit_count: GenericCounter<AtomicU64>,
    read_miss_count: GenericCounter<AtomicU64>,
    read_err_count: GenericCounter<AtomicU64>,

    prepare_ok_count: GenericCounter<AtomicU64>,
    prepare_skip_count: GenericCounter<AtomicU64>,
    prepare_err_count: GenericCounter<AtomicU64>,
}

impl BasebackupCache {
    /// Creates a BasebackupCache and spawns the background task.
    /// The initialization of the cache is performed in the background and does not
    /// block the caller. The cache will return `None` for any get requests until
    /// initialization is complete.
    pub fn spawn(
        runtime_handle: &tokio::runtime::Handle,
        data_dir: Utf8PathBuf,
        config: Option<BasebackupCacheConfig>,
        prepare_receiver: BasebackupPrepareReceiver,
        tenant_manager: Arc<TenantManager>,
        cancel: CancellationToken,
    ) -> Arc<Self> {
        let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();

        let enabled = config.is_some();

        let cache = Arc::new(BasebackupCache {
            data_dir,
            config: config.unwrap_or_default(),
            tenant_manager,
            remove_entry_sender,

            entries: std::sync::Mutex::new(HashMap::new()),

            cancel,

            read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
            read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
            read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),

            prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
            prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
            prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
        });

        if enabled {
            runtime_handle.spawn(
                cache
                    .clone()
                    .background(prepare_receiver, remove_entry_receiver),
            );
        }

        cache
    }

    /// Gets a basebackup entry from the cache.
    /// If the entry is found, opens a file with the basebackup archive and returns it.
    /// The open file descriptor keeps the archive readable even if the entry is removed
    /// from the cache and the file is unlinked in the background.
    pub async fn get(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        lsn: Lsn,
    ) -> Option<tokio::fs::File> {
        // Fast path. Check if the entry exists using the in-memory state.
        let tti = TenantTimelineId::new(tenant_id, timeline_id);
        if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
            self.read_miss_count.inc();
            return None;
        }

        let path = self.entry_path(tenant_id, timeline_id, lsn);

        match tokio::fs::File::open(path).await {
            Ok(file) => {
                self.read_hit_count.inc();
                Some(file)
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    // We may end up here if the basebackup was concurrently removed by the cleanup task.
                    self.read_miss_count.inc();
                } else {
                    self.read_err_count.inc();
                    tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
                }
                None
            }
        }
    }

    // Private methods.

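    /// For example (hypothetical IDs), a tenant/timeline pair at `Lsn(0x16960E68)` maps to
    /// `basebackup_<tenant_hex>_<timeline_hex>_0000000016960E68.tar.gz`; see the roundtrip
    /// test sketch at the bottom of this file.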
    fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
        // The default display format for an LSN is `0/ABCDEF`.
        // The slash is not filename-friendly, so serialize the LSN as plain fixed-width hex.
        let lsn = lsn.0;
        format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
    }

    fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
        self.data_dir
            .join(Self::entry_filename(tenant_id, timeline_id, lsn))
    }

    fn tmp_dir(&self) -> Utf8PathBuf {
        self.data_dir.join("tmp")
    }

    fn entry_tmp_path(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        lsn: Lsn,
    ) -> Utf8PathBuf {
        self.tmp_dir()
            .join(Self::entry_filename(tenant_id, timeline_id, lsn))
    }

    fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
        let parts: Vec<&str> = filename
            .strip_prefix("basebackup_")?
            .strip_suffix(".tar.gz")?
            .split('_')
            .collect();
        if parts.len() != 3 {
            return None;
        }
        let tenant_id = parts[0].parse::<TenantId>().ok()?;
        let timeline_id = parts[1].parse::<TimelineId>().ok()?;
        let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);

        Some((tenant_id, timeline_id, lsn))
    }

    // Recreate the tmp directory to clear all files in it.
    async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
        let tmp_dir = self.tmp_dir();
        if tmp_dir.exists() {
            tokio::fs::remove_dir_all(&tmp_dir).await?;
        }
        tokio::fs::create_dir_all(&tmp_dir).await?;
        Ok(())
    }

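    /// Drops cache entries whose timeline has advanced past the cached LSN, as well as
    /// entries whose tenant or timeline is no longer attached to this pageserver.
    /// The files themselves are unlinked asynchronously via the remove-entry channel.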
    async fn cleanup(&self) -> anyhow::Result<()> {
        self.clean_tmp_dir().await?;

        // Remove outdated entries.
        let entries_old = self.entries.lock().unwrap().clone();
        let mut entries_new = HashMap::new();
        for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
            if !tenant_shard_id.is_shard_zero() {
                continue;
            }
            let TenantSlot::Attached(tenant) = tenant_slot else {
                continue;
            };
            let tenant_id = tenant_shard_id.tenant_id;

            for timeline in tenant.list_timelines() {
                let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
                if let Some(&entry_lsn) = entries_old.get(&tti) {
                    if timeline.get_last_record_lsn() <= entry_lsn {
                        entries_new.insert(tti, entry_lsn);
                    }
                }
            }
        }

        for (&tti, &lsn) in entries_old.iter() {
            if !entries_new.contains_key(&tti) {
                self.remove_entry_sender
                    .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
                    .unwrap();
            }
        }

        BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
        *self.entries.lock().unwrap() = entries_new;

        Ok(())
    }

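    /// Rebuilds the in-memory entry map from the files found in `data_dir`, keeping only
    /// the latest LSN per timeline and scheduling removal of any older duplicates.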
    async fn on_startup(&self) -> anyhow::Result<()> {
        // Create data_dir if it does not exist.
        tokio::fs::create_dir_all(&self.data_dir)
            .await
            .context("Failed to create basebackup cache data directory")?;

        self.clean_tmp_dir()
            .await
            .context("Failed to clean tmp directory")?;

        // Read existing entries from the data_dir and add them to in-memory state.
        let mut entries = HashMap::new();
        let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
        while let Some(dir_entry) = dir.next_entry().await? {
            let filename = dir_entry.file_name();

            if filename == "tmp" {
                // Skip the tmp directory.
                continue;
            }

            let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
            let Some((tenant_id, timeline_id, lsn)) = parsed else {
                tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
                continue;
            };

            let tti = TenantTimelineId::new(tenant_id, timeline_id);

            use std::collections::hash_map::Entry::*;

            match entries.entry(tti) {
                Occupied(mut entry) => {
                    let entry_lsn = *entry.get();
                    // Leave only the latest entry, remove the old one.
                    if lsn < entry_lsn {
                        self.remove_entry_sender.send(self.entry_path(
                            tenant_id,
                            timeline_id,
                            lsn,
                        ))?;
                    } else if lsn > entry_lsn {
                        self.remove_entry_sender.send(self.entry_path(
                            tenant_id,
                            timeline_id,
                            entry_lsn,
                        ))?;
                        entry.insert(lsn);
                    } else {
                        // Two different filenames parsed to the same timeline_id and LSN.
                        // Should never happen.
                        return Err(anyhow::anyhow!(
                            "Duplicate basebackup cache entry with the same LSN: {:?}",
                            filename
                        ));
                    }
                }
                Vacant(entry) => {
                    entry.insert(lsn);
                }
            }
        }

        BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
        *self.entries.lock().unwrap() = entries;

        Ok(())
    }

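    /// Main background loop: initializes the cache, then serves prepare and remove-entry
    /// requests and runs periodic cleanups until cancelled.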
    async fn background(
        self: Arc<Self>,
        mut prepare_receiver: BasebackupPrepareReceiver,
        mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
    ) {
        // A panic in this task is a safe fallback: it drops the receivers,
        // which effectively disables the cache.
        self.on_startup()
            .await
            .expect("Failed to initialize basebackup cache");

        let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
        cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                Some(req) = prepare_receiver.recv() => {
                    if let Err(err) = self.prepare_basebackup(
                        req.tenant_shard_id,
                        req.timeline_id,
                        req.lsn,
                    ).await {
                        tracing::info!("Failed to prepare basebackup: {:#}", err);
                        self.prepare_err_count.inc();
                        continue;
                    }
                }
                Some(req) = remove_entry_receiver.recv() => {
                    if let Err(e) = tokio::fs::remove_file(req).await {
                        tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
                    }
                }
                _ = cleanup_ticker.tick() => {
                    self.cleanup().await.unwrap_or_else(|e| {
                        tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
                    });
                }
                _ = self.cancel.cancelled() => {
                    tracing::info!("BasebackupCache background task cancelled");
                    break;
                }
            }
        }
    }

    /// Prepare a basebackup for the given timeline.
    ///
    /// If the basebackup already exists with a higher LSN or the timeline already
    /// has a higher last_record_lsn, skip the preparation.
    ///
    /// The basebackup is prepared in a temporary directory and then moved to the final
    /// location to make the operation atomic.
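    ///
    /// Roughly:
    ///
    /// ```text
    /// tmp/basebackup_<tenant>_<timeline>_<lsn>.tar.gz    (written, then fsynced)
    ///        |
    ///        |  rename(2), atomic within the same filesystem
    ///        v
    /// basebackup_<tenant>_<timeline>_<lsn>.tar.gz        (visible to `get`)
    /// ```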
    async fn prepare_basebackup(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        req_lsn: Lsn,
    ) -> anyhow::Result<()> {
        tracing::info!(
            tenant_id = %tenant_shard_id.tenant_id,
            %timeline_id,
            %req_lsn,
            "Preparing basebackup for timeline",
        );

        let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);

        {
            let entries = self.entries.lock().unwrap();
            if let Some(&entry_lsn) = entries.get(&tti) {
                if entry_lsn >= req_lsn {
                    tracing::info!(
                        %timeline_id,
                        %req_lsn,
                        %entry_lsn,
                        "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
                    );
                    self.prepare_skip_count.inc();
                    return Ok(());
                }
            }

            if entries.len() as i64 >= self.config.max_size_entries {
                tracing::info!(
                    %timeline_id,
                    %req_lsn,
                    "Basebackup cache is full, skipping basebackup",
                );
                self.prepare_skip_count.inc();
                return Ok(());
            }
        }

        let tenant = self
            .tenant_manager
            .get_attached_tenant_shard(tenant_shard_id)?;

        let tenant_state = tenant.current_state();
        if tenant_state != TenantState::Active {
            anyhow::bail!(
                "Tenant {} is not active, current state: {:?}",
                tenant_shard_id.tenant_id,
                tenant_state
            )
        }

        let timeline = tenant.get_timeline(timeline_id, true)?;

        let last_record_lsn = timeline.get_last_record_lsn();
        if last_record_lsn > req_lsn {
            tracing::info!(
                %timeline_id,
                %req_lsn,
                %last_record_lsn,
                "Timeline has a higher LSN than the requested one, skipping basebackup",
            );
            self.prepare_skip_count.inc();
            return Ok(());
        }

        let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);

        let res = self
            .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
            .await;

        if let Err(err) = res {
            tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
            // Try to clean up the tmp file. If this fails, the background cleanup task will take care of it.
            match tokio::fs::remove_file(&entry_tmp_path).await {
                Ok(_) => {}
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => {
                    tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
                }
            }
            return Err(err);
        }

        // Move the tmp file to the final location atomically.
        // The tmp file is fsynced, so it's guaranteed that we will not have a partial file
        // in the main directory.
        // It's not necessary to fsync the parent directory after the rename: in the worst
        // case the rename is rolled back on a disk failure, the entry disappears from the
        // main directory, and accessing it simply causes a cache miss.
        let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
        tokio::fs::rename(&entry_tmp_path, &entry_path).await?;

        let mut entries = self.entries.lock().unwrap();
        if let Some(old_lsn) = entries.insert(tti, req_lsn) {
            // Remove the old entry if it exists.
            self.remove_entry_sender
                .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
                .unwrap();
        }
        BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);

        self.prepare_ok_count.inc();
        Ok(())
    }

    /// Prepares a basebackup in a temporary file.
    /// Guarantees that the tmp file is fsynced before returning.
    async fn prepare_basebackup_tmp(
        &self,
        entry_tmp_path: &Utf8Path,
        timeline: &Arc<Timeline>,
        req_lsn: Lsn,
    ) -> anyhow::Result<()> {
        let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
        let ctx = ctx.with_scope_timeline(timeline);

        let file = tokio::fs::File::create(entry_tmp_path).await?;
        let mut writer = BufWriter::new(file);

        let mut encoder = GzipEncoder::with_quality(
            &mut writer,
            // Level::Best because compression is not on the hot path of basebackup requests,
            // and decompression speed is almost unaffected by the compression level.
            async_compression::Level::Best,
        );

        // We may receive a request before the WAL record is applied to the timeline.
        // Wait for the requested LSN to be applied.
        timeline
            .wait_lsn(
                req_lsn,
                crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
                crate::tenant::timeline::WaitLsnTimeout::Default,
                &ctx,
            )
            .await?;

        send_basebackup_tarball(
            &mut encoder,
            timeline,
            Some(req_lsn),
            None,
            false,
            false,
            &ctx,
        )
        .await?;

        encoder.shutdown().await?;
        writer.flush().await?;
        writer.into_inner().sync_all().await?;

        Ok(())
    }
}
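
// Sketch of a roundtrip sanity check for the filename helpers; assumes the
// `generate()` constructors on `TenantId`/`TimelineId` that Neon's test code
// uses elsewhere.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn entry_filename_roundtrip() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let lsn = Lsn(0x1234_5678_9ABC_DEF0);

        let filename = BasebackupCache::entry_filename(tenant_id, timeline_id, lsn);
        // The LSN is serialized as fixed-width uppercase hex.
        assert!(filename.ends_with("_123456789ABCDEF0.tar.gz"));

        // Parsing the generated filename must yield the original components.
        assert_eq!(
            BasebackupCache::parse_entry_filename(&filename),
            Some((tenant_id, timeline_id, lsn))
        );
    }
}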