use std::{
    collections::HashMap,
    pin::Pin,
    sync::{Arc, Weak},
    time::{Duration, Instant},
};

use crate::{
    metrics::SECONDARY_MODE,
    tenant::{
        config::AttachmentMode,
        mgr::TenantManager,
        remote_timeline_client::remote_heatmap_path,
        span::debug_assert_current_span_has_tenant_id,
        tasks::{warn_when_period_overrun, BackgroundLoopKind},
        Tenant,
    },
};

use futures::Future;
use md5;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::{GenericRemoteStorage, TimeoutOrCancel};

use super::{
    heatmap::HeatMapTenant,
    scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
    CommandRequest, UploadCommand,
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
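/// Task entry point: drives a [`HeatmapUploader`] inside the generic tenant background-job
/// scheduler until cancelled, running at most `heatmap_upload_concurrency` uploads at a time.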
pub(super) async fn heatmap_uploader_task(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) {
    let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;

    let generator = HeatmapUploader {
        tenant_manager,
        remote_storage,
        cancel: cancel.clone(),
        tenants: HashMap::new(),
    };
    let mut scheduler = Scheduler::new(generator, concurrency);

    scheduler
        .run(command_queue, background_jobs_can_start, cancel)
        .instrument(info_span!("heatmap_uploader"))
        .await
}

/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
/// handling loop and mutates it as needed: there are no locks here, because that event loop
/// can hold &mut references to this type throughout.
struct HeatmapUploader {
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    cancel: CancellationToken,

    tenants: HashMap<TenantShardId, UploaderTenantState>,
}
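/// Handle to an upload that has been spawned: the [`Barrier`] is released when the
/// completion guard held by the upload future is dropped, which is how the scheduler
/// waits for an in-flight upload to finish.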
struct WriteInProgress {
    barrier: Barrier,
}

impl RunningJob for WriteInProgress {
    fn get_barrier(&self) -> Barrier {
        self.barrier.clone()
    }
}
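/// An upload that has been scheduled but not yet spawned: the tenant to upload for, the
/// digest of the last successful upload (so an unchanged heatmap can be skipped), and the
/// intended execution time and period, used for overrun reporting.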
struct UploadPending {
    tenant: Arc<Tenant>,
    last_digest: Option<md5::Digest>,
    target_time: Option<Instant>,
    period: Option<Duration>,
}

impl scheduler::PendingJob for UploadPending {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        self.tenant.get_tenant_shard_id()
    }
}
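/// Emitted by a finished upload future and fed back into `on_completion`, so that the
/// per-tenant state can record when the upload completed, which digest (if any) was
/// written, and when the next upload is due.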
struct WriteComplete {
    tenant_shard_id: TenantShardId,
    completed_at: Instant,
    digest: Option<md5::Digest>,
    next_upload: Option<Instant>,
}

impl scheduler::Completion for WriteComplete {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }
}

/// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
/// when we last did a write. We only populate this after doing at least one
/// write for a tenant -- this avoids holding state for tenants that have
/// uploads disabled.
struct UploaderTenantState {
    // This Weak only exists to enable culling idle instances of this type
    // when the Tenant has been deallocated.
    tenant: Weak<Tenant>,

    /// Digest of the serialized heatmap that we last successfully uploaded
    ///
    /// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
    /// which is also an md5sum.
    last_digest: Option<md5::Digest>,

    /// When the last upload attempt completed (whether it succeeded or failed)
    last_upload: Option<Instant>,

    /// When should we next do an upload? None means never.
    next_upload: Option<Instant>,
}
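/// The generic tenant background-job scheduler, specialised to the heatmap uploader's
/// generator, pending-job, running-job, completion and command types.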
type Scheduler = TenantBackgroundJobs<
    HeatmapUploader,
    UploadPending,
    WriteInProgress,
    WriteComplete,
    UploadCommand,
>;

impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
    for HeatmapUploader
{
    async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
        // Cull any entries in self.tenants whose Arc<Tenant> is gone
        self.tenants
            .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());

        let now = Instant::now();

        let mut result = SchedulingResult {
            jobs: Vec::new(),
            want_interval: None,
        };

        let tenants = self.tenant_manager.get_attached_active_tenant_shards();
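        // Walk all attached tenant shards, yielding to the executor periodically (and
        // observing cancellation) so that a very large tenant count cannot stall this
        // scheduling pass.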
        yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
            let period = match tenant.get_heatmap_period() {
                None => {
                    // Heatmaps are disabled for this tenant
                    return;
                }
                Some(period) => {
                    // If any tenant has asked for uploads more frequent than our scheduling interval,
                    // reduce it to match so that we can keep up. This is mainly useful in testing, where
                    // we may set rather short intervals.
                    result.want_interval = match result.want_interval {
                        None => Some(period),
                        Some(existing) => Some(std::cmp::min(period, existing)),
                    };

                    period
                }
            };

            // Stale attachments do not upload anything: if we are in this state, there is probably some
            // other attachment in mode Single or Multi running on another pageserver, and we don't
            // want to thrash and overwrite their heatmap uploads.
            if tenant.get_attach_mode() == AttachmentMode::Stale {
                return;
            }

            // Create an entry in self.tenants if one doesn't already exist: this will later be updated
            // with the completion time in on_completion.
            let state = self
                .tenants
                .entry(*tenant.get_tenant_shard_id())
                .or_insert_with(|| {
                    let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);

                    UploaderTenantState {
                        tenant: Arc::downgrade(&tenant),
                        last_upload: None,
                        next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
                        last_digest: None,
                    }
                });

            // Decline to do the upload if insufficient time has passed
            if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
                return;
            }

            let last_digest = state.last_digest;
            result.jobs.push(UploadPending {
                tenant,
                last_digest,
                target_time: state.next_upload,
                period: Some(period),
            });
        })
        .await
        .ok();

        result
    }

    fn spawn(
        &mut self,
        job: UploadPending,
    ) -> (
        WriteInProgress,
        Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
    ) {
        let UploadPending {
            tenant,
            last_digest,
            target_time,
            period,
        } = job;

        let remote_storage = self.remote_storage.clone();
        let (completion, barrier) = utils::completion::channel();
        let tenant_shard_id = *tenant.get_tenant_shard_id();
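        // Return a handle whose barrier is released when `completion` is dropped at the end
        // of the async block, plus a boxed future that owns everything it needs, so the
        // scheduler can run it independently of `self`.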
        (WriteInProgress { barrier }, Box::pin(async move {
            // Guard for the barrier in [`WriteInProgress`]
            let _completion = completion;

            let started_at = Instant::now();
            let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
                Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
                    let duration = Instant::now().duration_since(started_at);
                    SECONDARY_MODE
                        .upload_heatmap_duration
                        .observe(duration.as_secs_f64());
                    SECONDARY_MODE.upload_heatmap.inc();
                    Some(digest)
                }
                Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
                Err(UploadHeatmapError::Upload(e)) => {
                    tracing::warn!(
                        "Failed to upload heatmap for tenant {}: {e:#}",
                        tenant.get_tenant_shard_id(),
                    );
                    let duration = Instant::now().duration_since(started_at);
                    SECONDARY_MODE
                        .upload_heatmap_duration
                        .observe(duration.as_secs_f64());
                    SECONDARY_MODE.upload_heatmap_errors.inc();
                    last_digest
                }
                Err(UploadHeatmapError::Cancelled) => {
                    tracing::info!("Cancelled heatmap upload, shutting down");
                    last_digest
                }
            };

            let now = Instant::now();

            // If the job had a target execution time, we may check our final execution
            // time against that for observability purposes.
            if let (Some(target_time), Some(period)) = (target_time, period) {
                // Elapsed time includes any scheduling lag as well as the execution of the job
                let elapsed = now.duration_since(target_time);

                warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
            }

            let next_upload = tenant
                .get_heatmap_period()
                .and_then(|period| now.checked_add(period));

            WriteComplete {
                tenant_shard_id: *tenant.get_tenant_shard_id(),
                completed_at: now,
                digest,
                next_upload,
            }
        }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
    }

    fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
        let tenant_shard_id = command.get_tenant_shard_id();

        tracing::info!(
            tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
            "Starting heatmap write on command");
        let tenant = self
            .tenant_manager
            .get_attached_tenant_shard(*tenant_shard_id, true)
            .map_err(|e| anyhow::anyhow!(e))?;

        Ok(UploadPending {
            // Ignore our state for last digest: this forces an upload even if nothing has changed
            last_digest: None,
            tenant,
            target_time: None,
            period: None,
        })
    }

    #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
    fn on_completion(&mut self, completion: WriteComplete) {
        tracing::debug!("Heatmap upload completed");
        let WriteComplete {
            tenant_shard_id,
            completed_at,
            digest,
            next_upload,
        } = completion;
        use std::collections::hash_map::Entry;
        match self.tenants.entry(tenant_shard_id) {
            Entry::Vacant(_) => {
                // Tenant state was dropped, nothing to update.
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().last_upload = Some(completed_at);
                entry.get_mut().last_digest = digest;
                entry.get_mut().next_upload = next_upload
            }
        }
    }
}

enum UploadHeatmapOutcome {
    /// We successfully wrote to remote storage, with this digest.
    Uploaded(md5::Digest),
    /// We did not upload because the heatmap digest was unchanged since the last upload
    NoChange,
    /// We skipped the upload for some reason, such as tenant/timeline not ready
    Skipped,
}

#[derive(thiserror::Error, Debug)]
enum UploadHeatmapError {
    #[error("Cancelled")]
    Cancelled,

    #[error(transparent)]
    Upload(#[from] anyhow::Error),
}

/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
/// of the object we would have uploaded.
async fn upload_tenant_heatmap(
    remote_storage: GenericRemoteStorage,
    tenant: &Arc<Tenant>,
    last_digest: Option<md5::Digest>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
    debug_assert_current_span_has_tenant_id();

    let generation = tenant.get_generation();
    if generation.is_none() {
        // We do not expect this: generations were implemented before heatmap uploads. However,
        // handle it so that we don't have to make the generation in the heatmap an Option<>
        // (Generation::none is not serializable)
        tracing::warn!("Skipping heatmap upload for tenant with generation==None");
        return Ok(UploadHeatmapOutcome::Skipped);
    }

    let mut heatmap = HeatMapTenant {
        timelines: Vec::new(),
        generation,
    };
    let timelines = tenant.timelines.lock().unwrap().clone();

    // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
    // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
    // in remote storage.
    let Ok(_guard) = tenant.gate.enter() else {
        tracing::info!("Skipping heatmap upload for tenant which is shutting down");
        return Err(UploadHeatmapError::Cancelled);
    };

    for (timeline_id, timeline) in timelines {
        let heatmap_timeline = timeline.generate_heatmap().await;
        match heatmap_timeline {
            None => {
                tracing::debug!(
                    "Skipping heatmap upload because timeline {timeline_id} is not ready"
                );
                return Ok(UploadHeatmapOutcome::Skipped);
            }
            Some(heatmap_timeline) => {
                heatmap.timelines.push(heatmap_timeline);
            }
        }
    }

    // Serialize the heatmap
    let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
    let bytes = bytes::Bytes::from(bytes);
    let size = bytes.len();

    // Drop out early if nothing changed since our last upload
    let digest = md5::compute(&bytes);
    if Some(digest) == last_digest {
        return Ok(UploadHeatmapOutcome::NoChange);
    }

    let path = remote_heatmap_path(tenant.get_tenant_shard_id());

    let cancel = &tenant.cancel;

    tracing::debug!("Uploading {size} byte heatmap to {path}");
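    // Retry with backoff until the upload succeeds or the tenant is cancelled. The numeric
    // arguments are assumed here to be `backoff::retry`'s warn-after threshold and maximum
    // attempt count (u32::MAX, i.e. keep retrying until cancelled).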
    if let Err(e) = backoff::retry(
        || async {
            let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
            remote_storage
                .upload_storage_object(bytes, size, &path, cancel)
                .await
        },
        TimeoutOrCancel::caused_by_cancel,
        3,
        u32::MAX,
        "Uploading heatmap",
        cancel,
    )
    .await
    .ok_or_else(|| anyhow::anyhow!("Shutting down"))
    .and_then(|x| x)
    {
        if cancel.is_cancelled() {
            return Err(UploadHeatmapError::Cancelled);
        } else {
            return Err(e.into());
        }
    }

    tracing::info!("Successfully uploaded {size} byte heatmap to {path}");

    Ok(UploadHeatmapOutcome::Uploaded(digest))
}