LCOV - code coverage report
Current view: top level - pageserver/src/tenant/secondary - downloader.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 80.7 % 517 417
Test Date: 2024-02-12 20:26:03 Functions: 60.0 % 75 45

            Line data    Source code
       1              : use std::{
       2              :     collections::{HashMap, HashSet},
       3              :     pin::Pin,
       4              :     str::FromStr,
       5              :     sync::Arc,
       6              :     time::{Duration, Instant, SystemTime},
       7              : };
       8              : 
       9              : use crate::{
      10              :     config::PageServerConf,
      11              :     disk_usage_eviction_task::{
      12              :         finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
      13              :     },
      14              :     metrics::SECONDARY_MODE,
      15              :     tenant::{
      16              :         config::SecondaryLocationConfig,
      17              :         debug_assert_current_span_has_tenant_and_timeline_id,
      18              :         remote_timeline_client::{
      19              :             index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
      20              :         },
      21              :         span::debug_assert_current_span_has_tenant_id,
      22              :         storage_layer::LayerFileName,
      23              :         tasks::{warn_when_period_overrun, BackgroundLoopKind},
      24              :     },
      25              :     virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
      26              :     METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
      27              : };
      28              : 
      29              : use super::{
      30              :     heatmap::HeatMapLayer,
      31              :     scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
      32              :     SecondaryTenant,
      33              : };
      34              : 
      35              : use crate::tenant::{
      36              :     mgr::TenantManager,
      37              :     remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
      38              : };
      39              : 
      40              : use chrono::format::{DelayedFormat, StrftimeItems};
      41              : use futures::Future;
      42              : use pageserver_api::shard::TenantShardId;
      43              : use rand::Rng;
      44              : use remote_storage::{DownloadError, GenericRemoteStorage};
      45              : 
      46              : use tokio_util::sync::CancellationToken;
      47              : use tracing::{info_span, instrument, Instrument};
      48              : use utils::{
      49              :     backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
      50              : };
      51              : 
      52              : use super::{
      53              :     heatmap::{HeatMapTenant, HeatMapTimeline},
      54              :     CommandRequest, DownloadCommand,
      55              : };
      56              : 
      57              : /// For each tenant, how long must have passed since the last download_tenant call before
      58              : /// calling it again.  This is approximately the time by which local data is allowed
      59              : /// to fall behind remote data.
      60              : ///
      61              : /// TODO: this should just be a default, and the actual period should be controlled
      62              : /// via the heatmap itself
      63              : /// `<https://github.com/neondatabase/neon/issues/6200>`
      64              : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
      65              : 
                        /// Entry point of the secondary-mode download loop.
                        ///
                        /// Wraps `tenant_manager` and `remote_storage` in a [`SecondaryDownloader`], hands it to a
                        /// [`Scheduler`] whose concurrency is the `secondary_download_concurrency` config value, and
                        /// awaits `Scheduler::run` with `command_queue`, `background_jobs_can_start` and `cancel`
                        /// inside a `secondary_downloads` tracing span.
      66          624 : pub(super) async fn downloader_task(
      67          624 :     tenant_manager: Arc<TenantManager>,
      68          624 :     remote_storage: GenericRemoteStorage,
      69          624 :     command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
      70          624 :     background_jobs_can_start: Barrier,
      71          624 :     cancel: CancellationToken,
      72          624 : ) {
      73          624 :     let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
      74          624 : 
      75          624 :     let generator = SecondaryDownloader {
      76          624 :         tenant_manager,
      77          624 :         remote_storage,
      78          624 :     };
      79          624 :     let mut scheduler = Scheduler::new(generator, concurrency);
      80          624 : 
      81          624 :     scheduler
      82          624 :         .run(command_queue, background_jobs_can_start, cancel)
      83          624 :         .instrument(info_span!("secondary_downloads"))
      84          971 :         .await
      85          182 : }
      86              : 
                        /// Job generator driving the download [`Scheduler`].
                        ///
                        /// `tenant_manager` is used to enumerate and look up secondary tenants and to fetch the
                        /// pageserver config; `remote_storage` is cloned into every spawned download job.
      87              : struct SecondaryDownloader {
      88              :     tenant_manager: Arc<TenantManager>,
      89              :     remote_storage: GenericRemoteStorage,
      90              : }
      91              : 
      92         1261 : #[derive(Debug, Clone)]
                        /// What the downloader remembers about a single layer file.
                        ///
                        /// Both fields are read by `SecondaryDetail::get_layers_for_eviction` when it builds
                        /// eviction candidates.
      93              : pub(super) struct OnDiskState {
                            /// Layer metadata, copied into the eviction candidate for this layer.
      94              :     metadata: LayerFileMetadata,
                            /// Reported to eviction as the layer's `last_activity_ts`.
      95              :     access_time: SystemTime,
      96              : }
      97              : 
      98              : impl OnDiskState {
                            /// Builds the record kept for one layer.
                            ///
                            /// Only `metadata` and `access_time` are stored. The first four parameters are accepted
                            /// but not used, which is why they carry an underscore prefix; their names are spelled
                            /// out in full (`_timeline_id`, `_name`) so the signature stays readable.
      99         1300 :     fn new(
     100         1300 :         _conf: &'static PageServerConf,
     101         1300 :         _tenant_shard_id: &TenantShardId,
     102         1300 :         _timeline_id: &TimelineId,
     103         1300 :         _name: LayerFileName,
     104         1300 :         metadata: LayerFileMetadata,
     105         1300 :         access_time: SystemTime,
     106         1300 :     ) -> Self {
     107         1300 :         Self {
     108         1300 :             metadata,
     109         1300 :             access_time,
     110         1300 :         }
     111         1300 :     }
     112              : }
     113              : 
     114            8 : #[derive(Debug, Clone, Default)]
                        /// Per-timeline bookkeeping inside [`SecondaryDetail`].
     115              : pub(super) struct SecondaryDetailTimeline {
                            /// Layers tracked as present locally, keyed by layer file name. These are the layers
                            /// reported as resident by `SecondaryDetail::get_layers_for_eviction`.
     116              :     pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
     117              : 
     118              :     /// We remember when layers were evicted, to prevent re-downloading them.
     119              :     pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
     120              : }
     121              : 
     122              : /// This state is written by the secondary downloader, it is opaque
     123              : /// to TenantManager
     124            0 : #[derive(Debug)]
     125              : pub(super) struct SecondaryDetail {
                            /// Location config; `schedule` skips the tenant when `config.warm` is false.
     126              :     pub(super) config: SecondaryLocationConfig,
     127              : 
                            /// Copied into each scheduled job by `schedule`. Starts as `None` in `new`.
     128              :     last_download: Option<Instant>,
                            /// Target time of the next download. `schedule` seeds it with a jittered value and clears
                            /// it for non-warm tenants; `on_completion` sets it to now + `DOWNLOAD_FRESHEN_INTERVAL`.
     129              :     next_download: Option<Instant>,
                            /// Per-timeline layer state, keyed by timeline id.
     130              :     pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
     131              : }
     132              : 
     133              : /// Helper for logging SystemTime
                        ///
                        /// Converts `t` to a UTC `chrono::DateTime` and returns a lazy formatter using the
                        /// pattern `%d/%m/%Y %T`.
     134            0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
     135            0 :     let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
     136            0 :     datetime.format("%d/%m/%Y %T")
     137            0 : }
     138              : 
     139              : impl SecondaryDetail {
                            /// Creates the initial state for a secondary location: the given `config`, no recorded
                            /// or planned download, and no timelines.
     140           35 :     pub(super) fn new(config: SecondaryLocationConfig) -> Self {
     141           35 :         Self {
     142           35 :             config,
     143           35 :             last_download: None,
     144           35 :             next_download: None,
     145           35 :             timelines: HashMap::new(),
     146           35 :         }
     147           35 :     }
     148              : 
                            /// Reports every entry of every timeline's `on_disk_layers` as an eviction candidate
                            /// owned by `parent`, and sets `max_layer_size` to the largest candidate file size
                            /// (`None` when there are no candidates).
                            ///
     149              :     /// Additionally returns the total number of layers, used for more stable relative access time
     150              :     /// based eviction.
     151            2 :     pub(super) fn get_layers_for_eviction(
     152            2 :         &self,
     153            2 :         parent: &Arc<SecondaryTenant>,
     154            2 :     ) -> (DiskUsageEvictionInfo, usize) {
     155            2 :         let mut result = DiskUsageEvictionInfo::default();
     156            2 :         let mut total_layers = 0;
     157              : 
     158            4 :         for (timeline_id, timeline_detail) in &self.timelines {
     159            2 :             result
     160            2 :                 .resident_layers
     161           38 :                 .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
     162           38 :                     EvictionCandidate {
     163           38 :                         layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
     164           38 :                             secondary_tenant: parent.clone(),
     165           38 :                             timeline_id: *timeline_id,
     166           38 :                             name: name.clone(),
     167           38 :                             metadata: ods.metadata.clone(),
     168           38 :                         }),
     169           38 :                         last_activity_ts: ods.access_time,
                                                // Always zero here; presumably filled in later by the eviction
                                                // task - TODO confirm.
     170           38 :                         relative_last_activity: finite_f32::FiniteF32::ZERO,
     171           38 :                     }
     172           38 :                 }));
     173            2 : 
     174            2 :             // total might be missing currently downloading layers, but as a lower than actual
     175            2 :             // value it is good enough approximation.
     176            2 :             total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
     177            2 :         }
     178            2 :         result.max_layer_size = result
     179            2 :             .resident_layers
     180            2 :             .iter()
     181           38 :             .map(|l| l.layer.get_file_size())
     182            2 :             .max();
     183            2 : 
     184            2 :         tracing::debug!(
     185            0 :             "eviction: secondary tenant {} found {} timelines, {} layers",
     186            0 :             parent.get_tenant_shard_id(),
     187            0 :             self.timelines.len(),
     188            0 :             result.resident_layers.len()
     189            0 :         );
     190              : 
     191            2 :         (result, total_layers)
     192            2 :     }
     193              : }
     194              : 
                        /// A download job that has been selected but not yet started.
                        ///
                        /// Jobs produced by `schedule` carry `target_time` and `period`; jobs produced by
                        /// `on_command` leave `last_download`, `target_time` and `period` as `None`.
     195              : struct PendingDownload {
                            /// The tenant to download for.
     196              :     secondary_state: Arc<SecondaryTenant>,
                            /// Snapshot of `SecondaryDetail::last_download` taken at scheduling time.
     197              :     last_download: Option<Instant>,
                            /// When the job was meant to run; `schedule` sorts jobs by this value.
     198              :     target_time: Option<Instant>,
                            /// Expected interval between runs, passed to `warn_when_period_overrun` in `spawn`.
     199              :     period: Option<Duration>,
     200              : }
     201              : 
     202              : impl scheduler::PendingJob for PendingDownload {
                            /// Returns the shard id of the tenant this pending job targets.
     203           28 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     204           28 :         self.secondary_state.get_tenant_shard_id()
     205           28 :     }
     206              : }
     207              : 
                        /// Handle for a download that has been spawned. It holds the barrier half of the
                        /// completion channel created in `spawn`.
     208              : struct RunningDownload {
     209              :     barrier: Barrier,
     210              : }
     211              : 
     212              : impl scheduler::RunningJob for RunningDownload {
                            /// Returns a clone of the job's barrier.
     213            6 :     fn get_barrier(&self) -> Barrier {
     214            6 :         self.barrier.clone()
     215            6 :     }
     216              : }
     217              : 
                        /// Result value of a finished download job, produced at the end of the future built in
                        /// `spawn` regardless of whether the download succeeded.
     218              : struct CompleteDownload {
                            /// The tenant the job ran for.
     219              :     secondary_state: Arc<SecondaryTenant>,
                            /// `Instant::now()` taken when the job future finished; currently ignored by
                            /// `on_completion`.
     220              :     completed_at: Instant,
     221              : }
     222              : 
     223              : impl scheduler::Completion for CompleteDownload {
                            /// Returns the shard id of the tenant whose download finished.
     224           30 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     225           30 :         self.secondary_state.get_tenant_shard_id()
     226           30 :     }
     227              : }
     228              : 
                        /// The generic background-job scheduler instantiated for secondary downloads: generator,
                        /// pending job, running job, completion and command types, in that order.
     229              : type Scheduler = TenantBackgroundJobs<
     230              :     SecondaryDownloader,
     231              :     PendingDownload,
     232              :     RunningDownload,
     233              :     CompleteDownload,
     234              :     DownloadCommand,
     235              : >;
     236              : 
     237              : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
     238              :     for SecondaryDownloader
     239              : {
     240           10 :     #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
                            /// Called when a download job has finished: pushes the tenant's `next_download` out to
                            /// now + `DOWNLOAD_FRESHEN_INTERVAL`.
                            ///
                            /// `completed_at` is deliberately bound to an unused name; the new target is computed
                            /// from `Instant::now()` at the time this callback runs.
     241              :     fn on_completion(&mut self, completion: CompleteDownload) {
     242              :         let CompleteDownload {
     243              :             secondary_state,
     244              :             completed_at: _completed_at,
     245              :         } = completion;
     246              : 
     247            0 :         tracing::debug!("Secondary tenant download completed");
     248              : 
     249              :         // Update next_download even if there was an error: we don't want errored tenants to implicitly
     250              :         // take priority to run again.
     251              :         let mut detail = secondary_state.detail.lock().unwrap();
     252              :         detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
     253              :     }
     254              : 
     255         1198 :     async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
     256         1198 :         let mut result = SchedulingResult {
     257         1198 :             jobs: Vec::new(),
     258         1198 :             want_interval: None,
     259         1198 :         };
     260         1198 : 
     261         1198 :         // Step 1: identify some tenants that we may work on
     262         1198 :         let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
     263         1198 :         self.tenant_manager
     264         1198 :             .foreach_secondary_tenants(|_id, secondary_state| {
     265           10 :                 tenants.push(secondary_state.clone());
     266         1198 :             });
     267         1198 : 
     268         1198 :         // Step 2: filter out tenants which are not yet elegible to run
     269         1198 :         let now = Instant::now();
     270         1198 :         result.jobs = tenants
     271         1198 :             .into_iter()
     272         1198 :             .filter_map(|secondary_tenant| {
     273            6 :                 let (last_download, next_download) = {
     274           10 :                     let mut detail = secondary_tenant.detail.lock().unwrap();
     275           10 : 
     276           10 :                     if !detail.config.warm {
     277              :                         // Downloads are disabled for this tenant
     278            4 :                         detail.next_download = None;
     279            4 :                         return None;
     280            6 :                     }
     281            6 : 
     282            6 :                     if detail.next_download.is_none() {
     283            6 :                         // Initialize with a jitter: this spreads initial downloads on startup
     284            6 :                         // or mass-attach across our freshen interval.
     285            6 :                         let jittered_period =
     286            6 :                             rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
     287            6 :                         detail.next_download = Some(now.checked_add(jittered_period).expect(
     288            6 :                         "Using our constant, which is known to be small compared with clock range",
     289            6 :                     ));
     290            6 :                     }
     291            6 :                     (detail.last_download, detail.next_download.unwrap())
     292            6 :                 };
     293            6 : 
     294            6 :                 if now < next_download {
     295            6 :                     Some(PendingDownload {
     296            6 :                         secondary_state: secondary_tenant,
     297            6 :                         last_download,
     298            6 :                         target_time: Some(next_download),
     299            6 :                         period: Some(DOWNLOAD_FRESHEN_INTERVAL),
     300            6 :                     })
     301              :                 } else {
     302            0 :                     None
     303              :                 }
     304         1198 :             })
     305         1198 :             .collect();
     306         1198 : 
     307         1198 :         // Step 3: sort by target execution time to run most urgent first.
     308         1198 :         result.jobs.sort_by_key(|j| j.target_time);
     309         1198 : 
     310         1198 :         result
     311         1198 :     }
     312              : 
                            /// Turns an explicit download command into a pending job.
                            ///
                            /// Looks the tenant shard up via `get_secondary_tenant_shard`; if that returns `None`
                            /// the command fails with "Not found or not in Secondary mode". The resulting job has no
                            /// `target_time`, `period` or `last_download`, so `spawn` skips the period-overrun check
                            /// for it.
     313            6 :     fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
     314            6 :         let tenant_shard_id = command.get_tenant_shard_id();
     315            6 : 
     316            6 :         let tenant = self
     317            6 :             .tenant_manager
     318            6 :             .get_secondary_tenant_shard(*tenant_shard_id);
     319            6 :         let Some(tenant) = tenant else {
     320            0 :             return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
     321              :         };
     322              : 
     323            6 :         Ok(PendingDownload {
     324            6 :             target_time: None,
     325            6 :             period: None,
     326            6 :             last_download: None,
     327            6 :             secondary_state: tenant,
     328            6 :         })
     329            6 :     }
     330              : 
                            /// Starts a pending job: returns the `RunningDownload` handle together with the boxed
                            /// future that performs the download.
                            ///
                            /// The future runs `TenantDownloader::download`, logs each `UpdateError` variant at its
                            /// own level, never propagates the error, and always resolves to a `CompleteDownload`
                            /// stamped with `Instant::now()`. It is instrumented with a root (`parent: None`)
                            /// `secondary_download` span carrying the tenant and shard ids.
     331           10 :     fn spawn(
     332           10 :         &mut self,
     333           10 :         job: PendingDownload,
     334           10 :     ) -> (
     335           10 :         RunningDownload,
     336           10 :         Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
     337           10 :     ) {
     338           10 :         let PendingDownload {
     339           10 :             secondary_state,
     340           10 :             last_download,
     341           10 :             target_time,
     342           10 :             period,
     343           10 :         } = job;
     344           10 : 
     345           10 :         let (completion, barrier) = utils::completion::channel();
     346           10 :         let remote_storage = self.remote_storage.clone();
     347           10 :         let conf = self.tenant_manager.get_conf();
     348           10 :         let tenant_shard_id = *secondary_state.get_tenant_shard_id();
     349           10 :         (RunningDownload { barrier }, Box::pin(async move {
                                    // Moved into the future so it lives until the future ends; presumably dropping
                                    // it is what releases `barrier` - confirm against utils::completion.
     350           10 :             let _completion = completion;
     351           10 : 
     352           10 :             match TenantDownloader::new(conf, &remote_storage, &secondary_state)
     353           10 :                 .download()
     354        63857 :                 .await
     355              :             {
     356              :                 Err(UpdateError::NoData) => {
     357            2 :                     tracing::info!("No heatmap found for tenant.  This is fine if it is new.");
     358              :                 },
     359              :                 Err(UpdateError::NoSpace) => {
     360            0 :                     tracing::warn!("Insufficient space while downloading.  Will retry later.");
     361              :                 }
     362              :                 Err(UpdateError::Cancelled) => {
     363            0 :                     tracing::debug!("Shut down while downloading");
     364              :                 },
     365            0 :                 Err(UpdateError::Deserialize(e)) => {
     366            0 :                     tracing::error!("Corrupt content while downloading tenant: {e}");
     367              :                 },
     368            0 :                 Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
     369            0 :                     tracing::error!("Error while downloading tenant: {e}");
     370              :                 },
     371            8 :                 Ok(()) => {}
     372              :             };
     373              : 
     374              :             // Irrespective of the result, we will reschedule ourselves to run after our usual period.
     375              : 
     376              :             // If the job had a target execution time, we may check our final execution
     377              :             // time against that for observability purposes.
     378           10 :             if let (Some(target_time), Some(period)) = (target_time, period) {
     379              :                 // Only track execution lag if this isn't our first download: otherwise, it is expected
     380              :                 // that execution will have taken longer than our configured interval, for example
     381              :                 // when starting up a pageserver.
     382            4 :                 if last_download.is_some() {
     383            0 :                     // Elapsed time includes any scheduling lag as well as the execution of the job
     384            0 :                     let elapsed = Instant::now().duration_since(target_time);
     385            0 : 
     386            0 :                     warn_when_period_overrun(
     387            0 :                         elapsed,
     388            0 :                         period,
     389            0 :                         BackgroundLoopKind::SecondaryDownload,
     390            0 :                     );
     391            4 :                 }
     392            6 :             }
     393              : 
     394           10 :             CompleteDownload {
     395           10 :                 secondary_state,
     396           10 :                 completed_at: Instant::now(),
     397           10 :             }
     398           10 :         }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     399           10 :     }
     400              : }
     401              : 
     402              : /// This type is a convenience to group together the various functions involved in
     403              : /// freshening a secondary tenant.
                        ///
                        /// It only borrows its inputs; one instance is created per job inside `spawn`.
     404              : struct TenantDownloader<'a> {
     405              :     conf: &'static PageServerConf,
     406              :     remote_storage: &'a GenericRemoteStorage,
     407              :     secondary_state: &'a SecondaryTenant,
     408              : }
     409              : 
     410              : /// Errors that may be encountered while updating a tenant
                        ///
                        /// `spawn` logs each variant at a different level, so the `Display` text of every variant
                        /// should be enough to diagnose the failure on its own.
     411            0 : #[derive(thiserror::Error, Debug)]
     412              : enum UpdateError {
                            /// The remote object was not found (mapped from `DownloadError::NotFound`).
     413              :     #[error("No remote data found")]
     414              :     NoData,
                            /// A local I/O error with errno `ENOSPC`.
     415              :     #[error("Insufficient local storage space")]
     416              :     NoSpace,
                            /// Any other remote storage failure. The wrapped error is included in the message so
                            /// that logging this variant with `{e}` shows the actual cause.
     417              :     #[error("Failed to download: {0}")]
     418              :     DownloadError(DownloadError),
                            /// The heatmap could not be parsed as JSON.
     419              :     #[error(transparent)]
     420              :     Deserialize(#[from] serde_json::Error),
                            /// The operation was interrupted by cancellation.
     421              :     #[error("Cancelled")]
     422              :     Cancelled,
                            /// Catch-all for everything else, including non-`ENOSPC` I/O errors.
     423              :     #[error(transparent)]
     424              :     Other(#[from] anyhow::Error),
     425              : }
     426              : 
     427              : impl From<DownloadError> for UpdateError {
                            /// Maps remote storage errors onto `UpdateError`: `Cancelled` stays `Cancelled`,
                            /// `NotFound` becomes `NoData`, and every other variant is wrapped unchanged in
                            /// `UpdateError::DownloadError`.
     428            2 :     fn from(value: DownloadError) -> Self {
     429            2 :         match &value {
     430            0 :             DownloadError::Cancelled => Self::Cancelled,
     431            2 :             DownloadError::NotFound => Self::NoData,
     432            0 :             _ => Self::DownloadError(value),
     433              :         }
     434            2 :     }
     435              : }
     436              : 
     437              : impl From<std::io::Error> for UpdateError {
                            /// Classifies an I/O error: an OS error code equal to `ENOSPC` becomes `NoSpace`;
                            /// anything else, including errors with no OS error code, is wrapped in `Other`.
     438            0 :     fn from(value: std::io::Error) -> Self {
     439            0 :         if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
     440            0 :             UpdateError::NoSpace
     441              :         } else {
     442              :             // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
     443            0 :             UpdateError::Other(anyhow::anyhow!(value))
     444              :         }
     445            0 :     }
     446              : }
     447              : 
     448              : impl<'a> TenantDownloader<'a> {
                            /// Bundles the borrowed config, remote storage handle and tenant state. Does no work.
     449           10 :     fn new(
     450           10 :         conf: &'static PageServerConf,
     451           10 :         remote_storage: &'a GenericRemoteStorage,
     452           10 :         secondary_state: &'a SecondaryTenant,
     453           10 :     ) -> Self {
     454           10 :         Self {
     455           10 :             conf,
     456           10 :             remote_storage,
     457           10 :             secondary_state,
     458           10 :         }
     459           10 :     }
     460              : 
                            /// Runs one full freshen pass for the tenant.
                            ///
                            /// Steps: enter the tenant's gate, download the heatmap, parse it as `HeatMapTenant`,
                            /// persist the raw bytes to the local heatmap path, then call `download_timeline` for each
                            /// timeline in the heatmap, stopping at the first error.
                            ///
                            /// Returns `Ok(())` on success, and also when the gate is closed or the tenant's cancel
                            /// token fires. NOTE(review): those shutdown paths do not return
                            /// `UpdateError::Cancelled`, so the caller cannot tell them apart from success.
                            ///
                            /// Errors: whatever `download_heatmap` or `download_timeline` return,
                            /// `UpdateError::Deserialize` for an unparsable heatmap, and the error from writing the
                            /// local heatmap file.
                            ///
                            /// Panics if the blocking task that writes the heatmap file fails to join.
     461           10 :     async fn download(&self) -> Result<(), UpdateError> {
     462           10 :         debug_assert_current_span_has_tenant_id();
     463              : 
     464              :         // For the duration of a download, we must hold the SecondaryTenant::gate, to
     465              :         // cover our access to local storage.
     466           10 :         let Ok(_guard) = self.secondary_state.gate.enter() else {
     467              :             // Shutting down
     468            0 :             return Ok(());
     469              :         };
     470              : 
     471           10 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     472              :         // Download the tenant's heatmap
     473           10 :         let heatmap_bytes = tokio::select!(
     474           10 :             bytes = self.download_heatmap() => {bytes?},
     475              :             _ = self.secondary_state.cancel.cancelled() => return Ok(())
     476              :         );
     477              : 
     478            8 :         let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
     479              : 
     480              :         // Save the heatmap: this will be useful on restart, allowing us to reconstruct
     481              :         // layer metadata without having to re-download it.
     482            8 :         let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
     483            8 : 
     484            8 :         let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
     485            8 :         let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
     486            8 :         let heatmap_path_bg = heatmap_path.clone();
                                // The overwrite is driven to completion on a blocking-pool thread via `block_on`,
                                // rather than being awaited directly on this task.
     487            8 :         tokio::task::spawn_blocking(move || {
     488            8 :             tokio::runtime::Handle::current().block_on(async move {
     489            8 :                 VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
     490            8 :             })
     491            8 :         })
     492            8 :         .await
     493            8 :         .expect("Blocking task is never aborted")
     494            8 :         .maybe_fatal_err(&context_msg)?;
     495              : 
     496            0 :         tracing::debug!("Wrote local heatmap to {}", heatmap_path);
     497              : 
     498              :         // Download the layers in the heatmap
     499           16 :         for timeline in heatmap.timelines {
     500            8 :             if self.secondary_state.cancel.is_cancelled() {
     501            0 :                 return Ok(());
     502            8 :             }
     503            8 : 
     504            8 :             let timeline_id = timeline.timeline_id;
     505            8 :             self.download_timeline(timeline)
     506              :                 .instrument(tracing::info_span!(
     507              :                     "secondary_download_timeline",
     508              :                     tenant_id=%tenant_shard_id.tenant_id,
     509            8 :                     shard_id=%tenant_shard_id.shard_slug(),
     510              :                     %timeline_id
     511              :                 ))
     512        63799 :                 .await?;
     513              :         }
     514              : 
     515            8 :         Ok(())
     516           10 :     }
     517              : 
                            /// Fetches the tenant's heatmap object from remote storage and returns its raw bytes.
                            ///
                            /// The download and the copy of its body into memory run inside `backoff::retry`, using
                            /// the tenant's cancel token. `NoData` and `Cancelled` are the errors matched by the
                            /// predicate passed to `retry` - presumably this marks them as permanent, i.e. not
                            /// retried; confirm against `utils::backoff`. A `None` result from `retry` is reported as
                            /// `UpdateError::Cancelled`.
                            ///
                            /// On success the `SECONDARY_MODE.download_heatmap` counter is incremented.
     518           10 :     async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
     519           10 :         debug_assert_current_span_has_tenant_id();
     520           10 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     521              :         // TODO: make download conditional on ETag having changed since last download
     522              :         // (https://github.com/neondatabase/neon/issues/6199)
     523            0 :         tracing::debug!("Downloading heatmap for secondary tenant",);
     524              : 
     525           10 :         let heatmap_path = remote_heatmap_path(tenant_shard_id);
     526              : 
     527           10 :         let heatmap_bytes = backoff::retry(
     528           10 :             || async {
     529           10 :                 let download = self
     530           10 :                     .remote_storage
     531           10 :                     .download(&heatmap_path)
     532           24 :                     .await
     533           10 :                     .map_err(UpdateError::from)?;
     534            8 :                 let mut heatmap_bytes = Vec::new();
     535            8 :                 let mut body = tokio_util::io::StreamReader::new(download.download_stream);
                                        // An I/O error here goes through `From<std::io::Error>`, so it can surface as
                                        // `NoSpace` or `Other`.
     536           26 :                 let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
     537            8 :                 Ok(heatmap_bytes)
     538           20 :             },
     539           10 :             |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
     540           10 :             FAILED_DOWNLOAD_WARN_THRESHOLD,
     541           10 :             FAILED_REMOTE_OP_RETRIES,
     542           10 :             "download heatmap",
     543           10 :             &self.secondary_state.cancel,
     544           10 :         )
     545           50 :         .await
                                // `retry` yields an Option<Result<..>>: None becomes Cancelled, then the nested
                                // Result is flattened.
     546           10 :         .ok_or_else(|| UpdateError::Cancelled)
     547           10 :         .and_then(|x| x)?;
     548              : 
     549            8 :         SECONDARY_MODE.download_heatmap.inc();
     550            8 : 
     551            8 :         Ok(heatmap_bytes)
     552           10 :     }
     553              : 
     554            8 :     async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
     555            8 :         debug_assert_current_span_has_tenant_and_timeline_id();
     556            8 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     557            8 :         let timeline_path = self
     558            8 :             .conf
     559            8 :             .timeline_path(tenant_shard_id, &timeline.timeline_id);
     560            8 : 
     561            8 :         // Accumulate updates to the state
     562            8 :         let mut touched = Vec::new();
     563            8 : 
     564            8 :         // Clone a view of what layers already exist on disk
     565            8 :         let timeline_state = self
     566            8 :             .secondary_state
     567            8 :             .detail
     568            8 :             .lock()
     569            8 :             .unwrap()
     570            8 :             .timelines
     571            8 :             .get(&timeline.timeline_id)
     572            8 :             .cloned();
     573              : 
     574            8 :         let timeline_state = match timeline_state {
     575            2 :             Some(t) => t,
     576              :             None => {
     577              :                 // We have no existing state: need to scan local disk for layers first.
     578            6 :                 let timeline_state =
     579           33 :                     init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
     580              : 
     581              :                 // Re-acquire detail lock now that we're done with async load from local FS
     582            6 :                 self.secondary_state
     583            6 :                     .detail
     584            6 :                     .lock()
     585            6 :                     .unwrap()
     586            6 :                     .timelines
     587            6 :                     .insert(timeline.timeline_id, timeline_state.clone());
     588            6 :                 timeline_state
     589              :             }
     590              :         };
     591              : 
     592            8 :         let layers_in_heatmap = timeline
     593            8 :             .layers
     594            8 :             .iter()
     595         2522 :             .map(|l| &l.name)
     596            8 :             .collect::<HashSet<_>>();
     597            8 :         let layers_on_disk = timeline_state
     598            8 :             .on_disk_layers
     599            8 :             .iter()
     600         1261 :             .map(|l| l.0)
     601            8 :             .collect::<HashSet<_>>();
     602              : 
     603              :         // Remove on-disk layers that are no longer present in heatmap
     604            8 :         for layer in layers_on_disk.difference(&layers_in_heatmap) {
     605            1 :             let local_path = timeline_path.join(layer.to_string());
     606            1 :             tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
     607            1 :             tokio::fs::remove_file(&local_path)
     608            1 :                 .await
     609            1 :                 .or_else(fs_ext::ignore_not_found)
     610            1 :                 .maybe_fatal_err("Removing secondary layer")?;
     611              :         }
     612              : 
     613              :         // Download heatmap layers that are not present on local disk, or update their
     614              :         // access time if they are already present.
     615         2530 :         for layer in timeline.layers {
     616         2522 :             if self.secondary_state.cancel.is_cancelled() {
     617            0 :                 return Ok(());
     618         2522 :             }
     619              : 
     620              :             // Existing on-disk layers: just update their access time.
     621         2522 :             if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
     622            0 :                 tracing::debug!("Layer {} is already on disk", layer.name);
     623         1260 :                 if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
     624         1260 :                     || on_disk.access_time != layer.access_time
     625              :                 {
     626              :                     // We already have this layer on disk.  Update its access time.
     627            0 :                     tracing::debug!(
     628            0 :                         "Access time updated for layer {}: {} -> {}",
     629            0 :                         layer.name,
     630            0 :                         strftime(&on_disk.access_time),
     631            0 :                         strftime(&layer.access_time)
     632            0 :                     );
     633          199 :                     touched.push(layer);
     634         1061 :                 }
     635         1260 :                 continue;
     636              :             } else {
     637            0 :                 tracing::debug!("Layer {} not present on disk yet", layer.name);
     638              :             }
     639              : 
     640              :             // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
     641              :             // recently than it was evicted.
     642         1262 :             if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
     643            0 :                 if &layer.access_time > evicted_at {
     644            0 :                     tracing::info!(
     645            0 :                         "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
     646            0 :                         layer.name,
     647            0 :                         strftime(&layer.access_time),
     648            0 :                         strftime(evicted_at)
     649            0 :                     );
     650              :                 } else {
     651            0 :                     tracing::trace!(
     652            0 :                         "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
     653            0 :                         layer.name,
     654            0 :                         strftime(&layer.access_time),
     655            0 :                         strftime(evicted_at)
     656            0 :                     );
     657            0 :                     continue;
     658              :                 }
     659         1262 :             }
     660              : 
     661              :             // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
     662         1262 :             let downloaded_bytes = match download_layer_file(
     663         1262 :                 self.conf,
     664         1262 :                 self.remote_storage,
     665         1262 :                 *tenant_shard_id,
     666         1262 :                 timeline.timeline_id,
     667         1262 :                 &layer.name,
     668         1262 :                 &LayerFileMetadata::from(&layer.metadata),
     669         1262 :                 &self.secondary_state.cancel,
     670         1262 :             )
     671        63765 :             .await
     672              :             {
     673         1262 :                 Ok(bytes) => bytes,
     674            0 :                 Err(e) => {
     675            0 :                     if let DownloadError::NotFound = e {
     676              :                         // A heatmap might be out of date and refer to a layer that doesn't exist any more.
     677              :                         // This is harmless: continue to download the next layer. It is expected during compaction
     678              :                         // GC.
     679            0 :                         tracing::debug!(
     680            0 :                             "Skipped downloading missing layer {}, raced with compaction/gc?",
     681            0 :                             layer.name
     682            0 :                         );
     683            0 :                         continue;
     684              :                     } else {
     685            0 :                         return Err(e.into());
     686              :                     }
     687              :                 }
     688              :             };
     689              : 
     690         1262 :             if downloaded_bytes != layer.metadata.file_size {
     691            0 :                 let local_path = timeline_path.join(layer.name.to_string());
     692              : 
     693            0 :                 tracing::warn!(
     694            0 :                     "Downloaded layer {} with unexpected size {} != {}.  Removing download.",
     695            0 :                     layer.name,
     696            0 :                     downloaded_bytes,
     697            0 :                     layer.metadata.file_size
     698            0 :                 );
     699              : 
     700            0 :                 tokio::fs::remove_file(&local_path)
     701            0 :                     .await
     702            0 :                     .or_else(fs_ext::ignore_not_found)?;
     703         1262 :             }
     704              : 
     705         1262 :             SECONDARY_MODE.download_layer.inc();
     706         1262 :             touched.push(layer)
     707              :         }
     708              : 
     709              :         // Write updates to state to record layers we just downloaded or touched.
     710              :         {
     711            8 :             let mut detail = self.secondary_state.detail.lock().unwrap();
     712            8 :             let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
     713              : 
     714            8 :             tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
     715              : 
     716         1469 :             for t in touched {
     717              :                 use std::collections::hash_map::Entry;
     718         1461 :                 match timeline_detail.on_disk_layers.entry(t.name.clone()) {
     719          199 :                     Entry::Occupied(mut v) => {
     720          199 :                         v.get_mut().access_time = t.access_time;
     721          199 :                     }
     722         1262 :                     Entry::Vacant(e) => {
     723         1262 :                         e.insert(OnDiskState::new(
     724         1262 :                             self.conf,
     725         1262 :                             tenant_shard_id,
     726         1262 :                             &timeline.timeline_id,
     727         1262 :                             t.name,
     728         1262 :                             LayerFileMetadata::from(&t.metadata),
     729         1262 :                             t.access_time,
     730         1262 :                         ));
     731         1262 :                     }
     732              :                 }
     733              :             }
     734              :         }
     735              : 
     736            8 :         Ok(())
     737            8 :     }
     738              : }
     739              : 
     740              : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
     741            6 : async fn init_timeline_state(
     742            6 :     conf: &'static PageServerConf,
     743            6 :     tenant_shard_id: &TenantShardId,
     744            6 :     heatmap: &HeatMapTimeline,
     745            6 : ) -> SecondaryDetailTimeline {
     746            6 :     let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
     747            6 :     let mut detail = SecondaryDetailTimeline::default();
     748              : 
     749            6 :     let mut dir = match tokio::fs::read_dir(&timeline_path).await {
     750            2 :         Ok(d) => d,
     751            4 :         Err(e) => {
     752            4 :             if e.kind() == std::io::ErrorKind::NotFound {
     753            4 :                 let context = format!("Creating timeline directory {timeline_path}");
     754            4 :                 tracing::info!("{}", context);
     755            4 :                 tokio::fs::create_dir_all(&timeline_path)
     756            4 :                     .await
     757            4 :                     .fatal_err(&context);
     758            4 : 
     759            4 :                 // No entries to report: drop out.
     760            4 :                 return detail;
     761              :             } else {
     762            0 :                 on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
     763              :             }
     764              :         }
     765              :     };
     766              : 
     767              :     // As we iterate through layers found on disk, we will look up their metadata from this map.
     768              :     // Layers not present in metadata will be discarded.
     769            2 :     let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
     770           38 :         heatmap.layers.iter().map(|l| (&l.name, l)).collect();
     771              : 
     772           40 :     while let Some(dentry) = dir
     773           40 :         .next_entry()
     774            0 :         .await
     775           40 :         .fatal_err(&format!("Listing {timeline_path}"))
     776              :     {
     777           38 :         let dentry_file_name = dentry.file_name();
     778           38 :         let file_name = dentry_file_name.to_string_lossy();
     779           38 :         let local_meta = dentry.metadata().await.fatal_err(&format!(
     780           38 :             "Read metadata on {}",
     781           38 :             dentry.path().to_string_lossy()
     782           38 :         ));
     783           38 : 
     784           38 :         // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
     785           38 :         if file_name == METADATA_FILE_NAME {
     786            0 :             continue;
     787           38 :         }
     788           38 : 
     789           38 :         match LayerFileName::from_str(&file_name) {
     790           38 :             Ok(name) => {
     791           38 :                 let remote_meta = heatmap_metadata.get(&name);
     792           38 :                 match remote_meta {
     793           38 :                     Some(remote_meta) => {
     794           38 :                         // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
     795           38 :                         if local_meta.len() != remote_meta.metadata.file_size {
     796              :                             // This should not happen, because we do crashsafe write-then-rename when downloading
     797              :                             // layers, and layers in remote storage are immutable.  Remove the local file because
     798              :                             // we cannot trust it.
     799            0 :                             tracing::warn!(
     800            0 :                                 "Removing local layer {name} with unexpected local size {} != {}",
     801            0 :                                 local_meta.len(),
     802            0 :                                 remote_meta.metadata.file_size
     803            0 :                             );
     804           38 :                         } else {
     805           38 :                             // We expect the access time to be initialized immediately afterwards, when
     806           38 :                             // the latest heatmap is applied to the state.
     807           38 :                             detail.on_disk_layers.insert(
     808           38 :                                 name.clone(),
     809           38 :                                 OnDiskState::new(
     810           38 :                                     conf,
     811           38 :                                     tenant_shard_id,
     812           38 :                                     &heatmap.timeline_id,
     813           38 :                                     name,
     814           38 :                                     LayerFileMetadata::from(&remote_meta.metadata),
     815           38 :                                     remote_meta.access_time,
     816           38 :                                 ),
     817           38 :                             );
     818           38 :                         }
     819              :                     }
     820              :                     None => {
     821              :                         // FIXME: consider some optimization when transitioning from attached to secondary: maybe
     822              :                         // wait until we have seen a heatmap that is more recent than the most recent on-disk state?  Otherwise
     823              :                         // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
     824            0 :                         tracing::info!(
     825            0 :                             "Removing secondary local layer {} because it's absent in heatmap",
     826            0 :                             name
     827            0 :                         );
     828            0 :                         tokio::fs::remove_file(&dentry.path())
     829            0 :                             .await
     830            0 :                             .or_else(fs_ext::ignore_not_found)
     831            0 :                             .fatal_err(&format!(
     832            0 :                                 "Removing layer {}",
     833            0 :                                 dentry.path().to_string_lossy()
     834            0 :                             ));
     835              :                     }
     836              :                 }
     837              :             }
     838              :             Err(_) => {
     839              :                 // Ignore it.
     840            0 :                 tracing::warn!("Unexpected file in timeline directory: {file_name}");
     841              :             }
     842              :         }
     843              :     }
     844              : 
     845            2 :     detail
     846            6 : }
        

Generated by: LCOV version 2.1-beta