Line data Source code
1 : pub(crate) mod split_state;
2 : use std::collections::HashMap;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 : use std::time::Instant;
6 :
7 : use self::split_state::SplitState;
8 : use camino::Utf8Path;
9 : use camino::Utf8PathBuf;
10 : use diesel::pg::PgConnection;
11 : use diesel::prelude::*;
12 : use diesel::Connection;
13 : use pageserver_api::controller_api::ShardSchedulingPolicy;
14 : use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
15 : use pageserver_api::models::TenantConfig;
16 : use pageserver_api::shard::ShardConfigError;
17 : use pageserver_api::shard::ShardIdentity;
18 : use pageserver_api::shard::ShardStripeSize;
19 : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
20 : use serde::{Deserialize, Serialize};
21 : use utils::generation::Generation;
22 : use utils::id::{NodeId, TenantId};
23 :
24 : use crate::metrics::{
25 : DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
26 : };
27 : use crate::node::Node;
28 :
/// ## What do we store?
///
/// The storage controller service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
///   but it is operationally simpler to make this service the authority for which nodes
///   it talks to.
///
/// ## Performance/efficiency
///
/// The storage controller service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
///   before it can attach a tenant, so this acts as a bound on how fast things like
///   failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
///   so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as they are very rarely
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
    // Pool of Postgres connections that every database operation checks a connection out of.
    // It is built in `Persistence::new`, which caps its size at `MAX_CONNECTIONS`.
    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,

    // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
    // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
    // compatible just yet.
    json_path: Option<Utf8PathBuf>,
}
64 :
/// Legacy format, for use in JSON compat objects in test environment.
///
/// This is the top-level object written by `Persistence::write_tenants_json` and read back by
/// `Persistence::list_tenant_shards_json`.
#[derive(Serialize, Deserialize)]
struct JsonPersistence {
    // Persisted shard records, keyed by the shard they describe.
    tenants: HashMap<TenantShardId, TenantShardPersistence>,
}
70 :
/// Errors returned by the methods of [`Persistence`].
#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
    /// An error reported by diesel while running a query or transaction.
    #[error(transparent)]
    Query(#[from] diesel::result::Error),
    /// An error reported by diesel while establishing a connection.
    #[error(transparent)]
    Connection(#[from] diesel::result::ConnectionError),
    /// An error reported by the r2d2 pool, e.g. when checking out a connection.
    #[error(transparent)]
    ConnectionPool(#[from] r2d2::Error),
    /// The database (or the legacy JSON file) was reachable but its content was not what the
    /// caller expected, e.g. a row that should exist was not found.
    #[error("Logical error: {0}")]
    Logical(String),
}
82 :
/// Identifies which [`Persistence`] operation a latency or error metric sample belongs to.
///
/// A value of this type is passed to `Persistence::with_measured_conn`, which uses it as the
/// `operation` label of both the query latency and the query error metrics.
#[derive(measured::FixedCardinalityLabel, Copy, Clone)]
pub(crate) enum DatabaseOperation {
    InsertNode,
    UpdateNode,
    DeleteNode,
    ListNodes,
    BeginShardSplit,
    CompleteShardSplit,
    AbortShardSplit,
    Detach,
    ReAttach,
    IncrementGeneration,
    ListTenantShards,
    InsertTenantShards,
    UpdateTenantShard,
    DeleteTenant,
    UpdateTenantConfig,
}
101 :
/// Outcome of `Persistence::abort_shard_split`.
///
/// Marked `#[must_use]` because the two outcomes leave the database in different states and the
/// caller has to distinguish them.
#[must_use]
pub(crate) enum AbortShardSplitStatus {
    /// We aborted the split in the database by reverting to the parent shards
    Aborted,
    /// The split had already been persisted.
    Complete,
}
109 :
/// Shorthand for a `Result` whose error type is [`DatabaseError`].
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
111 :
/// Some methods can operate on either a whole tenant or a single shard
pub(crate) enum TenantFilter {
    /// Select every row whose `tenant_id` matches this tenant.
    Tenant(TenantId),
    /// Select the row matching this tenant id, shard number and shard count.
    Shard(TenantShardId),
}
117 :
118 : impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
// Used as the pool's `max_size`.
pub const MAX_CONNECTIONS: u32 = 99;

// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
// These are the pool's `idle_timeout` and `max_lifetime` respectively.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
126 :
127 0 : pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
128 0 : let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
129 0 :
130 0 : // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
131 0 : // to execute queries (database queries are not generally on latency-sensitive paths).
132 0 : let connection_pool = diesel::r2d2::Pool::builder()
133 0 : .max_size(Self::MAX_CONNECTIONS)
134 0 : .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
135 0 : .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
136 0 : // Always keep at least one connection ready to go
137 0 : .min_idle(Some(1))
138 0 : .test_on_check_out(true)
139 0 : .build(manager)
140 0 : .expect("Could not build connection pool");
141 0 :
142 0 : Self {
143 0 : connection_pool,
144 0 : json_path,
145 0 : }
146 0 : }
147 :
148 : /// A helper for use during startup, where we would like to tolerate concurrent restarts of the
149 : /// database and the storage controller, therefore the database might not be available right away
150 0 : pub async fn await_connection(
151 0 : database_url: &str,
152 0 : timeout: Duration,
153 0 : ) -> Result<(), diesel::ConnectionError> {
154 0 : let started_at = Instant::now();
155 0 : loop {
156 0 : match PgConnection::establish(database_url) {
157 : Ok(_) => {
158 0 : tracing::info!("Connected to database.");
159 0 : return Ok(());
160 : }
161 0 : Err(e) => {
162 0 : if started_at.elapsed() > timeout {
163 0 : return Err(e);
164 : } else {
165 0 : tracing::info!("Database not yet available, waiting... ({e})");
166 0 : tokio::time::sleep(Duration::from_millis(100)).await;
167 : }
168 : }
169 : }
170 : }
171 0 : }
172 :
173 : /// Wraps `with_conn` in order to collect latency and error metrics
174 0 : async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
175 0 : where
176 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
177 0 : R: Send + 'static,
178 0 : {
179 0 : let latency = &METRICS_REGISTRY
180 0 : .metrics_group
181 0 : .storage_controller_database_query_latency;
182 0 : let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup { operation: op });
183 :
184 0 : let res = self.with_conn(func).await;
185 :
186 0 : if let Err(err) = &res {
187 0 : let error_counter = &METRICS_REGISTRY
188 0 : .metrics_group
189 0 : .storage_controller_database_query_error;
190 0 : error_counter.inc(DatabaseQueryErrorLabelGroup {
191 0 : error_type: err.error_label(),
192 0 : operation: op,
193 0 : })
194 0 : }
195 :
196 0 : res
197 0 : }
198 :
199 : /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
200 0 : async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
201 0 : where
202 0 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
203 0 : R: Send + 'static,
204 0 : {
205 : // A generous allowance for how many times we may retry serializable transactions
206 : // before giving up. This is not expected to be hit: it is a defensive measure in case we
207 : // somehow engineer a situation where duelling transactions might otherwise live-lock.
208 : const MAX_RETRIES: usize = 128;
209 :
210 0 : let mut conn = self.connection_pool.get()?;
211 0 : tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
212 0 : let mut retry_count = 0;
213 : loop {
214 0 : match conn.build_transaction().serializable().run(|c| func(c)) {
215 0 : Ok(r) => break Ok(r),
216 : Err(
217 0 : err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
218 0 : diesel::result::DatabaseErrorKind::SerializationFailure,
219 0 : _,
220 0 : )),
221 0 : ) => {
222 0 : retry_count += 1;
223 0 : if retry_count > MAX_RETRIES {
224 0 : tracing::error!(
225 0 : "Exceeded max retries on SerializationFailure errors: {err:?}"
226 : );
227 0 : break Err(err);
228 : } else {
229 : // Retry on serialization errors: these are expected, because even though our
230 : // transactions don't fight for the same rows, they will occasionally collide
231 : // on index pages (e.g. increment_generation for unrelated shards can collide)
232 0 : tracing::debug!(
233 0 : "Retrying transaction on serialization failure {err:?}"
234 : );
235 0 : continue;
236 : }
237 : }
238 0 : Err(e) => break Err(e),
239 : }
240 : }
241 0 : })
242 0 : .await
243 0 : .expect("Task panic")
244 0 : }
245 :
246 : /// When a node is first registered, persist it before using it for anything
247 0 : pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
248 0 : let np = node.to_persistent();
249 0 : self.with_measured_conn(
250 0 : DatabaseOperation::InsertNode,
251 0 : move |conn| -> DatabaseResult<()> {
252 0 : diesel::insert_into(crate::schema::nodes::table)
253 0 : .values(&np)
254 0 : .execute(conn)?;
255 0 : Ok(())
256 0 : },
257 0 : )
258 0 : .await
259 0 : }
260 :
261 : /// At startup, populate the list of nodes which our shards may be placed on
262 0 : pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
263 0 : let nodes: Vec<NodePersistence> = self
264 0 : .with_measured_conn(
265 0 : DatabaseOperation::ListNodes,
266 0 : move |conn| -> DatabaseResult<_> {
267 0 : Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
268 0 : },
269 0 : )
270 0 : .await?;
271 :
272 0 : tracing::info!("list_nodes: loaded {} nodes", nodes.len());
273 :
274 0 : Ok(nodes)
275 0 : }
276 :
277 0 : pub(crate) async fn update_node(
278 0 : &self,
279 0 : input_node_id: NodeId,
280 0 : input_scheduling: NodeSchedulingPolicy,
281 0 : ) -> DatabaseResult<()> {
282 : use crate::schema::nodes::dsl::*;
283 0 : let updated = self
284 0 : .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
285 0 : let updated = diesel::update(nodes)
286 0 : .filter(node_id.eq(input_node_id.0 as i64))
287 0 : .set((scheduling_policy.eq(String::from(input_scheduling)),))
288 0 : .execute(conn)?;
289 0 : Ok(updated)
290 0 : })
291 0 : .await?;
292 :
293 0 : if updated != 1 {
294 0 : Err(DatabaseError::Logical(format!(
295 0 : "Node {node_id:?} not found for update",
296 0 : )))
297 : } else {
298 0 : Ok(())
299 : }
300 0 : }
301 :
302 : /// At startup, load the high level state for shards, such as their config + policy. This will
303 : /// be enriched at runtime with state discovered on pageservers.
304 0 : pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
305 0 : let loaded = self
306 0 : .with_measured_conn(
307 0 : DatabaseOperation::ListTenantShards,
308 0 : move |conn| -> DatabaseResult<_> {
309 0 : Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
310 0 : },
311 0 : )
312 0 : .await?;
313 :
314 0 : if loaded.is_empty() {
315 0 : if let Some(path) = &self.json_path {
316 0 : if tokio::fs::try_exists(path)
317 0 : .await
318 0 : .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
319 : {
320 0 : tracing::info!("Importing from legacy JSON format at {path}");
321 0 : return self.list_tenant_shards_json(path).await;
322 0 : }
323 0 : }
324 0 : }
325 0 : Ok(loaded)
326 0 : }
327 :
328 : /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
329 0 : pub(crate) async fn list_tenant_shards_json(
330 0 : &self,
331 0 : path: &Utf8Path,
332 0 : ) -> DatabaseResult<Vec<TenantShardPersistence>> {
333 0 : let bytes = tokio::fs::read(path)
334 0 : .await
335 0 : .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
336 :
337 0 : let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
338 0 : .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
339 0 : for shard in decoded.tenants.values_mut() {
340 0 : if shard.placement_policy == "\"Single\"" {
341 0 : // Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
342 0 : shard.placement_policy = "{\"Attached\":0}".to_string();
343 0 : }
344 :
345 0 : if shard.scheduling_policy.is_empty() {
346 0 : shard.scheduling_policy =
347 0 : serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
348 0 : }
349 : }
350 :
351 0 : let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
352 0 :
353 0 : // Synchronize database with what is in the JSON file
354 0 : self.insert_tenant_shards(tenants.clone()).await?;
355 :
356 0 : Ok(tenants)
357 0 : }
358 :
359 : /// For use in testing environments, where we dump out JSON on shutdown.
360 0 : pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
361 0 : let Some(path) = &self.json_path else {
362 0 : anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
363 : };
364 0 : tracing::info!("Writing state to {path}...");
365 0 : let tenants = self.list_tenant_shards().await?;
366 0 : let mut tenants_map = HashMap::new();
367 0 : for tsp in tenants {
368 0 : let tenant_shard_id = TenantShardId {
369 0 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
370 0 : shard_number: ShardNumber(tsp.shard_number as u8),
371 0 : shard_count: ShardCount::new(tsp.shard_count as u8),
372 0 : };
373 0 :
374 0 : tenants_map.insert(tenant_shard_id, tsp);
375 : }
376 0 : let json = serde_json::to_string(&JsonPersistence {
377 0 : tenants: tenants_map,
378 0 : })?;
379 :
380 0 : tokio::fs::write(path, &json).await?;
381 0 : tracing::info!("Wrote {} bytes to {path}...", json.len());
382 :
383 0 : Ok(())
384 0 : }
385 :
386 : /// Tenants must be persisted before we schedule them for the first time. This enables us
387 : /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
388 0 : pub(crate) async fn insert_tenant_shards(
389 0 : &self,
390 0 : shards: Vec<TenantShardPersistence>,
391 0 : ) -> DatabaseResult<()> {
392 0 : use crate::schema::tenant_shards::dsl::*;
393 0 : self.with_measured_conn(
394 0 : DatabaseOperation::InsertTenantShards,
395 0 : move |conn| -> DatabaseResult<()> {
396 0 : for tenant in &shards {
397 0 : diesel::insert_into(tenant_shards)
398 0 : .values(tenant)
399 0 : .execute(conn)?;
400 : }
401 0 : Ok(())
402 0 : },
403 0 : )
404 0 : .await
405 0 : }
406 :
407 : /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
408 : /// the tenant from memory on this server.
409 0 : pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
410 0 : use crate::schema::tenant_shards::dsl::*;
411 0 : self.with_measured_conn(
412 0 : DatabaseOperation::DeleteTenant,
413 0 : move |conn| -> DatabaseResult<()> {
414 0 : diesel::delete(tenant_shards)
415 0 : .filter(tenant_id.eq(del_tenant_id.to_string()))
416 0 : .execute(conn)?;
417 :
418 0 : Ok(())
419 0 : },
420 0 : )
421 0 : .await
422 0 : }
423 :
424 0 : pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
425 0 : use crate::schema::nodes::dsl::*;
426 0 : self.with_measured_conn(
427 0 : DatabaseOperation::DeleteNode,
428 0 : move |conn| -> DatabaseResult<()> {
429 0 : diesel::delete(nodes)
430 0 : .filter(node_id.eq(del_node_id.0 as i64))
431 0 : .execute(conn)?;
432 :
433 0 : Ok(())
434 0 : },
435 0 : )
436 0 : .await
437 0 : }
438 :
439 : /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
440 : /// batched increment of the generations of all tenants whose generation_pageserver is equal to
441 : /// the node that called /re-attach.
442 0 : #[tracing::instrument(skip_all, fields(node_id))]
443 : pub(crate) async fn re_attach(
444 : &self,
445 : input_node_id: NodeId,
446 : ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
447 : use crate::schema::nodes::dsl::scheduling_policy;
448 : use crate::schema::nodes::dsl::*;
449 : use crate::schema::tenant_shards::dsl::*;
450 : let updated = self
451 0 : .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
452 0 : let rows_updated = diesel::update(tenant_shards)
453 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
454 0 : .set(generation.eq(generation + 1))
455 0 : .execute(conn)?;
456 :
457 0 : tracing::info!("Incremented {} tenants' generations", rows_updated);
458 :
459 : // TODO: UPDATE+SELECT in one query
460 :
461 0 : let updated = tenant_shards
462 0 : .filter(generation_pageserver.eq(input_node_id.0 as i64))
463 0 : .select(TenantShardPersistence::as_select())
464 0 : .load(conn)?;
465 :
466 : // If the node went through a drain and restart phase before re-attaching,
467 : // then reset it's node scheduling policy to active.
468 0 : diesel::update(nodes)
469 0 : .filter(node_id.eq(input_node_id.0 as i64))
470 0 : .filter(
471 0 : scheduling_policy
472 0 : .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
473 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
474 0 : .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
475 0 : )
476 0 : .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
477 0 : .execute(conn)?;
478 :
479 0 : Ok(updated)
480 0 : })
481 : .await?;
482 :
483 : let mut result = HashMap::new();
484 : for tsp in updated {
485 : let tenant_shard_id = TenantShardId {
486 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
487 0 : .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
488 : shard_number: ShardNumber(tsp.shard_number as u8),
489 : shard_count: ShardCount::new(tsp.shard_count as u8),
490 : };
491 :
492 : let Some(g) = tsp.generation else {
493 : // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
494 : // we only set generation_pageserver when setting generation.
495 : return Err(DatabaseError::Logical(
496 : "Generation should always be set after incrementing".to_string(),
497 : ));
498 : };
499 : result.insert(tenant_shard_id, Generation::new(g as u32));
500 : }
501 :
502 : Ok(result)
503 : }
504 :
505 : /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
506 : /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
507 : /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
508 0 : pub(crate) async fn increment_generation(
509 0 : &self,
510 0 : tenant_shard_id: TenantShardId,
511 0 : node_id: NodeId,
512 0 : ) -> anyhow::Result<Generation> {
513 : use crate::schema::tenant_shards::dsl::*;
514 0 : let updated = self
515 0 : .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
516 0 : let updated = diesel::update(tenant_shards)
517 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
518 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
519 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
520 0 : .set((
521 0 : generation.eq(generation + 1),
522 0 : generation_pageserver.eq(node_id.0 as i64),
523 0 : ))
524 0 : // TODO: only returning() the generation column
525 0 : .returning(TenantShardPersistence::as_returning())
526 0 : .get_result(conn)?;
527 :
528 0 : Ok(updated)
529 0 : })
530 0 : .await?;
531 :
532 : // Generation is always non-null in the rseult: if the generation column had been NULL, then we
533 : // should have experienced an SQL Confilict error while executing a query that tries to increment it.
534 0 : debug_assert!(updated.generation.is_some());
535 0 : let Some(g) = updated.generation else {
536 0 : return Err(DatabaseError::Logical(
537 0 : "Generation should always be set after incrementing".to_string(),
538 0 : )
539 0 : .into());
540 : };
541 :
542 0 : Ok(Generation::new(g as u32))
543 0 : }
544 :
545 : /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
546 : ///
547 : /// Do not use this for settting generation, unless in the special onboarding code path (/location_config)
548 : /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
549 : /// that we only do the first time a tenant is set to an attached policy via /location_config.
550 0 : pub(crate) async fn update_tenant_shard(
551 0 : &self,
552 0 : tenant: TenantFilter,
553 0 : input_placement_policy: Option<PlacementPolicy>,
554 0 : input_config: Option<TenantConfig>,
555 0 : input_generation: Option<Generation>,
556 0 : input_scheduling_policy: Option<ShardSchedulingPolicy>,
557 0 : ) -> DatabaseResult<()> {
558 0 : use crate::schema::tenant_shards::dsl::*;
559 0 :
560 0 : self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
561 0 : let query = match tenant {
562 0 : TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
563 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
564 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
565 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
566 0 : .into_boxed(),
567 0 : TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
568 0 : .filter(tenant_id.eq(input_tenant_id.to_string()))
569 0 : .into_boxed(),
570 : };
571 :
572 0 : #[derive(AsChangeset)]
573 : #[diesel(table_name = crate::schema::tenant_shards)]
574 : struct ShardUpdate {
575 : generation: Option<i32>,
576 : placement_policy: Option<String>,
577 : config: Option<String>,
578 : scheduling_policy: Option<String>,
579 : }
580 :
581 0 : let update = ShardUpdate {
582 0 : generation: input_generation.map(|g| g.into().unwrap() as i32),
583 0 : placement_policy: input_placement_policy
584 0 : .as_ref()
585 0 : .map(|p| serde_json::to_string(&p).unwrap()),
586 0 : config: input_config
587 0 : .as_ref()
588 0 : .map(|c| serde_json::to_string(&c).unwrap()),
589 0 : scheduling_policy: input_scheduling_policy
590 0 : .map(|p| serde_json::to_string(&p).unwrap()),
591 0 : };
592 0 :
593 0 : query.set(update).execute(conn)?;
594 :
595 0 : Ok(())
596 0 : })
597 0 : .await?;
598 :
599 0 : Ok(())
600 0 : }
601 :
602 0 : pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
603 0 : use crate::schema::tenant_shards::dsl::*;
604 0 : self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
605 0 : let updated = diesel::update(tenant_shards)
606 0 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
607 0 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
608 0 : .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
609 0 : .set((
610 0 : generation_pageserver.eq(Option::<i64>::None),
611 0 : placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
612 0 : ))
613 0 : .execute(conn)?;
614 :
615 0 : Ok(updated)
616 0 : })
617 0 : .await?;
618 :
619 0 : Ok(())
620 0 : }
621 :
622 : // When we start shard splitting, we must durably mark the tenant so that
623 : // on restart, we know that we must go through recovery.
624 : //
625 : // We create the child shards here, so that they will be available for increment_generation calls
626 : // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
627 0 : pub(crate) async fn begin_shard_split(
628 0 : &self,
629 0 : old_shard_count: ShardCount,
630 0 : split_tenant_id: TenantId,
631 0 : parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
632 0 : ) -> DatabaseResult<()> {
633 0 : use crate::schema::tenant_shards::dsl::*;
634 0 : self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
635 : // Mark parent shards as splitting
636 :
637 0 : let updated = diesel::update(tenant_shards)
638 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
639 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
640 0 : .set((splitting.eq(1),))
641 0 : .execute(conn)?;
642 0 : if u8::try_from(updated)
643 0 : .map_err(|_| DatabaseError::Logical(
644 0 : format!("Overflow existing shard count {} while splitting", updated))
645 0 : )? != old_shard_count.count() {
646 : // Perhaps a deletion or another split raced with this attempt to split, mutating
647 : // the parent shards that we intend to split. In this case the split request should fail.
648 0 : return Err(DatabaseError::Logical(
649 0 : format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
650 0 : ));
651 0 : }
652 0 :
653 0 : // FIXME: spurious clone to sidestep closure move rules
654 0 : let parent_to_children = parent_to_children.clone();
655 :
656 : // Insert child shards
657 0 : for (parent_shard_id, children) in parent_to_children {
658 0 : let mut parent = crate::schema::tenant_shards::table
659 0 : .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
660 0 : .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
661 0 : .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
662 0 : .load::<TenantShardPersistence>(conn)?;
663 0 : let parent = if parent.len() != 1 {
664 0 : return Err(DatabaseError::Logical(format!(
665 0 : "Parent shard {parent_shard_id} not found"
666 0 : )));
667 : } else {
668 0 : parent.pop().unwrap()
669 : };
670 0 : for mut shard in children {
671 : // Carry the parent's generation into the child
672 0 : shard.generation = parent.generation;
673 0 :
674 0 : debug_assert!(shard.splitting == SplitState::Splitting);
675 0 : diesel::insert_into(tenant_shards)
676 0 : .values(shard)
677 0 : .execute(conn)?;
678 : }
679 : }
680 :
681 0 : Ok(())
682 0 : })
683 0 : .await
684 0 : }
685 :
686 : // When we finish shard splitting, we must atomically clean up the old shards
687 : // and insert the new shards, and clear the splitting marker.
688 0 : pub(crate) async fn complete_shard_split(
689 0 : &self,
690 0 : split_tenant_id: TenantId,
691 0 : old_shard_count: ShardCount,
692 0 : ) -> DatabaseResult<()> {
693 0 : use crate::schema::tenant_shards::dsl::*;
694 0 : self.with_measured_conn(
695 0 : DatabaseOperation::CompleteShardSplit,
696 0 : move |conn| -> DatabaseResult<()> {
697 0 : // Drop parent shards
698 0 : diesel::delete(tenant_shards)
699 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
700 0 : .filter(shard_count.eq(old_shard_count.literal() as i32))
701 0 : .execute(conn)?;
702 :
703 : // Clear sharding flag
704 0 : let updated = diesel::update(tenant_shards)
705 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
706 0 : .set((splitting.eq(0),))
707 0 : .execute(conn)?;
708 0 : debug_assert!(updated > 0);
709 :
710 0 : Ok(())
711 0 : },
712 0 : )
713 0 : .await
714 0 : }
715 :
716 : /// Used when the remote part of a shard split failed: we will revert the database state to have only
717 : /// the parent shards, with SplitState::Idle.
718 0 : pub(crate) async fn abort_shard_split(
719 0 : &self,
720 0 : split_tenant_id: TenantId,
721 0 : new_shard_count: ShardCount,
722 0 : ) -> DatabaseResult<AbortShardSplitStatus> {
723 0 : use crate::schema::tenant_shards::dsl::*;
724 0 : self.with_measured_conn(
725 0 : DatabaseOperation::AbortShardSplit,
726 0 : move |conn| -> DatabaseResult<AbortShardSplitStatus> {
727 : // Clear the splitting state on parent shards
728 0 : let updated = diesel::update(tenant_shards)
729 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
730 0 : .filter(shard_count.ne(new_shard_count.literal() as i32))
731 0 : .set((splitting.eq(0),))
732 0 : .execute(conn)?;
733 :
734 : // Parent shards are already gone: we cannot abort.
735 0 : if updated == 0 {
736 0 : return Ok(AbortShardSplitStatus::Complete);
737 0 : }
738 0 :
739 0 : // Sanity check: if parent shards were present, their cardinality should
740 0 : // be less than the number of child shards.
741 0 : if updated >= new_shard_count.count() as usize {
742 0 : return Err(DatabaseError::Logical(format!(
743 0 : "Unexpected parent shard count {updated} while aborting split to \
744 0 : count {new_shard_count:?} on tenant {split_tenant_id}"
745 0 : )));
746 0 : }
747 0 :
748 0 : // Erase child shards
749 0 : diesel::delete(tenant_shards)
750 0 : .filter(tenant_id.eq(split_tenant_id.to_string()))
751 0 : .filter(shard_count.eq(new_shard_count.literal() as i32))
752 0 : .execute(conn)?;
753 :
754 0 : Ok(AbortShardSplitStatus::Aborted)
755 0 : },
756 0 : )
757 0 : .await
758 0 : }
759 : }
760 :
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
///
/// One value corresponds to one row of the `tenant_shards` table. The same type is also
/// (de)serialized as part of the legacy JSON format, where most fields are optional and fall
/// back to their default value.
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
    // Tenant id in its string form; parsed back with `TenantId::from_str`.
    #[serde(default)]
    pub(crate) tenant_id: String,
    // Together with `tenant_id` and `shard_count`, identifies the shard.
    #[serde(default)]
    pub(crate) shard_number: i32,
    // A value of 0 is treated as "unsharded" by `get_shard_identity`.
    #[serde(default)]
    pub(crate) shard_count: i32,
    #[serde(default)]
    pub(crate) shard_stripe_size: i32,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // Generation is only None when first onboarding a tenant, where it may
    // be in PlacementPolicy::Secondary and therefore have no valid generation state.
    pub(crate) generation: Option<i32>,

    // Currently attached pageserver
    // (named "pageserver" in the JSON format).
    #[serde(rename = "pageserver")]
    pub(crate) generation_pageserver: Option<i64>,

    // JSON encoding of a `PlacementPolicy`.
    #[serde(default)]
    pub(crate) placement_policy: String,
    // Whether the shard is currently part of a shard split.
    #[serde(default)]
    pub(crate) splitting: SplitState,
    // JSON encoding of a `TenantConfig`.
    #[serde(default)]
    pub(crate) config: String,
    // JSON encoding of a `ShardSchedulingPolicy`.
    #[serde(default)]
    pub(crate) scheduling_policy: String,
}
794 :
795 : impl TenantShardPersistence {
796 0 : pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
797 0 : if self.shard_count == 0 {
798 0 : Ok(ShardIdentity::unsharded())
799 : } else {
800 0 : Ok(ShardIdentity::new(
801 0 : ShardNumber(self.shard_number as u8),
802 0 : ShardCount::new(self.shard_count as u8),
803 0 : ShardStripeSize(self.shard_stripe_size as u32),
804 0 : )?)
805 : }
806 0 : }
807 :
808 0 : pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
809 0 : Ok(TenantShardId {
810 0 : tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
811 0 : shard_number: ShardNumber(self.shard_number as u8),
812 0 : shard_count: ShardCount::new(self.shard_count as u8),
813 : })
814 0 : }
815 : }
816 :
/// Parts of [`crate::node::Node`] that are stored durably
///
/// One value corresponds to one row of the `nodes` table.
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
    // Numeric node id; rows are looked up by this column.
    pub(crate) node_id: i64,
    // String form of a `NodeSchedulingPolicy`.
    pub(crate) scheduling_policy: String,
    // The fields below hold the node's HTTP and Postgres-protocol listen address and port.
    pub(crate) listen_http_addr: String,
    pub(crate) listen_http_port: i32,
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
}
|