LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - heatmap_layers_downloader.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 7.4 % 121 9
Test Date: 2025-03-12 00:01:28 Functions: 16.7 % 12 2

            Line data    Source code
       1              : //! Timeline utility module to hydrate everything from the current heatmap.
       2              : //!
       3              : //! Provides utilities to spawn and abort a background task where the downloads happen.
       4              : //! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.
       5              : 
       6              : use std::sync::{Arc, Mutex};
       7              : 
       8              : use futures::StreamExt;
       9              : use http_utils::error::ApiError;
      10              : use tokio_util::sync::CancellationToken;
      11              : use utils::sync::gate::Gate;
      12              : 
      13              : use crate::context::RequestContext;
      14              : 
      15              : use super::Timeline;
      16              : 
      17              : // This status is not strictly necessary now, but gives us a nice place
      18              : // to store progress information if we ever wish to expose it.
      19              : pub(super) enum HeatmapLayersDownloadStatus {
      20              :     InProgress,
      21              :     Complete,
      22              : }
      23              : 
      24              : pub(super) struct HeatmapLayersDownloader {
      25              :     handle: tokio::task::JoinHandle<()>,
      26              :     status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
      27              :     cancel: CancellationToken,
      28              :     downloads_guard: Arc<Gate>,
      29              : }
      30              : 
      31              : impl HeatmapLayersDownloader {
      32            0 :     fn new(
      33            0 :         timeline: Arc<Timeline>,
      34            0 :         concurrency: usize,
      35            0 :         recurse: bool,
      36            0 :         ctx: RequestContext,
      37            0 :     ) -> Result<HeatmapLayersDownloader, ApiError> {
      38            0 :         let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
      39              : 
      40            0 :         let cancel = timeline.cancel.child_token();
      41            0 :         let downloads_guard = Arc::new(Gate::default());
      42            0 : 
      43            0 :         let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));
      44            0 : 
      45            0 :         let handle = tokio::task::spawn({
      46            0 :             let status = status.clone();
      47            0 :             let downloads_guard = downloads_guard.clone();
      48            0 :             let cancel = cancel.clone();
      49            0 : 
      50            0 :             async move {
      51            0 :                 let _guard = tl_guard;
      52            0 : 
      53            0 :                 scopeguard::defer! {
      54            0 :                     *status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
      55            0 :                 }
      56              : 
      57            0 :                 let Some(heatmap) = timeline.generate_heatmap().await else {
      58            0 :                     tracing::info!("Heatmap layers download failed to generate heatmap");
      59            0 :                     return;
      60              :                 };
      61              : 
      62            0 :                 tracing::info!(
      63            0 :                     resident_size=%timeline.resident_physical_size(),
      64            0 :                     heatmap_layers=%heatmap.all_layers().count(),
      65            0 :                     "Starting heatmap layers download"
      66              :                 );
      67              : 
      68            0 :                 let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map(
      69            0 :                     |layer| {
      70            0 :                         let ctx = ctx.attached_child();
      71            0 :                         let tl = timeline.clone();
      72            0 :                         let dl_guard = match downloads_guard.enter() {
      73            0 :                             Ok(g) => g,
      74              :                             Err(_) => {
      75              :                                 // [`Self::shutdown`] was called. Don't spawn any more downloads.
      76            0 :                                 return None;
      77              :                             }
      78              :                         };
      79              : 
      80            0 :                         Some(async move {
      81            0 :                             let _dl_guard = dl_guard;
      82              : 
      83            0 :                             let res = tl.download_layer(&layer.name, &ctx).await;
      84            0 :                             if let Err(err) = res {
      85            0 :                                 if !err.is_cancelled() {
      86            0 :                                     tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
      87            0 :                                 }
      88            0 :                             }
      89            0 :                         })
      90            0 :                     }
      91            0 :                 )).buffered(concurrency);
      92            0 : 
      93            0 :                 tokio::select! {
      94            0 :                     _ = stream.collect::<()>() => {
      95            0 :                         tracing::info!(
      96            0 :                             resident_size=%timeline.resident_physical_size(),
      97            0 :                             "Heatmap layers download completed"
      98              :                         );
      99              :                     },
     100            0 :                     _ = cancel.cancelled() => {
     101            0 :                         tracing::info!("Heatmap layers download cancelled");
     102            0 :                         return;
     103              :                     }
     104              :                 }
     105              : 
     106            0 :                 if recurse {
     107            0 :                     if let Some(ancestor) = timeline.ancestor_timeline() {
     108            0 :                         let ctx = ctx.attached_child();
     109            0 :                         let res =
     110            0 :                             ancestor.start_heatmap_layers_download(concurrency, recurse, &ctx);
     111            0 :                         if let Err(err) = res {
     112            0 :                             tracing::info!(
     113            0 :                                 "Failed to start heatmap layers download for ancestor: {err}"
     114              :                             );
     115            0 :                         }
     116            0 :                     }
     117            0 :                 }
     118            0 :             }
     119            0 :         });
     120            0 : 
     121            0 :         Ok(Self {
     122            0 :             status,
     123            0 :             handle,
     124            0 :             cancel,
     125            0 :             downloads_guard,
     126            0 :         })
     127            0 :     }
     128              : 
     129            0 :     fn is_complete(&self) -> bool {
     130            0 :         matches!(
     131            0 :             *self.status.lock().unwrap(),
     132              :             HeatmapLayersDownloadStatus::Complete
     133              :         )
     134            0 :     }
     135              : 
     136              :     /// Drive any in-progress downloads to completion and stop spawning any new ones.
     137              :     ///
     138              :     /// This has two callers and they behave differently:
     139              :     /// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
     140              :     ///    are sensitive to timeline cancellation.
     141              :     ///
     142              :     /// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
     143              :     ///    downloads to complete.
     144            0 :     async fn stop_and_drain(self) {
     145            0 :         // Counterintuitive: close the guard before cancelling.
     146            0 :         // Something needs to poll the already created download futures to completion.
     147            0 :         // If we cancel first, then the underlying task exits and we lose
     148            0 :         // the poller.
     149            0 :         self.downloads_guard.close().await;
     150            0 :         self.cancel.cancel();
     151            0 :         if let Err(err) = self.handle.await {
     152            0 :             tracing::warn!("Failed to join heatmap layer downloader task: {err}");
     153            0 :         }
     154            0 :     }
     155              : }
     156              : 
     157              : impl Timeline {
     158            0 :     pub(crate) fn start_heatmap_layers_download(
     159            0 :         self: &Arc<Self>,
     160            0 :         concurrency: usize,
     161            0 :         recurse: bool,
     162            0 :         ctx: &RequestContext,
     163            0 :     ) -> Result<(), ApiError> {
     164            0 :         let mut locked = self.heatmap_layers_downloader.lock().unwrap();
     165            0 :         if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
     166            0 :             let dl = HeatmapLayersDownloader::new(
     167            0 :                 self.clone(),
     168            0 :                 concurrency,
     169            0 :                 recurse,
     170            0 :                 ctx.attached_child(),
     171            0 :             )?;
     172            0 :             *locked = Some(dl);
     173            0 :             Ok(())
     174              :         } else {
     175            0 :             Err(ApiError::Conflict("Already running".to_string()))
     176              :         }
     177            0 :     }
     178              : 
     179           20 :     pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
     180           20 :         // This can race with the start of a new downloader and lead to a situation
     181           20 :         // where one downloader is shutting down and another one is in-flight.
     182           20 :         // The only impact is that we'd end up using more remote storage semaphore
     183           20 :         // units than expected.
     184           20 :         let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
     185           20 :         if let Some(dl) = downloader {
     186            0 :             dl.stop_and_drain().await;
     187           20 :         }
     188           20 :     }
     189              : }
        

Generated by: LCOV version 2.1-beta