Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : disk_usage_eviction_task::{
12 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
13 : },
14 : metrics::SECONDARY_MODE,
15 : tenant::{
16 : config::SecondaryLocationConfig,
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : remote_timeline_client::{
19 : index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
20 : },
21 : span::debug_assert_current_span_has_tenant_id,
22 : storage_layer::LayerFileName,
23 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
24 : },
25 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
26 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
27 : };
28 :
29 : use super::{
30 : heatmap::HeatMapLayer,
31 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
32 : SecondaryTenant,
33 : };
34 :
35 : use crate::tenant::{
36 : mgr::TenantManager,
37 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
38 : };
39 :
40 : use chrono::format::{DelayedFormat, StrftimeItems};
41 : use futures::Future;
42 : use pageserver_api::shard::TenantShardId;
43 : use rand::Rng;
44 : use remote_storage::{DownloadError, GenericRemoteStorage};
45 :
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::{info_span, instrument, Instrument};
48 : use utils::{
49 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
50 : };
51 :
52 : use super::{
53 : heatmap::{HeatMapTenant, HeatMapTimeline},
54 : CommandRequest, DownloadCommand,
55 : };
56 :
57 : /// For each tenant, how long must have passed since the last download before
58 : /// downloading it again. This is approximately the time by which local data is allowed
59 : /// to fall behind remote data.
60 : ///
61 : /// TODO: this should just be a default, and the actual period should be controlled
62 : /// via the heatmap itself
63 : /// <https://github.com/neondatabase/neon/issues/6200>
64 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
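
// A minimal sketch (illustrative only, not part of this module) of how a jittered
// first deadline spreads initial downloads across the freshen interval; the helper
// name is hypothetical, but the jitter expression mirrors `schedule` below.
use rand::Rng;
use std::time::{Duration, Instant};

fn initial_next_download(now: Instant, interval: Duration) -> Instant {
    let jitter = rand::thread_rng().gen_range(Duration::ZERO..interval);
    now.checked_add(jitter)
        .expect("interval is small compared with the clock range")
}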
65 :
66 625 : pub(super) async fn downloader_task(
67 625 : tenant_manager: Arc<TenantManager>,
68 625 : remote_storage: GenericRemoteStorage,
69 625 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
70 625 : background_jobs_can_start: Barrier,
71 625 : cancel: CancellationToken,
72 625 : ) {
73 625 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
74 625 :
75 625 : let generator = SecondaryDownloader {
76 625 : tenant_manager,
77 625 : remote_storage,
78 625 : };
79 625 : let mut scheduler = Scheduler::new(generator, concurrency);
80 625 :
81 625 : scheduler
82 625 : .run(command_queue, background_jobs_can_start, cancel)
83 625 : .instrument(info_span!("secondary_downloads"))
84 1023 : .await
85 183 : }
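
// downloader_task above is a long-lived loop driven by a command channel and a
// cancellation token. A self-contained sketch of that pattern, with plain String
// commands standing in for CommandRequest<DownloadCommand>:
async fn command_loop(
    mut commands: tokio::sync::mpsc::Receiver<String>,
    cancel: tokio_util::sync::CancellationToken,
) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => break,
            cmd = commands.recv() => match cmd {
                Some(cmd) => tracing::info!("received command: {cmd}"),
                None => break, // all senders dropped: shut down
            },
        }
    }
}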
86 :
87 : struct SecondaryDownloader {
88 : tenant_manager: Arc<TenantManager>,
89 : remote_storage: GenericRemoteStorage,
90 : }
91 :
92 1858 : #[derive(Debug, Clone)]
93 : pub(super) struct OnDiskState {
94 : metadata: LayerFileMetadata,
95 : access_time: SystemTime,
96 : }
97 :
98 : impl OnDiskState {
99 1262 : fn new(
100 1262 : _conf: &'static PageServerConf,
101 1262 : _tenant_shard_id: &TenantShardId,
102 1262 : _timeline_id: &TimelineId,
103 1262 : _name: LayerFileName,
104 1262 : metadata: LayerFileMetadata,
105 1262 : access_time: SystemTime,
106 1262 : ) -> Self {
107 1262 : Self {
108 1262 : metadata,
109 1262 : access_time,
110 1262 : }
111 1262 : }
112 : }
113 :
114 7 : #[derive(Debug, Clone, Default)]
115 : pub(super) struct SecondaryDetailTimeline {
116 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
117 :
118 : /// We remember when layers were evicted, to prevent re-downloading them.
119 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
120 : }
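
// A reduced sketch of the decision `evicted_at` supports (the real check lives
// in `download_timeline` below): re-download an evicted layer only if it has
// been accessed since it was evicted.
fn should_redownload(
    access_time: std::time::SystemTime,
    evicted_at: Option<&std::time::SystemTime>,
) -> bool {
    match evicted_at {
        Some(evicted_at) => access_time > *evicted_at,
        None => true, // never evicted: candidate for download
    }
}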
121 :
122 : /// This state is written by the secondary downloader; it is opaque
123 : /// to the TenantManager.
124 0 : #[derive(Debug)]
125 : pub(super) struct SecondaryDetail {
126 : pub(super) config: SecondaryLocationConfig,
127 :
128 : last_download: Option<Instant>,
129 : next_download: Option<Instant>,
130 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
131 : }
132 :
133 : /// Helper for logging SystemTime
134 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
135 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
136 0 : datetime.format("%d/%m/%Y %T")
137 0 : }
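
// Usage example for the helper above: renders a UTC timestamp such as
// "02/01/2024 13:37:05" into a log line.
fn log_access_time(t: &SystemTime) {
    tracing::info!("accessed at {}", strftime(t));
}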
138 :
139 : impl SecondaryDetail {
140 33 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
141 33 : Self {
142 33 : config,
143 33 : last_download: None,
144 33 : next_download: None,
145 33 : timelines: HashMap::new(),
146 33 : }
147 33 : }
148 :
149 : /// Additionally returns the total number of layers, used for more stable relative
150 : /// access-time-based eviction.
151 2 : pub(super) fn get_layers_for_eviction(
152 2 : &self,
153 2 : parent: &Arc<SecondaryTenant>,
154 2 : ) -> (DiskUsageEvictionInfo, usize) {
155 2 : let mut result = DiskUsageEvictionInfo::default();
156 2 : let mut total_layers = 0;
157 :
158 4 : for (timeline_id, timeline_detail) in &self.timelines {
159 2 : result
160 2 : .resident_layers
161 38 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
162 38 : EvictionCandidate {
163 38 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
164 38 : secondary_tenant: parent.clone(),
165 38 : timeline_id: *timeline_id,
166 38 : name: name.clone(),
167 38 : metadata: ods.metadata.clone(),
168 38 : }),
169 38 : last_activity_ts: ods.access_time,
170 38 : relative_last_activity: finite_f32::FiniteF32::ZERO,
171 38 : }
172 38 : }));
173 2 :
174 2 : // The total might be missing layers that are currently downloading, but as a
175 2 : // lower-than-actual value it is a good enough approximation.
176 2 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
177 2 : }
178 2 : result.max_layer_size = result
179 2 : .resident_layers
180 2 : .iter()
181 38 : .map(|l| l.layer.get_file_size())
182 2 : .max();
183 2 :
184 2 : tracing::debug!(
185 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
186 0 : parent.get_tenant_shard_id(),
187 0 : self.timelines.len(),
188 0 : result.resident_layers.len()
189 0 : );
190 :
191 2 : (result, total_layers)
192 2 : }
193 : }
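
// `relative_last_activity` is initialized to ZERO above and filled in later by
// the disk usage eviction task. As an illustration only (an assumed
// normalization, not necessarily the task's actual formula): ranking candidates
// by recency and dividing by the total layer count yields a value in (0.0, 1.0]
// that depends on relative ordering rather than absolute clock values.
fn relative_activity(rank_newest_first: usize, total_layers: usize) -> f32 {
    debug_assert!(rank_newest_first < total_layers);
    1.0 - (rank_newest_first as f32 / total_layers as f32)
}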
194 :
195 : struct PendingDownload {
196 : secondary_state: Arc<SecondaryTenant>,
197 : last_download: Option<Instant>,
198 : target_time: Option<Instant>,
199 : period: Option<Duration>,
200 : }
201 :
202 : impl scheduler::PendingJob for PendingDownload {
203 25 : fn get_tenant_shard_id(&self) -> &TenantShardId {
204 25 : self.secondary_state.get_tenant_shard_id()
205 25 : }
206 : }
207 :
208 : struct RunningDownload {
209 : barrier: Barrier,
210 : }
211 :
212 : impl scheduler::RunningJob for RunningDownload {
213 6 : fn get_barrier(&self) -> Barrier {
214 6 : self.barrier.clone()
215 6 : }
216 : }
217 :
218 : struct CompleteDownload {
219 : secondary_state: Arc<SecondaryTenant>,
220 : completed_at: Instant,
221 : }
222 :
223 : impl scheduler::Completion for CompleteDownload {
224 24 : fn get_tenant_shard_id(&self) -> &TenantShardId {
225 24 : self.secondary_state.get_tenant_shard_id()
226 24 : }
227 : }
228 :
229 : type Scheduler = TenantBackgroundJobs<
230 : SecondaryDownloader,
231 : PendingDownload,
232 : RunningDownload,
233 : CompleteDownload,
234 : DownloadCommand,
235 : >;
236 :
237 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
238 : for SecondaryDownloader
239 : {
240 8 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
241 : fn on_completion(&mut self, completion: CompleteDownload) {
242 : let CompleteDownload {
243 : secondary_state,
244 : completed_at: _completed_at,
245 : } = completion;
246 :
247 0 : tracing::debug!("Secondary tenant download completed");
248 :
249 : // Update next_download even if there was an error: we don't want errored tenants to implicitly
250 : // take priority to run again.
251 : let mut detail = secondary_state.detail.lock().unwrap();
252 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
253 : }
254 :
255 1257 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
256 1257 : let mut result = SchedulingResult {
257 1257 : jobs: Vec::new(),
258 1257 : want_interval: None,
259 1257 : };
260 1257 :
261 1257 : // Step 1: identify some tenants that we may work on
262 1257 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
263 1257 : self.tenant_manager
264 1257 : .foreach_secondary_tenants(|_id, secondary_state| {
265 10 : tenants.push(secondary_state.clone());
266 1257 : });
267 1257 :
268 1257 : // Step 2: filter out tenants which are not yet eligible to run
269 1257 : let now = Instant::now();
270 1257 : result.jobs = tenants
271 1257 : .into_iter()
272 1257 : .filter_map(|secondary_tenant| {
273 5 : let (last_download, next_download) = {
274 10 : let mut detail = secondary_tenant.detail.lock().unwrap();
275 10 :
276 10 : if !detail.config.warm {
277 : // Downloads are disabled for this tenant
278 5 : detail.next_download = None;
279 5 : return None;
280 5 : }
281 5 :
282 5 : if detail.next_download.is_none() {
283 4 : // Initialize with a jitter: this spreads initial downloads on startup
284 4 : // or mass-attach across our freshen interval.
285 4 : let jittered_period =
286 4 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
287 4 : detail.next_download = Some(now.checked_add(jittered_period).expect(
288 4 : "Using our constant, which is known to be small compared with clock range",
289 4 : ));
290 4 : }
291 5 : (detail.last_download, detail.next_download.unwrap())
292 5 : };
293 5 :
294 5 : if now >= next_download {
295 5 : Some(PendingDownload {
296 5 : secondary_state: secondary_tenant,
297 5 : last_download,
298 5 : target_time: Some(next_download),
299 5 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
300 5 : })
301 : } else {
302 0 : None
303 : }
304 1257 : })
305 1257 : .collect();
306 1257 :
307 1257 : // Step 3: sort by target execution time to run most urgent first.
308 1257 : result.jobs.sort_by_key(|j| j.target_time);
309 1257 :
310 1257 : result
311 1257 : }
312 :
313 6 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
314 6 : let tenant_shard_id = command.get_tenant_shard_id();
315 6 :
316 6 : let tenant = self
317 6 : .tenant_manager
318 6 : .get_secondary_tenant_shard(*tenant_shard_id);
319 6 : let Some(tenant) = tenant else {
320 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
321 : };
322 :
323 6 : Ok(PendingDownload {
324 6 : target_time: None,
325 6 : period: None,
326 6 : last_download: None,
327 6 : secondary_state: tenant,
328 6 : })
329 6 : }
330 :
331 8 : fn spawn(
332 8 : &mut self,
333 8 : job: PendingDownload,
334 8 : ) -> (
335 8 : RunningDownload,
336 8 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
337 8 : ) {
338 8 : let PendingDownload {
339 8 : secondary_state,
340 8 : last_download,
341 8 : target_time,
342 8 : period,
343 8 : } = job;
344 8 :
345 8 : let (completion, barrier) = utils::completion::channel();
346 8 : let remote_storage = self.remote_storage.clone();
347 8 : let conf = self.tenant_manager.get_conf();
348 8 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
349 8 : (RunningDownload { barrier }, Box::pin(async move {
350 8 : let _completion = completion;
351 8 :
352 8 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
353 8 : .download()
354 65188 : .await
355 : {
356 : Err(UpdateError::NoData) => {
357 1 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
358 : },
359 : Err(UpdateError::NoSpace) => {
360 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
361 : }
362 : Err(UpdateError::Cancelled) => {
363 0 : tracing::debug!("Shut down while downloading");
364 : },
365 0 : Err(UpdateError::Deserialize(e)) => {
366 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
367 : },
368 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
369 0 : tracing::error!("Error while downloading tenant: {e}");
370 : },
371 7 : Ok(()) => {}
372 : };
373 :
374 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
375 :
376 : // If the job had a target execution time, we may check our final execution
377 : // time against that for observability purposes.
378 8 : if let (Some(target_time), Some(period)) = (target_time, period) {
379 : // Only track execution lag if this isn't our first download: otherwise, it is expected
380 : // that execution will have taken longer than our configured interval, for example
381 : // when starting up a pageserver and downloading many tenants' layers for the first time.
382 2 : if last_download.is_some() {
383 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
384 0 : let elapsed = Instant::now().duration_since(target_time);
385 0 :
386 0 : warn_when_period_overrun(
387 0 : elapsed,
388 0 : period,
389 0 : BackgroundLoopKind::SecondaryDownload,
390 0 : );
391 2 : }
392 6 : }
393 :
394 8 : CompleteDownload {
395 8 : secondary_state,
396 8 : completed_at: Instant::now(),
397 8 : }
398 8 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
399 8 : }
400 : }
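
// The eligibility test in `schedule` reduces to a deadline comparison. A tiny
// self-contained restatement of the same rule (hypothetical helper name):
fn download_due(now: std::time::Instant, next_download: Option<std::time::Instant>) -> bool {
    match next_download {
        Some(deadline) => now >= deadline,
        None => false, // downloads disabled for this tenant
    }
}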
401 :
402 : /// This type is a convenience to group together the various functions involved in
403 : /// freshening a secondary tenant.
404 : struct TenantDownloader<'a> {
405 : conf: &'static PageServerConf,
406 : remote_storage: &'a GenericRemoteStorage,
407 : secondary_state: &'a SecondaryTenant,
408 : }
409 :
410 : /// Errors that may be encountered while updating a tenant
411 0 : #[derive(thiserror::Error, Debug)]
412 : enum UpdateError {
413 : #[error("No remote data found")]
414 : NoData,
415 : #[error("Insufficient local storage space")]
416 : NoSpace,
417 : #[error("Failed to download")]
418 : DownloadError(DownloadError),
419 : #[error(transparent)]
420 : Deserialize(#[from] serde_json::Error),
421 : #[error("Cancelled")]
422 : Cancelled,
423 : #[error(transparent)]
424 : Other(#[from] anyhow::Error),
425 : }
426 :
427 : impl From<DownloadError> for UpdateError {
428 1 : fn from(value: DownloadError) -> Self {
429 1 : match &value {
430 0 : DownloadError::Cancelled => Self::Cancelled,
431 1 : DownloadError::NotFound => Self::NoData,
432 0 : _ => Self::DownloadError(value),
433 : }
434 1 : }
435 : }
436 :
437 : impl From<std::io::Error> for UpdateError {
438 0 : fn from(value: std::io::Error) -> Self {
439 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
440 0 : UpdateError::NoSpace
441 : } else {
442 : // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
443 0 : UpdateError::Other(anyhow::anyhow!(value))
444 : }
445 0 : }
446 : }
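
// A sketch of the mapping above in test form, using the same `nix` errno type
// the impl relies on:
#[cfg(test)]
mod update_error_tests {
    use super::*;

    #[test]
    fn enospc_maps_to_no_space() {
        let e = std::io::Error::from_raw_os_error(nix::errno::Errno::ENOSPC as i32);
        assert!(matches!(UpdateError::from(e), UpdateError::NoSpace));
    }
}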
447 :
448 : impl<'a> TenantDownloader<'a> {
449 8 : fn new(
450 8 : conf: &'static PageServerConf,
451 8 : remote_storage: &'a GenericRemoteStorage,
452 8 : secondary_state: &'a SecondaryTenant,
453 8 : ) -> Self {
454 8 : Self {
455 8 : conf,
456 8 : remote_storage,
457 8 : secondary_state,
458 8 : }
459 8 : }
460 :
461 8 : async fn download(&self) -> Result<(), UpdateError> {
462 8 : debug_assert_current_span_has_tenant_id();
463 :
464 : // For the duration of a download, we must hold the SecondaryTenant::gate, to
465 : // cover our access to local storage.
466 8 : let Ok(_guard) = self.secondary_state.gate.enter() else {
467 : // Shutting down
468 0 : return Ok(());
469 : };
470 :
471 8 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
472 : // Download the tenant's heatmap
473 8 : let heatmap_bytes = tokio::select!(
474 8 : bytes = self.download_heatmap() => {bytes?},
475 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
476 : );
477 :
478 7 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
479 :
480 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
481 : // layer metadata without having to re-download it.
482 7 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
483 7 :
484 7 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
485 7 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
486 7 : let heatmap_path_bg = heatmap_path.clone();
487 7 : VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
488 7 : .await
489 7 : .maybe_fatal_err(&context_msg)?;
490 :
491 0 : tracing::debug!("Wrote local heatmap to {}", heatmap_path);
492 :
493 : // Download the layers in the heatmap
494 14 : for timeline in heatmap.timelines {
495 7 : if self.secondary_state.cancel.is_cancelled() {
496 0 : return Ok(());
497 7 : }
498 7 :
499 7 : let timeline_id = timeline.timeline_id;
500 7 : self.download_timeline(timeline)
501 : .instrument(tracing::info_span!(
502 : "secondary_download_timeline",
503 : tenant_id=%tenant_shard_id.tenant_id,
504 7 : shard_id=%tenant_shard_id.shard_slug(),
505 : %timeline_id
506 : ))
507 65134 : .await?;
508 : }
509 :
510 7 : Ok(())
511 8 : }
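
// VirtualFile::crashsafe_overwrite above implements the usual
// write-to-temp-then-rename pattern. A generic, std-only sketch of the idea
// (the real implementation additionally fsyncs the file and its directory):
fn crashsafe_write_sketch(
    final_path: &std::path::Path,
    tmp_path: &std::path::Path,
    bytes: &[u8],
) -> std::io::Result<()> {
    std::fs::write(tmp_path, bytes)?; // write the new content to a temp file
    std::fs::rename(tmp_path, final_path) // atomically replace the old file
}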
512 :
513 8 : async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
514 8 : debug_assert_current_span_has_tenant_id();
515 8 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
516 : // TODO: make download conditional on ETag having changed since last download
517 : // (https://github.com/neondatabase/neon/issues/6199)
518 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
519 :
520 8 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
521 :
522 8 : let heatmap_bytes = backoff::retry(
523 8 : || async {
524 8 : let download = self
525 8 : .remote_storage
526 8 : .download(&heatmap_path)
527 21 : .await
528 8 : .map_err(UpdateError::from)?;
529 7 : let mut heatmap_bytes = Vec::new();
530 7 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
531 26 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
532 7 : Ok(heatmap_bytes)
533 16 : },
534 8 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
535 8 : FAILED_DOWNLOAD_WARN_THRESHOLD,
536 8 : FAILED_REMOTE_OP_RETRIES,
537 8 : "download heatmap",
538 8 : &self.secondary_state.cancel,
539 8 : )
540 47 : .await
541 8 : .ok_or_else(|| UpdateError::Cancelled)
542 8 : .and_then(|x| x)?;
543 :
544 7 : SECONDARY_MODE.download_heatmap.inc();
545 7 :
546 7 : Ok(heatmap_bytes)
547 8 : }
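
// The retry closure above drains a byte stream into memory. A self-contained
// sketch of the same StreamReader + copy_buf combination (assumes the `bytes`
// and `futures` crates, both already dependencies of this file):
async fn collect_stream_sketch() -> std::io::Result<Vec<u8>> {
    let chunks = vec![Ok::<_, std::io::Error>(bytes::Bytes::from_static(b"heatmap"))];
    let mut reader = tokio_util::io::StreamReader::new(futures::stream::iter(chunks));
    let mut buf = Vec::new();
    tokio::io::copy_buf(&mut reader, &mut buf).await?;
    Ok(buf)
}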
548 :
549 7 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
550 7 : debug_assert_current_span_has_tenant_and_timeline_id();
551 7 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
552 7 : let timeline_path = self
553 7 : .conf
554 7 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
555 7 :
556 7 : // Accumulate updates to the state
557 7 : let mut touched = Vec::new();
558 7 :
559 7 : // Clone a view of what layers already exist on disk
560 7 : let timeline_state = self
561 7 : .secondary_state
562 7 : .detail
563 7 : .lock()
564 7 : .unwrap()
565 7 : .timelines
566 7 : .get(&timeline.timeline_id)
567 7 : .cloned();
568 :
569 7 : let timeline_state = match timeline_state {
570 3 : Some(t) => t,
571 : None => {
572 : // We have no existing state: need to scan local disk for layers first.
573 4 : let timeline_state =
574 8 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
575 :
576 : // Re-acquire detail lock now that we're done with async load from local FS
577 4 : self.secondary_state
578 4 : .detail
579 4 : .lock()
580 4 : .unwrap()
581 4 : .timelines
582 4 : .insert(timeline.timeline_id, timeline_state.clone());
583 4 : timeline_state
584 : }
585 : };
586 :
587 7 : let layers_in_heatmap = timeline
588 7 : .layers
589 7 : .iter()
590 3119 : .map(|l| &l.name)
591 7 : .collect::<HashSet<_>>();
592 7 : let layers_on_disk = timeline_state
593 7 : .on_disk_layers
594 7 : .iter()
595 1858 : .map(|l| l.0)
596 7 : .collect::<HashSet<_>>();
597 :
598 : // Remove on-disk layers that are no longer present in heatmap
599 7 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
600 1 : let local_path = timeline_path.join(layer.to_string());
601 1 : tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
602 1 : tokio::fs::remove_file(&local_path)
603 1 : .await
604 1 : .or_else(fs_ext::ignore_not_found)
605 1 : .maybe_fatal_err("Removing secondary layer")?;
606 : }
607 :
608 : // Download heatmap layers that are not present on local disk, or update their
609 : // access time if they are already present.
610 3126 : for layer in timeline.layers {
611 3119 : if self.secondary_state.cancel.is_cancelled() {
612 0 : return Ok(());
613 3119 : }
614 :
615 : // Existing on-disk layers: just update their access time.
616 3119 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
617 0 : tracing::debug!("Layer {} is already on disk", layer.name);
618 1857 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
619 1857 : || on_disk.access_time != layer.access_time
620 : {
621 : // We already have this layer on disk. Update its access time.
622 0 : tracing::debug!(
623 0 : "Access time updated for layer {}: {} -> {}",
624 0 : layer.name,
625 0 : strftime(&on_disk.access_time),
626 0 : strftime(&layer.access_time)
627 0 : );
628 199 : touched.push(layer);
629 1658 : }
630 1857 : continue;
631 : } else {
632 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
633 : }
634 :
635 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
636 : // recently than it was evicted.
637 1262 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
638 0 : if &layer.access_time > evicted_at {
639 0 : tracing::info!(
640 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
641 0 : layer.name,
642 0 : strftime(&layer.access_time),
643 0 : strftime(evicted_at)
644 0 : );
645 : } else {
646 0 : tracing::trace!(
647 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
648 0 : layer.name,
649 0 : strftime(&layer.access_time),
650 0 : strftime(evicted_at)
651 0 : );
652 0 : continue;
653 : }
654 1262 : }
655 :
656 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
657 1262 : let downloaded_bytes = match download_layer_file(
658 1262 : self.conf,
659 1262 : self.remote_storage,
660 1262 : *tenant_shard_id,
661 1262 : timeline.timeline_id,
662 1262 : &layer.name,
663 1262 : &LayerFileMetadata::from(&layer.metadata),
664 1262 : &self.secondary_state.cancel,
665 1262 : )
666 65125 : .await
667 : {
668 1262 : Ok(bytes) => bytes,
669 0 : Err(e) => {
670 0 : if let DownloadError::NotFound = e {
671 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
672 : // This is harmless: continue to download the next layer. It is expected when layers
673 : // are removed by compaction or GC.
674 0 : tracing::debug!(
675 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
676 0 : layer.name
677 0 : );
678 0 : continue;
679 : } else {
680 0 : return Err(e.into());
681 : }
682 : }
683 : };
684 :
685 1262 : if downloaded_bytes != layer.metadata.file_size {
686 0 : let local_path = timeline_path.join(layer.name.to_string());
687 :
688 0 : tracing::warn!(
689 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
690 0 : layer.name,
691 0 : downloaded_bytes,
692 0 : layer.metadata.file_size
693 0 : );
694 :
695 0 : tokio::fs::remove_file(&local_path)
696 0 : .await
697 0 : .or_else(fs_ext::ignore_not_found)?;
698 1262 : }
699 :
700 1262 : SECONDARY_MODE.download_layer.inc();
701 1262 : touched.push(layer)
702 : }
703 :
704 : // Write updates to state to record layers we just downloaded or touched.
705 : {
706 7 : let mut detail = self.secondary_state.detail.lock().unwrap();
707 7 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
708 :
709 7 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
710 :
711 1468 : for t in touched {
712 : use std::collections::hash_map::Entry;
713 1461 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
714 199 : Entry::Occupied(mut v) => {
715 199 : v.get_mut().access_time = t.access_time;
716 199 : }
717 1262 : Entry::Vacant(e) => {
718 1262 : e.insert(OnDiskState::new(
719 1262 : self.conf,
720 1262 : tenant_shard_id,
721 1262 : &timeline.timeline_id,
722 1262 : t.name,
723 1262 : LayerFileMetadata::from(&t.metadata),
724 1262 : t.access_time,
725 1262 : ));
726 1262 : }
727 : }
728 : }
729 : }
730 :
731 7 : Ok(())
732 7 : }
733 : }
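
// The heatmap reconciliation in download_timeline is a set difference over
// layer names. A reduced sketch of the same computation over plain strings:
fn layers_to_remove<'a>(
    on_disk: &'a std::collections::HashSet<String>,
    in_heatmap: &'a std::collections::HashSet<String>,
) -> Vec<&'a String> {
    on_disk.difference(in_heatmap).collect()
}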
734 :
735 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
736 4 : async fn init_timeline_state(
737 4 : conf: &'static PageServerConf,
738 4 : tenant_shard_id: &TenantShardId,
739 4 : heatmap: &HeatMapTimeline,
740 4 : ) -> SecondaryDetailTimeline {
741 4 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
742 4 : let mut detail = SecondaryDetailTimeline::default();
743 :
744 4 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
745 0 : Ok(d) => d,
746 4 : Err(e) => {
747 4 : if e.kind() == std::io::ErrorKind::NotFound {
748 4 : let context = format!("Creating timeline directory {timeline_path}");
749 4 : tracing::info!("{}", context);
750 4 : tokio::fs::create_dir_all(&timeline_path)
751 4 : .await
752 4 : .fatal_err(&context);
753 4 :
754 4 : // No entries to report: drop out.
755 4 : return detail;
756 : } else {
757 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
758 : }
759 : }
760 : };
761 :
762 : // As we iterate through layers found on disk, we will look up their metadata from this map.
763 : // Layers not present in metadata will be discarded.
764 0 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
765 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
766 :
767 0 : while let Some(dentry) = dir
768 0 : .next_entry()
769 0 : .await
770 0 : .fatal_err(&format!("Listing {timeline_path}"))
771 : {
772 0 : let dentry_file_name = dentry.file_name();
773 0 : let file_name = dentry_file_name.to_string_lossy();
774 0 : let local_meta = dentry.metadata().await.fatal_err(&format!(
775 0 : "Read metadata on {}",
776 0 : dentry.path().to_string_lossy()
777 0 : ));
778 0 :
779 0 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
780 0 : if file_name == METADATA_FILE_NAME {
781 0 : continue;
782 0 : }
783 0 :
784 0 : match LayerFileName::from_str(&file_name) {
785 0 : Ok(name) => {
786 0 : let remote_meta = heatmap_metadata.get(&name);
787 0 : match remote_meta {
788 0 : Some(remote_meta) => {
789 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
790 0 : if local_meta.len() != remote_meta.metadata.file_size {
791 : // This should not happen, because we do crashsafe write-then-rename when downloading
792 : // layers, and layers in remote storage are immutable. Remove the local file because
793 : // we cannot trust it.
794 0 : tracing::warn!(
795 0 : "Removing local layer {name} with unexpected local size {} != {}",
796 0 : local_meta.len(),
797 0 : remote_meta.metadata.file_size
798 0 : );
799 0 : } else {
800 0 : // We expect the access time to be initialized immediately afterwards, when
801 0 : // the latest heatmap is applied to the state.
802 0 : detail.on_disk_layers.insert(
803 0 : name.clone(),
804 0 : OnDiskState::new(
805 0 : conf,
806 0 : tenant_shard_id,
807 0 : &heatmap.timeline_id,
808 0 : name,
809 0 : LayerFileMetadata::from(&remote_meta.metadata),
810 0 : remote_meta.access_time,
811 0 : ),
812 0 : );
813 0 : }
814 : }
815 : None => {
816 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
817 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
818 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
819 0 : tracing::info!(
820 0 : "Removing secondary local layer {} because it's absent in heatmap",
821 0 : name
822 0 : );
823 0 : tokio::fs::remove_file(&dentry.path())
824 0 : .await
825 0 : .or_else(fs_ext::ignore_not_found)
826 0 : .fatal_err(&format!(
827 0 : "Removing layer {}",
828 0 : dentry.path().to_string_lossy()
829 0 : ));
830 : }
831 : }
832 : }
833 : Err(_) => {
834 : // Not a valid layer file name: warn and skip it.
835 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
836 : }
837 : }
838 : }
839 :
840 0 : detail
841 4 : }
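
// init_timeline_state keeps a local file only when its size matches the
// heatmap metadata. A reduced sketch of that keep/discard rule (hypothetical
// helper; the real function also deletes files that fail the check):
fn keep_local_file(local_len: u64, expected_len: Option<u64>) -> bool {
    match expected_len {
        Some(expected) => local_len == expected,
        None => false, // not referenced by the heatmap: remove it
    }
}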
|