use std::{collections::HashMap, sync::Arc};

use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
use tokio::{
    io::{AsyncWriteExt, BufWriter},
    sync::mpsc::{Receiver, Sender, error::TrySendError},
};
use tokio_util::sync::CancellationToken;
use utils::{
    id::{TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
    shard::TenantShardId,
};

use crate::{
    basebackup::send_basebackup_tarball,
    context::{DownloadBehavior, RequestContext},
    metrics::{
        BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE,
        BASEBACKUP_CACHE_READ, BASEBACKUP_CACHE_SIZE,
    },
    task_mgr::TaskKind,
    tenant::{
        Timeline,
        mgr::{TenantManager, TenantSlot},
    },
};

pub struct BasebackupPrepareRequest {
    pub tenant_shard_id: TenantShardId,
    pub timeline_id: TimelineId,
    pub lsn: Lsn,
}

pub type BasebackupPrepareSender = Sender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = Receiver<BasebackupPrepareRequest>;

#[derive(Clone)]
struct CacheEntry {
    /// LSN at which the basebackup was taken.
    lsn: Lsn,
    /// Size of the basebackup archive in bytes.
    size_bytes: u64,
}

/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
/// The main purpose of this cache is to speed up the startup process of compute nodes
/// after scaling to zero.
/// Thus, the basebackup is stored only for the latest LSN of the timeline and with a
/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
///
/// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
/// generates a basebackup from the timeline in the background, and stores it on disk.
///
/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
    data_dir: Utf8PathBuf,
    config: Option<BasebackupCacheConfig>,

    entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,

    prepare_sender: BasebackupPrepareSender,

    read_hit_count: GenericCounter<AtomicU64>,
    read_miss_count: GenericCounter<AtomicU64>,
    read_err_count: GenericCounter<AtomicU64>,

    prepare_skip_count: GenericCounter<AtomicU64>,
}

impl BasebackupCache {
    /// Create a new BasebackupCache instance.
    /// Also returns a BasebackupPrepareReceiver, which is needed to start
    /// the background task.
    /// The cache is initialized from the data_dir in the background task.
    /// The cache will return `None` for any get requests until the initialization is complete.
    /// The background task is spawned separately using [`Self::spawn_background_task`]
    /// to avoid a circular dependency between the cache and the tenant manager.
    pub fn new(
        data_dir: Utf8PathBuf,
        config: Option<BasebackupCacheConfig>,
    ) -> (Arc<Self>, BasebackupPrepareReceiver) {
        let chan_size = config.as_ref().map(|c| c.max_size_entries).unwrap_or(1);

        let (prepare_sender, prepare_receiver) = tokio::sync::mpsc::channel(chan_size);

        let cache = Arc::new(BasebackupCache {
            data_dir,
            config,
            entries: std::sync::Mutex::new(HashMap::new()),
            prepare_sender,

            read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
            read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
            read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),

            prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
        });

        (cache, prepare_receiver)
    }

    /// Spawns the background task.
    /// The background task initializes the cache from disk,
    /// processes prepare requests, and cleans up outdated cache entries.
    /// No-op if the cache is disabled (config is None).
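    ///
    /// A minimal wiring sketch (illustrative only; `data_dir`, `config`,
    /// `tenant_manager`, `handle`, and `cancel` are placeholders assumed to
    /// come from pageserver startup code):
    ///
    /// ```ignore
    /// let (cache, prepare_receiver) = BasebackupCache::new(data_dir, Some(config));
    /// cache.clone().spawn_background_task(
    ///     &handle,          // &tokio::runtime::Handle
    ///     prepare_receiver,
    ///     tenant_manager,   // Arc<TenantManager>
    ///     cancel,           // CancellationToken
    /// );
    /// ```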
    pub fn spawn_background_task(
        self: Arc<Self>,
        runtime_handle: &tokio::runtime::Handle,
        prepare_receiver: BasebackupPrepareReceiver,
        tenant_manager: Arc<TenantManager>,
        cancel: CancellationToken,
    ) {
        if let Some(config) = self.config.clone() {
            let background = BackgroundTask {
                c: self,

                config,
                tenant_manager,
                cancel,

                entry_count: 0,
                total_size_bytes: 0,

                prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
                prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
                prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
            };
            runtime_handle.spawn(background.run(prepare_receiver));
        }
    }

    /// Send a basebackup prepare request to the background task.
    /// The basebackup will be prepared asynchronously; it does not block the caller.
    /// The request will be skipped if any cache limits are exceeded.
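    ///
    /// A usage sketch (illustrative; the IDs are placeholders):
    ///
    /// ```ignore
    /// // Fire-and-forget: queues the request, or skips it if the queue is full.
    /// cache.send_prepare(tenant_shard_id, timeline_id, lsn);
    /// ```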
    pub fn send_prepare(&self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, lsn: Lsn) {
        let req = BasebackupPrepareRequest {
            tenant_shard_id,
            timeline_id,
            lsn,
        };

        BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.inc();
        let res = self.prepare_sender.try_send(req);

        if let Err(e) = res {
            BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
            self.prepare_skip_count.inc();
            match e {
                TrySendError::Full(_) => {
                    // Basebackup prepares are pretty rare; normally we should not hit this.
                    tracing::info!(
                        tenant_id = %tenant_shard_id.tenant_id,
                        %timeline_id,
                        %lsn,
                        "Basebackup prepare channel is full, skipping the request"
                    );
                }
                TrySendError::Closed(_) => {
                    // Normal during shutdown, not critical.
                    tracing::info!(
                        tenant_id = %tenant_shard_id.tenant_id,
                        %timeline_id,
                        %lsn,
                        "Basebackup prepare channel is closed, skipping the request"
                    );
                }
            }
        }
    }

    /// Gets a basebackup entry from the cache.
    /// If the entry is found, opens a file with the basebackup archive and returns it.
    /// Holding the open file descriptor keeps the archive readable even if the
    /// entry is removed from the cache and the file is unlinked in the background.
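    ///
    /// A read-path sketch (illustrative; identifiers are placeholders):
    ///
    /// ```ignore
    /// match cache.get(tenant_id, timeline_id, lsn).await {
    ///     Some(file) => { /* stream the cached gzipped tarball to the client */ }
    ///     None => { /* cache miss: generate the basebackup from the timeline */ }
    /// }
    /// ```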
    pub async fn get(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        lsn: Lsn,
    ) -> Option<tokio::fs::File> {
        if !self.is_enabled() {
            return None;
        }

        // Fast path. Check if the entry exists using the in-memory state.
        let tti = TenantTimelineId::new(tenant_id, timeline_id);
        if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
            self.read_miss_count.inc();
            return None;
        }

        let path = self.entry_path(tenant_id, timeline_id, lsn);

        match tokio::fs::File::open(path).await {
            Ok(file) => {
                self.read_hit_count.inc();
                Some(file)
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    // We may end up here if the basebackup was concurrently removed by the cleanup task.
                    self.read_miss_count.inc();
                } else {
                    self.read_err_count.inc();
                    tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
                }
                None
            }
        }
    }

    pub fn is_enabled(&self) -> bool {
        self.config.is_some()
    }

    // Private methods.

    fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
        // The default display format for an LSN is 0/ABCDEF.
        // The slash is not filename friendly, so serialize the LSN as plain hex.
        let lsn = lsn.0;
        format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
    }

    fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
        self.data_dir
            .join(Self::entry_filename(tenant_id, timeline_id, lsn))
    }
}

/// The background task that prepares basebackups
/// and manages the cache entries on disk.
/// It is a separate struct from BasebackupCache to allow holding
/// a mutable reference to this state without a mutex lock,
/// while BasebackupCache is referenced by the clients.
struct BackgroundTask {
    c: Arc<BasebackupCache>,

    config: BasebackupCacheConfig,
    tenant_manager: Arc<TenantManager>,
    cancel: CancellationToken,

    /// Number of entries in the cache.
    /// This counter is used for metrics and for applying cache limits.
    /// It generally should be equal to c.entries.len(), but it's calculated
    /// pessimistically for abnormal situations: if we encounter errors
    /// while removing an entry from disk, we don't decrement this counter, to
    /// make sure that we don't exceed the limit with "trashed" files on the disk.
    /// It also counts files in the data_dir that are not valid cache entries.
    entry_count: usize,
    /// Total size of all the entries on disk.
    /// This counter is used for metrics and for applying cache limits.
    /// Similar to entry_count, it is calculated pessimistically for abnormal situations.
    total_size_bytes: u64,

    prepare_ok_count: GenericCounter<AtomicU64>,
    prepare_skip_count: GenericCounter<AtomicU64>,
    prepare_err_count: GenericCounter<AtomicU64>,
}

impl BackgroundTask {
    fn tmp_dir(&self) -> Utf8PathBuf {
        self.c.data_dir.join("tmp")
    }

    fn entry_tmp_path(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        lsn: Lsn,
    ) -> Utf8PathBuf {
        self.tmp_dir()
            .join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
    }

    fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
        let parts: Vec<&str> = filename
            .strip_prefix("basebackup_")?
            .strip_suffix(".tar.gz")?
            .split('_')
            .collect();
        if parts.len() != 3 {
            return None;
        }
        let tenant_id = parts[0].parse::<TenantId>().ok()?;
        let timeline_id = parts[1].parse::<TimelineId>().ok()?;
        let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);

        Some((tenant_id, timeline_id, lsn))
    }

    // Recreate the tmp directory to clear all files in it.
    async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
        let tmp_dir = self.tmp_dir();
        if tmp_dir.exists() {
            tokio::fs::remove_dir_all(&tmp_dir).await?;
        }
        tokio::fs::create_dir_all(&tmp_dir).await?;
        Ok(())
    }

    async fn cleanup(&mut self) -> anyhow::Result<()> {
        self.clean_tmp_dir().await?;

        // Leave only up-to-date entries.
        let entries_old = self.c.entries.lock().unwrap().clone();
        let mut entries_new = HashMap::new();
        for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
            if !tenant_shard_id.is_shard_zero() {
                continue;
            }
            let TenantSlot::Attached(tenant) = tenant_slot else {
                continue;
            };
            let tenant_id = tenant_shard_id.tenant_id;

            for timeline in tenant.list_timelines() {
                let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
                if let Some(entry) = entries_old.get(&tti) {
                    if timeline.get_last_record_lsn() <= entry.lsn {
                        entries_new.insert(tti, entry.clone());
                    }
                }
            }
        }

        // Try to remove all entries that are not up-to-date.
        for (&tti, entry) in entries_old.iter() {
            if !entries_new.contains_key(&tti) {
                self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
                    .await;
            }
        }

        // Note: BackgroundTask is the only writer for self.c.entries,
        // so it couldn't have been modified concurrently.
        *self.c.entries.lock().unwrap() = entries_new;

        Ok(())
    }

    async fn on_startup(&mut self) -> anyhow::Result<()> {
        // Create data_dir if it does not exist.
        tokio::fs::create_dir_all(&self.c.data_dir)
            .await
            .context("Failed to create basebackup cache data directory")?;

        self.clean_tmp_dir()
            .await
            .context("Failed to clean tmp directory")?;

        // Read existing entries from the data_dir and add them to the in-memory state.
        let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
        let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
        while let Some(dir_entry) = dir.next_entry().await? {
            let filename = dir_entry.file_name();

            if filename == "tmp" {
                // Skip the tmp directory.
                continue;
            }

            let size_bytes = dir_entry
                .metadata()
                .await
                .map_err(|e| {
                    anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
                })?
                .len();

            self.entry_count += 1;
            BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);

            self.total_size_bytes += size_bytes;
            BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);

            let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
            let Some((tenant_id, timeline_id, lsn)) = parsed else {
                tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
                continue;
            };

            let cur_entry = CacheEntry { lsn, size_bytes };

            let tti = TenantTimelineId::new(tenant_id, timeline_id);

            use std::collections::hash_map::Entry::*;

            match entries.entry(tti) {
                Occupied(mut entry) => {
                    let found_entry = entry.get();
                    // Keep only the latest entry, remove the older one.
                    if cur_entry.lsn < found_entry.lsn {
                        self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
                            .await;
                    } else if cur_entry.lsn > found_entry.lsn {
                        self.try_remove_entry(tenant_id, timeline_id, found_entry)
                            .await;
                        entry.insert(cur_entry);
                    } else {
                        // Two different filenames parsed to the same timeline_id and LSN.
                        // Should never happen.
                        return Err(anyhow::anyhow!(
                            "Duplicate basebackup cache entry with the same LSN: {:?}",
                            filename
                        ));
                    }
                }
                Vacant(entry) => {
                    entry.insert(cur_entry);
                }
            }
        }

        *self.c.entries.lock().unwrap() = entries;

        Ok(())
    }

    async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
        // Panicking in the background task is a safe fallback:
        // it drops the receiver, and the cache is effectively disabled.
        self.on_startup()
            .await
            .expect("Failed to initialize basebackup cache");

        let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
        cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                Some(req) = prepare_receiver.recv() => {
                    BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
                    if let Err(err) = self.prepare_basebackup(
                        req.tenant_shard_id,
                        req.timeline_id,
                        req.lsn,
                    ).await {
                        tracing::info!("Failed to prepare basebackup: {:#}", err);
                        self.prepare_err_count.inc();
                        continue;
                    }
                }
                _ = cleanup_ticker.tick() => {
                    self.cleanup().await.unwrap_or_else(|e| {
                        tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
                    });
                }
                _ = self.cancel.cancelled() => {
                    tracing::info!("BasebackupCache background task cancelled");
                    break;
                }
            }
        }
    }

    /// Try to remove an entry from disk.
    /// The caller is responsible for removing the entry from the in-memory state.
    /// Updates the size counters and the corresponding metrics.
    /// Filesystem errors are logged and otherwise ignored, but the size counters
    /// are not decremented in that case, so the file continues to be counted
    /// towards the size limits.
    async fn try_remove_entry(
        &mut self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        entry: &CacheEntry,
    ) {
        let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);

        match tokio::fs::remove_file(&entry_path).await {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                tracing::warn!(
                    "Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
                    tenant_id,
                    timeline_id,
                    entry.lsn,
                    e
                );
                return;
            }
        }

        self.entry_count -= 1;
        BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);

        self.total_size_bytes -= entry.size_bytes;
        BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
    }

    /// Insert the cache entry into the in-memory state and update the size counters.
    /// Assumes that the file for the entry already exists on disk.
    /// If an entry already exists with a previous LSN, it is removed.
    async fn upsert_entry(
        &mut self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        entry: CacheEntry,
    ) {
        let tti = TenantTimelineId::new(tenant_id, timeline_id);

        self.entry_count += 1;
        BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);

        self.total_size_bytes += entry.size_bytes;
        BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);

        let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);

        if let Some(old_entry) = old_entry {
            self.try_remove_entry(tenant_id, timeline_id, &old_entry)
                .await;
        }
    }

    /// Prepare a basebackup for the given timeline.
    ///
    /// If the basebackup already exists with a higher LSN or the timeline already
    /// has a higher last_record_lsn, skip the preparation.
    ///
    /// The basebackup is prepared in a temporary directory and then moved to the final
    /// location to make the operation atomic.
    async fn prepare_basebackup(
        &mut self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        req_lsn: Lsn,
    ) -> anyhow::Result<()> {
        tracing::info!(
            tenant_id = %tenant_shard_id.tenant_id,
            %timeline_id,
            %req_lsn,
            "Preparing basebackup for timeline",
        );

        let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);

        // TODO(diko): I don't think we will hit the limit,
        // but if we do, it makes sense to try to evict the oldest entries here.
        if self.entry_count >= self.config.max_size_entries {
            tracing::info!(
                %tenant_shard_id,
                %timeline_id,
                %req_lsn,
                "Basebackup cache is full (max_size_entries), skipping basebackup",
            );
            self.prepare_skip_count.inc();
            return Ok(());
        }

        if self.total_size_bytes >= self.config.max_total_size_bytes {
            tracing::info!(
                %tenant_shard_id,
                %timeline_id,
                %req_lsn,
                "Basebackup cache is full (max_total_size_bytes), skipping basebackup",
            );
            self.prepare_skip_count.inc();
            return Ok(());
        }

        {
            let entries = self.c.entries.lock().unwrap();
            if let Some(entry) = entries.get(&tti) {
                if entry.lsn >= req_lsn {
                    tracing::info!(
                        %timeline_id,
                        %req_lsn,
                        %entry.lsn,
                        "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
                    );
                    self.prepare_skip_count.inc();
                    return Ok(());
                }
            }
        }

        let tenant = self
            .tenant_manager
            .get_attached_tenant_shard(tenant_shard_id)?;

        let tenant_state = tenant.current_state();
        if tenant_state != TenantState::Active {
            anyhow::bail!(
                "Tenant {} is not active, current state: {:?}",
                tenant_shard_id.tenant_id,
                tenant_state
            )
        }

        let timeline = tenant.get_timeline(timeline_id, true)?;

        let last_record_lsn = timeline.get_last_record_lsn();
        if last_record_lsn > req_lsn {
            tracing::info!(
                %timeline_id,
                %req_lsn,
                %last_record_lsn,
                "Timeline has a higher LSN than the requested one, skipping basebackup",
            );
            self.prepare_skip_count.inc();
            return Ok(());
        }

        let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);

        let res = self
            .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
            .await;

        let entry = match res {
            Ok(entry) => entry,
            Err(err) => {
                tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
                // Try to clean up the tmp file. If we fail, the background cleanup task will take care of it.
                match tokio::fs::remove_file(&entry_tmp_path).await {
                    Ok(_) => {}
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                    Err(e) => {
                        tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
                    }
                }
                return Err(err);
            }
        };

        // Move the tmp file to the final location atomically.
        // The tmp file is fsynced, so it's guaranteed that we will not have a partial file
        // in the main directory.
        // It's not necessary to fsync the parent directory after renaming: the worst case is
        // that the rename is rolled back on a disk failure, the entry disappears from the
        // main directory, and a later access of the entry is just a cache miss.
        let entry_path = self
            .c
            .entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
        tokio::fs::rename(&entry_tmp_path, &entry_path).await?;

        self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
            .await;

        self.prepare_ok_count.inc();
        Ok(())
    }

    /// Prepares a basebackup in a temporary file.
    /// Guarantees that the tmp file is fsynced before returning.
    async fn prepare_basebackup_tmp(
        &self,
        entry_tmp_path: &Utf8Path,
        timeline: &Arc<Timeline>,
        req_lsn: Lsn,
    ) -> anyhow::Result<CacheEntry> {
        let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
        let ctx = ctx.with_scope_timeline(timeline);

        let file = tokio::fs::File::create(entry_tmp_path).await?;
        let mut writer = BufWriter::new(file);

        // We may receive a request before the WAL record is applied to the timeline.
        // Wait for the requested LSN to be applied.
        timeline
            .wait_lsn(
                req_lsn,
                crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
                crate::tenant::timeline::WaitLsnTimeout::Default,
                &ctx,
            )
            .await?;

        send_basebackup_tarball(
            &mut writer,
            timeline,
            Some(req_lsn),
            None,
            false,
            false,
            // Level::Best because compression is not on the hot path of basebackup requests.
            // Decompression speed is almost unaffected by the compression level.
            Some(async_compression::Level::Best),
            &ctx,
        )
        .await?;

        writer.flush().await?;
        writer.into_inner().sync_all().await?;

        // TODO(diko): we could count the size via a Writer wrapper instead of a syscall.
        let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();

        Ok(CacheEntry {
            lsn: req_lsn,
            size_bytes,
        })
    }
}
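
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal round-trip sketch (added for illustration; not from the original
    // file): parse_entry_filename() should invert BasebackupCache::entry_filename().
    // The hex ID strings below are arbitrary placeholder values, assuming
    // TenantId/TimelineId parse from 32-hex-char strings as done elsewhere in
    // this module.
    #[test]
    fn entry_filename_round_trips() {
        let tenant_id: TenantId = "0123456789abcdef0123456789abcdef".parse().unwrap();
        let timeline_id: TimelineId = "89abcdef0123456789abcdef01234567".parse().unwrap();
        // An LSN that would display as 0/16B59E8; stored in the filename as
        // 16 uppercase hex digits: 00000000016B59E8.
        let lsn = Lsn(0x16B59E8);

        let filename = BasebackupCache::entry_filename(tenant_id, timeline_id, lsn);
        assert!(filename.starts_with("basebackup_") && filename.ends_with(".tar.gz"));
        assert_eq!(
            BackgroundTask::parse_entry_filename(&filename),
            Some((tenant_id, timeline_id, lsn))
        );
    }
}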
        
