|             Line data    Source code 
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::path::PathBuf;
       3              : use std::str::FromStr;
       4              : use std::time::Duration;
       5              : 
       6              : use clap::{Parser, Subcommand};
       7              : use futures::StreamExt;
       8              : use pageserver_api::controller_api::{
       9              :     AvailabilityZone, MigrationConfig, NodeAvailabilityWrapper, NodeConfigureRequest,
      10              :     NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse,
      11              :     PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
      12              :     ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
      13              :     SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
      14              :     TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
      15              : };
      16              : use pageserver_api::models::{
      17              :     EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
      18              :     TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
      19              :     TenantShardSplitResponse,
      20              : };
      21              : use pageserver_api::shard::{ShardStripeSize, TenantShardId};
      22              : use pageserver_client::mgmt_api::{self};
      23              : use reqwest::{Certificate, Method, StatusCode, Url};
      24              : use safekeeper_api::models::TimelineLocateResponse;
      25              : use storage_controller_client::control_api::Client;
      26              : use utils::id::{NodeId, TenantId, TimelineId};
      27              : 
      28              : #[derive(Subcommand, Debug)]
      29              : enum Command {
      30              :     /// Register a pageserver with the storage controller.  This shouldn't usually be necessary,
      31              :     /// since pageservers auto-register when they start up
      32              :     NodeRegister {
      33              :         #[arg(long)]
      34              :         node_id: NodeId,
      35              : 
      36              :         #[arg(long)]
      37              :         listen_pg_addr: String,
      38              :         #[arg(long)]
      39              :         listen_pg_port: u16,
      40              :         #[arg(long)]
      41              :         listen_grpc_addr: Option<String>,
      42              :         #[arg(long)]
      43              :         listen_grpc_port: Option<u16>,
      44              : 
      45              :         #[arg(long)]
      46              :         listen_http_addr: String,
      47              :         #[arg(long)]
      48              :         listen_http_port: u16,
      49              :         #[arg(long)]
      50              :         listen_https_port: Option<u16>,
      51              : 
      52              :         #[arg(long)]
      53              :         availability_zone_id: String,
      54              :     },
      55              : 
      56              :     /// Modify a node's configuration in the storage controller
      57              :     NodeConfigure {
      58              :         #[arg(long)]
      59              :         node_id: NodeId,
      60              : 
      61              :         /// Availability is usually auto-detected based on heartbeats.  Set 'offline' here to
      62              :         /// manually mark a node offline
      63              :         #[arg(long)]
      64              :         availability: Option<NodeAvailabilityArg>,
      65              :         /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
      66              :         #[arg(long)]
      67              :         scheduling: Option<NodeSchedulingPolicy>,
      68              :     },
      69              :     /// Exists for backup usage and will be removed in future.
      70              :     /// Use [`Command::NodeStartDelete`] instead, if possible.
      71              :     NodeDelete {
      72              :         #[arg(long)]
      73              :         node_id: NodeId,
      74              :     },
      75              :     /// Start deletion of the specified pageserver.
      76              :     NodeStartDelete {
      77              :         #[arg(long)]
      78              :         node_id: NodeId,
      79              :         /// When `force` is true, skip waiting for shards to prewarm during migration.
      80              :         /// This can significantly speed up node deletion since prewarming all shards
      81              :         /// can take considerable time, but may result in slower initial access to
      82              :         /// migrated shards until they warm up naturally.
      83              :         #[arg(long)]
      84              :         force: bool,
      85              :     },
      86              :     /// Cancel deletion of the specified pageserver and wait for `timeout`
      87              :     /// for the operation to be canceled. May be retried.
      88              :     NodeCancelDelete {
      89              :         #[arg(long)]
      90              :         node_id: NodeId,
      91              :         #[arg(long)]
      92              :         timeout: humantime::Duration,
      93              :     },
      94              :     /// Delete a tombstone of node from the storage controller.
      95              :     /// This is used when we want to allow the node to be re-registered.
      96              :     NodeDeleteTombstone {
      97              :         #[arg(long)]
      98              :         node_id: NodeId,
      99              :     },
     100              :     /// Modify a tenant's policies in the storage controller
     101              :     TenantPolicy {
     102              :         #[arg(long)]
     103              :         tenant_id: TenantId,
     104              :         /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
     105              :         /// or is in the normal attached state with N secondary locations (`attached:N`)
     106              :         #[arg(long)]
     107              :         placement: Option<PlacementPolicyArg>,
     108              :         /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant.  `active` is normal,
     109              :         /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
     110              :         /// all reconciliation activity including for scheduling changes already made.  `pause` and `stop` can make a tenant
     111              :         /// unavailable, and are only for use in emergencies.
     112              :         #[arg(long)]
     113              :         scheduling: Option<ShardSchedulingPolicyArg>,
     114              :     },
     115              :     /// List nodes known to the storage controller
     116              :     Nodes {},
     117              :     /// List soft deleted nodes known to the storage controller
     118              :     NodeTombstones {},
     119              :     /// List tenants known to the storage controller
     120              :     Tenants {
     121              :         /// If this field is set, it will list the tenants on a specific node
     122              :         node_id: Option<NodeId>,
     123              :     },
     124              :     /// Create a new tenant in the storage controller, and by extension on pageservers.
     125              :     TenantCreate {
     126              :         #[arg(long)]
     127              :         tenant_id: TenantId,
     128              :     },
     129              :     /// Delete a tenant in the storage controller, and by extension on pageservers.
     130              :     TenantDelete {
     131              :         #[arg(long)]
     132              :         tenant_id: TenantId,
     133              :     },
     134              :     /// Split an existing tenant into a higher number of shards than its current shard count.
     135              :     TenantShardSplit {
     136              :         #[arg(long)]
     137              :         tenant_id: TenantId,
     138              :         #[arg(long)]
     139              :         shard_count: u8,
     140              :         /// Optional, in 8kiB pages.  e.g. set 2048 for 16MB stripes.
     141              :         #[arg(long)]
     142              :         stripe_size: Option<u32>,
     143              :     },
     144              :     /// Migrate the attached location for a tenant shard to a specific pageserver.
     145              :     TenantShardMigrate {
     146              :         #[arg(long)]
     147              :         tenant_shard_id: TenantShardId,
     148              :         #[arg(long)]
     149              :         node: NodeId,
     150              :         #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
     151              :         prewarm: bool,
     152              :         #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
     153              :         override_scheduler: bool,
     154              :     },
     155              :     /// Watch the location of a tenant shard evolve, e.g. while expecting it to migrate
     156              :     TenantShardWatch {
     157              :         #[arg(long)]
     158              :         tenant_shard_id: TenantShardId,
     159              :     },
     160              :     /// Migrate the secondary location for a tenant shard to a specific pageserver.
     161              :     TenantShardMigrateSecondary {
     162              :         #[arg(long)]
     163              :         tenant_shard_id: TenantShardId,
     164              :         #[arg(long)]
     165              :         node: NodeId,
     166              :     },
     167              :     /// Cancel any ongoing reconciliation for this shard
     168              :     TenantShardCancelReconcile {
     169              :         #[arg(long)]
     170              :         tenant_shard_id: TenantShardId,
     171              :     },
     172              :     /// Set the pageserver tenant configuration of a tenant: this is the configuration structure
     173              :     /// that is passed through to pageservers, and does not affect storage controller behavior.
     174              :     /// Any previous tenant configs are overwritten.
     175              :     SetTenantConfig {
     176              :         #[arg(long)]
     177              :         tenant_id: TenantId,
     178              :         #[arg(long)]
     179              :         config: String,
     180              :     },
     181              :     /// Patch the pageserver tenant configuration of a tenant. Any fields with null values in the
     182              :     /// provided JSON are unset from the tenant config and all fields with non-null values are set.
     183              :     /// Unspecified fields are not changed.
     184              :     PatchTenantConfig {
     185              :         #[arg(long)]
     186              :         tenant_id: TenantId,
     187              :         #[arg(long)]
     188              :         config: String,
     189              :     },
     190              :     /// Print details about a particular tenant, including all its shards' states.
     191              :     TenantDescribe {
     192              :         #[arg(long)]
     193              :         tenant_id: TenantId,
     194              :     },
     195              :     TenantSetPreferredAz {
     196              :         #[arg(long)]
     197              :         tenant_id: TenantId,
     198              :         #[arg(long)]
     199              :         preferred_az: Option<String>,
     200              :     },
     201              :     /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
     202              :     /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
     203              :     TenantDrop {
     204              :         #[arg(long)]
     205              :         tenant_id: TenantId,
     206              :         #[arg(long)]
     207              :         unclean: bool,
     208              :     },
     209              :     NodeDrop {
     210              :         #[arg(long)]
     211              :         node_id: NodeId,
     212              :         #[arg(long)]
     213              :         unclean: bool,
     214              :     },
     215              :     TenantSetTimeBasedEviction {
     216              :         #[arg(long)]
     217              :         tenant_id: TenantId,
     218              :         #[arg(long)]
     219              :         period: humantime::Duration,
     220              :         #[arg(long)]
     221              :         threshold: humantime::Duration,
     222              :     },
     223              :     // Migrate away from a set of specified pageservers by moving the primary attachments to pageservers
     224              :     // outside of the specified set.
     225              :     BulkMigrate {
     226              :         // Set of pageserver node ids to drain.
     227              :         #[arg(long)]
     228              :         nodes: Vec<NodeId>,
     229              :         // Optional: migration concurrency (default is 8)
     230              :         #[arg(long)]
     231              :         concurrency: Option<usize>,
     232              :         // Optional: maximum number of shards to migrate
     233              :         #[arg(long)]
     234              :         max_shards: Option<usize>,
     235              :         // Optional: when set to true, nothing is migrated, but the plan is printed to stdout
     236              :         #[arg(long)]
     237              :         dry_run: Option<bool>,
     238              :     },
     239              :     /// Start draining the specified pageserver.
     240              :     /// The drain is complete when the schedulling policy returns to active.
     241              :     StartDrain {
     242              :         #[arg(long)]
     243              :         node_id: NodeId,
     244              :     },
     245              :     /// Cancel draining the specified pageserver and wait for `timeout`
     246              :     /// for the operation to be canceled. May be retried.
     247              :     CancelDrain {
     248              :         #[arg(long)]
     249              :         node_id: NodeId,
     250              :         #[arg(long)]
     251              :         timeout: humantime::Duration,
     252              :     },
     253              :     /// Start filling the specified pageserver.
     254              :     /// The drain is complete when the schedulling policy returns to active.
     255              :     StartFill {
     256              :         #[arg(long)]
     257              :         node_id: NodeId,
     258              :     },
     259              :     /// Cancel filling the specified pageserver and wait for `timeout`
     260              :     /// for the operation to be canceled. May be retried.
     261              :     CancelFill {
     262              :         #[arg(long)]
     263              :         node_id: NodeId,
     264              :         #[arg(long)]
     265              :         timeout: humantime::Duration,
     266              :     },
     267              :     /// List safekeepers known to the storage controller
     268              :     Safekeepers {},
     269              :     /// Set the scheduling policy of the specified safekeeper
     270              :     SafekeeperScheduling {
     271              :         #[arg(long)]
     272              :         node_id: NodeId,
     273              :         #[arg(long)]
     274              :         scheduling_policy: SkSchedulingPolicyArg,
     275              :     },
     276              :     /// Downloads any missing heatmap layers for all shard for a given timeline
     277              :     DownloadHeatmapLayers {
     278              :         /// Tenant ID or tenant shard ID. When an unsharded tenant ID is specified,
     279              :         /// the operation is performed on all shards. When a sharded tenant ID is
     280              :         /// specified, the operation is only performed on the specified shard.
     281              :         #[arg(long)]
     282              :         tenant_shard_id: TenantShardId,
     283              :         #[arg(long)]
     284              :         timeline_id: TimelineId,
     285              :         /// Optional: Maximum download concurrency (default is 16)
     286              :         #[arg(long)]
     287              :         concurrency: Option<usize>,
     288              :     },
     289              :     /// Locate safekeepers for a timeline from the storcon DB.
     290              :     TimelineLocate {
     291              :         #[arg(long)]
     292              :         tenant_id: TenantId,
     293              :         #[arg(long)]
     294              :         timeline_id: TimelineId,
     295              :     },
     296              :     /// Migrate a timeline to a new set of safekeepers
     297              :     TimelineSafekeeperMigrate {
     298              :         #[arg(long)]
     299              :         tenant_id: TenantId,
     300              :         #[arg(long)]
     301              :         timeline_id: TimelineId,
     302              :         /// Example: --new-sk-set 1,2,3
     303              :         #[arg(long, required = true, value_delimiter = ',')]
     304              :         new_sk_set: Vec<NodeId>,
     305              :     },
     306              :     /// Abort ongoing safekeeper migration.
     307              :     TimelineSafekeeperMigrateAbort {
     308              :         #[arg(long)]
     309              :         tenant_id: TenantId,
     310              :         #[arg(long)]
     311              :         timeline_id: TimelineId,
     312              :     },
     313              : }
     314              : 
     315              : #[derive(Parser)]
     316              : #[command(
     317              :     author,
     318              :     version,
     319              :     about,
     320              :     long_about = "CLI for Storage Controller Support/Debug"
     321              : )]
     322              : #[command(arg_required_else_help(true))]
     323              : struct Cli {
     324              :     #[arg(long)]
     325              :     /// URL to storage controller.  e.g. http://127.0.0.1:1234 when using `neon_local`
     326              :     api: Url,
     327              : 
     328              :     #[arg(long)]
     329              :     /// JWT token for authenticating with storage controller.  Depending on the API used, this
     330              :     /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
     331              :     /// a token with both scopes to use with this tool.
     332              :     jwt: Option<String>,
     333              : 
     334              :     #[arg(long)]
     335              :     /// Trusted root CA certificates to use in https APIs.
     336              :     ssl_ca_file: Option<PathBuf>,
     337              : 
     338              :     #[command(subcommand)]
     339              :     command: Command,
     340              : }
     341              : 
     342              : #[derive(Debug, Clone)]
     343              : struct PlacementPolicyArg(PlacementPolicy);
     344              : 
     345              : impl FromStr for PlacementPolicyArg {
     346              :     type Err = anyhow::Error;
     347              : 
     348            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     349            0 :         match s {
     350            0 :             "detached" => Ok(Self(PlacementPolicy::Detached)),
     351            0 :             "secondary" => Ok(Self(PlacementPolicy::Secondary)),
     352            0 :             _ if s.starts_with("attached:") => {
     353            0 :                 let mut splitter = s.split(':');
     354            0 :                 let _prefix = splitter.next().unwrap();
     355            0 :                 match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
     356            0 :                     Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
     357            0 :                     None => Err(anyhow::anyhow!(
     358            0 :                         "Invalid format '{s}', a valid example is 'attached:1'"
     359            0 :                     )),
     360              :                 }
     361              :             }
     362            0 :             _ => Err(anyhow::anyhow!(
     363            0 :                 "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
     364            0 :             )),
     365              :         }
     366            0 :     }
     367              : }
     368              : 
     369              : #[derive(Debug, Clone)]
     370              : struct SkSchedulingPolicyArg(SkSchedulingPolicy);
     371              : 
     372              : impl FromStr for SkSchedulingPolicyArg {
     373              :     type Err = anyhow::Error;
     374              : 
     375            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     376            0 :         SkSchedulingPolicy::from_str(s).map(Self)
     377            0 :     }
     378              : }
     379              : 
     380              : #[derive(Debug, Clone)]
     381              : struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);
     382              : 
     383              : impl FromStr for ShardSchedulingPolicyArg {
     384              :     type Err = anyhow::Error;
     385              : 
     386            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     387            0 :         match s {
     388            0 :             "active" => Ok(Self(ShardSchedulingPolicy::Active)),
     389            0 :             "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
     390            0 :             "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
     391            0 :             "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
     392            0 :             _ => Err(anyhow::anyhow!(
     393            0 :                 "Unknown scheduling policy '{s}', try active,essential,pause,stop"
     394            0 :             )),
     395              :         }
     396            0 :     }
     397              : }
     398              : 
     399              : #[derive(Debug, Clone)]
     400              : struct NodeAvailabilityArg(NodeAvailabilityWrapper);
     401              : 
     402              : impl FromStr for NodeAvailabilityArg {
     403              :     type Err = anyhow::Error;
     404              : 
     405            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     406            0 :         match s {
     407            0 :             "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
     408            0 :             "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
     409            0 :             _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
     410              :         }
     411            0 :     }
     412              : }
     413              : 
     414            0 : async fn wait_for_scheduling_policy<F>(
     415            0 :     client: Client,
     416            0 :     node_id: NodeId,
     417            0 :     timeout: Duration,
     418            0 :     f: F,
     419            0 : ) -> anyhow::Result<NodeSchedulingPolicy>
     420            0 : where
     421            0 :     F: Fn(NodeSchedulingPolicy) -> bool,
     422            0 : {
     423            0 :     let waiter = tokio::time::timeout(timeout, async move {
     424              :         loop {
     425            0 :             let node = client
     426            0 :                 .dispatch::<(), NodeDescribeResponse>(
     427            0 :                     Method::GET,
     428            0 :                     format!("control/v1/node/{node_id}"),
     429            0 :                     None,
     430            0 :                 )
     431            0 :                 .await?;
     432              : 
     433            0 :             if f(node.scheduling) {
     434            0 :                 return Ok::<NodeSchedulingPolicy, mgmt_api::Error>(node.scheduling);
     435            0 :             }
     436              :         }
     437            0 :     });
     438              : 
     439            0 :     Ok(waiter.await??)
     440            0 : }
     441              : 
     442              : #[tokio::main]
     443            0 : async fn main() -> anyhow::Result<()> {
     444            0 :     let cli = Cli::parse();
     445              : 
     446            0 :     let ssl_ca_certs = match &cli.ssl_ca_file {
     447            0 :         Some(ssl_ca_file) => {
     448            0 :             let buf = tokio::fs::read(ssl_ca_file).await?;
     449            0 :             Certificate::from_pem_bundle(&buf)?
     450              :         }
     451            0 :         None => Vec::new(),
     452              :     };
     453              : 
     454            0 :     let mut http_client = reqwest::Client::builder();
     455            0 :     for ssl_ca_cert in ssl_ca_certs {
     456            0 :         http_client = http_client.add_root_certificate(ssl_ca_cert);
     457            0 :     }
     458            0 :     let http_client = http_client.build()?;
     459              : 
     460            0 :     let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
     461              : 
     462            0 :     let mut trimmed = cli.api.to_string();
     463            0 :     trimmed.pop();
     464            0 :     let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
     465              : 
     466            0 :     match cli.command {
     467            0 :         Command::NodeRegister {
     468            0 :             node_id,
     469            0 :             listen_pg_addr,
     470            0 :             listen_pg_port,
     471            0 :             listen_grpc_addr,
     472            0 :             listen_grpc_port,
     473            0 :             listen_http_addr,
     474            0 :             listen_http_port,
     475            0 :             listen_https_port,
     476            0 :             availability_zone_id,
     477            0 :         } => {
     478            0 :             storcon_client
     479            0 :                 .dispatch::<_, ()>(
     480            0 :                     Method::POST,
     481            0 :                     "control/v1/node".to_string(),
     482            0 :                     Some(NodeRegisterRequest {
     483            0 :                         node_id,
     484            0 :                         listen_pg_addr,
     485            0 :                         listen_pg_port,
     486            0 :                         listen_grpc_addr,
     487            0 :                         listen_grpc_port,
     488            0 :                         listen_http_addr,
     489            0 :                         listen_http_port,
     490            0 :                         listen_https_port,
     491            0 :                         availability_zone_id: AvailabilityZone(availability_zone_id),
     492            0 :                         node_ip_addr: None,
     493            0 :                     }),
     494            0 :                 )
     495            0 :                 .await?;
     496            0 :         }
     497            0 :         Command::TenantCreate { tenant_id } => {
     498            0 :             storcon_client
     499            0 :                 .dispatch::<_, ()>(
     500            0 :                     Method::POST,
     501            0 :                     "v1/tenant".to_string(),
     502            0 :                     Some(TenantCreateRequest {
     503            0 :                         new_tenant_id: TenantShardId::unsharded(tenant_id),
     504            0 :                         generation: None,
     505            0 :                         shard_parameters: ShardParameters::default(),
     506            0 :                         placement_policy: Some(PlacementPolicy::Attached(1)),
     507            0 :                         config: TenantConfig::default(),
     508            0 :                     }),
     509            0 :                 )
     510            0 :                 .await?;
     511            0 :         }
     512            0 :         Command::TenantDelete { tenant_id } => {
     513            0 :             let status = vps_client
     514            0 :                 .tenant_delete(TenantShardId::unsharded(tenant_id))
     515            0 :                 .await?;
     516            0 :             tracing::info!("Delete status: {}", status);
     517            0 :         }
     518            0 :         Command::Nodes {} => {
     519            0 :             let mut resp = storcon_client
     520            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
     521            0 :                     Method::GET,
     522            0 :                     "control/v1/node".to_string(),
     523            0 :                     None,
     524            0 :                 )
     525            0 :                 .await?;
     526            0 : 
     527            0 :             resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
     528            0 : 
     529            0 :             let mut table = comfy_table::Table::new();
     530            0 :             table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
     531            0 :             for node in resp {
     532            0 :                 table.add_row([
     533            0 :                     format!("{}", node.id),
     534            0 :                     node.listen_http_addr,
     535            0 :                     node.availability_zone_id,
     536            0 :                     format!("{:?}", node.scheduling),
     537            0 :                     format!("{:?}", node.availability),
     538            0 :                 ]);
     539            0 :             }
     540            0 :             println!("{table}");
     541            0 :         }
     542            0 :         Command::NodeConfigure {
     543            0 :             node_id,
     544            0 :             availability,
     545            0 :             scheduling,
     546            0 :         } => {
     547            0 :             let req = NodeConfigureRequest {
     548            0 :                 node_id,
     549            0 :                 availability: availability.map(|a| a.0),
     550            0 :                 scheduling,
     551            0 :             };
     552            0 :             storcon_client
     553            0 :                 .dispatch::<_, ()>(
     554            0 :                     Method::PUT,
     555            0 :                     format!("control/v1/node/{node_id}/config"),
     556            0 :                     Some(req),
     557            0 :                 )
     558            0 :                 .await?;
     559            0 :         }
     560            0 :         Command::Tenants {
     561            0 :             node_id: Some(node_id),
     562            0 :         } => {
     563            0 :             let describe_response = storcon_client
     564            0 :                 .dispatch::<(), NodeShardResponse>(
     565            0 :                     Method::GET,
     566            0 :                     format!("control/v1/node/{node_id}/shards"),
     567            0 :                     None,
     568            0 :                 )
     569            0 :                 .await?;
     570            0 :             let shards = describe_response.shards;
     571            0 :             let mut table = comfy_table::Table::new();
     572            0 :             table.set_header([
     573            0 :                 "Shard",
     574            0 :                 "Intended Primary/Secondary",
     575            0 :                 "Observed Primary/Secondary",
     576            0 :             ]);
     577            0 :             for shard in shards {
     578            0 :                 table.add_row([
     579            0 :                     format!("{}", shard.tenant_shard_id),
     580            0 :                     match shard.is_intended_secondary {
     581            0 :                         None => "".to_string(),
     582            0 :                         Some(true) => "Secondary".to_string(),
     583            0 :                         Some(false) => "Primary".to_string(),
     584            0 :                     },
     585            0 :                     match shard.is_observed_secondary {
     586            0 :                         None => "".to_string(),
     587            0 :                         Some(true) => "Secondary".to_string(),
     588            0 :                         Some(false) => "Primary".to_string(),
     589            0 :                     },
     590            0 :                 ]);
     591            0 :             }
     592            0 :             println!("{table}");
     593            0 :         }
     594            0 :         Command::Tenants { node_id: None } => {
     595            0 :             // Set up output formatting
     596            0 :             let mut table = comfy_table::Table::new();
     597            0 :             table.set_header([
     598            0 :                 "TenantId",
     599            0 :                 "Preferred AZ",
     600            0 :                 "ShardCount",
     601            0 :                 "StripeSize",
     602            0 :                 "Placement",
     603            0 :                 "Scheduling",
     604            0 :             ]);
     605            0 : 
     606            0 :             // Pagination loop over listing API
     607            0 :             let mut start_after = None;
     608            0 :             const LIMIT: usize = 1000;
     609            0 :             loop {
     610            0 :                 let path = match start_after {
     611            0 :                     None => format!("control/v1/tenant?limit={LIMIT}"),
     612            0 :                     Some(start_after) => {
     613            0 :                         format!("control/v1/tenant?limit={LIMIT}&start_after={start_after}")
     614            0 :                     }
     615            0 :                 };
     616            0 : 
     617            0 :                 let resp = storcon_client
     618            0 :                     .dispatch::<(), Vec<TenantDescribeResponse>>(Method::GET, path, None)
     619            0 :                     .await?;
     620            0 : 
     621            0 :                 if resp.is_empty() {
     622            0 :                     // End of data reached
     623            0 :                     break;
     624            0 :                 }
     625            0 : 
     626            0 :                 // Give some visual feedback while we're building up the table (comfy_table doesn't have
     627            0 :                 // streaming output)
     628            0 :                 if resp.len() >= LIMIT {
     629            0 :                     eprint!(".");
     630            0 :                 }
     631            0 : 
     632            0 :                 start_after = Some(resp.last().unwrap().tenant_id);
     633            0 : 
     634            0 :                 for tenant in resp {
     635            0 :                     let shard_zero = tenant.shards.into_iter().next().unwrap();
     636            0 :                     table.add_row([
     637            0 :                         format!("{}", tenant.tenant_id),
     638            0 :                         shard_zero
     639            0 :                             .preferred_az_id
     640            0 :                             .as_ref()
     641            0 :                             .cloned()
     642            0 :                             .unwrap_or("".to_string()),
     643            0 :                         format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
     644            0 :                         format!("{:?}", tenant.stripe_size),
     645            0 :                         format!("{:?}", tenant.policy),
     646            0 :                         format!("{:?}", shard_zero.scheduling_policy),
     647            0 :                     ]);
     648            0 :                 }
     649            0 :             }
     650            0 : 
     651            0 :             // Terminate progress dots
     652            0 :             if table.row_count() > LIMIT {
     653            0 :                 eprint!("");
     654            0 :             }
     655            0 : 
     656            0 :             println!("{table}");
     657            0 :         }
     658            0 :         Command::TenantPolicy {
     659            0 :             tenant_id,
     660            0 :             placement,
     661            0 :             scheduling,
     662            0 :         } => {
     663            0 :             let req = TenantPolicyRequest {
     664            0 :                 scheduling: scheduling.map(|s| s.0),
     665            0 :                 placement: placement.map(|p| p.0),
     666            0 :             };
     667            0 :             storcon_client
     668            0 :                 .dispatch::<_, ()>(
     669            0 :                     Method::PUT,
     670            0 :                     format!("control/v1/tenant/{tenant_id}/policy"),
     671            0 :                     Some(req),
     672            0 :                 )
     673            0 :                 .await?;
     674            0 :         }
     675            0 :         Command::TenantShardSplit {
     676            0 :             tenant_id,
     677            0 :             shard_count,
     678            0 :             stripe_size,
     679            0 :         } => {
     680            0 :             let req = TenantShardSplitRequest {
     681            0 :                 new_shard_count: shard_count,
     682            0 :                 new_stripe_size: stripe_size.map(ShardStripeSize),
     683            0 :             };
     684            0 : 
     685            0 :             let response = storcon_client
     686            0 :                 .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
     687            0 :                     Method::PUT,
     688            0 :                     format!("control/v1/tenant/{tenant_id}/shard_split"),
     689            0 :                     Some(req),
     690            0 :                 )
     691            0 :                 .await?;
     692            0 :             println!(
     693            0 :                 "Split tenant {} into {} shards: {}",
     694            0 :                 tenant_id,
     695            0 :                 shard_count,
     696            0 :                 response
     697            0 :                     .new_shards
     698            0 :                     .iter()
     699            0 :                     .map(|s| format!("{s:?}"))
     700            0 :                     .collect::<Vec<_>>()
     701            0 :                     .join(",")
     702            0 :             );
     703            0 :         }
     704            0 :         Command::TenantShardMigrate {
     705            0 :             tenant_shard_id,
     706            0 :             node,
     707            0 :             prewarm,
     708            0 :             override_scheduler,
     709            0 :         } => {
     710            0 :             let migration_config = MigrationConfig {
     711            0 :                 prewarm,
     712            0 :                 override_scheduler,
     713            0 :                 ..Default::default()
     714            0 :             };
     715            0 : 
     716            0 :             let req = TenantShardMigrateRequest {
     717            0 :                 node_id: node,
     718            0 :                 origin_node_id: None,
     719            0 :                 migration_config,
     720            0 :             };
     721            0 : 
     722            0 :             match storcon_client
     723            0 :                 .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
     724            0 :                     Method::PUT,
     725            0 :                     format!("control/v1/tenant/{tenant_shard_id}/migrate"),
     726            0 :                     Some(req),
     727            0 :                 )
     728            0 :                 .await
     729            0 :             {
     730            0 :                 Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) => {
     731            0 :                     anyhow::bail!(
     732            0 :                         "Migration to {node} rejected, may require `--force` ({}) ",
     733            0 :                         msg
     734            0 :                     );
     735            0 :                 }
     736            0 :                 Err(e) => return Err(e.into()),
     737            0 :                 Ok(_) => {}
     738            0 :             }
     739            0 : 
     740            0 :             watch_tenant_shard(storcon_client, tenant_shard_id, Some(node)).await?;
     741            0 :         }
     742            0 :         Command::TenantShardWatch { tenant_shard_id } => {
     743            0 :             watch_tenant_shard(storcon_client, tenant_shard_id, None).await?;
     744            0 :         }
     745            0 :         Command::TenantShardMigrateSecondary {
     746            0 :             tenant_shard_id,
     747            0 :             node,
     748            0 :         } => {
     749            0 :             let req = TenantShardMigrateRequest {
     750            0 :                 node_id: node,
     751            0 :                 origin_node_id: None,
     752            0 :                 migration_config: MigrationConfig::default(),
     753            0 :             };
     754            0 : 
     755            0 :             storcon_client
     756            0 :                 .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
     757            0 :                     Method::PUT,
     758            0 :                     format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
     759            0 :                     Some(req),
     760            0 :                 )
     761            0 :                 .await?;
     762            0 :         }
     763            0 :         Command::TenantShardCancelReconcile { tenant_shard_id } => {
     764            0 :             storcon_client
     765            0 :                 .dispatch::<(), ()>(
     766            0 :                     Method::PUT,
     767            0 :                     format!("control/v1/tenant/{tenant_shard_id}/cancel_reconcile"),
     768            0 :                     None,
     769            0 :                 )
     770            0 :                 .await?;
     771            0 :         }
     772            0 :         Command::SetTenantConfig { tenant_id, config } => {
     773            0 :             let tenant_conf = serde_json::from_str(&config)?;
     774            0 : 
     775            0 :             vps_client
     776            0 :                 .set_tenant_config(&TenantConfigRequest {
     777            0 :                     tenant_id,
     778            0 :                     config: tenant_conf,
     779            0 :                 })
     780            0 :                 .await?;
     781            0 :         }
     782            0 :         Command::PatchTenantConfig { tenant_id, config } => {
     783            0 :             let tenant_conf = serde_json::from_str(&config)?;
     784            0 : 
     785            0 :             vps_client
     786            0 :                 .patch_tenant_config(&TenantConfigPatchRequest {
     787            0 :                     tenant_id,
     788            0 :                     config: tenant_conf,
     789            0 :                 })
     790            0 :                 .await?;
     791            0 :         }
     792            0 :         Command::TenantDescribe { tenant_id } => {
     793            0 :             let TenantDescribeResponse {
     794            0 :                 tenant_id,
     795            0 :                 shards,
     796            0 :                 stripe_size,
     797            0 :                 policy,
     798            0 :                 config,
     799            0 :             } = storcon_client
     800            0 :                 .dispatch::<(), TenantDescribeResponse>(
     801            0 :                     Method::GET,
     802            0 :                     format!("control/v1/tenant/{tenant_id}"),
     803            0 :                     None,
     804            0 :                 )
     805            0 :                 .await?;
     806            0 : 
     807            0 :             let nodes = storcon_client
     808            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
     809            0 :                     Method::GET,
     810            0 :                     "control/v1/node".to_string(),
     811            0 :                     None,
     812            0 :                 )
     813            0 :                 .await?;
     814            0 :             let nodes = nodes
     815            0 :                 .into_iter()
     816            0 :                 .map(|n| (n.id, n))
     817            0 :                 .collect::<HashMap<_, _>>();
     818            0 : 
     819            0 :             println!("Tenant {tenant_id}");
     820            0 :             let mut table = comfy_table::Table::new();
     821            0 :             table.add_row(["Policy", &format!("{policy:?}")]);
     822            0 :             table.add_row(["Stripe size", &format!("{stripe_size:?}")]);
     823            0 :             table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
     824            0 :             println!("{table}");
     825            0 :             println!("Shards:");
     826            0 :             let mut table = comfy_table::Table::new();
     827            0 :             table.set_header([
     828            0 :                 "Shard",
     829            0 :                 "Attached",
     830            0 :                 "Attached AZ",
     831            0 :                 "Secondary",
     832            0 :                 "Last error",
     833            0 :                 "status",
     834            0 :             ]);
     835            0 :             for shard in shards {
     836            0 :                 let secondary = shard
     837            0 :                     .node_secondary
     838            0 :                     .iter()
     839            0 :                     .map(|n| format!("{n}"))
     840            0 :                     .collect::<Vec<_>>()
     841            0 :                     .join(",");
     842            0 : 
     843            0 :                 let mut status_parts = Vec::new();
     844            0 :                 if shard.is_reconciling {
     845            0 :                     status_parts.push("reconciling");
     846            0 :                 }
     847            0 : 
     848            0 :                 if shard.is_pending_compute_notification {
     849            0 :                     status_parts.push("pending_compute");
     850            0 :                 }
     851            0 : 
     852            0 :                 if shard.is_splitting {
     853            0 :                     status_parts.push("splitting");
     854            0 :                 }
     855            0 :                 let status = status_parts.join(",");
     856            0 : 
     857            0 :                 let attached_node = shard
     858            0 :                     .node_attached
     859            0 :                     .as_ref()
     860            0 :                     .map(|id| nodes.get(id).expect("Shard references nonexistent node"));
     861            0 : 
     862            0 :                 table.add_row([
     863            0 :                     format!("{}", shard.tenant_shard_id),
     864            0 :                     attached_node
     865            0 :                         .map(|n| format!("{} ({})", n.listen_http_addr, n.id))
     866            0 :                         .unwrap_or(String::new()),
     867            0 :                     attached_node
     868            0 :                         .map(|n| n.availability_zone_id.clone())
     869            0 :                         .unwrap_or(String::new()),
     870            0 :                     secondary,
     871            0 :                     shard.last_error,
     872            0 :                     status,
     873            0 :                 ]);
     874            0 :             }
     875            0 :             println!("{table}");
     876            0 :         }
     877            0 :         Command::TenantSetPreferredAz {
     878            0 :             tenant_id,
     879            0 :             preferred_az,
     880            0 :         } => {
     881            0 :             // First learn about the tenant's shards
     882            0 :             let describe_response = storcon_client
     883            0 :                 .dispatch::<(), TenantDescribeResponse>(
     884            0 :                     Method::GET,
     885            0 :                     format!("control/v1/tenant/{tenant_id}"),
     886            0 :                     None,
     887            0 :                 )
     888            0 :                 .await?;
     889            0 : 
     890            0 :             // Learn about nodes to validate the AZ ID
     891            0 :             let nodes = storcon_client
     892            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
     893            0 :                     Method::GET,
     894            0 :                     "control/v1/node".to_string(),
     895            0 :                     None,
     896            0 :                 )
     897            0 :                 .await?;
     898            0 : 
     899            0 :             if let Some(preferred_az) = &preferred_az {
     900            0 :                 let azs = nodes
     901            0 :                     .into_iter()
     902            0 :                     .map(|n| (n.availability_zone_id))
     903            0 :                     .collect::<HashSet<_>>();
     904            0 :                 if !azs.contains(preferred_az) {
     905            0 :                     anyhow::bail!(
     906            0 :                         "AZ {} not found on any node: known AZs are: {:?}",
     907            0 :                         preferred_az,
     908            0 :                         azs
     909            0 :                     );
     910            0 :                 }
     911            0 :             } else {
     912            0 :                 // Make it obvious to the user that since they've omitted an AZ, we're clearing it
     913            0 :                 eprintln!("Clearing preferred AZ for tenant {tenant_id}");
     914            0 :             }
     915            0 : 
     916            0 :             // Construct a request that modifies all the tenant's shards
     917            0 :             let req = ShardsPreferredAzsRequest {
     918            0 :                 preferred_az_ids: describe_response
     919            0 :                     .shards
     920            0 :                     .into_iter()
     921            0 :                     .map(|s| {
     922            0 :                         (
     923            0 :                             s.tenant_shard_id,
     924            0 :                             preferred_az.clone().map(AvailabilityZone),
     925            0 :                         )
     926            0 :                     })
     927            0 :                     .collect(),
     928            0 :             };
     929            0 :             storcon_client
     930            0 :                 .dispatch::<ShardsPreferredAzsRequest, ShardsPreferredAzsResponse>(
     931            0 :                     Method::PUT,
     932            0 :                     "control/v1/preferred_azs".to_string(),
     933            0 :                     Some(req),
     934            0 :                 )
     935            0 :                 .await?;
     936            0 :         }
     937            0 :         Command::TenantDrop { tenant_id, unclean } => {
     938            0 :             if !unclean {
     939            0 :                 anyhow::bail!(
     940            0 :                     "This command is not a tenant deletion, and uncleanly drops all controller state for the tenant.  If you know what you're doing, add `--unclean` to proceed."
     941            0 :                 )
     942            0 :             }
     943            0 :             storcon_client
     944            0 :                 .dispatch::<(), ()>(
     945            0 :                     Method::POST,
     946            0 :                     format!("debug/v1/tenant/{tenant_id}/drop"),
     947            0 :                     None,
     948            0 :                 )
     949            0 :                 .await?;
     950            0 :         }
     951            0 :         Command::NodeDrop { node_id, unclean } => {
     952            0 :             if !unclean {
     953            0 :                 anyhow::bail!(
     954            0 :                     "This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it.  If you know what you're doing, add `--unclean` to proceed."
     955            0 :                 )
     956            0 :             }
     957            0 :             storcon_client
     958            0 :                 .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
     959            0 :                 .await?;
     960            0 :         }
     961            0 :         Command::NodeDelete { node_id } => {
     962            0 :             eprintln!("Warning: This command is obsolete and will be removed in a future version");
     963            0 :             eprintln!("Use `NodeStartDelete` instead, if possible");
     964            0 :             storcon_client
     965            0 :                 .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
     966            0 :                 .await?;
     967            0 :         }
     968            0 :         Command::NodeStartDelete { node_id, force } => {
     969            0 :             let query = if force {
     970            0 :                 format!("control/v1/node/{node_id}/delete?force=true")
     971            0 :             } else {
     972            0 :                 format!("control/v1/node/{node_id}/delete")
     973            0 :             };
     974            0 :             storcon_client
     975            0 :                 .dispatch::<(), ()>(Method::PUT, query, None)
     976            0 :                 .await?;
     977            0 :             println!("Delete started for {node_id}");
     978            0 :         }
     979            0 :         Command::NodeCancelDelete { node_id, timeout } => {
     980            0 :             storcon_client
     981            0 :                 .dispatch::<(), ()>(
     982            0 :                     Method::DELETE,
     983            0 :                     format!("control/v1/node/{node_id}/delete"),
     984            0 :                     None,
     985            0 :                 )
     986            0 :                 .await?;
     987            0 : 
     988            0 :             println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
     989            0 : 
     990            0 :             let final_policy =
     991            0 :                 wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
     992            0 :                     !matches!(sched, NodeSchedulingPolicy::Deleting)
     993            0 :                 })
     994            0 :                 .await?;
     995            0 : 
     996            0 :             println!(
     997            0 :                 "Delete was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
     998            0 :             );
     999            0 :         }
    1000            0 :         Command::NodeDeleteTombstone { node_id } => {
    1001            0 :             storcon_client
    1002            0 :                 .dispatch::<(), ()>(
    1003            0 :                     Method::DELETE,
    1004            0 :                     format!("debug/v1/tombstone/{node_id}"),
    1005            0 :                     None,
    1006            0 :                 )
    1007            0 :                 .await?;
    1008            0 :         }
    1009            0 :         Command::NodeTombstones {} => {
    1010            0 :             let mut resp = storcon_client
    1011            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
    1012            0 :                     Method::GET,
    1013            0 :                     "debug/v1/tombstone".to_string(),
    1014            0 :                     None,
    1015            0 :                 )
    1016            0 :                 .await?;
    1017            0 : 
    1018            0 :             resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
    1019            0 : 
    1020            0 :             let mut table = comfy_table::Table::new();
    1021            0 :             table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
    1022            0 :             for node in resp {
    1023            0 :                 table.add_row([
    1024            0 :                     format!("{}", node.id),
    1025            0 :                     node.listen_http_addr,
    1026            0 :                     node.availability_zone_id,
    1027            0 :                     format!("{:?}", node.scheduling),
    1028            0 :                     format!("{:?}", node.availability),
    1029            0 :                 ]);
    1030            0 :             }
    1031            0 :             println!("{table}");
    1032            0 :         }
    1033            0 :         Command::TenantSetTimeBasedEviction {
    1034            0 :             tenant_id,
    1035            0 :             period,
    1036            0 :             threshold,
    1037            0 :         } => {
    1038            0 :             vps_client
    1039            0 :                 .set_tenant_config(&TenantConfigRequest {
    1040            0 :                     tenant_id,
    1041            0 :                     config: TenantConfig {
    1042            0 :                         eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
    1043            0 :                             EvictionPolicyLayerAccessThreshold {
    1044            0 :                                 period: period.into(),
    1045            0 :                                 threshold: threshold.into(),
    1046            0 :                             },
    1047            0 :                         )),
    1048            0 :                         heatmap_period: Some(Duration::from_secs(300)),
    1049            0 :                         ..Default::default()
    1050            0 :                     },
    1051            0 :                 })
    1052            0 :                 .await?;
    1053            0 :         }
    1054            0 :         Command::BulkMigrate {
    1055            0 :             nodes,
    1056            0 :             concurrency,
    1057            0 :             max_shards,
    1058            0 :             dry_run,
    1059            0 :         } => {
    1060            0 :             // Load the list of nodes, split them up into the drained and filled sets,
    1061            0 :             // and validate that draining is possible.
    1062            0 :             let node_descs = storcon_client
    1063            0 :                 .dispatch::<(), Vec<NodeDescribeResponse>>(
    1064            0 :                     Method::GET,
    1065            0 :                     "control/v1/node".to_string(),
    1066            0 :                     None,
    1067            0 :                 )
    1068            0 :                 .await?;
    1069            0 : 
    1070            0 :             let mut node_to_drain_descs = Vec::new();
    1071            0 :             let mut node_to_fill_descs = Vec::new();
    1072            0 : 
    1073            0 :             for desc in node_descs {
    1074            0 :                 let to_drain = nodes.contains(&desc.id);
    1075            0 :                 if to_drain {
    1076            0 :                     node_to_drain_descs.push(desc);
    1077            0 :                 } else {
    1078            0 :                     node_to_fill_descs.push(desc);
    1079            0 :                 }
    1080            0 :             }
    1081            0 : 
    1082            0 :             if nodes.len() != node_to_drain_descs.len() {
    1083            0 :                 anyhow::bail!("Bulk migration requested away from node which doesn't exist.")
    1084            0 :             }
    1085            0 : 
    1086            0 :             node_to_fill_descs.retain(|desc| {
    1087            0 :                 matches!(desc.availability, NodeAvailabilityWrapper::Active)
    1088            0 :                     && matches!(
    1089            0 :                         desc.scheduling,
    1090            0 :                         NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
    1091            0 :                     )
    1092            0 :             });
    1093            0 : 
    1094            0 :             if node_to_fill_descs.is_empty() {
    1095            0 :                 anyhow::bail!("There are no nodes to migrate to")
    1096            0 :             }
    1097            0 : 
    1098            0 :             // Set the node scheduling policy to draining for the nodes which
    1099            0 :             // we plan to drain.
    1100            0 :             for node_desc in node_to_drain_descs.iter() {
    1101            0 :                 let req = NodeConfigureRequest {
    1102            0 :                     node_id: node_desc.id,
    1103            0 :                     availability: None,
    1104            0 :                     scheduling: Some(NodeSchedulingPolicy::Draining),
    1105            0 :                 };
    1106            0 : 
    1107            0 :                 storcon_client
    1108            0 :                     .dispatch::<_, ()>(
    1109            0 :                         Method::PUT,
    1110            0 :                         format!("control/v1/node/{}/config", node_desc.id),
    1111            0 :                         Some(req),
    1112            0 :                     )
    1113            0 :                     .await?;
    1114            0 :             }
    1115            0 : 
    1116            0 :             // Perform the migration: move each tenant shard scheduled on a node to
    1117            0 :             // be drained to a node which is being filled. A simple round robin
    1118            0 :             // strategy is used to pick the new node.
    1119            0 :             let tenants = storcon_client
    1120            0 :                 .dispatch::<(), Vec<TenantDescribeResponse>>(
    1121            0 :                     Method::GET,
    1122            0 :                     "control/v1/tenant".to_string(),
    1123            0 :                     None,
    1124            0 :                 )
    1125            0 :                 .await?;
    1126            0 : 
    1127            0 :             let mut selected_node_idx = 0;
    1128            0 : 
    1129            0 :             struct MigrationMove {
    1130            0 :                 tenant_shard_id: TenantShardId,
    1131            0 :                 from: NodeId,
    1132            0 :                 to: NodeId,
    1133            0 :             }
    1134            0 : 
    1135            0 :             let mut moves: Vec<MigrationMove> = Vec::new();
    1136            0 : 
    1137            0 :             let shards = tenants
    1138            0 :                 .into_iter()
    1139            0 :                 .flat_map(|tenant| tenant.shards.into_iter());
    1140            0 :             for shard in shards {
    1141            0 :                 if let Some(max_shards) = max_shards {
    1142            0 :                     if moves.len() >= max_shards {
    1143            0 :                         println!(
    1144            0 :                             "Stop planning shard moves since the requested maximum was reached"
    1145            0 :                         );
    1146            0 :                         break;
    1147            0 :                     }
    1148            0 :                 }
    1149            0 : 
    1150            0 :                 let should_migrate = {
    1151            0 :                     if let Some(attached_to) = shard.node_attached {
    1152            0 :                         node_to_drain_descs
    1153            0 :                             .iter()
    1154            0 :                             .map(|desc| desc.id)
    1155            0 :                             .any(|id| id == attached_to)
    1156            0 :                     } else {
    1157            0 :                         false
    1158            0 :                     }
    1159            0 :                 };
    1160            0 : 
    1161            0 :                 if !should_migrate {
    1162            0 :                     continue;
    1163            0 :                 }
    1164            0 : 
    1165            0 :                 moves.push(MigrationMove {
    1166            0 :                     tenant_shard_id: shard.tenant_shard_id,
    1167            0 :                     from: shard
    1168            0 :                         .node_attached
    1169            0 :                         .expect("We only migrate attached tenant shards"),
    1170            0 :                     to: node_to_fill_descs[selected_node_idx].id,
    1171            0 :                 });
    1172            0 :                 selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
    1173            0 :             }
    1174            0 : 
    1175            0 :             let total_moves = moves.len();
    1176            0 : 
    1177            0 :             if dry_run == Some(true) {
    1178            0 :                 println!("Dryrun requested. Planned {total_moves} moves:");
    1179            0 :                 for mv in &moves {
    1180            0 :                     println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
    1181            0 :                 }
    1182            0 : 
    1183            0 :                 return Ok(());
    1184            0 :             }
    1185            0 : 
    1186            0 :             const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
    1187            0 :             let mut stream = futures::stream::iter(moves)
    1188            0 :                 .map(|mv| {
    1189            0 :                     let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
    1190            0 :                     async move {
    1191            0 :                         client
    1192            0 :                             .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
    1193            0 :                                 Method::PUT,
    1194            0 :                                 format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
    1195            0 :                                 Some(TenantShardMigrateRequest {
    1196            0 :                                     node_id: mv.to,
    1197            0 :                                     origin_node_id: Some(mv.from),
    1198            0 :                                     migration_config: MigrationConfig::default(),
    1199            0 :                                 }),
    1200            0 :                             )
    1201            0 :                             .await
    1202            0 :                             .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
    1203            0 :                     }
    1204            0 :                 })
    1205            0 :                 .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));
    1206            0 : 
    1207            0 :             let mut success = 0;
    1208            0 :             let mut failure = 0;
    1209            0 : 
    1210            0 :             while let Some(res) = stream.next().await {
    1211            0 :                 match res {
    1212            0 :                     Ok(_) => {
    1213            0 :                         success += 1;
    1214            0 :                     }
    1215            0 :                     Err((tenant_shard_id, from, to, error)) => {
    1216            0 :                         failure += 1;
    1217            0 :                         println!(
    1218            0 :                             "Failed to migrate {tenant_shard_id} from node {from} to node {to}: {error}"
    1219            0 :                         );
    1220            0 :                     }
    1221            0 :                 }
    1222            0 : 
    1223            0 :                 if (success + failure) % 20 == 0 {
    1224            0 :                     println!(
    1225            0 :                         "Processed {}/{} shards: {} succeeded, {} failed",
    1226            0 :                         success + failure,
    1227            0 :                         total_moves,
    1228            0 :                         success,
    1229            0 :                         failure
    1230            0 :                     );
    1231            0 :                 }
    1232            0 :             }
    1233            0 : 
    1234            0 :             println!(
    1235            0 :                 "Processed {}/{} shards: {} succeeded, {} failed",
    1236            0 :                 success + failure,
    1237            0 :                 total_moves,
    1238            0 :                 success,
    1239            0 :                 failure
    1240            0 :             );
    1241            0 :         }
    1242            0 :         Command::StartDrain { node_id } => {
    1243            0 :             storcon_client
    1244            0 :                 .dispatch::<(), ()>(
    1245            0 :                     Method::PUT,
    1246            0 :                     format!("control/v1/node/{node_id}/drain"),
    1247            0 :                     None,
    1248            0 :                 )
    1249            0 :                 .await?;
    1250            0 :             println!("Drain started for {node_id}");
    1251            0 :         }
    1252            0 :         Command::CancelDrain { node_id, timeout } => {
    1253            0 :             storcon_client
    1254            0 :                 .dispatch::<(), ()>(
    1255            0 :                     Method::DELETE,
    1256            0 :                     format!("control/v1/node/{node_id}/drain"),
    1257            0 :                     None,
    1258            0 :                 )
    1259            0 :                 .await?;
    1260            0 : 
    1261            0 :             println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
    1262            0 : 
    1263            0 :             let final_policy =
    1264            0 :                 wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
    1265            0 :                     use NodeSchedulingPolicy::*;
    1266            0 :                     matches!(sched, Active | PauseForRestart)
    1267            0 :                 })
    1268            0 :                 .await?;
    1269            0 : 
    1270            0 :             println!(
    1271            0 :                 "Drain was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
    1272            0 :             );
    1273            0 :         }
    1274            0 :         Command::StartFill { node_id } => {
    1275            0 :             storcon_client
    1276            0 :                 .dispatch::<(), ()>(Method::PUT, format!("control/v1/node/{node_id}/fill"), None)
    1277            0 :                 .await?;
    1278            0 : 
    1279            0 :             println!("Fill started for {node_id}");
    1280            0 :         }
    1281            0 :         Command::CancelFill { node_id, timeout } => {
    1282            0 :             storcon_client
    1283            0 :                 .dispatch::<(), ()>(
    1284            0 :                     Method::DELETE,
    1285            0 :                     format!("control/v1/node/{node_id}/fill"),
    1286            0 :                     None,
    1287            0 :                 )
    1288            0 :                 .await?;
    1289            0 : 
    1290            0 :             println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
    1291            0 : 
    1292            0 :             let final_policy =
    1293            0 :                 wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
    1294            0 :                     use NodeSchedulingPolicy::*;
    1295            0 :                     matches!(sched, Active)
    1296            0 :                 })
    1297            0 :                 .await?;
    1298            0 : 
    1299            0 :             println!(
    1300            0 :                 "Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
    1301            0 :             );
    1302            0 :         }
    1303            0 :         Command::Safekeepers {} => {
    1304            0 :             let mut resp = storcon_client
    1305            0 :                 .dispatch::<(), Vec<SafekeeperDescribeResponse>>(
    1306            0 :                     Method::GET,
    1307            0 :                     "control/v1/safekeeper".to_string(),
    1308            0 :                     None,
    1309            0 :                 )
    1310            0 :                 .await?;
    1311            0 : 
    1312            0 :             resp.sort_by(|a, b| a.id.cmp(&b.id));
    1313            0 : 
    1314            0 :             let mut table = comfy_table::Table::new();
    1315            0 :             table.set_header([
    1316            0 :                 "Id",
    1317            0 :                 "Version",
    1318            0 :                 "Host",
    1319            0 :                 "Port",
    1320            0 :                 "Http Port",
    1321            0 :                 "AZ Id",
    1322            0 :                 "Scheduling",
    1323            0 :             ]);
    1324            0 :             for sk in resp {
    1325            0 :                 table.add_row([
    1326            0 :                     format!("{}", sk.id),
    1327            0 :                     format!("{}", sk.version),
    1328            0 :                     sk.host,
    1329            0 :                     format!("{}", sk.port),
    1330            0 :                     format!("{}", sk.http_port),
    1331            0 :                     sk.availability_zone_id.clone(),
    1332            0 :                     String::from(sk.scheduling_policy),
    1333            0 :                 ]);
    1334            0 :             }
    1335            0 :             println!("{table}");
    1336            0 :         }
    1337            0 :         Command::SafekeeperScheduling {
    1338            0 :             node_id,
    1339            0 :             scheduling_policy,
    1340            0 :         } => {
    1341            0 :             let scheduling_policy = scheduling_policy.0;
    1342            0 :             storcon_client
    1343            0 :                 .dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
    1344            0 :                     Method::POST,
    1345            0 :                     format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
    1346            0 :                     Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
    1347            0 :                 )
    1348            0 :                 .await?;
    1349            0 :             println!(
    1350            0 :                 "Scheduling policy of {node_id} set to {}",
    1351            0 :                 String::from(scheduling_policy)
    1352            0 :             );
    1353            0 :         }
    1354            0 :         Command::DownloadHeatmapLayers {
    1355            0 :             tenant_shard_id,
    1356            0 :             timeline_id,
    1357            0 :             concurrency,
    1358            0 :         } => {
    1359            0 :             let mut path = format!(
    1360            0 :                 "v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
    1361            0 :             );
    1362            0 : 
    1363            0 :             if let Some(c) = concurrency {
    1364            0 :                 path = format!("{path}?concurrency={c}");
    1365            0 :             }
    1366            0 : 
    1367            0 :             storcon_client
    1368            0 :                 .dispatch::<(), ()>(Method::POST, path, None)
    1369            0 :                 .await?;
    1370            0 :         }
    1371            0 :         Command::TimelineLocate {
    1372            0 :             tenant_id,
    1373            0 :             timeline_id,
    1374            0 :         } => {
    1375            0 :             let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
    1376            0 : 
    1377            0 :             let resp = storcon_client
    1378            0 :                 .dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
    1379            0 :                 .await?;
    1380            0 : 
    1381            0 :             let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
    1382            0 :             let new_sk_set = resp
    1383            0 :                 .new_sk_set
    1384            0 :                 .as_ref()
    1385            0 :                 .map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
    1386            0 : 
    1387            0 :             println!("generation = {}", resp.generation);
    1388            0 :             println!("sk_set = {sk_set:?}");
    1389            0 :             println!("new_sk_set = {new_sk_set:?}");
    1390            0 :         }
    1391            0 :         Command::TimelineSafekeeperMigrate {
    1392            0 :             tenant_id,
    1393            0 :             timeline_id,
    1394            0 :             new_sk_set,
    1395            0 :         } => {
    1396            0 :             let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
    1397            0 : 
    1398            0 :             storcon_client
    1399            0 :                 .dispatch::<_, ()>(
    1400            0 :                     Method::POST,
    1401            0 :                     path,
    1402            0 :                     Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
    1403            0 :                 )
    1404            0 :                 .await?;
    1405            0 :         }
    1406            0 :         Command::TimelineSafekeeperMigrateAbort {
    1407            0 :             tenant_id,
    1408            0 :             timeline_id,
    1409            0 :         } => {
    1410            0 :             let path =
    1411            0 :                 format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort");
    1412            0 : 
    1413            0 :             storcon_client
    1414            0 :                 .dispatch::<(), ()>(Method::POST, path, None)
    1415            0 :                 .await?;
    1416            0 :         }
    1417            0 :     }
    1418            0 : 
    1419            0 :     Ok(())
    1420            0 : }
    1421              : 
/// Pause of five seconds between consecutive polls of the storage controller
/// in `watch_tenant_shard`.
static WATCH_INTERVAL: Duration = Duration::from_secs(5);
    1423              : 
    1424            0 : async fn watch_tenant_shard(
    1425            0 :     storcon_client: Client,
    1426            0 :     tenant_shard_id: TenantShardId,
    1427            0 :     until_migrated_to: Option<NodeId>,
    1428            0 : ) -> anyhow::Result<()> {
    1429            0 :     if let Some(until_migrated_to) = until_migrated_to {
    1430            0 :         println!(
    1431            0 :             "Waiting for tenant shard {tenant_shard_id} to be migrated to node {until_migrated_to}"
    1432            0 :         );
    1433            0 :     }
    1434              : 
    1435              :     loop {
    1436            0 :         let desc = storcon_client
    1437            0 :             .dispatch::<(), TenantDescribeResponse>(
    1438            0 :                 Method::GET,
    1439            0 :                 format!("control/v1/tenant/{}", tenant_shard_id.tenant_id),
    1440            0 :                 None,
    1441            0 :             )
    1442            0 :             .await?;
    1443              : 
    1444              :         // Output the current state of the tenant shard
    1445            0 :         let shard = desc
    1446            0 :             .shards
    1447            0 :             .iter()
    1448            0 :             .find(|s| s.tenant_shard_id == tenant_shard_id)
    1449            0 :             .ok_or(anyhow::anyhow!("Tenant shard not found"))?;
    1450            0 :         let summary = format!(
    1451            0 :             "attached: {} secondary: {} {}",
    1452            0 :             shard
    1453            0 :                 .node_attached
    1454            0 :                 .map(|n| format!("{n}"))
    1455            0 :                 .unwrap_or("none".to_string()),
    1456            0 :             shard
    1457            0 :                 .node_secondary
    1458            0 :                 .iter()
    1459            0 :                 .map(|n| n.to_string())
    1460            0 :                 .collect::<Vec<_>>()
    1461            0 :                 .join(","),
    1462            0 :             if shard.is_reconciling {
    1463            0 :                 "(reconciler active)"
    1464              :             } else {
    1465            0 :                 "(reconciler idle)"
    1466              :             }
    1467              :         );
    1468            0 :         println!("{summary}");
    1469              : 
    1470              :         // Maybe drop out if we finished migration
    1471            0 :         if let Some(until_migrated_to) = until_migrated_to {
    1472            0 :             if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling {
    1473            0 :                 println!("Tenant shard {tenant_shard_id} is now on node {until_migrated_to}");
    1474            0 :                 break;
    1475            0 :             }
    1476            0 :         }
    1477              : 
    1478            0 :         tokio::time::sleep(WATCH_INTERVAL).await;
    1479              :     }
    1480            0 :     Ok(())
    1481            0 : }
         |