use std::collections::HashMap;
use std::sync::Arc;

use crate::checks::{list_timeline_blobs, BlobDataParseResult, RemoteTimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
    download_object_to_file_s3, init_remote, init_remote_s3, BucketConfig, NodeKind, RootTarget,
    TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, S3Config};
use utils::generation::Generation;
use utils::id::TenantId;

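/// Downloads a point-in-time copy of a tenant's remote (S3) data to a local directory,
/// using remote-style layer file names (including generation suffixes) rather than the
/// pageserver's local directory format.
///
/// A minimal usage sketch (the surrounding setup is assumed, not part of this module):
///
/// ```ignore
/// let downloader =
///     SnapshotDownloader::new(bucket_config, tenant_id, output_path, 8).await?;
/// downloader.download().await?;
/// ```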
pub struct SnapshotDownloader {
    s3_client: Arc<Client>,
    s3_root: RootTarget,
    bucket_config: BucketConfig,
    bucket_config_s3: S3Config,
    tenant_id: TenantId,
    output_path: Utf8PathBuf,
    concurrency: usize,
}

impl SnapshotDownloader {
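    /// Create a downloader for one tenant. Only S3-backed bucket configurations are
    /// supported; any other storage kind is rejected.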
    pub async fn new(
        bucket_config: BucketConfig,
        tenant_id: TenantId,
        output_path: Utf8PathBuf,
        concurrency: usize,
    ) -> anyhow::Result<Self> {
        let bucket_config_s3 = match &bucket_config.0.storage {
            remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
            _ => anyhow::bail!("only S3 configuration is supported for snapshot downloading"),
        };
        let (s3_client, s3_root) =
            init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
        Ok(Self {
            s3_client,
            s3_root,
            bucket_config,
            bucket_config_s3,
            tenant_id,
            output_path,
            concurrency,
        })
    }

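    /// Download a single layer file into the local snapshot, skipping the download if
    /// the file already exists locally from a previous run.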
    async fn download_layer(
        &self,
        ttid: TenantShardTimelineId,
        layer_name: LayerName,
        layer_metadata: LayerFileMetadata,
    ) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
        // Note: this is "local" as in a local copy of S3 data, not local as in the
        // pageserver's local format. They use different layer names (the remote-style
        // name carries the generation suffix).
        let local_path = self.output_path.join(format!(
            "{}/timelines/{}/{}{}",
            ttid.tenant_shard_id,
            ttid.timeline_id,
            layer_name,
            layer_metadata.generation.get_suffix()
        ));

        // We should only be called for layers that are owned by the input TTID.
        assert_eq!(layer_metadata.shard, ttid.tenant_shard_id.to_index());

        // Assumption: we always write layer files atomically, and layer files are
        // immutable. Therefore if the file already exists on local disk, we assume it
        // is fully correct and skip it.
        if tokio::fs::try_exists(&local_path).await? {
            tracing::debug!("{} already exists", local_path);
            return Ok((layer_name, layer_metadata));
        } else {
            tracing::debug!("{} requires download...", local_path);

            let timeline_root = self.s3_root.timeline_root(&ttid);
            let remote_layer_path = format!(
                "{}{}{}",
                timeline_root.prefix_in_bucket,
                layer_name,
                layer_metadata.generation.get_suffix()
            );

            // List versions: the object might have been deleted, in which case we can
            // still download its most recent prior version.
            let versions = self
                .s3_client
                .list_object_versions()
                .bucket(self.bucket_config_s3.bucket_name.clone())
                .prefix(&remote_layer_path)
                .send()
                .await?;
            let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
                return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
            };
            download_object_to_file_s3(
                &self.s3_client,
                &self.bucket_config_s3.bucket_name,
                &remote_layer_path,
                version.version_id.as_deref(),
                &local_path,
            )
            .await?;

            tracing::debug!("Downloaded successfully to {local_path}");
        }

        Ok((layer_name, layer_metadata))
    }

    /// Download many layers belonging to the same TTID, with some concurrency.
    async fn download_layers(
        &self,
        ttid: TenantShardTimelineId,
        layers: Vec<(LayerName, LayerFileMetadata)>,
    ) -> anyhow::Result<()> {
        let layer_count = layers.len();
        tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
        let layers_stream = stream! {
            for (layer_name, layer_metadata) in layers {
                yield self.download_layer(ttid, layer_name, layer_metadata);
            }
        };

        tokio::fs::create_dir_all(self.output_path.join(format!(
            "{}/timelines/{}",
            ttid.tenant_shard_id, ttid.timeline_id
        )))
        .await?;

        let layer_results = layers_stream.buffered(self.concurrency);
        let mut layer_results = std::pin::pin!(layer_results);

        let mut err = None;
        let mut download_count = 0;
        while let Some(i) = layer_results.next().await {
            download_count += 1;
            match i {
                Ok((layer_name, layer_metadata)) => {
                    tracing::info!(
                        "[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
                        layer_metadata.file_size,
                        layer_name
                    );
                }
                Err(e) => {
                    // Warn and continue: we will download what we can.
                    tracing::warn!("Download error: {e}");
                    err = Some(e);
                }
            }
        }
        if let Some(e) = err {
            tracing::warn!("Some errors occurred downloading {ttid} layers, last error: {e}");
            Err(e)
        } else {
            Ok(())
        }
    }

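    /// Download the layers referenced by a timeline's index. Layers owned by this shard
    /// are downloaded directly; layers inherited from ancestor shards (after a shard
    /// split) are accumulated into `ancestor_layers` for the caller to fetch once. The
    /// index itself is written to disk last, after the layer downloads have finished.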
    async fn download_timeline(
        &self,
        ttid: TenantShardTimelineId,
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        ancestor_layers: &mut HashMap<TenantShardTimelineId, HashMap<LayerName, LayerFileMetadata>>,
    ) -> anyhow::Result<()> {
        let index_bytes = serde_json::to_string(&index_part).context("serializing index")?;

        let layers = index_part
            .layer_metadata
            .into_iter()
            .filter_map(|(layer_name, layer_metadata)| {
                if layer_metadata.shard.shard_count != ttid.tenant_shard_id.shard_count {
                    // Accumulate ancestor layers for later download
                    let ancestor_ttid = TenantShardTimelineId::new(
                        TenantShardId {
                            tenant_id: ttid.tenant_shard_id.tenant_id,
                            shard_number: layer_metadata.shard.shard_number,
                            shard_count: layer_metadata.shard.shard_count,
                        },
                        ttid.timeline_id,
                    );
                    let ancestor_ttid_layers = ancestor_layers.entry(ancestor_ttid).or_default();
                    use std::collections::hash_map::Entry;
                    match ancestor_ttid_layers.entry(layer_name) {
                        Entry::Occupied(entry) => {
                            // Descendant shards that reference a layer from an ancestor
                            // should always have metadata matching their siblings',
                            // because it is read atomically during a shard split.
                            assert_eq!(entry.get(), &layer_metadata);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(layer_metadata);
                        }
                    }
                    None
                } else {
                    Some((layer_name, layer_metadata))
                }
            })
            .collect();

        let download_result = self.download_layers(ttid, layers).await;

        // Write the index last, once all the layers it references are downloaded.
        let local_index_path = self.output_path.join(format!(
            "{}/timelines/{}/index_part.json{}",
            ttid.tenant_shard_id,
            ttid.timeline_id,
            index_part_generation.get_suffix()
        ));
        tokio::fs::write(&local_index_path, index_bytes)
            .await
            .context("writing index")?;

        download_result
    }

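    /// Download a snapshot of the whole tenant: enumerate its shards, snapshot every
    /// timeline of the shards with the highest shard count, then fetch any layers that
    /// those timelines still reference from ancestor shards.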
    pub async fn download(&self) -> anyhow::Result<()> {
        let (remote_client, target) =
            init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;

        // Generate a stream of TenantShardId
        let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
        let shards: Vec<TenantShardId> = shards.try_collect().await?;

        // Only read from shards that have the highest count: this avoids redundantly
        // downloading from ancestor shards.
        let Some(shard_count) = shards.iter().map(|s| s.shard_count).max() else {
            anyhow::bail!("No shards found");
        };

        // We will build a collection of layers in ancestor shards to download (this
        // will only happen if this tenant has been split at some point).
        let mut ancestor_layers: HashMap<
            TenantShardTimelineId,
            HashMap<LayerName, LayerFileMetadata>,
        > = Default::default();

        for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
            // Generate a stream of TenantTimelineId
            let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;

            // Generate a stream of S3TimelineBlobData
            async fn load_timeline_index(
                remote_client: &GenericRemoteStorage,
                target: &RootTarget,
                ttid: TenantShardTimelineId,
            ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
                let data = list_timeline_blobs(remote_client, ttid, target).await?;
                Ok((ttid, data))
            }
            let timelines =
                timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
            let mut timelines = std::pin::pin!(timelines.try_buffered(8));

            while let Some(i) = timelines.next().await {
                let (ttid, data) = i?;
                match data.blob_data {
                    BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation,
                        s3_layers: _,
                    } => {
                        self.download_timeline(
                            ttid,
                            index_part,
                            index_part_generation,
                            &mut ancestor_layers,
                        )
                        .await
                        .context("Downloading timeline")?;
                    }
                    BlobDataParseResult::Relic => {}
                    BlobDataParseResult::Incorrect { .. } => {
                        tracing::error!("Bad metadata in timeline {ttid}");
                    }
                };
            }
        }

        for (ttid, layers) in ancestor_layers.into_iter() {
            tracing::info!(
                "Downloading {} layers from ancestor timeline {ttid}...",
                layers.len()
            );

            self.download_layers(ttid, layers.into_iter().collect())
                .await?;
        }

        Ok(())
    }
}