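//! A CLI for storage controller support and debug.
//!
//! Example invocations, assuming the binary is called `storcon_cli` (the name is an
//! assumption; clap derives kebab-case subcommand and flag names from the definitions
//! below):
//!
//! ```text
//! storcon_cli --api http://127.0.0.1:1234 nodes
//! storcon_cli --api http://127.0.0.1:1234 --jwt $TOKEN tenant-describe --tenant-id <TENANT_ID>
//! ```
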
use futures::StreamExt;
use std::{collections::HashMap, str::FromStr, time::Duration};

use clap::{Parser, Subcommand};
use pageserver_api::{
    controller_api::{
        NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
        TenantDescribeResponse, TenantPolicyRequest,
    },
    models::{
        EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
        ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
        TenantShardSplitRequest, TenantShardSplitResponse,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use reqwest::{Method, StatusCode, Url};
use serde::{de::DeserializeOwned, Serialize};
use utils::id::{NodeId, TenantId};

use pageserver_api::controller_api::{
    NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
    TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
};

#[derive(Subcommand, Debug)]
enum Command {
    /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
    /// since pageservers auto-register when they start up.
    NodeRegister {
        #[arg(long)]
        node_id: NodeId,

        #[arg(long)]
        listen_pg_addr: String,
        #[arg(long)]
        listen_pg_port: u16,

        #[arg(long)]
        listen_http_addr: String,
        #[arg(long)]
        listen_http_port: u16,
    },

    /// Modify a node's configuration in the storage controller
    NodeConfigure {
        #[arg(long)]
        node_id: NodeId,

        /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
        /// manually mark a node offline.
        #[arg(long)]
        availability: Option<NodeAvailabilityArg>,
        /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
        #[arg(long)]
        scheduling: Option<NodeSchedulingPolicy>,
    },
    /// Modify a tenant's policies in the storage controller
    TenantPolicy {
        #[arg(long)]
        tenant_id: TenantId,
        /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
        /// or is in the normal attached state with N secondary locations (`attached:N`)
        #[arg(long)]
        placement: Option<PlacementPolicyArg>,
        /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
        /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
        /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
        /// unavailable, and are only for use in emergencies.
        #[arg(long)]
        scheduling: Option<ShardSchedulingPolicyArg>,
    },
    /// List nodes known to the storage controller
    Nodes {},
    /// List tenants known to the storage controller
    Tenants {},
    /// Create a new tenant in the storage controller, and by extension on pageservers.
    TenantCreate {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Delete a tenant in the storage controller, and by extension on pageservers.
    TenantDelete {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Split an existing tenant into a higher number of shards than its current shard count.
    TenantShardSplit {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        shard_count: u8,
        /// Optional, in 8KiB pages, e.g. set 2048 for 16MiB stripes.
        #[arg(long)]
        stripe_size: Option<u32>,
    },
    /// Migrate the attached location for a tenant shard to a specific pageserver.
    TenantShardMigrate {
        #[arg(long)]
        tenant_shard_id: TenantShardId,
        #[arg(long)]
        node: NodeId,
    },
    /// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
    /// that is passed through to pageservers, and does not affect storage controller behavior.
    TenantConfig {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        config: String,
    },
    /// Attempt to balance the locations for a tenant across pageservers. This is a client-side
    /// alternative to the storage controller's scheduling optimization behavior.
    TenantScatter {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Print details about a particular tenant, including all its shards' states.
    TenantDescribe {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
    /// mode so that it can warm up content on a pageserver.
    TenantWarmup {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
    /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
    TenantDrop {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        unclean: bool,
    },
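    /// Uncleanly drop a node from the storage controller: this doesn't decommission anything on
    /// the node itself, and doesn't check whether any tenant shards still refer to it.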
    NodeDrop {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        unclean: bool,
    },
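    /// Set a tenant's time-based eviction policy, passing the period and threshold through to
    /// the pageserver tenant configuration.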
    TenantSetTimeBasedEviction {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        period: humantime::Duration,
        #[arg(long)]
        threshold: humantime::Duration,
    },
    /// Drain a set of specified pageservers by moving the primary attachments to pageservers
    /// outside of the specified set.
    Drain {
        /// Set of pageserver node ids to drain.
        #[arg(long)]
        nodes: Vec<NodeId>,
        /// Optional: migration concurrency (default is 8).
        #[arg(long)]
        concurrency: Option<usize>,
        /// Optional: maximum number of shards to migrate.
        #[arg(long)]
        max_shards: Option<usize>,
        /// Optional: when set to true, nothing is migrated, but the plan is printed to stdout.
        #[arg(long)]
        dry_run: Option<bool>,
    },
}

#[derive(Parser)]
#[command(
    author,
    version,
    about,
    long_about = "CLI for Storage Controller Support/Debug"
)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[arg(long)]
    /// URL to storage controller, e.g. http://127.0.0.1:1234 when using `neon_local`
    api: Url,

    #[arg(long)]
    /// JWT token for authenticating with storage controller. Depending on the API used, this
    /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
    /// a token with both scopes to use with this tool.
    jwt: Option<String>,

    #[command(subcommand)]
    command: Command,
}

#[derive(Debug, Clone)]
struct PlacementPolicyArg(PlacementPolicy);

impl FromStr for PlacementPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "detached" => Ok(Self(PlacementPolicy::Detached)),
            "secondary" => Ok(Self(PlacementPolicy::Secondary)),
            _ if s.starts_with("attached:") => {
                let mut splitter = s.split(':');
                let _prefix = splitter.next().unwrap();
                match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
                    Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
                    None => Err(anyhow::anyhow!(
                        "Invalid format '{s}', a valid example is 'attached:1'"
                    )),
                }
            }
            _ => Err(anyhow::anyhow!(
                "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);

impl FromStr for ShardSchedulingPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(ShardSchedulingPolicy::Active)),
            "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
            "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
            "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
            _ => Err(anyhow::anyhow!(
                "Unknown scheduling policy '{s}', try active,essential,pause,stop"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct NodeAvailabilityArg(NodeAvailabilityWrapper);

impl FromStr for NodeAvailabilityArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
            "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
            _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
        }
    }
}
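
// Illustrative unit tests, not part of the original tool: a quick sketch of how the
// `FromStr` implementations above are expected to behave.
#[cfg(test)]
mod arg_parsing_tests {
    use super::*;

    #[test]
    fn placement_policy_arg_parses() {
        assert!(matches!(
            "detached".parse::<PlacementPolicyArg>(),
            Ok(PlacementPolicyArg(PlacementPolicy::Detached))
        ));
        assert!(matches!(
            "attached:2".parse::<PlacementPolicyArg>(),
            Ok(PlacementPolicyArg(PlacementPolicy::Attached(2)))
        ));
        // Malformed secondary counts and unknown policies are rejected.
        assert!("attached:x".parse::<PlacementPolicyArg>().is_err());
        assert!("bogus".parse::<PlacementPolicyArg>().is_err());
    }

    #[test]
    fn scheduling_and_availability_args_parse() {
        assert!(matches!(
            "pause".parse::<ShardSchedulingPolicyArg>(),
            Ok(ShardSchedulingPolicyArg(ShardSchedulingPolicy::Pause))
        ));
        assert!(matches!(
            "offline".parse::<NodeAvailabilityArg>(),
            Ok(NodeAvailabilityArg(NodeAvailabilityWrapper::Offline))
        ));
    }
}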

struct Client {
    base_url: Url,
    jwt_token: Option<String>,
    client: reqwest::Client,
}

impl Client {
    fn new(base_url: Url, jwt_token: Option<String>) -> Self {
        Self {
            base_url,
            jwt_token,
            client: reqwest::ClientBuilder::new()
                .build()
                .expect("Failed to construct http client"),
        }
    }

    /// Simple HTTP request wrapper for calling into the storage controller
    async fn dispatch<RQ, RS>(
        &self,
        method: Method,
        path: String,
        body: Option<RQ>,
    ) -> mgmt_api::Result<RS>
    where
        RQ: Serialize + Sized,
        RS: DeserializeOwned + Sized,
    {
        // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
        // for general purpose API access.
        let url = Url::from_str(&format!(
            "http://{}:{}/{path}",
            self.base_url.host_str().unwrap(),
            self.base_url.port().unwrap()
        ))
        .unwrap();

        let mut builder = self.client.request(method, url);
        if let Some(body) = body {
            builder = builder.json(&body)
        }
        if let Some(jwt_token) = &self.jwt_token {
            builder = builder.header(
                reqwest::header::AUTHORIZATION,
                format!("Bearer {jwt_token}"),
            );
        }

        let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
        let response = response.error_from_body().await?;

        response
            .json()
            .await
            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());

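    // Strip the trailing '/' that `Url::to_string` produces, since the `mgmt_api::Client`
    // appends its own path segments to this base URL.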
    let mut trimmed = cli.api.to_string();
    trimmed.pop();
    let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());

    match cli.command {
        Command::NodeRegister {
            node_id,
            listen_pg_addr,
            listen_pg_port,
            listen_http_addr,
            listen_http_port,
        } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "control/v1/node".to_string(),
                    Some(NodeRegisterRequest {
                        node_id,
                        listen_pg_addr,
                        listen_pg_port,
                        listen_http_addr,
                        listen_http_port,
                    }),
                )
                .await?;
        }
        Command::TenantCreate { tenant_id } => {
            vps_client
                .tenant_create(&TenantCreateRequest {
                    new_tenant_id: TenantShardId::unsharded(tenant_id),
                    generation: None,
                    shard_parameters: ShardParameters::default(),
                    placement_policy: Some(PlacementPolicy::Attached(1)),
                    config: TenantConfig::default(),
                })
                .await?;
        }
        Command::TenantDelete { tenant_id } => {
            let status = vps_client
                .tenant_delete(TenantShardId::unsharded(tenant_id))
                .await?;
            tracing::info!("Delete status: {}", status);
        }
        Command::Nodes {} => {
            let resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;
            let mut table = comfy_table::Table::new();
            table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
            for node in resp {
                table.add_row([
                    format!("{}", node.id),
                    node.listen_http_addr,
                    format!("{:?}", node.scheduling),
                    format!("{:?}", node.availability),
                ]);
            }
            println!("{table}");
        }
        Command::NodeConfigure {
            node_id,
            availability,
            scheduling,
        } => {
            let req = NodeConfigureRequest {
                node_id,
                availability: availability.map(|a| a.0),
                scheduling,
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/config"),
                    Some(req),
                )
                .await?;
        }
        Command::Tenants {} => {
            let resp = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;
            let mut table = comfy_table::Table::new();
            table.set_header([
                "TenantId",
                "ShardCount",
                "StripeSize",
                "Placement",
                "Scheduling",
            ]);
            for tenant in resp {
                let shard_zero = tenant.shards.into_iter().next().unwrap();
                table.add_row([
                    format!("{}", tenant.tenant_id),
                    format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
                    format!("{:?}", tenant.stripe_size),
                    format!("{:?}", tenant.policy),
                    format!("{:?}", shard_zero.scheduling_policy),
                ]);
            }

            println!("{table}");
        }
        Command::TenantPolicy {
            tenant_id,
            placement,
            scheduling,
        } => {
            let req = TenantPolicyRequest {
                scheduling: scheduling.map(|s| s.0),
                placement: placement.map(|p| p.0),
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/policy"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantShardSplit {
            tenant_id,
            shard_count,
            stripe_size,
        } => {
            let req = TenantShardSplitRequest {
                new_shard_count: shard_count,
                new_stripe_size: stripe_size.map(ShardStripeSize),
            };

            let response = storcon_client
                .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/shard_split"),
                    Some(req),
                )
                .await?;
            println!(
                "Split tenant {} into {} shards: {}",
                tenant_id,
                shard_count,
                response
                    .new_shards
                    .iter()
                    .map(|s| format!("{:?}", s))
                    .collect::<Vec<_>>()
                    .join(",")
            );
        }
        Command::TenantShardMigrate {
            tenant_shard_id,
            node,
        } => {
            let req = TenantShardMigrateRequest {
                tenant_shard_id,
                node_id: node,
            };

            storcon_client
                .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_shard_id}/migrate"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantConfig { tenant_id, config } => {
            let tenant_conf = serde_json::from_str(&config)?;

            vps_client
                .tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: tenant_conf,
                })
                .await?;
        }
        Command::TenantScatter { tenant_id } => {
            // Find the shards
            let locate_response = storcon_client
                .dispatch::<(), TenantLocateResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}/locate"),
                    None,
                )
                .await?;
            let shards = locate_response.shards;

            let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
            let shard_count = shards.len();
            for s in shards {
                let entry = node_to_shards.entry(s.node_id).or_default();
                entry.push(s.shard_id);
            }

            // Load list of available nodes
            let nodes_resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            for node in nodes_resp {
                if matches!(node.availability, NodeAvailabilityWrapper::Active) {
                    node_to_shards.entry(node.id).or_default();
                }
            }

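            // Spread the shards across the nodes: repeatedly pop a shard from any node holding
            // more than the per-node target (shard count / node count, rounded down) and move it
            // to the emptiest node, stopping when no node is over target or no node has room.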
            let max_shard_per_node = shard_count / node_to_shards.len();

            loop {
                let mut migrate_shard = None;
                for shards in node_to_shards.values_mut() {
                    if shards.len() > max_shard_per_node {
                        // This node is over target: pick a shard to move off it
                        migrate_shard = Some(shards.pop().unwrap());
                    }
                }
                let Some(migrate_shard) = migrate_shard else {
                    break;
                };

                // Pick the emptiest node to migrate to
                let mut destinations = node_to_shards
                    .iter()
                    .map(|(k, v)| (k, v.len()))
                    .collect::<Vec<_>>();
                destinations.sort_by_key(|i| i.1);
                let (destination_node, destination_count) = *destinations.first().unwrap();
                if destination_count + 1 > max_shard_per_node {
                    // Even the emptiest destination doesn't have space: we're done
                    break;
                }
                let destination_node = *destination_node;

                node_to_shards
                    .get_mut(&destination_node)
                    .unwrap()
                    .push(migrate_shard);

                println!("Migrate {} -> {} ...", migrate_shard, destination_node);

                storcon_client
                    .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                        Method::PUT,
                        format!("control/v1/tenant/{migrate_shard}/migrate"),
                        Some(TenantShardMigrateRequest {
                            tenant_shard_id: migrate_shard,
                            node_id: destination_node,
                        }),
                    )
                    .await?;
                println!("Migrate {} -> {} OK", migrate_shard, destination_node);
            }
        }
        Command::TenantDescribe { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;
            let shards = describe_response.shards;
            let mut table = comfy_table::Table::new();
            table.set_header(["Shard", "Attached", "Secondary", "Last error", "Status"]);
            for shard in shards {
                let secondary = shard
                    .node_secondary
                    .iter()
                    .map(|n| format!("{}", n))
                    .collect::<Vec<_>>()
                    .join(",");

                let mut status_parts = Vec::new();
                if shard.is_reconciling {
                    status_parts.push("reconciling");
                }

                if shard.is_pending_compute_notification {
                    status_parts.push("pending_compute");
                }

                if shard.is_splitting {
                    status_parts.push("splitting");
                }
                let status = status_parts.join(",");

                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    shard
                        .node_attached
                        .map(|n| format!("{}", n))
                        .unwrap_or(String::new()),
                    secondary,
                    shard.last_error,
                    status,
                ]);
            }
            println!("{table}");
        }
        Command::TenantWarmup { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await;
            match describe_response {
                Ok(describe) => {
                    if matches!(describe.policy, PlacementPolicy::Secondary) {
                        // Fine: it's already known to the controller in secondary mode: calling
                        // again to put it into secondary mode won't cause problems.
                    } else {
                        anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
                    }
                }
                Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
                    // Fine: this tenant isn't known to the storage controller yet.
                }
                Err(e) => {
                    // Unexpected API error
                    return Err(e.into());
                }
            }

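            // Attach the (unsharded) tenant in secondary mode with warming enabled: the
            // controller will choose a pageserver and start downloading layers onto it.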
            vps_client
                .location_config(
                    TenantShardId::unsharded(tenant_id),
                    pageserver_api::models::LocationConfig {
                        mode: pageserver_api::models::LocationConfigMode::Secondary,
                        generation: None,
                        secondary_conf: Some(LocationConfigSecondary { warm: true }),
                        shard_number: 0,
                        shard_count: 0,
                        shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
                        tenant_conf: TenantConfig::default(),
                    },
                    None,
                    true,
                )
                .await?;

            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;

            let secondary_ps_id = describe_response
                .shards
                .first()
                .unwrap()
                .node_secondary
                .first()
                .unwrap();

            println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
            loop {
                let (status, progress) = vps_client
                    .tenant_secondary_download(
                        TenantShardId::unsharded(tenant_id),
                        Some(Duration::from_secs(10)),
                    )
                    .await?;
                println!(
                    "Progress: {}/{} layers, {}/{} bytes",
                    progress.layers_downloaded,
                    progress.layers_total,
                    progress.bytes_downloaded,
                    progress.bytes_total
                );
                match status {
                    StatusCode::OK => {
                        println!("Download complete");
                        break;
                    }
                    StatusCode::ACCEPTED => {
                        // Not done yet: poll again
                    }
                    _ => {
                        anyhow::bail!("Unexpected download status: {status}");
                    }
                }
            }
        }
        Command::TenantDrop { tenant_id, unclean } => {
            if !unclean {
                anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
            }
            storcon_client
                .dispatch::<(), ()>(
                    Method::POST,
                    format!("debug/v1/tenant/{tenant_id}/drop"),
                    None,
                )
                .await?;
        }
        Command::NodeDrop { node_id, unclean } => {
            if !unclean {
                anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
            }
            storcon_client
                .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
                .await?;
        }
        Command::TenantSetTimeBasedEviction {
            tenant_id,
            period,
            threshold,
        } => {
            vps_client
                .tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: TenantConfig {
                        eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
                            EvictionPolicyLayerAccessThreshold {
                                period: period.into(),
                                threshold: threshold.into(),
                            },
                        )),
                        ..Default::default()
                    },
                })
                .await?;
        }
        Command::Drain {
            nodes,
            concurrency,
            max_shards,
            dry_run,
        } => {
            // Load the list of nodes, split them up into the drained and filled sets,
            // and validate that draining is possible.
            let node_descs = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            let mut node_to_drain_descs = Vec::new();
            let mut node_to_fill_descs = Vec::new();

            for desc in node_descs {
                let to_drain = nodes.iter().any(|id| *id == desc.id);
                if to_drain {
                    node_to_drain_descs.push(desc);
                } else {
                    node_to_fill_descs.push(desc);
                }
            }

            if nodes.len() != node_to_drain_descs.len() {
                anyhow::bail!("Drain requested for a node which doesn't exist.")
            }

            node_to_fill_descs.retain(|desc| {
                matches!(desc.availability, NodeAvailabilityWrapper::Active)
                    && matches!(
                        desc.scheduling,
                        NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
                    )
            });

            if node_to_fill_descs.is_empty() {
                anyhow::bail!("There are no nodes to drain to")
            }

            // Set the node scheduling policy to draining for the nodes which
            // we plan to drain.
            for node_desc in node_to_drain_descs.iter() {
                let req = NodeConfigureRequest {
                    node_id: node_desc.id,
                    availability: None,
                    scheduling: Some(NodeSchedulingPolicy::Draining),
                };

                storcon_client
                    .dispatch::<_, ()>(
                        Method::PUT,
                        format!("control/v1/node/{}/config", node_desc.id),
                        Some(req),
                    )
                    .await?;
            }

            // Perform the drain: move each tenant shard scheduled on a node to
            // be drained to a node which is being filled. A simple round robin
            // strategy is used to pick the new node.
            let tenants = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;

            let mut selected_node_idx = 0;

            struct DrainMove {
                tenant_shard_id: TenantShardId,
                from: NodeId,
                to: NodeId,
            }

            let mut moves: Vec<DrainMove> = Vec::new();

            let shards = tenants
                .into_iter()
                .flat_map(|tenant| tenant.shards.into_iter());
            for shard in shards {
                if let Some(max_shards) = max_shards {
                    if moves.len() >= max_shards {
                        println!(
                            "Stopped planning shard moves since the requested maximum was reached"
                        );
                        break;
                    }
                }

                let should_migrate = {
                    if let Some(attached_to) = shard.node_attached {
                        node_to_drain_descs
                            .iter()
                            .map(|desc| desc.id)
                            .any(|id| id == attached_to)
                    } else {
                        false
                    }
                };

                if !should_migrate {
                    continue;
                }

                moves.push(DrainMove {
                    tenant_shard_id: shard.tenant_shard_id,
                    from: shard
                        .node_attached
                        .expect("We only migrate attached tenant shards"),
                    to: node_to_fill_descs[selected_node_idx].id,
                });
                selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
            }

            let total_moves = moves.len();

            if dry_run == Some(true) {
                println!("Dry run requested. Planned {total_moves} moves:");
                for mv in &moves {
                    println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
                }

                return Ok(());
            }

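            // Execute the planned moves with bounded concurrency, reporting failures per shard
            // rather than aborting the whole drain.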
            const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
            let mut stream = futures::stream::iter(moves)
                .map(|mv| {
                    let client = Client::new(cli.api.clone(), cli.jwt.clone());
                    async move {
                        client
                            .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                                Method::PUT,
                                format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
                                Some(TenantShardMigrateRequest {
                                    tenant_shard_id: mv.tenant_shard_id,
                                    node_id: mv.to,
                                }),
                            )
                            .await
                            .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
                    }
                })
                .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));

            let mut success = 0;
            let mut failure = 0;

            while let Some(res) = stream.next().await {
                match res {
                    Ok(_) => {
                        success += 1;
                    }
                    Err((tenant_shard_id, from, to, error)) => {
                        failure += 1;
                        println!(
                            "Failed to migrate {} from node {} to node {}: {}",
                            tenant_shard_id, from, to, error
                        );
                    }
                }

                if (success + failure) % 20 == 0 {
                    println!(
                        "Processed {}/{} shards: {} succeeded, {} failed",
                        success + failure,
                        total_moves,
                        success,
                        failure
                    );
                }
            }

            println!(
                "Processed {}/{} shards: {} succeeded, {} failed",
                success + failure,
                total_moves,
                success,
                failure
            );
        }
    }

    Ok(())
}