Line data Source code
1 : use crate::{background_process, local_env::LocalEnv};
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use hyper::Method;
4 : use pageserver_api::{
5 : controller_api::{
6 : NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse, TenantLocateResponse,
7 : TenantShardMigrateRequest, TenantShardMigrateResponse,
8 : },
9 : models::{
10 : TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
11 : TimelineCreateRequest, TimelineInfo,
12 : },
13 : shard::TenantShardId,
14 : };
15 : use pageserver_client::mgmt_api::ResponseErrorMessageExt;
16 : use postgres_backend::AuthType;
17 : use serde::{de::DeserializeOwned, Deserialize, Serialize};
18 : use std::{fs, str::FromStr};
19 : use tokio::process::Command;
20 : use tracing::instrument;
21 : use url::Url;
22 : use utils::{
23 : auth::{encode_from_key_file, Claims, Scope},
24 : id::{NodeId, TenantId},
25 : };
26 :
/// Local handle for the attachment service process and the Postgres instance
/// that `start` launches alongside it.
27 : pub struct AttachmentService {
/// Clone of the environment this handle was built from.
28 : env: LocalEnv,
/// `host:port` taken from the control plane API URL; passed to the service via `-l`.
29 : listen: String,
/// `<base_data_dir>/attachments.json`; passed to the service via `-p`.
30 : path: Utf8PathBuf,
/// Raw private key bytes, present only when pageserver HTTP auth is `NeonJWT`.
31 : private_key: Option<Vec<u8>>,
/// Public key file contents, present only when pageserver HTTP auth is `NeonJWT`.
32 : public_key: Option<String>,
/// Port of the captive Postgres: the control plane API port plus one.
33 : postgres_port: u16,
/// HTTP client used by `dispatch` for all requests to the service.
34 : client: reqwest::Client,
35 : }
36 :
/// Process name handed to `background_process` when starting and stopping the service.
37 : const COMMAND: &str = "attachment_service";
38 :
/// Postgres major version tried first when locating binaries (see `get_pg_bin_dir`).
39 : const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
40 :
/// Request body POSTed to `debug/v1/attach-hook`.
41 0 : #[derive(Serialize, Deserialize)]
42 : pub struct AttachHookRequest {
43 : pub tenant_shard_id: TenantShardId,
// `attach_hook` in this file always sends `Some(..)`; the meaning of `None` is
// decided by the service and is not visible here.
44 : pub node_id: Option<NodeId>,
45 : }
46 :
/// Response body returned from `debug/v1/attach-hook`.
47 0 : #[derive(Serialize, Deserialize)]
48 : pub struct AttachHookResponse {
// NOTE(review): presumably the attachment generation number — confirm against the service.
49 : pub gen: Option<u32>,
50 : }
51 :
/// Request body POSTed to `debug/v1/inspect`.
52 0 : #[derive(Serialize, Deserialize)]
53 : pub struct InspectRequest {
54 : pub tenant_shard_id: TenantShardId,
55 : }
56 :
/// Response body returned from `debug/v1/inspect`.
57 0 : #[derive(Serialize, Deserialize)]
58 : pub struct InspectResponse {
// NOTE(review): presumably (generation, attached node) — confirm against the service.
59 : pub attachment: Option<(u32, NodeId)>,
60 : }
61 :
62 : impl AttachmentService {
63 0 : pub fn from_env(env: &LocalEnv) -> Self {
64 0 : let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
65 0 : .unwrap()
66 0 : .join("attachments.json");
67 0 :
68 0 : // Makes no sense to construct this if pageservers aren't going to use it: assume
69 0 : // pageservers have control plane API set
70 0 : let listen_url = env.control_plane_api.clone().unwrap();
71 0 :
72 0 : let listen = format!(
73 0 : "{}:{}",
74 0 : listen_url.host_str().unwrap(),
75 0 : listen_url.port().unwrap()
76 0 : );
77 0 :
78 0 : // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
79 0 : // port, for use by our captive postgres.
80 0 : let postgres_port = listen_url
81 0 : .port()
82 0 : .expect("Control plane API setting should always have a port")
83 0 : + 1;
84 0 :
85 0 : // Assume all pageservers have symmetric auth configuration: this service
86 0 : // expects to use one JWT token to talk to all of them.
87 0 : let ps_conf = env
88 0 : .pageservers
89 0 : .first()
90 0 : .expect("Config is validated to contain at least one pageserver");
91 0 : let (private_key, public_key) = match ps_conf.http_auth_type {
92 0 : AuthType::Trust => (None, None),
93 : AuthType::NeonJWT => {
94 0 : let private_key_path = env.get_private_key_path();
95 0 : let private_key = fs::read(private_key_path).expect("failed to read private key");
96 0 :
97 0 : // If pageserver auth is enabled, this implicitly enables auth for this service,
98 0 : // using the same credentials.
99 0 : let public_key_path =
100 0 : camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
101 0 : .unwrap();
102 :
103 : // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
104 0 : let public_key = if std::fs::metadata(&public_key_path)
105 0 : .expect("Can't stat public key")
106 0 : .is_dir()
107 : {
108 : // Our config may specify a directory: this is for the pageserver's ability to handle multiple
109 : // keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
110 0 : let mut dir =
111 0 : std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
112 0 : let dent = dir
113 0 : .next()
114 0 : .expect("Empty key dir")
115 0 : .expect("Error reading key dir");
116 0 :
117 0 : std::fs::read_to_string(dent.path()).expect("Can't read public key")
118 : } else {
119 0 : std::fs::read_to_string(&public_key_path).expect("Can't read public key")
120 : };
121 0 : (Some(private_key), Some(public_key))
122 : }
123 : };
124 :
125 0 : Self {
126 0 : env: env.clone(),
127 0 : path,
128 0 : listen,
129 0 : private_key,
130 0 : public_key,
131 0 : postgres_port,
132 0 : client: reqwest::ClientBuilder::new()
133 0 : .build()
134 0 : .expect("Failed to construct http client"),
135 0 : }
136 0 : }
137 :
138 0 : fn pid_file(&self) -> Utf8PathBuf {
139 0 : Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
140 0 : .expect("non-Unicode path")
141 0 : }
142 :
143 : /// PIDFile for the postgres instance used to store attachment service state
144 0 : fn postgres_pid_file(&self) -> Utf8PathBuf {
145 0 : Utf8PathBuf::from_path_buf(
146 0 : self.env
147 0 : .base_data_dir
148 0 : .join("attachment_service_postgres.pid"),
149 0 : )
150 0 : .expect("non-Unicode path")
151 0 : }
152 :
153 : /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
154 : ///
155 : /// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
156 : /// to other versions if that one isn't found. Some automated tests create circumstances
157 : /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
158 0 : pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
159 0 : let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
160 :
161 0 : for v in prefer_versions {
162 0 : let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
163 0 : if tokio::fs::try_exists(&path).await? {
164 0 : return Ok(path);
165 0 : }
166 : }
167 :
168 : // Fall through
169 0 : anyhow::bail!(
170 0 : "Postgres binaries not found in {}",
171 0 : self.env.pg_distrib_dir.display()
172 0 : );
173 0 : }
174 :
175 : /// Readiness check for our postgres process
176 0 : async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
177 0 : let bin_path = pg_bin_dir.join("pg_isready");
178 0 : let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
179 0 : let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
180 :
181 0 : Ok(exitcode.success())
182 0 : }
183 :
184 : /// Create our database if it doesn't exist, and run migrations.
185 : ///
186 : /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
187 : /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
188 : /// who just want to run `cargo neon_local` without knowing about diesel.
189 : ///
190 : /// Returns the database url
191 0 : pub async fn setup_database(&self) -> anyhow::Result<String> {
192 0 : const DB_NAME: &str = "attachment_service";
193 0 : let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
194 :
195 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
196 0 : let createdb_path = pg_bin_dir.join("createdb");
197 0 : let output = Command::new(&createdb_path)
198 0 : .args([
199 0 : "-h",
200 0 : "localhost",
201 0 : "-p",
202 0 : &format!("{}", self.postgres_port),
203 0 : &DB_NAME,
204 0 : ])
205 0 : .output()
206 0 : .await
207 0 : .expect("Failed to spawn createdb");
208 0 :
209 0 : if !output.status.success() {
210 0 : let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
211 0 : if stderr.contains("already exists") {
212 0 : tracing::info!("Database {DB_NAME} already exists");
213 : } else {
214 0 : anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
215 : }
216 0 : }
217 :
218 0 : Ok(database_url)
219 0 : }
220 :
221 0 : pub async fn start(&self) -> anyhow::Result<()> {
222 0 : // Start a vanilla Postgres process used by the attachment service for persistence.
223 0 : let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
224 0 : .unwrap()
225 0 : .join("attachment_service_db");
226 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
227 0 : let pg_log_path = pg_data_path.join("postgres.log");
228 0 :
229 0 : if !tokio::fs::try_exists(&pg_data_path).await? {
230 : // Initialize empty database
231 0 : let initdb_path = pg_bin_dir.join("initdb");
232 0 : let mut child = Command::new(&initdb_path)
233 0 : .args(["-D", pg_data_path.as_ref()])
234 0 : .spawn()
235 0 : .expect("Failed to spawn initdb");
236 0 : let status = child.wait().await?;
237 0 : if !status.success() {
238 0 : anyhow::bail!("initdb failed with status {status}");
239 0 : }
240 0 :
241 0 : tokio::fs::write(
242 0 : &pg_data_path.join("postgresql.conf"),
243 0 : format!("port = {}", self.postgres_port),
244 0 : )
245 0 : .await?;
246 0 : };
247 :
248 0 : println!("Starting attachment service database...");
249 0 : let db_start_args = [
250 0 : "-w",
251 0 : "-D",
252 0 : pg_data_path.as_ref(),
253 0 : "-l",
254 0 : pg_log_path.as_ref(),
255 0 : "start",
256 0 : ];
257 0 :
258 0 : background_process::start_process(
259 0 : "attachment_service_db",
260 0 : &self.env.base_data_dir,
261 0 : pg_bin_dir.join("pg_ctl").as_std_path(),
262 0 : db_start_args,
263 0 : [],
264 0 : background_process::InitialPidFile::Create(self.postgres_pid_file()),
265 0 : || self.pg_isready(&pg_bin_dir),
266 0 : )
267 0 : .await?;
268 :
269 : // Run migrations on every startup, in case something changed.
270 0 : let database_url = self.setup_database().await?;
271 :
272 0 : let mut args = vec![
273 0 : "-l",
274 0 : &self.listen,
275 0 : "-p",
276 0 : self.path.as_ref(),
277 0 : "--database-url",
278 0 : &database_url,
279 0 : ]
280 0 : .into_iter()
281 0 : .map(|s| s.to_string())
282 0 : .collect::<Vec<_>>();
283 0 : if let Some(private_key) = &self.private_key {
284 0 : let claims = Claims::new(None, Scope::PageServerApi);
285 0 : let jwt_token =
286 0 : encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
287 0 : args.push(format!("--jwt-token={jwt_token}"));
288 0 : }
289 :
290 0 : if let Some(public_key) = &self.public_key {
291 0 : args.push(format!("--public-key=\"{public_key}\""));
292 0 : }
293 :
294 0 : if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
295 0 : args.push(format!(
296 0 : "--compute-hook-url={control_plane_compute_hook_api}"
297 0 : ));
298 0 : }
299 :
300 0 : background_process::start_process(
301 0 : COMMAND,
302 0 : &self.env.base_data_dir,
303 0 : &self.env.attachment_service_bin(),
304 0 : args,
305 0 : [(
306 0 : "NEON_REPO_DIR".to_string(),
307 0 : self.env.base_data_dir.to_string_lossy().to_string(),
308 0 : )],
309 0 : background_process::InitialPidFile::Create(self.pid_file()),
310 0 : || async {
311 0 : match self.ready().await {
312 0 : Ok(_) => Ok(true),
313 0 : Err(_) => Ok(false),
314 : }
315 0 : },
316 0 : )
317 0 : .await?;
318 :
319 0 : Ok(())
320 0 : }
321 :
322 0 : pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
323 0 : background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
324 :
325 0 : let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
326 0 : let pg_bin_dir = self.get_pg_bin_dir().await?;
327 :
328 0 : println!("Stopping attachment service database...");
329 0 : let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
330 0 : let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
331 0 : .args(pg_stop_args)
332 0 : .spawn()?
333 0 : .wait()
334 0 : .await?;
335 0 : if !stop_status.success() {
336 0 : let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
337 0 : let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
338 0 : .args(pg_status_args)
339 0 : .spawn()?
340 0 : .wait()
341 0 : .await?;
342 :
343 : // pg_ctl status returns this exit code if postgres is not running: in this case it is
344 : // fine that stop failed. Otherwise it is an error that stop failed.
345 : const PG_STATUS_NOT_RUNNING: i32 = 3;
346 0 : if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
347 0 : println!("Attachment service data base is already stopped");
348 0 : return Ok(());
349 : } else {
350 0 : anyhow::bail!("Failed to stop attachment service database: {stop_status}")
351 : }
352 0 : }
353 0 :
354 0 : Ok(())
355 0 : }
356 :
357 0 : fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
358 0 : let category = match path.find('/') {
359 0 : Some(idx) => &path[..idx],
360 0 : None => path,
361 : };
362 :
363 0 : match category {
364 0 : "status" | "ready" => Ok(None),
365 0 : "control" | "debug" => Ok(Some(Claims::new(None, Scope::Admin))),
366 0 : "v1" => Ok(Some(Claims::new(None, Scope::PageServerApi))),
367 0 : _ => Err(anyhow::anyhow!("Failed to determine claims for {}", path)),
368 : }
369 0 : }
370 :
371 : /// Simple HTTP request wrapper for calling into attachment service
372 0 : async fn dispatch<RQ, RS>(
373 0 : &self,
374 0 : method: hyper::Method,
375 0 : path: String,
376 0 : body: Option<RQ>,
377 0 : ) -> anyhow::Result<RS>
378 0 : where
379 0 : RQ: Serialize + Sized,
380 0 : RS: DeserializeOwned + Sized,
381 0 : {
382 0 : // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
383 0 : // for general purpose API access.
384 0 : let listen_url = self.env.control_plane_api.clone().unwrap();
385 0 : let url = Url::from_str(&format!(
386 0 : "http://{}:{}/{path}",
387 0 : listen_url.host_str().unwrap(),
388 0 : listen_url.port().unwrap()
389 0 : ))
390 0 : .unwrap();
391 0 :
392 0 : let mut builder = self.client.request(method, url);
393 0 : if let Some(body) = body {
394 0 : builder = builder.json(&body)
395 0 : }
396 0 : if let Some(private_key) = &self.private_key {
397 0 : println!("Getting claims for path {}", path);
398 0 : if let Some(required_claims) = Self::get_claims_for_path(&path)? {
399 0 : println!("Got claims {:?} for path {}", required_claims, path);
400 0 : let jwt_token = encode_from_key_file(&required_claims, private_key)?;
401 0 : builder = builder.header(
402 0 : reqwest::header::AUTHORIZATION,
403 0 : format!("Bearer {jwt_token}"),
404 0 : );
405 0 : }
406 0 : }
407 :
408 0 : let response = builder.send().await?;
409 0 : let response = response.error_from_body().await?;
410 :
411 0 : Ok(response
412 0 : .json()
413 0 : .await
414 0 : .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
415 0 : }
416 :
417 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
418 0 : #[instrument(skip(self))]
419 : pub async fn attach_hook(
420 : &self,
421 : tenant_shard_id: TenantShardId,
422 : pageserver_id: NodeId,
423 : ) -> anyhow::Result<Option<u32>> {
424 : let request = AttachHookRequest {
425 : tenant_shard_id,
426 : node_id: Some(pageserver_id),
427 : };
428 :
429 : let response = self
430 : .dispatch::<_, AttachHookResponse>(
431 : Method::POST,
432 : "debug/v1/attach-hook".to_string(),
433 : Some(request),
434 : )
435 : .await?;
436 :
437 : Ok(response.gen)
438 : }
439 :
440 0 : #[instrument(skip(self))]
441 : pub async fn inspect(
442 : &self,
443 : tenant_shard_id: TenantShardId,
444 : ) -> anyhow::Result<Option<(u32, NodeId)>> {
445 : let request = InspectRequest { tenant_shard_id };
446 :
447 : let response = self
448 : .dispatch::<_, InspectResponse>(
449 : Method::POST,
450 : "debug/v1/inspect".to_string(),
451 : Some(request),
452 : )
453 : .await?;
454 :
455 : Ok(response.attachment)
456 : }
457 :
458 0 : #[instrument(skip(self))]
459 : pub async fn tenant_create(
460 : &self,
461 : req: TenantCreateRequest,
462 : ) -> anyhow::Result<TenantCreateResponse> {
463 : self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
464 : .await
465 : }
466 :
467 0 : #[instrument(skip(self))]
468 : pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
469 : self.dispatch::<(), _>(
470 : Method::GET,
471 : format!("control/v1/tenant/{tenant_id}/locate"),
472 : None,
473 : )
474 : .await
475 : }
476 :
477 0 : #[instrument(skip(self))]
478 : pub async fn tenant_migrate(
479 : &self,
480 : tenant_shard_id: TenantShardId,
481 : node_id: NodeId,
482 : ) -> anyhow::Result<TenantShardMigrateResponse> {
483 : self.dispatch(
484 : Method::PUT,
485 : format!("control/v1/tenant/{tenant_shard_id}/migrate"),
486 : Some(TenantShardMigrateRequest {
487 : tenant_shard_id,
488 : node_id,
489 : }),
490 : )
491 : .await
492 : }
493 :
494 0 : #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
495 : pub async fn tenant_split(
496 : &self,
497 : tenant_id: TenantId,
498 : new_shard_count: u8,
499 : ) -> anyhow::Result<TenantShardSplitResponse> {
500 : self.dispatch(
501 : Method::PUT,
502 : format!("control/v1/tenant/{tenant_id}/shard_split"),
503 : Some(TenantShardSplitRequest { new_shard_count }),
504 : )
505 : .await
506 : }
507 :
508 0 : #[instrument(skip_all, fields(node_id=%req.node_id))]
509 : pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
510 : self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
511 : .await
512 : }
513 :
514 0 : #[instrument(skip_all, fields(node_id=%req.node_id))]
515 : pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
516 : self.dispatch::<_, ()>(
517 : Method::PUT,
518 : format!("control/v1/node/{}/config", req.node_id),
519 : Some(req),
520 : )
521 : .await
522 : }
523 :
524 0 : #[instrument(skip(self))]
525 : pub async fn ready(&self) -> anyhow::Result<()> {
526 : self.dispatch::<(), ()>(Method::GET, "ready".to_string(), None)
527 : .await
528 : }
529 :
530 0 : #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
531 : pub async fn tenant_timeline_create(
532 : &self,
533 : tenant_id: TenantId,
534 : req: TimelineCreateRequest,
535 : ) -> anyhow::Result<TimelineInfo> {
536 : self.dispatch(
537 : Method::POST,
538 : format!("v1/tenant/{tenant_id}/timeline"),
539 : Some(req),
540 : )
541 : .await
542 : }
543 : }
|