Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : context::RequestContext,
12 : disk_usage_eviction_task::{
13 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
14 : },
15 : metrics::SECONDARY_MODE,
16 : tenant::{
17 : config::SecondaryLocationConfig,
18 : debug_assert_current_span_has_tenant_and_timeline_id,
19 : ephemeral_file::is_ephemeral_file,
20 : remote_timeline_client::{
21 : index::LayerFileMetadata, is_temp_download_file, FAILED_DOWNLOAD_WARN_THRESHOLD,
22 : FAILED_REMOTE_OP_RETRIES,
23 : },
24 : span::debug_assert_current_span_has_tenant_id,
25 : storage_layer::{layer::local_layer_path, LayerName, LayerVisibilityHint},
26 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
27 : },
28 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
29 : TEMP_FILE_SUFFIX,
30 : };
31 :
32 : use super::{
33 : heatmap::HeatMapLayer,
34 : scheduler::{
35 : self, period_jitter, period_warmup, Completion, JobGenerator, SchedulingResult,
36 : TenantBackgroundJobs,
37 : },
38 : GetTenantError, SecondaryTenant, SecondaryTenantError,
39 : };
40 :
41 : use crate::tenant::{
42 : mgr::TenantManager,
43 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
44 : };
45 :
46 : use camino::Utf8PathBuf;
47 : use chrono::format::{DelayedFormat, StrftimeItems};
48 : use futures::Future;
49 : use metrics::UIntGauge;
50 : use pageserver_api::models::SecondaryProgress;
51 : use pageserver_api::shard::TenantShardId;
52 : use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage};
53 :
54 : use tokio_util::sync::CancellationToken;
55 : use tracing::{info_span, instrument, warn, Instrument};
56 : use utils::{
57 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
58 : id::TimelineId, pausable_failpoint, serde_system_time,
59 : };
60 :
61 : use super::{
62 : heatmap::{HeatMapTenant, HeatMapTimeline},
63 : CommandRequest, DownloadCommand,
64 : };
65 :
66 : /// For each tenant, the default period that must have elapsed since the last download_tenant call before
67 : /// calling it again. This default is replaced with the value of [`HeatMapTenant::upload_period_ms`] after the first
68 : /// download, if the uploader populated it.
69 : const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
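// Note: the effective schedule is jittered. on_completion() below sets the next run to
// now + period_jitter(period, 5), where period is the heatmap's upload_period when known and
// otherwise this 60s default; the very first run for a tenant is spread out with period_warmup().
// The exact jitter behaviour is an implementation detail of those helpers.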
70 :
71 0 : pub(super) async fn downloader_task(
72 0 : tenant_manager: Arc<TenantManager>,
73 0 : remote_storage: GenericRemoteStorage,
74 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
75 0 : background_jobs_can_start: Barrier,
76 0 : cancel: CancellationToken,
77 0 : root_ctx: RequestContext,
78 0 : ) {
79 0 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
80 0 :
81 0 : let generator = SecondaryDownloader {
82 0 : tenant_manager,
83 0 : remote_storage,
84 0 : root_ctx,
85 0 : };
86 0 : let mut scheduler = Scheduler::new(generator, concurrency);
87 0 :
88 0 : scheduler
89 0 : .run(command_queue, background_jobs_can_start, cancel)
90 0 : .instrument(info_span!("secondary_download_scheduler"))
91 0 : .await
92 0 : }
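// A sketch of the control flow set up above: the TenantBackgroundJobs scheduler owns the
// SecondaryDownloader and repeatedly drives it through the JobGenerator trait, running at most
// `secondary_download_concurrency` tenant downloads at a time until the cancellation token fires.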
93 :
94 : struct SecondaryDownloader {
95 : tenant_manager: Arc<TenantManager>,
96 : remote_storage: GenericRemoteStorage,
97 : root_ctx: RequestContext,
98 : }
99 :
100 : #[derive(Debug, Clone)]
101 : pub(super) struct OnDiskState {
102 : metadata: LayerFileMetadata,
103 : access_time: SystemTime,
104 : local_path: Utf8PathBuf,
105 : }
106 :
107 : impl OnDiskState {
108 0 : fn new(
109 0 : _conf: &'static PageServerConf,
110 0 : _tenant_shard_id: &TenantShardId,
111 0 : _timeline_id: &TimelineId,
112 0 : _name: LayerName,
113 0 : metadata: LayerFileMetadata,
114 0 : access_time: SystemTime,
115 0 : local_path: Utf8PathBuf,
116 0 : ) -> Self {
117 0 : Self {
118 0 : metadata,
119 0 : access_time,
120 0 : local_path,
121 0 : }
122 0 : }
123 :
124 : // This is infallible, because all errors are either acceptable (ENOENT), or totally
125 : // unexpected (fatal).
126 0 : pub(super) fn remove_blocking(&self) {
127 0 : // We tolerate ENOENT, because between planning eviction and executing
128 0 : // it, the secondary downloader could have seen an updated heatmap that
129 0 : // resulted in a layer being deleted.
130 0 : // Other local I/O errors are process-fatal: these should never happen.
131 0 : std::fs::remove_file(&self.local_path)
132 0 : .or_else(fs_ext::ignore_not_found)
133 0 : .fatal_err("Deleting secondary layer")
134 0 : }
135 :
136 0 : pub(crate) fn file_size(&self) -> u64 {
137 0 : self.metadata.file_size
138 0 : }
139 : }
140 :
141 : #[derive(Debug, Clone, Default)]
142 : pub(super) struct SecondaryDetailTimeline {
143 : on_disk_layers: HashMap<LayerName, OnDiskState>,
144 :
145 : /// We remember when layers were evicted, to prevent re-downloading them.
146 : pub(super) evicted_at: HashMap<LayerName, SystemTime>,
147 : }
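// How `evicted_at` is used: evict_layer() records the eviction time when a layer is removed, and
// layer_action() later consults it so that an evicted layer is only re-downloaded if the heatmap
// reports an access time newer than the eviction; otherwise the layer is skipped.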
148 :
149 : impl SecondaryDetailTimeline {
150 0 : pub(super) fn remove_layer(
151 0 : &mut self,
152 0 : name: &LayerName,
153 0 : resident_metric: &UIntGauge,
154 0 : ) -> Option<OnDiskState> {
155 0 : let removed = self.on_disk_layers.remove(name);
156 0 : if let Some(removed) = &removed {
157 0 : resident_metric.sub(removed.file_size());
158 0 : }
159 0 : removed
160 0 : }
161 :
162 : /// `local_path` is a closure so the path is only computed when the layer is not already tracked on disk.
163 0 : fn touch_layer<F>(
164 0 : &mut self,
165 0 : conf: &'static PageServerConf,
166 0 : tenant_shard_id: &TenantShardId,
167 0 : timeline_id: &TimelineId,
168 0 : touched: &HeatMapLayer,
169 0 : resident_metric: &UIntGauge,
170 0 : local_path: F,
171 0 : ) where
172 0 : F: FnOnce() -> Utf8PathBuf,
173 0 : {
174 : use std::collections::hash_map::Entry;
175 0 : match self.on_disk_layers.entry(touched.name.clone()) {
176 0 : Entry::Occupied(mut v) => {
177 0 : v.get_mut().access_time = touched.access_time;
178 0 : }
179 0 : Entry::Vacant(e) => {
180 0 : e.insert(OnDiskState::new(
181 0 : conf,
182 0 : tenant_shard_id,
183 0 : timeline_id,
184 0 : touched.name.clone(),
185 0 : touched.metadata.clone(),
186 0 : touched.access_time,
187 0 : local_path(),
188 0 : ));
189 0 : resident_metric.add(touched.metadata.file_size);
190 0 : }
191 : }
192 0 : }
193 : }
194 :
195 : // Aspects of a heatmap that we remember after downloading it
196 : #[derive(Clone, Debug)]
197 : struct DownloadSummary {
198 : etag: Etag,
199 : #[allow(unused)]
200 : mtime: SystemTime,
201 : upload_period: Duration,
202 : }
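// The stored etag drives conditional heatmap downloads: download_heatmap() passes it via
// DownloadOpts::etag, so an unchanged heatmap comes back as DownloadError::Unmodified and the
// whole download pass is skipped.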
203 :
204 : /// This state is written by the secondary downloader; it is opaque
205 : /// to the TenantManager.
206 : #[derive(Debug)]
207 : pub(super) struct SecondaryDetail {
208 : pub(super) config: SecondaryLocationConfig,
209 :
210 : last_download: Option<DownloadSummary>,
211 : next_download: Option<Instant>,
212 : timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
213 : }
214 :
215 : /// Helper for logging SystemTime
216 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
217 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
218 0 : datetime.format("%d/%m/%Y %T")
219 0 : }
220 :
221 : /// Information returned from download function when it detects the heatmap has changed
222 : struct HeatMapModified {
223 : etag: Etag,
224 : last_modified: SystemTime,
225 : bytes: Vec<u8>,
226 : }
227 :
228 : enum HeatMapDownload {
229 : // The heatmap's etag has changed: return the new etag, mtime and the body bytes
230 : Modified(HeatMapModified),
231 : // The heatmap's etag is unchanged
232 : Unmodified,
233 : }
234 :
235 : impl SecondaryDetail {
236 0 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
237 0 : Self {
238 0 : config,
239 0 : last_download: None,
240 0 : next_download: None,
241 0 : timelines: HashMap::new(),
242 0 : }
243 0 : }
244 :
245 : #[cfg(feature = "testing")]
246 0 : pub(crate) fn total_resident_size(&self) -> u64 {
247 0 : self.timelines
248 0 : .values()
249 0 : .map(|tl| {
250 0 : tl.on_disk_layers
251 0 : .values()
252 0 : .map(|v| v.metadata.file_size)
253 0 : .sum::<u64>()
254 0 : })
255 0 : .sum::<u64>()
256 0 : }
257 :
258 0 : pub(super) fn evict_layer(
259 0 : &mut self,
260 0 : name: LayerName,
261 0 : timeline_id: &TimelineId,
262 0 : now: SystemTime,
263 0 : resident_metric: &UIntGauge,
264 0 : ) -> Option<OnDiskState> {
265 0 : let timeline = self.timelines.get_mut(timeline_id)?;
266 0 : let removed = timeline.remove_layer(&name, resident_metric);
267 0 : if removed.is_some() {
268 0 : timeline.evicted_at.insert(name, now);
269 0 : }
270 0 : removed
271 0 : }
272 :
273 0 : pub(super) fn remove_timeline(
274 0 : &mut self,
275 0 : timeline_id: &TimelineId,
276 0 : resident_metric: &UIntGauge,
277 0 : ) {
278 0 : let removed = self.timelines.remove(timeline_id);
279 0 : if let Some(removed) = removed {
280 0 : resident_metric.sub(
281 0 : removed
282 0 : .on_disk_layers
283 0 : .values()
284 0 : .map(|l| l.metadata.file_size)
285 0 : .sum(),
286 0 : );
287 0 : }
288 0 : }
289 :
290 : /// Additionally returns the total number of layers, used for more stable relative access time
291 : /// based eviction.
292 0 : pub(super) fn get_layers_for_eviction(
293 0 : &self,
294 0 : parent: &Arc<SecondaryTenant>,
295 0 : ) -> (DiskUsageEvictionInfo, usize) {
296 0 : let mut result = DiskUsageEvictionInfo::default();
297 0 : let mut total_layers = 0;
298 :
299 0 : for (timeline_id, timeline_detail) in &self.timelines {
300 0 : result
301 0 : .resident_layers
302 0 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
303 0 : EvictionCandidate {
304 0 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
305 0 : secondary_tenant: parent.clone(),
306 0 : timeline_id: *timeline_id,
307 0 : name: name.clone(),
308 0 : metadata: ods.metadata.clone(),
309 0 : }),
310 0 : last_activity_ts: ods.access_time,
311 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
312 0 : // Secondary location layers are presumed visible, because Covered layers
313 0 : // are excluded from the heatmap
314 0 : visibility: LayerVisibilityHint::Visible,
315 0 : }
316 0 : }));
317 0 :
318 0 : // total might be missing currently downloading layers, but as a lower than actual
319 0 : // value it is a good enough approximation.
320 0 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
321 0 : }
322 0 : result.max_layer_size = result
323 0 : .resident_layers
324 0 : .iter()
325 0 : .map(|l| l.layer.get_file_size())
326 0 : .max();
327 0 :
328 0 : tracing::debug!(
329 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
330 0 : parent.get_tenant_shard_id(),
331 0 : self.timelines.len(),
332 0 : result.resident_layers.len()
333 : );
334 :
335 0 : (result, total_layers)
336 0 : }
337 : }
338 :
339 : struct PendingDownload {
340 : secondary_state: Arc<SecondaryTenant>,
341 : last_download: Option<DownloadSummary>,
342 : target_time: Option<Instant>,
343 : }
344 :
345 : impl scheduler::PendingJob for PendingDownload {
346 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
347 0 : self.secondary_state.get_tenant_shard_id()
348 0 : }
349 : }
350 :
351 : struct RunningDownload {
352 : barrier: Barrier,
353 : }
354 :
355 : impl scheduler::RunningJob for RunningDownload {
356 0 : fn get_barrier(&self) -> Barrier {
357 0 : self.barrier.clone()
358 0 : }
359 : }
360 :
361 : struct CompleteDownload {
362 : secondary_state: Arc<SecondaryTenant>,
363 : completed_at: Instant,
364 : result: Result<(), UpdateError>,
365 : }
366 :
367 : impl scheduler::Completion for CompleteDownload {
368 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
369 0 : self.secondary_state.get_tenant_shard_id()
370 0 : }
371 : }
372 :
373 : type Scheduler = TenantBackgroundJobs<
374 : SecondaryDownloader,
375 : PendingDownload,
376 : RunningDownload,
377 : CompleteDownload,
378 : DownloadCommand,
379 : >;
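// The JobGenerator implementation below is what the Scheduler drives: schedule() selects tenants
// whose next_download time has passed, on_command() handles explicit download requests, spawn()
// turns a PendingDownload into a future, and on_completion() decides when the tenant runs next.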
380 :
381 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
382 : for SecondaryDownloader
383 : {
384 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
385 : fn on_completion(&mut self, completion: CompleteDownload) {
386 : let CompleteDownload {
387 : secondary_state,
388 : completed_at: _completed_at,
389 : result,
390 : } = completion;
391 :
392 : tracing::debug!("Secondary tenant download completed");
393 :
394 : let mut detail = secondary_state.detail.lock().unwrap();
395 :
396 : match result {
397 : Err(UpdateError::Restart) => {
398 : // Start downloading again as soon as we can. This will involve waiting for the scheduler's
399 : // scheduling interval. This slightly reduces the peak download speed of tenants that hit their
400 : // deadline and keep restarting, but that also helps give other tenants a chance to execute rather
401 : // than letting one big tenant dominate for a long time.
402 : detail.next_download = Some(Instant::now());
403 : }
404 : _ => {
405 : let period = detail
406 : .last_download
407 : .as_ref()
408 0 : .map(|d| d.upload_period)
409 : .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
410 :
411 : // We advance next_download irrespective of errors: we don't want error cases to result in
412 : // expensive busy-polling.
413 : detail.next_download = Some(Instant::now() + period_jitter(period, 5));
414 : }
415 : }
416 : }
417 :
418 0 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
419 0 : let mut result = SchedulingResult {
420 0 : jobs: Vec::new(),
421 0 : want_interval: None,
422 0 : };
423 0 :
424 0 : // Step 1: identify some tenants that we may work on
425 0 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
426 0 : self.tenant_manager
427 0 : .foreach_secondary_tenants(|_id, secondary_state| {
428 0 : tenants.push(secondary_state.clone());
429 0 : });
430 0 :
431 0 : // Step 2: filter out tenants which are not yet eligible to run
432 0 : let now = Instant::now();
433 0 : result.jobs = tenants
434 0 : .into_iter()
435 0 : .filter_map(|secondary_tenant| {
436 0 : let (last_download, next_download) = {
437 0 : let mut detail = secondary_tenant.detail.lock().unwrap();
438 0 :
439 0 : if !detail.config.warm {
440 : // Downloads are disabled for this tenant
441 0 : detail.next_download = None;
442 0 : return None;
443 0 : }
444 0 :
445 0 : if detail.next_download.is_none() {
446 0 : // Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times. Subsequent
447 0 : // rounds will use a smaller jitter to avoid accidentally synchronizing later.
448 0 : detail.next_download = Some(now.checked_add(period_warmup(DEFAULT_DOWNLOAD_INTERVAL)).expect(
449 0 : "Using our constant, which is known to be small compared with clock range",
450 0 : ));
451 0 : }
452 0 : (detail.last_download.clone(), detail.next_download.unwrap())
453 0 : };
454 0 :
455 0 : if now > next_download {
456 0 : Some(PendingDownload {
457 0 : secondary_state: secondary_tenant,
458 0 : last_download,
459 0 : target_time: Some(next_download),
460 0 : })
461 : } else {
462 0 : None
463 : }
464 0 : })
465 0 : .collect();
466 0 :
467 0 : // Step 3: sort by target execution time to run most urgent first.
468 0 : result.jobs.sort_by_key(|j| j.target_time);
469 0 :
470 0 : result
471 0 : }
472 :
473 0 : fn on_command(
474 0 : &mut self,
475 0 : command: DownloadCommand,
476 0 : ) -> Result<PendingDownload, SecondaryTenantError> {
477 0 : let tenant_shard_id = command.get_tenant_shard_id();
478 :
479 0 : let tenant = self
480 0 : .tenant_manager
481 0 : .get_secondary_tenant_shard(*tenant_shard_id)
482 0 : .ok_or(GetTenantError::ShardNotFound(*tenant_shard_id))?;
483 :
484 0 : Ok(PendingDownload {
485 0 : target_time: None,
486 0 : last_download: None,
487 0 : secondary_state: tenant,
488 0 : })
489 0 : }
490 :
491 0 : fn spawn(
492 0 : &mut self,
493 0 : job: PendingDownload,
494 0 : ) -> (
495 0 : RunningDownload,
496 0 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
497 0 : ) {
498 0 : let PendingDownload {
499 0 : secondary_state,
500 0 : last_download,
501 0 : target_time,
502 0 : } = job;
503 0 :
504 0 : let (completion, barrier) = utils::completion::channel();
505 0 : let remote_storage = self.remote_storage.clone();
506 0 : let conf = self.tenant_manager.get_conf();
507 0 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
508 0 : let download_ctx = self.root_ctx.attached_child();
509 0 : (RunningDownload { barrier }, Box::pin(async move {
510 0 : let _completion = completion;
511 :
512 0 : let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
513 0 : .download(&download_ctx)
514 0 : .await;
515 0 : match &result
516 : {
517 : Err(UpdateError::NoData) => {
518 0 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
519 : },
520 : Err(UpdateError::NoSpace) => {
521 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
522 : }
523 : Err(UpdateError::Cancelled) => {
524 0 : tracing::info!("Shut down while downloading");
525 : },
526 0 : Err(UpdateError::Deserialize(e)) => {
527 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
528 : },
529 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
530 0 : tracing::error!("Error while downloading tenant: {e}");
531 : },
532 : Err(UpdateError::Restart) => {
533 0 : tracing::info!("Download reached deadline & will restart to update heatmap")
534 : }
535 0 : Ok(()) => {}
536 : };
537 :
538 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
539 :
540 : // If the job had a target execution time, we may check our final execution
541 : // time against that for observability purposes.
542 0 : if let (Some(target_time), Some(last_download)) = (target_time, last_download) {
543 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
544 0 : let elapsed = Instant::now().duration_since(target_time);
545 0 :
546 0 : warn_when_period_overrun(
547 0 : elapsed,
548 0 : last_download.upload_period,
549 0 : BackgroundLoopKind::SecondaryDownload,
550 0 : );
551 0 : }
552 :
553 0 : CompleteDownload {
554 0 : secondary_state,
555 0 : completed_at: Instant::now(),
556 0 : result
557 0 : }
558 0 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
559 0 : }
560 : }
561 :
562 : enum LayerAction {
563 : Download,
564 : NoAction,
565 : Skip,
566 : Touch,
567 : }
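// How download_timeline_layers() interprets these: Download fetches the layer from remote storage,
// NoAction leaves an up-to-date resident layer alone, Skip deliberately omits the layer (e.g. it was
// evicted more recently than it was accessed) and subtracts it from the progress totals via
// skip_layer(), and Touch only refreshes the recorded access time.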
568 :
569 : /// This type is a convenience to group together the various functions involved in
570 : /// freshening a secondary tenant.
571 : struct TenantDownloader<'a> {
572 : conf: &'static PageServerConf,
573 : remote_storage: &'a GenericRemoteStorage,
574 : secondary_state: &'a SecondaryTenant,
575 : }
576 :
577 : /// Errors that may be encountered while updating a tenant
578 : #[derive(thiserror::Error, Debug)]
579 : enum UpdateError {
580 : /// This is not a true failure, but it's how a download indicates that it would like to be restarted by
581 : /// the scheduler, to pick up the latest heatmap
582 : #[error("Reached deadline, restarting downloads")]
583 : Restart,
584 :
585 : #[error("No remote data found")]
586 : NoData,
587 : #[error("Insufficient local storage space")]
588 : NoSpace,
589 : #[error("Failed to download")]
590 : DownloadError(DownloadError),
591 : #[error(transparent)]
592 : Deserialize(#[from] serde_json::Error),
593 : #[error("Cancelled")]
594 : Cancelled,
595 : #[error(transparent)]
596 : Other(#[from] anyhow::Error),
597 : }
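// Handling differs by variant: Restart makes on_completion() reschedule immediately so a fresh
// heatmap is picked up; NoData and Cancelled are logged at info level, NoSpace at warn, and the
// remaining variants at error. For everything except Restart, next_download is pushed a full
// (jittered) period into the future so that failures do not busy-poll.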
598 :
599 : impl From<DownloadError> for UpdateError {
600 0 : fn from(value: DownloadError) -> Self {
601 0 : match &value {
602 0 : DownloadError::Cancelled => Self::Cancelled,
603 0 : DownloadError::NotFound => Self::NoData,
604 0 : _ => Self::DownloadError(value),
605 : }
606 0 : }
607 : }
608 :
609 : impl From<std::io::Error> for UpdateError {
610 0 : fn from(value: std::io::Error) -> Self {
611 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
612 0 : UpdateError::NoSpace
613 0 : } else if value
614 0 : .get_ref()
615 0 : .and_then(|x| x.downcast_ref::<DownloadError>())
616 0 : .is_some()
617 : {
618 0 : UpdateError::from(DownloadError::from(value))
619 : } else {
620 : // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
621 0 : UpdateError::Other(anyhow::anyhow!(value))
622 : }
623 0 : }
624 : }
625 :
626 : impl<'a> TenantDownloader<'a> {
627 0 : fn new(
628 0 : conf: &'static PageServerConf,
629 0 : remote_storage: &'a GenericRemoteStorage,
630 0 : secondary_state: &'a SecondaryTenant,
631 0 : ) -> Self {
632 0 : Self {
633 0 : conf,
634 0 : remote_storage,
635 0 : secondary_state,
636 0 : }
637 0 : }
638 :
639 0 : async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
640 0 : debug_assert_current_span_has_tenant_id();
641 :
642 : // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
643 : // our access to local storage is covered.
644 0 : let Ok(_guard) = self.secondary_state.gate.enter() else {
645 : // Shutting down
646 0 : return Err(UpdateError::Cancelled);
647 : };
648 :
649 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
650 0 :
651 0 : // We will use the etag from last successful download to make the download conditional on changes
652 0 : let last_download = self
653 0 : .secondary_state
654 0 : .detail
655 0 : .lock()
656 0 : .unwrap()
657 0 : .last_download
658 0 : .clone();
659 :
660 : // Download the tenant's heatmap
661 : let HeatMapModified {
662 0 : last_modified: heatmap_mtime,
663 0 : etag: heatmap_etag,
664 0 : bytes: heatmap_bytes,
665 0 : } = match tokio::select!(
666 0 : bytes = self.download_heatmap(last_download.as_ref().map(|d| &d.etag)) => {bytes?},
667 0 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
668 : ) {
669 : HeatMapDownload::Unmodified => {
670 0 : tracing::info!("Heatmap unchanged since last successful download");
671 0 : return Ok(());
672 : }
673 0 : HeatMapDownload::Modified(m) => m,
674 : };
675 :
676 0 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
677 :
678 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
679 : // layer metadata without having to re-download it.
680 0 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
681 0 :
682 0 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
683 0 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
684 0 : let heatmap_path_bg = heatmap_path.clone();
685 0 : VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
686 0 : .await
687 0 : .maybe_fatal_err(&context_msg)?;
688 :
689 0 : tracing::debug!(
690 0 : "Wrote local heatmap to {}, with {} timelines",
691 0 : heatmap_path,
692 0 : heatmap.timelines.len()
693 : );
694 :
695 : // Get or initialize the local disk state for the timelines we will update
696 0 : let mut timeline_states = HashMap::new();
697 0 : for timeline in &heatmap.timelines {
698 0 : let timeline_state = self
699 0 : .secondary_state
700 0 : .detail
701 0 : .lock()
702 0 : .unwrap()
703 0 : .timelines
704 0 : .get(&timeline.timeline_id)
705 0 : .cloned();
706 :
707 0 : let timeline_state = match timeline_state {
708 0 : Some(t) => t,
709 : None => {
710 : // We have no existing state: need to scan local disk for layers first.
711 0 : let timeline_state = init_timeline_state(
712 0 : self.conf,
713 0 : tenant_shard_id,
714 0 : timeline,
715 0 : &self.secondary_state.resident_size_metric,
716 0 : )
717 0 : .await;
718 :
719 : // Re-acquire detail lock now that we're done with async load from local FS
720 0 : self.secondary_state
721 0 : .detail
722 0 : .lock()
723 0 : .unwrap()
724 0 : .timelines
725 0 : .insert(timeline.timeline_id, timeline_state.clone());
726 0 : timeline_state
727 : }
728 : };
729 :
730 0 : timeline_states.insert(timeline.timeline_id, timeline_state);
731 : }
732 :
733 : // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
734 : // principle that deletions should be done before writes wherever possible, and so that we can use this
735 : // phase to initialize our SecondaryProgress.
736 : {
737 0 : *self.secondary_state.progress.lock().unwrap() =
738 0 : self.prepare_timelines(&heatmap, heatmap_mtime).await?;
739 : }
740 :
741 : // Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
742 : // so that we are always using a reasonably fresh heatmap. Otherwise, if we had really huge content to download, we might
743 : // spend 10s of minutes downloading layers we don't need.
744 : // (see https://github.com/neondatabase/neon/issues/8182)
745 0 : let deadline = {
746 0 : let period = self
747 0 : .secondary_state
748 0 : .detail
749 0 : .lock()
750 0 : .unwrap()
751 0 : .last_download
752 0 : .as_ref()
753 0 : .map(|d| d.upload_period)
754 0 : .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
755 0 :
756 0 : // Use double the period: we are not promising to complete within the period, this is just a heuristic
757 0 : // to keep using a "reasonably fresh" heatmap.
758 0 : Instant::now() + period * 2
759 : };
760 :
761 : // Download the layers in the heatmap
762 0 : for timeline in heatmap.timelines {
763 0 : let timeline_state = timeline_states
764 0 : .remove(&timeline.timeline_id)
765 0 : .expect("Just populated above");
766 0 :
767 0 : if self.secondary_state.cancel.is_cancelled() {
768 0 : tracing::debug!(
769 0 : "Cancelled before downloading timeline {}",
770 : timeline.timeline_id
771 : );
772 0 : return Ok(());
773 0 : }
774 0 :
775 0 : let timeline_id = timeline.timeline_id;
776 0 : self.download_timeline(timeline, timeline_state, deadline, ctx)
777 0 : .instrument(tracing::info_span!(
778 : "secondary_download_timeline",
779 : tenant_id=%tenant_shard_id.tenant_id,
780 0 : shard_id=%tenant_shard_id.shard_slug(),
781 : %timeline_id
782 : ))
783 0 : .await?;
784 : }
785 :
786 : // Metrics consistency check in testing builds
787 0 : self.secondary_state.validate_metrics();
788 0 : // Only update last_download after a fully successful download: this way we will not skip
789 0 : // the next download, even if the heatmap's actual etag is unchanged.
790 0 : self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
791 0 : etag: heatmap_etag,
792 0 : mtime: heatmap_mtime,
793 0 : upload_period: heatmap
794 0 : .upload_period_ms
795 0 : .map(|ms| Duration::from_millis(ms as u64))
796 0 : .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL),
797 0 : });
798 0 :
799 0 : // Robustness: we should have updated progress properly, but in case we didn't, make sure
800 0 : // we don't leave the tenant in a state where we claim to have successfully downloaded
801 0 : // everything, but our progress is incomplete. The invariant here should be that if
802 0 : // we have set `last_download` to this heatmap's etag, then the next time we see that
803 0 : // etag we can safely do no work (i.e. we must be complete).
804 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
805 0 : debug_assert!(progress.layers_downloaded == progress.layers_total);
806 0 : debug_assert!(progress.bytes_downloaded == progress.bytes_total);
807 0 : if progress.layers_downloaded != progress.layers_total
808 0 : || progress.bytes_downloaded != progress.bytes_total
809 : {
810 0 : tracing::warn!("Correcting drift in progress stats ({progress:?})");
811 0 : progress.layers_downloaded = progress.layers_total;
812 0 : progress.bytes_downloaded = progress.bytes_total;
813 0 : }
814 :
815 0 : Ok(())
816 0 : }
817 :
818 : /// Do any fast local cleanup that comes before the much slower process of downloading
819 : /// layers from remote storage. In the process, initialize the SecondaryProgress object
820 : /// that will later be updated incrementally as we download layers.
821 0 : async fn prepare_timelines(
822 0 : &self,
823 0 : heatmap: &HeatMapTenant,
824 0 : heatmap_mtime: SystemTime,
825 0 : ) -> Result<SecondaryProgress, UpdateError> {
826 0 : let heatmap_stats = heatmap.get_stats();
827 0 : // We will construct a progress object, and then populate its initial "downloaded" numbers
828 0 : // while iterating through local layer state in [`Self::prepare_timelines`]
829 0 : let mut progress = SecondaryProgress {
830 0 : layers_total: heatmap_stats.layers,
831 0 : bytes_total: heatmap_stats.bytes,
832 0 : heatmap_mtime: Some(serde_system_time::SystemTime(heatmap_mtime)),
833 0 : layers_downloaded: 0,
834 0 : bytes_downloaded: 0,
835 0 : };
836 0 :
837 0 : // Also expose heatmap bytes_total as a metric
838 0 : self.secondary_state
839 0 : .heatmap_total_size_metric
840 0 : .set(heatmap_stats.bytes);
841 0 :
842 0 : // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
843 0 : let mut delete_layers = Vec::new();
844 0 : let mut delete_timelines = Vec::new();
845 0 : {
846 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
847 0 : for (timeline_id, timeline_state) in &mut detail.timelines {
848 0 : let Some(heatmap_timeline_index) = heatmap
849 0 : .timelines
850 0 : .iter()
851 0 : .position(|t| t.timeline_id == *timeline_id)
852 : else {
853 : // This timeline is no longer referenced in the heatmap: delete it locally
854 0 : delete_timelines.push(*timeline_id);
855 0 : continue;
856 : };
857 :
858 0 : let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
859 0 :
860 0 : let layers_in_heatmap = heatmap_timeline
861 0 : .layers
862 0 : .iter()
863 0 : .map(|l| (&l.name, l.metadata.generation))
864 0 : .collect::<HashSet<_>>();
865 0 : let layers_on_disk = timeline_state
866 0 : .on_disk_layers
867 0 : .iter()
868 0 : .map(|l| (l.0, l.1.metadata.generation))
869 0 : .collect::<HashSet<_>>();
870 0 :
871 0 : let mut layer_count = layers_on_disk.len();
872 0 : let mut layer_byte_count: u64 = timeline_state
873 0 : .on_disk_layers
874 0 : .values()
875 0 : .map(|l| l.metadata.file_size)
876 0 : .sum();
877 :
878 : // Remove on-disk layers that are no longer present in heatmap
879 0 : for (layer_file_name, generation) in layers_on_disk.difference(&layers_in_heatmap) {
880 0 : layer_count -= 1;
881 0 : layer_byte_count -= timeline_state
882 0 : .on_disk_layers
883 0 : .get(layer_file_name)
884 0 : .unwrap()
885 0 : .metadata
886 0 : .file_size;
887 0 :
888 0 : let local_path = local_layer_path(
889 0 : self.conf,
890 0 : self.secondary_state.get_tenant_shard_id(),
891 0 : timeline_id,
892 0 : layer_file_name,
893 0 : generation,
894 0 : );
895 0 :
896 0 : delete_layers.push((*timeline_id, (*layer_file_name).clone(), local_path));
897 0 : }
898 :
899 0 : progress.bytes_downloaded += layer_byte_count;
900 0 : progress.layers_downloaded += layer_count;
901 : }
902 :
903 0 : for delete_timeline in &delete_timelines {
904 0 : // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
905 0 : // from disk fails that will be a fatal error.
906 0 : detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
907 0 : }
908 : }
909 :
910 : // Execute accumulated deletions
911 0 : for (timeline_id, layer_name, local_path) in delete_layers {
912 0 : tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
913 :
914 0 : tokio::fs::remove_file(&local_path)
915 0 : .await
916 0 : .or_else(fs_ext::ignore_not_found)
917 0 : .maybe_fatal_err("Removing secondary layer")?;
918 :
919 : // Update in-memory housekeeping to reflect the absence of the deleted layer
920 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
921 0 : let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
922 0 : continue;
923 : };
924 0 : timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
925 : }
926 :
927 0 : for timeline_id in delete_timelines {
928 0 : let timeline_path = self
929 0 : .conf
930 0 : .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
931 0 : tracing::info!(timeline_id=%timeline_id,
932 0 : "Timeline no longer in heatmap, removing from secondary location"
933 : );
934 0 : tokio::fs::remove_dir_all(&timeline_path)
935 0 : .await
936 0 : .or_else(fs_ext::ignore_not_found)
937 0 : .maybe_fatal_err("Removing secondary timeline")?;
938 : }
939 :
940 0 : Ok(progress)
941 0 : }
942 :
943 : /// Returns downloaded bytes if the etag differs from `prev_etag`, or None if the object
944 : /// still matches `prev_etag`.
945 0 : async fn download_heatmap(
946 0 : &self,
947 0 : prev_etag: Option<&Etag>,
948 0 : ) -> Result<HeatMapDownload, UpdateError> {
949 0 : debug_assert_current_span_has_tenant_id();
950 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
951 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
952 :
953 0 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
954 0 : let cancel = &self.secondary_state.cancel;
955 0 : let opts = DownloadOpts {
956 0 : etag: prev_etag.cloned(),
957 0 : kind: DownloadKind::Small,
958 0 : ..Default::default()
959 0 : };
960 0 :
961 0 : backoff::retry(
962 0 : || async {
963 0 : let download = match self
964 0 : .remote_storage
965 0 : .download(&heatmap_path, &opts, cancel)
966 0 : .await
967 : {
968 0 : Ok(download) => download,
969 0 : Err(DownloadError::Unmodified) => return Ok(HeatMapDownload::Unmodified),
970 0 : Err(err) => return Err(err.into()),
971 : };
972 :
973 0 : let mut heatmap_bytes = Vec::new();
974 0 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
975 0 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
976 0 : Ok(HeatMapDownload::Modified(HeatMapModified {
977 0 : etag: download.etag,
978 0 : last_modified: download.last_modified,
979 0 : bytes: heatmap_bytes,
980 0 : }))
981 0 : },
982 0 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
983 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
984 0 : FAILED_REMOTE_OP_RETRIES,
985 0 : "download heatmap",
986 0 : cancel,
987 0 : )
988 0 : .await
989 0 : .ok_or_else(|| UpdateError::Cancelled)
990 0 : .and_then(|x| x)
991 0 : .inspect(|_| SECONDARY_MODE.download_heatmap.inc())
992 0 : }
993 :
994 : /// Download heatmap layers that are not present on local disk, or update their
995 : /// access time if they are already present.
996 0 : async fn download_timeline_layers(
997 0 : &self,
998 0 : tenant_shard_id: &TenantShardId,
999 0 : timeline: HeatMapTimeline,
1000 0 : timeline_state: SecondaryDetailTimeline,
1001 0 : deadline: Instant,
1002 0 : ctx: &RequestContext,
1003 0 : ) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
1004 0 : // Accumulate updates to the state
1005 0 : let mut touched = Vec::new();
1006 :
1007 0 : for layer in timeline.layers {
1008 0 : if self.secondary_state.cancel.is_cancelled() {
1009 0 : tracing::debug!("Cancelled -- dropping out of layer loop");
1010 0 : return (Err(UpdateError::Cancelled), touched);
1011 0 : }
1012 0 :
1013 0 : if Instant::now() > deadline {
1014 : // We've been running downloads for a while; restart to download the latest heatmap.
1015 0 : return (Err(UpdateError::Restart), touched);
1016 0 : }
1017 0 :
1018 0 : match self.layer_action(&timeline_state, &layer).await {
1019 0 : LayerAction::Download => (),
1020 0 : LayerAction::NoAction => continue,
1021 : LayerAction::Skip => {
1022 0 : self.skip_layer(layer);
1023 0 : continue;
1024 : }
1025 : LayerAction::Touch => {
1026 0 : touched.push(layer);
1027 0 : continue;
1028 : }
1029 : }
1030 :
1031 0 : match self
1032 0 : .download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
1033 0 : .await
1034 : {
1035 0 : Ok(Some(layer)) => touched.push(layer),
1036 0 : Ok(None) => {
1037 0 : // Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
1038 0 : // things to consider touched.
1039 0 : }
1040 0 : Err(e) => {
1041 0 : return (Err(e), touched);
1042 : }
1043 : }
1044 : }
1045 :
1046 0 : (Ok(()), touched)
1047 0 : }
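// Note that `touched` is returned even on the error paths above: download_timeline() persists those
// entries regardless of the overall result, so partially completed work is not forgotten.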
1048 :
1049 0 : async fn layer_action(
1050 0 : &self,
1051 0 : timeline_state: &SecondaryDetailTimeline,
1052 0 : layer: &HeatMapLayer,
1053 0 : ) -> LayerAction {
1054 : // Existing on-disk layers: just update their access time.
1055 0 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
1056 0 : tracing::debug!("Layer {} is already on disk", layer.name);
1057 :
1058 0 : if cfg!(debug_assertions) {
1059 : // Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
1060 : // are already present on disk are really there.
1061 0 : match tokio::fs::metadata(&on_disk.local_path).await {
1062 0 : Ok(meta) => {
1063 0 : tracing::debug!(
1064 0 : "Layer {} present at {}, size {}",
1065 0 : layer.name,
1066 0 : on_disk.local_path,
1067 0 : meta.len(),
1068 : );
1069 : }
1070 0 : Err(e) => {
1071 0 : tracing::warn!(
1072 0 : "Layer {} not found at {} ({})",
1073 : layer.name,
1074 : on_disk.local_path,
1075 : e
1076 : );
1077 0 : debug_assert!(false);
1078 : }
1079 : }
1080 0 : }
1081 :
1082 0 : if on_disk.metadata.generation_file_size() != layer.metadata.generation_file_size() {
1083 0 : tracing::info!(
1084 0 : "Re-downloading layer {} with changed size or generation: {:?}->{:?}",
1085 0 : layer.name,
1086 0 : on_disk.metadata.generation_file_size(),
1087 0 : layer.metadata.generation_file_size()
1088 : );
1089 0 : return LayerAction::Download;
1090 0 : }
1091 0 : if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
1092 : // We already have this layer on disk. Update its access time.
1093 0 : tracing::debug!(
1094 0 : "Access time updated for layer {}: {} -> {}",
1095 0 : layer.name,
1096 0 : strftime(&on_disk.access_time),
1097 0 : strftime(&layer.access_time)
1098 : );
1099 0 : return LayerAction::Touch;
1100 0 : }
1101 0 : return LayerAction::NoAction;
1102 : } else {
1103 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
1104 : }
1105 :
1106 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
1107 : // recently than it was evicted.
1108 0 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
1109 0 : if &layer.access_time > evicted_at {
1110 0 : tracing::info!(
1111 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
1112 0 : layer.name,
1113 0 : strftime(&layer.access_time),
1114 0 : strftime(evicted_at)
1115 : );
1116 : } else {
1117 0 : tracing::trace!(
1118 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
1119 0 : layer.name,
1120 0 : strftime(&layer.access_time),
1121 0 : strftime(evicted_at)
1122 : );
1123 0 : return LayerAction::Skip;
1124 : }
1125 0 : }
1126 0 : LayerAction::Download
1127 0 : }
1128 :
1129 0 : async fn download_timeline(
1130 0 : &self,
1131 0 : timeline: HeatMapTimeline,
1132 0 : timeline_state: SecondaryDetailTimeline,
1133 0 : deadline: Instant,
1134 0 : ctx: &RequestContext,
1135 0 : ) -> Result<(), UpdateError> {
1136 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1137 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
1138 0 : let timeline_id = timeline.timeline_id;
1139 0 :
1140 0 : tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
1141 :
1142 0 : let (result, touched) = self
1143 0 : .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
1144 0 : .await;
1145 :
1146 : // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
1147 : {
1148 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
1149 0 : let timeline_detail = detail.timelines.entry(timeline_id).or_default();
1150 0 :
1151 0 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
1152 0 : touched.into_iter().for_each(|t| {
1153 0 : timeline_detail.touch_layer(
1154 0 : self.conf,
1155 0 : tenant_shard_id,
1156 0 : &timeline_id,
1157 0 : &t,
1158 0 : &self.secondary_state.resident_size_metric,
1159 0 : || {
1160 0 : local_layer_path(
1161 0 : self.conf,
1162 0 : tenant_shard_id,
1163 0 : &timeline_id,
1164 0 : &t.name,
1165 0 : &t.metadata.generation,
1166 0 : )
1167 0 : },
1168 0 : )
1169 0 : });
1170 0 : }
1171 0 :
1172 0 : result
1173 0 : }
1174 :
1175 : /// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
1176 0 : fn skip_layer(&self, layer: HeatMapLayer) {
1177 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
1178 0 : progress.layers_total = progress.layers_total.saturating_sub(1);
1179 0 : progress.bytes_total = progress
1180 0 : .bytes_total
1181 0 : .saturating_sub(layer.metadata.file_size);
1182 0 : }
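// Decrementing the totals (rather than incrementing the downloaded counters) keeps the invariant
// checked at the end of download(): after a fully successful pass, layers_downloaded == layers_total
// and bytes_downloaded == bytes_total even when some layers were intentionally skipped.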
1183 :
1184 0 : async fn download_layer(
1185 0 : &self,
1186 0 : tenant_shard_id: &TenantShardId,
1187 0 : timeline_id: &TimelineId,
1188 0 : layer: HeatMapLayer,
1189 0 : ctx: &RequestContext,
1190 0 : ) -> Result<Option<HeatMapLayer>, UpdateError> {
1191 0 : // Failpoints for simulating slow remote storage
1192 0 : failpoint_support::sleep_millis_async!(
1193 : "secondary-layer-download-sleep",
1194 0 : &self.secondary_state.cancel
1195 : );
1196 :
1197 0 : pausable_failpoint!("secondary-layer-download-pausable");
1198 :
1199 0 : let local_path = local_layer_path(
1200 0 : self.conf,
1201 0 : tenant_shard_id,
1202 0 : timeline_id,
1203 0 : &layer.name,
1204 0 : &layer.metadata.generation,
1205 0 : );
1206 0 :
1207 0 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
1208 0 : tracing::info!(
1209 0 : "Starting download of layer {}, size {}",
1210 : layer.name,
1211 : layer.metadata.file_size
1212 : );
1213 0 : let downloaded_bytes = download_layer_file(
1214 0 : self.conf,
1215 0 : self.remote_storage,
1216 0 : *tenant_shard_id,
1217 0 : *timeline_id,
1218 0 : &layer.name,
1219 0 : &layer.metadata,
1220 0 : &local_path,
1221 0 : &self.secondary_state.gate,
1222 0 : &self.secondary_state.cancel,
1223 0 : ctx,
1224 0 : )
1225 0 : .await;
1226 :
1227 0 : let downloaded_bytes = match downloaded_bytes {
1228 0 : Ok(bytes) => bytes,
1229 : Err(DownloadError::NotFound) => {
1230 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
1231 : // This is harmless: continue to download the next layer. It is expected e.g. during
1232 : // compaction or GC.
1233 0 : tracing::debug!(
1234 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
1235 : layer.name
1236 : );
1237 0 : self.skip_layer(layer);
1238 0 :
1239 0 : return Ok(None);
1240 : }
1241 0 : Err(e) => return Err(e.into()),
1242 : };
1243 :
1244 0 : if downloaded_bytes != layer.metadata.file_size {
1245 0 : let local_path = local_layer_path(
1246 0 : self.conf,
1247 0 : tenant_shard_id,
1248 0 : timeline_id,
1249 0 : &layer.name,
1250 0 : &layer.metadata.generation,
1251 0 : );
1252 0 :
1253 0 : tracing::warn!(
1254 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
1255 : layer.name,
1256 : downloaded_bytes,
1257 : layer.metadata.file_size
1258 : );
1259 :
1260 0 : tokio::fs::remove_file(&local_path)
1261 0 : .await
1262 0 : .or_else(fs_ext::ignore_not_found)?;
1263 : } else {
1264 0 : tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
1265 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
1266 0 : progress.bytes_downloaded += downloaded_bytes;
1267 0 : progress.layers_downloaded += 1;
1268 : }
1269 :
1270 0 : SECONDARY_MODE.download_layer.inc();
1271 0 :
1272 0 : Ok(Some(layer))
1273 0 : }
1274 : }
1275 :
1276 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
1277 0 : async fn init_timeline_state(
1278 0 : conf: &'static PageServerConf,
1279 0 : tenant_shard_id: &TenantShardId,
1280 0 : heatmap: &HeatMapTimeline,
1281 0 : resident_metric: &UIntGauge,
1282 0 : ) -> SecondaryDetailTimeline {
1283 0 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
1284 0 : let mut detail = SecondaryDetailTimeline::default();
1285 :
1286 0 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
1287 0 : Ok(d) => d,
1288 0 : Err(e) => {
1289 0 : if e.kind() == std::io::ErrorKind::NotFound {
1290 0 : let context = format!("Creating timeline directory {timeline_path}");
1291 0 : tracing::info!("{}", context);
1292 0 : tokio::fs::create_dir_all(&timeline_path)
1293 0 : .await
1294 0 : .fatal_err(&context);
1295 0 :
1296 0 : // No entries to report: drop out.
1297 0 : return detail;
1298 : } else {
1299 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
1300 : }
1301 : }
1302 : };
1303 :
1304 : // As we iterate through layers found on disk, we will look up their metadata from this map.
1305 : // Layers not present in metadata will be discarded.
1306 0 : let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
1307 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
1308 :
1309 0 : while let Some(dentry) = dir
1310 0 : .next_entry()
1311 0 : .await
1312 0 : .fatal_err(&format!("Listing {timeline_path}"))
1313 : {
1314 0 : let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
1315 0 : tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
1316 0 : continue;
1317 : };
1318 0 : let local_meta = dentry
1319 0 : .metadata()
1320 0 : .await
1321 0 : .fatal_err(&format!("Read metadata on {}", file_path));
1322 0 :
1323 0 : let file_name = file_path.file_name().expect("created it from the dentry");
1324 0 : if crate::is_temporary(&file_path)
1325 0 : || is_temp_download_file(&file_path)
1326 0 : || is_ephemeral_file(file_name)
1327 : {
1328 : // Temporary files are frequently left behind from restarting during downloads
1329 0 : tracing::info!("Cleaning up temporary file {file_path}");
1330 0 : if let Err(e) = tokio::fs::remove_file(&file_path)
1331 0 : .await
1332 0 : .or_else(fs_ext::ignore_not_found)
1333 : {
1334 0 : tracing::error!("Failed to remove temporary file {file_path}: {e}");
1335 0 : }
1336 0 : continue;
1337 0 : }
1338 0 :
1339 0 : match LayerName::from_str(file_name) {
1340 0 : Ok(name) => {
1341 0 : let remote_meta = heatmap_metadata.get(&name);
1342 0 : match remote_meta {
1343 0 : Some(remote_meta) => {
1344 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
1345 0 : if local_meta.len() != remote_meta.metadata.file_size {
1346 : // This should not happen, because we do crashsafe write-then-rename when downloading
1347 : // layers, and layers in remote storage are immutable. Remove the local file because
1348 : // we cannot trust it.
1349 0 : tracing::warn!(
1350 0 : "Removing local layer {name} with unexpected local size {} != {}",
1351 0 : local_meta.len(),
1352 : remote_meta.metadata.file_size
1353 : );
1354 0 : } else {
1355 0 : // We expect the access time to be initialized immediately afterwards, when
1356 0 : // the latest heatmap is applied to the state.
1357 0 : detail.touch_layer(
1358 0 : conf,
1359 0 : tenant_shard_id,
1360 0 : &heatmap.timeline_id,
1361 0 : remote_meta,
1362 0 : resident_metric,
1363 0 : || file_path,
1364 0 : );
1365 0 : }
1366 : }
1367 : None => {
1368 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
1369 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
1370 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
1371 0 : tracing::info!(
1372 0 : "Removing secondary local layer {} because it's absent in heatmap",
1373 : name
1374 : );
1375 0 : tokio::fs::remove_file(&dentry.path())
1376 0 : .await
1377 0 : .or_else(fs_ext::ignore_not_found)
1378 0 : .fatal_err(&format!(
1379 0 : "Removing layer {}",
1380 0 : dentry.path().to_string_lossy()
1381 0 : ));
1382 : }
1383 : }
1384 : }
1385 : Err(_) => {
1386 : // Ignore it.
1387 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
1388 : }
1389 : }
1390 : }
1391 :
1392 0 : detail
1393 0 : }
|