Line data Source code
1 : pub(crate) mod split_state;
2 : use std::collections::HashMap;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 : use std::time::Instant;
6 :
7 : use self::split_state::SplitState;
8 : use diesel::pg::PgConnection;
9 : use diesel::prelude::*;
10 : use diesel::Connection;
11 : use pageserver_api::controller_api::ShardSchedulingPolicy;
12 : use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
13 : use pageserver_api::models::TenantConfig;
14 : use pageserver_api::shard::ShardConfigError;
15 : use pageserver_api::shard::ShardIdentity;
16 : use pageserver_api::shard::ShardStripeSize;
17 : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
18 : use serde::{Deserialize, Serialize};
19 : use utils::generation::Generation;
20 : use utils::id::{NodeId, TenantId};
21 :
22 : use crate::metrics::{
23 : DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
24 : };
25 : use crate::node::Node;
26 :
27 : /// ## What do we store?
28 : ///
29 : /// The storage controller service does not store most of its state durably.
30 : ///
31 : /// The essential things to store durably are:
32 : /// - generation numbers, as these must always advance monotonically to ensure data safety.
33 : /// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
34 : /// - Node's scheduling policies, as the source of truth for these is something external.
35 : ///
36 : /// Other things we store durably as an implementation detail:
37 : /// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
38 : /// but it is operationally simpler to make this service the authority for which nodes
39 : /// it talks to.
40 : ///
41 : /// ## Performance/efficiency
42 : ///
43 : /// The storage controller service does not go via the database for most operations: there are
44 : /// a couple of places where it must, and where efficiency matters:
45 : /// - Incrementing generation numbers: the Reconciler has to wait for this to complete
46 : /// before it can attach a tenant, so this acts as a bound on how fast things like
47 : /// failover can happen.
48 : /// - Pageserver re-attach: we will increment many shards' generations when this happens,
49 : /// so it is important to avoid e.g. issuing O(N) queries.
50 : ///
51 : /// Database calls relating to nodes have low performance requirements, as nodes are very rarely
52 : /// updated, and reads of nodes are always served from memory, not the database. We only require that
53 : /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
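///
/// ## Example
///
/// A minimal startup sketch, assuming a reachable Postgres URL; the variable names
/// below are illustrative and not part of this module:
///
/// ```ignore
/// let database_url = std::env::var("DATABASE_URL")?;
/// // Tolerate the database restarting concurrently with us.
/// Persistence::await_connection(&database_url, Duration::from_secs(30)).await?;
/// let persistence = Persistence::new(database_url);
/// // Load durable state once at startup; subsequent reads are served from memory.
/// let nodes = persistence.list_nodes().await?;
/// let shards = persistence.list_tenant_shards().await?;
/// ```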
54 : pub struct Persistence {
55 : connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
56 : }
57 :
58 : /// Legacy format, for use in JSON compat objects in the test environment
59 0 : #[derive(Serialize, Deserialize)]
60 : struct JsonPersistence {
61 : tenants: HashMap<TenantShardId, TenantShardPersistence>,
62 : }
63 :
64 0 : #[derive(thiserror::Error, Debug)]
65 : pub(crate) enum DatabaseError {
66 : #[error(transparent)]
67 : Query(#[from] diesel::result::Error),
68 : #[error(transparent)]
69 : Connection(#[from] diesel::result::ConnectionError),
70 : #[error(transparent)]
71 : ConnectionPool(#[from] r2d2::Error),
72 : #[error("Logical error: {0}")]
73 : Logical(String),
74 : }
75 :
76 : #[derive(measured::FixedCardinalityLabel, Copy, Clone)]
77 : pub(crate) enum DatabaseOperation {
78 : InsertNode,
79 : UpdateNode,
80 : DeleteNode,
81 : ListNodes,
82 : BeginShardSplit,
83 : CompleteShardSplit,
84 : AbortShardSplit,
85 : Detach,
86 : ReAttach,
87 : IncrementGeneration,
88 : ListTenantShards,
89 : InsertTenantShards,
90 : UpdateTenantShard,
91 : DeleteTenant,
92 : UpdateTenantConfig,
93 : }
94 :
95 : #[must_use]
96 : pub(crate) enum AbortShardSplitStatus {
97 : /// We aborted the split in the database by reverting to the parent shards
98 : Aborted,
99 : /// The split had already been persisted as complete: there was nothing to revert.
100 : Complete,
101 : }
102 :
103 : pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
104 :
105 : /// Some methods can operate on either a whole tenant or a single shard
106 : pub(crate) enum TenantFilter {
107 : Tenant(TenantId),
108 : Shard(TenantShardId),
109 : }
110 :
111 : impl Persistence {
112 : // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
113 : // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
114 : pub const MAX_CONNECTIONS: u32 = 99;
115 :
116 : // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
117 : const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
118 : const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
119 :
120 0 : pub fn new(database_url: String) -> Self {
121 0 : let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
122 0 :
123 0 : // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
124 0 : // to execute queries (database queries are not generally on latency-sensitive paths).
125 0 : let connection_pool = diesel::r2d2::Pool::builder()
126 0 : .max_size(Self::MAX_CONNECTIONS)
127 0 : .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
128 0 : .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
129 0 : // Always keep at least one connection ready to go
130 0 : .min_idle(Some(1))
131 0 : .test_on_check_out(true)
132 0 : .build(manager)
133 0 : .expect("Could not build connection pool");
134 0 :
135 0 : Self { connection_pool }
136 0 : }
137 :
138 : /// A helper for use during startup, where we would like to tolerate concurrent restarts of the
139 : /// database and the storage controller, so the database might not be available right away.
140 0 : pub async fn await_connection(
141 0 : database_url: &str,
142 0 : timeout: Duration,
143 0 : ) -> Result<(), diesel::ConnectionError> {
144 0 : let started_at = Instant::now();
145 0 : loop {
146 0 : match PgConnection::establish(database_url) {
147 : Ok(_) => {
148 0 : tracing::info!("Connected to database.");
149 0 : return Ok(());
150 : }
151 0 : Err(e) => {
152 0 : if started_at.elapsed() > timeout {
153 0 : return Err(e);
154 : } else {
155 0 : tracing::info!("Database not yet available, waiting... ({e})");
156 0 : tokio::time::sleep(Duration::from_millis(100)).await;
157 : }
158 : }
159 : }
160 : }
161 0 : }
162 :
163 : /// Wraps `with_conn` in order to collect latency and error metrics
164 0 : async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
165 0 : where
166 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
167 0 : R: Send + 'static,
168 0 : {
169 0 : let latency = &METRICS_REGISTRY
170 0 : .metrics_group
171 0 : .storage_controller_database_query_latency;
172 0 : let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
173 :
174 0 : let res = self.with_conn(func).await;
175 :
176 0 : if let Err(err) = &res {
177 0 : let error_counter = &METRICS_REGISTRY
178 0 : .metrics_group
179 0 : .storage_controller_database_query_error;
180 0 : error_counter.inc(DatabaseQueryErrorLabelGroup {
181 0 : error_type: err.error_label(),
182 0 : operation: op,
183 0 : })
184 0 : }
185 :
186 0 : res
187 0 : }
188 :
189 : /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
190 0 : async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
191 0 : where
192 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
193 0 : R: Send + 'static,
194 0 : {
195 : // A generous allowance for how many times we may retry serializable transactions
196 : // before giving up. This is not expected to be hit: it is a defensive measure in case we
197 : // somehow engineer a situation where duelling transactions might otherwise live-lock.
198 : const MAX_RETRIES: usize = 128;
199 :
200 0 : let mut conn = self.connection_pool.get()?;
201 0 : tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
202 0 : let mut retry_count = 0;
203 : loop {
204 0 : match conn.build_transaction().serializable().run(|c| func(c)) {
205 0 : Ok(r) => break Ok(r),
206 : Err(
207 0 : err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
208 0 : diesel::result::DatabaseErrorKind::SerializationFailure,
209 0 : _,
210 0 : )),
211 0 : ) => {
212 0 : retry_count += 1;
213 0 : if retry_count > MAX_RETRIES {
214 0 : tracing::error!(
215 0 : "Exceeded max retries on SerializationFailure errors: {err:?}"
216 : );
217 0 : break Err(err);
218 : } else {
219 : // Retry on serialization errors: these are expected, because even though our
220 : // transactions don't fight for the same rows, they will occasionally collide
221 : // on index pages (e.g. increment_generation for unrelated shards can collide)
222 0 : tracing::debug!(
223 0 : "Retrying transaction on serialization failure {err:?}"
224 : );
225 0 : continue;
226 : }
227 : }
228 0 : Err(e) => break Err(e),
229 : }
230 : }
231 0 : })
232 0 : .await
233 0 : .expect("Task panic")
234 0 : }
235 :
236 : /// When a node is first registered, persist it before using it for anything
237 0 : pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
238 0 : let np = node.to_persistent();
239 0 : self.with_measured_conn(
240 0 : DatabaseOperation::InsertNode,
241 0 : move |conn| -> DatabaseResult<()> {
242 0 : diesel::insert_into(crate::schema::nodes::table)
243 0 : .values(&np)
244 0 : .execute(conn)?;
245 0 : Ok(())
246 0 : },
247 0 : )
248 0 : .await
249 0 : }
250 :
251 : /// At startup, populate the list of nodes which our shards may be placed on
252 0 : pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
253 0 : let nodes: Vec<NodePersistence> = self
254 0 : .with_measured_conn(
255 0 : DatabaseOperation::ListNodes,
256 0 : move |conn| -> DatabaseResult<_> {
257 0 : Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
258 0 : },
259 0 : )
260 0 : .await?;
261 :
262 0 : tracing::info!("list_nodes: loaded {} nodes", nodes.len());
263 :
264 0 : Ok(nodes)
265 0 : }
266 :
267 0 : pub(crate) async fn update_node(
268 0 : &self,
269 0 : input_node_id: NodeId,
270 0 : input_scheduling: NodeSchedulingPolicy,
271 0 : ) -> DatabaseResult<()> {
272 : use crate::schema::nodes::dsl::*;
273 0 : let updated = self
274 0 : .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
275 0 : let updated = diesel::update(nodes)
276 0 : .filter(node_id.eq(input_node_id.0 as i64))
277 0 : .set((scheduling_policy.eq(String::from(input_scheduling)),))
278 0 : .execute(conn)?;
279 0 : Ok(updated)
280 0 : })
281 0 : .await?;
282 :
283 0 : if updated != 1 {
284 0 : Err(DatabaseError::Logical(format!(
285 0 : "Node {node_id:?} not found for update",
286 0 : )))
287 : } else {
288 0 : Ok(())
289 : }
290 0 : }
291 :
292 : /// At startup, load the high level state for shards, such as their config + policy. This will
293 : /// be enriched at runtime with state discovered on pageservers.
294 0 : pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
295 0 : self.with_measured_conn(
296 0 : DatabaseOperation::ListTenantShards,
297 0 : move |conn| -> DatabaseResult<_> {
298 0 : Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
299 0 : },
300 0 : )
301 0 : .await
302 0 : }
303 :
304 : /// Tenants must be persisted before we schedule them for the first time. This enables us
305 : /// to maintain generation monotonicity and to retain the externally provided placement policy & config.
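///
/// A sketch of persisting a single unsharded shard record; the field values and the
/// `tenant_id`, `placement_policy`, `config`, `scheduling_policy` and `persistence`
/// bindings are illustrative:
///
/// ```ignore
/// let tsp = TenantShardPersistence {
///     tenant_id: tenant_id.to_string(),
///     shard_number: 0,
///     shard_count: 0, // 0 means unsharded
///     shard_stripe_size: 0,
///     generation: Some(0), // None only when onboarding into PlacementPolicy::Secondary
///     generation_pageserver: None, // set later, by increment_generation
///     placement_policy: serde_json::to_string(&placement_policy).unwrap(),
///     config: serde_json::to_string(&config).unwrap(),
///     splitting: SplitState::default(),
///     scheduling_policy: serde_json::to_string(&scheduling_policy).unwrap(),
/// };
/// persistence.insert_tenant_shards(vec![tsp]).await?;
/// ```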
306 0 : pub(crate) async fn insert_tenant_shards(
307 0 : &self,
308 0 : shards: Vec<TenantShardPersistence>,
309 0 : ) -> DatabaseResult<()> {
310 0 : use crate::schema::tenant_shards::dsl::*;
311 0 : self.with_measured_conn(
312 0 : DatabaseOperation::InsertTenantShards,
313 0 : move |conn| -> DatabaseResult<()> {
314 0 : for tenant in &shards {
315 0 : diesel::insert_into(tenant_shards)
316 0 : .values(tenant)
317 0 : .execute(conn)?;
318 : }
319 0 : Ok(())
320 0 : },
321 0 : )
322 0 : .await
323 0 : }
324 :
325 : /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
326 : /// the tenant from memory on this server.
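///
/// A sketch of that ordering (the surrounding steps live outside this module):
///
/// ```ignore
/// // 1. Delete the tenant from the pageservers it is attached to.
/// // 2. Remove the durable record:
/// persistence.delete_tenant(tenant_id).await?;
/// // 3. Only now drop the tenant's in-memory state on this server.
/// ```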
327 0 : pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
328 0 : use crate::schema::tenant_shards::dsl::*;
329 0 : self.with_measured_conn(
330 0 : DatabaseOperation::DeleteTenant,
331 0 : move |conn| -> DatabaseResult<()> {
332 0 : diesel::delete(tenant_shards)
333 0 : .filter(tenant_id.eq(del_tenant_id.to_string()))
334 0 : .execute(conn)?;
335 :
336 0 : Ok(())
337 0 : },
338 0 : )
339 0 : .await
340 0 : }
341 :
342 0 : pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
343 0 : use crate::schema::nodes::dsl::*;
344 0 : self.with_measured_conn(
345 0 : DatabaseOperation::DeleteNode,
346 0 : move |conn| -> DatabaseResult<()> {
347 0 : diesel::delete(nodes)
348 0 : .filter(node_id.eq(del_node_id.0 as i64))
349 0 : .execute(conn)?;
350 :
351 0 : Ok(())
352 0 : },
353 0 : )
354 0 : .await
355 0 : }
356 :
357 : /// When a pageserver invokes the /re-attach API, this function is responsible for doing an efficient
358 : /// batched increment of the generations of all tenants whose generation_pageserver is equal to
359 : /// the node that called /re-attach.
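///
/// A sketch of consuming the result; `persistence` and `node_id` are assumed to be in scope:
///
/// ```ignore
/// let new_generations: HashMap<TenantShardId, Generation> =
///     persistence.re_attach(node_id).await?;
/// // Every shard whose generation_pageserver was `node_id` now has a freshly
/// // incremented generation to report back in the /re-attach response.
/// ```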
360 0 : #[tracing::instrument(skip_all, fields(node_id))]
361 : pub(crate) async fn re_attach(
362 : &self,
363 : input_node_id: NodeId,
364 : ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
365 : use crate::schema::nodes::dsl::scheduling_policy;
366 : use crate::schema::nodes::dsl::*;
367 : use crate::schema::tenant_shards::dsl::*;
368 : let updated = self
369 0 : .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
370 0 : let rows_updated = diesel::update(tenant_shards)
371 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
372 0 : .set(generation.eq(generation + 1))
373 0 : .execute(conn)?;
374 :
375 0 : tracing::info!("Incremented {} tenants' generations", rows_updated);
376 :
377 : // TODO: UPDATE+SELECT in one query
378 :
379 0 : let updated = tenant_shards
380 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
381 0 : .select(TenantShardPersistence::as_select())
382 0 : .load(conn)?;
383 :
384 : // If the node went through a drain and restart phase before re-attaching,
385 : // then reset its scheduling policy to active.
386 0 : diesel::update(nodes)
387 0 : .filter(node_id.eq(input_node_id.0 as i64))
388 0 : .filter(
389 0 : scheduling_policy
390 0 : .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
391 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
392 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
393 0 : )
394 0 : .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
395 0 : .execute(conn)?;
396 :
397 0 : Ok(updated)
398 0 : })
399 : .await?;
400 :
401 : let mut result = HashMap::new();
402 : for tsp in updated {
403 : let tenant_shard_id = TenantShardId {
404 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
405 0 : .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
406 : shard_number: ShardNumber(tsp.shard_number as u8),
407 : shard_count: ShardCount::new(tsp.shard_count as u8),
408 : };
409 :
410 : let Some(g) = tsp.generation else {
411 : // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
412 : // we only set generation_pageserver when setting generation.
413 : return Err(DatabaseError::Logical(
414 : "Generation should always be set after incrementing".to_string(),
415 : ));
416 : };
417 : result.insert(tenant_shard_id, Generation::new(g as u32));
418 : }
419 :
420 : Ok(result)
421 : }
422 :
423 : /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
424 : /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
425 : /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
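///
/// A sketch of the intended call order; the attach step itself lives outside this module
/// and `target_node_id` is an illustrative binding:
///
/// ```ignore
/// let new_generation = persistence
///     .increment_generation(tenant_shard_id, target_node_id)
///     .await?;
/// // Only once the incremented generation is durable may the Reconciler send the
/// // attach (location_config) request to the pageserver, quoting `new_generation`.
/// ```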
426 0 : pub(crate) async fn increment_generation(
427 0 : &self,
428 0 : tenant_shard_id: TenantShardId,
429 0 : node_id: NodeId,
430 0 : ) -> anyhow::Result<Generation> {
431 : use crate::schema::tenant_shards::dsl::*;
432 0 : let updated = self
433 0 : .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
434 0 : let updated = diesel::update(tenant_shards)
435 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
436 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
437 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
438 0 : .set((
439 0 : generation.eq(generation + 1),
440 0 : generation_pageserver.eq(node_id.0 as i64),
441 0 : ))
442 0 : // TODO: only returning() the generation column
443 0 : .returning(TenantShardPersistence::as_returning())
444 0 : .get_result(conn)?;
445 :
446 0 : Ok(updated)
447 0 : })
448 0 : .await?;
449 :
450 : // Generation is always non-null in the result: if the generation column had been NULL, then we
451 : // should have experienced an SQL Conflict error while executing a query that tries to increment it.
452 0 : debug_assert!(updated.generation.is_some());
453 0 : let Some(g) = updated.generation else {
454 0 : return Err(DatabaseError::Logical(
455 0 : "Generation should always be set after incrementing".to_string(),
456 0 : )
457 0 : .into());
458 : };
459 :
460 0 : Ok(Generation::new(g as u32))
461 0 : }
462 :
463 : #[allow(non_local_definitions)]
464 : /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
465 : ///
466 : /// Do not use this for setting generation, unless in the special onboarding code path (/location_config)
467 : /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
468 : /// that we only do the first time a tenant is set to an attached policy via /location_config.
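///
/// A sketch of updating only the config for every shard of a tenant; `None` for the other
/// fields means "leave unchanged" (the bindings are illustrative):
///
/// ```ignore
/// persistence
///     .update_tenant_shard(
///         TenantFilter::Tenant(tenant_id),
///         None,              // placement_policy: unchanged
///         Some(new_config),  // config: overwrite
///         None,              // generation: unchanged
///         None,              // scheduling policy: unchanged
///     )
///     .await?;
/// ```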
469 0 : pub(crate) async fn update_tenant_shard(
470 0 : &self,
471 0 : tenant: TenantFilter,
472 0 : input_placement_policy: Option<PlacementPolicy>,
473 0 : input_config: Option<TenantConfig>,
474 0 : input_generation: Option<Generation>,
475 0 : input_scheduling_policy: Option<ShardSchedulingPolicy>,
476 0 : ) -> DatabaseResult<()> {
477 0 : use crate::schema::tenant_shards::dsl::*;
478 0 :
479 0 : self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
480 0 : let query = match tenant {
481 0 : TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
482 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
483 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
484 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
485 0 : .into_boxed(),
486 0 : TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
487 0 : .filter(tenant_id.eq(input_tenant_id.to_string()))
488 0 : .into_boxed(),
489 : };
490 :
491 0 : #[derive(AsChangeset)]
492 : #[diesel(table_name = crate::schema::tenant_shards)]
493 : struct ShardUpdate {
494 : generation: Option<i32>,
495 : placement_policy: Option<String>,
496 : config: Option<String>,
497 : scheduling_policy: Option<String>,
498 : }
499 :
500 0 : let update = ShardUpdate {
501 0 : generation: input_generation.map(|g| g.into().unwrap() as i32),
502 0 : placement_policy: input_placement_policy
503 0 : .as_ref()
504 0 : .map(|p| serde_json::to_string(&p).unwrap()),
505 0 : config: input_config
506 0 : .as_ref()
507 0 : .map(|c| serde_json::to_string(&c).unwrap()),
508 0 : scheduling_policy: input_scheduling_policy
509 0 : .map(|p| serde_json::to_string(&p).unwrap()),
510 0 : };
511 0 :
512 0 : query.set(update).execute(conn)?;
513 :
514 0 : Ok(())
515 0 : })
516 0 : .await?;
517 :
518 0 : Ok(())
519 0 : }
520 :
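/// Mark a shard as detached: clear its attached pageserver (generation_pageserver) and set
/// its PlacementPolicy to Detached. Unlike [`Self::delete_tenant`], the row itself is retained.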
521 0 : pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
522 0 : use crate::schema::tenant_shards::dsl::*;
523 0 : self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
524 0 : let updated = diesel::update(tenant_shards)
525 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
526 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
527 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
528 0 : .set((
529 0 : generation_pageserver.eq(Option::<i64>::None),
530 0 : placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
531 0 : ))
532 0 : .execute(conn)?;
533 :
534 0 : Ok(updated)
535 0 : })
536 0 : .await?;
537 :
538 0 : Ok(())
539 0 : }
540 :
541 : // When we start shard splitting, we must durably mark the tenant so that
542 : // on restart, we know that we must go through recovery.
543 : //
544 : // We create the child shards here, so that they will be available for increment_generation calls
545 : // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
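//
// A sketch of the split lifecycle as seen from this module (the pageserver-side split
// calls happen elsewhere; the bindings below are illustrative):
//
//   persistence.begin_shard_split(old_shard_count, tenant_id, parent_to_children).await?;
//   // ... drive the split on the pageservers; if it succeeds:
//   persistence.complete_shard_split(tenant_id, old_shard_count).await?;
//   // ... or, if the remote part failed, revert to the parent shards:
//   let status = persistence.abort_shard_split(tenant_id, new_shard_count).await?;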
546 0 : pub(crate) async fn begin_shard_split(
547 0 : &self,
548 0 : old_shard_count: ShardCount,
549 0 : split_tenant_id: TenantId,
550 0 : parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
551 0 : ) -> DatabaseResult<()> {
552 0 : use crate::schema::tenant_shards::dsl::*;
553 0 : self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
554 : // Mark parent shards as splitting
555 :
556 0 : let updated = diesel::update(tenant_shards)
557 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
558 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
559 0 : .set((splitting.eq(1),))
560 0 : .execute(conn)?;
561 0 : if u8::try_from(updated)
562 0 : .map_err(|_| DatabaseError::Logical(
563 0 : format!("Overflow existing shard count {} while splitting", updated))
564 0 : )? != old_shard_count.count() {
565 : // Perhaps a deletion or another split raced with this attempt to split, mutating
566 : // the parent shards that we intend to split. In this case the split request should fail.
567 0 : return Err(DatabaseError::Logical(
568 0 : format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
569 0 : ));
570 0 : }
571 0 :
572 0 : // FIXME: spurious clone to sidestep closure move rules
573 0 : let parent_to_children = parent_to_children.clone();
574 :
575 : // Insert child shards
576 0 : for (parent_shard_id, children) in parent_to_children {
577 0 : let mut parent = crate::schema::tenant_shards::table
578 0 : .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
579 0 : .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
580 0 : .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
581 0 : .load::<TenantShardPersistence>(conn)?;
582 0 : let parent = if parent.len() != 1 {
583 0 : return Err(DatabaseError::Logical(format!(
584 0 : "Parent shard {parent_shard_id} not found"
585 0 : )));
586 : } else {
587 0 : parent.pop().unwrap()
588 : };
589 0 : for mut shard in children {
590 : // Carry the parent's generation into the child
591 0 : shard.generation = parent.generation;
592 0 :
593 0 : debug_assert!(shard.splitting == SplitState::Splitting);
594 0 : diesel::insert_into(tenant_shards)
595 0 : .values(shard)
596 0 : .execute(conn)?;
597 : }
598 : }
599 :
600 0 : Ok(())
601 0 : })
602 0 : .await
603 0 : }
604 :
605 : // When we finish shard splitting, we must atomically clean up the old shards
606 : // and insert the new shards, and clear the splitting marker.
607 0 : pub(crate) async fn complete_shard_split(
608 0 : &self,
609 0 : split_tenant_id: TenantId,
610 0 : old_shard_count: ShardCount,
611 0 : ) -> DatabaseResult<()> {
612 0 : use crate::schema::tenant_shards::dsl::*;
613 0 : self.with_measured_conn(
614 0 : DatabaseOperation::CompleteShardSplit,
615 0 : move |conn| -> DatabaseResult<()> {
616 0 : // Drop parent shards
617 0 : diesel::delete(tenant_shards)
618 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
619 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
620 0 : .execute(conn)?;
621 :
622 : // Clear splitting flag
623 0 : let updated = diesel::update(tenant_shards)
624 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
625 0 : .set((splitting.eq(0),))
626 0 : .execute(conn)?;
627 0 : debug_assert!(updated > 0);
628 :
629 0 : Ok(())
630 0 : },
631 0 : )
632 0 : .await
633 0 : }
634 :
635 : /// Used when the remote part of a shard split failed: we will revert the database state to have only
636 : /// the parent shards, with SplitState::Idle.
637 0 : pub(crate) async fn abort_shard_split(
638 0 : &self,
639 0 : split_tenant_id: TenantId,
640 0 : new_shard_count: ShardCount,
641 0 : ) -> DatabaseResult<AbortShardSplitStatus> {
642 0 : use crate::schema::tenant_shards::dsl::*;
643 0 : self.with_measured_conn(
644 0 : DatabaseOperation::AbortShardSplit,
645 0 : move |conn| -> DatabaseResult<AbortShardSplitStatus> {
646 : // Clear the splitting state on parent shards
647 0 : let updated = diesel::update(tenant_shards)
648 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
649 0 : .filter(shard_count.ne(new_shard_count.literal() as i32))
650 0 : .set((splitting.eq(0),))
651 0 : .execute(conn)?;
652 :
653 : // Parent shards are already gone: we cannot abort.
654 0 : if updated == 0 {
655 0 : return Ok(AbortShardSplitStatus::Complete);
656 0 : }
657 0 :
658 0 : // Sanity check: if parent shards were present, their cardinality should
659 0 : // be less than the number of child shards.
660 0 : if updated >= new_shard_count.count() as usize {
661 0 : return Err(DatabaseError::Logical(format!(
662 0 : "Unexpected parent shard count {updated} while aborting split to \
663 0 : count {new_shard_count:?} on tenant {split_tenant_id}"
664 0 : )));
665 0 : }
666 0 :
667 0 : // Erase child shards
668 0 : diesel::delete(tenant_shards)
669 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
670 0 : .filter(shard_count.eq(new_shard_count.literal() as i32))
671 0 : .execute(conn)?;
672 :
673 0 : Ok(AbortShardSplitStatus::Aborted)
674 0 : },
675 0 : )
676 0 : .await
677 0 : }
678 : }
679 :
680 : /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
681 0 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
682 : #[diesel(table_name = crate::schema::tenant_shards)]
683 : pub(crate) struct TenantShardPersistence {
684 : #[serde(default)]
685 : pub(crate) tenant_id: String,
686 : #[serde(default)]
687 : pub(crate) shard_number: i32,
688 : #[serde(default)]
689 : pub(crate) shard_count: i32,
690 : #[serde(default)]
691 : pub(crate) shard_stripe_size: i32,
692 :
693 : // Latest generation number: next time we attach, increment this
694 : // and use the incremented number when attaching.
695 : //
696 : // Generation is only None when first onboarding a tenant, where it may
697 : // be in PlacementPolicy::Secondary and therefore have no valid generation state.
698 : pub(crate) generation: Option<i32>,
699 :
700 : // Currently attached pageserver
701 : #[serde(rename = "pageserver")]
702 : pub(crate) generation_pageserver: Option<i64>,
703 :
704 : #[serde(default)]
705 : pub(crate) placement_policy: String,
706 : #[serde(default)]
707 : pub(crate) splitting: SplitState,
708 : #[serde(default)]
709 : pub(crate) config: String,
710 : #[serde(default)]
711 : pub(crate) scheduling_policy: String,
712 : }
713 :
714 : impl TenantShardPersistence {
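/// Reconstruct a [`ShardIdentity`] from the stored row. A `shard_count` of 0 denotes an
/// unsharded tenant; otherwise the shard number, count and stripe size must form a valid
/// identity, or a [`ShardConfigError`] is returned.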
715 0 : pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
716 0 : if self.shard_count == 0 {
717 0 : Ok(ShardIdentity::unsharded())
718 : } else {
719 0 : Ok(ShardIdentity::new(
720 0 : ShardNumber(self.shard_number as u8),
721 0 : ShardCount::new(self.shard_count as u8),
722 0 : ShardStripeSize(self.shard_stripe_size as u32),
723 0 : )?)
724 : }
725 0 : }
726 :
727 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
728 0 : Ok(TenantShardId {
729 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
730 0 : shard_number: ShardNumber(self.shard_number as u8),
731 0 : shard_count: ShardCount::new(self.shard_count as u8),
732 : })
733 0 : }
734 : }
735 :
736 : /// Parts of [`crate::node::Node`] that are stored durably
737 0 : #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
738 : #[diesel(table_name = crate::schema::nodes)]
739 : pub(crate) struct NodePersistence {
740 : pub(crate) node_id: i64,
741 : pub(crate) scheduling_policy: String,
742 : pub(crate) listen_http_addr: String,
743 : pub(crate) listen_http_port: i32,
744 : pub(crate) listen_pg_addr: String,
745 : pub(crate) listen_pg_port: i32,
746 : }