use std::collections::HashMap;
use std::str::FromStr;

use camino::Utf8Path;
use camino::Utf8PathBuf;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};

use crate::node::Node;
use crate::PlacementPolicy;

/// ## What do we store?
///
/// The attachment service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat,
///   but it is operationally simpler to make this service the authority for which nodes
///   it talks to.
///
/// ## Performance/efficiency
///
/// The attachment service does not go via the database for most things: there are
/// only a couple of places where it must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
///   before it can attach a tenant, so this acts as a bound on how fast things like
///   failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
///   so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as nodes are very rarely
/// updated, and reads of nodes are always served from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
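///
/// ## Durable schema (illustrative sketch)
///
/// For orientation, the two tables roughly mirror [`TenantShardPersistence`] and [`NodePersistence`]
/// below. The DDL here is only a sketch inferred from those structs: the authoritative definitions
/// live in the Diesel `schema.rs` and the migrations, and the real column types, constraints and
/// keys may differ.
///
/// ```sql
/// -- Hypothetical shape, for illustration only
/// CREATE TABLE tenant_shards (
///     tenant_id VARCHAR NOT NULL,
///     shard_number INTEGER NOT NULL,
///     shard_count INTEGER NOT NULL,
///     shard_stripe_size INTEGER NOT NULL,
///     generation INTEGER NOT NULL,
///     generation_pageserver BIGINT NOT NULL,
///     placement_policy VARCHAR NOT NULL,
///     config VARCHAR NOT NULL
/// );
///
/// CREATE TABLE nodes (
///     node_id BIGINT NOT NULL,
///     scheduling_policy VARCHAR NOT NULL,
///     listen_http_addr VARCHAR NOT NULL,
///     listen_http_port INTEGER NOT NULL,
///     listen_pg_addr VARCHAR NOT NULL,
///     listen_pg_port INTEGER NOT NULL
/// );
/// ```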
pub struct Persistence {
    database_url: String,

    // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
    // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
    // compatible just yet.
    json_path: Option<Utf8PathBuf>,
}

/// Legacy format, used as the JSON compatibility object in test environments
#[derive(Serialize, Deserialize)]
struct JsonPersistence {
    tenants: HashMap<TenantShardId, TenantShardPersistence>,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
    #[error(transparent)]
    Query(#[from] diesel::result::Error),
    #[error(transparent)]
    Connection(#[from] diesel::result::ConnectionError),
    #[error("Logical error: {0}")]
    Logical(String),
}

pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

impl Persistence {
    pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
        Self {
            database_url,
            json_path,
        }
    }

    /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
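    ///
    /// Usage sketch (hypothetical caller, marked `ignore` so it is not doctested): queries are built
    /// with Diesel's DSL inside the closure, which runs on a `spawn_blocking` thread.
    ///
    /// ```ignore
    /// let node_count: i64 = self
    ///     .with_conn(move |conn| -> DatabaseResult<i64> {
    ///         Ok(crate::schema::nodes::table.count().get_result(conn)?)
    ///     })
    ///     .await?;
    /// ```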
    async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
    where
        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
        R: Send + 'static,
    {
        let database_url = self.database_url.clone();
        tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
            // TODO: connection pooling, such as via diesel::r2d2
            let mut conn = PgConnection::establish(&database_url)?;
            func(&mut conn)
        })
        .await
        .expect("Task panic")
    }

    /// When a node is first registered, persist it before using it for anything
    pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
        let np = node.to_persistent();
        self.with_conn(move |conn| -> DatabaseResult<()> {
            diesel::insert_into(crate::schema::nodes::table)
                .values(&np)
                .execute(conn)?;
            Ok(())
        })
        .await
    }

    /// At startup, populate the list of nodes which our shards may be placed on
    pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
        let nodes: Vec<Node> = self
            .with_conn(move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::nodes::table
                    .load::<NodePersistence>(conn)?
                    .into_iter()
                    .map(|n| Node {
                        id: NodeId(n.node_id as u64),
                        // At startup we consider a node offline until proven otherwise.
                        availability: NodeAvailability::Offline,
                        scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
                            .expect("Bad scheduling policy in DB"),
                        listen_http_addr: n.listen_http_addr,
                        listen_http_port: n.listen_http_port as u16,
                        listen_pg_addr: n.listen_pg_addr,
                        listen_pg_port: n.listen_pg_port as u16,
                    })
                    .collect::<Vec<Node>>())
            })
            .await?;

        tracing::info!("list_nodes: loaded {} nodes", nodes.len());

        Ok(nodes)
    }

    /// At startup, load the high level state for shards, such as their config + policy. This will
    /// be enriched at runtime with state discovered on pageservers.
    pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
        let loaded = self
            .with_conn(move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
            })
            .await?;

        if loaded.is_empty() {
            if let Some(path) = &self.json_path {
                if tokio::fs::try_exists(path)
                    .await
                    .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
                {
                    tracing::info!("Importing from legacy JSON format at {path}");
                    return self.list_tenant_shards_json(path).await;
                }
            }
        }
        Ok(loaded)
    }

    /// Shim for automated compatibility tests: load tenants from a JSON file instead of the database
    pub(crate) async fn list_tenant_shards_json(
        &self,
        path: &Utf8Path,
    ) -> DatabaseResult<Vec<TenantShardPersistence>> {
        let bytes = tokio::fs::read(path)
            .await
            .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;

        let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
            .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
        for (tenant_id, tenant) in &mut decoded.tenants {
            // Backward compat: in an old attachments.json from before PR #6251, replace
            // empty strings with proper defaults.
            if tenant.tenant_id.is_empty() {
                tenant.tenant_id = tenant_id.to_string();
                tenant.config = serde_json::to_string(&TenantConfig::default())
                    .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
                tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())
                    .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
            }
        }

        let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();

        // Synchronize the database with what is in the JSON file
        self.insert_tenant_shards(tenants.clone()).await?;

        Ok(tenants)
    }

    /// For use in testing environments, where we dump out JSON on shutdown.
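    ///
    /// The file contains a single [`JsonPersistence`] object keyed by tenant shard ID. Roughly the
    /// following shape (illustrative placeholder values; the field set follows
    /// [`TenantShardPersistence`]'s serde attributes, and the real output is compact rather than
    /// pretty-printed):
    ///
    /// ```json
    /// {
    ///   "tenants": {
    ///     "<tenant_shard_id>": {
    ///       "tenant_id": "<tenant_id>",
    ///       "shard_number": 0,
    ///       "shard_count": 0,
    ///       "shard_stripe_size": 0,
    ///       "generation": 5,
    ///       "pageserver": 1,
    ///       "placement_policy": "...",
    ///       "config": "..."
    ///     }
    ///   }
    /// }
    /// ```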
    pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
        let Some(path) = &self.json_path else {
            anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
        };
        tracing::info!("Writing state to {path}...");
        let tenants = self.list_tenant_shards().await?;
        let mut tenants_map = HashMap::new();
        for tsp in tenants {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount(tsp.shard_count as u8),
            };

            tenants_map.insert(tenant_shard_id, tsp);
        }
        let json = serde_json::to_string(&JsonPersistence {
            tenants: tenants_map,
        })?;

        tokio::fs::write(path, &json).await?;
        tracing::info!("Wrote {} bytes to {path}...", json.len());

        Ok(())
    }

    /// Tenants must be persisted before we schedule them for the first time. This enables us
    /// to correctly retain generation monotonicity, and the externally provided placement policy & config.
    pub(crate) async fn insert_tenant_shards(
        &self,
        shards: Vec<TenantShardPersistence>,
    ) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            conn.transaction(|conn| -> QueryResult<()> {
                for tenant in &shards {
                    diesel::insert_into(tenant_shards)
                        .values(tenant)
                        .execute(conn)?;
                }
                Ok(())
            })?;
            Ok(())
        })
        .await
    }

    /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
    /// the tenant from memory on this server.
    #[allow(unused)]
    pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| -> DatabaseResult<()> {
            diesel::delete(tenant_shards)
                .filter(tenant_id.eq(del_tenant_id.to_string()))
                .execute(conn)?;

            Ok(())
        })
        .await
    }

    /// When a pageserver invokes the /re-attach API, this function is responsible for doing an efficient
    /// batched increment of the generations of all tenants whose generation_pageserver is equal to
    /// the node that called /re-attach.
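    ///
    /// In SQL terms, the batched increment amounts to roughly the following (a sketch only; the real
    /// statements are generated by Diesel, and the follow-up SELECT is currently a separate query):
    ///
    /// ```sql
    /// UPDATE tenant_shards SET generation = generation + 1
    ///   WHERE generation_pageserver = {node_id};
    /// SELECT * FROM tenant_shards WHERE generation_pageserver = {node_id};
    /// ```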
    #[tracing::instrument(skip_all, fields(node_id))]
    pub(crate) async fn re_attach(
        &self,
        node_id: NodeId,
    ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_conn(move |conn| {
                let rows_updated = diesel::update(tenant_shards)
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .set(generation.eq(generation + 1))
                    .execute(conn)?;

                tracing::info!("Incremented {} tenants' generations", rows_updated);

                // TODO: UPDATE+SELECT in one query

                let updated = tenant_shards
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .select(TenantShardPersistence::as_select())
                    .load(conn)?;
                Ok(updated)
            })
            .await?;

        let mut result = HashMap::new();
        for tsp in updated {
            let tenant_shard_id = TenantShardId {
                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
                    .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
                shard_number: ShardNumber(tsp.shard_number as u8),
                shard_count: ShardCount(tsp.shard_count as u8),
            };
            result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
        }

        Ok(result)
    }

    /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
    /// advancing generation number. We also store the NodeId for which the generation was issued, so that in
    /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
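    ///
    /// This is roughly the following single statement (a sketch; the actual query is built with the
    /// Diesel DSL below, and currently returns all columns rather than only the generation):
    ///
    /// ```sql
    /// UPDATE tenant_shards
    ///   SET generation = generation + 1, generation_pageserver = {node_id}
    ///   WHERE tenant_id = {tenant_id} AND shard_number = {shard_number} AND shard_count = {shard_count}
    ///   RETURNING *;
    /// ```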
    pub(crate) async fn increment_generation(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<Generation> {
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_conn(move |conn| {
                let updated = diesel::update(tenant_shards)
                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                    .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
                    .set((
                        generation.eq(generation + 1),
                        generation_pageserver.eq(node_id.0 as i64),
                    ))
                    // TODO: only returning() the generation column
                    .returning(TenantShardPersistence::as_returning())
                    .get_result(conn)?;

                Ok(updated)
            })
            .await?;

        Ok(Generation::new(updated.generation as u32))
    }

    pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        use crate::schema::tenant_shards::dsl::*;
        self.with_conn(move |conn| {
            let updated = diesel::update(tenant_shards)
                .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
                .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
                .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
                .set((
                    generation_pageserver.eq(i64::MAX),
                    placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
                ))
                .execute(conn)?;

            Ok(updated)
        })
        .await?;

        Ok(())
    }

    // TODO: when we start shard splitting, we must durably mark the tenant so that
    // on restart, we know that we must go through recovery (list shards that exist
    // and pick up where we left off and/or revert to parent shards).
    #[allow(dead_code)]
    pub(crate) async fn begin_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
        todo!();
    }

    // TODO: when we finish shard splitting, we must atomically clean up the old shards
    // and insert the new shards, and clear the splitting marker.
    #[allow(dead_code)]
    pub(crate) async fn complete_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
        todo!();
    }
}

/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
    #[serde(default)]
    pub(crate) tenant_id: String,
    #[serde(default)]
    pub(crate) shard_number: i32,
    #[serde(default)]
    pub(crate) shard_count: i32,
    #[serde(default)]
    pub(crate) shard_stripe_size: i32,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching
    pub(crate) generation: i32,

    // Currently attached pageserver
    #[serde(rename = "pageserver")]
    pub(crate) generation_pageserver: i64,

    #[serde(default)]
    pub(crate) placement_policy: String,
    #[serde(default)]
    pub(crate) config: String,
}

/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
    pub(crate) node_id: i64,
    pub(crate) scheduling_policy: String,
    pub(crate) listen_http_addr: String,
    pub(crate) listen_http_port: i32,
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
}
|