Line data Source code
1 : pub(crate) mod split_state;
2 : use std::collections::HashMap;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 : use std::time::Instant;
6 :
7 : use self::split_state::SplitState;
8 : use diesel::pg::PgConnection;
9 : use diesel::prelude::*;
10 : use diesel::Connection;
11 : use pageserver_api::controller_api::MetadataHealthRecord;
12 : use pageserver_api::controller_api::ShardSchedulingPolicy;
13 : use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
14 : use pageserver_api::models::TenantConfig;
15 : use pageserver_api::shard::ShardConfigError;
16 : use pageserver_api::shard::ShardIdentity;
17 : use pageserver_api::shard::ShardStripeSize;
18 : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
19 : use serde::{Deserialize, Serialize};
20 : use utils::generation::Generation;
21 : use utils::id::{NodeId, TenantId};
22 :
23 : use crate::metrics::{
24 : DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
25 : };
26 : use crate::node::Node;
27 :
28 : /// ## What do we store?
29 : ///
30 : /// The storage controller service does not store most of its state durably.
31 : ///
32 : /// The essential things to store durably are:
33 : /// - generation numbers, as these must always advance monotonically to ensure data safety.
34 : /// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
35 : /// - Node's scheduling policies, as the source of truth for these is something external.
36 : ///
37 : /// Other things we store durably as an implementation detail:
38 : /// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
39 : /// but it is operationally simpler to make this service the authority for which nodes
40 : /// it talks to.
41 : ///
42 : /// ## Performance/efficiency
43 : ///
44 : /// The storage controller service does not go via the database for most things: there are
45 : /// a couple of places where we must, and where efficiency matters:
46 : /// - Incrementing generation numbers: the Reconciler has to wait for this to complete
47 : /// before it can attach a tenant, so this acts as a bound on how fast things like
48 : /// failover can happen.
49 : /// - Pageserver re-attach: we will increment many shards' generations when this happens,
50 : /// so it is important to avoid e.g. issuing O(N) queries.
51 : ///
52 : /// Database calls relating to nodes have low performance requirements, as they are very rarely
53 : /// updated, and reads of nodes are always from memory, not the database. We only require that
54 : /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
55 : pub struct Persistence {
56 : connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
57 : }
58 :
59 : /// Legacy format, for use in JSON compat objects in the test environment
60 0 : #[derive(Serialize, Deserialize)]
61 : struct JsonPersistence {
62 : tenants: HashMap<TenantShardId, TenantShardPersistence>,
63 : }
64 :
65 0 : #[derive(thiserror::Error, Debug)]
66 : pub(crate) enum DatabaseError {
67 : #[error(transparent)]
68 : Query(#[from] diesel::result::Error),
69 : #[error(transparent)]
70 : Connection(#[from] diesel::result::ConnectionError),
71 : #[error(transparent)]
72 : ConnectionPool(#[from] r2d2::Error),
73 : #[error("Logical error: {0}")]
74 : Logical(String),
75 : }
76 :
77 : #[derive(measured::FixedCardinalityLabel, Copy, Clone)]
78 : pub(crate) enum DatabaseOperation {
79 : InsertNode,
80 : UpdateNode,
81 : DeleteNode,
82 : ListNodes,
83 : BeginShardSplit,
84 : CompleteShardSplit,
85 : AbortShardSplit,
86 : Detach,
87 : ReAttach,
88 : IncrementGeneration,
89 : ListTenantShards,
90 : InsertTenantShards,
91 : UpdateTenantShard,
92 : DeleteTenant,
93 : UpdateTenantConfig,
94 : UpdateMetadataHealth,
95 : ListMetadataHealth,
96 : ListMetadataHealthUnhealthy,
97 : ListMetadataHealthOutdated,
98 : }
99 :
100 : #[must_use]
101 : pub(crate) enum AbortShardSplitStatus {
102 : /// We aborted the split in the database by reverting to the parent shards
103 : Aborted,
104 : /// The split had already been completed and persisted: there was nothing to abort.
105 : Complete,
106 : }
107 :
108 : pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
109 :
110 : /// Some methods can operate on either a whole tenant or a single shard
111 : pub(crate) enum TenantFilter {
112 : Tenant(TenantId),
113 : Shard(TenantShardId),
114 : }
115 :
116 : impl Persistence {
117 : // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
118 : // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
119 : pub const MAX_CONNECTIONS: u32 = 99;
120 :
121 : // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
122 : const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
123 : const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
124 :
125 0 : pub fn new(database_url: String) -> Self {
126 0 : let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
127 0 :
128 0 : // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
129 0 : // to execute queries (database queries are not generally on latency-sensitive paths).
130 0 : let connection_pool = diesel::r2d2::Pool::builder()
131 0 : .max_size(Self::MAX_CONNECTIONS)
132 0 : .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
133 0 : .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
134 0 : // Always keep at least one connection ready to go
135 0 : .min_idle(Some(1))
136 0 : .test_on_check_out(true)
137 0 : .build(manager)
138 0 : .expect("Could not build connection pool");
139 0 :
140 0 : Self { connection_pool }
141 0 : }
142 :
143 : /// A helper for use during startup, where we would like to tolerate concurrent restarts of the
144 : /// database and the storage controller; the database might therefore not be available right away
145 0 : pub async fn await_connection(
146 0 : database_url: &str,
147 0 : timeout: Duration,
148 0 : ) -> Result<(), diesel::ConnectionError> {
149 0 : let started_at = Instant::now();
150 0 : loop {
151 0 : match PgConnection::establish(database_url) {
152 : Ok(_) => {
153 0 : tracing::info!("Connected to database.");
154 0 : return Ok(());
155 : }
156 0 : Err(e) => {
157 0 : if started_at.elapsed() > timeout {
158 0 : return Err(e);
159 : } else {
160 0 : tracing::info!("Database not yet available, waiting... ({e})");
161 0 : tokio::time::sleep(Duration::from_millis(100)).await;
162 : }
163 : }
164 : }
165 : }
166 0 : }
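// Illustrative startup sequence (a sketch only; the real call site and variable names may
// differ, and the 30-second timeout is an assumption):
//
//     Persistence::await_connection(&database_url, Duration::from_secs(30)).await?;
//     let persistence = Persistence::new(database_url);
//
// i.e. wait for the database to accept connections (tolerating a concurrent restart of the
// database itself), then build the connection pool.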
167 :
168 : /// Wraps `with_conn` in order to collect latency and error metrics
169 0 : async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
170 0 : where
171 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
172 0 : R: Send + 'static,
173 0 : {
174 0 : let latency = &METRICS_REGISTRY
175 0 : .metrics_group
176 0 : .storage_controller_database_query_latency;
177 0 : let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
178 :
179 0 : let res = self.with_conn(func).await;
180 :
181 0 : if let Err(err) = &res {
182 0 : let error_counter = &METRICS_REGISTRY
183 0 : .metrics_group
184 0 : .storage_controller_database_query_error;
185 0 : error_counter.inc(DatabaseQueryErrorLabelGroup {
186 0 : error_type: err.error_label(),
187 0 : operation: op,
188 0 : })
189 0 : }
190 :
191 0 : res
192 0 : }
193 :
194 : /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
195 0 : async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
196 0 : where
197 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
198 0 : R: Send + 'static,
199 0 : {
200 : // A generous allowance for how many times we may retry serializable transactions
201 : // before giving up. This is not expected to be hit: it is a defensive measure in case we
202 : // somehow engineer a situation where duelling transactions might otherwise live-lock.
203 : const MAX_RETRIES: usize = 128;
204 :
205 0 : let mut conn = self.connection_pool.get()?;
206 0 : tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
207 0 : let mut retry_count = 0;
208 : loop {
209 0 : match conn.build_transaction().serializable().run(|c| func(c)) {
210 0 : Ok(r) => break Ok(r),
211 : Err(
212 0 : err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
213 0 : diesel::result::DatabaseErrorKind::SerializationFailure,
214 0 : _,
215 0 : )),
216 0 : ) => {
217 0 : retry_count += 1;
218 0 : if retry_count > MAX_RETRIES {
219 0 : tracing::error!(
220 0 : "Exceeded max retries on SerializationFailure errors: {err:?}"
221 : );
222 0 : break Err(err);
223 : } else {
224 : // Retry on serialization errors: these are expected, because even though our
225 : // transactions don't fight for the same rows, they will occasionally collide
226 : // on index pages (e.g. increment_generation for unrelated shards can collide)
227 0 : tracing::debug!(
228 0 : "Retrying transaction on serialization failure {err:?}"
229 : );
230 0 : continue;
231 : }
232 : }
233 0 : Err(e) => break Err(e),
234 : }
235 : }
236 0 : })
237 0 : .await
238 0 : .expect("Task panic")
239 0 : }
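// The query methods below all follow the same pattern (see `insert_node` just below for a real
// instance): wrap a closure of Diesel statements in `with_measured_conn`, which records latency
// and error metrics and delegates to `with_conn`, which runs the closure inside a serializable
// transaction on a blocking thread and retries on serialization failures. A minimal sketch:
//
//     self.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
//         Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
//     })
//     .await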
240 :
241 : /// When a node is first registered, persist it before using it for anything
242 0 : pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
243 0 : let np = node.to_persistent();
244 0 : self.with_measured_conn(
245 0 : DatabaseOperation::InsertNode,
246 0 : move |conn| -> DatabaseResult<()> {
247 0 : diesel::insert_into(crate::schema::nodes::table)
248 0 : .values(&np)
249 0 : .execute(conn)?;
250 0 : Ok(())
251 0 : },
252 0 : )
253 0 : .await
254 0 : }
255 :
256 : /// At startup, populate the list of nodes which our shards may be placed on
257 0 : pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
258 0 : let nodes: Vec<NodePersistence> = self
259 0 : .with_measured_conn(
260 0 : DatabaseOperation::ListNodes,
261 0 : move |conn| -> DatabaseResult<_> {
262 0 : Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
263 0 : },
264 0 : )
265 0 : .await?;
266 :
267 0 : tracing::info!("list_nodes: loaded {} nodes", nodes.len());
268 :
269 0 : Ok(nodes)
270 0 : }
271 :
272 0 : pub(crate) async fn update_node(
273 0 : &self,
274 0 : input_node_id: NodeId,
275 0 : input_scheduling: NodeSchedulingPolicy,
276 0 : ) -> DatabaseResult<()> {
277 : use crate::schema::nodes::dsl::*;
278 0 : let updated = self
279 0 : .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
280 0 : let updated = diesel::update(nodes)
281 0 : .filter(node_id.eq(input_node_id.0 as i64))
282 0 : .set((scheduling_policy.eq(String::from(input_scheduling)),))
283 0 : .execute(conn)?;
284 0 : Ok(updated)
285 0 : })
286 0 : .await?;
287 :
288 0 : if updated != 1 {
289 0 : Err(DatabaseError::Logical(format!(
290 0 : "Node {node_id:?} not found for update",
291 0 : )))
292 : } else {
293 0 : Ok(())
294 : }
295 0 : }
296 :
297 : /// At startup, load the high level state for shards, such as their config + policy. This will
298 : /// be enriched at runtime with state discovered on pageservers.
299 0 : pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
300 0 : self.with_measured_conn(
301 0 : DatabaseOperation::ListTenantShards,
302 0 : move |conn| -> DatabaseResult<_> {
303 0 : Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
304 0 : },
305 0 : )
306 0 : .await
307 0 : }
308 :
309 : /// Tenants must be persisted before we schedule them for the first time. This enables us
310 : /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
311 0 : pub(crate) async fn insert_tenant_shards(
312 0 : &self,
313 0 : shards: Vec<TenantShardPersistence>,
314 0 : ) -> DatabaseResult<()> {
315 0 : use crate::schema::metadata_health;
316 0 : use crate::schema::tenant_shards;
317 0 :
318 0 : let now = chrono::Utc::now();
319 0 :
320 0 : let metadata_health_records = shards
321 0 : .iter()
322 0 : .map(|t| MetadataHealthPersistence {
323 0 : tenant_id: t.tenant_id.clone(),
324 0 : shard_number: t.shard_number,
325 0 : shard_count: t.shard_count,
326 0 : healthy: true,
327 0 : last_scrubbed_at: now,
328 0 : })
329 0 : .collect::<Vec<_>>();
330 0 :
331 0 : self.with_measured_conn(
332 0 : DatabaseOperation::InsertTenantShards,
333 0 : move |conn| -> DatabaseResult<()> {
334 0 : diesel::insert_into(tenant_shards::table)
335 0 : .values(&shards)
336 0 : .execute(conn)?;
337 :
338 0 : diesel::insert_into(metadata_health::table)
339 0 : .values(&metadata_health_records)
340 0 : .execute(conn)?;
341 0 : Ok(())
342 0 : },
343 0 : )
344 0 : .await
345 0 : }
346 :
347 : /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
348 : /// the tenant from memory on this server.
349 0 : pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
350 0 : use crate::schema::tenant_shards::dsl::*;
351 0 : self.with_measured_conn(
352 0 : DatabaseOperation::DeleteTenant,
353 0 : move |conn| -> DatabaseResult<()> {
354 0 : // Any `metadata_health` rows for this tenant are also deleted via the cascade behavior.
355 0 : diesel::delete(tenant_shards)
356 0 : .filter(tenant_id.eq(del_tenant_id.to_string()))
357 0 : .execute(conn)?;
358 0 : Ok(())
359 0 : },
360 0 : )
361 0 : .await
362 0 : }
363 :
364 0 : pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
365 0 : use crate::schema::nodes::dsl::*;
366 0 : self.with_measured_conn(
367 0 : DatabaseOperation::DeleteNode,
368 0 : move |conn| -> DatabaseResult<()> {
369 0 : diesel::delete(nodes)
370 0 : .filter(node_id.eq(del_node_id.0 as i64))
371 0 : .execute(conn)?;
372 :
373 0 : Ok(())
374 0 : },
375 0 : )
376 0 : .await
377 0 : }
378 :
379 : /// When a pageserver node invokes the /re-attach API, this function is responsible for doing an efficient
380 : /// batched increment of the generations of all tenants whose generation_pageserver is equal to
381 : /// the node that called /re-attach.
382 0 : #[tracing::instrument(skip_all, fields(node_id))]
383 : pub(crate) async fn re_attach(
384 : &self,
385 : input_node_id: NodeId,
386 : ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
387 : use crate::schema::nodes::dsl::scheduling_policy;
388 : use crate::schema::nodes::dsl::*;
389 : use crate::schema::tenant_shards::dsl::*;
390 : let updated = self
391 0 : .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
392 0 : let rows_updated = diesel::update(tenant_shards)
393 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
394 0 : .set(generation.eq(generation + 1))
395 0 : .execute(conn)?;
396 :
397 0 : tracing::info!("Incremented {} tenants' generations", rows_updated);
398 :
399 : // TODO: UPDATE+SELECT in one query
400 :
401 0 : let updated = tenant_shards
402 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
403 0 : .select(TenantShardPersistence::as_select())
404 0 : .load(conn)?;
405 :
406 : // If the node went through a drain and restart phase before re-attaching,
407 : // then reset its scheduling policy to active.
408 0 : diesel::update(nodes)
409 0 : .filter(node_id.eq(input_node_id.0 as i64))
410 0 : .filter(
411 0 : scheduling_policy
412 0 : .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
413 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
414 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
415 0 : )
416 0 : .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
417 0 : .execute(conn)?;
418 :
419 0 : Ok(updated)
420 0 : })
421 : .await?;
422 :
423 : let mut result = HashMap::new();
424 : for tsp in updated {
425 : let tenant_shard_id = TenantShardId {
426 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
427 0 : .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
428 : shard_number: ShardNumber(tsp.shard_number as u8),
429 : shard_count: ShardCount::new(tsp.shard_count as u8),
430 : };
431 :
432 : let Some(g) = tsp.generation else {
433 : // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
434 : // we only set generation_pageserver when setting generation.
435 : return Err(DatabaseError::Logical(
436 : "Generation should always be set after incrementing".to_string(),
437 : ));
438 : };
439 : result.insert(tenant_shard_id, Generation::new(g as u32));
440 : }
441 :
442 : Ok(result)
443 : }
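// Note: the generation bump above is a single bulk UPDATE over every shard whose
// generation_pageserver matches the re-attaching node, followed by one SELECT to read the new
// generations back. This keeps re-attach at a constant number of queries rather than
// O(shard count), as described in the module-level performance notes.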
444 :
445 : /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
446 : /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
447 : /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
448 0 : pub(crate) async fn increment_generation(
449 0 : &self,
450 0 : tenant_shard_id: TenantShardId,
451 0 : node_id: NodeId,
452 0 : ) -> anyhow::Result<Generation> {
453 : use crate::schema::tenant_shards::dsl::*;
454 0 : let updated = self
455 0 : .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
456 0 : let updated = diesel::update(tenant_shards)
457 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
458 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
459 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
460 0 : .set((
461 0 : generation.eq(generation + 1),
462 0 : generation_pageserver.eq(node_id.0 as i64),
463 0 : ))
464 0 : // TODO: only returning() the generation column
465 0 : .returning(TenantShardPersistence::as_returning())
466 0 : .get_result(conn)?;
467 :
468 0 : Ok(updated)
469 0 : })
470 0 : .await?;
471 :
472 : // Generation is always non-null in the result: if the generation column had been NULL, then we
473 : // should have experienced an SQL conflict error while executing a query that tries to increment it.
474 0 : debug_assert!(updated.generation.is_some());
475 0 : let Some(g) = updated.generation else {
476 0 : return Err(DatabaseError::Logical(
477 0 : "Generation should always be set after incrementing".to_string(),
478 0 : )
479 0 : .into());
480 : };
481 :
482 0 : Ok(Generation::new(g as u32))
483 0 : }
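// Illustrative caller (a sketch; the real Reconciler code path lives elsewhere and these
// variable names are assumptions): acquire a fresh generation immediately before attaching,
// then include it in the location configuration sent to the pageserver.
//
//     let generation = persistence.increment_generation(tenant_shard_id, node_id).await?;
//     // ...attach to `node_id` using `generation`...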
484 :
485 : #[allow(non_local_definitions)]
486 : /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
487 : ///
488 : /// Do not use this for setting generation, except in the special onboarding code path (the /location_config
489 : /// API): use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
490 : /// that we only do the first time a tenant is set to an attached policy via /location_config.
491 0 : pub(crate) async fn update_tenant_shard(
492 0 : &self,
493 0 : tenant: TenantFilter,
494 0 : input_placement_policy: Option<PlacementPolicy>,
495 0 : input_config: Option<TenantConfig>,
496 0 : input_generation: Option<Generation>,
497 0 : input_scheduling_policy: Option<ShardSchedulingPolicy>,
498 0 : ) -> DatabaseResult<()> {
499 0 : use crate::schema::tenant_shards::dsl::*;
500 0 :
501 0 : self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
502 0 : let query = match tenant {
503 0 : TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
504 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
505 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
506 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
507 0 : .into_boxed(),
508 0 : TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
509 0 : .filter(tenant_id.eq(input_tenant_id.to_string()))
510 0 : .into_boxed(),
511 : };
512 :
513 0 : #[derive(AsChangeset)]
514 : #[diesel(table_name = crate::schema::tenant_shards)]
515 : struct ShardUpdate {
516 : generation: Option<i32>,
517 : placement_policy: Option<String>,
518 : config: Option<String>,
519 : scheduling_policy: Option<String>,
520 : }
521 :
522 0 : let update = ShardUpdate {
523 0 : generation: input_generation.map(|g| g.into().unwrap() as i32),
524 0 : placement_policy: input_placement_policy
525 0 : .as_ref()
526 0 : .map(|p| serde_json::to_string(&p).unwrap()),
527 0 : config: input_config
528 0 : .as_ref()
529 0 : .map(|c| serde_json::to_string(&c).unwrap()),
530 0 : scheduling_policy: input_scheduling_policy
531 0 : .map(|p| serde_json::to_string(&p).unwrap()),
532 0 : };
533 0 :
534 0 : query.set(update).execute(conn)?;
535 :
536 0 : Ok(())
537 0 : })
538 0 : .await?;
539 :
540 0 : Ok(())
541 0 : }
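// Illustrative usage (a sketch; real call sites differ): update only the stored TenantConfig for
// every shard of a tenant, leaving generation, placement and scheduling policies untouched.
//
//     persistence
//         .update_tenant_shard(TenantFilter::Tenant(tenant_id), None, Some(new_config), None, None)
//         .await?;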
542 :
543 0 : pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
544 0 : use crate::schema::tenant_shards::dsl::*;
545 0 : self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
546 0 : let updated = diesel::update(tenant_shards)
547 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
548 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
549 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
550 0 : .set((
551 0 : generation_pageserver.eq(Option::<i64>::None),
552 0 : placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
553 0 : ))
554 0 : .execute(conn)?;
555 :
556 0 : Ok(updated)
557 0 : })
558 0 : .await?;
559 :
560 0 : Ok(())
561 0 : }
562 :
563 : // When we start shard splitting, we must durably mark the tenant so that
564 : // on restart, we know that we must go through recovery.
565 : //
566 : // We create the child shards here, so that they will be available for increment_generation calls
567 : // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
568 0 : pub(crate) async fn begin_shard_split(
569 0 : &self,
570 0 : old_shard_count: ShardCount,
571 0 : split_tenant_id: TenantId,
572 0 : parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
573 0 : ) -> DatabaseResult<()> {
574 0 : use crate::schema::tenant_shards::dsl::*;
575 0 : self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
576 : // Mark parent shards as splitting
577 :
578 0 : let updated = diesel::update(tenant_shards)
579 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
580 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
581 0 : .set((splitting.eq(1),))
582 0 : .execute(conn)?;
583 0 : if u8::try_from(updated)
584 0 : .map_err(|_| DatabaseError::Logical(
585 0 : format!("Overflow existing shard count {} while splitting", updated))
586 0 : )? != old_shard_count.count() {
587 : // Perhaps a deletion or another split raced with this attempt to split, mutating
588 : // the parent shards that we intend to split. In this case the split request should fail.
589 0 : return Err(DatabaseError::Logical(
590 0 : format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
591 0 : ));
592 0 : }
593 0 :
594 0 : // FIXME: spurious clone to sidestep closure move rules
595 0 : let parent_to_children = parent_to_children.clone();
596 :
597 : // Insert child shards
598 0 : for (parent_shard_id, children) in parent_to_children {
599 0 : let mut parent = crate::schema::tenant_shards::table
600 0 : .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
601 0 : .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
602 0 : .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
603 0 : .load::<TenantShardPersistence>(conn)?;
604 0 : let parent = if parent.len() != 1 {
605 0 : return Err(DatabaseError::Logical(format!(
606 0 : "Parent shard {parent_shard_id} not found"
607 0 : )));
608 : } else {
609 0 : parent.pop().unwrap()
610 : };
611 0 : for mut shard in children {
612 : // Carry the parent's generation into the child
613 0 : shard.generation = parent.generation;
614 0 :
615 0 : debug_assert!(shard.splitting == SplitState::Splitting);
616 0 : diesel::insert_into(tenant_shards)
617 0 : .values(shard)
618 0 : .execute(conn)?;
619 : }
620 : }
621 :
622 0 : Ok(())
623 0 : })
624 0 : .await
625 0 : }
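// The durable split lifecycle is: `begin_shard_split` marks the parent shards as splitting and
// inserts the child shards (carrying the parents' generations); `complete_shard_split` deletes
// the parents and clears the splitting flag; `abort_shard_split` reverts to the parents and
// erases the children. On restart, any shard still marked as splitting tells us recovery is needed.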
626 :
627 : // When we finish shard splitting, we must atomically clean up the old shards
628 : // and insert the new shards, and clear the splitting marker.
629 0 : pub(crate) async fn complete_shard_split(
630 0 : &self,
631 0 : split_tenant_id: TenantId,
632 0 : old_shard_count: ShardCount,
633 0 : ) -> DatabaseResult<()> {
634 0 : use crate::schema::tenant_shards::dsl::*;
635 0 : self.with_measured_conn(
636 0 : DatabaseOperation::CompleteShardSplit,
637 0 : move |conn| -> DatabaseResult<()> {
638 0 : // Drop parent shards
639 0 : diesel::delete(tenant_shards)
640 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
641 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
642 0 : .execute(conn)?;
643 :
644 : // Clear the splitting flag on the remaining (child) shards
645 0 : let updated = diesel::update(tenant_shards)
646 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
647 0 : .set((splitting.eq(0),))
648 0 : .execute(conn)?;
649 0 : debug_assert!(updated > 0);
650 :
651 0 : Ok(())
652 0 : },
653 0 : )
654 0 : .await
655 0 : }
656 :
657 : /// Used when the remote part of a shard split failed: we will revert the database state to have only
658 : /// the parent shards, with SplitState::Idle.
659 0 : pub(crate) async fn abort_shard_split(
660 0 : &self,
661 0 : split_tenant_id: TenantId,
662 0 : new_shard_count: ShardCount,
663 0 : ) -> DatabaseResult<AbortShardSplitStatus> {
664 0 : use crate::schema::tenant_shards::dsl::*;
665 0 : self.with_measured_conn(
666 0 : DatabaseOperation::AbortShardSplit,
667 0 : move |conn| -> DatabaseResult<AbortShardSplitStatus> {
668 : // Clear the splitting state on parent shards
669 0 : let updated = diesel::update(tenant_shards)
670 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
671 0 : .filter(shard_count.ne(new_shard_count.literal() as i32))
672 0 : .set((splitting.eq(0),))
673 0 : .execute(conn)?;
674 :
675 : // Parent shards are already gone: we cannot abort.
676 0 : if updated == 0 {
677 0 : return Ok(AbortShardSplitStatus::Complete);
678 0 : }
679 0 :
680 0 : // Sanity check: if parent shards were present, their cardinality should
681 0 : // be less than the number of child shards.
682 0 : if updated >= new_shard_count.count() as usize {
683 0 : return Err(DatabaseError::Logical(format!(
684 0 : "Unexpected parent shard count {updated} while aborting split to \
685 0 : count {new_shard_count:?} on tenant {split_tenant_id}"
686 0 : )));
687 0 : }
688 0 :
689 0 : // Erase child shards
690 0 : diesel::delete(tenant_shards)
691 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
692 0 : .filter(shard_count.eq(new_shard_count.literal() as i32))
693 0 : .execute(conn)?;
694 :
695 0 : Ok(AbortShardSplitStatus::Aborted)
696 0 : },
697 0 : )
698 0 : .await
699 0 : }
700 :
701 : /// Stores all the latest metadata health updates durably. Updates existing entry on conflict.
702 : ///
703 : /// **Correctness:** `metadata_health_updates` should all belong to tenant shards managed by the storage controller.
704 : #[allow(dead_code)]
705 0 : pub(crate) async fn update_metadata_health_records(
706 0 : &self,
707 0 : healthy_records: Vec<MetadataHealthPersistence>,
708 0 : unhealthy_records: Vec<MetadataHealthPersistence>,
709 0 : now: chrono::DateTime<chrono::Utc>,
710 0 : ) -> DatabaseResult<()> {
711 0 : use crate::schema::metadata_health::dsl::*;
712 0 :
713 0 : self.with_measured_conn(
714 0 : DatabaseOperation::UpdateMetadataHealth,
715 0 : move |conn| -> DatabaseResult<_> {
716 0 : diesel::insert_into(metadata_health)
717 0 : .values(&healthy_records)
718 0 : .on_conflict((tenant_id, shard_number, shard_count))
719 0 : .do_update()
720 0 : .set((healthy.eq(true), last_scrubbed_at.eq(now)))
721 0 : .execute(conn)?;
722 :
723 0 : diesel::insert_into(metadata_health)
724 0 : .values(&unhealthy_records)
725 0 : .on_conflict((tenant_id, shard_number, shard_count))
726 0 : .do_update()
727 0 : .set((healthy.eq(false), last_scrubbed_at.eq(now)))
728 0 : .execute(conn)?;
729 0 : Ok(())
730 0 : },
731 0 : )
732 0 : .await
733 0 : }
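// Note: the two inserts above are upserts keyed on (tenant_id, shard_number, shard_count):
// `on_conflict(...).do_update()` overwrites the `healthy` flag and `last_scrubbed_at` timestamp
// of an existing row rather than failing on the primary-key conflict.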
734 :
735 : /// Lists all the metadata health records.
736 : #[allow(dead_code)]
737 0 : pub(crate) async fn list_metadata_health_records(
738 0 : &self,
739 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
740 0 : self.with_measured_conn(
741 0 : DatabaseOperation::ListMetadataHealth,
742 0 : move |conn| -> DatabaseResult<_> {
743 0 : Ok(
744 0 : crate::schema::metadata_health::table
745 0 : .load::<MetadataHealthPersistence>(conn)?,
746 : )
747 0 : },
748 0 : )
749 0 : .await
750 0 : }
751 :
752 : /// Lists all the metadata health records that are unhealthy.
753 : #[allow(dead_code)]
754 0 : pub(crate) async fn list_unhealthy_metadata_health_records(
755 0 : &self,
756 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
757 0 : use crate::schema::metadata_health::dsl::*;
758 0 : self.with_measured_conn(
759 0 : DatabaseOperation::ListMetadataHealthUnhealthy,
760 0 : move |conn| -> DatabaseResult<_> {
761 0 : Ok(crate::schema::metadata_health::table
762 0 : .filter(healthy.eq(false))
763 0 : .load::<MetadataHealthPersistence>(conn)?)
764 0 : },
765 0 : )
766 0 : .await
767 0 : }
768 :
769 : /// Lists all the metadata health records that have not been updated since an `earlier` time.
770 : #[allow(dead_code)]
771 0 : pub(crate) async fn list_outdated_metadata_health_records(
772 0 : &self,
773 0 : earlier: chrono::DateTime<chrono::Utc>,
774 0 : ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
775 0 : use crate::schema::metadata_health::dsl::*;
776 0 :
777 0 : self.with_measured_conn(
778 0 : DatabaseOperation::ListMetadataHealthOutdated,
779 0 : move |conn| -> DatabaseResult<_> {
780 0 : let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
781 0 : let res = query.load::<MetadataHealthPersistence>(conn)?;
782 :
783 0 : Ok(res)
784 0 : },
785 0 : )
786 0 : .await
787 0 : }
788 : }
789 :
790 : /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
791 0 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
792 : #[diesel(table_name = crate::schema::tenant_shards)]
793 : pub(crate) struct TenantShardPersistence {
794 : #[serde(default)]
795 : pub(crate) tenant_id: String,
796 : #[serde(default)]
797 : pub(crate) shard_number: i32,
798 : #[serde(default)]
799 : pub(crate) shard_count: i32,
800 : #[serde(default)]
801 : pub(crate) shard_stripe_size: i32,
802 :
803 : // Latest generation number: next time we attach, increment this
804 : // and use the incremented number when attaching.
805 : //
806 : // Generation is only None when first onboarding a tenant, where it may
807 : // be in PlacementPolicy::Secondary and therefore have no valid generation state.
808 : pub(crate) generation: Option<i32>,
809 :
810 : // Currently attached pageserver
811 : #[serde(rename = "pageserver")]
812 : pub(crate) generation_pageserver: Option<i64>,
813 :
814 : #[serde(default)]
815 : pub(crate) placement_policy: String,
816 : #[serde(default)]
817 : pub(crate) splitting: SplitState,
818 : #[serde(default)]
819 : pub(crate) config: String,
820 : #[serde(default)]
821 : pub(crate) scheduling_policy: String,
822 : }
823 :
824 : impl TenantShardPersistence {
825 0 : pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
826 0 : if self.shard_count == 0 {
827 0 : Ok(ShardIdentity::unsharded())
828 : } else {
829 0 : Ok(ShardIdentity::new(
830 0 : ShardNumber(self.shard_number as u8),
831 0 : ShardCount::new(self.shard_count as u8),
832 0 : ShardStripeSize(self.shard_stripe_size as u32),
833 0 : )?)
834 : }
835 0 : }
836 :
837 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
838 0 : Ok(TenantShardId {
839 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
840 0 : shard_number: ShardNumber(self.shard_number as u8),
841 0 : shard_count: ShardCount::new(self.shard_count as u8),
842 : })
843 0 : }
844 : }
845 :
846 : /// Parts of [`crate::node::Node`] that are stored durably
847 0 : #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
848 : #[diesel(table_name = crate::schema::nodes)]
849 : pub(crate) struct NodePersistence {
850 : pub(crate) node_id: i64,
851 : pub(crate) scheduling_policy: String,
852 : pub(crate) listen_http_addr: String,
853 : pub(crate) listen_http_port: i32,
854 : pub(crate) listen_pg_addr: String,
855 : pub(crate) listen_pg_port: i32,
856 : }
857 :
858 : /// Tenant metadata health status that is stored durably.
859 0 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
860 : #[diesel(table_name = crate::schema::metadata_health)]
861 : pub(crate) struct MetadataHealthPersistence {
862 : #[serde(default)]
863 : pub(crate) tenant_id: String,
864 : #[serde(default)]
865 : pub(crate) shard_number: i32,
866 : #[serde(default)]
867 : pub(crate) shard_count: i32,
868 :
869 : pub(crate) healthy: bool,
870 : pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
871 : }
872 :
873 : impl MetadataHealthPersistence {
874 0 : pub fn new(
875 0 : tenant_shard_id: TenantShardId,
876 0 : healthy: bool,
877 0 : last_scrubbed_at: chrono::DateTime<chrono::Utc>,
878 0 : ) -> Self {
879 0 : let tenant_id = tenant_shard_id.tenant_id.to_string();
880 0 : let shard_number = tenant_shard_id.shard_number.0 as i32;
881 0 : let shard_count = tenant_shard_id.shard_count.literal() as i32;
882 0 :
883 0 : MetadataHealthPersistence {
884 0 : tenant_id,
885 0 : shard_number,
886 0 : shard_count,
887 0 : healthy,
888 0 : last_scrubbed_at,
889 0 : }
890 0 : }
891 :
892 : #[allow(dead_code)]
893 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
894 0 : Ok(TenantShardId {
895 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
896 0 : shard_number: ShardNumber(self.shard_number as u8),
897 0 : shard_count: ShardCount::new(self.shard_count as u8),
898 : })
899 0 : }
900 : }
901 :
902 : impl From<MetadataHealthPersistence> for MetadataHealthRecord {
903 0 : fn from(value: MetadataHealthPersistence) -> Self {
904 0 : MetadataHealthRecord {
905 0 : tenant_shard_id: value
906 0 : .get_tenant_shard_id()
907 0 : .expect("stored tenant id should be valid"),
908 0 : healthy: value.healthy,
909 0 : last_scrubbed_at: value.last_scrubbed_at,
910 0 : }
911 0 : }
912 : }
|