Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use super::safekeeper_reconciler::ScheduleRequest;
7 : use crate::compute_hook;
8 : use crate::heartbeater::SafekeeperState;
9 : use crate::id_lock_map::trace_shared_lock;
10 : use crate::metrics;
11 : use crate::persistence::{
12 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
13 : TimelineUpdate,
14 : };
15 : use crate::safekeeper::Safekeeper;
16 : use crate::safekeeper_client::SafekeeperClient;
17 : use crate::service::TenantOperations;
18 : use crate::timeline_import::TimelineImportFinalizeError;
19 : use anyhow::Context;
20 : use http_utils::error::ApiError;
21 : use pageserver_api::controller_api::{
22 : SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
23 : TimelineSafekeeperMigrateRequest,
24 : };
25 : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
26 : use safekeeper_api::PgVersionId;
27 : use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
28 : use safekeeper_api::models::{
29 : PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
30 : TimelineMembershipSwitchResponse,
31 : };
32 : use safekeeper_api::{INITIAL_TERM, Term};
33 : use safekeeper_client::mgmt_api;
34 : use tokio::task::JoinSet;
35 : use tokio_util::sync::CancellationToken;
36 : use utils::id::{NodeId, TenantId, TimelineId};
37 : use utils::logging::SecretString;
38 : use utils::lsn::Lsn;
39 :
40 : use super::Service;
41 :
42 : impl Service {
43 0 : fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, anyhow::Error> {
44 0 : let members = safekeepers
45 0 : .iter()
46 0 : .map(|sk| sk.get_safekeeper_id())
47 0 : .collect::<Vec<_>>();
48 :
49 0 : MemberSet::new(members)
50 0 : }
51 :
52 0 : fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
53 0 : let safekeepers = {
54 0 : let locked = self.inner.read().unwrap();
55 0 : locked.safekeepers.clone()
56 : };
57 :
58 0 : ids.iter()
59 0 : .map(|&id| {
60 0 : let node_id = NodeId(id as u64);
61 0 : safekeepers.get(&node_id).cloned().ok_or_else(|| {
62 0 : ApiError::InternalServerError(anyhow::anyhow!(
63 0 : "safekeeper {node_id} is not registered"
64 0 : ))
65 0 : })
66 0 : })
67 0 : .collect::<Result<Vec<_>, _>>()
68 0 : }
69 :
70 : /// Timeline creation on safekeepers
71 : ///
72 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
73 : /// where `left` contains the list of safekeepers that didn't have a successful response.
74 : /// Assumes tenant lock is held while calling this function.
75 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
76 0 : &self,
77 0 : tenant_id: TenantId,
78 0 : timeline_id: TimelineId,
79 0 : pg_version: PgVersionId,
80 0 : timeline_persistence: &TimelinePersistence,
81 0 : ) -> Result<Vec<NodeId>, ApiError> {
82 0 : let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
83 :
84 0 : let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?;
85 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
86 :
87 0 : let req = safekeeper_api::models::TimelineCreateRequest {
88 0 : commit_lsn: None,
89 0 : mconf,
90 0 : pg_version,
91 0 : start_lsn: timeline_persistence.start_lsn.0,
92 0 : system_id: None,
93 0 : tenant_id,
94 0 : timeline_id,
95 0 : wal_seg_size: None,
96 0 : };
97 :
98 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
99 :
100 0 : let results = self
101 0 : .tenant_timeline_safekeeper_op_quorum(
102 0 : &safekeepers,
103 0 : move |client| {
104 0 : let req = req.clone();
105 0 : async move { client.create_timeline(&req).await }
106 0 : },
107 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
108 : )
109 0 : .await?;
110 :
111 0 : Ok(results
112 0 : .into_iter()
113 0 : .enumerate()
114 0 : .filter_map(|(idx, res)| {
115 0 : if res.is_ok() {
116 0 : None // Success, don't return this safekeeper
117 : } else {
118 0 : Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
119 : }
120 0 : })
121 0 : .collect::<Vec<_>>())
122 0 : }
123 :
124 : /// Perform an operation on a list of safekeepers in parallel with retries.
125 : ///
126 : /// Return the results of the operation on each safekeeper in the input order.
127 0 : async fn tenant_timeline_safekeeper_op<T, O, F>(
128 0 : &self,
129 0 : safekeepers: &[Safekeeper],
130 0 : op: O,
131 0 : timeout: Duration,
132 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
133 0 : where
134 0 : O: FnMut(SafekeeperClient) -> F + Send + 'static,
135 0 : O: Clone,
136 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
137 0 : T: Sync + Send + 'static,
138 0 : {
139 0 : let jwt = self
140 0 : .config
141 0 : .safekeeper_jwt_token
142 0 : .clone()
143 0 : .map(SecretString::from);
144 0 : let mut joinset = JoinSet::new();
145 :
146 0 : for (idx, sk) in safekeepers.iter().enumerate() {
147 0 : let sk = sk.clone();
148 0 : let http_client = self.http_client.clone();
149 0 : let jwt = jwt.clone();
150 0 : let op = op.clone();
151 0 : joinset.spawn(async move {
152 0 : let res = sk
153 0 : .with_client_retries(
154 0 : op,
155 0 : &http_client,
156 0 : &jwt,
157 0 : 3,
158 0 : 3,
159 0 : // TODO(diko): This is a wrong timeout.
160 0 : // It should be scaled to the retry count.
161 0 : timeout,
162 0 : &CancellationToken::new(),
163 0 : )
164 0 : .await;
165 0 : (idx, res)
166 0 : });
167 : }
168 :
169 : // Initialize results with timeout errors in case we never get a response.
170 0 : let mut results: Vec<mgmt_api::Result<T>> = safekeepers
171 0 : .iter()
172 0 : .map(|_| {
173 0 : Err(mgmt_api::Error::Timeout(
174 0 : "safekeeper operation timed out".to_string(),
175 0 : ))
176 0 : })
177 0 : .collect();
178 :
179 : // After we have built the joinset, we now wait for the tasks to complete,
180 : // but with a specified timeout to make sure we return swiftly, either with
181 : // a failure or success.
182 0 : let reconcile_deadline = tokio::time::Instant::now() + timeout;
183 :
184 : // Wait until all tasks finish or timeout is hit, whichever occurs
185 : // first.
186 0 : let mut result_count = 0;
187 : loop {
188 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
189 : {
190 0 : let Some(res) = res else { break };
191 0 : match res {
192 0 : Ok((idx, res)) => {
193 0 : let sk = &safekeepers[idx];
194 0 : tracing::info!(
195 0 : "response from safekeeper id:{} at {}: {:?}",
196 0 : sk.get_id(),
197 : sk.skp.host,
198 : // Only print errors, as there is no Debug trait for T.
199 0 : res.as_ref().map(|_| ()),
200 : );
201 0 : results[idx] = res;
202 0 : result_count += 1;
203 : }
204 0 : Err(join_err) => {
205 0 : tracing::info!("join_err for task in joinset: {join_err}");
206 : }
207 : }
208 : } else {
209 0 : tracing::info!("timeout for operation call after {result_count} responses",);
210 0 : break;
211 : }
212 : }
213 :
214 0 : Ok(results)
215 0 : }
216 :
217 : /// Perform an operation on a list of safekeepers in parallel with retries,
218 : /// and validates that we reach a quorum of successful responses.
219 : ///
220 : /// Return the results of the operation on each safekeeper in the input order.
221 : /// It's guaranteed that at least a quorum of the responses are successful.
222 0 : async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
223 0 : &self,
224 0 : safekeepers: &[Safekeeper],
225 0 : op: O,
226 0 : timeout: Duration,
227 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
228 0 : where
229 0 : O: FnMut(SafekeeperClient) -> F,
230 0 : O: Clone + Send + 'static,
231 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
232 0 : T: Sync + Send + 'static,
233 0 : {
234 0 : let target_sk_count = safekeepers.len();
235 :
236 0 : if target_sk_count == 0 {
237 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
238 0 : "timeline configured without any safekeepers"
239 0 : )));
240 0 : }
241 :
242 0 : if target_sk_count < self.config.timeline_safekeeper_count {
243 0 : tracing::warn!(
244 0 : "running a quorum operation with {} safekeepers, which is less than configured {} safekeepers per timeline",
245 : target_sk_count,
246 : self.config.timeline_safekeeper_count
247 : );
248 0 : }
249 :
250 0 : let results = self
251 0 : .tenant_timeline_safekeeper_op(safekeepers, op, timeout)
252 0 : .await?;
253 :
254 : // Now check if quorum was reached in results.
255 :
256 0 : let quorum_size = target_sk_count / 2 + 1;
257 :
258 0 : let success_count = results.iter().filter(|res| res.is_ok()).count();
259 0 : if success_count < quorum_size {
260 : // Failure
261 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
262 0 : "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
263 0 : )));
264 0 : }
265 :
266 0 : Ok(results)
267 0 : }
268 :
269 : /// Create timeline in controller database and on safekeepers.
270 : /// `timeline_info` is result of timeline creation on pageserver.
271 : ///
272 : /// All actions must be idempotent as the call is retried until success. It
273 : /// tries to create timeline in the db and on at least majority of
274 : /// safekeepers + queue creation for safekeepers which missed it in the db
275 : /// for infinite retries; after that, call returns Ok.
276 : ///
277 : /// The idea is that once this is reached as long as we have alive majority
278 : /// of safekeepers it is expected to get eventually operational as storcon
279 : /// will be able to seed timeline on nodes which missed creation by making
280 : /// pull_timeline from peers. On the other hand we don't want to fail
281 : /// timeline creation if one safekeeper is down.
282 0 : pub(super) async fn tenant_timeline_create_safekeepers(
283 0 : self: &Arc<Self>,
284 0 : tenant_id: TenantId,
285 0 : timeline_info: &TimelineInfo,
286 0 : read_only: bool,
287 0 : ) -> Result<SafekeepersInfo, ApiError> {
288 0 : let timeline_id = timeline_info.timeline_id;
289 0 : let pg_version = PgVersionId::from(timeline_info.pg_version);
290 : // Initially start_lsn is determined by last_record_lsn in pageserver
291 : // response as it does initdb. However, later we persist it and in sk
292 : // creation calls replace with the value from the timeline row if it
293 : // previously existed as on retries in theory endpoint might have
294 : // already written some data and advanced last_record_lsn, while we want
295 : // safekeepers to have consistent start_lsn.
296 0 : let start_lsn = timeline_info.last_record_lsn;
297 :
298 : // Choose initial set of safekeepers respecting affinity
299 0 : let sks = if !read_only {
300 0 : self.safekeepers_for_new_timeline().await?
301 : } else {
302 0 : Vec::new()
303 : };
304 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
305 : // Add timeline to db
306 0 : let mut timeline_persist = TimelinePersistence {
307 0 : tenant_id: tenant_id.to_string(),
308 0 : timeline_id: timeline_id.to_string(),
309 0 : start_lsn: start_lsn.into(),
310 0 : generation: 1,
311 0 : sk_set: sks_persistence.clone(),
312 0 : new_sk_set: None,
313 0 : cplane_notified_generation: 0,
314 0 : deleted_at: None,
315 0 : sk_set_notified_generation: 0,
316 0 : };
317 0 : let inserted = self
318 0 : .persistence
319 0 : .insert_timeline(timeline_persist.clone())
320 0 : .await?;
321 0 : if !inserted {
322 0 : if let Some(existent_persist) = self
323 0 : .persistence
324 0 : .get_timeline(tenant_id, timeline_id)
325 0 : .await?
326 0 : {
327 0 : // Replace with what we have in the db, to get stuff like the generation right.
328 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
329 0 : // crashed right after the wrote to the DB.
330 0 : timeline_persist = existent_persist;
331 0 : } else {
332 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
333 0 : "insertion said timeline already in db, but looking it up, it was gone"
334 0 : )));
335 : }
336 0 : }
337 0 : let ret = SafekeepersInfo {
338 0 : generation: timeline_persist.generation as u32,
339 0 : safekeepers: sks.clone(),
340 0 : tenant_id,
341 0 : timeline_id,
342 0 : };
343 0 : if read_only {
344 0 : return Ok(ret);
345 0 : }
346 :
347 : // Create the timeline on a quorum of safekeepers
348 0 : let remaining = self
349 0 : .tenant_timeline_create_safekeepers_quorum(
350 0 : tenant_id,
351 0 : timeline_id,
352 0 : pg_version,
353 0 : &timeline_persist,
354 0 : )
355 0 : .await?;
356 :
357 : // For the remaining safekeepers, take care of their reconciliation asynchronously
358 0 : for &remaining_id in remaining.iter() {
359 0 : let pending_op = TimelinePendingOpPersistence {
360 0 : tenant_id: tenant_id.to_string(),
361 0 : timeline_id: timeline_id.to_string(),
362 0 : generation: timeline_persist.generation,
363 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
364 0 : sk_id: remaining_id.0 as i64,
365 0 : };
366 0 : tracing::info!("writing pending op for sk id {remaining_id}");
367 0 : self.persistence.insert_pending_op(pending_op).await?;
368 : }
369 0 : if !remaining.is_empty() {
370 0 : let locked = self.inner.read().unwrap();
371 0 : for remaining_id in remaining {
372 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
373 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
374 0 : "Couldn't find safekeeper with id {remaining_id}"
375 0 : )));
376 : };
377 0 : let Ok(host_list) = sks
378 0 : .iter()
379 0 : .map(|sk| {
380 : Ok((
381 0 : sk.id,
382 0 : locked
383 0 : .safekeepers
384 0 : .get(&sk.id)
385 0 : .ok_or_else(|| {
386 0 : ApiError::InternalServerError(anyhow::anyhow!(
387 0 : "Couldn't find safekeeper with id {} to pull from",
388 0 : sk.id
389 0 : ))
390 0 : })?
391 0 : .base_url(),
392 : ))
393 0 : })
394 0 : .collect::<Result<_, ApiError>>()
395 : else {
396 0 : continue;
397 : };
398 0 : let req = ScheduleRequest {
399 0 : safekeeper: Box::new(sk.clone()),
400 0 : host_list,
401 0 : tenant_id,
402 0 : timeline_id: Some(timeline_id),
403 0 : generation: timeline_persist.generation as u32,
404 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
405 0 : };
406 0 : locked.safekeeper_reconcilers.schedule_request(req);
407 : }
408 0 : }
409 :
410 0 : Ok(ret)
411 0 : }
412 :
413 0 : pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
414 0 : self: &Arc<Self>,
415 0 : tenant_id: TenantId,
416 0 : timeline_info: TimelineInfo,
417 0 : ) -> Result<(), TimelineImportFinalizeError> {
418 : const BACKOFF: Duration = Duration::from_secs(5);
419 :
420 : loop {
421 0 : if self.cancel.is_cancelled() {
422 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
423 0 : }
424 :
425 : // This function is only used in non-read-only scenarios
426 0 : let read_only = false;
427 0 : let res = self
428 0 : .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
429 0 : .await;
430 :
431 0 : match res {
432 : Ok(_) => {
433 0 : tracing::info!("Timeline created on safekeepers");
434 0 : break;
435 : }
436 0 : Err(err) => {
437 0 : tracing::error!("Failed to create timeline on safekeepers: {err}");
438 0 : tokio::select! {
439 0 : _ = self.cancel.cancelled() => {
440 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
441 : },
442 0 : _ = tokio::time::sleep(BACKOFF) => {}
443 : };
444 : }
445 : }
446 : }
447 :
448 0 : Ok(())
449 0 : }
450 :
451 : /// Directly insert the timeline into the database without reconciling it with safekeepers.
452 : ///
453 : /// Useful if the timeline already exists on the specified safekeepers,
454 : /// but we want to make it storage controller managed.
455 0 : pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
456 0 : let persistence = TimelinePersistence {
457 0 : tenant_id: req.tenant_id.to_string(),
458 0 : timeline_id: req.timeline_id.to_string(),
459 0 : start_lsn: req.start_lsn.into(),
460 : generation: 1,
461 0 : sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
462 0 : new_sk_set: None,
463 : cplane_notified_generation: 1,
464 0 : deleted_at: None,
465 : sk_set_notified_generation: 1,
466 : };
467 0 : let inserted = self
468 0 : .persistence
469 0 : .insert_timeline(persistence.clone())
470 0 : .await?;
471 0 : if inserted {
472 0 : tracing::info!("imported timeline into db");
473 0 : return Ok(());
474 0 : }
475 0 : tracing::info!("timeline already present in db, updating");
476 :
477 0 : let update = TimelineUpdate {
478 0 : tenant_id: persistence.tenant_id,
479 0 : timeline_id: persistence.timeline_id,
480 0 : start_lsn: persistence.start_lsn,
481 0 : sk_set: persistence.sk_set,
482 0 : new_sk_set: persistence.new_sk_set,
483 0 : };
484 0 : self.persistence.update_timeline_unsafe(update).await?;
485 0 : tracing::info!("timeline updated");
486 :
487 0 : Ok(())
488 0 : }
489 :
490 : /// Locate safekeepers for a timeline.
491 : /// Return the generation, sk_set and new_sk_set if present.
492 : /// If the timeline is not storcon-managed, return NotFound.
493 0 : pub(crate) async fn tenant_timeline_locate(
494 0 : &self,
495 0 : tenant_id: TenantId,
496 0 : timeline_id: TimelineId,
497 0 : ) -> Result<TimelineLocateResponse, ApiError> {
498 0 : let timeline = self
499 0 : .persistence
500 0 : .get_timeline(tenant_id, timeline_id)
501 0 : .await?;
502 :
503 0 : let Some(timeline) = timeline else {
504 0 : return Err(ApiError::NotFound(
505 0 : anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
506 0 : ));
507 : };
508 :
509 : Ok(TimelineLocateResponse {
510 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
511 0 : sk_set: timeline
512 0 : .sk_set
513 0 : .iter()
514 0 : .map(|id| NodeId(*id as u64))
515 0 : .collect(),
516 0 : new_sk_set: timeline
517 0 : .new_sk_set
518 0 : .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
519 : })
520 0 : }
521 :
522 : /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
523 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
524 0 : self: &Arc<Self>,
525 0 : tenant_id: TenantId,
526 0 : timeline_id: TimelineId,
527 0 : ) -> Result<(), ApiError> {
528 0 : let tl = self
529 0 : .persistence
530 0 : .get_timeline(tenant_id, timeline_id)
531 0 : .await?;
532 0 : let Some(tl) = tl else {
533 0 : tracing::info!(
534 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
535 : );
536 0 : return Ok(());
537 : };
538 0 : self.persistence
539 0 : .timeline_set_deleted_at(tenant_id, timeline_id)
540 0 : .await?;
541 0 : let all_sks = tl
542 0 : .new_sk_set
543 0 : .iter()
544 0 : .flatten()
545 0 : .chain(tl.sk_set.iter())
546 0 : .collect::<HashSet<_>>();
547 :
548 : // The timeline has no safekeepers: we need to delete it from the db manually,
549 : // as no safekeeper reconciler will get to it
550 0 : if all_sks.is_empty() {
551 0 : if let Err(err) = self
552 0 : .persistence
553 0 : .delete_timeline(tenant_id, timeline_id)
554 0 : .await
555 : {
556 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
557 0 : }
558 0 : }
559 :
560 : // Schedule reconciliations
561 0 : for &sk_id in all_sks.iter() {
562 0 : let pending_op = TimelinePendingOpPersistence {
563 0 : tenant_id: tenant_id.to_string(),
564 0 : timeline_id: timeline_id.to_string(),
565 0 : generation: i32::MAX,
566 0 : op_kind: SafekeeperTimelineOpKind::Delete,
567 0 : sk_id: *sk_id,
568 0 : };
569 0 : tracing::info!("writing pending op for sk id {sk_id}");
570 0 : self.persistence.insert_pending_op(pending_op).await?;
571 : }
572 : {
573 0 : let locked = self.inner.read().unwrap();
574 0 : for sk_id in all_sks {
575 0 : let sk_id = NodeId(*sk_id as u64);
576 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
577 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
578 0 : "Couldn't find safekeeper with id {sk_id}"
579 0 : )));
580 : };
581 :
582 0 : let req = ScheduleRequest {
583 0 : safekeeper: Box::new(sk.clone()),
584 0 : // we don't use this for this kind, put a dummy value
585 0 : host_list: Vec::new(),
586 0 : tenant_id,
587 0 : timeline_id: Some(timeline_id),
588 0 : generation: tl.generation as u32,
589 0 : kind: SafekeeperTimelineOpKind::Delete,
590 0 : };
591 0 : locked.safekeeper_reconcilers.schedule_request(req);
592 : }
593 : }
594 0 : Ok(())
595 0 : }
596 :
597 : /// Perform tenant deletion on safekeepers.
598 0 : pub(super) async fn tenant_delete_safekeepers(
599 0 : self: &Arc<Self>,
600 0 : tenant_id: TenantId,
601 0 : ) -> Result<(), ApiError> {
602 0 : let timeline_list = self
603 0 : .persistence
604 0 : .list_timelines_for_tenant(tenant_id)
605 0 : .await?;
606 :
607 0 : if timeline_list.is_empty() {
608 : // Early exit: the tenant is either empty or not migrated to the storcon yet
609 0 : tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
610 0 : return Ok(());
611 0 : }
612 :
613 0 : let timeline_list = timeline_list
614 0 : .into_iter()
615 0 : .map(|timeline| {
616 0 : let timeline_id = TimelineId::from_str(&timeline.timeline_id)
617 0 : .context("timeline id loaded from db")
618 0 : .map_err(ApiError::InternalServerError)?;
619 0 : Ok((timeline_id, timeline))
620 0 : })
621 0 : .collect::<Result<Vec<_>, ApiError>>()?;
622 :
623 : // Remove pending ops from db, and set `deleted_at`.
624 : // We cancel them in a later iteration once we hold the state lock.
625 0 : for (timeline_id, _timeline) in timeline_list.iter() {
626 0 : self.persistence
627 0 : .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
628 0 : .await?;
629 0 : self.persistence
630 0 : .timeline_set_deleted_at(tenant_id, *timeline_id)
631 0 : .await?;
632 : }
633 :
634 : // The list of safekeepers that have any of the timelines
635 0 : let mut sk_list = HashSet::new();
636 :
637 : // List all pending ops for all timelines, cancel them
638 0 : for (_timeline_id, timeline) in timeline_list.iter() {
639 0 : let sk_iter = timeline
640 0 : .sk_set
641 0 : .iter()
642 0 : .chain(timeline.new_sk_set.iter().flatten())
643 0 : .map(|id| NodeId(*id as u64));
644 0 : sk_list.extend(sk_iter);
645 : }
646 :
647 0 : for &sk_id in sk_list.iter() {
648 0 : let pending_op = TimelinePendingOpPersistence {
649 0 : tenant_id: tenant_id.to_string(),
650 0 : timeline_id: String::new(),
651 0 : generation: i32::MAX,
652 0 : op_kind: SafekeeperTimelineOpKind::Delete,
653 0 : sk_id: sk_id.0 as i64,
654 0 : };
655 0 : tracing::info!("writing pending op for sk id {sk_id}");
656 0 : self.persistence.insert_pending_op(pending_op).await?;
657 : }
658 :
659 0 : let mut locked = self.inner.write().unwrap();
660 :
661 0 : for (timeline_id, _timeline) in timeline_list.iter() {
662 0 : for sk_id in sk_list.iter() {
663 0 : locked
664 0 : .safekeeper_reconcilers
665 0 : .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
666 0 : }
667 : }
668 :
669 : // unwrap is safe: we return above for an empty timeline list
670 0 : let max_generation = timeline_list
671 0 : .iter()
672 0 : .map(|(_tl_id, tl)| tl.generation as u32)
673 0 : .max()
674 0 : .unwrap();
675 :
676 0 : for sk_id in sk_list {
677 0 : let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
678 0 : tracing::warn!("Couldn't find safekeeper with id {sk_id}");
679 0 : continue;
680 : };
681 : // Add pending op for tenant deletion
682 0 : let req = ScheduleRequest {
683 0 : generation: max_generation,
684 0 : host_list: Vec::new(),
685 0 : kind: SafekeeperTimelineOpKind::Delete,
686 0 : safekeeper: Box::new(safekeeper.clone()),
687 0 : tenant_id,
688 0 : timeline_id: None,
689 0 : };
690 0 : locked.safekeeper_reconcilers.schedule_request(req);
691 : }
692 0 : Ok(())
693 0 : }
694 :
695 : /// Choose safekeepers for the new timeline in different azs.
696 : /// 3 are choosen by default, but may be configured via config (for testing).
697 0 : pub(crate) async fn safekeepers_for_new_timeline(
698 0 : &self,
699 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
700 0 : let mut all_safekeepers = {
701 0 : let locked = self.inner.read().unwrap();
702 0 : locked
703 0 : .safekeepers
704 0 : .iter()
705 0 : .filter_map(|sk| {
706 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
707 : // If we don't want to schedule stuff onto the safekeeper, respect that.
708 0 : return None;
709 0 : }
710 0 : let utilization_opt = if let SafekeeperState::Available {
711 : last_seen_at: _,
712 0 : utilization,
713 0 : } = sk.1.availability()
714 : {
715 0 : Some(utilization)
716 : } else {
717 : // non-available safekeepers still get a chance for new timelines,
718 : // but put them last in the list.
719 0 : None
720 : };
721 0 : let info = SafekeeperInfo {
722 0 : hostname: sk.1.skp.host.clone(),
723 0 : id: NodeId(sk.1.skp.id as u64),
724 0 : };
725 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
726 0 : })
727 0 : .collect::<Vec<_>>()
728 : };
729 0 : all_safekeepers.sort_by_key(|sk| {
730 : (
731 0 : sk.0.as_ref()
732 0 : .map(|ut| ut.timeline_count)
733 0 : .unwrap_or(u64::MAX),
734 : // Use the id to decide on equal scores for reliability
735 0 : sk.1.id.0,
736 : )
737 0 : });
738 : // Number of safekeepers in different AZs we are looking for
739 0 : let wanted_count = self.config.timeline_safekeeper_count;
740 :
741 0 : let mut sks = Vec::new();
742 0 : let mut azs = HashSet::new();
743 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
744 0 : if !azs.insert(az_id) {
745 0 : continue;
746 0 : }
747 0 : sks.push(sk_info.clone());
748 0 : if sks.len() == wanted_count {
749 0 : break;
750 0 : }
751 : }
752 0 : if sks.len() == wanted_count {
753 0 : Ok(sks)
754 : } else {
755 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
756 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
757 0 : sks.len(),
758 0 : all_safekeepers.len(),
759 0 : )))
760 : }
761 0 : }
762 :
763 0 : pub(crate) async fn safekeepers_list(
764 0 : &self,
765 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
766 0 : let locked = self.inner.read().unwrap();
767 0 : let mut list = locked
768 0 : .safekeepers
769 0 : .iter()
770 0 : .map(|sk| sk.1.describe_response())
771 0 : .collect::<Result<Vec<_>, _>>()?;
772 0 : list.sort_by_key(|v| v.id);
773 0 : Ok(list)
774 0 : }
775 :
776 0 : pub(crate) async fn get_safekeeper(
777 0 : &self,
778 0 : id: i64,
779 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
780 0 : let locked = self.inner.read().unwrap();
781 0 : let sk = locked
782 0 : .safekeepers
783 0 : .get(&NodeId(id as u64))
784 0 : .ok_or(diesel::result::Error::NotFound)?;
785 0 : sk.describe_response()
786 0 : }
787 :
788 0 : pub(crate) async fn upsert_safekeeper(
789 0 : self: &Arc<Service>,
790 0 : record: crate::persistence::SafekeeperUpsert,
791 0 : ) -> Result<(), ApiError> {
792 0 : let node_id = NodeId(record.id as u64);
793 0 : let use_https = self.config.use_https_safekeeper_api;
794 :
795 0 : if use_https && record.https_port.is_none() {
796 0 : return Err(ApiError::PreconditionFailed(
797 0 : format!(
798 0 : "cannot upsert safekeeper {node_id}: \
799 0 : https is enabled, but https port is not specified"
800 0 : )
801 0 : .into(),
802 0 : ));
803 0 : }
804 :
805 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
806 : {
807 0 : let mut locked = self.inner.write().unwrap();
808 0 : let mut safekeepers = (*locked.safekeepers).clone();
809 0 : match safekeepers.entry(node_id) {
810 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
811 0 : .get_mut()
812 0 : .update_from_record(record)
813 0 : .expect("all preconditions should be checked before upsert to database"),
814 0 : std::collections::hash_map::Entry::Vacant(entry) => {
815 0 : entry.insert(
816 0 : Safekeeper::from_persistence(
817 0 : crate::persistence::SafekeeperPersistence::from_upsert(
818 0 : record,
819 0 : SkSchedulingPolicy::Activating,
820 0 : ),
821 0 : CancellationToken::new(),
822 0 : use_https,
823 0 : )
824 0 : .expect("all preconditions should be checked before upsert to database"),
825 0 : );
826 0 : }
827 : }
828 0 : locked
829 0 : .safekeeper_reconcilers
830 0 : .start_reconciler(node_id, self);
831 0 : locked.safekeepers = Arc::new(safekeepers);
832 0 : metrics::METRICS_REGISTRY
833 0 : .metrics_group
834 0 : .storage_controller_safekeeper_nodes
835 0 : .set(locked.safekeepers.len() as i64);
836 0 : metrics::METRICS_REGISTRY
837 0 : .metrics_group
838 0 : .storage_controller_https_safekeeper_nodes
839 0 : .set(
840 0 : locked
841 0 : .safekeepers
842 0 : .values()
843 0 : .filter(|s| s.has_https_port())
844 0 : .count() as i64,
845 : );
846 : }
847 0 : Ok(())
848 0 : }
849 :
850 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
851 0 : self: &Arc<Service>,
852 0 : id: i64,
853 0 : scheduling_policy: SkSchedulingPolicy,
854 0 : ) -> Result<(), DatabaseError> {
855 0 : self.persistence
856 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
857 0 : .await?;
858 0 : let node_id = NodeId(id as u64);
859 : // After the change has been persisted successfully, update the in-memory state
860 0 : self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
861 0 : .await
862 0 : }
863 :
864 0 : pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
865 0 : self: &Arc<Service>,
866 0 : node_id: NodeId,
867 0 : scheduling_policy: SkSchedulingPolicy,
868 0 : ) -> Result<(), DatabaseError> {
869 0 : let mut locked = self.inner.write().unwrap();
870 0 : let mut safekeepers = (*locked.safekeepers).clone();
871 0 : let sk = safekeepers
872 0 : .get_mut(&node_id)
873 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
874 0 : sk.set_scheduling_policy(scheduling_policy);
875 :
876 0 : match scheduling_policy {
877 0 : SkSchedulingPolicy::Active => {
878 0 : locked
879 0 : .safekeeper_reconcilers
880 0 : .start_reconciler(node_id, self);
881 0 : }
882 : SkSchedulingPolicy::Decomissioned
883 : | SkSchedulingPolicy::Pause
884 0 : | SkSchedulingPolicy::Activating => {
885 0 : locked.safekeeper_reconcilers.stop_reconciler(node_id);
886 0 : }
887 : }
888 :
889 0 : locked.safekeepers = Arc::new(safekeepers);
890 0 : Ok(())
891 0 : }
892 :
893 : /// Call `switch_timeline_membership` on all safekeepers with retries
894 : /// till the quorum of successful responses is reached.
895 : ///
896 : /// If min_position is not None, validates that majority of safekeepers
897 : /// reached at least min_position.
898 : ///
899 : /// If update_notified_generation is set, also updates sk_set_notified_generation
900 : /// in the timelines table.
901 : ///
902 : /// Return responses from safekeepers in the input order.
903 0 : async fn tenant_timeline_set_membership_quorum(
904 0 : self: &Arc<Self>,
905 0 : tenant_id: TenantId,
906 0 : timeline_id: TimelineId,
907 0 : safekeepers: &[Safekeeper],
908 0 : mconf: &membership::Configuration,
909 0 : min_position: Option<(Term, Lsn)>,
910 0 : update_notified_generation: bool,
911 0 : ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
912 0 : let req = TimelineMembershipSwitchRequest {
913 0 : mconf: mconf.clone(),
914 0 : };
915 :
916 : const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
917 :
918 0 : let results = self
919 0 : .tenant_timeline_safekeeper_op_quorum(
920 0 : safekeepers,
921 0 : move |client| {
922 0 : let req = req.clone();
923 0 : async move {
924 0 : let mut res = client
925 0 : .switch_timeline_membership(tenant_id, timeline_id, &req)
926 0 : .await;
927 :
928 : // If min_position is not reached, map the response to an error,
929 : // so it isn't counted toward the quorum.
930 0 : if let Some(min_position) = min_position {
931 0 : if let Ok(ok_res) = &res {
932 0 : if (ok_res.last_log_term, ok_res.flush_lsn) < min_position {
933 0 : // Use Error::Timeout to make this error retriable.
934 0 : res = Err(mgmt_api::Error::Timeout(
935 0 : format!(
936 0 : "safekeeper {} returned position {:?} which is less than minimum required position {:?}",
937 0 : client.node_id_label(),
938 0 : (ok_res.last_log_term, ok_res.flush_lsn),
939 0 : min_position
940 0 : )
941 0 : ));
942 0 : }
943 0 : }
944 0 : }
945 :
946 0 : res
947 0 : }
948 0 : },
949 : SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
950 : )
951 0 : .await?;
952 :
953 0 : for res in results.iter().flatten() {
954 0 : if res.current_conf.generation > mconf.generation {
955 : // Antoher switch_membership raced us.
956 0 : return Err(ApiError::Conflict(format!(
957 0 : "received configuration with generation {} from safekeeper, but expected {}",
958 0 : res.current_conf.generation, mconf.generation
959 0 : )));
960 0 : } else if res.current_conf.generation < mconf.generation {
961 : // Note: should never happen.
962 : // If we get a response, it should be at least the sent generation.
963 0 : tracing::error!(
964 0 : "received configuration with generation {} from safekeeper, but expected {}",
965 : res.current_conf.generation,
966 : mconf.generation
967 : );
968 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
969 0 : "received configuration with generation {} from safekeeper, but expected {}",
970 0 : res.current_conf.generation,
971 0 : mconf.generation
972 0 : )));
973 0 : }
974 : }
975 :
976 0 : if update_notified_generation {
977 0 : self.persistence
978 0 : .update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
979 0 : .await?;
980 0 : }
981 :
982 0 : Ok(results)
983 0 : }
984 :
985 : /// Pull timeline to to_safekeepers from from_safekeepers with retries.
986 : ///
987 : /// Returns Ok(()) only if all the pull_timeline requests were successful.
988 0 : async fn tenant_timeline_pull_from_peers(
989 0 : self: &Arc<Self>,
990 0 : tenant_id: TenantId,
991 0 : timeline_id: TimelineId,
992 0 : to_safekeepers: &[Safekeeper],
993 0 : from_safekeepers: &[Safekeeper],
994 0 : ) -> Result<(), ApiError> {
995 0 : let http_hosts = from_safekeepers
996 0 : .iter()
997 0 : .map(|sk| sk.base_url())
998 0 : .collect::<Vec<_>>();
999 :
1000 0 : tracing::info!(
1001 0 : "pulling timeline to {:?} from {:?}",
1002 0 : to_safekeepers
1003 0 : .iter()
1004 0 : .map(|sk| sk.get_id())
1005 0 : .collect::<Vec<_>>(),
1006 0 : from_safekeepers
1007 0 : .iter()
1008 0 : .map(|sk| sk.get_id())
1009 0 : .collect::<Vec<_>>()
1010 : );
1011 :
1012 : // TODO(diko): need to pass mconf/generation with the request
1013 : // to properly handle tombstones. Ignore tombstones for now.
1014 : // Worst case: we leave a timeline on a safekeeper which is not in the current set.
1015 0 : let req = PullTimelineRequest {
1016 0 : tenant_id,
1017 0 : timeline_id,
1018 0 : http_hosts,
1019 0 : ignore_tombstone: Some(true),
1020 0 : };
1021 :
1022 : const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
1023 :
1024 0 : let responses = self
1025 0 : .tenant_timeline_safekeeper_op(
1026 0 : to_safekeepers,
1027 0 : move |client| {
1028 0 : let req = req.clone();
1029 0 : async move { client.pull_timeline(&req).await }
1030 0 : },
1031 : SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
1032 : )
1033 0 : .await?;
1034 :
1035 0 : if let Some((idx, err)) = responses
1036 0 : .iter()
1037 0 : .enumerate()
1038 0 : .find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
1039 : {
1040 0 : let sk_id = to_safekeepers[idx].get_id();
1041 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1042 0 : "pull_timeline to {sk_id} failed: {err}",
1043 0 : )));
1044 0 : }
1045 :
1046 0 : Ok(())
1047 0 : }
1048 :
1049 : /// Exclude a timeline from safekeepers in parallel with retries.
1050 : ///
1051 : /// Assumes that the exclude requests are already persistent in the database.
1052 : ///
1053 : /// The function does best effort: if an exclude request is unsuccessful,
1054 : /// it will be added to the in-memory reconciler, and the function will succeed anyway.
1055 : ///
1056 : /// Might fail if there is error accessing the database.
1057 0 : async fn tenant_timeline_safekeeper_exclude_reconcile(
1058 0 : self: &Arc<Self>,
1059 0 : tenant_id: TenantId,
1060 0 : timeline_id: TimelineId,
1061 0 : safekeepers: &[Safekeeper],
1062 0 : mconf: &membership::Configuration,
1063 0 : ) -> Result<(), ApiError> {
1064 0 : let req = TimelineMembershipSwitchRequest {
1065 0 : mconf: mconf.clone(),
1066 0 : };
1067 :
1068 : const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
1069 :
1070 0 : let results = self
1071 0 : .tenant_timeline_safekeeper_op(
1072 0 : safekeepers,
1073 0 : move |client| {
1074 0 : let req = req.clone();
1075 0 : async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
1076 0 : },
1077 : SK_EXCLUDE_TIMELINE_TIMEOUT,
1078 : )
1079 0 : .await?;
1080 :
1081 0 : let mut reconcile_requests = Vec::new();
1082 :
1083 0 : fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
1084 0 : Err(ApiError::BadRequest(anyhow::anyhow!(
1085 0 : "failpoint sk-migration-step-9-mid-exclude"
1086 0 : )))
1087 0 : });
1088 :
1089 0 : for (idx, res) in results.iter().enumerate() {
1090 0 : let sk_id = safekeepers[idx].skp.id;
1091 0 : let generation = mconf.generation.into_inner();
1092 :
1093 0 : if res.is_ok() {
1094 0 : self.persistence
1095 0 : .remove_pending_op(
1096 0 : tenant_id,
1097 0 : Some(timeline_id),
1098 0 : NodeId(sk_id as u64),
1099 0 : generation,
1100 0 : )
1101 0 : .await?;
1102 0 : } else {
1103 0 : let req = ScheduleRequest {
1104 0 : safekeeper: Box::new(safekeepers[idx].clone()),
1105 0 : host_list: Vec::new(),
1106 0 : tenant_id,
1107 0 : timeline_id: Some(timeline_id),
1108 0 : generation,
1109 0 : kind: SafekeeperTimelineOpKind::Exclude,
1110 0 : };
1111 0 : reconcile_requests.push(req);
1112 0 : }
1113 : }
1114 :
1115 0 : if !reconcile_requests.is_empty() {
1116 0 : let locked = self.inner.read().unwrap();
1117 0 : for req in reconcile_requests {
1118 0 : locked.safekeeper_reconcilers.schedule_request(req);
1119 0 : }
1120 0 : }
1121 :
1122 0 : Ok(())
1123 0 : }
1124 :
/// Migrate timeline safekeeper set to a new set.
///
/// This function implements an algorithm from RFC-035.
/// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
///
/// Outline of what the body does (step numbers match the inline comments):
///
/// * Validates `req.new_sk_set` (all ids known, non-empty, at least
///   `config.timeline_safekeeper_count` members, accepted by `make_member_set`).
/// * Acquires the tenant operation lock via `trace_shared_lock`.
/// * 1-3. Loads the timeline from persistence. If a joint configuration is already
///   stored, it must target the same `new_sk_set` (the call is then treated as a retry);
///   otherwise an unfinished previous migration is finished first and a new joint
///   configuration with the next generation is persisted.
/// * 4. Notifies cplane and the current safekeepers about the joint configuration.
/// * 5. Pulls the timeline to the safekeepers which are new to the set.
/// * 7. Waits for a quorum of the new set to reach the sync position.
/// * 8. Persists the final configuration together with pending exclude ops.
/// * 9. Calls [`Self::finish_safekeeper_migration`].
///
/// # Errors
///
/// * `ApiError::BadRequest` if the requested set is invalid, contains a
///   decommissioned safekeeper that is not in the current set, or a failpoint fired.
/// * `ApiError::NotFound` if the timeline is not in the timelines table.
/// * `ApiError::Conflict` if the timeline is already migrating to a different set.
/// * Any error propagated from persistence, cplane notification or safekeeper calls.
pub(crate) async fn tenant_timeline_safekeeper_migrate(
    self: &Arc<Self>,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: TimelineSafekeeperMigrateRequest,
) -> Result<(), ApiError> {
    // Snapshot of the known safekeepers; the read lock is released at the end of this statement.
    let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();

    let new_sk_set = req.new_sk_set;

    // Every requested safekeeper must be known to the controller.
    for sk_id in new_sk_set.iter() {
        if !all_safekeepers.contains_key(sk_id) {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "safekeeper {sk_id} does not exist"
            )));
        }
    }

    if new_sk_set.is_empty() {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "new safekeeper set is empty"
        )));
    }

    if new_sk_set.len() < self.config.timeline_safekeeper_count {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "new safekeeper set must have at least {} safekeepers",
            self.config.timeline_safekeeper_count
        )));
    }

    let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
    let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
    // Construct new member set in advance to validate it.
    // E.g. validates that there is no duplicate safekeepers.
    let new_sk_member_set =
        Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?;

    // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
    let _tenant_lock = trace_shared_lock(
        &self.tenant_op_locks,
        tenant_id,
        TenantOperations::TimelineSafekeeperMigrate,
    )
    .await;

    // 1. Fetch current timeline configuration from the configuration storage.

    let timeline = self
        .persistence
        .get_timeline(tenant_id, timeline_id)
        .await?;

    let Some(timeline) = timeline else {
        return Err(ApiError::NotFound(
            anyhow::anyhow!(
                "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
            )
            .into(),
        ));
    };

    let cur_sk_set = timeline
        .sk_set
        .iter()
        .map(|&id| NodeId(id as u64))
        .collect::<Vec<_>>();

    // Validate that we are not migrating to a decommissioned safekeeper.
    // Safekeepers that are already in the current set are exempt from this check.
    for sk in new_safekeepers.iter() {
        if !cur_sk_set.contains(&sk.get_id())
            && sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
        {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "safekeeper {} is decomissioned",
                sk.get_id()
            )));
        }
    }

    tracing::info!(
        ?cur_sk_set,
        ?new_sk_set,
        "Migrating timeline to new safekeeper set",
    );

    // Starts as the persisted generation; bumped below when a new migration is started.
    let mut generation = SafekeeperGeneration::new(timeline.generation as u32);

    if let Some(ref presistent_new_sk_set) = timeline.new_sk_set {
        // 2. If it is already joint one and new_set is different from desired_set refuse to change.
        // Note: the comparison is element-wise, so the order of ids matters.
        if presistent_new_sk_set
            .iter()
            .map(|&id| NodeId(id as u64))
            .ne(new_sk_set.iter().cloned())
        {
            tracing::info!(
                ?presistent_new_sk_set,
                ?new_sk_set,
                "different new safekeeper set is already set in the database",
            );
            return Err(ApiError::Conflict(format!(
                "the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
            )));
        }
        // If it is the same new_sk_set, we can continue the migration (retry).
    } else {
        // No joint configuration is stored. The previous migration counts as finished
        // only if both notified generations match the persisted generation.
        let prev_finished = timeline.cplane_notified_generation == timeline.generation
            && timeline.sk_set_notified_generation == timeline.generation;

        if !prev_finished {
            // The previous migration is committed, but the finish step failed.
            // Safekeepers/cplane might not know about the last membership configuration.
            // Retry the finish step to ensure smooth migration.
            self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
                .await?;
        }

        if cur_sk_set == new_sk_set {
            tracing::info!("timeline is already at the desired safekeeper set");
            return Ok(());
        }

        // 3. No active migration yet.
        // Increment current generation and put desired_set to new_sk_set.
        generation = generation.next();

        self.persistence
            .update_timeline_membership(
                tenant_id,
                timeline_id,
                generation,
                &cur_sk_set,
                Some(&new_sk_set),
                &[],
            )
            .await?;

        fail::fail_point!("sk-migration-after-step-3", |_| {
            Err(ApiError::BadRequest(anyhow::anyhow!(
                "failpoint sk-migration-after-step-3"
            )))
        });
    }

    let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
    let cur_sk_member_set =
        Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;

    // Joint configuration: current members plus the desired new members.
    let joint_config = membership::Configuration {
        generation,
        members: cur_sk_member_set,
        new_members: Some(new_sk_member_set.clone()),
    };

    // 4. Call PUT configuration on safekeepers from the current set,
    // delivering them joint_conf.

    // Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
    // This way the compute will know about new safekeepers from joint_config before we require to
    // collect a quorum from them.
    self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
        .await?;

    let results = self
        .tenant_timeline_set_membership_quorum(
            tenant_id,
            timeline_id,
            &cur_safekeepers,
            &joint_config,
            None, // no min position
            true, // update notified generation
        )
        .await?;

    // The sync position is the maximum (last_log_term, flush_lsn) among the successful responses.
    let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
    for res in results.into_iter().flatten() {
        let sk_position = (res.last_log_term, res.flush_lsn);
        if sync_position < sk_position {
            sync_position = sk_position;
        }
    }

    tracing::info!(
        %generation,
        ?sync_position,
        "safekeepers set membership updated",
    );

    fail::fail_point!("sk-migration-after-step-4", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-4"
        )))
    });

    // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
    // by doing pull_timeline from the majority of the current set.

    // Filter out safekeepers which are already in the current set.
    let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
    let pull_to_safekeepers = new_safekeepers
        .iter()
        .filter(|sk| !from_ids.contains(&sk.get_id()))
        .cloned()
        .collect::<Vec<_>>();

    self.tenant_timeline_pull_from_peers(
        tenant_id,
        timeline_id,
        &pull_to_safekeepers,
        &cur_safekeepers,
    )
    .await?;

    fail::fail_point!("sk-migration-after-step-5", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-5"
        )))
    });

    // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.

    // TODO(diko): do we need to bump timeline term?

    // 7. Repeatedly call PUT configuration on safekeepers from the new set,
    // delivering them joint_conf and collecting their positions.

    tracing::info!(?sync_position, "waiting for safekeepers to sync position");

    self.tenant_timeline_set_membership_quorum(
        tenant_id,
        timeline_id,
        &new_safekeepers,
        &joint_config,
        Some(sync_position),
        false, // we're just waiting for sync position, don't update notified generation
    )
    .await?;

    fail::fail_point!("sk-migration-after-step-7", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-7"
        )))
    });

    // 8. Create new_conf: Configuration incrementing joint_conf generation and
    // having new safekeeper set as sk_set and None new_sk_set.

    let generation = generation.next();

    let new_conf = membership::Configuration {
        generation,
        members: new_sk_member_set,
        new_members: None,
    };

    // Safekeepers of the current set that are not part of the new set must be excluded.
    let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
    let exclude_safekeepers = cur_safekeepers
        .into_iter()
        .filter(|sk| !new_ids.contains(&sk.get_id()))
        .collect::<Vec<_>>();
    // The exclude ops are passed to the same persistence call as the membership update below.
    let exclude_requests = exclude_safekeepers
        .iter()
        .map(|sk| TimelinePendingOpPersistence {
            sk_id: sk.skp.id,
            tenant_id: tenant_id.to_string(),
            timeline_id: timeline_id.to_string(),
            generation: generation.into_inner() as i32,
            op_kind: SafekeeperTimelineOpKind::Exclude,
        })
        .collect::<Vec<_>>();

    self.persistence
        .update_timeline_membership(
            tenant_id,
            timeline_id,
            generation,
            &new_sk_set,
            None,
            &exclude_requests,
        )
        .await?;

    fail::fail_point!("sk-migration-after-step-8", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!(
            "failpoint sk-migration-after-step-8"
        )))
    });

    // At this point we have already updated the timeline in the database, so the final
    // membership configuration is committed and the migration is not abortable anymore.
    // But safekeepers and cplane/compute still need to be notified about the new configuration.
    // The [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about
    // the new configuration and reconciles excluded safekeepers.
    // If it fails, the safekeeper migration call should be retried.

    self.finish_safekeeper_migration(
        tenant_id,
        timeline_id,
        &new_safekeepers,
        &new_conf,
        &exclude_safekeepers,
    )
    .await?;

    Ok(())
}
1435 :
1436 : /// Notify cplane about safekeeper membership change.
1437 : /// The cplane will receive a joint set of safekeepers as a safekeeper list.
1438 0 : async fn cplane_notify_safekeepers(
1439 0 : &self,
1440 0 : tenant_id: TenantId,
1441 0 : timeline_id: TimelineId,
1442 0 : mconf: &membership::Configuration,
1443 0 : ) -> Result<(), ApiError> {
1444 0 : let mut safekeepers = Vec::new();
1445 0 : let mut ids: HashSet<_> = HashSet::new();
1446 :
1447 0 : for member in mconf
1448 0 : .members
1449 0 : .m
1450 0 : .iter()
1451 0 : .chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
1452 : {
1453 0 : if ids.insert(member.id) {
1454 0 : safekeepers.push(compute_hook::SafekeeperInfo {
1455 0 : id: member.id,
1456 0 : hostname: Some(member.host.clone()),
1457 0 : });
1458 0 : }
1459 : }
1460 :
1461 0 : self.compute_hook
1462 0 : .notify_safekeepers(
1463 0 : compute_hook::SafekeepersUpdate {
1464 0 : tenant_id,
1465 0 : timeline_id,
1466 0 : generation: mconf.generation,
1467 0 : safekeepers,
1468 0 : },
1469 0 : &self.cancel,
1470 0 : )
1471 0 : .await
1472 0 : .map_err(|err| {
1473 0 : ApiError::InternalServerError(anyhow::anyhow!(
1474 0 : "failed to notify cplane about safekeeper membership change: {err}"
1475 0 : ))
1476 0 : })?;
1477 :
1478 0 : self.persistence
1479 0 : .update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
1480 0 : .await?;
1481 :
1482 0 : Ok(())
1483 0 : }
1484 :
1485 : /// Finish safekeeper migration.
1486 : ///
1487 : /// It is the last step of the safekeeper migration.
1488 : ///
1489 : /// Notifies safekeepers and cplane about the final membership configuration,
1490 : /// reconciles excluded safekeepers and updates *_notified_generation in the database.
1491 0 : async fn finish_safekeeper_migration(
1492 0 : self: &Arc<Self>,
1493 0 : tenant_id: TenantId,
1494 0 : timeline_id: TimelineId,
1495 0 : new_safekeepers: &[Safekeeper],
1496 0 : new_conf: &membership::Configuration,
1497 0 : exclude_safekeepers: &[Safekeeper],
1498 0 : ) -> Result<(), ApiError> {
1499 : // 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
1500 : // Also try to exclude safekeepers and notify cplane about the membership change.
1501 :
1502 0 : self.tenant_timeline_set_membership_quorum(
1503 0 : tenant_id,
1504 0 : timeline_id,
1505 0 : new_safekeepers,
1506 0 : new_conf,
1507 0 : None, // no min position
1508 0 : true, // update notified generation
1509 0 : )
1510 0 : .await?;
1511 :
1512 0 : fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
1513 0 : Err(ApiError::BadRequest(anyhow::anyhow!(
1514 0 : "failpoint sk-migration-step-9-after-set-membership"
1515 0 : )))
1516 0 : });
1517 :
1518 0 : self.tenant_timeline_safekeeper_exclude_reconcile(
1519 0 : tenant_id,
1520 0 : timeline_id,
1521 0 : exclude_safekeepers,
1522 0 : new_conf,
1523 0 : )
1524 0 : .await?;
1525 :
1526 0 : fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
1527 0 : Err(ApiError::BadRequest(anyhow::anyhow!(
1528 0 : "failpoint sk-migration-step-9-after-exclude"
1529 0 : )))
1530 0 : });
1531 :
1532 : // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
1533 : // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
1534 : // collect a quorum from them.
1535 0 : self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
1536 0 : .await?;
1537 :
1538 0 : fail::fail_point!("sk-migration-after-step-9", |_| {
1539 0 : Err(ApiError::BadRequest(anyhow::anyhow!(
1540 0 : "failpoint sk-migration-after-step-9"
1541 0 : )))
1542 0 : });
1543 :
1544 0 : Ok(())
1545 0 : }
1546 :
1547 : /// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
1548 : /// It's used when the migration failed during the finish step and we need to retry it.
1549 0 : async fn finish_safekeeper_migration_retry(
1550 0 : self: &Arc<Self>,
1551 0 : tenant_id: TenantId,
1552 0 : timeline_id: TimelineId,
1553 0 : timeline: &TimelinePersistence,
1554 0 : ) -> Result<(), ApiError> {
1555 0 : if timeline.new_sk_set.is_some() {
1556 : // Logical error, should never happen.
1557 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1558 0 : "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
1559 0 : )));
1560 0 : }
1561 :
1562 0 : let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
1563 0 : let cur_sk_member_set =
1564 0 : Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
1565 :
1566 0 : let mconf = membership::Configuration {
1567 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
1568 0 : members: cur_sk_member_set,
1569 0 : new_members: None,
1570 0 : };
1571 :
1572 : // We might have failed between commiting reconciliation requests and adding them to the in-memory reconciler.
1573 : // Reload them from the database.
1574 0 : let pending_ops = self
1575 0 : .persistence
1576 0 : .list_pending_ops_for_timeline(tenant_id, timeline_id)
1577 0 : .await?;
1578 :
1579 0 : let mut exclude_sk_ids = Vec::new();
1580 :
1581 0 : for op in pending_ops {
1582 0 : if op.op_kind == SafekeeperTimelineOpKind::Exclude
1583 0 : && op.generation == timeline.generation
1584 0 : {
1585 0 : exclude_sk_ids.push(op.sk_id);
1586 0 : }
1587 : }
1588 :
1589 0 : let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;
1590 :
1591 0 : self.finish_safekeeper_migration(
1592 0 : tenant_id,
1593 0 : timeline_id,
1594 0 : &cur_safekeepers,
1595 0 : &mconf,
1596 0 : &exclude_safekeepers,
1597 0 : )
1598 0 : .await?;
1599 :
1600 0 : Ok(())
1601 0 : }
1602 : }
|