pub(crate) mod split_state;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::NodeSchedulingPolicy;
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};

use crate::node::Node;
use crate::PlacementPolicy;

/// ## What do we store?
///
/// The attachment service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
///   but it is operationally simpler to make this service the authority for which nodes
///   it talks to.
///
/// ## Performance/efficiency
///
/// The attachment service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
///   before it can attach a tenant, so this acts as a bound on how fast things like
///   failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
///   so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as they are very rarely
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,

    // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
    // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
    // compatible just yet.
    json_path: Option<Utf8PathBuf>,
}
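
// A minimal startup sketch (hypothetical wiring; the real service setup lives elsewhere in this
// crate): construct the persistence layer, then load the durable node and shard state that the
// in-memory scheduler is seeded with.
//
//     let persistence = Persistence::new(database_url, None);
//     let nodes = persistence.list_nodes().await?;
//     let shards = persistence.list_tenant_shards().await?;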

/// Legacy format, for use in JSON compat objects in test environment
#[derive(Serialize, Deserialize)]
struct JsonPersistence {
    tenants: HashMap<TenantShardId, TenantShardPersistence>,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
    #[error(transparent)]
    Query(#[from] diesel::result::Error),
    #[error(transparent)]
    Connection(#[from] diesel::result::ConnectionError),
    #[error(transparent)]
    ConnectionPool(#[from] r2d2::Error),
    #[error("Logical error: {0}")]
    Logical(String),
}

pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

impl Persistence {
    // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
    // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
    pub const MAX_CONNECTIONS: u32 = 99;

    // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
    const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
    const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);

    pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
        let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);

        // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
        // to execute queries (database queries are not generally on latency-sensitive paths).
        let connection_pool = diesel::r2d2::Pool::builder()
            .max_size(Self::MAX_CONNECTIONS)
            .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
            .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
            // Always keep at least one connection ready to go
            .min_idle(Some(1))
            .test_on_check_out(true)
            .build(manager)
            .expect("Could not build connection pool");

        Self {
            connection_pool,
            json_path,
        }
    }

    /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
    async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
    where
        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
        R: Send + 'static,
    {
        let mut conn = self.connection_pool.get()?;
        tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
            .await
            .expect("Task panic")
    }
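
    // A minimal usage sketch for `with_conn` (hypothetical query; the real callers below follow
    // the same shape): the closure runs on a blocking thread with a pooled connection, so it must
    // be `Send + 'static` and cannot borrow from `self`.
    //
    //     let node_count: i64 = self
    //         .with_conn(move |conn| -> DatabaseResult<i64> {
    //             Ok(crate::schema::nodes::table.count().get_result(conn)?)
    //         })
    //         .await?;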

    /// When a node is first registered, persist it before using it for anything
    pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
        let np = node.to_persistent();
        self.with_conn(move |conn| -> DatabaseResult<()> {
            diesel::insert_into(crate::schema::nodes::table)
                .values(&np)
                .execute(conn)?;
            Ok(())
        })
        .await
    }

    /// At startup, populate the list of nodes which our shards may be placed on
    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
        let nodes: Vec<NodePersistence> = self
            .with_conn(move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
            })
            .await?;

        tracing::info!("list_nodes: loaded {} nodes", nodes.len());

        Ok(nodes)
    }

    pub(crate) async fn update_node(
        &self,
        input_node_id: NodeId,
        input_scheduling: NodeSchedulingPolicy,
    ) -> DatabaseResult<()> {
        use crate::schema::nodes::dsl::*;
        let updated = self
            .with_conn(move |conn| {
                let updated = diesel::update(nodes)
                    .filter(node_id.eq(input_node_id.0 as i64))
                    .set((scheduling_policy.eq(String::from(input_scheduling)),))
                    .execute(conn)?;
                Ok(updated)
            })
            .await?;

        if updated != 1 {
            Err(DatabaseError::Logical(format!(
                "Node {input_node_id:?} not found for update",
            )))
        } else {
            Ok(())
        }
    }

    /// At startup, load the high level state for shards, such as their config + policy. This will
    /// be enriched at runtime with state discovered on pageservers.
    pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
        let loaded = self
            .with_conn(move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
            })
            .await?;

        if loaded.is_empty() {
            if let Some(path) = &self.json_path {
                if tokio::fs::try_exists(path)
                    .await
                    .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
                {
                    tracing::info!("Importing from legacy JSON format at {path}");
                    return self.list_tenant_shards_json(path).await;
                }
            }
        }
        Ok(loaded)
    }

    /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
    pub(crate) async fn list_tenant_shards_json(
        &self,
        path: &Utf8Path,
    ) -> DatabaseResult<Vec<TenantShardPersistence>> {
        let bytes = tokio::fs::read(path)
            .await
            .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;

        let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
            .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
        for (tenant_id, tenant) in &mut decoded.tenants {
            // Backward compat: for an old attachments.json from before PR #6251, replace
            // empty strings with proper defaults.
            if tenant.tenant_id.is_empty() {
                tenant.tenant_id = tenant_id.to_string();
                tenant.config = serde_json::to_string(&TenantConfig::default())
                    .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
                tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())
                    .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
            }
        }

        let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();

        // Synchronize database with what is in the JSON file
        self.insert_tenant_shards(tenants.clone()).await?;

        Ok(tenants)
    }

    /// For use in testing environments, where we dump out JSON on shutdown.
    pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
        let Some(path) = &self.json_path else {
            anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
        };
        tracing::info!("Writing state to {path}...");
        let tenants = self.list_tenant_shards().await?;
        let mut tenants_map = HashMap::new();
        for tsp in tenants {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount::new(tsp.shard_count as u8),
            };

            tenants_map.insert(tenant_shard_id, tsp);
        }
        let json = serde_json::to_string(&JsonPersistence {
            tenants: tenants_map,
        })?;

        tokio::fs::write(path, &json).await?;
        tracing::info!("Wrote {} bytes to {path}...", json.len());

        Ok(())
    }

    /// Tenants must be persisted before we schedule them for the first time. This enables us
    /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
    pub(crate) async fn insert_tenant_shards(
        &self,
        shards: Vec<TenantShardPersistence>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            conn.transaction(|conn| -> QueryResult<()> {
                for tenant in &shards {
                    diesel::insert_into(tenant_shards)
                        .values(tenant)
                        .execute(conn)?;
                }
                Ok(())
            })?;
            Ok(())
        })
        .await
    }

    /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
    /// the tenant from memory on this server.
    pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            diesel::delete(tenant_shards)
                .filter(tenant_id.eq(del_tenant_id.to_string()))
                .execute(conn)?;

            Ok(())
        })
        .await
    }

    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
        use crate::schema::nodes::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            diesel::delete(nodes)
                .filter(node_id.eq(del_node_id.0 as i64))
                .execute(conn)?;

            Ok(())
        })
        .await
    }

    /// When a pageserver invokes the /re-attach API, this function is responsible for doing an efficient
    /// batched increment of the generations of all tenants whose generation_pageserver is equal to
    /// the node that called /re-attach.
    #[tracing::instrument(skip_all, fields(node_id))]
    pub(crate) async fn re_attach(
        &self,
        node_id: NodeId,
    ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_conn(move |conn| {
                let rows_updated = diesel::update(tenant_shards)
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .set(generation.eq(generation + 1))
                    .execute(conn)?;

                tracing::info!("Incremented {} tenants' generations", rows_updated);

                // TODO: UPDATE+SELECT in one query
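                // One possible shape for that TODO (a hedged sketch, not verified against this
                // Diesel version): do the UPDATE and read the rows back in a single statement via
                // RETURNING, as `increment_generation` below does for a single shard:
                //
                //     let updated: Vec<TenantShardPersistence> = diesel::update(tenant_shards)
                //         .filter(generation_pageserver.eq(node_id.0 as i64))
                //         .set(generation.eq(generation + 1))
                //         .returning(TenantShardPersistence::as_returning())
                //         .get_results(conn)?;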

                let updated = tenant_shards
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .select(TenantShardPersistence::as_select())
                    .load(conn)?;
                Ok(updated)
            })
            .await?;

        let mut result = HashMap::new();
        for tsp in updated {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
                    .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount::new(tsp.shard_count as u8),
            };
            result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
        }

        Ok(result)
    }

    /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
    /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
    /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
    pub(crate) async fn increment_generation(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<Generation> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_conn(move |conn| {
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                    .set((
                        generation.eq(generation + 1),
                        generation_pageserver.eq(node_id.0 as i64),
                    ))
                    // TODO: only returning() the generation column
                    .returning(TenantShardPersistence::as_returning())
                    .get_result(conn)?;

                Ok(updated)
            })
            .await?;

        Ok(Generation::new(updated.generation as u32))
    }
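
    // Sketch of the ordering this enforces (hypothetical caller code; the real Reconciler lives
    // elsewhere in this crate): the incremented generation is durable before any attach request
    // carrying it is sent.
    //
    //     let gen = persistence.increment_generation(tenant_shard_id, dest_node_id).await?;
    //     // Only now is it safe to send the attach request to `dest_node_id` using `gen`.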

    pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| {
            let updated = diesel::update(tenant_shards)
                .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
                .set((
                    generation_pageserver.eq(i64::MAX),
                    placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
                ))
                .execute(conn)?;

            Ok(updated)
        })
        .await?;

        Ok(())
    }

    // When we start shard splitting, we must durably mark the tenant so that
    // on restart, we know that we must go through recovery.
    //
    // We create the child shards here, so that they will be available for increment_generation calls
    // if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
    //
    // (The overall begin/complete sequence is sketched after `complete_shard_split` below.)
    pub(crate) async fn begin_shard_split(
        &self,
        old_shard_count: ShardCount,
        split_tenant_id: TenantId,
        parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            conn.transaction(|conn| -> DatabaseResult<()> {
                // Mark parent shards as splitting

                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .filter(shard_count.eq(old_shard_count.literal() as i32))
                    .set((splitting.eq(1),))
                    .execute(conn)?;
                if u8::try_from(updated)
                    .map_err(|_| DatabaseError::Logical(
                        format!("Overflow existing shard count {} while splitting", updated))
                    )? != old_shard_count.count() {
                    // Perhaps a deletion or another split raced with this attempt to split, mutating
                    // the parent shards that we intend to split. In this case the split request should fail.
                    return Err(DatabaseError::Logical(
                        format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
                    ));
                }

                // FIXME: spurious clone to sidestep closure move rules
                let parent_to_children = parent_to_children.clone();

                // Insert child shards
                for (parent_shard_id, children) in parent_to_children {
                    let mut parent = crate::schema::tenant_shards::table
                        .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
                        .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
                        .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
                        .load::<TenantShardPersistence>(conn)?;
                    let parent = if parent.len() != 1 {
                        return Err(DatabaseError::Logical(format!(
                            "Parent shard {parent_shard_id} not found"
                        )));
                    } else {
                        parent.pop().unwrap()
                    };
                    for mut shard in children {
                        // Carry the parent's generation into the child
                        shard.generation = parent.generation;

                        debug_assert!(shard.splitting == SplitState::Splitting);
                        diesel::insert_into(tenant_shards)
                            .values(shard)
                            .execute(conn)?;
                    }
                }

                Ok(())
            })?;

            Ok(())
        })
        .await
    }

    // When we finish shard splitting, we must atomically clean up the old (parent) shards
    // and clear the splitting marker on the child shards.
    pub(crate) async fn complete_shard_split(
        &self,
        split_tenant_id: TenantId,
        old_shard_count: ShardCount,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            conn.transaction(|conn| -> QueryResult<()> {
                // Drop parent shards
                diesel::delete(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .filter(shard_count.eq(old_shard_count.literal() as i32))
                    .execute(conn)?;

                // Clear the splitting flag on the remaining (child) shards
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(split_tenant_id.to_string()))
                    .set((splitting.eq(0),))
                    .execute(conn)?;
                debug_assert!(updated > 0);

                Ok(())
            })?;

            Ok(())
        })
        .await
    }
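
    // Overall split sequence as implied by `begin_shard_split` and `complete_shard_split` above
    // (a sketch; the orchestration lives in the service layer, not in this module):
    //
    //     // 1. Durably insert child rows (carrying each parent's generation) and mark the
    //     //    parents as splitting, so that a restart can detect an interrupted split.
    //     persistence.begin_shard_split(old_shard_count, tenant_id, parent_to_children).await?;
    //     // 2. Ask the pageserver(s) to perform the split.
    //     // 3. Atomically delete the parent rows and clear the splitting flag on the children.
    //     persistence.complete_shard_split(tenant_id, old_shard_count).await?;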
}

/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
    #[serde(default)]
    pub(crate) tenant_id: String,
    #[serde(default)]
    pub(crate) shard_number: i32,
    #[serde(default)]
    pub(crate) shard_count: i32,
    #[serde(default)]
    pub(crate) shard_stripe_size: i32,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching
    pub(crate) generation: i32,

    // Currently attached pageserver
    #[serde(rename = "pageserver")]
    pub(crate) generation_pageserver: i64,

    #[serde(default)]
    pub(crate) placement_policy: String,
    #[serde(default)]
    pub(crate) splitting: SplitState,
    #[serde(default)]
    pub(crate) config: String,
}

/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
    pub(crate) node_id: i64,
    pub(crate) scheduling_policy: String,
    pub(crate) listen_http_addr: String,
    pub(crate) listen_http_port: i32,
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
}