use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use metrics::UIntGauge;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::TimelineId;
use utils::{backoff, failpoint_support, fs_ext, pausable_failpoint, serde_system_time};

use super::heatmap::{HeatMapLayer, HeatMapTenant, HeatMapTimeline};
use super::scheduler::{
    self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs, period_jitter,
    period_warmup,
};
use super::{
    CommandRequest, DownloadCommand, GetTenantError, SecondaryTenant, SecondaryTenantError,
};
use crate::TEMP_FILE_SUFFIX;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::disk_usage_eviction_task::{
    DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer, finite_f32,
};
use crate::metrics::SECONDARY_MODE;
use crate::tenant::config::SecondaryLocationConfig;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::ephemeral_file::is_ephemeral_file;
use crate::tenant::mgr::TenantManager;
use crate::tenant::remote_timeline_client::download::download_layer_file;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::remote_timeline_client::{
    FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, is_temp_download_file,
    remote_heatmap_path,
};
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::storage_layer::{LayerName, LayerVisibilityHint};
use crate::tenant::tasks::{BackgroundLoopKind, warn_when_period_overrun};
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};

/// For each tenant, the default interval that must elapse after the last download_tenant call
/// before calling it again. This default is replaced with the value of
/// [`HeatMapTenant::upload_period_ms`] after the first download, if the uploader populated it.
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
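
// Illustrative sketch (not part of the build): the effective re-download interval mirrors
// the logic at the end of `TenantDownloader::download`, preferring the uploader's own
// period when the heatmap carries one:
//
//     fn effective_interval(heatmap: &HeatMapTenant) -> Duration {
//         heatmap
//             .upload_period_ms
//             .map(|ms| Duration::from_millis(ms as u64))
//             .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL)
//     }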

pub(super) async fn downloader_task(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
    root_ctx: RequestContext,
) {
    let concurrency = tenant_manager.get_conf().secondary_download_concurrency;

    let generator = SecondaryDownloader {
        tenant_manager,
        remote_storage,
        root_ctx,
    };
    let mut scheduler = Scheduler::new(generator, concurrency);

    scheduler
        .run(command_queue, background_jobs_can_start, cancel)
        .instrument(info_span!("secondary_download_scheduler"))
        .await
}

struct SecondaryDownloader {
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    root_ctx: RequestContext,
}

#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
    metadata: LayerFileMetadata,
    access_time: SystemTime,
    local_path: Utf8PathBuf,
}

impl OnDiskState {
    fn new(
        _conf: &'static PageServerConf,
        _tenant_shard_id: &TenantShardId,
        _timeline_id: &TimelineId,
        _name: LayerName,
        metadata: LayerFileMetadata,
        access_time: SystemTime,
        local_path: Utf8PathBuf,
    ) -> Self {
        Self {
            metadata,
            access_time,
            local_path,
        }
    }

    // This is infallible, because all errors are either acceptable (ENOENT), or totally
    // unexpected (fatal).
    pub(super) fn remove_blocking(&self) {
        // We tolerate ENOENT, because between planning eviction and executing
        // it, the secondary downloader could have seen an updated heatmap that
        // resulted in a layer being deleted.
        // Other local I/O errors are process-fatal: these should never happen.
        std::fs::remove_file(&self.local_path)
            .or_else(fs_ext::ignore_not_found)
            .fatal_err("Deleting secondary layer")
    }

    pub(crate) fn file_size(&self) -> u64 {
        self.metadata.file_size
    }
}

pub(super) struct SecondaryDetailTimeline {
    on_disk_layers: HashMap<LayerName, OnDiskState>,

    /// We remember when layers were evicted, to prevent re-downloading them.
    pub(super) evicted_at: HashMap<LayerName, SystemTime>,

    ctx: RequestContext,
}

impl Clone for SecondaryDetailTimeline {
    fn clone(&self) -> Self {
        Self {
            on_disk_layers: self.on_disk_layers.clone(),
            evicted_at: self.evicted_at.clone(),
            // This is a bit awkward. The downloader code operates on a snapshot
            // of the secondary list to avoid locking it for extended periods of time.
            // No particularly strong reason to choose [`RequestContext::detached_child`],
            // but it makes more sense than [`RequestContext::attached_child`].
            ctx: self
                .ctx
                .detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
        }
    }
}

impl std::fmt::Debug for SecondaryDetailTimeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SecondaryDetailTimeline")
            .field("on_disk_layers", &self.on_disk_layers)
            .field("evicted_at", &self.evicted_at)
            .finish()
    }
}

impl SecondaryDetailTimeline {
    pub(super) fn empty(ctx: RequestContext) -> Self {
        SecondaryDetailTimeline {
            on_disk_layers: Default::default(),
            evicted_at: Default::default(),
            ctx,
        }
    }

    pub(super) fn context(&self) -> &RequestContext {
        &self.ctx
    }

    pub(super) fn remove_layer(
        &mut self,
        name: &LayerName,
        resident_metric: &UIntGauge,
    ) -> Option<OnDiskState> {
        let removed = self.on_disk_layers.remove(name);
        if let Some(removed) = &removed {
            resident_metric.sub(removed.file_size());
        }
        removed
    }

    /// `local_path` is a closure rather than a precomputed value: it is only invoked
    /// when the layer is not already tracked in `on_disk_layers`, i.e. when a new
    /// [`OnDiskState`] must be created.
    fn touch_layer<F>(
        &mut self,
        conf: &'static PageServerConf,
        tenant_shard_id: &TenantShardId,
        timeline_id: &TimelineId,
        touched: &HeatMapLayer,
        resident_metric: &UIntGauge,
        local_path: F,
    ) where
        F: FnOnce() -> Utf8PathBuf,
    {
        use std::collections::hash_map::Entry;
        match self.on_disk_layers.entry(touched.name.clone()) {
            Entry::Occupied(mut v) => {
                v.get_mut().access_time = touched.access_time;
            }
            Entry::Vacant(e) => {
                e.insert(OnDiskState::new(
                    conf,
                    tenant_shard_id,
                    timeline_id,
                    touched.name.clone(),
                    touched.metadata.clone(),
                    touched.access_time,
                    local_path(),
                ));
                resident_metric.add(touched.metadata.file_size);
            }
        }
    }
}

// Aspects of a heatmap that we remember after downloading it
#[derive(Clone, Debug)]
struct DownloadSummary {
    etag: Etag,
    #[allow(unused)]
    mtime: SystemTime,
    upload_period: Duration,
}

/// This state is written by the secondary downloader; it is opaque
/// to the TenantManager.
#[derive(Debug)]
pub(super) struct SecondaryDetail {
    pub(super) config: SecondaryLocationConfig,

    last_download: Option<DownloadSummary>,
    next_download: Option<Instant>,
    timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}

/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
    let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
    datetime.format("%d/%m/%Y %T")
}

/// Information returned from the download function when it detects the heatmap has changed
struct HeatMapModified {
    etag: Etag,
    last_modified: SystemTime,
    bytes: Vec<u8>,
}

enum HeatMapDownload {
    // The heatmap's etag has changed: return the new etag, mtime and the body bytes
    Modified(HeatMapModified),
    // The heatmap's etag is unchanged
    Unmodified,
}
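
// Sketch of the conditional-download contract (assumed from `download_heatmap` below,
// not a separate API): the previous etag is passed via `DownloadOpts`, and remote
// storage answers `DownloadError::Unmodified` when the object still matches, which is
// surfaced as `HeatMapDownload::Unmodified` so the caller can skip all further work:
//
//     let opts = DownloadOpts { etag: prev_etag.cloned(), ..Default::default() };
//     match storage.download(&path, &opts, cancel).await {
//         Err(DownloadError::Unmodified) => Ok(HeatMapDownload::Unmodified),
//         Ok(download) => { /* read body, return HeatMapDownload::Modified */ }
//         Err(err) => Err(err.into()),
//     }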

impl SecondaryDetail {
    pub(super) fn new(config: SecondaryLocationConfig) -> Self {
        Self {
            config,
            last_download: None,
            next_download: None,
            timelines: HashMap::new(),
        }
    }

    #[cfg(feature = "testing")]
    pub(crate) fn total_resident_size(&self) -> u64 {
        self.timelines
            .values()
            .map(|tl| {
                tl.on_disk_layers
                    .values()
                    .map(|v| v.metadata.file_size)
                    .sum::<u64>()
            })
            .sum::<u64>()
    }

    pub(super) fn evict_layer(
        &mut self,
        name: LayerName,
        timeline_id: &TimelineId,
        now: SystemTime,
        resident_metric: &UIntGauge,
    ) -> Option<OnDiskState> {
        let timeline = self.timelines.get_mut(timeline_id)?;
        let removed = timeline.remove_layer(&name, resident_metric);
        if removed.is_some() {
            timeline.evicted_at.insert(name, now);
        }
        removed
    }

    pub(super) fn remove_timeline(
        &mut self,
        tenant_shard_id: &TenantShardId,
        timeline_id: &TimelineId,
        resident_metric: &UIntGauge,
    ) {
        let removed = self.timelines.remove(timeline_id);
        if let Some(removed) = removed {
            Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
        }
    }

    pub(super) fn drain_timelines(
        &mut self,
        tenant_shard_id: &TenantShardId,
        resident_metric: &UIntGauge,
    ) {
        for (timeline_id, removed) in self.timelines.drain() {
            Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
        }
    }

    fn clear_timeline_metrics(
        tenant_shard_id: &TenantShardId,
        timeline_id: &TimelineId,
        detail: SecondaryDetailTimeline,
        resident_metric: &UIntGauge,
    ) {
        resident_metric.sub(
            detail
                .on_disk_layers
                .values()
                .map(|l| l.metadata.file_size)
                .sum(),
        );

        let shard_id = format!("{}", tenant_shard_id.shard_slug());
        let tenant_id = tenant_shard_id.tenant_id.to_string();
        let timeline_id = timeline_id.to_string();
        for op in StorageIoSizeOperation::VARIANTS {
            let _ = STORAGE_IO_SIZE.remove_label_values(&[
                op,
                tenant_id.as_str(),
                shard_id.as_str(),
                timeline_id.as_str(),
            ]);
        }
    }

    /// Additionally returns the total number of layers, used for more stable relative access time
    /// based eviction.
    pub(super) fn get_layers_for_eviction(
        &self,
        parent: &Arc<SecondaryTenant>,
    ) -> (DiskUsageEvictionInfo, usize) {
        let mut result = DiskUsageEvictionInfo::default();
        let mut total_layers = 0;

        for (timeline_id, timeline_detail) in &self.timelines {
            result
                .resident_layers
                .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
                    EvictionCandidate {
                        layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
                            secondary_tenant: parent.clone(),
                            timeline_id: *timeline_id,
                            name: name.clone(),
                            metadata: ods.metadata.clone(),
                        }),
                        last_activity_ts: ods.access_time,
                        relative_last_activity: finite_f32::FiniteF32::ZERO,
                        // Secondary location layers are presumed visible, because Covered layers
                        // are excluded from the heatmap
                        visibility: LayerVisibilityHint::Visible,
                    }
                }));

            // The total might be missing currently-downloading layers, but as a lower-than-actual
            // value it is a good enough approximation.
            total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
        }
        result.max_layer_size = result
            .resident_layers
            .iter()
            .map(|l| l.layer.get_file_size())
            .max();

        tracing::debug!(
            "eviction: secondary tenant {} found {} timelines, {} layers",
            parent.get_tenant_shard_id(),
            self.timelines.len(),
            result.resident_layers.len()
        );

        (result, total_layers)
    }
}

struct PendingDownload {
    secondary_state: Arc<SecondaryTenant>,
    last_download: Option<DownloadSummary>,
    target_time: Option<Instant>,
}

impl scheduler::PendingJob for PendingDownload {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        self.secondary_state.get_tenant_shard_id()
    }
}

struct RunningDownload {
    barrier: Barrier,
}

impl scheduler::RunningJob for RunningDownload {
    fn get_barrier(&self) -> Barrier {
        self.barrier.clone()
    }
}

struct CompleteDownload {
    secondary_state: Arc<SecondaryTenant>,
    completed_at: Instant,
    result: Result<(), UpdateError>,
}

impl scheduler::Completion for CompleteDownload {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        self.secondary_state.get_tenant_shard_id()
    }
}

type Scheduler = TenantBackgroundJobs<
    SecondaryDownloader,
    PendingDownload,
    RunningDownload,
    CompleteDownload,
    DownloadCommand,
>;
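
// Job lifecycle, as assumed from the `JobGenerator` impl below: the scheduler periodically
// calls `schedule()` to collect eligible tenants as `PendingDownload`s, turns each into a
// `RunningDownload` plus a future via `spawn()`, and feeds the finished future's
// `CompleteDownload` back into `on_completion()`, which sets the tenant's next download time.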

impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
    for SecondaryDownloader
{
    #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
    fn on_completion(&mut self, completion: CompleteDownload) {
        let CompleteDownload {
            secondary_state,
            completed_at: _completed_at,
            result,
        } = completion;

        tracing::debug!("Secondary tenant download completed");

        let mut detail = secondary_state.detail.lock().unwrap();

        match result {
            Err(UpdateError::Restart) => {
                // Start downloading again as soon as we can. This will involve waiting for the scheduler's
                // scheduling interval. This slightly reduces the peak download speed of tenants that hit their
                // deadline and keep restarting, but that also helps give other tenants a chance to execute rather
                // than letting one big tenant dominate for a long time.
                detail.next_download = Some(Instant::now());
            }
            _ => {
                let period = detail
                    .last_download
                    .as_ref()
                    .map(|d| d.upload_period)
                    .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

                // We advance next_download irrespective of errors: we don't want error cases to result in
                // expensive busy-polling.
                detail.next_download = Some(Instant::now() + period_jitter(period, 5));
            }
        }
    }

    async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
        let mut result = SchedulingResult {
            jobs: Vec::new(),
            want_interval: None,
        };

        // Step 1: identify some tenants that we may work on
        let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
        self.tenant_manager
            .foreach_secondary_tenants(|_id, secondary_state| {
                tenants.push(secondary_state.clone());
            });

        // Step 2: filter out tenants which are not yet eligible to run
        let now = Instant::now();
        result.jobs = tenants
            .into_iter()
            .filter_map(|secondary_tenant| {
                let (last_download, next_download) = {
                    let mut detail = secondary_tenant.detail.lock().unwrap();

                    if !detail.config.warm {
                        // Downloads are disabled for this tenant
                        detail.next_download = None;
                        return None;
                    }

                    if detail.next_download.is_none() {
                        // Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times. Subsequent
                        // rounds will use a smaller jitter to avoid accidentally synchronizing later.
                        detail.next_download = Some(now.checked_add(period_warmup(DEFAULT_DOWNLOAD_INTERVAL)).expect(
                            "Using our constant, which is known to be small compared with clock range",
                        ));
                    }
                    (detail.last_download.clone(), detail.next_download.unwrap())
                };

                if now > next_download {
                    Some(PendingDownload {
                        secondary_state: secondary_tenant,
                        last_download,
                        target_time: Some(next_download),
                    })
                } else {
                    None
                }
            })
            .collect();

        // Step 3: sort by target execution time to run most urgent first.
        result.jobs.sort_by_key(|j| j.target_time);

        result
    }

    fn on_command(
        &mut self,
        command: DownloadCommand,
    ) -> Result<PendingDownload, SecondaryTenantError> {
        let tenant_shard_id = command.get_tenant_shard_id();

        let tenant = self
            .tenant_manager
            .get_secondary_tenant_shard(*tenant_shard_id)
            .ok_or(GetTenantError::ShardNotFound(*tenant_shard_id))?;

        Ok(PendingDownload {
            target_time: None,
            last_download: None,
            secondary_state: tenant,
        })
    }

    fn spawn(
        &mut self,
        job: PendingDownload,
    ) -> (
        RunningDownload,
        Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
    ) {
        let PendingDownload {
            secondary_state,
            last_download,
            target_time,
        } = job;

        let (completion, barrier) = utils::completion::channel();
        let remote_storage = self.remote_storage.clone();
        let conf = self.tenant_manager.get_conf();
        let tenant_shard_id = *secondary_state.get_tenant_shard_id();
        let download_ctx = self
            .root_ctx
            .attached_child()
            .with_scope_secondary_tenant(&tenant_shard_id);
        (RunningDownload { barrier }, Box::pin(async move {
            let _completion = completion;

            let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
                .download(&download_ctx)
                .await;
            match &result {
                Err(UpdateError::NoData) => {
                    tracing::info!("No heatmap found for tenant. This is fine if it is new.");
                },
                Err(UpdateError::NoSpace) => {
                    tracing::warn!("Insufficient space while downloading. Will retry later.");
                }
                Err(UpdateError::Cancelled) => {
                    tracing::info!("Shut down while downloading");
                },
                Err(UpdateError::Deserialize(e)) => {
                    tracing::error!("Corrupt content while downloading tenant: {e}");
                },
                Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
                    tracing::error!("Error while downloading tenant: {e}");
                },
                Err(UpdateError::Restart) => {
                    tracing::info!("Download reached deadline & will restart to update heatmap")
                }
                Ok(()) => {}
            };

            // Irrespective of the result, we will reschedule ourselves to run after our usual period.

            // If the job had a target execution time, we may check our final execution
            // time against that for observability purposes.
            if let (Some(target_time), Some(last_download)) = (target_time, last_download) {
                // Elapsed time includes any scheduling lag as well as the execution of the job
                let elapsed = Instant::now().duration_since(target_time);

                warn_when_period_overrun(
                    elapsed,
                    last_download.upload_period,
                    BackgroundLoopKind::SecondaryDownload,
                );
            }

            CompleteDownload {
                secondary_state,
                completed_at: Instant::now(),
                result,
            }
        }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
    }
}

/// What to do with a layer listed in the heatmap, as decided by [`TenantDownloader::layer_action`].
enum LayerAction {
    /// The layer is absent locally, or present with a stale generation/size: fetch it.
    Download,
    /// The layer is present locally and matches the heatmap entry: nothing to do.
    NoAction,
    /// The layer was evicted more recently than it was last accessed: do not re-download.
    Skip,
    /// The layer is present locally but its recorded access time is stale: update it.
    Touch,
}

/// This type is a convenience to group together the various functions involved in
/// freshening a secondary tenant.
struct TenantDownloader<'a> {
    conf: &'static PageServerConf,
    remote_storage: &'a GenericRemoteStorage,
    secondary_state: &'a SecondaryTenant,
}

/// Errors that may be encountered while updating a tenant
#[derive(thiserror::Error, Debug)]
enum UpdateError {
    /// This is not a true failure, but it's how a download indicates that it would like to be restarted by
    /// the scheduler, to pick up the latest heatmap
    #[error("Reached deadline, restarting downloads")]
    Restart,

    #[error("No remote data found")]
    NoData,
    #[error("Insufficient local storage space")]
    NoSpace,
    #[error("Failed to download")]
    DownloadError(DownloadError),
    #[error(transparent)]
    Deserialize(#[from] serde_json::Error),
    #[error("Cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl From<DownloadError> for UpdateError {
    fn from(value: DownloadError) -> Self {
        match &value {
            DownloadError::Cancelled => Self::Cancelled,
            DownloadError::NotFound => Self::NoData,
            _ => Self::DownloadError(value),
        }
    }
}

impl From<std::io::Error> for UpdateError {
    fn from(value: std::io::Error) -> Self {
        if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
            UpdateError::NoSpace
        } else if value
            .get_ref()
            .and_then(|x| x.downcast_ref::<DownloadError>())
            .is_some()
        {
            UpdateError::from(DownloadError::from(value))
        } else {
            // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
            UpdateError::Other(anyhow::anyhow!(value))
        }
    }
}

impl<'a> TenantDownloader<'a> {
    fn new(
        conf: &'static PageServerConf,
        remote_storage: &'a GenericRemoteStorage,
        secondary_state: &'a SecondaryTenant,
    ) -> Self {
        Self {
            conf,
            remote_storage,
            secondary_state,
        }
    }

    async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
        debug_assert_current_span_has_tenant_id();

        // For the duration of a download, we must hold the SecondaryTenant::gate to
        // cover our access to local storage.
        let Ok(_guard) = self.secondary_state.gate.enter() else {
            // Shutting down
            return Err(UpdateError::Cancelled);
        };

        let tenant_shard_id = self.secondary_state.get_tenant_shard_id();

        // We will use the etag from last successful download to make the download conditional on changes
        let last_download = self
            .secondary_state
            .detail
            .lock()
            .unwrap()
            .last_download
            .clone();

        // Download the tenant's heatmap
        let HeatMapModified {
            last_modified: heatmap_mtime,
            etag: heatmap_etag,
            bytes: heatmap_bytes,
        } = match tokio::select!(
            bytes = self.download_heatmap(last_download.as_ref().map(|d| &d.etag)) => {bytes?},
            _ = self.secondary_state.cancel.cancelled() => return Ok(())
        ) {
            HeatMapDownload::Unmodified => {
                tracing::info!("Heatmap unchanged since last successful download");
                return Ok(());
            }
            HeatMapDownload::Modified(m) => m,
        };

        // Heatmap storage location
        let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);

        let last_heatmap = if last_download.is_none() {
            match load_heatmap(&heatmap_path, ctx).await {
                Ok(htm) => htm,
                Err(e) => {
                    tracing::warn!("Couldn't load heatmap from {heatmap_path}: {e:?}");
                    None
                }
            }
        } else {
            None
        };

        let last_heatmap_timelines = last_heatmap.as_ref().map(|htm| {
            htm.timelines
                .iter()
                .map(|tl| (tl.timeline_id, tl))
                .collect::<HashMap<_, _>>()
        });

        let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;

        let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
        let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
        let heatmap_path_bg = heatmap_path.clone();
        VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
            .await
            .maybe_fatal_err(&context_msg)?;

        tracing::debug!(
            "Wrote local heatmap to {}, with {} timelines",
            heatmap_path,
            heatmap.timelines.len()
        );

        // Get or initialize the local disk state for the timelines we will update
        let mut timeline_states = HashMap::new();
        for timeline in &heatmap.timelines {
            let timeline_state = self
                .secondary_state
                .detail
                .lock()
                .unwrap()
                .timelines
                .get(&timeline.timeline_id)
                .cloned();

            let timeline_state = match timeline_state {
                Some(t) => t,
                None => {
                    let last_heatmap =
                        last_heatmap_timelines
                            .as_ref()
                            .and_then(|last_heatmap_timelines| {
                                last_heatmap_timelines.get(&timeline.timeline_id).copied()
                            });
                    // We have no existing state: need to scan local disk for layers first.
                    let timeline_state = init_timeline_state(
                        self.conf,
                        tenant_shard_id,
                        last_heatmap,
                        timeline,
                        &self.secondary_state.resident_size_metric,
                        ctx,
                    )
                    .await;

                    // Re-acquire detail lock now that we're done with async load from local FS
                    self.secondary_state
                        .detail
                        .lock()
                        .unwrap()
                        .timelines
                        .insert(timeline.timeline_id, timeline_state.clone());
                    timeline_state
                }
            };

            timeline_states.insert(timeline.timeline_id, timeline_state);
        }

        // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
        // principle that deletions should be done before writes wherever possible, and so that we can use this
        // phase to initialize our SecondaryProgress.
        {
            *self.secondary_state.progress.lock().unwrap() =
                self.prepare_timelines(&heatmap, heatmap_mtime).await?;
        }

        // Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
        // so that we are always using a reasonably fresh heatmap. Otherwise, if we had really huge content to download, we might
        // spend tens of minutes downloading layers we don't need.
        // (see https://github.com/neondatabase/neon/issues/8182)
        let deadline = {
            let period = self
                .secondary_state
                .detail
                .lock()
                .unwrap()
                .last_download
                .as_ref()
                .map(|d| d.upload_period)
                .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

            // Use double the period: we are not promising to complete within the period, this is just a heuristic
            // to keep using a "reasonably fresh" heatmap.
            Instant::now() + period * 2
        };
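
        // Worked example (illustrative): with the default 60s interval and no
        // heatmap-provided period, the deadline is now + 120s; a tenant whose uploader
        // reports upload_period_ms = 300_000 gets now + 600s before a Restart is requested.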

        // Download the layers in the heatmap
        for timeline in heatmap.timelines {
            let timeline_state = timeline_states
                .remove(&timeline.timeline_id)
                .expect("Just populated above");

            if self.secondary_state.cancel.is_cancelled() {
                tracing::debug!(
                    "Cancelled before downloading timeline {}",
                    timeline.timeline_id
                );
                return Ok(());
            }

            let timeline_id = timeline.timeline_id;
            self.download_timeline(timeline, timeline_state, deadline, ctx)
                .instrument(tracing::info_span!(
                    "secondary_download_timeline",
                    tenant_id=%tenant_shard_id.tenant_id,
                    shard_id=%tenant_shard_id.shard_slug(),
                    %timeline_id
                ))
                .await?;
        }

        // Metrics consistency check in testing builds
        self.secondary_state.validate_metrics();
        // Only update last_etag after a fully successful download: this way we will not skip
        // the next download, even if the heatmap's actual etag is unchanged.
        self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
            etag: heatmap_etag,
            mtime: heatmap_mtime,
            upload_period: heatmap
                .upload_period_ms
                .map(|ms| Duration::from_millis(ms as u64))
                .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL),
        });

        // Robustness: we should have updated progress properly, but in case we didn't, make sure
        // we don't leave the tenant in a state where we claim to have successfully downloaded
        // everything, but our progress is incomplete. The invariant here should be that if
        // we have set `last_download` to this heatmap's etag, then the next time we see that
        // etag we can safely do no work (i.e. we must be complete).
        let mut progress = self.secondary_state.progress.lock().unwrap();
        debug_assert!(progress.layers_downloaded == progress.layers_total);
        debug_assert!(progress.bytes_downloaded == progress.bytes_total);
        if progress.layers_downloaded != progress.layers_total
            || progress.bytes_downloaded != progress.bytes_total
        {
            tracing::warn!("Correcting drift in progress stats ({progress:?})");
            progress.layers_downloaded = progress.layers_total;
            progress.bytes_downloaded = progress.bytes_total;
        }

        Ok(())
    }

    /// Do any fast local cleanup that comes before the much slower process of downloading
    /// layers from remote storage. In the process, initialize the SecondaryProgress object
    /// that will later be updated incrementally as we download layers.
    async fn prepare_timelines(
        &self,
        heatmap: &HeatMapTenant,
        heatmap_mtime: SystemTime,
    ) -> Result<SecondaryProgress, UpdateError> {
        let heatmap_stats = heatmap.get_stats();
        // We will construct a progress object, and then populate its initial "downloaded" numbers
        // while iterating through local layer state in [`Self::prepare_timelines`]
        let mut progress = SecondaryProgress {
            layers_total: heatmap_stats.layers,
            bytes_total: heatmap_stats.bytes,
            heatmap_mtime: Some(serde_system_time::SystemTime(heatmap_mtime)),
            layers_downloaded: 0,
            bytes_downloaded: 0,
        };

        // Also expose heatmap bytes_total as a metric
        self.secondary_state
            .heatmap_total_size_metric
            .set(heatmap_stats.bytes);

        // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
        let mut delete_layers = Vec::new();
        let mut delete_timelines = Vec::new();
        {
            let mut detail = self.secondary_state.detail.lock().unwrap();
            for (timeline_id, timeline_state) in &mut detail.timelines {
                let Some(heatmap_timeline_index) = heatmap
                    .timelines
                    .iter()
                    .position(|t| t.timeline_id == *timeline_id)
                else {
                    // This timeline is no longer referenced in the heatmap: delete it locally
                    delete_timelines.push(*timeline_id);
                    continue;
                };

                let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();

                let layers_in_heatmap = heatmap_timeline
                    .hot_layers()
                    .map(|l| (&l.name, l.metadata.generation))
                    .collect::<HashSet<_>>();
                let layers_on_disk = timeline_state
                    .on_disk_layers
                    .iter()
                    .map(|l| (l.0, l.1.metadata.generation))
                    .collect::<HashSet<_>>();

                let mut layer_count = layers_on_disk.len();
                let mut layer_byte_count: u64 = timeline_state
                    .on_disk_layers
                    .values()
                    .map(|l| l.metadata.file_size)
                    .sum();

                // Remove on-disk layers that are no longer present in heatmap
                for (layer_file_name, generation) in layers_on_disk.difference(&layers_in_heatmap) {
                    layer_count -= 1;
                    layer_byte_count -= timeline_state
                        .on_disk_layers
                        .get(layer_file_name)
                        .unwrap()
                        .metadata
                        .file_size;

                    let local_path = local_layer_path(
                        self.conf,
                        self.secondary_state.get_tenant_shard_id(),
                        timeline_id,
                        layer_file_name,
                        generation,
                    );

                    delete_layers.push((*timeline_id, (*layer_file_name).clone(), local_path));
                }

                progress.bytes_downloaded += layer_byte_count;
                progress.layers_downloaded += layer_count;
            }

            for delete_timeline in &delete_timelines {
                // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
                // from disk fails that will be a fatal error.
                detail.remove_timeline(
                    self.secondary_state.get_tenant_shard_id(),
                    delete_timeline,
                    &self.secondary_state.resident_size_metric,
                );
            }
        }

        // Execute accumulated deletions
        for (timeline_id, layer_name, local_path) in delete_layers {
            tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);

            tokio::fs::remove_file(&local_path)
                .await
                .or_else(fs_ext::ignore_not_found)
                .maybe_fatal_err("Removing secondary layer")?;

            // Update in-memory housekeeping to reflect the absence of the deleted layer
            let mut detail = self.secondary_state.detail.lock().unwrap();
            let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
                continue;
            };
            timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
        }

        for timeline_id in delete_timelines {
            let timeline_path = self
                .conf
                .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
            tracing::info!(timeline_id=%timeline_id,
                "Timeline no longer in heatmap, removing from secondary location"
            );
            tokio::fs::remove_dir_all(&timeline_path)
                .await
                .or_else(fs_ext::ignore_not_found)
                .maybe_fatal_err("Removing secondary timeline")?;
        }

        Ok(progress)
    }

    /// Returns the downloaded bytes if the etag differs from `prev_etag`, or
    /// [`HeatMapDownload::Unmodified`] if the object still matches `prev_etag`.
    async fn download_heatmap(
        &self,
        prev_etag: Option<&Etag>,
    ) -> Result<HeatMapDownload, UpdateError> {
        debug_assert_current_span_has_tenant_id();
        let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
        tracing::debug!("Downloading heatmap for secondary tenant");

        let heatmap_path = remote_heatmap_path(tenant_shard_id);
        let cancel = &self.secondary_state.cancel;
        let opts = DownloadOpts {
            etag: prev_etag.cloned(),
            kind: DownloadKind::Small,
            ..Default::default()
        };

        backoff::retry(
            || async {
                let download = match self
                    .remote_storage
                    .download(&heatmap_path, &opts, cancel)
                    .await
                {
                    Ok(download) => download,
                    Err(DownloadError::Unmodified) => return Ok(HeatMapDownload::Unmodified),
                    Err(err) => return Err(err.into()),
                };

                let mut heatmap_bytes = Vec::new();
                let mut body = tokio_util::io::StreamReader::new(download.download_stream);
                let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
                Ok(HeatMapDownload::Modified(HeatMapModified {
                    etag: download.etag,
                    last_modified: download.last_modified,
                    bytes: heatmap_bytes,
                }))
            },
            |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
            FAILED_DOWNLOAD_WARN_THRESHOLD,
            FAILED_REMOTE_OP_RETRIES,
            "download heatmap",
            cancel,
        )
        .await
        .ok_or_else(|| UpdateError::Cancelled)
        .and_then(|x| x)
        .inspect(|_| SECONDARY_MODE.download_heatmap.inc())
    }

    /// Download heatmap layers that are not present on local disk, or update their
    /// access time if they are already present.
    async fn download_timeline_layers(
        &self,
        tenant_shard_id: &TenantShardId,
        timeline: HeatMapTimeline,
        timeline_state: SecondaryDetailTimeline,
        deadline: Instant,
    ) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
        // Accumulate updates to the state
        let mut touched = Vec::new();

        let timeline_id = timeline.timeline_id;
        for layer in timeline.into_hot_layers() {
            if self.secondary_state.cancel.is_cancelled() {
                tracing::debug!("Cancelled -- dropping out of layer loop");
                return (Err(UpdateError::Cancelled), touched);
            }

            if Instant::now() > deadline {
                // We've been running downloads for a while, restart to download latest heatmap.
                return (Err(UpdateError::Restart), touched);
            }

            match self.layer_action(&timeline_state, &layer).await {
                LayerAction::Download => (),
                LayerAction::NoAction => continue,
                LayerAction::Skip => {
                    self.skip_layer(layer);
                    continue;
                }
                LayerAction::Touch => {
                    touched.push(layer);
                    continue;
                }
            }

            match self
                .download_layer(
                    tenant_shard_id,
                    &timeline_id,
                    layer,
                    timeline_state.context(),
                )
                .await
            {
                Ok(Some(layer)) => touched.push(layer),
                Ok(None) => {
                    // Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
                    // things to consider touched.
                }
                Err(e) => {
                    return (Err(e), touched);
                }
            }
        }

        (Ok(()), touched)
    }

    async fn layer_action(
        &self,
        timeline_state: &SecondaryDetailTimeline,
        layer: &HeatMapLayer,
    ) -> LayerAction {
        // Existing on-disk layers: just update their access time.
        if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
            tracing::debug!("Layer {} is already on disk", layer.name);

            if cfg!(debug_assertions) {
                // Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
                // are already present on disk are really there.
                match tokio::fs::metadata(&on_disk.local_path).await {
                    Ok(meta) => {
                        tracing::debug!(
                            "Layer {} present at {}, size {}",
                            layer.name,
                            on_disk.local_path,
                            meta.len(),
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "Layer {} not found at {} ({})",
                            layer.name,
                            on_disk.local_path,
                            e
                        );
                        debug_assert!(false);
                    }
                }
            }

            if on_disk.metadata.generation_file_size() != layer.metadata.generation_file_size() {
                tracing::info!(
                    "Re-downloading layer {} with changed size or generation: {:?}->{:?}",
                    layer.name,
                    on_disk.metadata.generation_file_size(),
                    layer.metadata.generation_file_size()
                );
                return LayerAction::Download;
            }
            if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
                // We already have this layer on disk. Update its access time.
                tracing::debug!(
                    "Access time updated for layer {}: {} -> {}",
                    layer.name,
                    strftime(&on_disk.access_time),
                    strftime(&layer.access_time)
                );
                return LayerAction::Touch;
            }
            return LayerAction::NoAction;
        } else {
            tracing::debug!("Layer {} not present on disk yet", layer.name);
        }

        // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
        // recently than it was evicted.
        if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
            if &layer.access_time > evicted_at {
                tracing::info!(
                    "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
                    layer.name,
                    strftime(&layer.access_time),
                    strftime(evicted_at)
                );
            } else {
                tracing::trace!(
                    "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
                    layer.name,
                    strftime(&layer.access_time),
                    strftime(evicted_at)
                );
                return LayerAction::Skip;
            }
        }
        LayerAction::Download
    }
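
    // Decision summary for `layer_action` above (restating its branches, not adding new ones):
    //   on disk, generation/size changed       -> Download (re-fetch)
    //   on disk, metadata or atime differ      -> Touch (update access time)
    //   on disk, identical                     -> NoAction
    //   absent, evicted at/after last access   -> Skip (caller adjusts totals via `skip_layer`)
    //   absent otherwise                       -> Download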

    async fn download_timeline(
        &self,
        timeline: HeatMapTimeline,
        timeline_state: SecondaryDetailTimeline,
        deadline: Instant,
        ctx: &RequestContext,
    ) -> Result<(), UpdateError> {
        debug_assert_current_span_has_tenant_and_timeline_id();
        let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
        let timeline_id = timeline.timeline_id;

        tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());

        let (result, touched) = self
            .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
            .await;

        // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
        {
            let mut detail = self.secondary_state.detail.lock().unwrap();
            let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
                let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
                SecondaryDetailTimeline::empty(ctx)
            });

            tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
            touched.into_iter().for_each(|t| {
                timeline_detail.touch_layer(
                    self.conf,
                    tenant_shard_id,
                    &timeline_id,
                    &t,
                    &self.secondary_state.resident_size_metric,
                    || {
                        local_layer_path(
                            self.conf,
                            tenant_shard_id,
                            &timeline_id,
                            &t.name,
                            &t.metadata.generation,
                        )
                    },
                )
            });
        }

        result
    }

    /// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
    fn skip_layer(&self, layer: HeatMapLayer) {
        let mut progress = self.secondary_state.progress.lock().unwrap();
        progress.layers_total = progress.layers_total.saturating_sub(1);
        progress.bytes_total = progress
            .bytes_total
            .saturating_sub(layer.metadata.file_size);
    }

    async fn download_layer(
        &self,
        tenant_shard_id: &TenantShardId,
        timeline_id: &TimelineId,
        layer: HeatMapLayer,
        ctx: &RequestContext,
    ) -> Result<Option<HeatMapLayer>, UpdateError> {
        // Failpoints for simulating slow remote storage
        failpoint_support::sleep_millis_async!(
            "secondary-layer-download-sleep",
            &self.secondary_state.cancel
        );

        pausable_failpoint!("secondary-layer-download-pausable");

        let local_path = local_layer_path(
            self.conf,
            tenant_shard_id,
            timeline_id,
            &layer.name,
            &layer.metadata.generation,
        );

        // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
        tracing::info!(
            "Starting download of layer {}, size {}",
            layer.name,
            layer.metadata.file_size
        );
        let downloaded_bytes = download_layer_file(
            self.conf,
            self.remote_storage,
            *tenant_shard_id,
            *timeline_id,
            &layer.name,
            &layer.metadata,
            &local_path,
            &self.secondary_state.gate,
            &self.secondary_state.cancel,
            ctx,
        )
        .await;

        let downloaded_bytes = match downloaded_bytes {
            Ok(bytes) => bytes,
            Err(DownloadError::NotFound) => {
                // A heatmap might be out of date and refer to a layer that doesn't exist any more.
                // This is harmless: continue to download the next layer. It is expected during
                // compaction and GC.
                tracing::debug!(
                    "Skipped downloading missing layer {}, raced with compaction/gc?",
                    layer.name
                );
                self.skip_layer(layer);

                return Ok(None);
            }
            Err(e) => return Err(e.into()),
        };

        if downloaded_bytes != layer.metadata.file_size {
            let local_path = local_layer_path(
                self.conf,
                tenant_shard_id,
                timeline_id,
                &layer.name,
                &layer.metadata.generation,
            );

            tracing::warn!(
                "Downloaded layer {} with unexpected size {} != {}. Removing download.",
                layer.name,
                downloaded_bytes,
                layer.metadata.file_size
            );

            tokio::fs::remove_file(&local_path)
                .await
                .or_else(fs_ext::ignore_not_found)?;
        } else {
            tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
            let mut progress = self.secondary_state.progress.lock().unwrap();
            progress.bytes_downloaded += downloaded_bytes;
            progress.layers_downloaded += 1;
        }

        SECONDARY_MODE.download_layer.inc();

        Ok(Some(layer))
    }
}

/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
async fn init_timeline_state(
    conf: &'static PageServerConf,
    tenant_shard_id: &TenantShardId,
    last_heatmap: Option<&HeatMapTimeline>,
    heatmap: &HeatMapTimeline,
    resident_metric: &UIntGauge,
    ctx: &RequestContext,
) -> SecondaryDetailTimeline {
    let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
    let mut detail = SecondaryDetailTimeline::empty(ctx);

    let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
    let mut dir = match tokio::fs::read_dir(&timeline_path).await {
        Ok(d) => d,
        Err(e) => {
            if e.kind() == std::io::ErrorKind::NotFound {
                let context = format!("Creating timeline directory {timeline_path}");
                tracing::info!("{}", context);
                tokio::fs::create_dir_all(&timeline_path)
                    .await
                    .fatal_err(&context);

                // No entries to report: drop out.
                return detail;
            } else {
                on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
            }
        }
    };

    // As we iterate through layers found on disk, we will look up their metadata from this map.
    // Layers not present in metadata will be discarded.
    let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
        heatmap.hot_layers().map(|l| (&l.name, l)).collect();

    let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
        if let Some(last_heatmap) = last_heatmap {
            last_heatmap.hot_layers().map(|l| (&l.name, l)).collect()
        } else {
            HashMap::new()
        };

    while let Some(dentry) = dir
        .next_entry()
        .await
        .fatal_err(&format!("Listing {timeline_path}"))
    {
        let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
            tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
            continue;
        };
        let local_meta = dentry
            .metadata()
            .await
            .fatal_err(&format!("Read metadata on {}", file_path));

        let file_name = file_path.file_name().expect("created it from the dentry");
        if crate::is_temporary(&file_path)
            || is_temp_download_file(&file_path)
            || is_ephemeral_file(file_name)
        {
            // Temporary files are frequently left behind from restarting during downloads
            tracing::info!("Cleaning up temporary file {file_path}");
            if let Err(e) = tokio::fs::remove_file(&file_path)
                .await
                .or_else(fs_ext::ignore_not_found)
            {
                tracing::error!("Failed to remove temporary file {file_path}: {e}");
            }
            continue;
        }

        match LayerName::from_str(file_name) {
            Ok(name) => {
                let remote_meta = heatmap_metadata.get(&name);
                let last_meta = last_heatmap_metadata.get(&name);
                let mut remove = false;
                match remote_meta {
                    Some(remote_meta) => {
                        let last_meta_generation_file_size = last_meta
                            .map(|m| m.metadata.generation_file_size())
                            .unwrap_or(remote_meta.metadata.generation_file_size());
                        // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
                        if remote_meta.metadata.generation_file_size()
                            != last_meta_generation_file_size
                        {
                            tracing::info!(
                                "Removing local layer {name} as on-disk json metadata has different generation or file size from remote: {:?} -> {:?}",
                                last_meta_generation_file_size,
                                remote_meta.metadata.generation_file_size()
                            );
                            remove = true;
                        } else if local_meta.len() != remote_meta.metadata.file_size {
                            // This can happen in the presence of race conditions: the remote and on-disk metadata have changed, but we haven't had
                            // the chance yet to download the new layer to disk, before the process restarted.
                            tracing::info!(
                                "Removing local layer {name} with unexpected local size {} != {}",
                                local_meta.len(),
                                remote_meta.metadata.file_size
                            );
                            remove = true;
                        } else {
                            // We expect the access time to be initialized immediately afterwards, when
                            // the latest heatmap is applied to the state.
                            detail.touch_layer(
                                conf,
                                tenant_shard_id,
                                &heatmap.timeline_id,
                                remote_meta,
                                resident_metric,
                                || file_path,
                            );
                        }
                    }
                    None => {
                        // FIXME: consider some optimization when transitioning from attached to secondary: maybe
                        // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
                        // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
                        tracing::info!(
                            "Removing secondary local layer {} because it's absent in heatmap",
                            name
                        );
                        remove = true;
                    }
                }
                if remove {
                    tokio::fs::remove_file(&dentry.path())
                        .await
                        .or_else(fs_ext::ignore_not_found)
                        .fatal_err(&format!(
                            "Removing layer {}",
                            dentry.path().to_string_lossy()
                        ));
                }
            }
            Err(_) => {
                // Ignore it.
                tracing::warn!("Unexpected file in timeline directory: {file_name}");
            }
        }
    }

    detail
}

/// Loads a json-encoded heatmap file from the provided on-disk path
async fn load_heatmap(
    path: &Utf8PathBuf,
    ctx: &RequestContext,
) -> Result<Option<HeatMapTenant>, anyhow::Error> {
    let mut file = match VirtualFile::open(path, ctx).await {
        Ok(file) => file,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => Err(e)?,
    };
    let st = file.read_to_string(ctx).await?;
    let htm = serde_json::from_str(&st)?;
    Ok(Some(htm))
}