Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use super::safekeeper_reconciler::ScheduleRequest;
7 : use crate::compute_hook;
8 : use crate::heartbeater::SafekeeperState;
9 : use crate::id_lock_map::trace_shared_lock;
10 : use crate::metrics;
11 : use crate::persistence::{
12 : DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
13 : };
14 : use crate::safekeeper::Safekeeper;
15 : use crate::safekeeper_client::SafekeeperClient;
16 : use crate::service::TenantOperations;
17 : use crate::timeline_import::TimelineImportFinalizeError;
18 : use anyhow::Context;
19 : use http_utils::error::ApiError;
20 : use pageserver_api::controller_api::{
21 : SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
22 : TimelineSafekeeperMigrateRequest,
23 : };
24 : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
25 : use safekeeper_api::PgVersionId;
26 : use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
27 : use safekeeper_api::models::{
28 : PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
29 : TimelineMembershipSwitchResponse,
30 : };
31 : use safekeeper_api::{INITIAL_TERM, Term};
32 : use safekeeper_client::mgmt_api;
33 : use tokio::task::JoinSet;
34 : use tokio_util::sync::CancellationToken;
35 : use utils::id::{NodeId, TenantId, TimelineId};
36 : use utils::logging::SecretString;
37 : use utils::lsn::Lsn;
38 :
39 : use super::Service;
40 :
41 : impl Service {
42 0 : fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, anyhow::Error> {
43 0 : let members = safekeepers
44 0 : .iter()
45 0 : .map(|sk| sk.get_safekeeper_id())
46 0 : .collect::<Vec<_>>();
47 :
48 0 : MemberSet::new(members)
49 0 : }
50 :
51 0 : fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
52 0 : let safekeepers = {
53 0 : let locked = self.inner.read().unwrap();
54 0 : locked.safekeepers.clone()
55 : };
56 :
57 0 : ids.iter()
58 0 : .map(|&id| {
59 0 : let node_id = NodeId(id as u64);
60 0 : safekeepers.get(&node_id).cloned().ok_or_else(|| {
61 0 : ApiError::InternalServerError(anyhow::anyhow!(
62 0 : "safekeeper {node_id} is not registered"
63 0 : ))
64 0 : })
65 0 : })
66 0 : .collect::<Result<Vec<_>, _>>()
67 0 : }
68 :
69 : /// Timeline creation on safekeepers
70 : ///
71 : /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
72 : /// where `left` contains the list of safekeepers that didn't have a successful response.
73 : /// Assumes tenant lock is held while calling this function.
74 0 : pub(super) async fn tenant_timeline_create_safekeepers_quorum(
75 0 : &self,
76 0 : tenant_id: TenantId,
77 0 : timeline_id: TimelineId,
78 0 : pg_version: PgVersionId,
79 0 : timeline_persistence: &TimelinePersistence,
80 0 : ) -> Result<Vec<NodeId>, ApiError> {
81 0 : let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
82 :
83 0 : let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?;
84 0 : let mconf = safekeeper_api::membership::Configuration::new(mset);
85 :
86 0 : let req = safekeeper_api::models::TimelineCreateRequest {
87 0 : commit_lsn: None,
88 0 : mconf,
89 0 : pg_version,
90 0 : start_lsn: timeline_persistence.start_lsn.0,
91 0 : system_id: None,
92 0 : tenant_id,
93 0 : timeline_id,
94 0 : wal_seg_size: None,
95 0 : };
96 :
97 : const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
98 :
99 0 : let results = self
100 0 : .tenant_timeline_safekeeper_op_quorum(
101 0 : &safekeepers,
102 0 : move |client| {
103 0 : let req = req.clone();
104 0 : async move { client.create_timeline(&req).await }
105 0 : },
106 : SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
107 : )
108 0 : .await?;
109 :
110 0 : Ok(results
111 0 : .into_iter()
112 0 : .enumerate()
113 0 : .filter_map(|(idx, res)| {
114 0 : if res.is_ok() {
115 0 : None // Success, don't return this safekeeper
116 : } else {
117 0 : Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
118 : }
119 0 : })
120 0 : .collect::<Vec<_>>())
121 0 : }
122 :
123 : /// Perform an operation on a list of safekeepers in parallel with retries.
124 : ///
125 : /// Return the results of the operation on each safekeeper in the input order.
126 0 : async fn tenant_timeline_safekeeper_op<T, O, F>(
127 0 : &self,
128 0 : safekeepers: &[Safekeeper],
129 0 : op: O,
130 0 : timeout: Duration,
131 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
132 0 : where
133 0 : O: FnMut(SafekeeperClient) -> F + Send + 'static,
134 0 : O: Clone,
135 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
136 0 : T: Sync + Send + 'static,
137 0 : {
138 0 : let jwt = self
139 0 : .config
140 0 : .safekeeper_jwt_token
141 0 : .clone()
142 0 : .map(SecretString::from);
143 0 : let mut joinset = JoinSet::new();
144 :
145 0 : for (idx, sk) in safekeepers.iter().enumerate() {
146 0 : let sk = sk.clone();
147 0 : let http_client = self.http_client.clone();
148 0 : let jwt = jwt.clone();
149 0 : let op = op.clone();
150 0 : joinset.spawn(async move {
151 0 : let res = sk
152 0 : .with_client_retries(
153 0 : op,
154 0 : &http_client,
155 0 : &jwt,
156 0 : 3,
157 0 : 3,
158 0 : // TODO(diko): This is a wrong timeout.
159 0 : // It should be scaled to the retry count.
160 0 : timeout,
161 0 : &CancellationToken::new(),
162 0 : )
163 0 : .await;
164 0 : (idx, res)
165 0 : });
166 : }
167 :
168 : // Initialize results with timeout errors in case we never get a response.
169 0 : let mut results: Vec<mgmt_api::Result<T>> = safekeepers
170 0 : .iter()
171 0 : .map(|_| {
172 0 : Err(mgmt_api::Error::Timeout(
173 0 : "safekeeper operation timed out".to_string(),
174 0 : ))
175 0 : })
176 0 : .collect();
177 :
178 : // After we have built the joinset, we now wait for the tasks to complete,
179 : // but with a specified timeout to make sure we return swiftly, either with
180 : // a failure or success.
181 0 : let reconcile_deadline = tokio::time::Instant::now() + timeout;
182 :
183 : // Wait until all tasks finish or timeout is hit, whichever occurs
184 : // first.
185 0 : let mut result_count = 0;
186 : loop {
187 0 : if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
188 : {
189 0 : let Some(res) = res else { break };
190 0 : match res {
191 0 : Ok((idx, res)) => {
192 0 : let sk = &safekeepers[idx];
193 0 : tracing::info!(
194 0 : "response from safekeeper id:{} at {}: {:?}",
195 0 : sk.get_id(),
196 : sk.skp.host,
197 : // Only print errors, as there is no Debug trait for T.
198 0 : res.as_ref().map(|_| ()),
199 : );
200 0 : results[idx] = res;
201 0 : result_count += 1;
202 : }
203 0 : Err(join_err) => {
204 0 : tracing::info!("join_err for task in joinset: {join_err}");
205 : }
206 : }
207 : } else {
208 0 : tracing::info!("timeout for operation call after {result_count} responses",);
209 0 : break;
210 : }
211 : }
212 :
213 0 : Ok(results)
214 0 : }
215 :
216 : /// Perform an operation on a list of safekeepers in parallel with retries,
217 : /// and validates that we reach a quorum of successful responses.
218 : ///
219 : /// Return the results of the operation on each safekeeper in the input order.
220 : /// It's guaranteed that at least a quorum of the responses are successful.
221 0 : async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
222 0 : &self,
223 0 : safekeepers: &[Safekeeper],
224 0 : op: O,
225 0 : timeout: Duration,
226 0 : ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
227 0 : where
228 0 : O: FnMut(SafekeeperClient) -> F,
229 0 : O: Clone + Send + 'static,
230 0 : F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
231 0 : T: Sync + Send + 'static,
232 0 : {
233 0 : let target_sk_count = safekeepers.len();
234 :
235 0 : if target_sk_count == 0 {
236 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
237 0 : "timeline configured without any safekeepers"
238 0 : )));
239 0 : }
240 :
241 0 : if target_sk_count < self.config.timeline_safekeeper_count {
242 0 : tracing::warn!(
243 0 : "running a quorum operation with {} safekeepers, which is less than configured {} safekeepers per timeline",
244 : target_sk_count,
245 : self.config.timeline_safekeeper_count
246 : );
247 0 : }
248 :
249 0 : let results = self
250 0 : .tenant_timeline_safekeeper_op(safekeepers, op, timeout)
251 0 : .await?;
252 :
253 : // Now check if quorum was reached in results.
254 :
255 0 : let quorum_size = target_sk_count / 2 + 1;
256 :
257 0 : let success_count = results.iter().filter(|res| res.is_ok()).count();
258 0 : if success_count < quorum_size {
259 : // Failure
260 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
261 0 : "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
262 0 : )));
263 0 : }
264 :
265 0 : Ok(results)
266 0 : }
267 :
268 : /// Create timeline in controller database and on safekeepers.
269 : /// `timeline_info` is result of timeline creation on pageserver.
270 : ///
271 : /// All actions must be idempotent as the call is retried until success. It
272 : /// tries to create timeline in the db and on at least majority of
273 : /// safekeepers + queue creation for safekeepers which missed it in the db
274 : /// for infinite retries; after that, call returns Ok.
275 : ///
276 : /// The idea is that once this is reached as long as we have alive majority
277 : /// of safekeepers it is expected to get eventually operational as storcon
278 : /// will be able to seed timeline on nodes which missed creation by making
279 : /// pull_timeline from peers. On the other hand we don't want to fail
280 : /// timeline creation if one safekeeper is down.
281 0 : pub(super) async fn tenant_timeline_create_safekeepers(
282 0 : self: &Arc<Self>,
283 0 : tenant_id: TenantId,
284 0 : timeline_info: &TimelineInfo,
285 0 : read_only: bool,
286 0 : ) -> Result<SafekeepersInfo, ApiError> {
287 0 : let timeline_id = timeline_info.timeline_id;
288 0 : let pg_version = PgVersionId::from(timeline_info.pg_version);
289 : // Initially start_lsn is determined by last_record_lsn in pageserver
290 : // response as it does initdb. However, later we persist it and in sk
291 : // creation calls replace with the value from the timeline row if it
292 : // previously existed as on retries in theory endpoint might have
293 : // already written some data and advanced last_record_lsn, while we want
294 : // safekeepers to have consistent start_lsn.
295 0 : let start_lsn = timeline_info.last_record_lsn;
296 :
297 : // Choose initial set of safekeepers respecting affinity
298 0 : let sks = if !read_only {
299 0 : self.safekeepers_for_new_timeline().await?
300 : } else {
301 0 : Vec::new()
302 : };
303 0 : let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
304 : // Add timeline to db
305 0 : let mut timeline_persist = TimelinePersistence {
306 0 : tenant_id: tenant_id.to_string(),
307 0 : timeline_id: timeline_id.to_string(),
308 0 : start_lsn: start_lsn.into(),
309 0 : generation: 1,
310 0 : sk_set: sks_persistence.clone(),
311 0 : new_sk_set: None,
312 0 : cplane_notified_generation: 0,
313 0 : deleted_at: None,
314 0 : };
315 0 : let inserted = self
316 0 : .persistence
317 0 : .insert_timeline(timeline_persist.clone())
318 0 : .await?;
319 0 : if !inserted {
320 0 : if let Some(existent_persist) = self
321 0 : .persistence
322 0 : .get_timeline(tenant_id, timeline_id)
323 0 : .await?
324 0 : {
325 0 : // Replace with what we have in the db, to get stuff like the generation right.
326 0 : // We do still repeat the http calls to the safekeepers. After all, we could have
327 0 : // crashed right after the wrote to the DB.
328 0 : timeline_persist = existent_persist;
329 0 : } else {
330 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
331 0 : "insertion said timeline already in db, but looking it up, it was gone"
332 0 : )));
333 : }
334 0 : }
335 0 : let ret = SafekeepersInfo {
336 0 : generation: timeline_persist.generation as u32,
337 0 : safekeepers: sks.clone(),
338 0 : tenant_id,
339 0 : timeline_id,
340 0 : };
341 0 : if read_only {
342 0 : return Ok(ret);
343 0 : }
344 :
345 : // Create the timeline on a quorum of safekeepers
346 0 : let remaining = self
347 0 : .tenant_timeline_create_safekeepers_quorum(
348 0 : tenant_id,
349 0 : timeline_id,
350 0 : pg_version,
351 0 : &timeline_persist,
352 0 : )
353 0 : .await?;
354 :
355 : // For the remaining safekeepers, take care of their reconciliation asynchronously
356 0 : for &remaining_id in remaining.iter() {
357 0 : let pending_op = TimelinePendingOpPersistence {
358 0 : tenant_id: tenant_id.to_string(),
359 0 : timeline_id: timeline_id.to_string(),
360 0 : generation: timeline_persist.generation,
361 0 : op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
362 0 : sk_id: remaining_id.0 as i64,
363 0 : };
364 0 : tracing::info!("writing pending op for sk id {remaining_id}");
365 0 : self.persistence.insert_pending_op(pending_op).await?;
366 : }
367 0 : if !remaining.is_empty() {
368 0 : let locked = self.inner.read().unwrap();
369 0 : for remaining_id in remaining {
370 0 : let Some(sk) = locked.safekeepers.get(&remaining_id) else {
371 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
372 0 : "Couldn't find safekeeper with id {remaining_id}"
373 0 : )));
374 : };
375 0 : let Ok(host_list) = sks
376 0 : .iter()
377 0 : .map(|sk| {
378 : Ok((
379 0 : sk.id,
380 0 : locked
381 0 : .safekeepers
382 0 : .get(&sk.id)
383 0 : .ok_or_else(|| {
384 0 : ApiError::InternalServerError(anyhow::anyhow!(
385 0 : "Couldn't find safekeeper with id {} to pull from",
386 0 : sk.id
387 0 : ))
388 0 : })?
389 0 : .base_url(),
390 : ))
391 0 : })
392 0 : .collect::<Result<_, ApiError>>()
393 : else {
394 0 : continue;
395 : };
396 0 : let req = ScheduleRequest {
397 0 : safekeeper: Box::new(sk.clone()),
398 0 : host_list,
399 0 : tenant_id,
400 0 : timeline_id: Some(timeline_id),
401 0 : generation: timeline_persist.generation as u32,
402 0 : kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
403 0 : };
404 0 : locked.safekeeper_reconcilers.schedule_request(req);
405 : }
406 0 : }
407 :
408 0 : Ok(ret)
409 0 : }
410 :
411 0 : pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
412 0 : self: &Arc<Self>,
413 0 : tenant_id: TenantId,
414 0 : timeline_info: TimelineInfo,
415 0 : ) -> Result<(), TimelineImportFinalizeError> {
416 : const BACKOFF: Duration = Duration::from_secs(5);
417 :
418 : loop {
419 0 : if self.cancel.is_cancelled() {
420 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
421 0 : }
422 :
423 : // This function is only used in non-read-only scenarios
424 0 : let read_only = false;
425 0 : let res = self
426 0 : .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
427 0 : .await;
428 :
429 0 : match res {
430 : Ok(_) => {
431 0 : tracing::info!("Timeline created on safekeepers");
432 0 : break;
433 : }
434 0 : Err(err) => {
435 0 : tracing::error!("Failed to create timeline on safekeepers: {err}");
436 0 : tokio::select! {
437 0 : _ = self.cancel.cancelled() => {
438 0 : return Err(TimelineImportFinalizeError::ShuttingDown);
439 : },
440 0 : _ = tokio::time::sleep(BACKOFF) => {}
441 : };
442 : }
443 : }
444 : }
445 :
446 0 : Ok(())
447 0 : }
448 :
449 : /// Directly insert the timeline into the database without reconciling it with safekeepers.
450 : ///
451 : /// Useful if the timeline already exists on the specified safekeepers,
452 : /// but we want to make it storage controller managed.
453 0 : pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
454 0 : let persistence = TimelinePersistence {
455 0 : tenant_id: req.tenant_id.to_string(),
456 0 : timeline_id: req.timeline_id.to_string(),
457 0 : start_lsn: Lsn::INVALID.into(),
458 : generation: 1,
459 0 : sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
460 0 : new_sk_set: None,
461 : cplane_notified_generation: 1,
462 0 : deleted_at: None,
463 : };
464 0 : let inserted = self.persistence.insert_timeline(persistence).await?;
465 0 : if inserted {
466 0 : tracing::info!("imported timeline into db");
467 : } else {
468 0 : tracing::info!("didn't import timeline into db, as it is already present in db");
469 : }
470 0 : Ok(())
471 0 : }
472 :
473 : /// Locate safekeepers for a timeline.
474 : /// Return the generation, sk_set and new_sk_set if present.
475 : /// If the timeline is not storcon-managed, return NotFound.
476 0 : pub(crate) async fn tenant_timeline_locate(
477 0 : &self,
478 0 : tenant_id: TenantId,
479 0 : timeline_id: TimelineId,
480 0 : ) -> Result<TimelineLocateResponse, ApiError> {
481 0 : let timeline = self
482 0 : .persistence
483 0 : .get_timeline(tenant_id, timeline_id)
484 0 : .await?;
485 :
486 0 : let Some(timeline) = timeline else {
487 0 : return Err(ApiError::NotFound(
488 0 : anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
489 0 : ));
490 : };
491 :
492 : Ok(TimelineLocateResponse {
493 0 : generation: SafekeeperGeneration::new(timeline.generation as u32),
494 0 : sk_set: timeline
495 0 : .sk_set
496 0 : .iter()
497 0 : .map(|id| NodeId(*id as u64))
498 0 : .collect(),
499 0 : new_sk_set: timeline
500 0 : .new_sk_set
501 0 : .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
502 : })
503 0 : }
504 :
505 : /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
506 0 : pub(super) async fn tenant_timeline_delete_safekeepers(
507 0 : self: &Arc<Self>,
508 0 : tenant_id: TenantId,
509 0 : timeline_id: TimelineId,
510 0 : ) -> Result<(), ApiError> {
511 0 : let tl = self
512 0 : .persistence
513 0 : .get_timeline(tenant_id, timeline_id)
514 0 : .await?;
515 0 : let Some(tl) = tl else {
516 0 : tracing::info!(
517 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
518 : );
519 0 : return Ok(());
520 : };
521 0 : self.persistence
522 0 : .timeline_set_deleted_at(tenant_id, timeline_id)
523 0 : .await?;
524 0 : let all_sks = tl
525 0 : .new_sk_set
526 0 : .iter()
527 0 : .flatten()
528 0 : .chain(tl.sk_set.iter())
529 0 : .collect::<HashSet<_>>();
530 :
531 : // The timeline has no safekeepers: we need to delete it from the db manually,
532 : // as no safekeeper reconciler will get to it
533 0 : if all_sks.is_empty() {
534 0 : if let Err(err) = self
535 0 : .persistence
536 0 : .delete_timeline(tenant_id, timeline_id)
537 0 : .await
538 : {
539 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
540 0 : }
541 0 : }
542 :
543 : // Schedule reconciliations
544 0 : for &sk_id in all_sks.iter() {
545 0 : let pending_op = TimelinePendingOpPersistence {
546 0 : tenant_id: tenant_id.to_string(),
547 0 : timeline_id: timeline_id.to_string(),
548 0 : generation: i32::MAX,
549 0 : op_kind: SafekeeperTimelineOpKind::Delete,
550 0 : sk_id: *sk_id,
551 0 : };
552 0 : tracing::info!("writing pending op for sk id {sk_id}");
553 0 : self.persistence.insert_pending_op(pending_op).await?;
554 : }
555 : {
556 0 : let locked = self.inner.read().unwrap();
557 0 : for sk_id in all_sks {
558 0 : let sk_id = NodeId(*sk_id as u64);
559 0 : let Some(sk) = locked.safekeepers.get(&sk_id) else {
560 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
561 0 : "Couldn't find safekeeper with id {sk_id}"
562 0 : )));
563 : };
564 :
565 0 : let req = ScheduleRequest {
566 0 : safekeeper: Box::new(sk.clone()),
567 0 : // we don't use this for this kind, put a dummy value
568 0 : host_list: Vec::new(),
569 0 : tenant_id,
570 0 : timeline_id: Some(timeline_id),
571 0 : generation: tl.generation as u32,
572 0 : kind: SafekeeperTimelineOpKind::Delete,
573 0 : };
574 0 : locked.safekeeper_reconcilers.schedule_request(req);
575 : }
576 : }
577 0 : Ok(())
578 0 : }
579 :
580 : /// Perform tenant deletion on safekeepers.
581 0 : pub(super) async fn tenant_delete_safekeepers(
582 0 : self: &Arc<Self>,
583 0 : tenant_id: TenantId,
584 0 : ) -> Result<(), ApiError> {
585 0 : let timeline_list = self
586 0 : .persistence
587 0 : .list_timelines_for_tenant(tenant_id)
588 0 : .await?;
589 :
590 0 : if timeline_list.is_empty() {
591 : // Early exit: the tenant is either empty or not migrated to the storcon yet
592 0 : tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
593 0 : return Ok(());
594 0 : }
595 :
596 0 : let timeline_list = timeline_list
597 0 : .into_iter()
598 0 : .map(|timeline| {
599 0 : let timeline_id = TimelineId::from_str(&timeline.timeline_id)
600 0 : .context("timeline id loaded from db")
601 0 : .map_err(ApiError::InternalServerError)?;
602 0 : Ok((timeline_id, timeline))
603 0 : })
604 0 : .collect::<Result<Vec<_>, ApiError>>()?;
605 :
606 : // Remove pending ops from db, and set `deleted_at`.
607 : // We cancel them in a later iteration once we hold the state lock.
608 0 : for (timeline_id, _timeline) in timeline_list.iter() {
609 0 : self.persistence
610 0 : .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
611 0 : .await?;
612 0 : self.persistence
613 0 : .timeline_set_deleted_at(tenant_id, *timeline_id)
614 0 : .await?;
615 : }
616 :
617 : // The list of safekeepers that have any of the timelines
618 0 : let mut sk_list = HashSet::new();
619 :
620 : // List all pending ops for all timelines, cancel them
621 0 : for (_timeline_id, timeline) in timeline_list.iter() {
622 0 : let sk_iter = timeline
623 0 : .sk_set
624 0 : .iter()
625 0 : .chain(timeline.new_sk_set.iter().flatten())
626 0 : .map(|id| NodeId(*id as u64));
627 0 : sk_list.extend(sk_iter);
628 : }
629 :
630 0 : for &sk_id in sk_list.iter() {
631 0 : let pending_op = TimelinePendingOpPersistence {
632 0 : tenant_id: tenant_id.to_string(),
633 0 : timeline_id: String::new(),
634 0 : generation: i32::MAX,
635 0 : op_kind: SafekeeperTimelineOpKind::Delete,
636 0 : sk_id: sk_id.0 as i64,
637 0 : };
638 0 : tracing::info!("writing pending op for sk id {sk_id}");
639 0 : self.persistence.insert_pending_op(pending_op).await?;
640 : }
641 :
642 0 : let mut locked = self.inner.write().unwrap();
643 :
644 0 : for (timeline_id, _timeline) in timeline_list.iter() {
645 0 : for sk_id in sk_list.iter() {
646 0 : locked
647 0 : .safekeeper_reconcilers
648 0 : .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
649 0 : }
650 : }
651 :
652 : // unwrap is safe: we return above for an empty timeline list
653 0 : let max_generation = timeline_list
654 0 : .iter()
655 0 : .map(|(_tl_id, tl)| tl.generation as u32)
656 0 : .max()
657 0 : .unwrap();
658 :
659 0 : for sk_id in sk_list {
660 0 : let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
661 0 : tracing::warn!("Couldn't find safekeeper with id {sk_id}");
662 0 : continue;
663 : };
664 : // Add pending op for tenant deletion
665 0 : let req = ScheduleRequest {
666 0 : generation: max_generation,
667 0 : host_list: Vec::new(),
668 0 : kind: SafekeeperTimelineOpKind::Delete,
669 0 : safekeeper: Box::new(safekeeper.clone()),
670 0 : tenant_id,
671 0 : timeline_id: None,
672 0 : };
673 0 : locked.safekeeper_reconcilers.schedule_request(req);
674 : }
675 0 : Ok(())
676 0 : }
677 :
678 : /// Choose safekeepers for the new timeline in different azs.
679 : /// 3 are choosen by default, but may be configured via config (for testing).
680 0 : pub(crate) async fn safekeepers_for_new_timeline(
681 0 : &self,
682 0 : ) -> Result<Vec<SafekeeperInfo>, ApiError> {
683 0 : let mut all_safekeepers = {
684 0 : let locked = self.inner.read().unwrap();
685 0 : locked
686 0 : .safekeepers
687 0 : .iter()
688 0 : .filter_map(|sk| {
689 0 : if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
690 : // If we don't want to schedule stuff onto the safekeeper, respect that.
691 0 : return None;
692 0 : }
693 0 : let utilization_opt = if let SafekeeperState::Available {
694 : last_seen_at: _,
695 0 : utilization,
696 0 : } = sk.1.availability()
697 : {
698 0 : Some(utilization)
699 : } else {
700 : // non-available safekeepers still get a chance for new timelines,
701 : // but put them last in the list.
702 0 : None
703 : };
704 0 : let info = SafekeeperInfo {
705 0 : hostname: sk.1.skp.host.clone(),
706 0 : id: NodeId(sk.1.skp.id as u64),
707 0 : };
708 0 : Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
709 0 : })
710 0 : .collect::<Vec<_>>()
711 : };
712 0 : all_safekeepers.sort_by_key(|sk| {
713 : (
714 0 : sk.0.as_ref()
715 0 : .map(|ut| ut.timeline_count)
716 0 : .unwrap_or(u64::MAX),
717 : // Use the id to decide on equal scores for reliability
718 0 : sk.1.id.0,
719 : )
720 0 : });
721 : // Number of safekeepers in different AZs we are looking for
722 0 : let wanted_count = self.config.timeline_safekeeper_count;
723 :
724 0 : let mut sks = Vec::new();
725 0 : let mut azs = HashSet::new();
726 0 : for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
727 0 : if !azs.insert(az_id) {
728 0 : continue;
729 0 : }
730 0 : sks.push(sk_info.clone());
731 0 : if sks.len() == wanted_count {
732 0 : break;
733 0 : }
734 : }
735 0 : if sks.len() == wanted_count {
736 0 : Ok(sks)
737 : } else {
738 0 : Err(ApiError::InternalServerError(anyhow::anyhow!(
739 0 : "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
740 0 : sks.len(),
741 0 : all_safekeepers.len(),
742 0 : )))
743 : }
744 0 : }
745 :
746 0 : pub(crate) async fn safekeepers_list(
747 0 : &self,
748 0 : ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
749 0 : let locked = self.inner.read().unwrap();
750 0 : let mut list = locked
751 0 : .safekeepers
752 0 : .iter()
753 0 : .map(|sk| sk.1.describe_response())
754 0 : .collect::<Result<Vec<_>, _>>()?;
755 0 : list.sort_by_key(|v| v.id);
756 0 : Ok(list)
757 0 : }
758 :
759 0 : pub(crate) async fn get_safekeeper(
760 0 : &self,
761 0 : id: i64,
762 0 : ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
763 0 : let locked = self.inner.read().unwrap();
764 0 : let sk = locked
765 0 : .safekeepers
766 0 : .get(&NodeId(id as u64))
767 0 : .ok_or(diesel::result::Error::NotFound)?;
768 0 : sk.describe_response()
769 0 : }
770 :
771 0 : pub(crate) async fn upsert_safekeeper(
772 0 : self: &Arc<Service>,
773 0 : record: crate::persistence::SafekeeperUpsert,
774 0 : ) -> Result<(), ApiError> {
775 0 : let node_id = NodeId(record.id as u64);
776 0 : let use_https = self.config.use_https_safekeeper_api;
777 :
778 0 : if use_https && record.https_port.is_none() {
779 0 : return Err(ApiError::PreconditionFailed(
780 0 : format!(
781 0 : "cannot upsert safekeeper {node_id}: \
782 0 : https is enabled, but https port is not specified"
783 0 : )
784 0 : .into(),
785 0 : ));
786 0 : }
787 :
788 0 : self.persistence.safekeeper_upsert(record.clone()).await?;
789 : {
790 0 : let mut locked = self.inner.write().unwrap();
791 0 : let mut safekeepers = (*locked.safekeepers).clone();
792 0 : match safekeepers.entry(node_id) {
793 0 : std::collections::hash_map::Entry::Occupied(mut entry) => entry
794 0 : .get_mut()
795 0 : .update_from_record(record)
796 0 : .expect("all preconditions should be checked before upsert to database"),
797 0 : std::collections::hash_map::Entry::Vacant(entry) => {
798 0 : entry.insert(
799 0 : Safekeeper::from_persistence(
800 0 : crate::persistence::SafekeeperPersistence::from_upsert(
801 0 : record,
802 0 : SkSchedulingPolicy::Activating,
803 0 : ),
804 0 : CancellationToken::new(),
805 0 : use_https,
806 0 : )
807 0 : .expect("all preconditions should be checked before upsert to database"),
808 0 : );
809 0 : }
810 : }
811 0 : locked
812 0 : .safekeeper_reconcilers
813 0 : .start_reconciler(node_id, self);
814 0 : locked.safekeepers = Arc::new(safekeepers);
815 0 : metrics::METRICS_REGISTRY
816 0 : .metrics_group
817 0 : .storage_controller_safekeeper_nodes
818 0 : .set(locked.safekeepers.len() as i64);
819 0 : metrics::METRICS_REGISTRY
820 0 : .metrics_group
821 0 : .storage_controller_https_safekeeper_nodes
822 0 : .set(
823 0 : locked
824 0 : .safekeepers
825 0 : .values()
826 0 : .filter(|s| s.has_https_port())
827 0 : .count() as i64,
828 : );
829 : }
830 0 : Ok(())
831 0 : }
832 :
833 0 : pub(crate) async fn set_safekeeper_scheduling_policy(
834 0 : self: &Arc<Service>,
835 0 : id: i64,
836 0 : scheduling_policy: SkSchedulingPolicy,
837 0 : ) -> Result<(), DatabaseError> {
838 0 : self.persistence
839 0 : .set_safekeeper_scheduling_policy(id, scheduling_policy)
840 0 : .await?;
841 0 : let node_id = NodeId(id as u64);
842 : // After the change has been persisted successfully, update the in-memory state
843 0 : self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
844 0 : .await
845 0 : }
846 :
847 0 : pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
848 0 : self: &Arc<Service>,
849 0 : node_id: NodeId,
850 0 : scheduling_policy: SkSchedulingPolicy,
851 0 : ) -> Result<(), DatabaseError> {
852 0 : let mut locked = self.inner.write().unwrap();
853 0 : let mut safekeepers = (*locked.safekeepers).clone();
854 0 : let sk = safekeepers
855 0 : .get_mut(&node_id)
856 0 : .ok_or(DatabaseError::Logical("Not found".to_string()))?;
857 0 : sk.set_scheduling_policy(scheduling_policy);
858 :
859 0 : match scheduling_policy {
860 0 : SkSchedulingPolicy::Active => {
861 0 : locked
862 0 : .safekeeper_reconcilers
863 0 : .start_reconciler(node_id, self);
864 0 : }
865 : SkSchedulingPolicy::Decomissioned
866 : | SkSchedulingPolicy::Pause
867 0 : | SkSchedulingPolicy::Activating => {
868 0 : locked.safekeeper_reconcilers.stop_reconciler(node_id);
869 0 : }
870 : }
871 :
872 0 : locked.safekeepers = Arc::new(safekeepers);
873 0 : Ok(())
874 0 : }
875 :
876 : /// Call `switch_timeline_membership` on all safekeepers with retries
877 : /// till the quorum of successful responses is reached.
878 : ///
879 : /// If min_position is not None, validates that majority of safekeepers
880 : /// reached at least min_position.
881 : ///
882 : /// Return responses from safekeepers in the input order.
883 0 : async fn tenant_timeline_set_membership_quorum(
884 0 : self: &Arc<Self>,
885 0 : tenant_id: TenantId,
886 0 : timeline_id: TimelineId,
887 0 : safekeepers: &[Safekeeper],
888 0 : config: &membership::Configuration,
889 0 : min_position: Option<(Term, Lsn)>,
890 0 : ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
891 0 : let req = TimelineMembershipSwitchRequest {
892 0 : mconf: config.clone(),
893 0 : };
894 :
895 : const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
896 :
897 0 : let results = self
898 0 : .tenant_timeline_safekeeper_op_quorum(
899 0 : safekeepers,
900 0 : move |client| {
901 0 : let req = req.clone();
902 0 : async move {
903 0 : let mut res = client
904 0 : .switch_timeline_membership(tenant_id, timeline_id, &req)
905 0 : .await;
906 :
907 : // If min_position is not reached, map the response to an error,
908 : // so it isn't counted toward the quorum.
909 0 : if let Some(min_position) = min_position {
910 0 : if let Ok(ok_res) = &res {
911 0 : if (ok_res.last_log_term, ok_res.flush_lsn) < min_position {
912 0 : // Use Error::Timeout to make this error retriable.
913 0 : res = Err(mgmt_api::Error::Timeout(
914 0 : format!(
915 0 : "safekeeper {} returned position {:?} which is less than minimum required position {:?}",
916 0 : client.node_id_label(),
917 0 : (ok_res.last_log_term, ok_res.flush_lsn),
918 0 : min_position
919 0 : )
920 0 : ));
921 0 : }
922 0 : }
923 0 : }
924 :
925 0 : res
926 0 : }
927 0 : },
928 : SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
929 : )
930 0 : .await?;
931 :
932 0 : for res in results.iter().flatten() {
933 0 : if res.current_conf.generation > config.generation {
934 : // Antoher switch_membership raced us.
935 0 : return Err(ApiError::Conflict(format!(
936 0 : "received configuration with generation {} from safekeeper, but expected {}",
937 0 : res.current_conf.generation, config.generation
938 0 : )));
939 0 : } else if res.current_conf.generation < config.generation {
940 : // Note: should never happen.
941 : // If we get a response, it should be at least the sent generation.
942 0 : tracing::error!(
943 0 : "received configuration with generation {} from safekeeper, but expected {}",
944 : res.current_conf.generation,
945 : config.generation
946 : );
947 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
948 0 : "received configuration with generation {} from safekeeper, but expected {}",
949 0 : res.current_conf.generation,
950 0 : config.generation
951 0 : )));
952 0 : }
953 : }
954 :
955 0 : Ok(results)
956 0 : }
957 :
958 : /// Pull timeline to to_safekeepers from from_safekeepers with retries.
959 : ///
960 : /// Returns Ok(()) only if all the pull_timeline requests were successful.
961 0 : async fn tenant_timeline_pull_from_peers(
962 0 : self: &Arc<Self>,
963 0 : tenant_id: TenantId,
964 0 : timeline_id: TimelineId,
965 0 : to_safekeepers: &[Safekeeper],
966 0 : from_safekeepers: &[Safekeeper],
967 0 : ) -> Result<(), ApiError> {
968 0 : let http_hosts = from_safekeepers
969 0 : .iter()
970 0 : .map(|sk| sk.base_url())
971 0 : .collect::<Vec<_>>();
972 :
973 0 : tracing::info!(
974 0 : "pulling timeline to {:?} from {:?}",
975 0 : to_safekeepers
976 0 : .iter()
977 0 : .map(|sk| sk.get_id())
978 0 : .collect::<Vec<_>>(),
979 0 : from_safekeepers
980 0 : .iter()
981 0 : .map(|sk| sk.get_id())
982 0 : .collect::<Vec<_>>()
983 : );
984 :
985 : // TODO(diko): need to pass mconf/generation with the request
986 : // to properly handle tombstones. Ignore tombstones for now.
987 : // Worst case: we leave a timeline on a safekeeper which is not in the current set.
988 0 : let req = PullTimelineRequest {
989 0 : tenant_id,
990 0 : timeline_id,
991 0 : http_hosts,
992 0 : ignore_tombstone: Some(true),
993 0 : };
994 :
995 : const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
996 :
997 0 : let responses = self
998 0 : .tenant_timeline_safekeeper_op(
999 0 : to_safekeepers,
1000 0 : move |client| {
1001 0 : let req = req.clone();
1002 0 : async move { client.pull_timeline(&req).await }
1003 0 : },
1004 : SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
1005 : )
1006 0 : .await?;
1007 :
1008 0 : if let Some((idx, err)) = responses
1009 0 : .iter()
1010 0 : .enumerate()
1011 0 : .find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
1012 : {
1013 0 : let sk_id = to_safekeepers[idx].get_id();
1014 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1015 0 : "pull_timeline to {sk_id} failed: {err}",
1016 0 : )));
1017 0 : }
1018 :
1019 0 : Ok(())
1020 0 : }
1021 :
1022 : /// Exclude a timeline from safekeepers in parallel with retries.
1023 : /// If an exclude request is unsuccessful, it will be added to
1024 : /// the reconciler, and after that the function will succeed.
1025 0 : async fn tenant_timeline_safekeeper_exclude(
1026 0 : self: &Arc<Self>,
1027 0 : tenant_id: TenantId,
1028 0 : timeline_id: TimelineId,
1029 0 : safekeepers: &[Safekeeper],
1030 0 : config: &membership::Configuration,
1031 0 : ) -> Result<(), ApiError> {
1032 0 : let req = TimelineMembershipSwitchRequest {
1033 0 : mconf: config.clone(),
1034 0 : };
1035 :
1036 : const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
1037 :
1038 0 : let results = self
1039 0 : .tenant_timeline_safekeeper_op(
1040 0 : safekeepers,
1041 0 : move |client| {
1042 0 : let req = req.clone();
1043 0 : async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
1044 0 : },
1045 : SK_EXCLUDE_TIMELINE_TIMEOUT,
1046 : )
1047 0 : .await?;
1048 :
1049 0 : let mut reconcile_requests = Vec::new();
1050 :
1051 0 : for (idx, res) in results.iter().enumerate() {
1052 0 : if res.is_err() {
1053 0 : let sk_id = safekeepers[idx].skp.id;
1054 0 : let pending_op = TimelinePendingOpPersistence {
1055 0 : tenant_id: tenant_id.to_string(),
1056 0 : timeline_id: timeline_id.to_string(),
1057 0 : generation: config.generation.into_inner() as i32,
1058 0 : op_kind: SafekeeperTimelineOpKind::Exclude,
1059 0 : sk_id,
1060 0 : };
1061 0 : tracing::info!("writing pending exclude op for sk id {sk_id}");
1062 0 : self.persistence.insert_pending_op(pending_op).await?;
1063 :
1064 0 : let req = ScheduleRequest {
1065 0 : safekeeper: Box::new(safekeepers[idx].clone()),
1066 0 : host_list: Vec::new(),
1067 0 : tenant_id,
1068 0 : timeline_id: Some(timeline_id),
1069 0 : generation: config.generation.into_inner(),
1070 0 : kind: SafekeeperTimelineOpKind::Exclude,
1071 0 : };
1072 0 : reconcile_requests.push(req);
1073 0 : }
1074 : }
1075 :
1076 0 : if !reconcile_requests.is_empty() {
1077 0 : let locked = self.inner.read().unwrap();
1078 0 : for req in reconcile_requests {
1079 0 : locked.safekeeper_reconcilers.schedule_request(req);
1080 0 : }
1081 0 : }
1082 :
1083 0 : Ok(())
1084 0 : }
1085 :
1086 : /// Migrate timeline safekeeper set to a new set.
1087 : ///
1088 : /// This function implements an algorithm from RFC-035.
1089 : /// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
1090 0 : pub(crate) async fn tenant_timeline_safekeeper_migrate(
1091 0 : self: &Arc<Self>,
1092 0 : tenant_id: TenantId,
1093 0 : timeline_id: TimelineId,
1094 0 : req: TimelineSafekeeperMigrateRequest,
1095 0 : ) -> Result<(), ApiError> {
1096 0 : let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();
1097 :
1098 0 : let new_sk_set = req.new_sk_set;
1099 :
1100 0 : for sk_id in new_sk_set.iter() {
1101 0 : if !all_safekeepers.contains_key(sk_id) {
1102 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1103 0 : "safekeeper {sk_id} does not exist"
1104 0 : )));
1105 0 : }
1106 : }
1107 :
1108 0 : if new_sk_set.is_empty() {
1109 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1110 0 : "new safekeeper set is empty"
1111 0 : )));
1112 0 : }
1113 :
1114 0 : if new_sk_set.len() < self.config.timeline_safekeeper_count {
1115 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1116 0 : "new safekeeper set must have at least {} safekeepers",
1117 0 : self.config.timeline_safekeeper_count
1118 0 : )));
1119 0 : }
1120 :
1121 0 : let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
1122 0 : let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
1123 : // Construct new member set in advance to validate it.
1124 : // E.g. validates that there is no duplicate safekeepers.
1125 0 : let new_sk_member_set =
1126 0 : Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?;
1127 :
1128 : // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
1129 0 : let _tenant_lock = trace_shared_lock(
1130 0 : &self.tenant_op_locks,
1131 0 : tenant_id,
1132 0 : TenantOperations::TimelineSafekeeperMigrate,
1133 0 : )
1134 0 : .await;
1135 :
1136 : // 1. Fetch current timeline configuration from the configuration storage.
1137 :
1138 0 : let timeline = self
1139 0 : .persistence
1140 0 : .get_timeline(tenant_id, timeline_id)
1141 0 : .await?;
1142 :
1143 0 : let Some(timeline) = timeline else {
1144 0 : return Err(ApiError::NotFound(
1145 0 : anyhow::anyhow!(
1146 0 : "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
1147 0 : )
1148 0 : .into(),
1149 0 : ));
1150 : };
1151 :
1152 0 : let cur_sk_set = timeline
1153 0 : .sk_set
1154 0 : .iter()
1155 0 : .map(|&id| NodeId(id as u64))
1156 0 : .collect::<Vec<_>>();
1157 :
1158 : // Validate that we are not migrating to a decomissioned safekeeper.
1159 0 : for sk in new_safekeepers.iter() {
1160 0 : if !cur_sk_set.contains(&sk.get_id())
1161 0 : && sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
1162 : {
1163 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1164 0 : "safekeeper {} is decomissioned",
1165 0 : sk.get_id()
1166 0 : )));
1167 0 : }
1168 : }
1169 :
1170 0 : tracing::info!(
1171 : ?cur_sk_set,
1172 : ?new_sk_set,
1173 0 : "Migrating timeline to new safekeeper set",
1174 : );
1175 :
1176 0 : let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
1177 :
1178 0 : if let Some(ref presistent_new_sk_set) = timeline.new_sk_set {
1179 : // 2. If it is already joint one and new_set is different from desired_set refuse to change.
1180 0 : if presistent_new_sk_set
1181 0 : .iter()
1182 0 : .map(|&id| NodeId(id as u64))
1183 0 : .ne(new_sk_set.iter().cloned())
1184 : {
1185 0 : tracing::info!(
1186 : ?presistent_new_sk_set,
1187 : ?new_sk_set,
1188 0 : "different new safekeeper set is already set in the database",
1189 : );
1190 0 : return Err(ApiError::Conflict(format!(
1191 0 : "the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
1192 0 : )));
1193 0 : }
1194 : // It it is the same new_sk_set, we can continue the migration (retry).
1195 : } else {
1196 : // 3. No active migration yet.
1197 : // Increment current generation and put desired_set to new_sk_set.
1198 0 : generation = generation.next();
1199 :
1200 0 : self.persistence
1201 0 : .update_timeline_membership(
1202 0 : tenant_id,
1203 0 : timeline_id,
1204 0 : generation,
1205 0 : &cur_sk_set,
1206 0 : Some(&new_sk_set),
1207 0 : )
1208 0 : .await?;
1209 : }
1210 :
1211 0 : let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
1212 0 : let cur_sk_member_set =
1213 0 : Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
1214 :
1215 0 : let joint_config = membership::Configuration {
1216 0 : generation,
1217 0 : members: cur_sk_member_set,
1218 0 : new_members: Some(new_sk_member_set.clone()),
1219 0 : };
1220 :
1221 : // 4. Call PUT configuration on safekeepers from the current set,
1222 : // delivering them joint_conf.
1223 :
1224 : // Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
1225 : // This way the compute will know about new safekeepers from joint_config before we require to
1226 : // collect a quorum from them.
1227 0 : self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
1228 0 : .await?;
1229 :
1230 0 : let results = self
1231 0 : .tenant_timeline_set_membership_quorum(
1232 0 : tenant_id,
1233 0 : timeline_id,
1234 0 : &cur_safekeepers,
1235 0 : &joint_config,
1236 0 : None, // no min position
1237 0 : )
1238 0 : .await?;
1239 :
1240 0 : let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
1241 0 : for res in results.into_iter().flatten() {
1242 0 : let sk_position = (res.last_log_term, res.flush_lsn);
1243 0 : if sync_position < sk_position {
1244 0 : sync_position = sk_position;
1245 0 : }
1246 : }
1247 :
1248 0 : tracing::info!(
1249 : %generation,
1250 : ?sync_position,
1251 0 : "safekeepers set membership updated",
1252 : );
1253 :
1254 : // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
1255 : // by doing pull_timeline from the majority of the current set.
1256 :
1257 : // Filter out safekeepers which are already in the current set.
1258 0 : let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
1259 0 : let pull_to_safekeepers = new_safekeepers
1260 0 : .iter()
1261 0 : .filter(|sk| !from_ids.contains(&sk.get_id()))
1262 0 : .cloned()
1263 0 : .collect::<Vec<_>>();
1264 :
1265 0 : self.tenant_timeline_pull_from_peers(
1266 0 : tenant_id,
1267 0 : timeline_id,
1268 0 : &pull_to_safekeepers,
1269 0 : &cur_safekeepers,
1270 0 : )
1271 0 : .await?;
1272 :
1273 : // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
1274 :
1275 : // TODO(diko): do we need to bump timeline term?
1276 :
1277 : // 7. Repeatedly call PUT configuration on safekeepers from the new set,
1278 : // delivering them joint_conf and collecting their positions.
1279 :
1280 0 : tracing::info!(?sync_position, "waiting for safekeepers to sync position");
1281 :
1282 0 : self.tenant_timeline_set_membership_quorum(
1283 0 : tenant_id,
1284 0 : timeline_id,
1285 0 : &new_safekeepers,
1286 0 : &joint_config,
1287 0 : Some(sync_position),
1288 0 : )
1289 0 : .await?;
1290 :
1291 : // 8. Create new_conf: Configuration incrementing joint_conf generation and
1292 : // having new safekeeper set as sk_set and None new_sk_set.
1293 :
1294 0 : let generation = generation.next();
1295 :
1296 0 : let new_conf = membership::Configuration {
1297 0 : generation,
1298 0 : members: new_sk_member_set,
1299 0 : new_members: None,
1300 0 : };
1301 :
1302 0 : self.persistence
1303 0 : .update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
1304 0 : .await?;
1305 :
1306 : // TODO(diko): at this point we have already updated the timeline in the database,
1307 : // but we still need to notify safekeepers and cplane about the new configuration,
1308 : // and put delition of the timeline from the old safekeepers into the reconciler.
1309 : // Ideally it should be done atomically, but now it's not.
1310 : // Worst case: the timeline is not deleted from old safekeepers,
1311 : // the compute may require both quorums till the migration is retried and completed.
1312 :
1313 0 : self.tenant_timeline_set_membership_quorum(
1314 0 : tenant_id,
1315 0 : timeline_id,
1316 0 : &new_safekeepers,
1317 0 : &new_conf,
1318 0 : None, // no min position
1319 0 : )
1320 0 : .await?;
1321 :
1322 0 : let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
1323 0 : let exclude_safekeepers = cur_safekeepers
1324 0 : .into_iter()
1325 0 : .filter(|sk| !new_ids.contains(&sk.get_id()))
1326 0 : .collect::<Vec<_>>();
1327 0 : self.tenant_timeline_safekeeper_exclude(
1328 0 : tenant_id,
1329 0 : timeline_id,
1330 0 : &exclude_safekeepers,
1331 0 : &new_conf,
1332 0 : )
1333 0 : .await?;
1334 :
1335 : // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
1336 : // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
1337 : // collect a quorum from them.
1338 0 : self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
1339 0 : .await?;
1340 :
1341 0 : Ok(())
1342 0 : }
1343 :
1344 : /// Notify cplane about safekeeper membership change.
1345 : /// The cplane will receive a joint set of safekeepers as a safekeeper list.
1346 0 : async fn cplane_notify_safekeepers(
1347 0 : &self,
1348 0 : tenant_id: TenantId,
1349 0 : timeline_id: TimelineId,
1350 0 : mconf: &membership::Configuration,
1351 0 : ) -> Result<(), ApiError> {
1352 0 : let mut safekeepers = Vec::new();
1353 0 : let mut ids: HashSet<_> = HashSet::new();
1354 :
1355 0 : for member in mconf
1356 0 : .members
1357 0 : .m
1358 0 : .iter()
1359 0 : .chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
1360 : {
1361 0 : if ids.insert(member.id) {
1362 0 : safekeepers.push(compute_hook::SafekeeperInfo {
1363 0 : id: member.id,
1364 0 : hostname: Some(member.host.clone()),
1365 0 : });
1366 0 : }
1367 : }
1368 :
1369 0 : self.compute_hook
1370 0 : .notify_safekeepers(
1371 0 : compute_hook::SafekeepersUpdate {
1372 0 : tenant_id,
1373 0 : timeline_id,
1374 0 : generation: mconf.generation,
1375 0 : safekeepers,
1376 0 : },
1377 0 : &self.cancel,
1378 0 : )
1379 0 : .await
1380 0 : .map_err(|err| {
1381 0 : ApiError::InternalServerError(anyhow::anyhow!(
1382 0 : "failed to notify cplane about safekeeper membership change: {err}"
1383 0 : ))
1384 0 : })
1385 0 : }
1386 : }
|