Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use super::safekeeper_reconciler::ScheduleRequest;
7 : use crate::compute_hook;
8 : use crate::heartbeater::SafekeeperState;
9 : use crate::id_lock_map::trace_shared_lock;
10 : use crate::metrics;
11 : use crate::persistence::{
12 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
13 : };
14 : use crate::safekeeper::Safekeeper;
15 : use crate::safekeeper_client::SafekeeperClient;
16 : use crate::service::TenantOperations;
17 : use crate::timeline_import::TimelineImportFinalizeError;
18 : use anyhow::Context;
19 : use http_utils::error::ApiError;
20 : use pageserver_api::controller_api::{
21 : SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
22 : TimelineSafekeeperMigrateRequest,
23 : };
24 : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
25 : use safekeeper_api::PgVersionId;
26 : use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
27 : use safekeeper_api::models::{
28 : PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse,
29 : };
30 : use safekeeper_api::{INITIAL_TERM, Term};
31 : use safekeeper_client::mgmt_api;
32 : use tokio::task::JoinSet;
33 : use tokio_util::sync::CancellationToken;
34 : use utils::id::{NodeId, TenantId, TimelineId};
35 : use utils::logging::SecretString;
36 : use utils::lsn::Lsn;
37 :
38 : use super::Service;
39 :
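/// Safekeeper membership of a timeline, as returned by the timeline locate
/// endpoint (see `tenant_timeline_locate`).
///
/// Illustrative construction (a sketch; the generation and node ids are made up):
///
/// ```ignore
/// let resp = TimelineLocateResponse {
///     generation: SafekeeperGeneration::new(2),
///     sk_set: vec![NodeId(1), NodeId(2), NodeId(3)],
///     new_sk_set: Some(vec![NodeId(1), NodeId(2), NodeId(4)]),
/// };
/// ```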
40 0 : #[derive(serde::Serialize, serde::Deserialize, Clone)]
41 : pub struct TimelineLocateResponse {
42 : pub generation: SafekeeperGeneration,
43 : pub sk_set: Vec<NodeId>,
44 : pub new_sk_set: Option<Vec<NodeId>>,
45 : }
46 :
47 : impl Service {
48 0 : fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, ApiError> {
49 0 : let members = safekeepers
50 0 : .iter()
51 0 : .map(|sk| sk.get_safekeeper_id())
52 0 : .collect::<Vec<_>>();
53 :
54 0 : MemberSet::new(members).map_err(ApiError::InternalServerError)
55 0 : }
56 :
57 0 : fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
58 0 : let safekeepers = {
59 0 : let locked = self.inner.read().unwrap();
60 0 : locked.safekeepers.clone()
61 : };
62 :
63 0 : ids.iter()
64 0 : .map(|&id| {
65 0 : let node_id = NodeId(id as u64);
66 0 : safekeepers.get(&node_id).cloned().ok_or_else(|| {
67 0 : ApiError::InternalServerError(anyhow::anyhow!(
68 0 : "safekeeper {node_id} is not registered"
69 0 : ))
70 0 : })
71 0 : })
72 0 : .collect::<Result<Vec<_>, _>>()
73 0 : }
74 :
75 : /// Timeline creation on safekeepers
76 : ///
77 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
78 : /// where `left` contains the list of safekeepers that didn't have a successful response.
79 : /// Assumes tenant lock is held while calling this function.
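///
/// A minimal sketch of how the result is consumed (mirrors
/// `tenant_timeline_create_safekeepers` below; the names are illustrative):
///
/// ```ignore
/// let remaining = self
///     .tenant_timeline_create_safekeepers_quorum(tenant_id, timeline_id, pg_version, &persistence)
///     .await?;
/// for sk_id in remaining {
///     // A quorum succeeded, but this safekeeper didn't: queue an async `Pull` op for it.
/// }
/// ```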
80 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
81 0 : &self,
82 0 : tenant_id: TenantId,
83 0 : timeline_id: TimelineId,
84 0 : pg_version: PgVersionId,
85 0 : timeline_persistence: &TimelinePersistence,
86 0 : ) -> Result<Vec<NodeId>, ApiError> {
87 0 : let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
88 :
89 0 : let mset = Self::make_member_set(&safekeepers)?;
90 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
91 :
92 0 : let req = safekeeper_api::models::TimelineCreateRequest {
93 0 : commit_lsn: None,
94 0 : mconf,
95 0 : pg_version,
96 0 : start_lsn: timeline_persistence.start_lsn.0,
97 0 : system_id: None,
98 0 : tenant_id,
99 0 : timeline_id,
100 0 : wal_seg_size: None,
101 0 : };
102 :
103 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
104 :
105 0 : let results = self
106 0 : .tenant_timeline_safekeeper_op_quorum(
107 0 : &safekeepers,
108 0 : move |client| {
109 0 : let req = req.clone();
110 0 : async move { client.create_timeline(&req).await }
111 0 : },
112 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
113 : )
114 0 : .await?;
115 :
116 0 : Ok(results
117 0 : .into_iter()
118 0 : .enumerate()
119 0 : .filter_map(|(idx, res)| {
120 0 : if res.is_ok() {
121 0 : None // Success, don't return this safekeeper
122 : } else {
123 0 : Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
124 : }
125 0 : })
126 0 : .collect::<Vec<_>>())
127 0 : }
128 :
129 : /// Perform an operation on a list of safekeepers in parallel with retries.
130 : ///
131 : /// Return the results of the operation on each safekeeper in the input order.
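///
/// A typical invocation (sketch, modelled on the callers in this file): the
/// closure is cloned once per safekeeper, so the request is moved in and
/// cloned inside it.
///
/// ```ignore
/// let results = self
///     .tenant_timeline_safekeeper_op(
///         &safekeepers,
///         move |client| {
///             let req = req.clone();
///             async move { client.pull_timeline(&req).await }
///         },
///         Duration::from_secs(30),
///     )
///     .await?;
/// ```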
132 0 : async fn tenant_timeline_safekeeper_op<T, O, F>(
133 0 : &self,
134 0 : safekeepers: &[Safekeeper],
135 0 : op: O,
136 0 : timeout: Duration,
137 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
138 0 : where
139 0 : O: FnMut(SafekeeperClient) -> F + Send + 'static,
140 0 : O: Clone,
141 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
142 0 : T: Sync + Send + 'static,
143 0 : {
144 0 : let jwt = self
145 0 : .config
146 0 : .safekeeper_jwt_token
147 0 : .clone()
148 0 : .map(SecretString::from);
149 0 : let mut joinset = JoinSet::new();
150 :
151 0 : for (idx, sk) in safekeepers.iter().enumerate() {
152 0 : let sk = sk.clone();
153 0 : let http_client = self.http_client.clone();
154 0 : let jwt = jwt.clone();
155 0 : let op = op.clone();
156 0 : joinset.spawn(async move {
157 0 : let res = sk
158 0 : .with_client_retries(
159 0 : op,
160 0 : &http_client,
161 0 : &jwt,
162 0 : 3,
163 0 : 3,
164 0 : // TODO(diko): This is the wrong timeout.
165 0 : // It should be scaled to the retry count.
166 0 : timeout,
167 0 : &CancellationToken::new(),
168 0 : )
169 0 : .await;
170 0 : (idx, res)
171 0 : });
172 : }
173 :
174 : // Initialize results with timeout errors in case we never get a response.
175 0 : let mut results: Vec<mgmt_api::Result<T>> = safekeepers
176 0 : .iter()
177 0 : .map(|_| {
178 0 : Err(mgmt_api::Error::Timeout(
179 0 : "safekeeper operation timed out".to_string(),
180 0 : ))
181 0 : })
182 0 : .collect();
183 :
184 : // After we have built the joinset, we now wait for the tasks to complete,
185 : // but with a specified timeout to make sure we return swiftly, either with
186 : // a failure or success.
187 0 : let reconcile_deadline = tokio::time::Instant::now() + timeout;
188 :
189 : // Wait until all tasks finish or timeout is hit, whichever occurs
190 : // first.
191 0 : let mut result_count = 0;
192 : loop {
193 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
194 : {
195 0 : let Some(res) = res else { break };
196 0 : match res {
197 0 : Ok((idx, res)) => {
198 0 : let sk = &safekeepers[idx];
199 0 : tracing::info!(
200 0 : "response from safekeeper id:{} at {}: {:?}",
201 0 : sk.get_id(),
202 : sk.skp.host,
203 : // Only print errors, as T is not required to implement Debug.
204 0 : res.as_ref().map(|_| ()),
205 : );
206 0 : results[idx] = res;
207 0 : result_count += 1;
208 : }
209 0 : Err(join_err) => {
210 0 : tracing::info!("join_err for task in joinset: {join_err}");
211 : }
212 : }
213 : } else {
214 0 : tracing::info!("timeout for operation call after {result_count} responses",);
215 0 : break;
216 : }
217 : }
218 :
219 0 : Ok(results)
220 0 : }
221 :
222 : /// Perform an operation on a list of safekeepers in parallel with retries,
223 : /// validating that a quorum of successful responses is reached.
224 : ///
225 : /// Return the results of the operation on each safekeeper in the input order.
226 : /// It's guaranteed that at least a quorum of the responses are successful.
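///
/// The quorum is a simple majority of the target set, `len / 2 + 1`
/// (illustrative arithmetic):
///
/// ```ignore
/// assert_eq!(3 / 2 + 1, 2); // 3 safekeepers -> 2 successes required
/// assert_eq!(4 / 2 + 1, 3); // 4 safekeepers -> 3 successes required
/// assert_eq!(5 / 2 + 1, 3); // 5 safekeepers -> 3 successes required
/// ```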
227 0 : async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
228 0 : &self,
229 0 : safekeepers: &[Safekeeper],
230 0 : op: O,
231 0 : timeout: Duration,
232 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
233 0 : where
234 0 : O: FnMut(SafekeeperClient) -> F,
235 0 : O: Clone + Send + 'static,
236 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
237 0 : T: Sync + Send + 'static,
238 0 : {
239 0 : let target_sk_count = safekeepers.len();
240 :
241 0 : if target_sk_count == 0 {
242 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
243 0 : "timeline configured without any safekeepers"
244 0 : )));
245 0 : }
246 :
247 0 : if target_sk_count < self.config.timeline_safekeeper_count {
248 0 : tracing::warn!(
249 0 : "running a quorum operation with {} safekeepers, which is less than configured {} safekeepers per timeline",
250 : target_sk_count,
251 : self.config.timeline_safekeeper_count
252 : );
253 0 : }
254 :
255 0 : let results = self
256 0 : .tenant_timeline_safekeeper_op(safekeepers, op, timeout)
257 0 : .await?;
258 :
259 : // Now check if quorum was reached in results.
260 :
261 0 : let quorum_size = target_sk_count / 2 + 1;
262 :
263 0 : let success_count = results.iter().filter(|res| res.is_ok()).count();
264 0 : if success_count < quorum_size {
265 : // Failure
266 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
267 0 : "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
268 0 : )));
269 0 : }
270 :
271 0 : Ok(results)
272 0 : }
273 :
274 : /// Create timeline in controller database and on safekeepers.
275 : /// `timeline_info` is result of timeline creation on pageserver.
276 : ///
277 : /// All actions must be idempotent as the call is retried until success. It
278 : /// tries to create timeline in the db and on at least majority of
279 : /// safekeepers + queue creation for safekeepers which missed it in the db
280 : /// for infinite retries; after that, call returns Ok.
281 : ///
282 : /// The idea is that once this is reached as long as we have alive majority
283 : /// of safekeepers it is expected to get eventually operational as storcon
284 : /// will be able to seed timeline on nodes which missed creation by making
285 : /// pull_timeline from peers. On the other hand we don't want to fail
286 : /// timeline creation if one safekeeper is down.
287 0 : pub(super) async fn tenant_timeline_create_safekeepers(
288 0 : self: &Arc<Self>,
289 0 : tenant_id: TenantId,
290 0 : timeline_info: &TimelineInfo,
291 0 : read_only: bool,
292 0 : ) -> Result<SafekeepersInfo, ApiError> {
293 0 : let timeline_id = timeline_info.timeline_id;
294 0 : let pg_version = PgVersionId::from(timeline_info.pg_version);
295 : // Initially start_lsn is determined by last_record_lsn in pageserver
296 : // response as it does initdb. However, later we persist it and in sk
297 : // creation calls replace with the value from the timeline row if it
298 : // previously existed as on retries in theory endpoint might have
299 : // already written some data and advanced last_record_lsn, while we want
300 : // safekeepers to have consistent start_lsn.
301 0 : let start_lsn = timeline_info.last_record_lsn;
302 :
303 : // Choose initial set of safekeepers respecting affinity
304 0 : let sks = if !read_only {
305 0 : self.safekeepers_for_new_timeline().await?
306 : } else {
307 0 : Vec::new()
308 : };
309 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
310 : // Add timeline to db
311 0 : let mut timeline_persist = TimelinePersistence {
312 0 : tenant_id: tenant_id.to_string(),
313 0 : timeline_id: timeline_id.to_string(),
314 0 : start_lsn: start_lsn.into(),
315 0 : generation: 1,
316 0 : sk_set: sks_persistence.clone(),
317 0 : new_sk_set: None,
318 0 : cplane_notified_generation: 0,
319 0 : deleted_at: None,
320 0 : };
321 0 : let inserted = self
322 0 : .persistence
323 0 : .insert_timeline(timeline_persist.clone())
324 0 : .await?;
325 0 : if !inserted {
326 0 : if let Some(existent_persist) = self
327 0 : .persistence
328 0 : .get_timeline(tenant_id, timeline_id)
329 0 : .await?
330 0 : {
331 0 : // Replace with what we have in the db, to get stuff like the generation right.
332 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
333 0 : // crashed right after the write to the DB.
334 0 : timeline_persist = existent_persist;
335 0 : } else {
336 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
337 0 : "insertion said timeline already in db, but looking it up, it was gone"
338 0 : )));
339 : }
340 0 : }
341 0 : let ret = SafekeepersInfo {
342 0 : generation: timeline_persist.generation as u32,
343 0 : safekeepers: sks.clone(),
344 0 : tenant_id,
345 0 : timeline_id,
346 0 : };
347 0 : if read_only {
348 0 : return Ok(ret);
349 0 : }
350 :
351 : // Create the timeline on a quorum of safekeepers
352 0 : let remaining = self
353 0 : .tenant_timeline_create_safekeepers_quorum(
354 0 : tenant_id,
355 0 : timeline_id,
356 0 : pg_version,
357 0 : &timeline_persist,
358 0 : )
359 0 : .await?;
360 :
361 : // For the remaining safekeepers, take care of their reconciliation asynchronously
362 0 : for &remaining_id in remaining.iter() {
363 0 : let pending_op = TimelinePendingOpPersistence {
364 0 : tenant_id: tenant_id.to_string(),
365 0 : timeline_id: timeline_id.to_string(),
366 0 : generation: timeline_persist.generation,
367 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
368 0 : sk_id: remaining_id.0 as i64,
369 0 : };
370 0 : tracing::info!("writing pending op for sk id {remaining_id}");
371 0 : self.persistence.insert_pending_op(pending_op).await?;
372 : }
373 0 : if !remaining.is_empty() {
374 0 : let locked = self.inner.read().unwrap();
375 0 : for remaining_id in remaining {
376 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
377 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
378 0 : "Couldn't find safekeeper with id {remaining_id}"
379 0 : )));
380 : };
381 0 : let Ok(host_list) = sks
382 0 : .iter()
383 0 : .map(|sk| {
384 : Ok((
385 0 : sk.id,
386 0 : locked
387 0 : .safekeepers
388 0 : .get(&sk.id)
389 0 : .ok_or_else(|| {
390 0 : ApiError::InternalServerError(anyhow::anyhow!(
391 0 : "Couldn't find safekeeper with id {} to pull from",
392 0 : sk.id
393 0 : ))
394 0 : })?
395 0 : .base_url(),
396 : ))
397 0 : })
398 0 : .collect::<Result<_, ApiError>>()
399 : else {
400 0 : continue;
401 : };
402 0 : let req = ScheduleRequest {
403 0 : safekeeper: Box::new(sk.clone()),
404 0 : host_list,
405 0 : tenant_id,
406 0 : timeline_id: Some(timeline_id),
407 0 : generation: timeline_persist.generation as u32,
408 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
409 0 : };
410 0 : locked.safekeeper_reconcilers.schedule_request(req);
411 : }
412 0 : }
413 :
414 0 : Ok(ret)
415 0 : }
416 :
417 0 : pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
418 0 : self: &Arc<Self>,
419 0 : tenant_id: TenantId,
420 0 : timeline_info: TimelineInfo,
421 0 : ) -> Result<(), TimelineImportFinalizeError> {
422 : const BACKOFF: Duration = Duration::from_secs(5);
423 :
424 : loop {
425 0 : if self.cancel.is_cancelled() {
426 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
427 0 : }
428 :
429 : // This function is only used in non-read-only scenarios
430 0 : let read_only = false;
431 0 : let res = self
432 0 : .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
433 0 : .await;
434 :
435 0 : match res {
436 : Ok(_) => {
437 0 : tracing::info!("Timeline created on safekeepers");
438 0 : break;
439 : }
440 0 : Err(err) => {
441 0 : tracing::error!("Failed to create timeline on safekeepers: {err}");
442 0 : tokio::select! {
443 0 : _ = self.cancel.cancelled() => {
444 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
445 : },
446 0 : _ = tokio::time::sleep(BACKOFF) => {}
447 : };
448 : }
449 : }
450 : }
451 :
452 0 : Ok(())
453 0 : }
454 :
455 : /// Directly insert the timeline into the database without reconciling it with safekeepers.
456 : ///
457 : /// Useful if the timeline already exists on the specified safekeepers,
458 : /// but we want to make it storage controller managed.
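///
/// A sketch of the request this expects (only the fields used below are shown;
/// the ids are made up):
///
/// ```ignore
/// let req = TimelineImportRequest {
///     tenant_id,
///     timeline_id,
///     sk_set: vec![NodeId(11), NodeId(12), NodeId(13)],
/// };
/// service.timeline_import(req).await?;
/// ```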
459 0 : pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
460 0 : let persistence = TimelinePersistence {
461 0 : tenant_id: req.tenant_id.to_string(),
462 0 : timeline_id: req.timeline_id.to_string(),
463 0 : start_lsn: Lsn::INVALID.into(),
464 : generation: 1,
465 0 : sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
466 0 : new_sk_set: None,
467 : cplane_notified_generation: 1,
468 0 : deleted_at: None,
469 : };
470 0 : let inserted = self.persistence.insert_timeline(persistence).await?;
471 0 : if inserted {
472 0 : tracing::info!("imported timeline into db");
473 : } else {
474 0 : tracing::info!("didn't import timeline into db, as it is already present in db");
475 : }
476 0 : Ok(())
477 0 : }
478 :
479 : /// Locate safekeepers for a timeline.
480 : /// Return the generation, sk_set and new_sk_set if present.
481 : /// If the timeline is not storcon-managed, return NotFound.
482 0 : pub(crate) async fn tenant_timeline_locate(
483 0 : &self,
484 0 : tenant_id: TenantId,
485 0 : timeline_id: TimelineId,
486 0 : ) -> Result<TimelineLocateResponse, ApiError> {
487 0 : let timeline = self
488 0 : .persistence
489 0 : .get_timeline(tenant_id, timeline_id)
490 0 : .await?;
491 :
492 0 : let Some(timeline) = timeline else {
493 0 : return Err(ApiError::NotFound(
494 0 : anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
495 0 : ));
496 : };
497 :
498 : Ok(TimelineLocateResponse {
499 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
500 0 : sk_set: timeline
501 0 : .sk_set
502 0 : .iter()
503 0 : .map(|id| NodeId(*id as u64))
504 0 : .collect(),
505 0 : new_sk_set: timeline
506 0 : .new_sk_set
507 0 : .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
508 : })
509 0 : }
510 :
511 : /// Perform timeline deletion on safekeepers. Returns success once the deletion has been persisted for the safekeeper reconcilers, which carry it out asynchronously.
512 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
513 0 : self: &Arc<Self>,
514 0 : tenant_id: TenantId,
515 0 : timeline_id: TimelineId,
516 0 : ) -> Result<(), ApiError> {
517 0 : let tl = self
518 0 : .persistence
519 0 : .get_timeline(tenant_id, timeline_id)
520 0 : .await?;
521 0 : let Some(tl) = tl else {
522 0 : tracing::info!(
523 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
524 : );
525 0 : return Ok(());
526 : };
527 0 : self.persistence
528 0 : .timeline_set_deleted_at(tenant_id, timeline_id)
529 0 : .await?;
530 0 : let all_sks = tl
531 0 : .new_sk_set
532 0 : .iter()
533 0 : .flatten()
534 0 : .chain(tl.sk_set.iter())
535 0 : .collect::<HashSet<_>>();
536 :
537 : // The timeline has no safekeepers: we need to delete it from the db manually,
538 : // as no safekeeper reconciler will get to it
539 0 : if all_sks.is_empty() {
540 0 : if let Err(err) = self
541 0 : .persistence
542 0 : .delete_timeline(tenant_id, timeline_id)
543 0 : .await
544 : {
545 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
546 0 : }
547 0 : }
548 :
549 : // Schedule reconciliations
550 0 : for &sk_id in all_sks.iter() {
551 0 : let pending_op = TimelinePendingOpPersistence {
552 0 : tenant_id: tenant_id.to_string(),
553 0 : timeline_id: timeline_id.to_string(),
554 0 : generation: i32::MAX,
555 0 : op_kind: SafekeeperTimelineOpKind::Delete,
556 0 : sk_id: *sk_id,
557 0 : };
558 0 : tracing::info!("writing pending op for sk id {sk_id}");
559 0 : self.persistence.insert_pending_op(pending_op).await?;
560 : }
561 : {
562 0 : let locked = self.inner.read().unwrap();
563 0 : for sk_id in all_sks {
564 0 : let sk_id = NodeId(*sk_id as u64);
565 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
566 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
567 0 : "Couldn't find safekeeper with id {sk_id}"
568 0 : )));
569 : };
570 :
571 0 : let req = ScheduleRequest {
572 0 : safekeeper: Box::new(sk.clone()),
573 0 : // host_list is not used for this op kind; put a dummy value
574 0 : host_list: Vec::new(),
575 0 : tenant_id,
576 0 : timeline_id: Some(timeline_id),
577 0 : generation: tl.generation as u32,
578 0 : kind: SafekeeperTimelineOpKind::Delete,
579 0 : };
580 0 : locked.safekeeper_reconcilers.schedule_request(req);
581 : }
582 : }
583 0 : Ok(())
584 0 : }
585 :
586 : /// Perform tenant deletion on safekeepers.
587 0 : pub(super) async fn tenant_delete_safekeepers(
588 0 : self: &Arc<Self>,
589 0 : tenant_id: TenantId,
590 0 : ) -> Result<(), ApiError> {
591 0 : let timeline_list = self
592 0 : .persistence
593 0 : .list_timelines_for_tenant(tenant_id)
594 0 : .await?;
595 :
596 0 : if timeline_list.is_empty() {
597 : // Early exit: the tenant is either empty or not migrated to the storcon yet
598 0 : tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
599 0 : return Ok(());
600 0 : }
601 :
602 0 : let timeline_list = timeline_list
603 0 : .into_iter()
604 0 : .map(|timeline| {
605 0 : let timeline_id = TimelineId::from_str(&timeline.timeline_id)
606 0 : .context("timeline id loaded from db")
607 0 : .map_err(ApiError::InternalServerError)?;
608 0 : Ok((timeline_id, timeline))
609 0 : })
610 0 : .collect::<Result<Vec<_>, ApiError>>()?;
611 :
612 : // Remove pending ops from db, and set `deleted_at`.
613 : // We cancel them in a later iteration once we hold the state lock.
614 0 : for (timeline_id, _timeline) in timeline_list.iter() {
615 0 : self.persistence
616 0 : .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
617 0 : .await?;
618 0 : self.persistence
619 0 : .timeline_set_deleted_at(tenant_id, *timeline_id)
620 0 : .await?;
621 : }
622 :
623 : // The list of safekeepers that have any of the timelines
624 0 : let mut sk_list = HashSet::new();
625 :
626 : // List all pending ops for all timelines, cancel them
627 0 : for (_timeline_id, timeline) in timeline_list.iter() {
628 0 : let sk_iter = timeline
629 0 : .sk_set
630 0 : .iter()
631 0 : .chain(timeline.new_sk_set.iter().flatten())
632 0 : .map(|id| NodeId(*id as u64));
633 0 : sk_list.extend(sk_iter);
634 : }
635 :
636 0 : for &sk_id in sk_list.iter() {
637 0 : let pending_op = TimelinePendingOpPersistence {
638 0 : tenant_id: tenant_id.to_string(),
639 0 : timeline_id: String::new(),
640 0 : generation: i32::MAX,
641 0 : op_kind: SafekeeperTimelineOpKind::Delete,
642 0 : sk_id: sk_id.0 as i64,
643 0 : };
644 0 : tracing::info!("writing pending op for sk id {sk_id}");
645 0 : self.persistence.insert_pending_op(pending_op).await?;
646 : }
647 :
648 0 : let mut locked = self.inner.write().unwrap();
649 :
650 0 : for (timeline_id, _timeline) in timeline_list.iter() {
651 0 : for sk_id in sk_list.iter() {
652 0 : locked
653 0 : .safekeeper_reconcilers
654 0 : .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
655 0 : }
656 : }
657 :
658 : // unwrap is safe: we return above for an empty timeline list
659 0 : let max_generation = timeline_list
660 0 : .iter()
661 0 : .map(|(_tl_id, tl)| tl.generation as u32)
662 0 : .max()
663 0 : .unwrap();
664 :
665 0 : for sk_id in sk_list {
666 0 : let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
667 0 : tracing::warn!("Couldn't find safekeeper with id {sk_id}");
668 0 : continue;
669 : };
670 : // Add pending op for tenant deletion
671 0 : let req = ScheduleRequest {
672 0 : generation: max_generation,
673 0 : host_list: Vec::new(),
674 0 : kind: SafekeeperTimelineOpKind::Delete,
675 0 : safekeeper: Box::new(safekeeper.clone()),
676 0 : tenant_id,
677 0 : timeline_id: None,
678 0 : };
679 0 : locked.safekeeper_reconcilers.schedule_request(req);
680 : }
681 0 : Ok(())
682 0 : }
683 :
684 : /// Choose safekeepers for the new timeline in different AZs.
685 : /// Three are chosen by default, but the count may be overridden in the config (for testing).
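///
/// Selection sketch (hypothetical nodes): with `timeline_safekeeper_count = 3`
/// and active safekeepers `{id 1: az-a, 2 timelines}`, `{id 2: az-a, 5}`,
/// `{id 3: az-b, 7}`, `{id 4: az-c, 9}`, the sort order by
/// `(timeline_count, id)` is 1, 2, 3, 4; one node is taken per AZ, so node 2 is
/// skipped (az-a is already used) and the result is `[1, 3, 4]`.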
686 0 : pub(crate) async fn safekeepers_for_new_timeline(
687 0 : &self,
688 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
689 0 : let mut all_safekeepers = {
690 0 : let locked = self.inner.read().unwrap();
691 0 : locked
692 0 : .safekeepers
693 0 : .iter()
694 0 : .filter_map(|sk| {
695 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
696 : // If we don't want to schedule stuff onto the safekeeper, respect that.
697 0 : return None;
698 0 : }
699 0 : let utilization_opt = if let SafekeeperState::Available {
700 : last_seen_at: _,
701 0 : utilization,
702 0 : } = sk.1.availability()
703 : {
704 0 : Some(utilization)
705 : } else {
706 : // non-available safekeepers still get a chance for new timelines,
707 : // but put them last in the list.
708 0 : None
709 : };
710 0 : let info = SafekeeperInfo {
711 0 : hostname: sk.1.skp.host.clone(),
712 0 : id: NodeId(sk.1.skp.id as u64),
713 0 : };
714 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
715 0 : })
716 0 : .collect::<Vec<_>>()
717 : };
718 0 : all_safekeepers.sort_by_key(|sk| {
719 : (
720 0 : sk.0.as_ref()
721 0 : .map(|ut| ut.timeline_count)
722 0 : .unwrap_or(u64::MAX),
723 : // Use the id to decide on equal scores for reliability
724 0 : sk.1.id.0,
725 : )
726 0 : });
727 : // Number of safekeepers in different AZs we are looking for
728 0 : let wanted_count = self.config.timeline_safekeeper_count;
729 :
730 0 : let mut sks = Vec::new();
731 0 : let mut azs = HashSet::new();
732 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
733 0 : if !azs.insert(az_id) {
734 0 : continue;
735 0 : }
736 0 : sks.push(sk_info.clone());
737 0 : if sks.len() == wanted_count {
738 0 : break;
739 0 : }
740 : }
741 0 : if sks.len() == wanted_count {
742 0 : Ok(sks)
743 : } else {
744 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
745 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
746 0 : sks.len(),
747 0 : all_safekeepers.len(),
748 0 : )))
749 : }
750 0 : }
751 :
752 0 : pub(crate) async fn safekeepers_list(
753 0 : &self,
754 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
755 0 : let locked = self.inner.read().unwrap();
756 0 : let mut list = locked
757 0 : .safekeepers
758 0 : .iter()
759 0 : .map(|sk| sk.1.describe_response())
760 0 : .collect::<Result<Vec<_>, _>>()?;
761 0 : list.sort_by_key(|v| v.id);
762 0 : Ok(list)
763 0 : }
764 :
765 0 : pub(crate) async fn get_safekeeper(
766 0 : &self,
767 0 : id: i64,
768 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
769 0 : let locked = self.inner.read().unwrap();
770 0 : let sk = locked
771 0 : .safekeepers
772 0 : .get(&NodeId(id as u64))
773 0 : .ok_or(diesel::result::Error::NotFound)?;
774 0 : sk.describe_response()
775 0 : }
776 :
777 0 : pub(crate) async fn upsert_safekeeper(
778 0 : self: &Arc<Service>,
779 0 : record: crate::persistence::SafekeeperUpsert,
780 0 : ) -> Result<(), ApiError> {
781 0 : let node_id = NodeId(record.id as u64);
782 0 : let use_https = self.config.use_https_safekeeper_api;
783 :
784 0 : if use_https && record.https_port.is_none() {
785 0 : return Err(ApiError::PreconditionFailed(
786 0 : format!(
787 0 : "cannot upsert safekeeper {node_id}: \
788 0 : https is enabled, but https port is not specified"
789 0 : )
790 0 : .into(),
791 0 : ));
792 0 : }
793 :
794 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
795 : {
796 0 : let mut locked = self.inner.write().unwrap();
797 0 : let mut safekeepers = (*locked.safekeepers).clone();
798 0 : match safekeepers.entry(node_id) {
799 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
800 0 : .get_mut()
801 0 : .update_from_record(record)
802 0 : .expect("all preconditions should be checked before upsert to database"),
803 0 : std::collections::hash_map::Entry::Vacant(entry) => {
804 0 : entry.insert(
805 0 : Safekeeper::from_persistence(
806 0 : crate::persistence::SafekeeperPersistence::from_upsert(
807 0 : record,
808 0 : SkSchedulingPolicy::Activating,
809 0 : ),
810 0 : CancellationToken::new(),
811 0 : use_https,
812 0 : )
813 0 : .expect("all preconditions should be checked before upsert to database"),
814 0 : );
815 0 : }
816 : }
817 0 : locked
818 0 : .safekeeper_reconcilers
819 0 : .start_reconciler(node_id, self);
820 0 : locked.safekeepers = Arc::new(safekeepers);
821 0 : metrics::METRICS_REGISTRY
822 0 : .metrics_group
823 0 : .storage_controller_safekeeper_nodes
824 0 : .set(locked.safekeepers.len() as i64);
825 0 : metrics::METRICS_REGISTRY
826 0 : .metrics_group
827 0 : .storage_controller_https_safekeeper_nodes
828 0 : .set(
829 0 : locked
830 0 : .safekeepers
831 0 : .values()
832 0 : .filter(|s| s.has_https_port())
833 0 : .count() as i64,
834 : );
835 : }
836 0 : Ok(())
837 0 : }
838 :
839 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
840 0 : self: &Arc<Service>,
841 0 : id: i64,
842 0 : scheduling_policy: SkSchedulingPolicy,
843 0 : ) -> Result<(), DatabaseError> {
844 0 : self.persistence
845 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
846 0 : .await?;
847 0 : let node_id = NodeId(id as u64);
848 : // After the change has been persisted successfully, update the in-memory state
849 0 : self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
850 0 : .await
851 0 : }
852 :
853 0 : pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
854 0 : self: &Arc<Service>,
855 0 : node_id: NodeId,
856 0 : scheduling_policy: SkSchedulingPolicy,
857 0 : ) -> Result<(), DatabaseError> {
858 0 : let mut locked = self.inner.write().unwrap();
859 0 : let mut safekeepers = (*locked.safekeepers).clone();
860 0 : let sk = safekeepers
861 0 : .get_mut(&node_id)
862 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
863 0 : sk.set_scheduling_policy(scheduling_policy);
864 :
865 0 : match scheduling_policy {
866 0 : SkSchedulingPolicy::Active => {
867 0 : locked
868 0 : .safekeeper_reconcilers
869 0 : .start_reconciler(node_id, self);
870 0 : }
871 : SkSchedulingPolicy::Decomissioned
872 : | SkSchedulingPolicy::Pause
873 0 : | SkSchedulingPolicy::Activating => {
874 0 : locked.safekeeper_reconcilers.stop_reconciler(node_id);
875 0 : }
876 : }
877 :
878 0 : locked.safekeepers = Arc::new(safekeepers);
879 0 : Ok(())
880 0 : }
881 :
882 : /// Call `switch_timeline_membership` on all safekeepers with retries
883 : /// till the quorum of successful responses is reached.
884 : ///
885 : /// If min_position is not None, validates that majority of safekeepers
886 : /// reached at least min_position.
887 : ///
888 : /// Return responses from safekeepers in the input order.
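///
/// `min_position` is compared as a `(last_log_term, flush_lsn)` tuple, i.e.
/// lexicographically, term first (illustrative values):
///
/// ```ignore
/// use utils::lsn::Lsn;
/// assert!((3, Lsn(0x2000)) >= (3, Lsn(0x1000))); // same term, higher LSN: ok
/// assert!((2, Lsn(0x9000)) < (3, Lsn(0x1000)));  // lower term loses regardless of LSN
/// ```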
889 0 : async fn tenant_timeline_set_membership_quorum(
890 0 : self: &Arc<Self>,
891 0 : tenant_id: TenantId,
892 0 : timeline_id: TimelineId,
893 0 : safekeepers: &[Safekeeper],
894 0 : config: &membership::Configuration,
895 0 : min_position: Option<(Term, Lsn)>,
896 0 : ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
897 0 : let req = TimelineMembershipSwitchRequest {
898 0 : mconf: config.clone(),
899 0 : };
900 :
901 : const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
902 :
903 0 : let results = self
904 0 : .tenant_timeline_safekeeper_op_quorum(
905 0 : safekeepers,
906 0 : move |client| {
907 0 : let req = req.clone();
908 0 : async move {
909 0 : let mut res = client
910 0 : .switch_timeline_membership(tenant_id, timeline_id, &req)
911 0 : .await;
912 :
913 : // If min_position is not reached, map the response to an error,
914 : // so it isn't counted toward the quorum.
915 0 : if let Some(min_position) = min_position {
916 0 : if let Ok(ok_res) = &res {
917 0 : if (ok_res.last_log_term, ok_res.flush_lsn) < min_position {
918 0 : // Use Error::Timeout to make this error retriable.
919 0 : res = Err(mgmt_api::Error::Timeout(
920 0 : format!(
921 0 : "safekeeper {} returned position {:?} which is less than minimum required position {:?}",
922 0 : client.node_id_label(),
923 0 : (ok_res.last_log_term, ok_res.flush_lsn),
924 0 : min_position
925 0 : )
926 0 : ));
927 0 : }
928 0 : }
929 0 : }
930 :
931 0 : res
932 0 : }
933 0 : },
934 : SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
935 : )
936 0 : .await?;
937 :
938 0 : for res in results.iter().flatten() {
939 0 : if res.current_conf.generation > config.generation {
940 : // Another switch_membership raced us.
941 0 : return Err(ApiError::Conflict(format!(
942 0 : "received configuration with generation {} from safekeeper, but expected {}",
943 0 : res.current_conf.generation, config.generation
944 0 : )));
945 0 : } else if res.current_conf.generation < config.generation {
946 : // Note: should never happen.
947 : // If we get a response, it should be at least the sent generation.
948 0 : tracing::error!(
949 0 : "received configuration with generation {} from safekeeper, but expected {}",
950 : res.current_conf.generation,
951 : config.generation
952 : );
953 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
954 0 : "received configuration with generation {} from safekeeper, but expected {}",
955 0 : res.current_conf.generation,
956 0 : config.generation
957 0 : )));
958 0 : }
959 : }
960 :
961 0 : Ok(results)
962 0 : }
963 :
964 : /// Pull the timeline onto `to_safekeepers` from `from_safekeepers`, with retries.
965 : ///
966 : /// Returns Ok(()) only if all the pull_timeline requests were successful.
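///
/// In the migration flow this is invoked with `to_safekeepers` being the members
/// of the new set that are not yet in the current set, and `from_safekeepers`
/// being the current set, so every new node seeds the timeline from a peer that
/// already has it.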
967 0 : async fn tenant_timeline_pull_from_peers(
968 0 : self: &Arc<Self>,
969 0 : tenant_id: TenantId,
970 0 : timeline_id: TimelineId,
971 0 : to_safekeepers: &[Safekeeper],
972 0 : from_safekeepers: &[Safekeeper],
973 0 : ) -> Result<(), ApiError> {
974 0 : let http_hosts = from_safekeepers
975 0 : .iter()
976 0 : .map(|sk| sk.base_url())
977 0 : .collect::<Vec<_>>();
978 :
979 0 : tracing::info!(
980 0 : "pulling timeline to {:?} from {:?}",
981 0 : to_safekeepers
982 0 : .iter()
983 0 : .map(|sk| sk.get_id())
984 0 : .collect::<Vec<_>>(),
985 0 : from_safekeepers
986 0 : .iter()
987 0 : .map(|sk| sk.get_id())
988 0 : .collect::<Vec<_>>()
989 : );
990 :
991 : // TODO(diko): need to pass mconf/generation with the request
992 : // to properly handle tombstones. Ignore tombstones for now.
993 : // Worst case: we leave a timeline on a safekeeper which is not in the current set.
994 0 : let req = PullTimelineRequest {
995 0 : tenant_id,
996 0 : timeline_id,
997 0 : http_hosts,
998 0 : ignore_tombstone: Some(true),
999 0 : };
1000 :
1001 : const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
1002 :
1003 0 : let responses = self
1004 0 : .tenant_timeline_safekeeper_op(
1005 0 : to_safekeepers,
1006 0 : move |client| {
1007 0 : let req = req.clone();
1008 0 : async move { client.pull_timeline(&req).await }
1009 0 : },
1010 : SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
1011 : )
1012 0 : .await?;
1013 :
1014 0 : if let Some((idx, err)) = responses
1015 0 : .iter()
1016 0 : .enumerate()
1017 0 : .find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
1018 : {
1019 0 : let sk_id = to_safekeepers[idx].get_id();
1020 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1021 0 : "pull_timeline to {sk_id} failed: {err}",
1022 0 : )));
1023 0 : }
1024 :
1025 0 : Ok(())
1026 0 : }
1027 :
1028 : /// Exclude a timeline from safekeepers in parallel with retries.
1029 : /// If an exclude request is unsuccessful, the exclusion is handed off to
1030 : /// the safekeeper reconciler, and the function still succeeds.
1031 0 : async fn tenant_timeline_safekeeper_exclude(
1032 0 : self: &Arc<Self>,
1033 0 : tenant_id: TenantId,
1034 0 : timeline_id: TimelineId,
1035 0 : safekeepers: &[Safekeeper],
1036 0 : config: &membership::Configuration,
1037 0 : ) -> Result<(), ApiError> {
1038 0 : let req = TimelineMembershipSwitchRequest {
1039 0 : mconf: config.clone(),
1040 0 : };
1041 :
1042 : const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
1043 :
1044 0 : let results = self
1045 0 : .tenant_timeline_safekeeper_op(
1046 0 : safekeepers,
1047 0 : move |client| {
1048 0 : let req = req.clone();
1049 0 : async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
1050 0 : },
1051 : SK_EXCLUDE_TIMELINE_TIMEOUT,
1052 : )
1053 0 : .await?;
1054 :
1055 0 : let mut reconcile_requests = Vec::new();
1056 :
1057 0 : for (idx, res) in results.iter().enumerate() {
1058 0 : if res.is_err() {
1059 0 : let sk_id = safekeepers[idx].skp.id;
1060 0 : let pending_op = TimelinePendingOpPersistence {
1061 0 : tenant_id: tenant_id.to_string(),
1062 0 : timeline_id: timeline_id.to_string(),
1063 0 : generation: config.generation.into_inner() as i32,
1064 0 : op_kind: SafekeeperTimelineOpKind::Exclude,
1065 0 : sk_id,
1066 0 : };
1067 0 : tracing::info!("writing pending exclude op for sk id {sk_id}");
1068 0 : self.persistence.insert_pending_op(pending_op).await?;
1069 :
1070 0 : let req = ScheduleRequest {
1071 0 : safekeeper: Box::new(safekeepers[idx].clone()),
1072 0 : host_list: Vec::new(),
1073 0 : tenant_id,
1074 0 : timeline_id: Some(timeline_id),
1075 0 : generation: config.generation.into_inner(),
1076 0 : kind: SafekeeperTimelineOpKind::Exclude,
1077 0 : };
1078 0 : reconcile_requests.push(req);
1079 0 : }
1080 : }
1081 :
1082 0 : if !reconcile_requests.is_empty() {
1083 0 : let locked = self.inner.read().unwrap();
1084 0 : for req in reconcile_requests {
1085 0 : locked.safekeeper_reconcilers.schedule_request(req);
1086 0 : }
1087 0 : }
1088 :
1089 0 : Ok(())
1090 0 : }
1091 :
1092 : /// Migrate timeline safekeeper set to a new set.
1093 : ///
1094 : /// This function implements an algorithm from RFC-035.
1095 : /// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
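///
/// A sketch of the request (hypothetical node ids; only the field used below is
/// shown):
///
/// ```ignore
/// let req = TimelineSafekeeperMigrateRequest {
///     new_sk_set: vec![NodeId(1), NodeId(2), NodeId(4)],
/// };
/// service
///     .tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, req)
///     .await?;
/// ```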
1096 0 : pub(crate) async fn tenant_timeline_safekeeper_migrate(
1097 0 : self: &Arc<Self>,
1098 0 : tenant_id: TenantId,
1099 0 : timeline_id: TimelineId,
1100 0 : req: TimelineSafekeeperMigrateRequest,
1101 0 : ) -> Result<(), ApiError> {
1102 0 : let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();
1103 :
1104 0 : let new_sk_set = req.new_sk_set;
1105 :
1106 0 : for sk_id in new_sk_set.iter() {
1107 0 : if !all_safekeepers.contains_key(sk_id) {
1108 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1109 0 : "safekeeper {sk_id} does not exist"
1110 0 : )));
1111 0 : }
1112 : }
1113 :
1114 : // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
1115 0 : let _tenant_lock = trace_shared_lock(
1116 0 : &self.tenant_op_locks,
1117 0 : tenant_id,
1118 0 : TenantOperations::TimelineSafekeeperMigrate,
1119 0 : )
1120 0 : .await;
1121 :
1122 : // 1. Fetch current timeline configuration from the configuration storage.
1123 :
1124 0 : let timeline = self
1125 0 : .persistence
1126 0 : .get_timeline(tenant_id, timeline_id)
1127 0 : .await?;
1128 :
1129 0 : let Some(timeline) = timeline else {
1130 0 : return Err(ApiError::NotFound(
1131 0 : anyhow::anyhow!(
1132 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
1133 0 : )
1134 0 : .into(),
1135 0 : ));
1136 : };
1137 :
1138 0 : let cur_sk_set = timeline
1139 0 : .sk_set
1140 0 : .iter()
1141 0 : .map(|&id| NodeId(id as u64))
1142 0 : .collect::<Vec<_>>();
1143 :
1144 0 : tracing::info!(
1145 : ?cur_sk_set,
1146 : ?new_sk_set,
1147 0 : "Migrating timeline to new safekeeper set",
1148 : );
1149 :
1150 0 : let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
1151 :
1152 0 : if let Some(ref presistent_new_sk_set) = timeline.new_sk_set {
1153 : // 2. If it is already a joint one and new_set is different from desired_set, refuse to change.
1154 0 : if presistent_new_sk_set
1155 0 : .iter()
1156 0 : .map(|&id| NodeId(id as u64))
1157 0 : .ne(new_sk_set.iter().cloned())
1158 : {
1159 0 : tracing::info!(
1160 : ?presistent_new_sk_set,
1161 : ?new_sk_set,
1162 0 : "different new safekeeper set is already set in the database",
1163 : );
1164 0 : return Err(ApiError::Conflict(format!(
1165 0 : "the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
1166 0 : )));
1167 0 : }
1168 : // If it is the same new_sk_set, we can continue the migration (retry).
1169 : } else {
1170 : // 3. No active migration yet.
1171 : // Increment current generation and put desired_set to new_sk_set.
1172 0 : generation = generation.next();
1173 :
1174 0 : self.persistence
1175 0 : .update_timeline_membership(
1176 0 : tenant_id,
1177 0 : timeline_id,
1178 0 : generation,
1179 0 : &cur_sk_set,
1180 0 : Some(&new_sk_set),
1181 0 : )
1182 0 : .await?;
1183 : }
1184 :
1185 0 : let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
1186 0 : let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?;
1187 :
1188 0 : let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
1189 0 : let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
1190 0 : let new_sk_member_set = Self::make_member_set(&new_safekeepers)?;
1191 :
1192 0 : let joint_config = membership::Configuration {
1193 0 : generation,
1194 0 : members: cur_sk_member_set,
1195 0 : new_members: Some(new_sk_member_set.clone()),
1196 0 : };
1197 :
1198 : // 4. Call PUT configuration on safekeepers from the current set,
1199 : // delivering them joint_conf.
1200 :
1201 : // Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
1202 : // This way the compute will know about the new safekeepers from joint_config before we start
1203 : // requiring a quorum from them.
1204 0 : self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
1205 0 : .await?;
1206 :
1207 0 : let results = self
1208 0 : .tenant_timeline_set_membership_quorum(
1209 0 : tenant_id,
1210 0 : timeline_id,
1211 0 : &cur_safekeepers,
1212 0 : &joint_config,
1213 0 : None, // no min position
1214 0 : )
1215 0 : .await?;
1216 :
1217 0 : let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
1218 0 : for res in results.into_iter().flatten() {
1219 0 : let sk_position = (res.last_log_term, res.flush_lsn);
1220 0 : if sync_position < sk_position {
1221 0 : sync_position = sk_position;
1222 0 : }
1223 : }
1224 :
1225 0 : tracing::info!(
1226 : %generation,
1227 : ?sync_position,
1228 0 : "safekeepers set membership updated",
1229 : );
1230 :
1231 : // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
1232 : // by doing pull_timeline from the majority of the current set.
1233 :
1234 : // Filter out safekeepers which are already in the current set.
1235 0 : let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
1236 0 : let pull_to_safekeepers = new_safekeepers
1237 0 : .iter()
1238 0 : .filter(|sk| !from_ids.contains(&sk.get_id()))
1239 0 : .cloned()
1240 0 : .collect::<Vec<_>>();
1241 :
1242 0 : self.tenant_timeline_pull_from_peers(
1243 0 : tenant_id,
1244 0 : timeline_id,
1245 0 : &pull_to_safekeepers,
1246 0 : &cur_safekeepers,
1247 0 : )
1248 0 : .await?;
1249 :
1250 : // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
1251 :
1252 : // TODO(diko): do we need to bump timeline term?
1253 :
1254 : // 7. Repeatedly call PUT configuration on safekeepers from the new set,
1255 : // delivering them joint_conf and collecting their positions.
1256 :
1257 0 : tracing::info!(?sync_position, "waiting for safekeepers to sync position");
1258 :
1259 0 : self.tenant_timeline_set_membership_quorum(
1260 0 : tenant_id,
1261 0 : timeline_id,
1262 0 : &new_safekeepers,
1263 0 : &joint_config,
1264 0 : Some(sync_position),
1265 0 : )
1266 0 : .await?;
1267 :
1268 : // 8. Create new_conf: Configuration incrementing joint_conf generation and
1269 : // having new safekeeper set as sk_set and None new_sk_set.
1270 :
1271 0 : let generation = generation.next();
1272 :
1273 0 : let new_conf = membership::Configuration {
1274 0 : generation,
1275 0 : members: new_sk_member_set,
1276 0 : new_members: None,
1277 0 : };
1278 :
1279 0 : self.persistence
1280 0 : .update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
1281 0 : .await?;
1282 :
1283 : // TODO(diko): at this point we have already updated the timeline in the database,
1284 : // but we still need to notify safekeepers and cplane about the new configuration,
1285 : // and hand deletion of the timeline from the old safekeepers to the reconciler.
1286 : // Ideally this would be done atomically, but currently it is not.
1287 : // Worst case: the timeline is not deleted from the old safekeepers, and the
1288 : // compute may require both quorums until the migration is retried and completed.
1289 :
1290 0 : self.tenant_timeline_set_membership_quorum(
1291 0 : tenant_id,
1292 0 : timeline_id,
1293 0 : &new_safekeepers,
1294 0 : &new_conf,
1295 0 : None, // no min position
1296 0 : )
1297 0 : .await?;
1298 :
1299 0 : let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
1300 0 : let exclude_safekeepers = cur_safekeepers
1301 0 : .into_iter()
1302 0 : .filter(|sk| !new_ids.contains(&sk.get_id()))
1303 0 : .collect::<Vec<_>>();
1304 0 : self.tenant_timeline_safekeeper_exclude(
1305 0 : tenant_id,
1306 0 : timeline_id,
1307 0 : &exclude_safekeepers,
1308 0 : &new_conf,
1309 0 : )
1310 0 : .await?;
1311 :
1312 : // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
1313 : // This way the compute will stop talking to the excluded safekeepers only after we stop
1314 : // requiring a quorum from them.
1315 0 : self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
1316 0 : .await?;
1317 :
1318 0 : Ok(())
1319 0 : }
1320 :
1321 : /// Notify cplane about safekeeper membership change.
1322 : /// The cplane will receive a joint set of safekeepers as a safekeeper list.
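///
/// For example (illustrative ids), with `members = {1, 2, 3}` and
/// `new_members = {3, 4, 5}`, cplane is notified with the deduplicated union
/// `{1, 2, 3, 4, 5}`; safekeeper 3 appears only once.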
1323 0 : async fn cplane_notify_safekeepers(
1324 0 : &self,
1325 0 : tenant_id: TenantId,
1326 0 : timeline_id: TimelineId,
1327 0 : mconf: &membership::Configuration,
1328 0 : ) -> Result<(), ApiError> {
1329 0 : let mut safekeepers = Vec::new();
1330 0 : let mut ids: HashSet<_> = HashSet::new();
1331 :
1332 0 : for member in mconf
1333 0 : .members
1334 0 : .m
1335 0 : .iter()
1336 0 : .chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
1337 : {
1338 0 : if ids.insert(member.id) {
1339 0 : safekeepers.push(compute_hook::SafekeeperInfo {
1340 0 : id: member.id,
1341 0 : hostname: Some(member.host.clone()),
1342 0 : });
1343 0 : }
1344 : }
1345 :
1346 0 : self.compute_hook
1347 0 : .notify_safekeepers(
1348 0 : compute_hook::SafekeepersUpdate {
1349 0 : tenant_id,
1350 0 : timeline_id,
1351 0 : generation: mconf.generation,
1352 0 : safekeepers,
1353 0 : },
1354 0 : &self.cancel,
1355 0 : )
1356 0 : .await
1357 0 : .map_err(|err| {
1358 0 : ApiError::InternalServerError(anyhow::anyhow!(
1359 0 : "failed to notify cplane about safekeeper membership change: {err}"
1360 0 : ))
1361 0 : })
1362 0 : }
1363 : }