Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use super::safekeeper_reconciler::ScheduleRequest;
6 : use crate::heartbeater::SafekeeperState;
7 : use crate::persistence::{
8 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
9 : };
10 : use crate::safekeeper::Safekeeper;
11 : use http_utils::error::ApiError;
12 : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
13 : use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
14 : use safekeeper_api::membership::{MemberSet, SafekeeperId};
15 : use tokio::task::JoinSet;
16 : use tokio_util::sync::CancellationToken;
17 : use utils::id::{NodeId, TenantId, TimelineId};
18 : use utils::logging::SecretString;
19 :
20 : use super::Service;
21 :
22 : impl Service {
23 : /// Timeline creation on safekeepers
24 : ///
25 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
26 : /// where `left` contains the list of safekeepers that did not return a successful response.
27 : /// Assumes the tenant lock is held while calling this function.
28 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
29 0 : &self,
30 0 : tenant_id: TenantId,
31 0 : timeline_id: TimelineId,
32 0 : pg_version: u32,
33 0 : timeline_persistence: &TimelinePersistence,
34 0 : ) -> Result<Vec<NodeId>, ApiError> {
35 0 : // Issue creation requests to all safekeepers in the set and wait until they respond or the timeout expires, then evaluate quorum from the collected responses
36 0 : let jwt = self
37 0 : .config
38 0 : .safekeeper_jwt_token
39 0 : .clone()
40 0 : .map(SecretString::from);
41 0 : let mut joinset = JoinSet::new();
42 0 :
43 0 : let safekeepers = {
44 0 : let locked = self.inner.read().unwrap();
45 0 : locked.safekeepers.clone()
46 0 : };
47 0 :
48 0 : let mut members = Vec::new();
49 0 : for sk_id in timeline_persistence.sk_set.iter() {
50 0 : let sk_id = NodeId(*sk_id as u64);
51 0 : let Some(safekeeper) = safekeepers.get(&sk_id) else {
52 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
53 0 : "couldn't find entry for safekeeper with id {sk_id}"
54 0 : )))?;
55 : };
56 0 : members.push(SafekeeperId {
57 0 : id: sk_id,
58 0 : host: safekeeper.skp.host.clone(),
59 0 : pg_port: safekeeper.skp.port as u16,
60 0 : });
61 : }
62 0 : let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
63 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
64 0 :
65 0 : let req = safekeeper_api::models::TimelineCreateRequest {
66 0 : commit_lsn: None,
67 0 : mconf,
68 0 : pg_version,
69 0 : start_lsn: timeline_persistence.start_lsn.0,
70 0 : system_id: None,
71 0 : tenant_id,
72 0 : timeline_id,
73 0 : wal_seg_size: None,
74 0 : };
75 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
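// Spawn one task per safekeeper in sk_set; each task retries the creation call a bounded number of times within SK_CREATE_TIMELINE_RECONCILE_TIMEOUT and reports its result.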
76 0 : for sk in timeline_persistence.sk_set.iter() {
77 0 : let sk_id = NodeId(*sk as u64);
78 0 : let safekeepers = safekeepers.clone();
79 0 : let jwt = jwt.clone();
80 0 : let ssl_ca_cert = self.config.ssl_ca_cert.clone();
81 0 : let req = req.clone();
82 0 : joinset.spawn(async move {
83 0 : // Unwrap is fine: we would already have returned an error above if this id was missing
84 0 : let sk_p = safekeepers.get(&sk_id).unwrap();
85 0 : let res = sk_p
86 0 : .with_client_retries(
87 0 : |client| {
88 0 : let req = req.clone();
89 0 : async move { client.create_timeline(&req).await }
90 0 : },
91 0 : &jwt,
92 0 : &ssl_ca_cert,
93 0 : 3,
94 0 : 3,
95 0 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
96 0 : &CancellationToken::new(),
97 0 : )
98 0 : .await;
99 0 : (sk_id, sk_p.skp.host.clone(), res)
100 0 : });
101 0 : }
102 : // After building the joinset, wait for the tasks to complete,
103 : // but with a timeout so that we return promptly, whether with
104 : // a failure or a success.
105 0 : let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
106 0 :
107 0 : // Wait until all tasks finish or timeout is hit, whichever occurs
108 0 : // first.
109 0 : let mut reconcile_results = Vec::new();
110 : loop {
111 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
112 : {
113 0 : let Some(res) = res else { break };
114 0 : match res {
115 0 : Ok(res) => {
116 0 : tracing::info!(
117 0 : "response from safekeeper id:{} at {}: {:?}",
118 : res.0,
119 : res.1,
120 : res.2
121 : );
122 0 : reconcile_results.push(res);
123 : }
124 0 : Err(join_err) => {
125 0 : tracing::info!("join_err for task in joinset: {join_err}");
126 : }
127 : }
128 : } else {
129 0 : tracing::info!(
130 0 : "timeout for creation call after {} responses",
131 0 : reconcile_results.len()
132 : );
133 0 : break;
134 : }
135 : }
136 :
137 : // Now check if a quorum was reached in reconcile_results.
138 0 : let total_result_count = reconcile_results.len();
139 0 : let remaining = reconcile_results
140 0 : .into_iter()
141 0 : .filter_map(|res| res.2.is_err().then_some(res.0))
142 0 : .collect::<Vec<_>>();
143 0 : tracing::info!(
144 0 : "Got {} non-successful responses out of {total_result_count} total to the initial creation request",
145 0 : remaining.len()
146 : );
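// sk_set is chosen with 3 members (see safekeepers_for_new_timeline), so two or more failures mean the creation did not reach a majority.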
147 0 : if remaining.len() >= 2 {
148 : // Failure
149 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
150 0 : "not enough successful reconciliations to reach quorum, please retry: {} errored",
151 0 : remaining.len()
152 0 : )));
153 0 : }
154 0 :
155 0 : Ok(remaining)
156 0 : }
157 :
158 : /// Create the timeline in the controller database and on the safekeepers.
159 : /// `timeline_info` is the result of the timeline creation on the pageserver.
160 : ///
161 : /// All actions must be idempotent, as the call is retried until it succeeds. It
162 : /// creates the timeline in the db and on at least a majority of the
163 : /// safekeepers, and queues creation in the db, with infinite retries, for the
164 : /// safekeepers that missed it; after that, the call returns Ok.
165 : ///
166 : /// The idea is that once this point is reached, as long as a majority of the
167 : /// safekeepers is alive, the timeline will eventually become operational: storcon
168 : /// can seed it on the nodes that missed creation by having them pull_timeline
169 : /// from their peers. On the other hand, we don't want to fail timeline creation
170 : /// just because one safekeeper is down.
171 0 : pub(super) async fn tenant_timeline_create_safekeepers(
172 0 : self: &Arc<Self>,
173 0 : tenant_id: TenantId,
174 0 : timeline_info: &TimelineInfo,
175 0 : create_mode: models::TimelineCreateRequestMode,
176 0 : ) -> Result<SafekeepersInfo, ApiError> {
177 0 : let timeline_id = timeline_info.timeline_id;
178 0 : let pg_version = timeline_info.pg_version * 10000;
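// timeline_info.pg_version carries the major version; the safekeeper API expects it scaled by 10000 (e.g. 16 -> 160000).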
179 : // Initially start_lsn is taken from last_record_lsn in the pageserver
180 : // response, since the pageserver runs initdb. However, we persist it, and in the
181 : // safekeeper creation calls we replace it with the value from the timeline row
182 : // if one already existed: on retries the endpoint might in theory have already
183 : // written some data and advanced last_record_lsn, while we want the
184 : // safekeepers to see a consistent start_lsn.
185 0 : let start_lsn = match create_mode {
186 0 : models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
187 0 : models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
188 : models::TimelineCreateRequestMode::ImportPgdata { .. } => {
189 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
190 0 : "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
191 0 : )))?;
192 : }
193 : };
194 : // Choose initial set of safekeepers respecting affinity
195 0 : let sks = self.safekeepers_for_new_timeline().await?;
196 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
197 0 : // Add timeline to db
198 0 : let mut timeline_persist = TimelinePersistence {
199 0 : tenant_id: tenant_id.to_string(),
200 0 : timeline_id: timeline_id.to_string(),
201 0 : start_lsn: start_lsn.into(),
202 0 : generation: 0,
203 0 : sk_set: sks_persistence.clone(),
204 0 : new_sk_set: None,
205 0 : cplane_notified_generation: 0,
206 0 : deleted_at: None,
207 0 : };
208 0 : let inserted = self
209 0 : .persistence
210 0 : .insert_timeline(timeline_persist.clone())
211 0 : .await?;
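// `inserted` is false if a row for this timeline already existed; in that case we re-read the persisted row below so that retries keep the original start_lsn and generation.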
212 0 : if !inserted {
213 0 : if let Some(existent_persist) = self
214 0 : .persistence
215 0 : .get_timeline(tenant_id, timeline_id)
216 0 : .await?
217 0 : {
218 0 : // Replace with what we have in the db, to get stuff like the generation right.
219 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
220 0 : // crashed right after the write to the DB.
221 0 : timeline_persist = existent_persist;
222 0 : } else {
223 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
224 0 : "insertion said timeline already in db, but looking it up, it was gone"
225 0 : )));
226 : }
227 0 : }
228 : // Create the timeline on a quorum of safekeepers
229 0 : let remaining = self
230 0 : .tenant_timeline_create_safekeepers_quorum(
231 0 : tenant_id,
232 0 : timeline_id,
233 0 : pg_version,
234 0 : &timeline_persist,
235 0 : )
236 0 : .await?;
237 :
238 : // For the remaining safekeepers, take care of their reconciliation asynchronously
239 0 : for &remaining_id in remaining.iter() {
240 0 : let pending_op = TimelinePendingOpPersistence {
241 0 : tenant_id: tenant_id.to_string(),
242 0 : timeline_id: timeline_id.to_string(),
243 0 : generation: timeline_persist.generation,
244 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
245 0 : sk_id: remaining_id.0 as i64,
246 0 : };
247 0 : tracing::info!("writing pending op for sk id {remaining_id}");
248 0 : self.persistence.insert_pending_op(pending_op).await?;
249 : }
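// Schedule pull_timeline reconciliations for the safekeepers that missed the initial creation, giving them the full safekeeper set as hosts to pull from.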
250 0 : if !remaining.is_empty() {
251 0 : let mut locked = self.inner.write().unwrap();
252 0 : for remaining_id in remaining {
253 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
254 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
255 0 : "Couldn't find safekeeper with id {remaining_id}"
256 0 : )));
257 : };
258 0 : let Ok(host_list) = sks
259 0 : .iter()
260 0 : .map(|sk| {
261 0 : Ok((
262 0 : sk.id,
263 0 : locked
264 0 : .safekeepers
265 0 : .get(&sk.id)
266 0 : .ok_or_else(|| {
267 0 : ApiError::InternalServerError(anyhow::anyhow!(
268 0 : "Couldn't find safekeeper with id {remaining_id} to pull from"
269 0 : ))
270 0 : })?
271 0 : .base_url(),
272 : ))
273 0 : })
274 0 : .collect::<Result<_, ApiError>>()
275 : else {
276 0 : continue;
277 : };
278 0 : let req = ScheduleRequest {
279 0 : safekeeper: Box::new(sk.clone()),
280 0 : host_list,
281 0 : tenant_id,
282 0 : timeline_id,
283 0 : generation: timeline_persist.generation as u32,
284 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
285 0 : };
286 0 : locked.safekeeper_reconcilers.schedule_request(self, req);
287 : }
288 0 : }
289 :
290 0 : Ok(SafekeepersInfo {
291 0 : generation: timeline_persist.generation as u32,
292 0 : safekeepers: sks,
293 0 : tenant_id,
294 0 : timeline_id,
295 0 : })
296 0 : }
297 : /// Perform timeline deletion on the safekeepers. Returns success eagerly: we persist the deletion for the reconciler, which carries it out asynchronously.
298 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
299 0 : self: &Arc<Self>,
300 0 : tenant_id: TenantId,
301 0 : timeline_id: TimelineId,
302 0 : ) -> Result<(), ApiError> {
303 0 : let tl = self
304 0 : .persistence
305 0 : .get_timeline(tenant_id, timeline_id)
306 0 : .await?;
307 0 : let Some(tl) = tl else {
308 0 : tracing::info!(
309 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
310 : );
311 0 : return Ok(());
312 : };
313 0 : let all_sks = tl
314 0 : .new_sk_set
315 0 : .iter()
316 0 : .flat_map(|sks| {
317 0 : sks.iter()
318 0 : .map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
319 0 : })
320 0 : .chain(
321 0 : tl.sk_set
322 0 : .iter()
323 0 : .map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
324 0 : )
325 0 : .collect::<HashMap<_, _>>();
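// Members of the current sk_set get a full deletion; safekeepers only present in the prospective new_sk_set merely need to be excluded from the membership.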
326 0 :
327 0 : // Schedule reconciliations
328 0 : {
329 0 : let mut locked = self.inner.write().unwrap();
330 0 : for (sk_id, kind) in all_sks {
331 0 : let sk_id = NodeId(sk_id as u64);
332 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
333 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
334 0 : "Couldn't find safekeeper with id {sk_id}"
335 0 : )));
336 : };
337 :
338 0 : let req = ScheduleRequest {
339 0 : safekeeper: Box::new(sk.clone()),
340 0 : // host_list is not used for these op kinds, so pass an empty dummy value
341 0 : host_list: Vec::new(),
342 0 : tenant_id,
343 0 : timeline_id,
344 0 : generation: tl.generation as u32,
345 0 : kind,
346 0 : };
347 0 : locked.safekeeper_reconcilers.schedule_request(self, req);
348 : }
349 : }
350 0 : Ok(())
351 0 : }
352 :
353 : /// Choose safekeepers for the new timeline: 3 in different AZs.
354 0 : pub(crate) async fn safekeepers_for_new_timeline(
355 0 : &self,
356 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
357 0 : // Number of safekeepers in different AZs we are looking for
358 0 : let wanted_count = 3;
359 0 : let mut all_safekeepers = {
360 0 : let locked = self.inner.read().unwrap();
361 0 : locked
362 0 : .safekeepers
363 0 : .iter()
364 0 : .filter_map(|sk| {
365 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
366 : // If we don't want to schedule stuff onto the safekeeper, respect that.
367 0 : return None;
368 0 : }
369 0 : let utilization_opt = if let SafekeeperState::Available {
370 : last_seen_at: _,
371 0 : utilization,
372 0 : } = sk.1.availability()
373 : {
374 0 : Some(utilization)
375 : } else {
376 : // Non-available safekeepers still get a chance to host new timelines,
377 : // but they are put last in the list.
378 0 : None
379 : };
380 0 : let info = SafekeeperInfo {
381 0 : hostname: sk.1.skp.host.clone(),
382 0 : id: NodeId(sk.1.skp.id as u64),
383 0 : };
384 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
385 0 : })
386 0 : .collect::<Vec<_>>()
387 0 : };
388 0 : all_safekeepers.sort_by_key(|sk| {
389 0 : (
390 0 : sk.0.as_ref()
391 0 : .map(|ut| ut.timeline_count)
392 0 : .unwrap_or(u64::MAX),
393 0 : // Use the id to break ties on equal scores, for deterministic ordering
394 0 : sk.1.id.0,
395 0 : )
396 0 : });
397 0 : let mut sks = Vec::new();
398 0 : let mut azs = HashSet::new();
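// Walk the sorted list and take the first safekeeper from each distinct AZ until wanted_count is reached.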
399 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
400 0 : if !azs.insert(az_id) {
401 0 : continue;
402 0 : }
403 0 : sks.push(sk_info.clone());
404 0 : if sks.len() == wanted_count {
405 0 : break;
406 0 : }
407 : }
408 0 : if sks.len() == wanted_count {
409 0 : Ok(sks)
410 : } else {
411 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
412 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
413 0 : sks.len(),
414 0 : all_safekeepers.len(),
415 0 : )))
416 : }
417 0 : }
418 :
419 0 : pub(crate) async fn safekeepers_list(
420 0 : &self,
421 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
422 0 : let locked = self.inner.read().unwrap();
423 0 : let mut list = locked
424 0 : .safekeepers
425 0 : .iter()
426 0 : .map(|sk| sk.1.describe_response())
427 0 : .collect::<Result<Vec<_>, _>>()?;
428 0 : list.sort_by_key(|v| v.id);
429 0 : Ok(list)
430 0 : }
431 :
432 0 : pub(crate) async fn get_safekeeper(
433 0 : &self,
434 0 : id: i64,
435 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
436 0 : let locked = self.inner.read().unwrap();
437 0 : let sk = locked
438 0 : .safekeepers
439 0 : .get(&NodeId(id as u64))
440 0 : .ok_or(diesel::result::Error::NotFound)?;
441 0 : sk.describe_response()
442 0 : }
443 :
444 0 : pub(crate) async fn upsert_safekeeper(
445 0 : &self,
446 0 : record: crate::persistence::SafekeeperUpsert,
447 0 : ) -> Result<(), ApiError> {
448 0 : let node_id = NodeId(record.id as u64);
449 0 : let use_https = self.config.use_https_safekeeper_api;
450 0 :
451 0 : if use_https && record.https_port.is_none() {
452 0 : return Err(ApiError::PreconditionFailed(
453 0 : format!(
454 0 : "cannot upsert safekeeper {node_id}: \
455 0 : https is enabled, but https port is not specified"
456 0 : )
457 0 : .into(),
458 0 : ));
459 0 : }
460 0 :
461 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
462 : {
463 0 : let mut locked = self.inner.write().unwrap();
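// Copy-on-write: clone the map behind the Arc, apply the change, then publish a new Arc so concurrent readers keep a consistent snapshot.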
464 0 : let mut safekeepers = (*locked.safekeepers).clone();
465 0 : match safekeepers.entry(node_id) {
466 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
467 0 : .get_mut()
468 0 : .update_from_record(record)
469 0 : .expect("all preconditions should be checked before upsert to database"),
470 0 : std::collections::hash_map::Entry::Vacant(entry) => {
471 0 : entry.insert(
472 0 : Safekeeper::from_persistence(
473 0 : crate::persistence::SafekeeperPersistence::from_upsert(
474 0 : record,
475 0 : SkSchedulingPolicy::Pause,
476 0 : ),
477 0 : CancellationToken::new(),
478 0 : use_https,
479 0 : )
480 0 : .expect("all preconditions should be checked before upsert to database"),
481 0 : );
482 0 : }
483 : }
484 0 : locked.safekeepers = Arc::new(safekeepers);
485 0 : }
486 0 : Ok(())
487 0 : }
488 :
489 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
490 0 : &self,
491 0 : id: i64,
492 0 : scheduling_policy: SkSchedulingPolicy,
493 0 : ) -> Result<(), DatabaseError> {
494 0 : self.persistence
495 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
496 0 : .await?;
497 0 : let node_id = NodeId(id as u64);
498 0 : // After the change has been persisted successfully, update the in-memory state
499 0 : {
500 0 : let mut locked = self.inner.write().unwrap();
501 0 : let mut safekeepers = (*locked.safekeepers).clone();
502 0 : let sk = safekeepers
503 0 : .get_mut(&node_id)
504 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
505 0 : sk.set_scheduling_policy(scheduling_policy);
506 0 :
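// Stop any in-flight reconciliations for safekeepers that are no longer schedulable.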
507 0 : match scheduling_policy {
508 0 : SkSchedulingPolicy::Active => (),
509 0 : SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
510 0 : locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
511 0 : }
512 : }
513 :
514 0 : locked.safekeepers = Arc::new(safekeepers);
515 0 : }
516 0 : Ok(())
517 0 : }
518 : }