use std::str::FromStr;

use anyhow::{anyhow, Context};
use async_stream::{stream, try_stream};
use futures::StreamExt;
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use tokio_stream::Stream;

use crate::{
    list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
    TenantShardTimelineId,
};
use pageserver_api::shard::TenantShardId;
use utils::id::{TenantId, TimelineId};
/// Given a remote storage and a target, output a stream of `TenantShardId`s discovered via listing prefixes
pub fn stream_tenants<'a>(
    remote_client: &'a GenericRemoteStorage,
    target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
    stream_tenants_maybe_prefix(remote_client, target, None)
}

/// Like [`stream_tenants`], but optionally restricts the listing to tenant IDs that start with
/// the given string prefix.
pub fn stream_tenants_maybe_prefix<'a>(
    remote_client: &'a GenericRemoteStorage,
    target: &'a RootTarget,
    tenant_id_prefix: Option<String>,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
    try_stream! {
        let mut tenants_target = target.tenants_root();
        if let Some(tenant_id_prefix) = tenant_id_prefix {
            tenants_target.prefix_in_bucket += &tenant_id_prefix;
        }
        let mut tenants_stream =
            std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
        while let Some(chunk) = tenants_stream.next().await {
            let chunk = chunk?;
            let entry_ids = chunk.prefixes.iter()
                .map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
            for dir_name_res in entry_ids {
                let dir_name = dir_name_res?;
                let id = TenantShardId::from_str(dir_name)?;
                yield id;
            }
        }
    }
}
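
// A minimal usage sketch, not part of the original module: drain the prefixed
// tenant stream into a Vec. The function name and the choice of `try_collect`
// are illustrative assumptions; only `stream_tenants_maybe_prefix` above is real.
#[allow(dead_code)]
async fn collect_tenants_with_prefix(
    remote_client: &GenericRemoteStorage,
    target: &RootTarget,
    prefix: String,
) -> anyhow::Result<Vec<TenantShardId>> {
    use futures::TryStreamExt;

    // Restrict the listing to tenant IDs whose hex form starts with `prefix`,
    // then collect every parsed `TenantShardId`, failing on the first error.
    stream_tenants_maybe_prefix(remote_client, target, Some(prefix))
        .try_collect()
        .await
}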

/// Given a `TenantId`, output a stream of the `TenantShardId`s of all shards of that tenant,
/// discovered via a delimited listing under the tenant's prefix.
///
/// The listing is done before the stream is built, so that this function can be used to
/// generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_shards<'a>(
    remote_client: &'a GenericRemoteStorage,
    target: &'a RootTarget,
    tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
    let shards_target = target.tenant_shards_prefix(&tenant_id);

    let strip_prefix = target.tenants_root().prefix_in_bucket;
    let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);

    tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
    let listing =
        list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
            .await?;

    let tenant_shard_ids = listing
        .prefixes
        .iter()
        .map(|prefix| prefix.get_path().as_str())
        // Strip the tenants-root prefix so that only `<tenant_shard_id>/...` remains.
        .filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) })
        .map(|entry_id_str| {
            let first_part = entry_id_str.split('/').next().unwrap();

            first_part
                .parse::<TenantShardId>()
                .with_context(|| format!("Incorrect tenant entry id str: {first_part}"))
        })
        .collect::<Vec<_>>();

    tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len());
    Ok(stream! {
        for i in tenant_shard_ids {
            let id = i?;
            yield Ok(id);
        }
    })
}
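
// Hypothetical sketch, not part of the original module: drive the per-tenant
// shard stream and log each shard id. The helper name and log wording are
// assumptions; `stream_tenant_shards` above is the real entry point.
#[allow(dead_code)]
async fn log_tenant_shards(
    remote_client: &GenericRemoteStorage,
    target: &RootTarget,
    tenant_id: TenantId,
) -> anyhow::Result<()> {
    use futures::TryStreamExt;

    // The listing happens inside `stream_tenant_shards` before the stream is returned.
    let shards = stream_tenant_shards(remote_client, target, tenant_id).await?;
    let mut shards = std::pin::pin!(shards);
    while let Some(shard) = shards.try_next().await? {
        tracing::info!("found shard {shard} for tenant {tenant_id}");
    }
    Ok(())
}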

/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing.
///
/// The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
    remote_client: &'a GenericRemoteStorage,
    target: &'a RootTarget,
    tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
    let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
    let timelines_target = target.timelines_root(&tenant);

    let prefix_str = &timelines_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&timelines_target.prefix_in_bucket);

    let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
        remote_client,
        ListingMode::WithDelimiter,
        &timelines_target
    ));
    loop {
        tracing::debug!("Listing in {tenant}");
        let fetch_response = match objects_stream.next().await {
            None => break,
            Some(Err(e)) => {
                timeline_ids.push(Err(e));
                break;
            }
            Some(Ok(r)) => r,
        };

        let new_entry_ids = fetch_response
            .prefixes
            .iter()
            .filter_map(|prefix| -> Option<&str> {
                prefix.get_path().as_str().strip_prefix(prefix_str)
            })
            .map(|entry_id_str| {
                let first_part = entry_id_str.split('/').next().unwrap();
                first_part
                    .parse::<TimelineId>()
                    .with_context(|| format!("Incorrect timeline entry id str: {entry_id_str}"))
            });

        for i in new_entry_ids {
            timeline_ids.push(i);
        }
    }

    tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant);
    Ok(stream! {
        for i in timeline_ids {
            let id = i?;
            yield Ok(TenantShardTimelineId::new(tenant, id));
        }
    })
}
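
// A hedged sketch of the concurrency pattern the doc comment above alludes to:
// map each tenant shard to a `stream_tenant_timelines` future and keep up to
// `concurrency` per-tenant listings in flight with `try_buffer_unordered`.
// The function name and concurrency knob are illustrative assumptions.
#[allow(dead_code)]
async fn collect_all_timelines(
    remote_client: &GenericRemoteStorage,
    target: &RootTarget,
    concurrency: usize,
) -> anyhow::Result<Vec<TenantShardTimelineId>> {
    use futures::TryStreamExt;

    stream_tenants(remote_client, target)
        // Each Ok(tenant) becomes a future performing that tenant's timeline listing.
        .map_ok(|tenant| stream_tenant_timelines(remote_client, target, tenant))
        // Run several listings concurrently, yielding their streams as they finish.
        .try_buffer_unordered(concurrency)
        // Flatten the per-tenant timeline streams into one stream of ids.
        .try_flatten()
        .try_collect()
        .await
}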

/// Stream a listing of `target`. If the target has a delimiter set, its common prefixes are
/// yielded without object metadata; otherwise the individual keys are yielded together with
/// their `ListingObject` metadata.
pub(crate) fn stream_listing<'a>(
    remote_client: &'a GenericRemoteStorage,
    target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
    let listing_mode = if target.delimiter.is_empty() {
        ListingMode::NoDelimiter
    } else {
        ListingMode::WithDelimiter
    };
    try_stream! {
        let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
            remote_client,
            listing_mode,
            target,
        ));
        while let Some(list) = objects_stream.next().await {
            let list = list?;
            if target.delimiter.is_empty() {
                for key in list.keys {
                    yield (key.key.clone(), Some(key));
                }
            } else {
                for key in list.prefixes {
                    yield (key, None);
                }
            }
        }
    }
}
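
// Hypothetical consumer sketch, not part of the original module: count object
// keys versus common prefixes produced by `stream_listing`. Which variant is
// produced depends on whether the target has a delimiter, as implemented above.
#[allow(dead_code)]
async fn count_listing_entries(
    remote_client: &GenericRemoteStorage,
    target: &S3Target,
) -> anyhow::Result<(usize, usize)> {
    use futures::TryStreamExt;

    let mut keys = 0usize;
    let mut prefixes = 0usize;
    let mut entries = std::pin::pin!(stream_listing(remote_client, target));
    while let Some((_path, maybe_object)) = entries.try_next().await? {
        match maybe_object {
            // Object keys carry `ListingObject` metadata (no-delimiter targets).
            Some(_) => keys += 1,
            // Common prefixes carry no metadata (delimited targets).
            None => prefixes += 1,
        }
    }
    Ok((keys, prefixes))
}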