LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - persistence.rs (source / functions)
Test:      aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35

              Coverage    Total    Hit
Lines:          94.9 %      295    280
Functions:      68.3 %      145     99

            Line data    Source code
       1              : pub(crate) mod split_state;
       2              : use std::collections::HashMap;
       3              : use std::str::FromStr;
       4              : use std::time::Duration;
       5              : 
       6              : use self::split_state::SplitState;
       7              : use camino::Utf8Path;
       8              : use camino::Utf8PathBuf;
       9              : use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
      10              : use diesel::pg::PgConnection;
      11              : use diesel::prelude::*;
      12              : use diesel::Connection;
      13              : use pageserver_api::models::TenantConfig;
      14              : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
      15              : use serde::{Deserialize, Serialize};
      16              : use utils::generation::Generation;
      17              : use utils::id::{NodeId, TenantId};
      18              : 
      19              : use crate::node::Node;
      20              : use crate::PlacementPolicy;
      21              : 
      22              : /// ## What do we store?
      23              : ///
      24              : /// The attachment service does not store most of its state durably.
      25              : ///
      26              : /// The essential things to store durably are:
      27              : /// - generation numbers, as these must always advance monotonically to ensure data safety.
      28              : /// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
      29              : /// - Node's scheduling policies, as the source of truth for these is something external.
      30              : ///
      31              : /// Other things we store durably as an implementation detail:
       32              : /// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
      33              : ///   but it is operationally simpler to make this service the authority for which nodes
      34              : ///   it talks to.
      35              : ///
      36              : /// ## Performance/efficiency
      37              : ///
      38              : /// The attachment service does not go via the database for most things: there are
      39              : /// a couple of places where we must, and where efficiency matters:
      40              : /// - Incrementing generation numbers: the Reconciler has to wait for this to complete
      41              : ///   before it can attach a tenant, so this acts as a bound on how fast things like
      42              : ///   failover can happen.
      43              : /// - Pageserver re-attach: we will increment many shards' generations when this happens,
      44              : ///   so it is important to avoid e.g. issuing O(N) queries.
      45              : ///
      46              : /// Database calls relating to nodes have low performance requirements, as they are very rarely
      47              : /// updated, and reads of nodes are always from memory, not the database.  We only require that
      48              : /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
      49              : pub struct Persistence {
      50              :     connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
      51              : 
      52              :     // In test environments, we support loading+saving a JSON file.  This is temporary, for the benefit of
      53              :     // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
      54              :     // compatible just yet.
      55              :     json_path: Option<Utf8PathBuf>,
      56              : }
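                       : 
                       : // Editor's note: an illustrative sketch (not part of the measured source) of the startup
                       : // call pattern implied by the doc comment above, using only methods defined in this file.
                       : // The function name and error handling here are assumptions.
                       : //
                       : // async fn load_persistent_state_example(
                       : //     database_url: String,
                       : //     json_path: Option<Utf8PathBuf>,
                       : // ) -> anyhow::Result<()> {
                       : //     let persistence = Persistence::new(database_url, json_path);
                       : //     // Durable state is read once at startup; later reads are served from memory.
                       : //     let nodes = persistence.list_nodes().await?;
                       : //     let shards = persistence.list_tenant_shards().await?;
                       : //     tracing::info!("loaded {} nodes and {} shards", nodes.len(), shards.len());
                       : //     // Before attaching a shard to a pageserver, the Reconciler acquires a new
                       : //     // generation via increment_generation() (defined below).
                       : //     Ok(())
                       : // }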
      57              : 
      58              : /// Legacy format, for use in JSON compat objects in test environment
      59          366 : #[derive(Serialize, Deserialize)]
      60              : struct JsonPersistence {
      61              :     tenants: HashMap<TenantShardId, TenantShardPersistence>,
      62              : }
      63              : 
      64            0 : #[derive(thiserror::Error, Debug)]
      65              : pub(crate) enum DatabaseError {
      66              :     #[error(transparent)]
      67              :     Query(#[from] diesel::result::Error),
      68              :     #[error(transparent)]
      69              :     Connection(#[from] diesel::result::ConnectionError),
      70              :     #[error(transparent)]
      71              :     ConnectionPool(#[from] r2d2::Error),
      72              :     #[error("Logical error: {0}")]
      73              :     Logical(String),
      74              : }
      75              : 
      76              : pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
      77              : 
      78              : impl Persistence {
      79              :     // The default postgres connection limit is 100.  We use up to 99, to leave one free for a human admin under
      80              :     // normal circumstances.  This assumes we have exclusive use of the database cluster to which we connect.
      81              :     pub const MAX_CONNECTIONS: u32 = 99;
      82              : 
      83              :     // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
      84              :     const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
      85              :     const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
      86              : 
      87          366 :     pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
      88          366 :         let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
      89          366 : 
      90          366 :         // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
      91          366 :         // to execute queries (database queries are not generally on latency-sensitive paths).
      92          366 :         let connection_pool = diesel::r2d2::Pool::builder()
      93          366 :             .max_size(Self::MAX_CONNECTIONS)
      94          366 :             .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
      95          366 :             .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
      96          366 :             // Always keep at least one connection ready to go
      97          366 :             .min_idle(Some(1))
      98          366 :             .test_on_check_out(true)
      99          366 :             .build(manager)
     100          366 :             .expect("Could not build connection pool");
     101          366 : 
     102          366 :         Self {
     103          366 :             connection_pool,
     104          366 :             json_path,
     105          366 :         }
     106          366 :     }
     107              : 
     108              :     /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
     109         3330 :     async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
     110         3330 :     where
     111         3330 :         F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
     112         3330 :         R: Send + 'static,
     113         3330 :     {
     114         3330 :         let mut conn = self.connection_pool.get()?;
     115         3330 :         tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
     116         3347 :             .await
     117         3330 :             .expect("Task panic")
     118         3330 :     }
     119              : 
     120              :     /// When a node is first registered, persist it before using it for anything
     121          400 :     pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
     122          400 :         let np = node.to_persistent();
     123          400 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     124          400 :             diesel::insert_into(crate::schema::nodes::table)
     125          400 :                 .values(&np)
     126          400 :                 .execute(conn)?;
     127          400 :             Ok(())
     128          400 :         })
     129          400 :         .await
     130          400 :     }
     131              : 
     132              :     /// At startup, populate the list of nodes which our shards may be placed on
     133          371 :     pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
     134          371 :         let nodes: Vec<Node> = self
     135          371 :             .with_conn(move |conn| -> DatabaseResult<_> {
     136          371 :                 Ok(crate::schema::nodes::table
     137          371 :                     .load::<NodePersistence>(conn)?
     138          371 :                     .into_iter()
     139          371 :                     .map(|n| Node {
     140           21 :                         id: NodeId(n.node_id as u64),
     141           21 :                         // At startup we consider a node offline until proven otherwise.
     142           21 :                         availability: NodeAvailability::Offline,
     143           21 :                         scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
     144           21 :                             .expect("Bad scheduling policy in DB"),
     145           21 :                         listen_http_addr: n.listen_http_addr,
     146           21 :                         listen_http_port: n.listen_http_port as u16,
     147           21 :                         listen_pg_addr: n.listen_pg_addr,
     148           21 :                         listen_pg_port: n.listen_pg_port as u16,
     149          371 :                     })
     150          371 :                     .collect::<Vec<Node>>())
     151          371 :             })
     152          371 :             .await?;
     153              : 
     154          371 :         tracing::info!("list_nodes: loaded {} nodes", nodes.len());
     155              : 
     156          371 :         Ok(nodes)
     157          371 :     }
     158              : 
     159              :     /// At startup, load the high level state for shards, such as their config + policy.  This will
     160              :     /// be enriched at runtime with state discovered on pageservers.
     161          732 :     pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
     162          732 :         let loaded = self
     163          732 :             .with_conn(move |conn| -> DatabaseResult<_> {
     164          732 :                 Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
     165          732 :             })
     166          732 :             .await?;
     167              : 
     168          732 :         if loaded.is_empty() {
     169          358 :             if let Some(path) = &self.json_path {
     170          358 :                 if tokio::fs::try_exists(path)
     171          358 :                     .await
     172          358 :                     .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
     173              :                 {
     174            2 :                     tracing::info!("Importing from legacy JSON format at {path}");
     175            4 :                     return self.list_tenant_shards_json(path).await;
     176          356 :                 }
     177            0 :             }
     178          374 :         }
     179          730 :         Ok(loaded)
     180          732 :     }
     181              : 
     182              :     /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
     183            2 :     pub(crate) async fn list_tenant_shards_json(
     184            2 :         &self,
     185            2 :         path: &Utf8Path,
     186            2 :     ) -> DatabaseResult<Vec<TenantShardPersistence>> {
     187            2 :         let bytes = tokio::fs::read(path)
     188            2 :             .await
     189            2 :             .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
     190              : 
     191            2 :         let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
     192            2 :             .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
     193            4 :         for (tenant_id, tenant) in &mut decoded.tenants {
      194              :             // Backward compat: if this is an old attachments.json from before PR #6251, replace
     195              :             // empty strings with proper defaults.
     196            2 :             if tenant.tenant_id.is_empty() {
     197            0 :                 tenant.tenant_id = tenant_id.to_string();
     198            0 :                 tenant.config = serde_json::to_string(&TenantConfig::default())
     199            0 :                     .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
     200            0 :                 tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())
     201            0 :                     .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
     202            2 :             }
     203              :         }
     204              : 
     205            2 :         let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
     206            2 : 
     207            2 :         // Synchronize database with what is in the JSON file
     208            2 :         self.insert_tenant_shards(tenants.clone()).await?;
     209              : 
     210            2 :         Ok(tenants)
     211            2 :     }
     212              : 
     213              :     /// For use in testing environments, where we dump out JSON on shutdown.
     214          366 :     pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
     215          366 :         let Some(path) = &self.json_path else {
     216            0 :             anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
     217              :         };
     218          366 :         tracing::info!("Writing state to {path}...");
     219          366 :         let tenants = self.list_tenant_shards().await?;
     220          366 :         let mut tenants_map = HashMap::new();
     221          874 :         for tsp in tenants {
     222          508 :             let tenant_shard_id = TenantShardId {
     223          508 :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
     224          508 :                 shard_number: ShardNumber(tsp.shard_number as u8),
     225          508 :                 shard_count: ShardCount(tsp.shard_count as u8),
     226          508 :             };
     227          508 : 
     228          508 :             tenants_map.insert(tenant_shard_id, tsp);
     229              :         }
     230          366 :         let json = serde_json::to_string(&JsonPersistence {
     231          366 :             tenants: tenants_map,
     232          366 :         })?;
     233              : 
     234          366 :         tokio::fs::write(path, &json).await?;
     235          366 :         tracing::info!("Wrote {} bytes to {path}...", json.len());
     236              : 
     237          366 :         Ok(())
     238          366 :     }
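                       : 
                       : // Editor's note: a rough sketch (assumption, for orientation only) of the JSON this writes,
                       : // based on the serde attributes of JsonPersistence and TenantShardPersistence below. The map-key
                       : // format for TenantShardId and the encoding of `splitting` are not spelled out here, and the
                       : // example values are made up.
                       : //
                       : // {
                       : //   "tenants": {
                       : //     "<tenant_shard_id>": {
                       : //       "tenant_id": "1f359dd914e53694de2d029893fe4ca1",
                       : //       "shard_number": 0,
                       : //       "shard_count": 1,
                       : //       "shard_stripe_size": 0,
                       : //       "generation": 5,
                       : //       "pageserver": 1,
                       : //       "placement_policy": "...",
                       : //       "splitting": ...,
                       : //       "config": "..."
                       : //     }
                       : //   }
                       : // }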
     239              : 
     240              :     /// Tenants must be persisted before we schedule them for the first time.  This enables us
     241              :     /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
     242          481 :     pub(crate) async fn insert_tenant_shards(
     243          481 :         &self,
     244          481 :         shards: Vec<TenantShardPersistence>,
     245          481 :     ) -> DatabaseResult<()> {
     246          481 :         use crate::schema::tenant_shards::dsl::*;
     247          481 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     248          481 :             conn.transaction(|conn| -> QueryResult<()> {
     249          988 :                 for tenant in &shards {
     250          509 :                     diesel::insert_into(tenant_shards)
     251          509 :                         .values(tenant)
     252          509 :                         .execute(conn)?;
     253              :                 }
     254          479 :                 Ok(())
     255          481 :             })?;
     256          479 :             Ok(())
     257          481 :         })
     258          482 :         .await
     259          481 :     }
     260              : 
     261              :     /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
     262              :     /// the tenant from memory on this server.
     263            7 :     pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
     264            7 :         use crate::schema::tenant_shards::dsl::*;
     265            7 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     266            7 :             diesel::delete(tenant_shards)
     267            7 :                 .filter(tenant_id.eq(del_tenant_id.to_string()))
     268            7 :                 .execute(conn)?;
     269              : 
     270            7 :             Ok(())
     271            7 :         })
     272            7 :         .await
     273            7 :     }
     274              : 
     275            1 :     pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
     276            1 :         use crate::schema::nodes::dsl::*;
     277            1 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     278            1 :             diesel::delete(nodes)
     279            1 :                 .filter(node_id.eq(del_node_id.0 as i64))
     280            1 :                 .execute(conn)?;
     281              : 
     282            1 :             Ok(())
     283            1 :         })
     284            1 :         .await
     285            1 :     }
     286              : 
      287              :     /// When a pageserver node invokes the /re-attach API, this function is responsible for doing an efficient
      288              :     /// batched increment of the generations of all tenant shards whose generation_pageserver is equal to
     289              :     /// the node that called /re-attach.
     290         1248 :     #[tracing::instrument(skip_all, fields(node_id))]
     291              :     pub(crate) async fn re_attach(
     292              :         &self,
     293              :         node_id: NodeId,
     294              :     ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
     295              :         use crate::schema::tenant_shards::dsl::*;
     296              :         let updated = self
     297          624 :             .with_conn(move |conn| {
     298          624 :                 let rows_updated = diesel::update(tenant_shards)
     299          624 :                     .filter(generation_pageserver.eq(node_id.0 as i64))
     300          624 :                     .set(generation.eq(generation + 1))
     301          624 :                     .execute(conn)?;
     302              : 
     303          624 :                 tracing::info!("Incremented {} tenants' generations", rows_updated);
     304              : 
     305              :                 // TODO: UPDATE+SELECT in one query
     306              : 
     307          624 :                 let updated = tenant_shards
     308          624 :                     .filter(generation_pageserver.eq(node_id.0 as i64))
     309          624 :                     .select(TenantShardPersistence::as_select())
     310          624 :                     .load(conn)?;
     311          624 :                 Ok(updated)
     312          624 :             })
     313              :             .await?;
     314              : 
     315              :         let mut result = HashMap::new();
     316              :         for tsp in updated {
     317              :             let tenant_shard_id = TenantShardId {
     318              :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
     319            0 :                     .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
     320              :                 shard_number: ShardNumber(tsp.shard_number as u8),
     321              :                 shard_count: ShardCount(tsp.shard_count as u8),
     322              :             };
     323              :             result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
     324              :         }
     325              : 
     326              :         Ok(result)
     327              :     }
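                       : 
                       : // Editor's note: an untested sketch for the "UPDATE+SELECT in one query" TODO above. On
                       : // Postgres, diesel can return the updated rows directly via RETURNING, mirroring the
                       : // `.returning(...).get_result(...)` pattern used by increment_generation() below; this would
                       : // replace the separate UPDATE and SELECT inside the with_conn closure.
                       : //
                       : //     let updated: Vec<TenantShardPersistence> = diesel::update(tenant_shards)
                       : //         .filter(generation_pageserver.eq(node_id.0 as i64))
                       : //         .set(generation.eq(generation + 1))
                       : //         .returning(TenantShardPersistence::as_returning())
                       : //         .get_results(conn)?;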
     328              : 
     329              :     /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
     330              :     /// advancing generation number.  We also store the NodeId for which the generation was issued, so that in
     331              :     /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
     332          703 :     pub(crate) async fn increment_generation(
     333          703 :         &self,
     334          703 :         tenant_shard_id: TenantShardId,
     335          703 :         node_id: NodeId,
     336          703 :     ) -> anyhow::Result<Generation> {
     337              :         use crate::schema::tenant_shards::dsl::*;
     338          703 :         let updated = self
     339          703 :             .with_conn(move |conn| {
     340          703 :                 let updated = diesel::update(tenant_shards)
     341          703 :                     .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
     342          703 :                     .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
     343          703 :                     .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
     344          703 :                     .set((
     345          703 :                         generation.eq(generation + 1),
     346          703 :                         generation_pageserver.eq(node_id.0 as i64),
     347          703 :                     ))
     348          703 :                     // TODO: only returning() the generation column
     349          703 :                     .returning(TenantShardPersistence::as_returning())
     350          703 :                     .get_result(conn)?;
     351              : 
     352          703 :                 Ok(updated)
     353          703 :             })
     354          719 :             .await?;
     355              : 
     356          703 :         Ok(Generation::new(updated.generation as u32))
     357          703 :     }
     358              : 
     359            7 :     pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     360            7 :         use crate::schema::tenant_shards::dsl::*;
     361            7 :         self.with_conn(move |conn| {
     362            7 :             let updated = diesel::update(tenant_shards)
     363            7 :                 .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
     364            7 :                 .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
     365            7 :                 .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
     366            7 :                 .set((
     367            7 :                     generation_pageserver.eq(i64::MAX),
     368            7 :                     placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
     369            7 :                 ))
     370            7 :                 .execute(conn)?;
     371              : 
     372            7 :             Ok(updated)
     373            7 :         })
     374            7 :         .await?;
     375              : 
     376            7 :         Ok(())
     377            7 :     }
     378              : 
     379              :     // When we start shard splitting, we must durably mark the tenant so that
     380              :     // on restart, we know that we must go through recovery.
     381              :     //
     382              :     // We create the child shards here, so that they will be available for increment_generation calls
     383              :     // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
     384              :     #[allow(dead_code)]
     385            2 :     pub(crate) async fn begin_shard_split(
     386            2 :         &self,
     387            2 :         old_shard_count: ShardCount,
     388            2 :         split_tenant_id: TenantId,
     389            2 :         parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
     390            2 :     ) -> DatabaseResult<()> {
     391            2 :         use crate::schema::tenant_shards::dsl::*;
     392            2 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     393            2 :             conn.transaction(|conn| -> DatabaseResult<()> {
     394            2 :                 // Mark parent shards as splitting
     395            2 : 
     396            2 :                 let expect_parent_records = std::cmp::max(1, old_shard_count.0);
     397              : 
     398            2 :                 let updated = diesel::update(tenant_shards)
     399            2 :                     .filter(tenant_id.eq(split_tenant_id.to_string()))
     400            2 :                     .filter(shard_count.eq(old_shard_count.0 as i32))
     401            2 :                     .set((splitting.eq(1),))
     402            2 :                     .execute(conn)?;
     403            2 :                 if u8::try_from(updated)
     404            2 :                     .map_err(|_| DatabaseError::Logical(
     405            2 :                         format!("Overflow existing shard count {} while splitting", updated))
     406            2 :                     )? != expect_parent_records {
     407              :                     // Perhaps a deletion or another split raced with this attempt to split, mutating
     408              :                     // the parent shards that we intend to split. In this case the split request should fail.
     409            0 :                     return Err(DatabaseError::Logical(
     410            0 :                         format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {expect_parent_records})")
     411            0 :                     ));
     412            2 :                 }
     413            2 : 
     414            2 :                 // FIXME: spurious clone to sidestep closure move rules
     415            2 :                 let parent_to_children = parent_to_children.clone();
     416              : 
     417              :                 // Insert child shards
     418            7 :                 for (parent_shard_id, children) in parent_to_children {
     419            5 :                     let mut parent = crate::schema::tenant_shards::table
     420            5 :                         .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
     421            5 :                         .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
     422            5 :                         .filter(shard_count.eq(parent_shard_id.shard_count.0 as i32))
     423            5 :                         .load::<TenantShardPersistence>(conn)?;
     424            5 :                     let parent = if parent.len() != 1 {
     425            0 :                         return Err(DatabaseError::Logical(format!(
     426            0 :                             "Parent shard {parent_shard_id} not found"
     427            0 :                         )));
     428              :                     } else {
     429            5 :                         parent.pop().unwrap()
     430              :                     };
     431           15 :                     for mut shard in children {
     432              :                         // Carry the parent's generation into the child
     433           10 :                         shard.generation = parent.generation;
     434              : 
     435           10 :                         debug_assert!(shard.splitting == SplitState::Splitting);
     436           10 :                         diesel::insert_into(tenant_shards)
     437           10 :                             .values(shard)
     438           10 :                             .execute(conn)?;
     439              :                     }
     440              :                 }
     441              : 
     442            2 :                 Ok(())
     443            2 :             })?;
     444              : 
     445            2 :             Ok(())
     446            2 :         })
     447            2 :         .await
     448            2 :     }
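                       : 
                       : // Editor's note: an illustrative example (not from this file) of the parent_to_children
                       : // argument for splitting an unsharded tenant into two shards. The shard numbering and the
                       : // placeholder values (parent_stripe_size, parent_pageserver, policy_json, config_json) are
                       : // assumptions; real values are chosen by the caller.
                       : //
                       : //     let parent = TenantShardId {
                       : //         tenant_id,
                       : //         shard_number: ShardNumber(0),
                       : //         shard_count: ShardCount(1),
                       : //     };
                       : //     let children: Vec<TenantShardPersistence> = (0..2)
                       : //         .map(|i| TenantShardPersistence {
                       : //             tenant_id: tenant_id.to_string(),
                       : //             shard_number: i,
                       : //             shard_count: 2,
                       : //             shard_stripe_size: parent_stripe_size,
                       : //             generation: 0, // overwritten with the parent's generation by begin_shard_split
                       : //             generation_pageserver: parent_pageserver,
                       : //             placement_policy: policy_json.clone(),
                       : //             splitting: SplitState::Splitting, // begin_shard_split debug_asserts this
                       : //             config: config_json.clone(),
                       : //         })
                       : //         .collect();
                       : //     let parent_to_children = vec![(parent, children)];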
     449              : 
      450              :     // When we finish shard splitting, we must atomically drop the old (parent) shards
      451              :     // and clear the splitting marker on the child shards inserted by begin_shard_split.
     452              :     #[allow(dead_code)]
     453            2 :     pub(crate) async fn complete_shard_split(
     454            2 :         &self,
     455            2 :         split_tenant_id: TenantId,
     456            2 :         old_shard_count: ShardCount,
     457            2 :     ) -> DatabaseResult<()> {
     458            2 :         use crate::schema::tenant_shards::dsl::*;
     459            2 :         self.with_conn(move |conn| -> DatabaseResult<()> {
     460            2 :             conn.transaction(|conn| -> QueryResult<()> {
     461            2 :                 // Drop parent shards
     462            2 :                 diesel::delete(tenant_shards)
     463            2 :                     .filter(tenant_id.eq(split_tenant_id.to_string()))
     464            2 :                     .filter(shard_count.eq(old_shard_count.0 as i32))
     465            2 :                     .execute(conn)?;
     466              : 
      467              :                 // Clear splitting flag
     468            2 :                 let updated = diesel::update(tenant_shards)
     469            2 :                     .filter(tenant_id.eq(split_tenant_id.to_string()))
     470            2 :                     .set((splitting.eq(0),))
     471            2 :                     .execute(conn)?;
     472            2 :                 debug_assert!(updated > 0);
     473              : 
     474            2 :                 Ok(())
     475            2 :             })?;
     476              : 
     477            2 :             Ok(())
     478            2 :         })
     479            2 :         .await
     480            2 :     }
     481              : }
     482              : 
     483              : /// Parts of [`crate::tenant_state::TenantState`] that are stored durably
     484         1465 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
     485              : #[diesel(table_name = crate::schema::tenant_shards)]
     486              : pub(crate) struct TenantShardPersistence {
     487              :     #[serde(default)]
     488              :     pub(crate) tenant_id: String,
     489              :     #[serde(default)]
     490              :     pub(crate) shard_number: i32,
     491              :     #[serde(default)]
     492              :     pub(crate) shard_count: i32,
     493              :     #[serde(default)]
     494              :     pub(crate) shard_stripe_size: i32,
     495              : 
     496              :     // Latest generation number: next time we attach, increment this
     497              :     // and use the incremented number when attaching
     498              :     pub(crate) generation: i32,
     499              : 
     500              :     // Currently attached pageserver
     501              :     #[serde(rename = "pageserver")]
     502              :     pub(crate) generation_pageserver: i64,
     503              : 
     504              :     #[serde(default)]
     505              :     pub(crate) placement_policy: String,
     506              :     #[serde(default)]
     507              :     pub(crate) splitting: SplitState,
     508              :     #[serde(default)]
     509              :     pub(crate) config: String,
     510              : }
     511              : 
     512              : /// Parts of [`crate::node::Node`] that are stored durably
     513          400 : #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
     514              : #[diesel(table_name = crate::schema::nodes)]
     515              : pub(crate) struct NodePersistence {
     516              :     pub(crate) node_id: i64,
     517              :     pub(crate) scheduling_policy: String,
     518              :     pub(crate) listen_http_addr: String,
     519              :     pub(crate) listen_http_port: i32,
     520              :     pub(crate) listen_pg_addr: String,
     521              :     pub(crate) listen_pg_port: i32,
     522              : }
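                       : 
                       : // Editor's note: a guessed sketch of the diesel schema these structs map onto. The real
                       : // `crate::schema` is generated from migrations; the primary keys and the SQL type backing
                       : // `splitting` (SplitState) are assumptions inferred from the field types above.
                       : //
                       : //     diesel::table! {
                       : //         tenant_shards (tenant_id, shard_number, shard_count) {
                       : //             tenant_id -> Varchar,
                       : //             shard_number -> Int4,
                       : //             shard_count -> Int4,
                       : //             shard_stripe_size -> Int4,
                       : //             generation -> Int4,
                       : //             generation_pageserver -> Int8,
                       : //             placement_policy -> Varchar,
                       : //             splitting -> Int2,
                       : //             config -> Varchar,
                       : //         }
                       : //     }
                       : //
                       : //     diesel::table! {
                       : //         nodes (node_id) {
                       : //             node_id -> Int8,
                       : //             scheduling_policy -> Varchar,
                       : //             listen_http_addr -> Varchar,
                       : //             listen_http_port -> Int4,
                       : //             listen_pg_addr -> Varchar,
                       : //             listen_pg_port -> Int4,
                       : //         }
                       : //     }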
        

Generated by: LCOV version 2.1-beta