Line data Source code
1 : use crate::{
2 : background_process,
3 : local_env::{LocalEnv, NeonStorageControllerConf},
4 : };
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use pageserver_api::{
7 : controller_api::{
8 : NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
9 : TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
10 : TenantShardMigrateResponse,
11 : },
12 : models::{
13 : TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
14 : },
15 : shard::{ShardStripeSize, TenantShardId},
16 : };
17 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
18 : use postgres_backend::AuthType;
19 : use reqwest::Method;
20 : use serde::{de::DeserializeOwned, Deserialize, Serialize};
21 : use std::{fs, str::FromStr, time::Duration};
22 : use tokio::process::Command;
23 : use tracing::instrument;
24 : use url::Url;
25 : use utils::{
26 : auth::{encode_from_key_file, Claims, Scope},
27 : id::{NodeId, TenantId},
28 : };
29 :
30 : pub struct StorageController {
31 : env: LocalEnv,
32 : listen: String,
33 : private_key: Option<Vec<u8>>,
34 : public_key: Option<String>,
35 : postgres_port: u16,
36 : client: reqwest::Client,
37 : config: NeonStorageControllerConf,
38 : }
39 :
40 : const COMMAND: &str = "storage_controller";
41 :
42 : const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
43 :
44 : const DB_NAME: &str = "storage_controller";
45 :
46 0 : #[derive(Serialize, Deserialize)]
47 : pub struct AttachHookRequest {
48 : pub tenant_shard_id: TenantShardId,
49 : pub node_id: Option<NodeId>,
50 : pub generation_override: Option<i32>,
51 : }
52 :
53 0 : #[derive(Serialize, Deserialize)]
54 : pub struct AttachHookResponse {
55 : pub gen: Option<u32>,
56 : }
57 :
58 0 : #[derive(Serialize, Deserialize)]
59 : pub struct InspectRequest {
60 : pub tenant_shard_id: TenantShardId,
61 : }
62 :
63 0 : #[derive(Serialize, Deserialize)]
64 : pub struct InspectResponse {
65 : pub attachment: Option<(u32, NodeId)>,
66 : }
67 :
68 : impl StorageController {
69 0 : pub fn from_env(env: &LocalEnv) -> Self {
70 0 : // Makes no sense to construct this if pageservers aren't going to use it: assume
71 0 : // pageservers have control plane API set
72 0 : let listen_url = env.control_plane_api.clone().unwrap();
73 0 :
74 0 : let listen = format!(
75 0 : "{}:{}",
76 0 : listen_url.host_str().unwrap(),
77 0 : listen_url.port().unwrap()
78 0 : );
79 0 :
80 0 : // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
81 0 : // port, for use by our captive postgres.
82 0 : let postgres_port = listen_url
83 0 : .port()
84 0 : .expect("Control plane API setting should always have a port")
85 0 : + 1;
86 0 :
87 0 : // Assume all pageservers have symmetric auth configuration: this service
88 0 : // expects to use one JWT token to talk to all of them.
89 0 : let ps_conf = env
90 0 : .pageservers
91 0 : .first()
92 0 : .expect("Config is validated to contain at least one pageserver");
93 0 : let (private_key, public_key) = match ps_conf.http_auth_type {
94 0 : AuthType::Trust => (None, None),
95 : AuthType::NeonJWT => {
96 0 : let private_key_path = env.get_private_key_path();
97 0 : let private_key = fs::read(private_key_path).expect("failed to read private key");
98 0 :
99 0 : // If pageserver auth is enabled, this implicitly enables auth for this service,
100 0 : // using the same credentials.
101 0 : let public_key_path =
102 0 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
103 0 : .unwrap();
104 :
105 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
106 0 : let public_key = if std::fs::metadata(&public_key_path)
107 0 : .expect("Can't stat public key")
108 0 : .is_dir()
109 : {
110 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
111 : // keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
112 0 : let mut dir =
113 0 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
114 0 : let dent = dir
115 0 : .next()
116 0 : .expect("Empty key dir")
117 0 : .expect("Error reading key dir");
118 0 :
119 0 : std::fs::read_to_string(dent.path()).expect("Can't read public key")
120 : } else {
121 0 : std::fs::read_to_string(&public_key_path).expect("Can't read public key")
122 : };
123 0 : (Some(private_key), Some(public_key))
124 : }
125 : };
126 :
127 0 : Self {
128 0 : env: env.clone(),
129 0 : listen,
130 0 : private_key,
131 0 : public_key,
132 0 : postgres_port,
133 0 : client: reqwest::ClientBuilder::new()
134 0 : .build()
135 0 : .expect("Failed to construct http client"),
136 0 : config: env.storage_controller.clone(),
137 0 : }
138 0 : }
139 :
140 0 : fn pid_file(&self) -> Utf8PathBuf {
141 0 : Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid"))
142 0 : .expect("non-Unicode path")
143 0 : }
144 :
145 : /// PIDFile for the postgres instance used to store storage controller state
146 0 : fn postgres_pid_file(&self) -> Utf8PathBuf {
147 0 : Utf8PathBuf::from_path_buf(
148 0 : self.env
149 0 : .base_data_dir
150 0 : .join("storage_controller_postgres.pid"),
151 0 : )
152 0 : .expect("non-Unicode path")
153 0 : }
154 :
155 : /// Find the directory containing postgres subdirectories, such `bin` and `lib`
156 : ///
157 : /// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
158 : /// to other versions if that one isn't found. Some automated tests create circumstances
159 : /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
160 0 : async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
161 0 : let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];
162 :
163 0 : for v in prefer_versions {
164 0 : let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
165 0 : if tokio::fs::try_exists(&path).await? {
166 0 : return Ok(path);
167 0 : }
168 : }
169 :
170 : // Fall through
171 0 : anyhow::bail!(
172 0 : "Postgres directory '{}' not found in {}",
173 0 : dir_name,
174 0 : self.env.pg_distrib_dir.display(),
175 0 : );
176 0 : }
177 :
178 0 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
179 0 : self.get_pg_dir("bin").await
180 0 : }
181 :
182 0 : pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
183 0 : self.get_pg_dir("lib").await
184 0 : }
185 :
186 : /// Readiness check for our postgres process
187 0 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
188 0 : let bin_path = pg_bin_dir.join("pg_isready");
189 0 : let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
190 0 : let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
191 :
192 0 : Ok(exitcode.success())
193 0 : }
194 :
195 : /// Create our database if it doesn't exist, and run migrations.
196 : ///
197 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
198 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
199 : /// who just want to run `cargo neon_local` without knowing about diesel.
200 : ///
201 : /// Returns the database url
202 0 : pub async fn setup_database(&self) -> anyhow::Result<String> {
203 0 : let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
204 :
205 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
206 0 : let createdb_path = pg_bin_dir.join("createdb");
207 0 : let output = Command::new(&createdb_path)
208 0 : .args([
209 0 : "-h",
210 0 : "localhost",
211 0 : "-p",
212 0 : &format!("{}", self.postgres_port),
213 0 : DB_NAME,
214 0 : ])
215 0 : .output()
216 0 : .await
217 0 : .expect("Failed to spawn createdb");
218 0 :
219 0 : if !output.status.success() {
220 0 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
221 0 : if stderr.contains("already exists") {
222 0 : tracing::info!("Database {DB_NAME} already exists");
223 : } else {
224 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
225 : }
226 0 : }
227 :
228 0 : Ok(database_url)
229 0 : }
230 :
231 0 : pub async fn connect_to_database(
232 0 : &self,
233 0 : ) -> anyhow::Result<(
234 0 : tokio_postgres::Client,
235 0 : tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
236 0 : )> {
237 0 : tokio_postgres::Config::new()
238 0 : .host("localhost")
239 0 : .port(self.postgres_port)
240 0 : // The user is the ambient operating system user name.
241 0 : // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400
242 0 : //
243 0 : // Until we get there, use the ambient operating system user name.
244 0 : // Recent tokio-postgres versions default to this if the user isn't specified.
245 0 : // But tokio-postgres fork doesn't have this upstream commit:
246 0 : // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
247 0 : // => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
248 0 : .user(&whoami::username())
249 0 : .dbname(DB_NAME)
250 0 : .connect(tokio_postgres::NoTls)
251 0 : .await
252 0 : .map_err(anyhow::Error::new)
253 0 : }
254 :
255 0 : pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
256 0 : // Start a vanilla Postgres process used by the storage controller for persistence.
257 0 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
258 0 : .unwrap()
259 0 : .join("storage_controller_db");
260 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
261 0 : let pg_lib_dir = self.get_pg_lib_dir().await?;
262 0 : let pg_log_path = pg_data_path.join("postgres.log");
263 0 :
264 0 : if !tokio::fs::try_exists(&pg_data_path).await? {
265 : // Initialize empty database
266 0 : let initdb_path = pg_bin_dir.join("initdb");
267 0 : let mut child = Command::new(&initdb_path)
268 0 : .envs(vec![
269 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
270 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
271 0 : ])
272 0 : .args(["-D", pg_data_path.as_ref()])
273 0 : .spawn()
274 0 : .expect("Failed to spawn initdb");
275 0 : let status = child.wait().await?;
276 0 : if !status.success() {
277 0 : anyhow::bail!("initdb failed with status {status}");
278 0 : }
279 0 : };
280 :
281 : // Write a minimal config file:
282 : // - Specify the port, since this is chosen dynamically
283 : // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
284 : // the storage controller we don't want a slow local disk to interfere with that.
285 : //
286 : // NB: it's important that we rewrite this file on each start command so we propagate changes
287 : // from `LocalEnv`'s config file (`.neon/config`).
288 0 : tokio::fs::write(
289 0 : &pg_data_path.join("postgresql.conf"),
290 0 : format!("port = {}\nfsync=off\n", self.postgres_port),
291 0 : )
292 0 : .await?;
293 :
294 0 : println!("Starting storage controller database...");
295 0 : let db_start_args = [
296 0 : "-w",
297 0 : "-D",
298 0 : pg_data_path.as_ref(),
299 0 : "-l",
300 0 : pg_log_path.as_ref(),
301 0 : "start",
302 0 : ];
303 0 :
304 0 : background_process::start_process(
305 0 : "storage_controller_db",
306 0 : &self.env.base_data_dir,
307 0 : pg_bin_dir.join("pg_ctl").as_std_path(),
308 0 : db_start_args,
309 0 : vec![
310 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
311 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
312 0 : ],
313 0 : background_process::InitialPidFile::Create(self.postgres_pid_file()),
314 0 : retry_timeout,
315 0 : || self.pg_isready(&pg_bin_dir),
316 0 : )
317 0 : .await?;
318 :
319 : // Run migrations on every startup, in case something changed.
320 0 : let database_url = self.setup_database().await?;
321 :
322 : // We support running a startup SQL script to fiddle with the database before we launch storcon.
323 : // This is used by the test suite.
324 0 : let startup_script_path = self
325 0 : .env
326 0 : .base_data_dir
327 0 : .join("storage_controller_db.startup.sql");
328 0 : let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
329 0 : Ok(script) => {
330 0 : tokio::fs::remove_file(startup_script_path).await?;
331 0 : script
332 : }
333 0 : Err(e) => {
334 0 : if e.kind() == std::io::ErrorKind::NotFound {
335 : // always run some startup script so that this code path doesn't bit rot
336 0 : "BEGIN; COMMIT;".to_string()
337 : } else {
338 0 : anyhow::bail!("Failed to read startup script: {e}")
339 : }
340 : }
341 : };
342 0 : let (mut client, conn) = self.connect_to_database().await?;
343 0 : let conn = tokio::spawn(conn);
344 0 : let tx = client.build_transaction();
345 0 : let tx = tx.start().await?;
346 0 : tx.batch_execute(&startup_script).await?;
347 0 : tx.commit().await?;
348 0 : drop(client);
349 0 : conn.await??;
350 :
351 0 : let mut args = vec![
352 0 : "-l",
353 0 : &self.listen,
354 0 : "--dev",
355 0 : "--database-url",
356 0 : &database_url,
357 0 : "--max-offline-interval",
358 0 : &humantime::Duration::from(self.config.max_offline).to_string(),
359 0 : "--max-warming-up-interval",
360 0 : &humantime::Duration::from(self.config.max_warming_up).to_string(),
361 0 : ]
362 0 : .into_iter()
363 0 : .map(|s| s.to_string())
364 0 : .collect::<Vec<_>>();
365 0 : if let Some(private_key) = &self.private_key {
366 0 : let claims = Claims::new(None, Scope::PageServerApi);
367 0 : let jwt_token =
368 0 : encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
369 0 : args.push(format!("--jwt-token={jwt_token}"));
370 0 : }
371 :
372 0 : if let Some(public_key) = &self.public_key {
373 0 : args.push(format!("--public-key=\"{public_key}\""));
374 0 : }
375 :
376 0 : if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
377 0 : args.push(format!(
378 0 : "--compute-hook-url={control_plane_compute_hook_api}"
379 0 : ));
380 0 : }
381 :
382 0 : if let Some(split_threshold) = self.config.split_threshold.as_ref() {
383 0 : args.push(format!("--split-threshold={split_threshold}"))
384 0 : }
385 :
386 0 : args.push(format!(
387 0 : "--neon-local-repo-dir={}",
388 0 : self.env.base_data_dir.display()
389 0 : ));
390 0 :
391 0 : background_process::start_process(
392 0 : COMMAND,
393 0 : &self.env.base_data_dir,
394 0 : &self.env.storage_controller_bin(),
395 0 : args,
396 0 : vec![
397 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
398 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
399 0 : ],
400 0 : background_process::InitialPidFile::Create(self.pid_file()),
401 0 : retry_timeout,
402 0 : || async {
403 0 : match self.ready().await {
404 0 : Ok(_) => Ok(true),
405 0 : Err(_) => Ok(false),
406 0 : }
407 0 : },
408 0 : )
409 0 : .await?;
410 :
411 0 : Ok(())
412 0 : }
413 :
414 0 : pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
415 0 : background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
416 :
417 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
418 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
419 :
420 0 : println!("Stopping storage controller database...");
421 0 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
422 0 : let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
423 0 : .args(pg_stop_args)
424 0 : .spawn()?
425 0 : .wait()
426 0 : .await?;
427 0 : if !stop_status.success() {
428 0 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
429 0 : let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
430 0 : .args(pg_status_args)
431 0 : .spawn()?
432 0 : .wait()
433 0 : .await?;
434 :
435 : // pg_ctl status returns this exit code if postgres is not running: in this case it is
436 : // fine that stop failed. Otherwise it is an error that stop failed.
437 : const PG_STATUS_NOT_RUNNING: i32 = 3;
438 0 : if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
439 0 : println!("Storage controller database is already stopped");
440 0 : return Ok(());
441 : } else {
442 0 : anyhow::bail!("Failed to stop storage controller database: {stop_status}")
443 : }
444 0 : }
445 0 :
446 0 : Ok(())
447 0 : }
448 :
449 0 : fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
450 0 : let category = match path.find('/') {
451 0 : Some(idx) => &path[..idx],
452 0 : None => path,
453 : };
454 :
455 0 : match category {
456 0 : "status" | "ready" => Ok(None),
457 0 : "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
458 0 : "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
459 0 : _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
460 : }
461 0 : }
462 :
463 : /// Simple HTTP request wrapper for calling into storage controller
464 0 : async fn dispatch<RQ, RS>(
465 0 : &self,
466 0 : method: reqwest::Method,
467 0 : path: String,
468 0 : body: Option<RQ>,
469 0 : ) -> anyhow::Result<RS>
470 0 : where
471 0 : RQ: Serialize + Sized,
472 0 : RS: DeserializeOwned + Sized,
473 0 : {
474 0 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
475 0 : // for general purpose API access.
476 0 : let listen_url = self.env.control_plane_api.clone().unwrap();
477 0 : let url = Url::from_str(&format!(
478 0 : "http://{}:{}/{path}",
479 0 : listen_url.host_str().unwrap(),
480 0 : listen_url.port().unwrap()
481 0 : ))
482 0 : .unwrap();
483 0 :
484 0 : let mut builder = self.client.request(method, url);
485 0 : if let Some(body) = body {
486 0 : builder = builder.json(&body)
487 0 : }
488 0 : if let Some(private_key) = &self.private_key {
489 0 : println!("Getting claims for path {}", path);
490 0 : if let Some(required_claims) = Self::get_claims_for_path(&path)? {
491 0 : println!("Got claims {:?} for path {}", required_claims, path);
492 0 : let jwt_token = encode_from_key_file(&required_claims, private_key)?;
493 0 : builder = builder.header(
494 0 : reqwest::header::AUTHORIZATION,
495 0 : format!("Bearer {jwt_token}"),
496 0 : );
497 0 : }
498 0 : }
499 :
500 0 : let response = builder.send().await?;
501 0 : let response = response.error_from_body().await?;
502 :
503 0 : Ok(response
504 0 : .json()
505 0 : .await
506 0 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
507 0 : }
508 :
509 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
510 0 : #[instrument(skip(self))]
511 : pub async fn attach_hook(
512 : &self,
513 : tenant_shard_id: TenantShardId,
514 : pageserver_id: NodeId,
515 : ) -> anyhow::Result<Option<u32>> {
516 : let request = AttachHookRequest {
517 : tenant_shard_id,
518 : node_id: Some(pageserver_id),
519 : generation_override: None,
520 : };
521 :
522 : let response = self
523 : .dispatch::<_, AttachHookResponse>(
524 : Method::POST,
525 : "debug/v1/attach-hook".to_string(),
526 : Some(request),
527 : )
528 : .await?;
529 :
530 : Ok(response.gen)
531 : }
532 :
533 0 : #[instrument(skip(self))]
534 : pub async fn inspect(
535 : &self,
536 : tenant_shard_id: TenantShardId,
537 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
538 : let request = InspectRequest { tenant_shard_id };
539 :
540 : let response = self
541 : .dispatch::<_, InspectResponse>(
542 : Method::POST,
543 : "debug/v1/inspect".to_string(),
544 : Some(request),
545 : )
546 : .await?;
547 :
548 : Ok(response.attachment)
549 : }
550 :
551 0 : #[instrument(skip(self))]
552 : pub async fn tenant_create(
553 : &self,
554 : req: TenantCreateRequest,
555 : ) -> anyhow::Result<TenantCreateResponse> {
556 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
557 : .await
558 : }
559 :
560 0 : #[instrument(skip(self))]
561 : pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
562 : self.dispatch::<(), TenantCreateResponse>(
563 : Method::POST,
564 : format!("debug/v1/tenant/{tenant_id}/import"),
565 : None,
566 : )
567 : .await
568 : }
569 :
570 0 : #[instrument(skip(self))]
571 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
572 : self.dispatch::<(), _>(
573 : Method::GET,
574 : format!("debug/v1/tenant/{tenant_id}/locate"),
575 : None,
576 : )
577 : .await
578 : }
579 :
580 0 : #[instrument(skip(self))]
581 : pub async fn tenant_migrate(
582 : &self,
583 : tenant_shard_id: TenantShardId,
584 : node_id: NodeId,
585 : ) -> anyhow::Result<TenantShardMigrateResponse> {
586 : self.dispatch(
587 : Method::PUT,
588 : format!("control/v1/tenant/{tenant_shard_id}/migrate"),
589 : Some(TenantShardMigrateRequest {
590 : tenant_shard_id,
591 : node_id,
592 : }),
593 : )
594 : .await
595 : }
596 :
597 0 : #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
598 : pub async fn tenant_split(
599 : &self,
600 : tenant_id: TenantId,
601 : new_shard_count: u8,
602 : new_stripe_size: Option<ShardStripeSize>,
603 : ) -> anyhow::Result<TenantShardSplitResponse> {
604 : self.dispatch(
605 : Method::PUT,
606 : format!("control/v1/tenant/{tenant_id}/shard_split"),
607 : Some(TenantShardSplitRequest {
608 : new_shard_count,
609 : new_stripe_size,
610 : }),
611 : )
612 : .await
613 : }
614 :
615 0 : #[instrument(skip_all, fields(node_id=%req.node_id))]
616 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
617 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
618 : .await
619 : }
620 :
621 0 : #[instrument(skip_all, fields(node_id=%req.node_id))]
622 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
623 : self.dispatch::<_, ()>(
624 : Method::PUT,
625 : format!("control/v1/node/{}/config", req.node_id),
626 : Some(req),
627 : )
628 : .await
629 : }
630 :
631 0 : pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
632 0 : self.dispatch::<(), Vec<NodeDescribeResponse>>(
633 0 : Method::GET,
634 0 : "control/v1/node".to_string(),
635 0 : None,
636 0 : )
637 0 : .await
638 0 : }
639 :
640 0 : #[instrument(skip(self))]
641 : pub async fn ready(&self) -> anyhow::Result<()> {
642 : self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
643 : .await
644 : }
645 :
646 0 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
647 : pub async fn tenant_timeline_create(
648 : &self,
649 : tenant_id: TenantId,
650 : req: TimelineCreateRequest,
651 : ) -> anyhow::Result<TimelineInfo> {
652 : self.dispatch(
653 : Method::POST,
654 : format!("v1/tenant/{tenant_id}/timeline"),
655 : Some(req),
656 : )
657 : .await
658 : }
659 : }
|