LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/secondary - heatmap_uploader.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 68.1 % 251 171 80 171
Current Date: 2024-01-09 02:06:09 Functions: 47.2 % 36 17 19 17
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::{
       2                 :     collections::HashMap,
       3                 :     pin::Pin,
       4                 :     sync::{Arc, Weak},
       5                 :     time::{Duration, Instant},
       6                 : };
       7                 : 
       8                 : use crate::{
       9                 :     metrics::SECONDARY_MODE,
      10                 :     tenant::{
      11                 :         config::AttachmentMode,
      12                 :         mgr::TenantManager,
      13                 :         remote_timeline_client::remote_heatmap_path,
      14                 :         span::debug_assert_current_span_has_tenant_id,
      15                 :         tasks::{warn_when_period_overrun, BackgroundLoopKind},
      16                 :         Tenant,
      17                 :     },
      18                 : };
      19                 : 
      20                 : use futures::Future;
      21                 : use md5;
      22                 : use pageserver_api::shard::TenantShardId;
      23                 : use rand::Rng;
      24                 : use remote_storage::GenericRemoteStorage;
      25                 : 
      26                 : use super::{
      27                 :     scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
      28                 :     CommandRequest,
      29                 : };
      30                 : use tokio_util::sync::CancellationToken;
      31                 : use tracing::{info_span, instrument, Instrument};
      32                 : use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
      33                 : 
      34                 : use super::{heatmap::HeatMapTenant, UploadCommand};
      35                 : 
      36 CBC         557 : pub(super) async fn heatmap_uploader_task(
      37             557 :     tenant_manager: Arc<TenantManager>,
      38             557 :     remote_storage: GenericRemoteStorage,
      39             557 :     command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
      40             557 :     background_jobs_can_start: Barrier,
      41             557 :     cancel: CancellationToken,
      42             557 : ) {
      43             557 :     let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
      44             557 : 
      45             557 :     let generator = HeatmapUploader {
      46             557 :         tenant_manager,
      47             557 :         remote_storage,
      48             557 :         cancel: cancel.clone(),
      49             557 :         tenants: HashMap::new(),
      50             557 :     };
      51             557 :     let mut scheduler = Scheduler::new(generator, concurrency);
      52             557 : 
      53             557 :     scheduler
      54             557 :         .run(command_queue, background_jobs_can_start, cancel)
      55             557 :         .instrument(info_span!("heatmap_uploader"))
      56             908 :         .await
      57             159 : }
      58                 : 
/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
/// handling loop and mutates it as needed: there are no locks here, because that event loop
/// can hold &mut references to this type throughout.
struct HeatmapUploader {
    /// Source of the tenant shards considered in `schedule` and looked up in `on_command`.
    tenant_manager: Arc<TenantManager>,
    /// Remote storage handle; cloned into every upload future created by `spawn`.
    remote_storage: GenericRemoteStorage,
    /// Token handed to `yielding_loop` while iterating over tenants in `schedule`.
    cancel: CancellationToken,

    /// Per-tenant bookkeeping: created in `schedule`, updated in `on_completion`, and
    /// culled at the start of every `schedule` call.
    tenants: HashMap<TenantShardId, UploaderTenantState>,
}
      69                 : 
      70                 : struct WriteInProgress {
      71                 :     barrier: Barrier,
      72                 : }
      73                 : 
      74                 : impl RunningJob for WriteInProgress {
      75               6 :     fn get_barrier(&self) -> Barrier {
      76               6 :         self.barrier.clone()
      77               6 :     }
      78                 : }
      79                 : 
      80                 : struct UploadPending {
      81                 :     tenant: Arc<Tenant>,
      82                 :     last_digest: Option<md5::Digest>,
      83                 :     target_time: Option<Instant>,
      84                 :     period: Option<Duration>,
      85                 : }
      86                 : 
      87                 : impl scheduler::PendingJob for UploadPending {
      88              18 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      89              18 :         self.tenant.get_tenant_shard_id()
      90              18 :     }
      91                 : }
      92                 : 
      93                 : struct WriteComplete {
      94                 :     tenant_shard_id: TenantShardId,
      95                 :     completed_at: Instant,
      96                 :     digest: Option<md5::Digest>,
      97                 :     next_upload: Option<Instant>,
      98                 : }
      99                 : 
     100                 : impl scheduler::Completion for WriteComplete {
     101               6 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     102               6 :         &self.tenant_shard_id
     103               6 :     }
     104                 : }
     105                 : 
/// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
/// when we last did a write.  We only populate this after doing at least one
/// write for a tenant -- this avoids holding state for tenants that have
/// uploads disabled.
struct UploaderTenantState {
    /// This Weak only exists to enable culling idle instances of this type
    /// when the Tenant has been deallocated.
    tenant: Weak<Tenant>,

    /// Digest of the serialized heatmap that we last successfully uploaded
    ///
    /// md5 is generally a bad hash.  We use it because it's convenient for interop with AWS S3's ETag,
    /// which is also an md5sum.
    last_digest: Option<md5::Digest>,

    /// When the last upload attempt completed (may have been successful or failed)
    // NOTE(review): this field is only written in the code visible here -- confirm it has a reader.
    last_upload: Option<Instant>,

    /// When should we next do an upload?  None means never.
    next_upload: Option<Instant>,
}
     128                 : 
/// The generic tenant background-job scheduler, specialised for heatmap uploads:
/// [`HeatmapUploader`] generates the jobs, and a job moves through the
/// [`UploadPending`] -> [`WriteInProgress`] -> [`WriteComplete`] types. [`UploadCommand`]
/// is the command type accepted through the command queue.
type Scheduler = TenantBackgroundJobs<
    HeatmapUploader,
    UploadPending,
    WriteInProgress,
    WriteComplete,
    UploadCommand,
>;
     136                 : 
     137                 : #[async_trait::async_trait]
     138                 : impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
     139                 :     for HeatmapUploader
     140                 : {
     141            1109 :     async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
     142                 :         // Cull any entries in self.tenants whose Arc<Tenant> is gone
     143            1109 :         self.tenants
     144            1109 :             .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());
     145            1109 : 
     146            1109 :         let now = Instant::now();
     147            1109 : 
     148            1109 :         let mut result = SchedulingResult {
     149            1109 :             jobs: Vec::new(),
     150            1109 :             want_interval: None,
     151            1109 :         };
     152            1109 : 
     153            1109 :         let tenants = self.tenant_manager.get_attached_active_tenant_shards();
     154            1109 : 
     155            1109 :         yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
     156             836 :             let period = match tenant.get_heatmap_period() {
     157                 :                 None => {
     158                 :                     // Heatmaps are disabled for this tenant
     159             836 :                     return;
     160                 :                 }
     161 UBC           0 :                 Some(period) => {
     162               0 :                     // If any tenant has asked for uploads more frequent than our scheduling interval,
     163               0 :                     // reduce it to match so that we can keep up.  This is mainly useful in testing, where
     164               0 :                     // we may set rather short intervals.
     165               0 :                     result.want_interval = match result.want_interval {
     166               0 :                         None => Some(period),
     167               0 :                         Some(existing) => Some(std::cmp::min(period, existing)),
     168                 :                     };
     169                 : 
     170               0 :                     period
     171               0 :                 }
     172               0 :             };
     173               0 : 
     174               0 :             // Stale attachments do not upload anything: if we are in this state, there is probably some
     175               0 :             // other attachment in mode Single or Multi running on another pageserver, and we don't
     176               0 :             // want to thrash and overwrite their heatmap uploads.
     177               0 :             if tenant.get_attach_mode() == AttachmentMode::Stale {
     178               0 :                 return;
     179               0 :             }
     180               0 : 
     181               0 :             // Create an entry in self.tenants if one doesn't already exist: this will later be updated
     182               0 :             // with the completion time in on_completion.
     183               0 :             let state = self
     184               0 :                 .tenants
     185               0 :                 .entry(*tenant.get_tenant_shard_id())
     186               0 :                 .or_insert_with(|| {
     187               0 :                     let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);
     188               0 : 
     189               0 :                     UploaderTenantState {
     190               0 :                         tenant: Arc::downgrade(&tenant),
     191               0 :                         last_upload: None,
     192               0 :                         next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
     193               0 :                         last_digest: None,
     194               0 :                     }
     195               0 :                 });
     196               0 : 
     197               0 :             // Decline to do the upload if insufficient time has passed
     198               0 :             if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
     199               0 :                 return;
     200               0 :             }
     201               0 : 
     202               0 :             let last_digest = state.last_digest;
     203               0 :             result.jobs.push(UploadPending {
     204               0 :                 tenant,
     205               0 :                 last_digest,
     206               0 :                 target_time: state.next_upload,
     207               0 :                 period: Some(period),
     208               0 :             });
     209 CBC        1109 :         })
     210 UBC           0 :         .await
     211 CBC        1109 :         .ok();
     212            1109 : 
     213            1109 :         result
     214            2218 :     }
     215                 : 
     216               6 :     fn spawn(
     217               6 :         &mut self,
     218               6 :         job: UploadPending,
     219               6 :     ) -> (
     220               6 :         WriteInProgress,
     221               6 :         Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
     222               6 :     ) {
     223               6 :         let UploadPending {
     224               6 :             tenant,
     225               6 :             last_digest,
     226               6 :             target_time,
     227               6 :             period,
     228               6 :         } = job;
     229               6 : 
     230               6 :         let remote_storage = self.remote_storage.clone();
     231               6 :         let (completion, barrier) = utils::completion::channel();
     232               6 :         let tenant_shard_id = *tenant.get_tenant_shard_id();
     233               6 :         (WriteInProgress { barrier }, Box::pin(async move {
     234               6 :             // Guard for the barrier in [`WriteInProgress`]
     235               6 :             let _completion = completion;
     236               6 : 
     237               6 :             let started_at = Instant::now();
     238              18 :             let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
     239               6 :                 Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
     240               6 :                     let duration = Instant::now().duration_since(started_at);
     241               6 :                     SECONDARY_MODE
     242               6 :                         .upload_heatmap_duration
     243               6 :                         .observe(duration.as_secs_f64());
     244               6 :                     SECONDARY_MODE.upload_heatmap.inc();
     245               6 :                     Some(digest)
     246                 :                 }
     247 UBC           0 :                 Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
     248               0 :                 Err(UploadHeatmapError::Upload(e)) => {
     249               0 :                     tracing::warn!(
     250               0 :                         "Failed to upload heatmap for tenant {}: {e:#}",
     251               0 :                         tenant.get_tenant_shard_id(),
     252               0 :                     );
     253               0 :                     let duration = Instant::now().duration_since(started_at);
     254               0 :                     SECONDARY_MODE
     255               0 :                         .upload_heatmap_duration
     256               0 :                         .observe(duration.as_secs_f64());
     257               0 :                     SECONDARY_MODE.upload_heatmap_errors.inc();
     258               0 :                     last_digest
     259                 :                 }
     260                 :                 Err(UploadHeatmapError::Cancelled) => {
     261               0 :                     tracing::info!("Cancelled heatmap upload, shutting down");
     262               0 :                     last_digest
     263                 :                 }
     264                 :             };
     265                 : 
     266 CBC           6 :             let now = Instant::now();
     267                 : 
     268                 :             // If the job had a target execution time, we may check our final execution
     269                 :             // time against that for observability purposes.
     270               6 :             if let (Some(target_time), Some(period)) = (target_time, period) {
     271 UBC           0 :                 // Elapsed time includes any scheduling lag as well as the execution of the job
     272               0 :                 let elapsed = now.duration_since(target_time);
     273               0 : 
     274               0 :                 warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
     275 CBC           6 :             }
     276                 : 
     277               6 :             let next_upload = tenant
     278               6 :                 .get_heatmap_period()
     279               6 :                 .and_then(|period| now.checked_add(period));
     280               6 : 
     281               6 :             WriteComplete {
     282               6 :                     tenant_shard_id: *tenant.get_tenant_shard_id(),
     283               6 :                     completed_at: now,
     284               6 :                     digest,
     285               6 :                     next_upload,
     286               6 :                 }
     287               6 :         }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     288               6 :     }
     289                 : 
     290               6 :     fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
     291               6 :         let tenant_shard_id = command.get_tenant_shard_id();
     292               6 : 
     293               6 :         tracing::info!(
     294               6 :             tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     295               6 :             "Starting heatmap write on command");
     296               6 :         let tenant = self
     297               6 :             .tenant_manager
     298               6 :             .get_attached_tenant_shard(*tenant_shard_id, true)
     299               6 :             .map_err(|e| anyhow::anyhow!(e))?;
     300                 : 
     301               6 :         Ok(UploadPending {
     302               6 :             // Ignore our state for last digest: this forces an upload even if nothing has changed
     303               6 :             last_digest: None,
     304               6 :             tenant,
     305               6 :             target_time: None,
     306               6 :             period: None,
     307               6 :         })
     308               6 :     }
     309                 : 
     310               6 :     #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
     311                 :     fn on_completion(&mut self, completion: WriteComplete) {
     312 UBC           0 :         tracing::debug!("Heatmap upload completed");
     313                 :         let WriteComplete {
     314                 :             tenant_shard_id,
     315                 :             completed_at,
     316                 :             digest,
     317                 :             next_upload,
     318                 :         } = completion;
     319                 :         use std::collections::hash_map::Entry;
     320                 :         match self.tenants.entry(tenant_shard_id) {
     321                 :             Entry::Vacant(_) => {
     322                 :                 // Tenant state was dropped, nothing to update.
     323                 :             }
     324                 :             Entry::Occupied(mut entry) => {
     325                 :                 entry.get_mut().last_upload = Some(completed_at);
     326                 :                 entry.get_mut().last_digest = digest;
     327                 :                 entry.get_mut().next_upload = next_upload
     328                 :             }
     329                 :         }
     330                 :     }
     331                 : }
     332                 : 
/// Non-error results of [`upload_tenant_heatmap`].
enum UploadHeatmapOutcome {
    /// We successfully wrote to remote storage, with this digest.
    Uploaded(md5::Digest),
    /// We did not upload because the heatmap digest was unchanged since the last upload
    NoChange,
    /// We skipped the upload for some reason, such as tenant/timeline not ready
    Skipped,
}
     341                 : 
/// Failure modes of [`upload_tenant_heatmap`].
#[derive(thiserror::Error, Debug)]
enum UploadHeatmapError {
    /// The tenant was shutting down: either its gate could not be entered, or its
    /// cancellation token had fired by the time the upload failed.
    #[error("Cancelled")]
    Cancelled,

    /// Any other failure, such as a serialization or remote storage error.
    #[error(transparent)]
    Upload(#[from] anyhow::Error),
}
     350                 : 
     351                 : /// The inner upload operation.  This will skip if `last_digest` is Some and matches the digest
     352                 : /// of the object we would have uploaded.
     353 CBC           6 : async fn upload_tenant_heatmap(
     354               6 :     remote_storage: GenericRemoteStorage,
     355               6 :     tenant: &Arc<Tenant>,
     356               6 :     last_digest: Option<md5::Digest>,
     357               6 : ) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
     358               6 :     debug_assert_current_span_has_tenant_id();
     359               6 : 
     360               6 :     let generation = tenant.get_generation();
     361               6 :     if generation.is_none() {
     362                 :         // We do not expect this: generations were implemented before heatmap uploads.  However,
     363                 :         // handle it so that we don't have to make the generation in the heatmap an Option<>
     364                 :         // (Generation::none is not serializable)
     365 UBC           0 :         tracing::warn!("Skipping heatmap upload for tenant with generation==None");
     366               0 :         return Ok(UploadHeatmapOutcome::Skipped);
     367 CBC           6 :     }
     368               6 : 
     369               6 :     let mut heatmap = HeatMapTenant {
     370               6 :         timelines: Vec::new(),
     371               6 :         generation,
     372               6 :     };
     373               6 :     let timelines = tenant.timelines.lock().unwrap().clone();
     374               6 : 
     375               6 :     let tenant_cancel = tenant.cancel.clone();
     376                 : 
     377                 :     // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
     378                 :     // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
     379                 :     // in remote storage.
     380               6 :     let _guard = match tenant.gate.enter() {
     381               6 :         Ok(g) => g,
     382                 :         Err(_) => {
     383 UBC           0 :             tracing::info!("Skipping heatmap upload for tenant which is shutting down");
     384               0 :             return Err(UploadHeatmapError::Cancelled);
     385                 :         }
     386                 :     };
     387                 : 
     388 CBC          12 :     for (timeline_id, timeline) in timelines {
     389               6 :         let heatmap_timeline = timeline.generate_heatmap().await;
     390               6 :         match heatmap_timeline {
     391                 :             None => {
     392 UBC           0 :                 tracing::debug!(
     393               0 :                     "Skipping heatmap upload because timeline {timeline_id} is not ready"
     394               0 :                 );
     395               0 :                 return Ok(UploadHeatmapOutcome::Skipped);
     396                 :             }
     397 CBC           6 :             Some(heatmap_timeline) => {
     398               6 :                 heatmap.timelines.push(heatmap_timeline);
     399               6 :             }
     400                 :         }
     401                 :     }
     402                 : 
     403                 :     // Serialize the heatmap
     404               6 :     let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
     405               6 :     let size = bytes.len();
     406               6 : 
     407               6 :     // Drop out early if nothing changed since our last upload
     408               6 :     let digest = md5::compute(&bytes);
     409               6 :     if Some(digest) == last_digest {
     410 UBC           0 :         return Ok(UploadHeatmapOutcome::NoChange);
     411 CBC           6 :     }
     412               6 : 
     413               6 :     let path = remote_heatmap_path(tenant.get_tenant_shard_id());
     414                 : 
     415                 :     // Write the heatmap.
     416 UBC           0 :     tracing::debug!("Uploading {size} byte heatmap to {path}");
     417 CBC           6 :     if let Err(e) = backoff::retry(
     418               6 :         || async {
     419               6 :             let bytes = futures::stream::once(futures::future::ready(Ok(bytes::Bytes::from(
     420               6 :                 bytes.clone(),
     421               6 :             ))));
     422               6 :             remote_storage
     423               6 :                 .upload_storage_object(bytes, size, &path)
     424              17 :                 .await
     425               6 :         },
     426               6 :         |_| false,
     427               6 :         3,
     428               6 :         u32::MAX,
     429               6 :         "Uploading heatmap",
     430               6 :         backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
     431               6 :     )
     432              17 :     .await
     433                 :     {
     434 UBC           0 :         if tenant_cancel.is_cancelled() {
     435               0 :             return Err(UploadHeatmapError::Cancelled);
     436                 :         } else {
     437               0 :             return Err(e.into());
     438                 :         }
     439 CBC           6 :     }
     440                 : 
     441               6 :     tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
     442                 : 
     443               6 :     Ok(UploadHeatmapOutcome::Uploaded(digest))
     444               6 : }
        

Generated by: LCOV version 2.1-beta