LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - heatmap_layers_downloader.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 9.1 % 99 9
Test Date: 2025-02-20 13:11:02 Functions: 15.4 % 13 2

            Line data    Source code
       1              : //! Timeline utility module to hydrate everything from the current heatmap.
       2              : //!
       3              : //! Provides utilities to spawn and abort a background task where the downloads happen.
       4              : //! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.
       5              : 
       6              : use futures::StreamExt;
       7              : use http_utils::error::ApiError;
       8              : use std::sync::{Arc, Mutex};
       9              : use tokio_util::sync::CancellationToken;
      10              : use utils::sync::gate::Gate;
      11              : 
      12              : use super::Timeline;
      13              : 
      14              : // This status is not strictly necessary now, but gives us a nice place
      15              : // to store progress information if we ever wish to expose it.
      16              : pub(super) enum HeatmapLayersDownloadStatus {
      17              :     InProgress,
      18              :     Complete,
      19              : }
      20              : 
      21              : pub(super) struct HeatmapLayersDownloader {
      22              :     handle: tokio::task::JoinHandle<()>,
      23              :     status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
      24              :     cancel: CancellationToken,
      25              :     downloads_guard: Arc<Gate>,
      26              : }
      27              : 
      28              : impl HeatmapLayersDownloader {
      29            0 :     fn new(
      30            0 :         timeline: Arc<Timeline>,
      31            0 :         concurrency: usize,
      32            0 :     ) -> Result<HeatmapLayersDownloader, ApiError> {
      33            0 :         let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
      34              : 
      35            0 :         let cancel = timeline.cancel.child_token();
      36            0 :         let downloads_guard = Arc::new(Gate::default());
      37            0 : 
      38            0 :         let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));
      39            0 : 
      40            0 :         let handle = tokio::task::spawn({
      41            0 :             let status = status.clone();
      42            0 :             let downloads_guard = downloads_guard.clone();
      43            0 :             let cancel = cancel.clone();
      44            0 : 
      45            0 :             async move {
      46            0 :                 let _guard = tl_guard;
      47            0 : 
      48            0 :                 scopeguard::defer! {
      49            0 :                     *status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
      50            0 :                 }
      51              : 
      52            0 :                 let Some(heatmap) = timeline.generate_heatmap().await else {
      53            0 :                     tracing::info!("Heatmap layers download failed to generate heatmap");
      54            0 :                     return;
      55              :                 };
      56              : 
      57            0 :                 tracing::info!(
      58            0 :                     resident_size=%timeline.resident_physical_size(),
      59            0 :                     heatmap_layers=%heatmap.layers.len(),
      60            0 :                     "Starting heatmap layers download"
      61              :                 );
      62              : 
      63            0 :                 let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
      64            0 :                     |layer| {
      65            0 :                         let tl = timeline.clone();
      66            0 :                         let dl_guard = match downloads_guard.enter() {
      67            0 :                             Ok(g) => g,
      68              :                             Err(_) => {
      69              :                                 // [`Self::shutdown`] was called. Don't spawn any more downloads.
      70            0 :                                 return None;
      71              :                             }
      72              :                         };
      73              : 
      74            0 :                         Some(async move {
      75            0 :                             let _dl_guard = dl_guard;
      76              : 
      77            0 :                             let res = tl.download_layer(&layer.name).await;
      78            0 :                             if let Err(err) = res {
      79            0 :                                 if !err.is_cancelled() {
      80            0 :                                     tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
      81            0 :                                 }
      82            0 :                             }
      83            0 :                         })
      84            0 :                     }
      85            0 :                 )).buffered(concurrency);
      86            0 : 
      87            0 :                 tokio::select! {
      88            0 :                     _ = stream.collect::<()>() => {
      89            0 :                         tracing::info!(
      90            0 :                             resident_size=%timeline.resident_physical_size(),
      91            0 :                             "Heatmap layers download completed"
      92              :                         );
      93              :                     },
      94            0 :                     _ = cancel.cancelled() => {
      95            0 :                         tracing::info!("Heatmap layers download cancelled");
      96              :                     }
      97              :                 }
      98            0 :             }
      99            0 :         });
     100            0 : 
     101            0 :         Ok(Self {
     102            0 :             status,
     103            0 :             handle,
     104            0 :             cancel,
     105            0 :             downloads_guard,
     106            0 :         })
     107            0 :     }
     108              : 
     109            0 :     fn is_complete(&self) -> bool {
     110            0 :         matches!(
     111            0 :             *self.status.lock().unwrap(),
     112              :             HeatmapLayersDownloadStatus::Complete
     113              :         )
     114            0 :     }
     115              : 
     116              :     /// Drive any in-progress downloads to completion and stop spawning any new ones.
     117              :     ///
     118              :     /// This has two callers and they behave differently
     119              :     /// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
     120              :     ///    are sensitive to timeline cancellation.
     121              :     ///
     122              :     /// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
     123              :     ///    downloads to complete.
     124            0 :     async fn stop_and_drain(self) {
     125            0 :         // Counterintuitive: close the guard before cancelling.
     126            0 :         // Something needs to poll the already created download futures to completion.
     127            0 :         // If we cancel first, then the underlying task exits and we lost
     128            0 :         // the poller.
     129            0 :         self.downloads_guard.close().await;
     130            0 :         self.cancel.cancel();
     131            0 :         if let Err(err) = self.handle.await {
     132            0 :             tracing::warn!("Failed to join heatmap layer downloader task: {err}");
     133            0 :         }
     134            0 :     }
     135              : }
     136              : 
     137              : impl Timeline {
     138            0 :     pub(crate) async fn start_heatmap_layers_download(
     139            0 :         self: &Arc<Self>,
     140            0 :         concurrency: usize,
     141            0 :     ) -> Result<(), ApiError> {
     142            0 :         let mut locked = self.heatmap_layers_downloader.lock().unwrap();
     143            0 :         if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
     144            0 :             let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?;
     145            0 :             *locked = Some(dl);
     146            0 :             Ok(())
     147              :         } else {
     148            0 :             Err(ApiError::Conflict("Already running".to_string()))
     149              :         }
     150            0 :     }
     151              : 
     152           20 :     pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
     153           20 :         // This can race with the start of a new downloader and lead to a situation
     154           20 :         // where one donloader is shutting down and another one is in-flight.
     155           20 :         // The only impact is that we'd end up using more remote storage semaphore
     156           20 :         // units than expected.
     157           20 :         let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
     158           20 :         if let Some(dl) = downloader {
     159            0 :             dl.stop_and_drain().await;
     160           20 :         }
     161           20 :     }
     162              : }
        

Generated by: LCOV version 2.1-beta