Line data Source code
1 : //! Timeline utility module to hydrate everything from the current heatmap.
2 : //!
3 : //! Provides utilities to spawn and abort a background task where the downloads happen.
4 : //! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.
5 :
6 : use futures::StreamExt;
7 : use http_utils::error::ApiError;
8 : use std::sync::{Arc, Mutex};
9 : use tokio_util::sync::CancellationToken;
10 : use utils::sync::gate::Gate;
11 :
12 : use super::Timeline;
13 :
14 : // This status is not strictly necessary now, but gives us a nice place
15 : // to store progress information if we ever wish to expose it.
16 : pub(super) enum HeatmapLayersDownloadStatus {
17 : InProgress,
18 : Complete,
19 : }
20 :
21 : pub(super) struct HeatmapLayersDownloader {
22 : handle: tokio::task::JoinHandle<()>,
23 : status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
24 : cancel: CancellationToken,
25 : downloads_guard: Arc<Gate>,
26 : }
27 :
28 : impl HeatmapLayersDownloader {
29 0 : fn new(
30 0 : timeline: Arc<Timeline>,
31 0 : concurrency: usize,
32 0 : ) -> Result<HeatmapLayersDownloader, ApiError> {
33 0 : let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
34 :
35 0 : let cancel = timeline.cancel.child_token();
36 0 : let downloads_guard = Arc::new(Gate::default());
37 0 :
38 0 : let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));
39 0 :
40 0 : let handle = tokio::task::spawn({
41 0 : let status = status.clone();
42 0 : let downloads_guard = downloads_guard.clone();
43 0 : let cancel = cancel.clone();
44 0 :
45 0 : async move {
46 0 : let _guard = tl_guard;
47 0 :
48 0 : scopeguard::defer! {
49 0 : *status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
50 0 : }
51 :
52 0 : let Some(heatmap) = timeline.generate_heatmap().await else {
53 0 : tracing::info!("Heatmap layers download failed to generate heatmap");
54 0 : return;
55 : };
56 :
57 0 : tracing::info!(
58 0 : resident_size=%timeline.resident_physical_size(),
59 0 : heatmap_layers=%heatmap.layers.len(),
60 0 : "Starting heatmap layers download"
61 : );
62 :
63 0 : let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
64 0 : |layer| {
65 0 : let tl = timeline.clone();
66 0 : let dl_guard = match downloads_guard.enter() {
67 0 : Ok(g) => g,
68 : Err(_) => {
69 : // [`Self::shutdown`] was called. Don't spawn any more downloads.
70 0 : return None;
71 : }
72 : };
73 :
74 0 : Some(async move {
75 0 : let _dl_guard = dl_guard;
76 :
77 0 : let res = tl.download_layer(&layer.name).await;
78 0 : if let Err(err) = res {
79 0 : if !err.is_cancelled() {
80 0 : tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
81 0 : }
82 0 : }
83 0 : })
84 0 : }
85 0 : )).buffered(concurrency);
86 0 :
87 0 : tokio::select! {
88 0 : _ = stream.collect::<()>() => {
89 0 : tracing::info!(
90 0 : resident_size=%timeline.resident_physical_size(),
91 0 : "Heatmap layers download completed"
92 : );
93 : },
94 0 : _ = cancel.cancelled() => {
95 0 : tracing::info!("Heatmap layers download cancelled");
96 : }
97 : }
98 0 : }
99 0 : });
100 0 :
101 0 : Ok(Self {
102 0 : status,
103 0 : handle,
104 0 : cancel,
105 0 : downloads_guard,
106 0 : })
107 0 : }
108 :
109 0 : fn is_complete(&self) -> bool {
110 0 : matches!(
111 0 : *self.status.lock().unwrap(),
112 : HeatmapLayersDownloadStatus::Complete
113 : )
114 0 : }
115 :
116 : /// Drive any in-progress downloads to completion and stop spawning any new ones.
117 : ///
118 : /// This has two callers and they behave differently
119 : /// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
120 : /// are sensitive to timeline cancellation.
121 : ///
122 : /// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
123 : /// downloads to complete.
124 0 : async fn stop_and_drain(self) {
125 0 : // Counterintuitive: close the guard before cancelling.
126 0 : // Something needs to poll the already created download futures to completion.
127 0 : // If we cancel first, then the underlying task exits and we lost
128 0 : // the poller.
129 0 : self.downloads_guard.close().await;
130 0 : self.cancel.cancel();
131 0 : if let Err(err) = self.handle.await {
132 0 : tracing::warn!("Failed to join heatmap layer downloader task: {err}");
133 0 : }
134 0 : }
135 : }
136 :
137 : impl Timeline {
138 0 : pub(crate) async fn start_heatmap_layers_download(
139 0 : self: &Arc<Self>,
140 0 : concurrency: usize,
141 0 : ) -> Result<(), ApiError> {
142 0 : let mut locked = self.heatmap_layers_downloader.lock().unwrap();
143 0 : if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
144 0 : let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?;
145 0 : *locked = Some(dl);
146 0 : Ok(())
147 : } else {
148 0 : Err(ApiError::Conflict("Already running".to_string()))
149 : }
150 0 : }
151 :
152 20 : pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
153 20 : // This can race with the start of a new downloader and lead to a situation
154 20 : // where one donloader is shutting down and another one is in-flight.
155 20 : // The only impact is that we'd end up using more remote storage semaphore
156 20 : // units than expected.
157 20 : let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
158 20 : if let Some(dl) = downloader {
159 0 : dl.stop_and_drain().await;
160 20 : }
161 20 : }
162 : }
|