Line data Source code
1 : use std::str::FromStr;
2 :
3 : use anyhow::{anyhow, Context};
4 : use async_stream::{stream, try_stream};
5 : use aws_sdk_s3::{types::ObjectIdentifier, Client};
6 : use futures::StreamExt;
7 : use remote_storage::{GenericRemoteStorage, ListingMode};
8 : use tokio_stream::Stream;
9 :
10 : use crate::{
11 : list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
12 : TenantShardTimelineId,
13 : };
14 : use pageserver_api::shard::TenantShardId;
15 : use utils::id::{TenantId, TimelineId};
16 :
17 : /// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
18 0 : pub fn stream_tenants_generic<'a>(
19 0 : remote_client: &'a GenericRemoteStorage,
20 0 : target: &'a RootTarget,
21 0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
22 : try_stream! {
23 : let tenants_target = target.tenants_root();
24 : let mut tenants_stream =
25 : std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
26 : while let Some(chunk) = tenants_stream.next().await {
27 : let chunk = chunk?;
28 : let entry_ids = chunk.prefixes.iter()
29 0 : .map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
30 : for dir_name_res in entry_ids {
31 : let dir_name = dir_name_res?;
32 : let id = TenantShardId::from_str(dir_name)?;
33 : yield id;
34 : }
35 : }
36 : }
37 0 : }
38 :
39 : /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
40 0 : pub fn stream_tenants<'a>(
41 0 : s3_client: &'a Client,
42 0 : target: &'a RootTarget,
43 0 : ) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
44 : try_stream! {
45 : let mut continuation_token = None;
46 : let tenants_target = target.tenants_root();
47 : loop {
48 : let fetch_response =
49 : list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
50 :
51 : let new_entry_ids = fetch_response
52 : .common_prefixes()
53 : .iter()
54 0 : .filter_map(|prefix| prefix.prefix())
55 0 : .filter_map(|prefix| -> Option<&str> {
56 0 : prefix
57 0 : .strip_prefix(&tenants_target.prefix_in_bucket)?
58 0 : .strip_suffix('/')
59 0 : }).map(|entry_id_str| {
60 0 : entry_id_str
61 0 : .parse()
62 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
63 0 : });
64 :
65 : for i in new_entry_ids {
66 : yield i?;
67 : }
68 :
69 : match fetch_response.next_continuation_token {
70 : Some(new_token) => continuation_token = Some(new_token),
71 : None => break,
72 : }
73 : }
74 : }
75 0 : }
76 :
77 0 : pub async fn stream_tenant_shards<'a>(
78 0 : s3_client: &'a Client,
79 0 : target: &'a RootTarget,
80 0 : tenant_id: TenantId,
81 0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
82 0 : let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
83 0 : let mut continuation_token = None;
84 0 : let shards_target = target.tenant_shards_prefix(&tenant_id);
85 :
86 0 : loop {
87 0 : tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
88 0 : let fetch_response =
89 0 : list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
90 0 : let fetch_response = match fetch_response {
91 0 : Err(e) => {
92 0 : tenant_shard_ids.push(Err(e));
93 0 : break;
94 : }
95 0 : Ok(r) => r,
96 0 : };
97 0 :
98 0 : let new_entry_ids = fetch_response
99 0 : .common_prefixes()
100 0 : .iter()
101 0 : .filter_map(|prefix| prefix.prefix())
102 0 : .filter_map(|prefix| -> Option<&str> {
103 0 : prefix
104 0 : .strip_prefix(&target.tenants_root().prefix_in_bucket)?
105 0 : .strip_suffix('/')
106 0 : })
107 0 : .map(|entry_id_str| {
108 0 : let first_part = entry_id_str.split('/').next().unwrap();
109 0 :
110 0 : first_part
111 0 : .parse::<TenantShardId>()
112 0 : .with_context(|| format!("Incorrect entry id str: {first_part}"))
113 0 : });
114 :
115 0 : for i in new_entry_ids {
116 0 : tenant_shard_ids.push(i);
117 0 : }
118 :
119 0 : match fetch_response.next_continuation_token {
120 0 : Some(new_token) => continuation_token = Some(new_token),
121 0 : None => break,
122 : }
123 : }
124 :
125 0 : Ok(stream! {
126 : for i in tenant_shard_ids {
127 : let id = i?;
128 : yield Ok(id);
129 : }
130 0 : })
131 0 : }
132 :
133 : /// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
134 : /// using ListObjectsv2. The listing is done before the stream is built, so that this
135 : /// function can be used to generate concurrency on a stream using buffer_unordered.
136 0 : pub async fn stream_tenant_timelines<'a>(
137 0 : s3_client: &'a Client,
138 0 : target: &'a RootTarget,
139 0 : tenant: TenantShardId,
140 0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
141 0 : let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
142 0 : let mut continuation_token = None;
143 0 : let timelines_target = target.timelines_root(&tenant);
144 :
145 0 : loop {
146 0 : tracing::debug!("Listing in {}", tenant);
147 0 : let fetch_response =
148 0 : list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
149 0 : .await;
150 0 : let fetch_response = match fetch_response {
151 0 : Err(e) => {
152 0 : timeline_ids.push(Err(e));
153 0 : break;
154 : }
155 0 : Ok(r) => r,
156 0 : };
157 0 :
158 0 : let new_entry_ids = fetch_response
159 0 : .common_prefixes()
160 0 : .iter()
161 0 : .filter_map(|prefix| prefix.prefix())
162 0 : .filter_map(|prefix| -> Option<&str> {
163 0 : prefix
164 0 : .strip_prefix(&timelines_target.prefix_in_bucket)?
165 0 : .strip_suffix('/')
166 0 : })
167 0 : .map(|entry_id_str| {
168 0 : entry_id_str
169 0 : .parse::<TimelineId>()
170 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
171 0 : });
172 :
173 0 : for i in new_entry_ids {
174 0 : timeline_ids.push(i);
175 0 : }
176 :
177 0 : match fetch_response.next_continuation_token {
178 0 : Some(new_token) => continuation_token = Some(new_token),
179 0 : None => break,
180 : }
181 : }
182 :
183 0 : tracing::debug!("Yielding for {}", tenant);
184 0 : Ok(stream! {
185 : for i in timeline_ids {
186 : let id = i?;
187 : yield Ok(TenantShardTimelineId::new(tenant, id));
188 : }
189 0 : })
190 0 : }
191 :
192 : /// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
193 : /// using a listing. The listing is done before the stream is built, so that this
194 : /// function can be used to generate concurrency on a stream using buffer_unordered.
195 0 : pub async fn stream_tenant_timelines_generic<'a>(
196 0 : remote_client: &'a GenericRemoteStorage,
197 0 : target: &'a RootTarget,
198 0 : tenant: TenantShardId,
199 0 : ) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
200 0 : let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
201 0 : let timelines_target = target.timelines_root(&tenant);
202 0 :
203 0 : let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
204 0 : remote_client,
205 0 : ListingMode::WithDelimiter,
206 0 : &timelines_target
207 0 : ));
208 0 : loop {
209 0 : tracing::debug!("Listing in {tenant}");
210 0 : let fetch_response = match objects_stream.next().await {
211 0 : None => break,
212 0 : Some(Err(e)) => {
213 0 : timeline_ids.push(Err(e));
214 0 : break;
215 : }
216 0 : Some(Ok(r)) => r,
217 0 : };
218 0 :
219 0 : let new_entry_ids = fetch_response
220 0 : .prefixes
221 0 : .iter()
222 0 : .filter_map(|prefix| -> Option<&str> {
223 0 : prefix
224 0 : .get_path()
225 0 : .as_str()
226 0 : .strip_prefix(&timelines_target.prefix_in_bucket)?
227 0 : .strip_suffix('/')
228 0 : })
229 0 : .map(|entry_id_str| {
230 0 : entry_id_str
231 0 : .parse::<TimelineId>()
232 0 : .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
233 0 : });
234 :
235 0 : for i in new_entry_ids {
236 0 : timeline_ids.push(i);
237 0 : }
238 : }
239 :
240 0 : tracing::debug!("Yielding for {}", tenant);
241 0 : Ok(stream! {
242 : for i in timeline_ids {
243 : let id = i?;
244 : yield Ok(TenantShardTimelineId::new(tenant, id));
245 : }
246 0 : })
247 0 : }
248 :
249 0 : pub(crate) fn stream_listing<'a>(
250 0 : s3_client: &'a Client,
251 0 : target: &'a S3Target,
252 0 : ) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
253 : try_stream! {
254 : let mut continuation_token = None;
255 : loop {
256 : let fetch_response =
257 : list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
258 :
259 : if target.delimiter.is_empty() {
260 0 : for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
261 : {
262 : let object_id = ObjectIdentifier::builder().key(object_key).build()?;
263 : yield object_id;
264 : }
265 : } else {
266 0 : for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
267 : let object_id = ObjectIdentifier::builder().key(prefix).build()?;
268 : yield object_id;
269 : }
270 : }
271 :
272 : match fetch_response.next_continuation_token {
273 : Some(new_token) => continuation_token = Some(new_token),
274 : None => break,
275 : }
276 : }
277 : }
278 0 : }
|