TLA | Line data | Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : metrics::SECONDARY_MODE,
12 : tenant::{
13 : config::SecondaryLocationConfig,
14 : debug_assert_current_span_has_tenant_and_timeline_id,
15 : remote_timeline_client::{
16 : index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
17 : },
18 : span::debug_assert_current_span_has_tenant_id,
19 : storage_layer::LayerFileName,
20 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
21 : },
22 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
23 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
24 : };
25 :
26 : use super::{
27 : heatmap::HeatMapLayer,
28 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
29 : SecondaryTenant,
30 : };
31 :
32 : use crate::tenant::{
33 : mgr::TenantManager,
34 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
35 : };
36 :
37 : use chrono::format::{DelayedFormat, StrftimeItems};
38 : use futures::Future;
39 : use pageserver_api::shard::TenantShardId;
40 : use rand::Rng;
41 : use remote_storage::{DownloadError, GenericRemoteStorage};
42 :
43 : use tokio_util::sync::CancellationToken;
44 : use tracing::{info_span, instrument, Instrument};
45 : use utils::{
46 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
47 : };
48 :
49 : use super::{
50 : heatmap::{HeatMapTenant, HeatMapTimeline},
51 : CommandRequest, DownloadCommand,
52 : };
53 :
54 : /// For each tenant, how long must have passed since the last download_tenant call before
55 : /// calling it again. This is approximately the time by which local data is allowed
56 : /// to fall behind remote data.
57 : ///
58 : /// TODO: this should just be a default, and the actual period should be controlled
59 : /// via the heatmap itself
60 : /// `<https://github.com/neondatabase/neon/issues/6200>`
61 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
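// Note: a tenant's first download is jittered across this interval (see `schedule` below),
// and each subsequent download is scheduled DOWNLOAD_FRESHEN_INTERVAL after the previous
// one completes (see `on_completion`).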
62 :
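/// Entry point for the secondary-mode download background task: builds a SecondaryDownloader
/// job generator and runs it under the shared tenant background-job scheduler, with concurrency
/// taken from `secondary_download_concurrency`, until cancellation.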
63 CBC 557 : pub(super) async fn downloader_task(
64 557 : tenant_manager: Arc<TenantManager>,
65 557 : remote_storage: GenericRemoteStorage,
66 557 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
67 557 : background_jobs_can_start: Barrier,
68 557 : cancel: CancellationToken,
69 557 : ) {
70 557 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
71 557 :
72 557 : let generator = SecondaryDownloader {
73 557 : tenant_manager,
74 557 : remote_storage,
75 557 : };
76 557 : let mut scheduler = Scheduler::new(generator, concurrency);
77 557 :
78 557 : scheduler
79 557 : .run(command_queue, background_jobs_can_start, cancel)
80 557 : .instrument(info_span!("secondary_downloads"))
81 907 : .await
82 159 : }
83 :
84 : struct SecondaryDownloader {
85 : tenant_manager: Arc<TenantManager>,
86 : remote_storage: GenericRemoteStorage,
87 : }
88 :
89 1858 : #[derive(Debug, Clone)]
90 : pub(super) struct OnDiskState {
91 : metadata: LayerFileMetadata,
92 : access_time: SystemTime,
93 : }
94 :
95 : impl OnDiskState {
96 1649 : fn new(
97 1649 : _conf: &'static PageServerConf,
98 1649 : _tenant_shard_id: &TenantShardId,
99 1649 : _timeline_id: &TimelineId,
100 1649 : _name: LayerFileName,
101 1649 : metadata: LayerFileMetadata,
102 1649 : access_time: SystemTime,
103 1649 : ) -> Self {
104 1649 : Self {
105 1649 : metadata,
106 1649 : access_time,
107 1649 : }
108 1649 : }
109 : }
110 :
111 5 : #[derive(Debug, Clone, Default)]
112 : pub(super) struct SecondaryDetailTimeline {
113 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
114 :
115 : /// We remember when layers were evicted, to prevent re-downloading them.
116 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
117 : }
118 :
119 : /// This state is written by the secondary downloader, it is opaque
120 : /// to TenantManager
121 UBC 0 : #[derive(Debug)]
122 : pub(super) struct SecondaryDetail {
123 : pub(super) config: SecondaryLocationConfig,
124 :
125 : last_download: Option<Instant>,
126 : next_download: Option<Instant>,
127 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
128 : }
129 :
130 : /// Helper for logging SystemTime
131 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
132 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
133 0 : datetime.format("%d/%m/%Y %T")
134 0 : }
135 :
136 : impl SecondaryDetail {
137 CBC 27 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
138 27 : Self {
139 27 : config,
140 27 : last_download: None,
141 27 : next_download: None,
142 27 : timelines: HashMap::new(),
143 27 : }
144 27 : }
145 : }
146 :
147 : struct PendingDownload {
148 : secondary_state: Arc<SecondaryTenant>,
149 : last_download: Option<Instant>,
150 : target_time: Option<Instant>,
151 : period: Option<Duration>,
152 : }
153 :
154 : impl scheduler::PendingJob for PendingDownload {
155 20 : fn get_tenant_shard_id(&self) -> &TenantShardId {
156 20 : self.secondary_state.get_tenant_shard_id()
157 20 : }
158 : }
159 :
160 : struct RunningDownload {
161 : barrier: Barrier,
162 : }
163 :
164 : impl scheduler::RunningJob for RunningDownload {
165 4 : fn get_barrier(&self) -> Barrier {
166 4 : self.barrier.clone()
167 4 : }
168 : }
169 :
170 : struct CompleteDownload {
171 : secondary_state: Arc<SecondaryTenant>,
172 : completed_at: Instant,
173 : }
174 :
175 : impl scheduler::Completion for CompleteDownload {
176 18 : fn get_tenant_shard_id(&self) -> &TenantShardId {
177 18 : self.secondary_state.get_tenant_shard_id()
178 18 : }
179 : }
180 :
181 : type Scheduler = TenantBackgroundJobs<
182 : SecondaryDownloader,
183 : PendingDownload,
184 : RunningDownload,
185 : CompleteDownload,
186 : DownloadCommand,
187 : >;
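// Job lifecycle, as implemented by the JobGenerator below:
// - schedule() walks the secondary tenants known to the TenantManager, skips those with
//   downloads disabled (config.warm == false), and emits the rest as PendingDownloads
//   sorted by their target execution time;
// - on_command() turns an explicit DownloadCommand from the command queue into an
//   immediate PendingDownload;
// - spawn() converts a PendingDownload into a RunningDownload plus a future that runs
//   TenantDownloader::download();
// - on_completion() sets the tenant's next_download deadline DOWNLOAD_FRESHEN_INTERVAL
//   in the future, regardless of whether the download succeeded.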
188 :
189 : #[async_trait::async_trait]
190 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
191 : for SecondaryDownloader
192 : {
193 6 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
194 : fn on_completion(&mut self, completion: CompleteDownload) {
195 : let CompleteDownload {
196 : secondary_state,
197 : completed_at: _completed_at,
198 : } = completion;
199 :
200 UBC 0 : tracing::debug!("Secondary tenant download completed");
201 :
202 : // Update next_download even if there was an error: we don't want errored tenants to implicitly
203 : // take priority to run again.
204 : let mut detail = secondary_state.detail.lock().unwrap();
205 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
206 : }
207 :
208 CBC 1109 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
209 1109 : let mut result = SchedulingResult {
210 1109 : jobs: Vec::new(),
211 1109 : want_interval: None,
212 1109 : };
213 1109 :
214 1109 : // Step 1: identify some tenants that we may work on
215 1109 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
216 1109 : self.tenant_manager
217 1109 : .foreach_secondary_tenants(|_id, secondary_state| {
218 11 : tenants.push(secondary_state.clone());
219 1109 : });
220 1109 :
221 1109 : // Step 2: filter out tenants which are not yet eligible to run
222 1109 : let now = Instant::now();
223 1109 : result.jobs = tenants
224 1109 : .into_iter()
225 1109 : .filter_map(|secondary_tenant| {
226 5 : let (last_download, next_download) = {
227 11 : let mut detail = secondary_tenant.detail.lock().unwrap();
228 11 :
229 11 : if !detail.config.warm {
230 : // Downloads are disabled for this tenant
231 6 : detail.next_download = None;
232 6 : return None;
233 5 : }
234 5 :
235 5 : if detail.next_download.is_none() {
236 4 : // Initialize with a jitter: this spreads initial downloads on startup
237 4 : // or mass-attach across our freshen interval.
238 4 : let jittered_period =
239 4 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
240 4 : detail.next_download = Some(now.checked_add(jittered_period).expect(
241 4 : "Using our constant, which is known to be small compared with clock range",
242 4 : ));
243 4 : }
244 5 : (detail.last_download, detail.next_download.unwrap())
245 5 : };
246 5 :
247 5 : if now < next_download {
248 5 : Some(PendingDownload {
249 5 : secondary_state: secondary_tenant,
250 5 : last_download,
251 5 : target_time: Some(next_download),
252 5 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
253 5 : })
254 : } else {
255 UBC 0 : None
256 : }
257 CBC 1109 : })
258 1109 : .collect();
259 1109 :
260 1109 : // Step 3: sort by target execution time to run most urgent first.
261 1109 : result.jobs.sort_by_key(|j| j.target_time);
262 1109 :
263 1109 : result
264 1109 : }
265 :
266 4 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
267 4 : let tenant_shard_id = command.get_tenant_shard_id();
268 4 :
269 4 : let tenant = self
270 4 : .tenant_manager
271 4 : .get_secondary_tenant_shard(*tenant_shard_id);
272 4 : let Some(tenant) = tenant else {
273 : {
274 UBC 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
275 : }
276 : };
277 :
278 CBC 4 : Ok(PendingDownload {
279 4 : target_time: None,
280 4 : period: None,
281 4 : last_download: None,
282 4 : secondary_state: tenant,
283 4 : })
284 4 : }
285 :
286 7 : fn spawn(
287 7 : &mut self,
288 7 : job: PendingDownload,
289 7 : ) -> (
290 7 : RunningDownload,
291 7 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
292 7 : ) {
293 7 : let PendingDownload {
294 7 : secondary_state,
295 7 : last_download,
296 7 : target_time,
297 7 : period,
298 7 : } = job;
299 7 :
300 7 : let (completion, barrier) = utils::completion::channel();
301 7 : let remote_storage = self.remote_storage.clone();
302 7 : let conf = self.tenant_manager.get_conf();
303 7 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
304 7 : (RunningDownload { barrier }, Box::pin(async move {
305 7 : let _completion = completion;
306 7 :
307 7 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
308 7 : .download()
309 18190 : .await
310 : {
311 : Err(UpdateError::NoData) => {
312 1 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
313 : },
314 : Err(UpdateError::NoSpace) => {
315 UBC 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
316 : }
317 : Err(UpdateError::Cancelled) => {
318 0 : tracing::debug!("Shut down while downloading");
319 : },
320 0 : Err(UpdateError::Deserialize(e)) => {
321 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
322 : },
323 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
324 0 : tracing::error!("Error while downloading tenant: {e}");
325 : },
326 CBC 5 : Ok(()) => {}
327 : };
328 :
329 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
330 :
331 : // If the job had a target execution time, we may check our final execution
332 : // time against that for observability purposes.
333 6 : if let (Some(target_time), Some(period)) = (target_time, period) {
334 : // Only track execution lag if this isn't our first download: otherwise, it is expected
335 : // that execution will have taken longer than our configured interval, for example
336 : // when starting up a pageserver.
337 2 : if last_download.is_some() {
338 UBC 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
339 0 : let elapsed = Instant::now().duration_since(target_time);
340 0 :
341 0 : warn_when_period_overrun(
342 0 : elapsed,
343 0 : period,
344 0 : BackgroundLoopKind::SecondaryDownload,
345 0 : );
346 CBC 2 : }
347 4 : }
348 :
349 6 : CompleteDownload {
350 6 : secondary_state,
351 6 : completed_at: Instant::now(),
352 6 : }
353 7 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
354 7 : }
355 : }
356 :
357 : /// This type is a convenience to group together the various functions involved in
358 : /// freshening a secondary tenant.
359 : struct TenantDownloader<'a> {
360 : conf: &'static PageServerConf,
361 : remote_storage: &'a GenericRemoteStorage,
362 : secondary_state: &'a SecondaryTenant,
363 : }
364 :
365 : /// Errors that may be encountered while updating a tenant
366 UBC 0 : #[derive(thiserror::Error, Debug)]
367 : enum UpdateError {
368 : #[error("No remote data found")]
369 : NoData,
370 : #[error("Insufficient local storage space")]
371 : NoSpace,
372 : #[error("Failed to download")]
373 : DownloadError(DownloadError),
374 : #[error(transparent)]
375 : Deserialize(#[from] serde_json::Error),
376 : #[error("Cancelled")]
377 : Cancelled,
378 : #[error(transparent)]
379 : Other(#[from] anyhow::Error),
380 : }
381 :
382 : impl From<DownloadError> for UpdateError {
383 CBC 1 : fn from(value: DownloadError) -> Self {
384 1 : match &value {
385 UBC 0 : DownloadError::Cancelled => Self::Cancelled,
386 CBC 1 : DownloadError::NotFound => Self::NoData,
387 UBC 0 : _ => Self::DownloadError(value),
388 : }
389 CBC 1 : }
390 : }
391 :
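// Map ENOSPC to UpdateError::NoSpace so that running out of local disk space is reported
// as a transient condition (retried on a later download pass) rather than as a generic
// remote-storage error.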
392 : impl From<std::io::Error> for UpdateError {
393 UBC 0 : fn from(value: std::io::Error) -> Self {
394 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
395 0 : UpdateError::NoSpace
396 : } else {
397 : // An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
398 0 : UpdateError::Other(anyhow::anyhow!(value))
399 : }
400 0 : }
401 : }
402 :
403 : impl<'a> TenantDownloader<'a> {
404 CBC 7 : fn new(
405 7 : conf: &'static PageServerConf,
406 7 : remote_storage: &'a GenericRemoteStorage,
407 7 : secondary_state: &'a SecondaryTenant,
408 7 : ) -> Self {
409 7 : Self {
410 7 : conf,
411 7 : remote_storage,
412 7 : secondary_state,
413 7 : }
414 7 : }
415 :
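/// Freshen this secondary tenant: enter the tenant's gate, download the heatmap from
/// remote storage, persist it locally (crash-safely) so that layer metadata can be
/// reconstructed on restart, then walk every timeline in the heatmap and download or
/// remove layers as required.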
416 7 : async fn download(&self) -> Result<(), UpdateError> {
417 7 : debug_assert_current_span_has_tenant_id();
418 :
419 : // For the duration of a download, we must hold the SecondaryTenant::gate, to
420 : // cover our access to local storage.
421 7 : let Ok(_guard) = self.secondary_state.gate.enter() else {
422 : // Shutting down
423 UBC 0 : return Ok(());
424 : };
425 :
426 CBC 7 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
427 : // Download the tenant's heatmap
428 7 : let heatmap_bytes = tokio::select!(
429 7 : bytes = self.download_heatmap() => {bytes?},
430 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
431 : );
432 :
433 6 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
434 :
435 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
436 : // layer metadata without having to re-download it.
437 6 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
438 6 :
439 6 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
440 6 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
441 6 : let heatmap_path_bg = heatmap_path.clone();
442 6 : tokio::task::spawn_blocking(move || {
443 6 : tokio::runtime::Handle::current().block_on(async move {
444 6 : VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
445 6 : })
446 6 : })
447 6 : .await
448 6 : .expect("Blocking task is never aborted")
449 6 : .maybe_fatal_err(&context_msg)?;
450 :
451 UBC 0 : tracing::debug!("Wrote local heatmap to {}", heatmap_path);
452 :
453 : // Download the layers in the heatmap
454 CBC 11 : for timeline in heatmap.timelines {
455 6 : if self.secondary_state.cancel.is_cancelled() {
456 UBC 0 : return Ok(());
457 CBC 6 : }
458 6 :
459 6 : let timeline_id = timeline.timeline_id;
460 6 : self.download_timeline(timeline)
461 : .instrument(tracing::info_span!(
462 : "secondary_download_timeline",
463 : tenant_id=%tenant_shard_id.tenant_id,
464 6 : shard_id=%tenant_shard_id.shard_slug(),
465 : %timeline_id
466 : ))
467 18136 : .await?;
468 : }
469 :
470 5 : Ok(())
471 6 : }
472 :
473 7 : async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
474 7 : debug_assert_current_span_has_tenant_id();
475 7 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
476 : // TODO: make download conditional on ETag having changed since last download
477 : // (https://github.com/neondatabase/neon/issues/6199)
478 UBC 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
479 :
480 CBC 7 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
481 :
482 7 : let heatmap_bytes = backoff::retry(
483 7 : || async {
484 7 : let download = self
485 7 : .remote_storage
486 7 : .download(&heatmap_path)
487 22 : .await
488 7 : .map_err(UpdateError::from)?;
489 6 : let mut heatmap_bytes = Vec::new();
490 6 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
491 26 : let _size = tokio::io::copy(&mut body, &mut heatmap_bytes).await?;
492 6 : Ok(heatmap_bytes)
493 7 : },
494 7 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
495 7 : FAILED_DOWNLOAD_WARN_THRESHOLD,
496 7 : FAILED_REMOTE_OP_RETRIES,
497 7 : "download heatmap",
498 7 : backoff::Cancel::new(self.secondary_state.cancel.clone(), || {
499 UBC 0 : UpdateError::Cancelled
500 CBC 7 : }),
501 7 : )
502 48 : .await?;
503 :
504 6 : SECONDARY_MODE.download_heatmap.inc();
505 6 :
506 6 : Ok(heatmap_bytes)
507 7 : }
508 :
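/// Bring one timeline's local layers in line with its heatmap entry: load (or scan from
/// disk) the current on-disk state, delete local layers absent from the heatmap, skip
/// layers evicted more recently than their last recorded access, download the rest, and
/// record the touched layers back into SecondaryDetail.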
509 6 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
510 6 : debug_assert_current_span_has_tenant_and_timeline_id();
511 6 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
512 6 : let timeline_path = self
513 6 : .conf
514 6 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
515 6 :
516 6 : // Accumulate updates to the state
517 6 : let mut touched = Vec::new();
518 6 :
519 6 : // Clone a view of what layers already exist on disk
520 6 : let timeline_state = self
521 6 : .secondary_state
522 6 : .detail
523 6 : .lock()
524 6 : .unwrap()
525 6 : .timelines
526 6 : .get(&timeline.timeline_id)
527 6 : .cloned();
528 :
529 6 : let timeline_state = match timeline_state {
530 3 : Some(t) => t,
531 : None => {
532 : // We have no existing state: need to scan local disk for layers first.
533 2 : let timeline_state =
534 422 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
535 :
536 : // Re-acquire detail lock now that we're done with async load from local FS
537 2 : self.secondary_state
538 2 : .detail
539 2 : .lock()
540 2 : .unwrap()
541 2 : .timelines
542 2 : .insert(timeline.timeline_id, timeline_state.clone());
543 2 : timeline_state
544 : }
545 : };
546 :
547 5 : let layers_in_heatmap = timeline
548 5 : .layers
549 5 : .iter()
550 3080 : .map(|l| &l.name)
551 5 : .collect::<HashSet<_>>();
552 5 : let layers_on_disk = timeline_state
553 5 : .on_disk_layers
554 5 : .iter()
555 1858 : .map(|l| l.0)
556 5 : .collect::<HashSet<_>>();
557 :
558 : // Remove on-disk layers that are no longer present in heatmap
559 5 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
560 2 : let local_path = timeline_path.join(layer.to_string());
561 2 : tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
562 2 : tokio::fs::remove_file(&local_path)
563 2 : .await
564 2 : .or_else(fs_ext::ignore_not_found)
565 2 : .maybe_fatal_err("Removing secondary layer")?;
566 : }
567 :
568 : // Download heatmap layers that are not present on local disk, or update their
569 : // access time if they are already present.
570 3085 : for layer in timeline.layers {
571 3080 : if self.secondary_state.cancel.is_cancelled() {
572 UBC 0 : return Ok(());
573 CBC 3080 : }
574 :
575 : // Existing on-disk layers: just update their access time.
576 3080 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
577 UBC 0 : tracing::debug!("Layer {} is already on disk", layer.name);
578 CBC 1856 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
579 1856 : || on_disk.access_time != layer.access_time
580 : {
581 : // We already have this layer on disk. Update its access time.
582 UBC 0 : tracing::debug!(
583 0 : "Access time updated for layer {}: {} -> {}",
584 0 : layer.name,
585 0 : strftime(&on_disk.access_time),
586 0 : strftime(&layer.access_time)
587 0 : );
588 CBC 200 : touched.push(layer);
589 1656 : }
590 1856 : continue;
591 : } else {
592 UBC 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
593 : }
594 :
595 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
596 : // recently than it was evicted.
597 CBC 1224 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
598 UBC 0 : if &layer.access_time > evicted_at {
599 0 : tracing::info!(
600 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
601 0 : layer.name,
602 0 : strftime(&layer.access_time),
603 0 : strftime(evicted_at)
604 0 : );
605 : } else {
606 0 : tracing::trace!(
607 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
608 0 : layer.name,
609 0 : strftime(&layer.access_time),
610 0 : strftime(evicted_at)
611 0 : );
612 0 : continue;
613 : }
614 CBC 1224 : }
615 :
616 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
617 1224 : let downloaded_bytes = match download_layer_file(
618 1224 : self.conf,
619 1224 : self.remote_storage,
620 1224 : *tenant_shard_id,
621 1224 : timeline.timeline_id,
622 1224 : &layer.name,
623 1224 : &LayerFileMetadata::from(&layer.metadata),
624 1224 : &self.secondary_state.cancel,
625 1224 : )
626 17712 : .await
627 : {
628 1224 : Ok(bytes) => bytes,
629 UBC 0 : Err(e) => {
630 0 : if let DownloadError::NotFound = e {
631 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
632 : // This is harmless: continue to download the next layer. It is expected during compaction
633 : // or GC.
634 0 : tracing::debug!(
635 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
636 0 : layer.name
637 0 : );
638 0 : continue;
639 : } else {
640 0 : return Err(e.into());
641 : }
642 : }
643 : };
644 :
645 CBC 1224 : if downloaded_bytes != layer.metadata.file_size {
646 UBC 0 : let local_path = timeline_path.join(layer.name.to_string());
647 :
648 0 : tracing::warn!(
649 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
650 0 : layer.name,
651 0 : downloaded_bytes,
652 0 : layer.metadata.file_size
653 0 : );
654 :
655 0 : tokio::fs::remove_file(&local_path)
656 0 : .await
657 0 : .or_else(fs_ext::ignore_not_found)?;
658 CBC 1224 : }
659 :
660 1224 : SECONDARY_MODE.download_layer.inc();
661 1224 : touched.push(layer)
662 : }
663 :
664 : // Write updates to state to record layers we just downloaded or touched.
665 : {
666 5 : let mut detail = self.secondary_state.detail.lock().unwrap();
667 5 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
668 :
669 5 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
670 :
671 1429 : for t in touched {
672 : use std::collections::hash_map::Entry;
673 1424 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
674 200 : Entry::Occupied(mut v) => {
675 200 : v.get_mut().access_time = t.access_time;
676 200 : }
677 1224 : Entry::Vacant(e) => {
678 1224 : e.insert(OnDiskState::new(
679 1224 : self.conf,
680 1224 : tenant_shard_id,
681 1224 : &timeline.timeline_id,
682 1224 : t.name,
683 1224 : LayerFileMetadata::from(&t.metadata),
684 1224 : t.access_time,
685 1224 : ));
686 1224 : }
687 : }
688 : }
689 : }
690 :
691 5 : Ok(())
692 5 : }
693 : }
694 :
695 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
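/// Layer files not listed in the heatmap are deleted, the legacy metadata file and any
/// unrecognized files are ignored, and layers whose size disagrees with the heatmap
/// metadata are left out of the returned state so the caller will re-download them.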
696 3 : async fn init_timeline_state(
697 3 : conf: &'static PageServerConf,
698 3 : tenant_shard_id: &TenantShardId,
699 3 : heatmap: &HeatMapTimeline,
700 3 : ) -> SecondaryDetailTimeline {
701 3 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
702 3 : let mut detail = SecondaryDetailTimeline::default();
703 :
704 3 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
705 GBC 1 : Ok(d) => d,
706 CBC 2 : Err(e) => {
707 2 : if e.kind() == std::io::ErrorKind::NotFound {
708 2 : let context = format!("Creating timeline directory {timeline_path}");
709 2 : tracing::info!("{}", context);
710 2 : tokio::fs::create_dir_all(&timeline_path)
711 2 : .await
712 2 : .fatal_err(&context);
713 2 :
714 2 : // No entries to report: drop out.
715 2 : return detail;
716 : } else {
717 UBC 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
718 : }
719 : }
720 : };
721 :
722 : // As we iterate through layers found on disk, we will look up their metadata from this map.
723 : // Layers not present in metadata will be discarded.
724 GBC 1 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
725 589 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
726 :
727 423 : while let Some(dentry) = dir
728 423 : .next_entry()
729 13 : .await
730 423 : .fatal_err(&format!("Listing {timeline_path}"))
731 : {
732 423 : let dentry_file_name = dentry.file_name();
733 423 : let file_name = dentry_file_name.to_string_lossy();
734 423 : let local_meta = dentry.metadata().await.fatal_err(&format!(
735 423 : "Read metadata on {}",
736 423 : dentry.path().to_string_lossy()
737 423 : ));
738 423 :
739 423 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
740 423 : if file_name == METADATA_FILE_NAME {
741 1 : continue;
742 422 : }
743 422 :
744 422 : match LayerFileName::from_str(&file_name) {
745 422 : Ok(name) => {
746 422 : let remote_meta = heatmap_metadata.get(&name);
747 422 : match remote_meta {
748 421 : Some(remote_meta) => {
749 421 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
750 421 : if local_meta.len() != remote_meta.metadata.file_size {
751 : // This should not happen, because we do crashsafe write-then-rename when downloading
752 : // layers, and layers in remote storage are immutable. Remove the local file because
753 : // we cannot trust it.
754 UBC 0 : tracing::warn!(
755 0 : "Removing local layer {name} with unexpected local size {} != {}",
756 0 : local_meta.len(),
757 0 : remote_meta.metadata.file_size
758 0 : );
759 GBC 421 : } else {
760 421 : // We expect the access time to be initialized immediately afterwards, when
761 421 : // the latest heatmap is applied to the state.
762 421 : detail.on_disk_layers.insert(
763 421 : name.clone(),
764 421 : OnDiskState::new(
765 421 : conf,
766 421 : tenant_shard_id,
767 421 : &heatmap.timeline_id,
768 421 : name,
769 421 : LayerFileMetadata::from(&remote_meta.metadata),
770 421 : remote_meta.access_time,
771 421 : ),
772 421 : );
773 421 : }
774 : }
775 : None => {
776 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
777 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
778 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
779 UBC 0 : tracing::info!(
780 0 : "Removing secondary local layer {} because it's absent in heatmap",
781 0 : name
782 0 : );
783 0 : tokio::fs::remove_file(&dentry.path())
784 0 : .await
785 0 : .or_else(fs_ext::ignore_not_found)
786 0 : .fatal_err(&format!(
787 0 : "Removing layer {}",
788 0 : dentry.path().to_string_lossy()
789 0 : ));
790 : }
791 : }
792 : }
793 : Err(_) => {
794 : // Ignore it.
795 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
796 : }
797 : }
798 : }
799 :
800 0 : detail
801 CBC 2 : }