Line data Source code
1 : pub(crate) mod split_state;
2 : use std::collections::HashMap;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 : use std::time::Instant;
6 :
7 : use self::split_state::SplitState;
8 : use diesel::pg::PgConnection;
9 : use diesel::prelude::*;
10 : use diesel::Connection;
11 : use itertools::Itertools;
12 : use pageserver_api::controller_api::MetadataHealthRecord;
13 : use pageserver_api::controller_api::ShardSchedulingPolicy;
14 : use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
15 : use pageserver_api::models::TenantConfig;
16 : use pageserver_api::shard::ShardConfigError;
17 : use pageserver_api::shard::ShardIdentity;
18 : use pageserver_api::shard::ShardStripeSize;
19 : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
20 : use serde::{Deserialize, Serialize};
21 : use utils::generation::Generation;
22 : use utils::id::{NodeId, TenantId};
23 :
24 : use crate::metrics::{
25 : DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
26 : };
27 : use crate::node::Node;
28 :
29 : use diesel_migrations::{embed_migrations, EmbeddedMigrations};
30 : const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
31 :
32 : /// ## What do we store?
33 : ///
34 : /// The storage controller service does not store most of its state durably.
35 : ///
36 : /// The essential things to store durably are:
37 : /// - generation numbers, as these must always advance monotonically to ensure data safety.
38 : /// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
39 : /// - Node's scheduling policies, as the source of truth for these is something external.
40 : ///
41 : /// Other things we store durably as an implementation detail:
42 : /// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
43 : /// but it is operationally simpler to make this service the authority for which nodes
44 : /// it talks to.
45 : ///
46 : /// ## Performance/efficiency
47 : ///
48 : /// The storage controller service does not go via the database for most things: there are
49 : /// a couple of places where we must, and where efficiency matters:
50 : /// - Incrementing generation numbers: the Reconciler has to wait for this to complete
51 : /// before it can attach a tenant, so this acts as a bound on how fast things like
52 : /// failover can happen.
53 : /// - Pageserver re-attach: we will increment many shards' generations when this happens,
54 : /// so it is important to avoid e.g. issuing O(N) queries.
55 : ///
56 : /// Database calls relating to nodes have low performance requirements, as they are very rarely
57 : /// updated, and reads of nodes are always from memory, not the database. We only require that
58 : /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
59 : pub struct Persistence {
60 : connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
61 : }
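// An illustrative startup sketch (not part of this module's API): how a caller inside this
// crate might wire together the methods defined below at boot. The function name, timeout,
// and bindings are placeholders; the real startup path lives in the service layer.
#[allow(dead_code)]
async fn persistence_startup_example(database_url: String) -> DatabaseResult<()> {
    // Tolerate the database restarting concurrently with us.
    Persistence::await_connection(&database_url, Duration::from_secs(30))
        .await
        .map_err(DatabaseError::Connection)?;

    let persistence = Persistence::new(database_url);

    // Apply any pending diesel migrations before serving requests.
    persistence.migration_run().await?;

    // Load durable state into memory: later reads are served from memory, not the database.
    let _nodes = persistence.list_nodes().await?;
    let _shards = persistence.list_tenant_shards().await?;

    Ok(())
}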
62 :
63 : /// Legacy format, for use in JSON compat objects in the test environment
64 0 : #[derive(Serialize, Deserialize)]
65 : struct JsonPersistence {
66 : tenants: HashMap<TenantShardId, TenantShardPersistence>,
67 : }
68 :
69 0 : #[derive(thiserror::Error, Debug)]
70 : pub(crate) enum DatabaseError {
71 : #[error(transparent)]
72 : Query(#[from] diesel::result::Error),
73 : #[error(transparent)]
74 : Connection(#[from] diesel::result::ConnectionError),
75 : #[error(transparent)]
76 : ConnectionPool(#[from] r2d2::Error),
77 : #[error("Logical error: {0}")]
78 : Logical(String),
79 : #[error("Migration error: {0}")]
80 : Migration(String),
81 : }
82 :
83 : #[derive(measured::FixedCardinalityLabel, Copy, Clone)]
84 : pub(crate) enum DatabaseOperation {
85 : InsertNode,
86 : UpdateNode,
87 : DeleteNode,
88 : ListNodes,
89 : BeginShardSplit,
90 : CompleteShardSplit,
91 : AbortShardSplit,
92 : Detach,
93 : ReAttach,
94 : IncrementGeneration,
95 : TenantGenerations,
96 : ShardGenerations,
97 : ListTenantShards,
98 : InsertTenantShards,
99 : UpdateTenantShard,
100 : DeleteTenant,
101 : UpdateTenantConfig,
102 : UpdateMetadataHealth,
103 : ListMetadataHealth,
104 : ListMetadataHealthUnhealthy,
105 : ListMetadataHealthOutdated,
106 : GetLeader,
107 : UpdateLeader,
108 : SetPreferredAzs,
109 : }
110 :
111 : #[must_use]
112 : pub(crate) enum AbortShardSplitStatus {
113 : /// We aborted the split in the database by reverting to the parent shards
114 : Aborted,
115 : /// The split had already been persisted.
116 : Complete,
117 : }
118 :
119 : pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
120 :
121 : /// Some methods can operate on either a whole tenant or a single shard
122 : pub(crate) enum TenantFilter {
123 : Tenant(TenantId),
124 : Shard(TenantShardId),
125 : }
126 :
127 : /// Represents the results of looking up generation+pageserver for the shards of a tenant
128 : pub(crate) struct ShardGenerationState {
129 : pub(crate) tenant_shard_id: TenantShardId,
130 : pub(crate) generation: Option<Generation>,
131 : pub(crate) generation_pageserver: Option<NodeId>,
132 : }
133 :
134 : impl Persistence {
135 : // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
136 : // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
137 : pub const MAX_CONNECTIONS: u32 = 99;
138 :
139 : // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
140 : const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
141 : const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
142 :
143 0 : pub fn new(database_url: String) -> Self {
144 0 : let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
145 0 :
146 0 : // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
147 0 : // to execute queries (database queries are not generally on latency-sensitive paths).
148 0 : let connection_pool = diesel::r2d2::Pool::builder()
149 0 : .max_size(Self::MAX_CONNECTIONS)
150 0 : .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
151 0 : .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
152 0 : // Always keep at least one connection ready to go
153 0 : .min_idle(Some(1))
154 0 : .test_on_check_out(true)
155 0 : .build(manager)
156 0 : .expect("Could not build connection pool");
157 0 :
158 0 : Self { connection_pool }
159 0 : }
160 :
161 : /// A helper for use during startup, when we would like to tolerate concurrent restarts of the
162 : /// database and the storage controller: the database might not be available right away.
163 0 : pub async fn await_connection(
164 0 : database_url: &str,
165 0 : timeout: Duration,
166 0 : ) -> Result<(), diesel::ConnectionError> {
167 0 : let started_at = Instant::now();
168 : loop {
169 0 : match PgConnection::establish(database_url) {
170 : Ok(_) => {
171 0 : tracing::info!("Connected to database.");
172 0 : return Ok(());
173 : }
174 0 : Err(e) => {
175 0 : if started_at.elapsed() > timeout {
176 0 : return Err(e);
177 : } else {
178 0 : tracing::info!("Database not yet available, waiting... ({e})");
179 0 : tokio::time::sleep(Duration::from_millis(100)).await;
180 : }
181 : }
182 : }
183 : }
184 0 : }
185 :
186 : /// Execute the diesel migrations that are built into this binary
187 0 : pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
188 : use diesel_migrations::{HarnessWithOutput, MigrationHarness};
189 :
190 0 : self.with_conn(move |conn| -> DatabaseResult<()> {
191 0 : HarnessWithOutput::write_to_stdout(conn)
192 0 : .run_pending_migrations(MIGRATIONS)
193 0 : .map(|_| ())
194 0 : .map_err(|e| DatabaseError::Migration(e.to_string()))
195 0 : })
196 0 : .await
197 0 : }
198 :
199 : /// Wraps `with_conn` in order to collect latency and error metrics
200 0 : async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
201 0 : where
202 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
203 0 : R: Send + 'static,
204 0 : {
205 0 : let latency = &METRICS_REGISTRY
206 0 : .metrics_group
207 0 : .storage_controller_database_query_latency;
208 0 : let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
209 :
210 0 : let res = self.with_conn(func).await;
211 :
212 0 : if let Err(err) = &res {
213 0 : let error_counter = &METRICS_REGISTRY
214 0 : .metrics_group
215 0 : .storage_controller_database_query_error;
216 0 : error_counter.inc(DatabaseQueryErrorLabelGroup {
217 0 : error_type: err.error_label(),
218 0 : operation: op,
219 0 : })
220 0 : }
221 :
222 0 : res
223 0 : }
224 :
225 : /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
226 0 : async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
227 0 : where
228 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
229 0 : R: Send + 'static,
230 0 : {
231 : // A generous allowance for how many times we may retry serializable transactions
232 : // before giving up. This is not expected to be hit: it is a defensive measure in case we
233 : // somehow engineer a situation where duelling transactions might otherwise live-lock.
234 : const MAX_RETRIES: usize = 128;
235 :
236 0 : let mut conn = self.connection_pool.get()?;
237 0 : tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
238 0 : let mut retry_count = 0;
239 : loop {
240 0 : match conn.build_transaction().serializable().run(|c| func(c)) {
241 0 : Ok(r) => break Ok(r),
242 : Err(
243 0 : err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
244 0 : diesel::result::DatabaseErrorKind::SerializationFailure,
245 0 : _,
246 0 : )),
247 0 : ) => {
248 0 : retry_count += 1;
249 0 : if retry_count > MAX_RETRIES {
250 0 : tracing::error!(
251 0 : "Exceeded max retries on SerializationFailure errors: {err:?}"
252 : );
253 0 : break Err(err);
254 : } else {
255 : // Retry on serialization errors: these are expected, because even though our
256 : // transactions don't fight for the same rows, they will occasionally collide
257 : // on index pages (e.g. increment_generation for unrelated shards can collide)
258 0 : tracing::debug!(
259 0 : "Retrying transaction on serialization failure {err:?}"
260 : );
261 0 : continue;
262 : }
263 : }
264 0 : Err(e) => break Err(e),
265 : }
266 : }
267 0 : })
268 0 : .await
269 0 : .expect("Task panic")
270 0 : }
271 :
272 : /// When a node is first registered, persist it before using it for anything
273 0 : pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
274 0 : let np = node.to_persistent();
275 0 : self.with_measured_conn(
276 0 : DatabaseOperation::InsertNode,
277 0 : move |conn| -> DatabaseResult<()> {
278 0 : diesel::insert_into(crate::schema::nodes::table)
279 0 : .values(&np)
280 0 : .execute(conn)?;
281 0 : Ok(())
282 0 : },
283 0 : )
284 0 : .await
285 0 : }
286 :
287 : /// At startup, populate the list of nodes which our shards may be placed on
288 0 : pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
289 0 : let nodes: Vec<NodePersistence> = self
290 0 : .with_measured_conn(
291 0 : DatabaseOperation::ListNodes,
292 0 : move |conn| -> DatabaseResult<_> {
293 0 : Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
294 0 : },
295 0 : )
296 0 : .await?;
297 :
298 0 : tracing::info!("list_nodes: loaded {} nodes", nodes.len());
299 :
300 0 : Ok(nodes)
301 0 : }
302 :
303 0 : pub(crate) async fn update_node(
304 0 : &self,
305 0 : input_node_id: NodeId,
306 0 : input_scheduling: NodeSchedulingPolicy,
307 0 : ) -> DatabaseResult<()> {
308 : use crate::schema::nodes::dsl::*;
309 0 : let updated = self
310 0 : .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
311 0 : let updated = diesel::update(nodes)
312 0 : .filter(node_id.eq(input_node_id.0 as i64))
313 0 : .set((scheduling_policy.eq(String::from(input_scheduling)),))
314 0 : .execute(conn)?;
315 0 : Ok(updated)
316 0 : })
317 0 : .await?;
318 :
319 0 : if updated != 1 {
320 0 : Err(DatabaseError::Logical(format!(
321 0 : "Node {node_id:?} not found for update",
322 0 : )))
323 : } else {
324 0 : Ok(())
325 : }
326 0 : }
327 :
328 : /// At startup, load the high level state for shards, such as their config + policy. This will
329 : /// be enriched at runtime with state discovered on pageservers.
330 0 : pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
331 0 : self.with_measured_conn(
332 0 : DatabaseOperation::ListTenantShards,
333 0 : move |conn| -> DatabaseResult<_> {
334 0 : Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
335 0 : },
336 0 : )
337 0 : .await
338 0 : }
339 :
340 : /// Tenants must be persisted before we schedule them for the first time. This enables us
341 : /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
342 0 : pub(crate) async fn insert_tenant_shards(
343 0 : &self,
344 0 : shards: Vec<TenantShardPersistence>,
345 0 : ) -> DatabaseResult<()> {
346 : use crate::schema::metadata_health;
347 : use crate::schema::tenant_shards;
348 :
349 0 : let now = chrono::Utc::now();
350 0 :
351 0 : let metadata_health_records = shards
352 0 : .iter()
353 0 : .map(|t| MetadataHealthPersistence {
354 0 : tenant_id: t.tenant_id.clone(),
355 0 : shard_number: t.shard_number,
356 0 : shard_count: t.shard_count,
357 0 : healthy: true,
358 0 : last_scrubbed_at: now,
359 0 : })
360 0 : .collect::<Vec<_>>();
361 0 :
362 0 : self.with_measured_conn(
363 0 : DatabaseOperation::InsertTenantShards,
364 0 : move |conn| -> DatabaseResult<()> {
365 0 : diesel::insert_into(tenant_shards::table)
366 0 : .values(&shards)
367 0 : .execute(conn)?;
368 :
369 0 : diesel::insert_into(metadata_health::table)
370 0 : .values(&metadata_health_records)
371 0 : .execute(conn)?;
372 0 : Ok(())
373 0 : },
374 0 : )
375 0 : .await
376 0 : }
377 :
378 : /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
379 : /// the tenant from memory on this server.
380 0 : pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
381 : use crate::schema::tenant_shards::dsl::*;
382 0 : self.with_measured_conn(
383 0 : DatabaseOperation::DeleteTenant,
384 0 : move |conn| -> DatabaseResult<()> {
385 0 : // `metadata_health` status (if exists) is also deleted based on the cascade behavior.
386 0 : diesel::delete(tenant_shards)
387 0 : .filter(tenant_id.eq(del_tenant_id.to_string()))
388 0 : .execute(conn)?;
389 0 : Ok(())
390 0 : },
391 0 : )
392 0 : .await
393 0 : }
394 :
395 0 : pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
396 : use crate::schema::nodes::dsl::*;
397 0 : self.with_measured_conn(
398 0 : DatabaseOperation::DeleteNode,
399 0 : move |conn| -> DatabaseResult<()> {
400 0 : diesel::delete(nodes)
401 0 : .filter(node_id.eq(del_node_id.0 as i64))
402 0 : .execute(conn)?;
403 :
404 0 : Ok(())
405 0 : },
406 0 : )
407 0 : .await
408 0 : }
409 :
410 : /// When a pageserver invokes the /re-attach API, this function is responsible for doing an efficient
411 : /// batched increment of the generations of all tenants whose generation_pageserver is equal to
412 : /// the node that called /re-attach.
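///
/// Illustrative usage (the bindings here are assumed, not part of this module): the
/// re-attach request handler passes the calling pageserver's node id and returns the
/// incremented generations to that pageserver in its response.
/// ```ignore
/// let generations: HashMap<TenantShardId, Generation> =
///     persistence.re_attach(calling_node_id).await?;
/// ```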
413 0 : #[tracing::instrument(skip_all, fields(node_id))]
414 : pub(crate) async fn re_attach(
415 : &self,
416 : input_node_id: NodeId,
417 : ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
418 : use crate::schema::nodes::dsl::scheduling_policy;
419 : use crate::schema::nodes::dsl::*;
420 : use crate::schema::tenant_shards::dsl::*;
421 : let updated = self
422 0 : .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
423 0 : let rows_updated = diesel::update(tenant_shards)
424 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
425 0 : .set(generation.eq(generation + 1))
426 0 : .execute(conn)?;
427 :
428 0 : tracing::info!("Incremented {} tenants' generations", rows_updated);
429 :
430 : // TODO: UPDATE+SELECT in one query
431 :
432 0 : let updated = tenant_shards
433 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
434 0 : .select(TenantShardPersistence::as_select())
435 0 : .load(conn)?;
436 :
437 : // If the node went through a drain and restart phase before re-attaching,
438 : // then reset its scheduling policy to active.
439 0 : diesel::update(nodes)
440 0 : .filter(node_id.eq(input_node_id.0 as i64))
441 0 : .filter(
442 0 : scheduling_policy
443 0 : .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
444 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
445 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
446 0 : )
447 0 : .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
448 0 : .execute(conn)?;
449 :
450 0 : Ok(updated)
451 0 : })
452 : .await?;
453 :
454 : let mut result = HashMap::new();
455 : for tsp in updated {
456 : let tenant_shard_id = TenantShardId {
457 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
458 0 : .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
459 : shard_number: ShardNumber(tsp.shard_number as u8),
460 : shard_count: ShardCount::new(tsp.shard_count as u8),
461 : };
462 :
463 : let Some(g) = tsp.generation else {
464 : // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
465 : // we only set generation_pageserver when setting generation.
466 : return Err(DatabaseError::Logical(
467 : "Generation should always be set after incrementing".to_string(),
468 : ));
469 : };
470 : result.insert(tenant_shard_id, Generation::new(g as u32));
471 : }
472 :
473 : Ok(result)
474 : }
475 :
476 : /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
477 : /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
478 : /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
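///
/// Illustrative usage (assumed bindings): a reconciler acquires the generation to send
/// in its attach request immediately before calling the pageserver.
/// ```ignore
/// let generation = persistence.increment_generation(tenant_shard_id, node_id).await?;
/// // `generation` is then included in the attach request sent to `node_id`.
/// ```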
479 0 : pub(crate) async fn increment_generation(
480 0 : &self,
481 0 : tenant_shard_id: TenantShardId,
482 0 : node_id: NodeId,
483 0 : ) -> anyhow::Result<Generation> {
484 : use crate::schema::tenant_shards::dsl::*;
485 0 : let updated = self
486 0 : .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
487 0 : let updated = diesel::update(tenant_shards)
488 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
489 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
490 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
491 0 : .set((
492 0 : generation.eq(generation + 1),
493 0 : generation_pageserver.eq(node_id.0 as i64),
494 0 : ))
495 0 : // TODO: only returning() the generation column
496 0 : .returning(TenantShardPersistence::as_returning())
497 0 : .get_result(conn)?;
498 :
499 0 : Ok(updated)
500 0 : })
501 0 : .await?;
502 :
503 : // Generation is always non-null in the result: if the generation column had been NULL, then we
504 : // should have experienced an SQL conflict error while executing a query that tries to increment it.
505 0 : debug_assert!(updated.generation.is_some());
506 0 : let Some(g) = updated.generation else {
507 0 : return Err(DatabaseError::Logical(
508 0 : "Generation should always be set after incrementing".to_string(),
509 0 : )
510 0 : .into());
511 : };
512 :
513 0 : Ok(Generation::new(g as u32))
514 0 : }
515 :
516 : /// When we want to call out to the running shards for a tenant, e.g. during timeline CRUD operations,
517 : /// we need to know where the shard is attached, _and_ the generation, so that we can re-check the generation
518 : /// afterwards to confirm that our timeline CRUD operation is truly persistent (it must have happened in the
519 : /// latest generation)
520 : ///
521 : /// If the tenant doesn't exist, an empty vector is returned.
522 : ///
523 : /// Output is sorted by shard number
524 0 : pub(crate) async fn tenant_generations(
525 0 : &self,
526 0 : filter_tenant_id: TenantId,
527 0 : ) -> Result<Vec<ShardGenerationState>, DatabaseError> {
528 : use crate::schema::tenant_shards::dsl::*;
529 0 : let rows = self
530 0 : .with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
531 0 : let result = tenant_shards
532 0 : .filter(tenant_id.eq(filter_tenant_id.to_string()))
533 0 : .select(TenantShardPersistence::as_select())
534 0 : .order(shard_number)
535 0 : .load(conn)?;
536 0 : Ok(result)
537 0 : })
538 0 : .await?;
539 :
540 0 : Ok(rows
541 0 : .into_iter()
542 0 : .map(|p| ShardGenerationState {
543 0 : tenant_shard_id: p
544 0 : .get_tenant_shard_id()
545 0 : .expect("Corrupt tenant shard id in database"),
546 0 : generation: p.generation.map(|g| Generation::new(g as u32)),
547 0 : generation_pageserver: p.generation_pageserver.map(|n| NodeId(n as u64)),
548 0 : })
549 0 : .collect())
550 0 : }
551 :
552 : /// Read the generation number of specific tenant shards
553 : ///
554 : /// Output is unsorted. Output may not include values for all inputs, if they are missing in the database.
555 0 : pub(crate) async fn shard_generations(
556 0 : &self,
557 0 : mut tenant_shard_ids: impl Iterator<Item = &TenantShardId>,
558 0 : ) -> Result<Vec<(TenantShardId, Option<Generation>)>, DatabaseError> {
559 0 : let mut rows = Vec::with_capacity(tenant_shard_ids.size_hint().0);
560 :
561 : // We will chunk our input to avoid composing arbitrarily long `IN` clauses. Typically we are
562 : // called with a single digit number of IDs, but in principle we could be called with tens
563 : // of thousands (all the shards on one pageserver) from the generation validation API.
564 0 : loop {
565 0 : // A modest hardcoded chunk size to handle typical cases in a single query but never generate particularly
566 0 : // large query strings.
567 0 : let chunk_ids = tenant_shard_ids.by_ref().take(32);
568 0 :
569 0 : // Compose a comma separated list of tuples for matching on (tenant_id, shard_number, shard_count)
570 0 : let in_clause = chunk_ids
571 0 : .map(|tsid| {
572 0 : format!(
573 0 : "('{}', {}, {})",
574 0 : tsid.tenant_id, tsid.shard_number.0, tsid.shard_count.0
575 0 : )
576 0 : })
577 0 : .join(",");
578 0 :
579 0 : // We are done when our iterator gives us nothing to filter on
580 0 : if in_clause.is_empty() {
581 0 : break;
582 0 : }
583 :
584 0 : let chunk_rows = self
585 0 : .with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
586 : // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
587 : // the inputs are strongly typed and cannot carry any user-supplied raw string content.
588 0 : let result : Vec<TenantShardPersistence> = diesel::sql_query(
589 0 : format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
590 0 : ).load(conn)?;
591 :
592 0 : Ok(result)
593 0 : })
594 0 : .await?;
595 0 : rows.extend(chunk_rows.into_iter())
596 : }
597 :
598 0 : Ok(rows
599 0 : .into_iter()
600 0 : .map(|tsp| {
601 0 : (
602 0 : tsp.get_tenant_shard_id()
603 0 : .expect("Bad tenant ID in database"),
604 0 : tsp.generation.map(|g| Generation::new(g as u32)),
605 0 : )
606 0 : })
607 0 : .collect())
608 0 : }
609 :
610 : #[allow(non_local_definitions)]
611 : /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
612 : ///
613 : /// Do not use this for setting generation, unless in the special onboarding code path (/location_config)
614 : /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
615 : /// that we only do the first time a tenant is set to an attached policy via /location_config.
616 0 : pub(crate) async fn update_tenant_shard(
617 0 : &self,
618 0 : tenant: TenantFilter,
619 0 : input_placement_policy: Option<PlacementPolicy>,
620 0 : input_config: Option<TenantConfig>,
621 0 : input_generation: Option<Generation>,
622 0 : input_scheduling_policy: Option<ShardSchedulingPolicy>,
623 0 : ) -> DatabaseResult<()> {
624 : use crate::schema::tenant_shards::dsl::*;
625 :
626 0 : self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
627 0 : let query = match tenant {
628 0 : TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
629 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
630 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
631 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
632 0 : .into_boxed(),
633 0 : TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
634 0 : .filter(tenant_id.eq(input_tenant_id.to_string()))
635 0 : .into_boxed(),
636 : };
637 :
638 0 : #[derive(AsChangeset)]
639 : #[diesel(table_name = crate::schema::tenant_shards)]
640 : struct ShardUpdate {
641 : generation: Option<i32>,
642 : placement_policy: Option<String>,
643 : config: Option<String>,
644 : scheduling_policy: Option<String>,
645 : }
646 :
647 0 : let update = ShardUpdate {
648 0 : generation: input_generation.map(|g| g.into().unwrap() as i32),
649 0 : placement_policy: input_placement_policy
650 0 : .as_ref()
651 0 : .map(|p| serde_json::to_string(&p).unwrap()),
652 0 : config: input_config
653 0 : .as_ref()
654 0 : .map(|c| serde_json::to_string(&c).unwrap()),
655 0 : scheduling_policy: input_scheduling_policy
656 0 : .map(|p| serde_json::to_string(&p).unwrap()),
657 0 : };
658 0 :
659 0 : query.set(update).execute(conn)?;
660 :
661 0 : Ok(())
662 0 : })
663 0 : .await?;
664 :
665 0 : Ok(())
666 0 : }
667 :
668 0 : pub(crate) async fn set_tenant_shard_preferred_azs(
669 0 : &self,
670 0 : preferred_azs: Vec<(TenantShardId, String)>,
671 0 : ) -> DatabaseResult<Vec<(TenantShardId, String)>> {
672 : use crate::schema::tenant_shards::dsl::*;
673 :
674 0 : self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
675 0 : let mut shards_updated = Vec::default();
676 :
677 0 : for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
678 0 : let updated = diesel::update(tenant_shards)
679 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
680 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
681 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
682 0 : .set(preferred_az_id.eq(preferred_az))
683 0 : .execute(conn)?;
684 :
685 0 : if updated == 1 {
686 0 : shards_updated.push((*tenant_shard_id, preferred_az.clone()));
687 0 : }
688 : }
689 :
690 0 : Ok(shards_updated)
691 0 : })
692 0 : .await
693 0 : }
694 :
695 0 : pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
696 : use crate::schema::tenant_shards::dsl::*;
697 0 : self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
698 0 : let updated = diesel::update(tenant_shards)
699 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
700 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
701 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
702 0 : .set((
703 0 : generation_pageserver.eq(Option::<i64>::None),
704 0 : placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
705 0 : ))
706 0 : .execute(conn)?;
707 :
708 0 : Ok(updated)
709 0 : })
710 0 : .await?;
711 :
712 0 : Ok(())
713 0 : }
714 :
715 : // When we start shard splitting, we must durably mark the tenant so that
716 : // on restart, we know that we must go through recovery.
717 : //
718 : // We create the child shards here, so that they will be available for increment_generation calls
719 : // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
720 0 : pub(crate) async fn begin_shard_split(
721 0 : &self,
722 0 : old_shard_count: ShardCount,
723 0 : split_tenant_id: TenantId,
724 0 : parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
725 0 : ) -> DatabaseResult<()> {
726 : use crate::schema::tenant_shards::dsl::*;
727 0 : self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
728 : // Mark parent shards as splitting
729 :
730 0 : let updated = diesel::update(tenant_shards)
731 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
732 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
733 0 : .set((splitting.eq(1),))
734 0 : .execute(conn)?;
735 0 : if u8::try_from(updated)
736 0 : .map_err(|_| DatabaseError::Logical(
737 0 : format!("Overflow existing shard count {} while splitting", updated))
738 0 : )? != old_shard_count.count() {
739 : // Perhaps a deletion or another split raced with this attempt to split, mutating
740 : // the parent shards that we intend to split. In this case the split request should fail.
741 0 : return Err(DatabaseError::Logical(
742 0 : format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
743 0 : ));
744 0 : }
745 0 :
746 0 : // FIXME: spurious clone to sidestep closure move rules
747 0 : let parent_to_children = parent_to_children.clone();
748 :
749 : // Insert child shards
750 0 : for (parent_shard_id, children) in parent_to_children {
751 0 : let mut parent = crate::schema::tenant_shards::table
752 0 : .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
753 0 : .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
754 0 : .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
755 0 : .load::<TenantShardPersistence>(conn)?;
756 0 : let parent = if parent.len() != 1 {
757 0 : return Err(DatabaseError::Logical(format!(
758 0 : "Parent shard {parent_shard_id} not found"
759 0 : )));
760 : } else {
761 0 : parent.pop().unwrap()
762 : };
763 0 : for mut shard in children {
764 : // Carry the parent's generation into the child
765 0 : shard.generation = parent.generation;
766 0 :
767 0 : debug_assert!(shard.splitting == SplitState::Splitting);
768 0 : diesel::insert_into(tenant_shards)
769 0 : .values(shard)
770 0 : .execute(conn)?;
771 : }
772 : }
773 :
774 0 : Ok(())
775 0 : })
776 0 : .await
777 0 : }
778 :
779 : // When we finish shard splitting, we must atomically clean up the old shards
780 : // and insert the new shards, and clear the splitting marker.
781 0 : pub(crate) async fn complete_shard_split(
782 0 : &self,
783 0 : split_tenant_id: TenantId,
784 0 : old_shard_count: ShardCount,
785 0 : ) -> DatabaseResult<()> {
786 : use crate::schema::tenant_shards::dsl::*;
787 0 : self.with_measured_conn(
788 0 : DatabaseOperation::CompleteShardSplit,
789 0 : move |conn| -> DatabaseResult<()> {
790 0 : // Drop parent shards
791 0 : diesel::delete(tenant_shards)
792 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
793 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
794 0 : .execute(conn)?;
795 :
796 : // Clear splitting flag
797 0 : let updated = diesel::update(tenant_shards)
798 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
799 0 : .set((splitting.eq(0),))
800 0 : .execute(conn)?;
801 0 : debug_assert!(updated > 0);
802 :
803 0 : Ok(())
804 0 : },
805 0 : )
806 0 : .await
807 0 : }
808 :
809 : /// Used when the remote part of a shard split failed: we will revert the database state to have only
810 : /// the parent shards, with SplitState::Idle.
811 0 : pub(crate) async fn abort_shard_split(
812 0 : &self,
813 0 : split_tenant_id: TenantId,
814 0 : new_shard_count: ShardCount,
815 0 : ) -> DatabaseResult<AbortShardSplitStatus> {
816 : use crate::schema::tenant_shards::dsl::*;
817 0 : self.with_measured_conn(
818 0 : DatabaseOperation::AbortShardSplit,
819 0 : move |conn| -> DatabaseResult<AbortShardSplitStatus> {
820 : // Clear the splitting state on parent shards
821 0 : let updated = diesel::update(tenant_shards)
822 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
823 0 : .filter(shard_count.ne(new_shard_count.literal() as i32))
824 0 : .set((splitting.eq(0),))
825 0 : .execute(conn)?;
826 :
827 : // Parent shards are already gone: we cannot abort.
828 0 : if updated == 0 {
829 0 : return Ok(AbortShardSplitStatus::Complete);
830 0 : }
831 0 :
832 0 : // Sanity check: if parent shards were present, their cardinality should
833 0 : // be less than the number of child shards.
834 0 : if updated >= new_shard_count.count() as usize {
835 0 : return Err(DatabaseError::Logical(format!(
836 0 : "Unexpected parent shard count {updated} while aborting split to \
837 0 : count {new_shard_count:?} on tenant {split_tenant_id}"
838 0 : )));
839 0 : }
840 0 :
841 0 : // Erase child shards
842 0 : diesel::delete(tenant_shards)
843 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
844 0 : .filter(shard_count.eq(new_shard_count.literal() as i32))
845 0 : .execute(conn)?;
846 :
847 0 : Ok(AbortShardSplitStatus::Aborted)
848 0 : },
849 0 : )
850 0 : .await
851 0 : }
852 :
853 : /// Stores all the latest metadata health updates durably. Updates existing entry on conflict.
854 : ///
855 : /// **Correctness:** `metadata_health_updates` should all belong to the tenant shards managed by the storage controller.
856 : #[allow(dead_code)]
857 0 : pub(crate) async fn update_metadata_health_records(
858 0 : &self,
859 0 : healthy_records: Vec<MetadataHealthPersistence>,
860 0 : unhealthy_records: Vec<MetadataHealthPersistence>,
861 0 : now: chrono::DateTime<chrono::Utc>,
862 0 : ) -> DatabaseResult<()> {
863 : use crate::schema::metadata_health::dsl::*;
864 :
865 0 : self.with_measured_conn(
866 0 : DatabaseOperation::UpdateMetadataHealth,
867 0 : move |conn| -> DatabaseResult<_> {
868 0 : diesel::insert_into(metadata_health)
869 0 : .values(&healthy_records)
870 0 : .on_conflict((tenant_id, shard_number, shard_count))
871 0 : .do_update()
872 0 : .set((healthy.eq(true), last_scrubbed_at.eq(now)))
873 0 : .execute(conn)?;
874 :
875 0 : diesel::insert_into(metadata_health)
876 0 : .values(&unhealthy_records)
877 0 : .on_conflict((tenant_id, shard_number, shard_count))
878 0 : .do_update()
879 0 : .set((healthy.eq(false), last_scrubbed_at.eq(now)))
880 0 : .execute(conn)?;
881 0 : Ok(())
882 0 : },
883 0 : )
884 0 : .await
885 0 : }
886 :
887 : /// Lists all the metadata health records.
888 : #[allow(dead_code)]
889 0 : pub(crate) async fn list_metadata_health_records(
890 0 : &self,
891 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
892 0 : self.with_measured_conn(
893 0 : DatabaseOperation::ListMetadataHealth,
894 0 : move |conn| -> DatabaseResult<_> {
895 0 : Ok(
896 0 : crate::schema::metadata_health::table
897 0 : .load::<MetadataHealthPersistence>(conn)?,
898 : )
899 0 : },
900 0 : )
901 0 : .await
902 0 : }
903 :
904 : /// Lists all the metadata health records that are unhealthy.
905 : #[allow(dead_code)]
906 0 : pub(crate) async fn list_unhealthy_metadata_health_records(
907 0 : &self,
908 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
909 : use crate::schema::metadata_health::dsl::*;
910 0 : self.with_measured_conn(
911 0 : DatabaseOperation::ListMetadataHealthUnhealthy,
912 0 : move |conn| -> DatabaseResult<_> {
913 0 : Ok(crate::schema::metadata_health::table
914 0 : .filter(healthy.eq(false))
915 0 : .load::<MetadataHealthPersistence>(conn)?)
916 0 : },
917 0 : )
918 0 : .await
919 0 : }
920 :
921 : /// Lists all the metadata health records that have not been updated since an `earlier` time.
922 : #[allow(dead_code)]
923 0 : pub(crate) async fn list_outdated_metadata_health_records(
924 0 : &self,
925 0 : earlier: chrono::DateTime<chrono::Utc>,
926 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
927 : use crate::schema::metadata_health::dsl::*;
928 :
929 0 : self.with_measured_conn(
930 0 : DatabaseOperation::ListMetadataHealthOutdated,
931 0 : move |conn| -> DatabaseResult<_> {
932 0 : let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
933 0 : let res = query.load::<MetadataHealthPersistence>(conn)?;
934 :
935 0 : Ok(res)
936 0 : },
937 0 : )
938 0 : .await
939 0 : }
940 :
941 : /// Get the current entry from the `leader` table if one exists.
942 : /// It is an error for the table to contain more than one entry.
943 0 : pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
944 0 : let mut leader: Vec<ControllerPersistence> = self
945 0 : .with_measured_conn(
946 0 : DatabaseOperation::GetLeader,
947 0 : move |conn| -> DatabaseResult<_> {
948 0 : Ok(crate::schema::controllers::table.load::<ControllerPersistence>(conn)?)
949 0 : },
950 0 : )
951 0 : .await?;
952 :
953 0 : if leader.len() > 1 {
954 0 : return Err(DatabaseError::Logical(format!(
955 0 : "More than one entry present in the leader table: {leader:?}"
956 0 : )));
957 0 : }
958 0 :
959 0 : Ok(leader.pop())
960 0 : }
961 :
962 : /// Update the new leader with compare-exchange semantics. If `prev` does not
963 : /// match the current leader entry, then the update is treated as a failure.
964 : /// When `prev` is not specified, the update is forced.
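///
/// Illustrative usage (assumed bindings): a starting controller claims leadership by
/// compare-exchanging the currently observed entry for its own.
/// ```ignore
/// let observed = persistence.get_leader().await?;
/// persistence.update_leader(observed, my_controller_entry).await?;
/// ```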
965 0 : pub(crate) async fn update_leader(
966 0 : &self,
967 0 : prev: Option<ControllerPersistence>,
968 0 : new: ControllerPersistence,
969 0 : ) -> DatabaseResult<()> {
970 : use crate::schema::controllers::dsl::*;
971 :
972 0 : let updated = self
973 0 : .with_measured_conn(
974 0 : DatabaseOperation::UpdateLeader,
975 0 : move |conn| -> DatabaseResult<usize> {
976 0 : let updated = match &prev {
977 0 : Some(prev) => diesel::update(controllers)
978 0 : .filter(address.eq(prev.address.clone()))
979 0 : .filter(started_at.eq(prev.started_at))
980 0 : .set((
981 0 : address.eq(new.address.clone()),
982 0 : started_at.eq(new.started_at),
983 0 : ))
984 0 : .execute(conn)?,
985 0 : None => diesel::insert_into(controllers)
986 0 : .values(new.clone())
987 0 : .execute(conn)?,
988 : };
989 :
990 0 : Ok(updated)
991 0 : },
992 0 : )
993 0 : .await?;
994 :
995 0 : if updated == 0 {
996 0 : return Err(DatabaseError::Logical(
997 0 : "Leader table update failed".to_string(),
998 0 : ));
999 0 : }
1000 0 :
1001 0 : Ok(())
1002 0 : }
1003 :
1004 0 : pub(crate) async fn safekeeper_get(
1005 0 : &self,
1006 0 : id: i64,
1007 0 : ) -> Result<SafekeeperPersistence, DatabaseError> {
1008 : use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
1009 0 : self.with_conn(move |conn| -> DatabaseResult<SafekeeperPersistence> {
1010 0 : Ok(safekeepers
1011 0 : .filter(id_column.eq(&id))
1012 0 : .select(SafekeeperPersistence::as_select())
1013 0 : .get_result(conn)?)
1014 0 : })
1015 0 : .await
1016 0 : }
1017 :
1018 0 : pub(crate) async fn safekeeper_upsert(
1019 0 : &self,
1020 0 : record: SafekeeperPersistence,
1021 0 : ) -> Result<(), DatabaseError> {
1022 : use crate::schema::safekeepers::dsl::*;
1023 :
1024 0 : self.with_conn(move |conn| -> DatabaseResult<()> {
1025 0 : let bind = record.as_insert_or_update();
1026 :
1027 0 : let inserted_updated = diesel::insert_into(safekeepers)
1028 0 : .values(&bind)
1029 0 : .on_conflict(id)
1030 0 : .do_update()
1031 0 : .set(&bind)
1032 0 : .execute(conn)?;
1033 :
1034 0 : if inserted_updated != 1 {
1035 0 : return Err(DatabaseError::Logical(format!(
1036 0 : "unexpected number of rows ({})",
1037 0 : inserted_updated
1038 0 : )));
1039 0 : }
1040 0 :
1041 0 : Ok(())
1042 0 : })
1043 0 : .await
1044 0 : }
1045 : }
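// An illustrative sketch (not the real split orchestration; the bindings and the
// remote-split outcome below are placeholders): how the begin/complete/abort shard-split
// methods above are intended to be sequenced around the pageserver-side split work.
#[allow(dead_code)]
async fn shard_split_lifecycle_example(
    persistence: &Persistence,
    tenant_id: TenantId,
    old_shard_count: ShardCount,
    new_shard_count: ShardCount,
    parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
) -> DatabaseResult<()> {
    // 1. Durably mark the parents as splitting and insert the child shards, so that child
    //    generations can be incremented even if we restart mid-split.
    persistence
        .begin_shard_split(old_shard_count, tenant_id, parent_to_children)
        .await?;

    // 2. Drive the split on the pageservers (elided here); record whether it succeeded.
    let remote_split_succeeded = true;

    if remote_split_succeeded {
        // 3a. Atomically drop the parent shards and clear the splitting marker.
        persistence
            .complete_shard_split(tenant_id, old_shard_count)
            .await?;
    } else {
        // 3b. Revert to the parent shards, erasing the children.
        match persistence
            .abort_shard_split(tenant_id, new_shard_count)
            .await?
        {
            AbortShardSplitStatus::Aborted => { /* reverted to parents */ }
            AbortShardSplitStatus::Complete => { /* the split had already been persisted */ }
        }
    }

    Ok(())
}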
1046 :
1047 : /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
1048 : #[derive(
1049 0 : QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
1050 : )]
1051 : #[diesel(table_name = crate::schema::tenant_shards)]
1052 : pub(crate) struct TenantShardPersistence {
1053 : #[serde(default)]
1054 : pub(crate) tenant_id: String,
1055 : #[serde(default)]
1056 : pub(crate) shard_number: i32,
1057 : #[serde(default)]
1058 : pub(crate) shard_count: i32,
1059 : #[serde(default)]
1060 : pub(crate) shard_stripe_size: i32,
1061 :
1062 : // Latest generation number: next time we attach, increment this
1063 : // and use the incremented number when attaching.
1064 : //
1065 : // Generation is only None when first onboarding a tenant, where it may
1066 : // be in PlacementPolicy::Secondary and therefore have no valid generation state.
1067 : pub(crate) generation: Option<i32>,
1068 :
1069 : // Currently attached pageserver
1070 : #[serde(rename = "pageserver")]
1071 : pub(crate) generation_pageserver: Option<i64>,
1072 :
1073 : #[serde(default)]
1074 : pub(crate) placement_policy: String,
1075 : #[serde(default)]
1076 : pub(crate) splitting: SplitState,
1077 : #[serde(default)]
1078 : pub(crate) config: String,
1079 : #[serde(default)]
1080 : pub(crate) scheduling_policy: String,
1081 :
1082 : // Hint that we should attempt to schedule this tenant shard in the given
1083 : // availability zone in order to minimise the chances of cross-AZ communication
1084 : // with compute.
1085 : pub(crate) preferred_az_id: Option<String>,
1086 : }
1087 :
1088 : impl TenantShardPersistence {
1089 0 : pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
1090 0 : if self.shard_count == 0 {
1091 0 : Ok(ShardIdentity::unsharded())
1092 : } else {
1093 0 : Ok(ShardIdentity::new(
1094 0 : ShardNumber(self.shard_number as u8),
1095 0 : ShardCount::new(self.shard_count as u8),
1096 0 : ShardStripeSize(self.shard_stripe_size as u32),
1097 0 : )?)
1098 : }
1099 0 : }
1100 :
1101 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
1102 0 : Ok(TenantShardId {
1103 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
1104 0 : shard_number: ShardNumber(self.shard_number as u8),
1105 0 : shard_count: ShardCount::new(self.shard_count as u8),
1106 : })
1107 0 : }
1108 : }
1109 :
1110 : /// Parts of [`crate::node::Node`] that are stored durably
1111 0 : #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
1112 : #[diesel(table_name = crate::schema::nodes)]
1113 : pub(crate) struct NodePersistence {
1114 : pub(crate) node_id: i64,
1115 : pub(crate) scheduling_policy: String,
1116 : pub(crate) listen_http_addr: String,
1117 : pub(crate) listen_http_port: i32,
1118 : pub(crate) listen_pg_addr: String,
1119 : pub(crate) listen_pg_port: i32,
1120 : pub(crate) availability_zone_id: String,
1121 : }
1122 :
1123 : /// Tenant metadata health status that is stored durably.
1124 0 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
1125 : #[diesel(table_name = crate::schema::metadata_health)]
1126 : pub(crate) struct MetadataHealthPersistence {
1127 : #[serde(default)]
1128 : pub(crate) tenant_id: String,
1129 : #[serde(default)]
1130 : pub(crate) shard_number: i32,
1131 : #[serde(default)]
1132 : pub(crate) shard_count: i32,
1133 :
1134 : pub(crate) healthy: bool,
1135 : pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
1136 : }
1137 :
1138 : impl MetadataHealthPersistence {
1139 0 : pub fn new(
1140 0 : tenant_shard_id: TenantShardId,
1141 0 : healthy: bool,
1142 0 : last_scrubbed_at: chrono::DateTime<chrono::Utc>,
1143 0 : ) -> Self {
1144 0 : let tenant_id = tenant_shard_id.tenant_id.to_string();
1145 0 : let shard_number = tenant_shard_id.shard_number.0 as i32;
1146 0 : let shard_count = tenant_shard_id.shard_count.literal() as i32;
1147 0 :
1148 0 : MetadataHealthPersistence {
1149 0 : tenant_id,
1150 0 : shard_number,
1151 0 : shard_count,
1152 0 : healthy,
1153 0 : last_scrubbed_at,
1154 0 : }
1155 0 : }
1156 :
1157 : #[allow(dead_code)]
1158 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
1159 0 : Ok(TenantShardId {
1160 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
1161 0 : shard_number: ShardNumber(self.shard_number as u8),
1162 0 : shard_count: ShardCount::new(self.shard_count as u8),
1163 : })
1164 0 : }
1165 : }
1166 :
1167 : impl From<MetadataHealthPersistence> for MetadataHealthRecord {
1168 0 : fn from(value: MetadataHealthPersistence) -> Self {
1169 0 : MetadataHealthRecord {
1170 0 : tenant_shard_id: value
1171 0 : .get_tenant_shard_id()
1172 0 : .expect("stored tenant id should be valid"),
1173 0 : healthy: value.healthy,
1174 0 : last_scrubbed_at: value.last_scrubbed_at,
1175 0 : }
1176 0 : }
1177 : }
1178 :
1179 : #[derive(
1180 0 : Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
1181 : )]
1182 : #[diesel(table_name = crate::schema::controllers)]
1183 : pub(crate) struct ControllerPersistence {
1184 : pub(crate) address: String,
1185 : pub(crate) started_at: chrono::DateTime<chrono::Utc>,
1186 : }
1187 :
1188 0 : #[derive(Serialize, Deserialize, Queryable, Selectable, Eq, PartialEq, Debug, Clone)]
1189 : #[diesel(table_name = crate::schema::safekeepers)]
1190 : pub(crate) struct SafekeeperPersistence {
1191 : pub(crate) id: i64,
1192 : pub(crate) region_id: String,
1193 : /// 1 is special, it means just created (not currently posted to storcon).
1194 : /// Zero or negative is not really expected.
1195 : /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
1196 : pub(crate) version: i64,
1197 : pub(crate) host: String,
1198 : pub(crate) port: i32,
1199 : pub(crate) active: bool,
1200 : pub(crate) http_port: i32,
1201 : pub(crate) availability_zone_id: String,
1202 : }
1203 :
1204 : impl SafekeeperPersistence {
1205 0 : fn as_insert_or_update(&self) -> InsertUpdateSafekeeper<'_> {
1206 0 : InsertUpdateSafekeeper {
1207 0 : id: self.id,
1208 0 : region_id: &self.region_id,
1209 0 : version: self.version,
1210 0 : host: &self.host,
1211 0 : port: self.port,
1212 0 : active: self.active,
1213 0 : http_port: self.http_port,
1214 0 : availability_zone_id: &self.availability_zone_id,
1215 0 : }
1216 0 : }
1217 : }
1218 :
1219 0 : #[derive(Insertable, AsChangeset)]
1220 : #[diesel(table_name = crate::schema::safekeepers)]
1221 : struct InsertUpdateSafekeeper<'a> {
1222 : id: i64,
1223 : region_id: &'a str,
1224 : version: i64,
1225 : host: &'a str,
1226 : port: i32,
1227 : active: bool,
1228 : http_port: i32,
1229 : availability_zone_id: &'a str,
1230 : }