LCOV - code coverage report
Current view: top level - s3_scrubber/src - metadata_stream.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 84 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 15 0

            Line data    Source code
       1              : use anyhow::Context;
       2              : use async_stream::{stream, try_stream};
       3              : use aws_sdk_s3::Client;
       4              : use tokio_stream::Stream;
       5              : 
       6              : use crate::{list_objects_with_retries, RootTarget, TenantId};
       7              : use utils::id::{TenantTimelineId, TimelineId};
       8              : 
       9              : /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
      10            0 : pub fn stream_tenants<'a>(
      11            0 :     s3_client: &'a Client,
      12            0 :     target: &'a RootTarget,
      13            0 : ) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
      14            0 :     try_stream! {
      15            0 :         let mut continuation_token = None;
      16            0 :         loop {
      17            0 :             let tenants_target = target.tenants_root();
      18            0 :             let fetch_response =
      19            0 :                 list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?;
      20            0 : 
      21            0 :             let new_entry_ids = fetch_response
      22            0 :                 .common_prefixes()
      23            0 :                 .unwrap_or_default()
      24            0 :                 .iter()
      25            0 :                 .filter_map(|prefix| prefix.prefix())
      26            0 :                 .filter_map(|prefix| -> Option<&str> {
      27            0 :                     prefix
      28            0 :                         .strip_prefix(&tenants_target.prefix_in_bucket)?
      29            0 :                         .strip_suffix('/')
      30            0 :                 }).map(|entry_id_str| {
      31            0 :                 entry_id_str
      32            0 :                     .parse()
      33            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
      34            0 :             });
      35            0 : 
      36            0 :             for i in new_entry_ids {
      37            0 :                 yield i?;
      38            0 :             }
      39            0 : 
      40            0 :             match fetch_response.next_continuation_token {
      41            0 :                 Some(new_token) => continuation_token = Some(new_token),
      42            0 :                 None => break,
      43            0 :             }
      44            0 :         }
      45            0 :     }
      46            0 : }
      47              : 
      48              : /// Given a TenantId, output a stream of the timelines within that tenant, discovered
      49              : /// using ListObjectsv2.  The listing is done before the stream is built, so that this
      50              : /// function can be used to generate concurrency on a stream using buffer_unordered.
      51            0 : pub async fn stream_tenant_timelines<'a>(
      52            0 :     s3_client: &'a Client,
      53            0 :     target: &'a RootTarget,
      54            0 :     tenant: TenantId,
      55            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantTimelineId, anyhow::Error>> + 'a> {
      56            0 :     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
      57            0 :     let mut continuation_token = None;
      58            0 :     let timelines_target = target.timelines_root(&tenant);
      59              : 
      60              :     loop {
      61            0 :         tracing::info!("Listing in {}", tenant);
      62            0 :         let fetch_response =
      63            0 :             list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
      64            0 :                 .await;
      65            0 :         let fetch_response = match fetch_response {
      66            0 :             Err(e) => {
      67            0 :                 timeline_ids.push(Err(e));
      68            0 :                 break;
      69              :             }
      70            0 :             Ok(r) => r,
      71            0 :         };
      72            0 : 
      73            0 :         let new_entry_ids = fetch_response
      74            0 :             .common_prefixes()
      75            0 :             .unwrap_or_default()
      76            0 :             .iter()
      77            0 :             .filter_map(|prefix| prefix.prefix())
      78            0 :             .filter_map(|prefix| -> Option<&str> {
      79            0 :                 prefix
      80            0 :                     .strip_prefix(&timelines_target.prefix_in_bucket)?
      81            0 :                     .strip_suffix('/')
      82            0 :             })
      83            0 :             .map(|entry_id_str| {
      84            0 :                 entry_id_str
      85            0 :                     .parse::<TimelineId>()
      86            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
      87            0 :             });
      88              : 
      89            0 :         for i in new_entry_ids {
      90            0 :             timeline_ids.push(i);
      91            0 :         }
      92              : 
      93            0 :         match fetch_response.next_continuation_token {
      94            0 :             Some(new_token) => continuation_token = Some(new_token),
      95            0 :             None => break,
      96              :         }
      97              :     }
      98              : 
      99            0 :     tracing::info!("Yielding for {}", tenant);
     100            0 :     Ok(stream! {
     101            0 :         for i in timeline_ids {
     102            0 :             let id = i?;
     103            0 :             yield Ok(TenantTimelineId::new(tenant, id));
     104              :         }
     105            0 :     })
     106            0 : }
        

Generated by: LCOV version 2.1-beta