Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : disk_usage_eviction_task::{
12 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
13 : },
14 : metrics::SECONDARY_MODE,
15 : tenant::{
16 : config::SecondaryLocationConfig,
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : remote_timeline_client::{
19 : index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
20 : },
21 : span::debug_assert_current_span_has_tenant_id,
22 : storage_layer::LayerFileName,
23 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
24 : },
25 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
26 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
27 : };
28 :
29 : use super::{
30 : heatmap::HeatMapLayer,
31 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
32 : SecondaryTenant,
33 : };
34 :
35 : use crate::tenant::{
36 : mgr::TenantManager,
37 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
38 : };
39 :
40 : use camino::Utf8PathBuf;
41 : use chrono::format::{DelayedFormat, StrftimeItems};
42 : use futures::Future;
43 : use pageserver_api::shard::TenantShardId;
44 : use rand::Rng;
45 : use remote_storage::{DownloadError, GenericRemoteStorage};
46 :
47 : use tokio_util::sync::CancellationToken;
48 : use tracing::{info_span, instrument, Instrument};
49 : use utils::{
50 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
51 : };
52 :
53 : use super::{
54 : heatmap::{HeatMapTenant, HeatMapTimeline},
55 : CommandRequest, DownloadCommand,
56 : };
57 :
58 : /// For each tenant, how long must have passed since the last download_tenant call before
59 : /// calling it again. This is approximately the time by which local data is allowed
60 : /// to fall behind remote data.
61 : ///
62 : /// TODO: this should just be a default, and the actual period should be controlled
63 : /// via the heatmap itself
64 : /// `<https://github.com/neondatabase/neon/issues/6200>`
65 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
66 :
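/// Entry point for the secondary download background loop: wraps a [`SecondaryDownloader`]
/// in the generic tenant job scheduler and runs it until the cancellation token fires.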
67 0 : pub(super) async fn downloader_task(
68 0 : tenant_manager: Arc<TenantManager>,
69 0 : remote_storage: GenericRemoteStorage,
70 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
71 0 : background_jobs_can_start: Barrier,
72 0 : cancel: CancellationToken,
73 0 : ) {
74 0 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
75 0 :
76 0 : let generator = SecondaryDownloader {
77 0 : tenant_manager,
78 0 : remote_storage,
79 0 : };
80 0 : let mut scheduler = Scheduler::new(generator, concurrency);
81 0 :
82 0 : scheduler
83 0 : .run(command_queue, background_jobs_can_start, cancel)
84 0 : .instrument(info_span!("secondary_downloads"))
85 0 : .await
86 0 : }
87 :
88 : struct SecondaryDownloader {
89 : tenant_manager: Arc<TenantManager>,
90 : remote_storage: GenericRemoteStorage,
91 : }
92 :
93 0 : #[derive(Debug, Clone)]
94 : pub(super) struct OnDiskState {
95 : metadata: LayerFileMetadata,
96 : access_time: SystemTime,
97 : }
98 :
99 : impl OnDiskState {
100 0 : fn new(
101 0 : _conf: &'static PageServerConf,
102 0 : _tenant_shard_id: &TenantShardId,
103 0 : _timeline_id: &TimelineId,
104 0 : _name: LayerFileName,
105 0 : metadata: LayerFileMetadata,
106 0 : access_time: SystemTime,
107 0 : ) -> Self {
108 0 : Self {
109 0 : metadata,
110 0 : access_time,
111 0 : }
112 0 : }
113 : }
114 :
115 0 : #[derive(Debug, Clone, Default)]
116 : pub(super) struct SecondaryDetailTimeline {
117 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
118 :
119 : /// We remember when layers were evicted, to prevent re-downloading them.
120 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
121 : }
122 :
123 : /// This state is written by the secondary downloader; it is opaque
124 : /// to the TenantManager.
125 0 : #[derive(Debug)]
126 : pub(super) struct SecondaryDetail {
127 : pub(super) config: SecondaryLocationConfig,
128 :
129 : last_download: Option<Instant>,
130 : next_download: Option<Instant>,
131 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
132 : }
133 :
134 : /// Helper for logging SystemTime
135 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
136 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
137 0 : datetime.format("%d/%m/%Y %T")
138 0 : }
139 :
140 : impl SecondaryDetail {
141 0 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
142 0 : Self {
143 0 : config,
144 0 : last_download: None,
145 0 : next_download: None,
146 0 : timelines: HashMap::new(),
147 0 : }
148 0 : }
149 :
150 : /// Additionally returns the total number of layers, used for more stable relative access time
151 : /// based eviction.
152 0 : pub(super) fn get_layers_for_eviction(
153 0 : &self,
154 0 : parent: &Arc<SecondaryTenant>,
155 0 : ) -> (DiskUsageEvictionInfo, usize) {
156 0 : let mut result = DiskUsageEvictionInfo::default();
157 0 : let mut total_layers = 0;
158 :
159 0 : for (timeline_id, timeline_detail) in &self.timelines {
160 0 : result
161 0 : .resident_layers
162 0 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
163 0 : EvictionCandidate {
164 0 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
165 0 : secondary_tenant: parent.clone(),
166 0 : timeline_id: *timeline_id,
167 0 : name: name.clone(),
168 0 : metadata: ods.metadata.clone(),
169 0 : }),
170 0 : last_activity_ts: ods.access_time,
171 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
172 0 : }
173 0 : }));
174 0 :
175 0 : // total might be missing currently-downloading layers, but as a lower-than-actual
176 0 : // value it is a good enough approximation.
177 0 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
178 0 : }
179 0 : result.max_layer_size = result
180 0 : .resident_layers
181 0 : .iter()
182 0 : .map(|l| l.layer.get_file_size())
183 0 : .max();
184 0 :
185 0 : tracing::debug!(
186 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
187 0 : parent.get_tenant_shard_id(),
188 0 : self.timelines.len(),
189 0 : result.resident_layers.len()
190 0 : );
191 :
192 0 : (result, total_layers)
193 0 : }
194 : }
195 :
196 : struct PendingDownload {
197 : secondary_state: Arc<SecondaryTenant>,
198 : last_download: Option<Instant>,
199 : target_time: Option<Instant>,
200 : period: Option<Duration>,
201 : }
202 :
203 : impl scheduler::PendingJob for PendingDownload {
204 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
205 0 : self.secondary_state.get_tenant_shard_id()
206 0 : }
207 : }
208 :
209 : struct RunningDownload {
210 : barrier: Barrier,
211 : }
212 :
213 : impl scheduler::RunningJob for RunningDownload {
214 0 : fn get_barrier(&self) -> Barrier {
215 0 : self.barrier.clone()
216 0 : }
217 : }
218 :
219 : struct CompleteDownload {
220 : secondary_state: Arc<SecondaryTenant>,
221 : completed_at: Instant,
222 : }
223 :
224 : impl scheduler::Completion for CompleteDownload {
225 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
226 0 : self.secondary_state.get_tenant_shard_id()
227 0 : }
228 : }
229 :
230 : type Scheduler = TenantBackgroundJobs<
231 : SecondaryDownloader,
232 : PendingDownload,
233 : RunningDownload,
234 : CompleteDownload,
235 : DownloadCommand,
236 : >;
237 :
238 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
239 : for SecondaryDownloader
240 : {
241 0 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
242 : fn on_completion(&mut self, completion: CompleteDownload) {
243 : let CompleteDownload {
244 : secondary_state,
245 : completed_at: _completed_at,
246 : } = completion;
247 :
248 0 : tracing::debug!("Secondary tenant download completed");
249 :
250 : // Update next_download even if there was an error: we don't want errored tenants to implicitly
251 : // take priority to run again.
252 : let mut detail = secondary_state.detail.lock().unwrap();
253 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
254 : }
255 :
256 0 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
257 0 : let mut result = SchedulingResult {
258 0 : jobs: Vec::new(),
259 0 : want_interval: None,
260 0 : };
261 0 :
262 0 : // Step 1: identify some tenants that we may work on
263 0 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
264 0 : self.tenant_manager
265 0 : .foreach_secondary_tenants(|_id, secondary_state| {
266 0 : tenants.push(secondary_state.clone());
267 0 : });
268 0 :
269 0 : // Step 2: filter out tenants which are not yet eligible to run
270 0 : let now = Instant::now();
271 0 : result.jobs = tenants
272 0 : .into_iter()
273 0 : .filter_map(|secondary_tenant| {
274 0 : let (last_download, next_download) = {
275 0 : let mut detail = secondary_tenant.detail.lock().unwrap();
276 0 :
277 0 : if !detail.config.warm {
278 : // Downloads are disabled for this tenant
279 0 : detail.next_download = None;
280 0 : return None;
281 0 : }
282 0 :
283 0 : if detail.next_download.is_none() {
284 0 : // Initialize with a jitter: this spreads initial downloads on startup
285 0 : // or mass-attach across our freshen interval.
286 0 : let jittered_period =
287 0 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
288 0 : detail.next_download = Some(now.checked_add(jittered_period).expect(
289 0 : "Using our constant, which is known to be small compared with clock range",
290 0 : ));
291 0 : }
292 0 : (detail.last_download, detail.next_download.unwrap())
293 0 : };
294 0 :
295 0 : if now > next_download {
296 0 : Some(PendingDownload {
297 0 : secondary_state: secondary_tenant,
298 0 : last_download,
299 0 : target_time: Some(next_download),
300 0 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
301 0 : })
302 : } else {
303 0 : None
304 : }
305 0 : })
306 0 : .collect();
307 0 :
308 0 : // Step 3: sort by target execution time to run most urgent first.
309 0 : result.jobs.sort_by_key(|j| j.target_time);
310 0 :
311 0 : result
312 0 : }
313 :
314 0 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
315 0 : let tenant_shard_id = command.get_tenant_shard_id();
316 0 :
317 0 : let tenant = self
318 0 : .tenant_manager
319 0 : .get_secondary_tenant_shard(*tenant_shard_id);
320 0 : let Some(tenant) = tenant else {
321 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
322 : };
323 :
324 0 : Ok(PendingDownload {
325 0 : target_time: None,
326 0 : period: None,
327 0 : last_download: None,
328 0 : secondary_state: tenant,
329 0 : })
330 0 : }
331 :
332 0 : fn spawn(
333 0 : &mut self,
334 0 : job: PendingDownload,
335 0 : ) -> (
336 0 : RunningDownload,
337 0 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
338 0 : ) {
339 0 : let PendingDownload {
340 0 : secondary_state,
341 0 : last_download,
342 0 : target_time,
343 0 : period,
344 0 : } = job;
345 0 :
346 0 : let (completion, barrier) = utils::completion::channel();
347 0 : let remote_storage = self.remote_storage.clone();
348 0 : let conf = self.tenant_manager.get_conf();
349 0 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
350 0 : (RunningDownload { barrier }, Box::pin(async move {
351 0 : let _completion = completion;
352 0 :
353 0 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
354 0 : .download()
355 0 : .await
356 : {
357 : Err(UpdateError::NoData) => {
358 0 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
359 : },
360 : Err(UpdateError::NoSpace) => {
361 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
362 : }
363 : Err(UpdateError::Cancelled) => {
364 0 : tracing::debug!("Shut down while downloading");
365 : },
366 0 : Err(UpdateError::Deserialize(e)) => {
367 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
368 : },
369 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
370 0 : tracing::error!("Error while downloading tenant: {e}");
371 : },
372 0 : Ok(()) => {}
373 : };
374 :
375 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
376 :
377 : // If the job had a target execution time, we may check our final execution
378 : // time against that for observability purposes.
379 0 : if let (Some(target_time), Some(period)) = (target_time, period) {
380 : // Only track execution lag if this isn't our first download: otherwise, it is expected
381 : // that execution will have taken longer than our configured interval, for example
382 : // when starting up a pageserver and doing the initial download for many tenants at once.
383 0 : if last_download.is_some() {
384 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
385 0 : let elapsed = Instant::now().duration_since(target_time);
386 0 :
387 0 : warn_when_period_overrun(
388 0 : elapsed,
389 0 : period,
390 0 : BackgroundLoopKind::SecondaryDownload,
391 0 : );
392 0 : }
393 0 : }
394 :
395 0 : CompleteDownload {
396 0 : secondary_state,
397 0 : completed_at: Instant::now(),
398 0 : }
399 0 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
400 0 : }
401 : }
402 :
403 : /// This type is a convenience to group together the various functions involved in
404 : /// freshening a secondary tenant.
405 : struct TenantDownloader<'a> {
406 : conf: &'static PageServerConf,
407 : remote_storage: &'a GenericRemoteStorage,
408 : secondary_state: &'a SecondaryTenant,
409 : }
410 :
411 : /// Errors that may be encountered while updating a tenant
412 0 : #[derive(thiserror::Error, Debug)]
413 : enum UpdateError {
414 : #[error("No remote data found")]
415 : NoData,
416 : #[error("Insufficient local storage space")]
417 : NoSpace,
418 : #[error("Failed to download")]
419 : DownloadError(DownloadError),
420 : #[error(transparent)]
421 : Deserialize(#[from] serde_json::Error),
422 : #[error("Cancelled")]
423 : Cancelled,
424 : #[error(transparent)]
425 : Other(#[from] anyhow::Error),
426 : }
427 :
428 : impl From<DownloadError> for UpdateError {
429 0 : fn from(value: DownloadError) -> Self {
430 0 : match &value {
431 0 : DownloadError::Cancelled => Self::Cancelled,
432 0 : DownloadError::NotFound => Self::NoData,
433 0 : _ => Self::DownloadError(value),
434 : }
435 0 : }
436 : }
437 :
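// Map local I/O errors onto UpdateError: ENOSPC becomes NoSpace, an io::Error that wraps a
// DownloadError is converted back into a download error, and anything else is treated as Other.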
438 : impl From<std::io::Error> for UpdateError {
439 0 : fn from(value: std::io::Error) -> Self {
440 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
441 0 : UpdateError::NoSpace
442 0 : } else if value
443 0 : .get_ref()
444 0 : .and_then(|x| x.downcast_ref::<DownloadError>())
445 0 : .is_some()
446 : {
447 0 : UpdateError::from(DownloadError::from(value))
448 : } else {
449 : // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
450 0 : UpdateError::Other(anyhow::anyhow!(value))
451 : }
452 0 : }
453 : }
454 :
455 : impl<'a> TenantDownloader<'a> {
456 0 : fn new(
457 0 : conf: &'static PageServerConf,
458 0 : remote_storage: &'a GenericRemoteStorage,
459 0 : secondary_state: &'a SecondaryTenant,
460 0 : ) -> Self {
461 0 : Self {
462 0 : conf,
463 0 : remote_storage,
464 0 : secondary_state,
465 0 : }
466 0 : }
467 :
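/// Run one freshening pass for this tenant: fetch the remote heatmap, persist it locally,
/// then bring each timeline's on-disk layers in line with it.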
468 0 : async fn download(&self) -> Result<(), UpdateError> {
469 0 : debug_assert_current_span_has_tenant_id();
470 :
471 : // For the duration of a download, we must hold the SecondaryTenant::gate, to
472 : // cover our access to local storage.
473 0 : let Ok(_guard) = self.secondary_state.gate.enter() else {
474 : // Shutting down
475 0 : return Ok(());
476 : };
477 :
478 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
479 : // Download the tenant's heatmap
480 0 : let heatmap_bytes = tokio::select!(
481 0 : bytes = self.download_heatmap() => {bytes?},
482 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
483 : );
484 :
485 0 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
486 :
487 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
488 : // layer metadata without having to re-download it.
489 0 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
490 0 :
491 0 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
492 0 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
493 0 : let heatmap_path_bg = heatmap_path.clone();
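// Write the heatmap crash-safely (temp file + rename) on a blocking task, so the
// disk I/O does not tie up async executor worker threads.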
494 0 : tokio::task::spawn_blocking(move || {
495 0 : tokio::runtime::Handle::current().block_on(async move {
496 0 : VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
497 0 : })
498 0 : })
499 0 : .await
500 0 : .expect("Blocking task is never aborted")
501 0 : .maybe_fatal_err(&context_msg)?;
502 :
503 0 : tracing::debug!("Wrote local heatmap to {}", heatmap_path);
504 :
505 : // Download the layers in the heatmap
506 0 : for timeline in heatmap.timelines {
507 0 : if self.secondary_state.cancel.is_cancelled() {
508 0 : return Ok(());
509 0 : }
510 0 :
511 0 : let timeline_id = timeline.timeline_id;
512 0 : self.download_timeline(timeline)
513 : .instrument(tracing::info_span!(
514 : "secondary_download_timeline",
515 : tenant_id=%tenant_shard_id.tenant_id,
516 0 : shard_id=%tenant_shard_id.shard_slug(),
517 : %timeline_id
518 : ))
519 0 : .await?;
520 : }
521 :
522 0 : Ok(())
523 0 : }
524 :
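/// Fetch this tenant's heatmap from remote storage, retrying transient failures with backoff.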
525 0 : async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
526 0 : debug_assert_current_span_has_tenant_id();
527 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
528 : // TODO: make download conditional on ETag having changed since last download
529 : // (https://github.com/neondatabase/neon/issues/6199)
530 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
531 :
532 0 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
533 0 : let cancel = &self.secondary_state.cancel;
534 :
535 0 : let heatmap_bytes = backoff::retry(
536 0 : || async {
537 0 : let download = self
538 0 : .remote_storage
539 0 : .download(&heatmap_path, cancel)
540 0 : .await
541 0 : .map_err(UpdateError::from)?;
542 0 : let mut heatmap_bytes = Vec::new();
543 0 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
544 0 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
545 0 : Ok(heatmap_bytes)
546 0 : },
547 0 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
548 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
549 0 : FAILED_REMOTE_OP_RETRIES,
550 0 : "download heatmap",
551 0 : cancel,
552 0 : )
553 0 : .await
554 0 : .ok_or_else(|| UpdateError::Cancelled)
555 0 : .and_then(|x| x)?;
556 :
557 0 : SECONDARY_MODE.download_heatmap.inc();
558 0 :
559 0 : Ok(heatmap_bytes)
560 0 : }
561 :
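/// Reconcile one timeline with its heatmap: delete local layers that are absent from the
/// heatmap, download layers we are missing, and record access times for layers we touched.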
562 0 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
563 0 : debug_assert_current_span_has_tenant_and_timeline_id();
564 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
565 0 : let timeline_path = self
566 0 : .conf
567 0 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
568 0 :
569 0 : // Accumulate updates to the state
570 0 : let mut touched = Vec::new();
571 0 :
572 0 : // Clone a view of what layers already exist on disk
573 0 : let timeline_state = self
574 0 : .secondary_state
575 0 : .detail
576 0 : .lock()
577 0 : .unwrap()
578 0 : .timelines
579 0 : .get(&timeline.timeline_id)
580 0 : .cloned();
581 :
582 0 : let timeline_state = match timeline_state {
583 0 : Some(t) => t,
584 : None => {
585 : // We have no existing state: need to scan local disk for layers first.
586 0 : let timeline_state =
587 0 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
588 :
589 : // Re-acquire detail lock now that we're done with async load from local FS
590 0 : self.secondary_state
591 0 : .detail
592 0 : .lock()
593 0 : .unwrap()
594 0 : .timelines
595 0 : .insert(timeline.timeline_id, timeline_state.clone());
596 0 : timeline_state
597 : }
598 : };
599 :
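// Build name sets so we can diff the layers named in the heatmap against those already on disk.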
600 0 : let layers_in_heatmap = timeline
601 0 : .layers
602 0 : .iter()
603 0 : .map(|l| &l.name)
604 0 : .collect::<HashSet<_>>();
605 0 : let layers_on_disk = timeline_state
606 0 : .on_disk_layers
607 0 : .iter()
608 0 : .map(|l| l.0)
609 0 : .collect::<HashSet<_>>();
610 :
611 : // Remove on-disk layers that are no longer present in heatmap
612 0 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
613 0 : let local_path = timeline_path.join(layer.to_string());
614 0 : tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
615 0 : tokio::fs::remove_file(&local_path)
616 0 : .await
617 0 : .or_else(fs_ext::ignore_not_found)
618 0 : .maybe_fatal_err("Removing secondary layer")?;
619 : }
620 :
621 : // Download heatmap layers that are not present on local disk, or update their
622 : // access time if they are already present.
623 0 : for layer in timeline.layers {
624 0 : if self.secondary_state.cancel.is_cancelled() {
625 0 : return Ok(());
626 0 : }
627 :
628 : // Existing on-disk layers: just update their access time.
629 0 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
630 0 : tracing::debug!("Layer {} is already on disk", layer.name);
631 0 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
632 0 : || on_disk.access_time != layer.access_time
633 : {
634 : // We already have this layer on disk. Update its access time.
635 0 : tracing::debug!(
636 0 : "Access time updated for layer {}: {} -> {}",
637 0 : layer.name,
638 0 : strftime(&on_disk.access_time),
639 0 : strftime(&layer.access_time)
640 0 : );
641 0 : touched.push(layer);
642 0 : }
643 0 : continue;
644 : } else {
645 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
646 : }
647 :
648 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
649 : // recently than it was evicted.
650 0 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
651 0 : if &layer.access_time > evicted_at {
652 0 : tracing::info!(
653 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
654 0 : layer.name,
655 0 : strftime(&layer.access_time),
656 0 : strftime(evicted_at)
657 0 : );
658 : } else {
659 0 : tracing::trace!(
660 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
661 0 : layer.name,
662 0 : strftime(&layer.access_time),
663 0 : strftime(evicted_at)
664 0 : );
665 0 : continue;
666 : }
667 0 : }
668 :
669 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
670 0 : let downloaded_bytes = match download_layer_file(
671 0 : self.conf,
672 0 : self.remote_storage,
673 0 : *tenant_shard_id,
674 0 : timeline.timeline_id,
675 0 : &layer.name,
676 0 : &LayerFileMetadata::from(&layer.metadata),
677 0 : &self.secondary_state.cancel,
678 0 : )
679 0 : .await
680 : {
681 0 : Ok(bytes) => bytes,
682 : Err(DownloadError::NotFound) => {
683 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
684 : // This is harmless: continue to download the next layer. It is expected during compaction
685 : // and GC.
686 0 : tracing::debug!(
687 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
688 0 : layer.name
689 0 : );
690 0 : continue;
691 : }
692 0 : Err(e) => return Err(e.into()),
693 : };
694 :
695 0 : if downloaded_bytes != layer.metadata.file_size {
696 0 : let local_path = timeline_path.join(layer.name.to_string());
697 :
698 0 : tracing::warn!(
699 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
700 0 : layer.name,
701 0 : downloaded_bytes,
702 0 : layer.metadata.file_size
703 0 : );
704 :
705 0 : tokio::fs::remove_file(&local_path)
706 0 : .await
707 0 : .or_else(fs_ext::ignore_not_found)?;
708 0 : }
709 :
710 0 : SECONDARY_MODE.download_layer.inc();
711 0 : touched.push(layer)
712 : }
713 :
714 : // Write updates to state to record layers we just downloaded or touched.
715 : {
716 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
717 0 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
718 :
719 0 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
720 :
721 0 : for t in touched {
722 : use std::collections::hash_map::Entry;
723 0 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
724 0 : Entry::Occupied(mut v) => {
725 0 : v.get_mut().access_time = t.access_time;
726 0 : }
727 0 : Entry::Vacant(e) => {
728 0 : e.insert(OnDiskState::new(
729 0 : self.conf,
730 0 : tenant_shard_id,
731 0 : &timeline.timeline_id,
732 0 : t.name,
733 0 : LayerFileMetadata::from(&t.metadata),
734 0 : t.access_time,
735 0 : ));
736 0 : }
737 : }
738 : }
739 : }
740 :
741 0 : Ok(())
742 0 : }
743 : }
744 :
745 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
746 0 : async fn init_timeline_state(
747 0 : conf: &'static PageServerConf,
748 0 : tenant_shard_id: &TenantShardId,
749 0 : heatmap: &HeatMapTimeline,
750 0 : ) -> SecondaryDetailTimeline {
751 0 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
752 0 : let mut detail = SecondaryDetailTimeline::default();
753 :
754 0 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
755 0 : Ok(d) => d,
756 0 : Err(e) => {
757 0 : if e.kind() == std::io::ErrorKind::NotFound {
758 0 : let context = format!("Creating timeline directory {timeline_path}");
759 0 : tracing::info!("{}", context);
760 0 : tokio::fs::create_dir_all(&timeline_path)
761 0 : .await
762 0 : .fatal_err(&context);
763 0 :
764 0 : // No entries to report: drop out.
765 0 : return detail;
766 : } else {
767 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
768 : }
769 : }
770 : };
771 :
772 : // As we iterate through layers found on disk, we will look up their metadata from this map.
773 : // Layers not present in metadata will be discarded.
774 0 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
775 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
776 :
777 0 : while let Some(dentry) = dir
778 0 : .next_entry()
779 0 : .await
780 0 : .fatal_err(&format!("Listing {timeline_path}"))
781 : {
782 0 : let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
783 0 : tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
784 0 : continue;
785 : };
786 0 : let local_meta = dentry
787 0 : .metadata()
788 0 : .await
789 0 : .fatal_err(&format!("Read metadata on {}", file_path));
790 0 :
791 0 : let file_name = file_path.file_name().expect("created it from the dentry");
792 0 : if file_name == METADATA_FILE_NAME {
793 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
794 0 : continue;
795 0 : } else if crate::is_temporary(&file_path) {
796 : // Temporary files are frequently left behind from restarting during downloads
797 0 : tracing::info!("Cleaning up temporary file {file_path}");
798 0 : if let Err(e) = tokio::fs::remove_file(&file_path)
799 0 : .await
800 0 : .or_else(fs_ext::ignore_not_found)
801 : {
802 0 : tracing::error!("Failed to remove temporary file {file_path}: {e}");
803 0 : }
804 0 : continue;
805 0 : }
806 0 :
807 0 : match LayerFileName::from_str(file_name) {
808 0 : Ok(name) => {
809 0 : let remote_meta = heatmap_metadata.get(&name);
810 0 : match remote_meta {
811 0 : Some(remote_meta) => {
812 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
813 0 : if local_meta.len() != remote_meta.metadata.file_size {
814 : // This should not happen, because we do crashsafe write-then-rename when downloading
815 : // layers, and layers in remote storage are immutable. Remove the local file because
816 : // we cannot trust it.
817 0 : tracing::warn!(
818 0 : "Removing local layer {name} with unexpected local size {} != {}",
819 0 : local_meta.len(),
820 0 : remote_meta.metadata.file_size
821 0 : );
822 0 : } else {
823 0 : // We expect the access time to be initialized immediately afterwards, when
824 0 : // the latest heatmap is applied to the state.
825 0 : detail.on_disk_layers.insert(
826 0 : name.clone(),
827 0 : OnDiskState::new(
828 0 : conf,
829 0 : tenant_shard_id,
830 0 : &heatmap.timeline_id,
831 0 : name,
832 0 : LayerFileMetadata::from(&remote_meta.metadata),
833 0 : remote_meta.access_time,
834 0 : ),
835 0 : );
836 0 : }
837 : }
838 : None => {
839 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
840 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
841 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
842 0 : tracing::info!(
843 0 : "Removing secondary local layer {} because it's absent in heatmap",
844 0 : name
845 0 : );
846 0 : tokio::fs::remove_file(&dentry.path())
847 0 : .await
848 0 : .or_else(fs_ext::ignore_not_found)
849 0 : .fatal_err(&format!(
850 0 : "Removing layer {}",
851 0 : dentry.path().to_string_lossy()
852 0 : ));
853 : }
854 : }
855 : }
856 : Err(_) => {
857 : // Ignore it.
858 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
859 : }
860 : }
861 : }
862 :
863 0 : detail
864 0 : }