Line data Source code
1 : use std::pin::pin;
2 :
3 : use futures::{StreamExt, TryStreamExt};
4 : use pageserver::tenant::storage_layer::LayerName;
5 : use remote_storage::ListingMode;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use crate::checks::parse_layer_object_name;
9 : use crate::metadata_stream::stream_tenants;
10 : use crate::{BucketConfig, NodeKind, init_remote, stream_objects_with_retries};
11 :
12 0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
13 : enum LargeObjectKind {
14 : DeltaLayer,
15 : ImageLayer,
16 : Other,
17 : }
18 :
19 : impl LargeObjectKind {
20 0 : fn from_key(key: &str) -> Self {
21 0 : let fname = key.split('/').last().unwrap();
22 :
23 0 : let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
24 0 : return LargeObjectKind::Other;
25 : };
26 :
27 0 : match layer_name {
28 0 : LayerName::Image(_) => LargeObjectKind::ImageLayer,
29 0 : LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
30 : }
31 0 : }
32 : }
33 :
34 0 : #[derive(Serialize, Deserialize, Clone)]
35 : pub struct LargeObject {
36 : pub key: String,
37 : pub size: u64,
38 : kind: LargeObjectKind,
39 : }
40 :
41 0 : #[derive(Serialize, Deserialize)]
42 : pub struct LargeObjectListing {
43 : pub objects: Vec<LargeObject>,
44 : }
45 :
46 0 : pub async fn find_large_objects(
47 0 : bucket_config: BucketConfig,
48 0 : min_size: u64,
49 0 : ignore_deltas: bool,
50 0 : concurrency: usize,
51 0 : ) -> anyhow::Result<LargeObjectListing> {
52 0 : let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
53 0 : let tenants = pin!(stream_tenants(&remote_client, &target));
54 0 :
55 0 : let objects_stream = tenants.map_ok(|tenant_shard_id| {
56 0 : let mut tenant_root = target.tenant_root(&tenant_shard_id);
57 0 : let remote_client = remote_client.clone();
58 0 : async move {
59 0 : let mut objects = Vec::new();
60 0 : let mut total_objects_ctr = 0u64;
61 0 : // We want the objects and not just common prefixes
62 0 : tenant_root.delimiter.clear();
63 0 : let mut objects_stream = pin!(stream_objects_with_retries(
64 0 : &remote_client,
65 0 : ListingMode::NoDelimiter,
66 0 : &tenant_root
67 0 : ));
68 0 : while let Some(listing) = objects_stream.next().await {
69 0 : let listing = listing?;
70 0 : for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
71 0 : let key = obj.key.to_string();
72 0 : let kind = LargeObjectKind::from_key(&key);
73 0 : if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
74 0 : continue;
75 0 : }
76 0 : objects.push(LargeObject {
77 0 : key,
78 0 : size: obj.size,
79 0 : kind,
80 0 : })
81 : }
82 0 : total_objects_ctr += listing.keys.len() as u64;
83 : }
84 :
85 0 : Ok((tenant_shard_id, objects, total_objects_ctr))
86 0 : }
87 0 : });
88 0 : let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
89 0 :
90 0 : let mut objects = Vec::new();
91 0 :
92 0 : let mut tenant_ctr = 0u64;
93 0 : let mut object_ctr = 0u64;
94 0 : while let Some(res) = objects_stream.next().await {
95 0 : let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
96 0 : objects.extend_from_slice(&objects_slice);
97 0 :
98 0 : object_ctr += total_objects_ctr;
99 0 : tenant_ctr += 1;
100 0 : if tenant_ctr % 100 == 0 {
101 0 : tracing::info!(
102 0 : "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
103 0 : objects.len()
104 : );
105 0 : }
106 : }
107 :
108 0 : let desc_str = target.desc_str();
109 0 : tracing::info!(
110 0 : "Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
111 0 : objects.len()
112 : );
113 0 : Ok(LargeObjectListing { objects })
114 0 : }
|