LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - metadata_stream.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 93 0 93
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 19 0 19
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use anyhow::Context;
       2                 : use async_stream::{stream, try_stream};
       3                 : use aws_sdk_s3::{types::ObjectIdentifier, Client};
       4                 : use tokio_stream::Stream;
       5                 : 
       6                 : use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
       7                 : use pageserver_api::shard::TenantShardId;
       8                 : use utils::id::TimelineId;
       9                 : 
      10                 : /// Given an S3 bucket, output a stream of TenantShardIds parsed from the common prefixes under the tenants root, listed page by page via ListObjectsV2.  Listing and parse failures are propagated with `?` inside `try_stream!`.
      11 UBC           0 : pub fn stream_tenants<'a>(
      12               0 :     s3_client: &'a Client,
      13               0 :     target: &'a RootTarget,
      14               0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
      15               0 :     try_stream! {
      16               0 :         let mut continuation_token = None; // no continuation token on the first request
      17               0 :         let tenants_target = target.tenants_root();
      18                 :         loop {
      19               0 :             let fetch_response =
      20               0 :                 list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
      21                 : 
      22               0 :             let new_entry_ids = fetch_response
      23               0 :                 .common_prefixes()
      24               0 :                 .iter()
      25               0 :                 .filter_map(|prefix| prefix.prefix()) // skip entries that carry no prefix string
      26               0 :                 .filter_map(|prefix| -> Option<&str> {
      27               0 :                     prefix
      28               0 :                         .strip_prefix(&tenants_target.prefix_in_bucket)? // entries outside the listing prefix are skipped
      29               0 :                         .strip_suffix('/') // entries without a trailing '/' are skipped as well
      30               0 :                 }).map(|entry_id_str| {
      31               0 :                 entry_id_str
      32               0 :                     .parse()
      33               0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
      34               0 :             });
      35                 : 
      36               0 :             for i in new_entry_ids {
      37               0 :                 yield i?; // a parse failure is propagated as an error here
      38                 :             }
      39                 : 
      40               0 :             match fetch_response.next_continuation_token { // keep paging while the response carries a token
      41               0 :                 Some(new_token) => continuation_token = Some(new_token),
      42               0 :                 None => break,
      43                 :             }
      44                 :         }
      45                 :     }
      46               0 : }
      47                 : 
      48                 : /// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
      49                 : /// using ListObjectsV2.  The listing is done before the stream is built, so that this
      50                 : /// function can be used to generate concurrency on a stream using buffer_unordered.  Listing and parse errors are stored alongside the ids and surfaced as stream items; the function itself only returns `Ok`.
      51               0 : pub async fn stream_tenant_timelines<'a>(
      52               0 :     s3_client: &'a Client,
      53               0 :     target: &'a RootTarget,
      54               0 :     tenant: TenantShardId,
      55               0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
      56               0 :     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new(); // holds successes and failures in listing order
      57               0 :     let mut continuation_token = None; // no continuation token on the first request
      58               0 :     let timelines_target = target.timelines_root(&tenant);
      59                 : 
      60                 :     loop {
      61               0 :         tracing::info!("Listing in {}", tenant); // emitted once per page request, not once per tenant
      62               0 :         let fetch_response =
      63               0 :             list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
      64               0 :                 .await;
      65               0 :         let fetch_response = match fetch_response {
      66               0 :             Err(e) => {
      67               0 :                 timeline_ids.push(Err(e)); // record the listing failure as an item and stop paging
      68               0 :                 break;
      69                 :             }
      70               0 :             Ok(r) => r,
      71               0 :         };
      72               0 : 
      73               0 :         let new_entry_ids = fetch_response
      74               0 :             .common_prefixes()
      75               0 :             .iter()
      76               0 :             .filter_map(|prefix| prefix.prefix()) // skip entries that carry no prefix string
      77               0 :             .filter_map(|prefix| -> Option<&str> {
      78               0 :                 prefix
      79               0 :                     .strip_prefix(&timelines_target.prefix_in_bucket)? // entries outside the listing prefix are skipped
      80               0 :                     .strip_suffix('/') // entries without a trailing '/' are skipped as well
      81               0 :             })
      82               0 :             .map(|entry_id_str| {
      83               0 :                 entry_id_str
      84               0 :                     .parse::<TimelineId>()
      85               0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
      86               0 :             });
      87                 : 
      88               0 :         for i in new_entry_ids {
      89               0 :             timeline_ids.push(i); // parse results are stored as-is, Ok or Err
      90               0 :         }
      91                 : 
      92               0 :         match fetch_response.next_continuation_token { // keep paging while the response carries a token
      93               0 :             Some(new_token) => continuation_token = Some(new_token),
      94               0 :             None => break,
      95                 :         }
      96                 :     }
      97                 : 
      98               0 :     tracing::info!("Yielding for {}", tenant);
      99               0 :     Ok(stream! {
     100               0 :         for i in timeline_ids {
     101               0 :             let id = i?; // NOTE(review): relies on how async_stream treats `?` inside stream! - confirm the Err is yielded to the consumer
     102               0 :             yield Ok(TenantShardTimelineId::new(tenant, id));
     103                 :         }
     104               0 :     })
     105               0 : }
     106                 : /// Stream the entries listed under `target`, page by page: object keys when the target's delimiter is empty, otherwise common prefixes, each wrapped in an ObjectIdentifier.
     107               0 : pub(crate) fn stream_listing<'a>(
     108               0 :     s3_client: &'a Client,
     109               0 :     target: &'a S3Target,
     110               0 : ) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
     111               0 :     try_stream! {
     112               0 :         let mut continuation_token = None; // no continuation token on the first request
     113                 :         loop {
     114               0 :             let fetch_response =
     115               0 :                 list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
     116                 : 
     117               0 :             if target.delimiter.is_empty() { // empty delimiter: emit the listed object keys
     118               0 :                 for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
     119                 :                 {
     120               0 :                     let object_id = ObjectIdentifier::builder().key(object_key).build()?; // build() is fallible; its error is propagated with `?`
     121               0 :                     yield object_id;
     122                 :                 }
     123                 :             } else { // non-empty delimiter: emit the common prefixes instead of object keys
     124               0 :                 for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
     125               0 :                     let object_id = ObjectIdentifier::builder().key(prefix).build()?;
     126               0 :                     yield object_id;
     127                 :                 }
     128                 :             }
     129                 : 
     130               0 :             match fetch_response.next_continuation_token { // keep paging while the response carries a token
     131               0 :                 Some(new_token) => continuation_token = Some(new_token),
     132               0 :                 None => break,
     133                 :             }
     134                 :         }
     135                 :     }
     136               0 : }
        

Generated by: LCOV version 2.1-beta