TLA Line data Source code
//! The attachment service mimics the aspects of the control plane API
//! that are required for a pageserver to operate.
//!
//! This enables running & testing pageservers without a full-blown
//! deployment of the Neon cloud platform.

7 : use anyhow::anyhow;
8 : use clap::Parser;
9 : use hex::FromHex;
10 : use hyper::StatusCode;
11 : use hyper::{Body, Request, Response};
12 : use pageserver_api::shard::TenantShardId;
13 : use serde::{Deserialize, Serialize};
14 : use std::path::{Path, PathBuf};
15 : use std::{collections::HashMap, sync::Arc};
16 : use utils::http::endpoint::request_span;
17 : use utils::logging::{self, LogFormat};
18 : use utils::signals::{ShutdownSignals, Signal};
19 :
20 : use utils::{
21 : http::{
22 : endpoint::{self},
23 : error::ApiError,
24 : json::{json_request, json_response},
25 : RequestExt, RouterBuilder,
26 : },
27 : id::{NodeId, TenantId},
28 : tcp_listener,
29 : };
30 :
31 : use pageserver_api::control_api::{
32 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
33 : ValidateResponseTenant,
34 : };
35 :
36 : use control_plane::attachment_service::{
37 : AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
38 : };
39 :
40 CBC 336 : #[derive(Parser)]
41 : #[command(author, version, about, long_about = None)]
42 : #[command(arg_required_else_help(true))]
43 : struct Cli {
44 : /// Host and port to listen on, like `127.0.0.1:1234`
45 : #[arg(short, long)]
46 UBC 0 : listen: std::net::SocketAddr,
47 :
48 : /// Path to the .json file to store state (will be created if it doesn't exist)
49 : #[arg(short, long)]
50 0 : path: PathBuf,
51 : }
52 :
53 : // The persistent state of each Tenant
54 CBC 974 : #[derive(Serialize, Deserialize, Clone)]
55 : struct TenantState {
56 : // Currently attached pageserver
57 : pageserver: Option<NodeId>,
58 :
59 : // Latest generation number: next time we attach, increment this
60 : // and use the incremented number when attaching
61 : generation: u32,
62 : }
63 :
64 1110 : fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
65 1110 : where
66 1110 : S: serde::Serializer,
67 1110 : V: Clone + Serialize,
68 1110 : {
69 1110 : let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
70 1110 :
71 1110 : transformed
72 1110 : .collect::<HashMap<String, V>>()
73 1110 : .serialize(serializer)
74 1110 : }
75 :
76 4 : fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
77 4 : where
78 4 : D: serde::de::Deserializer<'de>,
79 4 : V: Deserialize<'de>,
80 4 : {
81 4 : let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
82 4 : hex_map
83 4 : .into_iter()
84 4 : .map(|(k, v)| {
85 4 : TenantId::from_hex(k)
86 4 : .map(|k| (k, v))
87 4 : .map_err(serde::de::Error::custom)
88 4 : })
89 4 : .collect()
90 4 : }
91 :
92 : // Top level state available to all HTTP handlers
93 1110 : #[derive(Serialize, Deserialize)]
94 : struct PersistentState {
95 : #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
96 : tenants: HashMap<TenantId, TenantState>,
97 :
98 : #[serde(skip)]
99 : path: PathBuf,
100 : }
101 :
102 : impl PersistentState {
103 1110 : async fn save(&self) -> anyhow::Result<()> {
104 1110 : let bytes = serde_json::to_vec(self)?;
105 1113 : tokio::fs::write(&self.path, &bytes).await?;
106 :
107 1110 : Ok(())
108 1110 : }
109 :
110 336 : async fn load(path: &Path) -> anyhow::Result<Self> {
111 336 : let bytes = tokio::fs::read(path).await?;
112 4 : let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
113 4 : decoded.path = path.to_owned();
114 4 : Ok(decoded)
115 336 : }
116 :
117 336 : async fn load_or_new(path: &Path) -> Self {
118 336 : match Self::load(path).await {
119 4 : Ok(s) => {
120 4 : tracing::info!("Loaded state file at {}", path.display());
121 4 : s
122 : }
123 332 : Err(e)
124 332 : if e.downcast_ref::<std::io::Error>()
125 332 : .map(|e| e.kind() == std::io::ErrorKind::NotFound)
126 332 : .unwrap_or(false) =>
127 : {
128 332 : tracing::info!("Will create state file at {}", path.display());
129 332 : Self {
130 332 : tenants: HashMap::new(),
131 332 : path: path.to_owned(),
132 332 : }
133 : }
134 UBC 0 : Err(e) => {
135 0 : panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path.display())
136 : }
137 : }
138 CBC 336 : }
139 : }
140 :
141 : /// State available to HTTP request handlers
142 UBC 0 : #[derive(Clone)]
143 : struct State {
144 : inner: Arc<tokio::sync::RwLock<PersistentState>>,
145 : }
146 :
147 : impl State {
148 CBC 336 : fn new(persistent_state: PersistentState) -> State {
149 336 : Self {
150 336 : inner: Arc::new(tokio::sync::RwLock::new(persistent_state)),
151 336 : }
152 336 : }
153 : }
154 :
155 : #[inline(always)]
156 1556 : fn get_state(request: &Request<Body>) -> &State {
157 1556 : request
158 1556 : .data::<Arc<State>>()
159 1556 : .expect("unknown state type")
160 1556 : .as_ref()
161 1556 : }
162 :
163 : /// Pageserver calls into this on startup, to learn which tenants it should attach
164 555 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
165 555 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
166 :
167 555 : let state = get_state(&req).inner.clone();
168 555 : let mut locked = state.write().await;
169 :
170 555 : let mut response = ReAttachResponse {
171 555 : tenants: Vec::new(),
172 555 : };
173 555 : for (t, state) in &mut locked.tenants {
174 249 : if state.pageserver == Some(reattach_req.node_id) {
175 223 : state.generation += 1;
176 223 : response.tenants.push(ReAttachResponseTenant {
177 223 : // TODO(sharding): make this shard-aware
178 223 : id: TenantShardId::unsharded(*t),
179 223 : gen: state.generation,
180 223 : });
181 223 : }
182 : }
183 :
184 555 : locked.save().await.map_err(ApiError::InternalServerError)?;
185 :
186 555 : json_response(StatusCode::OK, response)
187 555 : }
188 :
189 : /// Pageserver calls into this before doing deletions, to confirm that it still
190 : /// holds the latest generation for the tenants with deletions enqueued
191 444 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
192 444 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
193 :
194 444 : let locked = get_state(&req).inner.read().await;
195 :
196 444 : let mut response = ValidateResponse {
197 444 : tenants: Vec::new(),
198 444 : };
199 :
200 955 : for req_tenant in validate_req.tenants {
201 : // TODO(sharding): make this shard-aware
202 511 : if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) {
203 511 : let valid = tenant_state.generation == req_tenant.gen;
204 511 : tracing::info!(
205 511 : "handle_validate: {}(gen {}): valid={valid} (latest {})",
206 511 : req_tenant.id,
207 511 : req_tenant.gen,
208 511 : tenant_state.generation
209 511 : );
210 511 : response.tenants.push(ValidateResponseTenant {
211 511 : id: req_tenant.id,
212 511 : valid,
213 511 : });
214 UBC 0 : }
215 : }
216 :
217 CBC 444 : json_response(StatusCode::OK, response)
218 444 : }
219 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
220 : /// (in the real control plane this is unnecessary, because the same program is managing
221 : /// generation numbers and doing attachments).
222 555 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
223 555 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
224 :
225 555 : let state = get_state(&req).inner.clone();
226 555 : let mut locked = state.write().await;
227 :
228 555 : let tenant_state = locked
229 555 : .tenants
230 555 : .entry(attach_req.tenant_id)
231 555 : .or_insert_with(|| TenantState {
232 431 : pageserver: attach_req.node_id,
233 431 : generation: 0,
234 555 : });
235 :
236 555 : if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
237 553 : tenant_state.generation += 1;
238 553 : tracing::info!(
239 553 : tenant_id = %attach_req.tenant_id,
240 553 : ps_id = %attaching_pageserver,
241 553 : generation = %tenant_state.generation,
242 553 : "issuing",
243 553 : );
244 2 : } else if let Some(ps_id) = tenant_state.pageserver {
245 2 : tracing::info!(
246 2 : tenant_id = %attach_req.tenant_id,
247 2 : %ps_id,
248 2 : generation = %tenant_state.generation,
249 2 : "dropping",
250 2 : );
251 : } else {
252 UBC 0 : tracing::info!(
253 0 : tenant_id = %attach_req.tenant_id,
254 0 : "no-op: tenant already has no pageserver");
255 : }
256 CBC 555 : tenant_state.pageserver = attach_req.node_id;
257 555 : let generation = tenant_state.generation;
258 :
259 555 : tracing::info!(
260 555 : "handle_attach_hook: tenant {} set generation {}, pageserver {}",
261 555 : attach_req.tenant_id,
262 555 : tenant_state.generation,
263 555 : attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
264 555 : );
265 :
266 558 : locked.save().await.map_err(ApiError::InternalServerError)?;
267 :
268 555 : json_response(
269 555 : StatusCode::OK,
270 555 : AttachHookResponse {
271 555 : gen: attach_req.node_id.map(|_| generation),
272 555 : },
273 555 : )
274 555 : }
275 :
276 2 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
277 2 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
278 :
279 2 : let state = get_state(&req).inner.clone();
280 2 : let locked = state.write().await;
281 2 : let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
282 2 :
283 2 : json_response(
284 2 : StatusCode::OK,
285 2 : InspectResponse {
286 2 : attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
287 2 : },
288 2 : )
289 2 : }
290 :
291 336 : fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
292 336 : endpoint::make_router()
293 336 : .data(Arc::new(State::new(persistent_state)))
294 555 : .post("/re-attach", |r| request_span(r, handle_re_attach))
295 444 : .post("/validate", |r| request_span(r, handle_validate))
296 555 : .post("/attach-hook", |r| request_span(r, handle_attach_hook))
297 336 : .post("/inspect", |r| request_span(r, handle_inspect))
298 336 : }
299 :
300 : #[tokio::main]
301 336 : async fn main() -> anyhow::Result<()> {
302 336 : logging::init(
303 336 : LogFormat::Plain,
304 336 : logging::TracingErrorLayerEnablement::Disabled,
305 336 : logging::Output::Stdout,
306 336 : )?;
307 :
308 336 : let args = Cli::parse();
309 336 : tracing::info!(
310 336 : "Starting, state at {}, listening on {}",
311 336 : args.path.to_string_lossy(),
312 336 : args.listen
313 336 : );
314 :
315 336 : let persistent_state = PersistentState::load_or_new(&args.path).await;
316 :
317 336 : let http_listener = tcp_listener::bind(args.listen)?;
318 336 : let router = make_router(persistent_state)
319 336 : .build()
320 336 : .map_err(|err| anyhow!(err))?;
321 336 : let service = utils::http::RouterService::new(router).unwrap();
322 336 : let server = hyper::Server::from_tcp(http_listener)?.serve(service);
323 336 :
324 336 : tracing::info!("Serving on {0}", args.listen);
325 :
326 336 : tokio::task::spawn(server);
327 336 :
328 336 : ShutdownSignals::handle(|signal| match signal {
329 : Signal::Interrupt | Signal::Terminate | Signal::Quit => {
330 336 : tracing::info!("Got {}. Terminating", signal.name());
331 : // We're just a test helper: no graceful shutdown.
332 336 : std::process::exit(0);
333 : }
334 336 : })?;
335 :
336 336 : Ok(())
337 : }
|