LCOV - code coverage report
Current view: top level - control_plane/src - attachment_service.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 92.1 % 368 339
Test Date: 2024-02-07 07:37:29 Functions: 31.3 % 278 87

            Line data    Source code
       1              : use crate::{background_process, local_env::LocalEnv};
       2              : use camino::{Utf8Path, Utf8PathBuf};
       3              : use diesel::{
       4              :     backend::Backend,
       5              :     query_builder::{AstPass, QueryFragment, QueryId},
       6              :     Connection, PgConnection, QueryResult, RunQueryDsl,
       7              : };
       8              : use diesel_migrations::{HarnessWithOutput, MigrationHarness};
       9              : use hyper::Method;
      10              : use pageserver_api::{
      11              :     models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
      12              :     shard::TenantShardId,
      13              : };
      14              : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
      15              : use postgres_backend::AuthType;
      16              : use serde::{de::DeserializeOwned, Deserialize, Serialize};
      17              : use std::{env, str::FromStr};
      18              : use tokio::process::Command;
      19              : use tracing::instrument;
      20              : use url::Url;
      21              : use utils::{
      22              :     auth::{Claims, Scope},
      23              :     id::{NodeId, TenantId},
      24              : };
      25              : 
      26              : pub struct AttachmentService {
      27              :     env: LocalEnv,
      28              :     listen: String,
      29              :     path: Utf8PathBuf,
      30              :     jwt_token: Option<String>,
      31              :     public_key: Option<String>,
      32              :     postgres_port: u16,
      33              :     client: reqwest::Client,
      34              : }
      35              : 
      36              : const COMMAND: &str = "attachment_service";
      37              : 
      38              : const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
      39              : 
      40         1060 : #[derive(Serialize, Deserialize)]
      41              : pub struct AttachHookRequest {
      42              :     pub tenant_shard_id: TenantShardId,
      43              :     pub node_id: Option<NodeId>,
      44              : }
      45              : 
      46          212 : #[derive(Serialize, Deserialize)]
      47              : pub struct AttachHookResponse {
      48              :     pub gen: Option<u32>,
      49              : }
      50              : 
      51          216 : #[derive(Serialize, Deserialize)]
      52              : pub struct InspectRequest {
      53              :     pub tenant_shard_id: TenantShardId,
      54              : }
      55              : 
      56           72 : #[derive(Serialize, Deserialize)]
      57              : pub struct InspectResponse {
      58              :     pub attachment: Option<(u32, NodeId)>,
      59              : }
      60              : 
      61         3367 : #[derive(Serialize, Deserialize)]
      62              : pub struct TenantCreateResponseShard {
      63              :     pub shard_id: TenantShardId,
      64              :     pub node_id: NodeId,
      65              :     pub generation: u32,
      66              : }
      67              : 
      68         1371 : #[derive(Serialize, Deserialize)]
      69              : pub struct TenantCreateResponse {
      70              :     pub shards: Vec<TenantCreateResponseShard>,
      71              : }
      72              : 
      73         6633 : #[derive(Serialize, Deserialize)]
      74              : pub struct NodeRegisterRequest {
      75              :     pub node_id: NodeId,
      76              : 
      77              :     pub listen_pg_addr: String,
      78              :     pub listen_pg_port: u16,
      79              : 
      80              :     pub listen_http_addr: String,
      81              :     pub listen_http_port: u16,
      82              : }
      83              : 
      84           20 : #[derive(Serialize, Deserialize)]
      85              : pub struct NodeConfigureRequest {
      86              :     pub node_id: NodeId,
      87              : 
      88              :     pub availability: Option<NodeAvailability>,
      89              :     pub scheduling: Option<NodeSchedulingPolicy>,
      90              : }
      91              : 
      92         7566 : #[derive(Serialize, Deserialize, Debug)]
      93              : pub struct TenantLocateResponseShard {
      94              :     pub shard_id: TenantShardId,
      95              :     pub node_id: NodeId,
      96              : 
      97              :     pub listen_pg_addr: String,
      98              :     pub listen_pg_port: u16,
      99              : 
     100              :     pub listen_http_addr: String,
     101              :     pub listen_http_port: u16,
     102              : }
     103              : 
     104         2820 : #[derive(Serialize, Deserialize)]
     105              : pub struct TenantLocateResponse {
     106              :     pub shards: Vec<TenantLocateResponseShard>,
     107              :     pub shard_params: ShardParameters,
     108              : }
     109              : 
     110              : /// Explicitly migrating a particular shard is a low level operation
     111              : /// TODO: higher level "Reschedule tenant" operation where the request
     112              : /// specifies some constraints, e.g. asking it to get off particular node(s)
     113            0 : #[derive(Serialize, Deserialize, Debug)]
     114              : pub struct TenantShardMigrateRequest {
     115              :     pub tenant_shard_id: TenantShardId,
     116              :     pub node_id: NodeId,
     117              : }
     118              : 
     119          914 : #[derive(Serialize, Deserialize, Clone, Copy)]
     120              : pub enum NodeAvailability {
     121              :     // Normal, happy state
     122              :     Active,
     123              :     // Offline: Tenants shouldn't try to attach here, but they may assume that their
     124              :     // secondary locations on this node still exist.  Newly added nodes are in this
     125              :     // state until we successfully contact them.
     126              :     Offline,
     127              : }
     128              : 
     129              : impl FromStr for NodeAvailability {
     130              :     type Err = anyhow::Error;
     131              : 
     132            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     133            0 :         match s {
     134            0 :             "active" => Ok(Self::Active),
     135            0 :             "offline" => Ok(Self::Offline),
     136            0 :             _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
     137              :         }
     138            0 :     }
     139              : }
     140              : 
     141              : /// FIXME: this is a duplicate of the type in the attachment_service crate, because the
     142              : /// type needs to be defined with diesel traits in there.
     143          914 : #[derive(Serialize, Deserialize, Clone, Copy)]
     144              : pub enum NodeSchedulingPolicy {
     145              :     Active,
     146              :     Filling,
     147              :     Pause,
     148              :     Draining,
     149              : }
     150              : 
     151              : impl FromStr for NodeSchedulingPolicy {
     152              :     type Err = anyhow::Error;
     153              : 
     154           13 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     155           13 :         match s {
     156           13 :             "active" => Ok(Self::Active),
     157           13 :             "filling" => Ok(Self::Filling),
     158            0 :             "pause" => Ok(Self::Pause),
     159            0 :             "draining" => Ok(Self::Draining),
     160            0 :             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
     161              :         }
     162           13 :     }
     163              : }
     164              : 
     165              : impl From<NodeSchedulingPolicy> for String {
     166          393 :     fn from(value: NodeSchedulingPolicy) -> String {
     167          393 :         use NodeSchedulingPolicy::*;
     168          393 :         match value {
     169            0 :             Active => "active",
     170          393 :             Filling => "filling",
     171            0 :             Pause => "pause",
     172            0 :             Draining => "draining",
     173              :         }
     174          393 :         .to_string()
     175          393 :     }
     176              : }
     177              : 
     178            0 : #[derive(Serialize, Deserialize, Debug)]
     179              : pub struct TenantShardMigrateResponse {}
     180              : 
     181              : impl AttachmentService {
     182         2688 :     pub fn from_env(env: &LocalEnv) -> Self {
     183         2688 :         let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
     184         2688 :             .unwrap()
     185         2688 :             .join("attachments.json");
     186         2688 : 
     187         2688 :         // Makes no sense to construct this if pageservers aren't going to use it: assume
     188         2688 :         // pageservers have control plane API set
     189         2688 :         let listen_url = env.control_plane_api.clone().unwrap();
     190         2688 : 
     191         2688 :         let listen = format!(
     192         2688 :             "{}:{}",
     193         2688 :             listen_url.host_str().unwrap(),
     194         2688 :             listen_url.port().unwrap()
     195         2688 :         );
     196         2688 : 
     197         2688 :         // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
     198         2688 :         // port, for use by our captive postgres.
     199         2688 :         let postgres_port = listen_url
     200         2688 :             .port()
     201         2688 :             .expect("Control plane API setting should always have a port")
     202         2688 :             + 1;
     203         2688 : 
     204         2688 :         // Assume all pageservers have symmetric auth configuration: this service
     205         2688 :         // expects to use one JWT token to talk to all of them.
     206         2688 :         let ps_conf = env
     207         2688 :             .pageservers
     208         2688 :             .first()
     209         2688 :             .expect("Config is validated to contain at least one pageserver");
     210         2688 :         let (jwt_token, public_key) = match ps_conf.http_auth_type {
     211         2618 :             AuthType::Trust => (None, None),
     212              :             AuthType::NeonJWT => {
     213           70 :                 let jwt_token = env
     214           70 :                     .generate_auth_token(&Claims::new(None, Scope::PageServerApi))
     215           70 :                     .unwrap();
     216           70 : 
     217           70 :                 // If pageserver auth is enabled, this implicitly enables auth for this service,
     218           70 :                 // using the same credentials.
     219           70 :                 let public_key_path =
     220           70 :                     camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
     221           70 :                         .unwrap();
     222              : 
     223              :                 // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
     224           70 :                 let public_key = if std::fs::metadata(&public_key_path)
     225           70 :                     .expect("Can't stat public key")
     226           70 :                     .is_dir()
     227              :                 {
     228              :                     // Our config may specify a directory: this is for the pageserver's ability to handle multiple
     229              :                     // keys.  We only use one key at a time, so, arbitrarily load the first one in the directory.
     230            1 :                     let mut dir =
     231            1 :                         std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
     232            1 :                     let dent = dir
     233            1 :                         .next()
     234            1 :                         .expect("Empty key dir")
     235            1 :                         .expect("Error reading key dir");
     236            1 : 
     237            1 :                     std::fs::read_to_string(dent.path()).expect("Can't read public key")
     238              :                 } else {
     239           69 :                     std::fs::read_to_string(&public_key_path).expect("Can't read public key")
     240              :                 };
     241           70 :                 (Some(jwt_token), Some(public_key))
     242              :             }
     243              :         };
     244              : 
     245         2688 :         Self {
     246         2688 :             env: env.clone(),
     247         2688 :             path,
     248         2688 :             listen,
     249         2688 :             jwt_token,
     250         2688 :             public_key,
     251         2688 :             postgres_port,
     252         2688 :             client: reqwest::ClientBuilder::new()
     253         2688 :                 .build()
     254         2688 :                 .expect("Failed to construct http client"),
     255         2688 :         }
     256         2688 :     }
     257              : 
     258          724 :     fn pid_file(&self) -> Utf8PathBuf {
     259          724 :         Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
     260          724 :             .expect("non-Unicode path")
     261          724 :     }
     262              : 
     263              :     /// PIDFile for the postgres instance used to store attachment service state
     264          361 :     fn postgres_pid_file(&self) -> Utf8PathBuf {
     265          361 :         Utf8PathBuf::from_path_buf(
     266          361 :             self.env
     267          361 :                 .base_data_dir
     268          361 :                 .join("attachment_service_postgres.pid"),
     269          361 :         )
     270          361 :         .expect("non-Unicode path")
     271          361 :     }
     272              : 
     273              :     /// In order to access database migrations, we need to find the Neon source tree
     274          361 :     async fn find_source_root(&self) -> anyhow::Result<Utf8PathBuf> {
     275              :         // We assume that either prd or our binary is in the source tree. The former is usually
     276              :         // true for automated test runners, the latter is usually true for developer workstations. Often
     277              :         // both are true, which is fine.
     278          361 :         let candidate_start_points = [
     279              :             // Current working directory
     280          361 :             Utf8PathBuf::from_path_buf(std::env::current_dir()?).unwrap(),
     281          361 :             // Directory containing the binary we're running inside
     282          361 :             Utf8PathBuf::from_path_buf(env::current_exe()?.parent().unwrap().to_owned()).unwrap(),
     283              :         ];
     284              : 
     285              :         // For each candidate start point, search through ancestors looking for a neon.git source tree root
     286          361 :         for start_point in &candidate_start_points {
     287              :             // Start from the build dir: assumes we are running out of a built neon source tree
     288          361 :             for path in start_point.ancestors() {
     289              :                 // A crude approximation: the root of the source tree is whatever contains a "control_plane"
     290              :                 // subdirectory.
     291          361 :                 let control_plane = path.join("control_plane");
     292          361 :                 if tokio::fs::try_exists(&control_plane).await? {
     293          361 :                     return Ok(path.to_owned());
     294            0 :                 }
     295              :             }
     296              :         }
     297              : 
     298              :         // Fall-through
     299            0 :         Err(anyhow::anyhow!(
     300            0 :             "Could not find control_plane src dir, after searching ancestors of {candidate_start_points:?}"
     301            0 :         ))
     302          361 :     }
     303              : 
     304              :     /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
     305              :     ///
     306              :     /// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
     307              :     /// to other versions if that one isn't found.  Some automated tests create circumstances
     308              :     /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
     309          724 :     pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
     310          724 :         let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
     311              : 
     312          728 :         for v in prefer_versions {
     313          728 :             let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
     314          728 :             if tokio::fs::try_exists(&path).await? {
     315          724 :                 return Ok(path);
     316            4 :             }
     317              :         }
     318              : 
     319              :         // Fall through
     320            0 :         anyhow::bail!(
     321            0 :             "Postgres binaries not found in {}",
     322            0 :             self.env.pg_distrib_dir.display()
     323            0 :         );
     324          724 :     }
     325              : 
     326              :     /// Readiness check for our postgres process
     327          727 :     async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
     328          727 :         let bin_path = pg_bin_dir.join("pg_isready");
     329          727 :         let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
     330         1087 :         let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
     331              : 
     332          727 :         Ok(exitcode.success())
     333          727 :     }
     334              : 
     335              :     /// Create our database if it doesn't exist, and run migrations.
     336              :     ///
     337              :     /// This function is equivalent to the `diesel setup` command in the diesel CLI.  We implement
     338              :     /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
     339              :     /// who just want to run `cargo neon_local` without knowing about diesel.
     340              :     ///
     341              :     /// Returns the database url
     342          361 :     pub async fn setup_database(&self) -> anyhow::Result<String> {
     343          361 :         let database_url = format!(
     344          361 :             "postgresql://localhost:{}/attachment_service",
     345          361 :             self.postgres_port
     346          361 :         );
     347          361 :         println!("Running attachment service database setup...");
     348          361 :         fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
     349          355 :             let base = ::url::Url::parse(database_url).unwrap();
     350          355 :             let database = base.path_segments().unwrap().last().unwrap().to_owned();
     351          355 :             let mut new_url = base.join(default_database).unwrap();
     352          355 :             new_url.set_query(base.query());
     353          355 :             (database, new_url.into())
     354          355 :         }
     355          361 : 
     356          361 :         #[derive(Debug, Clone)]
     357          361 :         pub struct CreateDatabaseStatement {
     358          361 :             db_name: String,
     359          361 :         }
     360          361 : 
     361          361 :         impl CreateDatabaseStatement {
     362          361 :             pub fn new(db_name: &str) -> Self {
     363          355 :                 CreateDatabaseStatement {
     364          355 :                     db_name: db_name.to_owned(),
     365          355 :                 }
     366          355 :             }
     367          361 :         }
     368          361 : 
     369          361 :         impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
     370         1420 :             fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
     371         1420 :                 out.push_sql("CREATE DATABASE ");
     372         1420 :                 out.push_identifier(&self.db_name)?;
     373         1420 :                 Ok(())
     374         1420 :             }
     375          361 :         }
     376          361 : 
     377          361 :         impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}
     378          361 : 
     379          361 :         impl QueryId for CreateDatabaseStatement {
     380          361 :             type QueryId = ();
     381          361 : 
     382          361 :             const HAS_STATIC_QUERY_ID: bool = false;
     383          361 :         }
     384          361 :         if PgConnection::establish(&database_url).is_err() {
     385          355 :             let (database, postgres_url) = change_database_of_url(&database_url, "postgres");
     386          355 :             println!("Creating database: {database}");
     387          355 :             let mut conn = PgConnection::establish(&postgres_url)?;
     388          355 :             CreateDatabaseStatement::new(&database).execute(&mut conn)?;
     389            6 :         }
     390          361 :         let mut conn = PgConnection::establish(&database_url)?;
     391              : 
     392          361 :         let migrations_dir = self
     393          361 :             .find_source_root()
     394          360 :             .await?
     395          361 :             .join("control_plane/attachment_service/migrations");
     396              : 
     397          361 :         let migrations = diesel_migrations::FileBasedMigrations::from_path(migrations_dir)?;
     398          361 :         println!("Running migrations in {}", migrations.path().display());
     399          361 :         HarnessWithOutput::write_to_stdout(&mut conn)
     400          361 :             .run_pending_migrations(migrations)
     401          361 :             .map(|_| ())
     402          361 :             .map_err(|e| anyhow::anyhow!(e))?;
     403              : 
     404          361 :         println!("Migrations complete");
     405          361 : 
     406          361 :         Ok(database_url)
     407          361 :     }
     408              : 
     409          361 :     pub async fn start(&self) -> anyhow::Result<()> {
     410          361 :         // Start a vanilla Postgres process used by the attachment service for persistence.
     411          361 :         let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
     412          361 :             .unwrap()
     413          361 :             .join("attachment_service_db");
     414          363 :         let pg_bin_dir = self.get_pg_bin_dir().await?;
     415          361 :         let pg_log_path = pg_data_path.join("postgres.log");
     416          361 : 
     417          361 :         if !tokio::fs::try_exists(&pg_data_path).await? {
     418              :             // Initialize empty database
     419          355 :             let initdb_path = pg_bin_dir.join("initdb");
     420          355 :             let mut child = Command::new(&initdb_path)
     421          355 :                 .args(["-D", pg_data_path.as_ref()])
     422          355 :                 .spawn()
     423          355 :                 .expect("Failed to spawn initdb");
     424          355 :             let status = child.wait().await?;
     425          355 :             if !status.success() {
     426            0 :                 anyhow::bail!("initdb failed with status {status}");
     427          355 :             }
     428          355 : 
     429          355 :             tokio::fs::write(
     430          355 :                 &pg_data_path.join("postgresql.conf"),
     431          355 :                 format!("port = {}", self.postgres_port),
     432          355 :             )
     433          355 :             .await?;
     434            6 :         };
     435              : 
     436          361 :         println!("Starting attachment service database...");
     437          361 :         let db_start_args = [
     438          361 :             "-w",
     439          361 :             "-D",
     440          361 :             pg_data_path.as_ref(),
     441          361 :             "-l",
     442          361 :             pg_log_path.as_ref(),
     443          361 :             "start",
     444          361 :         ];
     445          361 : 
     446          361 :         background_process::start_process(
     447          361 :             "attachment_service_db",
     448          361 :             &self.env.base_data_dir,
     449          361 :             pg_bin_dir.join("pg_ctl").as_std_path(),
     450          361 :             db_start_args,
     451          361 :             [],
     452          361 :             background_process::InitialPidFile::Create(self.postgres_pid_file()),
     453          727 :             || self.pg_isready(&pg_bin_dir),
     454          361 :         )
     455         1087 :         .await?;
     456              : 
     457              :         // Run migrations on every startup, in case something changed.
     458          361 :         let database_url = self.setup_database().await?;
     459              : 
     460          361 :         let mut args = vec![
     461          361 :             "-l",
     462          361 :             &self.listen,
     463          361 :             "-p",
     464          361 :             self.path.as_ref(),
     465          361 :             "--database-url",
     466          361 :             &database_url,
     467          361 :         ]
     468          361 :         .into_iter()
     469         2166 :         .map(|s| s.to_string())
     470          361 :         .collect::<Vec<_>>();
     471          361 :         if let Some(jwt_token) = &self.jwt_token {
     472           11 :             args.push(format!("--jwt-token={jwt_token}"));
     473          350 :         }
     474              : 
     475          361 :         if let Some(public_key) = &self.public_key {
     476           11 :             args.push(format!("--public-key=\"{public_key}\""));
     477          350 :         }
     478              : 
     479          361 :         if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
     480            2 :             args.push(format!(
     481            2 :                 "--compute-hook-url={control_plane_compute_hook_api}"
     482            2 :             ));
     483          359 :         }
     484              : 
     485          361 :         background_process::start_process(
     486          361 :             COMMAND,
     487          361 :             &self.env.base_data_dir,
     488          361 :             &self.env.attachment_service_bin(),
     489          361 :             args,
     490          361 :             [(
     491          361 :                 "NEON_REPO_DIR".to_string(),
     492          361 :                 self.env.base_data_dir.to_string_lossy().to_string(),
     493          361 :             )],
     494          361 :             background_process::InitialPidFile::Create(self.pid_file()),
     495          722 :             || async {
     496         1444 :                 match self.status().await {
     497          361 :                     Ok(_) => Ok(true),
     498          361 :                     Err(_) => Ok(false),
     499              :                 }
     500          722 :             },
     501          361 :         )
     502         1444 :         .await?;
     503              : 
     504          361 :         Ok(())
     505          361 :     }
     506              : 
     507          363 :     pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
     508          363 :         background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
     509              : 
     510          363 :         let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
     511          365 :         let pg_bin_dir = self.get_pg_bin_dir().await?;
     512              : 
     513          363 :         println!("Stopping attachment service database...");
     514          363 :         let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
     515          363 :         let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
     516          363 :             .args(pg_stop_args)
     517          363 :             .spawn()?
     518          363 :             .wait()
     519          363 :             .await?;
     520          363 :         if !stop_status.success() {
     521            2 :             let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
     522            2 :             let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
     523            2 :                 .args(pg_status_args)
     524            2 :                 .spawn()?
     525            2 :                 .wait()
     526            2 :                 .await?;
     527              : 
     528              :             // pg_ctl status returns this exit code if postgres is not running: in this case it is
     529              :             // fine that stop failed.  Otherwise it is an error that stop failed.
     530              :             const PG_STATUS_NOT_RUNNING: i32 = 3;
     531            2 :             if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
     532            2 :                 println!("Attachment service data base is already stopped");
     533            2 :                 return Ok(());
     534              :             } else {
     535            0 :                 anyhow::bail!("Failed to stop attachment service database: {stop_status}")
     536              :             }
     537          361 :         }
     538          361 : 
     539          361 :         Ok(())
     540          363 :     }
     541              : 
     542              :     /// Simple HTTP request wrapper for calling into attachment service
     543         3143 :     async fn dispatch<RQ, RS>(
     544         3143 :         &self,
     545         3143 :         method: hyper::Method,
     546         3143 :         path: String,
     547         3143 :         body: Option<RQ>,
     548         3143 :     ) -> anyhow::Result<RS>
     549         3143 :     where
     550         3143 :         RQ: Serialize + Sized,
     551         3143 :         RS: DeserializeOwned + Sized,
     552         3143 :     {
     553         3143 :         // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
     554         3143 :         // for general purpose API access.
     555         3143 :         let listen_url = self.env.control_plane_api.clone().unwrap();
     556         3143 :         let url = Url::from_str(&format!(
     557         3143 :             "http://{}:{}/{path}",
     558         3143 :             listen_url.host_str().unwrap(),
     559         3143 :             listen_url.port().unwrap()
     560         3143 :         ))
     561         3143 :         .unwrap();
     562         3143 : 
     563         3143 :         let mut builder = self.client.request(method, url);
     564         3143 :         if let Some(body) = body {
     565         1857 :             builder = builder.json(&body)
     566         1286 :         }
     567         3143 :         if let Some(jwt_token) = &self.jwt_token {
     568           82 :             builder = builder.header(
     569           82 :                 reqwest::header::AUTHORIZATION,
     570           82 :                 format!("Bearer {jwt_token}"),
     571           82 :             );
     572         3061 :         }
     573              : 
     574         7793 :         let response = builder.send().await?;
     575         2782 :         let response = response.error_from_body().await?;
     576              : 
     577         2777 :         Ok(response
     578         2777 :             .json()
     579            0 :             .await
     580         2777 :             .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
     581         3143 :     }
     582              : 
     583              :     /// Call into the attach_hook API, for use before handing out attachments to pageservers
     584            0 :     #[instrument(skip(self))]
     585              :     pub async fn attach_hook(
     586              :         &self,
     587              :         tenant_shard_id: TenantShardId,
     588              :         pageserver_id: NodeId,
     589              :     ) -> anyhow::Result<Option<u32>> {
     590              :         let request = AttachHookRequest {
     591              :             tenant_shard_id,
     592              :             node_id: Some(pageserver_id),
     593              :         };
     594              : 
     595              :         let response = self
     596              :             .dispatch::<_, AttachHookResponse>(
     597              :                 Method::POST,
     598              :                 "debug/v1/attach-hook".to_string(),
     599              :                 Some(request),
     600              :             )
     601              :             .await?;
     602              : 
     603              :         Ok(response.gen)
     604              :     }
     605              : 
     606            0 :     #[instrument(skip(self))]
     607              :     pub async fn inspect(
     608              :         &self,
     609              :         tenant_shard_id: TenantShardId,
     610              :     ) -> anyhow::Result<Option<(u32, NodeId)>> {
     611              :         let request = InspectRequest { tenant_shard_id };
     612              : 
     613              :         let response = self
     614              :             .dispatch::<_, InspectResponse>(
     615              :                 Method::POST,
     616              :                 "debug/v1/inspect".to_string(),
     617              :                 Some(request),
     618              :             )
     619              :             .await?;
     620              : 
     621              :         Ok(response.attachment)
     622              :     }
     623              : 
     624          916 :     #[instrument(skip(self))]
     625              :     pub async fn tenant_create(
     626              :         &self,
     627              :         req: TenantCreateRequest,
     628              :     ) -> anyhow::Result<TenantCreateResponse> {
     629              :         self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
     630              :             .await
     631              :     }
     632              : 
     633          564 :     #[instrument(skip(self))]
     634              :     pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
     635              :         self.dispatch::<(), _>(
     636              :             Method::GET,
     637              :             format!("control/v1/tenant/{tenant_id}/locate"),
     638              :             None,
     639              :         )
     640              :         .await
     641              :     }
     642              : 
     643            0 :     #[instrument(skip(self))]
     644              :     pub async fn tenant_migrate(
     645              :         &self,
     646              :         tenant_shard_id: TenantShardId,
     647              :         node_id: NodeId,
     648              :     ) -> anyhow::Result<TenantShardMigrateResponse> {
     649              :         self.dispatch(
     650              :             Method::PUT,
     651              :             format!("tenant/{tenant_shard_id}/migrate"),
     652              :             Some(TenantShardMigrateRequest {
     653              :                 tenant_shard_id,
     654              :                 node_id,
     655              :             }),
     656              :         )
     657              :         .await
     658              :     }
     659              : 
     660          603 :     #[instrument(skip_all, fields(node_id=%req.node_id))]
     661              :     pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
     662              :         self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
     663              :             .await
     664              :     }
     665              : 
     666            0 :     #[instrument(skip_all, fields(node_id=%req.node_id))]
     667              :     pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
     668              :         self.dispatch::<_, ()>(
     669              :             Method::PUT,
     670              :             format!("control/v1/node/{}/config", req.node_id),
     671              :             Some(req),
     672              :         )
     673              :         .await
     674              :     }
     675              : 
     676          722 :     #[instrument(skip(self))]
     677              :     pub async fn status(&self) -> anyhow::Result<()> {
     678              :         self.dispatch::<(), ()>(Method::GET, "status".to_string(), None)
     679              :             .await
     680              :     }
     681              : 
     682          796 :     #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
     683              :     pub async fn tenant_timeline_create(
     684              :         &self,
     685              :         tenant_id: TenantId,
     686              :         req: TimelineCreateRequest,
     687              :     ) -> anyhow::Result<TimelineInfo> {
     688              :         self.dispatch(
     689              :             Method::POST,
     690              :             format!("v1/tenant/{tenant_id}/timeline"),
     691              :             Some(req),
     692              :         )
     693              :         .await
     694              :     }
     695              : }
        

Generated by: LCOV version 2.1-beta