use crate::{
    background_process,
    local_env::{LocalEnv, NeonStorageControllerConf},
};
use camino::{Utf8Path, Utf8PathBuf};
use hyper::Uri;
use nix::unistd::Pid;
use pageserver_api::{
    controller_api::{
        NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
        TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
        TenantShardMigrateResponse,
    },
    models::{
        TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::OnceLock};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
use utils::{
    auth::{encode_from_key_file, Claims, Scope},
    id::{NodeId, TenantId},
};

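/// Handle used by `neon_local` for managing a locally running storage controller:
/// it starts and stops the controller process (plus the vanilla Postgres instance
/// the controller uses for persistence) and wraps the controller's HTTP API.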
pub struct StorageController {
    env: LocalEnv,
    private_key: Option<Vec<u8>>,
    public_key: Option<String>,
    client: reqwest::Client,
    config: NeonStorageControllerConf,

    // The listen address is learned when starting the storage controller,
    // hence the use of OnceLock to init it at the right time.
    listen: OnceLock<SocketAddr>,
}

const COMMAND: &str = "storage_controller";

const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;

const DB_NAME: &str = "storage_controller";

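/// Arguments for starting a storage controller instance.
///
/// `instance_id` distinguishes multiple controller instances managed by the same
/// `neon_local` environment; `base_port`, when set, overrides the listen address
/// otherwise derived from `LocalEnv::control_plane_api`.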
pub struct NeonStorageControllerStartArgs {
    pub instance_id: u8,
    pub base_port: Option<u16>,
    pub start_timeout: humantime::Duration,
}

impl NeonStorageControllerStartArgs {
    pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self {
        Self {
            instance_id: 1,
            base_port: None,
            start_timeout,
        }
    }
}

pub struct NeonStorageControllerStopArgs {
    pub instance_id: u8,
    pub immediate: bool,
}

impl NeonStorageControllerStopArgs {
    pub fn with_default_instance_id(immediate: bool) -> Self {
        Self {
            instance_id: 1,
            immediate,
        }
    }
}

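/// Request body for the controller's `debug/v1/attach-hook` endpoint
/// (see [`StorageController::attach_hook`]).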
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
    pub tenant_shard_id: TenantShardId,
    pub node_id: Option<NodeId>,
    pub generation_override: Option<i32>,
}

#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
    pub gen: Option<u32>,
}

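/// Request body for the controller's `debug/v1/inspect` endpoint
/// (see [`StorageController::inspect`]).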
#[derive(Serialize, Deserialize)]
pub struct InspectRequest {
    pub tenant_shard_id: TenantShardId,
}

#[derive(Serialize, Deserialize)]
pub struct InspectResponse {
    pub attachment: Option<(u32, NodeId)>,
}

impl StorageController {
    pub fn from_env(env: &LocalEnv) -> Self {
        // Assume all pageservers have symmetric auth configuration: this service
        // expects to use one JWT token to talk to all of them.
        let ps_conf = env
            .pageservers
            .first()
            .expect("Config is validated to contain at least one pageserver");
        let (private_key, public_key) = match ps_conf.http_auth_type {
            AuthType::Trust => (None, None),
            AuthType::NeonJWT => {
                let private_key_path = env.get_private_key_path();
                let private_key = fs::read(private_key_path).expect("failed to read private key");

                // If pageserver auth is enabled, this implicitly enables auth for this service,
                // using the same credentials.
                let public_key_path =
                    camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
                        .unwrap();

                // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
                let public_key = if std::fs::metadata(&public_key_path)
                    .expect("Can't stat public key")
                    .is_dir()
                {
                    // Our config may specify a directory: this is for the pageserver's ability to handle multiple
                    // keys. We only use one key at a time, so we arbitrarily load the first one in the directory.
                    let mut dir =
                        std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
                    let dent = dir
                        .next()
                        .expect("Empty key dir")
                        .expect("Error reading key dir");

                    std::fs::read_to_string(dent.path()).expect("Can't read public key")
                } else {
                    std::fs::read_to_string(&public_key_path).expect("Can't read public key")
                };
                (Some(private_key), Some(public_key))
            }
        };

        Self {
            env: env.clone(),
            private_key,
            public_key,
            client: reqwest::ClientBuilder::new()
                .build()
                .expect("Failed to construct http client"),
            config: env.storage_controller.clone(),
            listen: OnceLock::default(),
        }
    }

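    /// Per-instance directory under the base data dir
    /// (`<base_data_dir>/storage_controller_<instance_id>`), used to hold this
    /// instance's PID file.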
    fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
        self.env
            .base_data_dir
            .join(format!("storage_controller_{}", instance_id))
    }

    fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
        Utf8PathBuf::from_path_buf(
            self.storage_controller_instance_dir(instance_id)
                .join("storage_controller.pid"),
        )
        .expect("non-Unicode path")
    }

    /// PIDFile for the postgres instance used to store storage controller state
    fn postgres_pid_file(&self) -> Utf8PathBuf {
        Utf8PathBuf::from_path_buf(
            self.env
                .base_data_dir
                .join("storage_controller_postgres.pid"),
        )
        .expect("non-Unicode path")
    }

    /// Find the directory containing postgres subdirectories, such as `bin` and `lib`
    ///
    /// This usually uses the `STORAGE_CONTROLLER_POSTGRES_VERSION` of postgres, but will fall back
    /// to other versions if that one isn't found. Some automated tests create circumstances
    /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
    async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
        let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];

        for v in prefer_versions {
            let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
            if tokio::fs::try_exists(&path).await? {
                return Ok(path);
            }
        }

        // Fall through
        anyhow::bail!(
            "Postgres directory '{}' not found in {}",
            dir_name,
            self.env.pg_distrib_dir.display(),
        );
    }

    pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
        self.get_pg_dir("bin").await
    }

    pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
        self.get_pg_dir("lib").await
    }

    /// Readiness check for our postgres process
    async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result<bool> {
        let bin_path = pg_bin_dir.join("pg_isready");
        let args = ["-h", "localhost", "-p", &format!("{}", postgres_port)];
        let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;

        Ok(exitcode.success())
    }

    /// Create our database if it doesn't exist
    ///
    /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
    /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
    /// who just want to run `cargo neon_local` without knowing about diesel.
    ///
    /// Returns the database url
    pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result<String> {
        let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);

        let pg_bin_dir = self.get_pg_bin_dir().await?;
        let createdb_path = pg_bin_dir.join("createdb");
        let output = Command::new(&createdb_path)
            .args([
                "-h",
                "localhost",
                "-p",
                &format!("{}", postgres_port),
                DB_NAME,
            ])
            .output()
            .await
            .expect("Failed to spawn createdb");

        if !output.status.success() {
            let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
            if stderr.contains("already exists") {
                tracing::info!("Database {DB_NAME} already exists");
            } else {
                anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
            }
        }

        Ok(database_url)
    }

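    /// Open a connection to the storage controller database on `localhost:{postgres_port}`.
    ///
    /// Returns the client together with the raw connection; the caller must drive the
    /// connection (e.g. by spawning it onto the runtime) for the client to make progress.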
    pub async fn connect_to_database(
        &self,
        postgres_port: u16,
    ) -> anyhow::Result<(
        tokio_postgres::Client,
        tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
    )> {
        tokio_postgres::Config::new()
            .host("localhost")
            .port(postgres_port)
            // The user is the ambient operating system user name; that is an impurity we want
            // to fix eventually (TODO: https://github.com/neondatabase/neon/issues/8400).
            //
            // Until we get there, specify the user explicitly. Recent tokio-postgres versions
            // default to the OS user name if the user isn't specified, but our tokio-postgres
            // fork doesn't have this upstream commit:
            // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
            // => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
            .user(&whoami::username())
            .dbname(DB_NAME)
            .connect(tokio_postgres::NoTls)
            .await
            .map_err(anyhow::Error::new)
    }

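    /// Start a storage controller instance:
    /// - ensure the instance directory exists,
    /// - start (and, on first run, initdb + create) the controller's Postgres database,
    /// - apply an optional `storage_controller_db.startup.sql` script (used by tests),
    /// - launch the `storage_controller` binary and wait for its `ready` endpoint.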
    pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> {
        let instance_dir = self.storage_controller_instance_dir(start_args.instance_id);
        if let Err(err) = tokio::fs::create_dir(&instance_dir).await {
            if err.kind() != std::io::ErrorKind::AlreadyExists {
                panic!("Failed to create instance dir {instance_dir:?}");
            }
        }

        let (listen, postgres_port) = {
            if let Some(base_port) = start_args.base_port {
                (
                    format!("127.0.0.1:{base_port}"),
                    self.config
                        .database_url
                        .expect("--base-port requires NeonStorageControllerConf::database_url")
                        .port(),
                )
            } else {
                let listen_url = self.env.control_plane_api.clone().unwrap();

                let listen = format!(
                    "{}:{}",
                    listen_url.host_str().unwrap(),
                    listen_url.port().unwrap()
                );

                (listen, listen_url.port().unwrap() + 1)
            }
        };

        let socket_addr = listen
            .parse()
            .expect("listen address is a valid socket address");
        self.listen
            .set(socket_addr)
            .expect("StorageController::listen is only set here");

        // Do we remove the pid file on stop?
        let pg_started = self.is_postgres_running().await?;
        let pg_lib_dir = self.get_pg_lib_dir().await?;

        if !pg_started {
            // Start a vanilla Postgres process used by the storage controller for persistence.
            let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
                .unwrap()
                .join("storage_controller_db");
            let pg_bin_dir = self.get_pg_bin_dir().await?;
            let pg_log_path = pg_data_path.join("postgres.log");

            if !tokio::fs::try_exists(&pg_data_path).await? {
                // Initialize empty database
                let initdb_path = pg_bin_dir.join("initdb");
                let mut child = Command::new(&initdb_path)
                    .envs(vec![
                        ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
                        ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
                    ])
                    .args(["-D", pg_data_path.as_ref()])
                    .spawn()
                    .expect("Failed to spawn initdb");
                let status = child.wait().await?;
                if !status.success() {
                    anyhow::bail!("initdb failed with status {status}");
                }
            };

            // Write a minimal config file:
            // - Specify the port, since this is chosen dynamically
            // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
            //   the storage controller we don't want a slow local disk to interfere with that.
            //
            // NB: it's important that we rewrite this file on each start command so we propagate changes
            // from `LocalEnv`'s config file (`.neon/config`).
            tokio::fs::write(
                &pg_data_path.join("postgresql.conf"),
                format!("port = {}\nfsync=off\n", postgres_port),
            )
            .await?;

            println!("Starting storage controller database...");
            let db_start_args = [
                "-w",
                "-D",
                pg_data_path.as_ref(),
                "-l",
                pg_log_path.as_ref(),
                "start",
            ];

            background_process::start_process(
                "storage_controller_db",
                &self.env.base_data_dir,
                pg_bin_dir.join("pg_ctl").as_std_path(),
                db_start_args,
                vec![
                    ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
                    ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
                ],
                background_process::InitialPidFile::Create(self.postgres_pid_file()),
                &start_args.start_timeout,
                || self.pg_isready(&pg_bin_dir, postgres_port),
            )
            .await?;

            self.setup_database(postgres_port).await?;
        }

        let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);

        // We support running a startup SQL script to fiddle with the database before we launch storcon.
        // This is used by the test suite.
        let startup_script_path = self
            .env
            .base_data_dir
            .join("storage_controller_db.startup.sql");
        let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
            Ok(script) => {
                tokio::fs::remove_file(startup_script_path).await?;
                script
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    // always run some startup script so that this code path doesn't bit rot
                    "BEGIN; COMMIT;".to_string()
                } else {
                    anyhow::bail!("Failed to read startup script: {e}")
                }
            }
        };
        let (mut client, conn) = self.connect_to_database(postgres_port).await?;
        let conn = tokio::spawn(conn);
        let tx = client.build_transaction();
        let tx = tx.start().await?;
        tx.batch_execute(&startup_script).await?;
        tx.commit().await?;
        drop(client);
        conn.await??;

        let listen = self
            .listen
            .get()
            .expect("cell is set earlier in this function");
        let address_for_peers = Uri::builder()
            .scheme("http")
            .authority(format!("{}:{}", listen.ip(), listen.port()))
            .path_and_query("")
            .build()
            .unwrap();

        let mut args = vec![
            "-l",
            &listen.to_string(),
            "--dev",
            "--database-url",
            &database_url,
            "--max-offline-interval",
            &humantime::Duration::from(self.config.max_offline).to_string(),
            "--max-warming-up-interval",
            &humantime::Duration::from(self.config.max_warming_up).to_string(),
            "--address-for-peers",
            &address_for_peers.to_string(),
        ]
        .into_iter()
        .map(|s| s.to_string())
        .collect::<Vec<_>>();

        if self.config.start_as_candidate {
            args.push("--start-as-candidate".to_string());
        }

        if let Some(private_key) = &self.private_key {
            let claims = Claims::new(None, Scope::PageServerApi);
            let jwt_token =
                encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
            args.push(format!("--jwt-token={jwt_token}"));

            let peer_claims = Claims::new(None, Scope::Admin);
            let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
                .expect("failed to generate jwt token");
            args.push(format!("--peer-jwt-token={peer_jwt_token}"));
        }

        if let Some(public_key) = &self.public_key {
            args.push(format!("--public-key=\"{public_key}\""));
        }

        if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
            args.push(format!(
                "--compute-hook-url={control_plane_compute_hook_api}"
            ));
        }

        if let Some(split_threshold) = self.config.split_threshold.as_ref() {
            args.push(format!("--split-threshold={split_threshold}"))
        }

        if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
            args.push(format!("--max-secondary-lag-bytes={lag}"))
        }

        args.push(format!(
            "--neon-local-repo-dir={}",
            self.env.base_data_dir.display()
        ));

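        // At this point `args` holds the full CLI for the storage controller binary.
        // Illustrative example only -- the actual values come from `LocalEnv` and
        // `NeonStorageControllerConf`:
        //   -l 127.0.0.1:1234 --dev --database-url postgresql://localhost:1235/storage_controller
        //   --max-offline-interval 30s --max-warming-up-interval 300s
        //   --address-for-peers http://127.0.0.1:1234/ --neon-local-repo-dir <base_data_dir>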
        background_process::start_process(
            COMMAND,
            &instance_dir,
            &self.env.storage_controller_bin(),
            args,
            vec![
                ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
                ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
            ],
            background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
            &start_args.start_timeout,
            || async {
                match self.ready().await {
                    Ok(_) => Ok(true),
                    Err(_) => Ok(false),
                }
            },
        )
        .await?;

        Ok(())
    }

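    /// Stop the given storage controller instance. If this was the last running
    /// instance, the shared Postgres database is shut down as well.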
    pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> {
        background_process::stop_process(
            stop_args.immediate,
            COMMAND,
            &self.pid_file(stop_args.instance_id),
        )?;

        let storcon_instances = self.env.storage_controller_instances().await?;
        for (instance_id, instanced_dir_path) in storcon_instances {
            if instance_id == stop_args.instance_id {
                continue;
            }

            let pid_file = instanced_dir_path.join("storage_controller.pid");
            let pid = tokio::fs::read_to_string(&pid_file)
                .await
                .map_err(|err| {
                    anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}")
                })?
                .parse::<i32>()
                .expect("pid is valid i32");

            let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?;
            if other_proc_alive {
                // There is another storage controller instance running, so we return
                // and leave the database running.
                return Ok(());
            }
        }

        let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
        let pg_bin_dir = self.get_pg_bin_dir().await?;

        println!("Stopping storage controller database...");
        let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
        let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
            .args(pg_stop_args)
            .spawn()?
            .wait()
            .await?;
        if !stop_status.success() {
            match self.is_postgres_running().await {
                Ok(false) => {
                    println!("Storage controller database is already stopped");
                    return Ok(());
                }
                Ok(true) => {
                    anyhow::bail!("Failed to stop storage controller database");
                }
                Err(err) => {
                    anyhow::bail!("Failed to stop storage controller database: {err}");
                }
            }
        }

        Ok(())
    }

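    /// Check via `pg_ctl status` whether the storage controller's Postgres instance is running.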
    async fn is_postgres_running(&self) -> anyhow::Result<bool> {
        let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
        let pg_bin_dir = self.get_pg_bin_dir().await?;

        let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
        let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
            .args(pg_status_args)
            .spawn()?
            .wait()
            .await?;

        // `pg_ctl status` exit codes: 0 means the server is running, 3 means it is not
        // running, and 4 means no accessible data directory was found.
        const PG_STATUS_NOT_RUNNING: i32 = 3;
        const PG_NO_DATA_DIR: i32 = 4;
        const PG_STATUS_RUNNING: i32 = 0;
        match status_exitcode.code() {
            Some(PG_STATUS_NOT_RUNNING) => Ok(false),
            Some(PG_NO_DATA_DIR) => Ok(false),
            Some(PG_STATUS_RUNNING) => Ok(true),
            Some(code) => Err(anyhow::anyhow!(
                "pg_ctl status returned unexpected status code: {:?}",
                code
            )),
            None => Err(anyhow::anyhow!("pg_ctl status returned no status code")),
        }
    }

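    /// Map an API path to the JWT claims required to call it: `status`/`ready` need no
    /// token, `control/...` and `debug/...` need admin scope, and `v1/...` needs
    /// pageserver-api scope. For example, `"v1/tenant"` maps to `Scope::PageServerApi`,
    /// while `"debug/v1/inspect"` maps to `Scope::Admin`.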
    fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
        let category = match path.find('/') {
            Some(idx) => &path[..idx],
            None => path,
        };

        match category {
            "status" | "ready" => Ok(None),
            "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
            "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
            _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
        }
    }

    /// Simple HTTP request wrapper for calling into storage controller
    async fn dispatch<RQ, RS>(
        &self,
        method: reqwest::Method,
        path: String,
        body: Option<RQ>,
    ) -> anyhow::Result<RS>
    where
        RQ: Serialize + Sized,
        RS: DeserializeOwned + Sized,
    {
        // In the special case of the `storage_controller start` subcommand, we wish
        // to use the API endpoint of the newly started storage controller in order
        // to pass the readiness check. In this scenario [`Self::listen`] will be set
        // (see [`Self::start`]).
        //
        // Otherwise, we infer the storage controller api endpoint from the configured
        // control plane API.
        let url = if let Some(socket_addr) = self.listen.get() {
            Url::from_str(&format!(
                "http://{}:{}/{path}",
                socket_addr.ip().to_canonical(),
                socket_addr.port()
            ))
            .unwrap()
        } else {
            // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
            // for general purpose API access.
            let listen_url = self.env.control_plane_api.clone().unwrap();
            Url::from_str(&format!(
                "http://{}:{}/{path}",
                listen_url.host_str().unwrap(),
                listen_url.port().unwrap()
            ))
            .unwrap()
        };

        let mut builder = self.client.request(method, url);
        if let Some(body) = body {
            builder = builder.json(&body)
        }
        if let Some(private_key) = &self.private_key {
            println!("Getting claims for path {}", path);
            if let Some(required_claims) = Self::get_claims_for_path(&path)? {
                println!("Got claims {:?} for path {}", required_claims, path);
                let jwt_token = encode_from_key_file(&required_claims, private_key)?;
                builder = builder.header(
                    reqwest::header::AUTHORIZATION,
                    format!("Bearer {jwt_token}"),
                );
            }
        }

        let response = builder.send().await?;
        let response = response.error_from_body().await?;

        Ok(response
            .json()
            .await
            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
    }

    /// Call into the attach_hook API, for use before handing out attachments to pageservers
    #[instrument(skip(self))]
    pub async fn attach_hook(
        &self,
        tenant_shard_id: TenantShardId,
        pageserver_id: NodeId,
    ) -> anyhow::Result<Option<u32>> {
        let request = AttachHookRequest {
            tenant_shard_id,
            node_id: Some(pageserver_id),
            generation_override: None,
        };

        let response = self
            .dispatch::<_, AttachHookResponse>(
                Method::POST,
                "debug/v1/attach-hook".to_string(),
                Some(request),
            )
            .await?;

        Ok(response.gen)
    }

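    /// Look up the current attachment of a tenant shard (generation and pageserver
    /// node), via the controller's `debug/v1/inspect` endpoint.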
    #[instrument(skip(self))]
    pub async fn inspect(
        &self,
        tenant_shard_id: TenantShardId,
    ) -> anyhow::Result<Option<(u32, NodeId)>> {
        let request = InspectRequest { tenant_shard_id };

        let response = self
            .dispatch::<_, InspectResponse>(
                Method::POST,
                "debug/v1/inspect".to_string(),
                Some(request),
            )
            .await?;

        Ok(response.attachment)
    }

    #[instrument(skip(self))]
    pub async fn tenant_create(
        &self,
        req: TenantCreateRequest,
    ) -> anyhow::Result<TenantCreateResponse> {
        self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
            .await
    }

    #[instrument(skip(self))]
    pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
        self.dispatch::<(), TenantCreateResponse>(
            Method::POST,
            format!("debug/v1/tenant/{tenant_id}/import"),
            None,
        )
        .await
    }

    #[instrument(skip(self))]
    pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
        self.dispatch::<(), _>(
            Method::GET,
            format!("debug/v1/tenant/{tenant_id}/locate"),
            None,
        )
        .await
    }

    #[instrument(skip(self))]
    pub async fn tenant_migrate(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<TenantShardMigrateResponse> {
        self.dispatch(
            Method::PUT,
            format!("control/v1/tenant/{tenant_shard_id}/migrate"),
            Some(TenantShardMigrateRequest {
                tenant_shard_id,
                node_id,
            }),
        )
        .await
    }

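    /// Split a tenant into `new_shard_count` shards, optionally overriding the stripe
    /// size, via `control/v1/tenant/{tenant_id}/shard_split`.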
    #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
    pub async fn tenant_split(
        &self,
        tenant_id: TenantId,
        new_shard_count: u8,
        new_stripe_size: Option<ShardStripeSize>,
    ) -> anyhow::Result<TenantShardSplitResponse> {
        self.dispatch(
            Method::PUT,
            format!("control/v1/tenant/{tenant_id}/shard_split"),
            Some(TenantShardSplitRequest {
                new_shard_count,
                new_stripe_size,
            }),
        )
        .await
    }

    #[instrument(skip_all, fields(node_id=%req.node_id))]
    pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
        self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
            .await
    }

    #[instrument(skip_all, fields(node_id=%req.node_id))]
    pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
        self.dispatch::<_, ()>(
            Method::PUT,
            format!("control/v1/node/{}/config", req.node_id),
            Some(req),
        )
        .await
    }

    pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
        self.dispatch::<(), Vec<NodeDescribeResponse>>(
            Method::GET,
            "control/v1/node".to_string(),
            None,
        )
        .await
    }

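    /// Readiness probe against the controller's `ready` endpoint; used by [`Self::start`]
    /// to decide when the process has come up.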
    #[instrument(skip(self))]
    pub async fn ready(&self) -> anyhow::Result<()> {
        self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
            .await
    }

    #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
    pub async fn tenant_timeline_create(
        &self,
        tenant_id: TenantId,
        req: TimelineCreateRequest,
    ) -> anyhow::Result<TimelineInfo> {
        self.dispatch(
            Method::POST,
            format!("v1/tenant/{tenant_id}/timeline"),
            Some(req),
        )
        .await
    }
}