LCOV - differential code coverage report
Current view: top level - control_plane/src/bin - attachment_service.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 95.6 % 206 197 9 197
Current Date: 2024-01-09 02:06:09 Functions: 67.4 % 86 58 28 58
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : /// The attachment service mimics the aspects of the control plane API
       2                 : /// that are required for a pageserver to operate.
       3                 : ///
       4                 : /// This enables running & testing pageservers without a full-blown
       5                 : /// deployment of the Neon cloud platform.
       6                 : ///
       7                 : use anyhow::anyhow;
       8                 : use clap::Parser;
       9                 : use hex::FromHex;
      10                 : use hyper::StatusCode;
      11                 : use hyper::{Body, Request, Response};
      12                 : use pageserver_api::shard::TenantShardId;
      13                 : use serde::{Deserialize, Serialize};
      14                 : use std::path::{Path, PathBuf};
      15                 : use std::{collections::HashMap, sync::Arc};
      16                 : use utils::http::endpoint::request_span;
      17                 : use utils::logging::{self, LogFormat};
      18                 : use utils::signals::{ShutdownSignals, Signal};
      19                 : 
      20                 : use utils::{
      21                 :     http::{
      22                 :         endpoint::{self},
      23                 :         error::ApiError,
      24                 :         json::{json_request, json_response},
      25                 :         RequestExt, RouterBuilder,
      26                 :     },
      27                 :     id::{NodeId, TenantId},
      28                 :     tcp_listener,
      29                 : };
      30                 : 
      31                 : use pageserver_api::control_api::{
      32                 :     ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
      33                 :     ValidateResponseTenant,
      34                 : };
      35                 : 
      36                 : use control_plane::attachment_service::{
      37                 :     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
      38                 : };
      39                 : 
      40 CBC         336 : #[derive(Parser)]
      41                 : #[command(author, version, about, long_about = None)]
      42                 : #[command(arg_required_else_help(true))]
      43                 : struct Cli {
      44                 :     /// Host and port to listen on, like `127.0.0.1:1234`
      45                 :     #[arg(short, long)]
      46 UBC           0 :     listen: std::net::SocketAddr,
      47                 : 
      48                 :     /// Path to the .json file to store state (will be created if it doesn't exist)
      49                 :     #[arg(short, long)]
      50               0 :     path: PathBuf,
      51                 : }
      52                 : 
      53                 : // The persistent state of each Tenant
      54 CBC         974 : #[derive(Serialize, Deserialize, Clone)]
      55                 : struct TenantState {
      56                 :     // Currently attached pageserver
      57                 :     pageserver: Option<NodeId>,
      58                 : 
      59                 :     // Latest generation number: next time we attach, increment this
      60                 :     // and use the incremented number when attaching
      61                 :     generation: u32,
      62                 : }
      63                 : 
      64            1110 : fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
      65            1110 : where
      66            1110 :     S: serde::Serializer,
      67            1110 :     V: Clone + Serialize,
      68            1110 : {
      69            1110 :     let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
      70            1110 : 
      71            1110 :     transformed
      72            1110 :         .collect::<HashMap<String, V>>()
      73            1110 :         .serialize(serializer)
      74            1110 : }
      75                 : 
      76               4 : fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
      77               4 : where
      78               4 :     D: serde::de::Deserializer<'de>,
      79               4 :     V: Deserialize<'de>,
      80               4 : {
      81               4 :     let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
      82               4 :     hex_map
      83               4 :         .into_iter()
      84               4 :         .map(|(k, v)| {
      85               4 :             TenantId::from_hex(k)
      86               4 :                 .map(|k| (k, v))
      87               4 :                 .map_err(serde::de::Error::custom)
      88               4 :         })
      89               4 :         .collect()
      90               4 : }
      91                 : 
      92                 : // Top level state available to all HTTP handlers
      93            1110 : #[derive(Serialize, Deserialize)]
      94                 : struct PersistentState {
      95                 :     #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
      96                 :     tenants: HashMap<TenantId, TenantState>,
      97                 : 
      98                 :     #[serde(skip)]
      99                 :     path: PathBuf,
     100                 : }
     101                 : 
     102                 : impl PersistentState {
     103            1110 :     async fn save(&self) -> anyhow::Result<()> {
     104            1110 :         let bytes = serde_json::to_vec(self)?;
     105            1113 :         tokio::fs::write(&self.path, &bytes).await?;
     106                 : 
     107            1110 :         Ok(())
     108            1110 :     }
     109                 : 
     110             336 :     async fn load(path: &Path) -> anyhow::Result<Self> {
     111             336 :         let bytes = tokio::fs::read(path).await?;
     112               4 :         let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
     113               4 :         decoded.path = path.to_owned();
     114               4 :         Ok(decoded)
     115             336 :     }
     116                 : 
     117             336 :     async fn load_or_new(path: &Path) -> Self {
     118             336 :         match Self::load(path).await {
     119               4 :             Ok(s) => {
     120               4 :                 tracing::info!("Loaded state file at {}", path.display());
     121               4 :                 s
     122                 :             }
     123             332 :             Err(e)
     124             332 :                 if e.downcast_ref::<std::io::Error>()
     125             332 :                     .map(|e| e.kind() == std::io::ErrorKind::NotFound)
     126             332 :                     .unwrap_or(false) =>
     127                 :             {
     128             332 :                 tracing::info!("Will create state file at {}", path.display());
     129             332 :                 Self {
     130             332 :                     tenants: HashMap::new(),
     131             332 :                     path: path.to_owned(),
     132             332 :                 }
     133                 :             }
     134 UBC           0 :             Err(e) => {
     135               0 :                 panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path.display())
     136                 :             }
     137                 :         }
     138 CBC         336 :     }
     139                 : }
     140                 : 
     141                 : /// State available to HTTP request handlers
     142 UBC           0 : #[derive(Clone)]
     143                 : struct State {
     144                 :     inner: Arc<tokio::sync::RwLock<PersistentState>>,
     145                 : }
     146                 : 
     147                 : impl State {
     148 CBC         336 :     fn new(persistent_state: PersistentState) -> State {
     149             336 :         Self {
     150             336 :             inner: Arc::new(tokio::sync::RwLock::new(persistent_state)),
     151             336 :         }
     152             336 :     }
     153                 : }
     154                 : 
     155                 : #[inline(always)]
     156            1556 : fn get_state(request: &Request<Body>) -> &State {
     157            1556 :     request
     158            1556 :         .data::<Arc<State>>()
     159            1556 :         .expect("unknown state type")
     160            1556 :         .as_ref()
     161            1556 : }
     162                 : 
     163                 : /// Pageserver calls into this on startup, to learn which tenants it should attach
     164             555 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     165             555 :     let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
     166                 : 
     167             555 :     let state = get_state(&req).inner.clone();
     168             555 :     let mut locked = state.write().await;
     169                 : 
     170             555 :     let mut response = ReAttachResponse {
     171             555 :         tenants: Vec::new(),
     172             555 :     };
     173             555 :     for (t, state) in &mut locked.tenants {
     174             249 :         if state.pageserver == Some(reattach_req.node_id) {
     175             223 :             state.generation += 1;
     176             223 :             response.tenants.push(ReAttachResponseTenant {
     177             223 :                 // TODO(sharding): make this shard-aware
     178             223 :                 id: TenantShardId::unsharded(*t),
     179             223 :                 gen: state.generation,
     180             223 :             });
     181             223 :         }
     182                 :     }
     183                 : 
     184             555 :     locked.save().await.map_err(ApiError::InternalServerError)?;
     185                 : 
     186             555 :     json_response(StatusCode::OK, response)
     187             555 : }
     188                 : 
     189                 : /// Pageserver calls into this before doing deletions, to confirm that it still
     190                 : /// holds the latest generation for the tenants with deletions enqueued
     191             444 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     192             444 :     let validate_req = json_request::<ValidateRequest>(&mut req).await?;
     193                 : 
     194             444 :     let locked = get_state(&req).inner.read().await;
     195                 : 
     196             444 :     let mut response = ValidateResponse {
     197             444 :         tenants: Vec::new(),
     198             444 :     };
     199                 : 
     200             955 :     for req_tenant in validate_req.tenants {
     201                 :         // TODO(sharding): make this shard-aware
     202             511 :         if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) {
     203             511 :             let valid = tenant_state.generation == req_tenant.gen;
     204             511 :             tracing::info!(
     205             511 :                 "handle_validate: {}(gen {}): valid={valid} (latest {})",
     206             511 :                 req_tenant.id,
     207             511 :                 req_tenant.gen,
     208             511 :                 tenant_state.generation
     209             511 :             );
     210             511 :             response.tenants.push(ValidateResponseTenant {
     211             511 :                 id: req_tenant.id,
     212             511 :                 valid,
     213             511 :             });
     214 UBC           0 :         }
     215                 :     }
     216                 : 
     217 CBC         444 :     json_response(StatusCode::OK, response)
     218             444 : }
     219                 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
     220                 : /// (in the real control plane this is unnecessary, because the same program is managing
     221                 : ///  generation numbers and doing attachments).
     222             555 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     223             555 :     let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
     224                 : 
     225             555 :     let state = get_state(&req).inner.clone();
     226             555 :     let mut locked = state.write().await;
     227                 : 
     228             555 :     let tenant_state = locked
     229             555 :         .tenants
     230             555 :         .entry(attach_req.tenant_id)
     231             555 :         .or_insert_with(|| TenantState {
     232             431 :             pageserver: attach_req.node_id,
     233             431 :             generation: 0,
     234             555 :         });
     235                 : 
     236             555 :     if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
     237             553 :         tenant_state.generation += 1;
     238             553 :         tracing::info!(
     239             553 :             tenant_id = %attach_req.tenant_id,
     240             553 :             ps_id = %attaching_pageserver,
     241             553 :             generation = %tenant_state.generation,
     242             553 :             "issuing",
     243             553 :         );
     244               2 :     } else if let Some(ps_id) = tenant_state.pageserver {
     245               2 :         tracing::info!(
     246               2 :             tenant_id = %attach_req.tenant_id,
     247               2 :             %ps_id,
     248               2 :             generation = %tenant_state.generation,
     249               2 :             "dropping",
     250               2 :         );
     251                 :     } else {
     252 UBC           0 :         tracing::info!(
     253               0 :             tenant_id = %attach_req.tenant_id,
     254               0 :             "no-op: tenant already has no pageserver");
     255                 :     }
     256 CBC         555 :     tenant_state.pageserver = attach_req.node_id;
     257             555 :     let generation = tenant_state.generation;
     258                 : 
     259             555 :     tracing::info!(
     260             555 :         "handle_attach_hook: tenant {} set generation {}, pageserver {}",
     261             555 :         attach_req.tenant_id,
     262             555 :         tenant_state.generation,
     263             555 :         attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
     264             555 :     );
     265                 : 
     266             558 :     locked.save().await.map_err(ApiError::InternalServerError)?;
     267                 : 
     268             555 :     json_response(
     269             555 :         StatusCode::OK,
     270             555 :         AttachHookResponse {
     271             555 :             gen: attach_req.node_id.map(|_| generation),
     272             555 :         },
     273             555 :     )
     274             555 : }
     275                 : 
     276               2 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     277               2 :     let inspect_req = json_request::<InspectRequest>(&mut req).await?;
     278                 : 
     279               2 :     let state = get_state(&req).inner.clone();
     280               2 :     let locked = state.write().await;
     281               2 :     let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
     282               2 : 
     283               2 :     json_response(
     284               2 :         StatusCode::OK,
     285               2 :         InspectResponse {
     286               2 :             attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
     287               2 :         },
     288               2 :     )
     289               2 : }
     290                 : 
     291             336 : fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
     292             336 :     endpoint::make_router()
     293             336 :         .data(Arc::new(State::new(persistent_state)))
     294             555 :         .post("/re-attach", |r| request_span(r, handle_re_attach))
     295             444 :         .post("/validate", |r| request_span(r, handle_validate))
     296             555 :         .post("/attach-hook", |r| request_span(r, handle_attach_hook))
     297             336 :         .post("/inspect", |r| request_span(r, handle_inspect))
     298             336 : }
     299                 : 
     300                 : #[tokio::main]
     301             336 : async fn main() -> anyhow::Result<()> {
     302             336 :     logging::init(
     303             336 :         LogFormat::Plain,
     304             336 :         logging::TracingErrorLayerEnablement::Disabled,
     305             336 :         logging::Output::Stdout,
     306             336 :     )?;
     307                 : 
     308             336 :     let args = Cli::parse();
     309             336 :     tracing::info!(
     310             336 :         "Starting, state at {}, listening on {}",
     311             336 :         args.path.to_string_lossy(),
     312             336 :         args.listen
     313             336 :     );
     314                 : 
     315             336 :     let persistent_state = PersistentState::load_or_new(&args.path).await;
     316                 : 
     317             336 :     let http_listener = tcp_listener::bind(args.listen)?;
     318             336 :     let router = make_router(persistent_state)
     319             336 :         .build()
     320             336 :         .map_err(|err| anyhow!(err))?;
     321             336 :     let service = utils::http::RouterService::new(router).unwrap();
     322             336 :     let server = hyper::Server::from_tcp(http_listener)?.serve(service);
     323             336 : 
     324             336 :     tracing::info!("Serving on {0}", args.listen);
     325                 : 
     326             336 :     tokio::task::spawn(server);
     327             336 : 
     328             336 :     ShutdownSignals::handle(|signal| match signal {
     329                 :         Signal::Interrupt | Signal::Terminate | Signal::Quit => {
     330             336 :             tracing::info!("Got {}. Terminating", signal.name());
     331                 :             // We're just a test helper: no graceful shutdown.
     332             336 :             std::process::exit(0);
     333                 :         }
     334             336 :     })?;
     335                 : 
     336             336 :     Ok(())
     337                 : }
        

Generated by: LCOV version 2.1-beta