use std::{collections::HashMap, str::FromStr};

use clap::{Parser, Subcommand};
use hyper::Method;
use pageserver_api::{
    controller_api::{
        NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
        TenantDescribeResponse, TenantPolicyRequest,
    },
    models::{
        ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
        TenantShardSplitRequest, TenantShardSplitResponse,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use reqwest::Url;
use serde::{de::DeserializeOwned, Serialize};
use utils::id::{NodeId, TenantId};

use pageserver_api::controller_api::{
    NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
    TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
};

#[derive(Subcommand, Debug)]
enum Command {
    /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
    /// since pageservers auto-register when they start up.
    NodeRegister {
        #[arg(long)]
        node_id: NodeId,

        #[arg(long)]
        listen_pg_addr: String,
        #[arg(long)]
        listen_pg_port: u16,

        #[arg(long)]
        listen_http_addr: String,
        #[arg(long)]
        listen_http_port: u16,
    },

    /// Modify a node's configuration in the storage controller
    NodeConfigure {
        #[arg(long)]
        node_id: NodeId,

        /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
        /// manually mark a node offline.
        #[arg(long)]
        availability: Option<NodeAvailabilityArg>,
        /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
        #[arg(long)]
        scheduling: Option<NodeSchedulingPolicy>,
    },
    /// Modify a tenant's policies in the storage controller
    TenantPolicy {
        #[arg(long)]
        tenant_id: TenantId,
        /// Placement policy controls whether a tenant is `detached`, has only a secondary location
        /// (`secondary`), or is in the normal attached state with N secondary locations (`attached:N`).
        #[arg(long)]
        placement: Option<PlacementPolicyArg>,
        /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant.
        /// `active` is normal, `essential` disables optimization scheduling changes, `pause` disables
        /// all scheduling changes, and `stop` prevents all reconciliation activity, including for
        /// scheduling changes already made. `pause` and `stop` can make a tenant unavailable, and are
        /// only for use in emergencies.
        #[arg(long)]
        scheduling: Option<ShardSchedulingPolicyArg>,
    },
    /// List nodes known to the storage controller
    Nodes {},
    /// List tenants known to the storage controller
    Tenants {},
    /// Create a new tenant in the storage controller, and by extension on pageservers.
    TenantCreate {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Delete a tenant in the storage controller, and by extension on pageservers.
    TenantDelete {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Split an existing tenant into a higher number of shards than its current shard count.
    TenantShardSplit {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        shard_count: u8,
        /// Optional stripe size, in units of 8KiB pages, e.g. 2048 for 16MiB stripes.
        #[arg(long)]
        stripe_size: Option<u32>,
    },
    /// Migrate the attached location for a tenant shard to a specific pageserver.
    TenantShardMigrate {
        #[arg(long)]
        tenant_shard_id: TenantShardId,
        #[arg(long)]
        node: NodeId,
    },
    /// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
    /// that is passed through to pageservers, and does not affect storage controller behavior.
    TenantConfig {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        config: String,
    },
    /// Attempt to balance the locations for a tenant across pageservers. This is a client-side
    /// alternative to the storage controller's scheduling optimization behavior.
    TenantScatter {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Print details about a particular tenant, including all its shards' states.
    TenantDescribe {
        #[arg(long)]
        tenant_id: TenantId,
    },
}
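
// Example invocations, derived from the clap definitions above (clap's default kebab-case naming
// is assumed, and `storcon_cli` stands in for whatever binary name this crate actually builds):
//
//   storcon_cli --api http://127.0.0.1:1234 nodes
//   storcon_cli --api http://127.0.0.1:1234 node-configure --node-id 1 --availability offline
//   storcon_cli --api http://127.0.0.1:1234 tenant-policy --tenant-id <TENANT_ID> --placement attached:1
//   storcon_cli --api http://127.0.0.1:1234 tenant-shard-split --tenant-id <TENANT_ID> --shard-count 4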

#[derive(Parser)]
#[command(
    author,
    version,
    about,
    long_about = "CLI for Storage Controller Support/Debug"
)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[arg(long)]
    /// URL of the storage controller, e.g. http://127.0.0.1:1234 when using `neon_local`
    api: Url,

    #[arg(long)]
    /// JWT token for authenticating with the storage controller. Depending on the API used, this
    /// should have either the `pageserverapi` or `admin` scope: for convenience, mint a token
    /// with both scopes to use with this tool.
    jwt: Option<String>,

    #[command(subcommand)]
    command: Command,
}

#[derive(Debug, Clone)]
struct PlacementPolicyArg(PlacementPolicy);

impl FromStr for PlacementPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "detached" => Ok(Self(PlacementPolicy::Detached)),
            "secondary" => Ok(Self(PlacementPolicy::Secondary)),
            _ if s.starts_with("attached:") => {
                let mut splitter = s.split(':');
                let _prefix = splitter.next().unwrap();
                match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
                    Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
                    None => Err(anyhow::anyhow!(
                        "Invalid format '{s}', a valid example is 'attached:1'"
                    )),
                }
            }
            _ => Err(anyhow::anyhow!(
                "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
            )),
        }
    }
}
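
// A minimal test sketch, added for illustration (not part of the original source): it exercises
// only the `FromStr` impl above, so it should compile against the items already in scope.
#[cfg(test)]
mod placement_policy_arg_tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn parses_known_policies() {
        assert!(matches!(
            PlacementPolicyArg::from_str("detached").unwrap().0,
            PlacementPolicy::Detached
        ));
        assert!(matches!(
            PlacementPolicyArg::from_str("attached:1").unwrap().0,
            PlacementPolicy::Attached(1)
        ));
    }

    #[test]
    fn rejects_malformed_input() {
        assert!(PlacementPolicyArg::from_str("attached:").is_err());
        assert!(PlacementPolicyArg::from_str("bogus").is_err());
    }
}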

#[derive(Debug, Clone)]
struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);

impl FromStr for ShardSchedulingPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(ShardSchedulingPolicy::Active)),
            "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
            "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
            "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
            _ => Err(anyhow::anyhow!(
                "Unknown scheduling policy '{s}', try active,essential,pause,stop"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct NodeAvailabilityArg(NodeAvailabilityWrapper);

impl FromStr for NodeAvailabilityArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
            "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
            _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
        }
    }
}
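
// A companion test sketch for the two simpler parsers, likewise an illustrative addition rather
// than original code.
#[cfg(test)]
mod enum_arg_parsing_tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn parses_scheduling_and_availability() {
        assert!(matches!(
            ShardSchedulingPolicyArg::from_str("pause").unwrap().0,
            ShardSchedulingPolicy::Pause
        ));
        assert!(matches!(
            NodeAvailabilityArg::from_str("offline").unwrap().0,
            NodeAvailabilityWrapper::Offline
        ));
        assert!(ShardSchedulingPolicyArg::from_str("bogus").is_err());
        assert!(NodeAvailabilityArg::from_str("bogus").is_err());
    }
}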

struct Client {
    base_url: Url,
    jwt_token: Option<String>,
    client: reqwest::Client,
}

impl Client {
    fn new(base_url: Url, jwt_token: Option<String>) -> Self {
        Self {
            base_url,
            jwt_token,
            client: reqwest::ClientBuilder::new()
                .build()
                .expect("Failed to construct http client"),
        }
    }

    /// Simple HTTP request wrapper for calling into the storage controller
    async fn dispatch<RQ, RS>(
        &self,
        method: hyper::Method,
        path: String,
        body: Option<RQ>,
    ) -> mgmt_api::Result<RS>
    where
        RQ: Serialize + Sized,
        RS: DeserializeOwned + Sized,
    {
        // The configured URL has the /upcall path prefix for pageservers to use: we will strip that
        // out for general purpose API access. Rebuilding the URL from host and port alone drops any
        // such prefix before the controller API path is appended. (This assumes the URL carries an
        // explicit, non-default port, as in the `neon_local` example above.)
        let url = Url::from_str(&format!(
            "http://{}:{}/{path}",
            self.base_url.host_str().unwrap(),
            self.base_url.port().unwrap()
        ))
        .unwrap();

        let mut builder = self.client.request(method, url);
        if let Some(body) = body {
            builder = builder.json(&body)
        }
        if let Some(jwt_token) = &self.jwt_token {
            builder = builder.header(
                reqwest::header::AUTHORIZATION,
                format!("Bearer {jwt_token}"),
            );
        }

        let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
        let response = response.error_from_body().await?;

        response
            .json()
            .await
            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
    }
}
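
// Usage sketch for `dispatch`, added for illustration (not part of the original tool): fetch the
// node list the same way the `nodes` subcommand in `main` does. Marked dead code so it has no
// effect on the CLI itself.
#[allow(dead_code)]
async fn example_list_nodes(client: &Client) -> mgmt_api::Result<Vec<NodeDescribeResponse>> {
    client
        .dispatch::<(), Vec<NodeDescribeResponse>>(
            Method::GET,
            "control/v1/node".to_string(),
            None,
        )
        .await
}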

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());

    // `Url::to_string()` renders a trailing '/': drop it before handing the base URL to the
    // pageserver management API client.
    let mut trimmed = cli.api.to_string();
    trimmed.pop();
    let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());

    match cli.command {
        Command::NodeRegister {
            node_id,
            listen_pg_addr,
            listen_pg_port,
            listen_http_addr,
            listen_http_port,
        } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "control/v1/node".to_string(),
                    Some(NodeRegisterRequest {
                        node_id,
                        listen_pg_addr,
                        listen_pg_port,
                        listen_http_addr,
                        listen_http_port,
                    }),
                )
                .await?;
        }
        Command::TenantCreate { tenant_id } => {
            vps_client
                .tenant_create(&TenantCreateRequest {
                    new_tenant_id: TenantShardId::unsharded(tenant_id),
                    generation: None,
                    shard_parameters: ShardParameters::default(),
                    placement_policy: Some(PlacementPolicy::Attached(1)),
                    config: TenantConfig::default(),
                })
                .await?;
        }
        Command::TenantDelete { tenant_id } => {
            let status = vps_client
                .tenant_delete(TenantShardId::unsharded(tenant_id))
                .await?;
            tracing::info!("Delete status: {}", status);
        }
        Command::Nodes {} => {
            let resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;
            let mut table = comfy_table::Table::new();
            table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
            for node in resp {
                table.add_row([
                    format!("{}", node.id),
                    node.listen_http_addr,
                    format!("{:?}", node.scheduling),
                    format!("{:?}", node.availability),
                ]);
            }
            println!("{table}");
        }
        Command::NodeConfigure {
            node_id,
            availability,
            scheduling,
        } => {
            let req = NodeConfigureRequest {
                node_id,
                availability: availability.map(|a| a.0),
                scheduling,
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/config"),
                    Some(req),
                )
                .await?;
        }
        Command::Tenants {} => {
            let resp = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;
            let mut table = comfy_table::Table::new();
            table.set_header([
                "TenantId",
                "ShardCount",
                "StripeSize",
                "Placement",
                "Scheduling",
            ]);
            for tenant in resp {
                let shard_zero = tenant.shards.into_iter().next().unwrap();
                table.add_row([
                    format!("{}", tenant.tenant_id),
                    format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
                    format!("{:?}", tenant.stripe_size),
                    format!("{:?}", tenant.policy),
                    format!("{:?}", shard_zero.scheduling_policy),
                ]);
            }

            println!("{table}");
        }
        Command::TenantPolicy {
            tenant_id,
            placement,
            scheduling,
        } => {
            let req = TenantPolicyRequest {
                scheduling: scheduling.map(|s| s.0),
                placement: placement.map(|p| p.0),
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/policy"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantShardSplit {
            tenant_id,
            shard_count,
            stripe_size,
        } => {
            let req = TenantShardSplitRequest {
                new_shard_count: shard_count,
                new_stripe_size: stripe_size.map(ShardStripeSize),
            };

            let response = storcon_client
                .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/shard_split"),
                    Some(req),
                )
                .await?;
            println!(
                "Split tenant {} into {} shards: {}",
                tenant_id,
                shard_count,
                response
                    .new_shards
                    .iter()
                    .map(|s| format!("{:?}", s))
                    .collect::<Vec<_>>()
                    .join(",")
            );
        }
        Command::TenantShardMigrate {
            tenant_shard_id,
            node,
        } => {
            let req = TenantShardMigrateRequest {
                tenant_shard_id,
                node_id: node,
            };

            storcon_client
                .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_shard_id}/migrate"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantConfig { tenant_id, config } => {
            let tenant_conf = serde_json::from_str(&config)?;

            vps_client
                .tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: tenant_conf,
                })
                .await?;
        }
        Command::TenantScatter { tenant_id } => {
            // Find the tenant's shards and the node each one is attached to.
            let locate_response = storcon_client
                .dispatch::<(), TenantLocateResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}/locate"),
                    None,
                )
                .await?;
            let shards = locate_response.shards;

            let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
            let shard_count = shards.len();
            for s in shards {
                let entry = node_to_shards.entry(s.node_id).or_default();
                entry.push(s.shard_id);
            }

            // Load the list of available nodes, so that nodes currently holding none of this
            // tenant's shards are also considered as destinations.
            let nodes_resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            for node in nodes_resp {
                if matches!(node.availability, NodeAvailabilityWrapper::Active) {
                    node_to_shards.entry(node.id).or_default();
                }
            }

            // Spread the shards across the nodes: repeatedly take a shard from a node holding more
            // than its fair share and move it to the node currently holding the fewest, until no
            // node is over-full or no destination has room.
            let max_shard_per_node = shard_count / node_to_shards.len();

            loop {
                let mut migrate_shard = None;
                for shards in node_to_shards.values_mut() {
                    if shards.len() > max_shard_per_node {
                        // This node is over-full: take one of its shards to migrate away
                        migrate_shard = Some(shards.pop().unwrap());
                    }
                }
                let Some(migrate_shard) = migrate_shard else {
                    break;
                };

                // Pick the emptiest node to migrate to
                let mut destinations = node_to_shards
                    .iter()
                    .map(|(k, v)| (k, v.len()))
                    .collect::<Vec<_>>();
                destinations.sort_by_key(|i| i.1);
                let (destination_node, destination_count) = *destinations.first().unwrap();
                if destination_count + 1 > max_shard_per_node {
                    // Even the emptiest destination doesn't have space: we're done
                    break;
                }
                let destination_node = *destination_node;

                node_to_shards
                    .get_mut(&destination_node)
                    .unwrap()
                    .push(migrate_shard);

                println!("Migrate {} -> {} ...", migrate_shard, destination_node);

                storcon_client
                    .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                        Method::PUT,
                        format!("control/v1/tenant/{migrate_shard}/migrate"),
                        Some(TenantShardMigrateRequest {
                            tenant_shard_id: migrate_shard,
                            node_id: destination_node,
                        }),
                    )
                    .await?;
                println!("Migrate {} -> {} OK", migrate_shard, destination_node);
            }
        }
        Command::TenantDescribe { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;
            let shards = describe_response.shards;
            let mut table = comfy_table::Table::new();
            table.set_header(["Shard", "Attached", "Secondary", "Last error", "Status"]);
            for shard in shards {
                let secondary = shard
                    .node_secondary
                    .iter()
                    .map(|n| format!("{}", n))
                    .collect::<Vec<_>>()
                    .join(",");

                let mut status_parts = Vec::new();
                if shard.is_reconciling {
                    status_parts.push("reconciling");
                }

                if shard.is_pending_compute_notification {
                    status_parts.push("pending_compute");
                }

                if shard.is_splitting {
                    status_parts.push("splitting");
                }
                let status = status_parts.join(",");

                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    shard
                        .node_attached
                        .map(|n| format!("{}", n))
                        .unwrap_or(String::new()),
                    secondary,
                    shard.last_error,
                    status,
                ]);
            }
            println!("{table}");
        }
585 0 :
586 0 : Ok(())
587 0 : }
|