LCOV - code coverage report
Current view: top level - storage_controller/src - hadron_queries.rs (source / functions) Coverage Total Hit
Test: 950df2668cb713840d5f6df1a3b961d557e55aff.info Lines: 0.0 % 290 0
Test Date: 2025-07-30 16:37:54 Functions: 0.0 % 38 0

            Line data    Source code
       1              : #![allow(dead_code, unused)]
       2              : 
       3              : use std::collections::{HashMap, HashSet};
       4              : 
       5              : use diesel::Queryable;
       6              : use diesel::dsl::min;
       7              : use diesel::prelude::*;
       8              : use diesel_async::AsyncConnection;
       9              : use diesel_async::AsyncPgConnection;
      10              : use diesel_async::RunQueryDsl;
      11              : use itertools::Itertools;
      12              : use pageserver_api::controller_api::SCSafekeeperTimelinesResponse;
      13              : use scoped_futures::ScopedFutureExt;
      14              : use serde::{Deserialize, Serialize};
      15              : use utils::id::{NodeId, TenantId, TimelineId};
      16              : use uuid::Uuid;
      17              : 
      18              : use crate::hadron_dns::NodeType;
      19              : use crate::hadron_requests::NodeConnectionInfo;
      20              : use crate::persistence::{DatabaseError, DatabaseResult};
      21              : use crate::schema::{hadron_safekeepers, nodes};
      22              : use crate::sk_node::SafeKeeperNode;
      23              : use std::str::FromStr;
      24              : 
      25              : // The Safe Keeper node database representation (for Diesel).
      26              : #[derive(
      27            0 :     Clone, Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, AsChangeset,
      28              : )]
      29              : #[diesel(table_name = crate::schema::hadron_safekeepers)]
      30              : pub(crate) struct HadronSafekeeperRow {
      31              :     pub(crate) sk_node_id: i64,
      32              :     pub(crate) listen_http_addr: String,
      33              :     pub(crate) listen_http_port: i32,
      34              :     pub(crate) listen_pg_addr: String,
      35              :     pub(crate) listen_pg_port: i32,
      36              : }
      37              : 
      38              : #[derive(
      39            0 :     Clone, Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, AsChangeset,
      40              : )]
      41              : #[diesel(table_name = crate::schema::hadron_timeline_safekeepers)]
      42              : pub(crate) struct HadronTimelineSafekeeper {
      43              :     pub(crate) timeline_id: String,
      44              :     pub(crate) sk_node_id: i64,
      45              :     pub(crate) legacy_endpoint_id: Option<Uuid>,
      46              : }
      47              : 
      48            0 : pub async fn execute_sk_upsert(
      49            0 :     conn: &mut AsyncPgConnection,
      50            0 :     sk_row: HadronSafekeeperRow,
      51            0 : ) -> DatabaseResult<()> {
      52              :     // SQL:
      53              :     // INSERT INTO hadron_safekeepers (sk_node_id, listen_http_addr, listen_http_port, listen_pg_addr, listen_pg_port)
      54              :     // VALUES ($1, $2, $3, $4, $5)
      55              :     // ON CONFLICT (sk_node_id)
      56              :     // DO UPDATE SET listen_http_addr = $2, listen_http_port = $3, listen_pg_addr = $4, listen_pg_port = $5;
      57              : 
      58              :     use crate::schema::hadron_safekeepers::dsl::*;
      59              : 
      60            0 :     diesel::insert_into(hadron_safekeepers)
      61            0 :         .values(&sk_row)
      62            0 :         .on_conflict(sk_node_id)
      63            0 :         .do_update()
      64            0 :         .set(&sk_row)
      65            0 :         .execute(conn)
      66            0 :         .await?;
      67              : 
      68            0 :     Ok(())
      69            0 : }
      70              : 
      71              : // Load all safekeeper nodes and their associated timelines from the meta PG. This query is supposed
      72              : // to run only once on HCC startup and is used to construct the SafeKeeperScheduler state. Performs
      73              : // scans of the hadron_safekeepers and hadron_timeline_safekeepers tables.
      74            0 : pub async fn scan_safekeepers_and_scheduled_timelines(
      75            0 :     conn: &mut AsyncPgConnection,
      76            0 : ) -> DatabaseResult<HashMap<NodeId, SafeKeeperNode>> {
      77              :     use crate::schema::hadron_safekeepers;
      78              :     use crate::schema::hadron_timeline_safekeepers;
      79              : 
      80              :     // We first scan the hadron_safekeepers table to constuct the SafeKeeperNode objects. We don't know anything about
      81              :     // the timelines scheduled to the safekeepers after this step. We then scan the hadron_timeline_safekeepers table
      82              :     // to populate the data structures in the SafeKeeperNode objects to reflect the timelines scheduled to the safekeepers.
      83            0 :     let mut results: HashMap<NodeId, SafeKeeperNode> = hadron_safekeepers::table
      84            0 :         .select((
      85            0 :             hadron_safekeepers::sk_node_id,
      86            0 :             hadron_safekeepers::listen_http_addr,
      87            0 :             hadron_safekeepers::listen_http_port,
      88            0 :             hadron_safekeepers::listen_pg_addr,
      89            0 :             hadron_safekeepers::listen_pg_port,
      90            0 :         ))
      91            0 :         .load::<HadronSafekeeperRow>(conn)
      92            0 :         .await?
      93            0 :         .into_iter()
      94            0 :         .map(|row| {
      95            0 :             let sk_node = SafeKeeperNode {
      96            0 :                 id: NodeId(row.sk_node_id as u64),
      97            0 :                 listen_http_addr: row.listen_http_addr.clone(),
      98            0 :                 listen_http_port: row.listen_http_port as u16,
      99            0 :                 listen_pg_addr: row.listen_pg_addr.clone(),
     100            0 :                 listen_pg_port: row.listen_pg_port as u16,
     101            0 :                 legacy_endpoints: HashMap::new(),
     102            0 :                 timelines: HashSet::new(),
     103            0 :             };
     104            0 :             (sk_node.id, sk_node)
     105            0 :         })
     106            0 :         .collect();
     107              : 
     108            0 :     let timeline_sk_rows = hadron_timeline_safekeepers::table
     109            0 :         .select((
     110            0 :             hadron_timeline_safekeepers::sk_node_id,
     111            0 :             hadron_timeline_safekeepers::timeline_id,
     112            0 :             hadron_timeline_safekeepers::legacy_endpoint_id,
     113            0 :         ))
     114            0 :         .load::<(i64, String, Option<Uuid>)>(conn)
     115            0 :         .await?;
     116            0 :     for (sk_node_id, timeline_id, legacy_endpoint_id) in timeline_sk_rows {
     117            0 :         if let Some(sk_node) = results.get_mut(&NodeId(sk_node_id as u64)) {
     118            0 :             let parsed_timeline_id =
     119            0 :                 TimelineId::from_str(&timeline_id).map_err(|e: hex::FromHexError| {
     120            0 :                     DatabaseError::Logical(format!("Failed to parse timeline IDs: {e}"))
     121            0 :                 })?;
     122            0 :             sk_node.timelines.insert(parsed_timeline_id);
     123            0 :             if let Some(legacy_endpoint_id) = legacy_endpoint_id {
     124            0 :                 sk_node
     125            0 :                     .legacy_endpoints
     126            0 :                     .insert(legacy_endpoint_id, parsed_timeline_id);
     127            0 :             }
     128            0 :         }
     129              :     }
     130              : 
     131            0 :     Ok(results)
     132            0 : }
     133              : 
     134              : // Queries the hadron_timeline_safekeepers table to get the safekeepers assigned to the passed
     135              : // timeline. If none are found, persists the input proposed safekeepers to the table and returns
     136              : // them.
     137            0 : pub async fn idempotently_persist_or_get_existing_timeline_safekeepers(
     138            0 :     conn: &mut AsyncPgConnection,
     139            0 :     timeline_id: TimelineId,
     140            0 :     safekeepers: &[NodeId],
     141            0 : ) -> DatabaseResult<Vec<NodeId>> {
     142              :     use crate::schema::hadron_timeline_safekeepers;
     143              :     // Confirm and persist the timeline-safekeeper mapping. If there are existing safekeepers
     144              :     // assigned to the timeline in the database, treat those as the source of truth.
     145            0 :     let existing_safekeepers: Vec<i64> = hadron_timeline_safekeepers::table
     146            0 :         .select(hadron_timeline_safekeepers::sk_node_id)
     147            0 :         .filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
     148            0 :         .load::<i64>(conn)
     149            0 :         .await?;
     150            0 :     let confirmed_safekeepers: Vec<NodeId> = if existing_safekeepers.is_empty() {
     151            0 :         let proposed_safekeeper_endpoint_rows_result: Result<Vec<HadronTimelineSafekeeper>, _> =
     152            0 :             safekeepers
     153            0 :                 .iter()
     154            0 :                 .map(|sk_node_id| {
     155            0 :                     i64::try_from(sk_node_id.0).map(|sk_node_id| HadronTimelineSafekeeper {
     156            0 :                         timeline_id: timeline_id.to_string(),
     157            0 :                         sk_node_id,
     158            0 :                         legacy_endpoint_id: None,
     159            0 :                     })
     160            0 :                 })
     161            0 :                 .collect();
     162              : 
     163            0 :         let proposed_safekeeper_endpoint_rows =
     164            0 :             proposed_safekeeper_endpoint_rows_result.map_err(|e| {
     165            0 :                 DatabaseError::Logical(format!("Failed to convert safekeeper IDs: {e}"))
     166            0 :             })?;
     167              : 
     168            0 :         diesel::insert_into(hadron_timeline_safekeepers::table)
     169            0 :             .values(&proposed_safekeeper_endpoint_rows)
     170            0 :             .execute(conn)
     171            0 :             .await?;
     172            0 :         safekeepers.to_owned()
     173              :     } else {
     174            0 :         let safekeeper_result: Result<Vec<NodeId>, _> = existing_safekeepers
     175            0 :             .into_iter()
     176            0 :             .map(|arg0: i64| u64::try_from(arg0).map(NodeId))
     177            0 :             .collect();
     178              : 
     179            0 :         safekeeper_result
     180            0 :             .map_err(|e| DatabaseError::Logical(format!("Failed to convert safekeeper IDs: {e}")))?
     181              :     };
     182              : 
     183            0 :     Ok(confirmed_safekeepers)
     184            0 : }
     185              : 
     186            0 : pub async fn delete_timeline_safekeepers(
     187            0 :     conn: &mut AsyncPgConnection,
     188            0 :     timeline_id: TimelineId,
     189            0 : ) -> DatabaseResult<()> {
     190              :     use crate::schema::hadron_timeline_safekeepers;
     191              : 
     192            0 :     diesel::delete(hadron_timeline_safekeepers::table)
     193            0 :         .filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
     194            0 :         .execute(conn)
     195            0 :         .await?;
     196              : 
     197            0 :     Ok(())
     198            0 : }
     199              : 
     200            0 : pub(crate) async fn execute_safekeeper_list_timelines(
     201            0 :     conn: &mut AsyncPgConnection,
     202            0 :     safekeeper_id: i64,
     203            0 : ) -> DatabaseResult<SCSafekeeperTimelinesResponse> {
     204              :     use crate::schema::hadron_timeline_safekeepers;
     205              :     use pageserver_api::controller_api::SCSafekeeperTimelinesResponse;
     206              : 
     207            0 :     conn.transaction(|conn| {
     208            0 :         async move {
     209            0 :             let mut sk_timelines = SCSafekeeperTimelinesResponse {
     210            0 :                 timelines: Vec::new(),
     211            0 :                 safekeeper_peers: Vec::new(),
     212            0 :             };
     213              : 
     214              :             // Find all timelines <String>
     215            0 :             let timeline_ids = hadron_timeline_safekeepers::table
     216            0 :                 .select(hadron_timeline_safekeepers::timeline_id)
     217            0 :                 .filter(hadron_timeline_safekeepers::sk_node_id.eq(safekeeper_id))
     218            0 :                 .load::<String>(conn)
     219            0 :                 .await
     220            0 :                 .into_iter()
     221            0 :                 .flatten()
     222            0 :                 .collect_vec();
     223              : 
     224              :             // Find the peers for each timeline. <timeline_id, sk_node_id>
     225            0 :             let timeline_peers = hadron_timeline_safekeepers::table
     226            0 :                 .select((
     227            0 :                     hadron_timeline_safekeepers::timeline_id,
     228            0 :                     hadron_timeline_safekeepers::sk_node_id,
     229            0 :                 ))
     230            0 :                 .filter(hadron_timeline_safekeepers::timeline_id.eq_any(&timeline_ids))
     231            0 :                 .load::<(String, i64)>(conn)
     232            0 :                 .await
     233            0 :                 .into_iter()
     234            0 :                 .flatten()
     235            0 :                 .collect_vec();
     236              : 
     237            0 :             let mut timeline_peers_map = HashMap::new();
     238            0 :             let mut seen = HashSet::new();
     239            0 :             let mut unique_sks = Vec::new();
     240              : 
     241            0 :             for (timeline_id, sk_node_id) in timeline_peers {
     242            0 :                 timeline_peers_map
     243            0 :                     .entry(timeline_id)
     244            0 :                     .or_insert_with(Vec::new)
     245            0 :                     .push(sk_node_id);
     246            0 :                 if seen.insert(sk_node_id) {
     247            0 :                     unique_sks.push(sk_node_id);
     248            0 :                 }
     249              :             }
     250              : 
     251              :             // Find SK info.
     252            0 :             let mut found_sk_nodes = HashSet::new();
     253            0 :             hadron_safekeepers::table
     254            0 :                 .select((
     255            0 :                     hadron_safekeepers::sk_node_id,
     256            0 :                     hadron_safekeepers::listen_http_addr,
     257            0 :                     hadron_safekeepers::listen_http_port,
     258            0 :                 ))
     259            0 :                 .filter(hadron_safekeepers::sk_node_id.eq_any(&unique_sks))
     260            0 :                 .load::<(i64, String, i32)>(conn)
     261            0 :                 .await
     262            0 :                 .into_iter()
     263            0 :                 .flatten()
     264            0 :                 .for_each(|(sk_node_id, listen_http_addr, http_port)| {
     265            0 :                     found_sk_nodes.insert(sk_node_id);
     266              : 
     267            0 :                     sk_timelines.safekeeper_peers.push(
     268            0 :                         pageserver_api::controller_api::TimelineSafekeeperPeer {
     269            0 :                             node_id: utils::id::NodeId(sk_node_id as u64),
     270            0 :                             listen_http_addr,
     271            0 :                             http_port,
     272            0 :                         },
     273              :                     );
     274            0 :                 });
     275              : 
     276              :             // Prepare timeline response.
     277            0 :             for timeline_id in timeline_ids {
     278            0 :                 if !timeline_peers_map.contains_key(&timeline_id) {
     279            0 :                     continue;
     280            0 :                 }
     281            0 :                 let peers = timeline_peers_map.get(&timeline_id).unwrap();
     282              :                 // Check peers exist.
     283            0 :                 if !peers
     284            0 :                     .iter()
     285            0 :                     .all(|sk_node_id| found_sk_nodes.contains(sk_node_id))
     286              :                 {
     287            0 :                     continue;
     288            0 :                 }
     289              : 
     290            0 :                 let timeline = pageserver_api::controller_api::SCSafekeeperTimeline {
     291            0 :                     timeline_id: TimelineId::from_str(&timeline_id).unwrap(),
     292            0 :                     peers: peers
     293            0 :                         .iter()
     294            0 :                         .map(|sk_node_id| utils::id::NodeId(*sk_node_id as u64))
     295            0 :                         .collect(),
     296              :                 };
     297            0 :                 sk_timelines.timelines.push(timeline);
     298              :             }
     299              : 
     300            0 :             Ok(sk_timelines)
     301            0 :         }
     302            0 :         .scope_boxed()
     303            0 :     })
     304            0 :     .await
     305            0 : }
     306              : 
     307              : /// Stores details about connecting to pageserver and safekeeper nodes for a given tenant and
     308              : /// timeline.
     309              : pub struct PageserverAndSafekeeperConnectionInfo {
     310              :     pub pageserver_conn_info: Vec<NodeConnectionInfo>,
     311              :     pub safekeeper_conn_info: Vec<NodeConnectionInfo>,
     312              : }
     313              : 
     314              : /// Retrieves the connection information for the pageserver and safekeepers associated with the
     315              : /// given tenant and timeline.
     316            0 : pub async fn get_pageserver_and_safekeeper_connection_info(
     317            0 :     conn: &mut AsyncPgConnection,
     318            0 :     tenant_id: TenantId,
     319            0 :     timeline_id: TimelineId,
     320            0 : ) -> DatabaseResult<PageserverAndSafekeeperConnectionInfo> {
     321            0 :     conn.transaction(|conn| {
     322            0 :         async move {
     323              :             // Fetch details about pageserver, which is associated with the input tenant.
     324            0 :             let pageserver_conn_info =
     325            0 :                 get_pageserver_connection_info(conn, &tenant_id.to_string()).await?;
     326              : 
     327              :             // Fetch details about safekeepers, which are associated with the input timeline.
     328            0 :             let safekeeper_conn_info =
     329            0 :                 get_safekeeper_connection_info(conn, &timeline_id.to_string()).await?;
     330              : 
     331            0 :             Ok(PageserverAndSafekeeperConnectionInfo {
     332            0 :                 pageserver_conn_info,
     333            0 :                 safekeeper_conn_info,
     334            0 :             })
     335            0 :         }
     336            0 :         .scope_boxed()
     337            0 :     })
     338            0 :     .await
     339            0 : }
     340              : 
     341            0 : async fn get_safekeeper_connection_info(
     342            0 :     conn: &mut AsyncPgConnection,
     343            0 :     timeline_id: &str,
     344            0 : ) -> DatabaseResult<Vec<NodeConnectionInfo>> {
     345              :     use crate::schema::hadron_safekeepers;
     346              :     use crate::schema::hadron_timeline_safekeepers;
     347              : 
     348            0 :     Ok(hadron_timeline_safekeepers::table
     349            0 :         .inner_join(
     350            0 :             hadron_safekeepers::table
     351            0 :                 .on(hadron_timeline_safekeepers::sk_node_id.eq(hadron_safekeepers::sk_node_id)),
     352            0 :         )
     353            0 :         .select((
     354            0 :             hadron_safekeepers::sk_node_id,
     355            0 :             hadron_safekeepers::listen_pg_addr,
     356            0 :             hadron_safekeepers::listen_pg_port,
     357            0 :         ))
     358            0 :         .filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
     359            0 :         .load::<(i64, String, i32)>(conn)
     360            0 :         .await?
     361            0 :         .into_iter()
     362            0 :         .map(|(node_id, addr, port)| {
     363            0 :             NodeConnectionInfo::new(
     364            0 :                 NodeType::Safekeeper,
     365            0 :                 NodeId(node_id as u64),
     366            0 :                 addr,
     367            0 :                 port as u16,
     368              :             )
     369            0 :         })
     370            0 :         .collect())
     371            0 : }
     372              : 
     373            0 : async fn get_pageserver_connection_info(
     374            0 :     conn: &mut AsyncPgConnection,
     375            0 :     tenant_id: &str,
     376            0 : ) -> DatabaseResult<Vec<NodeConnectionInfo>> {
     377              :     use crate::schema::tenant_shards;
     378              : 
     379              :     // When the tenant is being split, it'll contain both old shards and new shards. Until the tenant split is committed,
     380              :     // we should always use the old shards.
     381              :     // NOTE: we only support tenant split without tennat merge. Thus shard count could only increase.
     382            0 :     let min_shard_count = match tenant_shards::table
     383            0 :         .select(min(tenant_shards::shard_count))
     384            0 :         .filter(tenant_shards::tenant_id.eq(tenant_id))
     385            0 :         .first::<Option<i32>>(conn)
     386            0 :         .await
     387            0 :         .optional()?
     388              :     {
     389            0 :         Some(Some(count)) => count,
     390              :         Some(None) => {
     391              :             // Tenant doesn't exist. It's possible that it was deleted before we got the request.
     392            0 :             return Ok(vec![]);
     393              :         }
     394              :         None => {
     395              :             // This is never supposed to happen because `SELECT min()` should always return one row.
     396            0 :             return Err(DatabaseError::Logical(format!(
     397            0 :                 "Unexpected empty query result for min(shard_count) query. Tenant ID {tenant_id}"
     398            0 :             )));
     399              :         }
     400              :     };
     401              : 
     402            0 :     let shards: Vec<NodeConnectionInfo> = nodes::table
     403            0 :         .inner_join(
     404            0 :             tenant_shards::table.on(nodes::node_id
     405            0 :                 .nullable()
     406            0 :                 .eq(tenant_shards::generation_pageserver)),
     407            0 :         )
     408            0 :         .select((nodes::node_id, nodes::listen_pg_addr, nodes::listen_pg_port))
     409            0 :         .filter(tenant_shards::tenant_id.eq(&tenant_id.to_string()))
     410            0 :         .order(tenant_shards::shard_number.asc())
     411            0 :         .filter(tenant_shards::shard_count.eq(min_shard_count))
     412            0 :         .load::<(i64, String, i32)>(conn)
     413            0 :         .await?
     414            0 :         .into_iter()
     415            0 :         .map(|(node_id, addr, port)| {
     416            0 :             NodeConnectionInfo::new(
     417            0 :                 NodeType::Pageserver,
     418            0 :                 NodeId(node_id as u64),
     419            0 :                 addr,
     420            0 :                 port as u16,
     421              :             )
     422            0 :         })
     423            0 :         .collect();
     424              : 
     425            0 :     if !shards.is_empty() && !shards.len().is_power_of_two() {
     426            0 :         return Err(DatabaseError::Logical(format!(
     427            0 :             "Tenant {} has unexpected shard count {} (not a power of 2)",
     428            0 :             tenant_id,
     429            0 :             shards.len()
     430            0 :         )));
     431            0 :     }
     432            0 :     Ok(shards)
     433            0 : }
        

Generated by: LCOV version 2.1-beta