pub(crate) mod split_state;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};

use crate::metrics::{
    DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;

/// ## What do we store?
///
/// The storage controller service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
///   but it is operationally simpler to make this service the authority for which nodes
///   it talks to.
///
/// ## Performance/efficiency
///
/// The storage controller service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
///   before it can attach a tenant, so this acts as a bound on how fast things like
///   failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
///   so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as they are very rarely
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
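///
/// ## Example (sketch)
///
/// A hedged illustration of constructing this type from the two arguments
/// [`Persistence::new`] takes below; the connection string is invented for
/// illustration, and real callers pass it in from service configuration:
///
/// ```ignore
/// let persistence = Persistence::new(
///     "postgresql://storage_controller@localhost/storcon".to_string(),
///     None, // json_path: only set in test environments
/// );
/// ```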
pub struct Persistence {
    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,

    // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
    // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
    // compatible just yet.
    json_path: Option<Utf8PathBuf>,
}

/// Legacy format, for use in JSON compat objects in test environment
#[derive(Serialize, Deserialize)]
struct JsonPersistence {
    tenants: HashMap<TenantShardId, TenantShardPersistence>,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
    #[error(transparent)]
    Query(#[from] diesel::result::Error),
    #[error(transparent)]
    Connection(#[from] diesel::result::ConnectionError),
    #[error(transparent)]
    ConnectionPool(#[from] r2d2::Error),
    #[error("Logical error: {0}")]
    Logical(String),
}
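
// `with_measured_conn` below tags error metrics via an `error_label()` helper on
// this enum, defined elsewhere in this crate. A minimal sketch of the shape such
// a mapping could take (the return type and label strings here are assumptions,
// not the real definitions):
//
//     impl DatabaseError {
//         fn error_label(&self) -> &'static str {
//             match self {
//                 Self::Query(_) => "query",
//                 Self::Connection(_) | Self::ConnectionPool(_) => "connection",
//                 Self::Logical(_) => "logical",
//             }
//         }
//     }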

#[derive(measured::FixedCardinalityLabel, Clone)]
pub(crate) enum DatabaseOperation {
    InsertNode,
    UpdateNode,
    DeleteNode,
    ListNodes,
    BeginShardSplit,
    CompleteShardSplit,
    AbortShardSplit,
    Detach,
    ReAttach,
    IncrementGeneration,
    ListTenantShards,
    InsertTenantShards,
    UpdateTenantShard,
    DeleteTenant,
    UpdateTenantConfig,
}

#[must_use]
pub(crate) enum AbortShardSplitStatus {
    /// We aborted the split in the database by reverting to the parent shards
    Aborted,
    /// The split had already been persisted.
    Complete,
}

pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

/// Some methods can operate on either a whole tenant or a single shard
pub(crate) enum TenantFilter {
    Tenant(TenantId),
    Shard(TenantShardId),
}
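
// A sketch of the two call shapes this enables via update_tenant_shard() below
// (argument values are placeholders; the parameter order matches that method's
// signature: filter, placement_policy, config, generation, scheduling_policy):
//
//     // Update a persistent property across every shard of a tenant:
//     persistence
//         .update_tenant_shard(TenantFilter::Tenant(tenant_id), Some(policy), None, None, None)
//         .await?;
//     // Or target one shard only:
//     persistence
//         .update_tenant_shard(TenantFilter::Shard(tenant_shard_id), None, Some(config), None, None)
//         .await?;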

impl Persistence {
    // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
    // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
    pub const MAX_CONNECTIONS: u32 = 99;

    // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
    const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
    const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);

    pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
        let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);

        // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
        // to execute queries (database queries are not generally on latency-sensitive paths).
        let connection_pool = diesel::r2d2::Pool::builder()
            .max_size(Self::MAX_CONNECTIONS)
            .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
            .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
            // Always keep at least one connection ready to go
            .min_idle(Some(1))
            .test_on_check_out(true)
            .build(manager)
            .expect("Could not build connection pool");

        Self {
            connection_pool,
            json_path,
        }
    }

    /// Wraps `with_conn` in order to collect latency and error metrics
    async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
    where
        F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
        R: Send + 'static,
    {
        let latency = &METRICS_REGISTRY
            .metrics_group
            .storage_controller_database_query_latency;
        let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup {
            operation: op.clone(),
        });

        let res = self.with_conn(func).await;

        if let Err(err) = &res {
            let error_counter = &METRICS_REGISTRY
                .metrics_group
                .storage_controller_database_query_error;
            error_counter.inc(DatabaseQueryErrorLabelGroup {
                error_type: err.error_label(),
                operation: op,
            })
        }

        res
    }

    /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
    async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
    where
        F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
        R: Send + 'static,
    {
        let mut conn = self.connection_pool.get()?;
        tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
            .await
            .expect("Task panic")
    }

    /// When a node is first registered, persist it before using it for anything
    pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
        let np = node.to_persistent();
        self.with_measured_conn(
            DatabaseOperation::InsertNode,
            move |conn| -> DatabaseResult<()> {
                diesel::insert_into(crate::schema::nodes::table)
                    .values(&np)
                    .execute(conn)?;
                Ok(())
            },
        )
        .await
    }

    /// At startup, populate the list of nodes which our shards may be placed on
    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
        let nodes: Vec<NodePersistence> = self
            .with_measured_conn(
                DatabaseOperation::ListNodes,
                move |conn| -> DatabaseResult<_> {
                    Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
                },
            )
            .await?;

        tracing::info!("list_nodes: loaded {} nodes", nodes.len());

        Ok(nodes)
    }

    pub(crate) async fn update_node(
        &self,
        input_node_id: NodeId,
        input_scheduling: NodeSchedulingPolicy,
    ) -> DatabaseResult<()> {
        use crate::schema::nodes::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
                let updated = diesel::update(nodes)
                    .filter(node_id.eq(input_node_id.0 as i64))
                    .set((scheduling_policy.eq(String::from(input_scheduling)),))
                    .execute(conn)?;
                Ok(updated)
            })
            .await?;

        if updated != 1 {
            Err(DatabaseError::Logical(format!(
                "Node {input_node_id:?} not found for update",
            )))
        } else {
            Ok(())
        }
    }

    /// At startup, load the high level state for shards, such as their config + policy. This will
    /// be enriched at runtime with state discovered on pageservers.
    pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
        let loaded = self
            .with_measured_conn(
                DatabaseOperation::ListTenantShards,
                move |conn| -> DatabaseResult<_> {
                    Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
                },
            )
            .await?;

        if loaded.is_empty() {
            if let Some(path) = &self.json_path {
                if tokio::fs::try_exists(path)
                    .await
                    .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
                {
                    tracing::info!("Importing from legacy JSON format at {path}");
                    return self.list_tenant_shards_json(path).await;
                }
            }
        }
        Ok(loaded)
    }

    /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
    pub(crate) async fn list_tenant_shards_json(
        &self,
        path: &Utf8Path,
    ) -> DatabaseResult<Vec<TenantShardPersistence>> {
        let bytes = tokio::fs::read(path)
            .await
            .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;

        let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
            .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
        for shard in decoded.tenants.values_mut() {
            if shard.placement_policy == "\"Single\"" {
                // Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
                shard.placement_policy = "{\"Attached\":0}".to_string();
            }

            if shard.scheduling_policy.is_empty() {
                shard.scheduling_policy =
                    serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
            }
        }

        let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();

        // Synchronize database with what is in the JSON file
        self.insert_tenant_shards(tenants.clone()).await?;

        Ok(tenants)
    }

    /// For use in testing environments, where we dump out JSON on shutdown.
    pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
        let Some(path) = &self.json_path else {
            anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
        };
        tracing::info!("Writing state to {path}...");
        let tenants = self.list_tenant_shards().await?;
        let mut tenants_map = HashMap::new();
        for tsp in tenants {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount::new(tsp.shard_count as u8),
            };

            tenants_map.insert(tenant_shard_id, tsp);
        }
        let json = serde_json::to_string(&JsonPersistence {
            tenants: tenants_map,
        })?;

        tokio::fs::write(path, &json).await?;
        tracing::info!("Wrote {} bytes to {path}...", json.len());

        Ok(())
    }

    /// Tenants must be persisted before we schedule them for the first time. This enables us
    /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
    pub(crate) async fn insert_tenant_shards(
        &self,
        shards: Vec<TenantShardPersistence>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::InsertTenantShards,
            move |conn| -> DatabaseResult<()> {
                conn.transaction(|conn| -> QueryResult<()> {
                    for tenant in &shards {
                        diesel::insert_into(tenant_shards)
                            .values(tenant)
                            .execute(conn)?;
                    }
                    Ok(())
                })?;
                Ok(())
            },
        )
        .await
    }

    /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
    /// the tenant from memory on this server.
    pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::DeleteTenant,
            move |conn| -> DatabaseResult<()> {
                diesel::delete(tenant_shards)
                    .filter(tenant_id.eq(del_tenant_id.to_string()))
                    .execute(conn)?;

                Ok(())
            },
        )
        .await
    }

    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
        use crate::schema::nodes::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::DeleteNode,
            move |conn| -> DatabaseResult<()> {
                diesel::delete(nodes)
                    .filter(node_id.eq(del_node_id.0 as i64))
                    .execute(conn)?;

                Ok(())
            },
        )
        .await
    }

    /// When a pageserver node invokes the /re-attach API, this function is responsible for doing an
    /// efficient batched increment of the generations of all tenants whose generation_pageserver is
    /// equal to the node that called /re-attach.
    #[tracing::instrument(skip_all, fields(node_id))]
    pub(crate) async fn re_attach(
        &self,
        node_id: NodeId,
    ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                let rows_updated = diesel::update(tenant_shards)
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .set(generation.eq(generation + 1))
                    .execute(conn)?;

                tracing::info!("Incremented {} tenants' generations", rows_updated);

                // TODO: UPDATE+SELECT in one query
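                //
                // A hedged sketch of how that could look: Postgres can return the updated
                // rows directly instead of re-querying them, e.g.
                //
                //     UPDATE tenant_shards SET generation = generation + 1
                //     WHERE generation_pageserver = $1
                //     RETURNING *;
                //
                // In diesel terms, that would mean replacing `.execute(conn)` above with
                // `.returning(TenantShardPersistence::as_returning()).get_results(conn)`,
                // mirroring the single-row pattern increment_generation() uses below.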

                let updated = tenant_shards
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .select(TenantShardPersistence::as_select())
                    .load(conn)?;
                Ok(updated)
            })
            .await?;

        let mut result = HashMap::new();
        for tsp in updated {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
                    .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount::new(tsp.shard_count as u8),
            };

            let Some(g) = tsp.generation else {
                // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
                // we only set generation_pageserver when setting generation.
                return Err(DatabaseError::Logical(
                    "Generation should always be set after incrementing".to_string(),
                ));
            };
            result.insert(tenant_shard_id, Generation::new(g as u32));
        }

        Ok(result)
    }

    /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
    /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
    /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
    pub(crate) async fn increment_generation(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<Generation> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                    .set((
                        generation.eq(generation + 1),
                        generation_pageserver.eq(node_id.0 as i64),
                    ))
                    // TODO: only returning() the generation column
                    .returning(TenantShardPersistence::as_returning())
                    .get_result(conn)?;

                Ok(updated)
            })
            .await?;

        // Generation is always non-null in the result: if the generation column had been NULL, then we
        // should have experienced an SQL Conflict error while executing a query that tries to increment it.
        debug_assert!(updated.generation.is_some());
        let Some(g) = updated.generation else {
            return Err(DatabaseError::Logical(
                "Generation should always be set after incrementing".to_string(),
            )
            .into());
        };

        Ok(Generation::new(g as u32))
    }

    /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
    ///
    /// Do not use this for setting generation, unless in the special onboarding code path (/location_config)
    /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
    /// that we only do the first time a tenant is set to an attached policy via /location_config.
    pub(crate) async fn update_tenant_shard(
        &self,
        tenant: TenantFilter,
        input_placement_policy: Option<PlacementPolicy>,
        input_config: Option<TenantConfig>,
        input_generation: Option<Generation>,
        input_scheduling_policy: Option<ShardSchedulingPolicy>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;

        self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
            let query = match tenant {
                TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                    .into_boxed(),
                TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
                    .filter(tenant_id.eq(input_tenant_id.to_string()))
                    .into_boxed(),
            };

            #[derive(AsChangeset)]
            #[diesel(table_name = crate::schema::tenant_shards)]
            struct ShardUpdate {
                generation: Option<i32>,
                placement_policy: Option<String>,
                config: Option<String>,
                scheduling_policy: Option<String>,
            }

            let update = ShardUpdate {
                generation: input_generation.map(|g| g.into().unwrap() as i32),
                placement_policy: input_placement_policy
                    .map(|p| serde_json::to_string(&p).unwrap()),
                config: input_config.map(|c| serde_json::to_string(&c).unwrap()),
                scheduling_policy: input_scheduling_policy
                    .map(|p| serde_json::to_string(&p).unwrap()),
            };

            query.set(update).execute(conn)?;

            Ok(())
        })
        .await?;

        Ok(())
    }

    pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
            let updated = diesel::update(tenant_shards)
                .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                .set((
                    generation_pageserver.eq(Option::<i64>::None),
                    placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
                ))
                .execute(conn)?;

            Ok(updated)
        })
        .await?;

        Ok(())
    }

    // When we start shard splitting, we must durably mark the tenant so that
    // on restart, we know that we must go through recovery.
    //
    // We create the child shards here, so that they will be available for increment_generation calls
    // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
    pub(crate) async fn begin_shard_split(
        &self,
        old_shard_count: ShardCount,
        split_tenant_id: TenantId,
        parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
            conn.transaction(|conn| -> DatabaseResult<()> {
                // Mark parent shards as splitting

                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .filter(shard_count.eq(old_shard_count.literal() as i32))
                    .set((splitting.eq(1),))
                    .execute(conn)?;
                if u8::try_from(updated)
                    .map_err(|_| DatabaseError::Logical(
                        format!("Overflow existing shard count {} while splitting", updated))
                    )? != old_shard_count.count() {
                    // Perhaps a deletion or another split raced with this attempt to split, mutating
                    // the parent shards that we intend to split. In this case the split request should fail.
                    return Err(DatabaseError::Logical(
                        format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
                    ));
                }

                // FIXME: spurious clone to sidestep closure move rules
                let parent_to_children = parent_to_children.clone();

                // Insert child shards
                for (parent_shard_id, children) in parent_to_children {
                    let mut parent = crate::schema::tenant_shards::table
                        .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
                        .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
                        .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
                        .load::<TenantShardPersistence>(conn)?;
                    let parent = if parent.len() != 1 {
                        return Err(DatabaseError::Logical(format!(
                            "Parent shard {parent_shard_id} not found"
                        )));
                    } else {
                        parent.pop().unwrap()
                    };
                    for mut shard in children {
                        // Carry the parent's generation into the child
                        shard.generation = parent.generation;

                        debug_assert!(shard.splitting == SplitState::Splitting);
                        diesel::insert_into(tenant_shards)
                            .values(shard)
                            .execute(conn)?;
                    }
                }

                Ok(())
            })?;

            Ok(())
        })
        .await
    }

    // When we finish shard splitting, we must atomically clean up the old shards
    // and insert the new shards, and clear the splitting marker.
    pub(crate) async fn complete_shard_split(
        &self,
        split_tenant_id: TenantId,
        old_shard_count: ShardCount,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::CompleteShardSplit,
            move |conn| -> DatabaseResult<()> {
                conn.transaction(|conn| -> QueryResult<()> {
                    // Drop parent shards
                    diesel::delete(tenant_shards)
                        .filter(tenant_id.eq(split_tenant_id.to_string()))
                        .filter(shard_count.eq(old_shard_count.literal() as i32))
                        .execute(conn)?;

                    // Clear the splitting flag on the surviving child shards
                    let updated = diesel::update(tenant_shards)
                        .filter(tenant_id.eq(split_tenant_id.to_string()))
                        .set((splitting.eq(0),))
                        .execute(conn)?;
                    debug_assert!(updated > 0);

                    Ok(())
                })?;

                Ok(())
            },
        )
        .await
    }
648 :
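    // Taken together, begin/complete/abort form a small durable state machine for
    // shard splits. A hedged sketch of the driving sequence, based only on the
    // signatures in this file (`do_remote_split` is an invented stand-in for the
    // pageserver-side work, and the local variable names are placeholders):
    //
    //     persistence.begin_shard_split(old_count, tenant_id, parent_to_children).await?;
    //     match do_remote_split().await {
    //         Ok(()) => persistence.complete_shard_split(tenant_id, old_count).await?,
    //         Err(_) => match persistence.abort_shard_split(tenant_id, new_count).await? {
    //             AbortShardSplitStatus::Aborted => { /* reverted to parent shards */ }
    //             AbortShardSplitStatus::Complete => { /* split was already durable */ }
    //         },
    //     }
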
    /// Used when the remote part of a shard split failed: we will revert the database state to have only
    /// the parent shards, with SplitState::Idle.
    pub(crate) async fn abort_shard_split(
        &self,
        split_tenant_id: TenantId,
        new_shard_count: ShardCount,
    ) -> DatabaseResult<AbortShardSplitStatus> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::AbortShardSplit,
            move |conn| -> DatabaseResult<AbortShardSplitStatus> {
                let aborted =
                    conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
                        // Clear the splitting state on parent shards
                        let updated = diesel::update(tenant_shards)
                            .filter(tenant_id.eq(split_tenant_id.to_string()))
                            .filter(shard_count.ne(new_shard_count.literal() as i32))
                            .set((splitting.eq(0),))
                            .execute(conn)?;

                        // Parent shards are already gone: we cannot abort.
                        if updated == 0 {
                            return Ok(AbortShardSplitStatus::Complete);
                        }

                        // Sanity check: if parent shards were present, their cardinality should
                        // be less than the number of child shards.
                        if updated >= new_shard_count.count() as usize {
                            return Err(DatabaseError::Logical(format!(
                                "Unexpected parent shard count {updated} while aborting split to \
                                count {new_shard_count:?} on tenant {split_tenant_id}"
                            )));
                        }

                        // Erase child shards
                        diesel::delete(tenant_shards)
                            .filter(tenant_id.eq(split_tenant_id.to_string()))
                            .filter(shard_count.eq(new_shard_count.literal() as i32))
                            .execute(conn)?;

                        Ok(AbortShardSplitStatus::Aborted)
                    })?;

                Ok(aborted)
            },
        )
        .await
    }
}

/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
    #[serde(default)]
    pub(crate) tenant_id: String,
    #[serde(default)]
    pub(crate) shard_number: i32,
    #[serde(default)]
    pub(crate) shard_count: i32,
    #[serde(default)]
    pub(crate) shard_stripe_size: i32,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // Generation is only None when first onboarding a tenant, where it may
    // be in PlacementPolicy::Secondary and therefore have no valid generation state.
    pub(crate) generation: Option<i32>,

    // Currently attached pageserver
    #[serde(rename = "pageserver")]
    pub(crate) generation_pageserver: Option<i64>,

    #[serde(default)]
    pub(crate) placement_policy: String,
    #[serde(default)]
    pub(crate) splitting: SplitState,
    #[serde(default)]
    pub(crate) config: String,
    #[serde(default)]
    pub(crate) scheduling_policy: String,
}
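
// A hedged sketch of one serialized entry in the legacy JsonPersistence map, under
// the serde attributes above (all values invented for illustration; note that
// `generation_pageserver` appears as "pageserver" on the wire, and that the
// policy/config fields hold JSON-encoded strings rather than nested objects):
//
//     "1f359dd625e519a1a4e8d7509690f6fc-0004": {
//         "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//         "shard_number": 0,
//         "shard_count": 4,
//         "shard_stripe_size": 32768,
//         "generation": 5,
//         "pageserver": 1,
//         "placement_policy": "{\"Attached\":0}",
//         "splitting": ...,  // SplitState's serde form, defined in split_state.rs
//         "config": "{}",
//         "scheduling_policy": "\"Active\""
//     }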

impl TenantShardPersistence {
    pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
        if self.shard_count == 0 {
            Ok(ShardIdentity::unsharded())
        } else {
            Ok(ShardIdentity::new(
                ShardNumber(self.shard_number as u8),
                ShardCount::new(self.shard_count as u8),
                ShardStripeSize(self.shard_stripe_size as u32),
            )?)
        }
    }

    pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
        Ok(TenantShardId {
            tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
            shard_number: ShardNumber(self.shard_number as u8),
            shard_count: ShardCount::new(self.shard_count as u8),
        })
    }
}

/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
    pub(crate) node_id: i64,
    pub(crate) scheduling_policy: String,
    pub(crate) listen_http_addr: String,
    pub(crate) listen_http_port: i32,
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
}