Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : disk_usage_eviction_task::{
12 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
13 : },
14 : metrics::SECONDARY_MODE,
15 : tenant::{
16 : config::SecondaryLocationConfig,
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : remote_timeline_client::{
19 : index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
20 : },
21 : span::debug_assert_current_span_has_tenant_id,
22 : storage_layer::LayerFileName,
23 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
24 : },
25 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
26 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
27 : };
28 :
29 : use super::{
30 : heatmap::HeatMapLayer,
31 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
32 : SecondaryTenant,
33 : };
34 :
35 : use crate::tenant::{
36 : mgr::TenantManager,
37 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
38 : };
39 :
40 : use chrono::format::{DelayedFormat, StrftimeItems};
41 : use futures::Future;
42 : use pageserver_api::shard::TenantShardId;
43 : use rand::Rng;
44 : use remote_storage::{DownloadError, GenericRemoteStorage};
45 :
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::{info_span, instrument, Instrument};
48 : use utils::{
49 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
50 : };
51 :
52 : use super::{
53 : heatmap::{HeatMapTenant, HeatMapTimeline},
54 : CommandRequest, DownloadCommand,
55 : };
56 :
57 : /// For each tenant, how long must have passed since the last download_tenant call before
58 : /// calling it again. This is approximately the time by which local data is allowed
59 : /// to fall behind remote data.
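/// (With the constant below, this is currently one minute.)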
60 : ///
61 : /// TODO: this should just be a default, and the actual period should be controlled
62 : /// via the heatmap itself
63 : /// `<https://github.com/neondatabase/neon/issues/6200>`
64 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
65 :
66 604 : pub(super) async fn downloader_task(
67 604 : tenant_manager: Arc<TenantManager>,
68 604 : remote_storage: GenericRemoteStorage,
69 604 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
70 604 : background_jobs_can_start: Barrier,
71 604 : cancel: CancellationToken,
72 604 : ) {
73 604 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
74 604 :
75 604 : let generator = SecondaryDownloader {
76 604 : tenant_manager,
77 604 : remote_storage,
78 604 : };
79 604 : let mut scheduler = Scheduler::new(generator, concurrency);
80 604 :
81 604 : scheduler
82 604 : .run(command_queue, background_jobs_can_start, cancel)
83 604 : .instrument(info_span!("secondary_downloads"))
84 946 : .await
85 172 : }
86 :
87 : struct SecondaryDownloader {
88 : tenant_manager: Arc<TenantManager>,
89 : remote_storage: GenericRemoteStorage,
90 : }
91 :
92 1278 : #[derive(Debug, Clone)]
93 : pub(super) struct OnDiskState {
94 : metadata: LayerFileMetadata,
95 : access_time: SystemTime,
96 : }
97 :
98 : impl OnDiskState {
99 1301 : fn new(
100 1301 : _conf: &'static PageServerConf,
101 1301 : _tenant_shard_id: &TenantShardId,
102 1301 : _timeline_id: &TimelineId,
103 1301 : _name: LayerFileName,
104 1301 : metadata: LayerFileMetadata,
105 1301 : access_time: SystemTime,
106 1301 : ) -> Self {
107 1301 : Self {
108 1301 : metadata,
109 1301 : access_time,
110 1301 : }
111 1301 : }
112 : }
113 :
114 9 : #[derive(Debug, Clone, Default)]
115 : pub(super) struct SecondaryDetailTimeline {
116 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
117 :
118 : /// We remember when layers were evicted, to prevent re-downloading them.
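/// A layer is only re-downloaded if the heatmap reports an access time newer than
/// its eviction time (see the eviction check in `download_timeline`).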
119 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
120 : }
121 :
122 : /// This state is written by the secondary downloader; it is opaque
123 : /// to the TenantManager.
124 0 : #[derive(Debug)]
125 : pub(super) struct SecondaryDetail {
126 : pub(super) config: SecondaryLocationConfig,
127 :
128 : last_download: Option<Instant>,
129 : next_download: Option<Instant>,
130 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
131 : }
132 :
133 : /// Helper for logging SystemTime
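/// Produces output like `14/03/2024 18:21:03` (UTC).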
134 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
135 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
136 0 : datetime.format("%d/%m/%Y %T")
137 0 : }
138 :
139 : impl SecondaryDetail {
140 32 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
141 32 : Self {
142 32 : config,
143 32 : last_download: None,
144 32 : next_download: None,
145 32 : timelines: HashMap::new(),
146 32 : }
147 32 : }
148 :
149 2 : pub(super) fn get_layers_for_eviction(
150 2 : &self,
151 2 : parent: &Arc<SecondaryTenant>,
152 2 : ) -> DiskUsageEvictionInfo {
153 2 : let mut result = DiskUsageEvictionInfo {
154 2 : max_layer_size: None,
155 2 : resident_layers: Vec::new(),
156 2 : };
157 4 : for (timeline_id, timeline_detail) in &self.timelines {
158 2 : result
159 2 : .resident_layers
160 38 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
161 38 : EvictionCandidate {
162 38 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
163 38 : secondary_tenant: parent.clone(),
164 38 : timeline_id: *timeline_id,
165 38 : name: name.clone(),
166 38 : metadata: ods.metadata.clone(),
167 38 : }),
168 38 : last_activity_ts: ods.access_time,
169 38 : relative_last_activity: finite_f32::FiniteF32::ZERO,
170 38 : }
171 38 : }));
172 2 : }
173 2 : result.max_layer_size = result
174 2 : .resident_layers
175 2 : .iter()
176 38 : .map(|l| l.layer.get_file_size())
177 2 : .max();
178 2 :
179 2 : tracing::debug!(
180 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
181 0 : parent.get_tenant_shard_id(),
182 0 : self.timelines.len(),
183 0 : result.resident_layers.len()
184 0 : );
185 :
186 2 : result
187 2 : }
188 : }
189 :
190 : struct PendingDownload {
191 : secondary_state: Arc<SecondaryTenant>,
192 : last_download: Option<Instant>,
193 : target_time: Option<Instant>,
194 : period: Option<Duration>,
195 : }
196 :
197 : impl scheduler::PendingJob for PendingDownload {
198 29 : fn get_tenant_shard_id(&self) -> &TenantShardId {
199 29 : self.secondary_state.get_tenant_shard_id()
200 29 : }
201 : }
202 :
203 : struct RunningDownload {
204 : barrier: Barrier,
205 : }
206 :
207 : impl scheduler::RunningJob for RunningDownload {
208 6 : fn get_barrier(&self) -> Barrier {
209 6 : self.barrier.clone()
210 6 : }
211 : }
212 :
213 : struct CompleteDownload {
214 : secondary_state: Arc<SecondaryTenant>,
215 : completed_at: Instant,
216 : }
217 :
218 : impl scheduler::Completion for CompleteDownload {
219 30 : fn get_tenant_shard_id(&self) -> &TenantShardId {
220 30 : self.secondary_state.get_tenant_shard_id()
221 30 : }
222 : }
223 :
224 : type Scheduler = TenantBackgroundJobs<
225 : SecondaryDownloader,
226 : PendingDownload,
227 : RunningDownload,
228 : CompleteDownload,
229 : DownloadCommand,
230 : >;
231 :
232 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
233 : for SecondaryDownloader
234 : {
235 10 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
236 : fn on_completion(&mut self, completion: CompleteDownload) {
237 : let CompleteDownload {
238 : secondary_state,
239 : completed_at: _completed_at,
240 : } = completion;
241 :
242 0 : tracing::debug!("Secondary tenant download completed");
243 :
244 : // Update next_download even if there was an error: we don't want errored tenants to
245 : // implicitly take priority to run again.
246 : let mut detail = secondary_state.detail.lock().unwrap();
247 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
248 : }
249 :
250 1180 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
251 1180 : let mut result = SchedulingResult {
252 1180 : jobs: Vec::new(),
253 1180 : want_interval: None,
254 1180 : };
255 1180 :
256 1180 : // Step 1: identify some tenants that we may work on
257 1180 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
258 1180 : self.tenant_manager
259 1180 : .foreach_secondary_tenants(|_id, secondary_state| {
260 11 : tenants.push(secondary_state.clone());
261 1180 : });
262 1180 :
263 1180 : // Step 2: filter out tenants which are not yet eligible to run
264 1180 : let now = Instant::now();
265 1180 : result.jobs = tenants
266 1180 : .into_iter()
267 1180 : .filter_map(|secondary_tenant| {
268 7 : let (last_download, next_download) = {
269 11 : let mut detail = secondary_tenant.detail.lock().unwrap();
270 11 :
271 11 : if !detail.config.warm {
272 : // Downloads are disabled for this tenant
273 4 : detail.next_download = None;
274 4 : return None;
275 7 : }
276 7 :
277 7 : if detail.next_download.is_none() {
278 6 : // Initialize with a jitter: this spreads initial downloads on startup
279 6 : // or mass-attach across our freshen interval.
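// e.g. with the one-minute freshen interval, a mass-attach of many tenants spreads
// their first downloads uniformly over the following minute instead of all at once.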
280 6 : let jittered_period =
281 6 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
282 6 : detail.next_download = Some(now.checked_add(jittered_period).expect(
283 6 : "Using our constant, which is known to be small compared with clock range",
284 6 : ));
285 6 : }
286 7 : (detail.last_download, detail.next_download.unwrap())
287 7 : };
288 7 :
289 7 : if now < next_download {
290 7 : Some(PendingDownload {
291 7 : secondary_state: secondary_tenant,
292 7 : last_download,
293 7 : target_time: Some(next_download),
294 7 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
295 7 : })
296 : } else {
297 0 : None
298 : }
299 1180 : })
300 1180 : .collect();
301 1180 :
302 1180 : // Step 3: sort by target execution time to run most urgent first.
303 1180 : result.jobs.sort_by_key(|j| j.target_time);
304 1180 :
305 1180 : result
306 1180 : }
307 :
308 6 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
309 6 : let tenant_shard_id = command.get_tenant_shard_id();
310 6 :
311 6 : let tenant = self
312 6 : .tenant_manager
313 6 : .get_secondary_tenant_shard(*tenant_shard_id);
314 6 : let Some(tenant) = tenant else {
315 : {
316 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
317 : }
318 : };
319 :
320 6 : Ok(PendingDownload {
321 6 : target_time: None,
322 6 : period: None,
323 6 : last_download: None,
324 6 : secondary_state: tenant,
325 6 : })
326 6 : }
327 :
328 10 : fn spawn(
329 10 : &mut self,
330 10 : job: PendingDownload,
331 10 : ) -> (
332 10 : RunningDownload,
333 10 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
334 10 : ) {
335 10 : let PendingDownload {
336 10 : secondary_state,
337 10 : last_download,
338 10 : target_time,
339 10 : period,
340 10 : } = job;
341 10 :
342 10 : let (completion, barrier) = utils::completion::channel();
343 10 : let remote_storage = self.remote_storage.clone();
344 10 : let conf = self.tenant_manager.get_conf();
345 10 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
346 10 : (RunningDownload { barrier }, Box::pin(async move {
347 10 : let _completion = completion;
348 10 :
349 10 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
350 10 : .download()
351 64798 : .await
352 : {
353 : Err(UpdateError::NoData) => {
354 1 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
355 : },
356 : Err(UpdateError::NoSpace) => {
357 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
358 : }
359 : Err(UpdateError::Cancelled) => {
360 0 : tracing::debug!("Shut down while downloading");
361 : },
362 0 : Err(UpdateError::Deserialize(e)) => {
363 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
364 : },
365 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
366 0 : tracing::error!("Error while downloading tenant: {e}");
367 : },
368 9 : Ok(()) => {}
369 : };
370 :
371 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
372 :
373 : // If the job had a target execution time, we may check our final execution
374 : // time against that for observability purposes.
375 10 : if let (Some(target_time), Some(period)) = (target_time, period) {
376 : // Only track execution lag if this isn't our first download: otherwise, it is expected
377 : // that execution will have taken longer than our configured interval, for example
378 : // when starting up a pageserver and downloading a tenant's content for the first time.
379 4 : if last_download.is_some() {
380 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
381 0 : let elapsed = Instant::now().duration_since(target_time);
382 0 :
383 0 : warn_when_period_overrun(
384 0 : elapsed,
385 0 : period,
386 0 : BackgroundLoopKind::SecondaryDownload,
387 0 : );
388 4 : }
389 6 : }
390 :
391 10 : CompleteDownload {
392 10 : secondary_state,
393 10 : completed_at: Instant::now(),
394 10 : }
395 10 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
396 10 : }
397 : }
398 :
399 : /// This type is a convenience to group together the various functions involved in
400 : /// freshening a secondary tenant.
401 : struct TenantDownloader<'a> {
402 : conf: &'static PageServerConf,
403 : remote_storage: &'a GenericRemoteStorage,
404 : secondary_state: &'a SecondaryTenant,
405 : }
406 :
407 : /// Errors that may be encountered while updating a tenant
408 0 : #[derive(thiserror::Error, Debug)]
409 : enum UpdateError {
410 : #[error("No remote data found")]
411 : NoData,
412 : #[error("Insufficient local storage space")]
413 : NoSpace,
414 : #[error("Failed to download")]
415 : DownloadError(DownloadError),
416 : #[error(transparent)]
417 : Deserialize(#[from] serde_json::Error),
418 : #[error("Cancelled")]
419 : Cancelled,
420 : #[error(transparent)]
421 : Other(#[from] anyhow::Error),
422 : }
423 :
424 : impl From<DownloadError> for UpdateError {
425 1 : fn from(value: DownloadError) -> Self {
426 1 : match &value {
427 0 : DownloadError::Cancelled => Self::Cancelled,
428 1 : DownloadError::NotFound => Self::NoData,
429 0 : _ => Self::DownloadError(value),
430 : }
431 1 : }
432 : }
433 :
434 : impl From<std::io::Error> for UpdateError {
435 0 : fn from(value: std::io::Error) -> Self {
436 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
437 0 : UpdateError::NoSpace
438 : } else {
439 : // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
440 0 : UpdateError::Other(anyhow::anyhow!(value))
441 : }
442 0 : }
443 : }
444 :
445 : impl<'a> TenantDownloader<'a> {
446 10 : fn new(
447 10 : conf: &'static PageServerConf,
448 10 : remote_storage: &'a GenericRemoteStorage,
449 10 : secondary_state: &'a SecondaryTenant,
450 10 : ) -> Self {
451 10 : Self {
452 10 : conf,
453 10 : remote_storage,
454 10 : secondary_state,
455 10 : }
456 10 : }
457 :
458 10 : async fn download(&self) -> Result<(), UpdateError> {
459 10 : debug_assert_current_span_has_tenant_id();
460 :
461 : // For the duration of a download, we must hold the SecondaryTenant::gate, to
462 : // cover our access to local storage.
463 10 : let Ok(_guard) = self.secondary_state.gate.enter() else {
464 : // Shutting down
465 0 : return Ok(());
466 : };
467 :
468 10 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
469 : // Download the tenant's heatmap
470 10 : let heatmap_bytes = tokio::select!(
471 10 : bytes = self.download_heatmap() => {bytes?},
472 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
473 : );
474 :
475 9 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
476 :
477 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
478 : // layer metadata without having to re-download it.
479 9 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
480 9 :
481 9 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
482 9 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
483 9 : let heatmap_path_bg = heatmap_path.clone();
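// crashsafe_overwrite writes to the temporary path and then renames it into place,
// so an interrupted write cannot leave a torn heatmap file behind.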
484 9 : tokio::task::spawn_blocking(move || {
485 9 : tokio::runtime::Handle::current().block_on(async move {
486 9 : VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
487 9 : })
488 9 : })
489 9 : .await
490 9 : .expect("Blocking task is never aborted")
491 9 : .maybe_fatal_err(&context_msg)?;
492 :
493 0 : tracing::debug!("Wrote local heatmap to {}", heatmap_path);
494 :
495 : // Download the layers in the heatmap
496 18 : for timeline in heatmap.timelines {
497 9 : if self.secondary_state.cancel.is_cancelled() {
498 0 : return Ok(());
499 9 : }
500 9 :
501 9 : let timeline_id = timeline.timeline_id;
502 9 : self.download_timeline(timeline)
503 : .instrument(tracing::info_span!(
504 : "secondary_download_timeline",
505 : tenant_id=%tenant_shard_id.tenant_id,
506 9 : shard_id=%tenant_shard_id.shard_slug(),
507 : %timeline_id
508 : ))
509 64740 : .await?;
510 : }
511 :
512 9 : Ok(())
513 10 : }
514 :
515 10 : async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
516 10 : debug_assert_current_span_has_tenant_id();
517 10 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
518 : // TODO: make download conditional on ETag having changed since last download
519 : // (https://github.com/neondatabase/neon/issues/6199)
520 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
521 :
522 10 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
523 :
524 10 : let heatmap_bytes = backoff::retry(
525 10 : || async {
526 10 : let download = self
527 10 : .remote_storage
528 10 : .download(&heatmap_path)
529 21 : .await
530 10 : .map_err(UpdateError::from)?;
531 9 : let mut heatmap_bytes = Vec::new();
532 9 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
533 28 : let _size = tokio::io::copy(&mut body, &mut heatmap_bytes).await?;
534 9 : Ok(heatmap_bytes)
535 10 : },
536 10 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
537 10 : FAILED_DOWNLOAD_WARN_THRESHOLD,
538 10 : FAILED_REMOTE_OP_RETRIES,
539 10 : "download heatmap",
540 10 : &self.secondary_state.cancel,
541 10 : )
542 49 : .await
543 10 : .ok_or_else(|| UpdateError::Cancelled)
544 10 : .and_then(|x| x)?;
545 :
546 9 : SECONDARY_MODE.download_heatmap.inc();
547 9 :
548 9 : Ok(heatmap_bytes)
549 10 : }
550 :
551 9 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
552 9 : debug_assert_current_span_has_tenant_and_timeline_id();
553 9 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
554 9 : let timeline_path = self
555 9 : .conf
556 9 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
557 9 :
558 9 : // Accumulate updates to the state
559 9 : let mut touched = Vec::new();
560 9 :
561 9 : // Clone a view of what layers already exist on disk
562 9 : let timeline_state = self
563 9 : .secondary_state
564 9 : .detail
565 9 : .lock()
566 9 : .unwrap()
567 9 : .timelines
568 9 : .get(&timeline.timeline_id)
569 9 : .cloned();
570 :
571 9 : let timeline_state = match timeline_state {
572 3 : Some(t) => t,
573 : None => {
574 : // We have no existing state: need to scan local disk for layers first.
575 6 : let timeline_state =
576 48 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
577 :
578 : // Re-acquire detail lock now that we're done with async load from local FS
579 6 : self.secondary_state
580 6 : .detail
581 6 : .lock()
582 6 : .unwrap()
583 6 : .timelines
584 6 : .insert(timeline.timeline_id, timeline_state.clone());
585 6 : timeline_state
586 : }
587 : };
588 :
589 9 : let layers_in_heatmap = timeline
590 9 : .layers
591 9 : .iter()
592 2540 : .map(|l| &l.name)
593 9 : .collect::<HashSet<_>>();
594 9 : let layers_on_disk = timeline_state
595 9 : .on_disk_layers
596 9 : .iter()
597 1278 : .map(|l| l.0)
598 9 : .collect::<HashSet<_>>();
599 :
600 : // Remove on-disk layers that are no longer present in heatmap
601 9 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
602 1 : let local_path = timeline_path.join(layer.to_string());
603 1 : tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
604 1 : tokio::fs::remove_file(&local_path)
605 1 : .await
606 1 : .or_else(fs_ext::ignore_not_found)
607 1 : .maybe_fatal_err("Removing secondary layer")?;
608 : }
609 :
610 : // Download heatmap layers that are not present on local disk, or update their
611 : // access time if they are already present.
612 2549 : for layer in timeline.layers {
613 2540 : if self.secondary_state.cancel.is_cancelled() {
614 0 : return Ok(());
615 2540 : }
616 :
617 : // Existing on-disk layers: just update their access time.
618 2540 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
619 0 : tracing::debug!("Layer {} is already on disk", layer.name);
620 1277 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
621 1277 : || on_disk.access_time != layer.access_time
622 : {
623 : // We already have this layer on disk. Update its access time.
624 0 : tracing::debug!(
625 0 : "Access time updated for layer {}: {} -> {}",
626 0 : layer.name,
627 0 : strftime(&on_disk.access_time),
628 0 : strftime(&layer.access_time)
629 0 : );
630 200 : touched.push(layer);
631 1077 : }
632 1277 : continue;
633 : } else {
634 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
635 : }
636 :
637 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
638 : // recently than it was evicted.
639 1263 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
640 0 : if &layer.access_time > evicted_at {
641 0 : tracing::info!(
642 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
643 0 : layer.name,
644 0 : strftime(&layer.access_time),
645 0 : strftime(evicted_at)
646 0 : );
647 : } else {
648 0 : tracing::trace!(
649 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
650 0 : layer.name,
651 0 : strftime(&layer.access_time),
652 0 : strftime(evicted_at)
653 0 : );
654 0 : continue;
655 : }
656 1263 : }
657 :
658 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
659 1263 : let downloaded_bytes = match download_layer_file(
660 1263 : self.conf,
661 1263 : self.remote_storage,
662 1263 : *tenant_shard_id,
663 1263 : timeline.timeline_id,
664 1263 : &layer.name,
665 1263 : &LayerFileMetadata::from(&layer.metadata),
666 1263 : &self.secondary_state.cancel,
667 1263 : )
668 64691 : .await
669 : {
670 1263 : Ok(bytes) => bytes,
671 0 : Err(e) => {
672 0 : if let DownloadError::NotFound = e {
673 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
674 : // This is harmless: continue to download the next layer. It is expected during
675 : // compaction and GC.
676 0 : tracing::debug!(
677 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
678 0 : layer.name
679 0 : );
680 0 : continue;
681 : } else {
682 0 : return Err(e.into());
683 : }
684 : }
685 : };
686 :
687 1263 : if downloaded_bytes != layer.metadata.file_size {
688 0 : let local_path = timeline_path.join(layer.name.to_string());
689 :
690 0 : tracing::warn!(
691 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
692 0 : layer.name,
693 0 : downloaded_bytes,
694 0 : layer.metadata.file_size
695 0 : );
696 :
697 0 : tokio::fs::remove_file(&local_path)
698 0 : .await
699 0 : .or_else(fs_ext::ignore_not_found)?;
700 1263 : }
701 :
702 1263 : SECONDARY_MODE.download_layer.inc();
703 1263 : touched.push(layer)
704 : }
705 :
706 : // Write updates to state to record layers we just downloaded or touched.
707 : {
708 9 : let mut detail = self.secondary_state.detail.lock().unwrap();
709 9 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
710 :
711 9 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
712 :
713 1472 : for t in touched {
714 : use std::collections::hash_map::Entry;
715 1463 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
716 200 : Entry::Occupied(mut v) => {
717 200 : v.get_mut().access_time = t.access_time;
718 200 : }
719 1263 : Entry::Vacant(e) => {
720 1263 : e.insert(OnDiskState::new(
721 1263 : self.conf,
722 1263 : tenant_shard_id,
723 1263 : &timeline.timeline_id,
724 1263 : t.name,
725 1263 : LayerFileMetadata::from(&t.metadata),
726 1263 : t.access_time,
727 1263 : ));
728 1263 : }
729 : }
730 : }
731 : }
732 :
733 9 : Ok(())
734 9 : }
735 : }
736 :
737 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
738 6 : async fn init_timeline_state(
739 6 : conf: &'static PageServerConf,
740 6 : tenant_shard_id: &TenantShardId,
741 6 : heatmap: &HeatMapTimeline,
742 6 : ) -> SecondaryDetailTimeline {
743 6 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
744 6 : let mut detail = SecondaryDetailTimeline::default();
745 :
746 6 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
747 2 : Ok(d) => d,
748 4 : Err(e) => {
749 4 : if e.kind() == std::io::ErrorKind::NotFound {
750 4 : let context = format!("Creating timeline directory {timeline_path}");
751 4 : tracing::info!("{}", context);
752 4 : tokio::fs::create_dir_all(&timeline_path)
753 4 : .await
754 4 : .fatal_err(&context);
755 4 :
756 4 : // No entries to report: drop out.
757 4 : return detail;
758 : } else {
759 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
760 : }
761 : }
762 : };
763 :
764 : // As we iterate through layers found on disk, we will look up their metadata from this map.
765 : // Layers not present in the heatmap metadata will be deleted from local disk.
766 2 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
767 38 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
768 :
769 40 : while let Some(dentry) = dir
770 40 : .next_entry()
771 0 : .await
772 40 : .fatal_err(&format!("Listing {timeline_path}"))
773 : {
774 38 : let dentry_file_name = dentry.file_name();
775 38 : let file_name = dentry_file_name.to_string_lossy();
776 38 : let local_meta = dentry.metadata().await.fatal_err(&format!(
777 38 : "Read metadata on {}",
778 38 : dentry.path().to_string_lossy()
779 38 : ));
780 38 :
781 38 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
782 38 : if file_name == METADATA_FILE_NAME {
783 0 : continue;
784 38 : }
785 38 :
786 38 : match LayerFileName::from_str(&file_name) {
787 38 : Ok(name) => {
788 38 : let remote_meta = heatmap_metadata.get(&name);
789 38 : match remote_meta {
790 38 : Some(remote_meta) => {
791 38 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
792 38 : if local_meta.len() != remote_meta.metadata.file_size {
793 : // This should not happen, because we do crashsafe write-then-rename when downloading
794 : // layers, and layers in remote storage are immutable. Remove the local file because
795 : // we cannot trust it.
796 0 : tracing::warn!(
797 0 : "Removing local layer {name} with unexpected local size {} != {}",
798 0 : local_meta.len(),
799 0 : remote_meta.metadata.file_size
800 0 : );
801 38 : } else {
802 38 : // We expect the access time to be initialized immediately afterwards, when
803 38 : // the latest heatmap is applied to the state.
804 38 : detail.on_disk_layers.insert(
805 38 : name.clone(),
806 38 : OnDiskState::new(
807 38 : conf,
808 38 : tenant_shard_id,
809 38 : &heatmap.timeline_id,
810 38 : name,
811 38 : LayerFileMetadata::from(&remote_meta.metadata),
812 38 : remote_meta.access_time,
813 38 : ),
814 38 : );
815 38 : }
816 : }
817 : None => {
818 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
819 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
820 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
821 0 : tracing::info!(
822 0 : "Removing secondary local layer {} because it's absent in heatmap",
823 0 : name
824 0 : );
825 0 : tokio::fs::remove_file(&dentry.path())
826 0 : .await
827 0 : .or_else(fs_ext::ignore_not_found)
828 0 : .fatal_err(&format!(
829 0 : "Removing layer {}",
830 0 : dentry.path().to_string_lossy()
831 0 : ));
832 : }
833 : }
834 : }
835 : Err(_) => {
836 : // Ignore it.
837 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
838 : }
839 : }
840 : }
841 :
842 2 : detail
843 6 : }
|