TLA Line data Source code
1 : use anyhow::Context;
2 : use async_stream::{stream, try_stream};
3 : use aws_sdk_s3::Client;
4 : use tokio_stream::Stream;
5 :
6 : use crate::{list_objects_with_retries, RootTarget, TenantId};
7 : use utils::id::{TenantTimelineId, TimelineId};
8 :
9 : /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
10 UBC 0 : pub fn stream_tenants<'a>(
11 0 : s3_client: &'a Client,
12 0 : target: &'a RootTarget,
13 0 : ) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
14 0 : try_stream! {
15 0 : let mut continuation_token = None;
16 0 : loop {
17 0 : let tenants_target = target.tenants_root();
18 0 : let fetch_response =
19 0 : list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?;
20 0 :
21 0 : let new_entry_ids = fetch_response
22 0 : .common_prefixes()
23 0 : .unwrap_or_default()
24 0 : .iter()
25 0 : .filter_map(|prefix| prefix.prefix())
26 0 : .filter_map(|prefix| -> Option<&str> {
27 0 : prefix
28 0 : .strip_prefix(&tenants_target.prefix_in_bucket)?
29 0 : .strip_suffix('/')
30 0 : }).map(|entry_id_str| {
31 0 : entry_id_str
32 0 : .parse()
33 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
34 0 : });
35 0 :
36 0 : for i in new_entry_ids {
37 0 : yield i?;
38 0 : }
39 0 :
40 0 : match fetch_response.next_continuation_token {
41 0 : Some(new_token) => continuation_token = Some(new_token),
42 0 : None => break,
43 0 : }
44 0 : }
45 0 : }
46 0 : }
47 :
48 : /// Given a TenantId, output a stream of the timelines within that tenant, discovered
49 : /// using ListObjectsv2. The listing is done before the stream is built, so that this
50 : /// function can be used to generate concurrency on a stream using buffer_unordered.
51 0 : pub async fn stream_tenant_timelines<'a>(
52 0 : s3_client: &'a Client,
53 0 : target: &'a RootTarget,
54 0 : tenant: TenantId,
55 0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantTimelineId, anyhow::Error>> + 'a> {
56 0 : let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
57 0 : let mut continuation_token = None;
58 0 : let timelines_target = target.timelines_root(&tenant);
59 :
60 : loop {
61 0 : tracing::info!("Listing in {}", tenant);
62 0 : let fetch_response =
63 0 : list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
64 0 : .await;
65 0 : let fetch_response = match fetch_response {
66 0 : Err(e) => {
67 0 : timeline_ids.push(Err(e));
68 0 : break;
69 : }
70 0 : Ok(r) => r,
71 0 : };
72 0 :
73 0 : let new_entry_ids = fetch_response
74 0 : .common_prefixes()
75 0 : .unwrap_or_default()
76 0 : .iter()
77 0 : .filter_map(|prefix| prefix.prefix())
78 0 : .filter_map(|prefix| -> Option<&str> {
79 0 : prefix
80 0 : .strip_prefix(&timelines_target.prefix_in_bucket)?
81 0 : .strip_suffix('/')
82 0 : })
83 0 : .map(|entry_id_str| {
84 0 : entry_id_str
85 0 : .parse::<TimelineId>()
86 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
87 0 : });
88 :
89 0 : for i in new_entry_ids {
90 0 : timeline_ids.push(i);
91 0 : }
92 :
93 0 : match fetch_response.next_continuation_token {
94 0 : Some(new_token) => continuation_token = Some(new_token),
95 0 : None => break,
96 : }
97 : }
98 :
99 0 : tracing::info!("Yielding for {}", tenant);
100 0 : Ok(stream! {
101 0 : for i in timeline_ids {
102 0 : let id = i?;
103 0 : yield Ok(TenantTimelineId::new(tenant, id));
104 : }
105 0 : })
106 0 : }
|