Line data Source code
1 : use anyhow::Context;
2 : use async_stream::{stream, try_stream};
3 : use aws_sdk_s3::{types::ObjectIdentifier, Client};
4 : use tokio_stream::Stream;
5 :
6 : use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
7 : use pageserver_api::shard::TenantShardId;
8 : use utils::id::{TenantId, TimelineId};
9 :
10 : /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
11 0 : pub fn stream_tenants<'a>(
12 0 : s3_client: &'a Client,
13 0 : target: &'a RootTarget,
14 0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
15 : try_stream! {
16 : let mut continuation_token = None;
17 : let tenants_target = target.tenants_root();
18 : loop {
19 : let fetch_response =
20 : list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
21 :
22 : let new_entry_ids = fetch_response
23 : .common_prefixes()
24 : .iter()
25 0 : .filter_map(|prefix| prefix.prefix())
26 0 : .filter_map(|prefix| -> Option<&str> {
27 0 : prefix
28 0 : .strip_prefix(&tenants_target.prefix_in_bucket)?
29 0 : .strip_suffix('/')
30 0 : }).map(|entry_id_str| {
31 0 : entry_id_str
32 0 : .parse()
33 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
34 0 : });
35 :
36 : for i in new_entry_ids {
37 : yield i?;
38 : }
39 :
40 : match fetch_response.next_continuation_token {
41 : Some(new_token) => continuation_token = Some(new_token),
42 : None => break,
43 : }
44 : }
45 : }
46 0 : }
47 :
48 0 : pub async fn stream_tenant_shards<'a>(
49 0 : s3_client: &'a Client,
50 0 : target: &'a RootTarget,
51 0 : tenant_id: TenantId,
52 0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
53 0 : let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
54 0 : let mut continuation_token = None;
55 0 : let shards_target = target.tenant_shards_prefix(&tenant_id);
56 :
57 0 : loop {
58 0 : tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
59 0 : let fetch_response =
60 0 : list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
61 0 : let fetch_response = match fetch_response {
62 0 : Err(e) => {
63 0 : tenant_shard_ids.push(Err(e));
64 0 : break;
65 : }
66 0 : Ok(r) => r,
67 0 : };
68 0 :
69 0 : let new_entry_ids = fetch_response
70 0 : .common_prefixes()
71 0 : .iter()
72 0 : .filter_map(|prefix| prefix.prefix())
73 0 : .filter_map(|prefix| -> Option<&str> {
74 0 : prefix
75 0 : .strip_prefix(&target.tenants_root().prefix_in_bucket)?
76 0 : .strip_suffix('/')
77 0 : })
78 0 : .map(|entry_id_str| {
79 0 : let first_part = entry_id_str.split('/').next().unwrap();
80 0 :
81 0 : first_part
82 0 : .parse::<TenantShardId>()
83 0 : .with_context(|| format!("Incorrect entry id str: {first_part}"))
84 0 : });
85 :
86 0 : for i in new_entry_ids {
87 0 : tenant_shard_ids.push(i);
88 0 : }
89 :
90 0 : match fetch_response.next_continuation_token {
91 0 : Some(new_token) => continuation_token = Some(new_token),
92 0 : None => break,
93 : }
94 : }
95 :
96 0 : Ok(stream! {
97 : for i in tenant_shard_ids {
98 : let id = i?;
99 : yield Ok(id);
100 : }
101 0 : })
102 0 : }
103 :
104 : /// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
105 : /// using ListObjectsv2. The listing is done before the stream is built, so that this
106 : /// function can be used to generate concurrency on a stream using buffer_unordered.
107 0 : pub async fn stream_tenant_timelines<'a>(
108 0 : s3_client: &'a Client,
109 0 : target: &'a RootTarget,
110 0 : tenant: TenantShardId,
111 0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
112 0 : let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
113 0 : let mut continuation_token = None;
114 0 : let timelines_target = target.timelines_root(&tenant);
115 :
116 0 : loop {
117 0 : tracing::debug!("Listing in {}", tenant);
118 0 : let fetch_response =
119 0 : list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
120 0 : .await;
121 0 : let fetch_response = match fetch_response {
122 0 : Err(e) => {
123 0 : timeline_ids.push(Err(e));
124 0 : break;
125 : }
126 0 : Ok(r) => r,
127 0 : };
128 0 :
129 0 : let new_entry_ids = fetch_response
130 0 : .common_prefixes()
131 0 : .iter()
132 0 : .filter_map(|prefix| prefix.prefix())
133 0 : .filter_map(|prefix| -> Option<&str> {
134 0 : prefix
135 0 : .strip_prefix(&timelines_target.prefix_in_bucket)?
136 0 : .strip_suffix('/')
137 0 : })
138 0 : .map(|entry_id_str| {
139 0 : entry_id_str
140 0 : .parse::<TimelineId>()
141 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
142 0 : });
143 :
144 0 : for i in new_entry_ids {
145 0 : timeline_ids.push(i);
146 0 : }
147 :
148 0 : match fetch_response.next_continuation_token {
149 0 : Some(new_token) => continuation_token = Some(new_token),
150 0 : None => break,
151 : }
152 : }
153 :
154 0 : tracing::debug!("Yielding for {}", tenant);
155 0 : Ok(stream! {
156 : for i in timeline_ids {
157 : let id = i?;
158 : yield Ok(TenantShardTimelineId::new(tenant, id));
159 : }
160 0 : })
161 0 : }
162 :
163 0 : pub(crate) fn stream_listing<'a>(
164 0 : s3_client: &'a Client,
165 0 : target: &'a S3Target,
166 0 : ) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
167 : try_stream! {
168 : let mut continuation_token = None;
169 : loop {
170 : let fetch_response =
171 : list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
172 :
173 : if target.delimiter.is_empty() {
174 0 : for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
175 : {
176 : let object_id = ObjectIdentifier::builder().key(object_key).build()?;
177 : yield object_id;
178 : }
179 : } else {
180 0 : for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
181 : let object_id = ObjectIdentifier::builder().key(prefix).build()?;
182 : yield object_id;
183 : }
184 : }
185 :
186 : match fetch_response.next_continuation_token {
187 : Some(new_token) => continuation_token = Some(new_token),
188 : None => break,
189 : }
190 : }
191 : }
192 0 : }
|