use std::{collections::HashMap, str::FromStr, time::Duration};

use clap::{Parser, Subcommand};
use pageserver_api::{
    controller_api::{
        NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
        TenantDescribeResponse, TenantPolicyRequest,
    },
    models::{
        LocationConfigSecondary, ShardParameters, TenantConfig, TenantConfigRequest,
        TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use reqwest::{Method, StatusCode, Url};
use serde::{de::DeserializeOwned, Serialize};
use utils::id::{NodeId, TenantId};

use pageserver_api::controller_api::{
    NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
    TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
};

#[derive(Subcommand, Debug)]
enum Command {
    /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
    /// since pageservers auto-register when they start up
    NodeRegister {
        #[arg(long)]
        node_id: NodeId,

        #[arg(long)]
        listen_pg_addr: String,
        #[arg(long)]
        listen_pg_port: u16,

        #[arg(long)]
        listen_http_addr: String,
        #[arg(long)]
        listen_http_port: u16,
    },

    /// Modify a node's configuration in the storage controller
    NodeConfigure {
        #[arg(long)]
        node_id: NodeId,

        /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
        /// manually mark a node offline
        #[arg(long)]
        availability: Option<NodeAvailabilityArg>,
        /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
        #[arg(long)]
        scheduling: Option<NodeSchedulingPolicy>,
    },
    /// Modify a tenant's policies in the storage controller
    TenantPolicy {
        #[arg(long)]
        tenant_id: TenantId,
        /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
        /// or is in the normal attached state with N secondary locations (`attached:N`)
        #[arg(long)]
        placement: Option<PlacementPolicyArg>,
        /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
        /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
        /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
        /// unavailable, and are only for use in emergencies.
        #[arg(long)]
        scheduling: Option<ShardSchedulingPolicyArg>,
    },
    /// List nodes known to the storage controller
    Nodes {},
    /// List tenants known to the storage controller
    Tenants {},
    /// Create a new tenant in the storage controller, and by extension on pageservers.
    TenantCreate {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Delete a tenant in the storage controller, and by extension on pageservers.
    TenantDelete {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Split an existing tenant into a higher number of shards than its current shard count.
    TenantShardSplit {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        shard_count: u8,
        /// Optional, in 8kiB pages. e.g. set 2048 for 16MB stripes.
        #[arg(long)]
        stripe_size: Option<u32>,
    },
    /// Migrate the attached location for a tenant shard to a specific pageserver.
    TenantShardMigrate {
        #[arg(long)]
        tenant_shard_id: TenantShardId,
        #[arg(long)]
        node: NodeId,
    },
    /// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
    /// that is passed through to pageservers, and does not affect storage controller behavior.
    TenantConfig {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        config: String,
    },
    /// Attempt to balance the locations for a tenant across pageservers. This is a client-side
    /// alternative to the storage controller's scheduling optimization behavior.
    TenantScatter {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Print details about a particular tenant, including all its shards' states.
    TenantDescribe {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
    /// mode so that it can warm up content on a pageserver.
    TenantWarmup {
        #[arg(long)]
        tenant_id: TenantId,
    },
}
#[derive(Parser)]
#[command(
    author,
    version,
    about,
    long_about = "CLI for Storage Controller Support/Debug"
)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[arg(long)]
    /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
    api: Url,

    #[arg(long)]
    /// JWT token for authenticating with storage controller. Depending on the API used, this
    /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
    /// a token with both scopes to use with this tool.
    jwt: Option<String>,

    #[command(subcommand)]
    command: Command,
}
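// Example invocations (a sketch, not part of the original source): the binary name is
// assumed here, and the flag/subcommand spellings follow clap's default kebab-case
// derivation from the definitions above.
//
//     storcon_cli --api http://127.0.0.1:1234 --jwt "$JWT" tenant-describe --tenant-id <tenant id>
//     storcon_cli --api http://127.0.0.1:1234 node-configure --node-id 1 --availability offline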
#[derive(Debug, Clone)]
struct PlacementPolicyArg(PlacementPolicy);

impl FromStr for PlacementPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "detached" => Ok(Self(PlacementPolicy::Detached)),
            "secondary" => Ok(Self(PlacementPolicy::Secondary)),
            _ if s.starts_with("attached:") => {
                let mut splitter = s.split(':');
                let _prefix = splitter.next().unwrap();
                match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
                    Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
                    None => Err(anyhow::anyhow!(
                        "Invalid format '{s}', a valid example is 'attached:1'"
                    )),
                }
            }
            _ => Err(anyhow::anyhow!(
                "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);

impl FromStr for ShardSchedulingPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(ShardSchedulingPolicy::Active)),
            "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
            "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
            "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
            _ => Err(anyhow::anyhow!(
                "Unknown scheduling policy '{s}', try active,essential,pause,stop"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct NodeAvailabilityArg(NodeAvailabilityWrapper);

impl FromStr for NodeAvailabilityArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
            "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
            _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
        }
    }
}
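// Summary of the strings the three wrappers above accept on the command line:
// tenant placement is `detached`, `secondary`, or `attached:<n>`; tenant shard
// scheduling is `active`, `essential`, `pause`, or `stop`; node availability is
// `active` or `offline`.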
struct Client {
    base_url: Url,
    jwt_token: Option<String>,
    client: reqwest::Client,
}

impl Client {
    fn new(base_url: Url, jwt_token: Option<String>) -> Self {
        Self {
            base_url,
            jwt_token,
            client: reqwest::ClientBuilder::new()
                .build()
                .expect("Failed to construct http client"),
        }
    }

    /// Simple HTTP request wrapper for calling into storage controller
    async fn dispatch<RQ, RS>(
        &self,
        method: Method,
        path: String,
        body: Option<RQ>,
    ) -> mgmt_api::Result<RS>
    where
        RQ: Serialize + Sized,
        RS: DeserializeOwned + Sized,
    {
        // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
        // for general purpose API access.
        let url = Url::from_str(&format!(
            "http://{}:{}/{path}",
            self.base_url.host_str().unwrap(),
            self.base_url.port().unwrap()
        ))
        .unwrap();

        let mut builder = self.client.request(method, url);
        if let Some(body) = body {
            builder = builder.json(&body)
        }
        if let Some(jwt_token) = &self.jwt_token {
            builder = builder.header(
                reqwest::header::AUTHORIZATION,
                format!("Bearer {jwt_token}"),
            );
        }

        let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
        let response = response.error_from_body().await?;

        response
            .json()
            .await
            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
    }
}
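// A usage sketch of the wrapper above, mirroring the calls made in main() below:
// listing nodes is a GET with no request body, so the request type is `()`.
//
//     let nodes = storcon_client
//         .dispatch::<(), Vec<NodeDescribeResponse>>(
//             Method::GET,
//             "control/v1/node".to_string(),
//             None,
//         )
//         .await?;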
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
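    // `Url` renders `cli.api` with a trailing '/'; drop it so the pageserver
    // management API client below gets a clean base URL to join paths onto.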
    let mut trimmed = cli.api.to_string();
    trimmed.pop();
    let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());

    match cli.command {
        Command::NodeRegister {
            node_id,
            listen_pg_addr,
            listen_pg_port,
            listen_http_addr,
            listen_http_port,
        } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "control/v1/node".to_string(),
                    Some(NodeRegisterRequest {
                        node_id,
                        listen_pg_addr,
                        listen_pg_port,
                        listen_http_addr,
                        listen_http_port,
                    }),
                )
                .await?;
        }
        Command::TenantCreate { tenant_id } => {
            vps_client
                .tenant_create(&TenantCreateRequest {
                    new_tenant_id: TenantShardId::unsharded(tenant_id),
                    generation: None,
                    shard_parameters: ShardParameters::default(),
                    placement_policy: Some(PlacementPolicy::Attached(1)),
                    config: TenantConfig::default(),
                })
                .await?;
        }
        Command::TenantDelete { tenant_id } => {
            let status = vps_client
                .tenant_delete(TenantShardId::unsharded(tenant_id))
                .await?;
            tracing::info!("Delete status: {}", status);
        }
        Command::Nodes {} => {
            let resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;
            let mut table = comfy_table::Table::new();
            table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
            for node in resp {
                table.add_row([
                    format!("{}", node.id),
                    node.listen_http_addr,
                    format!("{:?}", node.scheduling),
                    format!("{:?}", node.availability),
                ]);
            }
            println!("{table}");
        }
        Command::NodeConfigure {
            node_id,
            availability,
            scheduling,
        } => {
            let req = NodeConfigureRequest {
                node_id,
                availability: availability.map(|a| a.0),
                scheduling,
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/config"),
                    Some(req),
                )
                .await?;
        }
        Command::Tenants {} => {
            let resp = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;
            let mut table = comfy_table::Table::new();
            table.set_header([
                "TenantId",
                "ShardCount",
                "StripeSize",
                "Placement",
                "Scheduling",
            ]);
            for tenant in resp {
                let shard_zero = tenant.shards.into_iter().next().unwrap();
                table.add_row([
                    format!("{}", tenant.tenant_id),
                    format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
                    format!("{:?}", tenant.stripe_size),
                    format!("{:?}", tenant.policy),
                    format!("{:?}", shard_zero.scheduling_policy),
                ]);
            }

            println!("{table}");
        }
        Command::TenantPolicy {
            tenant_id,
            placement,
            scheduling,
        } => {
            let req = TenantPolicyRequest {
                scheduling: scheduling.map(|s| s.0),
                placement: placement.map(|p| p.0),
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/policy"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantShardSplit {
            tenant_id,
            shard_count,
            stripe_size,
        } => {
            let req = TenantShardSplitRequest {
                new_shard_count: shard_count,
                new_stripe_size: stripe_size.map(ShardStripeSize),
            };

            let response = storcon_client
                .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/shard_split"),
                    Some(req),
                )
                .await?;
            println!(
                "Split tenant {} into {} shards: {}",
                tenant_id,
                shard_count,
                response
                    .new_shards
                    .iter()
                    .map(|s| format!("{:?}", s))
                    .collect::<Vec<_>>()
                    .join(",")
            );
        }
        Command::TenantShardMigrate {
            tenant_shard_id,
            node,
        } => {
            let req = TenantShardMigrateRequest {
                tenant_shard_id,
                node_id: node,
            };

            storcon_client
                .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_shard_id}/migrate"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantConfig { tenant_id, config } => {
            let tenant_conf = serde_json::from_str(&config)?;

            vps_client
                .tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: tenant_conf,
                })
                .await?;
        }
        Command::TenantScatter { tenant_id } => {
            // Find the shards
            let locate_response = storcon_client
                .dispatch::<(), TenantLocateResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}/locate"),
                    None,
                )
                .await?;
            let shards = locate_response.shards;

            let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
            let shard_count = shards.len();
            for s in shards {
                let entry = node_to_shards.entry(s.node_id).or_default();
                entry.push(s.shard_id);
            }

            // Load list of available nodes
            let nodes_resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            for node in nodes_resp {
                if matches!(node.availability, NodeAvailabilityWrapper::Active) {
                    node_to_shards.entry(node.id).or_default();
                }
            }

            let max_shard_per_node = shard_count / node_to_shards.len();
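            // Rebalance in place: while any node holds more than `max_shard_per_node`
            // shards, take one of its shards and migrate it to whichever node currently
            // holds the fewest, stopping once no destination has room left.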
            loop {
                let mut migrate_shard = None;
                for shards in node_to_shards.values_mut() {
                    if shards.len() > max_shard_per_node {
                        // This node has more than its share: take one of its shards to move
                        migrate_shard = Some(shards.pop().unwrap());
                    }
                }
                let Some(migrate_shard) = migrate_shard else {
                    break;
                };

                // Pick the emptiest node to migrate to
                let mut destinations = node_to_shards
                    .iter()
                    .map(|(k, v)| (k, v.len()))
                    .collect::<Vec<_>>();
                destinations.sort_by_key(|i| i.1);
                let (destination_node, destination_count) = *destinations.first().unwrap();
                if destination_count + 1 > max_shard_per_node {
                    // Even the emptiest destination doesn't have space: we're done
                    break;
                }
                let destination_node = *destination_node;

                node_to_shards
                    .get_mut(&destination_node)
                    .unwrap()
                    .push(migrate_shard);

                println!("Migrate {} -> {} ...", migrate_shard, destination_node);

                storcon_client
                    .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                        Method::PUT,
                        format!("control/v1/tenant/{migrate_shard}/migrate"),
                        Some(TenantShardMigrateRequest {
                            tenant_shard_id: migrate_shard,
                            node_id: destination_node,
                        }),
                    )
                    .await?;
                println!("Migrate {} -> {} OK", migrate_shard, destination_node);
            }

            // Spread the shards across the nodes
        }
        Command::TenantDescribe { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;
            let shards = describe_response.shards;
            let mut table = comfy_table::Table::new();
            table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]);
            for shard in shards {
                let secondary = shard
                    .node_secondary
                    .iter()
                    .map(|n| format!("{}", n))
                    .collect::<Vec<_>>()
                    .join(",");

                let mut status_parts = Vec::new();
                if shard.is_reconciling {
                    status_parts.push("reconciling");
                }

                if shard.is_pending_compute_notification {
                    status_parts.push("pending_compute");
                }

                if shard.is_splitting {
                    status_parts.push("splitting");
                }
                let status = status_parts.join(",");

                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    shard
                        .node_attached
                        .map(|n| format!("{}", n))
                        .unwrap_or(String::new()),
                    secondary,
                    shard.last_error,
                    status,
                ]);
            }
            println!("{table}");
        }
590 0 : let describe_response = storcon_client
591 0 : .dispatch::<(), TenantDescribeResponse>(
592 0 : Method::GET,
593 0 : format!("control/v1/tenant/{tenant_id}"),
594 0 : None,
595 0 : )
596 0 : .await;
597 0 : match describe_response {
598 0 : Ok(describe) => {
599 0 : if matches!(describe.policy, PlacementPolicy::Secondary) {
600 0 : // Fine: it's already known to controller in secondary mode: calling
601 0 : // again to put it into secondary mode won't cause problems.
602 0 : } else {
603 0 : anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
604 0 : }
605 0 : }
606 0 : Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
607 0 : // Fine: this tenant isn't know to the storage controller yet.
608 0 : }
609 0 : Err(e) => {
610 0 : // Unexpected API error
611 0 : return Err(e.into());
612 0 : }
613 0 : }
614 0 :
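            // Configure the tenant as an unsharded secondary location with `warm: true`;
            // the layer download progress is then polled in the loop further below.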
            vps_client
                .location_config(
                    TenantShardId::unsharded(tenant_id),
                    pageserver_api::models::LocationConfig {
                        mode: pageserver_api::models::LocationConfigMode::Secondary,
                        generation: None,
                        secondary_conf: Some(LocationConfigSecondary { warm: true }),
                        shard_number: 0,
                        shard_count: 0,
                        shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
                        tenant_conf: TenantConfig::default(),
                    },
                    None,
                    true,
                )
                .await?;

            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;

            let secondary_ps_id = describe_response
                .shards
                .first()
                .unwrap()
                .node_secondary
                .first()
                .unwrap();

            println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
            loop {
                let (status, progress) = vps_client
                    .tenant_secondary_download(
                        TenantShardId::unsharded(tenant_id),
                        Some(Duration::from_secs(10)),
                    )
                    .await?;
                println!(
                    "Progress: {}/{} layers, {}/{} bytes",
                    progress.layers_downloaded,
                    progress.layers_total,
                    progress.bytes_downloaded,
                    progress.bytes_total
                );
                match status {
                    StatusCode::OK => {
                        println!("Download complete");
                        break;
                    }
                    StatusCode::ACCEPTED => {
                        // Loop
                    }
                    _ => {
                        anyhow::bail!("Unexpected download status: {status}");
                    }
                }
            }
        }
    }

    Ok(())
}