Line data Source code
1 : use crate::{background_process, local_env::LocalEnv};
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use diesel::{
4 : backend::Backend,
5 : query_builder::{AstPass, QueryFragment, QueryId},
6 : Connection, PgConnection, QueryResult, RunQueryDsl,
7 : };
8 : use diesel_migrations::{HarnessWithOutput, MigrationHarness};
9 : use hyper::Method;
10 : use pageserver_api::{
11 : models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
12 : shard::TenantShardId,
13 : };
14 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
15 : use postgres_backend::AuthType;
16 : use serde::{de::DeserializeOwned, Deserialize, Serialize};
17 : use std::{env, str::FromStr};
18 : use tokio::process::Command;
19 : use tracing::instrument;
20 : use url::Url;
21 : use utils::{
22 : auth::{Claims, Scope},
23 : id::{NodeId, TenantId},
24 : };
25 :
26 : pub struct AttachmentService {
27 : env: LocalEnv,
28 : listen: String,
29 : path: Utf8PathBuf,
30 : jwt_token: Option<String>,
31 : public_key: Option<String>,
32 : postgres_port: u16,
33 : client: reqwest::Client,
34 : }
35 :
/// Process name used when starting and stopping the attachment service.
const COMMAND: &str = "attachment_service";

/// Postgres major version tried first when locating binaries for the
/// service's own database.
const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
39 :
40 1060 : #[derive(Serialize, Deserialize)]
41 : pub struct AttachHookRequest {
42 : pub tenant_shard_id: TenantShardId,
43 : pub node_id: Option<NodeId>,
44 : }
45 :
46 212 : #[derive(Serialize, Deserialize)]
47 : pub struct AttachHookResponse {
48 : pub gen: Option<u32>,
49 : }
50 :
51 216 : #[derive(Serialize, Deserialize)]
52 : pub struct InspectRequest {
53 : pub tenant_shard_id: TenantShardId,
54 : }
55 :
56 72 : #[derive(Serialize, Deserialize)]
57 : pub struct InspectResponse {
58 : pub attachment: Option<(u32, NodeId)>,
59 : }
60 :
61 3367 : #[derive(Serialize, Deserialize)]
62 : pub struct TenantCreateResponseShard {
63 : pub shard_id: TenantShardId,
64 : pub node_id: NodeId,
65 : pub generation: u32,
66 : }
67 :
68 1371 : #[derive(Serialize, Deserialize)]
69 : pub struct TenantCreateResponse {
70 : pub shards: Vec<TenantCreateResponseShard>,
71 : }
72 :
73 6633 : #[derive(Serialize, Deserialize)]
74 : pub struct NodeRegisterRequest {
75 : pub node_id: NodeId,
76 :
77 : pub listen_pg_addr: String,
78 : pub listen_pg_port: u16,
79 :
80 : pub listen_http_addr: String,
81 : pub listen_http_port: u16,
82 : }
83 :
84 20 : #[derive(Serialize, Deserialize)]
85 : pub struct NodeConfigureRequest {
86 : pub node_id: NodeId,
87 :
88 : pub availability: Option<NodeAvailability>,
89 : pub scheduling: Option<NodeSchedulingPolicy>,
90 : }
91 :
92 7566 : #[derive(Serialize, Deserialize, Debug)]
93 : pub struct TenantLocateResponseShard {
94 : pub shard_id: TenantShardId,
95 : pub node_id: NodeId,
96 :
97 : pub listen_pg_addr: String,
98 : pub listen_pg_port: u16,
99 :
100 : pub listen_http_addr: String,
101 : pub listen_http_port: u16,
102 : }
103 :
104 2820 : #[derive(Serialize, Deserialize)]
105 : pub struct TenantLocateResponse {
106 : pub shards: Vec<TenantLocateResponseShard>,
107 : pub shard_params: ShardParameters,
108 : }
109 :
110 : /// Explicitly migrating a particular shard is a low level operation
111 : /// TODO: higher level "Reschedule tenant" operation where the request
112 : /// specifies some constraints, e.g. asking it to get off particular node(s)
113 0 : #[derive(Serialize, Deserialize, Debug)]
114 : pub struct TenantShardMigrateRequest {
115 : pub tenant_shard_id: TenantShardId,
116 : pub node_id: NodeId,
117 : }
118 :
119 914 : #[derive(Serialize, Deserialize, Clone, Copy)]
120 : pub enum NodeAvailability {
121 : // Normal, happy state
122 : Active,
123 : // Offline: Tenants shouldn't try to attach here, but they may assume that their
124 : // secondary locations on this node still exist. Newly added nodes are in this
125 : // state until we successfully contact them.
126 : Offline,
127 : }
128 :
129 : impl FromStr for NodeAvailability {
130 : type Err = anyhow::Error;
131 :
132 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
133 0 : match s {
134 0 : "active" => Ok(Self::Active),
135 0 : "offline" => Ok(Self::Offline),
136 0 : _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
137 : }
138 0 : }
139 : }
140 :
141 : /// FIXME: this is a duplicate of the type in the attachment_service crate, because the
142 : /// type needs to be defined with diesel traits in there.
143 914 : #[derive(Serialize, Deserialize, Clone, Copy)]
144 : pub enum NodeSchedulingPolicy {
145 : Active,
146 : Filling,
147 : Pause,
148 : Draining,
149 : }
150 :
151 : impl FromStr for NodeSchedulingPolicy {
152 : type Err = anyhow::Error;
153 :
154 13 : fn from_str(s: &str) -> Result<Self, Self::Err> {
155 13 : match s {
156 13 : "active" => Ok(Self::Active),
157 13 : "filling" => Ok(Self::Filling),
158 0 : "pause" => Ok(Self::Pause),
159 0 : "draining" => Ok(Self::Draining),
160 0 : _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
161 : }
162 13 : }
163 : }
164 :
165 : impl From<NodeSchedulingPolicy> for String {
166 393 : fn from(value: NodeSchedulingPolicy) -> String {
167 393 : use NodeSchedulingPolicy::*;
168 393 : match value {
169 0 : Active => "active",
170 393 : Filling => "filling",
171 0 : Pause => "pause",
172 0 : Draining => "draining",
173 : }
174 393 : .to_string()
175 393 : }
176 : }
177 :
178 0 : #[derive(Serialize, Deserialize, Debug)]
179 : pub struct TenantShardMigrateResponse {}
180 :
181 : impl AttachmentService {
182 2688 : pub fn from_env(env: &LocalEnv) -> Self {
183 2688 : let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
184 2688 : .unwrap()
185 2688 : .join("attachments.json");
186 2688 :
187 2688 : // Makes no sense to construct this if pageservers aren't going to use it: assume
188 2688 : // pageservers have control plane API set
189 2688 : let listen_url = env.control_plane_api.clone().unwrap();
190 2688 :
191 2688 : let listen = format!(
192 2688 : "{}:{}",
193 2688 : listen_url.host_str().unwrap(),
194 2688 : listen_url.port().unwrap()
195 2688 : );
196 2688 :
197 2688 : // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
198 2688 : // port, for use by our captive postgres.
199 2688 : let postgres_port = listen_url
200 2688 : .port()
201 2688 : .expect("Control plane API setting should always have a port")
202 2688 : + 1;
203 2688 :
204 2688 : // Assume all pageservers have symmetric auth configuration: this service
205 2688 : // expects to use one JWT token to talk to all of them.
206 2688 : let ps_conf = env
207 2688 : .pageservers
208 2688 : .first()
209 2688 : .expect("Config is validated to contain at least one pageserver");
210 2688 : let (jwt_token, public_key) = match ps_conf.http_auth_type {
211 2618 : AuthType::Trust => (None, None),
212 : AuthType::NeonJWT => {
213 70 : let jwt_token = env
214 70 : .generate_auth_token(&Claims::new(None, Scope::PageServerApi))
215 70 : .unwrap();
216 70 :
217 70 : // If pageserver auth is enabled, this implicitly enables auth for this service,
218 70 : // using the same credentials.
219 70 : let public_key_path =
220 70 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
221 70 : .unwrap();
222 :
223 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
224 70 : let public_key = if std::fs::metadata(&public_key_path)
225 70 : .expect("Can't stat public key")
226 70 : .is_dir()
227 : {
228 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
229 : // keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
230 1 : let mut dir =
231 1 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
232 1 : let dent = dir
233 1 : .next()
234 1 : .expect("Empty key dir")
235 1 : .expect("Error reading key dir");
236 1 :
237 1 : std::fs::read_to_string(dent.path()).expect("Can't read public key")
238 : } else {
239 69 : std::fs::read_to_string(&public_key_path).expect("Can't read public key")
240 : };
241 70 : (Some(jwt_token), Some(public_key))
242 : }
243 : };
244 :
245 2688 : Self {
246 2688 : env: env.clone(),
247 2688 : path,
248 2688 : listen,
249 2688 : jwt_token,
250 2688 : public_key,
251 2688 : postgres_port,
252 2688 : client: reqwest::ClientBuilder::new()
253 2688 : .build()
254 2688 : .expect("Failed to construct http client"),
255 2688 : }
256 2688 : }
257 :
258 724 : fn pid_file(&self) -> Utf8PathBuf {
259 724 : Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
260 724 : .expect("non-Unicode path")
261 724 : }
262 :
263 : /// PIDFile for the postgres instance used to store attachment service state
264 361 : fn postgres_pid_file(&self) -> Utf8PathBuf {
265 361 : Utf8PathBuf::from_path_buf(
266 361 : self.env
267 361 : .base_data_dir
268 361 : .join("attachment_service_postgres.pid"),
269 361 : )
270 361 : .expect("non-Unicode path")
271 361 : }
272 :
273 : /// In order to access database migrations, we need to find the Neon source tree
274 361 : async fn find_source_root(&self) -> anyhow::Result<Utf8PathBuf> {
275 : // We assume that either prd or our binary is in the source tree. The former is usually
276 : // true for automated test runners, the latter is usually true for developer workstations. Often
277 : // both are true, which is fine.
278 361 : let candidate_start_points = [
279 : // Current working directory
280 361 : Utf8PathBuf::from_path_buf(std::env::current_dir()?).unwrap(),
281 361 : // Directory containing the binary we're running inside
282 361 : Utf8PathBuf::from_path_buf(env::current_exe()?.parent().unwrap().to_owned()).unwrap(),
283 : ];
284 :
285 : // For each candidate start point, search through ancestors looking for a neon.git source tree root
286 361 : for start_point in &candidate_start_points {
287 : // Start from the build dir: assumes we are running out of a built neon source tree
288 361 : for path in start_point.ancestors() {
289 : // A crude approximation: the root of the source tree is whatever contains a "control_plane"
290 : // subdirectory.
291 361 : let control_plane = path.join("control_plane");
292 361 : if tokio::fs::try_exists(&control_plane).await? {
293 361 : return Ok(path.to_owned());
294 0 : }
295 : }
296 : }
297 :
298 : // Fall-through
299 0 : Err(anyhow::anyhow!(
300 0 : "Could not find control_plane src dir, after searching ancestors of {candidate_start_points:?}"
301 0 : ))
302 361 : }
303 :
304 : /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
305 : ///
306 : /// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
307 : /// to other versions if that one isn't found. Some automated tests create circumstances
308 : /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
309 724 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
310 724 : let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
311 :
312 728 : for v in prefer_versions {
313 728 : let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
314 728 : if tokio::fs::try_exists(&path).await? {
315 724 : return Ok(path);
316 4 : }
317 : }
318 :
319 : // Fall through
320 0 : anyhow::bail!(
321 0 : "Postgres binaries not found in {}",
322 0 : self.env.pg_distrib_dir.display()
323 0 : );
324 724 : }
325 :
326 : /// Readiness check for our postgres process
327 727 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
328 727 : let bin_path = pg_bin_dir.join("pg_isready");
329 727 : let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
330 1087 : let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
331 :
332 727 : Ok(exitcode.success())
333 727 : }
334 :
335 : /// Create our database if it doesn't exist, and run migrations.
336 : ///
337 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
338 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
339 : /// who just want to run `cargo neon_local` without knowing about diesel.
340 : ///
341 : /// Returns the database url
342 361 : pub async fn setup_database(&self) -> anyhow::Result<String> {
343 361 : let database_url = format!(
344 361 : "postgresql://localhost:{}/attachment_service",
345 361 : self.postgres_port
346 361 : );
347 361 : println!("Running attachment service database setup...");
348 361 : fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
349 355 : let base = ::url::Url::parse(database_url).unwrap();
350 355 : let database = base.path_segments().unwrap().last().unwrap().to_owned();
351 355 : let mut new_url = base.join(default_database).unwrap();
352 355 : new_url.set_query(base.query());
353 355 : (database, new_url.into())
354 355 : }
355 361 :
356 361 : #[derive(Debug, Clone)]
357 361 : pub struct CreateDatabaseStatement {
358 361 : db_name: String,
359 361 : }
360 361 :
361 361 : impl CreateDatabaseStatement {
362 361 : pub fn new(db_name: &str) -> Self {
363 355 : CreateDatabaseStatement {
364 355 : db_name: db_name.to_owned(),
365 355 : }
366 355 : }
367 361 : }
368 361 :
369 361 : impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
370 1420 : fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
371 1420 : out.push_sql("CREATE DATABASE ");
372 1420 : out.push_identifier(&self.db_name)?;
373 1420 : Ok(())
374 1420 : }
375 361 : }
376 361 :
377 361 : impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}
378 361 :
379 361 : impl QueryId for CreateDatabaseStatement {
380 361 : type QueryId = ();
381 361 :
382 361 : const HAS_STATIC_QUERY_ID: bool = false;
383 361 : }
384 361 : if PgConnection::establish(&database_url).is_err() {
385 355 : let (database, postgres_url) = change_database_of_url(&database_url, "postgres");
386 355 : println!("Creating database: {database}");
387 355 : let mut conn = PgConnection::establish(&postgres_url)?;
388 355 : CreateDatabaseStatement::new(&database).execute(&mut conn)?;
389 6 : }
390 361 : let mut conn = PgConnection::establish(&database_url)?;
391 :
392 361 : let migrations_dir = self
393 361 : .find_source_root()
394 360 : .await?
395 361 : .join("control_plane/attachment_service/migrations");
396 :
397 361 : let migrations = diesel_migrations::FileBasedMigrations::from_path(migrations_dir)?;
398 361 : println!("Running migrations in {}", migrations.path().display());
399 361 : HarnessWithOutput::write_to_stdout(&mut conn)
400 361 : .run_pending_migrations(migrations)
401 361 : .map(|_| ())
402 361 : .map_err(|e| anyhow::anyhow!(e))?;
403 :
404 361 : println!("Migrations complete");
405 361 :
406 361 : Ok(database_url)
407 361 : }
408 :
409 361 : pub async fn start(&self) -> anyhow::Result<()> {
410 361 : // Start a vanilla Postgres process used by the attachment service for persistence.
411 361 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
412 361 : .unwrap()
413 361 : .join("attachment_service_db");
414 363 : let pg_bin_dir = self.get_pg_bin_dir().await?;
415 361 : let pg_log_path = pg_data_path.join("postgres.log");
416 361 :
417 361 : if !tokio::fs::try_exists(&pg_data_path).await? {
418 : // Initialize empty database
419 355 : let initdb_path = pg_bin_dir.join("initdb");
420 355 : let mut child = Command::new(&initdb_path)
421 355 : .args(["-D", pg_data_path.as_ref()])
422 355 : .spawn()
423 355 : .expect("Failed to spawn initdb");
424 355 : let status = child.wait().await?;
425 355 : if !status.success() {
426 0 : anyhow::bail!("initdb failed with status {status}");
427 355 : }
428 355 :
429 355 : tokio::fs::write(
430 355 : &pg_data_path.join("postgresql.conf"),
431 355 : format!("port = {}", self.postgres_port),
432 355 : )
433 355 : .await?;
434 6 : };
435 :
436 361 : println!("Starting attachment service database...");
437 361 : let db_start_args = [
438 361 : "-w",
439 361 : "-D",
440 361 : pg_data_path.as_ref(),
441 361 : "-l",
442 361 : pg_log_path.as_ref(),
443 361 : "start",
444 361 : ];
445 361 :
446 361 : background_process::start_process(
447 361 : "attachment_service_db",
448 361 : &self.env.base_data_dir,
449 361 : pg_bin_dir.join("pg_ctl").as_std_path(),
450 361 : db_start_args,
451 361 : [],
452 361 : background_process::InitialPidFile::Create(self.postgres_pid_file()),
453 727 : || self.pg_isready(&pg_bin_dir),
454 361 : )
455 1087 : .await?;
456 :
457 : // Run migrations on every startup, in case something changed.
458 361 : let database_url = self.setup_database().await?;
459 :
460 361 : let mut args = vec![
461 361 : "-l",
462 361 : &self.listen,
463 361 : "-p",
464 361 : self.path.as_ref(),
465 361 : "--database-url",
466 361 : &database_url,
467 361 : ]
468 361 : .into_iter()
469 2166 : .map(|s| s.to_string())
470 361 : .collect::<Vec<_>>();
471 361 : if let Some(jwt_token) = &self.jwt_token {
472 11 : args.push(format!("--jwt-token={jwt_token}"));
473 350 : }
474 :
475 361 : if let Some(public_key) = &self.public_key {
476 11 : args.push(format!("--public-key=\"{public_key}\""));
477 350 : }
478 :
479 361 : if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
480 2 : args.push(format!(
481 2 : "--compute-hook-url={control_plane_compute_hook_api}"
482 2 : ));
483 359 : }
484 :
485 361 : background_process::start_process(
486 361 : COMMAND,
487 361 : &self.env.base_data_dir,
488 361 : &self.env.attachment_service_bin(),
489 361 : args,
490 361 : [(
491 361 : "NEON_REPO_DIR".to_string(),
492 361 : self.env.base_data_dir.to_string_lossy().to_string(),
493 361 : )],
494 361 : background_process::InitialPidFile::Create(self.pid_file()),
495 722 : || async {
496 1444 : match self.status().await {
497 361 : Ok(_) => Ok(true),
498 361 : Err(_) => Ok(false),
499 : }
500 722 : },
501 361 : )
502 1444 : .await?;
503 :
504 361 : Ok(())
505 361 : }
506 :
507 363 : pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
508 363 : background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
509 :
510 363 : let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
511 365 : let pg_bin_dir = self.get_pg_bin_dir().await?;
512 :
513 363 : println!("Stopping attachment service database...");
514 363 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
515 363 : let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
516 363 : .args(pg_stop_args)
517 363 : .spawn()?
518 363 : .wait()
519 363 : .await?;
520 363 : if !stop_status.success() {
521 2 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
522 2 : let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
523 2 : .args(pg_status_args)
524 2 : .spawn()?
525 2 : .wait()
526 2 : .await?;
527 :
528 : // pg_ctl status returns this exit code if postgres is not running: in this case it is
529 : // fine that stop failed. Otherwise it is an error that stop failed.
530 : const PG_STATUS_NOT_RUNNING: i32 = 3;
531 2 : if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
532 2 : println!("Attachment service data base is already stopped");
533 2 : return Ok(());
534 : } else {
535 0 : anyhow::bail!("Failed to stop attachment service database: {stop_status}")
536 : }
537 361 : }
538 361 :
539 361 : Ok(())
540 363 : }
541 :
542 : /// Simple HTTP request wrapper for calling into attachment service
543 3143 : async fn dispatch<RQ, RS>(
544 3143 : &self,
545 3143 : method: hyper::Method,
546 3143 : path: String,
547 3143 : body: Option<RQ>,
548 3143 : ) -> anyhow::Result<RS>
549 3143 : where
550 3143 : RQ: Serialize + Sized,
551 3143 : RS: DeserializeOwned + Sized,
552 3143 : {
553 3143 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
554 3143 : // for general purpose API access.
555 3143 : let listen_url = self.env.control_plane_api.clone().unwrap();
556 3143 : let url = Url::from_str(&format!(
557 3143 : "http://{}:{}/{path}",
558 3143 : listen_url.host_str().unwrap(),
559 3143 : listen_url.port().unwrap()
560 3143 : ))
561 3143 : .unwrap();
562 3143 :
563 3143 : let mut builder = self.client.request(method, url);
564 3143 : if let Some(body) = body {
565 1857 : builder = builder.json(&body)
566 1286 : }
567 3143 : if let Some(jwt_token) = &self.jwt_token {
568 82 : builder = builder.header(
569 82 : reqwest::header::AUTHORIZATION,
570 82 : format!("Bearer {jwt_token}"),
571 82 : );
572 3061 : }
573 :
574 7793 : let response = builder.send().await?;
575 2782 : let response = response.error_from_body().await?;
576 :
577 2777 : Ok(response
578 2777 : .json()
579 0 : .await
580 2777 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
581 3143 : }
582 :
583 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
584 0 : #[instrument(skip(self))]
585 : pub async fn attach_hook(
586 : &self,
587 : tenant_shard_id: TenantShardId,
588 : pageserver_id: NodeId,
589 : ) -> anyhow::Result<Option<u32>> {
590 : let request = AttachHookRequest {
591 : tenant_shard_id,
592 : node_id: Some(pageserver_id),
593 : };
594 :
595 : let response = self
596 : .dispatch::<_, AttachHookResponse>(
597 : Method::POST,
598 : "debug/v1/attach-hook".to_string(),
599 : Some(request),
600 : )
601 : .await?;
602 :
603 : Ok(response.gen)
604 : }
605 :
606 0 : #[instrument(skip(self))]
607 : pub async fn inspect(
608 : &self,
609 : tenant_shard_id: TenantShardId,
610 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
611 : let request = InspectRequest { tenant_shard_id };
612 :
613 : let response = self
614 : .dispatch::<_, InspectResponse>(
615 : Method::POST,
616 : "debug/v1/inspect".to_string(),
617 : Some(request),
618 : )
619 : .await?;
620 :
621 : Ok(response.attachment)
622 : }
623 :
624 916 : #[instrument(skip(self))]
625 : pub async fn tenant_create(
626 : &self,
627 : req: TenantCreateRequest,
628 : ) -> anyhow::Result<TenantCreateResponse> {
629 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
630 : .await
631 : }
632 :
633 564 : #[instrument(skip(self))]
634 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
635 : self.dispatch::<(), _>(
636 : Method::GET,
637 : format!("control/v1/tenant/{tenant_id}/locate"),
638 : None,
639 : )
640 : .await
641 : }
642 :
643 0 : #[instrument(skip(self))]
644 : pub async fn tenant_migrate(
645 : &self,
646 : tenant_shard_id: TenantShardId,
647 : node_id: NodeId,
648 : ) -> anyhow::Result<TenantShardMigrateResponse> {
649 : self.dispatch(
650 : Method::PUT,
651 : format!("tenant/{tenant_shard_id}/migrate"),
652 : Some(TenantShardMigrateRequest {
653 : tenant_shard_id,
654 : node_id,
655 : }),
656 : )
657 : .await
658 : }
659 :
660 603 : #[instrument(skip_all, fields(node_id=%req.node_id))]
661 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
662 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
663 : .await
664 : }
665 :
666 0 : #[instrument(skip_all, fields(node_id=%req.node_id))]
667 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
668 : self.dispatch::<_, ()>(
669 : Method::PUT,
670 : format!("control/v1/node/{}/config", req.node_id),
671 : Some(req),
672 : )
673 : .await
674 : }
675 :
676 722 : #[instrument(skip(self))]
677 : pub async fn status(&self) -> anyhow::Result<()> {
678 : self.dispatch::<(), ()>(Method::GET, "status".to_string(), None)
679 : .await
680 : }
681 :
682 796 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
683 : pub async fn tenant_timeline_create(
684 : &self,
685 : tenant_id: TenantId,
686 : req: TimelineCreateRequest,
687 : ) -> anyhow::Result<TimelineInfo> {
688 : self.dispatch(
689 : Method::POST,
690 : format!("v1/tenant/{tenant_id}/timeline"),
691 : Some(req),
692 : )
693 : .await
694 : }
695 : }
|