TLA Line data Source code
1 : use crate::{background_process, local_env::LocalEnv};
2 : use anyhow::anyhow;
3 : use camino::Utf8PathBuf;
4 : use serde::{Deserialize, Serialize};
5 : use std::{path::PathBuf, process::Child};
6 : use utils::id::{NodeId, TenantId};
7 :
8 : pub struct AttachmentService {
9 : env: LocalEnv,
10 : listen: String,
11 : path: PathBuf,
12 : client: reqwest::Client,
13 : }
14 :
15 : const COMMAND: &str = "attachment_service";
16 :
17 CBC 2775 : #[derive(Serialize, Deserialize)]
18 : pub struct AttachHookRequest {
19 : pub tenant_id: TenantId,
20 : pub node_id: Option<NodeId>,
21 : }
22 :
23 1227 : #[derive(Serialize, Deserialize)]
24 : pub struct AttachHookResponse {
25 : pub gen: Option<u32>,
26 : }
27 :
28 6 : #[derive(Serialize, Deserialize)]
29 : pub struct InspectRequest {
30 : pub tenant_id: TenantId,
31 : }
32 :
33 2 : #[derive(Serialize, Deserialize)]
34 : pub struct InspectResponse {
35 : pub attachment: Option<(u32, NodeId)>,
36 : }
37 :
38 : impl AttachmentService {
39 1083 : pub fn from_env(env: &LocalEnv) -> Self {
40 1083 : let path = env.base_data_dir.join("attachments.json");
41 1083 :
42 1083 : // Makes no sense to construct this if pageservers aren't going to use it: assume
43 1083 : // pageservers have control plane API set
44 1083 : let listen_url = env.control_plane_api.clone().unwrap();
45 1083 :
46 1083 : let listen = format!(
47 1083 : "{}:{}",
48 1083 : listen_url.host_str().unwrap(),
49 1083 : listen_url.port().unwrap()
50 1083 : );
51 1083 :
52 1083 : Self {
53 1083 : env: env.clone(),
54 1083 : path,
55 1083 : listen,
56 1083 : client: reqwest::ClientBuilder::new()
57 1083 : .build()
58 1083 : .expect("Failed to construct http client"),
59 1083 : }
60 1083 : }
61 :
62 674 : fn pid_file(&self) -> Utf8PathBuf {
63 674 : Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
64 674 : .expect("non-Unicode path")
65 674 : }
66 :
67 336 : pub async fn start(&self) -> anyhow::Result<Child> {
68 336 : let path_str = self.path.to_string_lossy();
69 336 :
70 336 : background_process::start_process(
71 336 : COMMAND,
72 336 : &self.env.base_data_dir,
73 336 : &self.env.attachment_service_bin(),
74 336 : ["-l", &self.listen, "-p", &path_str],
75 336 : [],
76 336 : background_process::InitialPidFile::Create(self.pid_file()),
77 336 : // TODO: a real status check
78 336 : || async move { anyhow::Ok(true) },
79 336 : )
80 UBC 0 : .await
81 CBC 336 : }
82 :
83 338 : pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
84 338 : background_process::stop_process(immediate, COMMAND, &self.pid_file())
85 338 : }
86 :
87 : /// Call into the attach_hook API, for use before handing out attachments to pageservers
88 409 : pub async fn attach_hook(
89 409 : &self,
90 409 : tenant_id: TenantId,
91 409 : pageserver_id: NodeId,
92 409 : ) -> anyhow::Result<Option<u32>> {
93 409 : use hyper::StatusCode;
94 409 :
95 409 : let url = self
96 409 : .env
97 409 : .control_plane_api
98 409 : .clone()
99 409 : .unwrap()
100 409 : .join("attach-hook")
101 409 : .unwrap();
102 409 :
103 409 : let request = AttachHookRequest {
104 409 : tenant_id,
105 409 : node_id: Some(pageserver_id),
106 409 : };
107 :
108 1227 : let response = self.client.post(url).json(&request).send().await?;
109 409 : if response.status() != StatusCode::OK {
110 UBC 0 : return Err(anyhow!("Unexpected status {}", response.status()));
111 CBC 409 : }
112 :
113 409 : let response = response.json::<AttachHookResponse>().await?;
114 409 : Ok(response.gen)
115 409 : }
116 :
117 UBC 0 : pub async fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
118 0 : use hyper::StatusCode;
119 0 :
120 0 : let url = self
121 0 : .env
122 0 : .control_plane_api
123 0 : .clone()
124 0 : .unwrap()
125 0 : .join("inspect")
126 0 : .unwrap();
127 0 :
128 0 : let request = InspectRequest { tenant_id };
129 :
130 0 : let response = self.client.post(url).json(&request).send().await?;
131 0 : if response.status() != StatusCode::OK {
132 0 : return Err(anyhow!("Unexpected status {}", response.status()));
133 0 : }
134 :
135 0 : let response = response.json::<InspectResponse>().await?;
136 0 : Ok(response.attachment)
137 0 : }
138 : }
|