Line data Source code
1 : //! Timeline utility module to hydrate everything from the current heatmap.
2 : //!
3 : //! Provides utilities to spawn and abort a background task where the downloads happen.
4 : //! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.
5 :
6 : use std::sync::{Arc, Mutex};
7 :
8 : use futures::StreamExt;
9 : use http_utils::error::ApiError;
10 : use tokio_util::sync::CancellationToken;
11 : use utils::sync::gate::Gate;
12 :
13 : use crate::context::RequestContext;
14 :
15 : use super::Timeline;
16 :
17 : // This status is not strictly necessary now, but gives us a nice place
18 : // to store progress information if we ever wish to expose it.
19 : pub(super) enum HeatmapLayersDownloadStatus {
20 : InProgress,
21 : Complete,
22 : }
23 :
24 : pub(super) struct HeatmapLayersDownloader {
25 : handle: tokio::task::JoinHandle<()>,
26 : status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
27 : cancel: CancellationToken,
28 : downloads_guard: Arc<Gate>,
29 : }
30 :
31 : impl HeatmapLayersDownloader {
32 0 : fn new(
33 0 : timeline: Arc<Timeline>,
34 0 : concurrency: usize,
35 0 : recurse: bool,
36 0 : ctx: RequestContext,
37 0 : ) -> Result<HeatmapLayersDownloader, ApiError> {
38 0 : let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
39 :
40 0 : let cancel = timeline.cancel.child_token();
41 0 : let downloads_guard = Arc::new(Gate::default());
42 0 :
43 0 : let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));
44 0 :
45 0 : let handle = tokio::task::spawn({
46 0 : let status = status.clone();
47 0 : let downloads_guard = downloads_guard.clone();
48 0 : let cancel = cancel.clone();
49 0 :
50 0 : async move {
51 0 : let _guard = tl_guard;
52 0 :
53 0 : scopeguard::defer! {
54 0 : *status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
55 0 : }
56 :
57 0 : let Some(heatmap) = timeline.generate_heatmap().await else {
58 0 : tracing::info!("Heatmap layers download failed to generate heatmap");
59 0 : return;
60 : };
61 :
62 0 : tracing::info!(
63 0 : resident_size=%timeline.resident_physical_size(),
64 0 : heatmap_layers=%heatmap.all_layers().count(),
65 0 : "Starting heatmap layers download"
66 : );
67 :
68 0 : let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map(
69 0 : |layer| {
70 0 : let ctx = ctx.attached_child();
71 0 : let tl = timeline.clone();
72 0 : let dl_guard = match downloads_guard.enter() {
73 0 : Ok(g) => g,
74 : Err(_) => {
75 : // [`Self::shutdown`] was called. Don't spawn any more downloads.
76 0 : return None;
77 : }
78 : };
79 :
80 0 : Some(async move {
81 0 : let _dl_guard = dl_guard;
82 :
83 0 : let res = tl.download_layer(&layer.name, &ctx).await;
84 0 : if let Err(err) = res {
85 0 : if !err.is_cancelled() {
86 0 : tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
87 0 : }
88 0 : }
89 0 : })
90 0 : }
91 0 : )).buffered(concurrency);
92 0 :
93 0 : tokio::select! {
94 0 : _ = stream.collect::<()>() => {
95 0 : tracing::info!(
96 0 : resident_size=%timeline.resident_physical_size(),
97 0 : "Heatmap layers download completed"
98 : );
99 : },
100 0 : _ = cancel.cancelled() => {
101 0 : tracing::info!("Heatmap layers download cancelled");
102 0 : return;
103 : }
104 : }
105 :
106 0 : if recurse {
107 0 : if let Some(ancestor) = timeline.ancestor_timeline() {
108 0 : let ctx = ctx.attached_child();
109 0 : let res =
110 0 : ancestor.start_heatmap_layers_download(concurrency, recurse, &ctx);
111 0 : if let Err(err) = res {
112 0 : tracing::info!(
113 0 : "Failed to start heatmap layers download for ancestor: {err}"
114 : );
115 0 : }
116 0 : }
117 0 : }
118 0 : }
119 0 : });
120 0 :
121 0 : Ok(Self {
122 0 : status,
123 0 : handle,
124 0 : cancel,
125 0 : downloads_guard,
126 0 : })
127 0 : }
128 :
129 0 : fn is_complete(&self) -> bool {
130 0 : matches!(
131 0 : *self.status.lock().unwrap(),
132 : HeatmapLayersDownloadStatus::Complete
133 : )
134 0 : }
135 :
136 : /// Drive any in-progress downloads to completion and stop spawning any new ones.
137 : ///
138 : /// This has two callers and they behave differently
139 : /// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
140 : /// are sensitive to timeline cancellation.
141 : ///
142 : /// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
143 : /// downloads to complete.
144 0 : async fn stop_and_drain(self) {
145 0 : // Counterintuitive: close the guard before cancelling.
146 0 : // Something needs to poll the already created download futures to completion.
147 0 : // If we cancel first, then the underlying task exits and we lost
148 0 : // the poller.
149 0 : self.downloads_guard.close().await;
150 0 : self.cancel.cancel();
151 0 : if let Err(err) = self.handle.await {
152 0 : tracing::warn!("Failed to join heatmap layer downloader task: {err}");
153 0 : }
154 0 : }
155 : }
156 :
157 : impl Timeline {
158 0 : pub(crate) fn start_heatmap_layers_download(
159 0 : self: &Arc<Self>,
160 0 : concurrency: usize,
161 0 : recurse: bool,
162 0 : ctx: &RequestContext,
163 0 : ) -> Result<(), ApiError> {
164 0 : let mut locked = self.heatmap_layers_downloader.lock().unwrap();
165 0 : if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
166 0 : let dl = HeatmapLayersDownloader::new(
167 0 : self.clone(),
168 0 : concurrency,
169 0 : recurse,
170 0 : ctx.attached_child(),
171 0 : )?;
172 0 : *locked = Some(dl);
173 0 : Ok(())
174 : } else {
175 0 : Err(ApiError::Conflict("Already running".to_string()))
176 : }
177 0 : }
178 :
179 20 : pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
180 20 : // This can race with the start of a new downloader and lead to a situation
181 20 : // where one donloader is shutting down and another one is in-flight.
182 20 : // The only impact is that we'd end up using more remote storage semaphore
183 20 : // units than expected.
184 20 : let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
185 20 : if let Some(dl) = downloader {
186 0 : dl.stop_and_drain().await;
187 20 : }
188 20 : }
189 : }
|