LCOV - code coverage report
Current view: top level - storage_scrubber/src - find_large_objects.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 73 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 20 0

            Line data    Source code
       1              : use std::pin::pin;
       2              : 
       3              : use futures::{StreamExt, TryStreamExt};
       4              : use pageserver::tenant::storage_layer::LayerName;
       5              : use remote_storage::ListingMode;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use crate::checks::parse_layer_object_name;
       9              : use crate::metadata_stream::stream_tenants;
      10              : use crate::{BucketConfig, NodeKind, init_remote, stream_objects_with_retries};
      11              : 
      12            0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
      13              : enum LargeObjectKind {
      14              :     DeltaLayer,
      15              :     ImageLayer,
      16              :     Other,
      17              : }
      18              : 
      19              : impl LargeObjectKind {
      20            0 :     fn from_key(key: &str) -> Self {
      21            0 :         let fname = key.split('/').last().unwrap();
      22              : 
      23            0 :         let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
      24            0 :             return LargeObjectKind::Other;
      25              :         };
      26              : 
      27            0 :         match layer_name {
      28            0 :             LayerName::Image(_) => LargeObjectKind::ImageLayer,
      29            0 :             LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
      30              :         }
      31            0 :     }
      32              : }
      33              : 
      34            0 : #[derive(Serialize, Deserialize, Clone)]
      35              : pub struct LargeObject {
      36              :     pub key: String,
      37              :     pub size: u64,
      38              :     kind: LargeObjectKind,
      39              : }
      40              : 
      41            0 : #[derive(Serialize, Deserialize)]
      42              : pub struct LargeObjectListing {
      43              :     pub objects: Vec<LargeObject>,
      44              : }
      45              : 
      46            0 : pub async fn find_large_objects(
      47            0 :     bucket_config: BucketConfig,
      48            0 :     min_size: u64,
      49            0 :     ignore_deltas: bool,
      50            0 :     concurrency: usize,
      51            0 : ) -> anyhow::Result<LargeObjectListing> {
      52            0 :     let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
      53            0 :     let tenants = pin!(stream_tenants(&remote_client, &target));
      54            0 : 
      55            0 :     let objects_stream = tenants.map_ok(|tenant_shard_id| {
      56            0 :         let mut tenant_root = target.tenant_root(&tenant_shard_id);
      57            0 :         let remote_client = remote_client.clone();
      58            0 :         async move {
      59            0 :             let mut objects = Vec::new();
      60            0 :             let mut total_objects_ctr = 0u64;
      61            0 :             // We want the objects and not just common prefixes
      62            0 :             tenant_root.delimiter.clear();
      63            0 :             let mut objects_stream = pin!(stream_objects_with_retries(
      64            0 :                 &remote_client,
      65            0 :                 ListingMode::NoDelimiter,
      66            0 :                 &tenant_root
      67            0 :             ));
      68            0 :             while let Some(listing) = objects_stream.next().await {
      69            0 :                 let listing = listing?;
      70            0 :                 for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
      71            0 :                     let key = obj.key.to_string();
      72            0 :                     let kind = LargeObjectKind::from_key(&key);
      73            0 :                     if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
      74            0 :                         continue;
      75            0 :                     }
      76            0 :                     objects.push(LargeObject {
      77            0 :                         key,
      78            0 :                         size: obj.size,
      79            0 :                         kind,
      80            0 :                     })
      81              :                 }
      82            0 :                 total_objects_ctr += listing.keys.len() as u64;
      83              :             }
      84              : 
      85            0 :             Ok((tenant_shard_id, objects, total_objects_ctr))
      86            0 :         }
      87            0 :     });
      88            0 :     let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
      89            0 : 
      90            0 :     let mut objects = Vec::new();
      91            0 : 
      92            0 :     let mut tenant_ctr = 0u64;
      93            0 :     let mut object_ctr = 0u64;
      94            0 :     while let Some(res) = objects_stream.next().await {
      95            0 :         let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
      96            0 :         objects.extend_from_slice(&objects_slice);
      97            0 : 
      98            0 :         object_ctr += total_objects_ctr;
      99            0 :         tenant_ctr += 1;
     100            0 :         if tenant_ctr % 100 == 0 {
     101            0 :             tracing::info!(
     102            0 :                 "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
     103            0 :                 objects.len()
     104              :             );
     105            0 :         }
     106              :     }
     107              : 
     108            0 :     let desc_str = target.desc_str();
     109            0 :     tracing::info!(
     110            0 :         "Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
     111            0 :         objects.len()
     112              :     );
     113            0 :     Ok(LargeObjectListing { objects })
     114            0 : }
        

Generated by: LCOV version 2.1-beta