Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use super::safekeeper_reconciler::ScheduleRequest;
7 : use crate::compute_hook;
8 : use crate::heartbeater::SafekeeperState;
9 : use crate::id_lock_map::trace_shared_lock;
10 : use crate::metrics;
11 : use crate::persistence::{
12 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
13 : TimelineUpdate,
14 : };
15 : use crate::safekeeper::Safekeeper;
16 : use crate::safekeeper_client::SafekeeperClient;
17 : use crate::service::TenantOperations;
18 : use crate::timeline_import::TimelineImportFinalizeError;
19 : use anyhow::Context;
20 : use http_utils::error::ApiError;
21 : use pageserver_api::controller_api::{
22 : SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
23 : TimelineSafekeeperMigrateRequest,
24 : };
25 : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
26 : use safekeeper_api::PgVersionId;
27 : use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
28 : use safekeeper_api::models::{
29 : PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
30 : TimelineMembershipSwitchResponse,
31 : };
32 : use safekeeper_api::{INITIAL_TERM, Term};
33 : use safekeeper_client::mgmt_api;
34 : use tokio::task::JoinSet;
35 : use tokio_util::sync::CancellationToken;
36 : use utils::id::{NodeId, TenantId, TimelineId};
37 : use utils::logging::SecretString;
38 : use utils::lsn::Lsn;
39 :
40 : use super::Service;
41 :
42 : impl Service {
43 0 : fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, anyhow::Error> {
44 0 : let members = safekeepers
45 0 : .iter()
46 0 : .map(|sk| sk.get_safekeeper_id())
47 0 : .collect::<Vec<_>>();
48 :
49 0 : MemberSet::new(members)
50 0 : }
51 :
52 0 : fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
53 0 : let safekeepers = {
54 0 : let locked = self.inner.read().unwrap();
55 0 : locked.safekeepers.clone()
56 : };
57 :
58 0 : ids.iter()
59 0 : .map(|&id| {
60 0 : let node_id = NodeId(id as u64);
61 0 : safekeepers.get(&node_id).cloned().ok_or_else(|| {
62 0 : ApiError::InternalServerError(anyhow::anyhow!(
63 0 : "safekeeper {node_id} is not registered"
64 0 : ))
65 0 : })
66 0 : })
67 0 : .collect::<Result<Vec<_>, _>>()
68 0 : }
69 :
70 : /// Timeline creation on safekeepers
71 : ///
72 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
73 : /// where `left` contains the list of safekeepers that didn't have a successful response.
74 : /// Assumes tenant lock is held while calling this function.
75 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
76 0 : &self,
77 0 : tenant_id: TenantId,
78 0 : timeline_id: TimelineId,
79 0 : pg_version: PgVersionId,
80 0 : timeline_persistence: &TimelinePersistence,
81 0 : ) -> Result<Vec<NodeId>, ApiError> {
82 0 : let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
83 :
84 0 : let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?;
85 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
86 :
87 0 : let req = safekeeper_api::models::TimelineCreateRequest {
88 0 : commit_lsn: None,
89 0 : mconf,
90 0 : pg_version,
91 0 : start_lsn: timeline_persistence.start_lsn.0,
92 0 : system_id: None,
93 0 : tenant_id,
94 0 : timeline_id,
95 0 : wal_seg_size: None,
96 0 : };
97 :
98 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
99 :
100 0 : let results = self
101 0 : .tenant_timeline_safekeeper_op_quorum(
102 0 : &safekeepers,
103 0 : move |client| {
104 0 : let req = req.clone();
105 0 : async move { client.create_timeline(&req).await }
106 0 : },
107 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
108 : )
109 0 : .await?;
110 :
111 0 : Ok(results
112 0 : .into_iter()
113 0 : .enumerate()
114 0 : .filter_map(|(idx, res)| {
115 0 : if res.is_ok() {
116 0 : None // Success, don't return this safekeeper
117 : } else {
118 0 : Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
119 : }
120 0 : })
121 0 : .collect::<Vec<_>>())
122 0 : }
123 :
124 : /// Perform an operation on a list of safekeepers in parallel with retries.
125 : ///
126 : /// If desired_success_count is set, the remaining operations will be cancelled
127 : /// when the desired number of successful responses is reached.
128 : ///
129 : /// Return the results of the operation on each safekeeper in the input order.
130 0 : async fn tenant_timeline_safekeeper_op<T, O, F>(
131 0 : &self,
132 0 : safekeepers: &[Safekeeper],
133 0 : op: O,
134 0 : max_retries: u32,
135 0 : timeout: Duration,
136 0 : desired_success_count: Option<usize>,
137 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
138 0 : where
139 0 : O: FnMut(SafekeeperClient) -> F + Send + 'static,
140 0 : O: Clone,
141 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
142 0 : T: Sync + Send + 'static,
143 0 : {
144 0 : let warn_threshold = std::cmp::min(3, max_retries);
145 0 : let jwt = self
146 0 : .config
147 0 : .safekeeper_jwt_token
148 0 : .clone()
149 0 : .map(SecretString::from);
150 0 : let mut joinset = JoinSet::new();
151 :
152 0 : let cancel_new_retries = CancellationToken::new();
153 :
154 0 : for (idx, sk) in safekeepers.iter().enumerate() {
155 0 : let sk = sk.clone();
156 0 : let http_client = self.http_client.clone();
157 0 : let jwt = jwt.clone();
158 0 : let op = op.clone();
159 0 : let cancel_new_retries = cancel_new_retries.clone();
160 0 : joinset.spawn(async move {
161 0 : let res = sk
162 0 : .with_client_retries(
163 0 : op,
164 0 : &http_client,
165 0 : &jwt,
166 0 : warn_threshold,
167 0 : max_retries,
168 0 : // TODO(diko): This is a wrong timeout.
169 0 : // It should be scaled to the retry count.
170 0 : timeout,
171 0 : &cancel_new_retries,
172 0 : )
173 0 : .await;
174 0 : (idx, res)
175 0 : });
176 : }
177 :
178 : // Initialize results with timeout errors in case we never get a response.
179 0 : let mut results: Vec<mgmt_api::Result<T>> = safekeepers
180 0 : .iter()
181 0 : .map(|_| {
182 0 : Err(mgmt_api::Error::Timeout(
183 0 : "safekeeper operation timed out".to_string(),
184 0 : ))
185 0 : })
186 0 : .collect();
187 :
188 : // After we have built the joinset, we now wait for the tasks to complete,
189 : // but with a specified timeout to make sure we return swiftly, either with
190 : // a failure or success.
191 0 : let reconcile_deadline = tokio::time::Instant::now() + timeout;
192 :
193 : // Wait until all tasks finish or timeout is hit, whichever occurs
194 : // first.
195 0 : let mut result_count = 0;
196 0 : let mut success_count = 0;
197 : loop {
198 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
199 : {
200 0 : let Some(res) = res else { break };
201 0 : match res {
202 0 : Ok((idx, res)) => {
203 0 : let sk = &safekeepers[idx];
204 0 : tracing::info!(
205 0 : "response from safekeeper id:{} at {}: {:?}",
206 0 : sk.get_id(),
207 : sk.skp.host,
208 : // Only print errors, as there is no Debug trait for T.
209 0 : res.as_ref().map(|_| ()),
210 : );
211 0 : if res.is_ok() {
212 0 : success_count += 1;
213 0 : if desired_success_count == Some(success_count) {
214 0 : // We reached the desired number of successful responses, cancel new retries for
215 0 : // the remaining safekeepers.
216 0 : // It does not cancel already started requests, we will still wait for them.
217 0 : cancel_new_retries.cancel();
218 0 : }
219 0 : }
220 0 : results[idx] = res;
221 0 : result_count += 1;
222 : }
223 0 : Err(join_err) => {
224 0 : tracing::info!("join_err for task in joinset: {join_err}");
225 : }
226 : }
227 : } else {
228 0 : tracing::info!("timeout for operation call after {result_count} responses",);
229 0 : break;
230 : }
231 : }
232 :
233 0 : Ok(results)
234 0 : }
235 :
236 : /// Perform an operation on a list of safekeepers in parallel with retries,
237 : /// and validates that we reach a quorum of successful responses.
238 : ///
239 : /// Return the results of the operation on each safekeeper in the input order.
240 : /// It's guaranteed that at least a quorum of the responses are successful.
241 0 : async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
242 0 : &self,
243 0 : safekeepers: &[Safekeeper],
244 0 : op: O,
245 0 : timeout: Duration,
246 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
247 0 : where
248 0 : O: FnMut(SafekeeperClient) -> F,
249 0 : O: Clone + Send + 'static,
250 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
251 0 : T: Sync + Send + 'static,
252 0 : {
253 0 : let target_sk_count = safekeepers.len();
254 :
255 0 : if target_sk_count == 0 {
256 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
257 0 : "timeline configured without any safekeepers"
258 0 : )));
259 0 : }
260 :
261 0 : if target_sk_count < self.config.timeline_safekeeper_count {
262 0 : tracing::warn!(
263 0 : "running a quorum operation with {} safekeepers, which is less than configured {} safekeepers per timeline",
264 : target_sk_count,
265 : self.config.timeline_safekeeper_count
266 : );
267 0 : }
268 :
269 0 : let quorum_size = target_sk_count / 2 + 1;
270 0 : let max_retries = 3;
271 :
272 0 : let results = self
273 0 : .tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
274 0 : .await?;
275 :
276 : // Now check if quorum was reached in results.
277 0 : let success_count = results.iter().filter(|res| res.is_ok()).count();
278 0 : if success_count < quorum_size {
279 : // Failure
280 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
281 0 : "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
282 0 : )));
283 0 : }
284 :
285 0 : Ok(results)
286 0 : }
287 :
288 : /// Create timeline in controller database and on safekeepers.
289 : /// `timeline_info` is result of timeline creation on pageserver.
290 : ///
291 : /// All actions must be idempotent as the call is retried until success. It
292 : /// tries to create timeline in the db and on at least majority of
293 : /// safekeepers + queue creation for safekeepers which missed it in the db
294 : /// for infinite retries; after that, call returns Ok.
295 : ///
296 : /// The idea is that once this is reached as long as we have alive majority
297 : /// of safekeepers it is expected to get eventually operational as storcon
298 : /// will be able to seed timeline on nodes which missed creation by making
299 : /// pull_timeline from peers. On the other hand we don't want to fail
300 : /// timeline creation if one safekeeper is down.
301 0 : pub(super) async fn tenant_timeline_create_safekeepers(
302 0 : self: &Arc<Self>,
303 0 : tenant_id: TenantId,
304 0 : timeline_info: &TimelineInfo,
305 0 : read_only: bool,
306 0 : ) -> Result<SafekeepersInfo, ApiError> {
307 0 : let timeline_id = timeline_info.timeline_id;
308 0 : let pg_version = PgVersionId::from(timeline_info.pg_version);
309 : // Initially start_lsn is determined by last_record_lsn in pageserver
310 : // response as it does initdb. However, later we persist it and in sk
311 : // creation calls replace with the value from the timeline row if it
312 : // previously existed as on retries in theory endpoint might have
313 : // already written some data and advanced last_record_lsn, while we want
314 : // safekeepers to have consistent start_lsn.
315 0 : let start_lsn = timeline_info.last_record_lsn;
316 :
317 : // Choose initial set of safekeepers respecting affinity
318 0 : let sks = if !read_only {
319 0 : self.safekeepers_for_new_timeline().await?
320 : } else {
321 0 : Vec::new()
322 : };
323 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
324 : // Add timeline to db
325 0 : let mut timeline_persist = TimelinePersistence {
326 0 : tenant_id: tenant_id.to_string(),
327 0 : timeline_id: timeline_id.to_string(),
328 0 : start_lsn: start_lsn.into(),
329 0 : generation: 1,
330 0 : sk_set: sks_persistence.clone(),
331 0 : new_sk_set: None,
332 0 : cplane_notified_generation: 0,
333 0 : deleted_at: None,
334 0 : sk_set_notified_generation: 0,
335 0 : };
336 0 : let inserted = self
337 0 : .persistence
338 0 : .insert_timeline(timeline_persist.clone())
339 0 : .await?;
340 0 : if !inserted {
341 0 : if let Some(existent_persist) = self
342 0 : .persistence
343 0 : .get_timeline(tenant_id, timeline_id)
344 0 : .await?
345 0 : {
346 0 : // Replace with what we have in the db, to get stuff like the generation right.
347 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
348 0 : // crashed right after the wrote to the DB.
349 0 : timeline_persist = existent_persist;
350 0 : } else {
351 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
352 0 : "insertion said timeline already in db, but looking it up, it was gone"
353 0 : )));
354 : }
355 0 : }
356 0 : let ret = SafekeepersInfo {
357 0 : generation: timeline_persist.generation as u32,
358 0 : safekeepers: sks.clone(),
359 0 : tenant_id,
360 0 : timeline_id,
361 0 : };
362 0 : if read_only {
363 0 : return Ok(ret);
364 0 : }
365 :
366 : // Create the timeline on a quorum of safekeepers
367 0 : let remaining = self
368 0 : .tenant_timeline_create_safekeepers_quorum(
369 0 : tenant_id,
370 0 : timeline_id,
371 0 : pg_version,
372 0 : &timeline_persist,
373 0 : )
374 0 : .await?;
375 :
376 : // For the remaining safekeepers, take care of their reconciliation asynchronously
377 0 : for &remaining_id in remaining.iter() {
378 0 : let pending_op = TimelinePendingOpPersistence {
379 0 : tenant_id: tenant_id.to_string(),
380 0 : timeline_id: timeline_id.to_string(),
381 0 : generation: timeline_persist.generation,
382 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
383 0 : sk_id: remaining_id.0 as i64,
384 0 : };
385 0 : tracing::info!("writing pending op for sk id {remaining_id}");
386 0 : self.persistence.insert_pending_op(pending_op).await?;
387 : }
388 0 : if !remaining.is_empty() {
389 0 : let locked = self.inner.read().unwrap();
390 0 : for remaining_id in remaining {
391 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
392 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
393 0 : "Couldn't find safekeeper with id {remaining_id}"
394 0 : )));
395 : };
396 0 : let Ok(host_list) = sks
397 0 : .iter()
398 0 : .map(|sk| {
399 : Ok((
400 0 : sk.id,
401 0 : locked
402 0 : .safekeepers
403 0 : .get(&sk.id)
404 0 : .ok_or_else(|| {
405 0 : ApiError::InternalServerError(anyhow::anyhow!(
406 0 : "Couldn't find safekeeper with id {} to pull from",
407 0 : sk.id
408 0 : ))
409 0 : })?
410 0 : .base_url(),
411 : ))
412 0 : })
413 0 : .collect::<Result<_, ApiError>>()
414 : else {
415 0 : continue;
416 : };
417 0 : let req = ScheduleRequest {
418 0 : safekeeper: Box::new(sk.clone()),
419 0 : host_list,
420 0 : tenant_id,
421 0 : timeline_id: Some(timeline_id),
422 0 : generation: timeline_persist.generation as u32,
423 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
424 0 : };
425 0 : locked.safekeeper_reconcilers.schedule_request(req);
426 : }
427 0 : }
428 :
429 0 : Ok(ret)
430 0 : }
431 :
432 0 : pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
433 0 : self: &Arc<Self>,
434 0 : tenant_id: TenantId,
435 0 : timeline_info: TimelineInfo,
436 0 : ) -> Result<(), TimelineImportFinalizeError> {
437 : const BACKOFF: Duration = Duration::from_secs(5);
438 :
439 : loop {
440 0 : if self.cancel.is_cancelled() {
441 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
442 0 : }
443 :
444 : // This function is only used in non-read-only scenarios
445 0 : let read_only = false;
446 0 : let res = self
447 0 : .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
448 0 : .await;
449 :
450 0 : match res {
451 : Ok(_) => {
452 0 : tracing::info!("Timeline created on safekeepers");
453 0 : break;
454 : }
455 0 : Err(err) => {
456 0 : tracing::error!("Failed to create timeline on safekeepers: {err}");
457 0 : tokio::select! {
458 0 : _ = self.cancel.cancelled() => {
459 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
460 : },
461 0 : _ = tokio::time::sleep(BACKOFF) => {}
462 : };
463 : }
464 : }
465 : }
466 :
467 0 : Ok(())
468 0 : }
469 :
470 : /// Directly insert the timeline into the database without reconciling it with safekeepers.
471 : ///
472 : /// Useful if the timeline already exists on the specified safekeepers,
473 : /// but we want to make it storage controller managed.
474 0 : pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
475 0 : let persistence = TimelinePersistence {
476 0 : tenant_id: req.tenant_id.to_string(),
477 0 : timeline_id: req.timeline_id.to_string(),
478 0 : start_lsn: req.start_lsn.into(),
479 : generation: 1,
480 0 : sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
481 0 : new_sk_set: None,
482 : cplane_notified_generation: 1,
483 0 : deleted_at: None,
484 : sk_set_notified_generation: 1,
485 : };
486 0 : let inserted = self
487 0 : .persistence
488 0 : .insert_timeline(persistence.clone())
489 0 : .await?;
490 0 : if inserted {
491 0 : tracing::info!("imported timeline into db");
492 0 : return Ok(());
493 0 : }
494 0 : tracing::info!("timeline already present in db, updating");
495 :
496 0 : let update = TimelineUpdate {
497 0 : tenant_id: persistence.tenant_id,
498 0 : timeline_id: persistence.timeline_id,
499 0 : start_lsn: persistence.start_lsn,
500 0 : sk_set: persistence.sk_set,
501 0 : new_sk_set: persistence.new_sk_set,
502 0 : };
503 0 : self.persistence.update_timeline_unsafe(update).await?;
504 0 : tracing::info!("timeline updated");
505 :
506 0 : Ok(())
507 0 : }
508 :
509 : /// Locate safekeepers for a timeline.
510 : /// Return the generation, sk_set and new_sk_set if present.
511 : /// If the timeline is not storcon-managed, return NotFound.
512 0 : pub(crate) async fn tenant_timeline_locate(
513 0 : &self,
514 0 : tenant_id: TenantId,
515 0 : timeline_id: TimelineId,
516 0 : ) -> Result<TimelineLocateResponse, ApiError> {
517 0 : let timeline = self
518 0 : .persistence
519 0 : .get_timeline(tenant_id, timeline_id)
520 0 : .await?;
521 :
522 0 : let Some(timeline) = timeline else {
523 0 : return Err(ApiError::NotFound(
524 0 : anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
525 0 : ));
526 : };
527 :
528 : Ok(TimelineLocateResponse {
529 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
530 0 : sk_set: timeline
531 0 : .sk_set
532 0 : .iter()
533 0 : .map(|id| NodeId(*id as u64))
534 0 : .collect(),
535 0 : new_sk_set: timeline
536 0 : .new_sk_set
537 0 : .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
538 : })
539 0 : }
540 :
541 : /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
542 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
543 0 : self: &Arc<Self>,
544 0 : tenant_id: TenantId,
545 0 : timeline_id: TimelineId,
546 0 : ) -> Result<(), ApiError> {
547 0 : let tl = self
548 0 : .persistence
549 0 : .get_timeline(tenant_id, timeline_id)
550 0 : .await?;
551 0 : let Some(tl) = tl else {
552 0 : tracing::info!(
553 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
554 : );
555 0 : return Ok(());
556 : };
557 0 : self.persistence
558 0 : .timeline_set_deleted_at(tenant_id, timeline_id)
559 0 : .await?;
560 0 : let all_sks = tl
561 0 : .new_sk_set
562 0 : .iter()
563 0 : .flatten()
564 0 : .chain(tl.sk_set.iter())
565 0 : .collect::<HashSet<_>>();
566 :
567 : // The timeline has no safekeepers: we need to delete it from the db manually,
568 : // as no safekeeper reconciler will get to it
569 0 : if all_sks.is_empty() {
570 0 : if let Err(err) = self
571 0 : .persistence
572 0 : .delete_timeline(tenant_id, timeline_id)
573 0 : .await
574 : {
575 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
576 0 : }
577 0 : }
578 :
579 : // Schedule reconciliations
580 0 : for &sk_id in all_sks.iter() {
581 0 : let pending_op = TimelinePendingOpPersistence {
582 0 : tenant_id: tenant_id.to_string(),
583 0 : timeline_id: timeline_id.to_string(),
584 0 : generation: i32::MAX,
585 0 : op_kind: SafekeeperTimelineOpKind::Delete,
586 0 : sk_id: *sk_id,
587 0 : };
588 0 : tracing::info!("writing pending op for sk id {sk_id}");
589 0 : self.persistence.insert_pending_op(pending_op).await?;
590 : }
591 : {
592 0 : let locked = self.inner.read().unwrap();
593 0 : for sk_id in all_sks {
594 0 : let sk_id = NodeId(*sk_id as u64);
595 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
596 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
597 0 : "Couldn't find safekeeper with id {sk_id}"
598 0 : )));
599 : };
600 :
601 0 : let req = ScheduleRequest {
602 0 : safekeeper: Box::new(sk.clone()),
603 0 : // we don't use this for this kind, put a dummy value
604 0 : host_list: Vec::new(),
605 0 : tenant_id,
606 0 : timeline_id: Some(timeline_id),
607 0 : generation: tl.generation as u32,
608 0 : kind: SafekeeperTimelineOpKind::Delete,
609 0 : };
610 0 : locked.safekeeper_reconcilers.schedule_request(req);
611 : }
612 : }
613 0 : Ok(())
614 0 : }
615 :
616 : /// Perform tenant deletion on safekeepers.
617 0 : pub(super) async fn tenant_delete_safekeepers(
618 0 : self: &Arc<Self>,
619 0 : tenant_id: TenantId,
620 0 : ) -> Result<(), ApiError> {
621 0 : let timeline_list = self
622 0 : .persistence
623 0 : .list_timelines_for_tenant(tenant_id)
624 0 : .await?;
625 :
626 0 : if timeline_list.is_empty() {
627 : // Early exit: the tenant is either empty or not migrated to the storcon yet
628 0 : tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
629 0 : return Ok(());
630 0 : }
631 :
632 0 : let timeline_list = timeline_list
633 0 : .into_iter()
634 0 : .map(|timeline| {
635 0 : let timeline_id = TimelineId::from_str(&timeline.timeline_id)
636 0 : .context("timeline id loaded from db")
637 0 : .map_err(ApiError::InternalServerError)?;
638 0 : Ok((timeline_id, timeline))
639 0 : })
640 0 : .collect::<Result<Vec<_>, ApiError>>()?;
641 :
642 : // Remove pending ops from db, and set `deleted_at`.
643 : // We cancel them in a later iteration once we hold the state lock.
644 0 : for (timeline_id, _timeline) in timeline_list.iter() {
645 0 : self.persistence
646 0 : .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
647 0 : .await?;
648 0 : self.persistence
649 0 : .timeline_set_deleted_at(tenant_id, *timeline_id)
650 0 : .await?;
651 : }
652 :
653 : // The list of safekeepers that have any of the timelines
654 0 : let mut sk_list = HashSet::new();
655 :
656 : // List all pending ops for all timelines, cancel them
657 0 : for (_timeline_id, timeline) in timeline_list.iter() {
658 0 : let sk_iter = timeline
659 0 : .sk_set
660 0 : .iter()
661 0 : .chain(timeline.new_sk_set.iter().flatten())
662 0 : .map(|id| NodeId(*id as u64));
663 0 : sk_list.extend(sk_iter);
664 : }
665 :
666 0 : for &sk_id in sk_list.iter() {
667 0 : let pending_op = TimelinePendingOpPersistence {
668 0 : tenant_id: tenant_id.to_string(),
669 0 : timeline_id: String::new(),
670 0 : generation: i32::MAX,
671 0 : op_kind: SafekeeperTimelineOpKind::Delete,
672 0 : sk_id: sk_id.0 as i64,
673 0 : };
674 0 : tracing::info!("writing pending op for sk id {sk_id}");
675 0 : self.persistence.insert_pending_op(pending_op).await?;
676 : }
677 :
678 0 : let mut locked = self.inner.write().unwrap();
679 :
680 0 : for (timeline_id, _timeline) in timeline_list.iter() {
681 0 : for sk_id in sk_list.iter() {
682 0 : locked
683 0 : .safekeeper_reconcilers
684 0 : .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
685 0 : }
686 : }
687 :
688 : // unwrap is safe: we return above for an empty timeline list
689 0 : let max_generation = timeline_list
690 0 : .iter()
691 0 : .map(|(_tl_id, tl)| tl.generation as u32)
692 0 : .max()
693 0 : .unwrap();
694 :
695 0 : for sk_id in sk_list {
696 0 : let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
697 0 : tracing::warn!("Couldn't find safekeeper with id {sk_id}");
698 0 : continue;
699 : };
700 : // Add pending op for tenant deletion
701 0 : let req = ScheduleRequest {
702 0 : generation: max_generation,
703 0 : host_list: Vec::new(),
704 0 : kind: SafekeeperTimelineOpKind::Delete,
705 0 : safekeeper: Box::new(safekeeper.clone()),
706 0 : tenant_id,
707 0 : timeline_id: None,
708 0 : };
709 0 : locked.safekeeper_reconcilers.schedule_request(req);
710 : }
711 0 : Ok(())
712 0 : }
713 :
714 : /// Choose safekeepers for the new timeline in different azs.
715 : /// 3 are choosen by default, but may be configured via config (for testing).
716 0 : pub(crate) async fn safekeepers_for_new_timeline(
717 0 : &self,
718 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
719 0 : let mut all_safekeepers = {
720 0 : let locked = self.inner.read().unwrap();
721 0 : locked
722 0 : .safekeepers
723 0 : .iter()
724 0 : .filter_map(|sk| {
725 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
726 : // If we don't want to schedule stuff onto the safekeeper, respect that.
727 0 : return None;
728 0 : }
729 0 : let utilization_opt = if let SafekeeperState::Available {
730 : last_seen_at: _,
731 0 : utilization,
732 0 : } = sk.1.availability()
733 : {
734 0 : Some(utilization)
735 : } else {
736 : // non-available safekeepers still get a chance for new timelines,
737 : // but put them last in the list.
738 0 : None
739 : };
740 0 : let info = SafekeeperInfo {
741 0 : hostname: sk.1.skp.host.clone(),
742 0 : id: NodeId(sk.1.skp.id as u64),
743 0 : };
744 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
745 0 : })
746 0 : .collect::<Vec<_>>()
747 : };
748 0 : all_safekeepers.sort_by_key(|sk| {
749 : (
750 0 : sk.0.as_ref()
751 0 : .map(|ut| ut.timeline_count)
752 0 : .unwrap_or(u64::MAX),
753 : // Use the id to decide on equal scores for reliability
754 0 : sk.1.id.0,
755 : )
756 0 : });
757 : // Number of safekeepers in different AZs we are looking for
758 0 : let wanted_count = self.config.timeline_safekeeper_count;
759 :
760 0 : let mut sks = Vec::new();
761 0 : let mut azs = HashSet::new();
762 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
763 0 : if !azs.insert(az_id) {
764 0 : continue;
765 0 : }
766 0 : sks.push(sk_info.clone());
767 0 : if sks.len() == wanted_count {
768 0 : break;
769 0 : }
770 : }
771 0 : if sks.len() == wanted_count {
772 0 : Ok(sks)
773 : } else {
774 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
775 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
776 0 : sks.len(),
777 0 : all_safekeepers.len(),
778 0 : )))
779 : }
780 0 : }
781 :
782 0 : pub(crate) async fn safekeepers_list(
783 0 : &self,
784 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
785 0 : let locked = self.inner.read().unwrap();
786 0 : let mut list = locked
787 0 : .safekeepers
788 0 : .iter()
789 0 : .map(|sk| sk.1.describe_response())
790 0 : .collect::<Result<Vec<_>, _>>()?;
791 0 : list.sort_by_key(|v| v.id);
792 0 : Ok(list)
793 0 : }
794 :
795 0 : pub(crate) async fn get_safekeeper(
796 0 : &self,
797 0 : id: i64,
798 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
799 0 : let locked = self.inner.read().unwrap();
800 0 : let sk = locked
801 0 : .safekeepers
802 0 : .get(&NodeId(id as u64))
803 0 : .ok_or(diesel::result::Error::NotFound)?;
804 0 : sk.describe_response()
805 0 : }
806 :
807 0 : pub(crate) async fn upsert_safekeeper(
808 0 : self: &Arc<Service>,
809 0 : record: crate::persistence::SafekeeperUpsert,
810 0 : ) -> Result<(), ApiError> {
811 0 : let node_id = NodeId(record.id as u64);
812 0 : let use_https = self.config.use_https_safekeeper_api;
813 :
814 0 : if use_https && record.https_port.is_none() {
815 0 : return Err(ApiError::PreconditionFailed(
816 0 : format!(
817 0 : "cannot upsert safekeeper {node_id}: \
818 0 : https is enabled, but https port is not specified"
819 0 : )
820 0 : .into(),
821 0 : ));
822 0 : }
823 :
824 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
825 : {
826 0 : let mut locked = self.inner.write().unwrap();
827 0 : let mut safekeepers = (*locked.safekeepers).clone();
828 0 : match safekeepers.entry(node_id) {
829 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
830 0 : .get_mut()
831 0 : .update_from_record(record)
832 0 : .expect("all preconditions should be checked before upsert to database"),
833 0 : std::collections::hash_map::Entry::Vacant(entry) => {
834 0 : entry.insert(
835 0 : Safekeeper::from_persistence(
836 0 : crate::persistence::SafekeeperPersistence::from_upsert(
837 0 : record,
838 0 : SkSchedulingPolicy::Activating,
839 0 : ),
840 0 : CancellationToken::new(),
841 0 : use_https,
842 0 : )
843 0 : .expect("all preconditions should be checked before upsert to database"),
844 0 : );
845 0 : }
846 : }
847 0 : locked
848 0 : .safekeeper_reconcilers
849 0 : .start_reconciler(node_id, self);
850 0 : locked.safekeepers = Arc::new(safekeepers);
851 0 : metrics::METRICS_REGISTRY
852 0 : .metrics_group
853 0 : .storage_controller_safekeeper_nodes
854 0 : .set(locked.safekeepers.len() as i64);
855 0 : metrics::METRICS_REGISTRY
856 0 : .metrics_group
857 0 : .storage_controller_https_safekeeper_nodes
858 0 : .set(
859 0 : locked
860 0 : .safekeepers
861 0 : .values()
862 0 : .filter(|s| s.has_https_port())
863 0 : .count() as i64,
864 : );
865 : }
866 0 : Ok(())
867 0 : }
868 :
869 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
870 0 : self: &Arc<Service>,
871 0 : id: i64,
872 0 : scheduling_policy: SkSchedulingPolicy,
873 0 : ) -> Result<(), DatabaseError> {
874 0 : self.persistence
875 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
876 0 : .await?;
877 0 : let node_id = NodeId(id as u64);
878 : // After the change has been persisted successfully, update the in-memory state
879 0 : self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
880 0 : .await
881 0 : }
882 :
883 0 : pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
884 0 : self: &Arc<Service>,
885 0 : node_id: NodeId,
886 0 : scheduling_policy: SkSchedulingPolicy,
887 0 : ) -> Result<(), DatabaseError> {
888 0 : let mut locked = self.inner.write().unwrap();
889 0 : let mut safekeepers = (*locked.safekeepers).clone();
890 0 : let sk = safekeepers
891 0 : .get_mut(&node_id)
892 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
893 0 : sk.set_scheduling_policy(scheduling_policy);
894 :
895 0 : match scheduling_policy {
896 0 : SkSchedulingPolicy::Active => {
897 0 : locked
898 0 : .safekeeper_reconcilers
899 0 : .start_reconciler(node_id, self);
900 0 : }
901 : SkSchedulingPolicy::Decomissioned
902 : | SkSchedulingPolicy::Pause
903 0 : | SkSchedulingPolicy::Activating => {
904 0 : locked.safekeeper_reconcilers.stop_reconciler(node_id);
905 0 : }
906 : }
907 :
908 0 : locked.safekeepers = Arc::new(safekeepers);
909 0 : Ok(())
910 0 : }
911 :
912 : /// Call `switch_timeline_membership` on all safekeepers with retries
913 : /// till the quorum of successful responses is reached.
914 : ///
915 : /// If min_position is not None, validates that majority of safekeepers
916 : /// reached at least min_position.
917 : ///
918 : /// If update_notified_generation is set, also updates sk_set_notified_generation
919 : /// in the timelines table.
920 : ///
921 : /// Return responses from safekeepers in the input order.
922 0 : async fn tenant_timeline_set_membership_quorum(
923 0 : self: &Arc<Self>,
924 0 : tenant_id: TenantId,
925 0 : timeline_id: TimelineId,
926 0 : safekeepers: &[Safekeeper],
927 0 : mconf: &membership::Configuration,
928 0 : min_position: Option<(Term, Lsn)>,
929 0 : update_notified_generation: bool,
930 0 : ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
931 0 : let req = TimelineMembershipSwitchRequest {
932 0 : mconf: mconf.clone(),
933 0 : };
934 :
935 : const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
936 :
937 0 : let results = self
938 0 : .tenant_timeline_safekeeper_op_quorum(
939 0 : safekeepers,
940 0 : move |client| {
941 0 : let req = req.clone();
942 0 : async move {
943 0 : let mut res = client
944 0 : .switch_timeline_membership(tenant_id, timeline_id, &req)
945 0 : .await;
946 :
947 : // If min_position is not reached, map the response to an error,
948 : // so it isn't counted toward the quorum.
949 0 : if let Some(min_position) = min_position {
950 0 : if let Ok(ok_res) = &res {
951 0 : if (ok_res.last_log_term, ok_res.flush_lsn) < min_position {
952 0 : // Use Error::Timeout to make this error retriable.
953 0 : res = Err(mgmt_api::Error::Timeout(
954 0 : format!(
955 0 : "safekeeper {} returned position {:?} which is less than minimum required position {:?}",
956 0 : client.node_id_label(),
957 0 : (ok_res.last_log_term, ok_res.flush_lsn),
958 0 : min_position
959 0 : )
960 0 : ));
961 0 : }
962 0 : }
963 0 : }
964 :
965 0 : res
966 0 : }
967 0 : },
968 : SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
969 : )
970 0 : .await?;
971 :
972 0 : for res in results.iter().flatten() {
973 0 : if res.current_conf.generation > mconf.generation {
974 : // Antoher switch_membership raced us.
975 0 : return Err(ApiError::Conflict(format!(
976 0 : "received configuration with generation {} from safekeeper, but expected {}",
977 0 : res.current_conf.generation, mconf.generation
978 0 : )));
979 0 : } else if res.current_conf.generation < mconf.generation {
980 : // Note: should never happen.
981 : // If we get a response, it should be at least the sent generation.
982 0 : tracing::error!(
983 0 : "received configuration with generation {} from safekeeper, but expected {}",
984 : res.current_conf.generation,
985 : mconf.generation
986 : );
987 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
988 0 : "received configuration with generation {} from safekeeper, but expected {}",
989 0 : res.current_conf.generation,
990 0 : mconf.generation
991 0 : )));
992 0 : }
993 : }
994 :
995 0 : if update_notified_generation {
996 0 : self.persistence
997 0 : .update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
998 0 : .await?;
999 0 : }
1000 :
1001 0 : Ok(results)
1002 0 : }
1003 :
1004 : /// Pull timeline to to_safekeepers from from_safekeepers with retries.
1005 : ///
1006 : /// Returns Ok(()) only if all the pull_timeline requests were successful.
1007 0 : async fn tenant_timeline_pull_from_peers(
1008 0 : self: &Arc<Self>,
1009 0 : tenant_id: TenantId,
1010 0 : timeline_id: TimelineId,
1011 0 : to_safekeepers: &[Safekeeper],
1012 0 : from_safekeepers: &[Safekeeper],
1013 0 : mconf: membership::Configuration,
1014 0 : ) -> Result<(), ApiError> {
1015 0 : let http_hosts = from_safekeepers
1016 0 : .iter()
1017 0 : .map(|sk| sk.base_url())
1018 0 : .collect::<Vec<_>>();
1019 :
1020 0 : tracing::info!(
1021 0 : "pulling timeline to {:?} from {:?}",
1022 0 : to_safekeepers
1023 0 : .iter()
1024 0 : .map(|sk| sk.get_id())
1025 0 : .collect::<Vec<_>>(),
1026 0 : from_safekeepers
1027 0 : .iter()
1028 0 : .map(|sk| sk.get_id())
1029 0 : .collect::<Vec<_>>()
1030 : );
1031 :
1032 0 : let req = PullTimelineRequest {
1033 0 : tenant_id,
1034 0 : timeline_id,
1035 0 : http_hosts,
1036 0 : mconf: Some(mconf),
1037 0 : };
1038 :
1039 : const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
1040 0 : let max_retries = 3;
1041 :
1042 0 : let responses = self
1043 0 : .tenant_timeline_safekeeper_op(
1044 0 : to_safekeepers,
1045 0 : move |client| {
1046 0 : let req = req.clone();
1047 0 : async move { client.pull_timeline(&req).await }
1048 0 : },
1049 0 : max_retries,
1050 : SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
1051 0 : None,
1052 : )
1053 0 : .await?;
1054 :
1055 0 : if let Some((idx, err)) = responses
1056 0 : .iter()
1057 0 : .enumerate()
1058 0 : .find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
1059 : {
1060 0 : let sk_id = to_safekeepers[idx].get_id();
1061 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1062 0 : "pull_timeline to {sk_id} failed: {err}",
1063 0 : )));
1064 0 : }
1065 :
1066 0 : Ok(())
1067 0 : }
1068 :
1069 : /// Exclude a timeline from safekeepers in parallel with retries.
1070 : ///
1071 : /// Assumes that the exclude requests are already persistent in the database.
1072 : ///
1073 : /// The function does best effort: if an exclude request is unsuccessful,
1074 : /// it will be added to the in-memory reconciler, and the function will succeed anyway.
1075 : ///
1076 : /// Might fail if there is error accessing the database.
1077 0 : async fn tenant_timeline_safekeeper_exclude_reconcile(
1078 0 : self: &Arc<Self>,
1079 0 : tenant_id: TenantId,
1080 0 : timeline_id: TimelineId,
1081 0 : safekeepers: &[Safekeeper],
1082 0 : mconf: &membership::Configuration,
1083 0 : ) -> Result<(), ApiError> {
1084 0 : let req = TimelineMembershipSwitchRequest {
1085 0 : mconf: mconf.clone(),
1086 0 : };
1087 :
1088 : const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
1089 : // Do not retry failed requests to speed up the finishing phase.
1090 : // They will be retried in the reconciler.
1091 0 : let max_retries = 0;
1092 :
1093 0 : let results = self
1094 0 : .tenant_timeline_safekeeper_op(
1095 0 : safekeepers,
1096 0 : move |client| {
1097 0 : let req = req.clone();
1098 0 : async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
1099 0 : },
1100 0 : max_retries,
1101 : SK_EXCLUDE_TIMELINE_TIMEOUT,
1102 0 : None,
1103 : )
1104 0 : .await?;
1105 :
1106 0 : let mut reconcile_requests = Vec::new();
1107 :
1108 0 : fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
1109 0 : Err(ApiError::BadRequest(anyhow::anyhow!(
1110 0 : "failpoint sk-migration-step-9-mid-exclude"
1111 0 : )))
1112 0 : });
1113 :
1114 0 : for (idx, res) in results.iter().enumerate() {
1115 0 : let sk_id = safekeepers[idx].skp.id;
1116 0 : let generation = mconf.generation.into_inner();
1117 :
1118 0 : if res.is_ok() {
1119 0 : self.persistence
1120 0 : .remove_pending_op(
1121 0 : tenant_id,
1122 0 : Some(timeline_id),
1123 0 : NodeId(sk_id as u64),
1124 0 : generation,
1125 0 : )
1126 0 : .await?;
1127 0 : } else {
1128 0 : let req = ScheduleRequest {
1129 0 : safekeeper: Box::new(safekeepers[idx].clone()),
1130 0 : host_list: Vec::new(),
1131 0 : tenant_id,
1132 0 : timeline_id: Some(timeline_id),
1133 0 : generation,
1134 0 : kind: SafekeeperTimelineOpKind::Exclude,
1135 0 : };
1136 0 : reconcile_requests.push(req);
1137 0 : }
1138 : }
1139 :
1140 0 : if !reconcile_requests.is_empty() {
1141 0 : let locked = self.inner.read().unwrap();
1142 0 : for req in reconcile_requests {
1143 0 : locked.safekeeper_reconcilers.schedule_request(req);
1144 0 : }
1145 0 : }
1146 :
1147 0 : Ok(())
1148 0 : }
1149 :
/// Migrate timeline safekeeper set to a new set.
///
/// This function implements an algorithm from RFC-035.
/// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
///
/// The numbered comments in the body correspond to the numbered steps of the algorithm.
/// The call is retriable: if the database already holds the same `new_sk_set`, the
/// migration continues from the joint configuration; if the previous migration was
/// committed but not fully announced, its finish step is retried first.
///
/// Returns `Ok(())` without doing anything else if the timeline is already at the
/// desired safekeeper set.
///
/// # Errors
///
/// - `ApiError::BadRequest` if the requested set references an unknown safekeeper,
///   is empty, is smaller than `timeline_safekeeper_count`, cannot form a valid member
///   set, or adds a decomissioned safekeeper that is not in the current set.
/// - `ApiError::NotFound` if the timeline is missing from the timelines table.
/// - `ApiError::Conflict` if the timeline is already migrating to a different set.
/// - Any error returned by the individual steps (database, safekeepers, cplane).
pub(crate) async fn tenant_timeline_safekeeper_migrate(
    self: &Arc<Self>,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: TimelineSafekeeperMigrateRequest,
) -> Result<(), ApiError> {
    // Snapshot of the in-memory safekeeper map; the read lock is released right away.
    let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();

    let new_sk_set = req.new_sk_set;

    // Cheap request validation before taking the tenant lock.
    for sk_id in new_sk_set.iter() {
        if !all_safekeepers.contains_key(sk_id) {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "safekeeper {sk_id} does not exist"
            )));
        }
    }

    if new_sk_set.is_empty() {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "new safekeeper set is empty"
        )));
    }

    if new_sk_set.len() < self.config.timeline_safekeeper_count {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "new safekeeper set must have at least {} safekeepers",
            self.config.timeline_safekeeper_count
        )));
    }

    let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
    let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
    // Construct the new member set in advance to validate it.
    // E.g. validates that there are no duplicate safekeepers.
    let new_sk_member_set =
        Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?;

    // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
    let _tenant_lock = trace_shared_lock(
        &self.tenant_op_locks,
        tenant_id,
        TenantOperations::TimelineSafekeeperMigrate,
    )
    .await;

    // 1. Fetch current timeline configuration from the configuration storage.

    let timeline = self
        .persistence
        .get_timeline(tenant_id, timeline_id)
        .await?;

    let Some(timeline) = timeline else {
        return Err(ApiError::NotFound(
            anyhow::anyhow!(
                "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
            )
            .into(),
        ));
    };

    let cur_sk_set = timeline
        .sk_set
        .iter()
        .map(|&id| NodeId(id as u64))
        .collect::<Vec<_>>();

    // Validate that we are not migrating to a decomissioned safekeeper.
    // Safekeepers that are already part of the current set are allowed to stay.
    for sk in new_safekeepers.iter() {
        if !cur_sk_set.contains(&sk.get_id())
            && sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
        {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "safekeeper {} is decomissioned",
                sk.get_id()
            )));
        }
    }

    tracing::info!(
        ?cur_sk_set,
        ?new_sk_set,
        "Migrating timeline to new safekeeper set",
    );

    // Starts as the persisted generation; bumped below when a new migration is started.
    let mut generation = SafekeeperGeneration::new(timeline.generation as u32);

    if let Some(ref presistent_new_sk_set) = timeline.new_sk_set {
        // 2. If it is already a joint one and new_set is different from desired_set, refuse to change.
        // Note: the comparison is element-wise, so it is sensitive to the order of ids.
        if presistent_new_sk_set
            .iter()
            .map(|&id| NodeId(id as u64))
            .ne(new_sk_set.iter().cloned())
        {
            tracing::info!(
                ?presistent_new_sk_set,
                ?new_sk_set,
                "different new safekeeper set is already set in the database",
            );
            return Err(ApiError::Conflict(format!(
                "the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
            )));
        }
        // If it is the same new_sk_set, we can continue the migration (retry).
    } else {
        // No joint configuration is persisted. The previous migration counts as finished
        // only if both cplane and the safekeeper set were notified about the current generation.
        let prev_finished = timeline.cplane_notified_generation == timeline.generation
            && timeline.sk_set_notified_generation == timeline.generation;

        if !prev_finished {
            // The previous migration is committed, but the finish step failed.
            // Safekeepers/cplane might not know about the last membership configuration.
            // Retry the finish step to ensure smooth migration.
            self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
                .await?;
        }

        if cur_sk_set == new_sk_set {
            tracing::info!("timeline is already at the desired safekeeper set");
            return Ok(());
        }

        // 3. No active migration yet.
        // Increment current generation and put desired_set to new_sk_set.
        generation = generation.next();

        self.persistence
            .update_timeline_membership(
                tenant_id,
                timeline_id,
                generation,
                &cur_sk_set,
                Some(&new_sk_set),
                &[],
            )
            .await?;

        fail::fail_point!("sk-migration-after-step-3", |_| {
            Err(ApiError::BadRequest(anyhow::anyhow!(
                "failpoint sk-migration-after-step-3"
            )))
        });
    }

    let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
    let cur_sk_member_set =
        Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;

    // Joint configuration: current members plus the desired new members.
    let joint_config = membership::Configuration {
        generation,
        members: cur_sk_member_set,
        new_members: Some(new_sk_member_set.clone()),
    };

    // 4. Call PUT configuration on safekeepers from the current set,
    // delivering them joint_conf.

    // Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
    // This way the compute will know about new safekeepers from joint_config before we require to
    // collect a quorum from them.
    self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
        .await?;

    let results = self
        .tenant_timeline_set_membership_quorum(
            tenant_id,
            timeline_id,
            &cur_safekeepers,
            &joint_config,
            None, // no min position
            true, // update notified generation
        )
        .await?;

    // The sync position is the maximum (term, flush_lsn) among the successful responses.
    let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
    for res in results.into_iter().flatten() {
        let sk_position = (res.last_log_term, res.flush_lsn);
        if sync_position < sk_position {
            sync_position = sk_position;
        }
    }

    tracing::info!(
        %generation,
        ?sync_position,
        "safekeepers set membership updated",
    );

    fail::fail_point!("sk-migration-after-step-4", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-4"
        )))
    });

    // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
    // by doing pull_timeline from the majority of the current set.

    // Filter out safekeepers which are already in the current set.
    let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
    let pull_to_safekeepers = new_safekeepers
        .iter()
        .filter(|sk| !from_ids.contains(&sk.get_id()))
        .cloned()
        .collect::<Vec<_>>();

    self.tenant_timeline_pull_from_peers(
        tenant_id,
        timeline_id,
        &pull_to_safekeepers,
        &cur_safekeepers,
        joint_config.clone(),
    )
    .await?;

    fail::fail_point!("sk-migration-after-step-5", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-5"
        )))
    });

    // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.

    // TODO(diko): do we need to bump timeline term?

    // 7. Repeatedly call PUT configuration on safekeepers from the new set,
    // delivering them joint_conf and collecting their positions.

    tracing::info!(?sync_position, "waiting for safekeepers to sync position");

    self.tenant_timeline_set_membership_quorum(
        tenant_id,
        timeline_id,
        &new_safekeepers,
        &joint_config,
        Some(sync_position),
        false, // we're just waiting for sync position, don't update notified generation
    )
    .await?;

    fail::fail_point!("sk-migration-after-step-7", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-7"
        )))
    });

    // 8. Create new_conf: Configuration incrementing joint_conf generation and
    // having new safekeeper set as sk_set and None new_sk_set.

    let generation = generation.next();

    let new_conf = membership::Configuration {
        generation,
        members: new_sk_member_set,
        new_members: None,
    };

    // Safekeepers of the current set that are not part of the new set must be excluded.
    // The exclude requests are persisted together with the membership update below.
    let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
    let exclude_safekeepers = cur_safekeepers
        .into_iter()
        .filter(|sk| !new_ids.contains(&sk.get_id()))
        .collect::<Vec<_>>();
    let exclude_requests = exclude_safekeepers
        .iter()
        .map(|sk| TimelinePendingOpPersistence {
            sk_id: sk.skp.id,
            tenant_id: tenant_id.to_string(),
            timeline_id: timeline_id.to_string(),
            generation: generation.into_inner() as i32,
            op_kind: SafekeeperTimelineOpKind::Exclude,
        })
        .collect::<Vec<_>>();

    self.persistence
        .update_timeline_membership(
            tenant_id,
            timeline_id,
            generation,
            &new_sk_set,
            None,
            &exclude_requests,
        )
        .await?;

    fail::fail_point!("sk-migration-after-step-8", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-8"
        )))
    });

    // At this point we have already updated the timeline in the database, so the final
    // membership configuration is committed and the migration is not abortable anymore.
    // But safekeepers and cplane/compute still need to be notified about the new configuration.
    // The [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about
    // the new configuration and reconciles excluded safekeepers.
    // If it fails, the safekeeper migration call should be retried.

    self.finish_safekeeper_migration(
        tenant_id,
        timeline_id,
        &new_safekeepers,
        &new_conf,
        &exclude_safekeepers,
    )
    .await?;

    Ok(())
}
1461 :
1462 : /// Notify cplane about safekeeper membership change.
1463 : /// The cplane will receive a joint set of safekeepers as a safekeeper list.
1464 0 : async fn cplane_notify_safekeepers(
1465 0 : &self,
1466 0 : tenant_id: TenantId,
1467 0 : timeline_id: TimelineId,
1468 0 : mconf: &membership::Configuration,
1469 0 : ) -> Result<(), ApiError> {
1470 0 : let mut safekeepers = Vec::new();
1471 0 : let mut ids: HashSet<_> = HashSet::new();
1472 :
1473 0 : for member in mconf
1474 0 : .members
1475 0 : .m
1476 0 : .iter()
1477 0 : .chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
1478 : {
1479 0 : if ids.insert(member.id) {
1480 0 : safekeepers.push(compute_hook::SafekeeperInfo {
1481 0 : id: member.id,
1482 0 : hostname: Some(member.host.clone()),
1483 0 : });
1484 0 : }
1485 : }
1486 :
1487 0 : self.compute_hook
1488 0 : .notify_safekeepers(
1489 0 : compute_hook::SafekeepersUpdate {
1490 0 : tenant_id,
1491 0 : timeline_id,
1492 0 : generation: mconf.generation,
1493 0 : safekeepers,
1494 0 : },
1495 0 : &self.cancel,
1496 0 : )
1497 0 : .await
1498 0 : .map_err(|err| {
1499 0 : ApiError::InternalServerError(anyhow::anyhow!(
1500 0 : "failed to notify cplane about safekeeper membership change: {err}"
1501 0 : ))
1502 0 : })?;
1503 :
1504 0 : self.persistence
1505 0 : .update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
1506 0 : .await?;
1507 :
1508 0 : Ok(())
1509 0 : }
1510 :
/// Finish safekeeper migration.
///
/// It is the last step of the safekeeper migration.
///
/// Notifies safekeepers and cplane about the final membership configuration,
/// reconciles excluded safekeepers and updates *_notified_generation in the database.
///
/// The steps run strictly in this order: deliver `new_conf` to `new_safekeepers`
/// (quorum), exclude `exclude_safekeepers` (best effort), notify cplane.
/// A failpoint follows each step.
///
/// Returns the first error produced by any of the steps.
async fn finish_safekeeper_migration(
    self: &Arc<Self>,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    new_safekeepers: &[Safekeeper],
    new_conf: &membership::Configuration,
    exclude_safekeepers: &[Safekeeper],
) -> Result<(), ApiError> {
    // 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
    // Also try to exclude safekeepers and notify cplane about the membership change.

    // The responses themselves are not needed here, only the fact that a quorum was reached.
    self.tenant_timeline_set_membership_quorum(
        tenant_id,
        timeline_id,
        new_safekeepers,
        new_conf,
        None, // no min position
        true, // update notified generation
    )
    .await?;

    fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-step-9-after-set-membership"
        )))
    });

    // Best effort: failed excludes are handed over to the reconciler and do not fail this call.
    self.tenant_timeline_safekeeper_exclude_reconcile(
        tenant_id,
        timeline_id,
        exclude_safekeepers,
        new_conf,
    )
    .await?;

    fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-step-9-after-exclude"
        )))
    });

    // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
    // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
    // collect a quorum from them.
    self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
        .await?;

    fail::fail_point!("sk-migration-after-step-9", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-9"
        )))
    });

    Ok(())
}
1572 :
1573 : /// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
1574 : /// It's used when the migration failed during the finish step and we need to retry it.
1575 0 : async fn finish_safekeeper_migration_retry(
1576 0 : self: &Arc<Self>,
1577 0 : tenant_id: TenantId,
1578 0 : timeline_id: TimelineId,
1579 0 : timeline: &TimelinePersistence,
1580 0 : ) -> Result<(), ApiError> {
1581 0 : if timeline.new_sk_set.is_some() {
1582 : // Logical error, should never happen.
1583 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1584 0 : "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
1585 0 : )));
1586 0 : }
1587 :
1588 0 : let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
1589 0 : let cur_sk_member_set =
1590 0 : Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
1591 :
1592 0 : let mconf = membership::Configuration {
1593 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
1594 0 : members: cur_sk_member_set,
1595 0 : new_members: None,
1596 0 : };
1597 :
1598 : // We might have failed between commiting reconciliation requests and adding them to the in-memory reconciler.
1599 : // Reload them from the database.
1600 0 : let pending_ops = self
1601 0 : .persistence
1602 0 : .list_pending_ops_for_timeline(tenant_id, timeline_id)
1603 0 : .await?;
1604 :
1605 0 : let mut exclude_sk_ids = Vec::new();
1606 :
1607 0 : for op in pending_ops {
1608 0 : if op.op_kind == SafekeeperTimelineOpKind::Exclude
1609 0 : && op.generation == timeline.generation
1610 0 : {
1611 0 : exclude_sk_ids.push(op.sk_id);
1612 0 : }
1613 : }
1614 :
1615 0 : let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;
1616 :
1617 0 : self.finish_safekeeper_migration(
1618 0 : tenant_id,
1619 0 : timeline_id,
1620 0 : &cur_safekeepers,
1621 0 : &mconf,
1622 0 : &exclude_safekeepers,
1623 0 : )
1624 0 : .await?;
1625 :
1626 0 : Ok(())
1627 0 : }
1628 : }
|