LCOV - code coverage report
Current view: top level - storage_scrubber/src - find_large_objects.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 0.0 % 80 0
Test Date: 2024-07-21 16:16:09 Functions: 0.0 % 29 0

            Line data    Source code
       1              : use futures::{StreamExt, TryStreamExt};
       2              : use pageserver::tenant::storage_layer::LayerName;
       3              : use serde::{Deserialize, Serialize};
       4              : 
       5              : use crate::{
       6              :     checks::parse_layer_object_name, init_remote, list_objects_with_retries,
       7              :     metadata_stream::stream_tenants, BucketConfig, NodeKind,
       8              : };
       9              : 
      10            0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
                        /// Classification of a scanned object, derived from its key by
                        /// `LargeObjectKind::from_key`.
      11              : enum LargeObjectKind {
                            /// The key's file name parsed as a delta layer name.
      12              :     DeltaLayer,
                            /// The key's file name parsed as an image layer name.
      13              :     ImageLayer,
                            /// The key's file name could not be parsed as a layer name.
      14              :     Other,
      15              : }
      16              : 
      17              : impl LargeObjectKind {
                            /// Determines the kind of an object from its full key.
                            ///
                            /// Only the part after the last `/` is inspected. If
                            /// `parse_layer_object_name` rejects that file name the result is `Other`;
                            /// otherwise the variant of the parsed layer name selects `ImageLayer` or
                            /// `DeltaLayer`.
      18            0 :     fn from_key(key: &str) -> Self {
                                // `str::split` always yields at least one item (even for an empty key),
                                // so this `unwrap` cannot panic.
                                // NOTE(review): `key.rsplit('/').next()` would return the same segment
                                // without walking the whole iterator.
      19            0 :         let fname = key.split('/').last().unwrap();
      20              : 
                                // The second tuple element (bound as `_generation`) is not needed here.
      21            0 :         let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
      22            0 :             return LargeObjectKind::Other;
      23              :         };
      24              : 
                                // No wildcard arm, so a new `LayerName` variant would be a compile error here.
      25            0 :         match layer_name {
      26            0 :             LayerName::Image(_) => LargeObjectKind::ImageLayer,
      27            0 :             LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
      28              :         }
      29            0 :     }
      30              : }
      31              : 
      32            0 : #[derive(Serialize, Deserialize, Clone)]
                        /// One object that passed the size filter in `find_large_objects`.
      33              : pub struct LargeObject {
                            /// Object key as returned by the listing.
      34              :     pub key: String,
                            /// Object size as returned by the listing (presumably bytes — confirm).
      35              :     pub size: u64,
                            /// Classification computed from `key` via `LargeObjectKind::from_key`.
                            /// Private to this module, but still covered by the serde derives above.
      36              :     kind: LargeObjectKind,
      37              : }
      38              : 
      39            0 : #[derive(Serialize, Deserialize)]
                        /// Return value of `find_large_objects`: the matching objects of all scanned
                        /// tenant shards.
      40              : pub struct LargeObjectListing {
                            /// Matching objects, appended in the order per-shard results were received.
      41              :     pub objects: Vec<LargeObject>,
      42              : }
      43              : 
      44            0 : pub async fn find_large_objects(
      45            0 :     bucket_config: BucketConfig,
      46            0 :     min_size: u64,
      47            0 :     ignore_deltas: bool,
      48            0 :     concurrency: usize,
      49            0 : ) -> anyhow::Result<LargeObjectListing> {
                            // Scans every tenant shard yielded by `stream_tenants` and collects all
                            // objects whose reported size is at least `min_size`.
                            //
                            // Parameters:
                            // - `bucket_config`: handed to `init_remote` together with `NodeKind::Pageserver`.
                            // - `min_size`: inclusive lower bound on the object size.
                            // - `ignore_deltas`: when true, objects classified as `DeltaLayer` are skipped.
                            // - `concurrency`: limit passed to `try_buffer_unordered`, i.e. how many
                            //   per-shard listings may be in flight at once.
                            //
                            // Errors from `init_remote`, from the tenant stream and from
                            // `list_objects_with_retries` are propagated with `?`.
                            // Panics if an object that passed the size filter has no key.
                            //
                            // NOTE(review): `bucket_config` is not used again below, so the `clone()`
                            // looks unnecessary.
      50            0 :     let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
      51            0 :     let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
      52            0 : 
                            // Turn every successfully streamed tenant shard id into a future that lists
                            // the shard's objects; stream errors pass through `map_ok` untouched.
      53            0 :     let objects_stream = tenants.map_ok(|tenant_shard_id| {
      54            0 :         let mut tenant_root = target.tenant_root(&tenant_shard_id);
                                // The future below is `async move`, so it gets its own client handle.
      55            0 :         let s3_client = s3_client.clone();
      56            0 :         async move {
      57            0 :             let mut objects = Vec::new();
                                    // Counts every listed object, not only the ones that match.
      58            0 :             let mut total_objects_ctr = 0u64;
      59            0 :             // We want the objects and not just common prefixes
      60            0 :             tenant_root.delimiter.clear();
      61            0 :             let mut continuation_token = None;
                                    // Fetch listing pages until the response carries no continuation token.
      62              :             loop {
      63            0 :                 let fetch_response =
      64            0 :                     list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
      65            0 :                         .await?;
                                        // Keep only objects with a known size of at least `min_size`.
                                        // NOTE(review): `min_size as i64` wraps to a negative number for
                                        // values above `i64::MAX`, which would let every sized object pass.
      66            0 :                 for obj in fetch_response.contents().iter().filter(|o| {
      67            0 :                     if let Some(obj_size) = o.size {
      68            0 :                         min_size as i64 <= obj_size
      69              :                     } else {
      70            0 :                         false
      71              :                     }
      72            0 :                 }) {
                                            // NOTE(review): a listing entry without a key panics here rather
                                            // than surfacing as an error.
      73            0 :                     let key = obj.key().expect("couldn't get key").to_owned();
      74            0 :                     let kind = LargeObjectKind::from_key(&key);
      75            0 :                     if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
      76            0 :                         continue;
      77            0 :                     }
                                            // `unwrap` cannot fail: the filter above only passes `Some` sizes.
                                            // NOTE(review): `as u64` would wrap a negative size; this can only
                                            // happen if `min_size as i64` is itself <= that negative value.
      78            0 :                     objects.push(LargeObject {
      79            0 :                         key,
      80            0 :                         size: obj.size.unwrap() as u64,
      81            0 :                         kind,
      82            0 :                     })
      83              :                 }
      84            0 :                 total_objects_ctr += fetch_response.contents().len() as u64;
      85            0 :                 match fetch_response.next_continuation_token {
      86            0 :                     Some(new_token) => continuation_token = Some(new_token),
      87            0 :                     None => break,
      88            0 :                 }
      89            0 :             }
      90            0 : 
      91            0 :             Ok((tenant_shard_id, objects, total_objects_ctr))
      92            0 :         }
      93            0 :     });
                            // Drive up to `concurrency` per-shard futures at once; results are yielded as
                            // they complete, so their order is not the tenant stream's order.
      94            0 :     let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
      95            0 : 
      96            0 :     let mut objects = Vec::new();
      97            0 : 
      98            0 :     let mut tenant_ctr = 0u64;
      99            0 :     let mut object_ctr = 0u64;
     100            0 :     while let Some(res) = objects_stream.next().await {
                                // The first error aborts the whole scan.
     101            0 :         let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
                                // NOTE(review): `extend_from_slice` clones every entry; `objects_slice` is
                                // owned here and could be moved in with `extend`.
     102            0 :         objects.extend_from_slice(&objects_slice);
     103            0 : 
     104            0 :         object_ctr += total_objects_ctr;
     105            0 :         tenant_ctr += 1;
                                // Progress log once per 100 processed shards.
     106            0 :         if tenant_ctr % 100 == 0 {
     107            0 :             tracing::info!(
     108            0 :                 "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
     109            0 :                 objects.len()
     110              :             );
     111            0 :         }
     112              :     }
     113              : 
     114            0 :     let bucket_name = target.bucket_name();
     115            0 :     tracing::info!(
     116            0 :         "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
     117            0 :         objects.len()
     118              :     );
     119            0 :     Ok(LargeObjectListing { objects })
     120            0 : }
        

Generated by: LCOV version 2.1-beta