Line data Source code
1 : use std::pin::pin;
2 :
3 : use futures::{StreamExt, TryStreamExt};
4 : use pageserver::tenant::storage_layer::LayerName;
5 : use remote_storage::ListingMode;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use crate::{
9 : checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
10 : stream_objects_with_retries, BucketConfig, NodeKind,
11 : };
12 :
13 0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
14 : enum LargeObjectKind {
15 : DeltaLayer,
16 : ImageLayer,
17 : Other,
18 : }
19 :
20 : impl LargeObjectKind {
21 0 : fn from_key(key: &str) -> Self {
22 0 : let fname = key.split('/').last().unwrap();
23 :
24 0 : let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
25 0 : return LargeObjectKind::Other;
26 : };
27 :
28 0 : match layer_name {
29 0 : LayerName::Image(_) => LargeObjectKind::ImageLayer,
30 0 : LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
31 : }
32 0 : }
33 : }
34 :
35 0 : #[derive(Serialize, Deserialize, Clone)]
36 : pub struct LargeObject {
37 : pub key: String,
38 : pub size: u64,
39 : kind: LargeObjectKind,
40 : }
41 :
42 0 : #[derive(Serialize, Deserialize)]
43 : pub struct LargeObjectListing {
44 : pub objects: Vec<LargeObject>,
45 : }
46 :
47 0 : pub async fn find_large_objects(
48 0 : bucket_config: BucketConfig,
49 0 : min_size: u64,
50 0 : ignore_deltas: bool,
51 0 : concurrency: usize,
52 0 : ) -> anyhow::Result<LargeObjectListing> {
53 0 : let (remote_client, target) =
54 0 : init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
55 0 : let tenants = pin!(stream_tenants_generic(&remote_client, &target));
56 0 :
57 0 : let objects_stream = tenants.map_ok(|tenant_shard_id| {
58 0 : let mut tenant_root = target.tenant_root(&tenant_shard_id);
59 0 : let remote_client = remote_client.clone();
60 0 : async move {
61 0 : let mut objects = Vec::new();
62 0 : let mut total_objects_ctr = 0u64;
63 0 : // We want the objects and not just common prefixes
64 0 : tenant_root.delimiter.clear();
65 0 : let mut objects_stream = pin!(stream_objects_with_retries(
66 0 : &remote_client,
67 0 : ListingMode::NoDelimiter,
68 0 : &tenant_root
69 0 : ));
70 0 : while let Some(listing) = objects_stream.next().await {
71 0 : let listing = listing?;
72 0 : for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
73 0 : let key = obj.key.to_string();
74 0 : let kind = LargeObjectKind::from_key(&key);
75 0 : if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
76 0 : continue;
77 0 : }
78 0 : objects.push(LargeObject {
79 0 : key,
80 0 : size: obj.size,
81 0 : kind,
82 0 : })
83 : }
84 0 : total_objects_ctr += listing.keys.len() as u64;
85 : }
86 :
87 0 : Ok((tenant_shard_id, objects, total_objects_ctr))
88 0 : }
89 0 : });
90 0 : let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
91 0 :
92 0 : let mut objects = Vec::new();
93 0 :
94 0 : let mut tenant_ctr = 0u64;
95 0 : let mut object_ctr = 0u64;
96 0 : while let Some(res) = objects_stream.next().await {
97 0 : let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
98 0 : objects.extend_from_slice(&objects_slice);
99 0 :
100 0 : object_ctr += total_objects_ctr;
101 0 : tenant_ctr += 1;
102 0 : if tenant_ctr % 100 == 0 {
103 0 : tracing::info!(
104 0 : "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
105 0 : objects.len()
106 : );
107 0 : }
108 : }
109 :
110 0 : let bucket_name = target.bucket_name();
111 0 : tracing::info!(
112 0 : "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
113 0 : objects.len()
114 : );
115 0 : Ok(LargeObjectListing { objects })
116 0 : }
|