Line data Source code
1 : use std::pin::pin;
2 :
3 : use futures::{StreamExt, TryStreamExt};
4 : use pageserver::tenant::storage_layer::LayerName;
5 : use remote_storage::ListingMode;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use crate::{
9 : checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
10 : stream_objects_with_retries, BucketConfig, NodeKind,
11 : };
12 :
13 0 : #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
14 : enum LargeObjectKind {
15 : DeltaLayer,
16 : ImageLayer,
17 : Other,
18 : }
19 :
20 : impl LargeObjectKind {
21 0 : fn from_key(key: &str) -> Self {
22 0 : let fname = key.split('/').last().unwrap();
23 :
24 0 : let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
25 0 : return LargeObjectKind::Other;
26 : };
27 :
28 0 : match layer_name {
29 0 : LayerName::Image(_) => LargeObjectKind::ImageLayer,
30 0 : LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
31 : }
32 0 : }
33 : }
34 :
35 0 : #[derive(Serialize, Deserialize, Clone)]
36 : pub struct LargeObject {
37 : pub key: String,
38 : pub size: u64,
39 : kind: LargeObjectKind,
40 : }
41 :
42 0 : #[derive(Serialize, Deserialize)]
43 : pub struct LargeObjectListing {
44 : pub objects: Vec<LargeObject>,
45 : }
46 :
47 0 : pub async fn find_large_objects(
48 0 : bucket_config: BucketConfig,
49 0 : min_size: u64,
50 0 : ignore_deltas: bool,
51 0 : concurrency: usize,
52 0 : ) -> anyhow::Result<LargeObjectListing> {
53 0 : let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
54 0 : let tenants = pin!(stream_tenants(&remote_client, &target));
55 0 :
56 0 : let objects_stream = tenants.map_ok(|tenant_shard_id| {
57 0 : let mut tenant_root = target.tenant_root(&tenant_shard_id);
58 0 : let remote_client = remote_client.clone();
59 0 : async move {
60 0 : let mut objects = Vec::new();
61 0 : let mut total_objects_ctr = 0u64;
62 0 : // We want the objects and not just common prefixes
63 0 : tenant_root.delimiter.clear();
64 0 : let mut objects_stream = pin!(stream_objects_with_retries(
65 0 : &remote_client,
66 0 : ListingMode::NoDelimiter,
67 0 : &tenant_root
68 0 : ));
69 0 : while let Some(listing) = objects_stream.next().await {
70 0 : let listing = listing?;
71 0 : for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
72 0 : let key = obj.key.to_string();
73 0 : let kind = LargeObjectKind::from_key(&key);
74 0 : if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
75 0 : continue;
76 0 : }
77 0 : objects.push(LargeObject {
78 0 : key,
79 0 : size: obj.size,
80 0 : kind,
81 0 : })
82 : }
83 0 : total_objects_ctr += listing.keys.len() as u64;
84 : }
85 :
86 0 : Ok((tenant_shard_id, objects, total_objects_ctr))
87 0 : }
88 0 : });
89 0 : let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
90 0 :
91 0 : let mut objects = Vec::new();
92 0 :
93 0 : let mut tenant_ctr = 0u64;
94 0 : let mut object_ctr = 0u64;
95 0 : while let Some(res) = objects_stream.next().await {
96 0 : let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
97 0 : objects.extend_from_slice(&objects_slice);
98 0 :
99 0 : object_ctr += total_objects_ctr;
100 0 : tenant_ctr += 1;
101 0 : if tenant_ctr % 100 == 0 {
102 0 : tracing::info!(
103 0 : "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
104 0 : objects.len()
105 : );
106 0 : }
107 : }
108 :
109 0 : let bucket_name = target.bucket_name();
110 0 : tracing::info!(
111 0 : "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
112 0 : objects.len()
113 : );
114 0 : Ok(LargeObjectListing { objects })
115 0 : }
|