Line data Source code
1 : pub(crate) mod split_state;
2 : use std::collections::HashMap;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 : use std::time::Instant;
6 :
7 : use self::split_state::SplitState;
8 : use diesel::pg::PgConnection;
9 : use diesel::prelude::*;
10 : use diesel::Connection;
11 : use pageserver_api::controller_api::MetadataHealthRecord;
12 : use pageserver_api::controller_api::ShardSchedulingPolicy;
13 : use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
14 : use pageserver_api::models::TenantConfig;
15 : use pageserver_api::shard::ShardConfigError;
16 : use pageserver_api::shard::ShardIdentity;
17 : use pageserver_api::shard::ShardStripeSize;
18 : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
19 : use serde::{Deserialize, Serialize};
20 : use utils::generation::Generation;
21 : use utils::id::{NodeId, TenantId};
22 :
23 : use crate::metrics::{
24 : DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
25 : };
26 : use crate::node::Node;
27 :
28 : use diesel_migrations::{embed_migrations, EmbeddedMigrations};
29 : const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
30 :
31 : /// ## What do we store?
32 : ///
33 : /// The storage controller service does not store most of its state durably.
34 : ///
35 : /// The essential things to store durably are:
36 : /// - generation numbers, as these must always advance monotonically to ensure data safety.
37 : /// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
38 : /// - Node's scheduling policies, as the source of truth for these is something external.
39 : ///
40 : /// Other things we store durably as an implementation detail:
41 : /// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
42 : /// but it is operationally simpler to make this service the authority for which nodes
43 : /// it talks to.
44 : ///
45 : /// ## Performance/efficiency
46 : ///
47 : /// The storage controller service does not go via the database for most things: there are
48 : /// a couple of places where we must, and where efficiency matters:
49 : /// - Incrementing generation numbers: the Reconciler has to wait for this to complete
50 : /// before it can attach a tenant, so this acts as a bound on how fast things like
51 : /// failover can happen.
52 : /// - Pageserver re-attach: we will increment many shards' generations when this happens,
53 : /// so it is important to avoid e.g. issuing O(N) queries.
54 : ///
55 : /// Database calls relating to nodes have low performance requirements, as they are very rarely
56 : /// updated, and reads of nodes are always from memory, not the database. We only require that
57 : /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
58 : pub struct Persistence {
59 : connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
60 : }
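
// A minimal startup sketch (illustrative only, not part of the service): wait for the
// database to become reachable, run migrations, then load the durable node and shard
// state into memory. The 30-second timeout is an assumption, not a controller default.
#[allow(dead_code)]
async fn startup_sketch(database_url: String) -> anyhow::Result<()> {
    // The database may be restarting concurrently with the controller: wait for it.
    Persistence::await_connection(&database_url, Duration::from_secs(30)).await?;

    let persistence = Persistence::new(database_url);

    // Apply any pending schema migrations before serving requests.
    persistence.migration_run().await?;

    // Durable state is read once at startup; afterwards reads are served from memory.
    let _nodes = persistence.list_nodes().await?;
    let _shards = persistence.list_tenant_shards().await?;

    Ok(())
}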
61 :
62 : /// Legacy format, for use in JSON compat objects in test environment
63 0 : #[derive(Serialize, Deserialize)]
64 : struct JsonPersistence {
65 : tenants: HashMap<TenantShardId, TenantShardPersistence>,
66 : }
67 :
68 0 : #[derive(thiserror::Error, Debug)]
69 : pub(crate) enum DatabaseError {
70 : #[error(transparent)]
71 : Query(#[from] diesel::result::Error),
72 : #[error(transparent)]
73 : Connection(#[from] diesel::result::ConnectionError),
74 : #[error(transparent)]
75 : ConnectionPool(#[from] r2d2::Error),
76 : #[error("Logical error: {0}")]
77 : Logical(String),
78 : #[error("Migration error: {0}")]
79 : Migration(String),
80 : }
81 :
82 : #[derive(measured::FixedCardinalityLabel, Copy, Clone)]
83 : pub(crate) enum DatabaseOperation {
84 : InsertNode,
85 : UpdateNode,
86 : DeleteNode,
87 : ListNodes,
88 : BeginShardSplit,
89 : CompleteShardSplit,
90 : AbortShardSplit,
91 : Detach,
92 : ReAttach,
93 : IncrementGeneration,
94 : PeekGenerations,
95 : ListTenantShards,
96 : InsertTenantShards,
97 : UpdateTenantShard,
98 : DeleteTenant,
99 : UpdateTenantConfig,
100 : UpdateMetadataHealth,
101 : ListMetadataHealth,
102 : ListMetadataHealthUnhealthy,
103 : ListMetadataHealthOutdated,
104 : GetLeader,
105 : UpdateLeader,
106 : SetNodeAzId,
107 : }
108 :
109 : #[must_use]
110 : pub(crate) enum AbortShardSplitStatus {
111 : /// We aborted the split in the database by reverting to the parent shards
112 : Aborted,
113 : /// The split had already been persisted.
114 : Complete,
115 : }
116 :
117 : pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
118 :
119 : /// Some methods can operate on either a whole tenant or a single shard
120 : pub(crate) enum TenantFilter {
121 : Tenant(TenantId),
122 : Shard(TenantShardId),
123 : }
124 :
125 : /// Represents the results of looking up generation+pageserver for the shards of a tenant
126 : pub(crate) struct ShardGenerationState {
127 : pub(crate) tenant_shard_id: TenantShardId,
128 : pub(crate) generation: Option<Generation>,
129 : pub(crate) generation_pageserver: Option<NodeId>,
130 : }
131 :
132 : impl Persistence {
133 : // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
134 : // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
135 : pub const MAX_CONNECTIONS: u32 = 99;
136 :
137 : // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
138 : const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
139 : const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
140 :
141 0 : pub fn new(database_url: String) -> Self {
142 0 : let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
143 0 :
144 0 : // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
145 0 : // to execute queries (database queries are not generally on latency-sensitive paths).
146 0 : let connection_pool = diesel::r2d2::Pool::builder()
147 0 : .max_size(Self::MAX_CONNECTIONS)
148 0 : .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
149 0 : .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
150 0 : // Always keep at least one connection ready to go
151 0 : .min_idle(Some(1))
152 0 : .test_on_check_out(true)
153 0 : .build(manager)
154 0 : .expect("Could not build connection pool");
155 0 :
156 0 : Self { connection_pool }
157 0 : }
158 :
159 : /// A helper for use during startup, where we would like to tolerate concurrent restarts of the
160 : /// database and the storage controller, so the database might not be available right away
161 0 : pub async fn await_connection(
162 0 : database_url: &str,
163 0 : timeout: Duration,
164 0 : ) -> Result<(), diesel::ConnectionError> {
165 0 : let started_at = Instant::now();
166 0 : loop {
167 0 : match PgConnection::establish(database_url) {
168 : Ok(_) => {
169 0 : tracing::info!("Connected to database.");
170 0 : return Ok(());
171 : }
172 0 : Err(e) => {
173 0 : if started_at.elapsed() > timeout {
174 0 : return Err(e);
175 : } else {
176 0 : tracing::info!("Database not yet available, waiting... ({e})");
177 0 : tokio::time::sleep(Duration::from_millis(100)).await;
178 : }
179 : }
180 : }
181 : }
182 0 : }
183 :
184 : /// Execute the diesel migrations that are built into this binary
185 0 : pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
186 0 : use diesel_migrations::{HarnessWithOutput, MigrationHarness};
187 0 :
188 0 : self.with_conn(move |conn| -> DatabaseResult<()> {
189 0 : HarnessWithOutput::write_to_stdout(conn)
190 0 : .run_pending_migrations(MIGRATIONS)
191 0 : .map(|_| ())
192 0 : .map_err(|e| DatabaseError::Migration(e.to_string()))
193 0 : })
194 0 : .await
195 0 : }
196 :
197 : /// Wraps `with_conn` in order to collect latency and error metrics
198 0 : async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
199 0 : where
200 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
201 0 : R: Send + 'static,
202 0 : {
203 0 : let latency = &METRICS_REGISTRY
204 0 : .metrics_group
205 0 : .storage_controller_database_query_latency;
206 0 : let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
207 :
208 0 : let res = self.with_conn(func).await;
209 :
210 0 : if let Err(err) = &res {
211 0 : let error_counter = &METRICS_REGISTRY
212 0 : .metrics_group
213 0 : .storage_controller_database_query_error;
214 0 : error_counter.inc(DatabaseQueryErrorLabelGroup {
215 0 : error_type: err.error_label(),
216 0 : operation: op,
217 0 : })
218 0 : }
219 :
220 0 : res
221 0 : }
222 :
223 : /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
224 0 : async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
225 0 : where
226 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
227 0 : R: Send + 'static,
228 0 : {
229 : // A generous allowance for how many times we may retry serializable transactions
230 : // before giving up. This is not expected to be hit: it is a defensive measure in case we
231 : // somehow engineer a situation where duelling transactions might otherwise live-lock.
232 : const MAX_RETRIES: usize = 128;
233 :
234 0 : let mut conn = self.connection_pool.get()?;
235 0 : tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
236 0 : let mut retry_count = 0;
237 : loop {
238 0 : match conn.build_transaction().serializable().run(|c| func(c)) {
239 0 : Ok(r) => break Ok(r),
240 : Err(
241 0 : err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
242 0 : diesel::result::DatabaseErrorKind::SerializationFailure,
243 0 : _,
244 0 : )),
245 0 : ) => {
246 0 : retry_count += 1;
247 0 : if retry_count > MAX_RETRIES {
248 0 : tracing::error!(
249 0 : "Exceeded max retries on SerializationFailure errors: {err:?}"
250 : );
251 0 : break Err(err);
252 : } else {
253 : // Retry on serialization errors: these are expected, because even though our
254 : // transactions don't fight for the same rows, they will occasionally collide
255 : // on index pages (e.g. increment_generation for unrelated shards can collide)
256 0 : tracing::debug!(
257 0 : "Retrying transaction on serialization failure {err:?}"
258 : );
259 0 : continue;
260 : }
261 : }
262 0 : Err(e) => break Err(e),
263 : }
264 : }
265 0 : })
266 0 : .await
267 0 : .expect("Task panic")
268 0 : }
269 :
270 : /// When a node is first registered, persist it before using it for anything
271 0 : pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
272 0 : let np = node.to_persistent();
273 0 : self.with_measured_conn(
274 0 : DatabaseOperation::InsertNode,
275 0 : move |conn| -> DatabaseResult<()> {
276 0 : diesel::insert_into(crate::schema::nodes::table)
277 0 : .values(&np)
278 0 : .execute(conn)?;
279 0 : Ok(())
280 0 : },
281 0 : )
282 0 : .await
283 0 : }
284 :
285 : /// At startup, populate the list of nodes which our shards may be placed on
286 0 : pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
287 0 : let nodes: Vec<NodePersistence> = self
288 0 : .with_measured_conn(
289 0 : DatabaseOperation::ListNodes,
290 0 : move |conn| -> DatabaseResult<_> {
291 0 : Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
292 0 : },
293 0 : )
294 0 : .await?;
295 :
296 0 : tracing::info!("list_nodes: loaded {} nodes", nodes.len());
297 :
298 0 : Ok(nodes)
299 0 : }
300 :
301 0 : pub(crate) async fn update_node(
302 0 : &self,
303 0 : input_node_id: NodeId,
304 0 : input_scheduling: NodeSchedulingPolicy,
305 0 : ) -> DatabaseResult<()> {
306 : use crate::schema::nodes::dsl::*;
307 0 : let updated = self
308 0 : .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
309 0 : let updated = diesel::update(nodes)
310 0 : .filter(node_id.eq(input_node_id.0 as i64))
311 0 : .set((scheduling_policy.eq(String::from(input_scheduling)),))
312 0 : .execute(conn)?;
313 0 : Ok(updated)
314 0 : })
315 0 : .await?;
316 :
317 0 : if updated != 1 {
318 0 : Err(DatabaseError::Logical(format!(
319 0 : "Node {node_id:?} not found for update",
320 0 : )))
321 : } else {
322 0 : Ok(())
323 : }
324 0 : }
325 :
326 0 : pub(crate) async fn set_node_availability_zone_id(
327 0 : &self,
328 0 : input_node_id: NodeId,
329 0 : input_az_id: String,
330 0 : ) -> DatabaseResult<()> {
331 : use crate::schema::nodes::dsl::*;
332 0 : let updated = self
333 0 : .with_measured_conn(DatabaseOperation::SetNodeAzId, move |conn| {
334 0 : let updated = diesel::update(nodes)
335 0 : .filter(node_id.eq(input_node_id.0 as i64))
336 0 : .set((availability_zone_id.eq(input_az_id.clone()),))
337 0 : .execute(conn)?;
338 0 : Ok(updated)
339 0 : })
340 0 : .await?;
341 :
342 0 : if updated != 1 {
343 0 : Err(DatabaseError::Logical(format!(
344 0 : "Node {node_id:?} not found for setting az id",
345 0 : )))
346 : } else {
347 0 : Ok(())
348 : }
349 0 : }
350 :
351 : /// At startup, load the high level state for shards, such as their config + policy. This will
352 : /// be enriched at runtime with state discovered on pageservers.
353 0 : pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
354 0 : self.with_measured_conn(
355 0 : DatabaseOperation::ListTenantShards,
356 0 : move |conn| -> DatabaseResult<_> {
357 0 : Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
358 0 : },
359 0 : )
360 0 : .await
361 0 : }
362 :
363 : /// Tenants must be persisted before we schedule them for the first time. This enables us
364 : /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
365 0 : pub(crate) async fn insert_tenant_shards(
366 0 : &self,
367 0 : shards: Vec<TenantShardPersistence>,
368 0 : ) -> DatabaseResult<()> {
369 0 : use crate::schema::metadata_health;
370 0 : use crate::schema::tenant_shards;
371 0 :
372 0 : let now = chrono::Utc::now();
373 0 :
374 0 : let metadata_health_records = shards
375 0 : .iter()
376 0 : .map(|t| MetadataHealthPersistence {
377 0 : tenant_id: t.tenant_id.clone(),
378 0 : shard_number: t.shard_number,
379 0 : shard_count: t.shard_count,
380 0 : healthy: true,
381 0 : last_scrubbed_at: now,
382 0 : })
383 0 : .collect::<Vec<_>>();
384 0 :
385 0 : self.with_measured_conn(
386 0 : DatabaseOperation::InsertTenantShards,
387 0 : move |conn| -> DatabaseResult<()> {
388 0 : diesel::insert_into(tenant_shards::table)
389 0 : .values(&shards)
390 0 : .execute(conn)?;
391 :
392 0 : diesel::insert_into(metadata_health::table)
393 0 : .values(&metadata_health_records)
394 0 : .execute(conn)?;
395 0 : Ok(())
396 0 : },
397 0 : )
398 0 : .await
399 0 : }
400 :
401 : /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
402 : /// the tenant from memory on this server.
403 0 : pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
404 0 : use crate::schema::tenant_shards::dsl::*;
405 0 : self.with_measured_conn(
406 0 : DatabaseOperation::DeleteTenant,
407 0 : move |conn| -> DatabaseResult<()> {
408 0 : // `metadata_health` status (if exists) is also deleted based on the cascade behavior.
409 0 : diesel::delete(tenant_shards)
410 0 : .filter(tenant_id.eq(del_tenant_id.to_string()))
411 0 : .execute(conn)?;
412 0 : Ok(())
413 0 : },
414 0 : )
415 0 : .await
416 0 : }
417 :
418 0 : pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
419 0 : use crate::schema::nodes::dsl::*;
420 0 : self.with_measured_conn(
421 0 : DatabaseOperation::DeleteNode,
422 0 : move |conn| -> DatabaseResult<()> {
423 0 : diesel::delete(nodes)
424 0 : .filter(node_id.eq(del_node_id.0 as i64))
425 0 : .execute(conn)?;
426 :
427 0 : Ok(())
428 0 : },
429 0 : )
430 0 : .await
431 0 : }
432 :
433 : /// When a pageserver node invokes the /re-attach API, this function is responsible for doing an efficient
434 : /// batched increment of the generations of all tenants whose generation_pageserver is equal to
435 : /// the node that called /re-attach.
436 0 : #[tracing::instrument(skip_all, fields(node_id))]
437 : pub(crate) async fn re_attach(
438 : &self,
439 : input_node_id: NodeId,
440 : ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
441 : use crate::schema::nodes::dsl::scheduling_policy;
442 : use crate::schema::nodes::dsl::*;
443 : use crate::schema::tenant_shards::dsl::*;
444 : let updated = self
445 0 : .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
446 0 : let rows_updated = diesel::update(tenant_shards)
447 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
448 0 : .set(generation.eq(generation + 1))
449 0 : .execute(conn)?;
450 :
451 0 : tracing::info!("Incremented {} tenants' generations", rows_updated);
452 :
453 : // TODO: UPDATE+SELECT in one query
454 :
455 0 : let updated = tenant_shards
456 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
457 0 : .select(TenantShardPersistence::as_select())
458 0 : .load(conn)?;
459 :
460 : // If the node went through a drain and restart phase before re-attaching,
461 : // then reset its node scheduling policy to active.
462 0 : diesel::update(nodes)
463 0 : .filter(node_id.eq(input_node_id.0 as i64))
464 0 : .filter(
465 0 : scheduling_policy
466 0 : .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
467 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
468 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
469 0 : )
470 0 : .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
471 0 : .execute(conn)?;
472 :
473 0 : Ok(updated)
474 0 : })
475 : .await?;
476 :
477 : let mut result = HashMap::new();
478 : for tsp in updated {
479 : let tenant_shard_id = TenantShardId {
480 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
481 0 : .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
482 : shard_number: ShardNumber(tsp.shard_number as u8),
483 : shard_count: ShardCount::new(tsp.shard_count as u8),
484 : };
485 :
486 : let Some(g) = tsp.generation else {
487 : // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
488 : // we only set generation_pageserver when setting generation.
489 : return Err(DatabaseError::Logical(
490 : "Generation should always be set after incrementing".to_string(),
491 : ));
492 : };
493 : result.insert(tenant_shard_id, Generation::new(g as u32));
494 : }
495 :
496 : Ok(result)
497 : }
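
    // Sketch only: the TODO above ("UPDATE+SELECT in one query") could plausibly be addressed
    // with Postgres RETURNING, letting the UPDATE hand back the affected rows directly. This
    // hypothetical helper is not part of the current implementation; the retry/metrics
    // plumbing around it would be unchanged.
    #[allow(dead_code)]
    fn re_attach_increment_sketch(
        conn: &mut PgConnection,
        input_node_id: NodeId,
    ) -> DatabaseResult<Vec<TenantShardPersistence>> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = diesel::update(tenant_shards)
            .filter(generation_pageserver.eq(input_node_id.0 as i64))
            .set(generation.eq(generation + 1))
            // RETURNING the updated rows avoids the separate SELECT above.
            .returning(TenantShardPersistence::as_returning())
            .get_results(conn)?;
        Ok(updated)
    }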
498 :
499 : /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
500 : /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
501 : /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
502 0 : pub(crate) async fn increment_generation(
503 0 : &self,
504 0 : tenant_shard_id: TenantShardId,
505 0 : node_id: NodeId,
506 0 : ) -> anyhow::Result<Generation> {
507 : use crate::schema::tenant_shards::dsl::*;
508 0 : let updated = self
509 0 : .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
510 0 : let updated = diesel::update(tenant_shards)
511 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
512 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
513 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
514 0 : .set((
515 0 : generation.eq(generation + 1),
516 0 : generation_pageserver.eq(node_id.0 as i64),
517 0 : ))
518 0 : // TODO: only returning() the generation column
519 0 : .returning(TenantShardPersistence::as_returning())
520 0 : .get_result(conn)?;
521 :
522 0 : Ok(updated)
523 0 : })
524 0 : .await?;
525 :
526 : // Generation is always non-null in the result: if the generation column had been NULL, then we
527 : // should have experienced an SQL conflict error while executing a query that tries to increment it.
528 0 : debug_assert!(updated.generation.is_some());
529 0 : let Some(g) = updated.generation else {
530 0 : return Err(DatabaseError::Logical(
531 0 : "Generation should always be set after incrementing".to_string(),
532 0 : )
533 0 : .into());
534 : };
535 :
536 0 : Ok(Generation::new(g as u32))
537 0 : }
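
    // Illustrative ordering only (the attach step is an assumption about the Reconciler, not
    // part of this module): the generation must be durably incremented *before* the shard is
    // attached to its new pageserver, e.g.
    //
    //   let gen = persistence.increment_generation(tenant_shard_id, new_node_id).await?;
    //   attach_shard_on_pageserver(new_node_id, tenant_shard_id, gen).await?;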
538 :
539 : /// When we want to call out to the running shards for a tenant, e.g. during timeline CRUD operations,
540 : /// we need to know where the shard is attached, _and_ the generation, so that we can re-check the generation
541 : /// afterwards to confirm that our timeline CRUD operation is truly persistent (it must have happened in the
542 : /// latest generation)
543 : ///
544 : /// If the tenant doesn't exist, an empty vector is returned.
545 : ///
546 : /// Output is sorted by shard number
547 0 : pub(crate) async fn peek_generations(
548 0 : &self,
549 0 : filter_tenant_id: TenantId,
550 0 : ) -> Result<Vec<ShardGenerationState>, DatabaseError> {
551 : use crate::schema::tenant_shards::dsl::*;
552 0 : let rows = self
553 0 : .with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
554 0 : let result = tenant_shards
555 0 : .filter(tenant_id.eq(filter_tenant_id.to_string()))
556 0 : .select(TenantShardPersistence::as_select())
557 0 : .order(shard_number)
558 0 : .load(conn)?;
559 0 : Ok(result)
560 0 : })
561 0 : .await?;
562 :
563 0 : Ok(rows
564 0 : .into_iter()
565 0 : .map(|p| ShardGenerationState {
566 0 : tenant_shard_id: p
567 0 : .get_tenant_shard_id()
568 0 : .expect("Corrupt tenant shard id in database"),
569 0 : generation: p.generation.map(|g| Generation::new(g as u32)),
570 0 : generation_pageserver: p.generation_pageserver.map(|n| NodeId(n as u64)),
571 0 : })
572 0 : .collect())
573 0 : }
574 :
575 : #[allow(non_local_definitions)]
576 : /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
577 : ///
578 : /// Do not use this for setting generation, unless in the special onboarding code path (/location_config)
579 : /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
580 : /// that we only do the first time a tenant is set to an attached policy via /location_config.
581 0 : pub(crate) async fn update_tenant_shard(
582 0 : &self,
583 0 : tenant: TenantFilter,
584 0 : input_placement_policy: Option<PlacementPolicy>,
585 0 : input_config: Option<TenantConfig>,
586 0 : input_generation: Option<Generation>,
587 0 : input_scheduling_policy: Option<ShardSchedulingPolicy>,
588 0 : ) -> DatabaseResult<()> {
589 0 : use crate::schema::tenant_shards::dsl::*;
590 0 :
591 0 : self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
592 0 : let query = match tenant {
593 0 : TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
594 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
595 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
596 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
597 0 : .into_boxed(),
598 0 : TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
599 0 : .filter(tenant_id.eq(input_tenant_id.to_string()))
600 0 : .into_boxed(),
601 : };
602 :
603 0 : #[derive(AsChangeset)]
604 : #[diesel(table_name = crate::schema::tenant_shards)]
605 : struct ShardUpdate {
606 : generation: Option<i32>,
607 : placement_policy: Option<String>,
608 : config: Option<String>,
609 : scheduling_policy: Option<String>,
610 : }
611 :
612 0 : let update = ShardUpdate {
613 0 : generation: input_generation.map(|g| g.into().unwrap() as i32),
614 0 : placement_policy: input_placement_policy
615 0 : .as_ref()
616 0 : .map(|p| serde_json::to_string(&p).unwrap()),
617 0 : config: input_config
618 0 : .as_ref()
619 0 : .map(|c| serde_json::to_string(&c).unwrap()),
620 0 : scheduling_policy: input_scheduling_policy
621 0 : .map(|p| serde_json::to_string(&p).unwrap()),
622 0 : };
623 0 :
624 0 : query.set(update).execute(conn)?;
625 :
626 0 : Ok(())
627 0 : })
628 0 : .await?;
629 :
630 0 : Ok(())
631 0 : }
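
    // Illustrative only: a hypothetical call site updating the placement policy and config for
    // every shard of a tenant (generation and scheduling policy left untouched) would look
    // roughly like this:
    //
    //   persistence.update_tenant_shard(
    //       TenantFilter::Tenant(tenant_id),
    //       Some(PlacementPolicy::Detached),
    //       Some(TenantConfig::default()),
    //       None, // generation: only set via the onboarding path
    //       None, // scheduling policy: unchanged
    //   ).await?;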
632 :
633 0 : pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
634 0 : use crate::schema::tenant_shards::dsl::*;
635 0 : self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
636 0 : let updated = diesel::update(tenant_shards)
637 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
638 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
639 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
640 0 : .set((
641 0 : generation_pageserver.eq(Option::<i64>::None),
642 0 : placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
643 0 : ))
644 0 : .execute(conn)?;
645 :
646 0 : Ok(updated)
647 0 : })
648 0 : .await?;
649 :
650 0 : Ok(())
651 0 : }
652 :
653 : // When we start shard splitting, we must durably mark the tenant so that
654 : // on restart, we know that we must go through recovery.
655 : //
656 : // We create the child shards here, so that they will be available for increment_generation calls
657 : // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
658 0 : pub(crate) async fn begin_shard_split(
659 0 : &self,
660 0 : old_shard_count: ShardCount,
661 0 : split_tenant_id: TenantId,
662 0 : parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
663 0 : ) -> DatabaseResult<()> {
664 0 : use crate::schema::tenant_shards::dsl::*;
665 0 : self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
666 : // Mark parent shards as splitting
667 :
668 0 : let updated = diesel::update(tenant_shards)
669 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
670 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
671 0 : .set((splitting.eq(1),))
672 0 : .execute(conn)?;
673 0 : if u8::try_from(updated)
674 0 : .map_err(|_| DatabaseError::Logical(
675 0 : format!("Overflow existing shard count {} while splitting", updated))
676 0 : )? != old_shard_count.count() {
677 : // Perhaps a deletion or another split raced with this attempt to split, mutating
678 : // the parent shards that we intend to split. In this case the split request should fail.
679 0 : return Err(DatabaseError::Logical(
680 0 : format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
681 0 : ));
682 0 : }
683 0 :
684 0 : // FIXME: spurious clone to sidestep closure move rules
685 0 : let parent_to_children = parent_to_children.clone();
686 :
687 : // Insert child shards
688 0 : for (parent_shard_id, children) in parent_to_children {
689 0 : let mut parent = crate::schema::tenant_shards::table
690 0 : .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
691 0 : .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
692 0 : .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
693 0 : .load::<TenantShardPersistence>(conn)?;
694 0 : let parent = if parent.len() != 1 {
695 0 : return Err(DatabaseError::Logical(format!(
696 0 : "Parent shard {parent_shard_id} not found"
697 0 : )));
698 : } else {
699 0 : parent.pop().unwrap()
700 : };
701 0 : for mut shard in children {
702 : // Carry the parent's generation into the child
703 0 : shard.generation = parent.generation;
704 0 :
705 0 : debug_assert!(shard.splitting == SplitState::Splitting);
706 0 : diesel::insert_into(tenant_shards)
707 0 : .values(shard)
708 0 : .execute(conn)?;
709 : }
710 : }
711 :
712 0 : Ok(())
713 0 : })
714 0 : .await
715 0 : }
716 :
717 : // When we finish shard splitting, we must atomically delete the parent shards
718 : // and clear the splitting marker on the child shards that were inserted in begin_shard_split.
719 0 : pub(crate) async fn complete_shard_split(
720 0 : &self,
721 0 : split_tenant_id: TenantId,
722 0 : old_shard_count: ShardCount,
723 0 : ) -> DatabaseResult<()> {
724 0 : use crate::schema::tenant_shards::dsl::*;
725 0 : self.with_measured_conn(
726 0 : DatabaseOperation::CompleteShardSplit,
727 0 : move |conn| -> DatabaseResult<()> {
728 0 : // Drop parent shards
729 0 : diesel::delete(tenant_shards)
730 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
731 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
732 0 : .execute(conn)?;
733 :
734 : // Clear splitting flag
735 0 : let updated = diesel::update(tenant_shards)
736 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
737 0 : .set((splitting.eq(0),))
738 0 : .execute(conn)?;
739 0 : debug_assert!(updated > 0);
740 :
741 0 : Ok(())
742 0 : },
743 0 : )
744 0 : .await
745 0 : }
746 :
747 : /// Used when the remote part of a shard split failed: we will revert the database state to have only
748 : /// the parent shards, with SplitState::Idle.
749 0 : pub(crate) async fn abort_shard_split(
750 0 : &self,
751 0 : split_tenant_id: TenantId,
752 0 : new_shard_count: ShardCount,
753 0 : ) -> DatabaseResult<AbortShardSplitStatus> {
754 0 : use crate::schema::tenant_shards::dsl::*;
755 0 : self.with_measured_conn(
756 0 : DatabaseOperation::AbortShardSplit,
757 0 : move |conn| -> DatabaseResult<AbortShardSplitStatus> {
758 : // Clear the splitting state on parent shards
759 0 : let updated = diesel::update(tenant_shards)
760 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
761 0 : .filter(shard_count.ne(new_shard_count.literal() as i32))
762 0 : .set((splitting.eq(0),))
763 0 : .execute(conn)?;
764 :
765 : // Parent shards are already gone: we cannot abort.
766 0 : if updated == 0 {
767 0 : return Ok(AbortShardSplitStatus::Complete);
768 0 : }
769 0 :
770 0 : // Sanity check: if parent shards were present, their cardinality should
771 0 : // be less than the number of child shards.
772 0 : if updated >= new_shard_count.count() as usize {
773 0 : return Err(DatabaseError::Logical(format!(
774 0 : "Unexpected parent shard count {updated} while aborting split to \
775 0 : count {new_shard_count:?} on tenant {split_tenant_id}"
776 0 : )));
777 0 : }
778 0 :
779 0 : // Erase child shards
780 0 : diesel::delete(tenant_shards)
781 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
782 0 : .filter(shard_count.eq(new_shard_count.literal() as i32))
783 0 : .execute(conn)?;
784 :
785 0 : Ok(AbortShardSplitStatus::Aborted)
786 0 : },
787 0 : )
788 0 : .await
789 0 : }
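
    // The three methods above are the durable half of a shard split. A rough caller-side flow
    // (the remote-split step is an assumption about the calling service, not part of this module):
    //
    //   persistence.begin_shard_split(old_count, tenant_id, parent_to_children).await?;
    //   match split_shards_on_pageservers().await {
    //       Ok(()) => persistence.complete_shard_split(tenant_id, old_count).await?,
    //       Err(_) => match persistence.abort_shard_split(tenant_id, new_count).await? {
    //           // Parents reverted to SplitState::Idle, children erased.
    //           AbortShardSplitStatus::Aborted => { /* retry or surface the error */ }
    //           // The split was already persisted; it is too late to abort.
    //           AbortShardSplitStatus::Complete => { /* proceed as if the split succeeded */ }
    //       },
    //   }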
790 :
791 : /// Stores all the latest metadata health updates durably. Updates existing entry on conflict.
792 : ///
793 : /// **Correctness:** `metadata_health_updates` should all belong to the tenant shards managed by the storage controller.
794 : #[allow(dead_code)]
795 0 : pub(crate) async fn update_metadata_health_records(
796 0 : &self,
797 0 : healthy_records: Vec<MetadataHealthPersistence>,
798 0 : unhealthy_records: Vec<MetadataHealthPersistence>,
799 0 : now: chrono::DateTime<chrono::Utc>,
800 0 : ) -> DatabaseResult<()> {
801 0 : use crate::schema::metadata_health::dsl::*;
802 0 :
803 0 : self.with_measured_conn(
804 0 : DatabaseOperation::UpdateMetadataHealth,
805 0 : move |conn| -> DatabaseResult<_> {
806 0 : diesel::insert_into(metadata_health)
807 0 : .values(&healthy_records)
808 0 : .on_conflict((tenant_id, shard_number, shard_count))
809 0 : .do_update()
810 0 : .set((healthy.eq(true), last_scrubbed_at.eq(now)))
811 0 : .execute(conn)?;
812 :
813 0 : diesel::insert_into(metadata_health)
814 0 : .values(&unhealthy_records)
815 0 : .on_conflict((tenant_id, shard_number, shard_count))
816 0 : .do_update()
817 0 : .set((healthy.eq(false), last_scrubbed_at.eq(now)))
818 0 : .execute(conn)?;
819 0 : Ok(())
820 0 : },
821 0 : )
822 0 : .await
823 0 : }
824 :
825 : /// Lists all the metadata health records.
826 : #[allow(dead_code)]
827 0 : pub(crate) async fn list_metadata_health_records(
828 0 : &self,
829 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
830 0 : self.with_measured_conn(
831 0 : DatabaseOperation::ListMetadataHealth,
832 0 : move |conn| -> DatabaseResult<_> {
833 0 : Ok(
834 0 : crate::schema::metadata_health::table
835 0 : .load::<MetadataHealthPersistence>(conn)?,
836 : )
837 0 : },
838 0 : )
839 0 : .await
840 0 : }
841 :
842 : /// Lists all the metadata health records that are unhealthy.
843 : #[allow(dead_code)]
844 0 : pub(crate) async fn list_unhealthy_metadata_health_records(
845 0 : &self,
846 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
847 0 : use crate::schema::metadata_health::dsl::*;
848 0 : self.with_measured_conn(
849 0 : DatabaseOperation::ListMetadataHealthUnhealthy,
850 0 : move |conn| -> DatabaseResult<_> {
851 0 : Ok(crate::schema::metadata_health::table
852 0 : .filter(healthy.eq(false))
853 0 : .load::<MetadataHealthPersistence>(conn)?)
854 0 : },
855 0 : )
856 0 : .await
857 0 : }
858 :
859 : /// Lists all the metadata health records that have not been updated since an `earlier` time.
860 : #[allow(dead_code)]
861 0 : pub(crate) async fn list_outdated_metadata_health_records(
862 0 : &self,
863 0 : earlier: chrono::DateTime<chrono::Utc>,
864 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
865 0 : use crate::schema::metadata_health::dsl::*;
866 0 :
867 0 : self.with_measured_conn(
868 0 : DatabaseOperation::ListMetadataHealthOutdated,
869 0 : move |conn| -> DatabaseResult<_> {
870 0 : let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
871 0 : let res = query.load::<MetadataHealthPersistence>(conn)?;
872 :
873 0 : Ok(res)
874 0 : },
875 0 : )
876 0 : .await
877 0 : }
878 :
879 : /// Get the current entry from the `leader` table if one exists.
880 : /// It is an error for the table to contain more than one entry.
881 0 : pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
882 0 : let mut leader: Vec<ControllerPersistence> = self
883 0 : .with_measured_conn(
884 0 : DatabaseOperation::GetLeader,
885 0 : move |conn| -> DatabaseResult<_> {
886 0 : Ok(crate::schema::controllers::table.load::<ControllerPersistence>(conn)?)
887 0 : },
888 0 : )
889 0 : .await?;
890 :
891 0 : if leader.len() > 1 {
892 0 : return Err(DatabaseError::Logical(format!(
893 0 : "More than one entry present in the leader table: {leader:?}"
894 0 : )));
895 0 : }
896 0 :
897 0 : Ok(leader.pop())
898 0 : }
899 :
900 : /// Update the leader entry with compare-exchange semantics. If `prev` does not
901 : /// match the current leader entry, then the update is treated as a failure.
902 : /// When `prev` is not specified, a new leader entry is inserted.
903 0 : pub(crate) async fn update_leader(
904 0 : &self,
905 0 : prev: Option<ControllerPersistence>,
906 0 : new: ControllerPersistence,
907 0 : ) -> DatabaseResult<()> {
908 : use crate::schema::controllers::dsl::*;
909 :
910 0 : let updated = self
911 0 : .with_measured_conn(
912 0 : DatabaseOperation::UpdateLeader,
913 0 : move |conn| -> DatabaseResult<usize> {
914 0 : let updated = match &prev {
915 0 : Some(prev) => diesel::update(controllers)
916 0 : .filter(address.eq(prev.address.clone()))
917 0 : .filter(started_at.eq(prev.started_at))
918 0 : .set((
919 0 : address.eq(new.address.clone()),
920 0 : started_at.eq(new.started_at),
921 0 : ))
922 0 : .execute(conn)?,
923 0 : None => diesel::insert_into(controllers)
924 0 : .values(new.clone())
925 0 : .execute(conn)?,
926 : };
927 :
928 0 : Ok(updated)
929 0 : },
930 0 : )
931 0 : .await?;
932 :
933 0 : if updated == 0 {
934 0 : return Err(DatabaseError::Logical(
935 0 : "Leader table update failed".to_string(),
936 0 : ));
937 0 : }
938 0 :
939 0 : Ok(())
940 0 : }
941 : }
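
// A minimal sketch (assumptions: a `Persistence` handle and this controller's advertised
// address) of taking over leadership with the compare-exchange semantics of `update_leader`.
#[allow(dead_code)]
async fn assume_leadership_sketch(
    persistence: &Persistence,
    our_address: String,
) -> DatabaseResult<()> {
    // Read the current leader entry, if any; more than one row is a logical error.
    let prev = persistence.get_leader().await?;

    let new = ControllerPersistence {
        address: our_address,
        started_at: chrono::Utc::now(),
    };

    // If `prev` no longer matches the stored row (another controller won the race), no rows
    // are updated and this returns DatabaseError::Logical; callers should back off.
    persistence.update_leader(prev, new).await
}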
942 :
943 : /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
944 0 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
945 : #[diesel(table_name = crate::schema::tenant_shards)]
946 : pub(crate) struct TenantShardPersistence {
947 : #[serde(default)]
948 : pub(crate) tenant_id: String,
949 : #[serde(default)]
950 : pub(crate) shard_number: i32,
951 : #[serde(default)]
952 : pub(crate) shard_count: i32,
953 : #[serde(default)]
954 : pub(crate) shard_stripe_size: i32,
955 :
956 : // Latest generation number: next time we attach, increment this
957 : // and use the incremented number when attaching.
958 : //
959 : // Generation is only None when first onboarding a tenant, where it may
960 : // be in PlacementPolicy::Secondary and therefore have no valid generation state.
961 : pub(crate) generation: Option<i32>,
962 :
963 : // Currently attached pageserver
964 : #[serde(rename = "pageserver")]
965 : pub(crate) generation_pageserver: Option<i64>,
966 :
967 : #[serde(default)]
968 : pub(crate) placement_policy: String,
969 : #[serde(default)]
970 : pub(crate) splitting: SplitState,
971 : #[serde(default)]
972 : pub(crate) config: String,
973 : #[serde(default)]
974 : pub(crate) scheduling_policy: String,
975 : }
976 :
977 : impl TenantShardPersistence {
978 0 : pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
979 0 : if self.shard_count == 0 {
980 0 : Ok(ShardIdentity::unsharded())
981 : } else {
982 0 : Ok(ShardIdentity::new(
983 0 : ShardNumber(self.shard_number as u8),
984 0 : ShardCount::new(self.shard_count as u8),
985 0 : ShardStripeSize(self.shard_stripe_size as u32),
986 0 : )?)
987 : }
988 0 : }
989 :
990 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
991 0 : Ok(TenantShardId {
992 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
993 0 : shard_number: ShardNumber(self.shard_number as u8),
994 0 : shard_count: ShardCount::new(self.shard_count as u8),
995 : })
996 0 : }
997 : }
998 :
999 : /// Parts of [`crate::node::Node`] that are stored durably
1000 0 : #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
1001 : #[diesel(table_name = crate::schema::nodes)]
1002 : pub(crate) struct NodePersistence {
1003 : pub(crate) node_id: i64,
1004 : pub(crate) scheduling_policy: String,
1005 : pub(crate) listen_http_addr: String,
1006 : pub(crate) listen_http_port: i32,
1007 : pub(crate) listen_pg_addr: String,
1008 : pub(crate) listen_pg_port: i32,
1009 : pub(crate) availability_zone_id: Option<String>,
1010 : }
1011 :
1012 : /// Tenant metadata health status that are stored durably.
1013 0 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
1014 : #[diesel(table_name = crate::schema::metadata_health)]
1015 : pub(crate) struct MetadataHealthPersistence {
1016 : #[serde(default)]
1017 : pub(crate) tenant_id: String,
1018 : #[serde(default)]
1019 : pub(crate) shard_number: i32,
1020 : #[serde(default)]
1021 : pub(crate) shard_count: i32,
1022 :
1023 : pub(crate) healthy: bool,
1024 : pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
1025 : }
1026 :
1027 : impl MetadataHealthPersistence {
1028 0 : pub fn new(
1029 0 : tenant_shard_id: TenantShardId,
1030 0 : healthy: bool,
1031 0 : last_scrubbed_at: chrono::DateTime<chrono::Utc>,
1032 0 : ) -> Self {
1033 0 : let tenant_id = tenant_shard_id.tenant_id.to_string();
1034 0 : let shard_number = tenant_shard_id.shard_number.0 as i32;
1035 0 : let shard_count = tenant_shard_id.shard_count.literal() as i32;
1036 0 :
1037 0 : MetadataHealthPersistence {
1038 0 : tenant_id,
1039 0 : shard_number,
1040 0 : shard_count,
1041 0 : healthy,
1042 0 : last_scrubbed_at,
1043 0 : }
1044 0 : }
1045 :
1046 : #[allow(dead_code)]
1047 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
1048 0 : Ok(TenantShardId {
1049 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
1050 0 : shard_number: ShardNumber(self.shard_number as u8),
1051 0 : shard_count: ShardCount::new(self.shard_count as u8),
1052 : })
1053 0 : }
1054 : }
1055 :
1056 : impl From<MetadataHealthPersistence> for MetadataHealthRecord {
1057 0 : fn from(value: MetadataHealthPersistence) -> Self {
1058 0 : MetadataHealthRecord {
1059 0 : tenant_shard_id: value
1060 0 : .get_tenant_shard_id()
1061 0 : .expect("stored tenant id should be valid"),
1062 0 : healthy: value.healthy,
1063 0 : last_scrubbed_at: value.last_scrubbed_at,
1064 0 : }
1065 0 : }
1066 : }
1067 :
1068 : #[derive(
1069 0 : Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
1070 : )]
1071 : #[diesel(table_name = crate::schema::controllers)]
1072 : pub(crate) struct ControllerPersistence {
1073 : pub(crate) address: String,
1074 : pub(crate) started_at: chrono::DateTime<chrono::Utc>,
1075 : }