LCOV - code coverage report
Current view: top level - storage_scrubber/src - metadata_stream.rs (source / functions) Coverage Total Hit
Test: 52d9d4a58355424a48c56cb9ba9670a073f618b9.info Lines: 0.0 % 143 0
Test Date: 2024-11-21 08:31:22 Functions: 0.0 % 16 0

            Line data    Source code
       1              : use std::str::FromStr;
       2              : 
       3              : use anyhow::{anyhow, Context};
       4              : use async_stream::{stream, try_stream};
       5              : use futures::StreamExt;
       6              : use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
       7              : use tokio_stream::Stream;
       8              : 
       9              : use crate::{
      10              :     list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
      11              :     TenantShardTimelineId,
      12              : };
      13              : use pageserver_api::shard::TenantShardId;
      14              : use utils::id::{TenantId, TimelineId};
      15              : 
      16              : /// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
      17            0 : pub fn stream_tenants<'a>(
      18            0 :     remote_client: &'a GenericRemoteStorage,
      19            0 :     target: &'a RootTarget,
      20            0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
      21            0 :     stream_tenants_maybe_prefix(remote_client, target, None)
      22            0 : }
      23              : /// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
      24            0 : pub fn stream_tenants_maybe_prefix<'a>(
      25            0 :     remote_client: &'a GenericRemoteStorage,
      26            0 :     target: &'a RootTarget,
      27            0 :     tenant_id_prefix: Option<String>,
      28            0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
      29            0 :     try_stream! {
      30            0 :         let mut tenants_target = target.tenants_root();
      31            0 :         if let Some(tenant_id_prefix) = tenant_id_prefix {
      32            0 :             tenants_target.prefix_in_bucket += &tenant_id_prefix;
      33            0 :         }
      34            0 :         let mut tenants_stream =
      35            0 :             std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
      36            0 :         while let Some(chunk) = tenants_stream.next().await {
      37            0 :             let chunk = chunk?;
      38            0 :             let entry_ids = chunk.prefixes.iter()
      39            0 :                 .map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
      40            0 :             for dir_name_res in entry_ids {
      41            0 :                 let dir_name = dir_name_res?;
      42            0 :                 let id = TenantShardId::from_str(dir_name)?;
      43            0 :                 yield id;
      44            0 :             }
      45            0 :         }
      46            0 :     }
      47            0 : }
      48              : 
      49            0 : pub async fn stream_tenant_shards<'a>(
      50            0 :     remote_client: &'a GenericRemoteStorage,
      51            0 :     target: &'a RootTarget,
      52            0 :     tenant_id: TenantId,
      53            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
      54            0 :     let shards_target = target.tenant_shards_prefix(&tenant_id);
      55            0 : 
      56            0 :     let strip_prefix = target.tenants_root().prefix_in_bucket;
      57            0 :     let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);
      58            0 : 
      59            0 :     tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
      60            0 :     let listing =
      61            0 :         list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
      62            0 :             .await?;
      63              : 
      64            0 :     let tenant_shard_ids = listing
      65            0 :         .prefixes
      66            0 :         .iter()
      67            0 :         .map(|prefix| prefix.get_path().as_str())
      68            0 :         .filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) })
      69            0 :         .map(|entry_id_str| {
      70            0 :             let first_part = entry_id_str.split('/').next().unwrap();
      71            0 : 
      72            0 :             first_part
      73            0 :                 .parse::<TenantShardId>()
      74            0 :                 .with_context(|| format!("Incorrect tenant entry id str: {first_part}"))
      75            0 :         })
      76            0 :         .collect::<Vec<_>>();
      77            0 : 
      78            0 :     tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len());
      79            0 :     Ok(stream! {
      80            0 :         for i in tenant_shard_ids {
      81            0 :             let id = i?;
      82            0 :             yield Ok(id);
      83            0 :         }
      84            0 :     })
      85            0 : }
      86              : 
      87              : /// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
      88              : /// using a listing.
      89              : ///
      90              : /// The listing is done before the stream is built, so that this
      91              : /// function can be used to generate concurrency on a stream using buffer_unordered.
      92            0 : pub async fn stream_tenant_timelines<'a>(
      93            0 :     remote_client: &'a GenericRemoteStorage,
      94            0 :     target: &'a RootTarget,
      95            0 :     tenant: TenantShardId,
      96            0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
      97            0 :     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
      98            0 :     let timelines_target = target.timelines_root(&tenant);
      99            0 : 
     100            0 :     let prefix_str = &timelines_target
     101            0 :         .prefix_in_bucket
     102            0 :         .strip_prefix("/")
     103            0 :         .unwrap_or(&timelines_target.prefix_in_bucket);
     104            0 : 
     105            0 :     let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
     106            0 :         remote_client,
     107            0 :         ListingMode::WithDelimiter,
     108            0 :         &timelines_target
     109            0 :     ));
     110              :     loop {
     111            0 :         tracing::debug!("Listing in {tenant}");
     112            0 :         let fetch_response = match objects_stream.next().await {
     113            0 :             None => break,
     114            0 :             Some(Err(e)) => {
     115            0 :                 timeline_ids.push(Err(e));
     116            0 :                 break;
     117              :             }
     118            0 :             Some(Ok(r)) => r,
     119            0 :         };
     120            0 : 
     121            0 :         let new_entry_ids = fetch_response
     122            0 :             .prefixes
     123            0 :             .iter()
     124            0 :             .filter_map(|prefix| -> Option<&str> {
     125            0 :                 prefix.get_path().as_str().strip_prefix(prefix_str)
     126            0 :             })
     127            0 :             .map(|entry_id_str| {
     128            0 :                 let first_part = entry_id_str.split('/').next().unwrap();
     129            0 :                 first_part
     130            0 :                     .parse::<TimelineId>()
     131            0 :                     .with_context(|| format!("Incorrect timeline entry id str: {entry_id_str}"))
     132            0 :             });
     133              : 
     134            0 :         for i in new_entry_ids {
     135            0 :             timeline_ids.push(i);
     136            0 :         }
     137              :     }
     138              : 
     139            0 :     tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant);
     140            0 :     Ok(stream! {
     141            0 :         for i in timeline_ids {
     142            0 :             let id = i?;
     143            0 :             yield Ok(TenantShardTimelineId::new(tenant, id));
     144            0 :         }
     145            0 :     })
     146            0 : }
     147              : 
     148            0 : pub(crate) fn stream_listing<'a>(
     149            0 :     remote_client: &'a GenericRemoteStorage,
     150            0 :     target: &'a S3Target,
     151            0 : ) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
     152            0 :     let listing_mode = if target.delimiter.is_empty() {
     153            0 :         ListingMode::NoDelimiter
     154              :     } else {
     155            0 :         ListingMode::WithDelimiter
     156              :     };
     157            0 :     try_stream! {
     158            0 :         let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
     159            0 :             remote_client,
     160            0 :             listing_mode,
     161            0 :             target,
     162            0 :         ));
     163            0 :         while let Some(list) = objects_stream.next().await {
     164            0 :             let list = list?;
     165            0 :             if target.delimiter.is_empty() {
     166            0 :                 for key in list.keys {
     167            0 :                     yield (key.key.clone(), Some(key));
     168            0 :                 }
     169            0 :             } else {
     170            0 :                 for key in list.prefixes {
     171            0 :                     yield (key, None);
     172            0 :                 }
     173            0 :             }
     174            0 :         }
     175            0 :     }
     176            0 : }
        

Generated by: LCOV version 2.1-beta