Line data Source code
1 : use std::sync::Arc;
2 :
3 : use humantime::Duration;
4 : use pageserver_api::shard::TenantShardId;
5 : use tokio::task::JoinSet;
6 : use utils::id::TenantTimelineId;
7 :
8 : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
9 :
10 0 : #[derive(clap::Parser)]
11 : pub(crate) struct Args {
12 : #[clap(long, default_value = "http://localhost:9898")]
13 0 : mgmt_api_endpoint: String,
14 : #[clap(long, default_value = "localhost:64000")]
15 0 : page_service_host_port: String,
16 : #[clap(long)]
17 : pageserver_jwt: Option<String>,
18 : #[clap(
19 : long,
20 : help = "if specified, poll mgmt api to check whether init logical size calculation has completed"
21 : )]
22 : poll_for_completion: Option<Duration>,
23 : #[clap(long)]
24 : limit_to_first_n_targets: Option<usize>,
25 0 : targets: Option<Vec<TenantTimelineId>>,
26 : }
27 :
28 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
29 0 : let rt = tokio::runtime::Builder::new_multi_thread()
30 0 : .enable_all()
31 0 : .build()
32 0 : .unwrap();
33 0 :
34 0 : let main_task = rt.spawn(main_impl(args));
35 0 : rt.block_on(main_task).unwrap()
36 0 : }
37 :
38 0 : async fn main_impl(args: Args) -> anyhow::Result<()> {
39 0 : let args: &'static Args = Box::leak(Box::new(args));
40 0 :
41 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
42 0 : args.mgmt_api_endpoint.clone(),
43 0 : args.pageserver_jwt.as_deref(),
44 0 : ));
45 :
46 : // discover targets
47 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
48 0 : &mgmt_api_client,
49 0 : crate::util::cli::targets::Spec {
50 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
51 0 : targets: args.targets.clone(),
52 0 : },
53 0 : )
54 0 : .await?;
55 :
56 : // kick it off
57 :
58 0 : let mut js = JoinSet::new();
59 0 : for tl in timelines {
60 0 : let mgmt_api_client = Arc::clone(&mgmt_api_client);
61 0 : js.spawn(async move {
62 0 : let info = mgmt_api_client
63 0 : .timeline_info(
64 0 : TenantShardId::unsharded(tl.tenant_id),
65 0 : tl.timeline_id,
66 0 : ForceAwaitLogicalSize::Yes,
67 0 : )
68 0 : .await
69 0 : .unwrap();
70 :
71 : // Polling should not be strictly required here since we await
72 : // for the initial logical size, however it's possible for the request
73 : // to land before the timeline is initialised. This results in an approximate
74 : // logical size.
75 0 : if let Some(period) = args.poll_for_completion {
76 0 : let mut ticker = tokio::time::interval(period.into());
77 0 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
78 0 : let mut info = info;
79 0 : while !info.current_logical_size_is_accurate {
80 0 : ticker.tick().await;
81 0 : info = mgmt_api_client
82 0 : .timeline_info(
83 0 : TenantShardId::unsharded(tl.tenant_id),
84 0 : tl.timeline_id,
85 0 : ForceAwaitLogicalSize::Yes,
86 0 : )
87 0 : .await
88 0 : .unwrap();
89 : }
90 0 : }
91 0 : });
92 0 : }
93 0 : while let Some(res) = js.join_next().await {
94 0 : let _: () = res.unwrap();
95 0 : }
96 0 : Ok(())
97 0 : }
|