LCOV - code coverage report
Current view: top level - storage_scrubber/src - metadata_stream.rs (source / functions) Coverage Total Hit
Test: e402c46de0a007db6b48dddbde450ddbb92e6ceb.info Lines: 0.0 % 110 0
Test Date: 2024-06-25 10:31:23 Functions: 0.0 % 20 0

            Line data    Source code
       1              : use anyhow::Context;
       2              : use async_stream::{stream, try_stream};
       3              : use aws_sdk_s3::{types::ObjectIdentifier, Client};
       4              : use tokio_stream::Stream;
       5              : 
       6              : use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
       7              : use pageserver_api::shard::TenantShardId;
       8              : use utils::id::{TenantId, TimelineId};
       9              : 
      10              : /// Given an S3 bucket, output a stream of `TenantShardId`s discovered via ListObjectsv2 under `target.tenants_root()`; listing and parse failures are propagated with `?`.
      11            0 : pub fn stream_tenants<'a>(
      12            0 :     s3_client: &'a Client, // client handed to every `list_objects_with_retries` call
      13            0 :     target: &'a RootTarget, // only its `tenants_root()` is listed
      14            0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
      15              :     try_stream! {
      16              :         let mut continuation_token = None; // replaced by each response's `next_continuation_token`
      17              :         let tenants_target = target.tenants_root();
      18              :         loop {
      19              :             let fetch_response =
      20              :                 list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
      21              : 
      22              :             let new_entry_ids = fetch_response
      23              :                 .common_prefixes()
      24              :                 .iter()
      25            0 :                 .filter_map(|prefix| prefix.prefix()) // entries without a prefix string are skipped
      26            0 :                 .filter_map(|prefix| -> Option<&str> {
      27            0 :                     prefix
      28            0 :                         .strip_prefix(&tenants_target.prefix_in_bucket)? // `?` on an Option: a prefix outside `prefix_in_bucket` is dropped
      29            0 :                         .strip_suffix('/') // likewise dropped when there is no trailing '/'
      30            0 :                 }).map(|entry_id_str| {
      31            0 :                 entry_id_str
      32            0 :                     .parse()
      33            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
      34            0 :             });
      35              : 
      36              :             for i in new_entry_ids {
      37              :                 yield i?; // a parse failure is propagated via `?` instead of being yielded as an id
      38              :             }
      39              : 
      40              :             match fetch_response.next_continuation_token {
      41              :                 Some(new_token) => continuation_token = Some(new_token),
      42              :                 None => break, // no further pages
      43              :             }
      44              :         }
      45              :     }
      46            0 : }
      47              : /// Collect the `TenantShardId` parse results listed under `target.tenant_shards_prefix(&tenant_id)`, page by page, then hand them back as a stream.
      48            0 : pub async fn stream_tenant_shards<'a>(
      49            0 :     s3_client: &'a Client,
      50            0 :     target: &'a RootTarget,
      51            0 :     tenant_id: TenantId,
      52            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> { // the only return below is `Ok`; failures travel as stream items
      53            0 :     let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
      54            0 :     let mut continuation_token = None; // replaced by each response's `next_continuation_token`
      55            0 :     let shards_target = target.tenant_shards_prefix(&tenant_id);
      56              : 
      57            0 :     loop {
      58            0 :         tracing::info!("Listing in {}", shards_target.prefix_in_bucket); // logged once per page request
      59            0 :         let fetch_response =
      60            0 :             list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
      61            0 :         let fetch_response = match fetch_response {
      62            0 :             Err(e) => {
      63            0 :                 tenant_shard_ids.push(Err(e)); // keep the listing error as an item and stop paging
      64            0 :                 break;
      65              :             }
      66            0 :             Ok(r) => r,
      67            0 :         };
      68            0 : 
      69            0 :         let new_entry_ids = fetch_response
      70            0 :             .common_prefixes()
      71            0 :             .iter()
      72            0 :             .filter_map(|prefix| prefix.prefix()) // entries without a prefix string are skipped
      73            0 :             .filter_map(|prefix| -> Option<&str> {
      74            0 :                 prefix
      75            0 :                     .strip_prefix(&target.tenants_root().prefix_in_bucket)? // strips the tenants-root prefix (not `shards_target`'s); `tenants_root()` is re-evaluated for every prefix
      76            0 :                     .strip_suffix('/')
      77            0 :             })
      78            0 :             .map(|entry_id_str| {
      79            0 :                 let first_part = entry_id_str.split('/').next().unwrap(); // first '/'-separated segment; `split` always yields at least one piece, so this cannot panic
      80            0 : 
      81            0 :                 first_part
      82            0 :                     .parse::<TenantShardId>()
      83            0 :                     .with_context(|| format!("Incorrect entry id str: {first_part}"))
      84            0 :             });
      85              : 
      86            0 :         for i in new_entry_ids {
      87            0 :             tenant_shard_ids.push(i); // parse errors are stored too, not raised here
      88            0 :         }
      89              : 
      90            0 :         match fetch_response.next_continuation_token {
      91            0 :             Some(new_token) => continuation_token = Some(new_token),
      92            0 :             None => break, // no further pages
      93              :         }
      94              :     }
      95              : 
      96            0 :     Ok(stream! {
      97              :         for i in tenant_shard_ids {
      98              :             let id = i?; // NOTE(review): `?` inside `stream!` presumably emits the `Err` and ends the stream; confirm against async_stream
      99              :             yield Ok(id);
     100              :         }
     101            0 :     })
     102            0 : }
     103              : 
     104              : /// Given a TenantShardId, output a stream of `TenantShardTimelineId`s for the timelines found under
     105              : /// `target.timelines_root(&tenant)` using ListObjectsv2.  All pages are listed before the stream is built, so that this
     106              : /// function can be used to generate concurrency on a stream using buffer_unordered; a failed listing is queued as an `Err` item.
     107            0 : pub async fn stream_tenant_timelines<'a>(
     108            0 :     s3_client: &'a Client,
     109            0 :     target: &'a RootTarget,
     110            0 :     tenant: TenantShardId,
     111            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> { // the only return below is `Ok`; failures travel as stream items
     112            0 :     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
     113            0 :     let mut continuation_token = None; // replaced by each response's `next_continuation_token`
     114            0 :     let timelines_target = target.timelines_root(&tenant);
     115              : 
     116            0 :     loop {
     117            0 :         tracing::debug!("Listing in {}", tenant); // logged once per page request
     118            0 :         let fetch_response =
     119            0 :             list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
     120            0 :                 .await;
     121            0 :         let fetch_response = match fetch_response {
     122            0 :             Err(e) => {
     123            0 :                 timeline_ids.push(Err(e)); // keep the listing error as an item and stop paging
     124            0 :                 break;
     125              :             }
     126            0 :             Ok(r) => r,
     127            0 :         };
     128            0 : 
     129            0 :         let new_entry_ids = fetch_response
     130            0 :             .common_prefixes()
     131            0 :             .iter()
     132            0 :             .filter_map(|prefix| prefix.prefix()) // entries without a prefix string are skipped
     133            0 :             .filter_map(|prefix| -> Option<&str> {
     134            0 :                 prefix
     135            0 :                     .strip_prefix(&timelines_target.prefix_in_bucket)? // `?` on an Option: a prefix outside `prefix_in_bucket` is dropped
     136            0 :                     .strip_suffix('/') // likewise dropped when there is no trailing '/'
     137            0 :             })
     138            0 :             .map(|entry_id_str| {
     139            0 :                 entry_id_str
     140            0 :                     .parse::<TimelineId>()
     141            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
     142            0 :             });
     143              : 
     144            0 :         for i in new_entry_ids {
     145            0 :             timeline_ids.push(i); // parse errors are stored too, not raised here
     146            0 :         }
     147              : 
     148            0 :         match fetch_response.next_continuation_token {
     149            0 :             Some(new_token) => continuation_token = Some(new_token),
     150            0 :             None => break, // no further pages
     151              :         }
     152              :     }
     153              : 
     154            0 :     tracing::debug!("Yielding for {}", tenant);
     155            0 :     Ok(stream! {
     156              :         for i in timeline_ids {
     157              :             let id = i?; // NOTE(review): `?` inside `stream!` presumably emits the `Err` and ends the stream; confirm against async_stream
     158              :             yield Ok(TenantShardTimelineId::new(tenant, id)); // pair each timeline id with the tenant shard it was listed under
     159              :         }
     160            0 :     })
     161            0 : }
     162              : /// Stream the listing of `target` as `ObjectIdentifier`s: object keys when `target.delimiter` is empty, common prefixes otherwise.
     163            0 : pub(crate) fn stream_listing<'a>(
     164            0 :     s3_client: &'a Client,
     165            0 :     target: &'a S3Target,
     166            0 : ) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
     167              :     try_stream! {
     168              :         let mut continuation_token = None; // replaced by each response's `next_continuation_token`
     169              :         loop {
     170              :             let fetch_response =
     171              :                 list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
     172              : 
     173              :             if target.delimiter.is_empty() {
     174            0 :                 for object_key in fetch_response.contents().iter().filter_map(|object| object.key()) // objects without a key are skipped
     175              :                 {
     176              :                     let object_id = ObjectIdentifier::builder().key(object_key).build()?; // a builder failure is propagated via `?`
     177              :                     yield object_id;
     178              :                 }
     179              :             } else {
     180            0 :                 for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) { // entries without a prefix string are skipped
     181              :                     let object_id = ObjectIdentifier::builder().key(prefix).build()?; // the prefix itself is used as the key
     182              :                     yield object_id;
     183              :                 }
     184              :             }
     185              : 
     186              :             match fetch_response.next_continuation_token {
     187              :                 Some(new_token) => continuation_token = Some(new_token),
     188              :                 None => break, // no further pages
     189              :             }
     190              :         }
     191              :     }
     192            0 : }
        

Generated by: LCOV version 2.1-beta