Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : disk_usage_eviction_task::{
12 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
13 : },
14 : metrics::SECONDARY_MODE,
15 : tenant::{
16 : config::SecondaryLocationConfig,
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : remote_timeline_client::{
19 : index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
20 : },
21 : span::debug_assert_current_span_has_tenant_id,
22 : storage_layer::LayerFileName,
23 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
24 : },
25 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
26 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
27 : };
28 :
29 : use super::{
30 : heatmap::HeatMapLayer,
31 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
32 : SecondaryTenant,
33 : };
34 :
35 : use crate::tenant::{
36 : mgr::TenantManager,
37 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
38 : };
39 :
40 : use camino::Utf8PathBuf;
41 : use chrono::format::{DelayedFormat, StrftimeItems};
42 : use futures::Future;
43 : use pageserver_api::shard::TenantShardId;
44 : use rand::Rng;
45 : use remote_storage::{DownloadError, GenericRemoteStorage};
46 :
47 : use tokio_util::sync::CancellationToken;
48 : use tracing::{info_span, instrument, warn, Instrument};
49 : use utils::{
50 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
51 : };
52 :
53 : use super::{
54 : heatmap::{HeatMapTenant, HeatMapTimeline},
55 : CommandRequest, DownloadCommand,
56 : };
57 :
58 : /// For each tenant, how long must have passed since the last download_tenant call before
59 : /// calling it again. This is approximately the time by which local data is allowed
60 : /// to fall behind remote data.
61 : ///
62 : /// TODO: this should just be a default, and the actual period should be controlled
63 : /// via the heatmap itself
64 : /// `<https://github.com/neondatabase/neon/issues/6200>`
65 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
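
// Hedged sketch, not part of the module above: once the heatmap carries its own freshen
// period (per the TODO for issue #6200), the downloader could fall back to
// DOWNLOAD_FRESHEN_INTERVAL only when the heatmap does not specify one. The
// `heatmap_period_ms` parameter is hypothetical; HeatMapTenant has no such field today.
#[allow(dead_code)]
fn effective_freshen_interval(heatmap_period_ms: Option<u64>) -> Duration {
    heatmap_period_ms
        .map(Duration::from_millis)
        .unwrap_or(DOWNLOAD_FRESHEN_INTERVAL)
}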
66 :
67 0 : pub(super) async fn downloader_task(
68 0 : tenant_manager: Arc<TenantManager>,
69 0 : remote_storage: GenericRemoteStorage,
70 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
71 0 : background_jobs_can_start: Barrier,
72 0 : cancel: CancellationToken,
73 0 : ) {
74 0 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
75 0 :
76 0 : let generator = SecondaryDownloader {
77 0 : tenant_manager,
78 0 : remote_storage,
79 0 : };
80 0 : let mut scheduler = Scheduler::new(generator, concurrency);
81 0 :
82 0 : scheduler
83 0 : .run(command_queue, background_jobs_can_start, cancel)
84 0 : .instrument(info_span!("secondary_downloads"))
85 0 : .await
86 0 : }
87 :
88 : struct SecondaryDownloader {
89 : tenant_manager: Arc<TenantManager>,
90 : remote_storage: GenericRemoteStorage,
91 : }
92 :
93 0 : #[derive(Debug, Clone)]
94 : pub(super) struct OnDiskState {
95 : metadata: LayerFileMetadata,
96 : access_time: SystemTime,
97 : }
98 :
99 : impl OnDiskState {
100 0 : fn new(
101 0 : _conf: &'static PageServerConf,
102 0 : _tenant_shard_id: &TenantShardId,
103 0 : _timeline_id: &TimelineId,
104 0 : _name: LayerFileName,
105 0 : metadata: LayerFileMetadata,
106 0 : access_time: SystemTime,
107 0 : ) -> Self {
108 0 : Self {
109 0 : metadata,
110 0 : access_time,
111 0 : }
112 0 : }
113 : }
114 :
115 0 : #[derive(Debug, Clone, Default)]
116 : pub(super) struct SecondaryDetailTimeline {
117 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
118 :
119 : /// We remember when layers were evicted, to prevent re-downloading them.
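/// A layer is only re-downloaded when the heatmap records an access time newer than
/// its eviction time (see the eviction check in download_timeline below).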
120 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
121 : }
122 :
123 : /// This state is written by the secondary downloader; it is opaque
124 : /// to the TenantManager.
125 0 : #[derive(Debug)]
126 : pub(super) struct SecondaryDetail {
127 : pub(super) config: SecondaryLocationConfig,
128 :
129 : last_download: Option<Instant>,
130 : next_download: Option<Instant>,
131 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
132 : }
133 :
134 : /// Helper for logging SystemTime
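///
/// For example, `strftime(&SystemTime::now())` renders as something like
/// `"28/03/2024 13:45:10"` (UTC, day/month/year followed by time).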
135 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
136 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
137 0 : datetime.format("%d/%m/%Y %T")
138 0 : }
139 :
140 : impl SecondaryDetail {
141 0 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
142 0 : Self {
143 0 : config,
144 0 : last_download: None,
145 0 : next_download: None,
146 0 : timelines: HashMap::new(),
147 0 : }
148 0 : }
149 :
150 : /// Additionally returns the total number of layers, used for more stable
151 : /// relative-access-time-based eviction.
152 0 : pub(super) fn get_layers_for_eviction(
153 0 : &self,
154 0 : parent: &Arc<SecondaryTenant>,
155 0 : ) -> (DiskUsageEvictionInfo, usize) {
156 0 : let mut result = DiskUsageEvictionInfo::default();
157 0 : let mut total_layers = 0;
158 :
159 0 : for (timeline_id, timeline_detail) in &self.timelines {
160 0 : result
161 0 : .resident_layers
162 0 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
163 0 : EvictionCandidate {
164 0 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
165 0 : secondary_tenant: parent.clone(),
166 0 : timeline_id: *timeline_id,
167 0 : name: name.clone(),
168 0 : metadata: ods.metadata.clone(),
169 0 : }),
170 0 : last_activity_ts: ods.access_time,
171 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
172 0 : }
173 0 : }));
174 0 :
175 0 : // The total might be missing layers that are currently downloading, but as a
176 0 : // lower-than-actual value it is a good enough approximation.
177 0 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
178 0 : }
179 0 : result.max_layer_size = result
180 0 : .resident_layers
181 0 : .iter()
182 0 : .map(|l| l.layer.get_file_size())
183 0 : .max();
184 0 :
185 0 : tracing::debug!(
186 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
187 0 : parent.get_tenant_shard_id(),
188 0 : self.timelines.len(),
189 0 : result.resident_layers.len()
190 0 : );
191 :
192 0 : (result, total_layers)
193 0 : }
194 : }
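
// Illustrative (hypothetical) caller of get_layers_for_eviction above: the disk-usage
// eviction task would collect candidates from each secondary tenant roughly like this,
// assuming `secondary_tenant: Arc<SecondaryTenant>` as used elsewhere in this file.
//
//     let detail = secondary_tenant.detail.lock().unwrap();
//     let (eviction_info, total_layers) = detail.get_layers_for_eviction(&secondary_tenant);
//     tracing::debug!(
//         "{} candidate layers of {} total, largest is {:?} bytes",
//         eviction_info.resident_layers.len(),
//         total_layers,
//         eviction_info.max_layer_size,
//     );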
195 :
196 : struct PendingDownload {
197 : secondary_state: Arc<SecondaryTenant>,
198 : last_download: Option<Instant>,
199 : target_time: Option<Instant>,
200 : period: Option<Duration>,
201 : }
202 :
203 : impl scheduler::PendingJob for PendingDownload {
204 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
205 0 : self.secondary_state.get_tenant_shard_id()
206 0 : }
207 : }
208 :
209 : struct RunningDownload {
210 : barrier: Barrier,
211 : }
212 :
213 : impl scheduler::RunningJob for RunningDownload {
214 0 : fn get_barrier(&self) -> Barrier {
215 0 : self.barrier.clone()
216 0 : }
217 : }
218 :
219 : struct CompleteDownload {
220 : secondary_state: Arc<SecondaryTenant>,
221 : completed_at: Instant,
222 : }
223 :
224 : impl scheduler::Completion for CompleteDownload {
225 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
226 0 : self.secondary_state.get_tenant_shard_id()
227 0 : }
228 : }
229 :
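/// Concrete instantiation of the generic background-job scheduler for secondary
/// downloads: the pending/running/completion job types from this module, driven by
/// DownloadCommand requests.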
230 : type Scheduler = TenantBackgroundJobs<
231 : SecondaryDownloader,
232 : PendingDownload,
233 : RunningDownload,
234 : CompleteDownload,
235 : DownloadCommand,
236 : >;
237 :
238 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
239 : for SecondaryDownloader
240 : {
241 0 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
242 : fn on_completion(&mut self, completion: CompleteDownload) {
243 : let CompleteDownload {
244 : secondary_state,
245 : completed_at: _completed_at,
246 : } = completion;
247 :
248 0 : tracing::debug!("Secondary tenant download completed");
249 :
250 : // Update next_download even if there was an error: we don't want errored tenants to implicitly
251 : // take priority to run again.
252 : let mut detail = secondary_state.detail.lock().unwrap();
253 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
254 : }
255 :
256 0 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
257 0 : let mut result = SchedulingResult {
258 0 : jobs: Vec::new(),
259 0 : want_interval: None,
260 0 : };
261 0 :
262 0 : // Step 1: identify some tenants that we may work on
263 0 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
264 0 : self.tenant_manager
265 0 : .foreach_secondary_tenants(|_id, secondary_state| {
266 0 : tenants.push(secondary_state.clone());
267 0 : });
268 0 :
269 0 : // Step 2: filter out tenants which are not yet eligible to run
270 0 : let now = Instant::now();
271 0 : result.jobs = tenants
272 0 : .into_iter()
273 0 : .filter_map(|secondary_tenant| {
274 0 : let (last_download, next_download) = {
275 0 : let mut detail = secondary_tenant.detail.lock().unwrap();
276 0 :
277 0 : if !detail.config.warm {
278 : // Downloads are disabled for this tenant
279 0 : detail.next_download = None;
280 0 : return None;
281 0 : }
282 0 :
283 0 : if detail.next_download.is_none() {
284 0 : // Initialize with a jitter: this spreads initial downloads on startup
285 0 : // or mass-attach across our freshen interval.
286 0 : let jittered_period =
287 0 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
288 0 : detail.next_download = Some(now.checked_add(jittered_period).expect(
289 0 : "Using our constant, which is known to be small compared with clock range",
290 0 : ));
291 0 : }
292 0 : (detail.last_download, detail.next_download.unwrap())
293 0 : };
294 0 :
295 0 : if now > next_download {
296 0 : Some(PendingDownload {
297 0 : secondary_state: secondary_tenant,
298 0 : last_download,
299 0 : target_time: Some(next_download),
300 0 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
301 0 : })
302 : } else {
303 0 : None
304 : }
305 0 : })
306 0 : .collect();
307 0 :
308 0 : // Step 3: sort by target execution time to run most urgent first.
309 0 : result.jobs.sort_by_key(|j| j.target_time);
310 0 :
311 0 : result
312 0 : }
313 :
314 0 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
315 0 : let tenant_shard_id = command.get_tenant_shard_id();
316 0 :
317 0 : let tenant = self
318 0 : .tenant_manager
319 0 : .get_secondary_tenant_shard(*tenant_shard_id);
320 0 : let Some(tenant) = tenant else {
321 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
322 : };
323 :
324 0 : Ok(PendingDownload {
325 0 : target_time: None,
326 0 : period: None,
327 0 : last_download: None,
328 0 : secondary_state: tenant,
329 0 : })
330 0 : }
331 :
332 0 : fn spawn(
333 0 : &mut self,
334 0 : job: PendingDownload,
335 0 : ) -> (
336 0 : RunningDownload,
337 0 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
338 0 : ) {
339 0 : let PendingDownload {
340 0 : secondary_state,
341 0 : last_download,
342 0 : target_time,
343 0 : period,
344 0 : } = job;
345 0 :
346 0 : let (completion, barrier) = utils::completion::channel();
347 0 : let remote_storage = self.remote_storage.clone();
348 0 : let conf = self.tenant_manager.get_conf();
349 0 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
350 0 : (RunningDownload { barrier }, Box::pin(async move {
351 0 : let _completion = completion;
352 0 :
353 0 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
354 0 : .download()
355 0 : .await
356 : {
357 : Err(UpdateError::NoData) => {
358 0 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
359 : },
360 : Err(UpdateError::NoSpace) => {
361 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
362 : }
363 : Err(UpdateError::Cancelled) => {
364 0 : tracing::debug!("Shut down while downloading");
365 : },
366 0 : Err(UpdateError::Deserialize(e)) => {
367 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
368 : },
369 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
370 0 : tracing::error!("Error while downloading tenant: {e}");
371 : },
372 0 : Ok(()) => {}
373 : };
374 :
375 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
376 :
377 : // If the job had a target execution time, we may check our final execution
378 : // time against that for observability purposes.
379 0 : if let (Some(target_time), Some(period)) = (target_time, period) {
380 : // Only track execution lag if this isn't our first download: otherwise, it is expected
381 : // that execution will have taken longer than our configured interval, for example
382 : // when starting up a pageserver and downloading a tenant's layers for the first time.
383 0 : if last_download.is_some() {
384 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
385 0 : let elapsed = Instant::now().duration_since(target_time);
386 0 :
387 0 : warn_when_period_overrun(
388 0 : elapsed,
389 0 : period,
390 0 : BackgroundLoopKind::SecondaryDownload,
391 0 : );
392 0 : }
393 0 : }
394 :
395 0 : CompleteDownload {
396 0 : secondary_state,
397 0 : completed_at: Instant::now(),
398 0 : }
399 0 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
400 0 : }
401 : }
402 :
403 : /// This type is a convenience to group together the various functions involved in
404 : /// freshening a secondary tenant.
405 : struct TenantDownloader<'a> {
406 : conf: &'static PageServerConf,
407 : remote_storage: &'a GenericRemoteStorage,
408 : secondary_state: &'a SecondaryTenant,
409 : }
410 :
411 : /// Errors that may be encountered while updating a tenant
412 0 : #[derive(thiserror::Error, Debug)]
413 : enum UpdateError {
414 : #[error("No remote data found")]
415 : NoData,
416 : #[error("Insufficient local storage space")]
417 : NoSpace,
418 : #[error("Failed to download")]
419 : DownloadError(DownloadError),
420 : #[error(transparent)]
421 : Deserialize(#[from] serde_json::Error),
422 : #[error("Cancelled")]
423 : Cancelled,
424 : #[error(transparent)]
425 : Other(#[from] anyhow::Error),
426 : }
427 :
428 : impl From<DownloadError> for UpdateError {
429 0 : fn from(value: DownloadError) -> Self {
430 0 : match &value {
431 0 : DownloadError::Cancelled => Self::Cancelled,
432 0 : DownloadError::NotFound => Self::NoData,
433 0 : _ => Self::DownloadError(value),
434 : }
435 0 : }
436 : }
437 :
438 : impl From<std::io::Error> for UpdateError {
439 0 : fn from(value: std::io::Error) -> Self {
440 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
441 0 : UpdateError::NoSpace
442 0 : } else if value
443 0 : .get_ref()
444 0 : .and_then(|x| x.downcast_ref::<DownloadError>())
445 0 : .is_some()
446 : {
447 0 : UpdateError::from(DownloadError::from(value))
448 : } else {
449 : // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
450 0 : UpdateError::Other(anyhow::anyhow!(value))
451 : }
452 0 : }
453 : }
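
// A minimal test sketch (hypothetical, not part of the original module) exercising the
// io::Error -> UpdateError mapping above: ENOSPC becomes NoSpace, other opaque I/O
// errors are wrapped as Other.
#[cfg(test)]
mod update_error_io_mapping {
    use super::*;

    #[test]
    fn enospc_maps_to_no_space() {
        let err = std::io::Error::from_raw_os_error(nix::errno::Errno::ENOSPC as i32);
        assert!(matches!(UpdateError::from(err), UpdateError::NoSpace));
    }

    #[test]
    fn other_io_errors_are_wrapped_as_other() {
        let err = std::io::Error::new(std::io::ErrorKind::Other, "unexpected");
        assert!(matches!(UpdateError::from(err), UpdateError::Other(_)));
    }
}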
454 :
455 : impl<'a> TenantDownloader<'a> {
456 0 : fn new(
457 0 : conf: &'static PageServerConf,
458 0 : remote_storage: &'a GenericRemoteStorage,
459 0 : secondary_state: &'a SecondaryTenant,
460 0 : ) -> Self {
461 0 : Self {
462 0 : conf,
463 0 : remote_storage,
464 0 : secondary_state,
465 0 : }
466 0 : }
467 :
468 0 : async fn download(&self) -> Result<(), UpdateError> {
469 0 : debug_assert_current_span_has_tenant_id();
470 :
471 : // For the duration of a download, we must hold the SecondaryTenant::gate, to
472 : // cover our access to local storage.
473 0 : let Ok(_guard) = self.secondary_state.gate.enter() else {
474 : // Shutting down
475 0 : return Ok(());
476 : };
477 :
478 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
479 : // Download the tenant's heatmap
480 0 : let heatmap_bytes = tokio::select!(
481 0 : bytes = self.download_heatmap() => {bytes?},
482 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
483 : );
484 :
485 0 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
486 :
487 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
488 : // layer metadata without having to re-download it.
489 0 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
490 0 :
491 0 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
492 0 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
493 0 : let heatmap_path_bg = heatmap_path.clone();
494 0 : VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
495 0 : .await
496 0 : .maybe_fatal_err(&context_msg)?;
497 :
498 0 : tracing::debug!("Wrote local heatmap to {}", heatmap_path);
499 :
500 : // Download the layers in the heatmap
501 0 : for timeline in heatmap.timelines {
502 0 : if self.secondary_state.cancel.is_cancelled() {
503 0 : return Ok(());
504 0 : }
505 0 :
506 0 : let timeline_id = timeline.timeline_id;
507 0 : self.download_timeline(timeline)
508 : .instrument(tracing::info_span!(
509 : "secondary_download_timeline",
510 : tenant_id=%tenant_shard_id.tenant_id,
511 0 : shard_id=%tenant_shard_id.shard_slug(),
512 : %timeline_id
513 : ))
514 0 : .await?;
515 : }
516 :
517 0 : Ok(())
518 0 : }
519 :
520 0 : async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
521 0 : debug_assert_current_span_has_tenant_id();
522 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
523 : // TODO: make download conditional on ETag having changed since last download
524 : // (https://github.com/neondatabase/neon/issues/6199)
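        // A hedged sketch of that conditional check (hypothetical: neither a stored
        // `last_etag` nor a conditional-download helper exists in this module yet):
        //
        //     fn heatmap_unchanged(stored_etag: Option<&str>, remote_etag: &str) -> bool {
        //         stored_etag == Some(remote_etag)
        //     }
        //
        // If the stored ETag matches the remote one, the freshen pass could skip the
        // body download entirely and only bump next_download.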
525 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
526 :
527 0 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
528 0 : let cancel = &self.secondary_state.cancel;
529 :
530 0 : let heatmap_bytes = backoff::retry(
531 0 : || async {
532 0 : let download = self
533 0 : .remote_storage
534 0 : .download(&heatmap_path, cancel)
535 0 : .await
536 0 : .map_err(UpdateError::from)?;
537 0 : let mut heatmap_bytes = Vec::new();
538 0 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
539 0 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
540 0 : Ok(heatmap_bytes)
541 0 : },
542 0 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
543 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
544 0 : FAILED_REMOTE_OP_RETRIES,
545 0 : "download heatmap",
546 0 : cancel,
547 0 : )
548 0 : .await
549 0 : .ok_or_else(|| UpdateError::Cancelled)
550 0 : .and_then(|x| x)?;
551 :
552 0 : SECONDARY_MODE.download_heatmap.inc();
553 0 :
554 0 : Ok(heatmap_bytes)
555 0 : }
556 :
557 0 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
558 0 : debug_assert_current_span_has_tenant_and_timeline_id();
559 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
560 0 : let timeline_path = self
561 0 : .conf
562 0 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
563 0 :
564 0 : // Accumulate updates to the state
565 0 : let mut touched = Vec::new();
566 0 :
567 0 : // Clone a view of what layers already exist on disk
568 0 : let timeline_state = self
569 0 : .secondary_state
570 0 : .detail
571 0 : .lock()
572 0 : .unwrap()
573 0 : .timelines
574 0 : .get(&timeline.timeline_id)
575 0 : .cloned();
576 :
577 0 : let timeline_state = match timeline_state {
578 0 : Some(t) => t,
579 : None => {
580 : // We have no existing state: need to scan local disk for layers first.
581 0 : let timeline_state =
582 0 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
583 :
584 : // Re-acquire detail lock now that we're done with async load from local FS
585 0 : self.secondary_state
586 0 : .detail
587 0 : .lock()
588 0 : .unwrap()
589 0 : .timelines
590 0 : .insert(timeline.timeline_id, timeline_state.clone());
591 0 : timeline_state
592 : }
593 : };
594 :
595 0 : let layers_in_heatmap = timeline
596 0 : .layers
597 0 : .iter()
598 0 : .map(|l| &l.name)
599 0 : .collect::<HashSet<_>>();
600 0 : let layers_on_disk = timeline_state
601 0 : .on_disk_layers
602 0 : .iter()
603 0 : .map(|l| l.0)
604 0 : .collect::<HashSet<_>>();
605 :
606 : // Remove on-disk layers that are no longer present in heatmap
607 0 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
608 0 : let local_path = timeline_path.join(layer.to_string());
609 0 : tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
610 0 : tokio::fs::remove_file(&local_path)
611 0 : .await
612 0 : .or_else(fs_ext::ignore_not_found)
613 0 : .maybe_fatal_err("Removing secondary layer")?;
614 : }
615 :
616 : // Download heatmap layers that are not present on local disk, or update their
617 : // access time if they are already present.
618 0 : for layer in timeline.layers {
619 0 : if self.secondary_state.cancel.is_cancelled() {
620 0 : return Ok(());
621 0 : }
622 :
623 : // Existing on-disk layers: just update their access time.
624 0 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
625 0 : tracing::debug!("Layer {} is already on disk", layer.name);
626 0 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
627 0 : || on_disk.access_time != layer.access_time
628 : {
629 : // We already have this layer on disk. Update its access time.
630 0 : tracing::debug!(
631 0 : "Access time updated for layer {}: {} -> {}",
632 0 : layer.name,
633 0 : strftime(&on_disk.access_time),
634 0 : strftime(&layer.access_time)
635 0 : );
636 0 : touched.push(layer);
637 0 : }
638 0 : continue;
639 : } else {
640 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
641 : }
642 :
643 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
644 : // recently than it was evicted.
645 0 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
646 0 : if &layer.access_time > evicted_at {
647 0 : tracing::info!(
648 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
649 0 : layer.name,
650 0 : strftime(&layer.access_time),
651 0 : strftime(evicted_at)
652 0 : );
653 : } else {
654 0 : tracing::trace!(
655 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
656 0 : layer.name,
657 0 : strftime(&layer.access_time),
658 0 : strftime(evicted_at)
659 0 : );
660 0 : continue;
661 : }
662 0 : }
663 :
664 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
665 0 : let downloaded_bytes = match download_layer_file(
666 0 : self.conf,
667 0 : self.remote_storage,
668 0 : *tenant_shard_id,
669 0 : timeline.timeline_id,
670 0 : &layer.name,
671 0 : &LayerFileMetadata::from(&layer.metadata),
672 0 : &self.secondary_state.cancel,
673 0 : )
674 0 : .await
675 : {
676 0 : Ok(bytes) => bytes,
677 : Err(DownloadError::NotFound) => {
678 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
679 : // This is harmless: continue to download the next layer. It is expected during compaction
680 : // GC.
681 0 : tracing::debug!(
682 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
683 0 : layer.name
684 0 : );
685 0 : continue;
686 : }
687 0 : Err(e) => return Err(e.into()),
688 : };
689 :
690 0 : if downloaded_bytes != layer.metadata.file_size {
691 0 : let local_path = timeline_path.join(layer.name.to_string());
692 :
693 0 : tracing::warn!(
694 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
695 0 : layer.name,
696 0 : downloaded_bytes,
697 0 : layer.metadata.file_size
698 0 : );
699 :
700 0 : tokio::fs::remove_file(&local_path)
701 0 : .await
702 0 : .or_else(fs_ext::ignore_not_found)?;
703 0 : }
704 :
705 0 : SECONDARY_MODE.download_layer.inc();
706 0 : touched.push(layer)
707 : }
708 :
709 : // Write updates to state to record layers we just downloaded or touched.
710 : {
711 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
712 0 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
713 :
714 0 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
715 :
716 0 : for t in touched {
717 : use std::collections::hash_map::Entry;
718 0 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
719 0 : Entry::Occupied(mut v) => {
720 0 : v.get_mut().access_time = t.access_time;
721 0 : }
722 0 : Entry::Vacant(e) => {
723 0 : e.insert(OnDiskState::new(
724 0 : self.conf,
725 0 : tenant_shard_id,
726 0 : &timeline.timeline_id,
727 0 : t.name,
728 0 : LayerFileMetadata::from(&t.metadata),
729 0 : t.access_time,
730 0 : ));
731 0 : }
732 : }
733 : }
734 : }
735 :
736 0 : Ok(())
737 0 : }
738 : }
739 :
740 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
741 0 : async fn init_timeline_state(
742 0 : conf: &'static PageServerConf,
743 0 : tenant_shard_id: &TenantShardId,
744 0 : heatmap: &HeatMapTimeline,
745 0 : ) -> SecondaryDetailTimeline {
746 0 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
747 0 : let mut detail = SecondaryDetailTimeline::default();
748 :
749 0 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
750 0 : Ok(d) => d,
751 0 : Err(e) => {
752 0 : if e.kind() == std::io::ErrorKind::NotFound {
753 0 : let context = format!("Creating timeline directory {timeline_path}");
754 0 : tracing::info!("{}", context);
755 0 : tokio::fs::create_dir_all(&timeline_path)
756 0 : .await
757 0 : .fatal_err(&context);
758 0 :
759 0 : // No entries to report: drop out.
760 0 : return detail;
761 : } else {
762 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
763 : }
764 : }
765 : };
766 :
767 : // As we iterate through layers found on disk, we will look up their metadata from this map.
768 : // Layers not present in metadata will be discarded.
769 0 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
770 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
771 :
772 0 : while let Some(dentry) = dir
773 0 : .next_entry()
774 0 : .await
775 0 : .fatal_err(&format!("Listing {timeline_path}"))
776 : {
777 0 : let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
778 0 : tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
779 0 : continue;
780 : };
781 0 : let local_meta = dentry
782 0 : .metadata()
783 0 : .await
784 0 : .fatal_err(&format!("Read metadata on {}", file_path));
785 0 :
786 0 : let file_name = file_path.file_name().expect("created it from the dentry");
787 0 : if file_name == METADATA_FILE_NAME {
788 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
789 0 : warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
790 0 : continue;
791 0 : } else if crate::is_temporary(&file_path) {
792 : // Temporary files are frequently left behind from restarting during downloads
793 0 : tracing::info!("Cleaning up temporary file {file_path}");
794 0 : if let Err(e) = tokio::fs::remove_file(&file_path)
795 0 : .await
796 0 : .or_else(fs_ext::ignore_not_found)
797 : {
798 0 : tracing::error!("Failed to remove temporary file {file_path}: {e}");
799 0 : }
800 0 : continue;
801 0 : }
802 0 :
803 0 : match LayerFileName::from_str(file_name) {
804 0 : Ok(name) => {
805 0 : let remote_meta = heatmap_metadata.get(&name);
806 0 : match remote_meta {
807 0 : Some(remote_meta) => {
808 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
809 0 : if local_meta.len() != remote_meta.metadata.file_size {
810 : // This should not happen, because we do crashsafe write-then-rename when downloading
811 : // layers, and layers in remote storage are immutable. Remove the local file because
812 : // we cannot trust it.
813 0 : tracing::warn!(
814 0 : "Removing local layer {name} with unexpected local size {} != {}",
815 0 : local_meta.len(),
816 0 : remote_meta.metadata.file_size
817 0 : );
818 0 : } else {
819 0 : // We expect the access time to be initialized immediately afterwards, when
820 0 : // the latest heatmap is applied to the state.
821 0 : detail.on_disk_layers.insert(
822 0 : name.clone(),
823 0 : OnDiskState::new(
824 0 : conf,
825 0 : tenant_shard_id,
826 0 : &heatmap.timeline_id,
827 0 : name,
828 0 : LayerFileMetadata::from(&remote_meta.metadata),
829 0 : remote_meta.access_time,
830 0 : ),
831 0 : );
832 0 : }
833 : }
834 : None => {
835 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
836 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
837 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
838 0 : tracing::info!(
839 0 : "Removing secondary local layer {} because it's absent in heatmap",
840 0 : name
841 0 : );
842 0 : tokio::fs::remove_file(&dentry.path())
843 0 : .await
844 0 : .or_else(fs_ext::ignore_not_found)
845 0 : .fatal_err(&format!(
846 0 : "Removing layer {}",
847 0 : dentry.path().to_string_lossy()
848 0 : ));
849 : }
850 : }
851 : }
852 : Err(_) => {
853 : // Ignore it.
854 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
855 : }
856 : }
857 : }
858 :
859 0 : detail
860 0 : }