LCOV - code coverage report
Current view: top level - pageserver/src/tenant/secondary - downloader.rs (source / functions) Coverage Total Hit
Test: a763ab26c1b6370545fbbeb2ea08bc704007644b.info Lines: 0.0 % 1007 0
Test Date: 2025-03-17 19:00:53 Functions: 0.0 % 81 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::pin::Pin;
       3              : use std::str::FromStr;
       4              : use std::sync::Arc;
       5              : use std::time::{Duration, Instant, SystemTime};
       6              : 
       7              : use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
       8              : use camino::Utf8PathBuf;
       9              : use chrono::format::{DelayedFormat, StrftimeItems};
      10              : use futures::Future;
      11              : use metrics::UIntGauge;
      12              : use pageserver_api::models::SecondaryProgress;
      13              : use pageserver_api::shard::TenantShardId;
      14              : use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage};
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::{Instrument, info_span, instrument, warn};
      17              : use utils::completion::Barrier;
      18              : use utils::crashsafe::path_with_suffix_extension;
      19              : use utils::id::TimelineId;
      20              : use utils::{backoff, failpoint_support, fs_ext, pausable_failpoint, serde_system_time};
      21              : 
      22              : use super::heatmap::{HeatMapLayer, HeatMapTenant, HeatMapTimeline};
      23              : use super::scheduler::{
      24              :     self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs, period_jitter,
      25              :     period_warmup,
      26              : };
      27              : use super::{
      28              :     CommandRequest, DownloadCommand, GetTenantError, SecondaryTenant, SecondaryTenantError,
      29              : };
      30              : use crate::TEMP_FILE_SUFFIX;
      31              : use crate::config::PageServerConf;
      32              : use crate::context::RequestContext;
      33              : use crate::disk_usage_eviction_task::{
      34              :     DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer, finite_f32,
      35              : };
      36              : use crate::metrics::SECONDARY_MODE;
      37              : use crate::tenant::config::SecondaryLocationConfig;
      38              : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
      39              : use crate::tenant::ephemeral_file::is_ephemeral_file;
      40              : use crate::tenant::mgr::TenantManager;
      41              : use crate::tenant::remote_timeline_client::download::download_layer_file;
      42              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
      43              : use crate::tenant::remote_timeline_client::{
      44              :     FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, is_temp_download_file,
      45              :     remote_heatmap_path,
      46              : };
      47              : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
      48              : use crate::tenant::storage_layer::layer::local_layer_path;
      49              : use crate::tenant::storage_layer::{LayerName, LayerVisibilityHint};
      50              : use crate::tenant::tasks::{BackgroundLoopKind, warn_when_period_overrun};
      51              : use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
      52              : 
      53              : /// For each tenant, the default minimum period that must elapse after one download_tenant call before
      54              : /// calling it again.  This default is replaced with the value of [`HeatMapTenant::upload_period_ms`] after the
      55              : /// first download, if the uploader populated it.
      56              : const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
      57              : 
      58            0 : pub(super) async fn downloader_task(
      59            0 :     tenant_manager: Arc<TenantManager>,
      60            0 :     remote_storage: GenericRemoteStorage,
      61            0 :     command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
      62            0 :     background_jobs_can_start: Barrier,
      63            0 :     cancel: CancellationToken,
      64            0 :     root_ctx: RequestContext,
      65            0 : ) {
      66            0 :     let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
      67            0 : 
      68            0 :     let generator = SecondaryDownloader {
      69            0 :         tenant_manager,
      70            0 :         remote_storage,
      71            0 :         root_ctx,
      72            0 :     };
      73            0 :     let mut scheduler = Scheduler::new(generator, concurrency);
      74            0 : 
      75            0 :     scheduler
      76            0 :         .run(command_queue, background_jobs_can_start, cancel)
      77            0 :         .instrument(info_span!("secondary_download_scheduler"))
      78            0 :         .await
      79            0 : }
      80              : 
      81              : struct SecondaryDownloader {
      82              :     tenant_manager: Arc<TenantManager>,
      83              :     remote_storage: GenericRemoteStorage,
      84              :     root_ctx: RequestContext,
      85              : }
      86              : 
      87              : #[derive(Debug, Clone)]
      88              : pub(super) struct OnDiskState {
      89              :     metadata: LayerFileMetadata,
      90              :     access_time: SystemTime,
      91              :     local_path: Utf8PathBuf,
      92              : }
      93              : 
      94              : impl OnDiskState {
      95            0 :     fn new(
      96            0 :         _conf: &'static PageServerConf,
      97            0 :         _tenant_shard_id: &TenantShardId,
      98            0 :         _imeline_id: &TimelineId,
      99            0 :         _ame: LayerName,
     100            0 :         metadata: LayerFileMetadata,
     101            0 :         access_time: SystemTime,
     102            0 :         local_path: Utf8PathBuf,
     103            0 :     ) -> Self {
     104            0 :         Self {
     105            0 :             metadata,
     106            0 :             access_time,
     107            0 :             local_path,
     108            0 :         }
     109            0 :     }
     110              : 
     111              :     // This is infallible, because all errors are either acceptable (ENOENT), or totally
     112              :     // unexpected (fatal).
     113            0 :     pub(super) fn remove_blocking(&self) {
     114            0 :         // We tolerate ENOENT, because between planning eviction and executing
     115            0 :         // it, the secondary downloader could have seen an updated heatmap that
     116            0 :         // resulted in a layer being deleted.
     117            0 :         // Other local I/O errors are process-fatal: these should never happen.
     118            0 :         std::fs::remove_file(&self.local_path)
     119            0 :             .or_else(fs_ext::ignore_not_found)
     120            0 :             .fatal_err("Deleting secondary layer")
     121            0 :     }
     122              : 
     123            0 :     pub(crate) fn file_size(&self) -> u64 {
     124            0 :         self.metadata.file_size
     125            0 :     }
     126              : }
     127              : 
     128              : pub(super) struct SecondaryDetailTimeline {
     129              :     on_disk_layers: HashMap<LayerName, OnDiskState>,
     130              : 
     131              :     /// We remember when layers were evicted, to prevent re-downloading them.
     132              :     pub(super) evicted_at: HashMap<LayerName, SystemTime>,
     133              : 
     134              :     ctx: RequestContext,
     135              : }
     136              : 
     137              : impl Clone for SecondaryDetailTimeline {
     138            0 :     fn clone(&self) -> Self {
     139            0 :         Self {
     140            0 :             on_disk_layers: self.on_disk_layers.clone(),
     141            0 :             evicted_at: self.evicted_at.clone(),
     142            0 :             // This is a bit awkward. The downloader code operates on a snapshot
     143            0 :             // of the secondary list to avoid locking it for extended periods of time.
     144            0 :             // No particularly strong reason to choose [`RequestContext::detached_child`],
     145            0 :             // but makes more sense than [`RequestContext::attached_child`].
     146            0 :             ctx: self
     147            0 :                 .ctx
     148            0 :                 .detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
     149            0 :         }
     150            0 :     }
     151              : }
     152              : 
     153              : impl std::fmt::Debug for SecondaryDetailTimeline {
     154            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     155            0 :         f.debug_struct("SecondaryDetailTimeline")
     156            0 :             .field("on_disk_layers", &self.on_disk_layers)
     157            0 :             .field("evicted_at", &self.evicted_at)
     158            0 :             .finish()
     159            0 :     }
     160              : }
     161              : 
     162              : impl SecondaryDetailTimeline {
     163            0 :     pub(super) fn empty(ctx: RequestContext) -> Self {
     164            0 :         SecondaryDetailTimeline {
     165            0 :             on_disk_layers: Default::default(),
     166            0 :             evicted_at: Default::default(),
     167            0 :             ctx,
     168            0 :         }
     169            0 :     }
     170              : 
     171            0 :     pub(super) fn context(&self) -> &RequestContext {
     172            0 :         &self.ctx
     173            0 :     }
     174              : 
     175            0 :     pub(super) fn remove_layer(
     176            0 :         &mut self,
     177            0 :         name: &LayerName,
     178            0 :         resident_metric: &UIntGauge,
     179            0 :     ) -> Option<OnDiskState> {
     180            0 :         let removed = self.on_disk_layers.remove(name);
     181            0 :         if let Some(removed) = &removed {
     182            0 :             resident_metric.sub(removed.file_size());
     183            0 :         }
     184            0 :         removed
     185            0 :     }
     186              : 
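     187              :     /// `local_path` is only invoked when the layer is not already tracked, to supply the path stored for the new entry.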
     188            0 :     fn touch_layer<F>(
     189            0 :         &mut self,
     190            0 :         conf: &'static PageServerConf,
     191            0 :         tenant_shard_id: &TenantShardId,
     192            0 :         timeline_id: &TimelineId,
     193            0 :         touched: &HeatMapLayer,
     194            0 :         resident_metric: &UIntGauge,
     195            0 :         local_path: F,
     196            0 :     ) where
     197            0 :         F: FnOnce() -> Utf8PathBuf,
     198            0 :     {
     199              :         use std::collections::hash_map::Entry;
     200            0 :         match self.on_disk_layers.entry(touched.name.clone()) {
     201            0 :             Entry::Occupied(mut v) => {
     202            0 :                 v.get_mut().access_time = touched.access_time;
     203            0 :             }
     204            0 :             Entry::Vacant(e) => {
     205            0 :                 e.insert(OnDiskState::new(
     206            0 :                     conf,
     207            0 :                     tenant_shard_id,
     208            0 :                     timeline_id,
     209            0 :                     touched.name.clone(),
     210            0 :                     touched.metadata.clone(),
     211            0 :                     touched.access_time,
     212            0 :                     local_path(),
     213            0 :                 ));
     214            0 :                 resident_metric.add(touched.metadata.file_size);
     215            0 :             }
     216              :         }
     217            0 :     }
     218              : }
     219              : 
     220              : // Aspects of a heatmap that we remember after downloading it
     221              : #[derive(Clone, Debug)]
     222              : struct DownloadSummary {
     223              :     etag: Etag,
     224              :     #[allow(unused)]
     225              :     mtime: SystemTime,
     226              :     upload_period: Duration,
     227              : }
     228              : 
     229              : /// This state is written by the secondary downloader, it is opaque
     230              : /// to TenantManager
     231              : #[derive(Debug)]
     232              : pub(super) struct SecondaryDetail {
     233              :     pub(super) config: SecondaryLocationConfig,
     234              : 
     235              :     last_download: Option<DownloadSummary>,
     236              :     next_download: Option<Instant>,
     237              :     timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
     238              : }
     239              : 
     240              : /// Helper for logging SystemTime
     241            0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
     242            0 :     let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
     243            0 :     datetime.format("%d/%m/%Y %T")
     244            0 : }
     245              : 
     246              : /// Information returned from download function when it detects the heatmap has changed
     247              : struct HeatMapModified {
     248              :     etag: Etag,
     249              :     last_modified: SystemTime,
     250              :     bytes: Vec<u8>,
     251              : }
     252              : 
     253              : enum HeatMapDownload {
     254              :     // The heatmap's etag has changed: return the new etag, mtime and the body bytes
     255              :     Modified(HeatMapModified),
     256              :     // The heatmap's etag is unchanged
     257              :     Unmodified,
     258              : }
     259              : 
     260              : impl SecondaryDetail {
     261            0 :     pub(super) fn new(config: SecondaryLocationConfig) -> Self {
     262            0 :         Self {
     263            0 :             config,
     264            0 :             last_download: None,
     265            0 :             next_download: None,
     266            0 :             timelines: HashMap::new(),
     267            0 :         }
     268            0 :     }
     269              : 
     270              :     #[cfg(feature = "testing")]
     271            0 :     pub(crate) fn total_resident_size(&self) -> u64 {
     272            0 :         self.timelines
     273            0 :             .values()
     274            0 :             .map(|tl| {
     275            0 :                 tl.on_disk_layers
     276            0 :                     .values()
     277            0 :                     .map(|v| v.metadata.file_size)
     278            0 :                     .sum::<u64>()
     279            0 :             })
     280            0 :             .sum::<u64>()
     281            0 :     }
     282              : 
     283            0 :     pub(super) fn evict_layer(
     284            0 :         &mut self,
     285            0 :         name: LayerName,
     286            0 :         timeline_id: &TimelineId,
     287            0 :         now: SystemTime,
     288            0 :         resident_metric: &UIntGauge,
     289            0 :     ) -> Option<OnDiskState> {
     290            0 :         let timeline = self.timelines.get_mut(timeline_id)?;
     291            0 :         let removed = timeline.remove_layer(&name, resident_metric);
     292            0 :         if removed.is_some() {
     293            0 :             timeline.evicted_at.insert(name, now);
     294            0 :         }
     295            0 :         removed
     296            0 :     }
     297              : 
     298            0 :     pub(super) fn remove_timeline(
     299            0 :         &mut self,
     300            0 :         tenant_shard_id: &TenantShardId,
     301            0 :         timeline_id: &TimelineId,
     302            0 :         resident_metric: &UIntGauge,
     303            0 :     ) {
     304            0 :         let removed = self.timelines.remove(timeline_id);
     305            0 :         if let Some(removed) = removed {
     306            0 :             resident_metric.sub(
     307            0 :                 removed
     308            0 :                     .on_disk_layers
     309            0 :                     .values()
     310            0 :                     .map(|l| l.metadata.file_size)
     311            0 :                     .sum(),
     312            0 :             );
     313            0 : 
     314            0 :             let shard_id = format!("{}", tenant_shard_id.shard_slug());
     315            0 :             let tenant_id = tenant_shard_id.tenant_id.to_string();
     316            0 :             let timeline_id = timeline_id.to_string();
     317            0 :             for op in StorageIoSizeOperation::VARIANTS {
     318            0 :                 let _ = STORAGE_IO_SIZE.remove_label_values(&[
     319            0 :                     op,
     320            0 :                     tenant_id.as_str(),
     321            0 :                     shard_id.as_str(),
     322            0 :                     timeline_id.as_str(),
     323            0 :                 ]);
     324            0 :             }
     325            0 :         }
     326            0 :     }
     327              : 
     328              :     /// Additionally returns the total number of layers, used for more stable relative access time
     329              :     /// based eviction.
     330            0 :     pub(super) fn get_layers_for_eviction(
     331            0 :         &self,
     332            0 :         parent: &Arc<SecondaryTenant>,
     333            0 :     ) -> (DiskUsageEvictionInfo, usize) {
     334            0 :         let mut result = DiskUsageEvictionInfo::default();
     335            0 :         let mut total_layers = 0;
     336              : 
     337            0 :         for (timeline_id, timeline_detail) in &self.timelines {
     338            0 :             result
     339            0 :                 .resident_layers
     340            0 :                 .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
     341            0 :                     EvictionCandidate {
     342            0 :                         layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
     343            0 :                             secondary_tenant: parent.clone(),
     344            0 :                             timeline_id: *timeline_id,
     345            0 :                             name: name.clone(),
     346            0 :                             metadata: ods.metadata.clone(),
     347            0 :                         }),
     348            0 :                         last_activity_ts: ods.access_time,
     349            0 :                         relative_last_activity: finite_f32::FiniteF32::ZERO,
     350            0 :                         // Secondary location layers are presumed visible, because Covered layers
     351            0 :                         // are excluded from the heatmap
     352            0 :                         visibility: LayerVisibilityHint::Visible,
     353            0 :                     }
     354            0 :                 }));
     355            0 : 
     356            0 :             // total might be missing currently downloading layers, but as a lower-than-actual
     357            0 :             // value it is a good enough approximation.
     358            0 :             total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
     359            0 :         }
     360            0 :         result.max_layer_size = result
     361            0 :             .resident_layers
     362            0 :             .iter()
     363            0 :             .map(|l| l.layer.get_file_size())
     364            0 :             .max();
     365            0 : 
     366            0 :         tracing::debug!(
     367            0 :             "eviction: secondary tenant {} found {} timelines, {} layers",
     368            0 :             parent.get_tenant_shard_id(),
     369            0 :             self.timelines.len(),
     370            0 :             result.resident_layers.len()
     371              :         );
     372              : 
     373            0 :         (result, total_layers)
     374            0 :     }
     375              : }
     376              : 
     377              : struct PendingDownload {
     378              :     secondary_state: Arc<SecondaryTenant>,
     379              :     last_download: Option<DownloadSummary>,
     380              :     target_time: Option<Instant>,
     381              : }
     382              : 
     383              : impl scheduler::PendingJob for PendingDownload {
     384            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     385            0 :         self.secondary_state.get_tenant_shard_id()
     386            0 :     }
     387              : }
     388              : 
     389              : struct RunningDownload {
     390              :     barrier: Barrier,
     391              : }
     392              : 
     393              : impl scheduler::RunningJob for RunningDownload {
     394            0 :     fn get_barrier(&self) -> Barrier {
     395            0 :         self.barrier.clone()
     396            0 :     }
     397              : }
     398              : 
     399              : struct CompleteDownload {
     400              :     secondary_state: Arc<SecondaryTenant>,
     401              :     completed_at: Instant,
     402              :     result: Result<(), UpdateError>,
     403              : }
     404              : 
     405              : impl scheduler::Completion for CompleteDownload {
     406            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     407            0 :         self.secondary_state.get_tenant_shard_id()
     408            0 :     }
     409              : }
     410              : 
     411              : type Scheduler = TenantBackgroundJobs<
     412              :     SecondaryDownloader,
     413              :     PendingDownload,
     414              :     RunningDownload,
     415              :     CompleteDownload,
     416              :     DownloadCommand,
     417              : >;
     418              : 
     419              : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
     420              :     for SecondaryDownloader
     421              : {
     422              :     #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
     423              :     fn on_completion(&mut self, completion: CompleteDownload) {
     424              :         let CompleteDownload {
     425              :             secondary_state,
     426              :             completed_at: _completed_at,
     427              :             result,
     428              :         } = completion;
     429              : 
     430              :         tracing::debug!("Secondary tenant download completed");
     431              : 
     432              :         let mut detail = secondary_state.detail.lock().unwrap();
     433              : 
     434              :         match result {
     435              :             Err(UpdateError::Restart) => {
     436              :                 // Start downloading again as soon as we can.  This will involve waiting for the scheduler's
     437              :                 // scheduling interval.  This slightly reduces the peak download speed of tenants that hit their
     438              :                 // deadline and keep restarting, but that also helps give other tenants a chance to execute rather
     439              :                 // than letting one big tenant dominate for a long time.
     440              :                 detail.next_download = Some(Instant::now());
     441              :             }
     442              :             _ => {
     443              :                 let period = detail
     444              :                     .last_download
     445              :                     .as_ref()
     446            0 :                     .map(|d| d.upload_period)
     447              :                     .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
     448              : 
     449              :                 // We advance next_download irrespective of errors: we don't want error cases to result in
     450              :                 // expensive busy-polling.
     451              :                 detail.next_download = Some(Instant::now() + period_jitter(period, 5));
     452              :             }
     453              :         }
     454              :     }
     455              : 
     456            0 :     async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
     457            0 :         let mut result = SchedulingResult {
     458            0 :             jobs: Vec::new(),
     459            0 :             want_interval: None,
     460            0 :         };
     461            0 : 
     462            0 :         // Step 1: identify some tenants that we may work on
     463            0 :         let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
     464            0 :         self.tenant_manager
     465            0 :             .foreach_secondary_tenants(|_id, secondary_state| {
     466            0 :                 tenants.push(secondary_state.clone());
     467            0 :             });
     468            0 : 
     469            0 :         // Step 2: filter out tenants which are not yet eligible to run
     470            0 :         let now = Instant::now();
     471            0 :         result.jobs = tenants
     472            0 :             .into_iter()
     473            0 :             .filter_map(|secondary_tenant| {
     474            0 :                 let (last_download, next_download) = {
     475            0 :                     let mut detail = secondary_tenant.detail.lock().unwrap();
     476            0 : 
     477            0 :                     if !detail.config.warm {
     478              :                         // Downloads are disabled for this tenant
     479            0 :                         detail.next_download = None;
     480            0 :                         return None;
     481            0 :                     }
     482            0 : 
     483            0 :                     if detail.next_download.is_none() {
     484            0 :                         // Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times.  Subsequent
     485            0 :                         // rounds will use a smaller jitter to avoid accidentally synchronizing later.
     486            0 :                         detail.next_download = Some(now.checked_add(period_warmup(DEFAULT_DOWNLOAD_INTERVAL)).expect(
     487            0 :                         "Using our constant, which is known to be small compared with clock range",
     488            0 :                     ));
     489            0 :                     }
     490            0 :                     (detail.last_download.clone(), detail.next_download.unwrap())
     491            0 :                 };
     492            0 : 
     493            0 :                 if now > next_download {
     494            0 :                     Some(PendingDownload {
     495            0 :                         secondary_state: secondary_tenant,
     496            0 :                         last_download,
     497            0 :                         target_time: Some(next_download),
     498            0 :                     })
     499              :                 } else {
     500            0 :                     None
     501              :                 }
     502            0 :             })
     503            0 :             .collect();
     504            0 : 
     505            0 :         // Step 3: sort by target execution time to run most urgent first.
     506            0 :         result.jobs.sort_by_key(|j| j.target_time);
     507            0 : 
     508            0 :         result
     509            0 :     }
     510              : 
     511            0 :     fn on_command(
     512            0 :         &mut self,
     513            0 :         command: DownloadCommand,
     514            0 :     ) -> Result<PendingDownload, SecondaryTenantError> {
     515            0 :         let tenant_shard_id = command.get_tenant_shard_id();
     516              : 
     517            0 :         let tenant = self
     518            0 :             .tenant_manager
     519            0 :             .get_secondary_tenant_shard(*tenant_shard_id)
     520            0 :             .ok_or(GetTenantError::ShardNotFound(*tenant_shard_id))?;
     521              : 
     522            0 :         Ok(PendingDownload {
     523            0 :             target_time: None,
     524            0 :             last_download: None,
     525            0 :             secondary_state: tenant,
     526            0 :         })
     527            0 :     }
     528              : 
                            /// Start a download job and return a handle plus the future that performs it.
                            ///
                            /// The returned [`RunningDownload`] holds the `barrier` half of a completion channel;
                            /// the returned future owns the `completion` half for as long as it exists.
                            ///
                            /// The future:
                            /// 1. runs [`TenantDownloader::download`] for the job's tenant,
                            /// 2. logs the outcome at a level that depends on the error variant,
                            /// 3. calls `warn_when_period_overrun` if the job carried both a target time and a
                            ///    previous download summary,
                            /// 4. resolves to a [`CompleteDownload`] carrying the unchanged `result`.
                            ///
                            /// The future is instrumented with a `secondary_download` span created with
                            /// `parent: None`, i.e. it is not attached to whatever span is current when `spawn`
                            /// is called.
     529            0 :     fn spawn(
     530            0 :         &mut self,
     531            0 :         job: PendingDownload,
     532            0 :     ) -> (
     533            0 :         RunningDownload,
     534            0 :         Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
     535            0 :     ) {
     536            0 :         let PendingDownload {
     537            0 :             secondary_state,
     538            0 :             last_download,
     539            0 :             target_time,
     540            0 :         } = job;
     541            0 : 
     542            0 :         let (completion, barrier) = utils::completion::channel();
     543            0 :         let remote_storage = self.remote_storage.clone();
     544            0 :         let conf = self.tenant_manager.get_conf();
     545            0 :         let tenant_shard_id = *secondary_state.get_tenant_shard_id();
     546            0 :         let download_ctx = self
     547            0 :             .root_ctx
     548            0 :             .attached_child()
     549            0 :             .with_scope_secondary_tenant(&tenant_shard_id);
     550            0 :         (RunningDownload { barrier }, Box::pin(async move {
                                    // Bind `completion` to a local so it lives until this async block ends
                                    // (or the future is dropped).
                                    // NOTE(review): presumably dropping it is what releases `barrier` —
                                    // confirm against `utils::completion`.
     551            0 :             let _completion = completion;
     552              : 
     553            0 :             let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
     554            0 :                 .download(&download_ctx)
     555            0 :                 .await;
                                    // Logging only: `result` is borrowed here and handed on unchanged below.
     556            0 :             match &result
     557              :             {
     558              :                 Err(UpdateError::NoData) => {
     559            0 :                     tracing::info!("No heatmap found for tenant.  This is fine if it is new.");
     560              :                 },
     561              :                 Err(UpdateError::NoSpace) => {
     562            0 :                     tracing::warn!("Insufficient space while downloading.  Will retry later.");
     563              :                 }
     564              :                 Err(UpdateError::Cancelled) => {
     565            0 :                     tracing::info!("Shut down while downloading");
     566              :                 },
     567            0 :                 Err(UpdateError::Deserialize(e)) => {
     568            0 :                     tracing::error!("Corrupt content while downloading tenant: {e}");
     569              :                 },
     570            0 :                 Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
     571            0 :                     tracing::error!("Error while downloading tenant: {e}");
     572              :                 },
     573              :                 Err(UpdateError::Restart) => {
     574            0 :                     tracing::info!("Download reached deadline & will restart to update heatmap")
     575              :                 }
     576            0 :                 Ok(()) => {}
     577              :             };
     578              : 
     579              :             // Irrespective of the result, we will reschedule ourselves to run after our usual period.
     580              : 
     581              :             // If the job had a target execution time, we may check our final execution
     582              :             // time against that for observability purposes.
     583            0 :             if let (Some(target_time), Some(last_download)) = (target_time, last_download) {
     584            0 :                 // Elapsed time includes any scheduling lag as well as the execution of the job
     585            0 :                 let elapsed = Instant::now().duration_since(target_time);
     586            0 : 
     587            0 :                 warn_when_period_overrun(
     588            0 :                     elapsed,
     589            0 :                     last_download.upload_period,
     590            0 :                     BackgroundLoopKind::SecondaryDownload,
     591            0 :                 );
     592            0 :             }
     593              : 
     594            0 :             CompleteDownload {
     595            0 :                 secondary_state,
     596            0 :                 completed_at: Instant::now(),
     597            0 :                 result
     598            0 :             }
     599            0 :         }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     600            0 :     }
     601              : }
     602              : 
                        /// The set of actions that can be chosen for a layer.
                        ///
                        /// NOTE(review): nothing in this part of the file constructs or matches on these
                        /// variants, so their exact meanings are not documented here. Presumably they are the
                        /// per-layer decisions taken while downloading a timeline — confirm against the code
                        /// that uses them before relying on the names alone.
     603              : enum LayerAction {
     604              :     Download,
     605              :     NoAction,
     606              :     Skip,
     607              :     Touch,
     608              : }
     609              : 
     610              : /// This type is a convenience to group together the various functions involved in
     611              : /// freshening a secondary tenant.
                        ///
                        /// It owns nothing: all three fields are borrows, so a `TenantDownloader` is built
                        /// fresh for each download pass.
     612              : struct TenantDownloader<'a> {
                            /// Static pageserver configuration; used here to derive local paths (tenant
                            /// heatmap file, timeline directories, layer files).
     613              :     conf: &'static PageServerConf,
                            /// Remote storage client from which the heatmap is downloaded.
     614              :     remote_storage: &'a GenericRemoteStorage,
                            /// In-memory state of the secondary tenant being updated: its gate, cancellation
                            /// token, `detail` (timelines and last download), `progress` and metrics.
     615              :     secondary_state: &'a SecondaryTenant,
     616              : }
     617              : 
     618              : /// Errors that may be encountered while updating a tenant
     619              : #[derive(thiserror::Error, Debug)]
     620              : enum UpdateError {
     621              :     /// This is not a true failure, but it's how a download indicates that it would like to be restarted by
     622              :     /// the scheduler, to pick up the latest heatmap
     623              :     #[error("Reached deadline, restarting downloads")]
     624              :     Restart,
     625              : 
                            /// The remote object was not found; produced from `DownloadError::NotFound`.
     626              :     #[error("No remote data found")]
     627              :     NoData,
                            /// Local disk is full; produced from an I/O error whose OS error code is `ENOSPC`.
     628              :     #[error("Insufficient local storage space")]
     629              :     NoSpace,
                            /// Any remote download error other than "cancelled" or "not found".
                            ///
                            /// NOTE(review): the Display text has no `{0}` and the field is not marked as a
                            /// source, so formatting this variant with `{e}` (as the log in `spawn` does) prints
                            /// only "Failed to download" without the wrapped error. Consider including the
                            /// inner error in the message — confirm that `DownloadError` implements `Display`.
     630              :     #[error("Failed to download")]
     631              :     DownloadError(DownloadError),
                            /// The downloaded heatmap could not be parsed as JSON.
     632              :     #[error(transparent)]
     633              :     Deserialize(#[from] serde_json::Error),
                            /// The operation was cut short: the tenant's gate could not be entered, the remote
                            /// download reported cancellation, or the retry loop gave up due to cancellation.
     634              :     #[error("Cancelled")]
     635              :     Cancelled,
                            /// Catch-all, including I/O errors that are neither `ENOSPC` nor a wrapped
                            /// `DownloadError`.
     636              :     #[error(transparent)]
     637              :     Other(#[from] anyhow::Error),
     638              : }
     639              : 
                        /// Map a remote-storage error onto [`UpdateError`].
                        ///
                        /// `Cancelled` and `NotFound` get dedicated variants (`Cancelled`, `NoData`) so callers
                        /// can treat them specially; every other error is wrapped unchanged in
                        /// `UpdateError::DownloadError`.
     640              : impl From<DownloadError> for UpdateError {
     641            0 :     fn from(value: DownloadError) -> Self {
                                // Match on a borrow so `value` can still be moved into the fallback arm.
     642            0 :         match &value {
     643            0 :             DownloadError::Cancelled => Self::Cancelled,
     644            0 :             DownloadError::NotFound => Self::NoData,
     645            0 :             _ => Self::DownloadError(value),
     646              :         }
     647            0 :     }
     648              : }
     649              : 
                        /// Classify an I/O error as an [`UpdateError`].
                        ///
                        /// Checked in order:
                        /// 1. OS error code `ENOSPC` becomes `NoSpace`.
                        /// 2. An error whose inner payload is a `DownloadError` is converted with
                        ///    `DownloadError::from` and then mapped like any other `DownloadError`.
                        /// 3. Anything else is wrapped in `Other`.
     650              : impl From<std::io::Error> for UpdateError {
     651            0 :     fn from(value: std::io::Error) -> Self {
     652            0 :         if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
     653            0 :             UpdateError::NoSpace
     654            0 :         } else if value
     655            0 :             .get_ref()
     656            0 :             .and_then(|x| x.downcast_ref::<DownloadError>())
     657            0 :             .is_some()
     658              :         {
                                    // NOTE(review): presumably `DownloadError::from(io::Error)` recovers the wrapped
                                    // `DownloadError` — confirm against the `remote_storage` crate.
     659            0 :             UpdateError::from(DownloadError::from(value))
     660              :         } else {
     661              :             // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
     662            0 :             UpdateError::Other(anyhow::anyhow!(value))
     663              :         }
     664            0 :     }
     665              : }
     666              : 
     667              : impl<'a> TenantDownloader<'a> {
     668            0 :     fn new(
     669            0 :         conf: &'static PageServerConf,
     670            0 :         remote_storage: &'a GenericRemoteStorage,
     671            0 :         secondary_state: &'a SecondaryTenant,
     672            0 :     ) -> Self {
                                // Plain constructor: stores the three borrows as-is; no I/O or validation
                                // happens here.
     673            0 :         Self {
     674            0 :             conf,
     675            0 :             remote_storage,
     676            0 :             secondary_state,
     677            0 :         }
     678            0 :     }
     679              : 
                            /// Run one download pass for this secondary tenant.
                            ///
                            /// Steps, in order:
                            /// 1. Enter the tenant's gate; fail with `UpdateError::Cancelled` if that is refused.
                            /// 2. Fetch the remote heatmap, conditional on the etag of the last successful
                            ///    download. If it is unmodified, return `Ok(())` without further work.
                            /// 3. Parse the heatmap and overwrite the local heatmap file with the new bytes.
                            /// 4. Fetch or initialize in-memory state for every timeline in the heatmap.
                            /// 5. Call [`Self::prepare_timelines`] to delete local content that is absent from
                            ///    the heatmap and to initialize the tenant's `SecondaryProgress`.
                            /// 6. Download each timeline, with a deadline of twice the previous upload period
                            ///    (or twice `DEFAULT_DOWNLOAD_INTERVAL` when there is no previous download).
                            /// 7. Record the heatmap's etag, mtime and upload period in `last_download`, and
                            ///    force the progress counters to "complete" if they drifted.
                            ///
                            /// Cancellation observed through `secondary_state.cancel` inside this function
                            /// (while fetching the heatmap, or between timelines) returns `Ok(())`, not
                            /// `Err(UpdateError::Cancelled)`; `last_download` is left untouched on those paths.
     680            0 :     async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
     681            0 :         debug_assert_current_span_has_tenant_id();
     682              : 
     683              :         // For the duration of a download, we must hold the SecondaryTenant::gate, to
     684              :         // cover our access to local storage.
     685            0 :         let Ok(_guard) = self.secondary_state.gate.enter() else {
     686              :             // Shutting down
     687            0 :             return Err(UpdateError::Cancelled);
     688              :         };
     689              : 
     690            0 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     691            0 : 
     692            0 :         // We will use the etag from last successful download to make the download conditional on changes
     693            0 :         let last_download = self
     694            0 :             .secondary_state
     695            0 :             .detail
     696            0 :             .lock()
     697            0 :             .unwrap()
     698            0 :             .last_download
     699            0 :             .clone();
     700              : 
     701              :         // Download the tenant's heatmap
                                // If the cancellation token fires first, this returns `Ok(())` from the function.
     702              :         let HeatMapModified {
     703            0 :             last_modified: heatmap_mtime,
     704            0 :             etag: heatmap_etag,
     705            0 :             bytes: heatmap_bytes,
     706            0 :         } = match tokio::select!(
     707            0 :             bytes = self.download_heatmap(last_download.as_ref().map(|d| &d.etag)) => {bytes?},
     708            0 :             _ = self.secondary_state.cancel.cancelled() => return Ok(())
     709              :         ) {
     710              :             HeatMapDownload::Unmodified => {
     711            0 :                 tracing::info!("Heatmap unchanged since last successful download");
     712            0 :                 return Ok(());
     713              :             }
     714            0 :             HeatMapDownload::Modified(m) => m,
     715            0 :         };
     716            0 : 
     717            0 :         // Heatmap storage location
     718            0 :         let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
     719              : 
                                // With no in-memory record of a previous download, fall back to the heatmap left
                                // on local disk (if any). It has to be read here, before the file is overwritten
                                // with the new heatmap below. A load failure is logged and treated the same as
                                // having no previous heatmap.
     720            0 :         let last_heatmap = if last_download.is_none() {
     721            0 :             match load_heatmap(&heatmap_path, ctx).await {
     722            0 :                 Ok(htm) => htm,
     723            0 :                 Err(e) => {
     724            0 :                     tracing::warn!("Couldn't load heatmap from {heatmap_path}: {e:?}");
     725            0 :                     None
     726              :                 }
     727              :             }
     728              :         } else {
     729            0 :             None
     730              :         };
     731              : 
                                // Index the previous heatmap's timelines by ID, for lookup when initializing
                                // timeline state below.
     732            0 :         let last_heatmap_timelines = last_heatmap.as_ref().map(|htm| {
     733            0 :             htm.timelines
     734            0 :                 .iter()
     735            0 :                 .map(|tl| (tl.timeline_id, tl))
     736            0 :                 .collect::<HashMap<_, _>>()
     737            0 :         });
     738              : 
                                // Parse before persisting: bytes that fail to deserialize make this function
                                // return `UpdateError::Deserialize` (via `?`) without touching the local copy.
     739            0 :         let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
     740              : 
     741            0 :         let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
     742            0 :         let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
     743            0 :         let heatmap_path_bg = heatmap_path.clone();
     744            0 :         VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
     745            0 :             .await
     746            0 :             .maybe_fatal_err(&context_msg)?;
     747              : 
     748            0 :         tracing::debug!(
     749            0 :             "Wrote local heatmap to {}, with {} timelines",
     750            0 :             heatmap_path,
     751            0 :             heatmap.timelines.len()
     752              :         );
     753              : 
     754              :         // Get or initialize the local disk state for the timelines we will update
     755            0 :         let mut timeline_states = HashMap::new();
     756            0 :         for timeline in &heatmap.timelines {
                                    // The detail lock is held only for this lookup: the guard is a temporary that
                                    // is dropped at the end of the statement, before the `.await` further down.
     757            0 :             let timeline_state = self
     758            0 :                 .secondary_state
     759            0 :                 .detail
     760            0 :                 .lock()
     761            0 :                 .unwrap()
     762            0 :                 .timelines
     763            0 :                 .get(&timeline.timeline_id)
     764            0 :                 .cloned();
     765              : 
     766            0 :             let timeline_state = match timeline_state {
     767            0 :                 Some(t) => t,
     768              :                 None => {
     769            0 :                     let last_heatmap =
     770            0 :                         last_heatmap_timelines
     771            0 :                             .as_ref()
     772            0 :                             .and_then(|last_heatmap_timelines| {
     773            0 :                                 last_heatmap_timelines.get(&timeline.timeline_id).copied()
     774            0 :                             });
     775              :                     // We have no existing state: need to scan local disk for layers first.
     776            0 :                     let timeline_state = init_timeline_state(
     777            0 :                         self.conf,
     778            0 :                         tenant_shard_id,
     779            0 :                         last_heatmap,
     780            0 :                         timeline,
     781            0 :                         &self.secondary_state.resident_size_metric,
     782            0 :                         ctx,
     783            0 :                     )
     784            0 :                     .await;
     785              : 
     786              :                     // Re-acquire detail lock now that we're done with async load from local FS
     787            0 :                     self.secondary_state
     788            0 :                         .detail
     789            0 :                         .lock()
     790            0 :                         .unwrap()
     791            0 :                         .timelines
     792            0 :                         .insert(timeline.timeline_id, timeline_state.clone());
     793            0 :                     timeline_state
     794              :                 }
     795              :             };
     796              : 
     797            0 :             timeline_states.insert(timeline.timeline_id, timeline_state);
     798              :         }
     799              : 
     800              :         // Clean up any local layers that aren't in the heatmap.  We do this first for all timelines, on the general
     801              :         // principle that deletions should be done before writes wherever possible, and so that we can use this
     802              :         // phase to initialize our SecondaryProgress.
     803              :         {
                                    // In an assignment the right-hand side is evaluated first, so the `progress`
                                    // lock is only taken after `prepare_timelines` has completed.
     804            0 :             *self.secondary_state.progress.lock().unwrap() =
     805            0 :                 self.prepare_timelines(&heatmap, heatmap_mtime).await?;
     806              :         }
     807              : 
     808              :         // Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
     809              :         // so that we are always using a reasonably fresh heatmap.  Otherwise, if we had really huge content to download, we might
     810              :         // spend 10s of minutes downloading layers we don't need.
     811              :         // (see https://github.com/neondatabase/neon/issues/8182)
     812            0 :         let deadline = {
     813            0 :             let period = self
     814            0 :                 .secondary_state
     815            0 :                 .detail
     816            0 :                 .lock()
     817            0 :                 .unwrap()
     818            0 :                 .last_download
     819            0 :                 .as_ref()
     820            0 :                 .map(|d| d.upload_period)
     821            0 :                 .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
     822            0 : 
     823            0 :             // Use double the period: we are not promising to complete within the period, this is just a heuristic
     824            0 :             // to keep using a "reasonably fresh" heatmap.
     825            0 :             Instant::now() + period * 2
     826              :         };
     827              : 
     828              :         // Download the layers in the heatmap
     829            0 :         for timeline in heatmap.timelines {
     830            0 :             let timeline_state = timeline_states
     831            0 :                 .remove(&timeline.timeline_id)
     832            0 :                 .expect("Just populated above");
     833            0 : 
                                    // Cancellation between timelines returns `Ok(())` and skips the `last_download`
                                    // update at the end of this function.
     834            0 :             if self.secondary_state.cancel.is_cancelled() {
     835            0 :                 tracing::debug!(
     836            0 :                     "Cancelled before downloading timeline {}",
     837              :                     timeline.timeline_id
     838              :                 );
     839            0 :                 return Ok(());
     840            0 :             }
     841            0 : 
     842            0 :             let timeline_id = timeline.timeline_id;
     843            0 :             self.download_timeline(timeline, timeline_state, deadline, ctx)
     844            0 :                 .instrument(tracing::info_span!(
     845              :                     "secondary_download_timeline",
     846              :                     tenant_id=%tenant_shard_id.tenant_id,
     847            0 :                     shard_id=%tenant_shard_id.shard_slug(),
     848              :                     %timeline_id
     849              :                 ))
     850            0 :                 .await?;
     851              :         }
     852              : 
     853              :         // Metrics consistency check in testing builds
     854            0 :         self.secondary_state.validate_metrics();
     855            0 :         // Only update `last_download` (and with it the etag) after a full successful download: this way
     856            0 :         // we will not skip the next download after a failure, even if the heatmap's actual etag is unchanged.
     857            0 :         self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
     858            0 :             etag: heatmap_etag,
     859            0 :             mtime: heatmap_mtime,
     860            0 :             upload_period: heatmap
     861            0 :                 .upload_period_ms
     862            0 :                 .map(|ms| Duration::from_millis(ms as u64))
     863            0 :                 .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL),
     864            0 :         });
     865            0 : 
     866            0 :         // Robustness: we should have updated progress properly, but in case we didn't, make sure
     867            0 :         // we don't leave the tenant in a state where we claim to have successfully downloaded
     868            0 :         // everything, but our progress is incomplete.  The invariant here should be that if
     869            0 :         // we have set `last_download` to this heatmap's etag, then the next time we see that
     870            0 :         // etag we can safely do no work (i.e. we must be complete).
     871            0 :         let mut progress = self.secondary_state.progress.lock().unwrap();
     872            0 :         debug_assert!(progress.layers_downloaded == progress.layers_total);
     873            0 :         debug_assert!(progress.bytes_downloaded == progress.bytes_total);
     874            0 :         if progress.layers_downloaded != progress.layers_total
     875            0 :             || progress.bytes_downloaded != progress.bytes_total
     876              :         {
     877            0 :             tracing::warn!("Correcting drift in progress stats ({progress:?})");
     878            0 :             progress.layers_downloaded = progress.layers_total;
     879            0 :             progress.bytes_downloaded = progress.bytes_total;
     880            0 :         }
     881              : 
     882            0 :         Ok(())
     883            0 :     }
     884              : 
     885              :     /// Do any fast local cleanup that comes before the much slower process of downloading
     886              :     /// layers from remote storage.  In the process, initialize the SecondaryProgress object
     887              :     /// that will later be updated incrementally as we download layers.
                            ///
                            /// Concretely, for every timeline held in `detail.timelines`:
                            /// - if the timeline is absent from `heatmap`, its in-memory state is removed and its
                            ///   local directory is deleted;
                            /// - otherwise, on-disk layers that the heatmap does not list as hot layers are
                            ///   deleted, and the layers that remain are counted as already downloaded.
                            ///
                            /// The returned progress takes its totals from `heatmap.get_stats()`. This function
                            /// also sets `heatmap_total_size_metric` to the heatmap's total bytes.
                            ///
                            /// # Errors
                            ///
                            /// Filesystem removal errors other than "not found" are passed through
                            /// `maybe_fatal_err` and then returned.
     888            0 :     async fn prepare_timelines(
     889            0 :         &self,
     890            0 :         heatmap: &HeatMapTenant,
     891            0 :         heatmap_mtime: SystemTime,
     892            0 :     ) -> Result<SecondaryProgress, UpdateError> {
     893            0 :         let heatmap_stats = heatmap.get_stats();
     894            0 :         // We will construct a progress object, and then populate its initial "downloaded" numbers
     895            0 :         // while iterating through local layer state below
     896            0 :         let mut progress = SecondaryProgress {
     897            0 :             layers_total: heatmap_stats.layers,
     898            0 :             bytes_total: heatmap_stats.bytes,
     899            0 :             heatmap_mtime: Some(serde_system_time::SystemTime(heatmap_mtime)),
     900            0 :             layers_downloaded: 0,
     901            0 :             bytes_downloaded: 0,
     902            0 :         };
     903            0 : 
     904            0 :         // Also expose heatmap bytes_total as a metric
     905            0 :         self.secondary_state
     906            0 :             .heatmap_total_size_metric
     907            0 :             .set(heatmap_stats.bytes);
     908            0 : 
     909            0 :         // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
     910            0 :         let mut delete_layers = Vec::new();
     911            0 :         let mut delete_timelines = Vec::new();
     912            0 :         {
     913            0 :             let mut detail = self.secondary_state.detail.lock().unwrap();
     914            0 :             for (timeline_id, timeline_state) in &mut detail.timelines {
     915            0 :                 let Some(heatmap_timeline_index) = heatmap
     916            0 :                     .timelines
     917            0 :                     .iter()
     918            0 :                     .position(|t| t.timeline_id == *timeline_id)
     919              :                 else {
     920              :                     // This timeline is no longer referenced in the heatmap: delete it locally
     921            0 :                     delete_timelines.push(*timeline_id);
     922            0 :                     continue;
     923              :                 };
     924              : 
                                        // Cannot fail: the index was just produced by `position` on the same vector.
     925            0 :                 let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
     926            0 : 
                                        // Layers are compared as (name, generation) pairs, so an on-disk layer whose
                                        // name appears in the heatmap under a different generation counts as absent
                                        // from the heatmap and is queued for deletion below.
     927            0 :                 let layers_in_heatmap = heatmap_timeline
     928            0 :                     .hot_layers()
     929            0 :                     .map(|l| (&l.name, l.metadata.generation))
     930            0 :                     .collect::<HashSet<_>>();
     931            0 :                 let layers_on_disk = timeline_state
     932            0 :                     .on_disk_layers
     933            0 :                     .iter()
     934            0 :                     .map(|l| (l.0, l.1.metadata.generation))
     935            0 :                     .collect::<HashSet<_>>();
     936            0 : 
                                        // Start from everything on disk, then subtract what is about to be deleted;
                                        // what remains is counted as already downloaded.
     937            0 :                 let mut layer_count = layers_on_disk.len();
     938            0 :                 let mut layer_byte_count: u64 = timeline_state
     939            0 :                     .on_disk_layers
     940            0 :                     .values()
     941            0 :                     .map(|l| l.metadata.file_size)
     942            0 :                     .sum();
     943              : 
     944              :                 // Remove on-disk layers that are no longer present in heatmap
     945            0 :                 for (layer_file_name, generation) in layers_on_disk.difference(&layers_in_heatmap) {
     946            0 :                     layer_count -= 1;
                                            // The lookup cannot fail: `layer_file_name` comes from `on_disk_layers`'
                                            // own keys, collected above.
     947            0 :                     layer_byte_count -= timeline_state
     948            0 :                         .on_disk_layers
     949            0 :                         .get(layer_file_name)
     950            0 :                         .unwrap()
     951            0 :                         .metadata
     952            0 :                         .file_size;
     953            0 : 
     954            0 :                     let local_path = local_layer_path(
     955            0 :                         self.conf,
     956            0 :                         self.secondary_state.get_tenant_shard_id(),
     957            0 :                         timeline_id,
     958            0 :                         layer_file_name,
     959            0 :                         generation,
     960            0 :                     );
     961            0 : 
     962            0 :                     delete_layers.push((*timeline_id, (*layer_file_name).clone(), local_path));
     963            0 :                 }
     964              : 
     965            0 :                 progress.bytes_downloaded += layer_byte_count;
     966            0 :                 progress.layers_downloaded += layer_count;
     967              :             }
     968              : 
     969            0 :             for delete_timeline in &delete_timelines {
     970            0 :                 // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
     971            0 :                 // from disk fails that will be a fatal error.
     972            0 :                 detail.remove_timeline(
     973            0 :                     self.secondary_state.get_tenant_shard_id(),
     974            0 :                     delete_timeline,
     975            0 :                     &self.secondary_state.resident_size_metric,
     976            0 :                 );
     977            0 :             }
     978              :         }
     979              : 
     980              :         // Execute accumulated deletions
     981            0 :         for (timeline_id, layer_name, local_path) in delete_layers {
     982            0 :             tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
     983              : 
                                    // A file that is already gone is not an error.
     984            0 :             tokio::fs::remove_file(&local_path)
     985            0 :                 .await
     986            0 :                 .or_else(fs_ext::ignore_not_found)
     987            0 :                 .maybe_fatal_err("Removing secondary layer")?;
     988              : 
     989              :             // Update in-memory housekeeping to reflect the absence of the deleted layer
                                    // The lock is re-taken per layer, after the `.await` above. If the timeline's
                                    // state is no longer present there is nothing to update.
     990            0 :             let mut detail = self.secondary_state.detail.lock().unwrap();
     991            0 :             let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
     992            0 :                 continue;
     993              :             };
     994            0 :             timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
     995              :         }
     996              : 
     997            0 :         for timeline_id in delete_timelines {
     998            0 :             let timeline_path = self
     999            0 :                 .conf
    1000            0 :                 .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
    1001            0 :             tracing::info!(timeline_id=%timeline_id,
    1002            0 :                 "Timeline no longer in heatmap, removing from secondary location"
    1003              :             );
    1004            0 :             tokio::fs::remove_dir_all(&timeline_path)
    1005            0 :                 .await
    1006            0 :                 .or_else(fs_ext::ignore_not_found)
    1007            0 :                 .maybe_fatal_err("Removing secondary timeline")?;
    1008              :         }
    1009              : 
    1010            0 :         Ok(progress)
    1011            0 :     }
    1012              : 
    1013              :     /// Returns [`HeatMapDownload::Modified`] with the downloaded bytes if the etag differs from
    1014              :     /// `prev_etag`, or [`HeatMapDownload::Unmodified`] if the object still matches `prev_etag`.
                            ///
                            /// The download is wrapped in `backoff::retry` and honours the tenant's cancellation
                            /// token. On any `Ok` result, including `Unmodified`, the
                            /// `SECONDARY_MODE.download_heatmap` counter is incremented.
                            ///
                            /// # Errors
                            ///
                            /// - `UpdateError::Cancelled` if the retry loop returns `None`, or the download itself
                            ///   reports cancellation.
                            /// - `UpdateError::NoData` if the remote object is not found.
                            /// - Other download and I/O errors, converted through the `From` impls for
                            ///   [`UpdateError`].
    1015            0 :     async fn download_heatmap(
    1016            0 :         &self,
    1017            0 :         prev_etag: Option<&Etag>,
    1018            0 :     ) -> Result<HeatMapDownload, UpdateError> {
    1019            0 :         debug_assert_current_span_has_tenant_id();
    1020            0 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
    1021            0 :         tracing::debug!("Downloading heatmap for secondary tenant",);
    1022              : 
    1023            0 :         let heatmap_path = remote_heatmap_path(tenant_shard_id);
    1024            0 :         let cancel = &self.secondary_state.cancel;
                                // Passing the previous etag makes the request conditional: an unchanged object
                                // comes back as `DownloadError::Unmodified`, handled in the closure below.
    1025            0 :         let opts = DownloadOpts {
    1026            0 :             etag: prev_etag.cloned(),
    1027            0 :             kind: DownloadKind::Small,
    1028            0 :             ..Default::default()
    1029            0 :         };
    1030            0 : 
    1031            0 :         backoff::retry(
    1032            0 :             || async {
    1033            0 :                 let download = match self
    1034            0 :                     .remote_storage
    1035            0 :                     .download(&heatmap_path, &opts, cancel)
    1036            0 :                     .await
    1037              :                 {
    1038            0 :                     Ok(download) => download,
                                            // "Unmodified" is a successful outcome for the caller, not an error.
    1039            0 :                     Err(DownloadError::Unmodified) => return Ok(HeatMapDownload::Unmodified),
    1040            0 :                     Err(err) => return Err(err.into()),
    1041              :                 };
    1042              : 
                                        // Read the whole response body into memory.
    1043            0 :                 let mut heatmap_bytes = Vec::new();
    1044            0 :                 let mut body = tokio_util::io::StreamReader::new(download.download_stream);
    1045            0 :                 let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
    1046            0 :                 Ok(HeatMapDownload::Modified(HeatMapModified {
    1047            0 :                     etag: download.etag,
    1048            0 :                     last_modified: download.last_modified,
    1049            0 :                     bytes: heatmap_bytes,
    1050            0 :                 }))
    1051            0 :             },
                                    // NOTE(review): presumably this predicate marks errors that must not be retried
                                    // (missing object, cancellation) — confirm against `backoff::retry`'s signature.
    1052            0 :             |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
    1053            0 :             FAILED_DOWNLOAD_WARN_THRESHOLD,
    1054            0 :             FAILED_REMOTE_OP_RETRIES,
    1055            0 :             "download heatmap",
    1056            0 :             cancel,
    1057            0 :         )
    1058            0 :         .await
                                // `None` from the retry loop is mapped to `Cancelled`; `and_then(|x| x)` then
                                // flattens the nested `Result`.
    1059            0 :         .ok_or_else(|| UpdateError::Cancelled)
    1060            0 :         .and_then(|x| x)
    1061            0 :         .inspect(|_| SECONDARY_MODE.download_heatmap.inc())
    1062            0 :     }
    1063              : 
    1064              :     /// Download heatmap layers that are not present on local disk, or update their
    1065              :     /// access time if they are already present.
    1066            0 :     async fn download_timeline_layers(
    1067            0 :         &self,
    1068            0 :         tenant_shard_id: &TenantShardId,
    1069            0 :         timeline: HeatMapTimeline,
    1070            0 :         timeline_state: SecondaryDetailTimeline,
    1071            0 :         deadline: Instant,
    1072            0 :     ) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
    1073            0 :         // Accumulate updates to the state
    1074            0 :         let mut touched = Vec::new();
    1075            0 : 
    1076            0 :         let timeline_id = timeline.timeline_id;
    1077            0 :         for layer in timeline.into_hot_layers() {
    1078            0 :             if self.secondary_state.cancel.is_cancelled() {
    1079            0 :                 tracing::debug!("Cancelled -- dropping out of layer loop");
    1080            0 :                 return (Err(UpdateError::Cancelled), touched);
    1081            0 :             }
    1082            0 : 
    1083            0 :             if Instant::now() > deadline {
    1084              :                 // We've been running downloads for a while, restart to download latest heatmap.
    1085            0 :                 return (Err(UpdateError::Restart), touched);
    1086            0 :             }
    1087            0 : 
    1088            0 :             match self.layer_action(&timeline_state, &layer).await {
    1089            0 :                 LayerAction::Download => (),
    1090            0 :                 LayerAction::NoAction => continue,
    1091              :                 LayerAction::Skip => {
    1092            0 :                     self.skip_layer(layer);
    1093            0 :                     continue;
    1094              :                 }
    1095              :                 LayerAction::Touch => {
    1096            0 :                     touched.push(layer);
    1097            0 :                     continue;
    1098              :                 }
    1099              :             }
    1100              : 
    1101            0 :             match self
    1102            0 :                 .download_layer(
    1103            0 :                     tenant_shard_id,
    1104            0 :                     &timeline_id,
    1105            0 :                     layer,
    1106            0 :                     timeline_state.context(),
    1107            0 :                 )
    1108            0 :                 .await
    1109              :             {
    1110            0 :                 Ok(Some(layer)) => touched.push(layer),
    1111            0 :                 Ok(None) => {
    1112            0 :                     // Not an error but we didn't download it: remote layer is missing.  Don't add it to the list of
    1113            0 :                     // things to consider touched.
    1114            0 :                 }
    1115            0 :                 Err(e) => {
    1116            0 :                     return (Err(e), touched);
    1117              :                 }
    1118              :             }
    1119              :         }
    1120              : 
    1121            0 :         (Ok(()), touched)
    1122            0 :     }
    1123              : 
    1124            0 :     async fn layer_action(
    1125            0 :         &self,
    1126            0 :         timeline_state: &SecondaryDetailTimeline,
    1127            0 :         layer: &HeatMapLayer,
    1128            0 :     ) -> LayerAction {
    1129              :         // Existing on-disk layers: just update their access time.
    1130            0 :         if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
    1131            0 :             tracing::debug!("Layer {} is already on disk", layer.name);
    1132              : 
    1133            0 :             if cfg!(debug_assertions) {
    1134              :                 // Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
    1135              :                 // are already present on disk are really there.
    1136            0 :                 match tokio::fs::metadata(&on_disk.local_path).await {
    1137            0 :                     Ok(meta) => {
    1138            0 :                         tracing::debug!(
    1139            0 :                             "Layer {} present at {}, size {}",
    1140            0 :                             layer.name,
    1141            0 :                             on_disk.local_path,
    1142            0 :                             meta.len(),
    1143              :                         );
    1144              :                     }
    1145            0 :                     Err(e) => {
    1146            0 :                         tracing::warn!(
    1147            0 :                             "Layer {} not found at {} ({})",
    1148              :                             layer.name,
    1149              :                             on_disk.local_path,
    1150              :                             e
    1151              :                         );
    1152            0 :                         debug_assert!(false);
    1153              :                     }
    1154              :                 }
    1155            0 :             }
    1156              : 
    1157            0 :             if on_disk.metadata.generation_file_size() != layer.metadata.generation_file_size() {
    1158            0 :                 tracing::info!(
    1159            0 :                     "Re-downloading layer {} with changed size or generation: {:?}->{:?}",
    1160            0 :                     layer.name,
    1161            0 :                     on_disk.metadata.generation_file_size(),
    1162            0 :                     layer.metadata.generation_file_size()
    1163              :                 );
    1164            0 :                 return LayerAction::Download;
    1165            0 :             }
    1166            0 :             if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
    1167              :                 // We already have this layer on disk.  Update its access time.
    1168            0 :                 tracing::debug!(
    1169            0 :                     "Access time updated for layer {}: {} -> {}",
    1170            0 :                     layer.name,
    1171            0 :                     strftime(&on_disk.access_time),
    1172            0 :                     strftime(&layer.access_time)
    1173              :                 );
    1174            0 :                 return LayerAction::Touch;
    1175            0 :             }
    1176            0 :             return LayerAction::NoAction;
    1177              :         } else {
    1178            0 :             tracing::debug!("Layer {} not present on disk yet", layer.name);
    1179              :         }
    1180              : 
    1181              :         // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
    1182              :         // recently than it was evicted.
    1183            0 :         if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
    1184            0 :             if &layer.access_time > evicted_at {
    1185            0 :                 tracing::info!(
    1186            0 :                     "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
    1187            0 :                     layer.name,
    1188            0 :                     strftime(&layer.access_time),
    1189            0 :                     strftime(evicted_at)
    1190              :                 );
    1191              :             } else {
    1192            0 :                 tracing::trace!(
    1193            0 :                     "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
    1194            0 :                     layer.name,
    1195            0 :                     strftime(&layer.access_time),
    1196            0 :                     strftime(evicted_at)
    1197              :                 );
    1198            0 :                 return LayerAction::Skip;
    1199              :             }
    1200            0 :         }
    1201            0 :         LayerAction::Download
    1202            0 :     }
    1203              : 
    1204            0 :     async fn download_timeline(
    1205            0 :         &self,
    1206            0 :         timeline: HeatMapTimeline,
    1207            0 :         timeline_state: SecondaryDetailTimeline,
    1208            0 :         deadline: Instant,
    1209            0 :         ctx: &RequestContext,
    1210            0 :     ) -> Result<(), UpdateError> {
    1211            0 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1212            0 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
    1213            0 :         let timeline_id = timeline.timeline_id;
    1214            0 : 
    1215            0 :         tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
    1216              : 
    1217            0 :         let (result, touched) = self
    1218            0 :             .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
    1219            0 :             .await;
    1220              : 
    1221              :         // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
    1222              :         {
    1223            0 :             let mut detail = self.secondary_state.detail.lock().unwrap();
    1224            0 :             let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
    1225            0 :                 let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
    1226            0 :                 SecondaryDetailTimeline::empty(ctx)
    1227            0 :             });
    1228            0 : 
    1229            0 :             tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
    1230            0 :             touched.into_iter().for_each(|t| {
    1231            0 :                 timeline_detail.touch_layer(
    1232            0 :                     self.conf,
    1233            0 :                     tenant_shard_id,
    1234            0 :                     &timeline_id,
    1235            0 :                     &t,
    1236            0 :                     &self.secondary_state.resident_size_metric,
    1237            0 :                     || {
    1238            0 :                         local_layer_path(
    1239            0 :                             self.conf,
    1240            0 :                             tenant_shard_id,
    1241            0 :                             &timeline_id,
    1242            0 :                             &t.name,
    1243            0 :                             &t.metadata.generation,
    1244            0 :                         )
    1245            0 :                     },
    1246            0 :                 )
    1247            0 :             });
    1248            0 :         }
    1249            0 : 
    1250            0 :         result
    1251            0 :     }
    1252              : 
    1253              :     /// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
    1254            0 :     fn skip_layer(&self, layer: HeatMapLayer) {
    1255            0 :         let mut progress = self.secondary_state.progress.lock().unwrap();
    1256            0 :         progress.layers_total = progress.layers_total.saturating_sub(1);
    1257            0 :         progress.bytes_total = progress
    1258            0 :             .bytes_total
    1259            0 :             .saturating_sub(layer.metadata.file_size);
    1260            0 :     }
    1261              : 
    1262            0 :     async fn download_layer(
    1263            0 :         &self,
    1264            0 :         tenant_shard_id: &TenantShardId,
    1265            0 :         timeline_id: &TimelineId,
    1266            0 :         layer: HeatMapLayer,
    1267            0 :         ctx: &RequestContext,
    1268            0 :     ) -> Result<Option<HeatMapLayer>, UpdateError> {
    1269            0 :         // Failpoints for simulating slow remote storage
    1270            0 :         failpoint_support::sleep_millis_async!(
    1271              :             "secondary-layer-download-sleep",
    1272            0 :             &self.secondary_state.cancel
    1273              :         );
    1274              : 
    1275            0 :         pausable_failpoint!("secondary-layer-download-pausable");
    1276              : 
    1277            0 :         let local_path = local_layer_path(
    1278            0 :             self.conf,
    1279            0 :             tenant_shard_id,
    1280            0 :             timeline_id,
    1281            0 :             &layer.name,
    1282            0 :             &layer.metadata.generation,
    1283            0 :         );
    1284            0 : 
    1285            0 :         // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
    1286            0 :         tracing::info!(
    1287            0 :             "Starting download of layer {}, size {}",
    1288              :             layer.name,
    1289              :             layer.metadata.file_size
    1290              :         );
    1291            0 :         let downloaded_bytes = download_layer_file(
    1292            0 :             self.conf,
    1293            0 :             self.remote_storage,
    1294            0 :             *tenant_shard_id,
    1295            0 :             *timeline_id,
    1296            0 :             &layer.name,
    1297            0 :             &layer.metadata,
    1298            0 :             &local_path,
    1299            0 :             &self.secondary_state.gate,
    1300            0 :             &self.secondary_state.cancel,
    1301            0 :             ctx,
    1302            0 :         )
    1303            0 :         .await;
    1304              : 
    1305            0 :         let downloaded_bytes = match downloaded_bytes {
    1306            0 :             Ok(bytes) => bytes,
    1307              :             Err(DownloadError::NotFound) => {
    1308              :                 // A heatmap might be out of date and refer to a layer that doesn't exist any more.
    1309              :                 // This is harmless: continue to download the next layer. It is expected during compaction
    1310              :                 // GC.
    1311            0 :                 tracing::debug!(
    1312            0 :                     "Skipped downloading missing layer {}, raced with compaction/gc?",
    1313              :                     layer.name
    1314              :                 );
    1315            0 :                 self.skip_layer(layer);
    1316            0 : 
    1317            0 :                 return Ok(None);
    1318              :             }
    1319            0 :             Err(e) => return Err(e.into()),
    1320              :         };
    1321              : 
    1322            0 :         if downloaded_bytes != layer.metadata.file_size {
    1323            0 :             let local_path = local_layer_path(
    1324            0 :                 self.conf,
    1325            0 :                 tenant_shard_id,
    1326            0 :                 timeline_id,
    1327            0 :                 &layer.name,
    1328            0 :                 &layer.metadata.generation,
    1329            0 :             );
    1330            0 : 
    1331            0 :             tracing::warn!(
    1332            0 :                 "Downloaded layer {} with unexpected size {} != {}.  Removing download.",
    1333              :                 layer.name,
    1334              :                 downloaded_bytes,
    1335              :                 layer.metadata.file_size
    1336              :             );
    1337              : 
    1338            0 :             tokio::fs::remove_file(&local_path)
    1339            0 :                 .await
    1340            0 :                 .or_else(fs_ext::ignore_not_found)?;
    1341              :         } else {
    1342            0 :             tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
    1343            0 :             let mut progress = self.secondary_state.progress.lock().unwrap();
    1344            0 :             progress.bytes_downloaded += downloaded_bytes;
    1345            0 :             progress.layers_downloaded += 1;
    1346              :         }
    1347              : 
    1348            0 :         SECONDARY_MODE.download_layer.inc();
    1349            0 : 
    1350            0 :         Ok(Some(layer))
    1351            0 :     }
    1352              : }
    1353              : 
    1354              : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
    1355            0 : async fn init_timeline_state(
    1356            0 :     conf: &'static PageServerConf,
    1357            0 :     tenant_shard_id: &TenantShardId,
    1358            0 :     last_heatmap: Option<&HeatMapTimeline>,
    1359            0 :     heatmap: &HeatMapTimeline,
    1360            0 :     resident_metric: &UIntGauge,
    1361            0 :     ctx: &RequestContext,
    1362            0 : ) -> SecondaryDetailTimeline {
    1363            0 :     let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
    1364            0 :     let mut detail = SecondaryDetailTimeline::empty(ctx);
    1365            0 : 
    1366            0 :     let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
    1367            0 :     let mut dir = match tokio::fs::read_dir(&timeline_path).await {
    1368            0 :         Ok(d) => d,
    1369            0 :         Err(e) => {
    1370            0 :             if e.kind() == std::io::ErrorKind::NotFound {
    1371            0 :                 let context = format!("Creating timeline directory {timeline_path}");
    1372            0 :                 tracing::info!("{}", context);
    1373            0 :                 tokio::fs::create_dir_all(&timeline_path)
    1374            0 :                     .await
    1375            0 :                     .fatal_err(&context);
    1376            0 : 
    1377            0 :                 // No entries to report: drop out.
    1378            0 :                 return detail;
    1379              :             } else {
    1380            0 :                 on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
    1381              :             }
    1382              :         }
    1383              :     };
    1384              : 
    1385              :     // As we iterate through layers found on disk, we will look up their metadata from this map.
    1386              :     // Layers not present in metadata will be discarded.
    1387            0 :     let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
    1388            0 :         heatmap.hot_layers().map(|l| (&l.name, l)).collect();
    1389              : 
    1390            0 :     let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
    1391            0 :         if let Some(last_heatmap) = last_heatmap {
    1392            0 :             last_heatmap.hot_layers().map(|l| (&l.name, l)).collect()
    1393              :         } else {
    1394            0 :             HashMap::new()
    1395              :         };
    1396              : 
    1397            0 :     while let Some(dentry) = dir
    1398            0 :         .next_entry()
    1399            0 :         .await
    1400            0 :         .fatal_err(&format!("Listing {timeline_path}"))
    1401              :     {
    1402            0 :         let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
    1403            0 :             tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
    1404            0 :             continue;
    1405              :         };
    1406            0 :         let local_meta = dentry
    1407            0 :             .metadata()
    1408            0 :             .await
    1409            0 :             .fatal_err(&format!("Read metadata on {}", file_path));
    1410            0 : 
    1411            0 :         let file_name = file_path.file_name().expect("created it from the dentry");
    1412            0 :         if crate::is_temporary(&file_path)
    1413            0 :             || is_temp_download_file(&file_path)
    1414            0 :             || is_ephemeral_file(file_name)
    1415              :         {
    1416              :             // Temporary files are frequently left behind from restarting during downloads
    1417            0 :             tracing::info!("Cleaning up temporary file {file_path}");
    1418            0 :             if let Err(e) = tokio::fs::remove_file(&file_path)
    1419            0 :                 .await
    1420            0 :                 .or_else(fs_ext::ignore_not_found)
    1421              :             {
    1422            0 :                 tracing::error!("Failed to remove temporary file {file_path}: {e}");
    1423            0 :             }
    1424            0 :             continue;
    1425            0 :         }
    1426            0 : 
    1427            0 :         match LayerName::from_str(file_name) {
    1428            0 :             Ok(name) => {
    1429            0 :                 let remote_meta = heatmap_metadata.get(&name);
    1430            0 :                 let last_meta = last_heatmap_metadata.get(&name);
    1431            0 :                 let mut remove = false;
    1432            0 :                 match remote_meta {
    1433            0 :                     Some(remote_meta) => {
    1434            0 :                         let last_meta_generation_file_size = last_meta
    1435            0 :                             .map(|m| m.metadata.generation_file_size())
    1436            0 :                             .unwrap_or(remote_meta.metadata.generation_file_size());
    1437            0 :                         // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
    1438            0 :                         if remote_meta.metadata.generation_file_size()
    1439            0 :                             != last_meta_generation_file_size
    1440              :                         {
    1441            0 :                             tracing::info!(
    1442            0 :                                 "Removing local layer {name} as on-disk json metadata has different generation or file size from remote: {:?} -> {:?}",
    1443            0 :                                 last_meta_generation_file_size,
    1444            0 :                                 remote_meta.metadata.generation_file_size()
    1445              :                             );
    1446            0 :                             remove = true;
    1447            0 :                         } else if local_meta.len() != remote_meta.metadata.file_size {
    1448              :                             // This can happen in the presence of race conditions: the remote and on-disk metadata have changed, but we haven't had
    1449              :                             // the chance yet to download the new layer to disk, before the process restarted.
    1450            0 :                             tracing::info!(
    1451            0 :                                 "Removing local layer {name} with unexpected local size {} != {}",
    1452            0 :                                 local_meta.len(),
    1453              :                                 remote_meta.metadata.file_size
    1454              :                             );
    1455            0 :                             remove = true;
    1456            0 :                         } else {
    1457            0 :                             // We expect the access time to be initialized immediately afterwards, when
    1458            0 :                             // the latest heatmap is applied to the state.
    1459            0 :                             detail.touch_layer(
    1460            0 :                                 conf,
    1461            0 :                                 tenant_shard_id,
    1462            0 :                                 &heatmap.timeline_id,
    1463            0 :                                 remote_meta,
    1464            0 :                                 resident_metric,
    1465            0 :                                 || file_path,
    1466            0 :                             );
    1467            0 :                         }
    1468              :                     }
    1469              :                     None => {
    1470              :                         // FIXME: consider some optimization when transitioning from attached to secondary: maybe
    1471              :                         // wait until we have seen a heatmap that is more recent than the most recent on-disk state?  Otherwise
    1472              :                         // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
    1473            0 :                         tracing::info!(
    1474            0 :                             "Removing secondary local layer {} because it's absent in heatmap",
    1475              :                             name
    1476              :                         );
    1477            0 :                         remove = true;
    1478              :                     }
    1479              :                 }
    1480            0 :                 if remove {
    1481            0 :                     tokio::fs::remove_file(&dentry.path())
    1482            0 :                         .await
    1483            0 :                         .or_else(fs_ext::ignore_not_found)
    1484            0 :                         .fatal_err(&format!(
    1485            0 :                             "Removing layer {}",
    1486            0 :                             dentry.path().to_string_lossy()
    1487            0 :                         ));
    1488            0 :                 }
    1489              :             }
    1490              :             Err(_) => {
    1491              :                 // Ignore it.
    1492            0 :                 tracing::warn!("Unexpected file in timeline directory: {file_name}");
    1493              :             }
    1494              :         }
    1495              :     }
    1496              : 
    1497            0 :     detail
    1498            0 : }
    1499              : 
    1500              : /// Loads a json-encoded heatmap file from the provided on-disk path
    1501            0 : async fn load_heatmap(
    1502            0 :     path: &Utf8PathBuf,
    1503            0 :     ctx: &RequestContext,
    1504            0 : ) -> Result<Option<HeatMapTenant>, anyhow::Error> {
    1505            0 :     let mut file = match VirtualFile::open(path, ctx).await {
    1506            0 :         Ok(file) => file,
    1507            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
    1508            0 :         Err(e) => Err(e)?,
    1509              :     };
    1510            0 :     let st = file.read_to_string(ctx).await?;
    1511            0 :     let htm = serde_json::from_str(&st)?;
    1512            0 :     Ok(Some(htm))
    1513            0 : }
        

Generated by: LCOV version 2.1-beta