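//! Management of the storage controller process and its captive Postgres instance for
//! `neon_local`: spawning, readiness checks, teardown, and a small typed HTTP client
//! for the controller's API.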
use crate::{
    background_process,
    local_env::{LocalEnv, NeonStorageControllerConf},
};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::{
    controller_api::{
        NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse, TenantLocateResponse,
        TenantShardMigrateRequest, TenantShardMigrateResponse,
    },
    models::{
        TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
        TimelineCreateRequest, TimelineInfo,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{fs, str::FromStr};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
use utils::{
    auth::{encode_from_key_file, Claims, Scope},
    id::{NodeId, TenantId},
};

pub struct StorageController {
    env: LocalEnv,
    listen: String,
    path: Utf8PathBuf,
    private_key: Option<Vec<u8>>,
    public_key: Option<String>,
    postgres_port: u16,
    client: reqwest::Client,
    config: NeonStorageControllerConf,
}

const COMMAND: &str = "storage_controller";

const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
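// Wire types for the storage controller's debug API: these are serialized as the JSON
// request/response bodies of the `attach_hook` and `inspect` calls below.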
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
    pub tenant_shard_id: TenantShardId,
    pub node_id: Option<NodeId>,
}

#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
    pub gen: Option<u32>,
}

#[derive(Serialize, Deserialize)]
pub struct InspectRequest {
    pub tenant_shard_id: TenantShardId,
}

#[derive(Serialize, Deserialize)]
pub struct InspectResponse {
    pub attachment: Option<(u32, NodeId)>,
}
impl StorageController {
    pub fn from_env(env: &LocalEnv) -> Self {
        let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
            .unwrap()
            .join("attachments.json");

        // Makes no sense to construct this if pageservers aren't going to use it: assume
        // pageservers have control plane API set
        let listen_url = env.control_plane_api.clone().unwrap();

        let listen = format!(
            "{}:{}",
            listen_url.host_str().unwrap(),
            listen_url.port().unwrap()
        );

        // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
        // port, for use by our captive postgres.
        let postgres_port = listen_url
            .port()
            .expect("Control plane API setting should always have a port")
            + 1;

        // Assume all pageservers have symmetric auth configuration: this service
        // expects to use one JWT token to talk to all of them.
        let ps_conf = env
            .pageservers
            .first()
            .expect("Config is validated to contain at least one pageserver");
        let (private_key, public_key) = match ps_conf.http_auth_type {
            AuthType::Trust => (None, None),
            AuthType::NeonJWT => {
                let private_key_path = env.get_private_key_path();
                let private_key = fs::read(private_key_path).expect("failed to read private key");

                // If pageserver auth is enabled, this implicitly enables auth for this service,
                // using the same credentials.
                let public_key_path =
                    camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
                        .unwrap();

                // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
                let public_key = if std::fs::metadata(&public_key_path)
                    .expect("Can't stat public key")
                    .is_dir()
                {
                    // Our config may specify a directory: this is for the pageserver's ability to handle multiple
                    // keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
                    let mut dir =
                        std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
                    let dent = dir
                        .next()
                        .expect("Empty key dir")
                        .expect("Error reading key dir");

                    std::fs::read_to_string(dent.path()).expect("Can't read public key")
                } else {
                    std::fs::read_to_string(&public_key_path).expect("Can't read public key")
                };
                (Some(private_key), Some(public_key))
            }
        };

        Self {
            env: env.clone(),
            path,
            listen,
            private_key,
            public_key,
            postgres_port,
            client: reqwest::ClientBuilder::new()
                .build()
                .expect("Failed to construct http client"),
            config: env.storage_controller.clone(),
        }
    }
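    // PID files written when the controller and its captive Postgres are started; the
    // controller's PID file is also how `stop()` finds the process to signal.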
    fn pid_file(&self) -> Utf8PathBuf {
        Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid"))
            .expect("non-Unicode path")
    }

    /// PID file for the postgres instance used to store storage controller state
    fn postgres_pid_file(&self) -> Utf8PathBuf {
        Utf8PathBuf::from_path_buf(
            self.env
                .base_data_dir
                .join("storage_controller_postgres.pid"),
        )
        .expect("non-Unicode path")
    }
    /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
    ///
    /// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
    /// to other versions if that one isn't found. Some automated tests create circumstances
    /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
    pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
        let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];

        for v in prefer_versions {
            let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
            if tokio::fs::try_exists(&path).await? {
                return Ok(path);
            }
        }

        // Fall through
        anyhow::bail!(
            "Postgres binaries not found in {}",
            self.env.pg_distrib_dir.display()
        );
    }

    /// Readiness check for our postgres process
    async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
        let bin_path = pg_bin_dir.join("pg_isready");
        let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
        let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;

        Ok(exitcode.success())
    }
190 : ///
191 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
192 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
193 : /// who just want to run `cargo neon_local` without knowing about diesel.
194 : ///
195 : /// Returns the database url
196 0 : pub async fn setup_database(&self) -> anyhow::Result<String> {
197 0 : const DB_NAME: &str = "storage_controller";
198 0 : let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
199 :
200 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
201 0 : let createdb_path = pg_bin_dir.join("createdb");
202 0 : let output = Command::new(&createdb_path)
203 0 : .args([
204 0 : "-h",
205 0 : "localhost",
206 0 : "-p",
207 0 : &format!("{}", self.postgres_port),
208 0 : DB_NAME,
209 0 : ])
210 0 : .output()
211 0 : .await
212 0 : .expect("Failed to spawn createdb");
213 0 :
214 0 : if !output.status.success() {
215 0 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
216 0 : if stderr.contains("already exists") {
217 0 : tracing::info!("Database {DB_NAME} already exists");
218 : } else {
219 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
220 : }
221 0 : }
222 :
223 0 : Ok(database_url)
224 0 : }
225 :
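    // Startup sequence: run initdb on first start, launch the captive Postgres via pg_ctl,
    // create the database if needed, then start the storage_controller binary and poll its
    // `ready` endpoint until it answers.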
    pub async fn start(&self) -> anyhow::Result<()> {
        // Start a vanilla Postgres process used by the storage controller for persistence.
        let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
            .unwrap()
            .join("storage_controller_db");
        let pg_bin_dir = self.get_pg_bin_dir().await?;
        let pg_log_path = pg_data_path.join("postgres.log");

        if !tokio::fs::try_exists(&pg_data_path).await? {
            // Initialize empty database
            let initdb_path = pg_bin_dir.join("initdb");
            let mut child = Command::new(&initdb_path)
                .args(["-D", pg_data_path.as_ref()])
                .spawn()
                .expect("Failed to spawn initdb");
            let status = child.wait().await?;
            if !status.success() {
                anyhow::bail!("initdb failed with status {status}");
            }

            // Write a minimal config file:
            // - Specify the port, since this is chosen dynamically
            // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
            //   the storage controller we don't want a slow local disk to interfere with that.
            tokio::fs::write(
                &pg_data_path.join("postgresql.conf"),
                format!("port = {}\nfsync=off\n", self.postgres_port),
            )
            .await?;
        };

        println!("Starting storage controller database...");
        let db_start_args = [
            "-w",
            "-D",
            pg_data_path.as_ref(),
            "-l",
            pg_log_path.as_ref(),
            "start",
        ];

        background_process::start_process(
            "storage_controller_db",
            &self.env.base_data_dir,
            pg_bin_dir.join("pg_ctl").as_std_path(),
            db_start_args,
            [],
            background_process::InitialPidFile::Create(self.postgres_pid_file()),
            || self.pg_isready(&pg_bin_dir),
        )
        .await?;

        // Run migrations on every startup, in case something changed.
        let database_url = self.setup_database().await?;

        let mut args = vec![
            "-l",
            &self.listen,
            "-p",
            self.path.as_ref(),
            "--dev",
            "--database-url",
            &database_url,
            "--max-unavailable-interval",
            &humantime::Duration::from(self.config.max_unavailable).to_string(),
        ]
        .into_iter()
        .map(|s| s.to_string())
        .collect::<Vec<_>>();
        if let Some(private_key) = &self.private_key {
            let claims = Claims::new(None, Scope::PageServerApi);
            let jwt_token =
                encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
            args.push(format!("--jwt-token={jwt_token}"));
        }

        if let Some(public_key) = &self.public_key {
            args.push(format!("--public-key=\"{public_key}\""));
        }

        if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
            args.push(format!(
                "--compute-hook-url={control_plane_compute_hook_api}"
            ));
        }

        if let Some(split_threshold) = self.config.split_threshold.as_ref() {
            args.push(format!("--split-threshold={split_threshold}"))
        }

        background_process::start_process(
            COMMAND,
            &self.env.base_data_dir,
            &self.env.storage_controller_bin(),
            args,
            [(
                "NEON_REPO_DIR".to_string(),
                self.env.base_data_dir.to_string_lossy().to_string(),
            )],
            background_process::InitialPidFile::Create(self.pid_file()),
            || async {
                match self.ready().await {
                    Ok(_) => Ok(true),
                    Err(_) => Ok(false),
                }
            },
        )
        .await?;

        Ok(())
    }
    pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
        background_process::stop_process(immediate, COMMAND, &self.pid_file())?;

        let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
        let pg_bin_dir = self.get_pg_bin_dir().await?;

        println!("Stopping storage controller database...");
        let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
        let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
            .args(pg_stop_args)
            .spawn()?
            .wait()
            .await?;
        if !stop_status.success() {
            let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
            let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
                .args(pg_status_args)
                .spawn()?
                .wait()
                .await?;

            // pg_ctl status returns this exit code if postgres is not running: in this case it is
            // fine that stop failed. Otherwise it is an error that stop failed.
            const PG_STATUS_NOT_RUNNING: i32 = 3;
            if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
                println!("Storage controller database is already stopped");
                return Ok(());
            } else {
                anyhow::bail!("Failed to stop storage controller database: {stop_status}")
            }
        }

        Ok(())
    }
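    /// Map a request path to the JWT claims that `dispatch` must present: health endpoints
    /// (`status`, `ready`) are unauthenticated, `control`/`debug` endpoints require admin
    /// scope, and `v1` tenant/timeline endpoints require pageserver API scope.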
    fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
        let category = match path.find('/') {
            Some(idx) => &path[..idx],
            None => path,
        };

        match category {
            "status" | "ready" => Ok(None),
            "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
            "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
            _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
        }
    }
    /// Simple HTTP request wrapper for calling into storage controller
    async fn dispatch<RQ, RS>(
        &self,
        method: reqwest::Method,
        path: String,
        body: Option<RQ>,
    ) -> anyhow::Result<RS>
    where
        RQ: Serialize + Sized,
        RS: DeserializeOwned + Sized,
    {
        // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
        // for general purpose API access.
        let listen_url = self.env.control_plane_api.clone().unwrap();
        let url = Url::from_str(&format!(
            "http://{}:{}/{path}",
            listen_url.host_str().unwrap(),
            listen_url.port().unwrap()
        ))
        .unwrap();

        let mut builder = self.client.request(method, url);
        if let Some(body) = body {
            builder = builder.json(&body)
        }
        if let Some(private_key) = &self.private_key {
            println!("Getting claims for path {}", path);
            if let Some(required_claims) = Self::get_claims_for_path(&path)? {
                println!("Got claims {:?} for path {}", required_claims, path);
                let jwt_token = encode_from_key_file(&required_claims, private_key)?;
                builder = builder.header(
                    reqwest::header::AUTHORIZATION,
                    format!("Bearer {jwt_token}"),
                );
            }
        }

        let response = builder.send().await?;
        let response = response.error_from_body().await?;

        Ok(response
            .json()
            .await
            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
    }
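    // The remaining methods are thin typed wrappers over `dispatch`, one per storage
    // controller endpoint exercised from neon_local.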
    /// Call into the attach_hook API, for use before handing out attachments to pageservers
    #[instrument(skip(self))]
    pub async fn attach_hook(
        &self,
        tenant_shard_id: TenantShardId,
        pageserver_id: NodeId,
    ) -> anyhow::Result<Option<u32>> {
        let request = AttachHookRequest {
            tenant_shard_id,
            node_id: Some(pageserver_id),
        };

        let response = self
            .dispatch::<_, AttachHookResponse>(
                Method::POST,
                "debug/v1/attach-hook".to_string(),
                Some(request),
            )
            .await?;

        Ok(response.gen)
    }

    #[instrument(skip(self))]
    pub async fn inspect(
        &self,
        tenant_shard_id: TenantShardId,
    ) -> anyhow::Result<Option<(u32, NodeId)>> {
        let request = InspectRequest { tenant_shard_id };

        let response = self
            .dispatch::<_, InspectResponse>(
                Method::POST,
                "debug/v1/inspect".to_string(),
                Some(request),
            )
            .await?;

        Ok(response.attachment)
    }

    #[instrument(skip(self))]
    pub async fn tenant_create(
        &self,
        req: TenantCreateRequest,
    ) -> anyhow::Result<TenantCreateResponse> {
        self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
            .await
    }

    #[instrument(skip(self))]
    pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
        self.dispatch::<(), TenantCreateResponse>(
            Method::POST,
            format!("debug/v1/tenant/{tenant_id}/import"),
            None,
        )
        .await
    }

    #[instrument(skip(self))]
    pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
        self.dispatch::<(), _>(
            Method::GET,
            format!("debug/v1/tenant/{tenant_id}/locate"),
            None,
        )
        .await
    }

    #[instrument(skip(self))]
    pub async fn tenant_migrate(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<TenantShardMigrateResponse> {
        self.dispatch(
            Method::PUT,
            format!("control/v1/tenant/{tenant_shard_id}/migrate"),
            Some(TenantShardMigrateRequest {
                tenant_shard_id,
                node_id,
            }),
        )
        .await
    }

    #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
    pub async fn tenant_split(
        &self,
        tenant_id: TenantId,
        new_shard_count: u8,
        new_stripe_size: Option<ShardStripeSize>,
    ) -> anyhow::Result<TenantShardSplitResponse> {
        self.dispatch(
            Method::PUT,
            format!("control/v1/tenant/{tenant_id}/shard_split"),
            Some(TenantShardSplitRequest {
                new_shard_count,
                new_stripe_size,
            }),
        )
        .await
    }

    #[instrument(skip_all, fields(node_id=%req.node_id))]
    pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
        self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
            .await
    }

    #[instrument(skip_all, fields(node_id=%req.node_id))]
    pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
        self.dispatch::<_, ()>(
            Method::PUT,
            format!("control/v1/node/{}/config", req.node_id),
            Some(req),
        )
        .await
    }

    #[instrument(skip(self))]
    pub async fn ready(&self) -> anyhow::Result<()> {
        self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
            .await
    }

    #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
    pub async fn tenant_timeline_create(
        &self,
        tenant_id: TenantId,
        req: TimelineCreateRequest,
    ) -> anyhow::Result<TimelineInfo> {
        self.dispatch(
            Method::POST,
            format!("v1/tenant/{tenant_id}/timeline"),
            Some(req),
        )
        .await
    }
}
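#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of a sanity check for the path -> claims mapping that `dispatch`
    // relies on; the expected behaviour is taken from `get_claims_for_path` itself rather
    // than from the storage controller's route table.
    #[test]
    fn claims_for_path_prefixes() {
        // Health endpoints need no token.
        assert!(StorageController::get_claims_for_path("ready")
            .unwrap()
            .is_none());
        assert!(StorageController::get_claims_for_path("status")
            .unwrap()
            .is_none());
        // Control/debug and v1 endpoints require a token with some scope.
        assert!(StorageController::get_claims_for_path("control/v1/node")
            .unwrap()
            .is_some());
        assert!(StorageController::get_claims_for_path("debug/v1/inspect")
            .unwrap()
            .is_some());
        assert!(StorageController::get_claims_for_path("v1/tenant")
            .unwrap()
            .is_some());
        // Unknown prefixes are an error rather than silently unauthenticated.
        assert!(StorageController::get_claims_for_path("bogus/endpoint").is_err());
    }
}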