Line data Source code
1 : use std::ffi::OsStr;
2 : use std::fs;
3 : use std::path::PathBuf;
4 : use std::process::ExitStatus;
5 : use std::str::FromStr;
6 : use std::sync::OnceLock;
7 : use std::time::{Duration, Instant};
8 :
9 : use camino::{Utf8Path, Utf8PathBuf};
10 : use hyper0::Uri;
11 : use nix::unistd::Pid;
12 : use pageserver_api::controller_api::{
13 : NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
14 : TenantCreateResponse, TenantLocateResponse,
15 : };
16 : use pageserver_api::models::{
17 : TenantConfig, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
18 : };
19 : use pageserver_api::shard::TenantShardId;
20 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
21 : use postgres_backend::AuthType;
22 : use reqwest::{Certificate, Method};
23 : use serde::de::DeserializeOwned;
24 : use serde::{Deserialize, Serialize};
25 : use tokio::process::Command;
26 : use tracing::instrument;
27 : use url::Url;
28 : use utils::auth::{Claims, Scope, encode_from_key_file};
29 : use utils::id::{NodeId, TenantId};
30 : use whoami::username;
31 :
32 : use crate::background_process;
33 : use crate::local_env::{LocalEnv, NeonStorageControllerConf};
34 :
35 : pub struct StorageController {
36 : env: LocalEnv,
37 : private_key: Option<Vec<u8>>,
38 : public_key: Option<String>,
39 : client: reqwest::Client,
40 : config: NeonStorageControllerConf,
41 :
42 : // The listen port is learned when starting the storage controller,
43 : // hence the use of OnceLock to init it at the right time.
44 : listen_port: OnceLock<u16>,
45 : }
46 :
/// Process name handed to `background_process` when starting and stopping the controller.
const COMMAND: &'static str = "storage_controller";

/// Postgres major version tried first when looking for Postgres binaries and libraries.
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;

/// Name of the Postgres database used by the storage controller.
const DB_NAME: &'static str = "storage_controller";
52 :
53 : pub struct NeonStorageControllerStartArgs {
54 : pub instance_id: u8,
55 : pub base_port: Option<u16>,
56 : pub start_timeout: humantime::Duration,
57 : }
58 :
59 : impl NeonStorageControllerStartArgs {
60 0 : pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self {
61 0 : Self {
62 0 : instance_id: 1,
63 0 : base_port: None,
64 0 : start_timeout,
65 0 : }
66 0 : }
67 : }
68 :
/// Arguments accepted by `StorageController::stop`.
pub struct NeonStorageControllerStopArgs {
    /// Identifies which `storage_controller_<id>` instance is stopped.
    pub instance_id: u8,
    /// Forwarded unchanged to `background_process::stop_process`.
    pub immediate: bool,
}

impl NeonStorageControllerStopArgs {
    /// Builds stop arguments for instance `1`.
    pub fn with_default_instance_id(immediate: bool) -> Self {
        let instance_id = 1;
        Self {
            instance_id,
            immediate,
        }
    }
}
82 :
83 0 : #[derive(Serialize, Deserialize)]
84 : pub struct AttachHookRequest {
85 : pub tenant_shard_id: TenantShardId,
86 : pub node_id: Option<NodeId>,
87 : pub generation_override: Option<i32>, // only new tenants
88 : pub config: Option<TenantConfig>, // only new tenants
89 : }
90 :
91 0 : #[derive(Serialize, Deserialize)]
92 : pub struct AttachHookResponse {
93 : #[serde(rename = "gen")]
94 : pub generation: Option<u32>,
95 : }
96 :
97 0 : #[derive(Serialize, Deserialize)]
98 : pub struct InspectRequest {
99 : pub tenant_shard_id: TenantShardId,
100 : }
101 :
102 0 : #[derive(Serialize, Deserialize)]
103 : pub struct InspectResponse {
104 : pub attachment: Option<(u32, NodeId)>,
105 : }
106 :
107 : impl StorageController {
108 0 : pub fn from_env(env: &LocalEnv) -> Self {
109 0 : // Assume all pageservers have symmetric auth configuration: this service
110 0 : // expects to use one JWT token to talk to all of them.
111 0 : let ps_conf = env
112 0 : .pageservers
113 0 : .first()
114 0 : .expect("Config is validated to contain at least one pageserver");
115 0 : let (private_key, public_key) = match ps_conf.http_auth_type {
116 0 : AuthType::Trust => (None, None),
117 : AuthType::NeonJWT => {
118 0 : let private_key_path = env.get_private_key_path();
119 0 : let private_key = fs::read(private_key_path).expect("failed to read private key");
120 0 :
121 0 : // If pageserver auth is enabled, this implicitly enables auth for this service,
122 0 : // using the same credentials.
123 0 : let public_key_path =
124 0 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
125 0 : .unwrap();
126 :
127 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
128 0 : let public_key = if std::fs::metadata(&public_key_path)
129 0 : .expect("Can't stat public key")
130 0 : .is_dir()
131 : {
132 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
133 : // keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
134 0 : let mut dir =
135 0 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
136 0 : let dent = dir
137 0 : .next()
138 0 : .expect("Empty key dir")
139 0 : .expect("Error reading key dir");
140 0 :
141 0 : std::fs::read_to_string(dent.path()).expect("Can't read public key")
142 : } else {
143 0 : std::fs::read_to_string(&public_key_path).expect("Can't read public key")
144 : };
145 0 : (Some(private_key), Some(public_key))
146 : }
147 : };
148 :
149 0 : let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
150 0 : let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
151 0 : Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
152 0 : });
153 0 :
154 0 : let mut http_client = reqwest::Client::builder();
155 0 : for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
156 0 : http_client = http_client.add_root_certificate(ssl_ca_cert);
157 0 : }
158 0 : let http_client = http_client
159 0 : .build()
160 0 : .expect("HTTP client should construct with no error");
161 0 :
162 0 : Self {
163 0 : env: env.clone(),
164 0 : private_key,
165 0 : public_key,
166 0 : client: http_client,
167 0 : config: env.storage_controller.clone(),
168 0 : listen_port: OnceLock::default(),
169 0 : }
170 0 : }
171 :
172 0 : fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
173 0 : self.env
174 0 : .base_data_dir
175 0 : .join(format!("storage_controller_{}", instance_id))
176 0 : }
177 :
178 0 : fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
179 0 : Utf8PathBuf::from_path_buf(
180 0 : self.storage_controller_instance_dir(instance_id)
181 0 : .join("storage_controller.pid"),
182 0 : )
183 0 : .expect("non-Unicode path")
184 0 : }
185 :
186 : /// Find the directory containing postgres subdirectories, such `bin` and `lib`
187 : ///
188 : /// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
189 : /// to other versions if that one isn't found. Some automated tests create circumstances
190 : /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
191 0 : async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
192 0 : let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 16, 15, 14];
193 :
194 0 : for v in prefer_versions {
195 0 : let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
196 0 : if tokio::fs::try_exists(&path).await? {
197 0 : return Ok(path);
198 0 : }
199 : }
200 :
201 : // Fall through
202 0 : anyhow::bail!(
203 0 : "Postgres directory '{}' not found in {}",
204 0 : dir_name,
205 0 : self.env.pg_distrib_dir.display(),
206 0 : );
207 0 : }
208 :
209 0 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
210 0 : self.get_pg_dir("bin").await
211 0 : }
212 :
213 0 : pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
214 0 : self.get_pg_dir("lib").await
215 0 : }
216 :
217 : /// Readiness check for our postgres process
218 0 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result<bool> {
219 0 : let bin_path = pg_bin_dir.join("pg_isready");
220 0 : let args = [
221 0 : "-h",
222 0 : "localhost",
223 0 : "-U",
224 0 : &username(),
225 0 : "-d",
226 0 : DB_NAME,
227 0 : "-p",
228 0 : &format!("{}", postgres_port),
229 0 : ];
230 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
231 0 : let envs = [
232 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
233 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
234 0 : ];
235 0 : let exitcode = Command::new(bin_path)
236 0 : .args(args)
237 0 : .envs(envs)
238 0 : .spawn()?
239 0 : .wait()
240 0 : .await?;
241 :
242 0 : Ok(exitcode.success())
243 0 : }
244 :
245 : /// Create our database if it doesn't exist
246 : ///
247 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
248 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
249 : /// who just want to run `cargo neon_local` without knowing about diesel.
250 : ///
251 : /// Returns the database url
252 0 : pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result<String> {
253 0 : let database_url = format!(
254 0 : "postgresql://{}@localhost:{}/{DB_NAME}",
255 0 : &username(),
256 0 : postgres_port
257 0 : );
258 :
259 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
260 0 : let createdb_path = pg_bin_dir.join("createdb");
261 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
262 0 : let envs = [
263 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
264 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
265 0 : ];
266 0 : let output = Command::new(&createdb_path)
267 0 : .args([
268 0 : "-h",
269 0 : "localhost",
270 0 : "-p",
271 0 : &format!("{}", postgres_port),
272 0 : "-U",
273 0 : &username(),
274 0 : "-O",
275 0 : &username(),
276 0 : DB_NAME,
277 0 : ])
278 0 : .envs(envs)
279 0 : .output()
280 0 : .await
281 0 : .expect("Failed to spawn createdb");
282 0 :
283 0 : if !output.status.success() {
284 0 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
285 0 : if stderr.contains("already exists") {
286 0 : tracing::info!("Database {DB_NAME} already exists");
287 : } else {
288 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
289 : }
290 0 : }
291 :
292 0 : Ok(database_url)
293 0 : }
294 :
295 0 : pub async fn connect_to_database(
296 0 : &self,
297 0 : postgres_port: u16,
298 0 : ) -> anyhow::Result<(
299 0 : tokio_postgres::Client,
300 0 : tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
301 0 : )> {
302 0 : tokio_postgres::Config::new()
303 0 : .host("localhost")
304 0 : .port(postgres_port)
305 0 : // The user is the ambient operating system user name.
306 0 : // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400
307 0 : //
308 0 : // Until we get there, use the ambient operating system user name.
309 0 : // Recent tokio-postgres versions default to this if the user isn't specified.
310 0 : // But tokio-postgres fork doesn't have this upstream commit:
311 0 : // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
312 0 : // => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
313 0 : .user(&username())
314 0 : .dbname(DB_NAME)
315 0 : .connect(tokio_postgres::NoTls)
316 0 : .await
317 0 : .map_err(anyhow::Error::new)
318 0 : }
319 :
320 : /// Wrapper for the pg_ctl binary, which we spawn as a short-lived subprocess when starting and stopping postgres
321 0 : async fn pg_ctl<I, S>(&self, args: I) -> ExitStatus
322 0 : where
323 0 : I: IntoIterator<Item = S>,
324 0 : S: AsRef<OsStr>,
325 0 : {
326 0 : let pg_bin_dir = self.get_pg_bin_dir().await.unwrap();
327 0 : let bin_path = pg_bin_dir.join("pg_ctl");
328 :
329 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
330 0 : let envs = [
331 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
332 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
333 0 : ];
334 0 :
335 0 : Command::new(bin_path)
336 0 : .args(args)
337 0 : .envs(envs)
338 0 : .spawn()
339 0 : .expect("Failed to spawn pg_ctl, binary_missing?")
340 0 : .wait()
341 0 : .await
342 0 : .expect("Failed to wait for pg_ctl termination")
343 0 : }
344 :
345 0 : pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> {
346 0 : let instance_dir = self.storage_controller_instance_dir(start_args.instance_id);
347 0 : if let Err(err) = tokio::fs::create_dir(&instance_dir).await {
348 0 : if err.kind() != std::io::ErrorKind::AlreadyExists {
349 0 : panic!("Failed to create instance dir {instance_dir:?}");
350 0 : }
351 0 : }
352 :
353 0 : if self.env.generate_local_ssl_certs {
354 0 : self.env.generate_ssl_cert(
355 0 : &instance_dir.join("server.crt"),
356 0 : &instance_dir.join("server.key"),
357 0 : )?;
358 0 : }
359 :
360 0 : let listen_url = &self.env.control_plane_api;
361 0 :
362 0 : let scheme = listen_url.scheme();
363 0 : let host = listen_url.host_str().unwrap();
364 :
365 0 : let (listen_port, postgres_port) = if let Some(base_port) = start_args.base_port {
366 0 : (
367 0 : base_port,
368 0 : self.config
369 0 : .database_url
370 0 : .expect("--base-port requires NeonStorageControllerConf::database_url")
371 0 : .port(),
372 0 : )
373 : } else {
374 0 : let port = listen_url.port().unwrap();
375 0 : (port, port + 1)
376 : };
377 :
378 0 : self.listen_port
379 0 : .set(listen_port)
380 0 : .expect("StorageController::listen_port is only set here");
381 :
382 : // Do we remove the pid file on stop?
383 0 : let pg_started = self.is_postgres_running().await?;
384 0 : let pg_lib_dir = self.get_pg_lib_dir().await?;
385 :
386 0 : if !pg_started {
387 : // Start a vanilla Postgres process used by the storage controller for persistence.
388 0 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
389 0 : .unwrap()
390 0 : .join("storage_controller_db");
391 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
392 0 : let pg_log_path = pg_data_path.join("postgres.log");
393 0 :
394 0 : if !tokio::fs::try_exists(&pg_data_path).await? {
395 0 : let initdb_args = [
396 0 : "--pgdata",
397 0 : pg_data_path.as_ref(),
398 0 : "--username",
399 0 : &username(),
400 0 : "--no-sync",
401 0 : "--no-instructions",
402 0 : ];
403 0 : tracing::info!(
404 0 : "Initializing storage controller database with args: {:?}",
405 : initdb_args
406 : );
407 :
408 : // Initialize empty database
409 0 : let initdb_path = pg_bin_dir.join("initdb");
410 0 : let mut child = Command::new(&initdb_path)
411 0 : .envs(vec![
412 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
413 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
414 0 : ])
415 0 : .args(initdb_args)
416 0 : .spawn()
417 0 : .expect("Failed to spawn initdb");
418 0 : let status = child.wait().await?;
419 0 : if !status.success() {
420 0 : anyhow::bail!("initdb failed with status {status}");
421 0 : }
422 0 : };
423 :
424 : // Write a minimal config file:
425 : // - Specify the port, since this is chosen dynamically
426 : // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
427 : // the storage controller we don't want a slow local disk to interfere with that.
428 : //
429 : // NB: it's important that we rewrite this file on each start command so we propagate changes
430 : // from `LocalEnv`'s config file (`.neon/config`).
431 0 : tokio::fs::write(
432 0 : &pg_data_path.join("postgresql.conf"),
433 0 : format!("port = {}\nfsync=off\n", postgres_port),
434 0 : )
435 0 : .await?;
436 :
437 0 : println!("Starting storage controller database...");
438 0 : let db_start_args = [
439 0 : "-w",
440 0 : "-D",
441 0 : pg_data_path.as_ref(),
442 0 : "-l",
443 0 : pg_log_path.as_ref(),
444 0 : "-U",
445 0 : &username(),
446 0 : "start",
447 0 : ];
448 0 : tracing::info!(
449 0 : "Starting storage controller database with args: {:?}",
450 : db_start_args
451 : );
452 :
453 0 : let db_start_status = self.pg_ctl(db_start_args).await;
454 0 : let start_timeout: Duration = start_args.start_timeout.into();
455 0 : let db_start_deadline = Instant::now() + start_timeout;
456 0 : if !db_start_status.success() {
457 0 : return Err(anyhow::anyhow!(
458 0 : "Failed to start postgres {}",
459 0 : db_start_status.code().unwrap()
460 0 : ));
461 0 : }
462 :
463 : loop {
464 0 : if Instant::now() > db_start_deadline {
465 0 : return Err(anyhow::anyhow!("Timed out waiting for postgres to start"));
466 0 : }
467 0 :
468 0 : match self.pg_isready(&pg_bin_dir, postgres_port).await {
469 : Ok(true) => {
470 0 : tracing::info!("storage controller postgres is now ready");
471 0 : break;
472 : }
473 : Ok(false) => {
474 0 : tokio::time::sleep(Duration::from_millis(100)).await;
475 : }
476 0 : Err(e) => {
477 0 : tracing::warn!("Failed to check postgres status: {e}")
478 : }
479 : }
480 : }
481 :
482 0 : self.setup_database(postgres_port).await?;
483 0 : }
484 :
485 0 : let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
486 0 :
487 0 : // We support running a startup SQL script to fiddle with the database before we launch storcon.
488 0 : // This is used by the test suite.
489 0 : let startup_script_path = self
490 0 : .env
491 0 : .base_data_dir
492 0 : .join("storage_controller_db.startup.sql");
493 0 : let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
494 0 : Ok(script) => {
495 0 : tokio::fs::remove_file(startup_script_path).await?;
496 0 : script
497 : }
498 0 : Err(e) => {
499 0 : if e.kind() == std::io::ErrorKind::NotFound {
500 : // always run some startup script so that this code path doesn't bit rot
501 0 : "BEGIN; COMMIT;".to_string()
502 : } else {
503 0 : anyhow::bail!("Failed to read startup script: {e}")
504 : }
505 : }
506 : };
507 0 : let (mut client, conn) = self.connect_to_database(postgres_port).await?;
508 0 : let conn = tokio::spawn(conn);
509 0 : let tx = client.build_transaction();
510 0 : let tx = tx.start().await?;
511 0 : tx.batch_execute(&startup_script).await?;
512 0 : tx.commit().await?;
513 0 : drop(client);
514 0 : conn.await??;
515 :
516 0 : let addr = format!("{}:{}", host, listen_port);
517 0 : let address_for_peers = Uri::builder()
518 0 : .scheme(scheme)
519 0 : .authority(addr.clone())
520 0 : .path_and_query("")
521 0 : .build()
522 0 : .unwrap();
523 0 :
524 0 : let mut args = vec![
525 0 : "--dev",
526 0 : "--database-url",
527 0 : &database_url,
528 0 : "--max-offline-interval",
529 0 : &humantime::Duration::from(self.config.max_offline).to_string(),
530 0 : "--max-warming-up-interval",
531 0 : &humantime::Duration::from(self.config.max_warming_up).to_string(),
532 0 : "--heartbeat-interval",
533 0 : &humantime::Duration::from(self.config.heartbeat_interval).to_string(),
534 0 : "--address-for-peers",
535 0 : &address_for_peers.to_string(),
536 0 : ]
537 0 : .into_iter()
538 0 : .map(|s| s.to_string())
539 0 : .collect::<Vec<_>>();
540 0 :
541 0 : match scheme {
542 0 : "http" => args.extend(["--listen".to_string(), addr]),
543 0 : "https" => args.extend(["--listen-https".to_string(), addr]),
544 : _ => {
545 0 : panic!("Unexpected url scheme in control_plane_api: {scheme}");
546 : }
547 : }
548 :
549 0 : if self.config.start_as_candidate {
550 0 : args.push("--start-as-candidate".to_string());
551 0 : }
552 :
553 0 : if self.config.use_https_pageserver_api {
554 0 : args.push("--use-https-pageserver-api".to_string());
555 0 : }
556 :
557 0 : if self.config.use_https_safekeeper_api {
558 0 : args.push("--use-https-safekeeper-api".to_string());
559 0 : }
560 :
561 0 : if self.config.use_local_compute_notifications {
562 0 : args.push("--use-local-compute-notifications".to_string());
563 0 : }
564 :
565 0 : if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
566 0 : args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
567 0 : }
568 :
569 0 : if let Some(private_key) = &self.private_key {
570 0 : let claims = Claims::new(None, Scope::PageServerApi);
571 0 : let jwt_token =
572 0 : encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
573 0 : args.push(format!("--jwt-token={jwt_token}"));
574 0 :
575 0 : let peer_claims = Claims::new(None, Scope::Admin);
576 0 : let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
577 0 : .expect("failed to generate jwt token");
578 0 : args.push(format!("--peer-jwt-token={peer_jwt_token}"));
579 0 : }
580 :
581 0 : if let Some(public_key) = &self.public_key {
582 0 : args.push(format!("--public-key=\"{public_key}\""));
583 0 : }
584 :
585 0 : if let Some(control_plane_hooks_api) = &self.env.control_plane_hooks_api {
586 0 : args.push(format!("--control-plane-url={control_plane_hooks_api}"));
587 0 : }
588 :
589 0 : if let Some(split_threshold) = self.config.split_threshold.as_ref() {
590 0 : args.push(format!("--split-threshold={split_threshold}"))
591 0 : }
592 :
593 0 : if let Some(max_split_shards) = self.config.max_split_shards.as_ref() {
594 0 : args.push(format!("--max-split-shards={max_split_shards}"))
595 0 : }
596 :
597 0 : if let Some(initial_split_threshold) = self.config.initial_split_threshold.as_ref() {
598 0 : args.push(format!(
599 0 : "--initial-split-threshold={initial_split_threshold}"
600 0 : ))
601 0 : }
602 :
603 0 : if let Some(initial_split_shards) = self.config.initial_split_shards.as_ref() {
604 0 : args.push(format!("--initial-split-shards={initial_split_shards}"))
605 0 : }
606 :
607 0 : if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
608 0 : args.push(format!("--max-secondary-lag-bytes={lag}"))
609 0 : }
610 :
611 0 : if let Some(threshold) = self.config.long_reconcile_threshold {
612 0 : args.push(format!(
613 0 : "--long-reconcile-threshold={}",
614 0 : humantime::Duration::from(threshold)
615 0 : ))
616 0 : }
617 :
618 0 : args.push(format!(
619 0 : "--neon-local-repo-dir={}",
620 0 : self.env.base_data_dir.display()
621 0 : ));
622 0 :
623 0 : if self.config.timelines_onto_safekeepers {
624 0 : args.push("--timelines-onto-safekeepers".to_string());
625 0 : }
626 :
627 0 : println!("Starting storage controller");
628 0 :
629 0 : background_process::start_process(
630 0 : COMMAND,
631 0 : &instance_dir,
632 0 : &self.env.storage_controller_bin(),
633 0 : args,
634 0 : vec![
635 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
636 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
637 0 : ],
638 0 : background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
639 0 : &start_args.start_timeout,
640 0 : || async {
641 0 : match self.ready().await {
642 0 : Ok(_) => Ok(true),
643 0 : Err(_) => Ok(false),
644 : }
645 0 : },
646 0 : )
647 0 : .await?;
648 :
649 0 : Ok(())
650 0 : }
651 :
652 0 : pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> {
653 0 : background_process::stop_process(
654 0 : stop_args.immediate,
655 0 : COMMAND,
656 0 : &self.pid_file(stop_args.instance_id),
657 0 : )?;
658 :
659 0 : let storcon_instances = self.env.storage_controller_instances().await?;
660 0 : for (instance_id, instanced_dir_path) in storcon_instances {
661 0 : if instance_id == stop_args.instance_id {
662 0 : continue;
663 0 : }
664 0 :
665 0 : let pid_file = instanced_dir_path.join("storage_controller.pid");
666 0 : let pid = tokio::fs::read_to_string(&pid_file)
667 0 : .await
668 0 : .map_err(|err| {
669 0 : anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}")
670 0 : })?
671 0 : .parse::<i32>()
672 0 : .expect("pid is valid i32");
673 :
674 0 : let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?;
675 0 : if other_proc_alive {
676 : // There is another storage controller instance running, so we return
677 : // and leave the database running.
678 0 : return Ok(());
679 0 : }
680 : }
681 :
682 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
683 0 :
684 0 : println!("Stopping storage controller database...");
685 0 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
686 0 : let stop_status = self.pg_ctl(pg_stop_args).await;
687 0 : if !stop_status.success() {
688 0 : match self.is_postgres_running().await {
689 : Ok(false) => {
690 0 : println!("Storage controller database is already stopped");
691 0 : return Ok(());
692 : }
693 : Ok(true) => {
694 0 : anyhow::bail!("Failed to stop storage controller database");
695 : }
696 0 : Err(err) => {
697 0 : anyhow::bail!("Failed to stop storage controller database: {err}");
698 : }
699 : }
700 0 : }
701 0 :
702 0 : Ok(())
703 0 : }
704 :
705 0 : async fn is_postgres_running(&self) -> anyhow::Result<bool> {
706 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
707 0 :
708 0 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
709 0 : let status_exitcode = self.pg_ctl(pg_status_args).await;
710 :
711 : // pg_ctl status returns this exit code if postgres is not running: in this case it is
712 : // fine that stop failed. Otherwise it is an error that stop failed.
713 : const PG_STATUS_NOT_RUNNING: i32 = 3;
714 : const PG_NO_DATA_DIR: i32 = 4;
715 : const PG_STATUS_RUNNING: i32 = 0;
716 0 : match status_exitcode.code() {
717 0 : Some(PG_STATUS_NOT_RUNNING) => Ok(false),
718 0 : Some(PG_NO_DATA_DIR) => Ok(false),
719 0 : Some(PG_STATUS_RUNNING) => Ok(true),
720 0 : Some(code) => Err(anyhow::anyhow!(
721 0 : "pg_ctl status returned unexpected status code: {:?}",
722 0 : code
723 0 : )),
724 0 : None => Err(anyhow::anyhow!("pg_ctl status returned no status code")),
725 : }
726 0 : }
727 :
728 0 : fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
729 0 : let category = match path.find('/') {
730 0 : Some(idx) => &path[..idx],
731 0 : None => path,
732 : };
733 :
734 0 : match category {
735 0 : "status" | "ready" => Ok(None),
736 0 : "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
737 0 : "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
738 0 : _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
739 : }
740 0 : }
741 :
742 : /// Simple HTTP request wrapper for calling into storage controller
743 0 : async fn dispatch<RQ, RS>(
744 0 : &self,
745 0 : method: reqwest::Method,
746 0 : path: String,
747 0 : body: Option<RQ>,
748 0 : ) -> anyhow::Result<RS>
749 0 : where
750 0 : RQ: Serialize + Sized,
751 0 : RS: DeserializeOwned + Sized,
752 0 : {
753 : // In the special case of the `storage_controller start` subcommand, we wish
754 : // to use the API endpoint of the newly started storage controller in order
755 : // to pass the readiness check. In this scenario [`Self::listen_port`] will
756 : // be set (see [`Self::start`]).
757 : //
758 : // Otherwise, we infer the storage controller api endpoint from the configured
759 : // control plane API.
760 0 : let port = if let Some(port) = self.listen_port.get() {
761 0 : *port
762 : } else {
763 0 : self.env.control_plane_api.port().unwrap()
764 : };
765 :
766 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
767 : // for general purpose API access.
768 0 : let url = Url::from_str(&format!(
769 0 : "{}://{}:{port}/{path}",
770 0 : self.env.control_plane_api.scheme(),
771 0 : self.env.control_plane_api.host_str().unwrap(),
772 0 : ))
773 0 : .unwrap();
774 0 :
775 0 : let mut builder = self.client.request(method, url);
776 0 : if let Some(body) = body {
777 0 : builder = builder.json(&body)
778 0 : }
779 0 : if let Some(private_key) = &self.private_key {
780 0 : println!("Getting claims for path {}", path);
781 0 : if let Some(required_claims) = Self::get_claims_for_path(&path)? {
782 0 : println!("Got claims {:?} for path {}", required_claims, path);
783 0 : let jwt_token = encode_from_key_file(&required_claims, private_key)?;
784 0 : builder = builder.header(
785 0 : reqwest::header::AUTHORIZATION,
786 0 : format!("Bearer {jwt_token}"),
787 0 : );
788 0 : }
789 0 : }
790 :
791 0 : let response = builder.send().await?;
792 0 : let response = response.error_from_body().await?;
793 :
794 0 : Ok(response
795 0 : .json()
796 0 : .await
797 0 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
798 0 : }
799 :
800 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
801 : #[instrument(skip(self))]
802 : pub async fn attach_hook(
803 : &self,
804 : tenant_shard_id: TenantShardId,
805 : pageserver_id: NodeId,
806 : ) -> anyhow::Result<Option<u32>> {
807 : let request = AttachHookRequest {
808 : tenant_shard_id,
809 : node_id: Some(pageserver_id),
810 : generation_override: None,
811 : config: None,
812 : };
813 :
814 : let response = self
815 : .dispatch::<_, AttachHookResponse>(
816 : Method::POST,
817 : "debug/v1/attach-hook".to_string(),
818 : Some(request),
819 : )
820 : .await?;
821 :
822 : Ok(response.generation)
823 : }
824 :
825 : #[instrument(skip(self))]
826 : pub async fn inspect(
827 : &self,
828 : tenant_shard_id: TenantShardId,
829 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
830 : let request = InspectRequest { tenant_shard_id };
831 :
832 : let response = self
833 : .dispatch::<_, InspectResponse>(
834 : Method::POST,
835 : "debug/v1/inspect".to_string(),
836 : Some(request),
837 : )
838 : .await?;
839 :
840 : Ok(response.attachment)
841 : }
842 :
843 : #[instrument(skip(self))]
844 : pub async fn tenant_create(
845 : &self,
846 : req: TenantCreateRequest,
847 : ) -> anyhow::Result<TenantCreateResponse> {
848 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
849 : .await
850 : }
851 :
852 : #[instrument(skip(self))]
853 : pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
854 : self.dispatch::<(), TenantCreateResponse>(
855 : Method::POST,
856 : format!("debug/v1/tenant/{tenant_id}/import"),
857 : None,
858 : )
859 : .await
860 : }
861 :
862 : #[instrument(skip(self))]
863 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
864 : self.dispatch::<(), _>(
865 : Method::GET,
866 : format!("debug/v1/tenant/{tenant_id}/locate"),
867 : None,
868 : )
869 : .await
870 : }
871 :
872 : #[instrument(skip_all, fields(node_id=%req.node_id))]
873 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
874 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
875 : .await
876 : }
877 :
878 : #[instrument(skip_all, fields(node_id=%req.node_id))]
879 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
880 : self.dispatch::<_, ()>(
881 : Method::PUT,
882 : format!("control/v1/node/{}/config", req.node_id),
883 : Some(req),
884 : )
885 : .await
886 : }
887 :
888 0 : pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
889 0 : self.dispatch::<(), Vec<NodeDescribeResponse>>(
890 0 : Method::GET,
891 0 : "control/v1/node".to_string(),
892 0 : None,
893 0 : )
894 0 : .await
895 0 : }
896 :
897 : #[instrument(skip(self))]
898 : pub async fn ready(&self) -> anyhow::Result<()> {
899 : self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
900 : .await
901 : }
902 :
903 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
904 : pub async fn tenant_timeline_create(
905 : &self,
906 : tenant_id: TenantId,
907 : req: TimelineCreateRequest,
908 : ) -> anyhow::Result<TimelineInfo> {
909 : self.dispatch(
910 : Method::POST,
911 : format!("v1/tenant/{tenant_id}/timeline"),
912 : Some(req),
913 : )
914 : .await
915 : }
916 :
917 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> anyhow::Result<()> {
918 0 : self.dispatch(Method::PUT, "v1/tenant/config".to_string(), Some(req))
919 0 : .await
920 0 : }
921 : }
|