|             Line data    Source code 
       1              : use futures::StreamExt;
       2              : use std::{collections::HashMap, str::FromStr, time::Duration};
       3              : 
       4              : use clap::{Parser, Subcommand};
       5              : use pageserver_api::{
       6              :     controller_api::{
       7              :         NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
       8              :         TenantDescribeResponse, TenantPolicyRequest,
       9              :     },
      10              :     models::{
      11              :         EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
      12              :         ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
      13              :         TenantShardSplitRequest, TenantShardSplitResponse,
      14              :     },
      15              :     shard::{ShardStripeSize, TenantShardId},
      16              : };
      17              : use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
      18              : use reqwest::{Method, StatusCode, Url};
      19              : use serde::{de::DeserializeOwned, Serialize};
      20              : use utils::id::{NodeId, TenantId};
      21              : 
      22              : use pageserver_api::controller_api::{
      23              :     NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
      24              :     TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
      25              : };
      26              : 
      27            0 : #[derive(Subcommand, Debug)]
      28              : enum Command {
      29              :     /// Register a pageserver with the storage controller.  This shouldn't usually be necessary,
      30              :     /// since pageservers auto-register when they start up
      31              :     NodeRegister {
      32              :         #[arg(long)]
      33            0 :         node_id: NodeId,
      34              : 
      35              :         #[arg(long)]
      36            0 :         listen_pg_addr: String,
      37              :         #[arg(long)]
      38            0 :         listen_pg_port: u16,
      39              : 
      40              :         #[arg(long)]
      41            0 :         listen_http_addr: String,
      42              :         #[arg(long)]
      43            0 :         listen_http_port: u16,
      44              :     },
      45              : 
      46              :     /// Modify a node's configuration in the storage controller
      47              :     NodeConfigure {
      48              :         #[arg(long)]
      49            0 :         node_id: NodeId,
      50              : 
      51              :         /// Availability is usually auto-detected based on heartbeats.  Set 'offline' here to
      52              :         /// manually mark a node offline
      53              :         #[arg(long)]
      54              :         availability: Option<NodeAvailabilityArg>,
      55              :         /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
      56              :         #[arg(long)]
      57              :         scheduling: Option<NodeSchedulingPolicy>,
      58              :     },
      59              :     /// Modify a tenant's policies in the storage controller
      60              :     TenantPolicy {
      61              :         #[arg(long)]
      62            0 :         tenant_id: TenantId,
      63              :         /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
      64              :         /// or is in the normal attached state with N secondary locations (`attached:N`)
      65              :         #[arg(long)]
      66              :         placement: Option<PlacementPolicyArg>,
      67              :         /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant.  `active` is normal,
      68              :         /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
      69              :         /// all reconciliation activity including for scheduling changes already made.  `pause` and `stop` can make a tenant
      70              :         /// unavailable, and are only for use in emergencies.
      71              :         #[arg(long)]
      72              :         scheduling: Option<ShardSchedulingPolicyArg>,
      73              :     },
      74              :     /// List nodes known to the storage controller
      75              :     Nodes {},
      76              :     /// List tenants known to the storage controller
      77              :     Tenants {},
      78              :     /// Create a new tenant in the storage controller, and by extension on pageservers.
      79              :     TenantCreate {
      80              :         #[arg(long)]
      81            0 :         tenant_id: TenantId,
      82              :     },
      83              :     /// Delete a tenant in the storage controller, and by extension on pageservers.
      84              :     TenantDelete {
      85              :         #[arg(long)]
      86            0 :         tenant_id: TenantId,
      87              :     },
      88              :     /// Split an existing tenant into a higher number of shards than its current shard count.
      89              :     TenantShardSplit {
      90              :         #[arg(long)]
      91            0 :         tenant_id: TenantId,
      92              :         #[arg(long)]
      93            0 :         shard_count: u8,
      94              :         /// Optional, in 8kiB pages.  e.g. set 2048 for 16MB stripes.
      95              :         #[arg(long)]
      96              :         stripe_size: Option<u32>,
      97              :     },
      98              :     /// Migrate the attached location for a tenant shard to a specific pageserver.
      99              :     TenantShardMigrate {
     100              :         #[arg(long)]
     101            0 :         tenant_shard_id: TenantShardId,
     102              :         #[arg(long)]
     103            0 :         node: NodeId,
     104              :     },
     105              :     /// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
     106              :     /// that is passed through to pageservers, and does not affect storage controller behavior.
     107              :     TenantConfig {
     108              :         #[arg(long)]
     109            0 :         tenant_id: TenantId,
     110              :         #[arg(long)]
     111            0 :         config: String,
     112              :     },
     113              :     /// Attempt to balance the locations for a tenant across pageservers.  This is a client-side
     114              :     /// alternative to the storage controller's scheduling optimization behavior.
     115              :     TenantScatter {
     116              :         #[arg(long)]
     117            0 :         tenant_id: TenantId,
     118              :     },
     119              :     /// Print details about a particular tenant, including all its shards' states.
     120              :     TenantDescribe {
     121              :         #[arg(long)]
     122            0 :         tenant_id: TenantId,
     123              :     },
     124              :     /// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
     125              :     /// mode so that it can warm up content on a pageserver.
     126              :     TenantWarmup {
     127              :         #[arg(long)]
     128            0 :         tenant_id: TenantId,
     129              :     },
     130              :     /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
     131              :     /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
     132              :     TenantDrop {
     133              :         #[arg(long)]
     134            0 :         tenant_id: TenantId,
     135              :         #[arg(long)]
     136            0 :         unclean: bool,
     137              :     },
     138              :     NodeDrop {
     139              :         #[arg(long)]
     140            0 :         node_id: NodeId,
     141              :         #[arg(long)]
     142            0 :         unclean: bool,
     143              :     },
     144              :     TenantSetTimeBasedEviction {
     145              :         #[arg(long)]
     146            0 :         tenant_id: TenantId,
     147              :         #[arg(long)]
     148            0 :         period: humantime::Duration,
     149              :         #[arg(long)]
     150            0 :         threshold: humantime::Duration,
     151              :     },
     152              :     // Drain a set of specified pageservers by moving the primary attachments to pageservers
     153              :     // outside of the specified set.
     154              :     Drain {
     155              :         // Set of pageserver node ids to drain.
     156              :         #[arg(long)]
     157            0 :         nodes: Vec<NodeId>,
     158              :         // Optional: migration concurrency (default is 8)
     159              :         #[arg(long)]
     160              :         concurrency: Option<usize>,
     161              :         // Optional: maximum number of shards to migrate
     162              :         #[arg(long)]
     163              :         max_shards: Option<usize>,
     164              :         // Optional: when set to true, nothing is migrated, but the plan is printed to stdout
     165              :         #[arg(long)]
     166              :         dry_run: Option<bool>,
     167              :     },
     168              : }
     169              : 
     170            0 : #[derive(Parser)]
     171              : #[command(
     172              :     author,
     173              :     version,
     174              :     about,
     175              :     long_about = "CLI for Storage Controller Support/Debug"
     176              : )]
     177              : #[command(arg_required_else_help(true))]
     178              : struct Cli {
     179              :     #[arg(long)]
     180              :     /// URL to storage controller.  e.g. http://127.0.0.1:1234 when using `neon_local`
     181            0 :     api: Url,
     182              : 
     183              :     #[arg(long)]
     184              :     /// JWT token for authenticating with storage controller.  Depending on the API used, this
     185              :     /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
     186              :     /// a token with both scopes to use with this tool.
     187              :     jwt: Option<String>,
     188              : 
     189              :     #[command(subcommand)]
     190              :     command: Command,
     191              : }
     192              : 
     193              : #[derive(Debug, Clone)]
     194              : struct PlacementPolicyArg(PlacementPolicy);
     195              : 
     196              : impl FromStr for PlacementPolicyArg {
     197              :     type Err = anyhow::Error;
     198              : 
     199            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     200            0 :         match s {
     201            0 :             "detached" => Ok(Self(PlacementPolicy::Detached)),
     202            0 :             "secondary" => Ok(Self(PlacementPolicy::Secondary)),
     203            0 :             _ if s.starts_with("attached:") => {
     204            0 :                 let mut splitter = s.split(':');
     205            0 :                 let _prefix = splitter.next().unwrap();
     206            0 :                 match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
     207            0 :                     Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
     208            0 :                     None => Err(anyhow::anyhow!(
     209            0 :                         "Invalid format '{s}', a valid example is 'attached:1'"
     210            0 :                     )),
     211              :                 }
     212              :             }
     213            0 :             _ => Err(anyhow::anyhow!(
     214            0 :                 "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
     215            0 :             )),
     216              :         }
     217            0 :     }
     218              : }
     219              : 
     220              : #[derive(Debug, Clone)]
     221              : struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);
     222              : 
     223              : impl FromStr for ShardSchedulingPolicyArg {
     224              :     type Err = anyhow::Error;
     225              : 
     226            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     227            0 :         match s {
     228            0 :             "active" => Ok(Self(ShardSchedulingPolicy::Active)),
     229            0 :             "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
     230            0 :             "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
     231            0 :             "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
     232            0 :             _ => Err(anyhow::anyhow!(
     233            0 :                 "Unknown scheduling policy '{s}', try active,essential,pause,stop"
     234            0 :             )),
     235              :         }
     236            0 :     }
     237              : }
     238              : 
     239              : #[derive(Debug, Clone)]
     240              : struct NodeAvailabilityArg(NodeAvailabilityWrapper);
     241              : 
     242              : impl FromStr for NodeAvailabilityArg {
     243              :     type Err = anyhow::Error;
     244              : 
     245            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     246            0 :         match s {
     247            0 :             "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
     248            0 :             "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
     249            0 :             _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
     250              :         }
     251            0 :     }
     252              : }
     253              : 
     254              : struct Client {
     255              :     base_url: Url,
     256              :     jwt_token: Option<String>,
     257              :     client: reqwest::Client,
     258              : }
     259              : 
     260              : impl Client {
     261            0 :     fn new(base_url: Url, jwt_token: Option<String>) -> Self {
     262            0 :         Self {
     263            0 :             base_url,
     264            0 :             jwt_token,
     265            0 :             client: reqwest::ClientBuilder::new()
     266            0 :                 .build()
     267            0 :                 .expect("Failed to construct http client"),
     268            0 :         }
     269            0 :     }
     270              : 
     271              :     /// Simple HTTP request wrapper for calling into storage controller
     272            0 :     async fn dispatch<RQ, RS>(
     273            0 :         &self,
     274            0 :         method: Method,
     275            0 :         path: String,
     276            0 :         body: Option<RQ>,
     277            0 :     ) -> mgmt_api::Result<RS>
     278            0 :     where
     279            0 :         RQ: Serialize + Sized,
     280            0 :         RS: DeserializeOwned + Sized,
     281            0 :     {
     282            0 :         // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
     283            0 :         // for general purpose API access.
     284            0 :         let url = Url::from_str(&format!(
     285            0 :             "http://{}:{}/{path}",
     286            0 :             self.base_url.host_str().unwrap(),
     287            0 :             self.base_url.port().unwrap()
     288            0 :         ))
     289            0 :         .unwrap();
     290            0 : 
     291            0 :         let mut builder = self.client.request(method, url);
     292            0 :         if let Some(body) = body {
     293            0 :             builder = builder.json(&body)
     294            0 :         }
     295            0 :         if let Some(jwt_token) = &self.jwt_token {
     296            0 :             builder = builder.header(
     297            0 :                 reqwest::header::AUTHORIZATION,
     298            0 :                 format!("Bearer {jwt_token}"),
     299            0 :             );
     300            0 :         }
     301              : 
     302            0 :         let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
     303            0 :         let response = response.error_from_body().await?;
     304              : 
     305            0 :         response
     306            0 :             .json()
     307            0 :             .await
     308            0 :             .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
     309            0 :     }
     310              : }
     311              : 
     312              : #[tokio::main]
     313            0 : async fn main() -> anyhow::Result<()> {
     314            0 :     let cli = Cli::parse();
     315            0 : 
     316            0 :     let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
     317            0 : 
     318            0 :     let mut trimmed = cli.api.to_string();
     319            0 :     trimmed.pop();
     320            0 :     let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());
     321            0 : 
     322            0 :     match cli.command {
     323            0 :         Command::NodeRegister {
     324            0 :             node_id,
     325            0 :             listen_pg_addr,
     326            0 :             listen_pg_port,
     327            0 :             listen_http_addr,
     328            0 :             listen_http_port,
     329            0 :         } => {
     330            0 :             storcon_client
     331            0 :                 .dispatch::<_, ()>(
     332            0 :                     Method::POST,
     333            0 :                     "control/v1/node".to_string(),
     334            0 :                     Some(NodeRegisterRequest {
     335            0 :                         node_id,
     336            0 :                         listen_pg_addr,
     337            0 :                         listen_pg_port,
     338            0 :                         listen_http_addr,
     339            0 :                         listen_http_port,
     340            0 :                     }),
     341            0 :                 )
     342            0 :                 .await?;
     343            0 :         }
     344            0 :         Command::TenantCreate { tenant_id } => {
     345            0 :             vps_client
     346            0 :                 .tenant_create(&TenantCreateRequest {
     347            0 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
     348            0 :                     generation: None,
     349            0 :                     shard_parameters: ShardParameters::default(),
     350            0 :                     placement_policy: Some(PlacementPolicy::Attached(1)),
     351            0 :                     config: TenantConfig::default(),
     352            0 :                 })
     353            0 :                 .await?;
     354            0 :         }
     355            0 :         Command::TenantDelete { tenant_id } => {
     356            0 :             let status = vps_client
     357            0 :                 .tenant_delete(TenantShardId::unsharded(tenant_id))
     358            0 :                 .await?;
     359            0 :             tracing::info!("Delete status: {}", status);
     360            0 :         }
     361            0 :         Command::Nodes {} => {
     362            0 :             let resp = storcon_client
     363            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
     364            0 :                     Method::GET,
     365            0 :                     "control/v1/node".to_string(),
     366            0 :                     None,
     367            0 :                 )
     368            0 :                 .await?;
     369            0 :             let mut table = comfy_table::Table::new();
     370            0 :             table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
     371            0 :             for node in resp {
     372            0 :                 table.add_row([
     373            0 :                     format!("{}", node.id),
     374            0 :                     node.listen_http_addr,
     375            0 :                     format!("{:?}", node.scheduling),
     376            0 :                     format!("{:?}", node.availability),
     377            0 :                 ]);
     378            0 :             }
     379            0 :             println!("{table}");
     380            0 :         }
     381            0 :         Command::NodeConfigure {
     382            0 :             node_id,
     383            0 :             availability,
     384            0 :             scheduling,
     385            0 :         } => {
     386            0 :             let req = NodeConfigureRequest {
     387            0 :                 node_id,
     388            0 :                 availability: availability.map(|a| a.0),
     389            0 :                 scheduling,
     390            0 :             };
     391            0 :             storcon_client
     392            0 :                 .dispatch::<_, ()>(
     393            0 :                     Method::PUT,
     394            0 :                     format!("control/v1/node/{node_id}/config"),
     395            0 :                     Some(req),
     396            0 :                 )
     397            0 :                 .await?;
     398            0 :         }
     399            0 :         Command::Tenants {} => {
     400            0 :             let resp = storcon_client
     401            0 :                 .dispatch::<(), Vec<TenantDescribeResponse>>(
     402            0 :                     Method::GET,
     403            0 :                     "control/v1/tenant".to_string(),
     404            0 :                     None,
     405            0 :                 )
     406            0 :                 .await?;
     407            0 :             let mut table = comfy_table::Table::new();
     408            0 :             table.set_header([
     409            0 :                 "TenantId",
     410            0 :                 "ShardCount",
     411            0 :                 "StripeSize",
     412            0 :                 "Placement",
     413            0 :                 "Scheduling",
     414            0 :             ]);
     415            0 :             for tenant in resp {
     416            0 :                 let shard_zero = tenant.shards.into_iter().next().unwrap();
     417            0 :                 table.add_row([
     418            0 :                     format!("{}", tenant.tenant_id),
     419            0 :                     format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
     420            0 :                     format!("{:?}", tenant.stripe_size),
     421            0 :                     format!("{:?}", tenant.policy),
     422            0 :                     format!("{:?}", shard_zero.scheduling_policy),
     423            0 :                 ]);
     424            0 :             }
     425            0 : 
     426            0 :             println!("{table}");
     427            0 :         }
     428            0 :         Command::TenantPolicy {
     429            0 :             tenant_id,
     430            0 :             placement,
     431            0 :             scheduling,
     432            0 :         } => {
     433            0 :             let req = TenantPolicyRequest {
     434            0 :                 scheduling: scheduling.map(|s| s.0),
     435            0 :                 placement: placement.map(|p| p.0),
     436            0 :             };
     437            0 :             storcon_client
     438            0 :                 .dispatch::<_, ()>(
     439            0 :                     Method::PUT,
     440            0 :                     format!("control/v1/tenant/{tenant_id}/policy"),
     441            0 :                     Some(req),
     442            0 :                 )
     443            0 :                 .await?;
     444            0 :         }
     445            0 :         Command::TenantShardSplit {
     446            0 :             tenant_id,
     447            0 :             shard_count,
     448            0 :             stripe_size,
     449            0 :         } => {
     450            0 :             let req = TenantShardSplitRequest {
     451            0 :                 new_shard_count: shard_count,
     452            0 :                 new_stripe_size: stripe_size.map(ShardStripeSize),
     453            0 :             };
     454            0 : 
     455            0 :             let response = storcon_client
     456            0 :                 .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
     457            0 :                     Method::PUT,
     458            0 :                     format!("control/v1/tenant/{tenant_id}/shard_split"),
     459            0 :                     Some(req),
     460            0 :                 )
     461            0 :                 .await?;
     462            0 :             println!(
     463            0 :                 "Split tenant {} into {} shards: {}",
     464            0 :                 tenant_id,
     465            0 :                 shard_count,
     466            0 :                 response
     467            0 :                     .new_shards
     468            0 :                     .iter()
     469            0 :                     .map(|s| format!("{:?}", s))
     470            0 :                     .collect::<Vec<_>>()
     471            0 :                     .join(",")
     472            0 :             );
     473            0 :         }
     474            0 :         Command::TenantShardMigrate {
     475            0 :             tenant_shard_id,
     476            0 :             node,
     477            0 :         } => {
     478            0 :             let req = TenantShardMigrateRequest {
     479            0 :                 tenant_shard_id,
     480            0 :                 node_id: node,
     481            0 :             };
     482            0 : 
     483            0 :             storcon_client
     484            0 :                 .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
     485            0 :                     Method::PUT,
     486            0 :                     format!("control/v1/tenant/{tenant_shard_id}/migrate"),
     487            0 :                     Some(req),
     488            0 :                 )
     489            0 :                 .await?;
     490            0 :         }
     491            0 :         Command::TenantConfig { tenant_id, config } => {
     492            0 :             let tenant_conf = serde_json::from_str(&config)?;
     493            0 : 
     494            0 :             vps_client
     495            0 :                 .tenant_config(&TenantConfigRequest {
     496            0 :                     tenant_id,
     497            0 :                     config: tenant_conf,
     498            0 :                 })
     499            0 :                 .await?;
     500            0 :         }
     501            0 :         Command::TenantScatter { tenant_id } => {
     502            0 :             // Find the shards
     503            0 :             let locate_response = storcon_client
     504            0 :                 .dispatch::<(), TenantLocateResponse>(
     505            0 :                     Method::GET,
     506            0 :                     format!("control/v1/tenant/{tenant_id}/locate"),
     507            0 :                     None,
     508            0 :                 )
     509            0 :                 .await?;
     510            0 :             let shards = locate_response.shards;
     511            0 : 
     512            0 :             let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
     513            0 :             let shard_count = shards.len();
     514            0 :             for s in shards {
     515            0 :                 let entry = node_to_shards.entry(s.node_id).or_default();
     516            0 :                 entry.push(s.shard_id);
     517            0 :             }
     518            0 : 
     519            0 :             // Load list of available nodes
     520            0 :             let nodes_resp = storcon_client
     521            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
     522            0 :                     Method::GET,
     523            0 :                     "control/v1/node".to_string(),
     524            0 :                     None,
     525            0 :                 )
     526            0 :                 .await?;
     527            0 : 
     528            0 :             for node in nodes_resp {
     529            0 :                 if matches!(node.availability, NodeAvailabilityWrapper::Active) {
     530            0 :                     node_to_shards.entry(node.id).or_default();
     531            0 :                 }
     532            0 :             }
     533            0 : 
     534            0 :             let max_shard_per_node = shard_count / node_to_shards.len();
     535            0 : 
     536            0 :             loop {
     537            0 :                 let mut migrate_shard = None;
     538            0 :                 for shards in node_to_shards.values_mut() {
     539            0 :                     if shards.len() > max_shard_per_node {
     540            0 :                         // Pick the emptiest
     541            0 :                         migrate_shard = Some(shards.pop().unwrap());
     542            0 :                     }
     543            0 :                 }
     544            0 :                 let Some(migrate_shard) = migrate_shard else {
     545            0 :                     break;
     546            0 :                 };
     547            0 : 
     548            0 :                 // Pick the emptiest node to migrate to
     549            0 :                 let mut destinations = node_to_shards
     550            0 :                     .iter()
     551            0 :                     .map(|(k, v)| (k, v.len()))
     552            0 :                     .collect::<Vec<_>>();
     553            0 :                 destinations.sort_by_key(|i| i.1);
     554            0 :                 let (destination_node, destination_count) = *destinations.first().unwrap();
     555            0 :                 if destination_count + 1 > max_shard_per_node {
     556            0 :                     // Even the emptiest destination doesn't have space: we're done
     557            0 :                     break;
     558            0 :                 }
     559            0 :                 let destination_node = *destination_node;
     560            0 : 
     561            0 :                 node_to_shards
     562            0 :                     .get_mut(&destination_node)
     563            0 :                     .unwrap()
     564            0 :                     .push(migrate_shard);
     565            0 : 
     566            0 :                 println!("Migrate {} -> {} ...", migrate_shard, destination_node);
     567            0 : 
     568            0 :                 storcon_client
     569            0 :                     .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
     570            0 :                         Method::PUT,
     571            0 :                         format!("control/v1/tenant/{migrate_shard}/migrate"),
     572            0 :                         Some(TenantShardMigrateRequest {
     573            0 :                             tenant_shard_id: migrate_shard,
     574            0 :                             node_id: destination_node,
     575            0 :                         }),
     576            0 :                     )
     577            0 :                     .await?;
     578            0 :                 println!("Migrate {} -> {} OK", migrate_shard, destination_node);
     579            0 :             }
     580            0 : 
     581            0 :             // Spread the shards across the nodes
     582            0 :         }
     583            0 :         Command::TenantDescribe { tenant_id } => {
     584            0 :             let describe_response = storcon_client
     585            0 :                 .dispatch::<(), TenantDescribeResponse>(
     586            0 :                     Method::GET,
     587            0 :                     format!("control/v1/tenant/{tenant_id}"),
     588            0 :                     None,
     589            0 :                 )
     590            0 :                 .await?;
     591            0 :             let shards = describe_response.shards;
     592            0 :             let mut table = comfy_table::Table::new();
     593            0 :             table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]);
     594            0 :             for shard in shards {
     595            0 :                 let secondary = shard
     596            0 :                     .node_secondary
     597            0 :                     .iter()
     598            0 :                     .map(|n| format!("{}", n))
     599            0 :                     .collect::<Vec<_>>()
     600            0 :                     .join(",");
     601            0 : 
     602            0 :                 let mut status_parts = Vec::new();
     603            0 :                 if shard.is_reconciling {
     604            0 :                     status_parts.push("reconciling");
     605            0 :                 }
     606            0 : 
     607            0 :                 if shard.is_pending_compute_notification {
     608            0 :                     status_parts.push("pending_compute");
     609            0 :                 }
     610            0 : 
     611            0 :                 if shard.is_splitting {
     612            0 :                     status_parts.push("splitting");
     613            0 :                 }
     614            0 :                 let status = status_parts.join(",");
     615            0 : 
     616            0 :                 table.add_row([
     617            0 :                     format!("{}", shard.tenant_shard_id),
     618            0 :                     shard
     619            0 :                         .node_attached
     620            0 :                         .map(|n| format!("{}", n))
     621            0 :                         .unwrap_or(String::new()),
     622            0 :                     secondary,
     623            0 :                     shard.last_error,
     624            0 :                     status,
     625            0 :                 ]);
     626            0 :             }
     627            0 :             println!("{table}");
     628            0 :         }
     629            0 :         Command::TenantWarmup { tenant_id } => {
     630            0 :             let describe_response = storcon_client
     631            0 :                 .dispatch::<(), TenantDescribeResponse>(
     632            0 :                     Method::GET,
     633            0 :                     format!("control/v1/tenant/{tenant_id}"),
     634            0 :                     None,
     635            0 :                 )
     636            0 :                 .await;
     637            0 :             match describe_response {
     638            0 :                 Ok(describe) => {
     639            0 :                     if matches!(describe.policy, PlacementPolicy::Secondary) {
     640            0 :                         // Fine: it's already known to controller in secondary mode: calling
     641            0 :                         // again to put it into secondary mode won't cause problems.
     642            0 :                     } else {
     643            0 :                         anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
     644            0 :                     }
     645            0 :                 }
     646            0 :                 Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
     647            0 :                     // Fine: this tenant isn't known to the storage controller yet.
     648            0 :                 }
     649            0 :                 Err(e) => {
     650            0 :                     // Unexpected API error
     651            0 :                     return Err(e.into());
     652            0 :                 }
     653            0 :             }
     654            0 : 
     655            0 :             vps_client
     656            0 :                 .location_config(
     657            0 :                     TenantShardId::unsharded(tenant_id),
     658            0 :                     pageserver_api::models::LocationConfig {
     659            0 :                         mode: pageserver_api::models::LocationConfigMode::Secondary,
     660            0 :                         generation: None,
     661            0 :                         secondary_conf: Some(LocationConfigSecondary { warm: true }),
     662            0 :                         shard_number: 0,
     663            0 :                         shard_count: 0,
     664            0 :                         shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
     665            0 :                         tenant_conf: TenantConfig::default(),
     666            0 :                     },
     667            0 :                     None,
     668            0 :                     true,
     669            0 :                 )
     670            0 :                 .await?;
     671            0 : 
     672            0 :             let describe_response = storcon_client
     673            0 :                 .dispatch::<(), TenantDescribeResponse>(
     674            0 :                     Method::GET,
     675            0 :                     format!("control/v1/tenant/{tenant_id}"),
     676            0 :                     None,
     677            0 :                 )
     678            0 :                 .await?;
     679            0 : 
     680            0 :             let secondary_ps_id = describe_response
     681            0 :                 .shards
     682            0 :                 .first()
     683            0 :                 .unwrap()
     684            0 :                 .node_secondary
     685            0 :                 .first()
     686            0 :                 .unwrap();
     687            0 : 
     688            0 :             println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
     689            0 :             loop {
     690            0 :                 let (status, progress) = vps_client
     691            0 :                     .tenant_secondary_download(
     692            0 :                         TenantShardId::unsharded(tenant_id),
     693            0 :                         Some(Duration::from_secs(10)),
     694            0 :                     )
     695            0 :                     .await?;
     696            0 :                 println!(
     697            0 :                     "Progress: {}/{} layers, {}/{} bytes",
     698            0 :                     progress.layers_downloaded,
     699            0 :                     progress.layers_total,
     700            0 :                     progress.bytes_downloaded,
     701            0 :                     progress.bytes_total
     702            0 :                 );
     703            0 :                 match status {
     704            0 :                     StatusCode::OK => {
     705            0 :                         println!("Download complete");
     706            0 :                         break;
     707            0 :                     }
     708            0 :                     StatusCode::ACCEPTED => {
     709            0 :                         // Loop
     710            0 :                     }
     711            0 :                     _ => {
     712            0 :                         anyhow::bail!("Unexpected download status: {status}");
     713            0 :                     }
     714            0 :                 }
     715            0 :             }
     716            0 :         }
     717            0 :         Command::TenantDrop { tenant_id, unclean } => {
     718            0 :             if !unclean {
     719            0 :                 anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant.  If you know what you're doing, add `--unclean` to proceed.")
     720            0 :             }
     721            0 :             storcon_client
     722            0 :                 .dispatch::<(), ()>(
     723            0 :                     Method::POST,
     724            0 :                     format!("debug/v1/tenant/{tenant_id}/drop"),
     725            0 :                     None,
     726            0 :                 )
     727            0 :                 .await?;
     728            0 :         }
     729            0 :         Command::NodeDrop { node_id, unclean } => {
     730            0 :             if !unclean {
     731            0 :                 anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it.  If you know what you're doing, add `--unclean` to proceed.")
     732            0 :             }
     733            0 :             storcon_client
     734            0 :                 .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
     735            0 :                 .await?;
     736            0 :         }
     737            0 :         Command::TenantSetTimeBasedEviction {
     738            0 :             tenant_id,
     739            0 :             period,
     740            0 :             threshold,
     741            0 :         } => {
     742            0 :             vps_client
     743            0 :                 .tenant_config(&TenantConfigRequest {
     744            0 :                     tenant_id,
     745            0 :                     config: TenantConfig {
     746            0 :                         eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
     747            0 :                             EvictionPolicyLayerAccessThreshold {
     748            0 :                                 period: period.into(),
     749            0 :                                 threshold: threshold.into(),
     750            0 :                             },
     751            0 :                         )),
     752            0 :                         ..Default::default()
     753            0 :                     },
     754            0 :                 })
     755            0 :                 .await?;
     756            0 :         }
     757            0 :         Command::Drain {
     758            0 :             nodes,
     759            0 :             concurrency,
     760            0 :             max_shards,
     761            0 :             dry_run,
     762            0 :         } => {
     763            0 :             // Load the list of nodes, split them up into the drained and filled sets,
     764            0 :             // and validate that draining is possible.
     765            0 :             let node_descs = storcon_client
     766            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
     767            0 :                     Method::GET,
     768            0 :                     "control/v1/node".to_string(),
     769            0 :                     None,
     770            0 :                 )
     771            0 :                 .await?;
     772            0 : 
     773            0 :             let mut node_to_drain_descs = Vec::new();
     774            0 :             let mut node_to_fill_descs = Vec::new();
     775            0 : 
     776            0 :             for desc in node_descs {
     777            0 :                 let to_drain = nodes.iter().any(|id| *id == desc.id);
     778            0 :                 if to_drain {
     779            0 :                     node_to_drain_descs.push(desc);
     780            0 :                 } else {
     781            0 :                     node_to_fill_descs.push(desc);
     782            0 :                 }
     783            0 :             }
     784            0 : 
     785            0 :             if nodes.len() != node_to_drain_descs.len() {
     786            0 :                 anyhow::bail!("Drain requested for node which doesn't exist.")
     787            0 :             }
     788            0 : 
     789            0 :             node_to_fill_descs.retain(|desc| {
     790            0 :                 matches!(desc.availability, NodeAvailabilityWrapper::Active)
     791            0 :                     && matches!(
     792            0 :                         desc.scheduling,
     793            0 :                         NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
     794            0 :                     )
     795            0 :             });
     796            0 : 
     797            0 :             if node_to_fill_descs.is_empty() {
     798            0 :                 anyhow::bail!("There are no nodes to drain to")
     799            0 :             }
     800            0 : 
     801            0 :             // Set the node scheduling policy to draining for the nodes which
     802            0 :             // we plan to drain.
     803            0 :             for node_desc in node_to_drain_descs.iter() {
     804            0 :                 let req = NodeConfigureRequest {
     805            0 :                     node_id: node_desc.id,
     806            0 :                     availability: None,
     807            0 :                     scheduling: Some(NodeSchedulingPolicy::Draining),
     808            0 :                 };
     809            0 : 
     810            0 :                 storcon_client
     811            0 :                     .dispatch::<_, ()>(
     812            0 :                         Method::PUT,
     813            0 :                         format!("control/v1/node/{}/config", node_desc.id),
     814            0 :                         Some(req),
     815            0 :                     )
     816            0 :                     .await?;
     817            0 :             }
     818            0 : 
     819            0 :             // Perform the drain: move each tenant shard scheduled on a node to
     820            0 :             // be drained to a node which is being filled. A simple round robin
     821            0 :             // strategy is used to pick the new node.
     822            0 :             let tenants = storcon_client
     823            0 :                 .dispatch::<(), Vec<TenantDescribeResponse>>(
     824            0 :                     Method::GET,
     825            0 :                     "control/v1/tenant".to_string(),
     826            0 :                     None,
     827            0 :                 )
     828            0 :                 .await?;
     829            0 : 
     830            0 :             let mut selected_node_idx = 0;
     831            0 : 
     832            0 :             struct DrainMove { // One planned shard migration built by the drain planning loop.
     833            0 :                 tenant_shard_id: TenantShardId, // Shard to migrate; also used in the migrate request path.
     834            0 :                 from: NodeId, // Node the shard is attached to now; only used in dry-run/error output.
     835            0 :                 to: NodeId, // Destination node, picked round-robin from the fill candidates.
     836            0 :             }
     837            0 : 
     838            0 :             let mut moves: Vec<DrainMove> = Vec::new();
     839            0 : 
     840            0 :             let shards = tenants
     841            0 :                 .into_iter()
     842            0 :                 .flat_map(|tenant| tenant.shards.into_iter());
     843            0 :             for shard in shards {
     844            0 :                 if let Some(max_shards) = max_shards {
     845            0 :                     if moves.len() >= max_shards {
     846            0 :                         println!(
     847            0 :                             "Stop planning shard moves since the requested maximum was reached"
     848            0 :                         );
     849            0 :                         break;
     850            0 :                     }
     851            0 :                 }
     852            0 : 
     853            0 :                 let should_migrate = {
     854            0 :                     if let Some(attached_to) = shard.node_attached {
     855            0 :                         node_to_drain_descs
     856            0 :                             .iter()
     857            0 :                             .map(|desc| desc.id)
     858            0 :                             .any(|id| id == attached_to)
     859            0 :                     } else {
     860            0 :                         false
     861            0 :                     }
     862            0 :                 };
     863            0 : 
     864            0 :                 if !should_migrate {
     865            0 :                     continue;
     866            0 :                 }
     867            0 : 
     868            0 :                 moves.push(DrainMove {
     869            0 :                     tenant_shard_id: shard.tenant_shard_id,
     870            0 :                     from: shard
     871            0 :                         .node_attached
     872            0 :                         .expect("We only migrate attached tenant shards"),
     873            0 :                     to: node_to_fill_descs[selected_node_idx].id,
     874            0 :                 });
     875            0 :                 selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
     876            0 :             }
     877            0 : 
     878            0 :             let total_moves = moves.len();
     879            0 : 
     880            0 :             if dry_run == Some(true) {
     881            0 :                 println!("Dryrun requested. Planned {total_moves} moves:");
     882            0 :                 for mv in &moves {
     883            0 :                     println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
     884            0 :                 }
     885            0 : 
     886            0 :                 return Ok(());
     887            0 :             }
     888            0 : 
     889            0 :             const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
     890            0 :             let mut stream = futures::stream::iter(moves)
     891            0 :                 .map(|mv| {
     892            0 :                     let client = Client::new(cli.api.clone(), cli.jwt.clone());
     893            0 :                     async move {
     894            0 :                         client
     895            0 :                             .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
     896            0 :                                 Method::PUT,
     897            0 :                                 format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
     898            0 :                                 Some(TenantShardMigrateRequest {
     899            0 :                                     tenant_shard_id: mv.tenant_shard_id,
     900            0 :                                     node_id: mv.to,
     901            0 :                                 }),
     902            0 :                             )
     903            0 :                             .await
     904            0 :                             .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
     905            0 :                     }
     906            0 :                 })
     907            0 :                 .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));
     908            0 : 
     909            0 :             let mut success = 0;
     910            0 :             let mut failure = 0;
     911            0 : 
     912            0 :             while let Some(res) = stream.next().await {
     913            0 :                 match res {
     914            0 :                     Ok(_) => {
     915            0 :                         success += 1;
     916            0 :                     }
     917            0 :                     Err((tenant_shard_id, from, to, error)) => {
     918            0 :                         failure += 1;
     919            0 :                         println!(
     920            0 :                             "Failed to migrate {} from node {} to node {}: {}",
     921            0 :                             tenant_shard_id, from, to, error
     922            0 :                         );
     923            0 :                     }
     924            0 :                 }
     925            0 : 
     926            0 :                 if (success + failure) % 20 == 0 {
     927            0 :                     println!(
     928            0 :                         "Processed {}/{} shards: {} succeeded, {} failed",
     929            0 :                         success + failure,
     930            0 :                         total_moves,
     931            0 :                         success,
     932            0 :                         failure
     933            0 :                     );
     934            0 :                 }
     935            0 :             }
     936            0 : 
     937            0 :             println!(
     938            0 :                 "Processed {}/{} shards: {} succeeded, {} failed",
     939            0 :                 success + failure,
     940            0 :                 total_moves,
     941            0 :                 success,
     942            0 :                 failure
     943            0 :             );
     944            0 :         }
     945            0 :     }
     946            0 : 
     947            0 :     Ok(())
     948            0 : }
         |