Line data Source code
1 : #![allow(dead_code, unused)]
2 :
3 : use std::collections::{HashMap, HashSet};
4 :
5 : use diesel::Queryable;
6 : use diesel::dsl::min;
7 : use diesel::prelude::*;
8 : use diesel_async::AsyncConnection;
9 : use diesel_async::AsyncPgConnection;
10 : use diesel_async::RunQueryDsl;
11 : use itertools::Itertools;
12 : use pageserver_api::controller_api::SCSafekeeperTimelinesResponse;
13 : use scoped_futures::ScopedFutureExt;
14 : use serde::{Deserialize, Serialize};
15 : use utils::id::{NodeId, TenantId, TimelineId};
16 : use uuid::Uuid;
17 :
18 : use crate::hadron_dns::NodeType;
19 : use crate::hadron_requests::NodeConnectionInfo;
20 : use crate::persistence::{DatabaseError, DatabaseResult};
21 : use crate::schema::{hadron_safekeepers, nodes};
22 : use crate::sk_node::SafeKeeperNode;
23 : use std::str::FromStr;
24 :
25 : // The Safe Keeper node database representation (for Diesel).
26 : #[derive(
27 0 : Clone, Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, AsChangeset,
28 : )]
29 : #[diesel(table_name = crate::schema::hadron_safekeepers)]
30 : pub(crate) struct HadronSafekeeperRow {
31 : pub(crate) sk_node_id: i64,
32 : pub(crate) listen_http_addr: String,
33 : pub(crate) listen_http_port: i32,
34 : pub(crate) listen_pg_addr: String,
35 : pub(crate) listen_pg_port: i32,
36 : }
37 :
38 : #[derive(
39 0 : Clone, Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, AsChangeset,
40 : )]
41 : #[diesel(table_name = crate::schema::hadron_timeline_safekeepers)]
42 : pub(crate) struct HadronTimelineSafekeeper {
43 : pub(crate) timeline_id: String,
44 : pub(crate) sk_node_id: i64,
45 : pub(crate) legacy_endpoint_id: Option<Uuid>,
46 : }
47 :
48 0 : pub async fn execute_sk_upsert(
49 0 : conn: &mut AsyncPgConnection,
50 0 : sk_row: HadronSafekeeperRow,
51 0 : ) -> DatabaseResult<()> {
52 : // SQL:
53 : // INSERT INTO hadron_safekeepers (sk_node_id, listen_http_addr, listen_http_port, listen_pg_addr, listen_pg_port)
54 : // VALUES ($1, $2, $3, $4, $5)
55 : // ON CONFLICT (sk_node_id)
56 : // DO UPDATE SET listen_http_addr = $2, listen_http_port = $3, listen_pg_addr = $4, listen_pg_port = $5;
57 :
58 : use crate::schema::hadron_safekeepers::dsl::*;
59 :
60 0 : diesel::insert_into(hadron_safekeepers)
61 0 : .values(&sk_row)
62 0 : .on_conflict(sk_node_id)
63 0 : .do_update()
64 0 : .set(&sk_row)
65 0 : .execute(conn)
66 0 : .await?;
67 :
68 0 : Ok(())
69 0 : }
70 :
71 : // Load all safekeeper nodes and their associated timelines from the meta PG. This query is supposed
72 : // to run only once on HCC startup and is used to construct the SafeKeeperScheduler state. Performs
73 : // scans of the hadron_safekeepers and hadron_timeline_safekeepers tables.
74 0 : pub async fn scan_safekeepers_and_scheduled_timelines(
75 0 : conn: &mut AsyncPgConnection,
76 0 : ) -> DatabaseResult<HashMap<NodeId, SafeKeeperNode>> {
77 : use crate::schema::hadron_safekeepers;
78 : use crate::schema::hadron_timeline_safekeepers;
79 :
80 : // We first scan the hadron_safekeepers table to constuct the SafeKeeperNode objects. We don't know anything about
81 : // the timelines scheduled to the safekeepers after this step. We then scan the hadron_timeline_safekeepers table
82 : // to populate the data structures in the SafeKeeperNode objects to reflect the timelines scheduled to the safekeepers.
83 0 : let mut results: HashMap<NodeId, SafeKeeperNode> = hadron_safekeepers::table
84 0 : .select((
85 0 : hadron_safekeepers::sk_node_id,
86 0 : hadron_safekeepers::listen_http_addr,
87 0 : hadron_safekeepers::listen_http_port,
88 0 : hadron_safekeepers::listen_pg_addr,
89 0 : hadron_safekeepers::listen_pg_port,
90 0 : ))
91 0 : .load::<HadronSafekeeperRow>(conn)
92 0 : .await?
93 0 : .into_iter()
94 0 : .map(|row| {
95 0 : let sk_node = SafeKeeperNode {
96 0 : id: NodeId(row.sk_node_id as u64),
97 0 : listen_http_addr: row.listen_http_addr.clone(),
98 0 : listen_http_port: row.listen_http_port as u16,
99 0 : listen_pg_addr: row.listen_pg_addr.clone(),
100 0 : listen_pg_port: row.listen_pg_port as u16,
101 0 : legacy_endpoints: HashMap::new(),
102 0 : timelines: HashSet::new(),
103 0 : };
104 0 : (sk_node.id, sk_node)
105 0 : })
106 0 : .collect();
107 :
108 0 : let timeline_sk_rows = hadron_timeline_safekeepers::table
109 0 : .select((
110 0 : hadron_timeline_safekeepers::sk_node_id,
111 0 : hadron_timeline_safekeepers::timeline_id,
112 0 : hadron_timeline_safekeepers::legacy_endpoint_id,
113 0 : ))
114 0 : .load::<(i64, String, Option<Uuid>)>(conn)
115 0 : .await?;
116 0 : for (sk_node_id, timeline_id, legacy_endpoint_id) in timeline_sk_rows {
117 0 : if let Some(sk_node) = results.get_mut(&NodeId(sk_node_id as u64)) {
118 0 : let parsed_timeline_id =
119 0 : TimelineId::from_str(&timeline_id).map_err(|e: hex::FromHexError| {
120 0 : DatabaseError::Logical(format!("Failed to parse timeline IDs: {e}"))
121 0 : })?;
122 0 : sk_node.timelines.insert(parsed_timeline_id);
123 0 : if let Some(legacy_endpoint_id) = legacy_endpoint_id {
124 0 : sk_node
125 0 : .legacy_endpoints
126 0 : .insert(legacy_endpoint_id, parsed_timeline_id);
127 0 : }
128 0 : }
129 : }
130 :
131 0 : Ok(results)
132 0 : }
133 :
134 : // Queries the hadron_timeline_safekeepers table to get the safekeepers assigned to the passed
135 : // timeline. If none are found, persists the input proposed safekeepers to the table and returns
136 : // them.
137 0 : pub async fn idempotently_persist_or_get_existing_timeline_safekeepers(
138 0 : conn: &mut AsyncPgConnection,
139 0 : timeline_id: TimelineId,
140 0 : safekeepers: &[NodeId],
141 0 : ) -> DatabaseResult<Vec<NodeId>> {
142 : use crate::schema::hadron_timeline_safekeepers;
143 : // Confirm and persist the timeline-safekeeper mapping. If there are existing safekeepers
144 : // assigned to the timeline in the database, treat those as the source of truth.
145 0 : let existing_safekeepers: Vec<i64> = hadron_timeline_safekeepers::table
146 0 : .select(hadron_timeline_safekeepers::sk_node_id)
147 0 : .filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
148 0 : .load::<i64>(conn)
149 0 : .await?;
150 0 : let confirmed_safekeepers: Vec<NodeId> = if existing_safekeepers.is_empty() {
151 0 : let proposed_safekeeper_endpoint_rows_result: Result<Vec<HadronTimelineSafekeeper>, _> =
152 0 : safekeepers
153 0 : .iter()
154 0 : .map(|sk_node_id| {
155 0 : i64::try_from(sk_node_id.0).map(|sk_node_id| HadronTimelineSafekeeper {
156 0 : timeline_id: timeline_id.to_string(),
157 0 : sk_node_id,
158 0 : legacy_endpoint_id: None,
159 0 : })
160 0 : })
161 0 : .collect();
162 :
163 0 : let proposed_safekeeper_endpoint_rows =
164 0 : proposed_safekeeper_endpoint_rows_result.map_err(|e| {
165 0 : DatabaseError::Logical(format!("Failed to convert safekeeper IDs: {e}"))
166 0 : })?;
167 :
168 0 : diesel::insert_into(hadron_timeline_safekeepers::table)
169 0 : .values(&proposed_safekeeper_endpoint_rows)
170 0 : .execute(conn)
171 0 : .await?;
172 0 : safekeepers.to_owned()
173 : } else {
174 0 : let safekeeper_result: Result<Vec<NodeId>, _> = existing_safekeepers
175 0 : .into_iter()
176 0 : .map(|arg0: i64| u64::try_from(arg0).map(NodeId))
177 0 : .collect();
178 :
179 0 : safekeeper_result
180 0 : .map_err(|e| DatabaseError::Logical(format!("Failed to convert safekeeper IDs: {e}")))?
181 : };
182 :
183 0 : Ok(confirmed_safekeepers)
184 0 : }
185 :
186 0 : pub async fn delete_timeline_safekeepers(
187 0 : conn: &mut AsyncPgConnection,
188 0 : timeline_id: TimelineId,
189 0 : ) -> DatabaseResult<()> {
190 : use crate::schema::hadron_timeline_safekeepers;
191 :
192 0 : diesel::delete(hadron_timeline_safekeepers::table)
193 0 : .filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
194 0 : .execute(conn)
195 0 : .await?;
196 :
197 0 : Ok(())
198 0 : }
199 :
200 0 : pub(crate) async fn execute_safekeeper_list_timelines(
201 0 : conn: &mut AsyncPgConnection,
202 0 : safekeeper_id: i64,
203 0 : ) -> DatabaseResult<SCSafekeeperTimelinesResponse> {
204 : use crate::schema::hadron_timeline_safekeepers;
205 : use pageserver_api::controller_api::SCSafekeeperTimelinesResponse;
206 :
207 0 : conn.transaction(|conn| {
208 0 : async move {
209 0 : let mut sk_timelines = SCSafekeeperTimelinesResponse {
210 0 : timelines: Vec::new(),
211 0 : safekeeper_peers: Vec::new(),
212 0 : };
213 :
214 : // Find all timelines <String>
215 0 : let timeline_ids = hadron_timeline_safekeepers::table
216 0 : .select(hadron_timeline_safekeepers::timeline_id)
217 0 : .filter(hadron_timeline_safekeepers::sk_node_id.eq(safekeeper_id))
218 0 : .load::<String>(conn)
219 0 : .await
220 0 : .into_iter()
221 0 : .flatten()
222 0 : .collect_vec();
223 :
224 : // Find the peers for each timeline. <timeline_id, sk_node_id>
225 0 : let timeline_peers = hadron_timeline_safekeepers::table
226 0 : .select((
227 0 : hadron_timeline_safekeepers::timeline_id,
228 0 : hadron_timeline_safekeepers::sk_node_id,
229 0 : ))
230 0 : .filter(hadron_timeline_safekeepers::timeline_id.eq_any(&timeline_ids))
231 0 : .load::<(String, i64)>(conn)
232 0 : .await
233 0 : .into_iter()
234 0 : .flatten()
235 0 : .collect_vec();
236 :
237 0 : let mut timeline_peers_map = HashMap::new();
238 0 : let mut seen = HashSet::new();
239 0 : let mut unique_sks = Vec::new();
240 :
241 0 : for (timeline_id, sk_node_id) in timeline_peers {
242 0 : timeline_peers_map
243 0 : .entry(timeline_id)
244 0 : .or_insert_with(Vec::new)
245 0 : .push(sk_node_id);
246 0 : if seen.insert(sk_node_id) {
247 0 : unique_sks.push(sk_node_id);
248 0 : }
249 : }
250 :
251 : // Find SK info.
252 0 : let mut found_sk_nodes = HashSet::new();
253 0 : hadron_safekeepers::table
254 0 : .select((
255 0 : hadron_safekeepers::sk_node_id,
256 0 : hadron_safekeepers::listen_http_addr,
257 0 : hadron_safekeepers::listen_http_port,
258 0 : ))
259 0 : .filter(hadron_safekeepers::sk_node_id.eq_any(&unique_sks))
260 0 : .load::<(i64, String, i32)>(conn)
261 0 : .await
262 0 : .into_iter()
263 0 : .flatten()
264 0 : .for_each(|(sk_node_id, listen_http_addr, http_port)| {
265 0 : found_sk_nodes.insert(sk_node_id);
266 :
267 0 : sk_timelines.safekeeper_peers.push(
268 0 : pageserver_api::controller_api::TimelineSafekeeperPeer {
269 0 : node_id: utils::id::NodeId(sk_node_id as u64),
270 0 : listen_http_addr,
271 0 : http_port,
272 0 : },
273 : );
274 0 : });
275 :
276 : // Prepare timeline response.
277 0 : for timeline_id in timeline_ids {
278 0 : if !timeline_peers_map.contains_key(&timeline_id) {
279 0 : continue;
280 0 : }
281 0 : let peers = timeline_peers_map.get(&timeline_id).unwrap();
282 : // Check peers exist.
283 0 : if !peers
284 0 : .iter()
285 0 : .all(|sk_node_id| found_sk_nodes.contains(sk_node_id))
286 : {
287 0 : continue;
288 0 : }
289 :
290 0 : let timeline = pageserver_api::controller_api::SCSafekeeperTimeline {
291 0 : timeline_id: TimelineId::from_str(&timeline_id).unwrap(),
292 0 : peers: peers
293 0 : .iter()
294 0 : .map(|sk_node_id| utils::id::NodeId(*sk_node_id as u64))
295 0 : .collect(),
296 : };
297 0 : sk_timelines.timelines.push(timeline);
298 : }
299 :
300 0 : Ok(sk_timelines)
301 0 : }
302 0 : .scope_boxed()
303 0 : })
304 0 : .await
305 0 : }
306 :
307 : /// Stores details about connecting to pageserver and safekeeper nodes for a given tenant and
308 : /// timeline.
309 : pub struct PageserverAndSafekeeperConnectionInfo {
310 : pub pageserver_conn_info: Vec<NodeConnectionInfo>,
311 : pub safekeeper_conn_info: Vec<NodeConnectionInfo>,
312 : }
313 :
314 : /// Retrieves the connection information for the pageserver and safekeepers associated with the
315 : /// given tenant and timeline.
316 0 : pub async fn get_pageserver_and_safekeeper_connection_info(
317 0 : conn: &mut AsyncPgConnection,
318 0 : tenant_id: TenantId,
319 0 : timeline_id: TimelineId,
320 0 : ) -> DatabaseResult<PageserverAndSafekeeperConnectionInfo> {
321 0 : conn.transaction(|conn| {
322 0 : async move {
323 : // Fetch details about pageserver, which is associated with the input tenant.
324 0 : let pageserver_conn_info =
325 0 : get_pageserver_connection_info(conn, &tenant_id.to_string()).await?;
326 :
327 : // Fetch details about safekeepers, which are associated with the input timeline.
328 0 : let safekeeper_conn_info =
329 0 : get_safekeeper_connection_info(conn, &timeline_id.to_string()).await?;
330 :
331 0 : Ok(PageserverAndSafekeeperConnectionInfo {
332 0 : pageserver_conn_info,
333 0 : safekeeper_conn_info,
334 0 : })
335 0 : }
336 0 : .scope_boxed()
337 0 : })
338 0 : .await
339 0 : }
340 :
341 0 : async fn get_safekeeper_connection_info(
342 0 : conn: &mut AsyncPgConnection,
343 0 : timeline_id: &str,
344 0 : ) -> DatabaseResult<Vec<NodeConnectionInfo>> {
345 : use crate::schema::hadron_safekeepers;
346 : use crate::schema::hadron_timeline_safekeepers;
347 :
348 0 : Ok(hadron_timeline_safekeepers::table
349 0 : .inner_join(
350 0 : hadron_safekeepers::table
351 0 : .on(hadron_timeline_safekeepers::sk_node_id.eq(hadron_safekeepers::sk_node_id)),
352 0 : )
353 0 : .select((
354 0 : hadron_safekeepers::sk_node_id,
355 0 : hadron_safekeepers::listen_pg_addr,
356 0 : hadron_safekeepers::listen_pg_port,
357 0 : ))
358 0 : .filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
359 0 : .load::<(i64, String, i32)>(conn)
360 0 : .await?
361 0 : .into_iter()
362 0 : .map(|(node_id, addr, port)| {
363 0 : NodeConnectionInfo::new(
364 0 : NodeType::Safekeeper,
365 0 : NodeId(node_id as u64),
366 0 : addr,
367 0 : port as u16,
368 : )
369 0 : })
370 0 : .collect())
371 0 : }
372 :
373 0 : async fn get_pageserver_connection_info(
374 0 : conn: &mut AsyncPgConnection,
375 0 : tenant_id: &str,
376 0 : ) -> DatabaseResult<Vec<NodeConnectionInfo>> {
377 : use crate::schema::tenant_shards;
378 :
379 : // When the tenant is being split, it'll contain both old shards and new shards. Until the tenant split is committed,
380 : // we should always use the old shards.
381 : // NOTE: we only support tenant split without tennat merge. Thus shard count could only increase.
382 0 : let min_shard_count = match tenant_shards::table
383 0 : .select(min(tenant_shards::shard_count))
384 0 : .filter(tenant_shards::tenant_id.eq(tenant_id))
385 0 : .first::<Option<i32>>(conn)
386 0 : .await
387 0 : .optional()?
388 : {
389 0 : Some(Some(count)) => count,
390 : Some(None) => {
391 : // Tenant doesn't exist. It's possible that it was deleted before we got the request.
392 0 : return Ok(vec![]);
393 : }
394 : None => {
395 : // This is never supposed to happen because `SELECT min()` should always return one row.
396 0 : return Err(DatabaseError::Logical(format!(
397 0 : "Unexpected empty query result for min(shard_count) query. Tenant ID {tenant_id}"
398 0 : )));
399 : }
400 : };
401 :
402 0 : let shards: Vec<NodeConnectionInfo> = nodes::table
403 0 : .inner_join(
404 0 : tenant_shards::table.on(nodes::node_id
405 0 : .nullable()
406 0 : .eq(tenant_shards::generation_pageserver)),
407 0 : )
408 0 : .select((nodes::node_id, nodes::listen_pg_addr, nodes::listen_pg_port))
409 0 : .filter(tenant_shards::tenant_id.eq(&tenant_id.to_string()))
410 0 : .order(tenant_shards::shard_number.asc())
411 0 : .filter(tenant_shards::shard_count.eq(min_shard_count))
412 0 : .load::<(i64, String, i32)>(conn)
413 0 : .await?
414 0 : .into_iter()
415 0 : .map(|(node_id, addr, port)| {
416 0 : NodeConnectionInfo::new(
417 0 : NodeType::Pageserver,
418 0 : NodeId(node_id as u64),
419 0 : addr,
420 0 : port as u16,
421 : )
422 0 : })
423 0 : .collect();
424 :
425 0 : if !shards.is_empty() && !shards.len().is_power_of_two() {
426 0 : return Err(DatabaseError::Logical(format!(
427 0 : "Tenant {} has unexpected shard count {} (not a power of 2)",
428 0 : tenant_id,
429 0 : shards.len()
430 0 : )));
431 0 : }
432 0 : Ok(shards)
433 0 : }
|