Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : pin::Pin,
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant, SystemTime},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : context::RequestContext,
12 : disk_usage_eviction_task::{
13 : finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
14 : },
15 : metrics::SECONDARY_MODE,
16 : tenant::{
17 : config::SecondaryLocationConfig,
18 : debug_assert_current_span_has_tenant_and_timeline_id,
19 : ephemeral_file::is_ephemeral_file,
20 : remote_timeline_client::{
21 : index::LayerFileMetadata, is_temp_download_file, FAILED_DOWNLOAD_WARN_THRESHOLD,
22 : FAILED_REMOTE_OP_RETRIES,
23 : },
24 : span::debug_assert_current_span_has_tenant_id,
25 : storage_layer::{layer::local_layer_path, LayerName},
26 : tasks::{warn_when_period_overrun, BackgroundLoopKind},
27 : },
28 : virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
29 : METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
30 : };
31 :
32 : use super::{
33 : heatmap::HeatMapLayer,
34 : scheduler::{
35 : self, period_jitter, period_warmup, Completion, JobGenerator, SchedulingResult,
36 : TenantBackgroundJobs,
37 : },
38 : SecondaryTenant,
39 : };
40 :
41 : use crate::tenant::{
42 : mgr::TenantManager,
43 : remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
44 : };
45 :
46 : use camino::Utf8PathBuf;
47 : use chrono::format::{DelayedFormat, StrftimeItems};
48 : use futures::Future;
49 : use pageserver_api::models::SecondaryProgress;
50 : use pageserver_api::shard::TenantShardId;
51 : use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
52 :
53 : use tokio_util::sync::CancellationToken;
54 : use tracing::{info_span, instrument, warn, Instrument};
55 : use utils::{
56 : backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
57 : id::TimelineId, serde_system_time,
58 : };
59 :
60 : use super::{
61 : heatmap::{HeatMapTenant, HeatMapTimeline},
62 : CommandRequest, DownloadCommand,
63 : };
64 :
65 : /// For each tenant, how long must have passed since the last download_tenant call before
66 : /// calling it again. This is approximately the time by which local data is allowed
67 : /// to fall behind remote data.
68 : ///
69 : /// TODO: this should just be a default, and the actual period should be controlled
70 : /// via the heatmap itself
71 : /// `<https://github.com/neondatabase/neon/issues/6200>`
72 : const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
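// A hedged sketch of the jittered scheduling this constant feeds into. The real
// helpers are `period_warmup` and `period_jitter` in the scheduler module; the bodies
// below are illustrative assumptions only (they presume the `rand` crate and a
// non-zero `pct`).
#[allow(dead_code)]
fn sketch_period_warmup(period: Duration) -> Duration {
    // First round: pick a start uniformly in [0, period) to spread tenant start times.
    use rand::Rng;
    rand::thread_rng().gen_range(Duration::ZERO..period)
}

#[allow(dead_code)]
fn sketch_period_jitter(period: Duration, pct: u32) -> Duration {
    // Later rounds: the period plus up to `pct`% of extra delay, so tenants that
    // started together drift apart instead of re-synchronizing.
    use rand::Rng;
    period + rand::thread_rng().gen_range(Duration::ZERO..period * pct / 100)
}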
73 :
74 0 : pub(super) async fn downloader_task(
75 0 : tenant_manager: Arc<TenantManager>,
76 0 : remote_storage: GenericRemoteStorage,
77 0 : command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
78 0 : background_jobs_can_start: Barrier,
79 0 : cancel: CancellationToken,
80 0 : root_ctx: RequestContext,
81 0 : ) {
82 0 : let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
83 0 :
84 0 : let generator = SecondaryDownloader {
85 0 : tenant_manager,
86 0 : remote_storage,
87 0 : root_ctx,
88 0 : };
89 0 : let mut scheduler = Scheduler::new(generator, concurrency);
90 0 :
91 0 : scheduler
92 0 : .run(command_queue, background_jobs_can_start, cancel)
93 0 : .instrument(info_span!("secondary_downloads"))
94 0 : .await
95 0 : }
96 :
97 : struct SecondaryDownloader {
98 : tenant_manager: Arc<TenantManager>,
99 : remote_storage: GenericRemoteStorage,
100 : root_ctx: RequestContext,
101 : }
102 :
103 : #[derive(Debug, Clone)]
104 : pub(super) struct OnDiskState {
105 : metadata: LayerFileMetadata,
106 : access_time: SystemTime,
107 : }
108 :
109 : impl OnDiskState {
110 0 : fn new(
111 0 : _conf: &'static PageServerConf,
112 0 : _tenant_shard_id: &TenantShardId,
113 0 : _timeline_id: &TimelineId,
114 0 : _name: LayerName,
115 0 : metadata: LayerFileMetadata,
116 0 : access_time: SystemTime,
117 0 : ) -> Self {
118 0 : Self {
119 0 : metadata,
120 0 : access_time,
121 0 : }
122 0 : }
123 : }
124 :
125 : #[derive(Debug, Clone, Default)]
126 : pub(super) struct SecondaryDetailTimeline {
127 : pub(super) on_disk_layers: HashMap<LayerName, OnDiskState>,
128 :
129 : /// We remember when layers were evicted, to prevent re-downloading them.
130 : pub(super) evicted_at: HashMap<LayerName, SystemTime>,
131 : }
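// An illustrative restatement of the rule `download_timeline` applies to `evicted_at`
// below: an evicted layer is re-downloaded only if the heatmap shows it was accessed
// after we evicted it (this helper is a sketch, not used by the actual code).
#[allow(dead_code)]
fn sketch_should_redownload(
    heatmap_access_time: SystemTime,
    evicted_at: Option<&SystemTime>,
) -> bool {
    match evicted_at {
        // Accessed more recently than the eviction: worth fetching again.
        Some(evicted) => heatmap_access_time > *evicted,
        // Never evicted: download it if it is not already on disk.
        None => true,
    }
}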
132 :
133 : /// This state is written by the secondary downloader; it is opaque
134 : /// to the TenantManager.
135 : #[derive(Debug)]
136 : pub(super) struct SecondaryDetail {
137 : pub(super) config: SecondaryLocationConfig,
138 :
139 : last_download: Option<Instant>,
140 : last_etag: Option<Etag>,
141 : next_download: Option<Instant>,
142 : pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
143 : }
144 :
145 : /// Helper for logging SystemTime
146 0 : fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
147 0 : let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
148 0 : datetime.format("%d/%m/%Y %T")
149 0 : }
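// Example rendering: `strftime(&SystemTime::now())` formats as e.g. "17/06/2024 13:45:10"
// (day/month/year, then %T = HH:MM:SS, in UTC).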
150 :
151 : /// Information returned from the download function when it detects that the heatmap has changed
152 : struct HeatMapModified {
153 : etag: Etag,
154 : last_modified: SystemTime,
155 : bytes: Vec<u8>,
156 : }
157 :
158 : enum HeatMapDownload {
159 : // The heatmap's etag has changed: return the new etag, mtime and the body bytes
160 : Modified(HeatMapModified),
161 : // The heatmap's etag is unchanged
162 : Unmodified,
163 : }
164 :
165 : impl SecondaryDetail {
166 0 : pub(super) fn new(config: SecondaryLocationConfig) -> Self {
167 0 : Self {
168 0 : config,
169 0 : last_download: None,
170 0 : last_etag: None,
171 0 : next_download: None,
172 0 : timelines: HashMap::new(),
173 0 : }
174 0 : }
175 :
176 : /// Additionally returns the total number of layers, used for more stable relative access time
177 : /// based eviction.
178 0 : pub(super) fn get_layers_for_eviction(
179 0 : &self,
180 0 : parent: &Arc<SecondaryTenant>,
181 0 : ) -> (DiskUsageEvictionInfo, usize) {
182 0 : let mut result = DiskUsageEvictionInfo::default();
183 0 : let mut total_layers = 0;
184 :
185 0 : for (timeline_id, timeline_detail) in &self.timelines {
186 0 : result
187 0 : .resident_layers
188 0 : .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
189 0 : EvictionCandidate {
190 0 : layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
191 0 : secondary_tenant: parent.clone(),
192 0 : timeline_id: *timeline_id,
193 0 : name: name.clone(),
194 0 : metadata: ods.metadata.clone(),
195 0 : }),
196 0 : last_activity_ts: ods.access_time,
197 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
198 0 : }
199 0 : }));
200 0 :
201 0 : // The total might be missing layers that are currently downloading, but as a
202 0 : // lower bound it is a good enough approximation.
203 0 : total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
204 0 : }
205 0 : result.max_layer_size = result
206 0 : .resident_layers
207 0 : .iter()
208 0 : .map(|l| l.layer.get_file_size())
209 0 : .max();
210 0 :
211 0 : tracing::debug!(
212 0 : "eviction: secondary tenant {} found {} timelines, {} layers",
213 0 : parent.get_tenant_shard_id(),
214 0 : self.timelines.len(),
215 0 : result.resident_layers.len()
216 : );
217 :
218 0 : (result, total_layers)
219 0 : }
220 : }
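// A hedged sketch of why the total layer count is returned alongside the candidates:
// a consumer such as the disk usage eviction task can rank layers by access time and
// derive a relative recency in [0, 1], which compares more stably across tenants of
// different sizes than raw wall-clock access times do. The helper below is an
// illustrative assumption, not the actual eviction code.
#[allow(dead_code)]
fn sketch_relative_recency(rank_most_recent_first: usize, total_layers: usize) -> f32 {
    if total_layers == 0 {
        0.0
    } else {
        // Rank 0 (most recently accessed) maps to 1.0; the oldest approaches 0.0.
        1.0 - (rank_most_recent_first as f32 / total_layers as f32)
    }
}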
221 :
222 : struct PendingDownload {
223 : secondary_state: Arc<SecondaryTenant>,
224 : last_download: Option<Instant>,
225 : target_time: Option<Instant>,
226 : period: Option<Duration>,
227 : }
228 :
229 : impl scheduler::PendingJob for PendingDownload {
230 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
231 0 : self.secondary_state.get_tenant_shard_id()
232 0 : }
233 : }
234 :
235 : struct RunningDownload {
236 : barrier: Barrier,
237 : }
238 :
239 : impl scheduler::RunningJob for RunningDownload {
240 0 : fn get_barrier(&self) -> Barrier {
241 0 : self.barrier.clone()
242 0 : }
243 : }
244 :
245 : struct CompleteDownload {
246 : secondary_state: Arc<SecondaryTenant>,
247 : completed_at: Instant,
248 : }
249 :
250 : impl scheduler::Completion for CompleteDownload {
251 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
252 0 : self.secondary_state.get_tenant_shard_id()
253 0 : }
254 : }
255 :
256 : type Scheduler = TenantBackgroundJobs<
257 : SecondaryDownloader,
258 : PendingDownload,
259 : RunningDownload,
260 : CompleteDownload,
261 : DownloadCommand,
262 : >;
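// Reading the alias above: `SecondaryDownloader` generates jobs, `PendingDownload` is
// a queued job, `RunningDownload` an in-flight one, `CompleteDownload` its result, and
// `DownloadCommand` the externally submitted request type.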
263 :
264 : impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
265 : for SecondaryDownloader
266 : {
267 0 : #[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
268 : fn on_completion(&mut self, completion: CompleteDownload) {
269 : let CompleteDownload {
270 : secondary_state,
271 : completed_at: _completed_at,
272 : } = completion;
273 :
274 : tracing::debug!("Secondary tenant download completed");
275 :
276 : // Update next_download even if there was an error: we don't want errored tenants to implicitly
277 : // take priority to run again.
278 : let mut detail = secondary_state.detail.lock().unwrap();
279 : detail.next_download = Some(Instant::now() + period_jitter(DOWNLOAD_FRESHEN_INTERVAL, 5));
280 : }
281 :
282 0 : async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
283 0 : let mut result = SchedulingResult {
284 0 : jobs: Vec::new(),
285 0 : want_interval: None,
286 0 : };
287 0 :
288 0 : // Step 1: identify some tenants that we may work on
289 0 : let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
290 0 : self.tenant_manager
291 0 : .foreach_secondary_tenants(|_id, secondary_state| {
292 0 : tenants.push(secondary_state.clone());
293 0 : });
294 0 :
295 0 : // Step 2: filter out tenants which are not yet eligible to run
296 0 : let now = Instant::now();
297 0 : result.jobs = tenants
298 0 : .into_iter()
299 0 : .filter_map(|secondary_tenant| {
300 0 : let (last_download, next_download) = {
301 0 : let mut detail = secondary_tenant.detail.lock().unwrap();
302 0 :
303 0 : if !detail.config.warm {
304 : // Downloads are disabled for this tenant
305 0 : detail.next_download = None;
306 0 : return None;
307 0 : }
308 0 :
309 0 : if detail.next_download.is_none() {
310 0 : // Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times. Subsequent
311 0 : // rounds will use a smaller jitter to avoid accidentally synchronizing later.
312 0 : detail.next_download = Some(now.checked_add(period_warmup(DOWNLOAD_FRESHEN_INTERVAL)).expect(
313 0 : "Using our constant, which is known to be small compared with clock range",
314 0 : ));
315 0 : }
316 0 : (detail.last_download, detail.next_download.unwrap())
317 0 : };
318 0 :
319 0 : if now > next_download {
320 0 : Some(PendingDownload {
321 0 : secondary_state: secondary_tenant,
322 0 : last_download,
323 0 : target_time: Some(next_download),
324 0 : period: Some(DOWNLOAD_FRESHEN_INTERVAL),
325 0 : })
326 : } else {
327 0 : None
328 : }
329 0 : })
330 0 : .collect();
331 0 :
332 0 : // Step 3: sort by target execution time to run most urgent first.
333 0 : result.jobs.sort_by_key(|j| j.target_time);
334 0 :
335 0 : result
336 0 : }
337 :
338 0 : fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
339 0 : let tenant_shard_id = command.get_tenant_shard_id();
340 0 :
341 0 : let tenant = self
342 0 : .tenant_manager
343 0 : .get_secondary_tenant_shard(*tenant_shard_id);
344 0 : let Some(tenant) = tenant else {
345 0 : return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
346 : };
347 :
348 0 : Ok(PendingDownload {
349 0 : target_time: None,
350 0 : period: None,
351 0 : last_download: None,
352 0 : secondary_state: tenant,
353 0 : })
354 0 : }
355 :
356 0 : fn spawn(
357 0 : &mut self,
358 0 : job: PendingDownload,
359 0 : ) -> (
360 0 : RunningDownload,
361 0 : Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
362 0 : ) {
363 0 : let PendingDownload {
364 0 : secondary_state,
365 0 : last_download,
366 0 : target_time,
367 0 : period,
368 0 : } = job;
369 0 :
370 0 : let (completion, barrier) = utils::completion::channel();
371 0 : let remote_storage = self.remote_storage.clone();
372 0 : let conf = self.tenant_manager.get_conf();
373 0 : let tenant_shard_id = *secondary_state.get_tenant_shard_id();
374 0 : let download_ctx = self.root_ctx.attached_child();
375 0 : (RunningDownload { barrier }, Box::pin(async move {
376 0 : let _completion = completion;
377 0 :
378 0 : match TenantDownloader::new(conf, &remote_storage, &secondary_state)
379 0 : .download(&download_ctx)
380 0 : .await
381 : {
382 : Err(UpdateError::NoData) => {
383 0 : tracing::info!("No heatmap found for tenant. This is fine if it is new.");
384 : },
385 : Err(UpdateError::NoSpace) => {
386 0 : tracing::warn!("Insufficient space while downloading. Will retry later.");
387 : }
388 : Err(UpdateError::Cancelled) => {
389 0 : tracing::debug!("Shut down while downloading");
390 : },
391 0 : Err(UpdateError::Deserialize(e)) => {
392 0 : tracing::error!("Corrupt content while downloading tenant: {e}");
393 : },
394 0 : Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
395 0 : tracing::error!("Error while downloading tenant: {e}");
396 : },
397 0 : Ok(()) => {}
398 : };
399 :
400 : // Irrespective of the result, we will reschedule ourselves to run after our usual period.
401 :
402 : // If the job had a target execution time, we may check our final execution
403 : // time against that for observability purposes.
404 0 : if let (Some(target_time), Some(period)) = (target_time, period) {
405 : // Only track execution lag if this isn't our first download: otherwise, it is expected
406 : // that execution will have taken longer than our configured interval, for example
407 : // when starting up a pageserver and doing the first download for many tenants at once.
408 0 : if last_download.is_some() {
409 0 : // Elapsed time includes any scheduling lag as well as the execution of the job
410 0 : let elapsed = Instant::now().duration_since(target_time);
411 0 :
412 0 : warn_when_period_overrun(
413 0 : elapsed,
414 0 : period,
415 0 : BackgroundLoopKind::SecondaryDownload,
416 0 : );
417 0 : }
418 0 : }
419 :
420 0 : CompleteDownload {
421 0 : secondary_state,
422 0 : completed_at: Instant::now(),
423 0 : }
424 0 : }.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
425 0 : }
426 : }
427 :
428 : /// This type is a convenience to group together the various functions involved in
429 : /// freshening a secondary tenant.
430 : struct TenantDownloader<'a> {
431 : conf: &'static PageServerConf,
432 : remote_storage: &'a GenericRemoteStorage,
433 : secondary_state: &'a SecondaryTenant,
434 : }
435 :
436 : /// Errors that may be encountered while updating a tenant
437 0 : #[derive(thiserror::Error, Debug)]
438 : enum UpdateError {
439 : #[error("No remote data found")]
440 : NoData,
441 : #[error("Insufficient local storage space")]
442 : NoSpace,
443 : #[error("Failed to download")]
444 : DownloadError(DownloadError),
445 : #[error(transparent)]
446 : Deserialize(#[from] serde_json::Error),
447 : #[error("Cancelled")]
448 : Cancelled,
449 : #[error(transparent)]
450 : Other(#[from] anyhow::Error),
451 : }
452 :
453 : impl From<DownloadError> for UpdateError {
454 0 : fn from(value: DownloadError) -> Self {
455 0 : match &value {
456 0 : DownloadError::Cancelled => Self::Cancelled,
457 0 : DownloadError::NotFound => Self::NoData,
458 0 : _ => Self::DownloadError(value),
459 : }
460 0 : }
461 : }
462 :
463 : impl From<std::io::Error> for UpdateError {
464 0 : fn from(value: std::io::Error) -> Self {
465 0 : if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
466 0 : UpdateError::NoSpace
467 0 : } else if value
468 0 : .get_ref()
469 0 : .and_then(|x| x.downcast_ref::<DownloadError>())
470 0 : .is_some()
471 : {
472 0 : UpdateError::from(DownloadError::from(value))
473 : } else {
474 : // An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
475 0 : UpdateError::Other(anyhow::anyhow!(value))
476 : }
477 0 : }
478 : }
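// A minimal illustrative test of the ENOSPC mapping above (a sketch; not part of the
// original test suite):
#[cfg(test)]
mod update_error_sketch_tests {
    use super::*;

    #[test]
    fn enospc_maps_to_no_space() {
        let err = std::io::Error::from_raw_os_error(nix::errno::Errno::ENOSPC as i32);
        assert!(matches!(UpdateError::from(err), UpdateError::NoSpace));
    }
}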
479 :
480 : impl<'a> TenantDownloader<'a> {
481 0 : fn new(
482 0 : conf: &'static PageServerConf,
483 0 : remote_storage: &'a GenericRemoteStorage,
484 0 : secondary_state: &'a SecondaryTenant,
485 0 : ) -> Self {
486 0 : Self {
487 0 : conf,
488 0 : remote_storage,
489 0 : secondary_state,
490 0 : }
491 0 : }
492 :
493 0 : async fn download(&self, ctx: &RequestContext) -> Result<(), UpdateError> {
494 0 : debug_assert_current_span_has_tenant_id();
495 :
496 : // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
497 : // that our access to local storage is covered against concurrent shutdown.
498 0 : let Ok(_guard) = self.secondary_state.gate.enter() else {
499 : // Shutting down
500 0 : return Ok(());
501 : };
502 :
503 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
504 0 :
505 0 : // We will use the etag from last successful download to make the download conditional on changes
506 0 : let last_etag = self
507 0 : .secondary_state
508 0 : .detail
509 0 : .lock()
510 0 : .unwrap()
511 0 : .last_etag
512 0 : .clone();
513 :
514 : // Download the tenant's heatmap
515 : let HeatMapModified {
516 0 : last_modified: heatmap_mtime,
517 0 : etag: heatmap_etag,
518 0 : bytes: heatmap_bytes,
519 0 : } = match tokio::select!(
520 0 : bytes = self.download_heatmap(last_etag.as_ref()) => {bytes?},
521 0 : _ = self.secondary_state.cancel.cancelled() => return Ok(())
522 0 : ) {
523 : HeatMapDownload::Unmodified => {
524 0 : tracing::info!("Heatmap unchanged since last successful download");
525 0 : return Ok(());
526 : }
527 0 : HeatMapDownload::Modified(m) => m,
528 : };
529 :
530 0 : let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
531 :
532 : // Save the heatmap: this will be useful on restart, allowing us to reconstruct
533 : // layer metadata without having to re-download it.
534 0 : let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
535 0 :
536 0 : let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
537 0 : let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
538 0 : let heatmap_path_bg = heatmap_path.clone();
539 0 : VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
540 0 : .await
541 0 : .maybe_fatal_err(&context_msg)?;
542 :
543 0 : tracing::debug!(
544 0 : "Wrote local heatmap to {}, with {} timelines",
545 0 : heatmap_path,
546 0 : heatmap.timelines.len()
547 : );
548 :
549 : // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
550 : // principle that deletions should be done before writes wherever possible, and so that we can use this
551 : // phase to initialize our SecondaryProgress.
552 0 : {
553 0 : *self.secondary_state.progress.lock().unwrap() =
554 0 : self.prepare_timelines(&heatmap, heatmap_mtime).await?;
555 : }
556 :
557 : // Download the layers in the heatmap
558 0 : for timeline in heatmap.timelines {
559 0 : if self.secondary_state.cancel.is_cancelled() {
560 0 : tracing::debug!(
561 0 : "Cancelled before downloading timeline {}",
562 : timeline.timeline_id
563 : );
564 0 : return Ok(());
565 0 : }
566 0 :
567 0 : let timeline_id = timeline.timeline_id;
568 0 : self.download_timeline(timeline, ctx)
569 0 : .instrument(tracing::info_span!(
570 : "secondary_download_timeline",
571 : tenant_id=%tenant_shard_id.tenant_id,
572 0 : shard_id=%tenant_shard_id.shard_slug(),
573 : %timeline_id
574 : ))
575 0 : .await?;
576 : }
577 :
578 : // Only update last_etag after a fully successful download: this way we will not skip
579 : // the next download, even if the heatmap's actual etag is unchanged.
580 0 : self.secondary_state.detail.lock().unwrap().last_etag = Some(heatmap_etag);
581 0 :
582 0 : Ok(())
583 0 : }
584 :
585 : /// Do any fast local cleanup that comes before the much slower process of downloading
586 : /// layers from remote storage. In the process, initialize the SecondaryProgress object
587 : /// that will later be updated incrementally as we download layers.
588 0 : async fn prepare_timelines(
589 0 : &self,
590 0 : heatmap: &HeatMapTenant,
591 0 : heatmap_mtime: SystemTime,
592 0 : ) -> Result<SecondaryProgress, UpdateError> {
593 0 : let heatmap_stats = heatmap.get_stats();
594 0 : // We will construct a progress object, and then populate its initial "downloaded" numbers
595 0 : // while iterating through local layer state below in this function.
596 0 : let mut progress = SecondaryProgress {
597 0 : layers_total: heatmap_stats.layers,
598 0 : bytes_total: heatmap_stats.bytes,
599 0 : heatmap_mtime: Some(serde_system_time::SystemTime(heatmap_mtime)),
600 0 : layers_downloaded: 0,
601 0 : bytes_downloaded: 0,
602 0 : };
603 0 : // Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
604 0 : let mut delete_layers = Vec::new();
605 0 : let mut delete_timelines = Vec::new();
606 0 : {
607 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
608 0 : for (timeline_id, timeline_state) in &mut detail.timelines {
609 0 : let Some(heatmap_timeline_index) = heatmap
610 0 : .timelines
611 0 : .iter()
612 0 : .position(|t| t.timeline_id == *timeline_id)
613 : else {
614 : // This timeline is no longer referenced in the heatmap: delete it locally
615 0 : delete_timelines.push(*timeline_id);
616 0 : continue;
617 : };
618 :
619 0 : let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
620 0 :
621 0 : let layers_in_heatmap = heatmap_timeline
622 0 : .layers
623 0 : .iter()
624 0 : .map(|l| (&l.name, l.metadata.generation))
625 0 : .collect::<HashSet<_>>();
626 0 : let layers_on_disk = timeline_state
627 0 : .on_disk_layers
628 0 : .iter()
629 0 : .map(|l| (l.0, l.1.metadata.generation))
630 0 : .collect::<HashSet<_>>();
631 0 :
632 0 : let mut layer_count = layers_on_disk.len();
633 0 : let mut layer_byte_count: u64 = timeline_state
634 0 : .on_disk_layers
635 0 : .values()
636 0 : .map(|l| l.metadata.file_size())
637 0 : .sum();
638 :
639 : // Remove on-disk layers that are no longer present in heatmap
640 0 : for (layer_file_name, generation) in layers_on_disk.difference(&layers_in_heatmap) {
641 0 : layer_count -= 1;
642 0 : layer_byte_count -= timeline_state
643 0 : .on_disk_layers
644 0 : .get(layer_file_name)
645 0 : .unwrap()
646 0 : .metadata
647 0 : .file_size();
648 0 :
649 0 : let local_path = local_layer_path(
650 0 : self.conf,
651 0 : self.secondary_state.get_tenant_shard_id(),
652 0 : timeline_id,
653 0 : layer_file_name,
654 0 : generation,
655 0 : );
656 0 :
657 0 : delete_layers.push((*timeline_id, (*layer_file_name).clone(), local_path));
658 0 : }
659 :
660 0 : progress.bytes_downloaded += layer_byte_count;
661 0 : progress.layers_downloaded += layer_count;
662 : }
663 :
664 0 : for delete_timeline in &delete_timelines {
665 0 : // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
666 0 : // from disk fails that will be a fatal error.
667 0 : detail.timelines.remove(delete_timeline);
668 0 : }
669 : }
670 :
671 : // Execute accumulated deletions
672 0 : for (timeline_id, layer_name, local_path) in delete_layers {
673 0 : tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
674 :
675 0 : tokio::fs::remove_file(&local_path)
676 0 : .await
677 0 : .or_else(fs_ext::ignore_not_found)
678 0 : .maybe_fatal_err("Removing secondary layer")?;
679 :
680 : // Update in-memory housekeeping to reflect the absence of the deleted layer
681 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
682 0 : let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
683 0 : continue;
684 : };
685 0 : timeline_state.on_disk_layers.remove(&layer_name);
686 : }
687 :
688 0 : for timeline_id in delete_timelines {
689 0 : let timeline_path = self
690 0 : .conf
691 0 : .timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
692 0 : tracing::info!(timeline_id=%timeline_id,
693 0 : "Timeline no longer in heatmap, removing from secondary location"
694 : );
695 0 : tokio::fs::remove_dir_all(&timeline_path)
696 0 : .await
697 0 : .or_else(fs_ext::ignore_not_found)
698 0 : .maybe_fatal_err("Removing secondary timeline")?;
699 : }
700 :
701 0 : Ok(progress)
702 0 : }
703 :
704 : /// Returns downloaded bytes if the etag differs from `prev_etag`, or None if the object
705 : /// still matches `prev_etag`.
706 0 : async fn download_heatmap(
707 0 : &self,
708 0 : prev_etag: Option<&Etag>,
709 0 : ) -> Result<HeatMapDownload, UpdateError> {
710 0 : debug_assert_current_span_has_tenant_id();
711 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
712 0 : // TODO: pull up etag check into the request, to do a conditional GET rather than
713 0 : // issuing a GET and then maybe ignoring the response body
714 0 : // (https://github.com/neondatabase/neon/issues/6199)
715 0 : tracing::debug!("Downloading heatmap for secondary tenant",);
716 :
717 0 : let heatmap_path = remote_heatmap_path(tenant_shard_id);
718 0 : let cancel = &self.secondary_state.cancel;
719 0 :
720 0 : backoff::retry(
721 0 : || async {
722 0 : let download = self
723 0 : .remote_storage
724 0 : .download(&heatmap_path, cancel)
725 0 : .await
726 0 : .map_err(UpdateError::from)?;
727 0 :
728 0 : SECONDARY_MODE.download_heatmap.inc();
729 0 :
730 0 : if Some(&download.etag) == prev_etag {
731 0 : Ok(HeatMapDownload::Unmodified)
732 0 : } else {
733 0 : let mut heatmap_bytes = Vec::new();
734 0 : let mut body = tokio_util::io::StreamReader::new(download.download_stream);
735 0 : let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
736 0 : Ok(HeatMapDownload::Modified(HeatMapModified {
737 0 : etag: download.etag,
738 0 : last_modified: download.last_modified,
739 0 : bytes: heatmap_bytes,
740 0 : }))
741 0 : }
742 0 : },
743 0 : |e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
744 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
745 0 : FAILED_REMOTE_OP_RETRIES,
746 0 : "download heatmap",
747 0 : cancel,
748 0 : )
749 0 : .await
750 0 : .ok_or_else(|| UpdateError::Cancelled)
751 0 : .and_then(|x| x)
752 0 : }
753 :
754 0 : async fn download_timeline(
755 0 : &self,
756 0 : timeline: HeatMapTimeline,
757 0 : ctx: &RequestContext,
758 0 : ) -> Result<(), UpdateError> {
759 0 : debug_assert_current_span_has_tenant_and_timeline_id();
760 0 : let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
761 0 :
762 0 : // Accumulate updates to the state
763 0 : let mut touched = Vec::new();
764 0 :
765 0 : // Clone a view of what layers already exist on disk
766 0 : let timeline_state = self
767 0 : .secondary_state
768 0 : .detail
769 0 : .lock()
770 0 : .unwrap()
771 0 : .timelines
772 0 : .get(&timeline.timeline_id)
773 0 : .cloned();
774 :
775 0 : let timeline_state = match timeline_state {
776 0 : Some(t) => t,
777 : None => {
778 : // We have no existing state: need to scan local disk for layers first.
779 0 : let timeline_state =
780 0 : init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
781 :
782 : // Re-acquire detail lock now that we're done with async load from local FS
783 0 : self.secondary_state
784 0 : .detail
785 0 : .lock()
786 0 : .unwrap()
787 0 : .timelines
788 0 : .insert(timeline.timeline_id, timeline_state.clone());
789 0 : timeline_state
790 : }
791 : };
792 :
793 0 : tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
794 :
795 : // Download heatmap layers that are not present on local disk, or update their
796 : // access time if they are already present.
797 0 : for layer in timeline.layers {
798 0 : if self.secondary_state.cancel.is_cancelled() {
799 0 : tracing::debug!("Cancelled -- dropping out of layer loop");
800 0 : return Ok(());
801 0 : }
802 :
803 : // Existing on-disk layers: just update their access time.
804 0 : if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
805 0 : tracing::debug!("Layer {} is already on disk", layer.name);
806 :
807 0 : if cfg!(debug_assertions) {
808 : // Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
809 : // are already present on disk are really there.
810 0 : let local_path = local_layer_path(
811 0 : self.conf,
812 0 : tenant_shard_id,
813 0 : &timeline.timeline_id,
814 0 : &layer.name,
815 0 : &layer.metadata.generation,
816 0 : );
817 0 :
818 0 : match tokio::fs::metadata(&local_path).await {
819 0 : Ok(meta) => {
820 0 : tracing::debug!(
821 0 : "Layer {} present at {}, size {}",
822 0 : layer.name,
823 0 : local_path,
824 0 : meta.len(),
825 : );
826 : }
827 0 : Err(e) => {
828 0 : tracing::warn!(
829 0 : "Layer {} not found at {} ({})",
830 : layer.name,
831 : local_path,
832 : e
833 : );
834 0 : debug_assert!(false);
835 : }
836 : }
837 0 : }
838 :
839 0 : if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
840 0 : || on_disk.access_time != layer.access_time
841 : {
842 : // We already have this layer on disk. Update its access time.
843 0 : tracing::debug!(
844 0 : "Access time updated for layer {}: {} -> {}",
845 0 : layer.name,
846 0 : strftime(&on_disk.access_time),
847 0 : strftime(&layer.access_time)
848 : );
849 0 : touched.push(layer);
850 0 : }
851 0 : continue;
852 : } else {
853 0 : tracing::debug!("Layer {} not present on disk yet", layer.name);
854 : }
855 :
856 : // Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
857 : // recently than it was evicted.
858 0 : if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
859 0 : if &layer.access_time > evicted_at {
860 0 : tracing::info!(
861 0 : "Re-downloading evicted layer {}, accessed at {}, evicted at {}",
862 0 : layer.name,
863 0 : strftime(&layer.access_time),
864 0 : strftime(evicted_at)
865 : );
866 : } else {
867 0 : tracing::trace!(
868 0 : "Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
869 0 : layer.name,
870 0 : strftime(&layer.access_time),
871 0 : strftime(evicted_at)
872 : );
873 0 : continue;
874 : }
875 0 : }
876 :
877 : // Failpoint for simulating slow remote storage
878 0 : failpoint_support::sleep_millis_async!(
879 : "secondary-layer-download-sleep",
880 0 : &self.secondary_state.cancel
881 : );
882 :
883 : // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
884 0 : let downloaded_bytes = match download_layer_file(
885 0 : self.conf,
886 0 : self.remote_storage,
887 0 : *tenant_shard_id,
888 0 : timeline.timeline_id,
889 0 : &layer.name,
890 0 : &LayerFileMetadata::from(&layer.metadata),
891 0 : &self.secondary_state.cancel,
892 0 : ctx,
893 0 : )
894 0 : .await
895 : {
896 0 : Ok(bytes) => bytes,
897 : Err(DownloadError::NotFound) => {
898 : // A heatmap might be out of date and refer to a layer that doesn't exist any more.
899 : // This is harmless: continue to download the next layer. It is expected during
900 : // compaction and GC.
901 0 : tracing::debug!(
902 0 : "Skipped downloading missing layer {}, raced with compaction/gc?",
903 : layer.name
904 : );
905 0 : continue;
906 : }
907 0 : Err(e) => return Err(e.into()),
908 : };
909 :
910 0 : if downloaded_bytes != layer.metadata.file_size {
911 0 : let local_path = local_layer_path(
912 0 : self.conf,
913 0 : tenant_shard_id,
914 0 : &timeline.timeline_id,
915 0 : &layer.name,
916 0 : &layer.metadata.generation,
917 0 : );
918 0 :
919 0 : tracing::warn!(
920 0 : "Downloaded layer {} with unexpected size {} != {}. Removing download.",
921 : layer.name,
922 : downloaded_bytes,
923 : layer.metadata.file_size
924 : );
925 :
926 0 : tokio::fs::remove_file(&local_path)
927 0 : .await
928 0 : .or_else(fs_ext::ignore_not_found)?;
929 : } else {
930 0 : tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
931 0 : let mut progress = self.secondary_state.progress.lock().unwrap();
932 0 : progress.bytes_downloaded += downloaded_bytes;
933 0 : progress.layers_downloaded += 1;
934 : }
935 :
936 0 : SECONDARY_MODE.download_layer.inc();
937 0 : touched.push(layer)
938 : }
939 :
940 : // Write updates to state to record layers we just downloaded or touched.
941 : {
942 0 : let mut detail = self.secondary_state.detail.lock().unwrap();
943 0 : let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
944 0 :
945 0 : tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
946 :
947 0 : for t in touched {
948 : use std::collections::hash_map::Entry;
949 0 : match timeline_detail.on_disk_layers.entry(t.name.clone()) {
950 0 : Entry::Occupied(mut v) => {
951 0 : v.get_mut().access_time = t.access_time;
952 0 : }
953 0 : Entry::Vacant(e) => {
954 0 : e.insert(OnDiskState::new(
955 0 : self.conf,
956 0 : tenant_shard_id,
957 0 : &timeline.timeline_id,
958 0 : t.name,
959 0 : LayerFileMetadata::from(&t.metadata),
960 0 : t.access_time,
961 0 : ));
962 0 : }
963 : }
964 : }
965 : }
966 :
967 0 : Ok(())
968 0 : }
969 : }
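// A hedged sketch of the crash-safe heatmap write used in `download` above. The real
// implementation is `VirtualFile::crashsafe_overwrite`; this generic tokio::fs version
// is an illustrative assumption showing the same write-temp-then-rename pattern.
#[allow(dead_code)]
async fn sketch_crashsafe_overwrite(
    final_path: &Utf8PathBuf,
    temp_path: &Utf8PathBuf,
    content: &[u8],
) -> std::io::Result<()> {
    // Write the full content to a temporary file...
    tokio::fs::write(temp_path, content).await?;
    // ...flush it to stable storage...
    let file = tokio::fs::File::open(temp_path).await?;
    file.sync_all().await?;
    drop(file);
    // ...then atomically rename into place: after a crash we see either the old
    // complete file or the new complete file, never a torn write. (A full
    // implementation would also fsync the parent directory to persist the rename.)
    tokio::fs::rename(temp_path, final_path).await?;
    Ok(())
}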
970 :
971 : /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
972 0 : async fn init_timeline_state(
973 0 : conf: &'static PageServerConf,
974 0 : tenant_shard_id: &TenantShardId,
975 0 : heatmap: &HeatMapTimeline,
976 0 : ) -> SecondaryDetailTimeline {
977 0 : let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
978 0 : let mut detail = SecondaryDetailTimeline::default();
979 :
980 0 : let mut dir = match tokio::fs::read_dir(&timeline_path).await {
981 0 : Ok(d) => d,
982 0 : Err(e) => {
983 0 : if e.kind() == std::io::ErrorKind::NotFound {
984 0 : let context = format!("Creating timeline directory {timeline_path}");
985 0 : tracing::info!("{}", context);
986 0 : tokio::fs::create_dir_all(&timeline_path)
987 0 : .await
988 0 : .fatal_err(&context);
989 0 :
990 0 : // No entries to report: drop out.
991 0 : return detail;
992 : } else {
993 0 : on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
994 : }
995 : }
996 : };
997 :
998 : // As we iterate through layers found on disk, we will look up their metadata from this map.
999 : // Layers not present in metadata will be discarded.
1000 0 : let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> =
1001 0 : heatmap.layers.iter().map(|l| (&l.name, l)).collect();
1002 :
1003 0 : while let Some(dentry) = dir
1004 0 : .next_entry()
1005 0 : .await
1006 0 : .fatal_err(&format!("Listing {timeline_path}"))
1007 : {
1008 0 : let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
1009 0 : tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
1010 0 : continue;
1011 : };
1012 0 : let local_meta = dentry
1013 0 : .metadata()
1014 0 : .await
1015 0 : .fatal_err(&format!("Read metadata on {}", file_path));
1016 0 :
1017 0 : let file_name = file_path.file_name().expect("created it from the dentry");
1018 0 : if file_name == METADATA_FILE_NAME {
1019 : // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
1020 0 : warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
1021 0 : continue;
1022 0 : } else if crate::is_temporary(&file_path)
1023 0 : || is_temp_download_file(&file_path)
1024 0 : || is_ephemeral_file(file_name)
1025 : {
1026 : // Temporary files are frequently left behind from restarting during downloads
1027 0 : tracing::info!("Cleaning up temporary file {file_path}");
1028 0 : if let Err(e) = tokio::fs::remove_file(&file_path)
1029 0 : .await
1030 0 : .or_else(fs_ext::ignore_not_found)
1031 : {
1032 0 : tracing::error!("Failed to remove temporary file {file_path}: {e}");
1033 0 : }
1034 0 : continue;
1035 0 : }
1036 0 :
1037 0 : match LayerName::from_str(file_name) {
1038 0 : Ok(name) => {
1039 0 : let remote_meta = heatmap_metadata.get(&name);
1040 0 : match remote_meta {
1041 0 : Some(remote_meta) => {
1042 0 : // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
1043 0 : if local_meta.len() != remote_meta.metadata.file_size {
1044 : // This should not happen, because we do crashsafe write-then-rename when downloading
1045 : // layers, and layers in remote storage are immutable. Remove the local file because
1046 : // we cannot trust it.
1047 0 : tracing::warn!(
1048 0 : "Removing local layer {name} with unexpected local size {} != {}",
1049 0 : local_meta.len(),
1050 : remote_meta.metadata.file_size
1051 : );
1052 0 : } else {
1053 0 : // We expect the access time to be initialized immediately afterwards, when
1054 0 : // the latest heatmap is applied to the state.
1055 0 : detail.on_disk_layers.insert(
1056 0 : name.clone(),
1057 0 : OnDiskState::new(
1058 0 : conf,
1059 0 : tenant_shard_id,
1060 0 : &heatmap.timeline_id,
1061 0 : name,
1062 0 : LayerFileMetadata::from(&remote_meta.metadata),
1063 0 : remote_meta.access_time,
1064 0 : ),
1065 0 : );
1066 0 : }
1067 : }
1068 : None => {
1069 : // FIXME: consider some optimization when transitioning from attached to secondary: maybe
1070 : // wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
1071 : // we will end up deleting any layers which were created+uploaded more recently than the heatmap.
1072 0 : tracing::info!(
1073 0 : "Removing secondary local layer {} because it's absent in heatmap",
1074 : name
1075 : );
1076 0 : tokio::fs::remove_file(&dentry.path())
1077 0 : .await
1078 0 : .or_else(fs_ext::ignore_not_found)
1079 0 : .fatal_err(&format!(
1080 0 : "Removing layer {}",
1081 0 : dentry.path().to_string_lossy()
1082 0 : ));
1083 : }
1084 : }
1085 : }
1086 : Err(_) => {
1087 : // Ignore it.
1088 0 : tracing::warn!("Unexpected file in timeline directory: {file_name}");
1089 : }
1090 : }
1091 : }
1092 :
1093 0 : detail
1094 0 : }