Line data Source code
1 : use std::ffi::OsStr;
2 : use std::fs;
3 : use std::net::SocketAddr;
4 : use std::path::PathBuf;
5 : use std::process::ExitStatus;
6 : use std::str::FromStr;
7 : use std::sync::OnceLock;
8 : use std::time::{Duration, Instant};
9 :
10 : use camino::{Utf8Path, Utf8PathBuf};
11 : use hyper0::Uri;
12 : use nix::unistd::Pid;
13 : use pageserver_api::controller_api::{
14 : NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
15 : TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
16 : TenantShardMigrateResponse,
17 : };
18 : use pageserver_api::models::{
19 : TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
20 : };
21 : use pageserver_api::shard::{ShardStripeSize, TenantShardId};
22 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
23 : use postgres_backend::AuthType;
24 : use reqwest::Method;
25 : use serde::de::DeserializeOwned;
26 : use serde::{Deserialize, Serialize};
27 : use tokio::process::Command;
28 : use tracing::instrument;
29 : use url::Url;
30 : use utils::auth::{Claims, Scope, encode_from_key_file};
31 : use utils::id::{NodeId, TenantId};
32 : use whoami::username;
33 :
34 : use crate::background_process;
35 : use crate::local_env::{LocalEnv, NeonStorageControllerConf};
36 :
37 : pub struct StorageController {
38 : env: LocalEnv,
39 : private_key: Option<Vec<u8>>,
40 : public_key: Option<String>,
41 : client: reqwest::Client,
42 : config: NeonStorageControllerConf,
43 :
44 : // The listen addresses is learned when starting the storage controller,
45 : // hence the use of OnceLock to init it at the right time.
46 : listen: OnceLock<SocketAddr>,
47 : }
48 :
/// Process name used when starting/stopping the storage controller via `background_process`.
const COMMAND: &str = "storage_controller";

/// Postgres major version preferred for the storage controller's own database; other
/// versions are tried as fallbacks when this one is not installed.
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;

/// Name of the database the storage controller persists its state in.
const DB_NAME: &str = "storage_controller";
54 :
55 : pub struct NeonStorageControllerStartArgs {
56 : pub instance_id: u8,
57 : pub base_port: Option<u16>,
58 : pub start_timeout: humantime::Duration,
59 : }
60 :
61 : impl NeonStorageControllerStartArgs {
62 0 : pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self {
63 0 : Self {
64 0 : instance_id: 1,
65 0 : base_port: None,
66 0 : start_timeout,
67 0 : }
68 0 : }
69 : }
70 :
/// Arguments for stopping one storage controller instance.
pub struct NeonStorageControllerStopArgs {
    /// Which instance to stop.
    pub instance_id: u8,
    /// Forwarded to `background_process::stop_process` as its `immediate` flag.
    pub immediate: bool,
}

impl NeonStorageControllerStopArgs {
    /// Stop arguments targeting instance `1`.
    pub fn with_default_instance_id(immediate: bool) -> Self {
        let instance_id = 1;
        Self {
            instance_id,
            immediate,
        }
    }
}
84 :
85 0 : #[derive(Serialize, Deserialize)]
86 : pub struct AttachHookRequest {
87 : pub tenant_shard_id: TenantShardId,
88 : pub node_id: Option<NodeId>,
89 : pub generation_override: Option<i32>,
90 : }
91 :
92 0 : #[derive(Serialize, Deserialize)]
93 : pub struct AttachHookResponse {
94 : #[serde(rename = "gen")]
95 : pub generation: Option<u32>,
96 : }
97 :
98 0 : #[derive(Serialize, Deserialize)]
99 : pub struct InspectRequest {
100 : pub tenant_shard_id: TenantShardId,
101 : }
102 :
103 0 : #[derive(Serialize, Deserialize)]
104 : pub struct InspectResponse {
105 : pub attachment: Option<(u32, NodeId)>,
106 : }
107 :
108 : impl StorageController {
109 0 : pub fn from_env(env: &LocalEnv) -> Self {
110 0 : // Assume all pageservers have symmetric auth configuration: this service
111 0 : // expects to use one JWT token to talk to all of them.
112 0 : let ps_conf = env
113 0 : .pageservers
114 0 : .first()
115 0 : .expect("Config is validated to contain at least one pageserver");
116 0 : let (private_key, public_key) = match ps_conf.http_auth_type {
117 0 : AuthType::Trust => (None, None),
118 : AuthType::NeonJWT => {
119 0 : let private_key_path = env.get_private_key_path();
120 0 : let private_key = fs::read(private_key_path).expect("failed to read private key");
121 0 :
122 0 : // If pageserver auth is enabled, this implicitly enables auth for this service,
123 0 : // using the same credentials.
124 0 : let public_key_path =
125 0 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
126 0 : .unwrap();
127 :
128 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
129 0 : let public_key = if std::fs::metadata(&public_key_path)
130 0 : .expect("Can't stat public key")
131 0 : .is_dir()
132 : {
133 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
134 : // keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
135 0 : let mut dir =
136 0 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
137 0 : let dent = dir
138 0 : .next()
139 0 : .expect("Empty key dir")
140 0 : .expect("Error reading key dir");
141 0 :
142 0 : std::fs::read_to_string(dent.path()).expect("Can't read public key")
143 : } else {
144 0 : std::fs::read_to_string(&public_key_path).expect("Can't read public key")
145 : };
146 0 : (Some(private_key), Some(public_key))
147 : }
148 : };
149 :
150 0 : Self {
151 0 : env: env.clone(),
152 0 : private_key,
153 0 : public_key,
154 0 : client: reqwest::ClientBuilder::new()
155 0 : .build()
156 0 : .expect("Failed to construct http client"),
157 0 : config: env.storage_controller.clone(),
158 0 : listen: OnceLock::default(),
159 0 : }
160 0 : }
161 :
162 0 : fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
163 0 : self.env
164 0 : .base_data_dir
165 0 : .join(format!("storage_controller_{}", instance_id))
166 0 : }
167 :
168 0 : fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
169 0 : Utf8PathBuf::from_path_buf(
170 0 : self.storage_controller_instance_dir(instance_id)
171 0 : .join("storage_controller.pid"),
172 0 : )
173 0 : .expect("non-Unicode path")
174 0 : }
175 :
176 : /// Find the directory containing postgres subdirectories, such `bin` and `lib`
177 : ///
178 : /// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
179 : /// to other versions if that one isn't found. Some automated tests create circumstances
180 : /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
181 0 : async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
182 0 : let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 16, 15, 14];
183 :
184 0 : for v in prefer_versions {
185 0 : let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
186 0 : if tokio::fs::try_exists(&path).await? {
187 0 : return Ok(path);
188 0 : }
189 : }
190 :
191 : // Fall through
192 0 : anyhow::bail!(
193 0 : "Postgres directory '{}' not found in {}",
194 0 : dir_name,
195 0 : self.env.pg_distrib_dir.display(),
196 0 : );
197 0 : }
198 :
199 0 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
200 0 : self.get_pg_dir("bin").await
201 0 : }
202 :
203 0 : pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
204 0 : self.get_pg_dir("lib").await
205 0 : }
206 :
207 : /// Readiness check for our postgres process
208 0 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result<bool> {
209 0 : let bin_path = pg_bin_dir.join("pg_isready");
210 0 : let args = [
211 0 : "-h",
212 0 : "localhost",
213 0 : "-U",
214 0 : &username(),
215 0 : "-d",
216 0 : DB_NAME,
217 0 : "-p",
218 0 : &format!("{}", postgres_port),
219 0 : ];
220 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
221 0 : let envs = [
222 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
223 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
224 0 : ];
225 0 : let exitcode = Command::new(bin_path)
226 0 : .args(args)
227 0 : .envs(envs)
228 0 : .spawn()?
229 0 : .wait()
230 0 : .await?;
231 :
232 0 : Ok(exitcode.success())
233 0 : }
234 :
235 : /// Create our database if it doesn't exist
236 : ///
237 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
238 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
239 : /// who just want to run `cargo neon_local` without knowing about diesel.
240 : ///
241 : /// Returns the database url
242 0 : pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result<String> {
243 0 : let database_url = format!(
244 0 : "postgresql://{}@localhost:{}/{DB_NAME}",
245 0 : &username(),
246 0 : postgres_port
247 0 : );
248 :
249 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
250 0 : let createdb_path = pg_bin_dir.join("createdb");
251 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
252 0 : let envs = [
253 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
254 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
255 0 : ];
256 0 : let output = Command::new(&createdb_path)
257 0 : .args([
258 0 : "-h",
259 0 : "localhost",
260 0 : "-p",
261 0 : &format!("{}", postgres_port),
262 0 : "-U",
263 0 : &username(),
264 0 : "-O",
265 0 : &username(),
266 0 : DB_NAME,
267 0 : ])
268 0 : .envs(envs)
269 0 : .output()
270 0 : .await
271 0 : .expect("Failed to spawn createdb");
272 0 :
273 0 : if !output.status.success() {
274 0 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
275 0 : if stderr.contains("already exists") {
276 0 : tracing::info!("Database {DB_NAME} already exists");
277 : } else {
278 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
279 : }
280 0 : }
281 :
282 0 : Ok(database_url)
283 0 : }
284 :
285 0 : pub async fn connect_to_database(
286 0 : &self,
287 0 : postgres_port: u16,
288 0 : ) -> anyhow::Result<(
289 0 : tokio_postgres::Client,
290 0 : tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
291 0 : )> {
292 0 : tokio_postgres::Config::new()
293 0 : .host("localhost")
294 0 : .port(postgres_port)
295 0 : // The user is the ambient operating system user name.
296 0 : // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400
297 0 : //
298 0 : // Until we get there, use the ambient operating system user name.
299 0 : // Recent tokio-postgres versions default to this if the user isn't specified.
300 0 : // But tokio-postgres fork doesn't have this upstream commit:
301 0 : // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
302 0 : // => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
303 0 : .user(&username())
304 0 : .dbname(DB_NAME)
305 0 : .connect(tokio_postgres::NoTls)
306 0 : .await
307 0 : .map_err(anyhow::Error::new)
308 0 : }
309 :
310 : /// Wrapper for the pg_ctl binary, which we spawn as a short-lived subprocess when starting and stopping postgres
311 0 : async fn pg_ctl<I, S>(&self, args: I) -> ExitStatus
312 0 : where
313 0 : I: IntoIterator<Item = S>,
314 0 : S: AsRef<OsStr>,
315 0 : {
316 0 : let pg_bin_dir = self.get_pg_bin_dir().await.unwrap();
317 0 : let bin_path = pg_bin_dir.join("pg_ctl");
318 :
319 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
320 0 : let envs = [
321 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
322 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
323 0 : ];
324 0 :
325 0 : Command::new(bin_path)
326 0 : .args(args)
327 0 : .envs(envs)
328 0 : .spawn()
329 0 : .expect("Failed to spawn pg_ctl, binary_missing?")
330 0 : .wait()
331 0 : .await
332 0 : .expect("Failed to wait for pg_ctl termination")
333 0 : }
334 :
335 0 : pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> {
336 0 : let instance_dir = self.storage_controller_instance_dir(start_args.instance_id);
337 0 : if let Err(err) = tokio::fs::create_dir(&instance_dir).await {
338 0 : if err.kind() != std::io::ErrorKind::AlreadyExists {
339 0 : panic!("Failed to create instance dir {instance_dir:?}");
340 0 : }
341 0 : }
342 :
343 0 : let (listen, postgres_port) = {
344 0 : if let Some(base_port) = start_args.base_port {
345 0 : (
346 0 : format!("127.0.0.1:{base_port}"),
347 0 : self.config
348 0 : .database_url
349 0 : .expect("--base-port requires NeonStorageControllerConf::database_url")
350 0 : .port(),
351 0 : )
352 : } else {
353 0 : let listen_url = self.env.control_plane_api.clone();
354 0 :
355 0 : let listen = format!(
356 0 : "{}:{}",
357 0 : listen_url.host_str().unwrap(),
358 0 : listen_url.port().unwrap()
359 0 : );
360 0 :
361 0 : (listen, listen_url.port().unwrap() + 1)
362 : }
363 : };
364 :
365 0 : let socket_addr = listen
366 0 : .parse()
367 0 : .expect("listen address is a valid socket address");
368 0 : self.listen
369 0 : .set(socket_addr)
370 0 : .expect("StorageController::listen is only set here");
371 :
372 : // Do we remove the pid file on stop?
373 0 : let pg_started = self.is_postgres_running().await?;
374 0 : let pg_lib_dir = self.get_pg_lib_dir().await?;
375 :
376 0 : if !pg_started {
377 : // Start a vanilla Postgres process used by the storage controller for persistence.
378 0 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
379 0 : .unwrap()
380 0 : .join("storage_controller_db");
381 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
382 0 : let pg_log_path = pg_data_path.join("postgres.log");
383 0 :
384 0 : if !tokio::fs::try_exists(&pg_data_path).await? {
385 0 : let initdb_args = [
386 0 : "--pgdata",
387 0 : pg_data_path.as_ref(),
388 0 : "--username",
389 0 : &username(),
390 0 : "--no-sync",
391 0 : "--no-instructions",
392 0 : ];
393 0 : tracing::info!(
394 0 : "Initializing storage controller database with args: {:?}",
395 : initdb_args
396 : );
397 :
398 : // Initialize empty database
399 0 : let initdb_path = pg_bin_dir.join("initdb");
400 0 : let mut child = Command::new(&initdb_path)
401 0 : .envs(vec![
402 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
403 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
404 0 : ])
405 0 : .args(initdb_args)
406 0 : .spawn()
407 0 : .expect("Failed to spawn initdb");
408 0 : let status = child.wait().await?;
409 0 : if !status.success() {
410 0 : anyhow::bail!("initdb failed with status {status}");
411 0 : }
412 0 : };
413 :
414 : // Write a minimal config file:
415 : // - Specify the port, since this is chosen dynamically
416 : // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
417 : // the storage controller we don't want a slow local disk to interfere with that.
418 : //
419 : // NB: it's important that we rewrite this file on each start command so we propagate changes
420 : // from `LocalEnv`'s config file (`.neon/config`).
421 0 : tokio::fs::write(
422 0 : &pg_data_path.join("postgresql.conf"),
423 0 : format!("port = {}\nfsync=off\n", postgres_port),
424 0 : )
425 0 : .await?;
426 :
427 0 : println!("Starting storage controller database...");
428 0 : let db_start_args = [
429 0 : "-w",
430 0 : "-D",
431 0 : pg_data_path.as_ref(),
432 0 : "-l",
433 0 : pg_log_path.as_ref(),
434 0 : "-U",
435 0 : &username(),
436 0 : "start",
437 0 : ];
438 0 : tracing::info!(
439 0 : "Starting storage controller database with args: {:?}",
440 : db_start_args
441 : );
442 :
443 0 : let db_start_status = self.pg_ctl(db_start_args).await;
444 0 : let start_timeout: Duration = start_args.start_timeout.into();
445 0 : let db_start_deadline = Instant::now() + start_timeout;
446 0 : if !db_start_status.success() {
447 0 : return Err(anyhow::anyhow!(
448 0 : "Failed to start postgres {}",
449 0 : db_start_status.code().unwrap()
450 0 : ));
451 0 : }
452 :
453 : loop {
454 0 : if Instant::now() > db_start_deadline {
455 0 : return Err(anyhow::anyhow!("Timed out waiting for postgres to start"));
456 0 : }
457 0 :
458 0 : match self.pg_isready(&pg_bin_dir, postgres_port).await {
459 : Ok(true) => {
460 0 : tracing::info!("storage controller postgres is now ready");
461 0 : break;
462 : }
463 : Ok(false) => {
464 0 : tokio::time::sleep(Duration::from_millis(100)).await;
465 : }
466 0 : Err(e) => {
467 0 : tracing::warn!("Failed to check postgres status: {e}")
468 : }
469 : }
470 : }
471 :
472 0 : self.setup_database(postgres_port).await?;
473 0 : }
474 :
475 0 : let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
476 0 :
477 0 : // We support running a startup SQL script to fiddle with the database before we launch storcon.
478 0 : // This is used by the test suite.
479 0 : let startup_script_path = self
480 0 : .env
481 0 : .base_data_dir
482 0 : .join("storage_controller_db.startup.sql");
483 0 : let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
484 0 : Ok(script) => {
485 0 : tokio::fs::remove_file(startup_script_path).await?;
486 0 : script
487 : }
488 0 : Err(e) => {
489 0 : if e.kind() == std::io::ErrorKind::NotFound {
490 : // always run some startup script so that this code path doesn't bit rot
491 0 : "BEGIN; COMMIT;".to_string()
492 : } else {
493 0 : anyhow::bail!("Failed to read startup script: {e}")
494 : }
495 : }
496 : };
497 0 : let (mut client, conn) = self.connect_to_database(postgres_port).await?;
498 0 : let conn = tokio::spawn(conn);
499 0 : let tx = client.build_transaction();
500 0 : let tx = tx.start().await?;
501 0 : tx.batch_execute(&startup_script).await?;
502 0 : tx.commit().await?;
503 0 : drop(client);
504 0 : conn.await??;
505 :
506 0 : let listen = self
507 0 : .listen
508 0 : .get()
509 0 : .expect("cell is set earlier in this function");
510 0 : let address_for_peers = Uri::builder()
511 0 : .scheme("http")
512 0 : .authority(format!("{}:{}", listen.ip(), listen.port()))
513 0 : .path_and_query("")
514 0 : .build()
515 0 : .unwrap();
516 0 :
517 0 : let mut args = vec![
518 0 : "-l",
519 0 : &listen.to_string(),
520 0 : "--dev",
521 0 : "--database-url",
522 0 : &database_url,
523 0 : "--max-offline-interval",
524 0 : &humantime::Duration::from(self.config.max_offline).to_string(),
525 0 : "--max-warming-up-interval",
526 0 : &humantime::Duration::from(self.config.max_warming_up).to_string(),
527 0 : "--heartbeat-interval",
528 0 : &humantime::Duration::from(self.config.heartbeat_interval).to_string(),
529 0 : "--address-for-peers",
530 0 : &address_for_peers.to_string(),
531 0 : ]
532 0 : .into_iter()
533 0 : .map(|s| s.to_string())
534 0 : .collect::<Vec<_>>();
535 0 :
536 0 : if self.config.start_as_candidate {
537 0 : args.push("--start-as-candidate".to_string());
538 0 : }
539 :
540 0 : if self.config.load_safekeepers {
541 0 : args.push("--load-safekeepers".to_string());
542 0 : }
543 :
544 0 : if let Some(private_key) = &self.private_key {
545 0 : let claims = Claims::new(None, Scope::PageServerApi);
546 0 : let jwt_token =
547 0 : encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
548 0 : args.push(format!("--jwt-token={jwt_token}"));
549 0 :
550 0 : let peer_claims = Claims::new(None, Scope::Admin);
551 0 : let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
552 0 : .expect("failed to generate jwt token");
553 0 : args.push(format!("--peer-jwt-token={peer_jwt_token}"));
554 0 : }
555 :
556 0 : if let Some(public_key) = &self.public_key {
557 0 : args.push(format!("--public-key=\"{public_key}\""));
558 0 : }
559 :
560 0 : if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
561 0 : args.push(format!(
562 0 : "--compute-hook-url={control_plane_compute_hook_api}"
563 0 : ));
564 0 : }
565 :
566 0 : if let Some(split_threshold) = self.config.split_threshold.as_ref() {
567 0 : args.push(format!("--split-threshold={split_threshold}"))
568 0 : }
569 :
570 0 : if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
571 0 : args.push(format!("--max-secondary-lag-bytes={lag}"))
572 0 : }
573 :
574 0 : if let Some(threshold) = self.config.long_reconcile_threshold {
575 0 : args.push(format!(
576 0 : "--long-reconcile-threshold={}",
577 0 : humantime::Duration::from(threshold)
578 0 : ))
579 0 : }
580 :
581 0 : args.push(format!(
582 0 : "--neon-local-repo-dir={}",
583 0 : self.env.base_data_dir.display()
584 0 : ));
585 0 :
586 0 : background_process::start_process(
587 0 : COMMAND,
588 0 : &instance_dir,
589 0 : &self.env.storage_controller_bin(),
590 0 : args,
591 0 : vec![
592 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
593 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
594 0 : ],
595 0 : background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
596 0 : &start_args.start_timeout,
597 0 : || async {
598 0 : match self.ready().await {
599 0 : Ok(_) => Ok(true),
600 0 : Err(_) => Ok(false),
601 : }
602 0 : },
603 0 : )
604 0 : .await?;
605 :
606 0 : Ok(())
607 0 : }
608 :
609 0 : pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> {
610 0 : background_process::stop_process(
611 0 : stop_args.immediate,
612 0 : COMMAND,
613 0 : &self.pid_file(stop_args.instance_id),
614 0 : )?;
615 :
616 0 : let storcon_instances = self.env.storage_controller_instances().await?;
617 0 : for (instance_id, instanced_dir_path) in storcon_instances {
618 0 : if instance_id == stop_args.instance_id {
619 0 : continue;
620 0 : }
621 0 :
622 0 : let pid_file = instanced_dir_path.join("storage_controller.pid");
623 0 : let pid = tokio::fs::read_to_string(&pid_file)
624 0 : .await
625 0 : .map_err(|err| {
626 0 : anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}")
627 0 : })?
628 0 : .parse::<i32>()
629 0 : .expect("pid is valid i32");
630 :
631 0 : let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?;
632 0 : if other_proc_alive {
633 : // There is another storage controller instance running, so we return
634 : // and leave the database running.
635 0 : return Ok(());
636 0 : }
637 : }
638 :
639 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
640 0 :
641 0 : println!("Stopping storage controller database...");
642 0 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
643 0 : let stop_status = self.pg_ctl(pg_stop_args).await;
644 0 : if !stop_status.success() {
645 0 : match self.is_postgres_running().await {
646 : Ok(false) => {
647 0 : println!("Storage controller database is already stopped");
648 0 : return Ok(());
649 : }
650 : Ok(true) => {
651 0 : anyhow::bail!("Failed to stop storage controller database");
652 : }
653 0 : Err(err) => {
654 0 : anyhow::bail!("Failed to stop storage controller database: {err}");
655 : }
656 : }
657 0 : }
658 0 :
659 0 : Ok(())
660 0 : }
661 :
662 0 : async fn is_postgres_running(&self) -> anyhow::Result<bool> {
663 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
664 0 :
665 0 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
666 0 : let status_exitcode = self.pg_ctl(pg_status_args).await;
667 :
668 : // pg_ctl status returns this exit code if postgres is not running: in this case it is
669 : // fine that stop failed. Otherwise it is an error that stop failed.
670 : const PG_STATUS_NOT_RUNNING: i32 = 3;
671 : const PG_NO_DATA_DIR: i32 = 4;
672 : const PG_STATUS_RUNNING: i32 = 0;
673 0 : match status_exitcode.code() {
674 0 : Some(PG_STATUS_NOT_RUNNING) => Ok(false),
675 0 : Some(PG_NO_DATA_DIR) => Ok(false),
676 0 : Some(PG_STATUS_RUNNING) => Ok(true),
677 0 : Some(code) => Err(anyhow::anyhow!(
678 0 : "pg_ctl status returned unexpected status code: {:?}",
679 0 : code
680 0 : )),
681 0 : None => Err(anyhow::anyhow!("pg_ctl status returned no status code")),
682 : }
683 0 : }
684 :
685 0 : fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
686 0 : let category = match path.find('/') {
687 0 : Some(idx) => &path[..idx],
688 0 : None => path,
689 : };
690 :
691 0 : match category {
692 0 : "status" | "ready" => Ok(None),
693 0 : "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
694 0 : "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
695 0 : _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
696 : }
697 0 : }
698 :
699 : /// Simple HTTP request wrapper for calling into storage controller
700 0 : async fn dispatch<RQ, RS>(
701 0 : &self,
702 0 : method: reqwest::Method,
703 0 : path: String,
704 0 : body: Option<RQ>,
705 0 : ) -> anyhow::Result<RS>
706 0 : where
707 0 : RQ: Serialize + Sized,
708 0 : RS: DeserializeOwned + Sized,
709 0 : {
710 : // In the special case of the `storage_controller start` subcommand, we wish
711 : // to use the API endpoint of the newly started storage controller in order
712 : // to pass the readiness check. In this scenario [`Self::listen`] will be set
713 : // (see [`Self::start`]).
714 : //
715 : // Otherwise, we infer the storage controller api endpoint from the configured
716 : // control plane API.
717 0 : let url = if let Some(socket_addr) = self.listen.get() {
718 0 : Url::from_str(&format!(
719 0 : "http://{}:{}/{path}",
720 0 : socket_addr.ip().to_canonical(),
721 0 : socket_addr.port()
722 0 : ))
723 0 : .unwrap()
724 : } else {
725 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
726 : // for general purpose API access.
727 0 : let listen_url = self.env.control_plane_api.clone();
728 0 : Url::from_str(&format!(
729 0 : "http://{}:{}/{path}",
730 0 : listen_url.host_str().unwrap(),
731 0 : listen_url.port().unwrap()
732 0 : ))
733 0 : .unwrap()
734 : };
735 :
736 0 : let mut builder = self.client.request(method, url);
737 0 : if let Some(body) = body {
738 0 : builder = builder.json(&body)
739 0 : }
740 0 : if let Some(private_key) = &self.private_key {
741 0 : println!("Getting claims for path {}", path);
742 0 : if let Some(required_claims) = Self::get_claims_for_path(&path)? {
743 0 : println!("Got claims {:?} for path {}", required_claims, path);
744 0 : let jwt_token = encode_from_key_file(&required_claims, private_key)?;
745 0 : builder = builder.header(
746 0 : reqwest::header::AUTHORIZATION,
747 0 : format!("Bearer {jwt_token}"),
748 0 : );
749 0 : }
750 0 : }
751 :
752 0 : let response = builder.send().await?;
753 0 : let response = response.error_from_body().await?;
754 :
755 0 : Ok(response
756 0 : .json()
757 0 : .await
758 0 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
759 0 : }
760 :
761 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
762 : #[instrument(skip(self))]
763 : pub async fn attach_hook(
764 : &self,
765 : tenant_shard_id: TenantShardId,
766 : pageserver_id: NodeId,
767 : ) -> anyhow::Result<Option<u32>> {
768 : let request = AttachHookRequest {
769 : tenant_shard_id,
770 : node_id: Some(pageserver_id),
771 : generation_override: None,
772 : };
773 :
774 : let response = self
775 : .dispatch::<_, AttachHookResponse>(
776 : Method::POST,
777 : "debug/v1/attach-hook".to_string(),
778 : Some(request),
779 : )
780 : .await?;
781 :
782 : Ok(response.generation)
783 : }
784 :
785 : #[instrument(skip(self))]
786 : pub async fn inspect(
787 : &self,
788 : tenant_shard_id: TenantShardId,
789 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
790 : let request = InspectRequest { tenant_shard_id };
791 :
792 : let response = self
793 : .dispatch::<_, InspectResponse>(
794 : Method::POST,
795 : "debug/v1/inspect".to_string(),
796 : Some(request),
797 : )
798 : .await?;
799 :
800 : Ok(response.attachment)
801 : }
802 :
803 : #[instrument(skip(self))]
804 : pub async fn tenant_create(
805 : &self,
806 : req: TenantCreateRequest,
807 : ) -> anyhow::Result<TenantCreateResponse> {
808 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
809 : .await
810 : }
811 :
812 : #[instrument(skip(self))]
813 : pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
814 : self.dispatch::<(), TenantCreateResponse>(
815 : Method::POST,
816 : format!("debug/v1/tenant/{tenant_id}/import"),
817 : None,
818 : )
819 : .await
820 : }
821 :
822 : #[instrument(skip(self))]
823 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
824 : self.dispatch::<(), _>(
825 : Method::GET,
826 : format!("debug/v1/tenant/{tenant_id}/locate"),
827 : None,
828 : )
829 : .await
830 : }
831 :
832 : #[instrument(skip(self))]
833 : pub async fn tenant_migrate(
834 : &self,
835 : tenant_shard_id: TenantShardId,
836 : node_id: NodeId,
837 : ) -> anyhow::Result<TenantShardMigrateResponse> {
838 : self.dispatch(
839 : Method::PUT,
840 : format!("control/v1/tenant/{tenant_shard_id}/migrate"),
841 : Some(TenantShardMigrateRequest {
842 : node_id,
843 : migration_config: None,
844 : }),
845 : )
846 : .await
847 : }
848 :
849 : #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
850 : pub async fn tenant_split(
851 : &self,
852 : tenant_id: TenantId,
853 : new_shard_count: u8,
854 : new_stripe_size: Option<ShardStripeSize>,
855 : ) -> anyhow::Result<TenantShardSplitResponse> {
856 : self.dispatch(
857 : Method::PUT,
858 : format!("control/v1/tenant/{tenant_id}/shard_split"),
859 : Some(TenantShardSplitRequest {
860 : new_shard_count,
861 : new_stripe_size,
862 : }),
863 : )
864 : .await
865 : }
866 :
867 : #[instrument(skip_all, fields(node_id=%req.node_id))]
868 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
869 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
870 : .await
871 : }
872 :
873 : #[instrument(skip_all, fields(node_id=%req.node_id))]
874 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
875 : self.dispatch::<_, ()>(
876 : Method::PUT,
877 : format!("control/v1/node/{}/config", req.node_id),
878 : Some(req),
879 : )
880 : .await
881 : }
882 :
883 0 : pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
884 0 : self.dispatch::<(), Vec<NodeDescribeResponse>>(
885 0 : Method::GET,
886 0 : "control/v1/node".to_string(),
887 0 : None,
888 0 : )
889 0 : .await
890 0 : }
891 :
892 : #[instrument(skip(self))]
893 : pub async fn ready(&self) -> anyhow::Result<()> {
894 : self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
895 : .await
896 : }
897 :
898 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
899 : pub async fn tenant_timeline_create(
900 : &self,
901 : tenant_id: TenantId,
902 : req: TimelineCreateRequest,
903 : ) -> anyhow::Result<TimelineInfo> {
904 : self.dispatch(
905 : Method::POST,
906 : format!("v1/tenant/{tenant_id}/timeline"),
907 : Some(req),
908 : )
909 : .await
910 : }
911 : }
|