Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : disk_usage_eviction_task::{
12 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
13 : },
14 : metrics::SECONDARY_MODE,
15 : tenant::{
16 : config::SecondaryLocationConfig,
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : remote_timeline_client::{
19 : index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
20 : },
21 : span::debug_assert_current_span_has_tenant_id,
22 : storage_layer::LayerFileName,
23 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
24 : },
25 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
26 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
27 : };
28 :
29 : use super::{
30 : heatmap::HeatMapLayer,
31 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
32 : SecondaryTenant,
33 : };
34 :
35 : use crate::tenant::{
36 : mgr::TenantManager,
37 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
38 : };
39 :
40 : use chrono::format::{DelayedFormat, StrftimeItems};
41 : use futures::Future;
42 : use pageserver_api::shard::TenantShardId;
43 : use rand::Rng;
44 : use remote_storage::{DownloadError, GenericRemoteStorage};
45 :
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::{info_span, instrument, Instrument};
48 : use utils::{
49 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
50 : };
51 :
52 : use super::{
53 : heatmap::{HeatMapTenant, HeatMapTimeline},
54 : CommandRequest, DownloadCommand,
55 : };
56 :
57 : /// For each tenant, how long must have passed since the last download_tenant call before
58 : /// calling it again. This is approximately the time by which local data is allowed
59 : /// to fall behind remote data.
60 : ///
61 : /// TODO: this should just be a default, and the actual period should be controlled
62 : /// via the heatmap itself
63 : /// `<https://github.com/neondatabase/neon/issues/6200>`
64 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
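// Illustrative sketch (an assumption mirroring the jittered initialization in `schedule`
// below, not a separate API): when a tenant is first seen, its next download time is picked
// uniformly within one freshen interval, so a startup or mass-attach spreads its initial
// downloads across that window rather than issuing them all at once:
//
//     use rand::Rng;
//     let jitter = rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
//     let next_download = std::time::Instant::now() + jitter;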
65 :
66 624 : pub(super) async fn downloader_task(
67 624 : tenant_manager: Arc<TenantManager>,
68 624 : remote_storage: GenericRemoteStorage,
69 624 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
70 624 : background_jobs_can_start: Barrier,
71 624 : cancel: CancellationToken,
72 624 : ) {
73 624 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
74 624 :
75 624 : let generator = SecondaryDownloader {
76 624 : tenant_manager,
77 624 : remote_storage,
78 624 : };
79 624 : let mut scheduler = Scheduler::new(generator, concurrency);
80 624 :
81 624 : scheduler
82 624 : .run(command_queue, background_jobs_can_start, cancel)
83 624 : .instrument(info_span!("secondary_downloads"))
84 971 : .await
85 182 : }
86 :
87 : struct SecondaryDownloader {
88 : tenant_manager: Arc<TenantManager>,
89 : remote_storage: GenericRemoteStorage,
90 : }
91 :
92 1261 : #[derive(Debug, Clone)]
93 : pub(super) struct OnDiskState {
94 : metadata: LayerFileMetadata,
95 : access_time: SystemTime,
96 : }
97 :
98 : impl OnDiskState {
99 1300 : fn new(
100 1300 : _conf: &'static PageServerConf,
101 1300 : _tenant_shard_id: &TenantShardId,
102 1300 : _timeline_id: &TimelineId,
103 1300 : _name: LayerFileName,
104 1300 : metadata: LayerFileMetadata,
105 1300 : access_time: SystemTime,
106 1300 : ) -> Self {
107 1300 : Self {
108 1300 : metadata,
109 1300 : access_time,
110 1300 : }
111 1300 : }
112 : }
113 :
114 8 : #[derive(Debug, Clone, Default)]
115 : pub(super) struct SecondaryDetailTimeline {
116 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
117 :
118 : /// We remember when layers were evicted, to prevent re-downloading them.
119 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
120 : }
121 :
122 : /// This state is written by the secondary downloader; it is opaque
123 : /// to TenantManager
124 0 : #[derive(Debug)]
125 : pub(super) struct SecondaryDetail {
126 : pub(super) config: SecondaryLocationConfig,
127 :
128 : last_download: Option<Instant>,
129 : next_download: Option<Instant>,
130 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
131 : }
132 :
133 : /// Helper for logging SystemTime
134 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
135 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
136 0 : datetime.format("%d/%m/%Y %T")
137 0 : }
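// Hypothetical usage of the helper above (illustrative only, not a call site in this module):
//
//     let t = SystemTime::now();
//     tracing::info!("layer last accessed at {}", strftime(&t));
//
// `DelayedFormat` implements `Display`, so constructing it here is cheap; the actual
// formatting happens only when the value is rendered.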
138 :
139 : impl SecondaryDetail {
140 35 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
141 35 : Self {
142 35 : config,
143 35 : last_download: None,
144 35 : next_download: None,
145 35 : timelines: HashMap::new(),
146 35 : }
147 35 : }
148 :
149 : /// Additionally returns the total number of layers, used for more stable relative access time
150 : /// based eviction.
151 2 : pub(super) fn get_layers_for_eviction(
152 2 : &self,
153 2 : parent: &Arc<SecondaryTenant>,
154 2 : ) -> (DiskUsageEvictionInfo, usize) {
155 2 : let mut result = DiskUsageEvictionInfo::default();
156 2 : let mut total_layers = 0;
157 :
158 4 : for (timeline_id, timeline_detail) in &self.timelines {
159 2 : result
160 2 : .resident_layers
161 38 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
162 38 : EvictionCandidate {
163 38 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
164 38 : secondary_tenant: parent.clone(),
165 38 : timeline_id: *timeline_id,
166 38 : name: name.clone(),
167 38 : metadata: ods.metadata.clone(),
168 38 : }),
169 38 : last_activity_ts: ods.access_time,
170 38 : relative_last_activity: finite_f32::FiniteF32::ZERO,
171 38 : }
172 38 : }));
173 2 :
174 2 : // total might be missing currently downloading layers, but as a lower-than-actual
175 2 : // value it is a good enough approximation.
176 2 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
177 2 : }
178 2 : result.max_layer_size = result
179 2 : .resident_layers
180 2 : .iter()
181 38 : .map(|l| l.layer.get_file_size())
182 2 : .max();
183 2 :
184 2 : tracing::debug!(
185 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
186 0 : parent.get_tenant_shard_id(),
187 0 : self.timelines.len(),
188 0 : result.resident_layers.len()
189 0 : );
190 :
191 2 : (result, total_layers)
192 2 : }
193 : }
194 :
195 : struct PendingDownload {
196 : secondary_state: Arc<SecondaryTenant>,
197 : last_download: Option<Instant>,
198 : target_time: Option<Instant>,
199 : period: Option<Duration>,
200 : }
201 :
202 : impl scheduler::PendingJob for PendingDownload {
203 28 : fn get_tenant_shard_id(&self) -> &TenantShardId {
204 28 : self.secondary_state.get_tenant_shard_id()
205 28 : }
206 : }
207 :
208 : struct RunningDownload {
209 : barrier: Barrier,
210 : }
211 :
212 : impl scheduler::RunningJob for RunningDownload {
213 6 : fn get_barrier(&self) -> Barrier {
214 6 : self.barrier.clone()
215 6 : }
216 : }
217 :
218 : struct CompleteDownload {
219 : secondary_state: Arc<SecondaryTenant>,
220 : completed_at: Instant,
221 : }
222 :
223 : impl scheduler::Completion for CompleteDownload {
224 30 : fn get_tenant_shard_id(&self) -> &TenantShardId {
225 30 : self.secondary_state.get_tenant_shard_id()
226 30 : }
227 : }
228 :
229 : type Scheduler = TenantBackgroundJobs<
230 : SecondaryDownloader,
231 : PendingDownload,
232 : RunningDownload,
233 : CompleteDownload,
234 : DownloadCommand,
235 : >;
236 :
237 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
238 : for SecondaryDownloader
239 : {
240 10 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
241 : fn on_completion(&mut self, completion: CompleteDownload) {
242 : let CompleteDownload {
243 : secondary_state,
244 : completed_at: _completed_at,
245 : } = completion;
246 :
247 0 : tracing::debug!("Secondary tenant download completed");
248 :
249 : // Update next_download even if there was an error: we don't want errored tenants to implicitly
250 : // take priority to run again.
251 : let mut detail = secondary_state.detail.lock().unwrap();
252 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
253 : }
254 :
255 1198 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
256 1198 : let mut result = SchedulingResult {
257 1198 : jobs: Vec::new(),
258 1198 : want_interval: None,
259 1198 : };
260 1198 :
261 1198 : // Step 1: identify some tenants that we may work on
262 1198 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
263 1198 : self.tenant_manager
264 1198 : .foreach_secondary_tenants(|_id, secondary_state| {
265 10 : tenants.push(secondary_state.clone());
266 1198 : });
267 1198 :
268 1198 : // Step 2: filter out tenants which are not yet eligible to run
269 1198 : let now = Instant::now();
270 1198 : result.jobs = tenants
271 1198 : .into_iter()
272 1198 : .filter_map(|secondary_tenant| {
273 6 : let (last_download, next_download) = {
274 10 : let mut detail = secondary_tenant.detail.lock().unwrap();
275 10 :
276 10 : if !detail.config.warm {
277 : // Downloads are disabled for this tenant
278 4 : detail.next_download = None;
279 4 : return None;
280 6 : }
281 6 :
282 6 : if detail.next_download.is_none() {
283 6 : // Initialize with a jitter: this spreads initial downloads on startup
284 6 : // or mass-attach across our freshen interval.
285 6 : let jittered_period =
286 6 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
287 6 : detail.next_download = Some(now.checked_add(jittered_period).expect(
288 6 : "Using our constant, which is known to be small compared with clock range",
289 6 : ));
290 6 : }
291 6 : (detail.last_download, detail.next_download.unwrap())
292 6 : };
293 6 :
294 6 : if now < next_download {
295 6 : Some(PendingDownload {
296 6 : secondary_state: secondary_tenant,
297 6 : last_download,
298 6 : target_time: Some(next_download),
299 6 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
300 6 : })
301 : } else {
302 0 : None
303 : }
304 1198 : })
305 1198 : .collect();
306 1198 :
307 1198 : // Step 3: sort by target execution time to run most urgent first.
308 1198 : result.jobs.sort_by_key(|j| j.target_time);
309 1198 :
310 1198 : result
311 1198 : }
312 :
313 6 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
314 6 : let tenant_shard_id = command.get_tenant_shard_id();
315 6 :
316 6 : let tenant = self
317 6 : .tenant_manager
318 6 : .get_secondary_tenant_shard(*tenant_shard_id);
319 6 : let Some(tenant) = tenant else {
320 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
321 : };
322 :
323 6 : Ok(PendingDownload {
324 6 : target_time: None,
325 6 : period: None,
326 6 : last_download: None,
327 6 : secondary_state: tenant,
328 6 : })
329 6 : }
330 :
331 10 : fn spawn(
332 10 : &mut self,
333 10 : job: PendingDownload,
334 10 : ) -> (
335 10 : RunningDownload,
336 10 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
337 10 : ) {
338 10 : let PendingDownload {
339 10 : secondary_state,
340 10 : last_download,
341 10 : target_time,
342 10 : period,
343 10 : } = job;
344 10 :
345 10 : let (completion, barrier) = utils::completion::channel();
346 10 : let remote_storage = self.remote_storage.clone();
347 10 : let conf = self.tenant_manager.get_conf();
348 10 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
349 10 : (RunningDownload { barrier }, Box::pin(async move {
350 10 : let _completion = completion;
351 10 :
352 10 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
353 10 : .download()
354 63857 : .await
355 : {
356 : Err(UpdateError::NoData) => {
357 2 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
358 : },
359 : Err(UpdateError::NoSpace) => {
360 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
361 : }
362 : Err(UpdateError::Cancelled) => {
363 0 : tracing::debug!("Shut down while downloading");
364 : },
365 0 : Err(UpdateError::Deserialize(e)) => {
366 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
367 : },
368 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
369 0 : tracing::error!("Error while downloading tenant: {e}");
370 : },
371 8 : Ok(()) => {}
372 : };
373 :
374 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
375 :
376 : // If the job had a target execution time, we may check our final execution
377 : // time against that for observability purposes.
378 10 : if let (Some(target_time), Some(period)) = (target_time, period) {
379 : // Only track execution lag if this isn't our first download: otherwise, it is expected
380 : // that execution will have taken longer than our configured interval, for example
381 : // when starting up a pageserver and doing the first download for many tenants at once.
382 4 : if last_download.is_some() {
383 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
384 0 : let elapsed = Instant::now().duration_since(target_time);
385 0 :
386 0 : warn_when_period_overrun(
387 0 : elapsed,
388 0 : period,
389 0 : BackgroundLoopKind::SecondaryDownload,
390 0 : );
391 4 : }
392 6 : }
393 :
394 10 : CompleteDownload {
395 10 : secondary_state,
396 10 : completed_at: Instant::now(),
397 10 : }
398 10 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
399 10 : }
400 : }
401 :
402 : /// This type is a convenience to group together the various functions involved in
403 : /// freshening a secondary tenant.
404 : struct TenantDownloader<'a> {
405 : conf: &'static PageServerConf,
406 : remote_storage: &'a GenericRemoteStorage,
407 : secondary_state: &'a SecondaryTenant,
408 : }
409 :
410 : /// Errors that may be encountered while updating a tenant
411 0 : #[derive(thiserror::Error, Debug)]
412 : enum UpdateError {
413 : #[error("No remote data found")]
414 : NoData,
415 : #[error("Insufficient local storage space")]
416 : NoSpace,
417 : #[error("Failed to download")]
418 : DownloadError(DownloadError),
419 : #[error(transparent)]
420 : Deserialize(#[from] serde_json::Error),
421 : #[error("Cancelled")]
422 : Cancelled,
423 : #[error(transparent)]
424 : Other(#[from] anyhow::Error),
425 : }
426 :
427 : impl From<DownloadError> for UpdateError {
428 2 : fn from(value: DownloadError) -> Self {
429 2 : match &value {
430 0 : DownloadError::Cancelled => Self::Cancelled,
431 2 : DownloadError::NotFound => Self::NoData,
432 0 : _ => Self::DownloadError(value),
433 : }
434 2 : }
435 : }
436 :
437 : impl From<std::io::Error> for UpdateError {
438 0 : fn from(value: std::io::Error) -> Self {
439 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
440 0 : UpdateError::NoSpace
441 : } else {
442 : // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
443 0 : UpdateError::Other(anyhow::anyhow!(value))
444 : }
445 0 : }
446 : }
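// Illustrative sketch (hypothetical helper, not part of this module): the `From` impls above
// let fallible steps use `?` and still surface a specific UpdateError, e.g. an ENOSPC from a
// local write maps to UpdateError::NoSpace while other I/O errors become UpdateError::Other:
//
//     async fn write_local_copy(path: &std::path::Path, bytes: &[u8]) -> Result<(), UpdateError> {
//         tokio::fs::write(path, bytes).await?; // io::Error -> NoSpace or Other via From
//         Ok(())
//     }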
447 :
448 : impl<'a> TenantDownloader<'a> {
449 10 : fn new(
450 10 : conf: &'static PageServerConf,
451 10 : remote_storage: &'a GenericRemoteStorage,
452 10 : secondary_state: &'a SecondaryTenant,
453 10 : ) -> Self {
454 10 : Self {
455 10 : conf,
456 10 : remote_storage,
457 10 : secondary_state,
458 10 : }
459 10 : }
460 :
461 10 : async fn download(&self) -> Result<(), UpdateError> {
462 10 : debug_assert_current_span_has_tenant_id();
463 :
464 : // For the duration of a download, we must hold the SecondaryTenant::gate, to
465 : // cover our access to local storage.
466 10 : let Ok(_guard) = self.secondary_state.gate.enter() else {
467 : // Shutting down
468 0 : return Ok(());
469 : };
470 :
471 10 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
472 : // Download the tenant's heatmap
473 10 : let heatmap_bytes = tokio::select!(
474 10 : bytes = self.download_heatmap() => {bytes?},
475 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
476 : );
477 :
478 8 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
479 :
480 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
481 : // layer metadata without having to re-download it.
482 8 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
483 8 :
484 8 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
485 8 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
486 8 : let heatmap_path_bg = heatmap_path.clone();
487 8 : tokio::task::spawn_blocking(move || {
488 8 : tokio::runtime::Handle::current().block_on(async move {
489 8 : VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
490 8 : })
491 8 : })
492 8 : .await
493 8 : .expect("Blocking task is never aborted")
494 8 : .maybe_fatal_err(&context_msg)?;
495 :
496 0 : tracing::debug!("Wrote local heatmap to {}", heatmap_path);
497 :
498 : // Download the layers in the heatmap
499 16 : for timeline in heatmap.timelines {
500 8 : if self.secondary_state.cancel.is_cancelled() {
501 0 : return Ok(());
502 8 : }
503 8 :
504 8 : let timeline_id = timeline.timeline_id;
505 8 : self.download_timeline(timeline)
506 : .instrument(tracing::info_span!(
507 : "secondary_download_timeline",
508 : tenant_id=%tenant_shard_id.tenant_id,
509 8 : shard_id=%tenant_shard_id.shard_slug(),
510 : %timeline_id
511 : ))
512 63799 : .await?;
513 : }
514 :
515 8 : Ok(())
516 10 : }
517 :
518 10 : async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
519 10 : debug_assert_current_span_has_tenant_id();
520 10 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
521 : // TODO: make download conditional on ETag having changed since last download
522 : // (https://github.com/neondatabase/neon/issues/6199)
523 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
524 :
525 10 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
526 :
527 10 : let heatmap_bytes = backoff::retry(
528 10 : || async {
529 10 : let download = self
530 10 : .remote_storage
531 10 : .download(&heatmap_path)
532 24 : .await
533 10 : .map_err(UpdateError::from)?;
534 8 : let mut heatmap_bytes = Vec::new();
535 8 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
536 26 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
537 8 : Ok(heatmap_bytes)
538 20 : },
539 10 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
540 10 : FAILED_DOWNLOAD_WARN_THRESHOLD,
541 10 : FAILED_REMOTE_OP_RETRIES,
542 10 : "download heatmap",
543 10 : &self.secondary_state.cancel,
544 10 : )
545 50 : .await
546 10 : .ok_or_else(|| UpdateError::Cancelled)
547 10 : .and_then(|x| x)?;
548 :
549 8 : SECONDARY_MODE.download_heatmap.inc();
550 8 :
551 8 : Ok(heatmap_bytes)
552 10 : }
553 :
554 8 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
555 8 : debug_assert_current_span_has_tenant_and_timeline_id();
556 8 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
557 8 : let timeline_path = self
558 8 : .conf
559 8 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
560 8 :
561 8 : // Accumulate updates to the state
562 8 : let mut touched = Vec::new();
563 8 :
564 8 : // Clone a view of what layers already exist on disk
565 8 : let timeline_state = self
566 8 : .secondary_state
567 8 : .detail
568 8 : .lock()
569 8 : .unwrap()
570 8 : .timelines
571 8 : .get(&timeline.timeline_id)
572 8 : .cloned();
573 :
574 8 : let timeline_state = match timeline_state {
575 2 : Some(t) => t,
576 : None => {
577 : // We have no existing state: need to scan local disk for layers first.
578 6 : let timeline_state =
579 33 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
580 :
581 : // Re-acquire detail lock now that we're done with async load from local FS
582 6 : self.secondary_state
583 6 : .detail
584 6 : .lock()
585 6 : .unwrap()
586 6 : .timelines
587 6 : .insert(timeline.timeline_id, timeline_state.clone());
588 6 : timeline_state
589 : }
590 : };
591 :
592 8 : let layers_in_heatmap = timeline
593 8 : .layers
594 8 : .iter()
595 2522 : .map(|l| &l.name)
596 8 : .collect::<HashSet<_>>();
597 8 : let layers_on_disk = timeline_state
598 8 : .on_disk_layers
599 8 : .iter()
600 1261 : .map(|l| l.0)
601 8 : .collect::<HashSet<_>>();
602 :
603 : // Remove on-disk layers that are no longer present in heatmap
604 8 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
605 1 : let local_path = timeline_path.join(layer.to_string());
606 1 : tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
607 1 : tokio::fs::remove_file(&local_path)
608 1 : .await
609 1 : .or_else(fs_ext::ignore_not_found)
610 1 : .maybe_fatal_err("Removing secondary layer")?;
611 : }
612 :
613 : // Download heatmap layers that are not present on local disk, or update their
614 : // access time if they are already present.
615 2530 : for layer in timeline.layers {
616 2522 : if self.secondary_state.cancel.is_cancelled() {
617 0 : return Ok(());
618 2522 : }
619 :
620 : // Existing on-disk layers: just update their access time.
621 2522 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
622 0 : tracing::debug!("Layer {} is already on disk", layer.name);
623 1260 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
624 1260 : || on_disk.access_time != layer.access_time
625 : {
626 : // We already have this layer on disk. Update its access time.
627 0 : tracing::debug!(
628 0 : "Access time updated for layer {}: {} -> {}",
629 0 : layer.name,
630 0 : strftime(&on_disk.access_time),
631 0 : strftime(&layer.access_time)
632 0 : );
633 199 : touched.push(layer);
634 1061 : }
635 1260 : continue;
636 : } else {
637 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
638 : }
639 :
640 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
641 : // recently than it was evicted.
642 1262 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
643 0 : if &layer.access_time > evicted_at {
644 0 : tracing::info!(
645 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
646 0 : layer.name,
647 0 : strftime(&layer.access_time),
648 0 : strftime(evicted_at)
649 0 : );
650 : } else {
651 0 : tracing::trace!(
652 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
653 0 : layer.name,
654 0 : strftime(&layer.access_time),
655 0 : strftime(evicted_at)
656 0 : );
657 0 : continue;
658 : }
659 1262 : }
660 :
661 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
662 1262 : let downloaded_bytes = match download_layer_file(
663 1262 : self.conf,
664 1262 : self.remote_storage,
665 1262 : *tenant_shard_id,
666 1262 : timeline.timeline_id,
667 1262 : &layer.name,
668 1262 : &LayerFileMetadata::from(&layer.metadata),
669 1262 : &self.secondary_state.cancel,
670 1262 : )
671 63765 : .await
672 : {
673 1262 : Ok(bytes) => bytes,
674 0 : Err(e) => {
675 0 : if let DownloadError::NotFound = e {
676 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
677 : // This is harmless: continue to download the next layer. It is expected during compaction
678 : // and GC.
679 0 : tracing::debug!(
680 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
681 0 : layer.name
682 0 : );
683 0 : continue;
684 : } else {
685 0 : return Err(e.into());
686 : }
687 : }
688 : };
689 :
690 1262 : if downloaded_bytes != layer.metadata.file_size {
691 0 : let local_path = timeline_path.join(layer.name.to_string());
692 :
693 0 : tracing::warn!(
694 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
695 0 : layer.name,
696 0 : downloaded_bytes,
697 0 : layer.metadata.file_size
698 0 : );
699 :
700 0 : tokio::fs::remove_file(&local_path)
701 0 : .await
702 0 : .or_else(fs_ext::ignore_not_found)?;
703 1262 : }
704 :
705 1262 : SECONDARY_MODE.download_layer.inc();
706 1262 : touched.push(layer)
707 : }
708 :
709 : // Write updates to state to record layers we just downloaded or touched.
710 : {
711 8 : let mut detail = self.secondary_state.detail.lock().unwrap();
712 8 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
713 :
714 8 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
715 :
716 1469 : for t in touched {
717 : use std::collections::hash_map::Entry;
718 1461 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
719 199 : Entry::Occupied(mut v) => {
720 199 : v.get_mut().access_time = t.access_time;
721 199 : }
722 1262 : Entry::Vacant(e) => {
723 1262 : e.insert(OnDiskState::new(
724 1262 : self.conf,
725 1262 : tenant_shard_id,
726 1262 : &timeline.timeline_id,
727 1262 : t.name,
728 1262 : LayerFileMetadata::from(&t.metadata),
729 1262 : t.access_time,
730 1262 : ));
731 1262 : }
732 : }
733 : }
734 : }
735 :
736 8 : Ok(())
737 8 : }
738 : }
739 :
740 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
741 6 : async fn init_timeline_state(
742 6 : conf: &'static PageServerConf,
743 6 : tenant_shard_id: &TenantShardId,
744 6 : heatmap: &HeatMapTimeline,
745 6 : ) -> SecondaryDetailTimeline {
746 6 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
747 6 : let mut detail = SecondaryDetailTimeline::default();
748 :
749 6 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
750 2 : Ok(d) => d,
751 4 : Err(e) => {
752 4 : if e.kind() == std::io::ErrorKind::NotFound {
753 4 : let context = format!("Creating timeline directory {timeline_path}");
754 4 : tracing::info!("{}", context);
755 4 : tokio::fs::create_dir_all(&timeline_path)
756 4 : .await
757 4 : .fatal_err(&context);
758 4 :
759 4 : // No entries to report: drop out.
760 4 : return detail;
761 : } else {
762 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
763 : }
764 : }
765 : };
766 :
767 : // As we iterate through layers found on disk, we will look up their metadata from this map.
768 : // Layers not present in metadata will be discarded.
769 2 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
770 38 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
771 :
772 40 : while let Some(dentry) = dir
773 40 : .next_entry()
774 0 : .await
775 40 : .fatal_err(&format!("Listing {timeline_path}"))
776 : {
777 38 : let dentry_file_name = dentry.file_name();
778 38 : let file_name = dentry_file_name.to_string_lossy();
779 38 : let local_meta = dentry.metadata().await.fatal_err(&format!(
780 38 : "Read metadata on {}",
781 38 : dentry.path().to_string_lossy()
782 38 : ));
783 38 :
784 38 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
785 38 : if file_name == METADATA_FILE_NAME {
786 0 : continue;
787 38 : }
788 38 :
789 38 : match LayerFileName::from_str(&file_name) {
790 38 : Ok(name) => {
791 38 : let remote_meta = heatmap_metadata.get(&name);
792 38 : match remote_meta {
793 38 : Some(remote_meta) => {
794 38 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
795 38 : if local_meta.len() != remote_meta.metadata.file_size {
796 : // This should not happen, because we do crashsafe write-then-rename when downloading
797 : // layers, and layers in remote storage are immutable. Remove the local file because
798 : // we cannot trust it.
799 0 : tracing::warn!(
800 0 : "Removing local layer {name} with unexpected local size {} != {}",
801 0 : local_meta.len(),
802 0 : remote_meta.metadata.file_size
803 0 : );
804 38 : } else {
805 38 : // We expect the access time to be initialized immediately afterwards, when
806 38 : // the latest heatmap is applied to the state.
807 38 : detail.on_disk_layers.insert(
808 38 : name.clone(),
809 38 : OnDiskState::new(
810 38 : conf,
811 38 : tenant_shard_id,
812 38 : &heatmap.timeline_id,
813 38 : name,
814 38 : LayerFileMetadata::from(&remote_meta.metadata),
815 38 : remote_meta.access_time,
816 38 : ),
817 38 : );
818 38 : }
819 : }
820 : None => {
821 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
822 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
823 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
824 0 : tracing::info!(
825 0 : "Removing secondary local layer {} because it's absent in heatmap",
826 0 : name
827 0 : );
828 0 : tokio::fs::remove_file(&dentry.path())
829 0 : .await
830 0 : .or_else(fs_ext::ignore_not_found)
831 0 : .fatal_err(&format!(
832 0 : "Removing layer {}",
833 0 : dentry.path().to_string_lossy()
834 0 : ));
835 : }
836 : }
837 : }
838 : Err(_) => {
839 : // Ignore it.
840 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
841 : }
842 : }
843 : }
844 :
845 2 : detail
846 6 : }