Line data Source code
1 : pub(crate) mod split_state;
2 : use std::collections::HashMap;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 :
6 : use self::split_state::SplitState;
7 : use camino::Utf8Path;
8 : use camino::Utf8PathBuf;
9 : use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
10 : use diesel::pg::PgConnection;
11 : use diesel::prelude::*;
12 : use diesel::Connection;
13 : use pageserver_api::models::TenantConfig;
14 : use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
15 : use serde::{Deserialize, Serialize};
16 : use utils::generation::Generation;
17 : use utils::id::{NodeId, TenantId};
18 :
19 : use crate::node::Node;
20 : use crate::PlacementPolicy;
21 :
22 : /// ## What do we store?
23 : ///
24 : /// The attachment service does not store most of its state durably.
25 : ///
26 : /// The essential things to store durably are:
27 : /// - generation numbers, as these must always advance monotonically to ensure data safety.
28 : /// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
29 : /// - Node's scheduling policies, as the source of truth for these is something external.
30 : ///
31 : /// Other things we store durably as an implementation detail:
32 : /// - Node's host/port: this could be avoided it we made nodes emit a self-registering heartbeat,
33 : /// but it is operationally simpler to make this service the authority for which nodes
34 : /// it talks to.
35 : ///
36 : /// ## Performance/efficiency
37 : ///
38 : /// The attachment service does not go via the database for most things: there are
39 : /// a couple of places where we must, and where efficiency matters:
40 : /// - Incrementing generation numbers: the Reconciler has to wait for this to complete
41 : /// before it can attach a tenant, so this acts as a bound on how fast things like
42 : /// failover can happen.
43 : /// - Pageserver re-attach: we will increment many shards' generations when this happens,
44 : /// so it is important to avoid e.g. issuing O(N) queries.
45 : ///
46 : /// Database calls relating to nodes have low performance requirements, as they are very rarely
47 : /// updated, and reads of nodes are always from memory, not the database. We only require that
48 : /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
49 : pub struct Persistence {
50 : connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
51 :
52 : // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
53 : // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
54 : // compatible just yet.
55 : json_path: Option<Utf8PathBuf>,
56 : }
57 :
58 : /// Legacy format, for use in JSON compat objects in test environment
59 366 : #[derive(Serialize, Deserialize)]
60 : struct JsonPersistence {
61 : tenants: HashMap<TenantShardId, TenantShardPersistence>,
62 : }
63 :
64 0 : #[derive(thiserror::Error, Debug)]
65 : pub(crate) enum DatabaseError {
66 : #[error(transparent)]
67 : Query(#[from] diesel::result::Error),
68 : #[error(transparent)]
69 : Connection(#[from] diesel::result::ConnectionError),
70 : #[error(transparent)]
71 : ConnectionPool(#[from] r2d2::Error),
72 : #[error("Logical error: {0}")]
73 : Logical(String),
74 : }
75 :
76 : pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
77 :
78 : impl Persistence {
79 : // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
80 : // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
81 : pub const MAX_CONNECTIONS: u32 = 99;
82 :
83 : // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
84 : const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
85 : const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
86 :
87 366 : pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
88 366 : let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
89 366 :
90 366 : // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
91 366 : // to execute queries (database queries are not generally on latency-sensitive paths).
92 366 : let connection_pool = diesel::r2d2::Pool::builder()
93 366 : .max_size(Self::MAX_CONNECTIONS)
94 366 : .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
95 366 : .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
96 366 : // Always keep at least one connection ready to go
97 366 : .min_idle(Some(1))
98 366 : .test_on_check_out(true)
99 366 : .build(manager)
100 366 : .expect("Could not build connection pool");
101 366 :
102 366 : Self {
103 366 : connection_pool,
104 366 : json_path,
105 366 : }
106 366 : }
107 :
108 : /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
109 3330 : async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
110 3330 : where
111 3330 : F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
112 3330 : R: Send + 'static,
113 3330 : {
114 3330 : let mut conn = self.connection_pool.get()?;
115 3330 : tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
116 3347 : .await
117 3330 : .expect("Task panic")
118 3330 : }
119 :
120 : /// When a node is first registered, persist it before using it for anything
121 400 : pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
122 400 : let np = node.to_persistent();
123 400 : self.with_conn(move |conn| -> DatabaseResult<()> {
124 400 : diesel::insert_into(crate::schema::nodes::table)
125 400 : .values(&np)
126 400 : .execute(conn)?;
127 400 : Ok(())
128 400 : })
129 400 : .await
130 400 : }
131 :
132 : /// At startup, populate the list of nodes which our shards may be placed on
133 371 : pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
134 371 : let nodes: Vec<Node> = self
135 371 : .with_conn(move |conn| -> DatabaseResult<_> {
136 371 : Ok(crate::schema::nodes::table
137 371 : .load::<NodePersistence>(conn)?
138 371 : .into_iter()
139 371 : .map(|n| Node {
140 21 : id: NodeId(n.node_id as u64),
141 21 : // At startup we consider a node offline until proven otherwise.
142 21 : availability: NodeAvailability::Offline,
143 21 : scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
144 21 : .expect("Bad scheduling policy in DB"),
145 21 : listen_http_addr: n.listen_http_addr,
146 21 : listen_http_port: n.listen_http_port as u16,
147 21 : listen_pg_addr: n.listen_pg_addr,
148 21 : listen_pg_port: n.listen_pg_port as u16,
149 371 : })
150 371 : .collect::<Vec<Node>>())
151 371 : })
152 371 : .await?;
153 :
154 371 : tracing::info!("list_nodes: loaded {} nodes", nodes.len());
155 :
156 371 : Ok(nodes)
157 371 : }
158 :
159 : /// At startup, load the high level state for shards, such as their config + policy. This will
160 : /// be enriched at runtime with state discovered on pageservers.
161 732 : pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
162 732 : let loaded = self
163 732 : .with_conn(move |conn| -> DatabaseResult<_> {
164 732 : Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
165 732 : })
166 732 : .await?;
167 :
168 732 : if loaded.is_empty() {
169 358 : if let Some(path) = &self.json_path {
170 358 : if tokio::fs::try_exists(path)
171 358 : .await
172 358 : .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
173 : {
174 2 : tracing::info!("Importing from legacy JSON format at {path}");
175 4 : return self.list_tenant_shards_json(path).await;
176 356 : }
177 0 : }
178 374 : }
179 730 : Ok(loaded)
180 732 : }
181 :
182 : /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
183 2 : pub(crate) async fn list_tenant_shards_json(
184 2 : &self,
185 2 : path: &Utf8Path,
186 2 : ) -> DatabaseResult<Vec<TenantShardPersistence>> {
187 2 : let bytes = tokio::fs::read(path)
188 2 : .await
189 2 : .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
190 :
191 2 : let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
192 2 : .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
193 4 : for (tenant_id, tenant) in &mut decoded.tenants {
194 : // Backward compat: an old attachments.json from before PR #6251, replace
195 : // empty strings with proper defaults.
196 2 : if tenant.tenant_id.is_empty() {
197 0 : tenant.tenant_id = tenant_id.to_string();
198 0 : tenant.config = serde_json::to_string(&TenantConfig::default())
199 0 : .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
200 0 : tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())
201 0 : .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
202 2 : }
203 : }
204 :
205 2 : let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
206 2 :
207 2 : // Synchronize database with what is in the JSON file
208 2 : self.insert_tenant_shards(tenants.clone()).await?;
209 :
210 2 : Ok(tenants)
211 2 : }
212 :
213 : /// For use in testing environments, where we dump out JSON on shutdown.
214 366 : pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
215 366 : let Some(path) = &self.json_path else {
216 0 : anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
217 : };
218 366 : tracing::info!("Writing state to {path}...");
219 366 : let tenants = self.list_tenant_shards().await?;
220 366 : let mut tenants_map = HashMap::new();
221 874 : for tsp in tenants {
222 508 : let tenant_shard_id = TenantShardId {
223 508 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
224 508 : shard_number: ShardNumber(tsp.shard_number as u8),
225 508 : shard_count: ShardCount(tsp.shard_count as u8),
226 508 : };
227 508 :
228 508 : tenants_map.insert(tenant_shard_id, tsp);
229 : }
230 366 : let json = serde_json::to_string(&JsonPersistence {
231 366 : tenants: tenants_map,
232 366 : })?;
233 :
234 366 : tokio::fs::write(path, &json).await?;
235 366 : tracing::info!("Wrote {} bytes to {path}...", json.len());
236 :
237 366 : Ok(())
238 366 : }
239 :
240 : /// Tenants must be persisted before we schedule them for the first time. This enables us
241 : /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
242 481 : pub(crate) async fn insert_tenant_shards(
243 481 : &self,
244 481 : shards: Vec<TenantShardPersistence>,
245 481 : ) -> DatabaseResult<()> {
246 481 : use crate::schema::tenant_shards::dsl::*;
247 481 : self.with_conn(move |conn| -> DatabaseResult<()> {
248 481 : conn.transaction(|conn| -> QueryResult<()> {
249 988 : for tenant in &shards {
250 509 : diesel::insert_into(tenant_shards)
251 509 : .values(tenant)
252 509 : .execute(conn)?;
253 : }
254 479 : Ok(())
255 481 : })?;
256 479 : Ok(())
257 481 : })
258 482 : .await
259 481 : }
260 :
261 : /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
262 : /// the tenant from memory on this server.
263 7 : pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
264 7 : use crate::schema::tenant_shards::dsl::*;
265 7 : self.with_conn(move |conn| -> DatabaseResult<()> {
266 7 : diesel::delete(tenant_shards)
267 7 : .filter(tenant_id.eq(del_tenant_id.to_string()))
268 7 : .execute(conn)?;
269 :
270 7 : Ok(())
271 7 : })
272 7 : .await
273 7 : }
274 :
275 1 : pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
276 1 : use crate::schema::nodes::dsl::*;
277 1 : self.with_conn(move |conn| -> DatabaseResult<()> {
278 1 : diesel::delete(nodes)
279 1 : .filter(node_id.eq(del_node_id.0 as i64))
280 1 : .execute(conn)?;
281 :
282 1 : Ok(())
283 1 : })
284 1 : .await
285 1 : }
286 :
287 : /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
288 : /// batched increment of the generations of all tenants whose generation_pageserver is equal to
289 : /// the node that called /re-attach.
290 1248 : #[tracing::instrument(skip_all, fields(node_id))]
291 : pub(crate) async fn re_attach(
292 : &self,
293 : node_id: NodeId,
294 : ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
295 : use crate::schema::tenant_shards::dsl::*;
296 : let updated = self
297 624 : .with_conn(move |conn| {
298 624 : let rows_updated = diesel::update(tenant_shards)
299 624 : .filter(generation_pageserver.eq(node_id.0 as i64))
300 624 : .set(generation.eq(generation + 1))
301 624 : .execute(conn)?;
302 :
303 624 : tracing::info!("Incremented {} tenants' generations", rows_updated);
304 :
305 : // TODO: UPDATE+SELECT in one query
306 :
307 624 : let updated = tenant_shards
308 624 : .filter(generation_pageserver.eq(node_id.0 as i64))
309 624 : .select(TenantShardPersistence::as_select())
310 624 : .load(conn)?;
311 624 : Ok(updated)
312 624 : })
313 : .await?;
314 :
315 : let mut result = HashMap::new();
316 : for tsp in updated {
317 : let tenant_shard_id = TenantShardId {
318 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
319 0 : .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
320 : shard_number: ShardNumber(tsp.shard_number as u8),
321 : shard_count: ShardCount(tsp.shard_count as u8),
322 : };
323 : result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
324 : }
325 :
326 : Ok(result)
327 : }
328 :
329 : /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
330 : /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
331 : /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
332 703 : pub(crate) async fn increment_generation(
333 703 : &self,
334 703 : tenant_shard_id: TenantShardId,
335 703 : node_id: NodeId,
336 703 : ) -> anyhow::Result<Generation> {
337 : use crate::schema::tenant_shards::dsl::*;
338 703 : let updated = self
339 703 : .with_conn(move |conn| {
340 703 : let updated = diesel::update(tenant_shards)
341 703 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
342 703 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
343 703 : .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
344 703 : .set((
345 703 : generation.eq(generation + 1),
346 703 : generation_pageserver.eq(node_id.0 as i64),
347 703 : ))
348 703 : // TODO: only returning() the generation column
349 703 : .returning(TenantShardPersistence::as_returning())
350 703 : .get_result(conn)?;
351 :
352 703 : Ok(updated)
353 703 : })
354 719 : .await?;
355 :
356 703 : Ok(Generation::new(updated.generation as u32))
357 703 : }
358 :
359 7 : pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
360 7 : use crate::schema::tenant_shards::dsl::*;
361 7 : self.with_conn(move |conn| {
362 7 : let updated = diesel::update(tenant_shards)
363 7 : .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
364 7 : .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
365 7 : .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
366 7 : .set((
367 7 : generation_pageserver.eq(i64::MAX),
368 7 : placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
369 7 : ))
370 7 : .execute(conn)?;
371 :
372 7 : Ok(updated)
373 7 : })
374 7 : .await?;
375 :
376 7 : Ok(())
377 7 : }
378 :
379 : // When we start shard splitting, we must durably mark the tenant so that
380 : // on restart, we know that we must go through recovery.
381 : //
382 : // We create the child shards here, so that they will be available for increment_generation calls
383 : // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
384 : #[allow(dead_code)]
385 2 : pub(crate) async fn begin_shard_split(
386 2 : &self,
387 2 : old_shard_count: ShardCount,
388 2 : split_tenant_id: TenantId,
389 2 : parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
390 2 : ) -> DatabaseResult<()> {
391 2 : use crate::schema::tenant_shards::dsl::*;
392 2 : self.with_conn(move |conn| -> DatabaseResult<()> {
393 2 : conn.transaction(|conn| -> DatabaseResult<()> {
394 2 : // Mark parent shards as splitting
395 2 :
396 2 : let expect_parent_records = std::cmp::max(1, old_shard_count.0);
397 :
398 2 : let updated = diesel::update(tenant_shards)
399 2 : .filter(tenant_id.eq(split_tenant_id.to_string()))
400 2 : .filter(shard_count.eq(old_shard_count.0 as i32))
401 2 : .set((splitting.eq(1),))
402 2 : .execute(conn)?;
403 2 : if u8::try_from(updated)
404 2 : .map_err(|_| DatabaseError::Logical(
405 2 : format!("Overflow existing shard count {} while splitting", updated))
406 2 : )? != expect_parent_records {
407 : // Perhaps a deletion or another split raced with this attempt to split, mutating
408 : // the parent shards that we intend to split. In this case the split request should fail.
409 0 : return Err(DatabaseError::Logical(
410 0 : format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {expect_parent_records})")
411 0 : ));
412 2 : }
413 2 :
414 2 : // FIXME: spurious clone to sidestep closure move rules
415 2 : let parent_to_children = parent_to_children.clone();
416 :
417 : // Insert child shards
418 7 : for (parent_shard_id, children) in parent_to_children {
419 5 : let mut parent = crate::schema::tenant_shards::table
420 5 : .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
421 5 : .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
422 5 : .filter(shard_count.eq(parent_shard_id.shard_count.0 as i32))
423 5 : .load::<TenantShardPersistence>(conn)?;
424 5 : let parent = if parent.len() != 1 {
425 0 : return Err(DatabaseError::Logical(format!(
426 0 : "Parent shard {parent_shard_id} not found"
427 0 : )));
428 : } else {
429 5 : parent.pop().unwrap()
430 : };
431 15 : for mut shard in children {
432 : // Carry the parent's generation into the child
433 10 : shard.generation = parent.generation;
434 :
435 10 : debug_assert!(shard.splitting == SplitState::Splitting);
436 10 : diesel::insert_into(tenant_shards)
437 10 : .values(shard)
438 10 : .execute(conn)?;
439 : }
440 : }
441 :
442 2 : Ok(())
443 2 : })?;
444 :
445 2 : Ok(())
446 2 : })
447 2 : .await
448 2 : }
449 :
450 : // When we finish shard splitting, we must atomically clean up the old shards
451 : // and insert the new shards, and clear the splitting marker.
452 : #[allow(dead_code)]
453 2 : pub(crate) async fn complete_shard_split(
454 2 : &self,
455 2 : split_tenant_id: TenantId,
456 2 : old_shard_count: ShardCount,
457 2 : ) -> DatabaseResult<()> {
458 2 : use crate::schema::tenant_shards::dsl::*;
459 2 : self.with_conn(move |conn| -> DatabaseResult<()> {
460 2 : conn.transaction(|conn| -> QueryResult<()> {
461 2 : // Drop parent shards
462 2 : diesel::delete(tenant_shards)
463 2 : .filter(tenant_id.eq(split_tenant_id.to_string()))
464 2 : .filter(shard_count.eq(old_shard_count.0 as i32))
465 2 : .execute(conn)?;
466 :
467 : // Clear sharding flag
468 2 : let updated = diesel::update(tenant_shards)
469 2 : .filter(tenant_id.eq(split_tenant_id.to_string()))
470 2 : .set((splitting.eq(0),))
471 2 : .execute(conn)?;
472 2 : debug_assert!(updated > 0);
473 :
474 2 : Ok(())
475 2 : })?;
476 :
477 2 : Ok(())
478 2 : })
479 2 : .await
480 2 : }
481 : }
482 :
483 : /// Parts of [`crate::tenant_state::TenantState`] that are stored durably
484 1465 : #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
485 : #[diesel(table_name = crate::schema::tenant_shards)]
486 : pub(crate) struct TenantShardPersistence {
487 : #[serde(default)]
488 : pub(crate) tenant_id: String,
489 : #[serde(default)]
490 : pub(crate) shard_number: i32,
491 : #[serde(default)]
492 : pub(crate) shard_count: i32,
493 : #[serde(default)]
494 : pub(crate) shard_stripe_size: i32,
495 :
496 : // Latest generation number: next time we attach, increment this
497 : // and use the incremented number when attaching
498 : pub(crate) generation: i32,
499 :
500 : // Currently attached pageserver
501 : #[serde(rename = "pageserver")]
502 : pub(crate) generation_pageserver: i64,
503 :
504 : #[serde(default)]
505 : pub(crate) placement_policy: String,
506 : #[serde(default)]
507 : pub(crate) splitting: SplitState,
508 : #[serde(default)]
509 : pub(crate) config: String,
510 : }
511 :
512 : /// Parts of [`crate::node::Node`] that are stored durably
513 400 : #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
514 : #[diesel(table_name = crate::schema::nodes)]
515 : pub(crate) struct NodePersistence {
516 : pub(crate) node_id: i64,
517 : pub(crate) scheduling_policy: String,
518 : pub(crate) listen_http_addr: String,
519 : pub(crate) listen_http_port: i32,
520 : pub(crate) listen_pg_addr: String,
521 : pub(crate) listen_pg_port: i32,
522 : }
|