Line data Source code
1 : use futures::{StreamExt, TryStreamExt};
2 : use pageserver::tenant::storage_layer::LayerName;
3 : use serde::{Deserialize, Serialize};
4 :
5 : use crate::{
6 : checks::parse_layer_object_name, init_remote, list_objects_with_retries,
7 : metadata_stream::stream_tenants, BucketConfig, NodeKind,
8 : };
9 :
10 0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
11 : enum LargeObjectKind {
12 : DeltaLayer,
13 : ImageLayer,
14 : Other,
15 : }
16 :
17 : impl LargeObjectKind {
18 0 : fn from_key(key: &str) -> Self {
19 0 : let fname = key.split('/').last().unwrap();
20 :
21 0 : let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
22 0 : return LargeObjectKind::Other;
23 : };
24 :
25 0 : match layer_name {
26 0 : LayerName::Image(_) => LargeObjectKind::ImageLayer,
27 0 : LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
28 : }
29 0 : }
30 : }
31 :
32 0 : #[derive(Serialize, Deserialize, Clone)]
33 : pub struct LargeObject {
34 : pub key: String,
35 : pub size: u64,
36 : kind: LargeObjectKind,
37 : }
38 :
39 0 : #[derive(Serialize, Deserialize)]
40 : pub struct LargeObjectListing {
41 : pub objects: Vec<LargeObject>,
42 : }
43 :
44 0 : pub async fn find_large_objects(
45 0 : bucket_config: BucketConfig,
46 0 : min_size: u64,
47 0 : ignore_deltas: bool,
48 0 : concurrency: usize,
49 0 : ) -> anyhow::Result<LargeObjectListing> {
50 0 : let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
51 0 : let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
52 0 :
53 0 : let objects_stream = tenants.map_ok(|tenant_shard_id| {
54 0 : let mut tenant_root = target.tenant_root(&tenant_shard_id);
55 0 : let s3_client = s3_client.clone();
56 0 : async move {
57 0 : let mut objects = Vec::new();
58 0 : let mut total_objects_ctr = 0u64;
59 0 : // We want the objects and not just common prefixes
60 0 : tenant_root.delimiter.clear();
61 0 : let mut continuation_token = None;
62 : loop {
63 0 : let fetch_response =
64 0 : list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
65 0 : .await?;
66 0 : for obj in fetch_response.contents().iter().filter(|o| {
67 0 : if let Some(obj_size) = o.size {
68 0 : min_size as i64 <= obj_size
69 : } else {
70 0 : false
71 : }
72 0 : }) {
73 0 : let key = obj.key().expect("couldn't get key").to_owned();
74 0 : let kind = LargeObjectKind::from_key(&key);
75 0 : if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
76 0 : continue;
77 0 : }
78 0 : objects.push(LargeObject {
79 0 : key,
80 0 : size: obj.size.unwrap() as u64,
81 0 : kind,
82 0 : })
83 : }
84 0 : total_objects_ctr += fetch_response.contents().len() as u64;
85 0 : match fetch_response.next_continuation_token {
86 0 : Some(new_token) => continuation_token = Some(new_token),
87 0 : None => break,
88 0 : }
89 0 : }
90 0 :
91 0 : Ok((tenant_shard_id, objects, total_objects_ctr))
92 0 : }
93 0 : });
94 0 : let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
95 0 :
96 0 : let mut objects = Vec::new();
97 0 :
98 0 : let mut tenant_ctr = 0u64;
99 0 : let mut object_ctr = 0u64;
100 0 : while let Some(res) = objects_stream.next().await {
101 0 : let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
102 0 : objects.extend_from_slice(&objects_slice);
103 0 :
104 0 : object_ctr += total_objects_ctr;
105 0 : tenant_ctr += 1;
106 0 : if tenant_ctr % 100 == 0 {
107 0 : tracing::info!(
108 0 : "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
109 0 : objects.len()
110 : );
111 0 : }
112 : }
113 :
114 0 : let bucket_name = target.bucket_name();
115 0 : tracing::info!(
116 0 : "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
117 0 : objects.len()
118 : );
119 0 : Ok(LargeObjectListing { objects })
120 0 : }
|