use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use futures::Future;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, TimeoutOrCancel};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span, instrument};
use utils::backoff;
use utils::completion::Barrier;
use utils::crashsafe::path_with_suffix_extension;
use utils::yielding_loop::yielding_loop;

use super::heatmap::HeatMapTenant;
use super::scheduler::{
    self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs, period_jitter,
    period_warmup,
};
use super::{CommandRequest, SecondaryTenantError, UploadCommand};
use crate::TEMP_FILE_SUFFIX;
use crate::metrics::SECONDARY_MODE;
use crate::tenant::Tenant;
use crate::tenant::config::AttachmentMode;
use crate::tenant::mgr::{GetTenantError, TenantManager};
use crate::tenant::remote_timeline_client::remote_heatmap_path;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::tasks::{BackgroundLoopKind, warn_when_period_overrun};
use crate::virtual_file::VirtualFile;
31 :
32 0 : pub(super) async fn heatmap_uploader_task(
33 0 : tenant_manager: Arc<TenantManager>,
34 0 : remote_storage: GenericRemoteStorage,
35 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
36 0 : background_jobs_can_start: Barrier,
37 0 : cancel: CancellationToken,
38 0 : ) {
39 0 : let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
40 0 :
41 0 : let generator = HeatmapUploader {
42 0 : tenant_manager,
43 0 : remote_storage,
44 0 : cancel: cancel.clone(),
45 0 : tenants: HashMap::new(),
46 0 : };
47 0 : let mut scheduler = Scheduler::new(generator, concurrency);
48 0 :
49 0 : scheduler
50 0 : .run(command_queue, background_jobs_can_start, cancel)
51 0 : .instrument(info_span!("heatmap_upload_scheduler"))
52 0 : .await
53 0 : }

/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
/// handling loop and mutates it as needed: no locks are required, because that event
/// loop can hold `&mut` references to this type throughout.
struct HeatmapUploader {
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    cancel: CancellationToken,

    tenants: HashMap<TenantShardId, UploaderTenantState>,
}

struct WriteInProgress {
    barrier: Barrier,
}

impl RunningJob for WriteInProgress {
    fn get_barrier(&self) -> Barrier {
        self.barrier.clone()
    }
}

struct UploadPending {
    tenant: Arc<Tenant>,
    last_upload: Option<LastUploadState>,
    target_time: Option<Instant>,
    period: Option<Duration>,
}

impl scheduler::PendingJob for UploadPending {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        self.tenant.get_tenant_shard_id()
    }
}

struct WriteComplete {
    tenant_shard_id: TenantShardId,
    completed_at: Instant,
    uploaded: Option<LastUploadState>,
    next_upload: Option<Instant>,
}

impl scheduler::Completion for WriteComplete {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }
}

/// The heatmap uploader keeps a little per-tenant state, mainly to remember
/// when we last did a write. We only populate this after doing at least one
/// write for a tenant; this avoids holding state for tenants that have
/// uploads disabled.
struct UploaderTenantState {
    // This Weak only exists to enable culling idle instances of this type
    // when the Tenant has been deallocated.
    tenant: Weak<Tenant>,

    /// Digest of the serialized heatmap that we last successfully uploaded
    last_upload_state: Option<LastUploadState>,

    /// When the last upload attempt completed (whether it succeeded or failed)
    last_upload: Option<Instant>,

    /// When should we next do an upload? None means never.
    next_upload: Option<Instant>,
}

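/// The scheduler as specialized for this module: [`HeatmapUploader`] is the
/// [`JobGenerator`], producing [`UploadPending`] jobs that are tracked as
/// [`WriteInProgress`] while running and reported back via [`WriteComplete`];
/// [`UploadCommand`] allows triggering an upload on demand.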
type Scheduler = TenantBackgroundJobs<
    HeatmapUploader,
    UploadPending,
    WriteInProgress,
    WriteComplete,
    UploadCommand,
>;

impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
    for HeatmapUploader
{
    async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
        // Cull any entries in self.tenants whose Arc<Tenant> is gone
        self.tenants
            .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());

        let now = Instant::now();

        let mut result = SchedulingResult {
            jobs: Vec::new(),
            want_interval: None,
        };

        let tenants = self.tenant_manager.get_attached_active_tenant_shards();

        yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
            let period = match tenant.get_heatmap_period() {
                None => {
                    // Heatmaps are disabled for this tenant
                    return;
                }
                Some(period) => {
                    // If any tenant has asked for uploads more frequent than our scheduling interval,
                    // reduce it to match so that we can keep up. This is mainly useful in testing, where
                    // we may set rather short intervals.
                    result.want_interval = match result.want_interval {
                        None => Some(period),
                        Some(existing) => Some(std::cmp::min(period, existing)),
                    };

                    period
                }
            };

            // Stale attachments do not upload anything: if we are in this state, there is probably some
            // other attachment in mode Single or Multi running on another pageserver, and we don't
            // want to thrash and overwrite their heatmap uploads.
            if tenant.get_attach_mode() == AttachmentMode::Stale {
                return;
            }

            // Create an entry in self.tenants if one doesn't already exist: this will later be updated
            // with the completion time in on_completion.
            let state = self
                .tenants
                .entry(*tenant.get_tenant_shard_id())
                .or_insert_with(|| UploaderTenantState {
                    tenant: Arc::downgrade(&tenant),
                    last_upload: None,
                    next_upload: Some(now.checked_add(period_warmup(period)).unwrap_or(now)),
                    last_upload_state: None,
                });

            // Decline to do the upload if insufficient time has passed
            if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
                return;
            }

            let last_upload = state.last_upload_state.clone();
            result.jobs.push(UploadPending {
                tenant,
                last_upload,
                target_time: state.next_upload,
                period: Some(period),
            });
        })
        .await
        .ok();

        result
    }

    fn spawn(
        &mut self,
        job: UploadPending,
    ) -> (
        WriteInProgress,
        Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
    ) {
        let UploadPending {
            tenant,
            last_upload,
            target_time,
            period,
        } = job;

        let remote_storage = self.remote_storage.clone();
        let (completion, barrier) = utils::completion::channel();
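        // The barrier handed out via [`WriteInProgress::get_barrier`] is released when
        // `completion` is dropped, i.e. when the future below finishes or is dropped.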
        let tenant_shard_id = *tenant.get_tenant_shard_id();
        (WriteInProgress { barrier }, Box::pin(async move {
            // Guard for the barrier in [`WriteInProgress`]
            let _completion = completion;

            let started_at = Instant::now();
            let uploaded = match upload_tenant_heatmap(remote_storage, &tenant, last_upload.clone()).await {
                Ok(UploadHeatmapOutcome::Uploaded(uploaded)) => {
                    let duration = Instant::now().duration_since(started_at);
                    SECONDARY_MODE
                        .upload_heatmap_duration
                        .observe(duration.as_secs_f64());
                    SECONDARY_MODE.upload_heatmap.inc();
                    Some(uploaded)
                }
                Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_upload,
                Err(UploadHeatmapError::Upload(e)) => {
                    tracing::warn!(
                        "Failed to upload heatmap for tenant {}: {e:#}",
                        tenant.get_tenant_shard_id(),
                    );
                    let duration = Instant::now().duration_since(started_at);
                    SECONDARY_MODE
                        .upload_heatmap_duration
                        .observe(duration.as_secs_f64());
                    SECONDARY_MODE.upload_heatmap_errors.inc();
                    last_upload
                }
                Err(UploadHeatmapError::Cancelled) => {
                    tracing::info!("Cancelled heatmap upload, shutting down");
                    last_upload
                }
            };

            let now = Instant::now();

            // If the job had a target execution time, we may check our final execution
            // time against that for observability purposes.
            if let (Some(target_time), Some(period)) = (target_time, period) {
                // Elapsed time includes any scheduling lag as well as the execution of the job
                let elapsed = now.duration_since(target_time);

                warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
            }

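            // Jitter the next scheduled upload so that shards which start out in
            // lockstep drift apart over time.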
            let next_upload = tenant
                .get_heatmap_period()
                .and_then(|period| now.checked_add(period_jitter(period, 5)));

            WriteComplete {
                tenant_shard_id: *tenant.get_tenant_shard_id(),
                completed_at: now,
                uploaded,
                next_upload,
            }
        }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
    }

    fn on_command(
        &mut self,
        command: UploadCommand,
    ) -> Result<UploadPending, SecondaryTenantError> {
        let tenant_shard_id = command.get_tenant_shard_id();

        tracing::info!(
            tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
            "Starting heatmap write on command");
        let tenant = self
            .tenant_manager
            .get_attached_tenant_shard(*tenant_shard_id)?;
        if !tenant.is_active() {
            return Err(GetTenantError::NotActive(*tenant_shard_id).into());
        }

        Ok(UploadPending {
            // Ignore our state for last digest: this forces an upload even if nothing has changed
            last_upload: None,
            tenant,
            target_time: None,
            period: None,
        })
    }

    #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
    fn on_completion(&mut self, completion: WriteComplete) {
        tracing::debug!("Heatmap upload completed");
        let WriteComplete {
            tenant_shard_id,
            completed_at,
            uploaded,
            next_upload,
        } = completion;
        use std::collections::hash_map::Entry;
        match self.tenants.entry(tenant_shard_id) {
            Entry::Vacant(_) => {
                // Tenant state was dropped, nothing to update.
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().last_upload = Some(completed_at);
                entry.get_mut().last_upload_state = uploaded;
                entry.get_mut().next_upload = next_upload;
            }
        }
    }
}

enum UploadHeatmapOutcome {
    /// We successfully wrote to remote storage, with this digest.
    Uploaded(LastUploadState),
    /// We did not upload because the heatmap digest was unchanged since the last upload
    NoChange,
    /// We skipped the upload for some reason, such as tenant/timeline not ready
    Skipped,
}

#[derive(thiserror::Error, Debug)]
enum UploadHeatmapError {
    #[error("Cancelled")]
    Cancelled,

    #[error(transparent)]
    Upload(#[from] anyhow::Error),
}

/// Digests describing the heatmap we most recently uploaded successfully.
///
/// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
/// which is also an md5sum.
#[derive(Clone)]
struct LastUploadState {
    // Digest of the json-encoded HeatMapTenant
    uploaded_digest: md5::Digest,

    // Digest of the same heatmap with atimes stripped, used to detect when only
    // access times changed.
    layers_only_digest: md5::Digest,
}

/// The inner upload operation. This will skip if `last_upload` is Some and its digest matches
/// the digest of the object we would have uploaded.
async fn upload_tenant_heatmap(
    remote_storage: GenericRemoteStorage,
    tenant: &Arc<Tenant>,
    last_upload: Option<LastUploadState>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
    debug_assert_current_span_has_tenant_id();

    let generation = tenant.get_generation();
    debug_assert!(!generation.is_none());
    if generation.is_none() {
        // We do not expect this: None generations should only appear in historic layer metadata, not in running Tenants
        tracing::warn!("Skipping heatmap upload for tenant with generation==None");
        return Ok(UploadHeatmapOutcome::Skipped);
    }

    let mut heatmap = HeatMapTenant {
        timelines: Vec::new(),
        generation,
        upload_period_ms: tenant.get_heatmap_period().map(|p| p.as_millis()),
    };
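    // Clone the timeline map up front so that we do not hold its lock across the
    // `await` points below.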
    let timelines = tenant.timelines.lock().unwrap().clone();

    // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
    // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
    // in remote storage.
    let Ok(_guard) = tenant.gate.enter() else {
        tracing::info!("Skipping heatmap upload for tenant which is shutting down");
        return Err(UploadHeatmapError::Cancelled);
    };

    for (timeline_id, timeline) in timelines {
        let heatmap_timeline = timeline.generate_heatmap().await;
        match heatmap_timeline {
            None => {
                tracing::debug!(
                    "Skipping heatmap upload because timeline {timeline_id} is not ready"
                );
                return Ok(UploadHeatmapOutcome::Skipped);
            }
            Some(heatmap_timeline) => {
                heatmap.timelines.push(heatmap_timeline);
            }
        }
    }

    // Serialize the heatmap
    let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;

    // Drop out early if nothing changed since our last upload
    let digest = md5::compute(&bytes);
    if Some(&digest) == last_upload.as_ref().map(|d| &d.uploaded_digest) {
        return Ok(UploadHeatmapOutcome::NoChange);
    }

    // Calculate a digest that omits atimes, so that we can distinguish actual changes in
    // layers from changes only in atimes.
    let heatmap_size_bytes = heatmap.get_stats().bytes;
    let layers_only_bytes =
        serde_json::to_vec(&heatmap.strip_atimes()).map_err(|e| anyhow::anyhow!(e))?;
    let layers_only_digest = md5::compute(&layers_only_bytes);
    if heatmap_size_bytes < tenant.get_checkpoint_distance() {
        // For small tenants, skip the upload if only atimes changed. This avoids doing frequent
        // uploads from long-idle tenants whose atimes are just incremented by periodic
        // size calculations.
        if Some(&layers_only_digest) == last_upload.as_ref().map(|d| &d.layers_only_digest) {
            return Ok(UploadHeatmapOutcome::NoChange);
        }
    }

    let bytes = bytes::Bytes::from(bytes);
    let size = bytes.len();

    let path = remote_heatmap_path(tenant.get_tenant_shard_id());

    let cancel = &tenant.cancel;

    tracing::debug!("Uploading {size} byte heatmap to {path}");
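    // `backoff::retry` yields `None` when cancelled and `Some(Result)` otherwise;
    // the `ok_or_else`/`and_then` chain below flattens that into a single `Result`.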
    if let Err(e) = backoff::retry(
        || async {
            let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
            remote_storage
                .upload_storage_object(bytes, size, &path, cancel)
                .await
        },
        TimeoutOrCancel::caused_by_cancel,
        3,
        u32::MAX,
        "Uploading heatmap",
        cancel,
    )
    .await
    .ok_or_else(|| anyhow::anyhow!("Shutting down"))
    .and_then(|x| x)
    {
        if cancel.is_cancelled() {
            return Err(UploadHeatmapError::Cancelled);
        } else {
            return Err(e.into());
        }
    }

    // After a successful upload, persist the fresh heatmap to disk.
    // When restarting, the tenant will read the heatmap from disk
    // and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
    // If the heatmap is stale, the additive generation can lead to keeping previously
    // evicted timelines on the secondary's disk.
    let tenant_shard_id = tenant.get_tenant_shard_id();
    let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
    let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
    if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
        tracing::warn!("Non-fatal IO error writing to disk after heatmap upload: {err}");
    }

    tracing::info!("Successfully uploaded {size} byte heatmap to {path}");

    Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {
        uploaded_digest: digest,
        layers_only_digest,
    }))
}