LCOV - code coverage report
Current view: top level - storage_scrubber/src - metadata_stream.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 0.0 % 161 0
Test Date: 2024-08-02 21:34:27 Functions: 0.0 % 28 0

            Line data    Source code
       1              : use std::str::FromStr;
       2              : 
       3              : use anyhow::{anyhow, Context};
       4              : use async_stream::{stream, try_stream};
       5              : use aws_sdk_s3::{types::ObjectIdentifier, Client};
       6              : use futures::StreamExt;
       7              : use remote_storage::{GenericRemoteStorage, ListingMode};
       8              : use tokio_stream::Stream;
       9              : 
      10              : use crate::{
      11              :     list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
      12              :     TenantShardTimelineId,
      13              : };
      14              : use pageserver_api::shard::TenantShardId;
      15              : use utils::id::{TenantId, TimelineId};
      16              : 
      17              : /// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
      18            0 : pub fn stream_tenants_generic<'a>(
      19            0 :     remote_client: &'a GenericRemoteStorage,
      20            0 :     target: &'a RootTarget,
      21            0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
      22              :     try_stream! {
      23              :         let tenants_target = target.tenants_root();
      24              :         let mut tenants_stream =
      25              :             std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
      26              :         while let Some(chunk) = tenants_stream.next().await {
      27              :             let chunk = chunk?;
      28              :             let entry_ids = chunk.prefixes.iter()
      29            0 :                 .map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
      30              :             for dir_name_res in entry_ids {
      31              :                 let dir_name = dir_name_res?;
      32              :                 let id = TenantShardId::from_str(dir_name)?;
      33              :                 yield id;
      34              :             }
      35              :         }
      36              :     }
      37            0 : }
      38              : 
      39              : /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
      40            0 : pub fn stream_tenants<'a>(
      41            0 :     s3_client: &'a Client,
      42            0 :     target: &'a RootTarget,
      43            0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
      44              :     try_stream! {
      45              :         let mut continuation_token = None;
      46              :         let tenants_target = target.tenants_root();
      47              :         loop {
      48              :             let fetch_response =
      49              :                 list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
      50              : 
      51              :             let new_entry_ids = fetch_response
      52              :                 .common_prefixes()
      53              :                 .iter()
      54            0 :                 .filter_map(|prefix| prefix.prefix())
      55            0 :                 .filter_map(|prefix| -> Option<&str> {
      56            0 :                     prefix
      57            0 :                         .strip_prefix(&tenants_target.prefix_in_bucket)?
      58            0 :                         .strip_suffix('/')
      59            0 :                 }).map(|entry_id_str| {
      60            0 :                 entry_id_str
      61            0 :                     .parse()
      62            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
      63            0 :             });
      64              : 
      65              :             for i in new_entry_ids {
      66              :                 yield i?;
      67              :             }
      68              : 
      69              :             match fetch_response.next_continuation_token {
      70              :                 Some(new_token) => continuation_token = Some(new_token),
      71              :                 None => break,
      72              :             }
      73              :         }
      74              :     }
      75            0 : }
      76              : 
      77            0 : pub async fn stream_tenant_shards<'a>(
      78            0 :     s3_client: &'a Client,
      79            0 :     target: &'a RootTarget,
      80            0 :     tenant_id: TenantId,
      81            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
      82            0 :     let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
      83            0 :     let mut continuation_token = None;
      84            0 :     let shards_target = target.tenant_shards_prefix(&tenant_id);
      85              : 
      86            0 :     loop {
      87            0 :         tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
      88            0 :         let fetch_response =
      89            0 :             list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
      90            0 :         let fetch_response = match fetch_response {
      91            0 :             Err(e) => {
      92            0 :                 tenant_shard_ids.push(Err(e));
      93            0 :                 break;
      94              :             }
      95            0 :             Ok(r) => r,
      96            0 :         };
      97            0 : 
      98            0 :         let new_entry_ids = fetch_response
      99            0 :             .common_prefixes()
     100            0 :             .iter()
     101            0 :             .filter_map(|prefix| prefix.prefix())
     102            0 :             .filter_map(|prefix| -> Option<&str> {
     103            0 :                 prefix
     104            0 :                     .strip_prefix(&target.tenants_root().prefix_in_bucket)?
     105            0 :                     .strip_suffix('/')
     106            0 :             })
     107            0 :             .map(|entry_id_str| {
     108            0 :                 let first_part = entry_id_str.split('/').next().unwrap();
     109            0 : 
     110            0 :                 first_part
     111            0 :                     .parse::<TenantShardId>()
     112            0 :                     .with_context(|| format!("Incorrect entry id str: {first_part}"))
     113            0 :             });
     114              : 
     115            0 :         for i in new_entry_ids {
     116            0 :             tenant_shard_ids.push(i);
     117            0 :         }
     118              : 
     119            0 :         match fetch_response.next_continuation_token {
     120            0 :             Some(new_token) => continuation_token = Some(new_token),
     121            0 :             None => break,
     122              :         }
     123              :     }
     124              : 
     125            0 :     Ok(stream! {
     126              :         for i in tenant_shard_ids {
     127              :             let id = i?;
     128              :             yield Ok(id);
     129              :         }
     130            0 :     })
     131            0 : }
     132              : 
     133              : /// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
     134              : /// using ListObjectsv2.  The listing is done before the stream is built, so that this
     135              : /// function can be used to generate concurrency on a stream using buffer_unordered.
     136            0 : pub async fn stream_tenant_timelines<'a>(
     137            0 :     s3_client: &'a Client,
     138            0 :     target: &'a RootTarget,
     139            0 :     tenant: TenantShardId,
     140            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
     141            0 :     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
     142            0 :     let mut continuation_token = None;
     143            0 :     let timelines_target = target.timelines_root(&tenant);
     144              : 
     145            0 :     loop {
     146            0 :         tracing::debug!("Listing in {}", tenant);
     147            0 :         let fetch_response =
     148            0 :             list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
     149            0 :                 .await;
     150            0 :         let fetch_response = match fetch_response {
     151            0 :             Err(e) => {
     152            0 :                 timeline_ids.push(Err(e));
     153            0 :                 break;
     154              :             }
     155            0 :             Ok(r) => r,
     156            0 :         };
     157            0 : 
     158            0 :         let new_entry_ids = fetch_response
     159            0 :             .common_prefixes()
     160            0 :             .iter()
     161            0 :             .filter_map(|prefix| prefix.prefix())
     162            0 :             .filter_map(|prefix| -> Option<&str> {
     163            0 :                 prefix
     164            0 :                     .strip_prefix(&timelines_target.prefix_in_bucket)?
     165            0 :                     .strip_suffix('/')
     166            0 :             })
     167            0 :             .map(|entry_id_str| {
     168            0 :                 entry_id_str
     169            0 :                     .parse::<TimelineId>()
     170            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
     171            0 :             });
     172              : 
     173            0 :         for i in new_entry_ids {
     174            0 :             timeline_ids.push(i);
     175            0 :         }
     176              : 
     177            0 :         match fetch_response.next_continuation_token {
     178            0 :             Some(new_token) => continuation_token = Some(new_token),
     179            0 :             None => break,
     180              :         }
     181              :     }
     182              : 
     183            0 :     tracing::debug!("Yielding for {}", tenant);
     184            0 :     Ok(stream! {
     185              :         for i in timeline_ids {
     186              :             let id = i?;
     187              :             yield Ok(TenantShardTimelineId::new(tenant, id));
     188              :         }
     189            0 :     })
     190            0 : }
     191              : 
     192              : /// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
     193              : /// using a listing. The listing is done before the stream is built, so that this
     194              : /// function can be used to generate concurrency on a stream using buffer_unordered.
     195            0 : pub async fn stream_tenant_timelines_generic<'a>(
     196            0 :     remote_client: &'a GenericRemoteStorage,
     197            0 :     target: &'a RootTarget,
     198            0 :     tenant: TenantShardId,
     199            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
     200            0 :     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
     201            0 :     let timelines_target = target.timelines_root(&tenant);
     202            0 : 
     203            0 :     let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
     204            0 :         remote_client,
     205            0 :         ListingMode::WithDelimiter,
     206            0 :         &timelines_target
     207            0 :     ));
     208            0 :     loop {
     209            0 :         tracing::debug!("Listing in {tenant}");
     210            0 :         let fetch_response = match objects_stream.next().await {
     211            0 :             None => break,
     212            0 :             Some(Err(e)) => {
     213            0 :                 timeline_ids.push(Err(e));
     214            0 :                 break;
     215              :             }
     216            0 :             Some(Ok(r)) => r,
     217            0 :         };
     218            0 : 
     219            0 :         let new_entry_ids = fetch_response
     220            0 :             .prefixes
     221            0 :             .iter()
     222            0 :             .filter_map(|prefix| -> Option<&str> {
     223            0 :                 prefix
     224            0 :                     .get_path()
     225            0 :                     .as_str()
     226            0 :                     .strip_prefix(&timelines_target.prefix_in_bucket)?
     227            0 :                     .strip_suffix('/')
     228            0 :             })
     229            0 :             .map(|entry_id_str| {
     230            0 :                 entry_id_str
     231            0 :                     .parse::<TimelineId>()
     232            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
     233            0 :             });
     234              : 
     235            0 :         for i in new_entry_ids {
     236            0 :             timeline_ids.push(i);
     237            0 :         }
     238              :     }
     239              : 
     240            0 :     tracing::debug!("Yielding for {}", tenant);
     241            0 :     Ok(stream! {
     242              :         for i in timeline_ids {
     243              :             let id = i?;
     244              :             yield Ok(TenantShardTimelineId::new(tenant, id));
     245              :         }
     246            0 :     })
     247            0 : }
     248              : 
     249            0 : pub(crate) fn stream_listing<'a>(
     250            0 :     s3_client: &'a Client,
     251            0 :     target: &'a S3Target,
     252            0 : ) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
     253              :     try_stream! {
     254              :         let mut continuation_token = None;
     255              :         loop {
     256              :             let fetch_response =
     257              :                 list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
     258              : 
     259              :             if target.delimiter.is_empty() {
     260            0 :                 for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
     261              :                 {
     262              :                     let object_id = ObjectIdentifier::builder().key(object_key).build()?;
     263              :                     yield object_id;
     264              :                 }
     265              :             } else {
     266            0 :                 for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
     267              :                     let object_id = ObjectIdentifier::builder().key(prefix).build()?;
     268              :                     yield object_id;
     269              :                 }
     270              :             }
     271              : 
     272              :             match fetch_response.next_continuation_token {
     273              :                 Some(new_token) => continuation_token = Some(new_token),
     274              :                 None => break,
     275              :             }
     276              :         }
     277              :     }
     278            0 : }
        

Generated by: LCOV version 2.1-beta