       1              : use std::collections::HashMap;
       2              : use std::str::FromStr;
       3              : 
       4              : use camino::Utf8Path;
       5              : use camino::Utf8PathBuf;
       6              : use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
       7              : use diesel::pg::PgConnection;
       8              : use diesel::prelude::*;
       9              : use diesel::Connection;
      10              : use pageserver_api::models::TenantConfig;
      11              : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
      12              : use serde::{Deserialize, Serialize};
      13              : use utils::generation::Generation;
      14              : use utils::id::{NodeId, TenantId};
      15              : 
      16              : use crate::node::Node;
      17              : use crate::PlacementPolicy;
      18              : 
/// ## What do we store?
///
/// The attachment service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
///   but it is operationally simpler to make this service the authority for which nodes
///   it talks to.
///
/// ## Performance/efficiency
///
/// The attachment service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
///   before it can attach a tenant, so this acts as a bound on how fast things like
///   failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
///   so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as they are very rarely
/// updated, and reads of nodes are always from memory, not the database.  We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
    /// Postgres connection string.  A fresh connection is established from it for every
    /// database call (there is no connection pooling yet).
    database_url: String,

    // In test environments, we support loading+saving a JSON file.  This is temporary, for the
    // benefit of automated compatibility tests, so that we don't have to commit to making the
    // database contents fully backward/forward compatible just yet.
    json_path: Option<Utf8PathBuf>,
}
      54              : 
/// Legacy format, for use in JSON compat objects in test environment.
///
/// This is the top-level document that `Persistence::list_tenant_shards_json` reads and
/// `Persistence::write_tenants_json` writes.
#[derive(Serialize, Deserialize)]
struct JsonPersistence {
    /// Durable state of every tenant shard, keyed by its shard id.
    tenants: HashMap<TenantShardId, TenantShardPersistence>,
}
      60              : 
/// Errors surfaced by the persistence layer.
#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
    /// A Diesel query (or transaction) failed.
    #[error(transparent)]
    Query(#[from] diesel::result::Error),
    /// Establishing the database connection failed.
    #[error(transparent)]
    Connection(#[from] diesel::result::ConnectionError),
    /// Anything that is not a database failure as such: malformed stored values, or I/O and
    /// (de)serialization problems with the legacy JSON file.
    #[error("Logical error: {0}")]
    Logical(String),
}
      70              : 
/// Shorthand for results whose error type is [`DatabaseError`].
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
      72              : 
      73              : impl Persistence {
      74          361 :     pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
      75          361 :         Self {
      76          361 :             database_url,
      77          361 :             json_path,
      78          361 :         }
      79          361 :     }
      80              : 
      81              :     /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
      82         3262 :     async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
      83         3262 :     where
      84         3262 :         F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
      85         3262 :         R: Send + 'static,
      86         3262 :     {
      87         3262 :         let database_url = self.database_url.clone();
      88         3262 :         tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
      89              :             // TODO: connection pooling, such as via diesel::r2d2
      90         3262 :             let mut conn = PgConnection::establish(&database_url)?;
      91         3262 :             func(&mut conn)
      92         3262 :         })
      93         3273 :         .await
      94         3262 :         .expect("Task panic")
      95         3262 :     }
      96              : 
      97              :     /// When a node is first registered, persist it before using it for anything
      98          388 :     pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
      99          388 :         let np = node.to_persistent();
     100          388 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     101          388 :             diesel::insert_into(crate::schema::nodes::table)
     102          388 :                 .values(&np)
     103          388 :                 .execute(conn)?;
     104          388 :             Ok(())
     105          388 :         })
     106          388 :         .await
     107          388 :     }
     108              : 
     109              :     /// At startup, populate the list of nodes which our shards may be placed on
     110          363 :     pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
     111          363 :         let nodes: Vec<Node> = self
     112          363 :             .with_conn(move |conn| -> DatabaseResult<_> {
     113          363 :                 Ok(crate::schema::nodes::table
     114          363 :                     .load::<NodePersistence>(conn)?
     115          363 :                     .into_iter()
     116          363 :                     .map(|n| Node {
     117           13 :                         id: NodeId(n.node_id as u64),
     118           13 :                         // At startup we consider a node offline until proven otherwise.
     119           13 :                         availability: NodeAvailability::Offline,
     120           13 :                         scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
     121           13 :                             .expect("Bad scheduling policy in DB"),
     122           13 :                         listen_http_addr: n.listen_http_addr,
     123           13 :                         listen_http_port: n.listen_http_port as u16,
     124           13 :                         listen_pg_addr: n.listen_pg_addr,
     125           13 :                         listen_pg_port: n.listen_pg_port as u16,
     126          363 :                     })
     127          363 :                     .collect::<Vec<Node>>())
     128          363 :             })
     129          363 :             .await?;
     130              : 
     131          363 :         tracing::info!("list_nodes: loaded {} nodes", nodes.len());
     132              : 
     133          363 :         Ok(nodes)
     134          363 :     }
     135              : 
     136              :     /// At startup, load the high level state for shards, such as their config + policy.  This will
     137              :     /// be enriched at runtime with state discovered on pageservers.
     138          722 :     pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
     139          722 :         let loaded = self
     140          722 :             .with_conn(move |conn| -> DatabaseResult<_> {
     141          722 :                 Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
     142          722 :             })
     143          722 :             .await?;
     144              : 
     145          722 :         if loaded.is_empty() {
     146          355 :             if let Some(path) = &self.json_path {
     147          355 :                 if tokio::fs::try_exists(path)
     148          351 :                     .await
     149          355 :                     .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
     150              :                 {
     151            2 :                     tracing::info!("Importing from legacy JSON format at {path}");
     152            4 :                     return self.list_tenant_shards_json(path).await;
     153          353 :                 }
     154            0 :             }
     155          367 :         }
     156          720 :         Ok(loaded)
     157          722 :     }
     158              : 
     159              :     /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
     160            2 :     pub(crate) async fn list_tenant_shards_json(
     161            2 :         &self,
     162            2 :         path: &Utf8Path,
     163            2 :     ) -> DatabaseResult<Vec<TenantShardPersistence>> {
     164            2 :         let bytes = tokio::fs::read(path)
     165            2 :             .await
     166            2 :             .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
     167              : 
     168            2 :         let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
     169            2 :             .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
     170            4 :         for (tenant_id, tenant) in &mut decoded.tenants {
     171              :             // Backward compat: an old attachments.json from before PR #6251, replace
     172              :             // empty strings with proper defaults.
     173            2 :             if tenant.tenant_id.is_empty() {
     174            0 :                 tenant.tenant_id = tenant_id.to_string();
     175            0 :                 tenant.config = serde_json::to_string(&TenantConfig::default())
     176            0 :                     .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
     177            0 :                 tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())
     178            0 :                     .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
     179            2 :             }
     180              :         }
     181              : 
     182            2 :         let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
     183            2 : 
     184            2 :         // Synchronize database with what is in the JSON file
     185            2 :         self.insert_tenant_shards(tenants.clone()).await?;
     186              : 
     187            2 :         Ok(tenants)
     188            2 :     }
     189              : 
     190              :     /// For use in testing environments, where we dump out JSON on shutdown.
     191          361 :     pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
     192          361 :         let Some(path) = &self.json_path else {
     193            0 :             anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
     194              :         };
     195          361 :         tracing::info!("Writing state to {path}...");
     196          361 :         let tenants = self.list_tenant_shards().await?;
     197          361 :         let mut tenants_map = HashMap::new();
     198          855 :         for tsp in tenants {
     199          494 :             let tenant_shard_id = TenantShardId {
     200          494 :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
     201          494 :                 shard_number: ShardNumber(tsp.shard_number as u8),
     202          494 :                 shard_count: ShardCount(tsp.shard_count as u8),
     203          494 :             };
     204          494 : 
     205          494 :             tenants_map.insert(tenant_shard_id, tsp);
     206              :         }
     207          361 :         let json = serde_json::to_string(&JsonPersistence {
     208          361 :             tenants: tenants_map,
     209          361 :         })?;
     210              : 
     211          361 :         tokio::fs::write(path, &json).await?;
     212          361 :         tracing::info!("Wrote {} bytes to {path}...", json.len());
     213              : 
     214          361 :         Ok(())
     215          361 :     }
     216              : 
     217              :     /// Tenants must be persisted before we schedule them for the first time.  This enables us
     218              :     /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
     219          475 :     pub(crate) async fn insert_tenant_shards(
     220          475 :         &self,
     221          475 :         shards: Vec<TenantShardPersistence>,
     222          475 :     ) -> DatabaseResult<()> {
     223          475 :         use crate::schema::tenant_shards::dsl::*;
     224          475 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     225          475 :             conn.transaction(|conn| -> QueryResult<()> {
     226          974 :                 for tenant in &shards {
     227          499 :                     diesel::insert_into(tenant_shards)
     228          499 :                         .values(tenant)
     229          499 :                         .execute(conn)?;
     230              :                 }
     231          475 :                 Ok(())
     232          475 :             })?;
     233          475 :             Ok(())
     234          475 :         })
     235          475 :         .await
     236          475 :     }
     237              : 
     238              :     /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
     239              :     /// the tenant from memory on this server.
     240              :     #[allow(unused)]
     241            6 :     pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
     242            6 :         use crate::schema::tenant_shards::dsl::*;
     243            6 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     244            6 :             diesel::delete(tenant_shards)
     245            6 :                 .filter(tenant_id.eq(del_tenant_id.to_string()))
     246            6 :                 .execute(conn)?;
     247              : 
     248            6 :             Ok(())
     249            6 :         })
     250            6 :         .await
     251            6 :     }
     252              : 
     253              :     /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
     254              :     /// batched increment of the generations of all tenants whose generation_pageserver is equal to
     255              :     /// the node that called /re-attach.
     256            0 :     #[tracing::instrument(skip_all, fields(node_id))]
     257              :     pub(crate) async fn re_attach(
     258              :         &self,
     259              :         node_id: NodeId,
     260              :     ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
     261              :         use crate::schema::tenant_shards::dsl::*;
     262              :         let updated = self
     263          603 :             .with_conn(move |conn| {
     264          603 :                 let rows_updated = diesel::update(tenant_shards)
     265          603 :                     .filter(generation_pageserver.eq(node_id.0 as i64))
     266          603 :                     .set(generation.eq(generation + 1))
     267          603 :                     .execute(conn)?;
     268              : 
     269          603 :                 tracing::info!("Incremented {} tenants' generations", rows_updated);
     270              : 
     271              :                 // TODO: UPDATE+SELECT in one query
     272              : 
     273          603 :                 let updated = tenant_shards
     274          603 :                     .filter(generation_pageserver.eq(node_id.0 as i64))
     275          603 :                     .select(TenantShardPersistence::as_select())
     276          603 :                     .load(conn)?;
     277          603 :                 Ok(updated)
     278          603 :             })
     279              :             .await?;
     280              : 
     281              :         let mut result = HashMap::new();
     282              :         for tsp in updated {
     283              :             let tenant_shard_id = TenantShardId {
     284              :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
     285            0 :                     .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
     286              :                 shard_number: ShardNumber(tsp.shard_number as u8),
     287              :                 shard_count: ShardCount(tsp.shard_count as u8),
     288              :             };
     289              :             result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
     290              :         }
     291              : 
     292              :         Ok(result)
     293              :     }
     294              : 
     295              :     /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
     296              :     /// advancing generation number.  We also store the NodeId for which the generation was issued, so that in
     297              :     /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
     298          698 :     pub(crate) async fn increment_generation(
     299          698 :         &self,
     300          698 :         tenant_shard_id: TenantShardId,
     301          698 :         node_id: NodeId,
     302          698 :     ) -> anyhow::Result<Generation> {
     303              :         use crate::schema::tenant_shards::dsl::*;
     304          698 :         let updated = self
     305          698 :             .with_conn(move |conn| {
     306          698 :                 let updated = diesel::update(tenant_shards)
     307          698 :                     .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
     308          698 :                     .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
     309          698 :                     .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
     310          698 :                     .set((
     311          698 :                         generation.eq(generation + 1),
     312          698 :                         generation_pageserver.eq(node_id.0 as i64),
     313          698 :                     ))
     314          698 :                     // TODO: only returning() the generation column
     315          698 :                     .returning(TenantShardPersistence::as_returning())
     316          698 :                     .get_result(conn)?;
     317              : 
     318          698 :                 Ok(updated)
     319          698 :             })
     320          709 :             .await?;
     321              : 
     322          698 :         Ok(Generation::new(updated.generation as u32))
     323          698 :     }
     324              : 
     325            7 :     pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     326            7 :         use crate::schema::tenant_shards::dsl::*;
     327            7 :         self.with_conn(move |conn| {
     328            7 :             let updated = diesel::update(tenant_shards)
     329            7 :                 .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
     330            7 :                 .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
     331            7 :                 .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
     332            7 :                 .set((
     333            7 :                     generation_pageserver.eq(i64::MAX),
     334            7 :                     placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
     335            7 :                 ))
     336            7 :                 .execute(conn)?;
     337              : 
     338            7 :             Ok(updated)
     339            7 :         })
     340            7 :         .await?;
     341              : 
     342            7 :         Ok(())
     343            7 :     }
     344              : 
    /// Placeholder for durably recording the start of a shard split.
    ///
    /// Not implemented yet: calling this panics via `todo!()`.
    // TODO: when we start shard splitting, we must durably mark the tenant so that
    // on restart, we know that we must go through recovery (list shards that exist
    // and pick up where we left off and/or revert to parent shards).
    #[allow(dead_code)]
    pub(crate) async fn begin_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
        todo!();
    }
     352              : 
    /// Placeholder for durably recording the completion of a shard split.
    ///
    /// Not implemented yet: calling this panics via `todo!()`.
    // TODO: when we finish shard splitting, we must atomically clean up the old shards
    // and insert the new shards, and clear the splitting marker.
    #[allow(dead_code)]
    pub(crate) async fn complete_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
        todo!();
    }
     359              : }
     360              : 
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
///
/// One value corresponds to one row of the `tenant_shards` table.  The same struct is also the
/// per-shard entry of the legacy JSON file; fields marked `#[serde(default)]` may be absent
/// there and then deserialize to their default value.
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
    // Tenant id in string form.  An empty string marks an entry loaded from an old JSON file,
    // which is then filled in with defaults on import.
    #[serde(default)]
    pub(crate) tenant_id: String,
    // Shard number; converted to `ShardNumber` when building a `TenantShardId`.
    #[serde(default)]
    pub(crate) shard_number: i32,
    // Shard count; converted to `ShardCount` when building a `TenantShardId`.
    #[serde(default)]
    pub(crate) shard_count: i32,
    #[serde(default)]
    pub(crate) shard_stripe_size: i32,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching
    pub(crate) generation: i32,

    // Currently attached pageserver: the node id for which `generation` was last issued.
    // Named `pageserver` in the JSON format.  `Persistence::detach` overwrites it with
    // `i64::MAX`.
    #[serde(rename = "pageserver")]
    pub(crate) generation_pageserver: i64,

    // JSON-serialized `PlacementPolicy`.
    #[serde(default)]
    pub(crate) placement_policy: String,
    // JSON-serialized `TenantConfig`.
    #[serde(default)]
    pub(crate) config: String,
}
     387              : 
/// Parts of [`crate::node::Node`] that are stored durably
///
/// One value corresponds to one row of the `nodes` table.
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
    // Numeric node id, stored as a signed 64-bit integer.
    pub(crate) node_id: i64,
    // String form of the node's scheduling policy; parsed back with
    // `NodeSchedulingPolicy::from_str` when nodes are loaded.
    pub(crate) scheduling_policy: String,
    pub(crate) listen_http_addr: String,
    // Ports are stored as i32 and converted to `u16` when nodes are loaded.
    pub(crate) listen_http_port: i32,
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
}

