Line data Source code
1 : use std::ffi::OsStr;
2 : use std::fs;
3 : use std::path::PathBuf;
4 : use std::process::ExitStatus;
5 : use std::str::FromStr;
6 : use std::sync::OnceLock;
7 : use std::time::{Duration, Instant};
8 :
9 : use crate::background_process;
10 : use crate::local_env::{LocalEnv, NeonStorageControllerConf};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use hyper0::Uri;
13 : use nix::unistd::Pid;
14 : use pageserver_api::controller_api::{
15 : NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
16 : SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
17 : TenantCreateResponse, TenantLocateResponse,
18 : };
19 : use pageserver_api::models::{
20 : TenantConfig, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
24 : use pem::Pem;
25 : use postgres_backend::AuthType;
26 : use reqwest::{Method, Response};
27 : use safekeeper_api::PgMajorVersion;
28 : use serde::de::DeserializeOwned;
29 : use serde::{Deserialize, Serialize};
30 : use tokio::process::Command;
31 : use tracing::instrument;
32 : use url::Url;
33 : use utils::auth::{Claims, Scope, encode_from_key_file, encode_hadron_token};
34 : use utils::id::{NodeId, TenantId};
35 : use whoami::username;
36 :
37 : pub struct StorageController {
38 : env: LocalEnv,
39 : private_key: Option<StorageControllerPrivateKey>,
40 : public_key: Option<StorageControllerPublicKey>,
41 : client: reqwest::Client,
42 : config: NeonStorageControllerConf,
43 :
44 : // The listen port is learned when starting the storage controller,
45 : // hence the use of OnceLock to init it at the right time.
46 : listen_port: OnceLock<u16>,
47 : }
48 :
49 : const COMMAND: &str = "storage_controller";
50 :
51 : const STORAGE_CONTROLLER_POSTGRES_VERSION: PgMajorVersion = PgMajorVersion::PG16;
52 :
53 : const DB_NAME: &str = "storage_controller";
54 :
55 : pub struct NeonStorageControllerStartArgs {
56 : pub instance_id: u8,
57 : pub base_port: Option<u16>,
58 : pub start_timeout: humantime::Duration,
59 : pub handle_ps_local_disk_loss: Option<bool>,
60 : }
61 :
62 : impl NeonStorageControllerStartArgs {
63 0 : pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self {
64 0 : Self {
65 0 : instance_id: 1,
66 0 : base_port: None,
67 0 : start_timeout,
68 0 : handle_ps_local_disk_loss: None,
69 0 : }
70 0 : }
71 : }
72 :
73 : pub struct NeonStorageControllerStopArgs {
74 : pub instance_id: u8,
75 : pub immediate: bool,
76 : }
77 :
78 : impl NeonStorageControllerStopArgs {
79 0 : pub fn with_default_instance_id(immediate: bool) -> Self {
80 0 : Self {
81 0 : instance_id: 1,
82 0 : immediate,
83 0 : }
84 0 : }
85 : }
86 :
87 0 : #[derive(Serialize, Deserialize)]
88 : pub struct AttachHookRequest {
89 : pub tenant_shard_id: TenantShardId,
90 : pub node_id: Option<NodeId>,
91 : pub generation_override: Option<i32>, // only new tenants
92 : pub config: Option<TenantConfig>, // only new tenants
93 : }
94 :
95 0 : #[derive(Serialize, Deserialize)]
96 : pub struct AttachHookResponse {
97 : #[serde(rename = "gen")]
98 : pub generation: Option<u32>,
99 : }
100 :
101 0 : #[derive(Serialize, Deserialize)]
102 : pub struct InspectRequest {
103 : pub tenant_shard_id: TenantShardId,
104 : }
105 :
106 0 : #[derive(Serialize, Deserialize)]
107 : pub struct InspectResponse {
108 : pub attachment: Option<(u32, NodeId)>,
109 : }
110 :
111 : enum StorageControllerPublicKey {
112 : RawPublicKey(Pem),
113 : PublicKeyCertPath(Utf8PathBuf),
114 : }
115 :
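// Editorial note: `StorageControllerPublicKey` (above) and `StorageControllerPrivateKey` (below)
// correspond to the auth flavours handled in `from_env`. `EdPrivateKey` holds a parsed PEM key and
// signs tokens via `encode_from_key_file`, while `HadronPrivateKey` keeps the on-disk key path
// (forwarded as `--private-key-path` in `start`) together with the raw key bytes used by
// `encode_hadron_token`.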
116 : enum StorageControllerPrivateKey {
117 : EdPrivateKey(Pem),
118 : HadronPrivateKey(Utf8PathBuf, Vec<u8>),
119 : }
120 :
121 : impl StorageControllerPrivateKey {
122 0 : pub fn encode_token(&self, claims: &Claims) -> anyhow::Result<String> {
123 0 : match self {
124 0 : Self::EdPrivateKey(key_data) => encode_from_key_file(claims, key_data),
125 0 : Self::HadronPrivateKey(_, key_data) => encode_hadron_token(claims, key_data),
126 : }
127 0 : }
128 : }
129 :
130 : impl StorageController {
131 0 : pub fn from_env(env: &LocalEnv) -> Self {
132 : // Assume all pageservers have symmetric auth configuration: this service
133 : // expects to use one JWT token to talk to all of them.
134 0 : let ps_conf = env
135 0 : .pageservers
136 0 : .first()
137 0 : .expect("Config is validated to contain at least one pageserver");
138 0 : let (private_key, public_key) = match ps_conf.http_auth_type {
139 0 : AuthType::Trust => (None, None),
140 : AuthType::NeonJWT => {
141 0 : let private_key_path = env.get_private_key_path();
142 0 : let private_key =
143 0 : pem::parse(fs::read(private_key_path).expect("failed to read private key"))
144 0 : .expect("failed to parse PEM file");
145 :
146 : // If pageserver auth is enabled, this implicitly enables auth for this service,
147 : // using the same credentials.
148 0 : let public_key_path =
149 0 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
150 0 : .unwrap();
151 :
152 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
153 0 : let public_key = if std::fs::metadata(&public_key_path)
154 0 : .expect("Can't stat public key")
155 0 : .is_dir()
156 : {
157 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
158 : // keys. We only use one key at a time, so we arbitrarily load the first one in the directory.
159 0 : let mut dir =
160 0 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
161 0 : let dent = dir
162 0 : .next()
163 0 : .expect("Empty key dir")
164 0 : .expect("Error reading key dir");
165 :
166 0 : pem::parse(std::fs::read_to_string(dent.path()).expect("Can't read public key"))
167 0 : .expect("Failed to parse PEM file")
168 : } else {
169 0 : pem::parse(
170 0 : std::fs::read_to_string(&public_key_path).expect("Can't read public key"),
171 : )
172 0 : .expect("Failed to parse PEM file")
173 : };
174 0 : (
175 0 : Some(StorageControllerPrivateKey::EdPrivateKey(private_key)),
176 0 : Some(StorageControllerPublicKey::RawPublicKey(public_key)),
177 0 : )
178 : }
179 : AuthType::HadronJWT => {
180 0 : let private_key_path = env.get_private_key_path();
181 0 : let private_key =
182 0 : fs::read(private_key_path.clone()).expect("failed to read private key");
183 :
184 : // If pageserver auth is enabled, this implicitly enables auth for this service,
185 : // using the same credentials.
186 0 : let public_key_path =
187 0 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
188 0 : .unwrap();
189 0 : (
190 0 : Some(StorageControllerPrivateKey::HadronPrivateKey(
191 0 : camino::Utf8PathBuf::try_from(private_key_path).unwrap(),
192 0 : private_key,
193 0 : )),
194 0 : Some(StorageControllerPublicKey::PublicKeyCertPath(
195 0 : public_key_path,
196 0 : )),
197 0 : )
198 : }
199 : };
200 :
201 0 : Self {
202 0 : env: env.clone(),
203 0 : private_key,
204 0 : public_key,
205 0 : client: env.create_http_client(),
206 0 : config: env.storage_controller.clone(),
207 0 : listen_port: OnceLock::default(),
208 0 : }
209 0 : }
210 :
211 0 : fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
212 0 : self.env
213 0 : .base_data_dir
214 0 : .join(format!("storage_controller_{instance_id}"))
215 0 : }
216 :
217 0 : fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
218 0 : Utf8PathBuf::from_path_buf(
219 0 : self.storage_controller_instance_dir(instance_id)
220 0 : .join("storage_controller.pid"),
221 : )
222 0 : .expect("non-Unicode path")
223 0 : }
224 :
225 : /// Find the directory containing postgres subdirectories, such as `bin` and `lib`.
226 : ///
227 : /// This usually uses the STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but falls back
228 : /// to other versions if that one isn't found. Some automated tests, such as
229 : /// `test_remote_extensions`, create circumstances where only one version is available in pg_distrib_dir.
230 0 : async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
231 : const PREFER_VERSIONS: [PgMajorVersion; 5] = [
232 : STORAGE_CONTROLLER_POSTGRES_VERSION,
233 : PgMajorVersion::PG16,
234 : PgMajorVersion::PG15,
235 : PgMajorVersion::PG14,
236 : PgMajorVersion::PG17,
237 : ];
238 :
239 0 : for v in PREFER_VERSIONS {
240 0 : let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
241 0 : if tokio::fs::try_exists(&path).await? {
242 0 : return Ok(path);
243 0 : }
244 : }
245 :
246 : // Fall through
247 0 : anyhow::bail!(
248 0 : "Postgres directory '{}' not found in {}",
249 : dir_name,
250 0 : self.env.pg_distrib_dir.display(),
251 : );
252 0 : }
253 :
254 0 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
255 0 : self.get_pg_dir("bin").await
256 0 : }
257 :
258 0 : pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
259 0 : self.get_pg_dir("lib").await
260 0 : }
261 :
262 : /// Readiness check for our postgres process
263 0 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result<bool> {
264 0 : let bin_path = pg_bin_dir.join("pg_isready");
265 0 : let args = [
266 0 : "-h",
267 0 : "localhost",
268 0 : "-U",
269 0 : &username(),
270 0 : "-d",
271 0 : DB_NAME,
272 0 : "-p",
273 0 : &format!("{postgres_port}"),
274 0 : ];
275 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
276 0 : let envs = [
277 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
278 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
279 0 : ];
280 0 : let exitcode = Command::new(bin_path)
281 0 : .args(args)
282 0 : .envs(envs)
283 0 : .spawn()?
284 0 : .wait()
285 0 : .await?;
286 :
287 0 : Ok(exitcode.success())
288 0 : }
289 :
290 : /// Create our database if it doesn't exist
291 : ///
292 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
293 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
294 : /// who just want to run `cargo neon_local` without knowing about diesel.
295 : ///
296 : /// Returns the database URL.
297 0 : pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result<String> {
298 0 : let database_url = format!(
299 0 : "postgresql://{}@localhost:{}/{DB_NAME}",
300 0 : &username(),
301 : postgres_port
302 : );
303 :
304 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
305 0 : let createdb_path = pg_bin_dir.join("createdb");
306 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
307 0 : let envs = [
308 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
309 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
310 0 : ];
311 0 : let output = Command::new(&createdb_path)
312 0 : .args([
313 0 : "-h",
314 0 : "localhost",
315 0 : "-p",
316 0 : &format!("{postgres_port}"),
317 0 : "-U",
318 0 : &username(),
319 0 : "-O",
320 0 : &username(),
321 0 : DB_NAME,
322 0 : ])
323 0 : .envs(envs)
324 0 : .output()
325 0 : .await
326 0 : .expect("Failed to spawn createdb");
327 :
328 0 : if !output.status.success() {
329 0 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
330 0 : if stderr.contains("already exists") {
331 0 : tracing::info!("Database {DB_NAME} already exists");
332 : } else {
333 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
334 : }
335 0 : }
336 :
337 0 : Ok(database_url)
338 0 : }
339 :
340 0 : pub async fn connect_to_database(
341 0 : &self,
342 0 : postgres_port: u16,
343 0 : ) -> anyhow::Result<(
344 0 : tokio_postgres::Client,
345 0 : tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
346 0 : )> {
347 0 : tokio_postgres::Config::new()
348 0 : .host("localhost")
349 0 : .port(postgres_port)
350 0 : // The user is the ambient operating system user name.
351 0 : // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400
352 0 : //
353 0 : // Until we get there, use the ambient operating system user name.
354 0 : // Recent tokio-postgres versions default to this if the user isn't specified.
355 0 : // But tokio-postgres fork doesn't have this upstream commit:
356 0 : // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
357 0 : // => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
358 0 : .user(&username())
359 0 : .dbname(DB_NAME)
360 0 : .connect(tokio_postgres::NoTls)
361 0 : .await
362 0 : .map_err(anyhow::Error::new)
363 0 : }
364 :
365 : /// Wrapper for the pg_ctl binary, which we spawn as a short-lived subprocess when starting and stopping postgres
366 0 : async fn pg_ctl<I, S>(&self, args: I) -> ExitStatus
367 0 : where
368 0 : I: IntoIterator<Item = S>,
369 0 : S: AsRef<OsStr>,
370 0 : {
371 0 : let pg_bin_dir = self.get_pg_bin_dir().await.unwrap();
372 0 : let bin_path = pg_bin_dir.join("pg_ctl");
373 :
374 0 : let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
375 0 : let envs = [
376 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
377 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
378 0 : ];
379 :
380 0 : Command::new(bin_path)
381 0 : .args(args)
382 0 : .envs(envs)
383 0 : .spawn()
384 0 : .expect("Failed to spawn pg_ctl, binary_missing?")
385 0 : .wait()
386 0 : .await
387 0 : .expect("Failed to wait for pg_ctl termination")
388 0 : }
389 :
390 0 : pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> {
391 0 : let instance_dir = self.storage_controller_instance_dir(start_args.instance_id);
392 0 : if let Err(err) = tokio::fs::create_dir(&instance_dir).await {
393 0 : if err.kind() != std::io::ErrorKind::AlreadyExists {
394 0 : panic!("Failed to create instance dir {instance_dir:?}");
395 0 : }
396 0 : }
397 :
398 0 : if self.env.generate_local_ssl_certs {
399 0 : self.env.generate_ssl_cert(
400 0 : &instance_dir.join("server.crt"),
401 0 : &instance_dir.join("server.key"),
402 0 : )?;
403 0 : }
404 :
405 0 : let listen_url = &self.env.control_plane_api;
406 :
407 0 : let scheme = listen_url.scheme();
408 0 : let host = listen_url.host_str().unwrap();
409 :
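// Editorial note: unless `--base-port` is given, the controller listens on the port from
// `control_plane_api` and its dedicated postgres instance uses the next port up; with `--base-port`,
// the postgres port is taken from `NeonStorageControllerConf::database_url` instead.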
410 0 : let (listen_port, postgres_port) = if let Some(base_port) = start_args.base_port {
411 0 : (
412 0 : base_port,
413 0 : self.config
414 0 : .database_url
415 0 : .expect("--base-port requires NeonStorageControllerConf::database_url")
416 0 : .port(),
417 0 : )
418 : } else {
419 0 : let port = listen_url.port().unwrap();
420 0 : (port, port + 1)
421 : };
422 :
423 0 : self.listen_port
424 0 : .set(listen_port)
425 0 : .expect("StorageController::listen_port is only set here");
426 :
427 : // Do we remove the pid file on stop?
428 0 : let pg_started = self.is_postgres_running().await?;
429 0 : let pg_lib_dir = self.get_pg_lib_dir().await?;
430 :
431 0 : if !pg_started {
432 : // Start a vanilla Postgres process used by the storage controller for persistence.
433 0 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
434 0 : .unwrap()
435 0 : .join("storage_controller_db");
436 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
437 0 : let pg_log_path = pg_data_path.join("postgres.log");
438 :
439 0 : if !tokio::fs::try_exists(&pg_data_path).await? {
440 0 : let initdb_args = [
441 0 : "--pgdata",
442 0 : pg_data_path.as_ref(),
443 0 : "--username",
444 0 : &username(),
445 0 : "--no-sync",
446 0 : "--no-instructions",
447 0 : ];
448 0 : tracing::info!(
449 0 : "Initializing storage controller database with args: {:?}",
450 : initdb_args
451 : );
452 :
453 : // Initialize empty database
454 0 : let initdb_path = pg_bin_dir.join("initdb");
455 0 : let mut child = Command::new(&initdb_path)
456 0 : .envs(vec![
457 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
458 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
459 0 : ])
460 0 : .args(initdb_args)
461 0 : .spawn()
462 0 : .expect("Failed to spawn initdb");
463 0 : let status = child.wait().await?;
464 0 : if !status.success() {
465 0 : anyhow::bail!("initdb failed with status {status}");
466 0 : }
467 0 : };
468 :
469 : // Write a minimal config file:
470 : // - Specify the port, since this is chosen dynamically
471 : // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
472 : // the storage controller we don't want a slow local disk to interfere with that.
473 : //
474 : // NB: it's important that we rewrite this file on each start command so we propagate changes
475 : // from `LocalEnv`'s config file (`.neon/config`).
476 0 : tokio::fs::write(
477 0 : &pg_data_path.join("postgresql.conf"),
478 0 : format!("port = {postgres_port}\nfsync=off\n"),
479 0 : )
480 0 : .await?;
481 :
482 0 : println!("Starting storage controller database...");
483 0 : let db_start_args = [
484 0 : "-w",
485 0 : "-D",
486 0 : pg_data_path.as_ref(),
487 0 : "-l",
488 0 : pg_log_path.as_ref(),
489 0 : "-U",
490 0 : &username(),
491 0 : "start",
492 0 : ];
493 0 : tracing::info!(
494 0 : "Starting storage controller database with args: {:?}",
495 : db_start_args
496 : );
497 :
498 0 : let db_start_status = self.pg_ctl(db_start_args).await;
499 0 : let start_timeout: Duration = start_args.start_timeout.into();
500 0 : let db_start_deadline = Instant::now() + start_timeout;
501 0 : if !db_start_status.success() {
502 0 : return Err(anyhow::anyhow!(
503 0 : "Failed to start postgres {}",
504 0 : db_start_status.code().unwrap()
505 0 : ));
506 0 : }
507 :
508 : loop {
509 0 : if Instant::now() > db_start_deadline {
510 0 : return Err(anyhow::anyhow!("Timed out waiting for postgres to start"));
511 0 : }
512 :
513 0 : match self.pg_isready(&pg_bin_dir, postgres_port).await {
514 : Ok(true) => {
515 0 : tracing::info!("storage controller postgres is now ready");
516 0 : break;
517 : }
518 : Ok(false) => {
519 0 : tokio::time::sleep(Duration::from_millis(100)).await;
520 : }
521 0 : Err(e) => {
522 0 : tracing::warn!("Failed to check postgres status: {e}")
523 : }
524 : }
525 : }
526 :
527 0 : self.setup_database(postgres_port).await?;
528 0 : }
529 :
530 0 : let database_url = format!("postgresql://localhost:{postgres_port}/{DB_NAME}");
531 :
532 : // We support running a startup SQL script to fiddle with the database before we launch storcon.
533 : // This is used by the test suite.
534 0 : let startup_script_path = self
535 0 : .env
536 0 : .base_data_dir
537 0 : .join("storage_controller_db.startup.sql");
538 0 : let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
539 0 : Ok(script) => {
540 0 : tokio::fs::remove_file(startup_script_path).await?;
541 0 : script
542 : }
543 0 : Err(e) => {
544 0 : if e.kind() == std::io::ErrorKind::NotFound {
545 : // always run some startup script so that this code path doesn't bit rot
546 0 : "BEGIN; COMMIT;".to_string()
547 : } else {
548 0 : anyhow::bail!("Failed to read startup script: {e}")
549 : }
550 : }
551 : };
552 0 : let (mut client, conn) = self.connect_to_database(postgres_port).await?;
553 0 : let conn = tokio::spawn(conn);
554 0 : let tx = client.build_transaction();
555 0 : let tx = tx.start().await?;
556 0 : tx.batch_execute(&startup_script).await?;
557 0 : tx.commit().await?;
558 0 : drop(client);
559 0 : conn.await??;
560 :
561 0 : let addr = format!("{host}:{listen_port}");
562 0 : let address_for_peers = Uri::builder()
563 0 : .scheme(scheme)
564 0 : .authority(addr.clone())
565 0 : .path_and_query("")
566 0 : .build()
567 0 : .unwrap();
568 :
569 0 : let mut args = vec![
570 : "--dev",
571 0 : "--database-url",
572 0 : &database_url,
573 0 : "--max-offline-interval",
574 0 : &humantime::Duration::from(self.config.max_offline).to_string(),
575 0 : "--max-warming-up-interval",
576 0 : &humantime::Duration::from(self.config.max_warming_up).to_string(),
577 0 : "--heartbeat-interval",
578 0 : &humantime::Duration::from(self.config.heartbeat_interval).to_string(),
579 0 : "--address-for-peers",
580 0 : &address_for_peers.to_string(),
581 : ]
582 0 : .into_iter()
583 0 : .map(|s| s.to_string())
584 0 : .collect::<Vec<_>>();
585 :
586 0 : match scheme {
587 0 : "http" => args.extend(["--listen".to_string(), addr]),
588 0 : "https" => args.extend(["--listen-https".to_string(), addr]),
589 : _ => {
590 0 : panic!("Unexpected url scheme in control_plane_api: {scheme}");
591 : }
592 : }
593 :
594 0 : if self.config.start_as_candidate {
595 0 : args.push("--start-as-candidate".to_string());
596 0 : }
597 :
598 0 : if self.config.use_https_pageserver_api {
599 0 : args.push("--use-https-pageserver-api".to_string());
600 0 : }
601 :
602 0 : if self.config.use_https_safekeeper_api {
603 0 : args.push("--use-https-safekeeper-api".to_string());
604 0 : }
605 :
606 0 : if self.config.use_local_compute_notifications {
607 0 : args.push("--use-local-compute-notifications".to_string());
608 0 : }
609 :
610 0 : if let Some(value) = self.config.kick_secondary_downloads {
611 0 : args.push(format!("--kick-secondary-downloads={value}"));
612 0 : }
613 :
614 0 : if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
615 0 : args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
616 0 : }
617 :
618 0 : if let Some(private_key) = &self.private_key {
619 0 : let claims = Claims::new(None, Scope::PageServerApi);
620 0 : if let StorageControllerPrivateKey::HadronPrivateKey(key_path, _) = private_key {
621 0 : args.push(format!("--private-key-path={key_path}"));
622 0 : }
623 : // We set all JWT tokens for Hadron as well in this test setup to avoid bifurcation between Neon and
624 : // Hadron test cases. In production we do not need to set this, as HTTP auth is not enabled on the
625 : // pageserver; we rely on network segmentation to ensure that only trusted components can talk to the
626 : // pageserver's HTTP port.
627 0 : let jwt_token = private_key.encode_token(&claims)?;
628 0 : args.push(format!("--jwt-token={jwt_token}"));
629 :
630 0 : let peer_claims = Claims::new(None, Scope::Admin);
631 0 : let peer_jwt_token = private_key
632 0 : .encode_token(&peer_claims)
633 0 : .expect("failed to generate jwt token");
634 0 : args.push(format!("--peer-jwt-token={peer_jwt_token}"));
635 :
636 0 : let claims = Claims::new(None, Scope::SafekeeperData);
637 0 : let jwt_token = private_key
638 0 : .encode_token(&claims)
639 0 : .expect("failed to generate jwt token");
640 0 : args.push(format!("--safekeeper-jwt-token={jwt_token}"));
641 0 : }
642 :
643 0 : if let Some(public_key) = &self.public_key {
644 0 : match public_key {
645 0 : StorageControllerPublicKey::RawPublicKey(public_key) => {
646 0 : args.push(format!("--public-key=\"{public_key}\""));
647 0 : }
648 0 : StorageControllerPublicKey::PublicKeyCertPath(public_key_path) => {
649 0 : args.push(format!("--public-key-cert-path={public_key_path}"));
650 0 : }
651 : }
652 0 : }
653 :
654 0 : if let Some(control_plane_hooks_api) = &self.env.control_plane_hooks_api {
655 0 : args.push(format!("--control-plane-url={control_plane_hooks_api}"));
656 0 : }
657 :
658 0 : if let Some(split_threshold) = self.config.split_threshold.as_ref() {
659 0 : args.push(format!("--split-threshold={split_threshold}"))
660 0 : }
661 :
662 0 : if let Some(max_split_shards) = self.config.max_split_shards.as_ref() {
663 0 : args.push(format!("--max-split-shards={max_split_shards}"))
664 0 : }
665 :
666 0 : if let Some(initial_split_threshold) = self.config.initial_split_threshold.as_ref() {
667 0 : args.push(format!(
668 0 : "--initial-split-threshold={initial_split_threshold}"
669 : ))
670 0 : }
671 :
672 0 : if let Some(initial_split_shards) = self.config.initial_split_shards.as_ref() {
673 0 : args.push(format!("--initial-split-shards={initial_split_shards}"))
674 0 : }
675 :
676 0 : if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
677 0 : args.push(format!("--max-secondary-lag-bytes={lag}"))
678 0 : }
679 :
680 0 : if let Some(threshold) = self.config.long_reconcile_threshold {
681 0 : args.push(format!(
682 0 : "--long-reconcile-threshold={}",
683 0 : humantime::Duration::from(threshold)
684 : ))
685 0 : }
686 :
687 0 : args.push(format!(
688 0 : "--neon-local-repo-dir={}",
689 0 : self.env.base_data_dir.display()
690 : ));
691 :
692 0 : if self
693 0 : .env
694 0 : .safekeepers
695 0 : .iter()
696 0 : .any(|sk| sk.auth_type != AuthType::Trust)
697 0 : && self.private_key.is_none()
698 : {
699 0 : anyhow::bail!("Safekeeper set up for auth but no private key specified");
700 0 : }
701 :
702 0 : if self.config.timelines_onto_safekeepers {
703 0 : args.push("--timelines-onto-safekeepers".to_string());
704 0 : }
705 :
706 : // neon_local is used in test environments where we often have fewer than three safekeepers.
707 0 : if self.config.timeline_safekeeper_count.is_some() || self.env.safekeepers.len() < 3 {
708 0 : let sk_cnt = self
709 0 : .config
710 0 : .timeline_safekeeper_count
711 0 : .unwrap_or(self.env.safekeepers.len());
712 0 :
713 0 : args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
714 0 : }
715 :
716 0 : if let Some(duration) = self.config.shard_split_request_timeout {
717 0 : args.push(format!(
718 0 : "--shard-split-request-timeout={}",
719 0 : humantime::Duration::from(duration)
720 0 : ));
721 0 : }
722 :
723 0 : let mut envs = vec![
724 0 : ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
725 0 : ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
726 : ];
727 :
728 0 : if let Some(posthog_config) = &self.config.posthog_config {
729 0 : envs.push((
730 0 : "POSTHOG_CONFIG".to_string(),
731 0 : serde_json::to_string(posthog_config)?,
732 : ));
733 0 : }
734 :
735 0 : println!("Starting storage controller at {scheme}://{host}:{listen_port}");
736 :
737 0 : if start_args.handle_ps_local_disk_loss.unwrap_or_default() {
738 0 : args.push("--handle-ps-local-disk-loss".to_string());
739 0 : }
740 :
741 0 : background_process::start_process(
742 0 : COMMAND,
743 0 : &instance_dir,
744 0 : &self.env.storage_controller_bin(),
745 0 : args,
746 0 : envs,
747 0 : background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
748 0 : &start_args.start_timeout,
749 0 : || async {
750 0 : match self.ready().await {
751 0 : Ok(_) => Ok(true),
752 0 : Err(_) => Ok(false),
753 : }
754 0 : },
755 : )
756 0 : .await?;
757 :
758 0 : if self.config.timelines_onto_safekeepers {
759 0 : self.register_safekeepers().await?;
760 0 : }
761 :
762 0 : Ok(())
763 0 : }
764 :
765 0 : pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> {
766 0 : background_process::stop_process(
767 0 : stop_args.immediate,
768 0 : COMMAND,
769 0 : &self.pid_file(stop_args.instance_id),
770 0 : )?;
771 :
772 0 : let storcon_instances = self.env.storage_controller_instances().await?;
773 0 : for (instance_id, instanced_dir_path) in storcon_instances {
774 0 : if instance_id == stop_args.instance_id {
775 0 : continue;
776 0 : }
777 :
778 0 : let pid_file = instanced_dir_path.join("storage_controller.pid");
779 0 : let pid = tokio::fs::read_to_string(&pid_file)
780 0 : .await
781 0 : .map_err(|err| {
782 0 : anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}")
783 0 : })?
784 0 : .parse::<i32>()
785 0 : .expect("pid is valid i32");
786 :
787 0 : let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?;
788 0 : if other_proc_alive {
789 : // There is another storage controller instance running, so we return
790 : // and leave the database running.
791 0 : return Ok(());
792 0 : }
793 : }
794 :
795 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
796 :
797 0 : println!("Stopping storage controller database...");
798 0 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
799 0 : let stop_status = self.pg_ctl(pg_stop_args).await;
800 0 : if !stop_status.success() {
801 0 : match self.is_postgres_running().await {
802 : Ok(false) => {
803 0 : println!("Storage controller database is already stopped");
804 0 : return Ok(());
805 : }
806 : Ok(true) => {
807 0 : anyhow::bail!("Failed to stop storage controller database");
808 : }
809 0 : Err(err) => {
810 0 : anyhow::bail!("Failed to stop storage controller database: {err}");
811 : }
812 : }
813 0 : }
814 :
815 0 : Ok(())
816 0 : }
817 :
818 0 : async fn is_postgres_running(&self) -> anyhow::Result<bool> {
819 0 : let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
820 :
821 0 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
822 0 : let status_exitcode = self.pg_ctl(pg_status_args).await;
823 :
824 : // `pg_ctl status` exits with code 3 when postgres is not running and code 4 when the data
825 : // directory does not exist; both mean "not running". Exit code 0 means postgres is running.
826 : const PG_STATUS_NOT_RUNNING: i32 = 3;
827 : const PG_NO_DATA_DIR: i32 = 4;
828 : const PG_STATUS_RUNNING: i32 = 0;
829 0 : match status_exitcode.code() {
830 0 : Some(PG_STATUS_NOT_RUNNING) => Ok(false),
831 0 : Some(PG_NO_DATA_DIR) => Ok(false),
832 0 : Some(PG_STATUS_RUNNING) => Ok(true),
833 0 : Some(code) => Err(anyhow::anyhow!(
834 0 : "pg_ctl status returned unexpected status code: {:?}",
835 0 : code
836 0 : )),
837 0 : None => Err(anyhow::anyhow!("pg_ctl status returned no status code")),
838 : }
839 0 : }
840 :
841 0 : fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
842 0 : let category = match path.find('/') {
843 0 : Some(idx) => &path[..idx],
844 0 : None => path,
845 : };
846 :
847 0 : match category {
848 0 : "status" | "ready" => Ok(None),
849 0 : "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
850 0 : "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
851 0 : _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
852 : }
853 0 : }
854 :
855 : /// Simple HTTP request wrapper for calling into the storage controller, deserializing the JSON response body
856 0 : async fn dispatch<RQ, RS>(
857 0 : &self,
858 0 : method: reqwest::Method,
859 0 : path: String,
860 0 : body: Option<RQ>,
861 0 : ) -> anyhow::Result<RS>
862 0 : where
863 0 : RQ: Serialize + Sized,
864 0 : RS: DeserializeOwned + Sized,
865 0 : {
866 0 : let response = self.dispatch_inner(method, path, body).await?;
867 0 : Ok(response
868 0 : .json()
869 0 : .await
870 0 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
871 0 : }
872 :
873 : /// Lower-level variant of [`Self::dispatch`]: builds the URL and auth header and returns the raw [`Response`] without deserializing it
874 0 : async fn dispatch_inner<RQ>(
875 0 : &self,
876 0 : method: reqwest::Method,
877 0 : path: String,
878 0 : body: Option<RQ>,
879 0 : ) -> anyhow::Result<Response>
880 0 : where
881 0 : RQ: Serialize + Sized,
882 0 : {
883 : // In the special case of the `storage_controller start` subcommand, we wish
884 : // to use the API endpoint of the newly started storage controller in order
885 : // to pass the readiness check. In this scenario [`Self::listen_port`] will
886 : // be set (see [`Self::start`]).
887 : //
888 : // Otherwise, we infer the storage controller api endpoint from the configured
889 : // control plane API.
890 0 : let port = if let Some(port) = self.listen_port.get() {
891 0 : *port
892 : } else {
893 0 : self.env.control_plane_api.port().unwrap()
894 : };
895 :
896 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
897 : // for general purpose API access.
898 0 : let url = Url::from_str(&format!(
899 0 : "{}://{}:{port}/{path}",
900 0 : self.env.control_plane_api.scheme(),
901 0 : self.env.control_plane_api.host_str().unwrap(),
902 0 : ))
903 0 : .unwrap();
904 :
905 0 : let mut builder = self.client.request(method, url);
906 0 : if let Some(body) = body {
907 0 : builder = builder.json(&body)
908 0 : }
909 0 : if let Some(private_key) = &self.private_key {
910 0 : println!("Getting claims for path {path}");
911 0 : if let Some(required_claims) = Self::get_claims_for_path(&path)? {
912 0 : println!("Got claims {required_claims:?} for path {path}");
913 0 : let jwt_token = private_key.encode_token(&required_claims)?;
914 0 : builder = builder.header(
915 0 : reqwest::header::AUTHORIZATION,
916 0 : format!("Bearer {jwt_token}"),
917 : );
918 0 : }
919 0 : }
920 :
921 0 : let response = builder.send().await?;
922 0 : let response = response.error_from_body().await?;
923 :
924 0 : Ok(response)
925 0 : }
926 :
927 : /// Register the safekeepers in the storage controller
928 : #[instrument(skip(self))]
929 : async fn register_safekeepers(&self) -> anyhow::Result<()> {
930 : for sk in self.env.safekeepers.iter() {
931 : let sk_id = sk.id;
932 : let body = serde_json::json!({
933 : "id": sk_id,
934 : "created_at": "2023-10-25T09:11:25Z",
935 : "updated_at": "2024-08-28T11:32:43Z",
936 : "region_id": "aws-us-east-2",
937 : "host": "127.0.0.1",
938 : "port": sk.pg_port,
939 : "http_port": sk.http_port,
940 : "https_port": sk.https_port,
941 : "version": 5957,
942 : "availability_zone_id": format!("us-east-2b-{sk_id}"),
943 : });
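// Editorial note: the timestamps, `region_id`, `version`, and availability zone above appear to be
// fixed placeholder values for local runs; the id, host, and ports are what actually describe the
// local safekeeper.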
944 : self.upsert_safekeeper(sk_id, body).await?;
945 : self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
946 : .await?;
947 : }
948 : Ok(())
949 : }
950 :
951 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
952 : #[instrument(skip(self))]
953 : pub async fn attach_hook(
954 : &self,
955 : tenant_shard_id: TenantShardId,
956 : pageserver_id: NodeId,
957 : ) -> anyhow::Result<Option<u32>> {
958 : let request = AttachHookRequest {
959 : tenant_shard_id,
960 : node_id: Some(pageserver_id),
961 : generation_override: None,
962 : config: None,
963 : };
964 :
965 : let response = self
966 : .dispatch::<_, AttachHookResponse>(
967 : Method::POST,
968 : "debug/v1/attach-hook".to_string(),
969 : Some(request),
970 : )
971 : .await?;
972 :
973 : Ok(response.generation)
974 : }
975 :
976 : #[instrument(skip(self))]
977 : pub async fn upsert_safekeeper(
978 : &self,
979 : node_id: NodeId,
980 : request: serde_json::Value,
981 : ) -> anyhow::Result<()> {
982 : let resp = self
983 : .dispatch_inner::<serde_json::Value>(
984 : Method::POST,
985 : format!("control/v1/safekeeper/{node_id}"),
986 : Some(request),
987 : )
988 : .await?;
989 : if !resp.status().is_success() {
990 : anyhow::bail!(
991 : "setting scheduling policy unsuccessful for safekeeper {node_id}: {}",
992 : resp.status()
993 : );
994 : }
995 : Ok(())
996 : }
997 :
998 : #[instrument(skip(self))]
999 : pub async fn safekeeper_scheduling_policy(
1000 : &self,
1001 : node_id: NodeId,
1002 : scheduling_policy: SkSchedulingPolicy,
1003 : ) -> anyhow::Result<()> {
1004 : self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
1005 : Method::POST,
1006 : format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
1007 : Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
1008 : )
1009 : .await
1010 : }
1011 :
1012 : #[instrument(skip(self))]
1013 : pub async fn inspect(
1014 : &self,
1015 : tenant_shard_id: TenantShardId,
1016 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
1017 : let request = InspectRequest { tenant_shard_id };
1018 :
1019 : let response = self
1020 : .dispatch::<_, InspectResponse>(
1021 : Method::POST,
1022 : "debug/v1/inspect".to_string(),
1023 : Some(request),
1024 : )
1025 : .await?;
1026 :
1027 : Ok(response.attachment)
1028 : }
1029 :
1030 : #[instrument(skip(self))]
1031 : pub async fn tenant_create(
1032 : &self,
1033 : req: TenantCreateRequest,
1034 : ) -> anyhow::Result<TenantCreateResponse> {
1035 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
1036 : .await
1037 : }
1038 :
1039 : #[instrument(skip(self))]
1040 : pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
1041 : self.dispatch::<(), TenantCreateResponse>(
1042 : Method::POST,
1043 : format!("debug/v1/tenant/{tenant_id}/import"),
1044 : None,
1045 : )
1046 : .await
1047 : }
1048 :
1049 : #[instrument(skip(self))]
1050 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
1051 : self.dispatch::<(), _>(
1052 : Method::GET,
1053 : format!("debug/v1/tenant/{tenant_id}/locate"),
1054 : None,
1055 : )
1056 : .await
1057 : }
1058 :
1059 : #[instrument(skip_all, fields(node_id=%req.node_id))]
1060 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
1061 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
1062 : .await
1063 : }
1064 :
1065 : #[instrument(skip_all, fields(node_id=%req.node_id))]
1066 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
1067 : self.dispatch::<_, ()>(
1068 : Method::PUT,
1069 : format!("control/v1/node/{}/config", req.node_id),
1070 : Some(req),
1071 : )
1072 : .await
1073 : }
1074 :
1075 0 : pub async fn node_list(&self) -> anyhow::Result<Vec<NodeDescribeResponse>> {
1076 0 : self.dispatch::<(), Vec<NodeDescribeResponse>>(
1077 0 : Method::GET,
1078 0 : "control/v1/node".to_string(),
1079 0 : None,
1080 0 : )
1081 0 : .await
1082 0 : }
1083 :
1084 : #[instrument(skip(self))]
1085 : pub async fn ready(&self) -> anyhow::Result<()> {
1086 : self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
1087 : .await
1088 : }
1089 :
1090 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
1091 : pub async fn tenant_timeline_create(
1092 : &self,
1093 : tenant_id: TenantId,
1094 : req: TimelineCreateRequest,
1095 : ) -> anyhow::Result<TimelineInfo> {
1096 : self.dispatch(
1097 : Method::POST,
1098 : format!("v1/tenant/{tenant_id}/timeline"),
1099 : Some(req),
1100 : )
1101 : .await
1102 : }
1103 :
1104 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> anyhow::Result<()> {
1105 0 : self.dispatch(Method::PUT, "v1/tenant/config".to_string(), Some(req))
1106 0 : .await
1107 0 : }
1108 : }
|
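
A minimal sketch of how the claims mapping in `get_claims_for_path` could be exercised, written as a
hypothetical `#[cfg(test)]` module for this file (not part of the original source) and assuming
`Claims` exposes public `tenant_id`/`scope` fields:

#[cfg(test)]
mod claims_mapping_sketch {
    use super::*;

    #[test]
    fn path_prefixes_map_to_expected_scopes() {
        // Health endpoints need no token at all.
        assert!(StorageController::get_claims_for_path("ready").unwrap().is_none());
        assert!(StorageController::get_claims_for_path("status").unwrap().is_none());

        // "control/..." and "debug/..." require admin-scoped claims.
        assert!(matches!(
            StorageController::get_claims_for_path("control/v1/node").unwrap(),
            Some(Claims { scope: Scope::Admin, .. })
        ));

        // "v1/..." requires pageserver-API-scoped claims.
        assert!(matches!(
            StorageController::get_claims_for_path("v1/tenant").unwrap(),
            Some(Claims { scope: Scope::PageServerApi, .. })
        ));

        // Anything else is rejected.
        assert!(StorageController::get_claims_for_path("nonsense").is_err());
    }
}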