LCOV - differential code coverage report
Current view:   top level - pageserver/src/tenant/secondary - downloader.rs (source / functions)
Current:        cd44433dd675caa99df17a61b18949c8387e2242.info   (Date: 2024-01-09 02:06:09)
Baseline:       66c52a629a0f4a503e193045e0df4c77139e344b.info   (Date: 2024-01-08 15:34:46)

              Coverage    Total    Hit    UBC    GBC    CBC
Lines:          80.1 %      477    382     95     41    341
Functions:      56.5 %       69     39     30      1     38
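TLA legend (LCOV differential coverage categories): CBC = Covered Baseline Code, GBC = Gained Baseline Coverage, UBC = Uncovered Baseline Code.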

           TLA  Line data    Source code
       1                 : use std::{
       2                 :     collections::{HashMap, HashSet},
       3                 :     pin::Pin,
       4                 :     str::FromStr,
       5                 :     sync::Arc,
       6                 :     time::{Duration, Instant, SystemTime},
       7                 : };
       8                 : 
       9                 : use crate::{
      10                 :     config::PageServerConf,
      11                 :     metrics::SECONDARY_MODE,
      12                 :     tenant::{
      13                 :         config::SecondaryLocationConfig,
      14                 :         debug_assert_current_span_has_tenant_and_timeline_id,
      15                 :         remote_timeline_client::{
      16                 :             index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
      17                 :         },
      18                 :         span::debug_assert_current_span_has_tenant_id,
      19                 :         storage_layer::LayerFileName,
      20                 :         tasks::{warn_when_period_overrun, BackgroundLoopKind},
      21                 :     },
      22                 :     virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
      23                 :     METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
      24                 : };
      25                 : 
      26                 : use super::{
      27                 :     heatmap::HeatMapLayer,
      28                 :     scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
      29                 :     SecondaryTenant,
      30                 : };
      31                 : 
      32                 : use crate::tenant::{
      33                 :     mgr::TenantManager,
      34                 :     remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
      35                 : };
      36                 : 
      37                 : use chrono::format::{DelayedFormat, StrftimeItems};
      38                 : use futures::Future;
      39                 : use pageserver_api::shard::TenantShardId;
      40                 : use rand::Rng;
      41                 : use remote_storage::{DownloadError, GenericRemoteStorage};
      42                 : 
      43                 : use tokio_util::sync::CancellationToken;
      44                 : use tracing::{info_span, instrument, Instrument};
      45                 : use utils::{
      46                 :     backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
      47                 : };
      48                 : 
      49                 : use super::{
      50                 :     heatmap::{HeatMapTenant, HeatMapTimeline},
      51                 :     CommandRequest, DownloadCommand,
      52                 : };
      53                 : 
      54                 : /// For each tenant, how long must have passed since the last download_tenant call before
      55                 : /// calling it again.  This is approximately the time by which local data is allowed
      56                 : /// to fall behind remote data.
      57                 : ///
      58                 : /// TODO: this should just be a default, and the actual period should be controlled
      59                 : /// via the heatmap itself
       60                 : /// `<https://github.com/neondatabase/neon/issues/6200>`
      61                 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
      62                 : 
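                          : /// Long-running task driving secondary-mode downloads: wraps a
                          : /// `SecondaryDownloader` in the shared tenant background-job scheduler and
                          : /// runs it, with the configured `secondary_download_concurrency`, until
                          : /// `cancel` is fired.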
      63 CBC         557 : pub(super) async fn downloader_task(
      64             557 :     tenant_manager: Arc<TenantManager>,
      65             557 :     remote_storage: GenericRemoteStorage,
      66             557 :     command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
      67             557 :     background_jobs_can_start: Barrier,
      68             557 :     cancel: CancellationToken,
      69             557 : ) {
      70             557 :     let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
      71             557 : 
      72             557 :     let generator = SecondaryDownloader {
      73             557 :         tenant_manager,
      74             557 :         remote_storage,
      75             557 :     };
      76             557 :     let mut scheduler = Scheduler::new(generator, concurrency);
      77             557 : 
      78             557 :     scheduler
      79             557 :         .run(command_queue, background_jobs_can_start, cancel)
      80             557 :         .instrument(info_span!("secondary_downloads"))
      81             907 :         .await
      82             159 : }
      83                 : 
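                          : /// Job-generator state for secondary downloads; the `JobGenerator` impl below
                          : /// describes how jobs are scheduled, spawned and completed.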
      84                 : struct SecondaryDownloader {
      85                 :     tenant_manager: Arc<TenantManager>,
      86                 :     remote_storage: GenericRemoteStorage,
      87                 : }
      88                 : 
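                          : /// What the downloader tracks for a single layer file kept on local disk:
                          : /// the layer's metadata and the access time recorded in the heatmap.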
      89            1858 : #[derive(Debug, Clone)]
      90                 : pub(super) struct OnDiskState {
      91                 :     metadata: LayerFileMetadata,
      92                 :     access_time: SystemTime,
      93                 : }
      94                 : 
      95                 : impl OnDiskState {
      96            1649 :     fn new(
      97            1649 :         _conf: &'static PageServerConf,
      98            1649 :         _tenant_shard_id: &TenantShardId,
       99            1649 :         _timeline_id: &TimelineId,
      100            1649 :         _name: LayerFileName,
     101            1649 :         metadata: LayerFileMetadata,
     102            1649 :         access_time: SystemTime,
     103            1649 :     ) -> Self {
     104            1649 :         Self {
     105            1649 :             metadata,
     106            1649 :             access_time,
     107            1649 :         }
     108            1649 :     }
     109                 : }
     110                 : 
     111               5 : #[derive(Debug, Clone, Default)]
     112                 : pub(super) struct SecondaryDetailTimeline {
     113                 :     pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
     114                 : 
     115                 :     /// We remember when layers were evicted, to prevent re-downloading them.
     116                 :     pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
     117                 : }
     118                 : 
      119                 : /// This state is written by the secondary downloader; it is opaque
      120                 : /// to the TenantManager.
     121 UBC           0 : #[derive(Debug)]
     122                 : pub(super) struct SecondaryDetail {
     123                 :     pub(super) config: SecondaryLocationConfig,
     124                 : 
     125                 :     last_download: Option<Instant>,
     126                 :     next_download: Option<Instant>,
     127                 :     pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
     128                 : }
     129                 : 
     130                 : /// Helper for logging SystemTime
     131               0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
     132               0 :     let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
     133               0 :     datetime.format("%d/%m/%Y %T")
     134               0 : }
     135                 : 
     136                 : impl SecondaryDetail {
     137 CBC          27 :     pub(super) fn new(config: SecondaryLocationConfig) -> Self {
     138              27 :         Self {
     139              27 :             config,
     140              27 :             last_download: None,
     141              27 :             next_download: None,
     142              27 :             timelines: HashMap::new(),
     143              27 :         }
     144              27 :     }
     145                 : }
     146                 : 
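                          : /// A download job that has been scheduled, or requested via a command, but not
                          : /// yet spawned.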
     147                 : struct PendingDownload {
     148                 :     secondary_state: Arc<SecondaryTenant>,
     149                 :     last_download: Option<Instant>,
     150                 :     target_time: Option<Instant>,
     151                 :     period: Option<Duration>,
     152                 : }
     153                 : 
     154                 : impl scheduler::PendingJob for PendingDownload {
     155              20 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     156              20 :         self.secondary_state.get_tenant_shard_id()
     157              20 :     }
     158                 : }
     159                 : 
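                          : /// A spawned download job: the barrier is released once the job's future drops
                          : /// its completion handle, i.e. when the download finishes or is dropped.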
     160                 : struct RunningDownload {
     161                 :     barrier: Barrier,
     162                 : }
     163                 : 
     164                 : impl scheduler::RunningJob for RunningDownload {
     165               4 :     fn get_barrier(&self) -> Barrier {
     166               4 :         self.barrier.clone()
     167               4 :     }
     168                 : }
     169                 : 
     170                 : struct CompleteDownload {
     171                 :     secondary_state: Arc<SecondaryTenant>,
     172                 :     completed_at: Instant,
     173                 : }
     174                 : 
     175                 : impl scheduler::Completion for CompleteDownload {
     176              18 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     177              18 :         self.secondary_state.get_tenant_shard_id()
     178              18 :     }
     179                 : }
     180                 : 
     181                 : type Scheduler = TenantBackgroundJobs<
     182                 :     SecondaryDownloader,
     183                 :     PendingDownload,
     184                 :     RunningDownload,
     185                 :     CompleteDownload,
     186                 :     DownloadCommand,
     187                 : >;
     188                 : 
     189                 : #[async_trait::async_trait]
     190                 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
     191                 :     for SecondaryDownloader
     192                 : {
     193               6 :     #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
     194                 :     fn on_completion(&mut self, completion: CompleteDownload) {
     195                 :         let CompleteDownload {
     196                 :             secondary_state,
     197                 :             completed_at: _completed_at,
     198                 :         } = completion;
     199                 : 
     200 UBC           0 :         tracing::debug!("Secondary tenant download completed");
     201                 : 
      202                 :         // Update next_download even if there was an error: we don't want errored tenants to implicitly
     203                 :         // take priority to run again.
     204                 :         let mut detail = secondary_state.detail.lock().unwrap();
     205                 :         detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
     206                 :     }
     207                 : 
     208 CBC        1109 :     async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
     209            1109 :         let mut result = SchedulingResult {
     210            1109 :             jobs: Vec::new(),
     211            1109 :             want_interval: None,
     212            1109 :         };
     213            1109 : 
     214            1109 :         // Step 1: identify some tenants that we may work on
     215            1109 :         let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
     216            1109 :         self.tenant_manager
     217            1109 :             .foreach_secondary_tenants(|_id, secondary_state| {
     218              11 :                 tenants.push(secondary_state.clone());
     219            1109 :             });
     220            1109 : 
      221            1109 :         // Step 2: filter out tenants which are not yet eligible to run
     222            1109 :         let now = Instant::now();
     223            1109 :         result.jobs = tenants
     224            1109 :             .into_iter()
     225            1109 :             .filter_map(|secondary_tenant| {
     226               5 :                 let (last_download, next_download) = {
     227              11 :                     let mut detail = secondary_tenant.detail.lock().unwrap();
     228              11 : 
     229              11 :                     if !detail.config.warm {
     230                 :                         // Downloads are disabled for this tenant
     231               6 :                         detail.next_download = None;
     232               6 :                         return None;
     233               5 :                     }
     234               5 : 
     235               5 :                     if detail.next_download.is_none() {
     236               4 :                         // Initialize with a jitter: this spreads initial downloads on startup
     237               4 :                         // or mass-attach across our freshen interval.
     238               4 :                         let jittered_period =
     239               4 :                             rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
     240               4 :                         detail.next_download = Some(now.checked_add(jittered_period).expect(
     241               4 :                         "Using our constant, which is known to be small compared with clock range",
     242               4 :                     ));
     243               4 :                     }
     244               5 :                     (detail.last_download, detail.next_download.unwrap())
     245               5 :                 };
     246               5 : 
     247               5 :                 if now < next_download {
     248               5 :                     Some(PendingDownload {
     249               5 :                         secondary_state: secondary_tenant,
     250               5 :                         last_download,
     251               5 :                         target_time: Some(next_download),
     252               5 :                         period: Some(DOWNLOAD_FRESHEN_INTERVAL),
     253               5 :                     })
     254                 :                 } else {
     255 UBC           0 :                     None
     256                 :                 }
     257 CBC        1109 :             })
     258            1109 :             .collect();
     259            1109 : 
     260            1109 :         // Step 3: sort by target execution time to run most urgent first.
     261            1109 :         result.jobs.sort_by_key(|j| j.target_time);
     262            1109 : 
     263            1109 :         result
     264            1109 :     }
     265                 : 
     266               4 :     fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
     267               4 :         let tenant_shard_id = command.get_tenant_shard_id();
     268               4 : 
     269               4 :         let tenant = self
     270               4 :             .tenant_manager
     271               4 :             .get_secondary_tenant_shard(*tenant_shard_id);
     272               4 :         let Some(tenant) = tenant else {
     273                 :             {
     274 UBC           0 :                 return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
     275                 :             }
     276                 :         };
     277                 : 
     278 CBC           4 :         Ok(PendingDownload {
     279               4 :             target_time: None,
     280               4 :             period: None,
     281               4 :             last_download: None,
     282               4 :             secondary_state: tenant,
     283               4 :         })
     284               4 :     }
     285                 : 
     286               7 :     fn spawn(
     287               7 :         &mut self,
     288               7 :         job: PendingDownload,
     289               7 :     ) -> (
     290               7 :         RunningDownload,
     291               7 :         Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
     292               7 :     ) {
     293               7 :         let PendingDownload {
     294               7 :             secondary_state,
     295               7 :             last_download,
     296               7 :             target_time,
     297               7 :             period,
     298               7 :         } = job;
     299               7 : 
     300               7 :         let (completion, barrier) = utils::completion::channel();
     301               7 :         let remote_storage = self.remote_storage.clone();
     302               7 :         let conf = self.tenant_manager.get_conf();
     303               7 :         let tenant_shard_id = *secondary_state.get_tenant_shard_id();
     304               7 :         (RunningDownload { barrier }, Box::pin(async move {
     305               7 :             let _completion = completion;
     306               7 : 
     307               7 :             match TenantDownloader::new(conf, &remote_storage, &secondary_state)
     308               7 :                 .download()
     309           18190 :                 .await
     310                 :             {
     311                 :                 Err(UpdateError::NoData) => {
     312               1 :                     tracing::info!("No heatmap found for tenant.  This is fine if it is new.");
     313                 :                 },
     314                 :                 Err(UpdateError::NoSpace) => {
     315 UBC           0 :                     tracing::warn!("Insufficient space while downloading.  Will retry later.");
     316                 :                 }
     317                 :                 Err(UpdateError::Cancelled) => {
     318               0 :                     tracing::debug!("Shut down while downloading");
     319                 :                 },
     320               0 :                 Err(UpdateError::Deserialize(e)) => {
     321               0 :                     tracing::error!("Corrupt content while downloading tenant: {e}");
     322                 :                 },
     323               0 :                 Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
     324               0 :                     tracing::error!("Error while downloading tenant: {e}");
     325                 :                 },
     326 CBC           5 :                 Ok(()) => {}
     327                 :             };
     328                 : 
     329                 :             // Irrespective of the result, we will reschedule ourselves to run after our usual period.
     330                 : 
     331                 :             // If the job had a target execution time, we may check our final execution
     332                 :             // time against that for observability purposes.
     333               6 :             if let (Some(target_time), Some(period)) = (target_time, period) {
     334                 :                 // Only track execution lag if this isn't our first download: otherwise, it is expected
     335                 :                 // that execution will have taken longer than our configured interval, for example
      336                 :                 // when starting up a pageserver and doing the initial round of downloads.
     337               2 :                 if last_download.is_some() {
     338 UBC           0 :                     // Elapsed time includes any scheduling lag as well as the execution of the job
     339               0 :                     let elapsed = Instant::now().duration_since(target_time);
     340               0 : 
     341               0 :                     warn_when_period_overrun(
     342               0 :                         elapsed,
     343               0 :                         period,
     344               0 :                         BackgroundLoopKind::SecondaryDownload,
     345               0 :                     );
     346 CBC           2 :                 }
     347               4 :             }
     348                 : 
     349               6 :             CompleteDownload {
     350               6 :                     secondary_state,
     351               6 :                     completed_at: Instant::now(),
     352               6 :                 }
     353               7 :         }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     354               7 :     }
     355                 : }
     356                 : 
     357                 : /// This type is a convenience to group together the various functions involved in
     358                 : /// freshening a secondary tenant.
     359                 : struct TenantDownloader<'a> {
     360                 :     conf: &'static PageServerConf,
     361                 :     remote_storage: &'a GenericRemoteStorage,
     362                 :     secondary_state: &'a SecondaryTenant,
     363                 : }
     364                 : 
     365                 : /// Errors that may be encountered while updating a tenant
     366 UBC           0 : #[derive(thiserror::Error, Debug)]
     367                 : enum UpdateError {
     368                 :     #[error("No remote data found")]
     369                 :     NoData,
     370                 :     #[error("Insufficient local storage space")]
     371                 :     NoSpace,
     372                 :     #[error("Failed to download")]
     373                 :     DownloadError(DownloadError),
     374                 :     #[error(transparent)]
     375                 :     Deserialize(#[from] serde_json::Error),
     376                 :     #[error("Cancelled")]
     377                 :     Cancelled,
     378                 :     #[error(transparent)]
     379                 :     Other(#[from] anyhow::Error),
     380                 : }
     381                 : 
     382                 : impl From<DownloadError> for UpdateError {
     383 CBC           1 :     fn from(value: DownloadError) -> Self {
     384               1 :         match &value {
     385 UBC           0 :             DownloadError::Cancelled => Self::Cancelled,
     386 CBC           1 :             DownloadError::NotFound => Self::NoData,
     387 UBC           0 :             _ => Self::DownloadError(value),
     388                 :         }
     389 CBC           1 :     }
     390                 : }
     391                 : 
     392                 : impl From<std::io::Error> for UpdateError {
     393 UBC           0 :     fn from(value: std::io::Error) -> Self {
     394               0 :         if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
     395               0 :             UpdateError::NoSpace
     396                 :         } else {
     397                 :             // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
     398               0 :             UpdateError::Other(anyhow::anyhow!(value))
     399                 :         }
     400               0 :     }
     401                 : }
     402                 : 
     403                 : impl<'a> TenantDownloader<'a> {
     404 CBC           7 :     fn new(
     405               7 :         conf: &'static PageServerConf,
     406               7 :         remote_storage: &'a GenericRemoteStorage,
     407               7 :         secondary_state: &'a SecondaryTenant,
     408               7 :     ) -> Self {
     409               7 :         Self {
     410               7 :             conf,
     411               7 :             remote_storage,
     412               7 :             secondary_state,
     413               7 :         }
     414               7 :     }
     415                 : 
     416               7 :     async fn download(&self) -> Result<(), UpdateError> {
     417               7 :         debug_assert_current_span_has_tenant_id();
     418                 : 
      419                 :         // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure that
      420                 :         // our access to local storage is covered (the tenant cannot shut down underneath us).
     421               7 :         let Ok(_guard) = self.secondary_state.gate.enter() else {
     422                 :             // Shutting down
     423 UBC           0 :             return Ok(());
     424                 :         };
     425                 : 
     426 CBC           7 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     427                 :         // Download the tenant's heatmap
     428               7 :         let heatmap_bytes = tokio::select!(
     429               7 :             bytes = self.download_heatmap() => {bytes?},
     430                 :             _ = self.secondary_state.cancel.cancelled() => return Ok(())
     431                 :         );
     432                 : 
     433               6 :         let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
     434                 : 
     435                 :         // Save the heatmap: this will be useful on restart, allowing us to reconstruct
     436                 :         // layer metadata without having to re-download it.
     437               6 :         let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
     438               6 : 
     439               6 :         let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
     440               6 :         let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
     441               6 :         let heatmap_path_bg = heatmap_path.clone();
     442               6 :         tokio::task::spawn_blocking(move || {
     443               6 :             tokio::runtime::Handle::current().block_on(async move {
     444               6 :                 VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
     445               6 :             })
     446               6 :         })
     447               6 :         .await
     448               6 :         .expect("Blocking task is never aborted")
     449               6 :         .maybe_fatal_err(&context_msg)?;
     450                 : 
     451 UBC           0 :         tracing::debug!("Wrote local heatmap to {}", heatmap_path);
     452                 : 
     453                 :         // Download the layers in the heatmap
     454 CBC          11 :         for timeline in heatmap.timelines {
     455               6 :             if self.secondary_state.cancel.is_cancelled() {
     456 UBC           0 :                 return Ok(());
     457 CBC           6 :             }
     458               6 : 
     459               6 :             let timeline_id = timeline.timeline_id;
     460               6 :             self.download_timeline(timeline)
     461                 :                 .instrument(tracing::info_span!(
     462                 :                     "secondary_download_timeline",
     463                 :                     tenant_id=%tenant_shard_id.tenant_id,
     464               6 :                     shard_id=%tenant_shard_id.shard_slug(),
     465                 :                     %timeline_id
     466                 :                 ))
     467           18136 :                 .await?;
     468                 :         }
     469                 : 
     470               5 :         Ok(())
     471               6 :     }
     472                 : 
     473               7 :     async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
     474               7 :         debug_assert_current_span_has_tenant_id();
     475               7 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     476                 :         // TODO: make download conditional on ETag having changed since last download
     477                 :         // (https://github.com/neondatabase/neon/issues/6199)
     478 UBC           0 :         tracing::debug!("Downloading heatmap for secondary tenant",);
     479                 : 
     480 CBC           7 :         let heatmap_path = remote_heatmap_path(tenant_shard_id);
     481                 : 
     482               7 :         let heatmap_bytes = backoff::retry(
     483               7 :             || async {
     484               7 :                 let download = self
     485               7 :                     .remote_storage
     486               7 :                     .download(&heatmap_path)
     487              22 :                     .await
     488               7 :                     .map_err(UpdateError::from)?;
     489               6 :                 let mut heatmap_bytes = Vec::new();
     490               6 :                 let mut body = tokio_util::io::StreamReader::new(download.download_stream);
     491              26 :                 let _size = tokio::io::copy(&mut body, &mut heatmap_bytes).await?;
     492               6 :                 Ok(heatmap_bytes)
     493               7 :             },
     494               7 :             |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
     495               7 :             FAILED_DOWNLOAD_WARN_THRESHOLD,
     496               7 :             FAILED_REMOTE_OP_RETRIES,
     497               7 :             "download heatmap",
     498               7 :             backoff::Cancel::new(self.secondary_state.cancel.clone(), || {
     499 UBC           0 :                 UpdateError::Cancelled
     500 CBC           7 :             }),
     501               7 :         )
     502              48 :         .await?;
     503                 : 
     504               6 :         SECONDARY_MODE.download_heatmap.inc();
     505               6 : 
     506               6 :         Ok(heatmap_bytes)
     507               7 :     }
     508                 : 
     509               6 :     async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
     510               6 :         debug_assert_current_span_has_tenant_and_timeline_id();
     511               6 :         let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
     512               6 :         let timeline_path = self
     513               6 :             .conf
     514               6 :             .timeline_path(tenant_shard_id, &timeline.timeline_id);
     515               6 : 
     516               6 :         // Accumulate updates to the state
     517               6 :         let mut touched = Vec::new();
     518               6 : 
     519               6 :         // Clone a view of what layers already exist on disk
     520               6 :         let timeline_state = self
     521               6 :             .secondary_state
     522               6 :             .detail
     523               6 :             .lock()
     524               6 :             .unwrap()
     525               6 :             .timelines
     526               6 :             .get(&timeline.timeline_id)
     527               6 :             .cloned();
     528                 : 
     529               6 :         let timeline_state = match timeline_state {
     530               3 :             Some(t) => t,
     531                 :             None => {
     532                 :                 // We have no existing state: need to scan local disk for layers first.
     533               2 :                 let timeline_state =
     534             422 :                     init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
     535                 : 
     536                 :                 // Re-acquire detail lock now that we're done with async load from local FS
     537               2 :                 self.secondary_state
     538               2 :                     .detail
     539               2 :                     .lock()
     540               2 :                     .unwrap()
     541               2 :                     .timelines
     542               2 :                     .insert(timeline.timeline_id, timeline_state.clone());
     543               2 :                 timeline_state
     544                 :             }
     545                 :         };
     546                 : 
     547               5 :         let layers_in_heatmap = timeline
     548               5 :             .layers
     549               5 :             .iter()
     550            3080 :             .map(|l| &l.name)
     551               5 :             .collect::<HashSet<_>>();
     552               5 :         let layers_on_disk = timeline_state
     553               5 :             .on_disk_layers
     554               5 :             .iter()
     555            1858 :             .map(|l| l.0)
     556               5 :             .collect::<HashSet<_>>();
     557                 : 
     558                 :         // Remove on-disk layers that are no longer present in heatmap
     559               5 :         for layer in layers_on_disk.difference(&layers_in_heatmap) {
     560               2 :             let local_path = timeline_path.join(layer.to_string());
     561               2 :             tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
     562               2 :             tokio::fs::remove_file(&local_path)
     563               2 :                 .await
     564               2 :                 .or_else(fs_ext::ignore_not_found)
     565               2 :                 .maybe_fatal_err("Removing secondary layer")?;
     566                 :         }
     567                 : 
     568                 :         // Download heatmap layers that are not present on local disk, or update their
     569                 :         // access time if they are already present.
     570            3085 :         for layer in timeline.layers {
     571            3080 :             if self.secondary_state.cancel.is_cancelled() {
     572 UBC           0 :                 return Ok(());
     573 CBC        3080 :             }
     574                 : 
     575                 :             // Existing on-disk layers: just update their access time.
     576            3080 :             if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
     577 UBC           0 :                 tracing::debug!("Layer {} is already on disk", layer.name);
     578 CBC        1856 :                 if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
     579            1856 :                     || on_disk.access_time != layer.access_time
     580                 :                 {
     581                 :                     // We already have this layer on disk.  Update its access time.
     582 UBC           0 :                     tracing::debug!(
     583               0 :                         "Access time updated for layer {}: {} -> {}",
     584               0 :                         layer.name,
     585               0 :                         strftime(&on_disk.access_time),
     586               0 :                         strftime(&layer.access_time)
     587               0 :                     );
     588 CBC         200 :                     touched.push(layer);
     589            1656 :                 }
     590            1856 :                 continue;
     591                 :             } else {
     592 UBC           0 :                 tracing::debug!("Layer {} not present on disk yet", layer.name);
     593                 :             }
     594                 : 
     595                 :             // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
     596                 :             // recently than it was evicted.
     597 CBC        1224 :             if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
     598 UBC           0 :                 if &layer.access_time > evicted_at {
     599               0 :                     tracing::info!(
     600               0 :                         "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
     601               0 :                         layer.name,
     602               0 :                         strftime(&layer.access_time),
     603               0 :                         strftime(evicted_at)
     604               0 :                     );
     605                 :                 } else {
     606               0 :                     tracing::trace!(
     607               0 :                         "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
     608               0 :                         layer.name,
     609               0 :                         strftime(&layer.access_time),
     610               0 :                         strftime(evicted_at)
     611               0 :                     );
     612               0 :                     continue;
     613                 :                 }
     614 CBC        1224 :             }
     615                 : 
     616                 :             // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
     617            1224 :             let downloaded_bytes = match download_layer_file(
     618            1224 :                 self.conf,
     619            1224 :                 self.remote_storage,
     620            1224 :                 *tenant_shard_id,
     621            1224 :                 timeline.timeline_id,
     622            1224 :                 &layer.name,
     623            1224 :                 &LayerFileMetadata::from(&layer.metadata),
     624            1224 :                 &self.secondary_state.cancel,
     625            1224 :             )
     626           17712 :             .await
     627                 :             {
     628            1224 :                 Ok(bytes) => bytes,
     629 UBC           0 :                 Err(e) => {
     630               0 :                     if let DownloadError::NotFound = e {
     631                 :                         // A heatmap might be out of date and refer to a layer that doesn't exist any more.
     632                 :                         // This is harmless: continue to download the next layer. It is expected during compaction
      633                 :                         // or GC.
     634               0 :                         tracing::debug!(
     635               0 :                             "Skipped downloading missing layer {}, raced with compaction/gc?",
     636               0 :                             layer.name
     637               0 :                         );
     638               0 :                         continue;
     639                 :                     } else {
     640               0 :                         return Err(e.into());
     641                 :                     }
     642                 :                 }
     643                 :             };
     644                 : 
     645 CBC        1224 :             if downloaded_bytes != layer.metadata.file_size {
     646 UBC           0 :                 let local_path = timeline_path.join(layer.name.to_string());
     647                 : 
     648               0 :                 tracing::warn!(
     649               0 :                     "Downloaded layer {} with unexpected size {} != {}.  Removing download.",
     650               0 :                     layer.name,
     651               0 :                     downloaded_bytes,
     652               0 :                     layer.metadata.file_size
     653               0 :                 );
     654                 : 
     655               0 :                 tokio::fs::remove_file(&local_path)
     656               0 :                     .await
     657               0 :                     .or_else(fs_ext::ignore_not_found)?;
     658 CBC        1224 :             }
     659                 : 
     660            1224 :             SECONDARY_MODE.download_layer.inc();
     661            1224 :             touched.push(layer)
     662                 :         }
     663                 : 
     664                 :         // Write updates to state to record layers we just downloaded or touched.
     665                 :         {
     666               5 :             let mut detail = self.secondary_state.detail.lock().unwrap();
     667               5 :             let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
     668                 : 
     669               5 :             tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
     670                 : 
     671            1429 :             for t in touched {
     672                 :                 use std::collections::hash_map::Entry;
     673            1424 :                 match timeline_detail.on_disk_layers.entry(t.name.clone()) {
     674             200 :                     Entry::Occupied(mut v) => {
     675             200 :                         v.get_mut().access_time = t.access_time;
     676             200 :                     }
     677            1224 :                     Entry::Vacant(e) => {
     678            1224 :                         e.insert(OnDiskState::new(
     679            1224 :                             self.conf,
     680            1224 :                             tenant_shard_id,
     681            1224 :                             &timeline.timeline_id,
     682            1224 :                             t.name,
     683            1224 :                             LayerFileMetadata::from(&t.metadata),
     684            1224 :                             t.access_time,
     685            1224 :                         ));
     686            1224 :                     }
     687                 :                 }
     688                 :             }
     689                 :         }
     690                 : 
     691               5 :         Ok(())
     692               5 :     }
     693                 : }
     694                 : 
     695                 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
     696               3 : async fn init_timeline_state(
     697               3 :     conf: &'static PageServerConf,
     698               3 :     tenant_shard_id: &TenantShardId,
     699               3 :     heatmap: &HeatMapTimeline,
     700               3 : ) -> SecondaryDetailTimeline {
     701               3 :     let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
     702               3 :     let mut detail = SecondaryDetailTimeline::default();
     703                 : 
     704               3 :     let mut dir = match tokio::fs::read_dir(&timeline_path).await {
     705 GBC           1 :         Ok(d) => d,
     706 CBC           2 :         Err(e) => {
     707               2 :             if e.kind() == std::io::ErrorKind::NotFound {
     708               2 :                 let context = format!("Creating timeline directory {timeline_path}");
     709               2 :                 tracing::info!("{}", context);
     710               2 :                 tokio::fs::create_dir_all(&timeline_path)
     711               2 :                     .await
     712               2 :                     .fatal_err(&context);
     713               2 : 
     714               2 :                 // No entries to report: drop out.
     715               2 :                 return detail;
     716                 :             } else {
     717 UBC           0 :                 on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
     718                 :             }
     719                 :         }
     720                 :     };
     721                 : 
     722                 :     // As we iterate through layers found on disk, we will look up their metadata from this map.
     723                 :     // Layers not present in metadata will be discarded.
     724 GBC           1 :     let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
     725             589 :         heatmap.layers.iter().map(|l| (&l.name, l)).collect();
     726                 : 
     727             423 :     while let Some(dentry) = dir
     728             423 :         .next_entry()
     729              13 :         .await
     730             423 :         .fatal_err(&format!("Listing {timeline_path}"))
     731                 :     {
     732             423 :         let dentry_file_name = dentry.file_name();
     733             423 :         let file_name = dentry_file_name.to_string_lossy();
     734             423 :         let local_meta = dentry.metadata().await.fatal_err(&format!(
     735             423 :             "Read metadata on {}",
     736             423 :             dentry.path().to_string_lossy()
     737             423 :         ));
     738             423 : 
     739             423 :         // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
     740             423 :         if file_name == METADATA_FILE_NAME {
     741               1 :             continue;
     742             422 :         }
     743             422 : 
     744             422 :         match LayerFileName::from_str(&file_name) {
     745             422 :             Ok(name) => {
     746             422 :                 let remote_meta = heatmap_metadata.get(&name);
     747             422 :                 match remote_meta {
     748             421 :                     Some(remote_meta) => {
     749             421 :                         // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
     750             421 :                         if local_meta.len() != remote_meta.metadata.file_size {
     751                 :                             // This should not happen, because we do crashsafe write-then-rename when downloading
     752                 :                             // layers, and layers in remote storage are immutable.  Remove the local file because
     753                 :                             // we cannot trust it.
     754 UBC           0 :                             tracing::warn!(
     755               0 :                                 "Removing local layer {name} with unexpected local size {} != {}",
     756               0 :                                 local_meta.len(),
     757               0 :                                 remote_meta.metadata.file_size
     758               0 :                             );
     759 GBC         421 :                         } else {
     760             421 :                             // We expect the access time to be initialized immediately afterwards, when
     761             421 :                             // the latest heatmap is applied to the state.
     762             421 :                             detail.on_disk_layers.insert(
     763             421 :                                 name.clone(),
     764             421 :                                 OnDiskState::new(
     765             421 :                                     conf,
     766             421 :                                     tenant_shard_id,
     767             421 :                                     &heatmap.timeline_id,
     768             421 :                                     name,
     769             421 :                                     LayerFileMetadata::from(&remote_meta.metadata),
     770             421 :                                     remote_meta.access_time,
     771             421 :                                 ),
     772             421 :                             );
     773             421 :                         }
     774                 :                     }
     775                 :                     None => {
     776                 :                         // FIXME: consider some optimization when transitioning from attached to secondary: maybe
     777                 :                         // wait until we have seen a heatmap that is more recent than the most recent on-disk state?  Otherwise
     778                 :                         // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
     779 UBC           0 :                         tracing::info!(
     780               0 :                             "Removing secondary local layer {} because it's absent in heatmap",
     781               0 :                             name
     782               0 :                         );
     783               0 :                         tokio::fs::remove_file(&dentry.path())
     784               0 :                             .await
     785               0 :                             .or_else(fs_ext::ignore_not_found)
     786               0 :                             .fatal_err(&format!(
     787               0 :                                 "Removing layer {}",
     788               0 :                                 dentry.path().to_string_lossy()
     789               0 :                             ));
     790                 :                     }
     791                 :                 }
     792                 :             }
     793                 :             Err(_) => {
     794                 :                 // Ignore it.
     795               0 :                 tracing::warn!("Unexpected file in timeline directory: {file_name}");
     796                 :             }
     797                 :         }
     798                 :     }
     799                 : 
     800               0 :     detail
     801 CBC           2 : }
        

Generated by: LCOV version 2.1-beta