Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use super::safekeeper_reconciler::ScheduleRequest;
7 : use crate::heartbeater::SafekeeperState;
8 : use crate::metrics;
9 : use crate::persistence::{
10 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
11 : };
12 : use crate::safekeeper::Safekeeper;
13 : use anyhow::Context;
14 : use http_utils::error::ApiError;
15 : use pageserver_api::controller_api::{
16 : SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
17 : };
18 : use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
19 : use safekeeper_api::membership::{MemberSet, SafekeeperId};
20 : use tokio::task::JoinSet;
21 : use tokio_util::sync::CancellationToken;
22 : use utils::id::{NodeId, TenantId, TimelineId};
23 : use utils::logging::SecretString;
24 : use utils::lsn::Lsn;
25 :
26 : use super::Service;
27 :
28 : impl Service {
29 : /// Timeline creation on safekeepers
30 : ///
31 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
32 : /// where `left` contains the list of safekeepers that didn't have a successful response.
33 : /// Assumes tenant lock is held while calling this function.
34 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
35 0 : &self,
36 0 : tenant_id: TenantId,
37 0 : timeline_id: TimelineId,
38 0 : pg_version: u32,
39 0 : timeline_persistence: &TimelinePersistence,
40 0 : ) -> Result<Vec<NodeId>, ApiError> {
41 0 : // If quorum is reached, return if we are outside of a specified timeout
42 0 : let jwt = self
43 0 : .config
44 0 : .safekeeper_jwt_token
45 0 : .clone()
46 0 : .map(SecretString::from);
47 0 : let mut joinset = JoinSet::new();
48 0 :
49 0 : // Prepare membership::Configuration from choosen safekeepers.
50 0 : let safekeepers = {
51 0 : let locked = self.inner.read().unwrap();
52 0 : locked.safekeepers.clone()
53 0 : };
54 0 :
55 0 : let mut members = Vec::new();
56 0 : for sk_id in timeline_persistence.sk_set.iter() {
57 0 : let sk_id = NodeId(*sk_id as u64);
58 0 : let Some(safekeeper) = safekeepers.get(&sk_id) else {
59 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
60 0 : "couldn't find entry for safekeeper with id {sk_id}"
61 0 : )))?;
62 : };
63 0 : members.push(SafekeeperId {
64 0 : id: sk_id,
65 0 : host: safekeeper.skp.host.clone(),
66 0 : pg_port: safekeeper.skp.port as u16,
67 0 : });
68 : }
69 0 : let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
70 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
71 0 :
72 0 : let req = safekeeper_api::models::TimelineCreateRequest {
73 0 : commit_lsn: None,
74 0 : mconf,
75 0 : pg_version,
76 0 : start_lsn: timeline_persistence.start_lsn.0,
77 0 : system_id: None,
78 0 : tenant_id,
79 0 : timeline_id,
80 0 : wal_seg_size: None,
81 0 : };
82 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
83 0 : for sk in timeline_persistence.sk_set.iter() {
84 0 : let sk_id = NodeId(*sk as u64);
85 0 : let safekeepers = safekeepers.clone();
86 0 : let http_client = self.http_client.clone();
87 0 : let jwt = jwt.clone();
88 0 : let req = req.clone();
89 0 : joinset.spawn(async move {
90 0 : // Unwrap is fine as we already would have returned error above
91 0 : let sk_p = safekeepers.get(&sk_id).unwrap();
92 0 : let res = sk_p
93 0 : .with_client_retries(
94 0 : |client| {
95 0 : let req = req.clone();
96 0 : async move { client.create_timeline(&req).await }
97 0 : },
98 0 : &http_client,
99 0 : &jwt,
100 0 : 3,
101 0 : 3,
102 0 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
103 0 : &CancellationToken::new(),
104 0 : )
105 0 : .await;
106 0 : (sk_id, sk_p.skp.host.clone(), res)
107 0 : });
108 0 : }
109 : // After we have built the joinset, we now wait for the tasks to complete,
110 : // but with a specified timeout to make sure we return swiftly, either with
111 : // a failure or success.
112 0 : let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
113 0 :
114 0 : // Wait until all tasks finish or timeout is hit, whichever occurs
115 0 : // first.
116 0 : let mut reconcile_results = Vec::new();
117 : loop {
118 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
119 : {
120 0 : let Some(res) = res else { break };
121 0 : match res {
122 0 : Ok(res) => {
123 0 : tracing::info!(
124 0 : "response from safekeeper id:{} at {}: {:?}",
125 : res.0,
126 : res.1,
127 : res.2
128 : );
129 0 : reconcile_results.push(res);
130 : }
131 0 : Err(join_err) => {
132 0 : tracing::info!("join_err for task in joinset: {join_err}");
133 : }
134 : }
135 : } else {
136 0 : tracing::info!(
137 0 : "timeout for creation call after {} responses",
138 0 : reconcile_results.len()
139 : );
140 0 : break;
141 : }
142 : }
143 :
144 : // Now check now if quorum was reached in reconcile_results.
145 0 : let total_result_count = reconcile_results.len();
146 0 : let remaining = reconcile_results
147 0 : .into_iter()
148 0 : .filter_map(|res| res.2.is_err().then_some(res.0))
149 0 : .collect::<Vec<_>>();
150 0 : tracing::info!(
151 0 : "Got {} non-successful responses from initial creation request of total {total_result_count} responses",
152 0 : remaining.len()
153 : );
154 0 : if remaining.len() >= 2 {
155 : // Failure
156 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
157 0 : "not enough successful reconciliations to reach quorum, please retry: {} errored",
158 0 : remaining.len()
159 0 : )));
160 0 : }
161 0 :
162 0 : Ok(remaining)
163 0 : }
164 :
165 : /// Create timeline in controller database and on safekeepers.
166 : /// `timeline_info` is result of timeline creation on pageserver.
167 : ///
168 : /// All actions must be idempotent as the call is retried until success. It
169 : /// tries to create timeline in the db and on at least majority of
170 : /// safekeepers + queue creation for safekeepers which missed it in the db
171 : /// for infinite retries; after that, call returns Ok.
172 : ///
173 : /// The idea is that once this is reached as long as we have alive majority
174 : /// of safekeepers it is expected to get eventually operational as storcon
175 : /// will be able to seed timeline on nodes which missed creation by making
176 : /// pull_timeline from peers. On the other hand we don't want to fail
177 : /// timeline creation if one safekeeper is down.
178 0 : pub(super) async fn tenant_timeline_create_safekeepers(
179 0 : self: &Arc<Self>,
180 0 : tenant_id: TenantId,
181 0 : timeline_info: &TimelineInfo,
182 0 : create_mode: models::TimelineCreateRequestMode,
183 0 : ) -> Result<SafekeepersInfo, ApiError> {
184 0 : let timeline_id = timeline_info.timeline_id;
185 0 : let pg_version = timeline_info.pg_version * 10000;
186 : // Initially start_lsn is determined by last_record_lsn in pageserver
187 : // response as it does initdb. However, later we persist it and in sk
188 : // creation calls replace with the value from the timeline row if it
189 : // previously existed as on retries in theory endpoint might have
190 : // already written some data and advanced last_record_lsn, while we want
191 : // safekeepers to have consistent start_lsn.
192 0 : let start_lsn = match create_mode {
193 0 : models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
194 0 : models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
195 : models::TimelineCreateRequestMode::ImportPgdata { .. } => {
196 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
197 0 : "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
198 0 : )))?;
199 : }
200 : };
201 : // Choose initial set of safekeepers respecting affinity
202 0 : let sks = self.safekeepers_for_new_timeline().await?;
203 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
204 0 : // Add timeline to db
205 0 : let mut timeline_persist = TimelinePersistence {
206 0 : tenant_id: tenant_id.to_string(),
207 0 : timeline_id: timeline_id.to_string(),
208 0 : start_lsn: start_lsn.into(),
209 0 : generation: 1,
210 0 : sk_set: sks_persistence.clone(),
211 0 : new_sk_set: None,
212 0 : cplane_notified_generation: 0,
213 0 : deleted_at: None,
214 0 : };
215 0 : let inserted = self
216 0 : .persistence
217 0 : .insert_timeline(timeline_persist.clone())
218 0 : .await?;
219 0 : if !inserted {
220 0 : if let Some(existent_persist) = self
221 0 : .persistence
222 0 : .get_timeline(tenant_id, timeline_id)
223 0 : .await?
224 0 : {
225 0 : // Replace with what we have in the db, to get stuff like the generation right.
226 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
227 0 : // crashed right after the wrote to the DB.
228 0 : timeline_persist = existent_persist;
229 0 : } else {
230 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
231 0 : "insertion said timeline already in db, but looking it up, it was gone"
232 0 : )));
233 : }
234 0 : }
235 : // Create the timeline on a quorum of safekeepers
236 0 : let remaining = self
237 0 : .tenant_timeline_create_safekeepers_quorum(
238 0 : tenant_id,
239 0 : timeline_id,
240 0 : pg_version,
241 0 : &timeline_persist,
242 0 : )
243 0 : .await?;
244 :
245 : // For the remaining safekeepers, take care of their reconciliation asynchronously
246 0 : for &remaining_id in remaining.iter() {
247 0 : let pending_op = TimelinePendingOpPersistence {
248 0 : tenant_id: tenant_id.to_string(),
249 0 : timeline_id: timeline_id.to_string(),
250 0 : generation: timeline_persist.generation,
251 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
252 0 : sk_id: remaining_id.0 as i64,
253 0 : };
254 0 : tracing::info!("writing pending op for sk id {remaining_id}");
255 0 : self.persistence.insert_pending_op(pending_op).await?;
256 : }
257 0 : if !remaining.is_empty() {
258 0 : let locked = self.inner.read().unwrap();
259 0 : for remaining_id in remaining {
260 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
261 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
262 0 : "Couldn't find safekeeper with id {remaining_id}"
263 0 : )));
264 : };
265 0 : let Ok(host_list) = sks
266 0 : .iter()
267 0 : .map(|sk| {
268 0 : Ok((
269 0 : sk.id,
270 0 : locked
271 0 : .safekeepers
272 0 : .get(&sk.id)
273 0 : .ok_or_else(|| {
274 0 : ApiError::InternalServerError(anyhow::anyhow!(
275 0 : "Couldn't find safekeeper with id {} to pull from",
276 0 : sk.id
277 0 : ))
278 0 : })?
279 0 : .base_url(),
280 : ))
281 0 : })
282 0 : .collect::<Result<_, ApiError>>()
283 : else {
284 0 : continue;
285 : };
286 0 : let req = ScheduleRequest {
287 0 : safekeeper: Box::new(sk.clone()),
288 0 : host_list,
289 0 : tenant_id,
290 0 : timeline_id: Some(timeline_id),
291 0 : generation: timeline_persist.generation as u32,
292 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
293 0 : };
294 0 : locked.safekeeper_reconcilers.schedule_request(req);
295 : }
296 0 : }
297 :
298 0 : Ok(SafekeepersInfo {
299 0 : generation: timeline_persist.generation as u32,
300 0 : safekeepers: sks,
301 0 : tenant_id,
302 0 : timeline_id,
303 0 : })
304 0 : }
305 :
306 : /// Directly insert the timeline into the database without reconciling it with safekeepers.
307 : ///
308 : /// Useful if the timeline already exists on the specified safekeepers,
309 : /// but we want to make it storage controller managed.
310 0 : pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
311 0 : let persistence = TimelinePersistence {
312 0 : tenant_id: req.tenant_id.to_string(),
313 0 : timeline_id: req.timeline_id.to_string(),
314 0 : start_lsn: Lsn::INVALID.into(),
315 0 : generation: 1,
316 0 : sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
317 0 : new_sk_set: None,
318 0 : cplane_notified_generation: 1,
319 0 : deleted_at: None,
320 0 : };
321 0 : let inserted = self.persistence.insert_timeline(persistence).await?;
322 0 : if inserted {
323 0 : tracing::info!("imported timeline into db");
324 : } else {
325 0 : tracing::info!("didn't import timeline into db, as it is already present in db");
326 : }
327 0 : Ok(())
328 0 : }
329 :
330 : /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
331 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
332 0 : self: &Arc<Self>,
333 0 : tenant_id: TenantId,
334 0 : timeline_id: TimelineId,
335 0 : ) -> Result<(), ApiError> {
336 0 : let tl = self
337 0 : .persistence
338 0 : .get_timeline(tenant_id, timeline_id)
339 0 : .await?;
340 0 : let Some(tl) = tl else {
341 0 : tracing::info!(
342 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
343 : );
344 0 : return Ok(());
345 : };
346 0 : self.persistence
347 0 : .timeline_set_deleted_at(tenant_id, timeline_id)
348 0 : .await?;
349 0 : let all_sks = tl
350 0 : .new_sk_set
351 0 : .iter()
352 0 : .flatten()
353 0 : .chain(tl.sk_set.iter())
354 0 : .collect::<HashSet<_>>();
355 :
356 : // Schedule reconciliations
357 0 : for &sk_id in all_sks.iter() {
358 0 : let pending_op = TimelinePendingOpPersistence {
359 0 : tenant_id: tenant_id.to_string(),
360 0 : timeline_id: timeline_id.to_string(),
361 0 : generation: i32::MAX,
362 0 : op_kind: SafekeeperTimelineOpKind::Delete,
363 0 : sk_id: *sk_id,
364 0 : };
365 0 : tracing::info!("writing pending op for sk id {sk_id}");
366 0 : self.persistence.insert_pending_op(pending_op).await?;
367 : }
368 : {
369 0 : let locked = self.inner.read().unwrap();
370 0 : for sk_id in all_sks {
371 0 : let sk_id = NodeId(*sk_id as u64);
372 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
373 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
374 0 : "Couldn't find safekeeper with id {sk_id}"
375 0 : )));
376 : };
377 :
378 0 : let req = ScheduleRequest {
379 0 : safekeeper: Box::new(sk.clone()),
380 0 : // we don't use this for this kind, put a dummy value
381 0 : host_list: Vec::new(),
382 0 : tenant_id,
383 0 : timeline_id: Some(timeline_id),
384 0 : generation: tl.generation as u32,
385 0 : kind: SafekeeperTimelineOpKind::Delete,
386 0 : };
387 0 : locked.safekeeper_reconcilers.schedule_request(req);
388 : }
389 : }
390 0 : Ok(())
391 0 : }
392 :
393 : /// Perform tenant deletion on safekeepers.
394 0 : pub(super) async fn tenant_delete_safekeepers(
395 0 : self: &Arc<Self>,
396 0 : tenant_id: TenantId,
397 0 : ) -> Result<(), ApiError> {
398 0 : let timeline_list = self
399 0 : .persistence
400 0 : .list_timelines_for_tenant(tenant_id)
401 0 : .await?;
402 :
403 0 : if timeline_list.is_empty() {
404 : // Early exit: the tenant is either empty or not migrated to the storcon yet
405 0 : tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
406 0 : return Ok(());
407 0 : }
408 :
409 0 : let timeline_list = timeline_list
410 0 : .into_iter()
411 0 : .map(|timeline| {
412 0 : let timeline_id = TimelineId::from_str(&timeline.timeline_id)
413 0 : .context("timeline id loaded from db")
414 0 : .map_err(ApiError::InternalServerError)?;
415 0 : Ok((timeline_id, timeline))
416 0 : })
417 0 : .collect::<Result<Vec<_>, ApiError>>()?;
418 :
419 : // Remove pending ops from db, and set `deleted_at`.
420 : // We cancel them in a later iteration once we hold the state lock.
421 0 : for (timeline_id, _timeline) in timeline_list.iter() {
422 0 : self.persistence
423 0 : .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
424 0 : .await?;
425 0 : self.persistence
426 0 : .timeline_set_deleted_at(tenant_id, *timeline_id)
427 0 : .await?;
428 : }
429 :
430 : // The list of safekeepers that have any of the timelines
431 0 : let mut sk_list = HashSet::new();
432 :
433 : // List all pending ops for all timelines, cancel them
434 0 : for (_timeline_id, timeline) in timeline_list.iter() {
435 0 : let sk_iter = timeline
436 0 : .sk_set
437 0 : .iter()
438 0 : .chain(timeline.new_sk_set.iter().flatten())
439 0 : .map(|id| NodeId(*id as u64));
440 0 : sk_list.extend(sk_iter);
441 0 : }
442 :
443 0 : for &sk_id in sk_list.iter() {
444 0 : let pending_op = TimelinePendingOpPersistence {
445 0 : tenant_id: tenant_id.to_string(),
446 0 : timeline_id: String::new(),
447 0 : generation: i32::MAX,
448 0 : op_kind: SafekeeperTimelineOpKind::Delete,
449 0 : sk_id: sk_id.0 as i64,
450 0 : };
451 0 : tracing::info!("writing pending op for sk id {sk_id}");
452 0 : self.persistence.insert_pending_op(pending_op).await?;
453 : }
454 :
455 0 : let mut locked = self.inner.write().unwrap();
456 :
457 0 : for (timeline_id, _timeline) in timeline_list.iter() {
458 0 : for sk_id in sk_list.iter() {
459 0 : locked
460 0 : .safekeeper_reconcilers
461 0 : .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
462 0 : }
463 : }
464 :
465 : // unwrap is safe: we return above for an empty timeline list
466 0 : let max_generation = timeline_list
467 0 : .iter()
468 0 : .map(|(_tl_id, tl)| tl.generation as u32)
469 0 : .max()
470 0 : .unwrap();
471 :
472 0 : for sk_id in sk_list {
473 0 : let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
474 0 : tracing::warn!("Couldn't find safekeeper with id {sk_id}");
475 0 : continue;
476 : };
477 : // Add pending op for tenant deletion
478 0 : let req = ScheduleRequest {
479 0 : generation: max_generation,
480 0 : host_list: Vec::new(),
481 0 : kind: SafekeeperTimelineOpKind::Delete,
482 0 : safekeeper: Box::new(safekeeper.clone()),
483 0 : tenant_id,
484 0 : timeline_id: None,
485 0 : };
486 0 : locked.safekeeper_reconcilers.schedule_request(req);
487 : }
488 0 : Ok(())
489 0 : }
490 :
491 : /// Choose safekeepers for the new timeline: 3 in different azs.
492 0 : pub(crate) async fn safekeepers_for_new_timeline(
493 0 : &self,
494 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
495 0 : // Number of safekeepers in different AZs we are looking for
496 0 : let wanted_count = 3;
497 0 : let mut all_safekeepers = {
498 0 : let locked = self.inner.read().unwrap();
499 0 : locked
500 0 : .safekeepers
501 0 : .iter()
502 0 : .filter_map(|sk| {
503 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
504 : // If we don't want to schedule stuff onto the safekeeper, respect that.
505 0 : return None;
506 0 : }
507 0 : let utilization_opt = if let SafekeeperState::Available {
508 : last_seen_at: _,
509 0 : utilization,
510 0 : } = sk.1.availability()
511 : {
512 0 : Some(utilization)
513 : } else {
514 : // non-available safekeepers still get a chance for new timelines,
515 : // but put them last in the list.
516 0 : None
517 : };
518 0 : let info = SafekeeperInfo {
519 0 : hostname: sk.1.skp.host.clone(),
520 0 : id: NodeId(sk.1.skp.id as u64),
521 0 : };
522 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
523 0 : })
524 0 : .collect::<Vec<_>>()
525 0 : };
526 0 : all_safekeepers.sort_by_key(|sk| {
527 0 : (
528 0 : sk.0.as_ref()
529 0 : .map(|ut| ut.timeline_count)
530 0 : .unwrap_or(u64::MAX),
531 0 : // Use the id to decide on equal scores for reliability
532 0 : sk.1.id.0,
533 0 : )
534 0 : });
535 0 : let mut sks = Vec::new();
536 0 : let mut azs = HashSet::new();
537 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
538 0 : if !azs.insert(az_id) {
539 0 : continue;
540 0 : }
541 0 : sks.push(sk_info.clone());
542 0 : if sks.len() == wanted_count {
543 0 : break;
544 0 : }
545 : }
546 0 : if sks.len() == wanted_count {
547 0 : Ok(sks)
548 : } else {
549 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
550 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
551 0 : sks.len(),
552 0 : all_safekeepers.len(),
553 0 : )))
554 : }
555 0 : }
556 :
557 0 : pub(crate) async fn safekeepers_list(
558 0 : &self,
559 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
560 0 : let locked = self.inner.read().unwrap();
561 0 : let mut list = locked
562 0 : .safekeepers
563 0 : .iter()
564 0 : .map(|sk| sk.1.describe_response())
565 0 : .collect::<Result<Vec<_>, _>>()?;
566 0 : list.sort_by_key(|v| v.id);
567 0 : Ok(list)
568 0 : }
569 :
570 0 : pub(crate) async fn get_safekeeper(
571 0 : &self,
572 0 : id: i64,
573 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
574 0 : let locked = self.inner.read().unwrap();
575 0 : let sk = locked
576 0 : .safekeepers
577 0 : .get(&NodeId(id as u64))
578 0 : .ok_or(diesel::result::Error::NotFound)?;
579 0 : sk.describe_response()
580 0 : }
581 :
582 0 : pub(crate) async fn upsert_safekeeper(
583 0 : self: &Arc<Service>,
584 0 : record: crate::persistence::SafekeeperUpsert,
585 0 : ) -> Result<(), ApiError> {
586 0 : let node_id = NodeId(record.id as u64);
587 0 : let use_https = self.config.use_https_safekeeper_api;
588 0 :
589 0 : if use_https && record.https_port.is_none() {
590 0 : return Err(ApiError::PreconditionFailed(
591 0 : format!(
592 0 : "cannot upsert safekeeper {node_id}: \
593 0 : https is enabled, but https port is not specified"
594 0 : )
595 0 : .into(),
596 0 : ));
597 0 : }
598 0 :
599 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
600 : {
601 0 : let mut locked = self.inner.write().unwrap();
602 0 : let mut safekeepers = (*locked.safekeepers).clone();
603 0 : match safekeepers.entry(node_id) {
604 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
605 0 : .get_mut()
606 0 : .update_from_record(record)
607 0 : .expect("all preconditions should be checked before upsert to database"),
608 0 : std::collections::hash_map::Entry::Vacant(entry) => {
609 0 : entry.insert(
610 0 : Safekeeper::from_persistence(
611 0 : crate::persistence::SafekeeperPersistence::from_upsert(
612 0 : record,
613 0 : SkSchedulingPolicy::Pause,
614 0 : ),
615 0 : CancellationToken::new(),
616 0 : use_https,
617 0 : )
618 0 : .expect("all preconditions should be checked before upsert to database"),
619 0 : );
620 0 : }
621 : }
622 0 : locked
623 0 : .safekeeper_reconcilers
624 0 : .start_reconciler(node_id, self);
625 0 : locked.safekeepers = Arc::new(safekeepers);
626 0 : metrics::METRICS_REGISTRY
627 0 : .metrics_group
628 0 : .storage_controller_safekeeper_nodes
629 0 : .set(locked.safekeepers.len() as i64);
630 0 : metrics::METRICS_REGISTRY
631 0 : .metrics_group
632 0 : .storage_controller_https_safekeeper_nodes
633 0 : .set(
634 0 : locked
635 0 : .safekeepers
636 0 : .values()
637 0 : .filter(|s| s.has_https_port())
638 0 : .count() as i64,
639 0 : );
640 0 : }
641 0 : Ok(())
642 0 : }
643 :
644 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
645 0 : self: &Arc<Service>,
646 0 : id: i64,
647 0 : scheduling_policy: SkSchedulingPolicy,
648 0 : ) -> Result<(), DatabaseError> {
649 0 : self.persistence
650 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
651 0 : .await?;
652 0 : let node_id = NodeId(id as u64);
653 0 : // After the change has been persisted successfully, update the in-memory state
654 0 : {
655 0 : let mut locked = self.inner.write().unwrap();
656 0 : let mut safekeepers = (*locked.safekeepers).clone();
657 0 : let sk = safekeepers
658 0 : .get_mut(&node_id)
659 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
660 0 : sk.set_scheduling_policy(scheduling_policy);
661 0 :
662 0 : match scheduling_policy {
663 0 : SkSchedulingPolicy::Active => {
664 0 : locked
665 0 : .safekeeper_reconcilers
666 0 : .start_reconciler(node_id, self);
667 0 : }
668 0 : SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
669 0 : locked.safekeeper_reconcilers.stop_reconciler(node_id);
670 0 : }
671 : }
672 :
673 0 : locked.safekeepers = Arc::new(safekeepers);
674 0 : }
675 0 : Ok(())
676 0 : }
677 : }
|