Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : disk_usage_eviction_task::{
12 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
13 : },
14 : metrics::SECONDARY_MODE,
15 : tenant::{
16 : config::SecondaryLocationConfig,
17 : debug_assert_current_span_has_tenant_and_timeline_id,
18 : ephemeral_file::is_ephemeral_file,
19 : remote_timeline_client::{
20 : index::LayerFileMetadata, is_temp_download_file, FAILED_DOWNLOAD_WARN_THRESHOLD,
21 : FAILED_REMOTE_OP_RETRIES,
22 : },
23 : span::debug_assert_current_span_has_tenant_id,
24 : storage_layer::LayerFileName,
25 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
26 : },
27 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
28 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
29 : };
30 :
31 : use super::{
32 : heatmap::HeatMapLayer,
33 : scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
34 : SecondaryTenant,
35 : };
36 :
37 : use crate::tenant::{
38 : mgr::TenantManager,
39 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
40 : };
41 :
42 : use camino::Utf8PathBuf;
43 : use chrono::format::{DelayedFormat, StrftimeItems};
44 : use futures::Future;
45 : use pageserver_api::models::SecondaryProgress;
46 : use pageserver_api::shard::TenantShardId;
47 : use rand::Rng;
48 : use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
49 :
50 : use tokio_util::sync::CancellationToken;
51 : use tracing::{info_span, instrument, warn, Instrument};
52 : use utils::{
53 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
54 : id::TimelineId, serde_system_time,
55 : };
56 :
57 : use super::{
58 : heatmap::{HeatMapTenant, HeatMapTimeline},
59 : CommandRequest, DownloadCommand,
60 : };
61 :
62 : /// For each tenant, how long must have passed since the last download_tenant call before
63 : /// calling it again. This is approximately the time by which local data is allowed
64 : /// to fall behind remote data.
65 : ///
66 : /// TODO: this should just be a default, and the actual period should be controlled
67 : /// via the heatmap itself
68 : /// `<https://github.com/neondatabase/neon/issues/6200>`
69 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
70 :
71 0 : pub(super) async fn downloader_task(
72 0 : tenant_manager: Arc<TenantManager>,
73 0 : remote_storage: GenericRemoteStorage,
74 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
75 0 : background_jobs_can_start: Barrier,
76 0 : cancel: CancellationToken,
77 0 : ) {
78 0 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
79 0 :
80 0 : let generator = SecondaryDownloader {
81 0 : tenant_manager,
82 0 : remote_storage,
83 0 : };
84 0 : let mut scheduler = Scheduler::new(generator, concurrency);
85 0 :
86 0 : scheduler
87 0 : .run(command_queue, background_jobs_can_start, cancel)
88 0 : .instrument(info_span!("secondary_downloads"))
89 0 : .await
90 0 : }
91 :
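/// Generates secondary-mode download jobs. Holds the shared handles every job needs:
/// the [`TenantManager`] for enumerating secondary tenants, and the remote storage
/// client for fetching heatmaps and layer files.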
92 : struct SecondaryDownloader {
93 : tenant_manager: Arc<TenantManager>,
94 : remote_storage: GenericRemoteStorage,
95 : }
96 :
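/// State of a single layer file held on local disk for a secondary location: the
/// metadata it was downloaded with, and the access time taken from the most
/// recently applied heatmap.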
97 : #[derive(Debug, Clone)]
98 : pub(super) struct OnDiskState {
99 : metadata: LayerFileMetadata,
100 : access_time: SystemTime,
101 : }
102 :
103 : impl OnDiskState {
104 0 : fn new(
105 0 : _conf: &'static PageServerConf,
106 0 : _tenant_shard_id: &TenantShardId,
107 0 : _timeline_id: &TimelineId,
108 0 : _name: LayerFileName,
109 0 : metadata: LayerFileMetadata,
110 0 : access_time: SystemTime,
111 0 : ) -> Self {
112 0 : Self {
113 0 : metadata,
114 0 : access_time,
115 0 : }
116 0 : }
117 : }
118 :
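/// Per-timeline state for a secondary location: which layer files are currently on
/// local disk, and when layers were evicted (so we don't immediately re-download them).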
119 : #[derive(Debug, Clone, Default)]
120 : pub(super) struct SecondaryDetailTimeline {
121 : pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
122 :
123 : /// We remember when layers were evicted, to prevent re-downloading them.
124 : pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
125 : }
126 :
127 : /// This state is written by the secondary downloader; it is opaque
128 : /// to the TenantManager.
129 : #[derive(Debug)]
130 : pub(super) struct SecondaryDetail {
131 : pub(super) config: SecondaryLocationConfig,
132 :
133 : last_download: Option<Instant>,
134 : last_etag: Option<Etag>,
135 : next_download: Option<Instant>,
136 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
137 : }
138 :
139 : /// Helper for logging SystemTime
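/// in a fixed format, e.g. "14/03/2024 10:15:30".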
140 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
141 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
142 0 : datetime.format("%d/%m/%Y %T")
143 0 : }
144 :
145 : /// Information returned from the download function when it detects that the heatmap has changed
146 : struct HeatMapModified {
147 : etag: Etag,
148 : last_modified: SystemTime,
149 : bytes: Vec<u8>,
150 : }
151 :
152 : enum HeatMapDownload {
153 : // The heatmap's etag has changed: return the new etag, mtime and the body bytes
154 : Modified(HeatMapModified),
155 : // The heatmap's etag is unchanged
156 : Unmodified,
157 : }
158 :
159 : impl SecondaryDetail {
160 0 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
161 0 : Self {
162 0 : config,
163 0 : last_download: None,
164 0 : last_etag: None,
165 0 : next_download: None,
166 0 : timelines: HashMap::new(),
167 0 : }
168 0 : }
169 :
170 : /// Additionally returns the total number of layers, used for more stable relative access time
171 : /// based eviction.
172 0 : pub(super) fn get_layers_for_eviction(
173 0 : &self,
174 0 : parent: &Arc<SecondaryTenant>,
175 0 : ) -> (DiskUsageEvictionInfo, usize) {
176 0 : let mut result = DiskUsageEvictionInfo::default();
177 0 : let mut total_layers = 0;
178 :
179 0 : for (timeline_id, timeline_detail) in &self.timelines {
180 0 : result
181 0 : .resident_layers
182 0 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
183 0 : EvictionCandidate {
184 0 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
185 0 : secondary_tenant: parent.clone(),
186 0 : timeline_id: *timeline_id,
187 0 : name: name.clone(),
188 0 : metadata: ods.metadata.clone(),
189 0 : }),
190 0 : last_activity_ts: ods.access_time,
191 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
192 0 : }
193 0 : }));
194 0 :
195 0 : // total might be missing currently-downloading layers, but as a lower-than-actual
196 0 : // value it is a good enough approximation.
197 0 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
198 0 : }
199 0 : result.max_layer_size = result
200 0 : .resident_layers
201 0 : .iter()
202 0 : .map(|l| l.layer.get_file_size())
203 0 : .max();
204 0 :
205 0 : tracing::debug!(
206 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
207 0 : parent.get_tenant_shard_id(),
208 0 : self.timelines.len(),
209 0 : result.resident_layers.len()
210 0 : );
211 :
212 0 : (result, total_layers)
213 0 : }
214 : }
215 :
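/// A download job that has been selected but not yet spawned. Jobs created by the
/// scheduler carry a target execution time and period; jobs created via `on_command`
/// leave those as None.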
216 : struct PendingDownload {
217 : secondary_state: Arc<SecondaryTenant>,
218 : last_download: Option<Instant>,
219 : target_time: Option<Instant>,
220 : period: Option<Duration>,
221 : }
222 :
223 : impl scheduler::PendingJob for PendingDownload {
224 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
225 0 : self.secondary_state.get_tenant_shard_id()
226 0 : }
227 : }
228 :
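/// Handle to a spawned download: the barrier is held by the running future and
/// released when it completes.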
229 : struct RunningDownload {
230 : barrier: Barrier,
231 : }
232 :
233 : impl scheduler::RunningJob for RunningDownload {
234 0 : fn get_barrier(&self) -> Barrier {
235 0 : self.barrier.clone()
236 0 : }
237 : }
238 :
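/// Passed back to the scheduler when a download job finishes; `on_completion` uses it
/// to set the tenant's next download time.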
239 : struct CompleteDownload {
240 : secondary_state: Arc<SecondaryTenant>,
241 : completed_at: Instant,
242 : }
243 :
244 : impl scheduler::Completion for CompleteDownload {
245 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
246 0 : self.secondary_state.get_tenant_shard_id()
247 0 : }
248 : }
249 :
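/// The generic tenant background-job scheduler, instantiated with the secondary
/// download job types defined above.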
250 : type Scheduler = TenantBackgroundJobs<
251 : SecondaryDownloader,
252 : PendingDownload,
253 : RunningDownload,
254 : CompleteDownload,
255 : DownloadCommand,
256 : >;
257 :
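// Lifecycle of a download job: `schedule` picks tenants whose next_download time has
// passed (initializing it with jitter on first sight), `spawn` performs the download and
// exposes a completion barrier, and `on_completion` pushes next_download forward by
// DOWNLOAD_FRESHEN_INTERVAL. `on_command` builds a job for an explicitly requested
// download arriving on the command queue.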
258 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
259 : for SecondaryDownloader
260 : {
261 0 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
262 : fn on_completion(&mut self, completion: CompleteDownload) {
263 : let CompleteDownload {
264 : secondary_state,
265 : completed_at: _completed_at,
266 : } = completion;
267 :
268 0 : tracing::debug!("Secondary tenant download completed");
269 :
270 : // Update the next download time even if there was an error: we don't want errored tenants to implicitly
271 : // take priority to run again.
272 : let mut detail = secondary_state.detail.lock().unwrap();
273 : detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
274 : }
275 :
276 0 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
277 0 : let mut result = SchedulingResult {
278 0 : jobs: Vec::new(),
279 0 : want_interval: None,
280 0 : };
281 0 :
282 0 : // Step 1: identify some tenants that we may work on
283 0 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
284 0 : self.tenant_manager
285 0 : .foreach_secondary_tenants(|_id, secondary_state| {
286 0 : tenants.push(secondary_state.clone());
287 0 : });
288 0 :
289 0 : // Step 2: filter out tenants which are not yet eligible to run
290 0 : let now = Instant::now();
291 0 : result.jobs = tenants
292 0 : .into_iter()
293 0 : .filter_map(|secondary_tenant| {
294 0 : let (last_download, next_download) = {
295 0 : let mut detail = secondary_tenant.detail.lock().unwrap();
296 0 :
297 0 : if !detail.config.warm {
298 : // Downloads are disabled for this tenant
299 0 : detail.next_download = None;
300 0 : return None;
301 0 : }
302 0 :
303 0 : if detail.next_download.is_none() {
304 0 : // Initialize with a jitter: this spreads initial downloads on startup
305 0 : // or mass-attach across our freshen interval.
306 0 : let jittered_period =
307 0 : rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
308 0 : detail.next_download = Some(now.checked_add(jittered_period).expect(
309 0 : "Using our constant, which is known to be small compared with clock range",
310 0 : ));
311 0 : }
312 0 : (detail.last_download, detail.next_download.unwrap())
313 0 : };
314 0 :
315 0 : if now > next_download {
316 0 : Some(PendingDownload {
317 0 : secondary_state: secondary_tenant,
318 0 : last_download,
319 0 : target_time: Some(next_download),
320 0 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
321 0 : })
322 : } else {
323 0 : None
324 : }
325 0 : })
326 0 : .collect();
327 0 :
328 0 : // Step 3: sort by target execution time to run most urgent first.
329 0 : result.jobs.sort_by_key(|j| j.target_time);
330 0 :
331 0 : result
332 0 : }
333 :
334 0 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
335 0 : let tenant_shard_id = command.get_tenant_shard_id();
336 0 :
337 0 : let tenant = self
338 0 : .tenant_manager
339 0 : .get_secondary_tenant_shard(*tenant_shard_id);
340 0 : let Some(tenant) = tenant else {
341 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
342 : };
343 :
344 0 : Ok(PendingDownload {
345 0 : target_time: None,
346 0 : period: None,
347 0 : last_download: None,
348 0 : secondary_state: tenant,
349 0 : })
350 0 : }
351 :
352 0 : fn spawn(
353 0 : &mut self,
354 0 : job: PendingDownload,
355 0 : ) -> (
356 0 : RunningDownload,
357 0 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
358 0 : ) {
359 0 : let PendingDownload {
360 0 : secondary_state,
361 0 : last_download,
362 0 : target_time,
363 0 : period,
364 0 : } = job;
365 0 :
366 0 : let (completion, barrier) = utils::completion::channel();
367 0 : let remote_storage = self.remote_storage.clone();
368 0 : let conf = self.tenant_manager.get_conf();
369 0 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
370 0 : (RunningDownload { barrier }, Box::pin(async move {
371 0 : let _completion = completion;
372 0 :
373 0 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
374 0 : .download()
375 0 : .await
376 : {
377 : Err(UpdateError::NoData) => {
378 0 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
379 : },
380 : Err(UpdateError::NoSpace) => {
381 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
382 : }
383 : Err(UpdateError::Cancelled) => {
384 0 : tracing::debug!("Shut down while downloading");
385 : },
386 0 : Err(UpdateError::Deserialize(e)) => {
387 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
388 : },
389 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
390 0 : tracing::error!("Error while downloading tenant: {e}");
391 : },
392 0 : Ok(()) => {}
393 : };
394 :
395 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
396 :
397 : // If the job had a target execution time, we may check our final execution
398 : // time against that for observability purposes.
399 0 : if let (Some(target_time), Some(period)) = (target_time, period) {
400 : // Only track execution lag if this isn't our first download: otherwise, it is expected
401 : // that execution will have taken longer than our configured interval, for example
402 : // when starting up a pageserver and executing initial downloads for many tenants at once.
403 0 : if last_download.is_some() {
404 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
405 0 : let elapsed = Instant::now().duration_since(target_time);
406 0 :
407 0 : warn_when_period_overrun(
408 0 : elapsed,
409 0 : period,
410 0 : BackgroundLoopKind::SecondaryDownload,
411 0 : );
412 0 : }
413 0 : }
414 :
415 0 : CompleteDownload {
416 0 : secondary_state,
417 0 : completed_at: Instant::now(),
418 0 : }
419 0 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
420 0 : }
421 : }
422 :
423 : /// This type is a convenience to group together the various functions involved in
424 : /// freshening a secondary tenant.
425 : struct TenantDownloader<'a> {
426 : conf: &'static PageServerConf,
427 : remote_storage: &'a GenericRemoteStorage,
428 : secondary_state: &'a SecondaryTenant,
429 : }
430 :
431 : /// Errors that may be encountered while updating a tenant
432 0 : #[derive(thiserror::Error, Debug)]
433 : enum UpdateError {
434 : #[error("No remote data found")]
435 : NoData,
436 : #[error("Insufficient local storage space")]
437 : NoSpace,
438 : #[error("Failed to download")]
439 : DownloadError(DownloadError),
440 : #[error(transparent)]
441 : Deserialize(#[from] serde_json::Error),
442 : #[error("Cancelled")]
443 : Cancelled,
444 : #[error(transparent)]
445 : Other(#[from] anyhow::Error),
446 : }
447 :
448 : impl From<DownloadError> for UpdateError {
449 0 : fn from(value: DownloadError) -> Self {
450 0 : match &value {
451 0 : DownloadError::Cancelled => Self::Cancelled,
452 0 : DownloadError::NotFound => Self::NoData,
453 0 : _ => Self::DownloadError(value),
454 : }
455 0 : }
456 : }
457 :
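// An io::Error may come directly from local filesystem operations (where ENOSPC deserves
// its own variant), or from copying a remote download stream, in which case it may wrap a
// DownloadError that we convert through the `From<DownloadError>` impl above.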
458 : impl From<std::io::Error> for UpdateError {
459 0 : fn from(value: std::io::Error) -> Self {
460 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
461 0 : UpdateError::NoSpace
462 0 : } else if value
463 0 : .get_ref()
464 0 : .and_then(|x| x.downcast_ref::<DownloadError>())
465 0 : .is_some()
466 : {
467 0 : UpdateError::from(DownloadError::from(value))
468 : } else {
469 : // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
470 0 : UpdateError::Other(anyhow::anyhow!(value))
471 : }
472 0 : }
473 : }
474 :
475 : impl<'a> TenantDownloader<'a> {
476 0 : fn new(
477 0 : conf: &'static PageServerConf,
478 0 : remote_storage: &'a GenericRemoteStorage,
479 0 : secondary_state: &'a SecondaryTenant,
480 0 : ) -> Self {
481 0 : Self {
482 0 : conf,
483 0 : remote_storage,
484 0 : secondary_state,
485 0 : }
486 0 : }
487 :
488 0 : async fn download(&self) -> Result<(), UpdateError> {
489 0 : debug_assert_current_span_has_tenant_id();
490 :
491 : // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
492 : // that our access to local storage is covered.
493 0 : let Ok(_guard) = self.secondary_state.gate.enter() else {
494 : // Shutting down
495 0 : return Ok(());
496 : };
497 :
498 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
499 0 :
500 0 : // We will use the etag from the last successful download to make the download conditional on changes
501 0 : let last_etag = self
502 0 : .secondary_state
503 0 : .detail
504 0 : .lock()
505 0 : .unwrap()
506 0 : .last_etag
507 0 : .clone();
508 :
509 : // Download the tenant's heatmap
510 : let HeatMapModified {
511 0 : last_modified: heatmap_mtime,
512 0 : etag: heatmap_etag,
513 0 : bytes: heatmap_bytes,
514 0 : } = match tokio::select!(
515 0 : bytes = self.download_heatmap(last_etag.as_ref()) => {bytes?},
516 0 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
517 0 : ) {
518 : HeatMapDownload::Unmodified => {
519 0 : tracing::info!("Heatmap unchanged since last successful download");
520 0 : return Ok(());
521 : }
522 0 : HeatMapDownload::Modified(m) => m,
523 : };
524 :
525 0 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
526 :
527 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
528 : // layer metadata without having to re-download it.
529 0 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
530 0 :
531 0 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
532 0 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
533 0 : let heatmap_path_bg = heatmap_path.clone();
534 0 : VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
535 0 : .await
536 0 : .maybe_fatal_err(&context_msg)?;
537 :
538 0 : tracing::debug!(
539 0 : "Wrote local heatmap to {}, with {} timelines",
540 0 : heatmap_path,
541 0 : heatmap.timelines.len()
542 0 : );
543 :
544 : // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
545 : // principle that deletions should be done before writes wherever possible, and so that we can use this
546 : // phase to initialize our SecondaryProgress.
547 0 : {
548 0 : *self.secondary_state.progress.lock().unwrap() =
549 0 : self.prepare_timelines(&heatmap, heatmap_mtime).await?;
550 : }
551 :
552 : // Download the layers in the heatmap
553 0 : for timeline in heatmap.timelines {
554 0 : if self.secondary_state.cancel.is_cancelled() {
555 0 : tracing::debug!(
556 0 : "Cancelled before downloading timeline {}",
557 0 : timeline.timeline_id
558 0 : );
559 0 : return Ok(());
560 0 : }
561 0 :
562 0 : let timeline_id = timeline.timeline_id;
563 0 : self.download_timeline(timeline)
564 0 : .instrument(tracing::info_span!(
565 : "secondary_download_timeline",
566 : tenant_id=%tenant_shard_id.tenant_id,
567 0 : shard_id=%tenant_shard_id.shard_slug(),
568 : %timeline_id
569 : ))
570 0 : .await?;
571 : }
572 :
573 : // Only update last_etag after a full successful download: this way we will not skip
574 : // the next download, even if the heatmap's actual etag is unchanged.
575 0 : self.secondary_state.detail.lock().unwrap().last_etag = Some(heatmap_etag);
576 0 :
577 0 : Ok(())
578 0 : }
579 :
580 : /// Do any fast local cleanup that comes before the much slower process of downloading
581 : /// layers from remote storage. In the process, initialize the SecondaryProgress object
582 : /// that will later be updated incrementally as we download layers.
583 0 : async fn prepare_timelines(
584 0 : &self,
585 0 : heatmap: &HeatMapTenant,
586 0 : heatmap_mtime: SystemTime,
587 0 : ) -> Result<SecondaryProgress, UpdateError> {
588 0 : let heatmap_stats = heatmap.get_stats();
589 0 : // We will construct a progress object, and then populate its initial "downloaded" numbers
590 0 : // while iterating through local layer state in [`Self::prepare_timelines`]
591 0 : let mut progress = SecondaryProgress {
592 0 : layers_total: heatmap_stats.layers,
593 0 : bytes_total: heatmap_stats.bytes,
594 0 : heatmap_mtime: Some(serde_system_time::SystemTime(heatmap_mtime)),
595 0 : layers_downloaded: 0,
596 0 : bytes_downloaded: 0,
597 0 : };
598 0 : // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
599 0 : let mut delete_layers = Vec::new();
600 0 : let mut delete_timelines = Vec::new();
601 0 : {
602 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
603 0 : for (timeline_id, timeline_state) in &mut detail.timelines {
604 0 : let Some(heatmap_timeline_index) = heatmap
605 0 : .timelines
606 0 : .iter()
607 0 : .position(|t| t.timeline_id == *timeline_id)
608 : else {
609 : // This timeline is no longer referenced in the heatmap: delete it locally
610 0 : delete_timelines.push(*timeline_id);
611 0 : continue;
612 : };
613 :
614 0 : let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
615 0 :
616 0 : let layers_in_heatmap = heatmap_timeline
617 0 : .layers
618 0 : .iter()
619 0 : .map(|l| &l.name)
620 0 : .collect::<HashSet<_>>();
621 0 : let layers_on_disk = timeline_state
622 0 : .on_disk_layers
623 0 : .iter()
624 0 : .map(|l| l.0)
625 0 : .collect::<HashSet<_>>();
626 0 :
627 0 : let mut layer_count = layers_on_disk.len();
628 0 : let mut layer_byte_count: u64 = timeline_state
629 0 : .on_disk_layers
630 0 : .values()
631 0 : .map(|l| l.metadata.file_size())
632 0 : .sum();
633 :
634 : // Remove on-disk layers that are no longer present in heatmap
635 0 : for layer in layers_on_disk.difference(&layers_in_heatmap) {
636 0 : layer_count -= 1;
637 0 : layer_byte_count -= timeline_state
638 0 : .on_disk_layers
639 0 : .get(layer)
640 0 : .unwrap()
641 0 : .metadata
642 0 : .file_size();
643 0 :
644 0 : delete_layers.push((*timeline_id, (*layer).clone()));
645 0 : }
646 :
647 0 : progress.bytes_downloaded += layer_byte_count;
648 0 : progress.layers_downloaded += layer_count;
649 : }
650 :
651 0 : for delete_timeline in &delete_timelines {
652 0 : // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
653 0 : // from disk fails that will be a fatal error.
654 0 : detail.timelines.remove(delete_timeline);
655 0 : }
656 : }
657 :
658 : // Execute accumulated deletions
659 0 : for (timeline_id, layer_name) in delete_layers {
660 0 : let timeline_path = self
661 0 : .conf
662 0 : .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
663 0 : let local_path = timeline_path.join(layer_name.to_string());
664 0 : tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
665 :
666 0 : tokio::fs::remove_file(&local_path)
667 0 : .await
668 0 : .or_else(fs_ext::ignore_not_found)
669 0 : .maybe_fatal_err("Removing secondary layer")?;
670 :
671 : // Update in-memory housekeeping to reflect the absence of the deleted layer
672 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
673 0 : let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
674 0 : continue;
675 : };
676 0 : timeline_state.on_disk_layers.remove(&layer_name);
677 : }
678 :
679 0 : for timeline_id in delete_timelines {
680 0 : let timeline_path = self
681 0 : .conf
682 0 : .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
683 0 : tracing::info!(timeline_id=%timeline_id,
684 0 : "Timeline no longer in heatmap, removing from secondary location"
685 0 : );
686 0 : tokio::fs::remove_dir_all(&timeline_path)
687 0 : .await
688 0 : .or_else(fs_ext::ignore_not_found)
689 0 : .maybe_fatal_err("Removing secondary timeline")?;
690 : }
691 :
692 0 : Ok(progress)
693 0 : }
694 :
695 : /// Returns the downloaded heatmap (etag, mtime and body bytes) if the remote etag differs
696 : /// from `prev_etag`, or `HeatMapDownload::Unmodified` if the object still matches `prev_etag`.
697 0 : async fn download_heatmap(
698 0 : &self,
699 0 : prev_etag: Option<&Etag>,
700 0 : ) -> Result<HeatMapDownload, UpdateError> {
701 0 : debug_assert_current_span_has_tenant_id();
702 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
703 0 : // TODO: pull up etag check into the request, to do a conditional GET rather than
704 0 : // issuing a GET and then maybe ignoring the response body
705 0 : // (https://github.com/neondatabase/neon/issues/6199)
706 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
707 :
708 0 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
709 0 : let cancel = &self.secondary_state.cancel;
710 0 :
711 0 : backoff::retry(
712 0 : || async {
713 0 : let download = self
714 0 : .remote_storage
715 0 : .download(&heatmap_path, cancel)
716 0 : .await
717 0 : .map_err(UpdateError::from)?;
718 :
719 0 : SECONDARY_MODE.download_heatmap.inc();
720 0 :
721 0 : if Some(&download.etag) == prev_etag {
722 0 : Ok(HeatMapDownload::Unmodified)
723 : } else {
724 0 : let mut heatmap_bytes = Vec::new();
725 0 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
726 0 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
727 0 : Ok(HeatMapDownload::Modified(HeatMapModified {
728 0 : etag: download.etag,
729 0 : last_modified: download.last_modified,
730 0 : bytes: heatmap_bytes,
731 0 : }))
732 : }
733 0 : },
734 0 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
735 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
736 0 : FAILED_REMOTE_OP_RETRIES,
737 0 : "download heatmap",
738 0 : cancel,
739 0 : )
740 0 : .await
741 0 : .ok_or_else(|| UpdateError::Cancelled)
742 0 : .and_then(|x| x)
743 0 : }
744 :
745 0 : async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
746 0 : debug_assert_current_span_has_tenant_and_timeline_id();
747 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
748 0 : let timeline_path = self
749 0 : .conf
750 0 : .timeline_path(tenant_shard_id, &timeline.timeline_id);
751 0 :
752 0 : // Accumulate updates to the state
753 0 : let mut touched = Vec::new();
754 0 :
755 0 : // Clone a view of what layers already exist on disk
756 0 : let timeline_state = self
757 0 : .secondary_state
758 0 : .detail
759 0 : .lock()
760 0 : .unwrap()
761 0 : .timelines
762 0 : .get(&timeline.timeline_id)
763 0 : .cloned();
764 :
765 0 : let timeline_state = match timeline_state {
766 0 : Some(t) => t,
767 : None => {
768 : // We have no existing state: need to scan local disk for layers first.
769 0 : let timeline_state =
770 0 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
771 :
772 : // Re-acquire detail lock now that we're done with async load from local FS
773 0 : self.secondary_state
774 0 : .detail
775 0 : .lock()
776 0 : .unwrap()
777 0 : .timelines
778 0 : .insert(timeline.timeline_id, timeline_state.clone());
779 0 : timeline_state
780 : }
781 : };
782 :
783 0 : tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
784 :
785 : // Download heatmap layers that are not present on local disk, or update their
786 : // access time if they are already present.
787 0 : for layer in timeline.layers {
788 0 : if self.secondary_state.cancel.is_cancelled() {
789 0 : tracing::debug!("Cancelled -- dropping out of layer loop");
790 0 : return Ok(());
791 0 : }
792 :
793 : // Existing on-disk layers: just update their access time.
794 0 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
795 0 : tracing::debug!("Layer {} is already on disk", layer.name);
796 :
797 0 : if cfg!(debug_assertions) {
798 : // Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
799 : // are already present on disk are really there.
800 0 : let local_path = self
801 0 : .conf
802 0 : .timeline_path(tenant_shard_id, &timeline.timeline_id)
803 0 : .join(layer.name.file_name());
804 0 : match tokio::fs::metadata(&local_path).await {
805 0 : Ok(meta) => {
806 0 : tracing::debug!(
807 0 : "Layer {} present at {}, size {}",
808 0 : layer.name,
809 0 : local_path,
810 0 : meta.len(),
811 0 : );
812 : }
813 0 : Err(e) => {
814 0 : tracing::warn!(
815 0 : "Layer {} not found at {} ({})",
816 0 : layer.name,
817 0 : local_path,
818 0 : e
819 0 : );
820 0 : debug_assert!(false);
821 : }
822 : }
823 0 : }
824 :
825 0 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
826 0 : || on_disk.access_time != layer.access_time
827 : {
828 : // We already have this layer on disk. Update its access time.
829 0 : tracing::debug!(
830 0 : "Access time updated for layer {}: {} -> {}",
831 0 : layer.name,
832 0 : strftime(&on_disk.access_time),
833 0 : strftime(&layer.access_time)
834 0 : );
835 0 : touched.push(layer);
836 0 : }
837 0 : continue;
838 : } else {
839 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
840 : }
841 :
842 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
843 : // recently than it was evicted.
844 0 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
845 0 : if &layer.access_time > evicted_at {
846 0 : tracing::info!(
847 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
848 0 : layer.name,
849 0 : strftime(&layer.access_time),
850 0 : strftime(evicted_at)
851 0 : );
852 : } else {
853 0 : tracing::trace!(
854 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
855 0 : layer.name,
856 0 : strftime(&layer.access_time),
857 0 : strftime(evicted_at)
858 0 : );
859 0 : continue;
860 : }
861 0 : }
862 :
863 : // Failpoint for simulating slow remote storage
864 0 : failpoint_support::sleep_millis_async!(
865 0 : "secondary-layer-download-sleep",
866 0 : &self.secondary_state.cancel
867 0 : );
868 :
869 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
870 0 : let downloaded_bytes = match download_layer_file(
871 0 : self.conf,
872 0 : self.remote_storage,
873 0 : *tenant_shard_id,
874 0 : timeline.timeline_id,
875 0 : &layer.name,
876 0 : &LayerFileMetadata::from(&layer.metadata),
877 0 : &self.secondary_state.cancel,
878 0 : )
879 0 : .await
880 : {
881 0 : Ok(bytes) => bytes,
882 : Err(DownloadError::NotFound) => {
883 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
884 : // This is harmless: continue to download the next layer. It is expected during compaction
885 : // or GC.
886 0 : tracing::debug!(
887 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
888 0 : layer.name
889 0 : );
890 0 : continue;
891 : }
892 0 : Err(e) => return Err(e.into()),
893 : };
894 :
895 0 : if downloaded_bytes != layer.metadata.file_size {
896 0 : let local_path = timeline_path.join(layer.name.to_string());
897 0 :
898 0 : tracing::warn!(
899 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
900 0 : layer.name,
901 0 : downloaded_bytes,
902 0 : layer.metadata.file_size
903 0 : );
904 :
905 0 : tokio::fs::remove_file(&local_path)
906 0 : .await
907 0 : .or_else(fs_ext::ignore_not_found)?;
908 : } else {
909 0 : tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
910 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
911 0 : progress.bytes_downloaded += downloaded_bytes;
912 0 : progress.layers_downloaded += 1;
913 : }
914 :
915 0 : SECONDARY_MODE.download_layer.inc();
916 0 : touched.push(layer)
917 : }
918 :
919 : // Write updates to state to record layers we just downloaded or touched.
920 : {
921 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
922 0 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
923 0 :
924 0 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
925 :
926 0 : for t in touched {
927 : use std::collections::hash_map::Entry;
928 0 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
929 0 : Entry::Occupied(mut v) => {
930 0 : v.get_mut().access_time = t.access_time;
931 0 : }
932 0 : Entry::Vacant(e) => {
933 0 : e.insert(OnDiskState::new(
934 0 : self.conf,
935 0 : tenant_shard_id,
936 0 : &timeline.timeline_id,
937 0 : t.name,
938 0 : LayerFileMetadata::from(&t.metadata),
939 0 : t.access_time,
940 0 : ));
941 0 : }
942 : }
943 : }
944 : }
945 :
946 0 : Ok(())
947 0 : }
948 : }
949 :
950 : /// Scan local storage and build up on-disk layer state based on the metadata in a HeatMapTimeline
951 0 : async fn init_timeline_state(
952 0 : conf: &'static PageServerConf,
953 0 : tenant_shard_id: &TenantShardId,
954 0 : heatmap: &HeatMapTimeline,
955 0 : ) -> SecondaryDetailTimeline {
956 0 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
957 0 : let mut detail = SecondaryDetailTimeline::default();
958 :
959 0 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
960 0 : Ok(d) => d,
961 0 : Err(e) => {
962 0 : if e.kind() == std::io::ErrorKind::NotFound {
963 0 : let context = format!("Creating timeline directory {timeline_path}");
964 0 : tracing::info!("{}", context);
965 0 : tokio::fs::create_dir_all(&timeline_path)
966 0 : .await
967 0 : .fatal_err(&context);
968 0 :
969 0 : // No entries to report: drop out.
970 0 : return detail;
971 : } else {
972 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
973 : }
974 : }
975 : };
976 :
977 : // As we iterate through layers found on disk, we will look up their metadata from this map.
978 : // Layers not present in metadata will be discarded.
979 0 : let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
980 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
981 :
982 0 : while let Some(dentry) = dir
983 0 : .next_entry()
984 0 : .await
985 0 : .fatal_err(&format!("Listing {timeline_path}"))
986 : {
987 0 : let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
988 0 : tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
989 0 : continue;
990 : };
991 0 : let local_meta = dentry
992 0 : .metadata()
993 0 : .await
994 0 : .fatal_err(&format!("Read metadata on {}", file_path));
995 0 :
996 0 : let file_name = file_path.file_name().expect("created it from the dentry");
997 0 : if file_name == METADATA_FILE_NAME {
998 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
999 0 : warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
1000 0 : continue;
1001 0 : } else if crate::is_temporary(&file_path)
1002 0 : || is_temp_download_file(&file_path)
1003 0 : || is_ephemeral_file(file_name)
1004 : {
1005 : // Temporary files are frequently left behind from restarting during downloads
1006 0 : tracing::info!("Cleaning up temporary file {file_path}");
1007 0 : if let Err(e) = tokio::fs::remove_file(&file_path)
1008 0 : .await
1009 0 : .or_else(fs_ext::ignore_not_found)
1010 : {
1011 0 : tracing::error!("Failed to remove temporary file {file_path}: {e}");
1012 0 : }
1013 0 : continue;
1014 0 : }
1015 0 :
1016 0 : match LayerFileName::from_str(file_name) {
1017 0 : Ok(name) => {
1018 0 : let remote_meta = heatmap_metadata.get(&name);
1019 0 : match remote_meta {
1020 0 : Some(remote_meta) => {
1021 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
1022 0 : if local_meta.len() != remote_meta.metadata.file_size {
1023 : // This should not happen, because we do crashsafe write-then-rename when downloading
1024 : // layers, and layers in remote storage are immutable. Remove the local file because
1025 : // we cannot trust it.
1026 0 : tracing::warn!(
1027 0 : "Removing local layer {name} with unexpected local size {} != {}",
1028 0 : local_meta.len(),
1029 0 : remote_meta.metadata.file_size
1030 0 : );
1031 0 : } else {
1032 0 : // We expect the access time to be initialized immediately afterwards, when
1033 0 : // the latest heatmap is applied to the state.
1034 0 : detail.on_disk_layers.insert(
1035 0 : name.clone(),
1036 0 : OnDiskState::new(
1037 0 : conf,
1038 0 : tenant_shard_id,
1039 0 : &heatmap.timeline_id,
1040 0 : name,
1041 0 : LayerFileMetadata::from(&remote_meta.metadata),
1042 0 : remote_meta.access_time,
1043 0 : ),
1044 0 : );
1045 0 : }
1046 : }
1047 : None => {
1048 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
1049 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
1050 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
1051 0 : tracing::info!(
1052 0 : "Removing secondary local layer {} because it's absent in heatmap",
1053 0 : name
1054 0 : );
1055 0 : tokio::fs::remove_file(&dentry.path())
1056 0 : .await
1057 0 : .or_else(fs_ext::ignore_not_found)
1058 0 : .fatal_err(&format!(
1059 0 : "Removing layer {}",
1060 0 : dentry.path().to_string_lossy()
1061 0 : ));
1062 : }
1063 : }
1064 : }
1065 : Err(_) => {
1066 : // Ignore it.
1067 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
1068 : }
1069 : }
1070 : }
1071 :
1072 0 : detail
1073 0 : }