       1              : use std::{
       2              :     collections::HashMap,
       3              :     pin::Pin,
       4              :     sync::{Arc, Weak},
       5              :     time::{Duration, Instant},
       6              : };
       7              : 
       8              : use crate::{
       9              :     metrics::SECONDARY_MODE,
      10              :     tenant::{
      11              :         config::AttachmentMode,
      12              :         mgr::TenantManager,
      13              :         remote_timeline_client::remote_heatmap_path,
      14              :         span::debug_assert_current_span_has_tenant_id,
      15              :         tasks::{warn_when_period_overrun, BackgroundLoopKind},
      16              :         Tenant,
      17              :     },
      18              : };
      19              : 
      20              : use futures::Future;
      21              : use md5;
      22              : use pageserver_api::shard::TenantShardId;
      23              : use rand::Rng;
      24              : use remote_storage::GenericRemoteStorage;
      25              : 
      26              : use super::{
      27              :     scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
      28              :     CommandRequest,
      29              : };
      30              : use tokio_util::sync::CancellationToken;
      31              : use tracing::{info_span, instrument, Instrument};
      32              : use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
      33              : 
      34              : use super::{heatmap::HeatMapTenant, UploadCommand};
      35              : 
      36          604 : pub(super) async fn heatmap_uploader_task(
      37          604 :     tenant_manager: Arc<TenantManager>,
      38          604 :     remote_storage: GenericRemoteStorage,
      39          604 :     command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
      40          604 :     background_jobs_can_start: Barrier,
      41          604 :     cancel: CancellationToken,
      42          604 : ) {
      43          604 :     let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
      44          604 : 
      45          604 :     let generator = HeatmapUploader {
      46          604 :         tenant_manager,
      47          604 :         remote_storage,
      48          604 :         cancel: cancel.clone(),
      49          604 :         tenants: HashMap::new(),
      50          604 :     };
      51          604 :     let mut scheduler = Scheduler::new(generator, concurrency);
      52          604 : 
      53          604 :     scheduler
      54          604 :         .run(command_queue, background_jobs_can_start, cancel)
      55          604 :         .instrument(info_span!("heatmap_uploader"))
      56          946 :         .await
      57          172 : }
      58              : 
      59              : /// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
      60              : /// handling loop and mutates it as needed: there are no locks here, because that event loop
      61              : /// can hold &mut references to this type throughout.
      62              : struct HeatmapUploader {
      63              :     tenant_manager: Arc<TenantManager>,
      64              :     remote_storage: GenericRemoteStorage,
      65              :     cancel: CancellationToken,
      66              : 
      67              :     tenants: HashMap<TenantShardId, UploaderTenantState>,
      68              : }
      69              : 
      70              : struct WriteInProgress {
      71              :     barrier: Barrier,
      72              : }
      73              : 
      74              : impl RunningJob for WriteInProgress {
      75            8 :     fn get_barrier(&self) -> Barrier {
      76            8 :         self.barrier.clone()
      77            8 :     }
      78              : }
      79              : 
      80              : struct UploadPending {
      81              :     tenant: Arc<Tenant>,
      82              :     last_digest: Option<md5::Digest>,
      83              :     target_time: Option<Instant>,
      84              :     period: Option<Duration>,
      85              : }
      86              : 
      87              : impl scheduler::PendingJob for UploadPending {
      88           24 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      89           24 :         self.tenant.get_tenant_shard_id()
      90           24 :     }
      91              : }
      92              : 
      93              : struct WriteComplete {
      94              :     tenant_shard_id: TenantShardId,
      95              :     completed_at: Instant,
      96              :     digest: Option<md5::Digest>,
      97              :     next_upload: Option<Instant>,
      98              : }
      99              : 
     100              : impl scheduler::Completion for WriteComplete {
     101            8 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     102            8 :         &self.tenant_shard_id
     103            8 :     }
     104              : }
     105              : 
     106              : /// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
     107              : /// when we last did a write.  We only populate this after doing at least one
     108              : /// write for a tenant -- this avoids holding state for tenants that have
     109              : /// uploads disabled.
     110              : 
     111              : struct UploaderTenantState {
     112              :     // This Weak only exists to enable culling idle instances of this type
     113              :     // when the Tenant has been deallocated.
     114              :     tenant: Weak<Tenant>,
     115              : 
     116              :     /// Digest of the serialized heatmap that we last successfully uploaded
     117              :     ///
     118              :     /// md5 is generally a bad hash.  We use it because it's convenient for interop with AWS S3's ETag,
     119              :     /// which is also an md5sum.
     120              :     last_digest: Option<md5::Digest>,
     121              : 
     122              :     /// When the last upload attempt completed (may have been successful or failed)
     123              :     last_upload: Option<Instant>,
     124              : 
     125              :     /// When should we next do an upload?  None means never.
     126              :     next_upload: Option<Instant>,
     127              : }
     128              : 
     129              : type Scheduler = TenantBackgroundJobs<
     130              :     HeatmapUploader,
     131              :     UploadPending,
     132              :     WriteInProgress,
     133              :     WriteComplete,
     134              :     UploadCommand,
     135              : >;
     136              : 
     137              : impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
     138              :     for HeatmapUploader
     139              : {
     140         1180 :     async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
     141         1180 :         // Cull any entries in self.tenants whose Arc<Tenant> is gone
     142         1180 :         self.tenants
     143         1180 :             .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());
     144         1180 : 
     145         1180 :         let now = Instant::now();
     146         1180 : 
     147         1180 :         let mut result = SchedulingResult {
     148         1180 :             jobs: Vec::new(),
     149         1180 :             want_interval: None,
     150         1180 :         };
     151         1180 : 
     152         1180 :         let tenants = self.tenant_manager.get_attached_active_tenant_shards();
     153         1180 : 
     154         1180 :         yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
     155          932 :             let period = match tenant.get_heatmap_period() {
     156              :                 None => {
     157              :                     // Heatmaps are disabled for this tenant
     158          932 :                     return;
     159              :                 }
     160            0 :                 Some(period) => {
     161            0 :                     // If any tenant has asked for uploads more frequent than our scheduling interval,
     162            0 :                     // reduce it to match so that we can keep up.  This is mainly useful in testing, where
     163            0 :                     // we may set rather short intervals.
     164            0 :                     result.want_interval = match result.want_interval {
     165            0 :                         None => Some(period),
     166            0 :                         Some(existing) => Some(std::cmp::min(period, existing)),
     167              :                     };
     168              : 
     169            0 :                     period
     170            0 :                 }
     171            0 :             };
     172            0 : 
     173            0 :             // Stale attachments do not upload anything: if we are in this state, there is probably some
     174            0 :             // other attachment in mode Single or Multi running on another pageserver, and we don't
     175            0 :             // want to thrash and overwrite their heatmap uploads.
     176            0 :             if tenant.get_attach_mode() == AttachmentMode::Stale {
     177            0 :                 return;
     178            0 :             }
     179            0 : 
     180            0 :             // Create an entry in self.tenants if one doesn't already exist: this will later be updated
     181            0 :             // with the completion time in on_completion.
     182            0 :             let state = self
     183            0 :                 .tenants
     184            0 :                 .entry(*tenant.get_tenant_shard_id())
     185            0 :                 .or_insert_with(|| {
     186            0 :                     let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);
     187            0 : 
     188            0 :                     UploaderTenantState {
     189            0 :                         tenant: Arc::downgrade(&tenant),
     190            0 :                         last_upload: None,
     191            0 :                         next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
     192            0 :                         last_digest: None,
     193            0 :                     }
     194            0 :                 });
     195            0 : 
     196            0 :             // Decline to do the upload if insufficient time has passed
     197            0 :             if|nu| nu > now).unwrap_or(false) {
     198            0 :                 return;
     199            0 :             }
     200            0 : 
     201            0 :             let last_digest = state.last_digest;
     202            0 :    {
     203            0 :                 tenant,
     204            0 :                 last_digest,
     205            0 :                 target_time: state.next_upload,
     206            0 :                 period: Some(period),
     207            0 :             });
     208         1180 :         })
     209            0 :         .await
     210         1180 :         .ok();
     211         1180 : 
     212         1180 :         result
     213         1180 :     }
     214              : 
     215            8 :     fn spawn(
     216            8 :         &mut self,
     217            8 :         job: UploadPending,
     218            8 :     ) -> (
     219            8 :         WriteInProgress,
     220            8 :         Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
     221            8 :     ) {
     222            8 :         let UploadPending {
     223            8 :             tenant,
     224            8 :             last_digest,
     225            8 :             target_time,
     226            8 :             period,
     227            8 :         } = job;
     228            8 : 
     229            8 :         let remote_storage = self.remote_storage.clone();
     230            8 :         let (completion, barrier) = utils::completion::channel();
     231            8 :         let tenant_shard_id = *tenant.get_tenant_shard_id();
     232            8 :         (WriteInProgress { barrier }, Box::pin(async move {
     233            8 :             // Guard for the barrier in [`WriteInProgress`]
     234            8 :             let _completion = completion;
     235            8 : 
     236            8 :             let started_at = Instant::now();
     237           41 :             let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
     238            8 :                 Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
     239            8 :                     let duration = Instant::now().duration_since(started_at);
     240            8 :                     SECONDARY_MODE
     241            8 :                         .upload_heatmap_duration
     242            8 :                         .observe(duration.as_secs_f64());
     243            8 :           ;
     244            8 :                     Some(digest)
     245              :                 }
     246            0 :                 Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
     247            0 :                 Err(UploadHeatmapError::Upload(e)) => {
     248            0 :                     tracing::warn!(
     249            0 :                         "Failed to upload heatmap for tenant {}: {e:#}",
     250            0 :                         tenant.get_tenant_shard_id(),
     251            0 :                     );
     252            0 :                     let duration = Instant::now().duration_since(started_at);
     253            0 :                     SECONDARY_MODE
     254            0 :                         .upload_heatmap_duration
     255            0 :                         .observe(duration.as_secs_f64());
     256            0 :           ;
     257            0 :                     last_digest
     258              :                 }
     259              :                 Err(UploadHeatmapError::Cancelled) => {
     260            0 :                     tracing::info!("Cancelled heatmap upload, shutting down");
     261            0 :                     last_digest
     262              :                 }
     263              :             };
     264              : 
     265            8 :             let now = Instant::now();
     266              : 
     267              :             // If the job had a target execution time, we may check our final execution
     268              :             // time against that for observability purposes.
     269            8 :             if let (Some(target_time), Some(period)) = (target_time, period) {
     270            0 :                 // Elapsed time includes any scheduling lag as well as the execution of the job
     271            0 :                 let elapsed = now.duration_since(target_time);
     272            0 : 
     273            0 :                 warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
     274            8 :             }
     275              : 
     276            8 :             let next_upload = tenant
     277            8 :                 .get_heatmap_period()
     278            8 :                 .and_then(|period| now.checked_add(period));
     279            8 : 
     280            8 :             WriteComplete {
     281            8 :                     tenant_shard_id: *tenant.get_tenant_shard_id(),
     282            8 :                     completed_at: now,
     283            8 :                     digest,
     284            8 :                     next_upload,
     285            8 :                 }
     286            8 :         }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     287            8 :     }
     288              : 
     289            8 :     fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
     290            8 :         let tenant_shard_id = command.get_tenant_shard_id();
     291            8 : 
     292            8 :         tracing::info!(
     293            8 :             tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     294            8 :             "Starting heatmap write on command");
     295            8 :         let tenant = self
     296            8 :             .tenant_manager
     297            8 :             .get_attached_tenant_shard(*tenant_shard_id, true)
     298            8 :             .map_err(|e| anyhow::anyhow!(e))?;
     299              : 
     300            8 :         Ok(UploadPending {
     301            8 :             // Ignore our state for last digest: this forces an upload even if nothing has changed
     302            8 :             last_digest: None,
     303            8 :             tenant,
     304            8 :             target_time: None,
     305            8 :             period: None,
     306            8 :         })
     307            8 :     }
     308              : 
     309            8 :     #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
     310              :     fn on_completion(&mut self, completion: WriteComplete) {
     311            0 :         tracing::debug!("Heatmap upload completed");
     312              :         let WriteComplete {
     313              :             tenant_shard_id,
     314              :             completed_at,
     315              :             digest,
     316              :             next_upload,
     317              :         } = completion;
     318              :         use std::collections::hash_map::Entry;
     319              :         match self.tenants.entry(tenant_shard_id) {
     320              :             Entry::Vacant(_) => {
     321              :                 // Tenant state was dropped, nothing to update.
     322              :             }
     323              :             Entry::Occupied(mut entry) => {
     324              :                 entry.get_mut().last_upload = Some(completed_at);
     325              :                 entry.get_mut().last_digest = digest;
     326              :                 entry.get_mut().next_upload = next_upload
     327              :             }
     328              :         }
     329              :     }
     330              : }
     331              : 
     332              : enum UploadHeatmapOutcome {
     333              :     /// We successfully wrote to remote storage, with this digest.
     334              :     Uploaded(md5::Digest),
     335              :     /// We did not upload because the heatmap digest was unchanged since the last upload
     336              :     NoChange,
     337              :     /// We skipped the upload for some reason, such as tenant/timeline not ready
     338              :     Skipped,
     339              : }
     340              : 
     341            0 : #[derive(thiserror::Error, Debug)]
     342              : enum UploadHeatmapError {
     343              :     #[error("Cancelled")]
     344              :     Cancelled,
     345              : 
     346              :     #[error(transparent)]
     347              :     Upload(#[from] anyhow::Error),
     348              : }
     349              : 
     350              : /// The inner upload operation.  This will skip if `last_digest` is Some and matches the digest
     351              : /// of the object we would have uploaded.
     352            8 : async fn upload_tenant_heatmap(
     353            8 :     remote_storage: GenericRemoteStorage,
     354            8 :     tenant: &Arc<Tenant>,
     355            8 :     last_digest: Option<md5::Digest>,
     356            8 : ) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
     357            8 :     debug_assert_current_span_has_tenant_id();
     358            8 : 
     359            8 :     let generation = tenant.get_generation();
     360            8 :     if generation.is_none() {
     361              :         // We do not expect this: generations were implemented before heatmap uploads.  However,
     362              :         // handle it so that we don't have to make the generation in the heatmap an Option<>
     363              :         // (Generation::none is not serializable)
     364            0 :         tracing::warn!("Skipping heatmap upload for tenant with generation==None");
     365            0 :         return Ok(UploadHeatmapOutcome::Skipped);
     366            8 :     }
     367            8 : 
     368            8 :     let mut heatmap = HeatMapTenant {
     369            8 :         timelines: Vec::new(),
     370            8 :         generation,
     371            8 :     };
     372            8 :     let timelines = tenant.timelines.lock().unwrap().clone();
     373              : 
     374              :     // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
     375              :     // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
     376              :     // in remote storage.
     377            8 :     let _guard = match tenant.gate.enter() {
     378            8 :         Ok(g) => g,
     379              :         Err(_) => {
     380            0 :             tracing::info!("Skipping heatmap upload for tenant which is shutting down");
     381            0 :             return Err(UploadHeatmapError::Cancelled);
     382              :         }
     383              :     };
     384              : 
     385           16 :     for (timeline_id, timeline) in timelines {
     386           17 :         let heatmap_timeline = timeline.generate_heatmap().await;
     387            8 :         match heatmap_timeline {
     388              :             None => {
     389            0 :                 tracing::debug!(
     390            0 :                     "Skipping heatmap upload because timeline {timeline_id} is not ready"
     391            0 :                 );
     392            0 :                 return Ok(UploadHeatmapOutcome::Skipped);
     393              :             }
     394            8 :             Some(heatmap_timeline) => {
     395            8 :                 heatmap.timelines.push(heatmap_timeline);
     396            8 :             }
     397              :         }
     398              :     }
     399              : 
     400              :     // Serialize the heatmap
     401            8 :     let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
     402            8 :     let bytes = bytes::Bytes::from(bytes);
     403            8 :     let size = bytes.len();
     404            8 : 
     405            8 :     // Drop out early if nothing changed since our last upload
     406            8 :     let digest = md5::compute(&bytes);
     407            8 :     if Some(digest) == last_digest {
     408            0 :         return Ok(UploadHeatmapOutcome::NoChange);
     409            8 :     }
     410            8 : 
     411            8 :     let path = remote_heatmap_path(tenant.get_tenant_shard_id());
     412            8 : 
     413            8 :     let cancel = &tenant.cancel;
     414              : 
     415            0 :     tracing::debug!("Uploading {size} byte heatmap to {path}");
     416            8 :     if let Err(e) = backoff::retry(
     417            8 :         || async {
     418            8 :             let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
     419            8 :             remote_storage
     420            8 :                 .upload_storage_object(bytes, size, &path)
     421           24 :                 .await
     422            8 :         },
     423            8 :         |_| false,
     424            8 :         3,
     425            8 :         u32::MAX,
     426            8 :         "Uploading heatmap",
     427            8 :         cancel,
     428            8 :     )
     429           24 :     .await
     430            8 :     .ok_or_else(|| anyhow::anyhow!("Shutting down"))
     431            8 :     .and_then(|x| x)
     432              :     {
     433            0 :         if cancel.is_cancelled() {
     434            0 :             return Err(UploadHeatmapError::Cancelled);
     435              :         } else {
     436            0 :             return Err(e.into());
     437              :         }
     438            8 :     }
     439              : 
     440            8 :     tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
     441              : 
     442            8 :     Ok(UploadHeatmapOutcome::Uploaded(digest))
     443            8 : }

