Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : context::RequestContext,
12 : disk_usage_eviction_task::{
13 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
14 : },
15 : metrics::SECONDARY_MODE,
16 : tenant::{
17 : config::SecondaryLocationConfig,
18 : debug_assert_current_span_has_tenant_and_timeline_id,
19 : ephemeral_file::is_ephemeral_file,
20 : remote_timeline_client::{
21 : index::LayerFileMetadata, is_temp_download_file, FAILED_DOWNLOAD_WARN_THRESHOLD,
22 : FAILED_REMOTE_OP_RETRIES,
23 : },
24 : span::debug_assert_current_span_has_tenant_id,
25 : storage_layer::{layer::local_layer_path, LayerName},
26 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
27 : },
28 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
29 : TEMP_FILE_SUFFIX,
30 : };
31 :
32 : use super::{
33 : heatmap::HeatMapLayer,
34 : scheduler::{
35 : self, period_jitter, period_warmup, Completion, JobGenerator, SchedulingResult,
36 : TenantBackgroundJobs,
37 : },
38 : SecondaryTenant,
39 : };
40 :
41 : use crate::tenant::{
42 : mgr::TenantManager,
43 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
44 : };
45 :
46 : use camino::Utf8PathBuf;
47 : use chrono::format::{DelayedFormat, StrftimeItems};
48 : use futures::{Future, StreamExt};
49 : use pageserver_api::models::SecondaryProgress;
50 : use pageserver_api::shard::TenantShardId;
51 : use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity};
52 :
53 : use tokio_util::sync::CancellationToken;
54 : use tracing::{info_span, instrument, warn, Instrument};
55 : use utils::{
56 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
57 : id::TimelineId, serde_system_time,
58 : };
59 :
60 : use super::{
61 : heatmap::{HeatMapTenant, HeatMapTimeline},
62 : CommandRequest, DownloadCommand,
63 : };
64 :
65 : /// For each tenant, the default minimum period that must have passed since the last download_tenant call
66 : /// before calling it again (60 seconds). This default is replaced with the value of
67 : /// [`HeatMapTenant::upload_period_ms`] after the first download, if the uploader populated it.
68 : const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
69 :
70 : /// Range of concurrency we may use when downloading layers within a timeline. This is independent
71 : /// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
72 : /// `PageServerConf::secondary_download_concurrency`
73 : const MAX_LAYER_CONCURRENCY: usize = 16;
74 : const MIN_LAYER_CONCURRENCY: usize = 1;
75 :
76 0 : pub(super) async fn downloader_task(
77 0 : tenant_manager: Arc<TenantManager>,
78 0 : remote_storage: GenericRemoteStorage,
79 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
80 0 : background_jobs_can_start: Barrier,
81 0 : cancel: CancellationToken,
82 0 : root_ctx: RequestContext,
83 0 : ) {
84 0 : // How many tenants' secondary download operations we will run concurrently
85 0 : let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency;
86 0 :
87 0 : let generator = SecondaryDownloader {
88 0 : tenant_manager,
89 0 : remote_storage,
90 0 : root_ctx,
91 0 : };
92 0 : let mut scheduler = Scheduler::new(generator, tenant_concurrency);
93 0 :
94 0 : scheduler
95 0 : .run(command_queue, background_jobs_can_start, cancel)
96 0 : .instrument(info_span!("secondary_downloads"))
97 0 : .await
98 0 : }
99 :
100 : struct SecondaryDownloader {
101 : tenant_manager: Arc<TenantManager>,
102 : remote_storage: GenericRemoteStorage,
103 : root_ctx: RequestContext,
104 : }
105 :
106 : #[derive(Debug, Clone)]
107 : pub(super) struct OnDiskState {
108 : metadata: LayerFileMetadata,
109 : access_time: SystemTime,
110 : local_path: Utf8PathBuf,
111 : }
112 :
113 : impl OnDiskState {
114 0 : fn new(
115 0 : _conf: &'static PageServerConf,
116 0 : _tenant_shard_id: &TenantShardId,
117 0 : _timeline_id: &TimelineId,
118 0 : _name: LayerName,
119 0 : metadata: LayerFileMetadata,
120 0 : access_time: SystemTime,
121 0 : local_path: Utf8PathBuf,
122 0 : ) -> Self {
123 0 : Self {
124 0 : metadata,
125 0 : access_time,
126 0 : local_path,
127 0 : }
128 0 : }
129 :
130 : // This is infallible, because all errors are either acceptable (ENOENT), or totally
131 : // unexpected (fatal).
132 0 : pub(super) fn remove_blocking(&self) {
133 0 : // We tolerate ENOENT, because between planning eviction and executing
134 0 : // it, the secondary downloader could have seen an updated heatmap that
135 0 : // resulted in a layer being deleted.
136 0 : // Other local I/O errors are process-fatal: these should never happen.
137 0 : std::fs::remove_file(&self.local_path)
138 0 : .or_else(fs_ext::ignore_not_found)
139 0 : .fatal_err("Deleting secondary layer")
140 0 : }
141 : }
142 :
143 : #[derive(Debug, Clone, Default)]
144 : pub(super) struct SecondaryDetailTimeline {
145 : pub(super) on_disk_layers: HashMap<LayerName, OnDiskState>,
146 :
147 : /// We remember when layers were evicted, to prevent re-downloading them.
148 : pub(super) evicted_at: HashMap<LayerName, SystemTime>,
149 : }
150 :
151 : // Aspects of a heatmap that we remember after downloading it
152 : #[derive(Clone, Debug)]
153 : struct DownloadSummary {
154 : etag: Etag,
155 : #[allow(unused)]
156 : mtime: SystemTime,
157 : upload_period: Duration,
158 : }
159 :
160 : /// This state is written by the secondary downloader; it is opaque
161 : /// to TenantManager.
162 : #[derive(Debug)]
163 : pub(super) struct SecondaryDetail {
164 : pub(super) config: SecondaryLocationConfig,
165 :
166 : last_download: Option<DownloadSummary>,
167 : next_download: Option<Instant>,
168 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
169 : }
170 :
171 : /// Helper for logging SystemTime
172 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
173 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
174 0 : datetime.format("%d/%m/%Y %T")
175 0 : }
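// Illustrative example (not part of the original source): with the "%d/%m/%Y %T" format
// above, a timestamp of 2024-03-25 13:45:10 UTC would render as "25/03/2024 13:45:10".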
176 :
177 : /// Information returned from download function when it detects the heatmap has changed
178 : struct HeatMapModified {
179 : etag: Etag,
180 : last_modified: SystemTime,
181 : bytes: Vec<u8>,
182 : }
183 :
184 : enum HeatMapDownload {
185 : // The heatmap's etag has changed: return the new etag, mtime and the body bytes
186 : Modified(HeatMapModified),
187 : // The heatmap's etag is unchanged
188 : Unmodified,
189 : }
190 :
191 : impl SecondaryDetail {
192 0 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
193 0 : Self {
194 0 : config,
195 0 : last_download: None,
196 0 : next_download: None,
197 0 : timelines: HashMap::new(),
198 0 : }
199 0 : }
200 :
201 : /// Additionally returns the total number of layers, used for more stable relative access time
202 : /// based eviction.
203 0 : pub(super) fn get_layers_for_eviction(
204 0 : &self,
205 0 : parent: &Arc<SecondaryTenant>,
206 0 : ) -> (DiskUsageEvictionInfo, usize) {
207 0 : let mut result = DiskUsageEvictionInfo::default();
208 0 : let mut total_layers = 0;
209 :
210 0 : for (timeline_id, timeline_detail) in &self.timelines {
211 0 : result
212 0 : .resident_layers
213 0 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
214 0 : EvictionCandidate {
215 0 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
216 0 : secondary_tenant: parent.clone(),
217 0 : timeline_id: *timeline_id,
218 0 : name: name.clone(),
219 0 : metadata: ods.metadata.clone(),
220 0 : }),
221 0 : last_activity_ts: ods.access_time,
222 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
223 0 : }
224 0 : }));
225 0 :
226 0 : // total might be missing currently downloading layers, but as a lower-than-actual
227 0 : // value it is a good enough approximation.
228 0 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
229 0 : }
230 0 : result.max_layer_size = result
231 0 : .resident_layers
232 0 : .iter()
233 0 : .map(|l| l.layer.get_file_size())
234 0 : .max();
235 0 :
236 0 : tracing::debug!(
237 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
238 0 : parent.get_tenant_shard_id(),
239 0 : self.timelines.len(),
240 0 : result.resident_layers.len()
241 : );
242 :
243 0 : (result, total_layers)
244 0 : }
245 : }
246 :
247 : struct PendingDownload {
248 : secondary_state: Arc<SecondaryTenant>,
249 : last_download: Option<DownloadSummary>,
250 : target_time: Option<Instant>,
251 : }
252 :
253 : impl scheduler::PendingJob for PendingDownload {
254 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
255 0 : self.secondary_state.get_tenant_shard_id()
256 0 : }
257 : }
258 :
259 : struct RunningDownload {
260 : barrier: Barrier,
261 : }
262 :
263 : impl scheduler::RunningJob for RunningDownload {
264 0 : fn get_barrier(&self) -> Barrier {
265 0 : self.barrier.clone()
266 0 : }
267 : }
268 :
269 : struct CompleteDownload {
270 : secondary_state: Arc<SecondaryTenant>,
271 : completed_at: Instant,
272 : }
273 :
274 : impl scheduler::Completion for CompleteDownload {
275 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
276 0 : self.secondary_state.get_tenant_shard_id()
277 0 : }
278 : }
279 :
280 : type Scheduler = TenantBackgroundJobs<
281 : SecondaryDownloader,
282 : PendingDownload,
283 : RunningDownload,
284 : CompleteDownload,
285 : DownloadCommand,
286 : >;
287 :
288 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
289 : for SecondaryDownloader
290 : {
291 0 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
292 : fn on_completion(&mut self, completion: CompleteDownload) {
293 : let CompleteDownload {
294 : secondary_state,
295 : completed_at: _completed_at,
296 : } = completion;
297 :
298 : tracing::debug!("Secondary tenant download completed");
299 :
300 : let mut detail = secondary_state.detail.lock().unwrap();
301 :
302 : let period = detail
303 : .last_download
304 : .as_ref()
305 0 : .map(|d| d.upload_period)
306 : .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
307 :
308 : // We advance next_download irrespective of errors: we don't want error cases to result in
309 : // expensive busy-polling.
310 : detail.next_download = Some(Instant::now() + period_jitter(period, 5));
311 : }
312 :
313 0 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
314 0 : let mut result = SchedulingResult {
315 0 : jobs: Vec::new(),
316 0 : want_interval: None,
317 0 : };
318 0 :
319 0 : // Step 1: identify some tenants that we may work on
320 0 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
321 0 : self.tenant_manager
322 0 : .foreach_secondary_tenants(|_id, secondary_state| {
323 0 : tenants.push(secondary_state.clone());
324 0 : });
325 0 :
326 0 : // Step 2: filter out tenants which are not yet eligible to run
327 0 : let now = Instant::now();
328 0 : result.jobs = tenants
329 0 : .into_iter()
330 0 : .filter_map(|secondary_tenant| {
331 0 : let (last_download, next_download) = {
332 0 : let mut detail = secondary_tenant.detail.lock().unwrap();
333 0 :
334 0 : if !detail.config.warm {
335 : // Downloads are disabled for this tenant
336 0 : detail.next_download = None;
337 0 : return None;
338 0 : }
339 0 :
340 0 : if detail.next_download.is_none() {
341 0 : // Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times. Subsequent
342 0 : // rounds will use a smaller jitter to avoid accidentally synchronizing later.
343 0 : detail.next_download = Some(now.checked_add(period_warmup(DEFAULT_DOWNLOAD_INTERVAL)).expect(
344 0 : "Using our constant, which is known to be small compared with clock range",
345 0 : ));
346 0 : }
347 0 : (detail.last_download.clone(), detail.next_download.unwrap())
348 0 : };
349 0 :
350 0 : if now > next_download {
351 0 : Some(PendingDownload {
352 0 : secondary_state: secondary_tenant,
353 0 : last_download,
354 0 : target_time: Some(next_download),
355 0 : })
356 : } else {
357 0 : None
358 : }
359 0 : })
360 0 : .collect();
361 0 :
362 0 : // Step 3: sort by target execution time to run most urgent first.
363 0 : result.jobs.sort_by_key(|j| j.target_time);
364 0 :
365 0 : result
366 0 : }
367 :
368 0 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
369 0 : let tenant_shard_id = command.get_tenant_shard_id();
370 0 :
371 0 : let tenant = self
372 0 : .tenant_manager
373 0 : .get_secondary_tenant_shard(*tenant_shard_id);
374 0 : let Some(tenant) = tenant else {
375 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
376 : };
377 :
378 0 : Ok(PendingDownload {
379 0 : target_time: None,
380 0 : last_download: None,
381 0 : secondary_state: tenant,
382 0 : })
383 0 : }
384 :
385 0 : fn spawn(
386 0 : &mut self,
387 0 : job: PendingDownload,
388 0 : ) -> (
389 0 : RunningDownload,
390 0 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
391 0 : ) {
392 0 : let PendingDownload {
393 0 : secondary_state,
394 0 : last_download,
395 0 : target_time,
396 0 : } = job;
397 0 :
398 0 : let (completion, barrier) = utils::completion::channel();
399 0 : let remote_storage = self.remote_storage.clone();
400 0 : let conf = self.tenant_manager.get_conf();
401 0 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
402 0 : let download_ctx = self.root_ctx.attached_child();
403 0 : (RunningDownload { barrier }, Box::pin(async move {
404 0 : let _completion = completion;
405 0 :
406 0 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
407 0 : .download(&download_ctx)
408 0 : .await
409 : {
410 : Err(UpdateError::NoData) => {
411 0 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
412 : },
413 : Err(UpdateError::NoSpace) => {
414 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
415 : }
416 : Err(UpdateError::Cancelled) => {
417 0 : tracing::debug!("Shut down while downloading");
418 : },
419 0 : Err(UpdateError::Deserialize(e)) => {
420 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
421 : },
422 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
423 0 : tracing::error!("Error while downloading tenant: {e}");
424 : },
425 0 : Ok(()) => {}
426 : };
427 :
428 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
429 :
430 : // If the job had a target execution time, we may check our final execution
431 : // time against that for observability purposes.
432 0 : if let (Some(target_time), Some(last_download)) = (target_time, last_download) {
433 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
434 0 : let elapsed = Instant::now().duration_since(target_time);
435 0 :
436 0 : warn_when_period_overrun(
437 0 : elapsed,
438 0 : last_download.upload_period,
439 0 : BackgroundLoopKind::SecondaryDownload,
440 0 : );
441 0 : }
442 :
443 0 : CompleteDownload {
444 0 : secondary_state,
445 0 : completed_at: Instant::now(),
446 0 : }
447 0 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
448 0 : }
449 : }
450 :
451 : /// This type is a convenience to group together the various functions involved in
452 : /// freshening a secondary tenant.
453 : struct TenantDownloader<'a> {
454 : conf: &'static PageServerConf,
455 : remote_storage: &'a GenericRemoteStorage,
456 : secondary_state: &'a SecondaryTenant,
457 : }
458 :
459 : /// Errors that may be encountered while updating a tenant
460 0 : #[derive(thiserror::Error, Debug)]
461 : enum UpdateError {
462 : #[error("No remote data found")]
463 : NoData,
464 : #[error("Insufficient local storage space")]
465 : NoSpace,
466 : #[error("Failed to download")]
467 : DownloadError(DownloadError),
468 : #[error(transparent)]
469 : Deserialize(#[from] serde_json::Error),
470 : #[error("Cancelled")]
471 : Cancelled,
472 : #[error(transparent)]
473 : Other(#[from] anyhow::Error),
474 : }
475 :
476 : impl From<DownloadError> for UpdateError {
477 0 : fn from(value: DownloadError) -> Self {
478 0 : match &value {
479 0 : DownloadError::Cancelled => Self::Cancelled,
480 0 : DownloadError::NotFound => Self::NoData,
481 0 : _ => Self::DownloadError(value),
482 : }
483 0 : }
484 : }
485 :
486 : impl From<std::io::Error> for UpdateError {
487 0 : fn from(value: std::io::Error) -> Self {
488 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
489 0 : UpdateError::NoSpace
490 0 : } else if value
491 0 : .get_ref()
492 0 : .and_then(|x| x.downcast_ref::<DownloadError>())
493 0 : .is_some()
494 : {
495 0 : UpdateError::from(DownloadError::from(value))
496 : } else {
497 : // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
498 0 : UpdateError::Other(anyhow::anyhow!(value))
499 : }
500 0 : }
501 : }
502 :
503 : impl<'a> TenantDownloader<'a> {
504 0 : fn new(
505 0 : conf: &'static PageServerConf,
506 0 : remote_storage: &'a GenericRemoteStorage,
507 0 : secondary_state: &'a SecondaryTenant,
508 0 : ) -> Self {
509 0 : Self {
510 0 : conf,
511 0 : remote_storage,
512 0 : secondary_state,
513 0 : }
514 0 : }
515 :
516 0 : async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
517 0 : debug_assert_current_span_has_tenant_id();
518 :
519 : // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
520 : // that our access to local storage is covered.
521 0 : let Ok(_guard) = self.secondary_state.gate.enter() else {
522 : // Shutting down
523 0 : return Ok(());
524 : };
525 :
526 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
527 0 :
528 0 : // We will use the etag from the last successful download to make the download conditional on changes
529 0 : let last_download = self
530 0 : .secondary_state
531 0 : .detail
532 0 : .lock()
533 0 : .unwrap()
534 0 : .last_download
535 0 : .clone();
536 :
537 : // Download the tenant's heatmap
538 : let HeatMapModified {
539 0 : last_modified: heatmap_mtime,
540 0 : etag: heatmap_etag,
541 0 : bytes: heatmap_bytes,
542 0 : } = match tokio::select!(
543 0 : bytes = self.download_heatmap(last_download.as_ref().map(|d| &d.etag)) => {bytes?},
544 0 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
545 0 : ) {
546 : HeatMapDownload::Unmodified => {
547 0 : tracing::info!("Heatmap unchanged since last successful download");
548 0 : return Ok(());
549 : }
550 0 : HeatMapDownload::Modified(m) => m,
551 : };
552 :
553 0 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
554 :
555 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
556 : // layer metadata without having to re-download it.
557 0 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
558 0 :
559 0 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
560 0 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
561 0 : let heatmap_path_bg = heatmap_path.clone();
562 0 : VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
563 0 : .await
564 0 : .maybe_fatal_err(&context_msg)?;
565 :
566 0 : tracing::debug!(
567 0 : "Wrote local heatmap to {}, with {} timelines",
568 0 : heatmap_path,
569 0 : heatmap.timelines.len()
570 : );
571 :
572 : // Get or initialize the local disk state for the timelines we will update
573 0 : let mut timeline_states = HashMap::new();
574 0 : for timeline in &heatmap.timelines {
575 0 : let timeline_state = self
576 0 : .secondary_state
577 0 : .detail
578 0 : .lock()
579 0 : .unwrap()
580 0 : .timelines
581 0 : .get(&timeline.timeline_id)
582 0 : .cloned();
583 :
584 0 : let timeline_state = match timeline_state {
585 0 : Some(t) => t,
586 : None => {
587 : // We have no existing state: need to scan local disk for layers first.
588 0 : let timeline_state =
589 0 : init_timeline_state(self.conf, tenant_shard_id, timeline).await;
590 :
591 : // Re-acquire detail lock now that we're done with async load from local FS
592 0 : self.secondary_state
593 0 : .detail
594 0 : .lock()
595 0 : .unwrap()
596 0 : .timelines
597 0 : .insert(timeline.timeline_id, timeline_state.clone());
598 0 : timeline_state
599 : }
600 : };
601 :
602 0 : timeline_states.insert(timeline.timeline_id, timeline_state);
603 : }
604 :
605 : // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
606 : // principle that deletions should be done before writes wherever possible, and so that we can use this
607 : // phase to initialize our SecondaryProgress.
608 0 : {
609 0 : *self.secondary_state.progress.lock().unwrap() =
610 0 : self.prepare_timelines(&heatmap, heatmap_mtime).await?;
611 : }
612 :
613 : // Download the layers in the heatmap
614 0 : for timeline in heatmap.timelines {
615 0 : let timeline_state = timeline_states
616 0 : .remove(&timeline.timeline_id)
617 0 : .expect("Just populated above");
618 0 :
619 0 : if self.secondary_state.cancel.is_cancelled() {
620 0 : tracing::debug!(
621 0 : "Cancelled before downloading timeline {}",
622 : timeline.timeline_id
623 : );
624 0 : return Ok(());
625 0 : }
626 0 :
627 0 : let timeline_id = timeline.timeline_id;
628 0 : self.download_timeline(timeline, timeline_state, ctx)
629 0 : .instrument(tracing::info_span!(
630 : "secondary_download_timeline",
631 : tenant_id=%tenant_shard_id.tenant_id,
632 0 : shard_id=%tenant_shard_id.shard_slug(),
633 : %timeline_id
634 : ))
635 0 : .await?;
636 : }
637 :
638 : // Only update last_etag after a full successful download: this way we will not skip
639 : // the next download, even if the heatmap's actual etag is unchanged.
640 0 : self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
641 0 : etag: heatmap_etag,
642 0 : mtime: heatmap_mtime,
643 0 : upload_period: heatmap
644 0 : .upload_period_ms
645 0 : .map(|ms| Duration::from_millis(ms as u64))
646 0 : .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL),
647 0 : });
648 0 :
649 0 : // Robustness: we should have updated progress properly, but in case we didn't, make sure
650 0 : // we don't leave the tenant in a state where we claim to have successfully downloaded
651 0 : // everything, but our progress is incomplete. The invariant here should be that if
652 0 : // we have set `last_download` to this heatmap's etag, then the next time we see that
653 0 : // etag we can safely do no work (i.e. we must be complete).
654 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
655 0 : debug_assert!(progress.layers_downloaded == progress.layers_total);
656 0 : debug_assert!(progress.bytes_downloaded == progress.bytes_total);
657 0 : if progress.layers_downloaded != progress.layers_total
658 0 : || progress.bytes_downloaded != progress.bytes_total
659 : {
660 0 : tracing::warn!("Correcting drift in progress stats ({progress:?})");
661 0 : progress.layers_downloaded = progress.layers_total;
662 0 : progress.bytes_downloaded = progress.bytes_total;
663 0 : }
664 :
665 0 : Ok(())
666 0 : }
667 :
668 : /// Do any fast local cleanup that comes before the much slower process of downloading
669 : /// layers from remote storage. In the process, initialize the SecondaryProgress object
670 : /// that will later be updated incrementally as we download layers.
671 0 : async fn prepare_timelines(
672 0 : &self,
673 0 : heatmap: &HeatMapTenant,
674 0 : heatmap_mtime: SystemTime,
675 0 : ) -> Result<SecondaryProgress, UpdateError> {
676 0 : let heatmap_stats = heatmap.get_stats();
677 0 : // We will construct a progress object, and then populate its initial "downloaded" numbers
678 0 : // while iterating through local layer state in [`Self::prepare_timelines`]
679 0 : let mut progress = SecondaryProgress {
680 0 : layers_total: heatmap_stats.layers,
681 0 : bytes_total: heatmap_stats.bytes,
682 0 : heatmap_mtime: Some(serde_system_time::SystemTime(heatmap_mtime)),
683 0 : layers_downloaded: 0,
684 0 : bytes_downloaded: 0,
685 0 : };
686 0 : // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
687 0 : let mut delete_layers = Vec::new();
688 0 : let mut delete_timelines = Vec::new();
689 0 : {
690 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
691 0 : for (timeline_id, timeline_state) in &mut detail.timelines {
692 0 : let Some(heatmap_timeline_index) = heatmap
693 0 : .timelines
694 0 : .iter()
695 0 : .position(|t| t.timeline_id == *timeline_id)
696 : else {
697 : // This timeline is no longer referenced in the heatmap: delete it locally
698 0 : delete_timelines.push(*timeline_id);
699 0 : continue;
700 : };
701 :
702 0 : let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
703 0 :
704 0 : let layers_in_heatmap = heatmap_timeline
705 0 : .layers
706 0 : .iter()
707 0 : .map(|l| (&l.name, l.metadata.generation))
708 0 : .collect::<HashSet<_>>();
709 0 : let layers_on_disk = timeline_state
710 0 : .on_disk_layers
711 0 : .iter()
712 0 : .map(|l| (l.0, l.1.metadata.generation))
713 0 : .collect::<HashSet<_>>();
714 0 :
715 0 : let mut layer_count = layers_on_disk.len();
716 0 : let mut layer_byte_count: u64 = timeline_state
717 0 : .on_disk_layers
718 0 : .values()
719 0 : .map(|l| l.metadata.file_size())
720 0 : .sum();
721 :
722 : // Remove on-disk layers that are no longer present in heatmap
723 0 : for (layer_file_name, generation) in layers_on_disk.difference(&layers_in_heatmap) {
724 0 : layer_count -= 1;
725 0 : layer_byte_count -= timeline_state
726 0 : .on_disk_layers
727 0 : .get(layer_file_name)
728 0 : .unwrap()
729 0 : .metadata
730 0 : .file_size();
731 0 :
732 0 : let local_path = local_layer_path(
733 0 : self.conf,
734 0 : self.secondary_state.get_tenant_shard_id(),
735 0 : timeline_id,
736 0 : layer_file_name,
737 0 : generation,
738 0 : );
739 0 :
740 0 : delete_layers.push((*timeline_id, (*layer_file_name).clone(), local_path));
741 0 : }
742 :
743 0 : progress.bytes_downloaded += layer_byte_count;
744 0 : progress.layers_downloaded += layer_count;
745 : }
746 :
747 0 : for delete_timeline in &delete_timelines {
748 0 : // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
749 0 : // from disk fails that will be a fatal error.
750 0 : detail.timelines.remove(delete_timeline);
751 0 : }
752 : }
753 :
754 : // Execute accumulated deletions
755 0 : for (timeline_id, layer_name, local_path) in delete_layers {
756 0 : tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
757 :
758 0 : tokio::fs::remove_file(&local_path)
759 0 : .await
760 0 : .or_else(fs_ext::ignore_not_found)
761 0 : .maybe_fatal_err("Removing secondary layer")?;
762 :
763 : // Update in-memory housekeeping to reflect the absence of the deleted layer
764 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
765 0 : let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
766 0 : continue;
767 : };
768 0 : timeline_state.on_disk_layers.remove(&layer_name);
769 : }
770 :
771 0 : for timeline_id in delete_timelines {
772 0 : let timeline_path = self
773 0 : .conf
774 0 : .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
775 0 : tracing::info!(timeline_id=%timeline_id,
776 0 : "Timeline no longer in heatmap, removing from secondary location"
777 : );
778 0 : tokio::fs::remove_dir_all(&timeline_path)
779 0 : .await
780 0 : .or_else(fs_ext::ignore_not_found)
781 0 : .maybe_fatal_err("Removing secondary timeline")?;
782 : }
783 :
784 0 : Ok(progress)
785 0 : }
786 :
787 : /// Returns the downloaded bytes if the etag differs from `prev_etag`, or
788 : /// [`HeatMapDownload::Unmodified`] if the object still matches `prev_etag`.
789 0 : async fn download_heatmap(
790 0 : &self,
791 0 : prev_etag: Option<&Etag>,
792 0 : ) -> Result<HeatMapDownload, UpdateError> {
793 0 : debug_assert_current_span_has_tenant_id();
794 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
795 0 : // TODO: pull up etag check into the request, to do a conditional GET rather than
796 0 : // issuing a GET and then maybe ignoring the response body
797 0 : // (https://github.com/neondatabase/neon/issues/6199)
798 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
799 :
800 0 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
801 0 : let cancel = &self.secondary_state.cancel;
802 0 :
803 0 : backoff::retry(
804 0 : || async {
805 0 : let download = self
806 0 : .remote_storage
807 0 : .download(&heatmap_path, cancel)
808 0 : .await
809 0 : .map_err(UpdateError::from)?;
810 0 :
811 0 : SECONDARY_MODE.download_heatmap.inc();
812 0 :
813 0 : if Some(&download.etag) == prev_etag {
814 0 : Ok(HeatMapDownload::Unmodified)
815 0 : } else {
816 0 : let mut heatmap_bytes = Vec::new();
817 0 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
818 0 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
819 0 : Ok(HeatMapDownload::Modified(HeatMapModified {
820 0 : etag: download.etag,
821 0 : last_modified: download.last_modified,
822 0 : bytes: heatmap_bytes,
823 0 : }))
824 0 : }
825 0 : },
826 0 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
827 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
828 0 : FAILED_REMOTE_OP_RETRIES,
829 0 : "download heatmap",
830 0 : cancel,
831 0 : )
832 0 : .await
833 0 : .ok_or_else(|| UpdateError::Cancelled)
834 0 : .and_then(|x| x)
835 0 : }
836 :
837 0 : async fn download_timeline(
838 0 : &self,
839 0 : timeline: HeatMapTimeline,
840 0 : timeline_state: SecondaryDetailTimeline,
841 0 : ctx: &RequestContext,
842 0 : ) -> Result<(), UpdateError> {
843 0 : debug_assert_current_span_has_tenant_and_timeline_id();
844 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
845 0 :
846 0 : // Accumulate updates to the state
847 0 : let mut touched = Vec::new();
848 0 :
849 0 : tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
850 :
851 0 : let mut download_futs = Vec::new();
852 :
853 : // Download heatmap layers that are not present on local disk, or update their
854 : // access time if they are already present.
855 0 : for layer in timeline.layers {
856 0 : if self.secondary_state.cancel.is_cancelled() {
857 0 : tracing::debug!("Cancelled -- dropping out of layer loop");
858 0 : return Ok(());
859 0 : }
860 :
861 : // Existing on-disk layers: just update their access time.
862 0 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
863 0 : tracing::debug!("Layer {} is already on disk", layer.name);
864 :
865 0 : if cfg!(debug_assertions) {
866 : // Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
867 : // are already present on disk are really there.
868 0 : match tokio::fs::metadata(&on_disk.local_path).await {
869 0 : Ok(meta) => {
870 0 : tracing::debug!(
871 0 : "Layer {} present at {}, size {}",
872 0 : layer.name,
873 0 : on_disk.local_path,
874 0 : meta.len(),
875 : );
876 : }
877 0 : Err(e) => {
878 0 : tracing::warn!(
879 0 : "Layer {} not found at {} ({})",
880 : layer.name,
881 : on_disk.local_path,
882 : e
883 : );
884 0 : debug_assert!(false);
885 : }
886 : }
887 0 : }
888 :
889 0 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
890 0 : || on_disk.access_time != layer.access_time
891 : {
892 : // We already have this layer on disk. Update its access time.
893 0 : tracing::debug!(
894 0 : "Access time updated for layer {}: {} -> {}",
895 0 : layer.name,
896 0 : strftime(&on_disk.access_time),
897 0 : strftime(&layer.access_time)
898 : );
899 0 : touched.push(layer);
900 0 : }
901 0 : continue;
902 : } else {
903 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
904 : }
905 :
906 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
907 : // recently than it was evicted.
908 0 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
909 0 : if &layer.access_time > evicted_at {
910 0 : tracing::info!(
911 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
912 0 : layer.name,
913 0 : strftime(&layer.access_time),
914 0 : strftime(evicted_at)
915 : );
916 : } else {
917 0 : tracing::trace!(
918 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
919 0 : layer.name,
920 0 : strftime(&layer.access_time),
921 0 : strftime(evicted_at)
922 : );
923 0 : continue;
924 : }
925 0 : }
926 :
927 0 : download_futs.push(self.download_layer(
928 0 : tenant_shard_id,
929 0 : &timeline.timeline_id,
930 0 : layer,
931 0 : ctx,
932 0 : ));
933 : }
934 :
935 : // Break up layer downloads into chunks, so that for each chunk we can re-check how much
936 : // concurrency to use based on activity level of remote storage.
937 0 : while !download_futs.is_empty() {
938 0 : let chunk =
939 0 : download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY));
940 0 :
941 0 : let concurrency = Self::layer_concurrency(self.remote_storage.activity());
942 0 :
943 0 : let mut result_stream = futures::stream::iter(chunk).buffered(concurrency);
944 0 : let mut result_stream = std::pin::pin!(result_stream);
945 0 : while let Some(result) = result_stream.next().await {
946 0 : match result {
947 0 : Err(e) => return Err(e),
948 0 : Ok(None) => {
949 0 : // No error, but we didn't download the layer. Don't mark it touched
950 0 : }
951 0 : Ok(Some(layer)) => touched.push(layer),
952 : }
953 : }
954 : }
955 :
956 : // Write updates to state to record layers we just downloaded or touched.
957 : {
958 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
959 0 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
960 0 :
961 0 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
962 :
963 0 : for t in touched {
964 : use std::collections::hash_map::Entry;
965 0 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
966 0 : Entry::Occupied(mut v) => {
967 0 : v.get_mut().access_time = t.access_time;
968 0 : }
969 0 : Entry::Vacant(e) => {
970 0 : let local_path = local_layer_path(
971 0 : self.conf,
972 0 : tenant_shard_id,
973 0 : &timeline.timeline_id,
974 0 : &t.name,
975 0 : &t.metadata.generation,
976 0 : );
977 0 : e.insert(OnDiskState::new(
978 0 : self.conf,
979 0 : tenant_shard_id,
980 0 : &timeline.timeline_id,
981 0 : t.name,
982 0 : LayerFileMetadata::from(&t.metadata),
983 0 : t.access_time,
984 0 : local_path,
985 0 : ));
986 0 : }
987 : }
988 : }
989 : }
990 :
991 0 : Ok(())
992 0 : }
993 :
994 0 : async fn download_layer(
995 0 : &self,
996 0 : tenant_shard_id: &TenantShardId,
997 0 : timeline_id: &TimelineId,
998 0 : layer: HeatMapLayer,
999 0 : ctx: &RequestContext,
1000 0 : ) -> Result<Option<HeatMapLayer>, UpdateError> {
1001 0 : // Failpoint for simulating slow remote storage
1002 0 : failpoint_support::sleep_millis_async!(
1003 : "secondary-layer-download-sleep",
1004 0 : &self.secondary_state.cancel
1005 : );
1006 :
1007 0 : let local_path = local_layer_path(
1008 0 : self.conf,
1009 0 : tenant_shard_id,
1010 0 : timeline_id,
1011 0 : &layer.name,
1012 0 : &layer.metadata.generation,
1013 0 : );
1014 :
1015 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
1016 0 : let downloaded_bytes = match download_layer_file(
1017 0 : self.conf,
1018 0 : self.remote_storage,
1019 0 : *tenant_shard_id,
1020 0 : *timeline_id,
1021 0 : &layer.name,
1022 0 : &LayerFileMetadata::from(&layer.metadata),
1023 0 : &local_path,
1024 0 : &self.secondary_state.cancel,
1025 0 : ctx,
1026 0 : )
1027 0 : .await
1028 : {
1029 0 : Ok(bytes) => bytes,
1030 : Err(DownloadError::NotFound) => {
1031 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
1032 : // This is harmless: continue to download the next layer. It is expected during
1033 : // compaction or GC.
1034 0 : tracing::debug!(
1035 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
1036 : layer.name
1037 : );
1038 :
1039 : // If the layer is 404, adjust the progress statistics to reflect that we will not download it.
1040 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
1041 0 : progress.layers_total = progress.layers_total.saturating_sub(1);
1042 0 : progress.bytes_total = progress
1043 0 : .bytes_total
1044 0 : .saturating_sub(layer.metadata.file_size);
1045 0 :
1046 0 : return Ok(None);
1047 : }
1048 0 : Err(e) => return Err(e.into()),
1049 : };
1050 :
1051 0 : if downloaded_bytes != layer.metadata.file_size {
1052 0 : let local_path = local_layer_path(
1053 0 : self.conf,
1054 0 : tenant_shard_id,
1055 0 : timeline_id,
1056 0 : &layer.name,
1057 0 : &layer.metadata.generation,
1058 0 : );
1059 0 :
1060 0 : tracing::warn!(
1061 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
1062 : layer.name,
1063 : downloaded_bytes,
1064 : layer.metadata.file_size
1065 : );
1066 :
1067 0 : tokio::fs::remove_file(&local_path)
1068 0 : .await
1069 0 : .or_else(fs_ext::ignore_not_found)?;
1070 : } else {
1071 0 : tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
1072 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
1073 0 : progress.bytes_downloaded += downloaded_bytes;
1074 0 : progress.layers_downloaded += 1;
1075 : }
1076 :
1077 0 : SECONDARY_MODE.download_layer.inc();
1078 0 :
1079 0 : Ok(Some(layer))
1080 0 : }
1081 :
1082 : /// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage
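/// For example (values derived from the constants and the unit tests below): with
/// `read_total == 16`, 16 available units map to 16, 14 map to 8, and 12 or fewer map to
/// `MIN_LAYER_CONCURRENCY` (1).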
1083 8 : fn layer_concurrency(activity: RemoteStorageActivity) -> usize {
1084 8 : // When 75% or less of the units are available, use minimum concurrency. Otherwise, do a linear
1085 8 : // mapping of our concurrency range onto the units available within the remaining 25%.
1086 8 : let clamp_at = (activity.read_total * 3) / 4;
1087 8 : if activity.read_available > clamp_at {
1088 4 : (MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at))
1089 4 : / (activity.read_total - clamp_at)
1090 : } else {
1091 4 : MIN_LAYER_CONCURRENCY
1092 : }
1093 8 : }
1094 : }
1095 :
1096 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
1097 0 : async fn init_timeline_state(
1098 0 : conf: &'static PageServerConf,
1099 0 : tenant_shard_id: &TenantShardId,
1100 0 : heatmap: &HeatMapTimeline,
1101 0 : ) -> SecondaryDetailTimeline {
1102 0 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
1103 0 : let mut detail = SecondaryDetailTimeline::default();
1104 :
1105 0 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
1106 0 : Ok(d) => d,
1107 0 : Err(e) => {
1108 0 : if e.kind() == std::io::ErrorKind::NotFound {
1109 0 : let context = format!("Creating timeline directory {timeline_path}");
1110 0 : tracing::info!("{}", context);
1111 0 : tokio::fs::create_dir_all(&timeline_path)
1112 0 : .await
1113 0 : .fatal_err(&context);
1114 0 :
1115 0 : // No entries to report: drop out.
1116 0 : return detail;
1117 : } else {
1118 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
1119 : }
1120 : }
1121 : };
1122 :
1123 : // As we iterate through layers found on disk, we will look up their metadata from this map.
1124 : // Layers not present in metadata will be discarded.
1125 0 : let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
1126 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
1127 :
1128 0 : while let Some(dentry) = dir
1129 0 : .next_entry()
1130 0 : .await
1131 0 : .fatal_err(&format!("Listing {timeline_path}"))
1132 : {
1133 0 : let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
1134 0 : tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
1135 0 : continue;
1136 : };
1137 0 : let local_meta = dentry
1138 0 : .metadata()
1139 0 : .await
1140 0 : .fatal_err(&format!("Read metadata on {}", file_path));
1141 0 :
1142 0 : let file_name = file_path.file_name().expect("created it from the dentry");
1143 0 : if crate::is_temporary(&file_path)
1144 0 : || is_temp_download_file(&file_path)
1145 0 : || is_ephemeral_file(file_name)
1146 : {
1147 : // Temporary files are frequently left behind from restarting during downloads
1148 0 : tracing::info!("Cleaning up temporary file {file_path}");
1149 0 : if let Err(e) = tokio::fs::remove_file(&file_path)
1150 0 : .await
1151 0 : .or_else(fs_ext::ignore_not_found)
1152 : {
1153 0 : tracing::error!("Failed to remove temporary file {file_path}: {e}");
1154 0 : }
1155 0 : continue;
1156 0 : }
1157 0 :
1158 0 : match LayerName::from_str(file_name) {
1159 0 : Ok(name) => {
1160 0 : let remote_meta = heatmap_metadata.get(&name);
1161 0 : match remote_meta {
1162 0 : Some(remote_meta) => {
1163 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
1164 0 : if local_meta.len() != remote_meta.metadata.file_size {
1165 : // This should not happen, because we do crashsafe write-then-rename when downloading
1166 : // layers, and layers in remote storage are immutable. Remove the local file because
1167 : // we cannot trust it.
1168 0 : tracing::warn!(
1169 0 : "Removing local layer {name} with unexpected local size {} != {}",
1170 0 : local_meta.len(),
1171 : remote_meta.metadata.file_size
1172 : );
1173 0 : } else {
1174 0 : // We expect the access time to be initialized immediately afterwards, when
1175 0 : // the latest heatmap is applied to the state.
1176 0 : detail.on_disk_layers.insert(
1177 0 : name.clone(),
1178 0 : OnDiskState::new(
1179 0 : conf,
1180 0 : tenant_shard_id,
1181 0 : &heatmap.timeline_id,
1182 0 : name,
1183 0 : LayerFileMetadata::from(&remote_meta.metadata),
1184 0 : remote_meta.access_time,
1185 0 : file_path,
1186 0 : ),
1187 0 : );
1188 0 : }
1189 : }
1190 : None => {
1191 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
1192 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
1193 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
1194 0 : tracing::info!(
1195 0 : "Removing secondary local layer {} because it's absent in heatmap",
1196 : name
1197 : );
1198 0 : tokio::fs::remove_file(&dentry.path())
1199 0 : .await
1200 0 : .or_else(fs_ext::ignore_not_found)
1201 0 : .fatal_err(&format!(
1202 0 : "Removing layer {}",
1203 0 : dentry.path().to_string_lossy()
1204 0 : ));
1205 : }
1206 : }
1207 : }
1208 : Err(_) => {
1209 : // Ignore it.
1210 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
1211 : }
1212 : }
1213 : }
1214 :
1215 0 : detail
1216 0 : }
1217 :
1218 : #[cfg(test)]
1219 : mod test {
1220 : use super::*;
1221 :
1222 : #[test]
1223 2 : fn layer_concurrency() {
1224 2 : // Totally idle
1225 2 : assert_eq!(
1226 2 : TenantDownloader::layer_concurrency(RemoteStorageActivity {
1227 2 : read_available: 16,
1228 2 : read_total: 16,
1229 2 : write_available: 16,
1230 2 : write_total: 16
1231 2 : }),
1232 2 : MAX_LAYER_CONCURRENCY
1233 2 : );
1234 :
1235 : // Totally busy
1236 2 : assert_eq!(
1237 2 : TenantDownloader::layer_concurrency(RemoteStorageActivity {
1238 2 : read_available: 0,
1239 2 : read_total: 16,
1240 2 :
1241 2 : write_available: 16,
1242 2 : write_total: 16
1243 2 : }),
1244 2 : MIN_LAYER_CONCURRENCY
1245 2 : );
1246 :
1247 : // Edge of the range at which we interpolate
1248 2 : assert_eq!(
1249 2 : TenantDownloader::layer_concurrency(RemoteStorageActivity {
1250 2 : read_available: 12,
1251 2 : read_total: 16,
1252 2 :
1253 2 : write_available: 16,
1254 2 : write_total: 16
1255 2 : }),
1256 2 : MIN_LAYER_CONCURRENCY
1257 2 : );
1258 :
1259 : // Midpoint of the range in which we interpolate
1260 2 : assert_eq!(
1261 2 : TenantDownloader::layer_concurrency(RemoteStorageActivity {
1262 2 : read_available: 14,
1263 2 : read_total: 16,
1264 2 :
1265 2 : write_available: 16,
1266 2 : write_total: 16
1267 2 : }),
1268 2 : MAX_LAYER_CONCURRENCY / 2
1269 2 : );
1270 2 : }
1271 : }