use futures::StreamExt;
use std::{str::FromStr, time::Duration};

use clap::{Parser, Subcommand};
use pageserver_api::{
    controller_api::{
        AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
        SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
        TenantDescribeResponse, TenantPolicyRequest,
    },
    models::{
        EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
        ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest,
        TenantShardSplitRequest, TenantShardSplitResponse,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use utils::id::{NodeId, TenantId};

use pageserver_api::controller_api::{
    NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
    TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;

#[derive(Subcommand, Debug)]
enum Command {
    /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
    /// since pageservers auto-register when they start up
    NodeRegister {
        #[arg(long)]
        node_id: NodeId,

        #[arg(long)]
        listen_pg_addr: String,
        #[arg(long)]
        listen_pg_port: u16,

        #[arg(long)]
        listen_http_addr: String,
        #[arg(long)]
        listen_http_port: u16,
        #[arg(long)]
        availability_zone_id: String,
    },

    /// Modify a node's configuration in the storage controller
    NodeConfigure {
        #[arg(long)]
        node_id: NodeId,

        /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
        /// manually mark a node offline
        #[arg(long)]
        availability: Option<NodeAvailabilityArg>,
        /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
        #[arg(long)]
        scheduling: Option<NodeSchedulingPolicy>,
    },
    NodeDelete {
        #[arg(long)]
        node_id: NodeId,
    },
    /// Modify a tenant's policies in the storage controller
    TenantPolicy {
        #[arg(long)]
        tenant_id: TenantId,
        /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
        /// or is in the normal attached state with N secondary locations (`attached:N`)
        #[arg(long)]
        placement: Option<PlacementPolicyArg>,
        /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
        /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
        /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
        /// unavailable, and are only for use in emergencies.
        #[arg(long)]
        scheduling: Option<ShardSchedulingPolicyArg>,
    },
    /// List nodes known to the storage controller
    Nodes {},
    /// List tenants known to the storage controller
    Tenants {
        /// If this field is set, it will list the tenants on a specific node
        node_id: Option<NodeId>,
    },
    /// Create a new tenant in the storage controller, and by extension on pageservers.
    TenantCreate {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Delete a tenant in the storage controller, and by extension on pageservers.
    TenantDelete {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Split an existing tenant into a higher number of shards than its current shard count.
    TenantShardSplit {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        shard_count: u8,
        /// Optional, in 8kiB pages. e.g. set 2048 for 16MB stripes.
        #[arg(long)]
        stripe_size: Option<u32>,
    },
    /// Migrate the attached location for a tenant shard to a specific pageserver.
    TenantShardMigrate {
        #[arg(long)]
        tenant_shard_id: TenantShardId,
        #[arg(long)]
        node: NodeId,
    },
    /// Cancel any ongoing reconciliation for this shard
    TenantShardCancelReconcile {
        #[arg(long)]
        tenant_shard_id: TenantShardId,
    },
    /// Set the pageserver tenant configuration of a tenant: this is the configuration structure
    /// that is passed through to pageservers, and does not affect storage controller behavior.
    /// Any previous tenant configs are overwritten.
    SetTenantConfig {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        config: String,
    },
    /// Patch the pageserver tenant configuration of a tenant. Any fields with null values in the
    /// provided JSON are unset from the tenant config and all fields with non-null values are set.
    /// Unspecified fields are not changed.
    PatchTenantConfig {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        config: String,
    },
    /// Print details about a particular tenant, including all its shards' states.
    TenantDescribe {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
    /// mode so that it can warm up content on a pageserver.
    TenantWarmup {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
    /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
    TenantDrop {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        unclean: bool,
    },
    NodeDrop {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        unclean: bool,
    },
    TenantSetTimeBasedEviction {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        period: humantime::Duration,
        #[arg(long)]
        threshold: humantime::Duration,
    },
    // Migrate away from a set of specified pageservers by moving the primary attachments to pageservers
    // outside of the specified set.
    BulkMigrate {
        // Set of pageserver node ids to drain.
        #[arg(long)]
        nodes: Vec<NodeId>,
        // Optional: migration concurrency (default is 8)
        #[arg(long)]
        concurrency: Option<usize>,
        // Optional: maximum number of shards to migrate
        #[arg(long)]
        max_shards: Option<usize>,
        // Optional: when set to true, nothing is migrated, but the plan is printed to stdout
        #[arg(long)]
        dry_run: Option<bool>,
    },
    /// Start draining the specified pageserver.
    /// The drain is complete when the scheduling policy returns to active.
    StartDrain {
        #[arg(long)]
        node_id: NodeId,
    },
    /// Cancel draining the specified pageserver and wait for `timeout`
    /// for the operation to be canceled. May be retried.
    CancelDrain {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        timeout: humantime::Duration,
    },
    /// Start filling the specified pageserver.
    /// The fill is complete when the scheduling policy returns to active.
    StartFill {
        #[arg(long)]
        node_id: NodeId,
    },
    /// Cancel filling the specified pageserver and wait for `timeout`
    /// for the operation to be canceled. May be retried.
    CancelFill {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        timeout: humantime::Duration,
    },
    /// List safekeepers known to the storage controller
    Safekeepers {},
}

#[derive(Parser)]
#[command(
    author,
    version,
    about,
    long_about = "CLI for Storage Controller Support/Debug"
)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[arg(long)]
    /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
    api: Url,

    #[arg(long)]
    /// JWT token for authenticating with storage controller. Depending on the API used, this
    /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
    /// a token with both scopes to use with this tool.
    jwt: Option<String>,

    #[command(subcommand)]
    command: Command,
}

#[derive(Debug, Clone)]
struct PlacementPolicyArg(PlacementPolicy);

impl FromStr for PlacementPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "detached" => Ok(Self(PlacementPolicy::Detached)),
            "secondary" => Ok(Self(PlacementPolicy::Secondary)),
            _ if s.starts_with("attached:") => {
                let mut splitter = s.split(':');
                let _prefix = splitter.next().unwrap();
                match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
                    Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
                    None => Err(anyhow::anyhow!(
                        "Invalid format '{s}', a valid example is 'attached:1'"
                    )),
                }
            }
            _ => Err(anyhow::anyhow!(
                "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);

impl FromStr for ShardSchedulingPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(ShardSchedulingPolicy::Active)),
            "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
            "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
            "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
            _ => Err(anyhow::anyhow!(
                "Unknown scheduling policy '{s}', try active,essential,pause,stop"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct NodeAvailabilityArg(NodeAvailabilityWrapper);

impl FromStr for NodeAvailabilityArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
            "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
            _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
        }
    }
}

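/// Poll the storage controller's node describe endpoint until `f` accepts the node's
/// scheduling policy, or the `timeout` expires.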
async fn wait_for_scheduling_policy<F>(
    client: Client,
    node_id: NodeId,
    timeout: Duration,
    f: F,
) -> anyhow::Result<NodeSchedulingPolicy>
where
    F: Fn(NodeSchedulingPolicy) -> bool,
{
    let waiter = tokio::time::timeout(timeout, async move {
        loop {
            let node = client
                .dispatch::<(), NodeDescribeResponse>(
                    Method::GET,
                    format!("control/v1/node/{node_id}"),
                    None,
                )
                .await?;

            if f(node.scheduling) {
                return Ok::<NodeSchedulingPolicy, mgmt_api::Error>(node.scheduling);
            }
        }
    });

    Ok(waiter.await??)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());

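    // `Url::to_string()` keeps a trailing slash on the base URL; drop it before
    // handing the URL to the pageserver management client.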
    let mut trimmed = cli.api.to_string();
    trimmed.pop();
    let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());

    match cli.command {
        Command::NodeRegister {
            node_id,
            listen_pg_addr,
            listen_pg_port,
            listen_http_addr,
            listen_http_port,
            availability_zone_id,
        } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "control/v1/node".to_string(),
                    Some(NodeRegisterRequest {
                        node_id,
                        listen_pg_addr,
                        listen_pg_port,
                        listen_http_addr,
                        listen_http_port,
                        availability_zone_id: AvailabilityZone(availability_zone_id),
                    }),
                )
                .await?;
        }
        Command::TenantCreate { tenant_id } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "v1/tenant".to_string(),
                    Some(TenantCreateRequest {
                        new_tenant_id: TenantShardId::unsharded(tenant_id),
                        generation: None,
                        shard_parameters: ShardParameters::default(),
                        placement_policy: Some(PlacementPolicy::Attached(1)),
                        config: TenantConfig::default(),
                    }),
                )
                .await?;
        }
        Command::TenantDelete { tenant_id } => {
            let status = vps_client
                .tenant_delete(TenantShardId::unsharded(tenant_id))
                .await?;
            tracing::info!("Delete status: {}", status);
        }
        Command::Nodes {} => {
            let mut resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));

            let mut table = comfy_table::Table::new();
            table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
            for node in resp {
                table.add_row([
                    format!("{}", node.id),
                    node.listen_http_addr,
                    format!("{:?}", node.scheduling),
                    format!("{:?}", node.availability),
                ]);
            }
            println!("{table}");
        }
        Command::NodeConfigure {
            node_id,
            availability,
            scheduling,
        } => {
            let req = NodeConfigureRequest {
                node_id,
                availability: availability.map(|a| a.0),
                scheduling,
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/config"),
                    Some(req),
                )
                .await?;
        }
        Command::Tenants {
            node_id: Some(node_id),
        } => {
            let describe_response = storcon_client
                .dispatch::<(), NodeShardResponse>(
                    Method::GET,
                    format!("control/v1/node/{node_id}/shards"),
                    None,
                )
                .await?;
            let shards = describe_response.shards;
            let mut table = comfy_table::Table::new();
            table.set_header([
                "Shard",
                "Intended Primary/Secondary",
                "Observed Primary/Secondary",
            ]);
            for shard in shards {
                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    match shard.is_intended_secondary {
                        None => "".to_string(),
                        Some(true) => "Secondary".to_string(),
                        Some(false) => "Primary".to_string(),
                    },
                    match shard.is_observed_secondary {
                        None => "".to_string(),
                        Some(true) => "Secondary".to_string(),
                        Some(false) => "Primary".to_string(),
                    },
                ]);
            }
            println!("{table}");
        }
        Command::Tenants { node_id: None } => {
            let mut resp = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;

            resp.sort_by(|a, b| a.tenant_id.cmp(&b.tenant_id));

            let mut table = comfy_table::Table::new();
            table.set_header([
                "TenantId",
                "ShardCount",
                "StripeSize",
                "Placement",
                "Scheduling",
            ]);
            for tenant in resp {
                let shard_zero = tenant.shards.into_iter().next().unwrap();
                table.add_row([
                    format!("{}", tenant.tenant_id),
                    format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
                    format!("{:?}", tenant.stripe_size),
                    format!("{:?}", tenant.policy),
                    format!("{:?}", shard_zero.scheduling_policy),
                ]);
            }

            println!("{table}");
        }
        Command::TenantPolicy {
            tenant_id,
            placement,
            scheduling,
        } => {
            let req = TenantPolicyRequest {
                scheduling: scheduling.map(|s| s.0),
                placement: placement.map(|p| p.0),
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/policy"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantShardSplit {
            tenant_id,
            shard_count,
            stripe_size,
        } => {
            let req = TenantShardSplitRequest {
                new_shard_count: shard_count,
                new_stripe_size: stripe_size.map(ShardStripeSize),
            };

            let response = storcon_client
                .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/shard_split"),
                    Some(req),
                )
                .await?;
            println!(
                "Split tenant {} into {} shards: {}",
                tenant_id,
                shard_count,
                response
                    .new_shards
                    .iter()
                    .map(|s| format!("{:?}", s))
                    .collect::<Vec<_>>()
                    .join(",")
            );
        }
        Command::TenantShardMigrate {
            tenant_shard_id,
            node,
        } => {
            let req = TenantShardMigrateRequest {
                tenant_shard_id,
                node_id: node,
            };

            storcon_client
                .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_shard_id}/migrate"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantShardCancelReconcile { tenant_shard_id } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_shard_id}/cancel_reconcile"),
                    None,
                )
                .await?;
        }
        Command::SetTenantConfig { tenant_id, config } => {
            let tenant_conf = serde_json::from_str(&config)?;

            vps_client
                .set_tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: tenant_conf,
                })
                .await?;
        }
        Command::PatchTenantConfig { tenant_id, config } => {
            let tenant_conf = serde_json::from_str(&config)?;

            vps_client
                .patch_tenant_config(&TenantConfigPatchRequest {
                    tenant_id,
                    config: tenant_conf,
                })
                .await?;
        }
        Command::TenantDescribe { tenant_id } => {
            let TenantDescribeResponse {
                tenant_id,
                shards,
                stripe_size,
                policy,
                config,
            } = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;
            println!("Tenant {tenant_id}");
            let mut table = comfy_table::Table::new();
            table.add_row(["Policy", &format!("{:?}", policy)]);
            table.add_row(["Stripe size", &format!("{:?}", stripe_size)]);
            table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
            println!("{table}");
            println!("Shards:");
            let mut table = comfy_table::Table::new();
            table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]);
            for shard in shards {
                let secondary = shard
                    .node_secondary
                    .iter()
                    .map(|n| format!("{}", n))
                    .collect::<Vec<_>>()
                    .join(",");

                let mut status_parts = Vec::new();
                if shard.is_reconciling {
                    status_parts.push("reconciling");
                }

                if shard.is_pending_compute_notification {
                    status_parts.push("pending_compute");
                }

                if shard.is_splitting {
                    status_parts.push("splitting");
                }
                let status = status_parts.join(",");

                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    shard
                        .node_attached
                        .map(|n| format!("{}", n))
                        .unwrap_or(String::new()),
                    secondary,
                    shard.last_error,
                    status,
                ]);
            }
            println!("{table}");
        }
        Command::TenantWarmup { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await;
            match describe_response {
                Ok(describe) => {
                    if matches!(describe.policy, PlacementPolicy::Secondary) {
                        // Fine: it's already known to the controller in secondary mode: calling
                        // again to put it into secondary mode won't cause problems.
                    } else {
                        anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
                    }
                }
                Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
                    // Fine: this tenant isn't known to the storage controller yet.
                }
                Err(e) => {
                    // Unexpected API error
                    return Err(e.into());
                }
            }

            vps_client
                .location_config(
                    TenantShardId::unsharded(tenant_id),
                    pageserver_api::models::LocationConfig {
                        mode: pageserver_api::models::LocationConfigMode::Secondary,
                        generation: None,
                        secondary_conf: Some(LocationConfigSecondary { warm: true }),
                        shard_number: 0,
                        shard_count: 0,
                        shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
                        tenant_conf: TenantConfig::default(),
                    },
                    None,
                    true,
                )
                .await?;

            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;

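            // The location_config call above put the tenant into secondary mode, so shard
            // zero is expected to report a secondary location here.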
            let secondary_ps_id = describe_response
                .shards
                .first()
                .unwrap()
                .node_secondary
                .first()
                .unwrap();

            println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
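            // Poll the secondary download API: 200 OK means the warm-up download is
            // complete, 202 Accepted means it is still in progress.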
            loop {
                let (status, progress) = vps_client
                    .tenant_secondary_download(
                        TenantShardId::unsharded(tenant_id),
                        Some(Duration::from_secs(10)),
                    )
                    .await?;
                println!(
                    "Progress: {}/{} layers, {}/{} bytes",
                    progress.layers_downloaded,
                    progress.layers_total,
                    progress.bytes_downloaded,
                    progress.bytes_total
                );
                match status {
                    StatusCode::OK => {
                        println!("Download complete");
                        break;
                    }
                    StatusCode::ACCEPTED => {
                        // Loop
                    }
                    _ => {
                        anyhow::bail!("Unexpected download status: {status}");
                    }
                }
            }
        }
        Command::TenantDrop { tenant_id, unclean } => {
            if !unclean {
                anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
            }
            storcon_client
                .dispatch::<(), ()>(
                    Method::POST,
                    format!("debug/v1/tenant/{tenant_id}/drop"),
                    None,
                )
                .await?;
        }
        Command::NodeDrop { node_id, unclean } => {
            if !unclean {
                anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
            }
            storcon_client
                .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
                .await?;
        }
        Command::NodeDelete { node_id } => {
            storcon_client
                .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                .await?;
        }
        Command::TenantSetTimeBasedEviction {
            tenant_id,
            period,
            threshold,
        } => {
            vps_client
                .set_tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: TenantConfig {
                        eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
                            EvictionPolicyLayerAccessThreshold {
                                period: period.into(),
                                threshold: threshold.into(),
                            },
                        )),
                        heatmap_period: Some("300s".to_string()),
                        ..Default::default()
                    },
                })
                .await?;
        }
        Command::BulkMigrate {
            nodes,
            concurrency,
            max_shards,
            dry_run,
        } => {
            // Load the list of nodes, split them up into the drained and filled sets,
            // and validate that draining is possible.
            let node_descs = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            let mut node_to_drain_descs = Vec::new();
            let mut node_to_fill_descs = Vec::new();

            for desc in node_descs {
                let to_drain = nodes.iter().any(|id| *id == desc.id);
                if to_drain {
                    node_to_drain_descs.push(desc);
                } else {
                    node_to_fill_descs.push(desc);
                }
            }

            if nodes.len() != node_to_drain_descs.len() {
                anyhow::bail!("Bulk migration requested away from a node which doesn't exist.")
            }

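            // Only fill onto nodes that are currently available and whose scheduling
            // policy accepts new shards (Active or Filling).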
            node_to_fill_descs.retain(|desc| {
                matches!(desc.availability, NodeAvailabilityWrapper::Active)
                    && matches!(
                        desc.scheduling,
                        NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
                    )
            });

            if node_to_fill_descs.is_empty() {
                anyhow::bail!("There are no nodes to migrate to")
            }

            // Set the node scheduling policy to draining for the nodes which
            // we plan to drain.
            for node_desc in node_to_drain_descs.iter() {
                let req = NodeConfigureRequest {
                    node_id: node_desc.id,
                    availability: None,
                    scheduling: Some(NodeSchedulingPolicy::Draining),
                };

                storcon_client
                    .dispatch::<_, ()>(
                        Method::PUT,
                        format!("control/v1/node/{}/config", node_desc.id),
                        Some(req),
                    )
                    .await?;
            }

            // Perform the migration: move each tenant shard scheduled on a node to
            // be drained to a node which is being filled. A simple round robin
            // strategy is used to pick the new node.
            let tenants = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;

            let mut selected_node_idx = 0;

            struct MigrationMove {
                tenant_shard_id: TenantShardId,
                from: NodeId,
                to: NodeId,
            }

            let mut moves: Vec<MigrationMove> = Vec::new();

            let shards = tenants
                .into_iter()
                .flat_map(|tenant| tenant.shards.into_iter());
            for shard in shards {
                if let Some(max_shards) = max_shards {
                    if moves.len() >= max_shards {
                        println!(
                            "Stop planning shard moves since the requested maximum was reached"
                        );
                        break;
                    }
                }

                let should_migrate = {
                    if let Some(attached_to) = shard.node_attached {
                        node_to_drain_descs
                            .iter()
                            .map(|desc| desc.id)
                            .any(|id| id == attached_to)
                    } else {
                        false
                    }
                };

                if !should_migrate {
                    continue;
                }

                moves.push(MigrationMove {
                    tenant_shard_id: shard.tenant_shard_id,
                    from: shard
                        .node_attached
                        .expect("We only migrate attached tenant shards"),
                    to: node_to_fill_descs[selected_node_idx].id,
                });
                selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
            }

            let total_moves = moves.len();

            if dry_run == Some(true) {
                println!("Dry run requested. Planned {total_moves} moves:");
                for mv in &moves {
                    println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
                }

                return Ok(());
            }

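            // Execute the planned moves with bounded concurrency; a fresh Client is
            // constructed for each request.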
            const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
            let mut stream = futures::stream::iter(moves)
                .map(|mv| {
                    let client = Client::new(cli.api.clone(), cli.jwt.clone());
                    async move {
                        client
                            .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                                Method::PUT,
                                format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
                                Some(TenantShardMigrateRequest {
                                    tenant_shard_id: mv.tenant_shard_id,
                                    node_id: mv.to,
                                }),
                            )
                            .await
                            .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
                    }
                })
                .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));

            let mut success = 0;
            let mut failure = 0;

            while let Some(res) = stream.next().await {
                match res {
                    Ok(_) => {
                        success += 1;
                    }
                    Err((tenant_shard_id, from, to, error)) => {
                        failure += 1;
                        println!(
                            "Failed to migrate {} from node {} to node {}: {}",
                            tenant_shard_id, from, to, error
                        );
                    }
                }

                if (success + failure) % 20 == 0 {
                    println!(
                        "Processed {}/{} shards: {} succeeded, {} failed",
                        success + failure,
                        total_moves,
                        success,
                        failure
                    );
                }
            }

            println!(
                "Processed {}/{} shards: {} succeeded, {} failed",
                success + failure,
                total_moves,
                success,
                failure
            );
        }
        Command::StartDrain { node_id } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/drain"),
                    None,
                )
                .await?;
            println!("Drain started for {node_id}");
        }
        Command::CancelDrain { node_id, timeout } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::DELETE,
                    format!("control/v1/node/{node_id}/drain"),
                    None,
                )
                .await?;

            println!("Waiting for node {node_id} to quiesce on scheduling policy ...");

            let final_policy =
                wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
                    use NodeSchedulingPolicy::*;
                    matches!(sched, Active | PauseForRestart)
                })
                .await?;

            println!(
                "Drain was cancelled for node {node_id}. Scheduling policy is now {final_policy:?}"
            );
        }
        Command::StartFill { node_id } => {
            storcon_client
                .dispatch::<(), ()>(Method::PUT, format!("control/v1/node/{node_id}/fill"), None)
                .await?;

            println!("Fill started for {node_id}");
        }
        Command::CancelFill { node_id, timeout } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::DELETE,
                    format!("control/v1/node/{node_id}/fill"),
                    None,
                )
                .await?;

            println!("Waiting for node {node_id} to quiesce on scheduling policy ...");

            let final_policy =
                wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
                    use NodeSchedulingPolicy::*;
                    matches!(sched, Active)
                })
                .await?;

            println!(
                "Fill was cancelled for node {node_id}. Scheduling policy is now {final_policy:?}"
            );
        }
        Command::Safekeepers {} => {
            let mut resp = storcon_client
                .dispatch::<(), Vec<SafekeeperDescribeResponse>>(
                    Method::GET,
                    "control/v1/safekeeper".to_string(),
                    None,
                )
                .await?;

            resp.sort_by(|a, b| a.id.cmp(&b.id));

            let mut table = comfy_table::Table::new();
            table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
            for sk in resp {
                table.add_row([
                    format!("{}", sk.id),
                    format!("{}", sk.version),
                    sk.host,
                    format!("{}", sk.port),
                    format!("{}", sk.http_port),
                    sk.availability_zone_id.to_string(),
                ]);
            }
            println!("{table}");
        }
    }

    Ok(())
}