LCOV - code coverage report
Current view: top level - storage_scrubber/src - tenant_snapshot.rs (source / functions)
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info
Test Date: 2025-03-12 00:01:28

Coverage:        Hit   Total
  Lines:           0     206   (0.0 %)
  Functions:       0      17   (0.0 %)

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::sync::Arc;
       3              : 
       4              : use anyhow::Context;
       5              : use async_stream::stream;
       6              : use aws_sdk_s3::Client;
       7              : use camino::Utf8PathBuf;
       8              : use futures::{StreamExt, TryStreamExt};
       9              : use pageserver::tenant::IndexPart;
      10              : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
      11              : use pageserver::tenant::storage_layer::LayerName;
      12              : use pageserver_api::shard::TenantShardId;
      13              : use remote_storage::{GenericRemoteStorage, S3Config};
      14              : use utils::generation::Generation;
      15              : use utils::id::TenantId;
      16              : 
      17              : use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs};
      18              : use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
      19              : use crate::{
      20              :     BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file_s3,
      21              :     init_remote, init_remote_s3,
      22              : };
      23              : 
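                       : /// Downloads a local snapshot of a tenant's remote (S3) data: every layer
                       : /// file referenced by each timeline's index, plus the index_part.json
                       : /// itself, written under `output_path` using remote-style file names.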
      24              : pub struct SnapshotDownloader {
      25              :     s3_client: Arc<Client>,
      26              :     s3_root: RootTarget,
      27              :     bucket_config: BucketConfig,
      28              :     bucket_config_s3: S3Config,
      29              :     tenant_id: TenantId,
      30              :     output_path: Utf8PathBuf,
      31              :     concurrency: usize,
      32              : }
      33              : 
      34              : impl SnapshotDownloader {
      35            0 :     pub async fn new(
      36            0 :         bucket_config: BucketConfig,
      37            0 :         tenant_id: TenantId,
      38            0 :         output_path: Utf8PathBuf,
      39            0 :         concurrency: usize,
      40            0 :     ) -> anyhow::Result<Self> {
      41            0 :         let bucket_config_s3 = match &bucket_config.0.storage {
      42            0 :             remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
       43            0 :             _ => anyhow::bail!("only S3 configuration is supported for snapshot downloading"),
      44              :         };
      45            0 :         let (s3_client, s3_root) =
      46            0 :             init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
      47            0 :         Ok(Self {
      48            0 :             s3_client,
      49            0 :             s3_root,
      50            0 :             bucket_config,
      51            0 :             bucket_config_s3,
      52            0 :             tenant_id,
      53            0 :             output_path,
      54            0 :             concurrency,
      55            0 :         })
      56            0 :     }
      57              : 
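                       :     /// Download one layer file into the local output tree. If a file with the
                       :     /// same (remote-style) name already exists locally, it is assumed complete
                       :     /// and skipped, since layer files are written atomically and are immutable.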
      58            0 :     async fn download_layer(
      59            0 :         &self,
      60            0 :         ttid: TenantShardTimelineId,
      61            0 :         layer_name: LayerName,
      62            0 :         layer_metadata: LayerFileMetadata,
      63            0 :     ) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
       64            0 :         // Note this is "local" as in a local copy of S3 data, not the pageserver's local format: the two
       65            0 :         // use different layer file names (the remote style carries a generation suffix).
      66            0 :         let local_path = self.output_path.join(format!(
      67            0 :             "{}/timelines/{}/{}{}",
      68            0 :             ttid.tenant_shard_id,
      69            0 :             ttid.timeline_id,
      70            0 :             layer_name,
      71            0 :             layer_metadata.generation.get_suffix()
      72            0 :         ));
      73            0 : 
      74            0 :         // We should only be called for layers that are owned by the input TTID
      75            0 :         assert_eq!(layer_metadata.shard, ttid.tenant_shard_id.to_index());
      76              : 
      77              :         // Assumption: we always write layer files atomically, and layer files are immutable.  Therefore if the file
      78              :         // already exists on local disk, we assume it is fully correct and skip it.
      79            0 :         if tokio::fs::try_exists(&local_path).await? {
      80            0 :             tracing::debug!("{} already exists", local_path);
      81            0 :             return Ok((layer_name, layer_metadata));
      82              :         } else {
      83            0 :             tracing::debug!("{} requires download...", local_path);
      84              : 
      85            0 :             let timeline_root = self.s3_root.timeline_root(&ttid);
      86            0 :             let remote_layer_path = format!(
      87            0 :                 "{}{}{}",
      88            0 :                 timeline_root.prefix_in_bucket,
      89            0 :                 layer_name,
      90            0 :                 layer_metadata.generation.get_suffix()
      91            0 :             );
      92              : 
       93              :             // List object versions: the layer might already be deleted, requiring a specific version ID.
      94            0 :             let versions = self
      95            0 :                 .s3_client
      96            0 :                 .list_object_versions()
      97            0 :                 .bucket(self.bucket_config_s3.bucket_name.clone())
      98            0 :                 .prefix(&remote_layer_path)
      99            0 :                 .send()
     100            0 :                 .await?;
     101            0 :             let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
     102            0 :                 return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
     103              :             };
     104            0 :             download_object_to_file_s3(
     105            0 :                 &self.s3_client,
     106            0 :                 &self.bucket_config_s3.bucket_name,
     107            0 :                 &remote_layer_path,
     108            0 :                 version.version_id.as_deref(),
     109            0 :                 &local_path,
     110            0 :             )
     111            0 :             .await?;
     112              : 
     113            0 :             tracing::debug!("Downloaded successfully to {local_path}");
     114              :         }
     115              : 
     116            0 :         Ok((layer_name, layer_metadata))
     117            0 :     }
     118              : 
     119              :     /// Download many layers belonging to the same TTID, with some concurrency
     120            0 :     async fn download_layers(
     121            0 :         &self,
     122            0 :         ttid: TenantShardTimelineId,
     123            0 :         layers: Vec<(LayerName, LayerFileMetadata)>,
     124            0 :     ) -> anyhow::Result<()> {
     125            0 :         let layer_count = layers.len();
     126            0 :         tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
     127            0 :         let layers_stream = stream! {
     128            0 :             for (layer_name, layer_metadata) in layers {
     129            0 :                 yield self.download_layer(ttid, layer_name, layer_metadata);
     130            0 :             }
     131            0 :         };
     132            0 : 
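                       :         // Make sure the local timeline directory exists before the concurrent
                       :         // downloads start writing layer files into it.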
     133            0 :         tokio::fs::create_dir_all(self.output_path.join(format!(
     134            0 :             "{}/timelines/{}",
     135            0 :             ttid.tenant_shard_id, ttid.timeline_id
     136            0 :         )))
     137            0 :         .await?;
     138              : 
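                       :         // `buffered` polls up to `self.concurrency` downloads at a time while
                       :         // yielding results in the order the layers were listed.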
     139            0 :         let layer_results = layers_stream.buffered(self.concurrency);
     140            0 :         let mut layer_results = std::pin::pin!(layer_results);
     141            0 : 
     142            0 :         let mut err = None;
     143            0 :         let mut download_count = 0;
     144            0 :         while let Some(i) = layer_results.next().await {
     145            0 :             download_count += 1;
     146            0 :             match i {
     147            0 :                 Ok((layer_name, layer_metadata)) => {
     148            0 :                     tracing::info!(
     149            0 :                         "[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
     150              :                         layer_metadata.file_size,
     151              :                         layer_name
     152              :                     );
     153              :                 }
     154            0 :                 Err(e) => {
     155            0 :                     // Warn and continue: we will download what we can
     156            0 :                     tracing::warn!("Download error: {e}");
     157            0 :                     err = Some(e);
     158              :                 }
     159              :             }
     160              :         }
     161            0 :         if let Some(e) = err {
     162            0 :             tracing::warn!("Some errors occurred downloading {ttid} layers, last error: {e}");
     163            0 :             Err(e)
     164              :         } else {
     165            0 :             Ok(())
     166              :         }
     167            0 :     }
     168              : 
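                       :     /// Download all layers referenced by one timeline's index. Layers owned by
                       :     /// ancestor shards (left over from shard splits) are not downloaded here;
                       :     /// they are accumulated into `ancestor_layers` so each one is fetched only
                       :     /// once, however many descendant shards reference it.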
     169            0 :     async fn download_timeline(
     170            0 :         &self,
     171            0 :         ttid: TenantShardTimelineId,
     172            0 :         index_part: Box<IndexPart>,
     173            0 :         index_part_generation: Generation,
     174            0 :         ancestor_layers: &mut HashMap<TenantShardTimelineId, HashMap<LayerName, LayerFileMetadata>>,
     175            0 :     ) -> anyhow::Result<()> {
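                       :         // Serialize the index up front: `index_part` is consumed when we iterate
                       :         // its layer metadata below, and the bytes are only written out at the end.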
     176            0 :         let index_bytes = serde_json::to_string(&index_part).unwrap();
     177            0 : 
     178            0 :         let layers = index_part
     179            0 :             .layer_metadata
     180            0 :             .into_iter()
     181            0 :             .filter_map(|(layer_name, layer_metadata)| {
     182            0 :                 if layer_metadata.shard.shard_count != ttid.tenant_shard_id.shard_count {
     183              :                     // Accumulate ancestor layers for later download
     184            0 :                     let ancestor_ttid = TenantShardTimelineId::new(
     185            0 :                         TenantShardId {
     186            0 :                             tenant_id: ttid.tenant_shard_id.tenant_id,
     187            0 :                             shard_number: layer_metadata.shard.shard_number,
     188            0 :                             shard_count: layer_metadata.shard.shard_count,
     189            0 :                         },
     190            0 :                         ttid.timeline_id,
     191            0 :                     );
     192            0 :                     let ancestor_ttid_layers = ancestor_layers.entry(ancestor_ttid).or_default();
     193              :                     use std::collections::hash_map::Entry;
     194            0 :                     match ancestor_ttid_layers.entry(layer_name) {
     195            0 :                         Entry::Occupied(entry) => {
      196            0 :                             // Descendant shards that reference a layer from an ancestor should always have metadata
      197            0 :                             // matching their siblings', because the index is read atomically during a shard split.
     198            0 :                             assert_eq!(entry.get(), &layer_metadata);
     199              :                         }
     200            0 :                         Entry::Vacant(entry) => {
     201            0 :                             entry.insert(layer_metadata);
     202            0 :                         }
     203              :                     }
     204            0 :                     None
     205              :                 } else {
     206            0 :                     Some((layer_name, layer_metadata))
     207              :                 }
     208            0 :             })
     209            0 :             .collect();
     210              : 
     211            0 :         let download_result = self.download_layers(ttid, layers).await;
     212              : 
     213              :         // Write index last, once all the layers it references are downloaded
     214            0 :         let local_index_path = self.output_path.join(format!(
     215            0 :             "{}/timelines/{}/index_part.json{}",
     216            0 :             ttid.tenant_shard_id,
     217            0 :             ttid.timeline_id,
     218            0 :             index_part_generation.get_suffix()
     219            0 :         ));
     220            0 :         tokio::fs::write(&local_index_path, index_bytes)
     221            0 :             .await
     222            0 :             .context("writing index")?;
     223              : 
     224            0 :         download_result
     225            0 :     }
     226              : 
     227            0 :     pub async fn download(&self) -> anyhow::Result<()> {
     228            0 :         let (remote_client, target) =
     229            0 :             init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
     230              : 
     231              :         // Generate a stream of TenantShardId
     232            0 :         let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
     233            0 :         let shards: Vec<TenantShardId> = shards.try_collect().await?;
     234              : 
      235              :         // Only read from the shards with the highest shard count: this avoids
      236              :         // redundantly downloading data from ancestor shards.
     237            0 :         let Some(shard_count) = shards.iter().map(|s| s.shard_count).max() else {
     238            0 :             anyhow::bail!("No shards found");
     239              :         };
     240              : 
      241              :         // We will build a collection of layers in ancestor shards to download (this will only
      242              :         // happen if this tenant has been split at some point).
     243            0 :         let mut ancestor_layers: HashMap<
     244            0 :             TenantShardTimelineId,
     245            0 :             HashMap<LayerName, LayerFileMetadata>,
     246            0 :         > = Default::default();
     247              : 
     248            0 :         for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
     249              :             // Generate a stream of TenantTimelineId
     250            0 :             let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
     251              : 
     252              :             // Generate a stream of S3TimelineBlobData
     253            0 :             async fn load_timeline_index(
     254            0 :                 remote_client: &GenericRemoteStorage,
     255            0 :                 target: &RootTarget,
     256            0 :                 ttid: TenantShardTimelineId,
     257            0 :             ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
     258            0 :                 let data = list_timeline_blobs(remote_client, ttid, target).await?;
     259            0 :                 Ok((ttid, data))
     260            0 :             }
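                       :             // Load up to 8 timeline indices concurrently; `try_buffered` yields
                       :             // results in order and surfaces the first error it encounters.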
     261            0 :             let timelines =
     262            0 :                 timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
     263            0 :             let mut timelines = std::pin::pin!(timelines.try_buffered(8));
     264              : 
     265            0 :             while let Some(i) = timelines.next().await {
     266            0 :                 let (ttid, data) = i?;
     267            0 :                 match data.blob_data {
     268              :                     BlobDataParseResult::Parsed {
     269            0 :                         index_part,
     270            0 :                         index_part_generation,
     271            0 :                         s3_layers: _,
     272            0 :                         index_part_last_modified_time: _,
     273            0 :                         index_part_snapshot_time: _,
     274            0 :                     } => {
     275            0 :                         self.download_timeline(
     276            0 :                             ttid,
     277            0 :                             index_part,
     278            0 :                             index_part_generation,
     279            0 :                             &mut ancestor_layers,
     280            0 :                         )
     281            0 :                         .await
     282            0 :                         .context("Downloading timeline")?;
     283              :                     }
     284            0 :                     BlobDataParseResult::Relic => {}
     285              :                     BlobDataParseResult::Incorrect { .. } => {
     286            0 :                         tracing::error!("Bad metadata in timeline {ttid}");
     287              :                     }
     288              :                 };
     289              :             }
     290              :         }
     291              : 
     292            0 :         for (ttid, layers) in ancestor_layers.into_iter() {
     293            0 :             tracing::info!(
     294            0 :                 "Downloading {} layers from ancestor timeline {ttid}...",
     295            0 :                 layers.len()
     296              :             );
     297              : 
     298            0 :             self.download_layers(ttid, layers.into_iter().collect())
     299            0 :                 .await?;
     300              :         }
     301              : 
     302            0 :         Ok(())
     303            0 :     }
     304              : }
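                       : 
                       : // A minimal usage sketch (hypothetical caller; the argument values are
                       : // illustrative, and the real entry point is the scrubber's CLI):
                       : //
                       : //     let downloader = SnapshotDownloader::new(
                       : //         bucket_config,
                       : //         tenant_id,
                       : //         Utf8PathBuf::from("./tenant-snapshot"),
                       : //         8, // concurrency
                       : //     )
                       : //     .await?;
                       : //     downloader.download().await?;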
        

Generated by: LCOV version 2.1-beta