pub(crate) mod split_state;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use std::time::Instant;

use self::split_state::SplitState;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::SafekeeperDescribeResponse;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};

use crate::metrics::{
    DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;

use diesel_migrations::{embed_migrations, EmbeddedMigrations};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// ## What do we store?
///
/// The storage controller service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
///   but it is operationally simpler to make this service the authority for which nodes
///   it talks to.
///
/// ## Performance/efficiency
///
/// The storage controller service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
///   before it can attach a tenant, so this acts as a bound on how fast things like
///   failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
///   so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as they are very rarely
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
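///
/// ## Usage sketch
///
/// A minimal sketch of how the controller wires this up at startup (the URL is
/// hypothetical; `new` only builds the pool, connections are established lazily):
///
/// ```ignore
/// let database_url = "postgresql://storcon@localhost/storcon".to_string();
/// Persistence::await_connection(&database_url, Duration::from_secs(5)).await?;
/// let persistence = Persistence::new(database_url);
/// persistence.migration_run().await?;
/// let nodes = persistence.list_nodes().await?;
/// ```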
pub struct Persistence {
    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
}

/// Legacy format, for use in JSON compat objects in test environment
#[derive(Serialize, Deserialize)]
struct JsonPersistence {
    tenants: HashMap<TenantShardId, TenantShardPersistence>,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
    #[error(transparent)]
    Query(#[from] diesel::result::Error),
    #[error(transparent)]
    Connection(#[from] diesel::result::ConnectionError),
    #[error(transparent)]
    ConnectionPool(#[from] r2d2::Error),
    #[error("Logical error: {0}")]
    Logical(String),
    #[error("Migration error: {0}")]
    Migration(String),
}

#[derive(measured::FixedCardinalityLabel, Copy, Clone)]
pub(crate) enum DatabaseOperation {
    InsertNode,
    UpdateNode,
    DeleteNode,
    ListNodes,
    BeginShardSplit,
    CompleteShardSplit,
    AbortShardSplit,
    Detach,
    ReAttach,
    IncrementGeneration,
    TenantGenerations,
    ShardGenerations,
    ListTenantShards,
    InsertTenantShards,
    UpdateTenantShard,
    DeleteTenant,
    UpdateTenantConfig,
    UpdateMetadataHealth,
    ListMetadataHealth,
    ListMetadataHealthUnhealthy,
    ListMetadataHealthOutdated,
    ListSafekeepers,
    GetLeader,
    UpdateLeader,
    SetPreferredAzs,
}

#[must_use]
pub(crate) enum AbortShardSplitStatus {
    /// We aborted the split in the database by reverting to the parent shards
    Aborted,
    /// The split had already been persisted.
    Complete,
}

pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

/// Some methods can operate on either a whole tenant or a single shard
pub(crate) enum TenantFilter {
    Tenant(TenantId),
    Shard(TenantShardId),
}

/// Represents the results of looking up generation+pageserver for the shards of a tenant
pub(crate) struct ShardGenerationState {
    pub(crate) tenant_shard_id: TenantShardId,
    pub(crate) generation: Option<Generation>,
    pub(crate) generation_pageserver: Option<NodeId>,
}

impl Persistence {
    // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
    // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
    pub const MAX_CONNECTIONS: u32 = 99;

    // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
    const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
    const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);

    pub fn new(database_url: String) -> Self {
        let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);

        // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
        // to execute queries (database queries are not generally on latency-sensitive paths).
        let connection_pool = diesel::r2d2::Pool::builder()
            .max_size(Self::MAX_CONNECTIONS)
            .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
            .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
            // Always keep at least one connection ready to go
            .min_idle(Some(1))
            .test_on_check_out(true)
            .build(manager)
            .expect("Could not build connection pool");

        Self { connection_pool }
    }

    /// A helper for use during startup, where we would like to tolerate concurrent restarts of the
    /// database and the storage controller, so the database might not be available right away.
    pub async fn await_connection(
        database_url: &str,
        timeout: Duration,
    ) -> Result<(), diesel::ConnectionError> {
        let started_at = Instant::now();
        loop {
            match PgConnection::establish(database_url) {
                Ok(_) => {
                    tracing::info!("Connected to database.");
                    return Ok(());
                }
                Err(e) => {
                    if started_at.elapsed() > timeout {
                        return Err(e);
                    } else {
                        tracing::info!("Database not yet available, waiting... ({e})");
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        }
    }

    /// Execute the diesel migrations that are built into this binary
    pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
        use diesel_migrations::{HarnessWithOutput, MigrationHarness};

        self.with_conn(move |conn| -> DatabaseResult<()> {
            HarnessWithOutput::write_to_stdout(conn)
                .run_pending_migrations(MIGRATIONS)
                .map(|_| ())
                .map_err(|e| DatabaseError::Migration(e.to_string()))
        })
        .await
    }

    /// Wraps `with_conn` in order to collect latency and error metrics
    async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
    where
        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
        R: Send + 'static,
    {
        let latency = &METRICS_REGISTRY
            .metrics_group
            .storage_controller_database_query_latency;
        let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });

        let res = self.with_conn(func).await;

        if let Err(err) = &res {
            let error_counter = &METRICS_REGISTRY
                .metrics_group
                .storage_controller_database_query_error;
            error_counter.inc(DatabaseQueryErrorLabelGroup {
                error_type: err.error_label(),
                operation: op,
            })
        }

        res
    }

    /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
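    ///
    /// A minimal sketch of the calling convention: the closure runs inside a
    /// serializable transaction and may be re-invoked on serialization failures,
    /// so it must be side-effect free apart from the database statements it issues.
    ///
    /// ```ignore
    /// let node_count: i64 = self
    ///     .with_conn(move |conn| -> DatabaseResult<i64> {
    ///         Ok(crate::schema::nodes::table.count().get_result(conn)?)
    ///     })
    ///     .await?;
    /// ```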
    async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
    where
        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
        R: Send + 'static,
    {
        // A generous allowance for how many times we may retry serializable transactions
        // before giving up. This is not expected to be hit: it is a defensive measure in case we
        // somehow engineer a situation where duelling transactions might otherwise live-lock.
        const MAX_RETRIES: usize = 128;

        let mut conn = self.connection_pool.get()?;
        tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
            let mut retry_count = 0;
            loop {
                match conn.build_transaction().serializable().run(|c| func(c)) {
                    Ok(r) => break Ok(r),
                    Err(
                        err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
                            diesel::result::DatabaseErrorKind::SerializationFailure,
                            _,
                        )),
                    ) => {
                        retry_count += 1;
                        if retry_count > MAX_RETRIES {
                            tracing::error!(
                                "Exceeded max retries on SerializationFailure errors: {err:?}"
                            );
                            break Err(err);
                        } else {
                            // Retry on serialization errors: these are expected, because even though our
                            // transactions don't fight for the same rows, they will occasionally collide
                            // on index pages (e.g. increment_generation for unrelated shards can collide)
                            tracing::debug!(
                                "Retrying transaction on serialization failure {err:?}"
                            );
                            continue;
                        }
                    }
                    Err(e) => break Err(e),
                }
            }
        })
        .await
        .expect("Task panic")
    }

    /// When a node is first registered, persist it before using it for anything
    pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
        let np = node.to_persistent();
        self.with_measured_conn(
            DatabaseOperation::InsertNode,
            move |conn| -> DatabaseResult<()> {
                diesel::insert_into(crate::schema::nodes::table)
                    .values(&np)
                    .execute(conn)?;
                Ok(())
            },
        )
        .await
    }

    /// At startup, populate the list of nodes which our shards may be placed on
    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
        let nodes: Vec<NodePersistence> = self
            .with_measured_conn(
                DatabaseOperation::ListNodes,
                move |conn| -> DatabaseResult<_> {
                    Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
                },
            )
            .await?;

        tracing::info!("list_nodes: loaded {} nodes", nodes.len());

        Ok(nodes)
    }

    pub(crate) async fn update_node(
        &self,
        input_node_id: NodeId,
        input_scheduling: NodeSchedulingPolicy,
    ) -> DatabaseResult<()> {
        use crate::schema::nodes::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
                let updated = diesel::update(nodes)
                    .filter(node_id.eq(input_node_id.0 as i64))
                    .set((scheduling_policy.eq(String::from(input_scheduling)),))
                    .execute(conn)?;
                Ok(updated)
            })
            .await?;

        if updated != 1 {
            Err(DatabaseError::Logical(format!(
                "Node {input_node_id} not found for update",
            )))
        } else {
            Ok(())
        }
    }

    /// At startup, load the high level state for shards, such as their config + policy. This will
    /// be enriched at runtime with state discovered on pageservers.
    pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
        self.with_measured_conn(
            DatabaseOperation::ListTenantShards,
            move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
            },
        )
        .await
    }

    /// Tenants must be persisted before we schedule them for the first time. This enables us
    /// to correctly preserve generation monotonicity, and to retain the externally provided
    /// placement policy & config.
    pub(crate) async fn insert_tenant_shards(
        &self,
        shards: Vec<TenantShardPersistence>,
    ) -> DatabaseResult<()> {
        use crate::schema::metadata_health;
        use crate::schema::tenant_shards;

        let now = chrono::Utc::now();

        let metadata_health_records = shards
            .iter()
            .map(|t| MetadataHealthPersistence {
                tenant_id: t.tenant_id.clone(),
                shard_number: t.shard_number,
                shard_count: t.shard_count,
                healthy: true,
                last_scrubbed_at: now,
            })
            .collect::<Vec<_>>();

        self.with_measured_conn(
            DatabaseOperation::InsertTenantShards,
            move |conn| -> DatabaseResult<()> {
                diesel::insert_into(tenant_shards::table)
                    .values(&shards)
                    .execute(conn)?;

                diesel::insert_into(metadata_health::table)
                    .values(&metadata_health_records)
                    .execute(conn)?;
                Ok(())
            },
        )
        .await
    }

    /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
    /// the tenant from memory on this server.
    pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::DeleteTenant,
            move |conn| -> DatabaseResult<()> {
                // The `metadata_health` status (if present) is also deleted via the cascade behavior.
                diesel::delete(tenant_shards)
                    .filter(tenant_id.eq(del_tenant_id.to_string()))
                    .execute(conn)?;
                Ok(())
            },
        )
        .await
    }

    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
        use crate::schema::nodes::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::DeleteNode,
            move |conn| -> DatabaseResult<()> {
                diesel::delete(nodes)
                    .filter(node_id.eq(del_node_id.0 as i64))
                    .execute(conn)?;

                Ok(())
            },
        )
        .await
    }

    /// When a node invokes the /re-attach API, this function is responsible for doing an efficient
    /// batched increment of the generations of all tenants whose generation_pageserver is equal to
    /// the node that called /re-attach.
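    ///
    /// The generation bump is a single statement over all of the node's shards rather
    /// than a query per shard; roughly (illustrative SQL):
    ///
    /// ```sql
    /// UPDATE tenant_shards SET generation = generation + 1
    ///     WHERE generation_pageserver = $1;
    /// ```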
    #[tracing::instrument(skip_all, fields(node_id))]
    pub(crate) async fn re_attach(
        &self,
        input_node_id: NodeId,
    ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
        use crate::schema::nodes::dsl::scheduling_policy;
        use crate::schema::nodes::dsl::*;
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                let rows_updated = diesel::update(tenant_shards)
                    .filter(generation_pageserver.eq(input_node_id.0 as i64))
                    .set(generation.eq(generation + 1))
                    .execute(conn)?;

                tracing::info!("Incremented {} tenants' generations", rows_updated);

                // TODO: UPDATE+SELECT in one query

                let updated = tenant_shards
                    .filter(generation_pageserver.eq(input_node_id.0 as i64))
                    .select(TenantShardPersistence::as_select())
                    .load(conn)?;

                // If the node went through a drain and restart phase before re-attaching,
                // then reset its node scheduling policy to active.
                diesel::update(nodes)
                    .filter(node_id.eq(input_node_id.0 as i64))
                    .filter(
                        scheduling_policy
                            .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
                            .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
                            .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
                    )
                    .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
                    .execute(conn)?;

                Ok(updated)
            })
            .await?;

        let mut result = HashMap::new();
        for tsp in updated {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
                    .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount::new(tsp.shard_count as u8),
            };

            let Some(g) = tsp.generation else {
                // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
                // we only set generation_pageserver when setting generation.
                return Err(DatabaseError::Logical(
                    "Generation should always be set after incrementing".to_string(),
                ));
            };
            result.insert(tenant_shard_id, Generation::new(g as u32));
        }

        Ok(result)
    }

    /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
    /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
    /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
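    ///
    /// A sketch of the intended call pattern (variable names illustrative):
    ///
    /// ```ignore
    /// let new_gen = persistence.increment_generation(tenant_shard_id, node_id).await?;
    /// // ... then stamp the attach (location_config) request to the pageserver with
    /// // `new_gen`, so that stale attachments can be fenced off by generation number.
    /// ```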
    pub(crate) async fn increment_generation(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<Generation> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                    .set((
                        generation.eq(generation + 1),
                        generation_pageserver.eq(node_id.0 as i64),
                    ))
                    // TODO: only returning() the generation column
                    .returning(TenantShardPersistence::as_returning())
                    .get_result(conn)?;

                Ok(updated)
            })
            .await?;

        // Generation is always non-null in the result: if the generation column had been NULL, then we
        // should have experienced an SQL conflict error while executing a query that tries to increment it.
        debug_assert!(updated.generation.is_some());
        let Some(g) = updated.generation else {
            return Err(DatabaseError::Logical(
                "Generation should always be set after incrementing".to_string(),
            )
            .into());
        };

        Ok(Generation::new(g as u32))
    }

    /// When we want to call out to the running shards for a tenant, e.g. during timeline CRUD operations,
    /// we need to know where the shard is attached, _and_ the generation, so that we can re-check the generation
    /// afterwards to confirm that our timeline CRUD operation is truly persistent (it must have happened in the
    /// latest generation).
    ///
    /// If the tenant doesn't exist, an empty vector is returned.
    ///
    /// Output is sorted by shard number.
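    ///
    /// A sketch of the re-check pattern described above (names illustrative):
    ///
    /// ```ignore
    /// let before = persistence.tenant_generations(tenant_id).await?;
    /// // ... perform the timeline CRUD operation against the attached locations ...
    /// let after = persistence.tenant_generations(tenant_id).await?;
    /// // If any shard's generation changed, the operation may have been applied to a
    /// // stale attachment and must be retried.
    /// ```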
    pub(crate) async fn tenant_generations(
        &self,
        filter_tenant_id: TenantId,
    ) -> Result<Vec<ShardGenerationState>, DatabaseError> {
        use crate::schema::tenant_shards::dsl::*;
        let rows = self
            .with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
                let result = tenant_shards
                    .filter(tenant_id.eq(filter_tenant_id.to_string()))
                    .select(TenantShardPersistence::as_select())
                    .order(shard_number)
                    .load(conn)?;
                Ok(result)
            })
            .await?;

        Ok(rows
            .into_iter()
            .map(|p| ShardGenerationState {
                tenant_shard_id: p
                    .get_tenant_shard_id()
                    .expect("Corrupt tenant shard id in database"),
                generation: p.generation.map(|g| Generation::new(g as u32)),
                generation_pageserver: p.generation_pageserver.map(|n| NodeId(n as u64)),
            })
            .collect())
    }

    /// Read the generation number of specific tenant shards
    ///
    /// Output is unsorted. Output may not include values for all inputs, if they are missing in the database.
    pub(crate) async fn shard_generations(
        &self,
        mut tenant_shard_ids: impl Iterator<Item = &TenantShardId>,
    ) -> Result<Vec<(TenantShardId, Option<Generation>)>, DatabaseError> {
        let mut rows = Vec::with_capacity(tenant_shard_ids.size_hint().0);

        // We will chunk our input to avoid composing arbitrarily long `IN` clauses. Typically we are
        // called with a single digit number of IDs, but in principle we could be called with tens
        // of thousands (all the shards on one pageserver) from the generation validation API.
        loop {
            // A modest hardcoded chunk size to handle typical cases in a single query but never generate particularly
            // large query strings.
            let chunk_ids = tenant_shard_ids.by_ref().take(32);

            // Compose a comma separated list of tuples for matching on (tenant_id, shard_number, shard_count)
            let in_clause = chunk_ids
                .map(|tsid| {
                    format!(
                        "('{}', {}, {})",
                        tsid.tenant_id, tsid.shard_number.0, tsid.shard_count.0
                    )
                })
                .join(",");
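            // Illustratively, for a two-shard tenant this composes something like:
            //   ('1f359dd625e519a1abf5527afd6fe253', 0, 2),('1f359dd625e519a1abf5527afd6fe253', 1, 2)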

            // We are done when our iterator gives us nothing to filter on
            if in_clause.is_empty() {
                break;
            }

            let chunk_rows = self
                .with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
                    // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
                    // the inputs are strongly typed and cannot carry any user-supplied raw string content.
                    let result: Vec<TenantShardPersistence> = diesel::sql_query(
                        format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
                    ).load(conn)?;

                    Ok(result)
                })
                .await?;
            rows.extend(chunk_rows.into_iter())
        }

        Ok(rows
            .into_iter()
            .map(|tsp| {
                (
                    tsp.get_tenant_shard_id()
                        .expect("Bad tenant ID in database"),
                    tsp.generation.map(|g| Generation::new(g as u32)),
                )
            })
            .collect())
    }

    #[allow(non_local_definitions)]
    /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
    ///
    /// Do not use this for setting generation, unless in the special onboarding code path (/location_config)
    /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
    /// that we only do the first time a tenant is set to an attached policy via /location_config.
    pub(crate) async fn update_tenant_shard(
        &self,
        tenant: TenantFilter,
        input_placement_policy: Option<PlacementPolicy>,
        input_config: Option<TenantConfig>,
        input_generation: Option<Generation>,
        input_scheduling_policy: Option<ShardSchedulingPolicy>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;

        self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
            let query = match tenant {
                TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                    .into_boxed(),
                TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
                    .filter(tenant_id.eq(input_tenant_id.to_string()))
                    .into_boxed(),
            };

            // Clear generation_pageserver if we are moving into a state where we won't have
            // any attached pageservers.
            let input_generation_pageserver = match input_placement_policy {
                None | Some(PlacementPolicy::Attached(_)) => None,
                Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
            };

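            // In the changeset below, `Option<Option<i64>>` distinguishes "leave the
            // column unchanged" (outer None) from "set the column to NULL" (Some(None)).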
            #[derive(AsChangeset)]
            #[diesel(table_name = crate::schema::tenant_shards)]
            struct ShardUpdate {
                generation: Option<i32>,
                placement_policy: Option<String>,
                config: Option<String>,
                scheduling_policy: Option<String>,
                generation_pageserver: Option<Option<i64>>,
            }

            let update = ShardUpdate {
                generation: input_generation.map(|g| g.into().unwrap() as i32),
                placement_policy: input_placement_policy
                    .as_ref()
                    .map(|p| serde_json::to_string(&p).unwrap()),
                config: input_config
                    .as_ref()
                    .map(|c| serde_json::to_string(&c).unwrap()),
                scheduling_policy: input_scheduling_policy
                    .map(|p| serde_json::to_string(&p).unwrap()),
                generation_pageserver: input_generation_pageserver,
            };

            query.set(update).execute(conn)?;

            Ok(())
        })
        .await?;

        Ok(())
    }

    pub(crate) async fn set_tenant_shard_preferred_azs(
        &self,
        preferred_azs: Vec<(TenantShardId, AvailabilityZone)>,
    ) -> DatabaseResult<Vec<(TenantShardId, AvailabilityZone)>> {
        use crate::schema::tenant_shards::dsl::*;

        self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
            let mut shards_updated = Vec::default();

            for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                    .set(preferred_az_id.eq(preferred_az.0.clone()))
                    .execute(conn)?;

                if updated == 1 {
                    shards_updated.push((*tenant_shard_id, preferred_az.clone()));
                }
            }

            Ok(shards_updated)
        })
        .await
    }

    pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
            let updated = diesel::update(tenant_shards)
                .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                .set((
                    generation_pageserver.eq(Option::<i64>::None),
                    placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
                ))
                .execute(conn)?;

            Ok(updated)
        })
        .await?;

        Ok(())
    }

    // When we start shard splitting, we must durably mark the tenant so that
    // on restart, we know that we must go through recovery.
    //
    // We create the child shards here, so that they will be available for increment_generation calls
    // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
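    //
    // A sketch of the expected input shape (counts illustrative): splitting a tenant
    // from two shards to four passes old_shard_count=2 and two entries in
    // `parent_to_children`, each parent mapping to two child TenantShardPersistence
    // rows whose `splitting` field is already SplitState::Splitting.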
    pub(crate) async fn begin_shard_split(
        &self,
        old_shard_count: ShardCount,
        split_tenant_id: TenantId,
        parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
            // Mark parent shards as splitting

            let updated = diesel::update(tenant_shards)
                .filter(tenant_id.eq(split_tenant_id.to_string()))
                .filter(shard_count.eq(old_shard_count.literal() as i32))
                .set((splitting.eq(1),))
                .execute(conn)?;
            if u8::try_from(updated)
                .map_err(|_| DatabaseError::Logical(
                    format!("Overflow existing shard count {} while splitting", updated))
                )? != old_shard_count.count() {
                // Perhaps a deletion or another split raced with this attempt to split, mutating
                // the parent shards that we intend to split. In this case the split request should fail.
                return Err(DatabaseError::Logical(
                    format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
                ));
            }

            // FIXME: spurious clone to sidestep closure move rules
            let parent_to_children = parent_to_children.clone();

            // Insert child shards
            for (parent_shard_id, children) in parent_to_children {
                let mut parent = crate::schema::tenant_shards::table
                    .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
                    .load::<TenantShardPersistence>(conn)?;
                let parent = if parent.len() != 1 {
                    return Err(DatabaseError::Logical(format!(
                        "Parent shard {parent_shard_id} not found"
                    )));
                } else {
                    parent.pop().unwrap()
                };
                for mut shard in children {
                    // Carry the parent's generation into the child
                    shard.generation = parent.generation;

                    debug_assert!(shard.splitting == SplitState::Splitting);
                    diesel::insert_into(tenant_shards)
                        .values(shard)
                        .execute(conn)?;
                }
            }

            Ok(())
        })
        .await
    }

    // When we finish shard splitting, we must atomically drop the old (parent) shards
    // and clear the splitting marker on the new (child) shards.
    pub(crate) async fn complete_shard_split(
        &self,
        split_tenant_id: TenantId,
        old_shard_count: ShardCount,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::CompleteShardSplit,
            move |conn| -> DatabaseResult<()> {
                // Drop parent shards
                diesel::delete(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .filter(shard_count.eq(old_shard_count.literal() as i32))
                    .execute(conn)?;

                // Clear splitting flag on the remaining (child) shards
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .set((splitting.eq(0),))
                    .execute(conn)?;
                debug_assert!(updated > 0);

                Ok(())
            },
        )
        .await
    }

    /// Used when the remote part of a shard split failed: we will revert the database state to have only
    /// the parent shards, with SplitState::Idle.
    pub(crate) async fn abort_shard_split(
        &self,
        split_tenant_id: TenantId,
        new_shard_count: ShardCount,
    ) -> DatabaseResult<AbortShardSplitStatus> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::AbortShardSplit,
            move |conn| -> DatabaseResult<AbortShardSplitStatus> {
                // Clear the splitting state on parent shards
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .filter(shard_count.ne(new_shard_count.literal() as i32))
                    .set((splitting.eq(0),))
                    .execute(conn)?;

                // Parent shards are already gone: we cannot abort.
                if updated == 0 {
                    return Ok(AbortShardSplitStatus::Complete);
                }

                // Sanity check: if parent shards were present, their cardinality should
                // be less than the number of child shards.
                if updated >= new_shard_count.count() as usize {
                    return Err(DatabaseError::Logical(format!(
                        "Unexpected parent shard count {updated} while aborting split to \
                            count {new_shard_count:?} on tenant {split_tenant_id}"
                    )));
                }

                // Erase child shards
                diesel::delete(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .filter(shard_count.eq(new_shard_count.literal() as i32))
                    .execute(conn)?;

                Ok(AbortShardSplitStatus::Aborted)
            },
        )
        .await
    }

    /// Stores all the latest metadata health updates durably. Updates existing entries on conflict.
    ///
    /// **Correctness:** `metadata_health_updates` should all belong to tenant shards managed by the storage controller.
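    ///
    /// Both batches below are upserts, i.e. `INSERT ... ON CONFLICT (tenant_id,
    /// shard_number, shard_count) DO UPDATE SET ...`, so records are created on first
    /// sight and refreshed thereafter.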
    #[allow(dead_code)]
    pub(crate) async fn update_metadata_health_records(
        &self,
        healthy_records: Vec<MetadataHealthPersistence>,
        unhealthy_records: Vec<MetadataHealthPersistence>,
        now: chrono::DateTime<chrono::Utc>,
    ) -> DatabaseResult<()> {
        use crate::schema::metadata_health::dsl::*;

        self.with_measured_conn(
            DatabaseOperation::UpdateMetadataHealth,
            move |conn| -> DatabaseResult<_> {
                diesel::insert_into(metadata_health)
                    .values(&healthy_records)
                    .on_conflict((tenant_id, shard_number, shard_count))
                    .do_update()
                    .set((healthy.eq(true), last_scrubbed_at.eq(now)))
                    .execute(conn)?;

                diesel::insert_into(metadata_health)
                    .values(&unhealthy_records)
                    .on_conflict((tenant_id, shard_number, shard_count))
                    .do_update()
                    .set((healthy.eq(false), last_scrubbed_at.eq(now)))
                    .execute(conn)?;
                Ok(())
            },
        )
        .await
    }

    /// Lists all the metadata health records.
    #[allow(dead_code)]
    pub(crate) async fn list_metadata_health_records(
        &self,
    ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
        self.with_measured_conn(
            DatabaseOperation::ListMetadataHealth,
            move |conn| -> DatabaseResult<_> {
                Ok(
                    crate::schema::metadata_health::table
                        .load::<MetadataHealthPersistence>(conn)?,
                )
            },
        )
        .await
    }

    /// Lists all the metadata health records that are unhealthy.
    #[allow(dead_code)]
    pub(crate) async fn list_unhealthy_metadata_health_records(
        &self,
    ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
        use crate::schema::metadata_health::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::ListMetadataHealthUnhealthy,
            move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::metadata_health::table
                    .filter(healthy.eq(false))
                    .load::<MetadataHealthPersistence>(conn)?)
            },
        )
        .await
    }

    /// Lists all the metadata health records that were last scrubbed before the given `earlier` time.
    #[allow(dead_code)]
    pub(crate) async fn list_outdated_metadata_health_records(
        &self,
        earlier: chrono::DateTime<chrono::Utc>,
    ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
        use crate::schema::metadata_health::dsl::*;

        self.with_measured_conn(
            DatabaseOperation::ListMetadataHealthOutdated,
            move |conn| -> DatabaseResult<_> {
                let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
                let res = query.load::<MetadataHealthPersistence>(conn)?;

                Ok(res)
            },
        )
        .await
    }

    /// Get the current entry from the `leader` table if one exists.
    /// It is an error for the table to contain more than one entry.
    pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
        let mut leader: Vec<ControllerPersistence> = self
            .with_measured_conn(
                DatabaseOperation::GetLeader,
                move |conn| -> DatabaseResult<_> {
                    Ok(crate::schema::controllers::table.load::<ControllerPersistence>(conn)?)
                },
            )
            .await?;

        if leader.len() > 1 {
            return Err(DatabaseError::Logical(format!(
                "More than one entry present in the leader table: {leader:?}"
            )));
        }

        Ok(leader.pop())
    }

    /// Update the new leader with compare-exchange semantics. If `prev` does not
    /// match the current leader entry, then the update is treated as a failure.
    /// When `prev` is not specified, the update is forced.
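    ///
    /// A sketch of a step-up sequence (address illustrative):
    ///
    /// ```ignore
    /// let prev = persistence.get_leader().await?;
    /// let new = ControllerPersistence {
    ///     address: "http://storage-controller-1:1234".to_string(),
    ///     started_at: chrono::Utc::now(),
    /// };
    /// // Fails with DatabaseError::Logical if another controller won the race.
    /// persistence.update_leader(prev, new).await?;
    /// ```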
    pub(crate) async fn update_leader(
        &self,
        prev: Option<ControllerPersistence>,
        new: ControllerPersistence,
    ) -> DatabaseResult<()> {
        use crate::schema::controllers::dsl::*;

        let updated = self
            .with_measured_conn(
                DatabaseOperation::UpdateLeader,
                move |conn| -> DatabaseResult<usize> {
                    let updated = match &prev {
                        Some(prev) => diesel::update(controllers)
                            .filter(address.eq(prev.address.clone()))
                            .filter(started_at.eq(prev.started_at))
                            .set((
                                address.eq(new.address.clone()),
                                started_at.eq(new.started_at),
                            ))
                            .execute(conn)?,
                        None => diesel::insert_into(controllers)
                            .values(new.clone())
                            .execute(conn)?,
                    };

                    Ok(updated)
                },
            )
            .await?;

        if updated == 0 {
            return Err(DatabaseError::Logical(
                "Leader table update failed".to_string(),
            ));
        }

        Ok(())
    }

    /// At startup, load the list of safekeepers
    pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
        let safekeepers: Vec<SafekeeperPersistence> = self
            .with_measured_conn(
                DatabaseOperation::ListSafekeepers,
                move |conn| -> DatabaseResult<_> {
                    Ok(crate::schema::safekeepers::table.load::<SafekeeperPersistence>(conn)?)
                },
            )
            .await?;

        tracing::info!("list_safekeepers: loaded {} safekeepers", safekeepers.len());

        Ok(safekeepers)
    }

    pub(crate) async fn safekeeper_get(
        &self,
        id: i64,
    ) -> Result<SafekeeperPersistence, DatabaseError> {
        use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
        self.with_conn(move |conn| -> DatabaseResult<SafekeeperPersistence> {
            Ok(safekeepers
                .filter(id_column.eq(&id))
                .select(SafekeeperPersistence::as_select())
                .get_result(conn)?)
        })
        .await
    }

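    /// Insert-or-update a safekeeper record, keyed on `id`. This compiles down to an
    /// `INSERT ... ON CONFLICT (id) DO UPDATE SET ...` statement.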
    pub(crate) async fn safekeeper_upsert(
        &self,
        record: SafekeeperPersistence,
    ) -> Result<(), DatabaseError> {
        use crate::schema::safekeepers::dsl::*;

        self.with_conn(move |conn| -> DatabaseResult<()> {
            let bind = record.as_insert_or_update();

            let inserted_updated = diesel::insert_into(safekeepers)
                .values(&bind)
                .on_conflict(id)
                .do_update()
                .set(&bind)
                .execute(conn)?;

            if inserted_updated != 1 {
                return Err(DatabaseError::Logical(format!(
                    "unexpected number of rows ({})",
                    inserted_updated
                )));
            }

            Ok(())
        })
        .await
    }
}

/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
#[derive(
    QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
    #[serde(default)]
    pub(crate) tenant_id: String,
    #[serde(default)]
    pub(crate) shard_number: i32,
    #[serde(default)]
    pub(crate) shard_count: i32,
    #[serde(default)]
    pub(crate) shard_stripe_size: i32,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // Generation is only None when first onboarding a tenant, where it may
    // be in PlacementPolicy::Secondary and therefore have no valid generation state.
    pub(crate) generation: Option<i32>,

    // Currently attached pageserver
    #[serde(rename = "pageserver")]
    pub(crate) generation_pageserver: Option<i64>,

    #[serde(default)]
    pub(crate) placement_policy: String,
    #[serde(default)]
    pub(crate) splitting: SplitState,
    #[serde(default)]
    pub(crate) config: String,
    #[serde(default)]
    pub(crate) scheduling_policy: String,

    // Hint that we should attempt to schedule this tenant shard in the given
    // availability zone in order to minimise the chances of cross-AZ communication
    // with compute.
    pub(crate) preferred_az_id: Option<String>,
}

impl TenantShardPersistence {
    pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
        if self.shard_count == 0 {
            Ok(ShardIdentity::unsharded())
        } else {
            Ok(ShardIdentity::new(
                ShardNumber(self.shard_number as u8),
                ShardCount::new(self.shard_count as u8),
                ShardStripeSize(self.shard_stripe_size as u32),
            )?)
        }
    }

    pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
        Ok(TenantShardId {
            tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
            shard_number: ShardNumber(self.shard_number as u8),
            shard_count: ShardCount::new(self.shard_count as u8),
        })
    }
}

/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
    pub(crate) node_id: i64,
    pub(crate) scheduling_policy: String,
    pub(crate) listen_http_addr: String,
    pub(crate) listen_http_port: i32,
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
    pub(crate) availability_zone_id: String,
}

/// Tenant metadata health status that is stored durably.
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::metadata_health)]
pub(crate) struct MetadataHealthPersistence {
    #[serde(default)]
    pub(crate) tenant_id: String,
    #[serde(default)]
    pub(crate) shard_number: i32,
    #[serde(default)]
    pub(crate) shard_count: i32,

    pub(crate) healthy: bool,
    pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}

impl MetadataHealthPersistence {
    pub fn new(
        tenant_shard_id: TenantShardId,
        healthy: bool,
        last_scrubbed_at: chrono::DateTime<chrono::Utc>,
    ) -> Self {
        let tenant_id = tenant_shard_id.tenant_id.to_string();
        let shard_number = tenant_shard_id.shard_number.0 as i32;
        let shard_count = tenant_shard_id.shard_count.literal() as i32;

        MetadataHealthPersistence {
            tenant_id,
            shard_number,
            shard_count,
            healthy,
            last_scrubbed_at,
        }
    }

    #[allow(dead_code)]
    pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
        Ok(TenantShardId {
            tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
            shard_number: ShardNumber(self.shard_number as u8),
            shard_count: ShardCount::new(self.shard_count as u8),
        })
    }
}

impl From<MetadataHealthPersistence> for MetadataHealthRecord {
    fn from(value: MetadataHealthPersistence) -> Self {
        MetadataHealthRecord {
            tenant_shard_id: value
                .get_tenant_shard_id()
                .expect("stored tenant id should be valid"),
            healthy: value.healthy,
            last_scrubbed_at: value.last_scrubbed_at,
        }
    }
}

#[derive(
    Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
)]
#[diesel(table_name = crate::schema::controllers)]
pub(crate) struct ControllerPersistence {
    pub(crate) address: String,
    pub(crate) started_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Serialize, Deserialize, Queryable, Selectable, Eq, PartialEq, Debug, Clone)]
#[diesel(table_name = crate::schema::safekeepers)]
pub(crate) struct SafekeeperPersistence {
    pub(crate) id: i64,
    pub(crate) region_id: String,
    /// 1 is special: it means just created (not yet posted to storcon).
    /// Zero or negative is not really expected.
    /// Otherwise, this is the number from the `release-$(number_of_commits_on_branch)` tag.
    pub(crate) version: i64,
    pub(crate) host: String,
    pub(crate) port: i32,
    pub(crate) active: bool,
    pub(crate) http_port: i32,
    pub(crate) availability_zone_id: String,
}

impl SafekeeperPersistence {
    fn as_insert_or_update(&self) -> InsertUpdateSafekeeper<'_> {
        InsertUpdateSafekeeper {
            id: self.id,
            region_id: &self.region_id,
            version: self.version,
            host: &self.host,
            port: self.port,
            active: self.active,
            http_port: self.http_port,
            availability_zone_id: &self.availability_zone_id,
        }
    }

    pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
        // Omit the `active` flag on purpose: it is deprecated.
        SafekeeperDescribeResponse {
            id: NodeId(self.id as u64),
            region_id: self.region_id.clone(),
            version: self.version,
            host: self.host.clone(),
            port: self.port,
            http_port: self.http_port,
            availability_zone_id: self.availability_zone_id.clone(),
        }
    }
}

#[derive(Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::safekeepers)]
struct InsertUpdateSafekeeper<'a> {
    id: i64,
    region_id: &'a str,
    version: i64,
    host: &'a str,
    port: i32,
    active: bool,
    http_port: i32,
    availability_zone_id: &'a str,
}