Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use super::safekeeper_reconciler::ScheduleRequest;
7 : use crate::heartbeater::SafekeeperState;
8 : use crate::metrics;
9 : use crate::persistence::{
10 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
11 : };
12 : use crate::safekeeper::Safekeeper;
13 : use crate::timeline_import::TimelineImportFinalizeError;
14 : use anyhow::Context;
15 : use http_utils::error::ApiError;
16 : use pageserver_api::controller_api::{
17 : SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
18 : };
19 : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
20 : use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
21 : use tokio::task::JoinSet;
22 : use tokio_util::sync::CancellationToken;
23 : use utils::id::{NodeId, TenantId, TimelineId};
24 : use utils::logging::SecretString;
25 : use utils::lsn::Lsn;
26 :
27 : use super::Service;
28 :
29 0 : #[derive(serde::Serialize, serde::Deserialize, Clone)]
30 : pub struct TimelineLocateResponse {
31 : pub generation: SafekeeperGeneration,
32 : pub sk_set: Vec<NodeId>,
33 : pub new_sk_set: Option<Vec<NodeId>>,
34 : }
35 :
36 : impl Service {
37 : /// Timeline creation on safekeepers
38 : ///
39 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
40 : /// where `left` contains the list of safekeepers that didn't have a successful response.
41 : /// Assumes tenant lock is held while calling this function.
42 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
43 0 : &self,
44 0 : tenant_id: TenantId,
45 0 : timeline_id: TimelineId,
46 0 : pg_version: u32,
47 0 : timeline_persistence: &TimelinePersistence,
48 0 : ) -> Result<Vec<NodeId>, ApiError> {
49 0 : // If quorum is reached, return if we are outside of a specified timeout
50 0 : let jwt = self
51 0 : .config
52 0 : .safekeeper_jwt_token
53 0 : .clone()
54 0 : .map(SecretString::from);
55 0 : let mut joinset = JoinSet::new();
56 0 :
57 0 : // Prepare membership::Configuration from choosen safekeepers.
58 0 : let safekeepers = {
59 0 : let locked = self.inner.read().unwrap();
60 0 : locked.safekeepers.clone()
61 0 : };
62 0 :
63 0 : let mut members = Vec::new();
64 0 : for sk_id in timeline_persistence.sk_set.iter() {
65 0 : let sk_id = NodeId(*sk_id as u64);
66 0 : let Some(safekeeper) = safekeepers.get(&sk_id) else {
67 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
68 0 : "couldn't find entry for safekeeper with id {sk_id}"
69 0 : )))?;
70 : };
71 0 : members.push(SafekeeperId {
72 0 : id: sk_id,
73 0 : host: safekeeper.skp.host.clone(),
74 0 : pg_port: safekeeper.skp.port as u16,
75 0 : });
76 : }
77 0 : let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
78 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
79 0 :
80 0 : let req = safekeeper_api::models::TimelineCreateRequest {
81 0 : commit_lsn: None,
82 0 : mconf,
83 0 : pg_version,
84 0 : start_lsn: timeline_persistence.start_lsn.0,
85 0 : system_id: None,
86 0 : tenant_id,
87 0 : timeline_id,
88 0 : wal_seg_size: None,
89 0 : };
90 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
91 0 : for sk in timeline_persistence.sk_set.iter() {
92 0 : let sk_id = NodeId(*sk as u64);
93 0 : let safekeepers = safekeepers.clone();
94 0 : let http_client = self.http_client.clone();
95 0 : let jwt = jwt.clone();
96 0 : let req = req.clone();
97 0 : joinset.spawn(async move {
98 0 : // Unwrap is fine as we already would have returned error above
99 0 : let sk_p = safekeepers.get(&sk_id).unwrap();
100 0 : let res = sk_p
101 0 : .with_client_retries(
102 0 : |client| {
103 0 : let req = req.clone();
104 0 : async move { client.create_timeline(&req).await }
105 0 : },
106 0 : &http_client,
107 0 : &jwt,
108 0 : 3,
109 0 : 3,
110 0 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
111 0 : &CancellationToken::new(),
112 0 : )
113 0 : .await;
114 0 : (sk_id, sk_p.skp.host.clone(), res)
115 0 : });
116 0 : }
117 : // After we have built the joinset, we now wait for the tasks to complete,
118 : // but with a specified timeout to make sure we return swiftly, either with
119 : // a failure or success.
120 0 : let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
121 0 :
122 0 : // Wait until all tasks finish or timeout is hit, whichever occurs
123 0 : // first.
124 0 : let mut reconcile_results = Vec::new();
125 : loop {
126 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
127 : {
128 0 : let Some(res) = res else { break };
129 0 : match res {
130 0 : Ok(res) => {
131 0 : tracing::info!(
132 0 : "response from safekeeper id:{} at {}: {:?}",
133 : res.0,
134 : res.1,
135 : res.2
136 : );
137 0 : reconcile_results.push(res);
138 : }
139 0 : Err(join_err) => {
140 0 : tracing::info!("join_err for task in joinset: {join_err}");
141 : }
142 : }
143 : } else {
144 0 : tracing::info!(
145 0 : "timeout for creation call after {} responses",
146 0 : reconcile_results.len()
147 : );
148 0 : break;
149 : }
150 : }
151 :
152 : // Now check now if quorum was reached in reconcile_results.
153 0 : let total_result_count = reconcile_results.len();
154 0 : let remaining = reconcile_results
155 0 : .into_iter()
156 0 : .filter_map(|res| res.2.is_err().then_some(res.0))
157 0 : .collect::<Vec<_>>();
158 0 : tracing::info!(
159 0 : "Got {} non-successful responses from initial creation request of total {total_result_count} responses",
160 0 : remaining.len()
161 : );
162 0 : let target_sk_count = timeline_persistence.sk_set.len();
163 0 : let quorum_size = match target_sk_count {
164 : 0 => {
165 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
166 0 : "timeline configured without any safekeepers",
167 0 : )));
168 : }
169 : 1 | 2 => {
170 : #[cfg(feature = "testing")]
171 : {
172 : // In test settings, it is allowed to have one or two safekeepers
173 0 : target_sk_count
174 : }
175 : #[cfg(not(feature = "testing"))]
176 : {
177 : // The region is misconfigured: we need at least three safekeepers to be configured
178 : // in order to schedule work to them
179 : tracing::warn!(
180 : "couldn't find at least 3 safekeepers for timeline, found: {:?}",
181 : timeline_persistence.sk_set
182 : );
183 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
184 : "couldn't find at least 3 safekeepers to put timeline to"
185 : )));
186 : }
187 : }
188 0 : _ => target_sk_count / 2 + 1,
189 : };
190 0 : let success_count = target_sk_count - remaining.len();
191 0 : if success_count < quorum_size {
192 : // Failure
193 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
194 0 : "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
195 0 : )));
196 0 : }
197 0 :
198 0 : Ok(remaining)
199 0 : }
200 :
201 : /// Create timeline in controller database and on safekeepers.
202 : /// `timeline_info` is result of timeline creation on pageserver.
203 : ///
204 : /// All actions must be idempotent as the call is retried until success. It
205 : /// tries to create timeline in the db and on at least majority of
206 : /// safekeepers + queue creation for safekeepers which missed it in the db
207 : /// for infinite retries; after that, call returns Ok.
208 : ///
209 : /// The idea is that once this is reached as long as we have alive majority
210 : /// of safekeepers it is expected to get eventually operational as storcon
211 : /// will be able to seed timeline on nodes which missed creation by making
212 : /// pull_timeline from peers. On the other hand we don't want to fail
213 : /// timeline creation if one safekeeper is down.
214 0 : pub(super) async fn tenant_timeline_create_safekeepers(
215 0 : self: &Arc<Self>,
216 0 : tenant_id: TenantId,
217 0 : timeline_info: &TimelineInfo,
218 0 : read_only: bool,
219 0 : ) -> Result<SafekeepersInfo, ApiError> {
220 0 : let timeline_id = timeline_info.timeline_id;
221 0 : let pg_version = timeline_info.pg_version * 10000;
222 0 : // Initially start_lsn is determined by last_record_lsn in pageserver
223 0 : // response as it does initdb. However, later we persist it and in sk
224 0 : // creation calls replace with the value from the timeline row if it
225 0 : // previously existed as on retries in theory endpoint might have
226 0 : // already written some data and advanced last_record_lsn, while we want
227 0 : // safekeepers to have consistent start_lsn.
228 0 : let start_lsn = timeline_info.last_record_lsn;
229 :
230 : // Choose initial set of safekeepers respecting affinity
231 0 : let sks = if !read_only {
232 0 : self.safekeepers_for_new_timeline().await?
233 : } else {
234 0 : Vec::new()
235 : };
236 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
237 0 : // Add timeline to db
238 0 : let mut timeline_persist = TimelinePersistence {
239 0 : tenant_id: tenant_id.to_string(),
240 0 : timeline_id: timeline_id.to_string(),
241 0 : start_lsn: start_lsn.into(),
242 0 : generation: 1,
243 0 : sk_set: sks_persistence.clone(),
244 0 : new_sk_set: None,
245 0 : cplane_notified_generation: 0,
246 0 : deleted_at: None,
247 0 : };
248 0 : let inserted = self
249 0 : .persistence
250 0 : .insert_timeline(timeline_persist.clone())
251 0 : .await?;
252 0 : if !inserted {
253 0 : if let Some(existent_persist) = self
254 0 : .persistence
255 0 : .get_timeline(tenant_id, timeline_id)
256 0 : .await?
257 0 : {
258 0 : // Replace with what we have in the db, to get stuff like the generation right.
259 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
260 0 : // crashed right after the wrote to the DB.
261 0 : timeline_persist = existent_persist;
262 0 : } else {
263 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
264 0 : "insertion said timeline already in db, but looking it up, it was gone"
265 0 : )));
266 : }
267 0 : }
268 0 : let ret = SafekeepersInfo {
269 0 : generation: timeline_persist.generation as u32,
270 0 : safekeepers: sks.clone(),
271 0 : tenant_id,
272 0 : timeline_id,
273 0 : };
274 0 : if read_only {
275 0 : return Ok(ret);
276 0 : }
277 :
278 : // Create the timeline on a quorum of safekeepers
279 0 : let remaining = self
280 0 : .tenant_timeline_create_safekeepers_quorum(
281 0 : tenant_id,
282 0 : timeline_id,
283 0 : pg_version,
284 0 : &timeline_persist,
285 0 : )
286 0 : .await?;
287 :
288 : // For the remaining safekeepers, take care of their reconciliation asynchronously
289 0 : for &remaining_id in remaining.iter() {
290 0 : let pending_op = TimelinePendingOpPersistence {
291 0 : tenant_id: tenant_id.to_string(),
292 0 : timeline_id: timeline_id.to_string(),
293 0 : generation: timeline_persist.generation,
294 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
295 0 : sk_id: remaining_id.0 as i64,
296 0 : };
297 0 : tracing::info!("writing pending op for sk id {remaining_id}");
298 0 : self.persistence.insert_pending_op(pending_op).await?;
299 : }
300 0 : if !remaining.is_empty() {
301 0 : let locked = self.inner.read().unwrap();
302 0 : for remaining_id in remaining {
303 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
304 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
305 0 : "Couldn't find safekeeper with id {remaining_id}"
306 0 : )));
307 : };
308 0 : let Ok(host_list) = sks
309 0 : .iter()
310 0 : .map(|sk| {
311 0 : Ok((
312 0 : sk.id,
313 0 : locked
314 0 : .safekeepers
315 0 : .get(&sk.id)
316 0 : .ok_or_else(|| {
317 0 : ApiError::InternalServerError(anyhow::anyhow!(
318 0 : "Couldn't find safekeeper with id {} to pull from",
319 0 : sk.id
320 0 : ))
321 0 : })?
322 0 : .base_url(),
323 : ))
324 0 : })
325 0 : .collect::<Result<_, ApiError>>()
326 : else {
327 0 : continue;
328 : };
329 0 : let req = ScheduleRequest {
330 0 : safekeeper: Box::new(sk.clone()),
331 0 : host_list,
332 0 : tenant_id,
333 0 : timeline_id: Some(timeline_id),
334 0 : generation: timeline_persist.generation as u32,
335 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
336 0 : };
337 0 : locked.safekeeper_reconcilers.schedule_request(req);
338 : }
339 0 : }
340 :
341 0 : Ok(ret)
342 0 : }
343 :
344 0 : pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
345 0 : self: &Arc<Self>,
346 0 : tenant_id: TenantId,
347 0 : timeline_info: TimelineInfo,
348 0 : ) -> Result<(), TimelineImportFinalizeError> {
349 : const BACKOFF: Duration = Duration::from_secs(5);
350 :
351 : loop {
352 0 : if self.cancel.is_cancelled() {
353 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
354 0 : }
355 0 :
356 0 : // This function is only used in non-read-only scenarios
357 0 : let read_only = false;
358 0 : let res = self
359 0 : .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
360 0 : .await;
361 :
362 0 : match res {
363 : Ok(_) => {
364 0 : tracing::info!("Timeline created on safekeepers");
365 0 : break;
366 : }
367 0 : Err(err) => {
368 0 : tracing::error!("Failed to create timeline on safekeepers: {err}");
369 0 : tokio::select! {
370 0 : _ = self.cancel.cancelled() => {
371 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
372 : },
373 0 : _ = tokio::time::sleep(BACKOFF) => {}
374 : };
375 : }
376 : }
377 : }
378 :
379 0 : Ok(())
380 0 : }
381 :
382 : /// Directly insert the timeline into the database without reconciling it with safekeepers.
383 : ///
384 : /// Useful if the timeline already exists on the specified safekeepers,
385 : /// but we want to make it storage controller managed.
386 0 : pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
387 0 : let persistence = TimelinePersistence {
388 0 : tenant_id: req.tenant_id.to_string(),
389 0 : timeline_id: req.timeline_id.to_string(),
390 0 : start_lsn: Lsn::INVALID.into(),
391 0 : generation: 1,
392 0 : sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
393 0 : new_sk_set: None,
394 0 : cplane_notified_generation: 1,
395 0 : deleted_at: None,
396 0 : };
397 0 : let inserted = self.persistence.insert_timeline(persistence).await?;
398 0 : if inserted {
399 0 : tracing::info!("imported timeline into db");
400 : } else {
401 0 : tracing::info!("didn't import timeline into db, as it is already present in db");
402 : }
403 0 : Ok(())
404 0 : }
405 :
406 : /// Locate safekeepers for a timeline.
407 : /// Return the generation, sk_set and new_sk_set if present.
408 : /// If the timeline is not storcon-managed, return NotFound.
409 0 : pub(crate) async fn tenant_timeline_locate(
410 0 : &self,
411 0 : tenant_id: TenantId,
412 0 : timeline_id: TimelineId,
413 0 : ) -> Result<TimelineLocateResponse, ApiError> {
414 0 : let timeline = self
415 0 : .persistence
416 0 : .get_timeline(tenant_id, timeline_id)
417 0 : .await?;
418 :
419 0 : let Some(timeline) = timeline else {
420 0 : return Err(ApiError::NotFound(
421 0 : anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
422 0 : ));
423 : };
424 :
425 0 : Ok(TimelineLocateResponse {
426 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
427 0 : sk_set: timeline
428 0 : .sk_set
429 0 : .iter()
430 0 : .map(|id| NodeId(*id as u64))
431 0 : .collect(),
432 0 : new_sk_set: timeline
433 0 : .new_sk_set
434 0 : .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
435 0 : })
436 0 : }
437 :
438 : /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
439 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
440 0 : self: &Arc<Self>,
441 0 : tenant_id: TenantId,
442 0 : timeline_id: TimelineId,
443 0 : ) -> Result<(), ApiError> {
444 0 : let tl = self
445 0 : .persistence
446 0 : .get_timeline(tenant_id, timeline_id)
447 0 : .await?;
448 0 : let Some(tl) = tl else {
449 0 : tracing::info!(
450 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
451 : );
452 0 : return Ok(());
453 : };
454 0 : self.persistence
455 0 : .timeline_set_deleted_at(tenant_id, timeline_id)
456 0 : .await?;
457 0 : let all_sks = tl
458 0 : .new_sk_set
459 0 : .iter()
460 0 : .flatten()
461 0 : .chain(tl.sk_set.iter())
462 0 : .collect::<HashSet<_>>();
463 0 :
464 0 : // The timeline has no safekeepers: we need to delete it from the db manually,
465 0 : // as no safekeeper reconciler will get to it
466 0 : if all_sks.is_empty() {
467 0 : if let Err(err) = self
468 0 : .persistence
469 0 : .delete_timeline(tenant_id, timeline_id)
470 0 : .await
471 : {
472 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
473 0 : }
474 0 : }
475 :
476 : // Schedule reconciliations
477 0 : for &sk_id in all_sks.iter() {
478 0 : let pending_op = TimelinePendingOpPersistence {
479 0 : tenant_id: tenant_id.to_string(),
480 0 : timeline_id: timeline_id.to_string(),
481 0 : generation: i32::MAX,
482 0 : op_kind: SafekeeperTimelineOpKind::Delete,
483 0 : sk_id: *sk_id,
484 0 : };
485 0 : tracing::info!("writing pending op for sk id {sk_id}");
486 0 : self.persistence.insert_pending_op(pending_op).await?;
487 : }
488 : {
489 0 : let locked = self.inner.read().unwrap();
490 0 : for sk_id in all_sks {
491 0 : let sk_id = NodeId(*sk_id as u64);
492 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
493 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
494 0 : "Couldn't find safekeeper with id {sk_id}"
495 0 : )));
496 : };
497 :
498 0 : let req = ScheduleRequest {
499 0 : safekeeper: Box::new(sk.clone()),
500 0 : // we don't use this for this kind, put a dummy value
501 0 : host_list: Vec::new(),
502 0 : tenant_id,
503 0 : timeline_id: Some(timeline_id),
504 0 : generation: tl.generation as u32,
505 0 : kind: SafekeeperTimelineOpKind::Delete,
506 0 : };
507 0 : locked.safekeeper_reconcilers.schedule_request(req);
508 : }
509 : }
510 0 : Ok(())
511 0 : }
512 :
513 : /// Perform tenant deletion on safekeepers.
514 0 : pub(super) async fn tenant_delete_safekeepers(
515 0 : self: &Arc<Self>,
516 0 : tenant_id: TenantId,
517 0 : ) -> Result<(), ApiError> {
518 0 : let timeline_list = self
519 0 : .persistence
520 0 : .list_timelines_for_tenant(tenant_id)
521 0 : .await?;
522 :
523 0 : if timeline_list.is_empty() {
524 : // Early exit: the tenant is either empty or not migrated to the storcon yet
525 0 : tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
526 0 : return Ok(());
527 0 : }
528 :
529 0 : let timeline_list = timeline_list
530 0 : .into_iter()
531 0 : .map(|timeline| {
532 0 : let timeline_id = TimelineId::from_str(&timeline.timeline_id)
533 0 : .context("timeline id loaded from db")
534 0 : .map_err(ApiError::InternalServerError)?;
535 0 : Ok((timeline_id, timeline))
536 0 : })
537 0 : .collect::<Result<Vec<_>, ApiError>>()?;
538 :
539 : // Remove pending ops from db, and set `deleted_at`.
540 : // We cancel them in a later iteration once we hold the state lock.
541 0 : for (timeline_id, _timeline) in timeline_list.iter() {
542 0 : self.persistence
543 0 : .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
544 0 : .await?;
545 0 : self.persistence
546 0 : .timeline_set_deleted_at(tenant_id, *timeline_id)
547 0 : .await?;
548 : }
549 :
550 : // The list of safekeepers that have any of the timelines
551 0 : let mut sk_list = HashSet::new();
552 :
553 : // List all pending ops for all timelines, cancel them
554 0 : for (_timeline_id, timeline) in timeline_list.iter() {
555 0 : let sk_iter = timeline
556 0 : .sk_set
557 0 : .iter()
558 0 : .chain(timeline.new_sk_set.iter().flatten())
559 0 : .map(|id| NodeId(*id as u64));
560 0 : sk_list.extend(sk_iter);
561 0 : }
562 :
563 0 : for &sk_id in sk_list.iter() {
564 0 : let pending_op = TimelinePendingOpPersistence {
565 0 : tenant_id: tenant_id.to_string(),
566 0 : timeline_id: String::new(),
567 0 : generation: i32::MAX,
568 0 : op_kind: SafekeeperTimelineOpKind::Delete,
569 0 : sk_id: sk_id.0 as i64,
570 0 : };
571 0 : tracing::info!("writing pending op for sk id {sk_id}");
572 0 : self.persistence.insert_pending_op(pending_op).await?;
573 : }
574 :
575 0 : let mut locked = self.inner.write().unwrap();
576 :
577 0 : for (timeline_id, _timeline) in timeline_list.iter() {
578 0 : for sk_id in sk_list.iter() {
579 0 : locked
580 0 : .safekeeper_reconcilers
581 0 : .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
582 0 : }
583 : }
584 :
585 : // unwrap is safe: we return above for an empty timeline list
586 0 : let max_generation = timeline_list
587 0 : .iter()
588 0 : .map(|(_tl_id, tl)| tl.generation as u32)
589 0 : .max()
590 0 : .unwrap();
591 :
592 0 : for sk_id in sk_list {
593 0 : let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
594 0 : tracing::warn!("Couldn't find safekeeper with id {sk_id}");
595 0 : continue;
596 : };
597 : // Add pending op for tenant deletion
598 0 : let req = ScheduleRequest {
599 0 : generation: max_generation,
600 0 : host_list: Vec::new(),
601 0 : kind: SafekeeperTimelineOpKind::Delete,
602 0 : safekeeper: Box::new(safekeeper.clone()),
603 0 : tenant_id,
604 0 : timeline_id: None,
605 0 : };
606 0 : locked.safekeeper_reconcilers.schedule_request(req);
607 : }
608 0 : Ok(())
609 0 : }
610 :
611 : /// Choose safekeepers for the new timeline: 3 in different azs.
612 0 : pub(crate) async fn safekeepers_for_new_timeline(
613 0 : &self,
614 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
615 0 : let mut all_safekeepers = {
616 0 : let locked = self.inner.read().unwrap();
617 0 : locked
618 0 : .safekeepers
619 0 : .iter()
620 0 : .filter_map(|sk| {
621 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
622 : // If we don't want to schedule stuff onto the safekeeper, respect that.
623 0 : return None;
624 0 : }
625 0 : let utilization_opt = if let SafekeeperState::Available {
626 : last_seen_at: _,
627 0 : utilization,
628 0 : } = sk.1.availability()
629 : {
630 0 : Some(utilization)
631 : } else {
632 : // non-available safekeepers still get a chance for new timelines,
633 : // but put them last in the list.
634 0 : None
635 : };
636 0 : let info = SafekeeperInfo {
637 0 : hostname: sk.1.skp.host.clone(),
638 0 : id: NodeId(sk.1.skp.id as u64),
639 0 : };
640 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
641 0 : })
642 0 : .collect::<Vec<_>>()
643 0 : };
644 0 : all_safekeepers.sort_by_key(|sk| {
645 0 : (
646 0 : sk.0.as_ref()
647 0 : .map(|ut| ut.timeline_count)
648 0 : .unwrap_or(u64::MAX),
649 0 : // Use the id to decide on equal scores for reliability
650 0 : sk.1.id.0,
651 0 : )
652 0 : });
653 : // Number of safekeepers in different AZs we are looking for
654 0 : let wanted_count = match all_safekeepers.len() {
655 : 0 => {
656 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
657 0 : "couldn't find any active safekeeper for new timeline",
658 0 : )));
659 : }
660 : // Have laxer requirements on testig mode as we don't want to
661 : // spin up three safekeepers for every single test
662 : #[cfg(feature = "testing")]
663 0 : 1 | 2 => all_safekeepers.len(),
664 0 : _ => 3,
665 : };
666 0 : let mut sks = Vec::new();
667 0 : let mut azs = HashSet::new();
668 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
669 0 : if !azs.insert(az_id) {
670 0 : continue;
671 0 : }
672 0 : sks.push(sk_info.clone());
673 0 : if sks.len() == wanted_count {
674 0 : break;
675 0 : }
676 : }
677 0 : if sks.len() == wanted_count {
678 0 : Ok(sks)
679 : } else {
680 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
681 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
682 0 : sks.len(),
683 0 : all_safekeepers.len(),
684 0 : )))
685 : }
686 0 : }
687 :
688 0 : pub(crate) async fn safekeepers_list(
689 0 : &self,
690 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
691 0 : let locked = self.inner.read().unwrap();
692 0 : let mut list = locked
693 0 : .safekeepers
694 0 : .iter()
695 0 : .map(|sk| sk.1.describe_response())
696 0 : .collect::<Result<Vec<_>, _>>()?;
697 0 : list.sort_by_key(|v| v.id);
698 0 : Ok(list)
699 0 : }
700 :
701 0 : pub(crate) async fn get_safekeeper(
702 0 : &self,
703 0 : id: i64,
704 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
705 0 : let locked = self.inner.read().unwrap();
706 0 : let sk = locked
707 0 : .safekeepers
708 0 : .get(&NodeId(id as u64))
709 0 : .ok_or(diesel::result::Error::NotFound)?;
710 0 : sk.describe_response()
711 0 : }
712 :
713 0 : pub(crate) async fn upsert_safekeeper(
714 0 : self: &Arc<Service>,
715 0 : record: crate::persistence::SafekeeperUpsert,
716 0 : ) -> Result<(), ApiError> {
717 0 : let node_id = NodeId(record.id as u64);
718 0 : let use_https = self.config.use_https_safekeeper_api;
719 0 :
720 0 : if use_https && record.https_port.is_none() {
721 0 : return Err(ApiError::PreconditionFailed(
722 0 : format!(
723 0 : "cannot upsert safekeeper {node_id}: \
724 0 : https is enabled, but https port is not specified"
725 0 : )
726 0 : .into(),
727 0 : ));
728 0 : }
729 0 :
730 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
731 : {
732 0 : let mut locked = self.inner.write().unwrap();
733 0 : let mut safekeepers = (*locked.safekeepers).clone();
734 0 : match safekeepers.entry(node_id) {
735 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
736 0 : .get_mut()
737 0 : .update_from_record(record)
738 0 : .expect("all preconditions should be checked before upsert to database"),
739 0 : std::collections::hash_map::Entry::Vacant(entry) => {
740 0 : entry.insert(
741 0 : Safekeeper::from_persistence(
742 0 : crate::persistence::SafekeeperPersistence::from_upsert(
743 0 : record,
744 0 : SkSchedulingPolicy::Pause,
745 0 : ),
746 0 : CancellationToken::new(),
747 0 : use_https,
748 0 : )
749 0 : .expect("all preconditions should be checked before upsert to database"),
750 0 : );
751 0 : }
752 : }
753 0 : locked
754 0 : .safekeeper_reconcilers
755 0 : .start_reconciler(node_id, self);
756 0 : locked.safekeepers = Arc::new(safekeepers);
757 0 : metrics::METRICS_REGISTRY
758 0 : .metrics_group
759 0 : .storage_controller_safekeeper_nodes
760 0 : .set(locked.safekeepers.len() as i64);
761 0 : metrics::METRICS_REGISTRY
762 0 : .metrics_group
763 0 : .storage_controller_https_safekeeper_nodes
764 0 : .set(
765 0 : locked
766 0 : .safekeepers
767 0 : .values()
768 0 : .filter(|s| s.has_https_port())
769 0 : .count() as i64,
770 0 : );
771 0 : }
772 0 : Ok(())
773 0 : }
774 :
775 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
776 0 : self: &Arc<Service>,
777 0 : id: i64,
778 0 : scheduling_policy: SkSchedulingPolicy,
779 0 : ) -> Result<(), DatabaseError> {
780 0 : self.persistence
781 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
782 0 : .await?;
783 0 : let node_id = NodeId(id as u64);
784 0 : // After the change has been persisted successfully, update the in-memory state
785 0 : {
786 0 : let mut locked = self.inner.write().unwrap();
787 0 : let mut safekeepers = (*locked.safekeepers).clone();
788 0 : let sk = safekeepers
789 0 : .get_mut(&node_id)
790 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
791 0 : sk.set_scheduling_policy(scheduling_policy);
792 0 :
793 0 : match scheduling_policy {
794 0 : SkSchedulingPolicy::Active => {
795 0 : locked
796 0 : .safekeeper_reconcilers
797 0 : .start_reconciler(node_id, self);
798 0 : }
799 0 : SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
800 0 : locked.safekeeper_reconcilers.stop_reconciler(node_id);
801 0 : }
802 : }
803 :
804 0 : locked.safekeepers = Arc::new(safekeepers);
805 0 : }
806 0 : Ok(())
807 0 : }
808 : }
|