Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::path::PathBuf;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 :
6 : use clap::{Parser, Subcommand};
7 : use futures::StreamExt;
8 : use pageserver_api::controller_api::{
9 : AvailabilityZone, MigrationConfig, NodeAvailabilityWrapper, NodeConfigureRequest,
10 : NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse,
11 : PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
12 : ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
13 : SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
14 : TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
15 : };
16 : use pageserver_api::models::{
17 : EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
18 : TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
19 : TenantShardSplitResponse,
20 : };
21 : use pageserver_api::shard::{ShardStripeSize, TenantShardId};
22 : use pageserver_client::mgmt_api::{self};
23 : use reqwest::{Certificate, Method, StatusCode, Url};
24 : use safekeeper_api::models::TimelineLocateResponse;
25 : use storage_controller_client::control_api::Client;
26 : use utils::id::{NodeId, TenantId, TimelineId};
27 :
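     : /// Subcommands exposed by this support/debug CLI. Most map directly onto storage
     : /// controller HTTP endpoints under `control/v1/`, `v1/` or `debug/v1/`.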
28 : #[derive(Subcommand, Debug)]
29 : enum Command {
30 : /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
31 : /// since pageservers auto-register when they start up
32 : NodeRegister {
33 : #[arg(long)]
34 : node_id: NodeId,
35 :
36 : #[arg(long)]
37 : listen_pg_addr: String,
38 : #[arg(long)]
39 : listen_pg_port: u16,
40 : #[arg(long)]
41 : listen_grpc_addr: Option<String>,
42 : #[arg(long)]
43 : listen_grpc_port: Option<u16>,
44 :
45 : #[arg(long)]
46 : listen_http_addr: String,
47 : #[arg(long)]
48 : listen_http_port: u16,
49 : #[arg(long)]
50 : listen_https_port: Option<u16>,
51 :
52 : #[arg(long)]
53 : availability_zone_id: String,
54 : },
55 :
56 : /// Modify a node's configuration in the storage controller
57 : NodeConfigure {
58 : #[arg(long)]
59 : node_id: NodeId,
60 :
61 : /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
62 : /// manually mark a node offline
63 : #[arg(long)]
64 : availability: Option<NodeAvailabilityArg>,
65 : /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
66 : #[arg(long)]
67 : scheduling: Option<NodeSchedulingPolicy>,
68 : },
69 : /// Exists for backup usage and will be removed in the future.
70 : /// Use [`Command::NodeStartDelete`] instead, if possible.
71 : NodeDelete {
72 : #[arg(long)]
73 : node_id: NodeId,
74 : },
75 : /// Start deletion of the specified pageserver.
76 : NodeStartDelete {
77 : #[arg(long)]
78 : node_id: NodeId,
79 : /// When `force` is true, skip waiting for shards to prewarm during migration.
80 : /// This can significantly speed up node deletion since prewarming all shards
81 : /// can take considerable time, but may result in slower initial access to
82 : /// migrated shards until they warm up naturally.
83 : #[arg(long)]
84 : force: bool,
85 : },
86 : /// Cancel deletion of the specified pageserver and wait for `timeout`
87 : /// for the operation to be canceled. May be retried.
88 : NodeCancelDelete {
89 : #[arg(long)]
90 : node_id: NodeId,
91 : #[arg(long)]
92 : timeout: humantime::Duration,
93 : },
94 : /// Delete a node's tombstone from the storage controller.
95 : /// This is used when we want to allow the node to be re-registered.
96 : NodeDeleteTombstone {
97 : #[arg(long)]
98 : node_id: NodeId,
99 : },
100 : /// Modify a tenant's policies in the storage controller
101 : TenantPolicy {
102 : #[arg(long)]
103 : tenant_id: TenantId,
104 : /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
105 : /// or is in the normal attached state with N secondary locations (`attached:N`)
106 : #[arg(long)]
107 : placement: Option<PlacementPolicyArg>,
108 : /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
109 : /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
110 : /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
111 : /// unavailable, and are only for use in emergencies.
112 : #[arg(long)]
113 : scheduling: Option<ShardSchedulingPolicyArg>,
114 : },
115 : /// List nodes known to the storage controller
116 : Nodes {},
117 : /// List soft deleted nodes known to the storage controller
118 : NodeTombstones {},
119 : /// List tenants known to the storage controller
120 : Tenants {
121 : /// If set, list only the tenants on the specified node
122 : node_id: Option<NodeId>,
123 : },
124 : /// Create a new tenant in the storage controller, and by extension on pageservers.
125 : TenantCreate {
126 : #[arg(long)]
127 : tenant_id: TenantId,
128 : },
129 : /// Delete a tenant in the storage controller, and by extension on pageservers.
130 : TenantDelete {
131 : #[arg(long)]
132 : tenant_id: TenantId,
133 : },
134 : /// Split an existing tenant into a higher number of shards than its current shard count.
135 : TenantShardSplit {
136 : #[arg(long)]
137 : tenant_id: TenantId,
138 : #[arg(long)]
139 : shard_count: u8,
140 : /// Optional stripe size, in 8KiB pages, e.g. set 2048 for 16MiB stripes.
141 : #[arg(long)]
142 : stripe_size: Option<u32>,
143 : },
144 : /// Migrate the attached location for a tenant shard to a specific pageserver.
145 : TenantShardMigrate {
146 : #[arg(long)]
147 : tenant_shard_id: TenantShardId,
148 : #[arg(long)]
149 : node: NodeId,
150 : #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
151 : prewarm: bool,
152 : #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
153 : override_scheduler: bool,
154 : },
155 : /// Watch the location of a tenant shard evolve, e.g. while expecting it to migrate
156 : TenantShardWatch {
157 : #[arg(long)]
158 : tenant_shard_id: TenantShardId,
159 : },
160 : /// Migrate the secondary location for a tenant shard to a specific pageserver.
161 : TenantShardMigrateSecondary {
162 : #[arg(long)]
163 : tenant_shard_id: TenantShardId,
164 : #[arg(long)]
165 : node: NodeId,
166 : },
167 : /// Cancel any ongoing reconciliation for this shard
168 : TenantShardCancelReconcile {
169 : #[arg(long)]
170 : tenant_shard_id: TenantShardId,
171 : },
172 : /// Set the pageserver tenant configuration of a tenant: this is the configuration structure
173 : /// that is passed through to pageservers, and does not affect storage controller behavior.
174 : /// Any previous tenant configs are overwritten.
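     : /// Example `--config` value (a sketch; `pitr_interval` and `gc_horizon` are
     : /// illustrative pageserver tenant config fields):
     : /// `{"pitr_interval": "7 days", "gc_horizon": 67108864}`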
175 : SetTenantConfig {
176 : #[arg(long)]
177 : tenant_id: TenantId,
178 : #[arg(long)]
179 : config: String,
180 : },
181 : /// Patch the pageserver tenant configuration of a tenant. Any fields with null values in the
182 : /// provided JSON are unset from the tenant config and all fields with non-null values are set.
183 : /// Unspecified fields are not changed.
184 : PatchTenantConfig {
185 : #[arg(long)]
186 : tenant_id: TenantId,
187 : #[arg(long)]
188 : config: String,
189 : },
190 : /// Print details about a particular tenant, including all its shards' states.
191 : TenantDescribe {
192 : #[arg(long)]
193 : tenant_id: TenantId,
194 : },
195 : TenantSetPreferredAz {
196 : #[arg(long)]
197 : tenant_id: TenantId,
198 : #[arg(long)]
199 : preferred_az: Option<String>,
200 : },
201 : /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
202 : /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
203 : TenantDrop {
204 : #[arg(long)]
205 : tenant_id: TenantId,
206 : #[arg(long)]
207 : unclean: bool,
208 : },
209 : NodeDrop {
210 : #[arg(long)]
211 : node_id: NodeId,
212 : #[arg(long)]
213 : unclean: bool,
214 : },
215 : TenantSetTimeBasedEviction {
216 : #[arg(long)]
217 : tenant_id: TenantId,
218 : #[arg(long)]
219 : period: humantime::Duration,
220 : #[arg(long)]
221 : threshold: humantime::Duration,
222 : },
223 : /// Migrate away from a set of specified pageservers by moving the primary attachments to pageservers
224 : /// outside of the specified set.
225 : BulkMigrate {
226 : /// Set of pageserver node ids to drain.
227 : #[arg(long)]
228 : nodes: Vec<NodeId>,
229 : /// Optional: migration concurrency (default is 8)
230 : #[arg(long)]
231 : concurrency: Option<usize>,
232 : /// Optional: maximum number of shards to migrate
233 : #[arg(long)]
234 : max_shards: Option<usize>,
235 : /// Optional: when set to true, nothing is migrated, but the plan is printed to stdout
236 : #[arg(long)]
237 : dry_run: Option<bool>,
238 : },
239 : /// Start draining the specified pageserver.
240 : /// The drain is complete when the scheduling policy returns to active.
241 : StartDrain {
242 : #[arg(long)]
243 : node_id: NodeId,
244 : },
245 : /// Cancel draining the specified pageserver and wait for `timeout`
246 : /// for the operation to be canceled. May be retried.
247 : CancelDrain {
248 : #[arg(long)]
249 : node_id: NodeId,
250 : #[arg(long)]
251 : timeout: humantime::Duration,
252 : },
253 : /// Start filling the specified pageserver.
254 : /// The fill is complete when the scheduling policy returns to active.
255 : StartFill {
256 : #[arg(long)]
257 : node_id: NodeId,
258 : },
259 : /// Cancel filling the specified pageserver and wait for `timeout`
260 : /// for the operation to be canceled. May be retried.
261 : CancelFill {
262 : #[arg(long)]
263 : node_id: NodeId,
264 : #[arg(long)]
265 : timeout: humantime::Duration,
266 : },
267 : /// List safekeepers known to the storage controller
268 : Safekeepers {},
269 : /// Set the scheduling policy of the specified safekeeper
270 : SafekeeperScheduling {
271 : #[arg(long)]
272 : node_id: NodeId,
273 : #[arg(long)]
274 : scheduling_policy: SkSchedulingPolicyArg,
275 : },
276 : /// Downloads any missing heatmap layers for all shards of a given timeline
277 : DownloadHeatmapLayers {
278 : /// Tenant ID or tenant shard ID. When an unsharded tenant ID is specified,
279 : /// the operation is performed on all shards. When a sharded tenant ID is
280 : /// specified, the operation is only performed on the specified shard.
281 : #[arg(long)]
282 : tenant_shard_id: TenantShardId,
283 : #[arg(long)]
284 : timeline_id: TimelineId,
285 : /// Optional: Maximum download concurrency (default is 16)
286 : #[arg(long)]
287 : concurrency: Option<usize>,
288 : },
289 : /// Locate safekeepers for a timeline from the storcon DB.
290 : TimelineLocate {
291 : #[arg(long)]
292 : tenant_id: TenantId,
293 : #[arg(long)]
294 : timeline_id: TimelineId,
295 : },
296 : /// Migrate a timeline to a new set of safekeepers
297 : TimelineSafekeeperMigrate {
298 : #[arg(long)]
299 : tenant_id: TenantId,
300 : #[arg(long)]
301 : timeline_id: TimelineId,
302 : /// Example: --new-sk-set 1,2,3
303 : #[arg(long, required = true, value_delimiter = ',')]
304 : new_sk_set: Vec<NodeId>,
305 : },
306 : /// Abort ongoing safekeeper migration.
307 : TimelineSafekeeperMigrateAbort {
308 : #[arg(long)]
309 : tenant_id: TenantId,
310 : #[arg(long)]
311 : timeline_id: TimelineId,
312 : },
313 : }
314 :
315 : #[derive(Parser)]
316 : #[command(
317 : author,
318 : version,
319 : about,
320 : long_about = "CLI for Storage Controller Support/Debug"
321 : )]
322 : #[command(arg_required_else_help(true))]
323 : struct Cli {
324 : #[arg(long)]
325 : /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
326 : api: Url,
327 :
328 : #[arg(long)]
329 : /// JWT token for authenticating with storage controller. Depending on the API used, this
330 : /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
331 : /// a token with both scopes to use with this tool.
332 : jwt: Option<String>,
333 :
334 : #[arg(long)]
335 : /// Trusted root CA certificates to use in https APIs.
336 : ssl_ca_file: Option<PathBuf>,
337 :
338 : #[command(subcommand)]
339 : command: Command,
340 : }
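     : // Example invocations (a sketch: assumes the binary is built as `storcon_cli` and a
     : // storage controller from `neon_local` is listening on http://127.0.0.1:1234):
     : //
     : //   storcon_cli --api http://127.0.0.1:1234 nodes
     : //   storcon_cli --api http://127.0.0.1:1234 tenant-describe --tenant-id <TENANT_ID>
     : //   storcon_cli --api http://127.0.0.1:1234 tenant-shard-migrate \
     : //       --tenant-shard-id <TENANT_SHARD_ID> --node 2
     : //
     : // Subcommand and flag names are clap's kebab-case renderings of the `Command`
     : // variants and fields above.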
341 :
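     : /// Newtype wrapper so `--placement` values (`detached`, `secondary`, `attached:<n>`)
     : /// can be parsed via `FromStr` into a [`PlacementPolicy`].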
342 : #[derive(Debug, Clone)]
343 : struct PlacementPolicyArg(PlacementPolicy);
344 :
345 : impl FromStr for PlacementPolicyArg {
346 : type Err = anyhow::Error;
347 :
348 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
349 0 : match s {
350 0 : "detached" => Ok(Self(PlacementPolicy::Detached)),
351 0 : "secondary" => Ok(Self(PlacementPolicy::Secondary)),
352 0 : _ if s.starts_with("attached:") => {
353 0 : let mut splitter = s.split(':');
354 0 : let _prefix = splitter.next().unwrap();
355 0 : match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
356 0 : Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
357 0 : None => Err(anyhow::anyhow!(
358 0 : "Invalid format '{s}', a valid example is 'attached:1'"
359 0 : )),
360 : }
361 : }
362 0 : _ => Err(anyhow::anyhow!(
363 0 : "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
364 0 : )),
365 : }
366 0 : }
367 : }
368 :
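     : /// Newtype wrapper so `--scheduling-policy` values can be parsed into a
     : /// [`SkSchedulingPolicy`].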
369 : #[derive(Debug, Clone)]
370 : struct SkSchedulingPolicyArg(SkSchedulingPolicy);
371 :
372 : impl FromStr for SkSchedulingPolicyArg {
373 : type Err = anyhow::Error;
374 :
375 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
376 0 : SkSchedulingPolicy::from_str(s).map(Self)
377 0 : }
378 : }
379 :
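     : /// Newtype wrapper so `--scheduling` values (`active`, `essential`, `pause`, `stop`)
     : /// can be parsed into a [`ShardSchedulingPolicy`].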
380 : #[derive(Debug, Clone)]
381 : struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);
382 :
383 : impl FromStr for ShardSchedulingPolicyArg {
384 : type Err = anyhow::Error;
385 :
386 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
387 0 : match s {
388 0 : "active" => Ok(Self(ShardSchedulingPolicy::Active)),
389 0 : "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
390 0 : "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
391 0 : "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
392 0 : _ => Err(anyhow::anyhow!(
393 0 : "Unknown scheduling policy '{s}', try active,essential,pause,stop"
394 0 : )),
395 : }
396 0 : }
397 : }
398 :
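     : /// Newtype wrapper so `--availability` values (`active`, `offline`) can be parsed
     : /// into a [`NodeAvailabilityWrapper`].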
399 : #[derive(Debug, Clone)]
400 : struct NodeAvailabilityArg(NodeAvailabilityWrapper);
401 :
402 : impl FromStr for NodeAvailabilityArg {
403 : type Err = anyhow::Error;
404 :
405 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
406 0 : match s {
407 0 : "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
408 0 : "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
409 0 : _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
410 : }
411 0 : }
412 : }
413 :
414 0 : async fn wait_for_scheduling_policy<F>(
415 0 : client: Client,
416 0 : node_id: NodeId,
417 0 : timeout: Duration,
418 0 : f: F,
419 0 : ) -> anyhow::Result<NodeSchedulingPolicy>
420 0 : where
421 0 : F: Fn(NodeSchedulingPolicy) -> bool,
422 0 : {
423 0 : let waiter = tokio::time::timeout(timeout, async move {
424 : loop {
425 0 : let node = client
426 0 : .dispatch::<(), NodeDescribeResponse>(
427 0 : Method::GET,
428 0 : format!("control/v1/node/{node_id}"),
429 0 : None,
430 0 : )
431 0 : .await?;
432 :
433 0 : if f(node.scheduling) {
434 0 : return Ok::<NodeSchedulingPolicy, mgmt_api::Error>(node.scheduling);
435 0 : }
436 : }
437 0 : });
438 :
439 0 : Ok(waiter.await??)
440 0 : }
441 :
442 : #[tokio::main]
443 0 : async fn main() -> anyhow::Result<()> {
444 0 : let cli = Cli::parse();
445 :
446 0 : let ssl_ca_certs = match &cli.ssl_ca_file {
447 0 : Some(ssl_ca_file) => {
448 0 : let buf = tokio::fs::read(ssl_ca_file).await?;
449 0 : Certificate::from_pem_bundle(&buf)?
450 : }
451 0 : None => Vec::new(),
452 : };
453 :
454 0 : let mut http_client = reqwest::Client::builder();
455 0 : for ssl_ca_cert in ssl_ca_certs {
456 0 : http_client = http_client.add_root_certificate(ssl_ca_cert);
457 0 : }
458 0 : let http_client = http_client.build()?;
459 :
460 0 : let storcon_client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
461 :
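     : // `Url` serializes the base URL with a trailing '/'; strip it before passing the
     : // base to the pageserver management API client.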
462 0 : let mut trimmed = cli.api.to_string();
463 0 : trimmed.pop();
464 0 : let vps_client = mgmt_api::Client::new(http_client.clone(), trimmed, cli.jwt.as_deref());
465 :
466 0 : match cli.command {
467 0 : Command::NodeRegister {
468 0 : node_id,
469 0 : listen_pg_addr,
470 0 : listen_pg_port,
471 0 : listen_grpc_addr,
472 0 : listen_grpc_port,
473 0 : listen_http_addr,
474 0 : listen_http_port,
475 0 : listen_https_port,
476 0 : availability_zone_id,
477 0 : } => {
478 0 : storcon_client
479 0 : .dispatch::<_, ()>(
480 0 : Method::POST,
481 0 : "control/v1/node".to_string(),
482 0 : Some(NodeRegisterRequest {
483 0 : node_id,
484 0 : listen_pg_addr,
485 0 : listen_pg_port,
486 0 : listen_grpc_addr,
487 0 : listen_grpc_port,
488 0 : listen_http_addr,
489 0 : listen_http_port,
490 0 : listen_https_port,
491 0 : availability_zone_id: AvailabilityZone(availability_zone_id),
492 0 : node_ip_addr: None,
493 0 : }),
494 0 : )
495 0 : .await?;
496 0 : }
497 0 : Command::TenantCreate { tenant_id } => {
498 0 : storcon_client
499 0 : .dispatch::<_, ()>(
500 0 : Method::POST,
501 0 : "v1/tenant".to_string(),
502 0 : Some(TenantCreateRequest {
503 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
504 0 : generation: None,
505 0 : shard_parameters: ShardParameters::default(),
506 0 : placement_policy: Some(PlacementPolicy::Attached(1)),
507 0 : config: TenantConfig::default(),
508 0 : }),
509 0 : )
510 0 : .await?;
511 0 : }
512 0 : Command::TenantDelete { tenant_id } => {
513 0 : let status = vps_client
514 0 : .tenant_delete(TenantShardId::unsharded(tenant_id))
515 0 : .await?;
516 0 : tracing::info!("Delete status: {}", status);
517 0 : }
518 0 : Command::Nodes {} => {
519 0 : let mut resp = storcon_client
520 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
521 0 : Method::GET,
522 0 : "control/v1/node".to_string(),
523 0 : None,
524 0 : )
525 0 : .await?;
526 0 :
527 0 : resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
528 0 :
529 0 : let mut table = comfy_table::Table::new();
530 0 : table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
531 0 : for node in resp {
532 0 : table.add_row([
533 0 : format!("{}", node.id),
534 0 : node.listen_http_addr,
535 0 : node.availability_zone_id,
536 0 : format!("{:?}", node.scheduling),
537 0 : format!("{:?}", node.availability),
538 0 : ]);
539 0 : }
540 0 : println!("{table}");
541 0 : }
542 0 : Command::NodeConfigure {
543 0 : node_id,
544 0 : availability,
545 0 : scheduling,
546 0 : } => {
547 0 : let req = NodeConfigureRequest {
548 0 : node_id,
549 0 : availability: availability.map(|a| a.0),
550 0 : scheduling,
551 0 : };
552 0 : storcon_client
553 0 : .dispatch::<_, ()>(
554 0 : Method::PUT,
555 0 : format!("control/v1/node/{node_id}/config"),
556 0 : Some(req),
557 0 : )
558 0 : .await?;
559 0 : }
560 0 : Command::Tenants {
561 0 : node_id: Some(node_id),
562 0 : } => {
563 0 : let describe_response = storcon_client
564 0 : .dispatch::<(), NodeShardResponse>(
565 0 : Method::GET,
566 0 : format!("control/v1/node/{node_id}/shards"),
567 0 : None,
568 0 : )
569 0 : .await?;
570 0 : let shards = describe_response.shards;
571 0 : let mut table = comfy_table::Table::new();
572 0 : table.set_header([
573 0 : "Shard",
574 0 : "Intended Primary/Secondary",
575 0 : "Observed Primary/Secondary",
576 0 : ]);
577 0 : for shard in shards {
578 0 : table.add_row([
579 0 : format!("{}", shard.tenant_shard_id),
580 0 : match shard.is_intended_secondary {
581 0 : None => "".to_string(),
582 0 : Some(true) => "Secondary".to_string(),
583 0 : Some(false) => "Primary".to_string(),
584 0 : },
585 0 : match shard.is_observed_secondary {
586 0 : None => "".to_string(),
587 0 : Some(true) => "Secondary".to_string(),
588 0 : Some(false) => "Primary".to_string(),
589 0 : },
590 0 : ]);
591 0 : }
592 0 : println!("{table}");
593 0 : }
594 0 : Command::Tenants { node_id: None } => {
595 0 : // Set up output formatting
596 0 : let mut table = comfy_table::Table::new();
597 0 : table.set_header([
598 0 : "TenantId",
599 0 : "Preferred AZ",
600 0 : "ShardCount",
601 0 : "StripeSize",
602 0 : "Placement",
603 0 : "Scheduling",
604 0 : ]);
605 0 :
606 0 : // Pagination loop over listing API
607 0 : let mut start_after = None;
608 0 : const LIMIT: usize = 1000;
609 0 : loop {
610 0 : let path = match start_after {
611 0 : None => format!("control/v1/tenant?limit={LIMIT}"),
612 0 : Some(start_after) => {
613 0 : format!("control/v1/tenant?limit={LIMIT}&start_after={start_after}")
614 0 : }
615 0 : };
616 0 :
617 0 : let resp = storcon_client
618 0 : .dispatch::<(), Vec<TenantDescribeResponse>>(Method::GET, path, None)
619 0 : .await?;
620 0 :
621 0 : if resp.is_empty() {
622 0 : // End of data reached
623 0 : break;
624 0 : }
625 0 :
626 0 : // Give some visual feedback while we're building up the table (comfy_table doesn't have
627 0 : // streaming output)
628 0 : if resp.len() >= LIMIT {
629 0 : eprint!(".");
630 0 : }
631 0 :
632 0 : start_after = Some(resp.last().unwrap().tenant_id);
633 0 :
634 0 : for tenant in resp {
635 0 : let shard_zero = tenant.shards.into_iter().next().unwrap();
636 0 : table.add_row([
637 0 : format!("{}", tenant.tenant_id),
638 0 : shard_zero
639 0 : .preferred_az_id
640 0 : .as_ref()
641 0 : .cloned()
642 0 : .unwrap_or("".to_string()),
643 0 : format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
644 0 : format!("{:?}", tenant.stripe_size),
645 0 : format!("{:?}", tenant.policy),
646 0 : format!("{:?}", shard_zero.scheduling_policy),
647 0 : ]);
648 0 : }
649 0 : }
650 0 :
651 0 : // Terminate progress dots
652 0 : if table.row_count() > LIMIT {
653 0 : eprintln!();
654 0 : }
655 0 :
656 0 : println!("{table}");
657 0 : }
658 0 : Command::TenantPolicy {
659 0 : tenant_id,
660 0 : placement,
661 0 : scheduling,
662 0 : } => {
663 0 : let req = TenantPolicyRequest {
664 0 : scheduling: scheduling.map(|s| s.0),
665 0 : placement: placement.map(|p| p.0),
666 0 : };
667 0 : storcon_client
668 0 : .dispatch::<_, ()>(
669 0 : Method::PUT,
670 0 : format!("control/v1/tenant/{tenant_id}/policy"),
671 0 : Some(req),
672 0 : )
673 0 : .await?;
674 0 : }
675 0 : Command::TenantShardSplit {
676 0 : tenant_id,
677 0 : shard_count,
678 0 : stripe_size,
679 0 : } => {
680 0 : let req = TenantShardSplitRequest {
681 0 : new_shard_count: shard_count,
682 0 : new_stripe_size: stripe_size.map(ShardStripeSize),
683 0 : };
684 0 :
685 0 : let response = storcon_client
686 0 : .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
687 0 : Method::PUT,
688 0 : format!("control/v1/tenant/{tenant_id}/shard_split"),
689 0 : Some(req),
690 0 : )
691 0 : .await?;
692 0 : println!(
693 0 : "Split tenant {} into {} shards: {}",
694 0 : tenant_id,
695 0 : shard_count,
696 0 : response
697 0 : .new_shards
698 0 : .iter()
699 0 : .map(|s| format!("{s:?}"))
700 0 : .collect::<Vec<_>>()
701 0 : .join(",")
702 0 : );
703 0 : }
704 0 : Command::TenantShardMigrate {
705 0 : tenant_shard_id,
706 0 : node,
707 0 : prewarm,
708 0 : override_scheduler,
709 0 : } => {
710 0 : let migration_config = MigrationConfig {
711 0 : prewarm,
712 0 : override_scheduler,
713 0 : ..Default::default()
714 0 : };
715 0 :
716 0 : let req = TenantShardMigrateRequest {
717 0 : node_id: node,
718 0 : origin_node_id: None,
719 0 : migration_config,
720 0 : };
721 0 :
722 0 : match storcon_client
723 0 : .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
724 0 : Method::PUT,
725 0 : format!("control/v1/tenant/{tenant_shard_id}/migrate"),
726 0 : Some(req),
727 0 : )
728 0 : .await
729 0 : {
730 0 : Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) => {
731 0 : anyhow::bail!(
732 0 : "Migration to {node} rejected, may require `--force` ({}) ",
733 0 : msg
734 0 : );
735 0 : }
736 0 : Err(e) => return Err(e.into()),
737 0 : Ok(_) => {}
738 0 : }
739 0 :
740 0 : watch_tenant_shard(storcon_client, tenant_shard_id, Some(node)).await?;
741 0 : }
742 0 : Command::TenantShardWatch { tenant_shard_id } => {
743 0 : watch_tenant_shard(storcon_client, tenant_shard_id, None).await?;
744 0 : }
745 0 : Command::TenantShardMigrateSecondary {
746 0 : tenant_shard_id,
747 0 : node,
748 0 : } => {
749 0 : let req = TenantShardMigrateRequest {
750 0 : node_id: node,
751 0 : origin_node_id: None,
752 0 : migration_config: MigrationConfig::default(),
753 0 : };
754 0 :
755 0 : storcon_client
756 0 : .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
757 0 : Method::PUT,
758 0 : format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
759 0 : Some(req),
760 0 : )
761 0 : .await?;
762 0 : }
763 0 : Command::TenantShardCancelReconcile { tenant_shard_id } => {
764 0 : storcon_client
765 0 : .dispatch::<(), ()>(
766 0 : Method::PUT,
767 0 : format!("control/v1/tenant/{tenant_shard_id}/cancel_reconcile"),
768 0 : None,
769 0 : )
770 0 : .await?;
771 0 : }
772 0 : Command::SetTenantConfig { tenant_id, config } => {
773 0 : let tenant_conf = serde_json::from_str(&config)?;
774 0 :
775 0 : vps_client
776 0 : .set_tenant_config(&TenantConfigRequest {
777 0 : tenant_id,
778 0 : config: tenant_conf,
779 0 : })
780 0 : .await?;
781 0 : }
782 0 : Command::PatchTenantConfig { tenant_id, config } => {
783 0 : let tenant_conf = serde_json::from_str(&config)?;
784 0 :
785 0 : vps_client
786 0 : .patch_tenant_config(&TenantConfigPatchRequest {
787 0 : tenant_id,
788 0 : config: tenant_conf,
789 0 : })
790 0 : .await?;
791 0 : }
792 0 : Command::TenantDescribe { tenant_id } => {
793 0 : let TenantDescribeResponse {
794 0 : tenant_id,
795 0 : shards,
796 0 : stripe_size,
797 0 : policy,
798 0 : config,
799 0 : } = storcon_client
800 0 : .dispatch::<(), TenantDescribeResponse>(
801 0 : Method::GET,
802 0 : format!("control/v1/tenant/{tenant_id}"),
803 0 : None,
804 0 : )
805 0 : .await?;
806 0 :
807 0 : let nodes = storcon_client
808 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
809 0 : Method::GET,
810 0 : "control/v1/node".to_string(),
811 0 : None,
812 0 : )
813 0 : .await?;
814 0 : let nodes = nodes
815 0 : .into_iter()
816 0 : .map(|n| (n.id, n))
817 0 : .collect::<HashMap<_, _>>();
818 0 :
819 0 : println!("Tenant {tenant_id}");
820 0 : let mut table = comfy_table::Table::new();
821 0 : table.add_row(["Policy", &format!("{policy:?}")]);
822 0 : table.add_row(["Stripe size", &format!("{stripe_size:?}")]);
823 0 : table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
824 0 : println!("{table}");
825 0 : println!("Shards:");
826 0 : let mut table = comfy_table::Table::new();
827 0 : table.set_header([
828 0 : "Shard",
829 0 : "Attached",
830 0 : "Attached AZ",
831 0 : "Secondary",
832 0 : "Last error",
833 0 : "status",
834 0 : ]);
835 0 : for shard in shards {
836 0 : let secondary = shard
837 0 : .node_secondary
838 0 : .iter()
839 0 : .map(|n| format!("{n}"))
840 0 : .collect::<Vec<_>>()
841 0 : .join(",");
842 0 :
843 0 : let mut status_parts = Vec::new();
844 0 : if shard.is_reconciling {
845 0 : status_parts.push("reconciling");
846 0 : }
847 0 :
848 0 : if shard.is_pending_compute_notification {
849 0 : status_parts.push("pending_compute");
850 0 : }
851 0 :
852 0 : if shard.is_splitting {
853 0 : status_parts.push("splitting");
854 0 : }
855 0 : let status = status_parts.join(",");
856 0 :
857 0 : let attached_node = shard
858 0 : .node_attached
859 0 : .as_ref()
860 0 : .map(|id| nodes.get(id).expect("Shard references nonexistent node"));
861 0 :
862 0 : table.add_row([
863 0 : format!("{}", shard.tenant_shard_id),
864 0 : attached_node
865 0 : .map(|n| format!("{} ({})", n.listen_http_addr, n.id))
866 0 : .unwrap_or(String::new()),
867 0 : attached_node
868 0 : .map(|n| n.availability_zone_id.clone())
869 0 : .unwrap_or(String::new()),
870 0 : secondary,
871 0 : shard.last_error,
872 0 : status,
873 0 : ]);
874 0 : }
875 0 : println!("{table}");
876 0 : }
877 0 : Command::TenantSetPreferredAz {
878 0 : tenant_id,
879 0 : preferred_az,
880 0 : } => {
881 0 : // First learn about the tenant's shards
882 0 : let describe_response = storcon_client
883 0 : .dispatch::<(), TenantDescribeResponse>(
884 0 : Method::GET,
885 0 : format!("control/v1/tenant/{tenant_id}"),
886 0 : None,
887 0 : )
888 0 : .await?;
889 0 :
890 0 : // Learn about nodes to validate the AZ ID
891 0 : let nodes = storcon_client
892 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
893 0 : Method::GET,
894 0 : "control/v1/node".to_string(),
895 0 : None,
896 0 : )
897 0 : .await?;
898 0 :
899 0 : if let Some(preferred_az) = &preferred_az {
900 0 : let azs = nodes
901 0 : .into_iter()
902 0 : .map(|n| (n.availability_zone_id))
903 0 : .collect::<HashSet<_>>();
904 0 : if !azs.contains(preferred_az) {
905 0 : anyhow::bail!(
906 0 : "AZ {} not found on any node: known AZs are: {:?}",
907 0 : preferred_az,
908 0 : azs
909 0 : );
910 0 : }
911 0 : } else {
912 0 : // Make it obvious to the user that since they've omitted an AZ, we're clearing it
913 0 : eprintln!("Clearing preferred AZ for tenant {tenant_id}");
914 0 : }
915 0 :
916 0 : // Construct a request that modifies all the tenant's shards
917 0 : let req = ShardsPreferredAzsRequest {
918 0 : preferred_az_ids: describe_response
919 0 : .shards
920 0 : .into_iter()
921 0 : .map(|s| {
922 0 : (
923 0 : s.tenant_shard_id,
924 0 : preferred_az.clone().map(AvailabilityZone),
925 0 : )
926 0 : })
927 0 : .collect(),
928 0 : };
929 0 : storcon_client
930 0 : .dispatch::<ShardsPreferredAzsRequest, ShardsPreferredAzsResponse>(
931 0 : Method::PUT,
932 0 : "control/v1/preferred_azs".to_string(),
933 0 : Some(req),
934 0 : )
935 0 : .await?;
936 0 : }
937 0 : Command::TenantDrop { tenant_id, unclean } => {
938 0 : if !unclean {
939 0 : anyhow::bail!(
940 0 : "This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed."
941 0 : )
942 0 : }
943 0 : storcon_client
944 0 : .dispatch::<(), ()>(
945 0 : Method::POST,
946 0 : format!("debug/v1/tenant/{tenant_id}/drop"),
947 0 : None,
948 0 : )
949 0 : .await?;
950 0 : }
951 0 : Command::NodeDrop { node_id, unclean } => {
952 0 : if !unclean {
953 0 : anyhow::bail!(
954 0 : "This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed."
955 0 : )
956 0 : }
957 0 : storcon_client
958 0 : .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
959 0 : .await?;
960 0 : }
961 0 : Command::NodeDelete { node_id } => {
962 0 : eprintln!("Warning: This command is obsolete and will be removed in a future version");
963 0 : eprintln!("Use `NodeStartDelete` instead, if possible");
964 0 : storcon_client
965 0 : .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
966 0 : .await?;
967 0 : }
968 0 : Command::NodeStartDelete { node_id, force } => {
969 0 : let query = if force {
970 0 : format!("control/v1/node/{node_id}/delete?force=true")
971 0 : } else {
972 0 : format!("control/v1/node/{node_id}/delete")
973 0 : };
974 0 : storcon_client
975 0 : .dispatch::<(), ()>(Method::PUT, query, None)
976 0 : .await?;
977 0 : println!("Delete started for {node_id}");
978 0 : }
979 0 : Command::NodeCancelDelete { node_id, timeout } => {
980 0 : storcon_client
981 0 : .dispatch::<(), ()>(
982 0 : Method::DELETE,
983 0 : format!("control/v1/node/{node_id}/delete"),
984 0 : None,
985 0 : )
986 0 : .await?;
987 0 :
988 0 : println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
989 0 :
990 0 : let final_policy =
991 0 : wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
992 0 : !matches!(sched, NodeSchedulingPolicy::Deleting)
993 0 : })
994 0 : .await?;
995 0 :
996 0 : println!(
997 0 : "Delete was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
998 0 : );
999 0 : }
1000 0 : Command::NodeDeleteTombstone { node_id } => {
1001 0 : storcon_client
1002 0 : .dispatch::<(), ()>(
1003 0 : Method::DELETE,
1004 0 : format!("debug/v1/tombstone/{node_id}"),
1005 0 : None,
1006 0 : )
1007 0 : .await?;
1008 0 : }
1009 0 : Command::NodeTombstones {} => {
1010 0 : let mut resp = storcon_client
1011 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
1012 0 : Method::GET,
1013 0 : "debug/v1/tombstone".to_string(),
1014 0 : None,
1015 0 : )
1016 0 : .await?;
1017 0 :
1018 0 : resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
1019 0 :
1020 0 : let mut table = comfy_table::Table::new();
1021 0 : table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
1022 0 : for node in resp {
1023 0 : table.add_row([
1024 0 : format!("{}", node.id),
1025 0 : node.listen_http_addr,
1026 0 : node.availability_zone_id,
1027 0 : format!("{:?}", node.scheduling),
1028 0 : format!("{:?}", node.availability),
1029 0 : ]);
1030 0 : }
1031 0 : println!("{table}");
1032 0 : }
1033 0 : Command::TenantSetTimeBasedEviction {
1034 0 : tenant_id,
1035 0 : period,
1036 0 : threshold,
1037 0 : } => {
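     : // Note: like `set-tenant-config`, this replaces the entire pageserver tenant config,
     : // so any other previously-set per-tenant overrides revert to their defaults.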
1038 0 : vps_client
1039 0 : .set_tenant_config(&TenantConfigRequest {
1040 0 : tenant_id,
1041 0 : config: TenantConfig {
1042 0 : eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
1043 0 : EvictionPolicyLayerAccessThreshold {
1044 0 : period: period.into(),
1045 0 : threshold: threshold.into(),
1046 0 : },
1047 0 : )),
1048 0 : heatmap_period: Some(Duration::from_secs(300)),
1049 0 : ..Default::default()
1050 0 : },
1051 0 : })
1052 0 : .await?;
1053 0 : }
1054 0 : Command::BulkMigrate {
1055 0 : nodes,
1056 0 : concurrency,
1057 0 : max_shards,
1058 0 : dry_run,
1059 0 : } => {
1060 0 : // Load the list of nodes, split them up into the drained and filled sets,
1061 0 : // and validate that draining is possible.
1062 0 : let node_descs = storcon_client
1063 0 : .dispatch::<(), Vec<NodeDescribeResponse>>(
1064 0 : Method::GET,
1065 0 : "control/v1/node".to_string(),
1066 0 : None,
1067 0 : )
1068 0 : .await?;
1069 0 :
1070 0 : let mut node_to_drain_descs = Vec::new();
1071 0 : let mut node_to_fill_descs = Vec::new();
1072 0 :
1073 0 : for desc in node_descs {
1074 0 : let to_drain = nodes.contains(&desc.id);
1075 0 : if to_drain {
1076 0 : node_to_drain_descs.push(desc);
1077 0 : } else {
1078 0 : node_to_fill_descs.push(desc);
1079 0 : }
1080 0 : }
1081 0 :
1082 0 : if nodes.len() != node_to_drain_descs.len() {
1083 0 : anyhow::bail!("Bulk migration requested away from node which doesn't exist.")
1084 0 : }
1085 0 :
1086 0 : node_to_fill_descs.retain(|desc| {
1087 0 : matches!(desc.availability, NodeAvailabilityWrapper::Active)
1088 0 : && matches!(
1089 0 : desc.scheduling,
1090 0 : NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
1091 0 : )
1092 0 : });
1093 0 :
1094 0 : if node_to_fill_descs.is_empty() {
1095 0 : anyhow::bail!("There are no nodes to migrate to")
1096 0 : }
1097 0 :
1098 0 : // Set the node scheduling policy to draining for the nodes which
1099 0 : // we plan to drain.
1100 0 : for node_desc in node_to_drain_descs.iter() {
1101 0 : let req = NodeConfigureRequest {
1102 0 : node_id: node_desc.id,
1103 0 : availability: None,
1104 0 : scheduling: Some(NodeSchedulingPolicy::Draining),
1105 0 : };
1106 0 :
1107 0 : storcon_client
1108 0 : .dispatch::<_, ()>(
1109 0 : Method::PUT,
1110 0 : format!("control/v1/node/{}/config", node_desc.id),
1111 0 : Some(req),
1112 0 : )
1113 0 : .await?;
1114 0 : }
1115 0 :
1116 0 : // Perform the migration: move each tenant shard scheduled on a node to
1117 0 : // be drained to a node which is being filled. A simple round robin
1118 0 : // strategy is used to pick the new node.
1119 0 : let tenants = storcon_client
1120 0 : .dispatch::<(), Vec<TenantDescribeResponse>>(
1121 0 : Method::GET,
1122 0 : "control/v1/tenant".to_string(),
1123 0 : None,
1124 0 : )
1125 0 : .await?;
1126 0 :
1127 0 : let mut selected_node_idx = 0;
1128 0 :
1129 0 : struct MigrationMove {
1130 0 : tenant_shard_id: TenantShardId,
1131 0 : from: NodeId,
1132 0 : to: NodeId,
1133 0 : }
1134 0 :
1135 0 : let mut moves: Vec<MigrationMove> = Vec::new();
1136 0 :
1137 0 : let shards = tenants
1138 0 : .into_iter()
1139 0 : .flat_map(|tenant| tenant.shards.into_iter());
1140 0 : for shard in shards {
1141 0 : if let Some(max_shards) = max_shards {
1142 0 : if moves.len() >= max_shards {
1143 0 : println!(
1144 0 : "Stop planning shard moves since the requested maximum was reached"
1145 0 : );
1146 0 : break;
1147 0 : }
1148 0 : }
1149 0 :
1150 0 : let should_migrate = {
1151 0 : if let Some(attached_to) = shard.node_attached {
1152 0 : node_to_drain_descs
1153 0 : .iter()
1154 0 : .map(|desc| desc.id)
1155 0 : .any(|id| id == attached_to)
1156 0 : } else {
1157 0 : false
1158 0 : }
1159 0 : };
1160 0 :
1161 0 : if !should_migrate {
1162 0 : continue;
1163 0 : }
1164 0 :
1165 0 : moves.push(MigrationMove {
1166 0 : tenant_shard_id: shard.tenant_shard_id,
1167 0 : from: shard
1168 0 : .node_attached
1169 0 : .expect("We only migrate attached tenant shards"),
1170 0 : to: node_to_fill_descs[selected_node_idx].id,
1171 0 : });
1172 0 : selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
1173 0 : }
1174 0 :
1175 0 : let total_moves = moves.len();
1176 0 :
1177 0 : if dry_run == Some(true) {
1178 0 : println!("Dryrun requested. Planned {total_moves} moves:");
1179 0 : for mv in &moves {
1180 0 : println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
1181 0 : }
1182 0 :
1183 0 : return Ok(());
1184 0 : }
1185 0 :
1186 0 : const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
1187 0 : let mut stream = futures::stream::iter(moves)
1188 0 : .map(|mv| {
1189 0 : let client = Client::new(http_client.clone(), cli.api.clone(), cli.jwt.clone());
1190 0 : async move {
1191 0 : client
1192 0 : .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
1193 0 : Method::PUT,
1194 0 : format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
1195 0 : Some(TenantShardMigrateRequest {
1196 0 : node_id: mv.to,
1197 0 : origin_node_id: Some(mv.from),
1198 0 : migration_config: MigrationConfig::default(),
1199 0 : }),
1200 0 : )
1201 0 : .await
1202 0 : .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
1203 0 : }
1204 0 : })
1205 0 : .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));
1206 0 :
1207 0 : let mut success = 0;
1208 0 : let mut failure = 0;
1209 0 :
1210 0 : while let Some(res) = stream.next().await {
1211 0 : match res {
1212 0 : Ok(_) => {
1213 0 : success += 1;
1214 0 : }
1215 0 : Err((tenant_shard_id, from, to, error)) => {
1216 0 : failure += 1;
1217 0 : println!(
1218 0 : "Failed to migrate {tenant_shard_id} from node {from} to node {to}: {error}"
1219 0 : );
1220 0 : }
1221 0 : }
1222 0 :
1223 0 : if (success + failure) % 20 == 0 {
1224 0 : println!(
1225 0 : "Processed {}/{} shards: {} succeeded, {} failed",
1226 0 : success + failure,
1227 0 : total_moves,
1228 0 : success,
1229 0 : failure
1230 0 : );
1231 0 : }
1232 0 : }
1233 0 :
1234 0 : println!(
1235 0 : "Processed {}/{} shards: {} succeeded, {} failed",
1236 0 : success + failure,
1237 0 : total_moves,
1238 0 : success,
1239 0 : failure
1240 0 : );
1241 0 : }
1242 0 : Command::StartDrain { node_id } => {
1243 0 : storcon_client
1244 0 : .dispatch::<(), ()>(
1245 0 : Method::PUT,
1246 0 : format!("control/v1/node/{node_id}/drain"),
1247 0 : None,
1248 0 : )
1249 0 : .await?;
1250 0 : println!("Drain started for {node_id}");
1251 0 : }
1252 0 : Command::CancelDrain { node_id, timeout } => {
1253 0 : storcon_client
1254 0 : .dispatch::<(), ()>(
1255 0 : Method::DELETE,
1256 0 : format!("control/v1/node/{node_id}/drain"),
1257 0 : None,
1258 0 : )
1259 0 : .await?;
1260 0 :
1261 0 : println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
1262 0 :
1263 0 : let final_policy =
1264 0 : wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
1265 0 : use NodeSchedulingPolicy::*;
1266 0 : matches!(sched, Active | PauseForRestart)
1267 0 : })
1268 0 : .await?;
1269 0 :
1270 0 : println!(
1271 0 : "Drain was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
1272 0 : );
1273 0 : }
1274 0 : Command::StartFill { node_id } => {
1275 0 : storcon_client
1276 0 : .dispatch::<(), ()>(Method::PUT, format!("control/v1/node/{node_id}/fill"), None)
1277 0 : .await?;
1278 0 :
1279 0 : println!("Fill started for {node_id}");
1280 0 : }
1281 0 : Command::CancelFill { node_id, timeout } => {
1282 0 : storcon_client
1283 0 : .dispatch::<(), ()>(
1284 0 : Method::DELETE,
1285 0 : format!("control/v1/node/{node_id}/fill"),
1286 0 : None,
1287 0 : )
1288 0 : .await?;
1289 0 :
1290 0 : println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
1291 0 :
1292 0 : let final_policy =
1293 0 : wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
1294 0 : use NodeSchedulingPolicy::*;
1295 0 : matches!(sched, Active)
1296 0 : })
1297 0 : .await?;
1298 0 :
1299 0 : println!(
1300 0 : "Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
1301 0 : );
1302 0 : }
1303 0 : Command::Safekeepers {} => {
1304 0 : let mut resp = storcon_client
1305 0 : .dispatch::<(), Vec<SafekeeperDescribeResponse>>(
1306 0 : Method::GET,
1307 0 : "control/v1/safekeeper".to_string(),
1308 0 : None,
1309 0 : )
1310 0 : .await?;
1311 0 :
1312 0 : resp.sort_by(|a, b| a.id.cmp(&b.id));
1313 0 :
1314 0 : let mut table = comfy_table::Table::new();
1315 0 : table.set_header([
1316 0 : "Id",
1317 0 : "Version",
1318 0 : "Host",
1319 0 : "Port",
1320 0 : "Http Port",
1321 0 : "AZ Id",
1322 0 : "Scheduling",
1323 0 : ]);
1324 0 : for sk in resp {
1325 0 : table.add_row([
1326 0 : format!("{}", sk.id),
1327 0 : format!("{}", sk.version),
1328 0 : sk.host,
1329 0 : format!("{}", sk.port),
1330 0 : format!("{}", sk.http_port),
1331 0 : sk.availability_zone_id.clone(),
1332 0 : String::from(sk.scheduling_policy),
1333 0 : ]);
1334 0 : }
1335 0 : println!("{table}");
1336 0 : }
1337 0 : Command::SafekeeperScheduling {
1338 0 : node_id,
1339 0 : scheduling_policy,
1340 0 : } => {
1341 0 : let scheduling_policy = scheduling_policy.0;
1342 0 : storcon_client
1343 0 : .dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
1344 0 : Method::POST,
1345 0 : format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
1346 0 : Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
1347 0 : )
1348 0 : .await?;
1349 0 : println!(
1350 0 : "Scheduling policy of {node_id} set to {}",
1351 0 : String::from(scheduling_policy)
1352 0 : );
1353 0 : }
1354 0 : Command::DownloadHeatmapLayers {
1355 0 : tenant_shard_id,
1356 0 : timeline_id,
1357 0 : concurrency,
1358 0 : } => {
1359 0 : let mut path = format!(
1360 0 : "v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
1361 0 : );
1362 0 :
1363 0 : if let Some(c) = concurrency {
1364 0 : path = format!("{path}?concurrency={c}");
1365 0 : }
1366 0 :
1367 0 : storcon_client
1368 0 : .dispatch::<(), ()>(Method::POST, path, None)
1369 0 : .await?;
1370 0 : }
1371 0 : Command::TimelineLocate {
1372 0 : tenant_id,
1373 0 : timeline_id,
1374 0 : } => {
1375 0 : let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
1376 0 :
1377 0 : let resp = storcon_client
1378 0 : .dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
1379 0 : .await?;
1380 0 :
1381 0 : let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
1382 0 : let new_sk_set = resp
1383 0 : .new_sk_set
1384 0 : .as_ref()
1385 0 : .map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
1386 0 :
1387 0 : println!("generation = {}", resp.generation);
1388 0 : println!("sk_set = {sk_set:?}");
1389 0 : println!("new_sk_set = {new_sk_set:?}");
1390 0 : }
1391 0 : Command::TimelineSafekeeperMigrate {
1392 0 : tenant_id,
1393 0 : timeline_id,
1394 0 : new_sk_set,
1395 0 : } => {
1396 0 : let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
1397 0 :
1398 0 : storcon_client
1399 0 : .dispatch::<_, ()>(
1400 0 : Method::POST,
1401 0 : path,
1402 0 : Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
1403 0 : )
1404 0 : .await?;
1405 0 : }
1406 0 : Command::TimelineSafekeeperMigrateAbort {
1407 0 : tenant_id,
1408 0 : timeline_id,
1409 0 : } => {
1410 0 : let path =
1411 0 : format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort");
1412 0 :
1413 0 : storcon_client
1414 0 : .dispatch::<(), ()>(Method::POST, path, None)
1415 0 : .await?;
1416 0 : }
1417 0 : }
1418 0 :
1419 0 : Ok(())
1420 0 : }
1421 :
1422 : static WATCH_INTERVAL: Duration = Duration::from_secs(5);
1423 :
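     : /// Poll the tenant description every `WATCH_INTERVAL` and print a one-line summary
     : /// of the shard's attachment state. If `until_migrated_to` is given, return once the
     : /// shard is attached to that node with no reconciliation in flight; otherwise watch
     : /// forever.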
1424 0 : async fn watch_tenant_shard(
1425 0 : storcon_client: Client,
1426 0 : tenant_shard_id: TenantShardId,
1427 0 : until_migrated_to: Option<NodeId>,
1428 0 : ) -> anyhow::Result<()> {
1429 0 : if let Some(until_migrated_to) = until_migrated_to {
1430 0 : println!(
1431 0 : "Waiting for tenant shard {tenant_shard_id} to be migrated to node {until_migrated_to}"
1432 0 : );
1433 0 : }
1434 :
1435 : loop {
1436 0 : let desc = storcon_client
1437 0 : .dispatch::<(), TenantDescribeResponse>(
1438 0 : Method::GET,
1439 0 : format!("control/v1/tenant/{}", tenant_shard_id.tenant_id),
1440 0 : None,
1441 0 : )
1442 0 : .await?;
1443 :
1444 : // Output the current state of the tenant shard
1445 0 : let shard = desc
1446 0 : .shards
1447 0 : .iter()
1448 0 : .find(|s| s.tenant_shard_id == tenant_shard_id)
1449 0 : .ok_or(anyhow::anyhow!("Tenant shard not found"))?;
1450 0 : let summary = format!(
1451 0 : "attached: {} secondary: {} {}",
1452 0 : shard
1453 0 : .node_attached
1454 0 : .map(|n| format!("{n}"))
1455 0 : .unwrap_or("none".to_string()),
1456 0 : shard
1457 0 : .node_secondary
1458 0 : .iter()
1459 0 : .map(|n| n.to_string())
1460 0 : .collect::<Vec<_>>()
1461 0 : .join(","),
1462 0 : if shard.is_reconciling {
1463 0 : "(reconciler active)"
1464 : } else {
1465 0 : "(reconciler idle)"
1466 : }
1467 : );
1468 0 : println!("{summary}");
1469 :
1470 : // Maybe drop out if we finished migration
1471 0 : if let Some(until_migrated_to) = until_migrated_to {
1472 0 : if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling {
1473 0 : println!("Tenant shard {tenant_shard_id} is now on node {until_migrated_to}");
1474 0 : break;
1475 0 : }
1476 0 : }
1477 :
1478 0 : tokio::time::sleep(WATCH_INTERVAL).await;
1479 : }
1480 0 : Ok(())
1481 0 : }