Line data Source code
1 : use futures::StreamExt;
2 : use std::{
3 : collections::{HashMap, HashSet},
4 : str::FromStr,
5 : time::Duration,
6 : };
7 :
8 : use clap::{Parser, Subcommand};
9 : use pageserver_api::{
10 : controller_api::{
11 : AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
12 : SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
13 : ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy,
14 : TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
15 : },
16 : models::{
17 : EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
18 : ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest,
19 : TenantShardSplitRequest, TenantShardSplitResponse,
20 : },
21 : shard::{ShardStripeSize, TenantShardId},
22 : };
23 : use pageserver_client::mgmt_api::{self};
24 : use reqwest::{Method, StatusCode, Url};
25 : use utils::id::{NodeId, TenantId};
26 :
27 : use pageserver_api::controller_api::{
28 : NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
29 : TenantShardMigrateRequest, TenantShardMigrateResponse,
30 : };
31 : use storage_controller_client::control_api::Client;
32 :
33 : #[derive(Subcommand, Debug)]
34 : enum Command {
35 : /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
36 : /// since pageservers auto-register when they start up
37 : NodeRegister {
38 : #[arg(long)]
39 0 : node_id: NodeId,
40 :
41 : #[arg(long)]
42 0 : listen_pg_addr: String,
43 : #[arg(long)]
44 0 : listen_pg_port: u16,
45 :
46 : #[arg(long)]
47 0 : listen_http_addr: String,
48 : #[arg(long)]
49 0 : listen_http_port: u16,
50 : #[arg(long)]
51 0 : availability_zone_id: String,
52 : },
53 :
54 : /// Modify a node's configuration in the storage controller
55 : NodeConfigure {
56 : #[arg(long)]
57 0 : node_id: NodeId,
58 :
59 : /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
60 : /// manually mark a node offline
61 : #[arg(long)]
62 : availability: Option<NodeAvailabilityArg>,
63 : /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
64 : #[arg(long)]
65 : scheduling: Option<NodeSchedulingPolicy>,
66 : },
67 : NodeDelete {
68 : #[arg(long)]
69 0 : node_id: NodeId,
70 : },
71 : /// Modify a tenant's policies in the storage controller
72 : TenantPolicy {
73 : #[arg(long)]
74 0 : tenant_id: TenantId,
75 : /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
76 : /// or is in the normal attached state with N secondary locations (`attached:N`)
77 : #[arg(long)]
78 : placement: Option<PlacementPolicyArg>,
79 : /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
80 : /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
81 : /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
82 : /// unavailable, and are only for use in emergencies.
83 : #[arg(long)]
84 : scheduling: Option<ShardSchedulingPolicyArg>,
85 : },
86 : /// List nodes known to the storage controller
87 : Nodes {},
88 : /// List tenants known to the storage controller
89 : Tenants {
90 : /// If this field is set, it will list the tenants on a specific node
91 : node_id: Option<NodeId>,
92 : },
93 : /// Create a new tenant in the storage controller, and by extension on pageservers.
94 : TenantCreate {
95 : #[arg(long)]
96 0 : tenant_id: TenantId,
97 : },
98 : /// Delete a tenant in the storage controller, and by extension on pageservers.
99 : TenantDelete {
100 : #[arg(long)]
101 0 : tenant_id: TenantId,
102 : },
103 : /// Split an existing tenant into a higher number of shards than its current shard count.
104 : TenantShardSplit {
105 : #[arg(long)]
106 0 : tenant_id: TenantId,
107 : #[arg(long)]
108 0 : shard_count: u8,
109 : /// Optional, in 8KiB pages, e.g. set 2048 for 16MiB stripes.
110 : #[arg(long)]
111 : stripe_size: Option<u32>,
112 : },
113 : /// Migrate the attached location for a tenant shard to a specific pageserver.
114 : TenantShardMigrate {
115 : #[arg(long)]
116 0 : tenant_shard_id: TenantShardId,
117 : #[arg(long)]
118 0 : node: NodeId,
119 : },
120 : /// Migrate the secondary location for a tenant shard to a specific pageserver.
121 : TenantShardMigrateSecondary {
122 : #[arg(long)]
123 0 : tenant_shard_id: TenantShardId,
124 : #[arg(long)]
125 0 : node: NodeId,
126 : },
127 : /// Cancel any ongoing reconciliation for this shard
128 : TenantShardCancelReconcile {
129 : #[arg(long)]
130 0 : tenant_shard_id: TenantShardId,
131 : },
132 : /// Set the pageserver tenant configuration of a tenant: this is the configuration structure
133 : /// that is passed through to pageservers, and does not affect storage controller behavior.
134 : /// Any previous tenant configs are overwritten.
135 : SetTenantConfig {
136 : #[arg(long)]
137 0 : tenant_id: TenantId,
138 : #[arg(long)]
139 0 : config: String,
140 : },
141 : /// Patch the pageserver tenant configuration of a tenant. Any fields with null values in the
142 : /// provided JSON are unset from the tenant config and all fields with non-null values are set.
143 : /// Unspecified fields are not changed.
144 : PatchTenantConfig {
145 : #[arg(long)]
146 0 : tenant_id: TenantId,
147 : #[arg(long)]
148 0 : config: String,
149 : },
150 : /// Print details about a particular tenant, including all its shards' states.
151 : TenantDescribe {
152 : #[arg(long)]
153 0 : tenant_id: TenantId,
154 : },
155 : /// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
156 : /// mode so that it can warm up content on a pageserver.
157 : TenantWarmup {
158 : #[arg(long)]
159 0 : tenant_id: TenantId,
160 : },
161 : TenantSetPreferredAz {
162 : #[arg(long)]
163 0 : tenant_id: TenantId,
164 : #[arg(long)]
165 : preferred_az: Option<String>,
166 : },
167 : /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
168 : /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
169 : TenantDrop {
170 : #[arg(long)]
171 0 : tenant_id: TenantId,
172 : #[arg(long)]
173 0 : unclean: bool,
174 : },
175 : NodeDrop {
176 : #[arg(long)]
177 0 : node_id: NodeId,
178 : #[arg(long)]
179 0 : unclean: bool,
180 : },
181 : TenantSetTimeBasedEviction {
182 : #[arg(long)]
183 0 : tenant_id: TenantId,
184 : #[arg(long)]
185 0 : period: humantime::Duration,
186 : #[arg(long)]
187 0 : threshold: humantime::Duration,
188 : },
189 : /// Migrate away from a set of specified pageservers by moving the primary attachments to pageservers
190 : /// outside of the specified set.
191 : BulkMigrate {
192 : /// Set of pageserver node ids to drain.
193 : #[arg(long)]
194 0 : nodes: Vec<NodeId>,
195 : /// Optional: migration concurrency (default is 8)
196 : #[arg(long)]
197 : concurrency: Option<usize>,
198 : /// Optional: maximum number of shards to migrate
199 : #[arg(long)]
200 : max_shards: Option<usize>,
201 : /// Optional: when set to true, nothing is migrated, but the plan is printed to stdout
202 : #[arg(long)]
203 : dry_run: Option<bool>,
204 : },
205 : /// Start draining the specified pageserver.
206 : /// The drain is complete when the scheduling policy returns to active.
207 : StartDrain {
208 : #[arg(long)]
209 0 : node_id: NodeId,
210 : },
211 : /// Cancel draining the specified pageserver and wait for `timeout`
212 : /// for the operation to be canceled. May be retried.
213 : CancelDrain {
214 : #[arg(long)]
215 0 : node_id: NodeId,
216 : #[arg(long)]
217 0 : timeout: humantime::Duration,
218 : },
219 : /// Start filling the specified pageserver.
220 : /// The fill is complete when the scheduling policy returns to active.
221 : StartFill {
222 : #[arg(long)]
223 0 : node_id: NodeId,
224 : },
225 : /// Cancel filling the specified pageserver and wait for `timeout`
226 : /// for the operation to be canceled. May be retried.
227 : CancelFill {
228 : #[arg(long)]
229 0 : node_id: NodeId,
230 : #[arg(long)]
231 0 : timeout: humantime::Duration,
232 : },
233 : /// List safekeepers known to the storage controller
234 : Safekeepers {},
235 : /// Set the scheduling policy of the specified safekeeper
236 : SafekeeperScheduling {
237 : #[arg(long)]
238 0 : node_id: NodeId,
239 : #[arg(long)]
240 0 : scheduling_policy: SkSchedulingPolicyArg,
241 : },
242 : }
243 :
244 : #[derive(Parser)]
245 : #[command(
246 : author,
247 : version,
248 : about,
249 : long_about = "CLI for Storage Controller Support/Debug"
250 : )]
251 : #[command(arg_required_else_help(true))]
252 : struct Cli {
253 : #[arg(long)]
254 : /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
255 0 : api: Url,
256 :
257 : #[arg(long)]
258 : /// JWT token for authenticating with storage controller. Depending on the API used, this
259 : /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
260 : /// a token with both scopes to use with this tool.
261 : jwt: Option<String>,
262 :
263 : #[command(subcommand)]
264 : command: Command,
265 : }
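// Example invocations, as a hedged sketch: the binary name is assumed here to be
// `storcon_cli`, while subcommand and flag names follow from the clap derive above
// (kebab-cased variant and field names):
//
//   storcon_cli --api http://127.0.0.1:1234 nodes
//   storcon_cli --api http://127.0.0.1:1234 tenant-describe --tenant-id <tenant id>
//   storcon_cli --api http://127.0.0.1:1234 tenant-policy --tenant-id <tenant id> --placement attached:1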
266 :
267 : #[derive(Debug, Clone)]
268 : struct PlacementPolicyArg(PlacementPolicy);
269 :
270 : impl FromStr for PlacementPolicyArg {
271 : type Err = anyhow::Error;
272 :
273 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
274 0 : match s {
275 0 : "detached" => Ok(Self(PlacementPolicy::Detached)),
276 0 : "secondary" => Ok(Self(PlacementPolicy::Secondary)),
277 0 : _ if s.starts_with("attached:") => {
278 0 : let mut splitter = s.split(':');
279 0 : let _prefix = splitter.next().unwrap();
280 0 : match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
281 0 : Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
282 0 : None => Err(anyhow::anyhow!(
283 0 : "Invalid format '{s}', a valid example is 'attached:1'"
284 0 : )),
285 : }
286 : }
287 0 : _ => Err(anyhow::anyhow!(
288 0 : "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
289 0 : )),
290 : }
291 0 : }
292 : }
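// Illustrative only: a small sketch of the `--placement` syntax accepted by the
// parser above — the keywords `detached` and `secondary`, plus `attached:<n>`
// where `<n>` is the number of secondary locations.
#[cfg(test)]
mod placement_policy_arg_example {
    use super::*;

    #[test]
    fn parses_placement_policy_values() {
        // Keyword forms map directly onto PlacementPolicy variants.
        assert!(matches!(
            "detached".parse::<PlacementPolicyArg>().unwrap().0,
            PlacementPolicy::Detached
        ));
        // `attached:1` means attached with one secondary location.
        assert!(matches!(
            "attached:1".parse::<PlacementPolicyArg>().unwrap().0,
            PlacementPolicy::Attached(1)
        ));
        // Anything else, including a malformed count, is rejected.
        assert!("attached:x".parse::<PlacementPolicyArg>().is_err());
    }
}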
293 :
294 : #[derive(Debug, Clone)]
295 : struct SkSchedulingPolicyArg(SkSchedulingPolicy);
296 :
297 : impl FromStr for SkSchedulingPolicyArg {
298 : type Err = anyhow::Error;
299 :
300 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
301 0 : SkSchedulingPolicy::from_str(s).map(Self)
302 0 : }
303 : }
304 :
305 : #[derive(Debug, Clone)]
306 : struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);
307 :
308 : impl FromStr for ShardSchedulingPolicyArg {
309 : type Err = anyhow::Error;
310 :
311 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
312 0 : match s {
313 0 : "active" => Ok(Self(ShardSchedulingPolicy::Active)),
314 0 : "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
315 0 : "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
316 0 : "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
317 0 : _ => Err(anyhow::anyhow!(
318 0 : "Unknown scheduling policy '{s}', try active,essential,pause,stop"
319 0 : )),
320 : }
321 0 : }
322 : }
323 :
324 : #[derive(Debug, Clone)]
325 : struct NodeAvailabilityArg(NodeAvailabilityWrapper);
326 :
327 : impl FromStr for NodeAvailabilityArg {
328 : type Err = anyhow::Error;
329 :
330 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
331 0 : match s {
332 0 : "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
333 0 : "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
334 0 : _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
335 : }
336 0 : }
337 : }
338 :
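// Polls the node describe endpoint (`control/v1/node/{node_id}`) until `f` accepts
// the node's current scheduling policy, or `timeout` elapses; both API errors and
// the timeout are surfaced through `waiter.await??` below.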
339 0 : async fn wait_for_scheduling_policy<F>(
340 0 : client: Client,
341 0 : node_id: NodeId,
342 0 : timeout: Duration,
343 0 : f: F,
344 0 : ) -> anyhow::Result<NodeSchedulingPolicy>
345 0 : where
346 0 : F: Fn(NodeSchedulingPolicy) -> bool,
347 0 : {
348 0 : let waiter = tokio::time::timeout(timeout, async move {
349 : loop {
350 0 : let node = client
351 0 : .dispatch::<(), NodeDescribeResponse>(
352 0 : Method::GET,
353 0 : format!("control/v1/node/{node_id}"),
354 0 : None,
355 0 : )
356 0 : .await?;
357 :
358 0 : if f(node.scheduling) {
359 0 : return Ok::<NodeSchedulingPolicy, mgmt_api::Error>(node.scheduling);
360 0 : }
361 : }
362 0 : });
363 0 :
364 0 : Ok(waiter.await??)
365 0 : }
366 :
367 : #[tokio::main]
368 0 : async fn main() -> anyhow::Result<()> {
369 0 : let cli = Cli::parse();
370 0 :
371 0 : let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
372 0 :
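// A base URL such as `http://127.0.0.1:1234` renders with a trailing '/' once
// parsed; drop it before handing the string to the pageserver-style management
// client constructed from the same address.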
373 0 : let mut trimmed = cli.api.to_string();
374 0 : trimmed.pop();
375 0 : let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());
376 0 :
377 0 : match cli.command {
378 0 : Command::NodeRegister {
379 0 : node_id,
380 0 : listen_pg_addr,
381 0 : listen_pg_port,
382 0 : listen_http_addr,
383 0 : listen_http_port,
384 0 : availability_zone_id,
385 0 : } => {
386 0 : storcon_client
387 0 : .dispatch::<_, ()>(
388 0 : Method::POST,
389 0 : "control/v1/node".to_string(),
390 0 : Some(NodeRegisterRequest {
391 0 : node_id,
392 0 : listen_pg_addr,
393 0 : listen_pg_port,
394 0 : listen_http_addr,
395 0 : listen_http_port,
396 0 : availability_zone_id: AvailabilityZone(availability_zone_id),
397 0 : }),
398 0 : )
399 0 : .await?;
400 0 : }
401 0 : Command::TenantCreate { tenant_id } => {
402 0 : storcon_client
403 0 : .dispatch::<_, ()>(
404 0 : Method::POST,
405 0 : "v1/tenant".to_string(),
406 0 : Some(TenantCreateRequest {
407 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
408 0 : generation: None,
409 0 : shard_parameters: ShardParameters::default(),
410 0 : placement_policy: Some(PlacementPolicy::Attached(1)),
411 0 : config: TenantConfig::default(),
412 0 : }),
413 0 : )
414 0 : .await?;
415 0 : }
416 0 : Command::TenantDelete { tenant_id } => {
417 0 : let status = vps_client
418 0 : .tenant_delete(TenantShardId::unsharded(tenant_id))
419 0 : .await?;
420 0 : tracing::info!("Delete status: {}", status);
421 0 : }
422 0 : Command::Nodes {} => {
423 0 : let mut resp = storcon_client
424 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
425 0 : Method::GET,
426 0 : "control/v1/node".to_string(),
427 0 : None,
428 0 : )
429 0 : .await?;
430 0 :
431 0 : resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
432 0 :
433 0 : let mut table = comfy_table::Table::new();
434 0 : table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
435 0 : for node in resp {
436 0 : table.add_row([
437 0 : format!("{}", node.id),
438 0 : node.listen_http_addr,
439 0 : node.availability_zone_id,
440 0 : format!("{:?}", node.scheduling),
441 0 : format!("{:?}", node.availability),
442 0 : ]);
443 0 : }
444 0 : println!("{table}");
445 0 : }
446 0 : Command::NodeConfigure {
447 0 : node_id,
448 0 : availability,
449 0 : scheduling,
450 0 : } => {
451 0 : let req = NodeConfigureRequest {
452 0 : node_id,
453 0 : availability: availability.map(|a| a.0),
454 0 : scheduling,
455 0 : };
456 0 : storcon_client
457 0 : .dispatch::<_, ()>(
458 0 : Method::PUT,
459 0 : format!("control/v1/node/{node_id}/config"),
460 0 : Some(req),
461 0 : )
462 0 : .await?;
463 0 : }
464 0 : Command::Tenants {
465 0 : node_id: Some(node_id),
466 0 : } => {
467 0 : let describe_response = storcon_client
468 0 : .dispatch::<(), NodeShardResponse>(
469 0 : Method::GET,
470 0 : format!("control/v1/node/{node_id}/shards"),
471 0 : None,
472 0 : )
473 0 : .await?;
474 0 : let shards = describe_response.shards;
475 0 : let mut table = comfy_table::Table::new();
476 0 : table.set_header([
477 0 : "Shard",
478 0 : "Intended Primary/Secondary",
479 0 : "Observed Primary/Secondary",
480 0 : ]);
481 0 : for shard in shards {
482 0 : table.add_row([
483 0 : format!("{}", shard.tenant_shard_id),
484 0 : match shard.is_intended_secondary {
485 0 : None => "".to_string(),
486 0 : Some(true) => "Secondary".to_string(),
487 0 : Some(false) => "Primary".to_string(),
488 0 : },
489 0 : match shard.is_observed_secondary {
490 0 : None => "".to_string(),
491 0 : Some(true) => "Secondary".to_string(),
492 0 : Some(false) => "Primary".to_string(),
493 0 : },
494 0 : ]);
495 0 : }
496 0 : println!("{table}");
497 0 : }
498 0 : Command::Tenants { node_id: None } => {
499 0 : // Set up output formatting
500 0 : let mut table = comfy_table::Table::new();
501 0 : table.set_header([
502 0 : "TenantId",
503 0 : "Preferred AZ",
504 0 : "ShardCount",
505 0 : "StripeSize",
506 0 : "Placement",
507 0 : "Scheduling",
508 0 : ]);
509 0 :
510 0 : // Pagination loop over listing API
511 0 : let mut start_after = None;
512 0 : const LIMIT: usize = 1000;
513 0 : loop {
514 0 : let path = match start_after {
515 0 : None => format!("control/v1/tenant?limit={LIMIT}"),
516 0 : Some(start_after) => {
517 0 : format!("control/v1/tenant?limit={LIMIT}&start_after={start_after}")
518 0 : }
519 0 : };
520 0 :
521 0 : let resp = storcon_client
522 0 : .dispatch::<(), Vec<TenantDescribeResponse>>(Method::GET, path, None)
523 0 : .await?;
524 0 :
525 0 : if resp.is_empty() {
526 0 : // End of data reached
527 0 : break;
528 0 : }
529 0 :
530 0 : // Give some visual feedback while we're building up the table (comfy_table doesn't have
531 0 : // streaming output)
532 0 : if resp.len() >= LIMIT {
533 0 : eprint!(".");
534 0 : }
535 0 :
536 0 : start_after = Some(resp.last().unwrap().tenant_id);
537 0 :
538 0 : for tenant in resp {
539 0 : let shard_zero = tenant.shards.into_iter().next().unwrap();
540 0 : table.add_row([
541 0 : format!("{}", tenant.tenant_id),
542 0 : shard_zero
543 0 : .preferred_az_id
544 0 : .as_ref()
545 0 : .cloned()
546 0 : .unwrap_or("".to_string()),
547 0 : format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
548 0 : format!("{:?}", tenant.stripe_size),
549 0 : format!("{:?}", tenant.policy),
550 0 : format!("{:?}", shard_zero.scheduling_policy),
551 0 : ]);
552 0 : }
553 0 : }
554 0 :
555 0 : // Terminate progress dots
556 0 : if table.row_count() > LIMIT {
557 0 : eprint!("");
558 0 : }
559 0 :
560 0 : println!("{table}");
561 0 : }
562 0 : Command::TenantPolicy {
563 0 : tenant_id,
564 0 : placement,
565 0 : scheduling,
566 0 : } => {
567 0 : let req = TenantPolicyRequest {
568 0 : scheduling: scheduling.map(|s| s.0),
569 0 : placement: placement.map(|p| p.0),
570 0 : };
571 0 : storcon_client
572 0 : .dispatch::<_, ()>(
573 0 : Method::PUT,
574 0 : format!("control/v1/tenant/{tenant_id}/policy"),
575 0 : Some(req),
576 0 : )
577 0 : .await?;
578 0 : }
579 0 : Command::TenantShardSplit {
580 0 : tenant_id,
581 0 : shard_count,
582 0 : stripe_size,
583 0 : } => {
584 0 : let req = TenantShardSplitRequest {
585 0 : new_shard_count: shard_count,
586 0 : new_stripe_size: stripe_size.map(ShardStripeSize),
587 0 : };
588 0 :
589 0 : let response = storcon_client
590 0 : .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
591 0 : Method::PUT,
592 0 : format!("control/v1/tenant/{tenant_id}/shard_split"),
593 0 : Some(req),
594 0 : )
595 0 : .await?;
596 0 : println!(
597 0 : "Split tenant {} into {} shards: {}",
598 0 : tenant_id,
599 0 : shard_count,
600 0 : response
601 0 : .new_shards
602 0 : .iter()
603 0 : .map(|s| format!("{:?}", s))
604 0 : .collect::<Vec<_>>()
605 0 : .join(",")
606 0 : );
607 0 : }
608 0 : Command::TenantShardMigrate {
609 0 : tenant_shard_id,
610 0 : node,
611 0 : } => {
612 0 : let req = TenantShardMigrateRequest { node_id: node };
613 0 :
614 0 : storcon_client
615 0 : .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
616 0 : Method::PUT,
617 0 : format!("control/v1/tenant/{tenant_shard_id}/migrate"),
618 0 : Some(req),
619 0 : )
620 0 : .await?;
621 0 : }
622 0 : Command::TenantShardMigrateSecondary {
623 0 : tenant_shard_id,
624 0 : node,
625 0 : } => {
626 0 : let req = TenantShardMigrateRequest { node_id: node };
627 0 :
628 0 : storcon_client
629 0 : .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
630 0 : Method::PUT,
631 0 : format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
632 0 : Some(req),
633 0 : )
634 0 : .await?;
635 0 : }
636 0 : Command::TenantShardCancelReconcile { tenant_shard_id } => {
637 0 : storcon_client
638 0 : .dispatch::<(), ()>(
639 0 : Method::PUT,
640 0 : format!("control/v1/tenant/{tenant_shard_id}/cancel_reconcile"),
641 0 : None,
642 0 : )
643 0 : .await?;
644 0 : }
645 0 : Command::SetTenantConfig { tenant_id, config } => {
646 0 : let tenant_conf = serde_json::from_str(&config)?;
647 0 :
648 0 : vps_client
649 0 : .set_tenant_config(&TenantConfigRequest {
650 0 : tenant_id,
651 0 : config: tenant_conf,
652 0 : })
653 0 : .await?;
654 0 : }
655 0 : Command::PatchTenantConfig { tenant_id, config } => {
656 0 : let tenant_conf = serde_json::from_str(&config)?;
657 0 :
658 0 : vps_client
659 0 : .patch_tenant_config(&TenantConfigPatchRequest {
660 0 : tenant_id,
661 0 : config: tenant_conf,
662 0 : })
663 0 : .await?;
664 0 : }
665 0 : Command::TenantDescribe { tenant_id } => {
666 0 : let TenantDescribeResponse {
667 0 : tenant_id,
668 0 : shards,
669 0 : stripe_size,
670 0 : policy,
671 0 : config,
672 0 : } = storcon_client
673 0 : .dispatch::<(), TenantDescribeResponse>(
674 0 : Method::GET,
675 0 : format!("control/v1/tenant/{tenant_id}"),
676 0 : None,
677 0 : )
678 0 : .await?;
679 0 :
680 0 : let nodes = storcon_client
681 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
682 0 : Method::GET,
683 0 : "control/v1/node".to_string(),
684 0 : None,
685 0 : )
686 0 : .await?;
687 0 : let nodes = nodes
688 0 : .into_iter()
689 0 : .map(|n| (n.id, n))
690 0 : .collect::<HashMap<_, _>>();
691 0 :
692 0 : println!("Tenant {tenant_id}");
693 0 : let mut table = comfy_table::Table::new();
694 0 : table.add_row(["Policy", &format!("{:?}", policy)]);
695 0 : table.add_row(["Stripe size", &format!("{:?}", stripe_size)]);
696 0 : table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
697 0 : println!("{table}");
698 0 : println!("Shards:");
699 0 : let mut table = comfy_table::Table::new();
700 0 : table.set_header([
701 0 : "Shard",
702 0 : "Attached",
703 0 : "Attached AZ",
704 0 : "Secondary",
705 0 : "Last error",
706 0 : "status",
707 0 : ]);
708 0 : for shard in shards {
709 0 : let secondary = shard
710 0 : .node_secondary
711 0 : .iter()
712 0 : .map(|n| format!("{}", n))
713 0 : .collect::<Vec<_>>()
714 0 : .join(",");
715 0 :
716 0 : let mut status_parts = Vec::new();
717 0 : if shard.is_reconciling {
718 0 : status_parts.push("reconciling");
719 0 : }
720 0 :
721 0 : if shard.is_pending_compute_notification {
722 0 : status_parts.push("pending_compute");
723 0 : }
724 0 :
725 0 : if shard.is_splitting {
726 0 : status_parts.push("splitting");
727 0 : }
728 0 : let status = status_parts.join(",");
729 0 :
730 0 : let attached_node = shard
731 0 : .node_attached
732 0 : .as_ref()
733 0 : .map(|id| nodes.get(id).expect("Shard references nonexistent node"));
734 0 :
735 0 : table.add_row([
736 0 : format!("{}", shard.tenant_shard_id),
737 0 : attached_node
738 0 : .map(|n| format!("{} ({})", n.listen_http_addr, n.id))
739 0 : .unwrap_or(String::new()),
740 0 : attached_node
741 0 : .map(|n| n.availability_zone_id.clone())
742 0 : .unwrap_or(String::new()),
743 0 : secondary,
744 0 : shard.last_error,
745 0 : status,
746 0 : ]);
747 0 : }
748 0 : println!("{table}");
749 0 : }
750 0 : Command::TenantSetPreferredAz {
751 0 : tenant_id,
752 0 : preferred_az,
753 0 : } => {
754 0 : // First learn about the tenant's shards
755 0 : let describe_response = storcon_client
756 0 : .dispatch::<(), TenantDescribeResponse>(
757 0 : Method::GET,
758 0 : format!("control/v1/tenant/{tenant_id}"),
759 0 : None,
760 0 : )
761 0 : .await?;
762 0 :
763 0 : // Learn about nodes to validate the AZ ID
764 0 : let nodes = storcon_client
765 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
766 0 : Method::GET,
767 0 : "control/v1/node".to_string(),
768 0 : None,
769 0 : )
770 0 : .await?;
771 0 :
772 0 : if let Some(preferred_az) = &preferred_az {
773 0 : let azs = nodes
774 0 : .into_iter()
775 0 : .map(|n| (n.availability_zone_id))
776 0 : .collect::<HashSet<_>>();
777 0 : if !azs.contains(preferred_az) {
778 0 : anyhow::bail!(
779 0 : "AZ {} not found on any node: known AZs are: {:?}",
780 0 : preferred_az,
781 0 : azs
782 0 : );
783 0 : }
784 0 : } else {
785 0 : // The AZ was omitted, so make it obvious to the user that we're clearing it
786 0 : eprintln!("Clearing preferred AZ for tenant {}", tenant_id);
787 0 : }
788 0 :
789 0 : // Construct a request that modifies all the tenant's shards
790 0 : let req = ShardsPreferredAzsRequest {
791 0 : preferred_az_ids: describe_response
792 0 : .shards
793 0 : .into_iter()
794 0 : .map(|s| {
795 0 : (
796 0 : s.tenant_shard_id,
797 0 : preferred_az.clone().map(AvailabilityZone),
798 0 : )
799 0 : })
800 0 : .collect(),
801 0 : };
802 0 : storcon_client
803 0 : .dispatch::<ShardsPreferredAzsRequest, ShardsPreferredAzsResponse>(
804 0 : Method::PUT,
805 0 : "control/v1/preferred_azs".to_string(),
806 0 : Some(req),
807 0 : )
808 0 : .await?;
809 0 : }
810 0 : Command::TenantWarmup { tenant_id } => {
811 0 : let describe_response = storcon_client
812 0 : .dispatch::<(), TenantDescribeResponse>(
813 0 : Method::GET,
814 0 : format!("control/v1/tenant/{tenant_id}"),
815 0 : None,
816 0 : )
817 0 : .await;
818 0 : match describe_response {
819 0 : Ok(describe) => {
820 0 : if matches!(describe.policy, PlacementPolicy::Secondary) {
821 0 : // Fine: it's already known to the controller in secondary mode: calling
822 0 : // again to put it into secondary mode won't cause problems.
823 0 : } else {
824 0 : anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
825 0 : }
826 0 : }
827 0 : Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
828 0 : // Fine: this tenant isn't known to the storage controller yet.
829 0 : }
830 0 : Err(e) => {
831 0 : // Unexpected API error
832 0 : return Err(e.into());
833 0 : }
834 0 : }
835 0 :
836 0 : vps_client
837 0 : .location_config(
838 0 : TenantShardId::unsharded(tenant_id),
839 0 : pageserver_api::models::LocationConfig {
840 0 : mode: pageserver_api::models::LocationConfigMode::Secondary,
841 0 : generation: None,
842 0 : secondary_conf: Some(LocationConfigSecondary { warm: true }),
843 0 : shard_number: 0,
844 0 : shard_count: 0,
845 0 : shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
846 0 : tenant_conf: TenantConfig::default(),
847 0 : },
848 0 : None,
849 0 : true,
850 0 : )
851 0 : .await?;
852 0 :
853 0 : let describe_response = storcon_client
854 0 : .dispatch::<(), TenantDescribeResponse>(
855 0 : Method::GET,
856 0 : format!("control/v1/tenant/{tenant_id}"),
857 0 : None,
858 0 : )
859 0 : .await?;
860 0 :
861 0 : let secondary_ps_id = describe_response
862 0 : .shards
863 0 : .first()
864 0 : .unwrap()
865 0 : .node_secondary
866 0 : .first()
867 0 : .unwrap();
868 0 :
869 0 : println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
870 0 : loop {
871 0 : let (status, progress) = vps_client
872 0 : .tenant_secondary_download(
873 0 : TenantShardId::unsharded(tenant_id),
874 0 : Some(Duration::from_secs(10)),
875 0 : )
876 0 : .await?;
877 0 : println!(
878 0 : "Progress: {}/{} layers, {}/{} bytes",
879 0 : progress.layers_downloaded,
880 0 : progress.layers_total,
881 0 : progress.bytes_downloaded,
882 0 : progress.bytes_total
883 0 : );
884 0 : match status {
885 0 : StatusCode::OK => {
886 0 : println!("Download complete");
887 0 : break;
888 0 : }
889 0 : StatusCode::ACCEPTED => {
890 0 : // Loop
891 0 : }
892 0 : _ => {
893 0 : anyhow::bail!("Unexpected download status: {status}");
894 0 : }
895 0 : }
896 0 : }
897 0 : }
898 0 : Command::TenantDrop { tenant_id, unclean } => {
899 0 : if !unclean {
900 0 : anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
901 0 : }
902 0 : storcon_client
903 0 : .dispatch::<(), ()>(
904 0 : Method::POST,
905 0 : format!("debug/v1/tenant/{tenant_id}/drop"),
906 0 : None,
907 0 : )
908 0 : .await?;
909 0 : }
910 0 : Command::NodeDrop { node_id, unclean } => {
911 0 : if !unclean {
912 0 : anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
913 0 : }
914 0 : storcon_client
915 0 : .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
916 0 : .await?;
917 0 : }
918 0 : Command::NodeDelete { node_id } => {
919 0 : storcon_client
920 0 : .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
921 0 : .await?;
922 0 : }
923 0 : Command::TenantSetTimeBasedEviction {
924 0 : tenant_id,
925 0 : period,
926 0 : threshold,
927 0 : } => {
928 0 : vps_client
929 0 : .set_tenant_config(&TenantConfigRequest {
930 0 : tenant_id,
931 0 : config: TenantConfig {
932 0 : eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
933 0 : EvictionPolicyLayerAccessThreshold {
934 0 : period: period.into(),
935 0 : threshold: threshold.into(),
936 0 : },
937 0 : )),
938 0 : heatmap_period: Some("300s".to_string()),
939 0 : ..Default::default()
940 0 : },
941 0 : })
942 0 : .await?;
943 0 : }
944 0 : Command::BulkMigrate {
945 0 : nodes,
946 0 : concurrency,
947 0 : max_shards,
948 0 : dry_run,
949 0 : } => {
950 0 : // Load the list of nodes, split them up into the drained and filled sets,
951 0 : // and validate that draining is possible.
952 0 : let node_descs = storcon_client
953 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
954 0 : Method::GET,
955 0 : "control/v1/node".to_string(),
956 0 : None,
957 0 : )
958 0 : .await?;
959 0 :
960 0 : let mut node_to_drain_descs = Vec::new();
961 0 : let mut node_to_fill_descs = Vec::new();
962 0 :
963 0 : for desc in node_descs {
964 0 : let to_drain = nodes.iter().any(|id| *id == desc.id);
965 0 : if to_drain {
966 0 : node_to_drain_descs.push(desc);
967 0 : } else {
968 0 : node_to_fill_descs.push(desc);
969 0 : }
970 0 : }
971 0 :
972 0 : if nodes.len() != node_to_drain_descs.len() {
973 0 : anyhow::bail!("Bulk migration requested away from node which doesn't exist.")
974 0 : }
975 0 :
976 0 : node_to_fill_descs.retain(|desc| {
977 0 : matches!(desc.availability, NodeAvailabilityWrapper::Active)
978 0 : && matches!(
979 0 : desc.scheduling,
980 0 : NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
981 0 : )
982 0 : });
983 0 :
984 0 : if node_to_fill_descs.is_empty() {
985 0 : anyhow::bail!("There are no nodes to migrate to")
986 0 : }
987 0 :
988 0 : // Set the node scheduling policy to draining for the nodes which
989 0 : // we plan to drain.
990 0 : for node_desc in node_to_drain_descs.iter() {
991 0 : let req = NodeConfigureRequest {
992 0 : node_id: node_desc.id,
993 0 : availability: None,
994 0 : scheduling: Some(NodeSchedulingPolicy::Draining),
995 0 : };
996 0 :
997 0 : storcon_client
998 0 : .dispatch::<_, ()>(
999 0 : Method::PUT,
1000 0 : format!("control/v1/node/{}/config", node_desc.id),
1001 0 : Some(req),
1002 0 : )
1003 0 : .await?;
1004 0 : }
1005 0 :
1006 0 : // Perform the migration: move each tenant shard scheduled on a node to
1007 0 : // be drained to a node which is being filled. A simple round robin
1008 0 : // strategy is used to pick the new node.
1009 0 : let tenants = storcon_client
1010 0 : .dispatch::<(), Vec<TenantDescribeResponse>>(
1011 0 : Method::GET,
1012 0 : "control/v1/tenant".to_string(),
1013 0 : None,
1014 0 : )
1015 0 : .await?;
1016 0 :
1017 0 : let mut selected_node_idx = 0;
1018 0 :
1019 0 : struct MigrationMove {
1020 0 : tenant_shard_id: TenantShardId,
1021 0 : from: NodeId,
1022 0 : to: NodeId,
1023 0 : }
1024 0 :
1025 0 : let mut moves: Vec<MigrationMove> = Vec::new();
1026 0 :
1027 0 : let shards = tenants
1028 0 : .into_iter()
1029 0 : .flat_map(|tenant| tenant.shards.into_iter());
1030 0 : for shard in shards {
1031 0 : if let Some(max_shards) = max_shards {
1032 0 : if moves.len() >= max_shards {
1033 0 : println!(
1034 0 : "Stop planning shard moves since the requested maximum was reached"
1035 0 : );
1036 0 : break;
1037 0 : }
1038 0 : }
1039 0 :
1040 0 : let should_migrate = {
1041 0 : if let Some(attached_to) = shard.node_attached {
1042 0 : node_to_drain_descs
1043 0 : .iter()
1044 0 : .map(|desc| desc.id)
1045 0 : .any(|id| id == attached_to)
1046 0 : } else {
1047 0 : false
1048 0 : }
1049 0 : };
1050 0 :
1051 0 : if !should_migrate {
1052 0 : continue;
1053 0 : }
1054 0 :
1055 0 : moves.push(MigrationMove {
1056 0 : tenant_shard_id: shard.tenant_shard_id,
1057 0 : from: shard
1058 0 : .node_attached
1059 0 : .expect("We only migrate attached tenant shards"),
1060 0 : to: node_to_fill_descs[selected_node_idx].id,
1061 0 : });
1062 0 : selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
1063 0 : }
1064 0 :
1065 0 : let total_moves = moves.len();
1066 0 :
1067 0 : if dry_run == Some(true) {
1068 0 : println!("Dryrun requested. Planned {total_moves} moves:");
1069 0 : for mv in &moves {
1070 0 : println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
1071 0 : }
1072 0 :
1073 0 : return Ok(());
1074 0 : }
1075 0 :
1076 0 : const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
1077 0 : let mut stream = futures::stream::iter(moves)
1078 0 : .map(|mv| {
1079 0 : let client = Client::new(cli.api.clone(), cli.jwt.clone());
1080 0 : async move {
1081 0 : client
1082 0 : .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
1083 0 : Method::PUT,
1084 0 : format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
1085 0 : Some(TenantShardMigrateRequest { node_id: mv.to }),
1086 0 : )
1087 0 : .await
1088 0 : .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
1089 0 : }
1090 0 : })
1091 0 : .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));
1092 0 :
1093 0 : let mut success = 0;
1094 0 : let mut failure = 0;
1095 0 :
1096 0 : while let Some(res) = stream.next().await {
1097 0 : match res {
1098 0 : Ok(_) => {
1099 0 : success += 1;
1100 0 : }
1101 0 : Err((tenant_shard_id, from, to, error)) => {
1102 0 : failure += 1;
1103 0 : println!(
1104 0 : "Failed to migrate {} from node {} to node {}: {}",
1105 0 : tenant_shard_id, from, to, error
1106 0 : );
1107 0 : }
1108 0 : }
1109 0 :
1110 0 : if (success + failure) % 20 == 0 {
1111 0 : println!(
1112 0 : "Processed {}/{} shards: {} succeeded, {} failed",
1113 0 : success + failure,
1114 0 : total_moves,
1115 0 : success,
1116 0 : failure
1117 0 : );
1118 0 : }
1119 0 : }
1120 0 :
1121 0 : println!(
1122 0 : "Processed {}/{} shards: {} succeeded, {} failed",
1123 0 : success + failure,
1124 0 : total_moves,
1125 0 : success,
1126 0 : failure
1127 0 : );
1128 0 : }
1129 0 : Command::StartDrain { node_id } => {
1130 0 : storcon_client
1131 0 : .dispatch::<(), ()>(
1132 0 : Method::PUT,
1133 0 : format!("control/v1/node/{node_id}/drain"),
1134 0 : None,
1135 0 : )
1136 0 : .await?;
1137 0 : println!("Drain started for {node_id}");
1138 0 : }
1139 0 : Command::CancelDrain { node_id, timeout } => {
1140 0 : storcon_client
1141 0 : .dispatch::<(), ()>(
1142 0 : Method::DELETE,
1143 0 : format!("control/v1/node/{node_id}/drain"),
1144 0 : None,
1145 0 : )
1146 0 : .await?;
1147 0 :
1148 0 : println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
1149 0 :
1150 0 : let final_policy =
1151 0 : wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
1152 0 : use NodeSchedulingPolicy::*;
1153 0 : matches!(sched, Active | PauseForRestart)
1154 0 : })
1155 0 : .await?;
1156 0 :
1157 0 : println!(
1158 0 : "Drain was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
1159 0 : );
1160 0 : }
1161 0 : Command::StartFill { node_id } => {
1162 0 : storcon_client
1163 0 : .dispatch::<(), ()>(Method::PUT, format!("control/v1/node/{node_id}/fill"), None)
1164 0 : .await?;
1165 0 :
1166 0 : println!("Fill started for {node_id}");
1167 0 : }
1168 0 : Command::CancelFill { node_id, timeout } => {
1169 0 : storcon_client
1170 0 : .dispatch::<(), ()>(
1171 0 : Method::DELETE,
1172 0 : format!("control/v1/node/{node_id}/fill"),
1173 0 : None,
1174 0 : )
1175 0 : .await?;
1176 0 :
1177 0 : println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
1178 0 :
1179 0 : let final_policy =
1180 0 : wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
1181 0 : use NodeSchedulingPolicy::*;
1182 0 : matches!(sched, Active)
1183 0 : })
1184 0 : .await?;
1185 0 :
1186 0 : println!(
1187 0 : "Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
1188 0 : );
1189 0 : }
1190 0 : Command::Safekeepers {} => {
1191 0 : let mut resp = storcon_client
1192 0 : .dispatch::<(), Vec<SafekeeperDescribeResponse>>(
1193 0 : Method::GET,
1194 0 : "control/v1/safekeeper".to_string(),
1195 0 : None,
1196 0 : )
1197 0 : .await?;
1198 0 :
1199 0 : resp.sort_by(|a, b| a.id.cmp(&b.id));
1200 0 :
1201 0 : let mut table = comfy_table::Table::new();
1202 0 : table.set_header([
1203 0 : "Id",
1204 0 : "Version",
1205 0 : "Host",
1206 0 : "Port",
1207 0 : "Http Port",
1208 0 : "AZ Id",
1209 0 : "Scheduling",
1210 0 : ]);
1211 0 : for sk in resp {
1212 0 : table.add_row([
1213 0 : format!("{}", sk.id),
1214 0 : format!("{}", sk.version),
1215 0 : sk.host,
1216 0 : format!("{}", sk.port),
1217 0 : format!("{}", sk.http_port),
1218 0 : sk.availability_zone_id.clone(),
1219 0 : String::from(sk.scheduling_policy),
1220 0 : ]);
1221 0 : }
1222 0 : println!("{table}");
1223 0 : }
1224 0 : Command::SafekeeperScheduling {
1225 0 : node_id,
1226 0 : scheduling_policy,
1227 0 : } => {
1228 0 : let scheduling_policy = scheduling_policy.0;
1229 0 : storcon_client
1230 0 : .dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
1231 0 : Method::POST,
1232 0 : format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
1233 0 : Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
1234 0 : )
1235 0 : .await?;
1236 0 : println!(
1237 0 : "Scheduling policy of {node_id} set to {}",
1238 0 : String::from(scheduling_policy)
1239 0 : );
1240 0 : }
1241 0 : }
1242 0 :
1243 0 : Ok(())
1244 0 : }