use std::collections::HashMap;

use anyhow::Context;
use async_stream::stream;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::IndexPart;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::storage_layer::LayerName;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::TenantId;

use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
    BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file, init_remote,
};

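/// Downloads a snapshot of a tenant's remote storage contents (layer files plus
/// each timeline's `index_part.json`) into a local directory tree that mirrors
/// the remote layout.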
pub struct SnapshotDownloader {
    remote_client: GenericRemoteStorage,
    #[allow(dead_code)]
    target: RootTarget,
    tenant_id: TenantId,
    output_path: Utf8PathBuf,
    concurrency: usize,
}

impl SnapshotDownloader {
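    /// Construct a downloader for one tenant, writing under `output_path` and
    /// downloading up to `concurrency` layers at a time.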
    pub async fn new(
        bucket_config: BucketConfig,
        tenant_id: TenantId,
        output_path: Utf8PathBuf,
        concurrency: usize,
    ) -> anyhow::Result<Self> {
        let (remote_client, target) =
            init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;

        Ok(Self {
            remote_client,
            target,
            tenant_id,
            output_path,
            concurrency,
        })
    }

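    /// Download a single layer file to its place in the local output tree. If
    /// the file already exists locally it is left untouched: layer files are
    /// written atomically and are immutable, so an existing file is complete.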
    async fn download_layer(
        &self,
        ttid: TenantShardTimelineId,
        layer_name: LayerName,
        layer_metadata: LayerFileMetadata,
    ) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
        let cancel = CancellationToken::new();
        // Note this is local as in a local copy of S3 data, not local as in the pageserver's
        // local format. They use different layer names (remote-style has the generation suffix).
        let local_path = self.output_path.join(format!(
            "{}/timelines/{}/{}{}",
            ttid.tenant_shard_id,
            ttid.timeline_id,
            layer_name,
            layer_metadata.generation.get_suffix()
        ));

        // We should only be called for layers that are owned by the input TTID
        assert_eq!(layer_metadata.shard, ttid.tenant_shard_id.to_index());

        // Assumption: we always write layer files atomically, and layer files are immutable.
        // Therefore if the file already exists on local disk, we assume it is fully correct
        // and skip it.
        if tokio::fs::try_exists(&local_path).await? {
            tracing::debug!("{} already exists", local_path);
            return Ok((layer_name, layer_metadata));
        } else {
            tracing::debug!("{} requires download...", local_path);

            let remote_path = remote_layer_path(
                &ttid.tenant_shard_id.tenant_id,
                &ttid.timeline_id,
                layer_metadata.shard,
                &layer_name,
                layer_metadata.generation,
            );
            let mode = remote_storage::ListingMode::NoDelimiter;

            // List versions: the object might be deleted.
            let versions = self
                .remote_client
                .list_versions(Some(&remote_path), mode, None, &cancel)
                .await?;
            let Some(version) = versions.versions.first() else {
                return Err(anyhow::anyhow!("No versions found for {remote_path}"));
            };
            download_object_to_file(
                &self.remote_client,
                &remote_path,
                version.version_id().cloned(),
                &local_path,
            )
            .await?;

            tracing::debug!("Downloaded successfully to {local_path}");
        }

        Ok((layer_name, layer_metadata))
    }

    /// Download many layers belonging to the same TTID, with some concurrency.
    async fn download_layers(
        &self,
        ttid: TenantShardTimelineId,
        layers: Vec<(LayerName, LayerFileMetadata)>,
    ) -> anyhow::Result<()> {
        let layer_count = layers.len();
        tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
        let layers_stream = stream! {
            for (layer_name, layer_metadata) in layers {
                yield self.download_layer(ttid, layer_name, layer_metadata);
            }
        };

        tokio::fs::create_dir_all(self.output_path.join(format!(
            "{}/timelines/{}",
            ttid.tenant_shard_id, ttid.timeline_id
        )))
        .await?;

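        // `buffered` drives up to `concurrency` download futures at once while
        // yielding results in the order the layers were enumerated.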
        let layer_results = layers_stream.buffered(self.concurrency);
        let mut layer_results = std::pin::pin!(layer_results);

        let mut err = None;
        let mut download_count = 0;
        while let Some(i) = layer_results.next().await {
            download_count += 1;
            match i {
                Ok((layer_name, layer_metadata)) => {
                    tracing::info!(
                        "[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
                        layer_metadata.file_size,
                        layer_name
                    );
                }
                Err(e) => {
                    // Warn and continue: we will download what we can
                    tracing::warn!("Download error: {e}");
                    err = Some(e);
                }
            }
        }
        if let Some(e) = err {
            tracing::warn!("Some errors occurred downloading {ttid} layers, last error: {e}");
            Err(e)
        } else {
            Ok(())
        }
    }

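    /// Download all layers referenced by a timeline's index, then write the
    /// index itself, so that a complete local index implies complete layers.
    /// Layers owned by ancestor shards are not downloaded here; they are
    /// accumulated into `ancestor_layers` for a later pass.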
    async fn download_timeline(
        &self,
        ttid: TenantShardTimelineId,
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        ancestor_layers: &mut HashMap<TenantShardTimelineId, HashMap<LayerName, LayerFileMetadata>>,
    ) -> anyhow::Result<()> {
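        // Serialize the index up front: `index_part` is consumed when we
        // iterate over its layer metadata below.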
        let index_bytes = serde_json::to_string(&index_part).unwrap();

        let layers = index_part
            .layer_metadata
            .into_iter()
            .filter_map(|(layer_name, layer_metadata)| {
                if layer_metadata.shard.shard_count != ttid.tenant_shard_id.shard_count {
                    // Accumulate ancestor layers for later download
                    let ancestor_ttid = TenantShardTimelineId::new(
                        TenantShardId {
                            tenant_id: ttid.tenant_shard_id.tenant_id,
                            shard_number: layer_metadata.shard.shard_number,
                            shard_count: layer_metadata.shard.shard_count,
                        },
                        ttid.timeline_id,
                    );
                    let ancestor_ttid_layers = ancestor_layers.entry(ancestor_ttid).or_default();
                    use std::collections::hash_map::Entry;
                    match ancestor_ttid_layers.entry(layer_name) {
                        Entry::Occupied(entry) => {
                            // Descendant shards that reference a layer from an ancestor should
                            // always have metadata matching their siblings', because it is read
                            // atomically during a shard split.
                            assert_eq!(entry.get(), &layer_metadata);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(layer_metadata);
                        }
                    }
                    None
                } else {
                    Some((layer_name, layer_metadata))
                }
            })
            .collect();

        let download_result = self.download_layers(ttid, layers).await;

        // Write index last, once all the layers it references are downloaded
        let local_index_path = self.output_path.join(format!(
            "{}/timelines/{}/index_part.json{}",
            ttid.tenant_shard_id,
            ttid.timeline_id,
            index_part_generation.get_suffix()
        ));
        tokio::fs::write(&local_index_path, index_bytes)
            .await
            .context("writing index")?;

        download_result
    }

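    /// Run the snapshot download for the whole tenant: enumerate its shards,
    /// download every timeline of the highest-count shards, then fetch any
    /// layers still referenced from ancestor shards (present only if the
    /// tenant has been split).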
    pub async fn download(&self) -> anyhow::Result<()> {
        // Generate a stream of TenantShardId
        let shards =
            stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
        let shards: Vec<TenantShardId> = shards.try_collect().await?;

        // Only read from shards that have the highest count: this avoids redundantly
        // downloading from ancestor shards.
        let Some(shard_count) = shards.iter().map(|s| s.shard_count).max() else {
            anyhow::bail!("No shards found");
        };

        // We will build a collection of layers in ancestor shards to download (this will only
        // happen if this tenant has been split at some point)
        let mut ancestor_layers: HashMap<
            TenantShardTimelineId,
            HashMap<LayerName, LayerFileMetadata>,
        > = Default::default();

        for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
            // Generate a stream of TenantTimelineId
            let timelines =
                stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;

            // Generate a stream of S3TimelineBlobData
            async fn load_timeline_index(
                remote_client: &GenericRemoteStorage,
                target: &RootTarget,
                ttid: TenantShardTimelineId,
            ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
                let data = list_timeline_blobs(remote_client, ttid, target).await?;
                Ok((ttid, data))
            }
            let timelines = timelines
                .map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
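            // Load up to 8 timeline indices concurrently.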
            let mut timelines = std::pin::pin!(timelines.try_buffered(8));

            while let Some(i) = timelines.next().await {
                let (ttid, data) = i?;
                match data.blob_data {
                    BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation,
                        s3_layers: _,
                        index_part_last_modified_time: _,
                        index_part_snapshot_time: _,
                    } => {
                        self.download_timeline(
                            ttid,
                            index_part,
                            index_part_generation,
                            &mut ancestor_layers,
                        )
                        .await
                        .context("Downloading timeline")?;
                    }
                    BlobDataParseResult::Relic => {}
                    BlobDataParseResult::Incorrect { .. } => {
                        tracing::error!("Bad metadata in timeline {ttid}");
                    }
                };
            }
        }

        for (ttid, layers) in ancestor_layers.into_iter() {
            tracing::info!(
                "Downloading {} layers from ancestor timeline {ttid}...",
                layers.len()
            );

            self.download_layers(ttid, layers.into_iter().collect())
                .await?;
        }

        Ok(())
    }
}