mod downloader;
pub mod heatmap;
mod heatmap_uploader;
mod scheduler;

use std::sync::Arc;
use std::time::SystemTime;

use metrics::UIntGauge;
use pageserver_api::models;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::completion::Barrier;
use utils::id::TimelineId;
use utils::sync::gate::Gate;

use self::downloader::{SecondaryDetail, downloader_task};
use self::heatmap_uploader::heatmap_uploader_task;
use super::GetTenantError;
use super::config::SecondaryLocationConfig;
use super::mgr::TenantManager;
use super::span::debug_assert_current_span_has_tenant_id;
use super::storage_layer::LayerName;
use crate::context::RequestContext;
use crate::disk_usage_eviction_task::DiskUsageEvictionInfo;
use crate::metrics::{SECONDARY_HEATMAP_TOTAL_SIZE, SECONDARY_RESIDENT_PHYSICAL_SIZE};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};

enum DownloadCommand {
    Download(TenantShardId),
}
enum UploadCommand {
    Upload(TenantShardId),
}

impl UploadCommand {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        match self {
            Self::Upload(id) => id,
        }
    }
}

impl DownloadCommand {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        match self {
            Self::Download(id) => id,
        }
    }
}

struct CommandRequest<T> {
    payload: T,
    response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}

struct CommandResponse {
    result: Result<(), SecondaryTenantError>,
}
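
// The following is an illustrative sketch (an assumption, not the actual task loops,
// which live in `downloader_task` and `heatmap_uploader_task`): it shows how a
// background task is expected to answer a `CommandRequest` received over its mpsc
// channel, completing the request/response round trip that
// `SecondaryController::dispatch` awaits on the oneshot.
#[allow(dead_code)]
fn example_respond<T>(request: CommandRequest<T>, result: Result<(), SecondaryTenantError>) {
    // If the requester in `SecondaryController::dispatch` has already gone away,
    // there is nobody left to notify, so the send error is deliberately ignored.
    let _ = request.response_tx.send(CommandResponse { result });
}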

#[derive(thiserror::Error, Debug)]
pub(crate) enum SecondaryTenantError {
    #[error("{0}")]
    GetTenant(GetTenantError),
    #[error("shutting down")]
    ShuttingDown,
}

impl From<GetTenantError> for SecondaryTenantError {
    fn from(gte: GetTenantError) -> Self {
        Self::GetTenant(gte)
    }
}

// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations, where we are not serving clients or
// ingesting WAL, but are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
//
// This structure coordinates between TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
    /// Carrying a tenant shard ID simplifies callers such as the downloader
    /// which need to organize many of these objects by ID.
    tenant_shard_id: TenantShardId,

    /// Cancellation token indicates to SecondaryDownloader that it should stop doing
    /// any work for this tenant at the next opportunity.
    pub(crate) cancel: CancellationToken,

    pub(crate) gate: Gate,

    // Secondary mode does not need the full shard identity or the pageserver_api::models::TenantConfig.
    // However, storing them lets us report a complete LocationConf, which makes reconciliation
    // by the control plane convenient (see [`Self::get_location_conf`]).
    shard_identity: ShardIdentity,
    tenant_conf: std::sync::Mutex<pageserver_api::models::TenantConfig>,

    // Internal state used by the Downloader.
    detail: std::sync::Mutex<SecondaryDetail>,

    // Public state indicating overall progress of downloads relative to the last heatmap seen
    pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,

    // Sum of layer sizes on local disk
    pub(super) resident_size_metric: UIntGauge,

    // Sum of layer sizes in the most recently downloaded heatmap
    pub(super) heatmap_total_size_metric: UIntGauge,
}

impl SecondaryTenant {
    pub(crate) fn new(
        tenant_shard_id: TenantShardId,
        shard_identity: ShardIdentity,
        tenant_conf: pageserver_api::models::TenantConfig,
        config: &SecondaryLocationConfig,
    ) -> Arc<Self> {
        let tenant_id = tenant_shard_id.tenant_id.to_string();
        let shard_id = format!("{}", tenant_shard_id.shard_slug());
        let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
            .get_metric_with_label_values(&[&tenant_id, &shard_id])
            .unwrap();

        let heatmap_total_size_metric = SECONDARY_HEATMAP_TOTAL_SIZE
            .get_metric_with_label_values(&[&tenant_id, &shard_id])
            .unwrap();

        Arc::new(Self {
            tenant_shard_id,
            // todo: shall we make this a descendant of the
            // main cancellation token, or is it sufficient that
            // on shutdown we walk the tenants and fire their
            // individual cancellations?
            cancel: CancellationToken::new(),
            gate: Gate::default(),

            shard_identity,
            tenant_conf: std::sync::Mutex::new(tenant_conf),

            detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),

            progress: std::sync::Mutex::default(),

            resident_size_metric,
            heatmap_total_size_metric,
        })
    }

    pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
        self.tenant_shard_id
    }

    pub(crate) async fn shutdown(&self) {
        self.cancel.cancel();

        // Wait for any secondary downloader work to complete
        self.gate.close().await;

        self.validate_metrics();

        // Metrics are decremented and their label sets removed eagerly here, even though
        // the on-disk deletions happen later in the background via [`BackgroundPurges::spawn`].
        let tenant_id = self.tenant_shard_id.tenant_id.to_string();
        let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
        let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
        let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);

        self.detail
            .lock()
            .unwrap()
            .drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
    }

    pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
        self.detail.lock().unwrap().config = config.clone();
    }

    pub(crate) fn set_tenant_conf(&self, config: &pageserver_api::models::TenantConfig) {
        *(self.tenant_conf.lock().unwrap()) = config.clone();
    }

    /// For API access: generate a LocationConfig equivalent to the one that would be used to
    /// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
    /// rare external API calls, like a reconciliation at startup.
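    ///
    /// A minimal sketch of how a caller might read the result (the `secondary` binding is
    /// assumed for illustration; the asserted field values follow from the construction below):
    ///
    /// ```ignore
    /// let conf = secondary.get_location_conf();
    /// assert!(matches!(conf.mode, models::LocationConfigMode::Secondary));
    /// assert!(conf.generation.is_none());
    /// ```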
    pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
        let conf = self.detail.lock().unwrap().config.clone();

        let conf = models::LocationConfigSecondary { warm: conf.warm };

        let tenant_conf = self.tenant_conf.lock().unwrap().clone();
        models::LocationConfig {
            mode: models::LocationConfigMode::Secondary,
            generation: None,
            secondary_conf: Some(conf),
            shard_number: self.tenant_shard_id.shard_number.0,
            shard_count: self.tenant_shard_id.shard_count.literal(),
            shard_stripe_size: self.shard_identity.stripe_size.0,
            tenant_conf,
        }
    }

    pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }

    pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
        self.detail.lock().unwrap().get_layers_for_eviction(self)
    }

    /// Cancellation safe, but if the future is cancelled the eviction still goes through,
    /// because the work is done in a spawned blocking task.
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
    pub(crate) async fn evict_layer(self: &Arc<Self>, timeline_id: TimelineId, name: LayerName) {
        debug_assert_current_span_has_tenant_id();

        let guard = match self.gate.enter() {
            Ok(g) => g,
            Err(_) => {
                tracing::debug!("Dropping layer evictions, secondary tenant shutting down");
                return;
            }
        };

        let now = SystemTime::now();
        tracing::info!("Evicting secondary layer");

        let this = self.clone();

        // spawn it to be cancellation safe
        tokio::task::spawn_blocking(move || {
            let _guard = guard;

            // Update the timeline's state. This does not have to be synchronized with
            // the download process, because:
            // - If downloader is racing with us to remove a file (e.g. because it is
            //   removed from heatmap), then our mutual .remove() operations will both
            //   succeed.
            // - If downloader is racing with us to download the object (this would require
            //   multiple eviction iterations to race with multiple download iterations), then
            //   if we remove it from the state, the worst that happens is the downloader
            //   downloads it again before re-inserting, or we delete the file but it remains
            //   in the state map (in which case it will be downloaded if this secondary
            //   tenant transitions to attached and tries to access it)
            //
            // The important assumption here is that the secondary timeline state does not
            // have to 100% match what is on disk, because it's a best-effort warming
            // of the cache.
            let mut detail = this.detail.lock().unwrap();
            if let Some(removed) =
                detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
            {
                // We might race with removal of the same layer during downloads, so finding the layer we
                // were trying to remove is optional. Only issue the disk I/O to remove it if we found it.
                removed.remove_blocking();
            }
        })
        .await
        .expect("secondary eviction should not have panicked");
    }

    /// Exhaustive check that incrementally updated metrics match the actual state.
    #[cfg(feature = "testing")]
    fn validate_metrics(&self) {
        let detail = self.detail.lock().unwrap();
        let resident_size = detail.total_resident_size();

        assert_eq!(resident_size, self.resident_size_metric.get());
    }

    #[cfg(not(feature = "testing"))]
    fn validate_metrics(&self) {
        // No-op in non-testing builds
    }
}
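
// Illustrative sketch (an assumed caller, not the actual TenantManager code): how a
// manager-side detach path is expected to coordinate with a `SecondaryTenant`. The
// cancellation token tells the downloader to stop at its next opportunity, and the
// gate ensures any in-flight work for this tenant drains before we proceed.
#[allow(dead_code)]
async fn example_detach_secondary(secondary: Arc<SecondaryTenant>) {
    // `shutdown` cancels the tenant-local token, waits on the gate, and removes the
    // tenant's metrics; afterwards the downloader no longer touches this tenant's state.
    secondary.shutdown().await;
}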

/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode
/// downloads and heatmap uploads. It is not a hot data path; it is used for:
/// - Live migrations, where we want to ensure a migration destination has the freshest possible
///   content before trying to cut over.
/// - Tests, where we want to immediately upload/download for a particular tenant.
///
/// In normal operation, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
    upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
    download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}

impl SecondaryController {
    async fn dispatch<T>(
        &self,
        queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
        payload: T,
    ) -> Result<(), SecondaryTenantError> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();

        queue
            .send(CommandRequest {
                payload,
                response_tx,
            })
            .await
            .map_err(|_| SecondaryTenantError::ShuttingDown)?;

        let response = response_rx
            .await
            .map_err(|_| SecondaryTenantError::ShuttingDown)?;

        response.result
    }

    pub(crate) async fn upload_tenant(
        &self,
        tenant_shard_id: TenantShardId,
    ) -> Result<(), SecondaryTenantError> {
        self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
            .await
    }

    pub(crate) async fn download_tenant(
        &self,
        tenant_shard_id: TenantShardId,
    ) -> Result<(), SecondaryTenantError> {
        self.dispatch(
            &self.download_req_tx,
            DownloadCommand::Download(tenant_shard_id),
        )
        .await
    }
}
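
// Illustrative sketch (an assumed admin code path, not part of the original module):
// how a live-migration flow might use the controller to warm a destination before
// cutover. The `controller` and `tenant_shard_id` parameters are assumed to be
// provided by the caller; in a real migration the heatmap upload is requested on the
// pageserver holding the attached location.
#[allow(dead_code)]
async fn example_prewarm_before_cutover(
    controller: &SecondaryController,
    tenant_shard_id: TenantShardId,
) -> Result<(), SecondaryTenantError> {
    // Ask for a fresh heatmap upload, then trigger an on-demand download so the
    // secondary location's cache is as warm as possible before cutting over.
    controller.upload_tenant(tenant_shard_id).await?;
    controller.download_tenant(tenant_shard_id).await
}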

pub struct GlobalTasks {
    cancel: CancellationToken,
    uploader: JoinHandle<()>,
    downloader: JoinHandle<()>,
}

impl GlobalTasks {
    /// The caller is responsible for requesting shutdown via the cancellation token that was
    /// passed to [`spawn_tasks`].
    ///
    /// # Panics
    ///
    /// This method panics if that token has not been cancelled.
    /// This is low-risk because we call it during process shutdown, so a panic
    /// will be informative but will not cause undue downtime.
    pub async fn wait(self) {
        let Self {
            cancel,
            uploader,
            downloader,
        } = self;
        assert!(
            cancel.is_cancelled(),
            "must cancel cancellation token, otherwise the tasks will not shut down"
        );

        let (uploader, downloader) = futures::future::join(uploader, downloader).await;
        uploader.expect(
            "unreachable: exit_on_panic_or_error would catch the panic and exit the process",
        );
        downloader.expect(
            "unreachable: exit_on_panic_or_error would catch the panic and exit the process",
        );
    }
}

pub fn spawn_tasks(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> (SecondaryController, GlobalTasks) {
    let mgr_clone = tenant_manager.clone();
    let storage_clone = remote_storage.clone();
    let bg_jobs_clone = background_jobs_can_start.clone();

    let (download_req_tx, download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

    let cancel_clone = cancel.clone();
    let downloader = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
        "secondary tenant downloads",
        async move {
            downloader_task(
                mgr_clone,
                storage_clone,
                download_req_rx,
                bg_jobs_clone,
                cancel_clone,
                RequestContext::new(
                    TaskKind::SecondaryDownloads,
                    crate::context::DownloadBehavior::Download,
                ),
            )
            .await;
            anyhow::Ok(())
        },
    ));

    let cancel_clone = cancel.clone();
    let uploader = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
        "heatmap uploads",
        async move {
            heatmap_uploader_task(
                tenant_manager,
                remote_storage,
                upload_req_rx,
                background_jobs_can_start,
                cancel_clone,
            )
            .await;
            anyhow::Ok(())
        },
    ));

    (
        SecondaryController {
            upload_req_tx,
            download_req_tx,
        },
        GlobalTasks {
            cancel,
            uploader,
            downloader,
        },
    )
}
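
// Illustrative sketch (an assumed startup/shutdown wiring, not part of the original
// module): how a process might spawn the global secondary-mode tasks and later shut
// them down. The parameters are assumed to come from pageserver startup; note that
// the cancellation token must be cancelled before `GlobalTasks::wait`, which panics
// otherwise.
#[allow(dead_code)]
async fn example_spawn_and_shutdown(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    background_jobs_can_start: Barrier,
) {
    let cancel = CancellationToken::new();
    let (_controller, tasks) = spawn_tasks(
        tenant_manager,
        remote_storage,
        background_jobs_can_start,
        cancel.clone(),
    );

    // ... the process runs; uploads and downloads proceed autonomously ...

    // Request shutdown, then wait for both global tasks to exit.
    cancel.cancel();
    tasks.wait().await;
}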