LCOV - code coverage report
Current view: top level - pageserver/src/tenant/secondary - heatmap_uploader.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 277 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 26 0

            Line data    Source code
       1              : use std::{
       2              :     collections::HashMap,
       3              :     pin::Pin,
       4              :     sync::{Arc, Weak},
       5              :     time::{Duration, Instant},
       6              : };
       7              : 
       8              : use crate::{
       9              :     metrics::SECONDARY_MODE,
      10              :     tenant::{
      11              :         config::AttachmentMode,
      12              :         mgr::{GetTenantError, TenantManager},
      13              :         remote_timeline_client::remote_heatmap_path,
      14              :         span::debug_assert_current_span_has_tenant_id,
      15              :         tasks::{warn_when_period_overrun, BackgroundLoopKind},
      16              :         Tenant,
      17              :     },
      18              :     virtual_file::VirtualFile,
      19              :     TEMP_FILE_SUFFIX,
      20              : };
      21              : 
      22              : use futures::Future;
      23              : use pageserver_api::shard::TenantShardId;
      24              : use remote_storage::{GenericRemoteStorage, TimeoutOrCancel};
      25              : 
      26              : use super::{
      27              :     heatmap::HeatMapTenant,
      28              :     scheduler::{
      29              :         self, period_jitter, period_warmup, JobGenerator, RunningJob, SchedulingResult,
      30              :         TenantBackgroundJobs,
      31              :     },
      32              :     CommandRequest, SecondaryTenantError, UploadCommand,
      33              : };
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::{info_span, instrument, Instrument};
      36              : use utils::{
      37              :     backoff, completion::Barrier, crashsafe::path_with_suffix_extension,
      38              :     yielding_loop::yielding_loop,
      39              : };
      40              : 
      41            0 : pub(super) async fn heatmap_uploader_task(
      42            0 :     tenant_manager: Arc<TenantManager>,
      43            0 :     remote_storage: GenericRemoteStorage,
      44            0 :     command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
      45            0 :     background_jobs_can_start: Barrier,
      46            0 :     cancel: CancellationToken,
      47            0 : ) {
      48            0 :     let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
      49            0 : 
      50            0 :     let generator = HeatmapUploader {
      51            0 :         tenant_manager,
      52            0 :         remote_storage,
      53            0 :         cancel: cancel.clone(),
      54            0 :         tenants: HashMap::new(),
      55            0 :     };
      56            0 :     let mut scheduler = Scheduler::new(generator, concurrency);
      57            0 : 
      58            0 :     scheduler
      59            0 :         .run(command_queue, background_jobs_can_start, cancel)
      60            0 :         .instrument(info_span!("heatmap_upload_scheduler"))
      61            0 :         .await
      62            0 : }
      63              : 
      64              : /// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
      65              : /// handling loop and mutates it as needed: there are no locks here, because that event loop
      66              : /// can hold &mut references to this type throughout.
      67              : struct HeatmapUploader {
      68              :     tenant_manager: Arc<TenantManager>,
      69              :     remote_storage: GenericRemoteStorage,
      70              :     cancel: CancellationToken,
      71              : 
      72              :     tenants: HashMap<TenantShardId, UploaderTenantState>,
      73              : }
      74              : 
      75              : struct WriteInProgress {
      76              :     barrier: Barrier,
      77              : }
      78              : 
      79              : impl RunningJob for WriteInProgress {
      80            0 :     fn get_barrier(&self) -> Barrier {
      81            0 :         self.barrier.clone()
      82            0 :     }
      83              : }
      84              : 
      85              : struct UploadPending {
      86              :     tenant: Arc<Tenant>,
      87              :     last_upload: Option<LastUploadState>,
      88              :     target_time: Option<Instant>,
      89              :     period: Option<Duration>,
      90              : }
      91              : 
      92              : impl scheduler::PendingJob for UploadPending {
      93            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      94            0 :         self.tenant.get_tenant_shard_id()
      95            0 :     }
      96              : }
      97              : 
      98              : struct WriteComplete {
      99              :     tenant_shard_id: TenantShardId,
     100              :     completed_at: Instant,
     101              :     uploaded: Option<LastUploadState>,
     102              :     next_upload: Option<Instant>,
     103              : }
     104              : 
     105              : impl scheduler::Completion for WriteComplete {
     106            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     107            0 :         &self.tenant_shard_id
     108            0 :     }
     109              : }
     110              : 
     111              : /// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
     112              : /// when we last did a write.  We only populate this after doing at least one
     113              : /// write for a tenant -- this avoids holding state for tenants that have
     114              : /// uploads disabled.
     115              : struct UploaderTenantState {
     116              :     // This Weak only exists to enable culling idle instances of this type
     117              :     // when the Tenant has been deallocated.
     118              :     tenant: Weak<Tenant>,
     119              : 
     120              :     /// Digest of the serialized heatmap that we last successfully uploaded
     121              :     last_upload_state: Option<LastUploadState>,
     122              : 
     123              :     /// When the last upload attempt completed (may have been successful or failed)
     124              :     last_upload: Option<Instant>,
     125              : 
     126              :     /// When should we next do an upload?  None means never.
     127              :     next_upload: Option<Instant>,
     128              : }
     129              : 
     130              : type Scheduler = TenantBackgroundJobs<
     131              :     HeatmapUploader,
     132              :     UploadPending,
     133              :     WriteInProgress,
     134              :     WriteComplete,
     135              :     UploadCommand,
     136              : >;
     137              : 
     138              : impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
     139              :     for HeatmapUploader
     140              : {
     141            0 :     async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
     142            0 :         // Cull any entries in self.tenants whose Arc<Tenant> is gone
     143            0 :         self.tenants
     144            0 :             .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());
     145            0 : 
     146            0 :         let now = Instant::now();
     147            0 : 
     148            0 :         let mut result = SchedulingResult {
     149            0 :             jobs: Vec::new(),
     150            0 :             want_interval: None,
     151            0 :         };
     152            0 : 
     153            0 :         let tenants = self.tenant_manager.get_attached_active_tenant_shards();
     154            0 : 
     155            0 :         yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
     156            0 :             let period = match tenant.get_heatmap_period() {
     157              :                 None => {
     158              :                     // Heatmaps are disabled for this tenant
     159            0 :                     return;
     160              :                 }
     161            0 :                 Some(period) => {
     162            0 :                     // If any tenant has asked for uploads more frequent than our scheduling interval,
     163            0 :                     // reduce it to match so that we can keep up.  This is mainly useful in testing, where
     164            0 :                     // we may set rather short intervals.
     165            0 :                     result.want_interval = match result.want_interval {
     166            0 :                         None => Some(period),
     167            0 :                         Some(existing) => Some(std::cmp::min(period, existing)),
     168              :                     };
     169              : 
     170            0 :                     period
     171            0 :                 }
     172            0 :             };
     173            0 : 
     174            0 :             // Stale attachments do not upload anything: if we are in this state, there is probably some
     175            0 :             // other attachment in mode Single or Multi running on another pageserver, and we don't
     176            0 :             // want to thrash and overwrite their heatmap uploads.
     177            0 :             if tenant.get_attach_mode() == AttachmentMode::Stale {
     178            0 :                 return;
     179            0 :             }
     180            0 : 
     181            0 :             // Create an entry in self.tenants if one doesn't already exist: this will later be updated
     182            0 :             // with the completion time in on_completion.
     183            0 :             let state = self
     184            0 :                 .tenants
     185            0 :                 .entry(*tenant.get_tenant_shard_id())
     186            0 :                 .or_insert_with(|| UploaderTenantState {
     187            0 :                     tenant: Arc::downgrade(&tenant),
     188            0 :                     last_upload: None,
     189            0 :                     next_upload: Some(now.checked_add(period_warmup(period)).unwrap_or(now)),
     190            0 :                     last_upload_state: None,
     191            0 :                 });
     192            0 : 
     193            0 :             // Decline to do the upload if insufficient time has passed
     194            0 :             if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
     195            0 :                 return;
     196            0 :             }
     197            0 : 
     198            0 :             let last_upload = state.last_upload_state.clone();
     199            0 :             result.jobs.push(UploadPending {
     200            0 :                 tenant,
     201            0 :                 last_upload,
     202            0 :                 target_time: state.next_upload,
     203            0 :                 period: Some(period),
     204            0 :             });
     205            0 :         })
     206            0 :         .await
     207            0 :         .ok();
     208            0 : 
     209            0 :         result
     210            0 :     }
     211              : 
     212            0 :     fn spawn(
     213            0 :         &mut self,
     214            0 :         job: UploadPending,
     215            0 :     ) -> (
     216            0 :         WriteInProgress,
     217            0 :         Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
     218            0 :     ) {
     219            0 :         let UploadPending {
     220            0 :             tenant,
     221            0 :             last_upload,
     222            0 :             target_time,
     223            0 :             period,
     224            0 :         } = job;
     225            0 : 
     226            0 :         let remote_storage = self.remote_storage.clone();
     227            0 :         let (completion, barrier) = utils::completion::channel();
     228            0 :         let tenant_shard_id = *tenant.get_tenant_shard_id();
     229            0 :         (WriteInProgress { barrier }, Box::pin(async move {
     230            0 :             // Guard for the barrier in [`WriteInProgress`]
     231            0 :             let _completion = completion;
     232            0 : 
     233            0 :             let started_at = Instant::now();
     234            0 :             let uploaded = match upload_tenant_heatmap(remote_storage, &tenant, last_upload.clone()).await {
     235            0 :                 Ok(UploadHeatmapOutcome::Uploaded(uploaded)) => {
     236            0 :                     let duration = Instant::now().duration_since(started_at);
     237            0 :                     SECONDARY_MODE
     238            0 :                         .upload_heatmap_duration
     239            0 :                         .observe(duration.as_secs_f64());
     240            0 :                     SECONDARY_MODE.upload_heatmap.inc();
     241            0 :                     Some(uploaded)
     242              :                 }
     243            0 :                 Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_upload,
     244            0 :                 Err(UploadHeatmapError::Upload(e)) => {
     245            0 :                     tracing::warn!(
     246            0 :                         "Failed to upload heatmap for tenant {}: {e:#}",
     247            0 :                         tenant.get_tenant_shard_id(),
     248              :                     );
     249            0 :                     let duration = Instant::now().duration_since(started_at);
     250            0 :                     SECONDARY_MODE
     251            0 :                         .upload_heatmap_duration
     252            0 :                         .observe(duration.as_secs_f64());
     253            0 :                     SECONDARY_MODE.upload_heatmap_errors.inc();
     254            0 :                     last_upload
     255              :                 }
     256              :                 Err(UploadHeatmapError::Cancelled) => {
     257            0 :                     tracing::info!("Cancelled heatmap upload, shutting down");
     258            0 :                     last_upload
     259              :                 }
     260              :             };
     261              : 
     262            0 :             let now = Instant::now();
     263              : 
     264              :             // If the job had a target execution time, we may check our final execution
     265              :             // time against that for observability purposes.
     266            0 :             if let (Some(target_time), Some(period)) = (target_time, period) {
     267            0 :                 // Elapsed time includes any scheduling lag as well as the execution of the job
     268            0 :                 let elapsed = now.duration_since(target_time);
     269            0 : 
     270            0 :                 warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
     271            0 :             }
     272              : 
     273            0 :             let next_upload = tenant
     274            0 :                 .get_heatmap_period()
     275            0 :                 .and_then(|period| now.checked_add(period_jitter(period, 5)));
     276            0 : 
     277            0 :             WriteComplete {
     278            0 :                     tenant_shard_id: *tenant.get_tenant_shard_id(),
     279            0 :                     completed_at: now,
     280            0 :                     uploaded,
     281            0 :                     next_upload,
     282            0 :                 }
     283            0 :         }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     284            0 :     }
     285              : 
     286            0 :     fn on_command(
     287            0 :         &mut self,
     288            0 :         command: UploadCommand,
     289            0 :     ) -> Result<UploadPending, SecondaryTenantError> {
     290            0 :         let tenant_shard_id = command.get_tenant_shard_id();
     291            0 : 
     292            0 :         tracing::info!(
     293            0 :             tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     294            0 :             "Starting heatmap write on command");
     295            0 :         let tenant = self
     296            0 :             .tenant_manager
     297            0 :             .get_attached_tenant_shard(*tenant_shard_id)?;
     298            0 :         if !tenant.is_active() {
     299            0 :             return Err(GetTenantError::NotActive(*tenant_shard_id).into());
     300            0 :         }
     301            0 : 
     302            0 :         Ok(UploadPending {
     303            0 :             // Ignore our state for last digest: this forces an upload even if nothing has changed
     304            0 :             last_upload: None,
     305            0 :             tenant,
     306            0 :             target_time: None,
     307            0 :             period: None,
     308            0 :         })
     309            0 :     }
     310              : 
     311              :     #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
     312              :     fn on_completion(&mut self, completion: WriteComplete) {
     313              :         tracing::debug!("Heatmap upload completed");
     314              :         let WriteComplete {
     315              :             tenant_shard_id,
     316              :             completed_at,
     317              :             uploaded,
     318              :             next_upload,
     319              :         } = completion;
     320              :         use std::collections::hash_map::Entry;
     321              :         match self.tenants.entry(tenant_shard_id) {
     322              :             Entry::Vacant(_) => {
     323              :                 // Tenant state was dropped, nothing to update.
     324              :             }
     325              :             Entry::Occupied(mut entry) => {
     326              :                 entry.get_mut().last_upload = Some(completed_at);
     327              :                 entry.get_mut().last_upload_state = uploaded;
     328              :                 entry.get_mut().next_upload = next_upload
     329              :             }
     330              :         }
     331              :     }
     332              : }
     333              : 
     334              : enum UploadHeatmapOutcome {
     335              :     /// We successfully wrote to remote storage, with this digest.
     336              :     Uploaded(LastUploadState),
     337              :     /// We did not upload because the heatmap digest was unchanged since the last upload
     338              :     NoChange,
     339              :     /// We skipped the upload for some reason, such as tenant/timeline not ready
     340              :     Skipped,
     341              : }
     342              : 
     343              : #[derive(thiserror::Error, Debug)]
     344              : enum UploadHeatmapError {
     345              :     #[error("Cancelled")]
     346              :     Cancelled,
     347              : 
     348              :     #[error(transparent)]
     349              :     Upload(#[from] anyhow::Error),
     350              : }
     351              : 
     352              : /// Digests describing the heatmap we most recently uploaded successfully.
     353              : ///
     354              : /// md5 is generally a bad hash.  We use it because it's convenient for interop with AWS S3's ETag,
     355              : /// which is also an md5sum.
     356              : #[derive(Clone)]
     357              : struct LastUploadState {
     358              :     // Digest of json-encoded HeatMapTenant
     359              :     uploaded_digest: md5::Digest,
     360              : 
     361              :     // Digest without atimes set.
     362              :     layers_only_digest: md5::Digest,
     363              : }
     364              : 
     365              : /// The inner upload operation.  This will skip if `last_digest` is Some and matches the digest
     366              : /// of the object we would have uploaded.
     367            0 : async fn upload_tenant_heatmap(
     368            0 :     remote_storage: GenericRemoteStorage,
     369            0 :     tenant: &Arc<Tenant>,
     370            0 :     last_upload: Option<LastUploadState>,
     371            0 : ) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
     372            0 :     debug_assert_current_span_has_tenant_id();
     373            0 : 
     374            0 :     let generation = tenant.get_generation();
     375            0 :     debug_assert!(!generation.is_none());
     376            0 :     if generation.is_none() {
     377              :         // We do not expect this: None generations should only appear in historic layer metadata, not in running Tenants
     378            0 :         tracing::warn!("Skipping heatmap upload for tenant with generation==None");
     379            0 :         return Ok(UploadHeatmapOutcome::Skipped);
     380            0 :     }
     381            0 : 
     382            0 :     let mut heatmap = HeatMapTenant {
     383            0 :         timelines: Vec::new(),
     384            0 :         generation,
     385            0 :         upload_period_ms: tenant.get_heatmap_period().map(|p| p.as_millis()),
     386            0 :     };
     387            0 :     let timelines = tenant.timelines.lock().unwrap().clone();
     388              : 
     389              :     // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
     390              :     // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
     391              :     // in remote storage.
     392            0 :     let Ok(_guard) = tenant.gate.enter() else {
     393            0 :         tracing::info!("Skipping heatmap upload for tenant which is shutting down");
     394            0 :         return Err(UploadHeatmapError::Cancelled);
     395              :     };
     396              : 
     397            0 :     for (timeline_id, timeline) in timelines {
     398            0 :         let heatmap_timeline = timeline.generate_heatmap().await;
     399            0 :         match heatmap_timeline {
     400              :             None => {
     401            0 :                 tracing::debug!(
     402            0 :                     "Skipping heatmap upload because timeline {timeline_id} is not ready"
     403              :                 );
     404            0 :                 return Ok(UploadHeatmapOutcome::Skipped);
     405              :             }
     406            0 :             Some(heatmap_timeline) => {
     407            0 :                 heatmap.timelines.push(heatmap_timeline);
     408            0 :             }
     409              :         }
     410              :     }
     411              : 
     412              :     // Serialize the heatmap
     413            0 :     let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
     414              : 
     415              :     // Drop out early if nothing changed since our last upload
     416            0 :     let digest = md5::compute(&bytes);
     417            0 :     if Some(&digest) == last_upload.as_ref().map(|d| &d.uploaded_digest) {
     418            0 :         return Ok(UploadHeatmapOutcome::NoChange);
     419            0 :     }
     420            0 : 
     421            0 :     // Calculate a digest that omits atimes, so that we can distinguish actual changes in
     422            0 :     // layers from changes only in atimes.
     423            0 :     let heatmap_size_bytes = heatmap.get_stats().bytes;
     424            0 :     let layers_only_bytes =
     425            0 :         serde_json::to_vec(&heatmap.strip_atimes()).map_err(|e| anyhow::anyhow!(e))?;
     426            0 :     let layers_only_digest = md5::compute(&layers_only_bytes);
     427            0 :     if heatmap_size_bytes < tenant.get_checkpoint_distance() {
     428              :         // For small tenants, skip upload if only atimes changed. This avoids doing frequent
     429              :         // uploads from long-idle tenants whose atimes are just incremented by periodic
     430              :         // size calculations.
     431            0 :         if Some(&layers_only_digest) == last_upload.as_ref().map(|d| &d.layers_only_digest) {
     432            0 :             return Ok(UploadHeatmapOutcome::NoChange);
     433            0 :         }
     434            0 :     }
     435              : 
     436            0 :     let bytes = bytes::Bytes::from(bytes);
     437            0 :     let size = bytes.len();
     438            0 : 
     439            0 :     let path = remote_heatmap_path(tenant.get_tenant_shard_id());
     440            0 : 
     441            0 :     let cancel = &tenant.cancel;
     442            0 : 
     443            0 :     tracing::debug!("Uploading {size} byte heatmap to {path}");
     444            0 :     if let Err(e) = backoff::retry(
     445            0 :         || async {
     446            0 :             let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
     447            0 :             remote_storage
     448            0 :                 .upload_storage_object(bytes, size, &path, cancel)
     449            0 :                 .await
     450            0 :         },
     451            0 :         TimeoutOrCancel::caused_by_cancel,
     452            0 :         3,
     453            0 :         u32::MAX,
     454            0 :         "Uploading heatmap",
     455            0 :         cancel,
     456            0 :     )
     457            0 :     .await
     458            0 :     .ok_or_else(|| anyhow::anyhow!("Shutting down"))
     459            0 :     .and_then(|x| x)
     460              :     {
     461            0 :         if cancel.is_cancelled() {
     462            0 :             return Err(UploadHeatmapError::Cancelled);
     463              :         } else {
     464            0 :             return Err(e.into());
     465              :         }
     466            0 :     }
     467            0 : 
     468            0 :     // After a successful upload persist the fresh heatmap to disk.
     469            0 :     // When restarting, the tenant will read the heatmap from disk
     470            0 :     // and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
     471            0 :     // If the heatmap is stale, the additive generation can lead to keeping previously
     472            0 :     // evicted timelines on the secondarie's disk.
     473            0 :     let tenant_shard_id = tenant.get_tenant_shard_id();
     474            0 :     let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
     475            0 :     let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
     476            0 :     if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
     477            0 :         tracing::warn!("Non fatal IO error writing to disk after heatmap upload: {err}");
     478            0 :     }
     479              : 
     480            0 :     tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
     481              : 
     482            0 :     Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {
     483            0 :         uploaded_digest: digest,
     484            0 :         layers_only_digest,
     485            0 :     }))
     486            0 : }
        

Generated by: LCOV version 2.1-beta