use std::{
    collections::HashMap,
    pin::Pin,
    sync::{Arc, Weak},
    time::{Duration, Instant},
};

use crate::{
    metrics::SECONDARY_MODE,
    tenant::{
        config::AttachmentMode,
        mgr::TenantManager,
        remote_timeline_client::remote_heatmap_path,
        span::debug_assert_current_span_has_tenant_id,
        tasks::{warn_when_period_overrun, BackgroundLoopKind},
        Tenant,
    },
};

use futures::Future;
use md5;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::GenericRemoteStorage;

use super::{
    scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
    CommandRequest,
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};

use super::{heatmap::HeatMapTenant, UploadCommand};

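/// Task entrypoint: periodically uploads a serialized [`HeatMapTenant`] for each attached
/// tenant, and executes uploads requested on demand via `command_queue`. Secondary
/// locations read these heatmaps to decide which layers to keep warm.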
pub(super) async fn heatmap_uploader_task(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) {
    let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;

    let generator = HeatmapUploader {
        tenant_manager,
        remote_storage,
        cancel: cancel.clone(),
        tenants: HashMap::new(),
    };
    let mut scheduler = Scheduler::new(generator, concurrency);

    scheduler
        .run(command_queue, background_jobs_can_start, cancel)
        .instrument(info_span!("heatmap_uploader"))
        .await
}

/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
/// handling loop and mutates it as needed: there are no locks here, because that event loop
/// can hold &mut references to this type throughout.
struct HeatmapUploader {
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    cancel: CancellationToken,

    tenants: HashMap<TenantShardId, UploaderTenantState>,
}

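/// Handle to an upload that is currently executing: the scheduler uses the
/// [`Barrier`] to wait for the job's completion guard to be dropped.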
struct WriteInProgress {
    barrier: Barrier,
}

impl RunningJob for WriteInProgress {
    fn get_barrier(&self) -> Barrier {
        self.barrier.clone()
    }
}

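/// An upload job that has been scheduled but not yet spawned. `target_time` and
/// `period` are set for periodic uploads and left `None` for on-demand uploads
/// (see `on_command`); they only feed the period-overrun warning in `spawn`.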
struct UploadPending {
    tenant: Arc<Tenant>,
    last_digest: Option<md5::Digest>,
    target_time: Option<Instant>,
    period: Option<Duration>,
}

impl scheduler::PendingJob for UploadPending {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        self.tenant.get_tenant_shard_id()
    }
}

struct WriteComplete {
    tenant_shard_id: TenantShardId,
    completed_at: Instant,
    digest: Option<md5::Digest>,
    next_upload: Option<Instant>,
}

impl scheduler::Completion for WriteComplete {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }
}

/// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
/// when we last did a write. We only populate this for tenants whose heatmap uploads
/// are enabled -- this avoids holding state for tenants that have uploads disabled.
struct UploaderTenantState {
    // This Weak only exists to enable culling idle instances of this type
    // when the Tenant has been deallocated.
    tenant: Weak<Tenant>,

    /// Digest of the serialized heatmap that we last successfully uploaded
    ///
    /// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
    /// which is also an md5sum.
    last_digest: Option<md5::Digest>,

    /// When the last upload attempt completed (whether it succeeded or failed)
    last_upload: Option<Instant>,

    /// When should we next do an upload? None means never.
    next_upload: Option<Instant>,
}

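/// Binds the generic [`TenantBackgroundJobs`] scheduler to the uploader's concrete
/// generator, pending/running/completion job types, and command type.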
type Scheduler = TenantBackgroundJobs<
    HeatmapUploader,
    UploadPending,
    WriteInProgress,
    WriteComplete,
    UploadCommand,
>;

impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
    for HeatmapUploader
{
    async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
        // Cull any entries in self.tenants whose Arc<Tenant> is gone, or which will
        // never upload again (next_upload == None means uploads are disabled).
        self.tenants
            .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());

        let now = Instant::now();

        let mut result = SchedulingResult {
            jobs: Vec::new(),
            want_interval: None,
        };

        let tenants = self.tenant_manager.get_attached_active_tenant_shards();

        yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
            let period = match tenant.get_heatmap_period() {
                None => {
                    // Heatmaps are disabled for this tenant
                    return;
                }
                Some(period) => {
                    // If any tenant has asked for uploads more frequent than our scheduling interval,
                    // reduce it to match so that we can keep up. This is mainly useful in testing, where
                    // we may set rather short intervals.
                    result.want_interval = match result.want_interval {
                        None => Some(period),
                        Some(existing) => Some(std::cmp::min(period, existing)),
                    };

                    period
                }
            };

            // Stale attachments do not upload anything: if we are in this state, there is probably some
            // other attachment in mode Single or Multi running on another pageserver, and we don't
            // want to thrash and overwrite their heatmap uploads.
            if tenant.get_attach_mode() == AttachmentMode::Stale {
                return;
            }

            // Create an entry in self.tenants if one doesn't already exist: this will later be updated
            // with the completion time in on_completion.
            let state = self
                .tenants
                .entry(*tenant.get_tenant_shard_id())
                .or_insert_with(|| {
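                    // Jitter the first upload over a full period, so that tenants
                    // activated around the same time (e.g. after a pageserver restart)
                    // do not all upload their heatmaps at once.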
                    let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);

                    UploaderTenantState {
                        tenant: Arc::downgrade(&tenant),
                        last_upload: None,
                        next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
                        last_digest: None,
                    }
                });

            // Decline to do the upload if insufficient time has passed
            if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
                return;
            }

            let last_digest = state.last_digest;
            result.jobs.push(UploadPending {
                tenant,
                last_digest,
                target_time: state.next_upload,
                period: Some(period),
            });
        })
        .await
        .ok();

        result
    }

    fn spawn(
        &mut self,
        job: UploadPending,
    ) -> (
        WriteInProgress,
        Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
    ) {
        let UploadPending {
            tenant,
            last_digest,
            target_time,
            period,
        } = job;

        let remote_storage = self.remote_storage.clone();
        let (completion, barrier) = utils::completion::channel();
        let tenant_shard_id = *tenant.get_tenant_shard_id();
        (WriteInProgress { barrier }, Box::pin(async move {
            // Guard for the barrier in [`WriteInProgress`]
            let _completion = completion;

            let started_at = Instant::now();
            let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
                Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
                    let duration = Instant::now().duration_since(started_at);
                    SECONDARY_MODE
                        .upload_heatmap_duration
                        .observe(duration.as_secs_f64());
                    SECONDARY_MODE.upload_heatmap.inc();
                    Some(digest)
                }
                Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
                Err(UploadHeatmapError::Upload(e)) => {
                    tracing::warn!(
                        "Failed to upload heatmap for tenant {}: {e:#}",
                        tenant.get_tenant_shard_id(),
                    );
                    let duration = Instant::now().duration_since(started_at);
                    SECONDARY_MODE
                        .upload_heatmap_duration
                        .observe(duration.as_secs_f64());
                    SECONDARY_MODE.upload_heatmap_errors.inc();
                    last_digest
                }
                Err(UploadHeatmapError::Cancelled) => {
                    tracing::info!("Cancelled heatmap upload, shutting down");
                    last_digest
                }
            };

            let now = Instant::now();

            // If the job had a target execution time, we may check our final execution
            // time against that for observability purposes.
            if let (Some(target_time), Some(period)) = (target_time, period) {
                // Elapsed time includes any scheduling lag as well as the execution of the job
                let elapsed = now.duration_since(target_time);

                warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
            }

            let next_upload = tenant
                .get_heatmap_period()
                .and_then(|period| now.checked_add(period));

            WriteComplete {
                tenant_shard_id: *tenant.get_tenant_shard_id(),
                completed_at: now,
                digest,
                next_upload,
            }
        }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
    }

    fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
        let tenant_shard_id = command.get_tenant_shard_id();

        tracing::info!(
            tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
            "Starting heatmap write on command");
        let tenant = self
            .tenant_manager
            .get_attached_tenant_shard(*tenant_shard_id, true)
            .map_err(|e| anyhow::anyhow!(e))?;

        Ok(UploadPending {
            // Ignore our state for last digest: this forces an upload even if nothing has changed
            last_digest: None,
            tenant,
            target_time: None,
            period: None,
        })
    }

    #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
    fn on_completion(&mut self, completion: WriteComplete) {
        tracing::debug!("Heatmap upload completed");
        let WriteComplete {
            tenant_shard_id,
            completed_at,
            digest,
            next_upload,
        } = completion;
        use std::collections::hash_map::Entry;
        match self.tenants.entry(tenant_shard_id) {
            Entry::Vacant(_) => {
                // Tenant state was dropped, nothing to update.
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().last_upload = Some(completed_at);
                entry.get_mut().last_digest = digest;
                entry.get_mut().next_upload = next_upload
            }
        }
    }
}

enum UploadHeatmapOutcome {
    /// We successfully wrote to remote storage, with this digest.
    Uploaded(md5::Digest),
    /// We did not upload because the heatmap digest was unchanged since the last upload
    NoChange,
    /// We skipped the upload for some reason, such as tenant/timeline not ready
    Skipped,
}

#[derive(thiserror::Error, Debug)]
enum UploadHeatmapError {
    #[error("Cancelled")]
    Cancelled,

    #[error(transparent)]
    Upload(#[from] anyhow::Error),
}

/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
/// of the object we would have uploaded.
async fn upload_tenant_heatmap(
    remote_storage: GenericRemoteStorage,
    tenant: &Arc<Tenant>,
    last_digest: Option<md5::Digest>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
    debug_assert_current_span_has_tenant_id();

    let generation = tenant.get_generation();
    if generation.is_none() {
        // We do not expect this: generations were implemented before heatmap uploads. However,
        // handle it so that we don't have to make the generation in the heatmap an Option<>
        // (Generation::none is not serializable)
        tracing::warn!("Skipping heatmap upload for tenant with generation==None");
        return Ok(UploadHeatmapOutcome::Skipped);
    }

    let mut heatmap = HeatMapTenant {
        timelines: Vec::new(),
        generation,
    };
    let timelines = tenant.timelines.lock().unwrap().clone();

    // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
    // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
    // in remote storage.
    let _guard = match tenant.gate.enter() {
        Ok(g) => g,
        Err(_) => {
            tracing::info!("Skipping heatmap upload for tenant which is shutting down");
            return Err(UploadHeatmapError::Cancelled);
        }
    };

    for (timeline_id, timeline) in timelines {
        let heatmap_timeline = timeline.generate_heatmap().await;
        match heatmap_timeline {
            None => {
                tracing::debug!(
                    "Skipping heatmap upload because timeline {timeline_id} is not ready"
                );
                return Ok(UploadHeatmapOutcome::Skipped);
            }
            Some(heatmap_timeline) => {
                heatmap.timelines.push(heatmap_timeline);
            }
        }
    }

    // Serialize the heatmap
    let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
    let bytes = bytes::Bytes::from(bytes);
    let size = bytes.len();

    // Drop out early if nothing changed since our last upload
    let digest = md5::compute(&bytes);
    if Some(digest) == last_digest {
        return Ok(UploadHeatmapOutcome::NoChange);
    }

    let path = remote_heatmap_path(tenant.get_tenant_shard_id());

    let cancel = &tenant.cancel;

    tracing::debug!("Uploading {size} byte heatmap to {path}");
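    // Retry with backoff: `|_| false` marks no error as permanent and `u32::MAX`
    // effectively removes the retry cap, so we keep trying until success or until
    // `cancel` fires, at which point `backoff::retry` yields None and we surface
    // it as the "Shutting down" error below.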
    if let Err(e) = backoff::retry(
        || async {
            let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
            remote_storage
                .upload_storage_object(bytes, size, &path)
                .await
        },
        |_| false,
        3,
        u32::MAX,
        "Uploading heatmap",
        cancel,
    )
    .await
    .ok_or_else(|| anyhow::anyhow!("Shutting down"))
    .and_then(|x| x)
    {
        if cancel.is_cancelled() {
            return Err(UploadHeatmapError::Cancelled);
        } else {
            return Err(e.into());
        }
    }

    tracing::info!("Successfully uploaded {size} byte heatmap to {path}");

    Ok(UploadHeatmapOutcome::Uploaded(digest))
}