LCOV - code coverage report
Current view: top level - storage_scrubber/src - find_large_objects.rs (source / functions) Coverage Total Hit
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info Lines: 0.0 % 73 0
Test Date: 2024-09-19 12:04:32 Functions: 0.0 % 29 0

            Line data    Source code
       1              : use std::pin::pin;
       2              : 
       3              : use futures::{StreamExt, TryStreamExt};
       4              : use pageserver::tenant::storage_layer::LayerName;
       5              : use remote_storage::ListingMode;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use crate::{
       9              :     checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
      10              :     stream_objects_with_retries, BucketConfig, NodeKind,
      11              : };
      12              : 
      13            0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
      14              : enum LargeObjectKind {
      15              :     DeltaLayer,
      16              :     ImageLayer,
      17              :     Other,
      18              : }
      19              : 
      20              : impl LargeObjectKind {
      21            0 :     fn from_key(key: &str) -> Self {
      22            0 :         let fname = key.split('/').last().unwrap();
      23              : 
      24            0 :         let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
      25            0 :             return LargeObjectKind::Other;
      26              :         };
      27              : 
      28            0 :         match layer_name {
      29            0 :             LayerName::Image(_) => LargeObjectKind::ImageLayer,
      30            0 :             LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
      31              :         }
      32            0 :     }
      33              : }
      34              : 
      35            0 : #[derive(Serialize, Deserialize, Clone)]
      36              : pub struct LargeObject {
      37              :     pub key: String,
      38              :     pub size: u64,
      39              :     kind: LargeObjectKind,
      40              : }
      41              : 
      42            0 : #[derive(Serialize, Deserialize)]
      43              : pub struct LargeObjectListing {
      44              :     pub objects: Vec<LargeObject>,
      45              : }
      46              : 
      47            0 : pub async fn find_large_objects(
      48            0 :     bucket_config: BucketConfig,
      49            0 :     min_size: u64,
      50            0 :     ignore_deltas: bool,
      51            0 :     concurrency: usize,
      52            0 : ) -> anyhow::Result<LargeObjectListing> {
      53            0 :     let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
      54            0 :     let tenants = pin!(stream_tenants(&remote_client, &target));
      55            0 : 
      56            0 :     let objects_stream = tenants.map_ok(|tenant_shard_id| {
      57            0 :         let mut tenant_root = target.tenant_root(&tenant_shard_id);
      58            0 :         let remote_client = remote_client.clone();
      59            0 :         async move {
      60            0 :             let mut objects = Vec::new();
      61            0 :             let mut total_objects_ctr = 0u64;
      62            0 :             // We want the objects and not just common prefixes
      63            0 :             tenant_root.delimiter.clear();
      64            0 :             let mut objects_stream = pin!(stream_objects_with_retries(
      65            0 :                 &remote_client,
      66            0 :                 ListingMode::NoDelimiter,
      67            0 :                 &tenant_root
      68            0 :             ));
      69            0 :             while let Some(listing) = objects_stream.next().await {
      70            0 :                 let listing = listing?;
      71            0 :                 for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
      72            0 :                     let key = obj.key.to_string();
      73            0 :                     let kind = LargeObjectKind::from_key(&key);
      74            0 :                     if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
      75            0 :                         continue;
      76            0 :                     }
      77            0 :                     objects.push(LargeObject {
      78            0 :                         key,
      79            0 :                         size: obj.size,
      80            0 :                         kind,
      81            0 :                     })
      82              :                 }
      83            0 :                 total_objects_ctr += listing.keys.len() as u64;
      84              :             }
      85              : 
      86            0 :             Ok((tenant_shard_id, objects, total_objects_ctr))
      87            0 :         }
      88            0 :     });
      89            0 :     let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
      90            0 : 
      91            0 :     let mut objects = Vec::new();
      92            0 : 
      93            0 :     let mut tenant_ctr = 0u64;
      94            0 :     let mut object_ctr = 0u64;
      95            0 :     while let Some(res) = objects_stream.next().await {
      96            0 :         let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
      97            0 :         objects.extend_from_slice(&objects_slice);
      98            0 : 
      99            0 :         object_ctr += total_objects_ctr;
     100            0 :         tenant_ctr += 1;
     101            0 :         if tenant_ctr % 100 == 0 {
     102            0 :             tracing::info!(
     103            0 :                 "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
     104            0 :                 objects.len()
     105              :             );
     106            0 :         }
     107              :     }
     108              : 
     109            0 :     let bucket_name = target.bucket_name();
     110            0 :     tracing::info!(
     111            0 :         "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
     112            0 :         objects.len()
     113              :     );
     114            0 :     Ok(LargeObjectListing { objects })
     115            0 : }
        

Generated by: LCOV version 2.1-beta