Line data Source code
1 : use crate::pageserver_client::PageserverClient;
2 : use crate::persistence::Persistence;
3 : use crate::{compute_hook, service};
4 : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
5 : use pageserver_api::models::{
6 : LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
7 : };
8 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
9 : use pageserver_client::mgmt_api;
10 : use reqwest::StatusCode;
11 : use std::borrow::Cow;
12 : use std::collections::HashMap;
13 : use std::sync::Arc;
14 : use std::time::{Duration, Instant};
15 : use tokio_util::sync::CancellationToken;
16 : use utils::backoff::exponential_backoff;
17 : use utils::failpoint_support;
18 : use utils::generation::Generation;
19 : use utils::id::{NodeId, TimelineId};
20 : use utils::lsn::Lsn;
21 : use utils::pausable_failpoint;
22 : use utils::sync::gate::GateGuard;
23 :
24 : use crate::compute_hook::{ComputeHook, NotifyError};
25 : use crate::node::Node;
26 : use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
27 :
28 : const DEFAULT_HEATMAP_PERIOD: &str = "60s";
29 :
30 : /// An object whose lifetime matches that of the background reconcile task spawned for
31 : /// a tenant shard whose intent and observed states differ.
32 : pub(super) struct Reconciler {
33 : /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
34 : /// of a tenant's state from when we spawned a reconcile task.
35 : pub(super) tenant_shard_id: TenantShardId,
36 : pub(crate) shard: ShardIdentity,
37 : pub(crate) placement_policy: PlacementPolicy,
38 : pub(crate) generation: Option<Generation>,
39 : pub(crate) intent: TargetState,
40 :
41 : /// Nodes not referenced by [`Self::intent`], from which we should try
42 : /// to detach this tenant shard.
43 : pub(crate) detach: Vec<Node>,
44 :
45 : /// Configuration specific to this reconciler
46 : pub(crate) reconciler_config: ReconcilerConfig,
47 :
48 : pub(crate) config: TenantConfig,
49 : pub(crate) preferred_az: Option<AvailabilityZone>,
50 :
51 : /// Observed state from the point of view of the reconciler.
52 : /// This gets updated as the reconciliation makes progress.
53 : pub(crate) observed: ObservedState,
54 :
55 : /// Snapshot of the observed state at the point when the reconciler
56 : /// was spawned.
57 : pub(crate) original_observed: ObservedState,
58 :
59 : pub(crate) service_config: service::Config,
60 :
61 : /// A hook to notify the running postgres instances when we change the location
62 : /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
63 : /// and guarantee eventual retries.
64 : pub(crate) compute_hook: Arc<ComputeHook>,
65 :
66 : /// To avoid stalling if the cloud control plane is unavailable, we may proceed
67 : /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
68 : /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
69 : pub(crate) compute_notify_failure: bool,
70 :
71 : /// The Reconciler is responsible for keeping these semaphore units alive: they limit how many
72 : /// reconcilers we will run concurrently.
73 : pub(crate) _resource_units: ReconcileUnits,
74 :
75 : /// A means to abort background reconciliation: it is essential to
76 : /// cancel this token when something changes in the original TenantShard that
77 : /// will make this reconciliation impossible or unnecessary, for
78 : /// example when a pageserver node goes offline, or the PlacementPolicy for
79 : /// the tenant is changed.
80 : pub(crate) cancel: CancellationToken,
81 :
82 : /// Reconcilers are registered with a Gate so that during a graceful shutdown we
83 : /// can wait for all the reconcilers to respond to their cancellation tokens.
84 : pub(crate) _gate_guard: GateGuard,
85 :
86 : /// Access to persistent storage for updating generation numbers
87 : pub(crate) persistence: Arc<Persistence>,
88 : }
89 :
90 : pub(crate) struct ReconcilerConfigBuilder {
91 : config: ReconcilerConfig,
92 : }
93 :
94 : impl ReconcilerConfigBuilder {
95 0 : pub(crate) fn new() -> Self {
96 0 : Self {
97 0 : config: ReconcilerConfig::default(),
98 0 : }
99 0 : }
100 :
101 0 : pub(crate) fn secondary_warmup_timeout(self, value: Duration) -> Self {
102 0 : Self {
103 0 : config: ReconcilerConfig {
104 0 : secondary_warmup_timeout: Some(value),
105 0 : ..self.config
106 0 : },
107 0 : }
108 0 : }
109 :
110 0 : pub(crate) fn secondary_download_request_timeout(self, value: Duration) -> Self {
111 0 : Self {
112 0 : config: ReconcilerConfig {
113 0 : secondary_download_request_timeout: Some(value),
114 0 : ..self.config
115 0 : },
116 0 : }
117 0 : }
118 :
119 0 : pub(crate) fn build(self) -> ReconcilerConfig {
120 0 : self.config
121 0 : }
122 : }
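// Illustrative sketch (not part of the original source): how a caller might use the
// builder above to construct a `ReconcilerConfig`. The timeout values are arbitrary
// examples and the function name is hypothetical.
#[allow(dead_code)]
fn example_reconciler_config() -> ReconcilerConfig {
    // Override both optional timeouts; anything left unset falls back to the
    // defaults returned by the getters in `impl ReconcilerConfig` below.
    ReconcilerConfigBuilder::new()
        .secondary_warmup_timeout(Duration::from_secs(120))
        .secondary_download_request_timeout(Duration::from_secs(10))
        .build()
}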
123 :
124 : #[derive(Default, Debug, Copy, Clone)]
125 : pub(crate) struct ReconcilerConfig {
126 : // During a live migration, give up on warming up the secondary
127 : // after this timeout.
128 : secondary_warmup_timeout: Option<Duration>,
129 :
130 : // During live migrations this is the amount of time that
131 : // the pageserver will hold our poll.
132 : secondary_download_request_timeout: Option<Duration>,
133 : }
134 :
135 : impl ReconcilerConfig {
136 0 : pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
137 : const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
138 0 : self.secondary_warmup_timeout
139 0 : .unwrap_or(SECONDARY_WARMUP_TIMEOUT_DEFAULT)
140 0 : }
141 :
142 0 : pub(crate) fn get_secondary_download_request_timeout(&self) -> Duration {
143 : const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(20);
144 0 : self.secondary_download_request_timeout
145 0 : .unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
146 0 : }
147 : }
148 :
149 : /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
150 : pub(crate) struct ReconcileUnits {
151 : _sem_units: tokio::sync::OwnedSemaphorePermit,
152 : }
153 :
154 : impl ReconcileUnits {
155 0 : pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
156 0 : Self {
157 0 : _sem_units: sem_units,
158 0 : }
159 0 : }
160 : }
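// Illustrative sketch (assumption: the service owns an `Arc<tokio::sync::Semaphore>`
// sized to the maximum number of concurrent reconcilers). Shows how `ReconcileUnits`
// wraps an owned permit so the concurrency slot is held for the Reconciler's whole
// lifetime and released on drop. The function name is hypothetical.
#[allow(dead_code)]
async fn example_acquire_reconcile_units(
    semaphore: std::sync::Arc<tokio::sync::Semaphore>,
) -> Result<ReconcileUnits, tokio::sync::AcquireError> {
    // `acquire_owned` consumes the Arc and yields a permit that is not tied to the
    // semaphore's borrow lifetime, so it can live inside the Reconciler struct.
    let permit = semaphore.acquire_owned().await?;
    Ok(ReconcileUnits::new(permit))
}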
161 :
162 : /// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
163 : /// reference counting for the Scheduler. The IntentState is what the scheduler works with,
164 : /// and the TargetState is just the instruction for a particular Reconciler run.
165 : #[derive(Debug)]
166 : pub(crate) struct TargetState {
167 : pub(crate) attached: Option<Node>,
168 : pub(crate) secondary: Vec<Node>,
169 : }
170 :
171 : impl TargetState {
172 0 : pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
173 0 : Self {
174 0 : attached: intent.get_attached().map(|n| {
175 0 : nodes
176 0 : .get(&n)
177 0 : .expect("Intent attached referenced non-existent node")
178 0 : .clone()
179 0 : }),
180 0 : secondary: intent
181 0 : .get_secondary()
182 0 : .iter()
183 0 : .map(|n| {
184 0 : nodes
185 0 : .get(n)
186 0 : .expect("Intent secondary referenced non-existent node")
187 0 : .clone()
188 0 : })
189 0 : .collect(),
190 0 : }
191 0 : }
192 : }
193 :
194 : #[derive(thiserror::Error, Debug)]
195 : pub(crate) enum ReconcileError {
196 : #[error(transparent)]
197 : Remote(#[from] mgmt_api::Error),
198 : #[error(transparent)]
199 : Notify(#[from] NotifyError),
200 : #[error("Cancelled")]
201 : Cancel,
202 : #[error(transparent)]
203 : Other(#[from] anyhow::Error),
204 : }
205 :
206 : impl Reconciler {
207 0 : async fn location_config(
208 0 : &mut self,
209 0 : node: &Node,
210 0 : config: LocationConfig,
211 0 : flush_ms: Option<Duration>,
212 0 : lazy: bool,
213 0 : ) -> Result<(), ReconcileError> {
214 0 : if !node.is_available() && config.mode == LocationConfigMode::Detached {
215 : // Attempts to detach from offline nodes may be simulated without doing I/O: a node which is offline
216 : // will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
217 : // what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
218 0 : tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
219 0 : self.observed.locations.remove(&node.get_id());
220 0 : return Ok(());
221 0 : }
222 0 :
223 0 : self.observed
224 0 : .locations
225 0 : .insert(node.get_id(), ObservedStateLocation { conf: None });
226 0 :
227 0 : // TODO: amend locations that use long-polling: they will hit this timeout.
228 0 : let timeout = Duration::from_secs(25);
229 0 :
230 0 : tracing::info!("location_config({node}) calling: {:?}", config);
231 0 : let tenant_shard_id = self.tenant_shard_id;
232 0 : let config_ref = &config;
233 0 : match node
234 0 : .with_client_retries(
235 0 : |client| async move {
236 0 : let config = config_ref.clone();
237 0 : client
238 0 : .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
239 0 : .await
240 0 : },
241 0 : &self.service_config.jwt_token,
242 0 : 1,
243 0 : 3,
244 0 : timeout,
245 0 : &self.cancel,
246 0 : )
247 0 : .await
248 : {
249 0 : Some(Ok(_)) => {}
250 0 : Some(Err(e)) => return Err(e.into()),
251 0 : None => return Err(ReconcileError::Cancel),
252 : };
253 0 : tracing::info!("location_config({node}) complete: {:?}", config);
254 :
255 0 : match config.mode {
256 0 : LocationConfigMode::Detached => {
257 0 : self.observed.locations.remove(&node.get_id());
258 0 : }
259 0 : _ => {
260 0 : self.observed
261 0 : .locations
262 0 : .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
263 0 : }
264 : }
265 :
266 0 : Ok(())
267 0 : }
268 :
269 0 : fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
270 0 : if let Some(node) = self.intent.attached.as_ref() {
271 0 : if node.get_id() == *node_id {
272 0 : return Some(node);
273 0 : }
274 0 : }
275 :
276 0 : if let Some(node) = self
277 0 : .intent
278 0 : .secondary
279 0 : .iter()
280 0 : .find(|n| n.get_id() == *node_id)
281 : {
282 0 : return Some(node);
283 0 : }
284 :
285 0 : if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
286 0 : return Some(node);
287 0 : }
288 0 :
289 0 : None
290 0 : }
291 :
292 0 : async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
293 0 : let destination = if let Some(node) = &self.intent.attached {
294 0 : match self.observed.locations.get(&node.get_id()) {
295 0 : Some(conf) => {
296 : // We will do a live migration only if the intended destination is not
297 : // currently in an attached state.
298 0 : match &conf.conf {
299 0 : Some(conf) if conf.mode == LocationConfigMode::Secondary => {
300 0 : // Fall through to do a live migration
301 0 : node
302 : }
303 : None | Some(_) => {
304 : // Attached or uncertain: don't do a live migration, proceed
305 : // with a general-case reconciliation
306 0 : tracing::info!("maybe_live_migrate: destination is None or attached");
307 0 : return Ok(());
308 : }
309 : }
310 : }
311 : None => {
312 : // Our destination is not attached: maybe live migrate if some other
313 : // node is currently attached. Fall through.
314 0 : node
315 : }
316 : }
317 : } else {
318 : // No intent to be attached
319 0 : tracing::info!("maybe_live_migrate: no attached intent");
320 0 : return Ok(());
321 : };
322 :
323 0 : let mut origin = None;
324 0 : for (node_id, state) in &self.observed.locations {
325 0 : if let Some(observed_conf) = &state.conf {
326 0 : if observed_conf.mode == LocationConfigMode::AttachedSingle {
327 : // We will only attempt live migration if the origin is not offline: this
328 : // avoids trying to do it while reconciling after responding to an HA failover.
329 0 : if let Some(node) = self.get_node(node_id) {
330 0 : if node.is_available() {
331 0 : origin = Some(node.clone());
332 0 : break;
333 0 : }
334 0 : }
335 0 : }
336 0 : }
337 : }
338 :
339 0 : let Some(origin) = origin else {
340 0 : tracing::info!("maybe_live_migrate: no origin found");
341 0 : return Ok(());
342 : };
343 :
344 : // We have an origin and a destination: proceed to do the live migration
345 0 : tracing::info!("Live migrating {}->{}", origin, destination);
346 0 : self.live_migrate(origin, destination.clone()).await?;
347 :
348 0 : Ok(())
349 0 : }
350 :
351 0 : async fn get_lsns(
352 0 : &self,
353 0 : tenant_shard_id: TenantShardId,
354 0 : node: &Node,
355 0 : ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
356 0 : let client = PageserverClient::new(
357 0 : node.get_id(),
358 0 : node.base_url(),
359 0 : self.service_config.jwt_token.as_deref(),
360 0 : );
361 :
362 0 : let timelines = client.timeline_list(&tenant_shard_id).await?;
363 0 : Ok(timelines
364 0 : .into_iter()
365 0 : .map(|t| (t.timeline_id, t.last_record_lsn))
366 0 : .collect())
367 0 : }
368 :
369 0 : async fn secondary_download(
370 0 : &self,
371 0 : tenant_shard_id: TenantShardId,
372 0 : node: &Node,
373 0 : ) -> Result<(), ReconcileError> {
374 0 : // This is not the timeout for a request, but the total amount of time we're willing to wait
375 0 : // for a secondary location to get up to date before we proceed with the migration.
376 0 : let total_download_timeout = self.reconciler_config.get_secondary_warmup_timeout();
377 0 :
378 0 : // This is the long-polling interval for the secondary download requests we send to the destination pageserver
379 0 : // during a migration.
380 0 : let request_download_timeout = self
381 0 : .reconciler_config
382 0 : .get_secondary_download_request_timeout();
383 0 :
384 0 : let started_at = Instant::now();
385 :
386 : loop {
387 0 : let (status, progress) = match node
388 0 : .with_client_retries(
389 0 : |client| async move {
390 0 : client
391 0 : .tenant_secondary_download(
392 0 : tenant_shard_id,
393 0 : Some(request_download_timeout),
394 0 : )
395 0 : .await
396 0 : },
397 0 : &self.service_config.jwt_token,
398 0 : 1,
399 0 : 3,
400 0 : request_download_timeout * 2,
401 0 : &self.cancel,
402 0 : )
403 0 : .await
404 : {
405 0 : None => Err(ReconcileError::Cancel),
406 0 : Some(Ok(v)) => Ok(v),
407 0 : Some(Err(e)) => {
408 0 : // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
409 0 : // attaching, but we should not let an issue with a secondary location stop us proceeding
410 0 : // with a live migration.
411 0 : tracing::warn!("Failed to prepare by downloading layers on node {node}: {e})");
412 0 : return Ok(());
413 : }
414 0 : }?;
415 :
416 0 : if status == StatusCode::OK {
417 0 : tracing::info!(
418 0 : "Downloads to {} complete: {}/{} layers, {}/{} bytes",
419 : node,
420 : progress.layers_downloaded,
421 : progress.layers_total,
422 : progress.bytes_downloaded,
423 : progress.bytes_total
424 : );
425 0 : return Ok(());
426 0 : } else if status == StatusCode::ACCEPTED {
427 0 : let total_runtime = started_at.elapsed();
428 0 : if total_runtime > total_download_timeout {
429 0 : tracing::warn!("Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
430 0 : total_runtime.as_millis(),
431 : progress.layers_downloaded,
432 : progress.layers_total,
433 : progress.bytes_downloaded,
434 : progress.bytes_total
435 : );
436 : // Give up, but proceed: an incompletely warmed destination doesn't prevent the migration from working,
437 : // it just degrades I/O performance for users.
438 0 : return Ok(());
439 0 : }
440 0 :
441 0 : // Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call
442 0 : // to the pageserver is a long-poll.
443 0 : tracing::info!(
444 0 : "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
445 : node,
446 : progress.layers_downloaded,
447 : progress.layers_total,
448 : progress.bytes_downloaded,
449 : progress.bytes_total
450 : );
451 0 : }
452 : }
453 0 : }
454 :
455 : /// This function does _not_ mutate any state, so it is cancellation safe.
456 : ///
457 : /// This function does not respect [`Self::cancel`]; callers should handle cancellation themselves.
458 0 : async fn await_lsn(
459 0 : &self,
460 0 : tenant_shard_id: TenantShardId,
461 0 : node: &Node,
462 0 : baseline: HashMap<TimelineId, Lsn>,
463 0 : ) -> anyhow::Result<()> {
464 : loop {
465 0 : let latest = match self.get_lsns(tenant_shard_id, node).await {
466 0 : Ok(l) => l,
467 0 : Err(e) => {
468 0 : tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
469 0 : tokio::time::sleep(Duration::from_millis(500)).await;
470 0 : continue;
471 : }
472 : };
473 :
474 0 : let mut any_behind: bool = false;
475 0 : for (timeline_id, baseline_lsn) in &baseline {
476 0 : match latest.get(timeline_id) {
477 0 : Some(latest_lsn) => {
478 0 : tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
479 0 : if latest_lsn < baseline_lsn {
480 0 : any_behind = true;
481 0 : }
482 : }
483 0 : None => {
484 0 : // Timeline was deleted in the meantime - ignore it
485 0 : }
486 : }
487 : }
488 :
489 0 : if !any_behind {
490 0 : tracing::info!("✅ LSN caught up. Proceeding...");
491 0 : break;
492 : } else {
493 0 : tokio::time::sleep(Duration::from_millis(500)).await;
494 : }
495 : }
496 :
497 0 : Ok(())
498 0 : }
499 :
500 0 : pub async fn live_migrate(
501 0 : &mut self,
502 0 : origin_ps: Node,
503 0 : dest_ps: Node,
504 0 : ) -> Result<(), ReconcileError> {
505 0 : // `maybe_live_migrate` is responsible for sanity-checking the inputs
506 0 : assert!(origin_ps.get_id() != dest_ps.get_id());
507 :
508 0 : fn build_location_config(
509 0 : shard: &ShardIdentity,
510 0 : config: &TenantConfig,
511 0 : mode: LocationConfigMode,
512 0 : generation: Option<Generation>,
513 0 : secondary_conf: Option<LocationConfigSecondary>,
514 0 : ) -> LocationConfig {
515 0 : LocationConfig {
516 0 : mode,
517 0 : generation: generation.map(|g| g.into().unwrap()),
518 0 : secondary_conf,
519 0 : tenant_conf: config.clone(),
520 0 : shard_number: shard.number.0,
521 0 : shard_count: shard.count.literal(),
522 0 : shard_stripe_size: shard.stripe_size.0,
523 0 : }
524 0 : }
525 :
526 0 : tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);
527 :
528 : // FIXME: it is incorrect to use self.generation here; we should use the generation
529 : // from the ObservedState of the origin pageserver (it might be older than self.generation)
530 0 : let stale_conf = build_location_config(
531 0 : &self.shard,
532 0 : &self.config,
533 0 : LocationConfigMode::AttachedStale,
534 0 : self.generation,
535 0 : None,
536 0 : );
537 0 : self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
538 0 : .await?;
539 :
540 0 : let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);
541 :
542 : // If we are migrating to a destination that has a secondary location, warm it up first
543 0 : if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
544 0 : if let Some(destination_conf) = &destination_conf.conf {
545 0 : if destination_conf.mode == LocationConfigMode::Secondary {
546 0 : tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
547 0 : self.secondary_download(self.tenant_shard_id, &dest_ps)
548 0 : .await?;
549 0 : }
550 0 : }
551 0 : }
552 :
553 0 : pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");
554 :
555 : // Increment generation before attaching to new pageserver
556 : self.generation = Some(
557 0 : self.persistence
558 0 : .increment_generation(self.tenant_shard_id, dest_ps.get_id())
559 0 : .await?,
560 : );
561 :
562 0 : let dest_conf = build_location_config(
563 0 : &self.shard,
564 0 : &self.config,
565 0 : LocationConfigMode::AttachedMulti,
566 0 : self.generation,
567 0 : None,
568 0 : );
569 0 :
570 0 : tracing::info!("🔁 Attaching to pageserver {dest_ps}");
571 0 : self.location_config(&dest_ps, dest_conf, None, false)
572 0 : .await?;
573 :
574 0 : pausable_failpoint!("reconciler-live-migrate-pre-await-lsn");
575 :
576 0 : if let Some(baseline) = baseline_lsns {
577 0 : tracing::info!("🕑 Waiting for LSN to catch up...");
578 0 : tokio::select! {
579 0 : r = self.await_lsn(self.tenant_shard_id, &dest_ps, baseline) => {r?;}
580 0 : _ = self.cancel.cancelled() => {return Err(ReconcileError::Cancel)}
581 : };
582 0 : }
583 :
584 0 : tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");
585 :
586 : // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
587 : // the origin without notifying compute, we will render the tenant unavailable.
588 0 : self.compute_notify_blocking(&origin_ps).await?;
589 0 : pausable_failpoint!("reconciler-live-migrate-post-notify");
590 :
591 : // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
592 : // this location will be deleted in the general case reconciliation that runs after this.
593 0 : let origin_secondary_conf = build_location_config(
594 0 : &self.shard,
595 0 : &self.config,
596 0 : LocationConfigMode::Secondary,
597 0 : None,
598 0 : Some(LocationConfigSecondary { warm: true }),
599 0 : );
600 0 : self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
601 0 : .await?;
602 : // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
603 : // partway through. In fact, all location conf API calls should be in a wrapper that sets
604 : // the observed state to None, then runs, then sets it to what we wrote.
605 0 : self.observed.locations.insert(
606 0 : origin_ps.get_id(),
607 0 : ObservedStateLocation {
608 0 : conf: Some(origin_secondary_conf),
609 0 : },
610 0 : );
611 0 :
612 0 : pausable_failpoint!("reconciler-live-migrate-post-detach");
613 :
614 0 : tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
615 0 : let dest_final_conf = build_location_config(
616 0 : &self.shard,
617 0 : &self.config,
618 0 : LocationConfigMode::AttachedSingle,
619 0 : self.generation,
620 0 : None,
621 0 : );
622 0 : self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
623 0 : .await?;
624 0 : self.observed.locations.insert(
625 0 : dest_ps.get_id(),
626 0 : ObservedStateLocation {
627 0 : conf: Some(dest_final_conf),
628 0 : },
629 0 : );
630 0 :
631 0 : tracing::info!("✅ Migration complete");
632 :
633 0 : Ok(())
634 0 : }
635 :
636 0 : async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
637 : // If the attached node has uncertain state, read it from the pageserver before proceeding: this
638 : // is important to avoid spurious generation increments.
639 : //
640 : // We don't need to do this for secondary/detach locations because it's harmless to just PUT their
641 : // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
642 : // the `Timeline` object in the pageserver.
643 :
644 0 : let Some(attached_node) = self.intent.attached.as_ref() else {
645 : // Nothing to do
646 0 : return Ok(());
647 : };
648 :
649 0 : if matches!(
650 0 : self.observed.locations.get(&attached_node.get_id()),
651 : Some(ObservedStateLocation { conf: None })
652 : ) {
653 0 : let tenant_shard_id = self.tenant_shard_id;
654 0 : let observed_conf = match attached_node
655 0 : .with_client_retries(
656 0 : |client| async move { client.get_location_config(tenant_shard_id).await },
657 0 : &self.service_config.jwt_token,
658 0 : 1,
659 0 : 1,
660 0 : Duration::from_secs(5),
661 0 : &self.cancel,
662 0 : )
663 0 : .await
664 : {
665 0 : Some(Ok(observed)) => Some(observed),
666 0 : Some(Err(mgmt_api::Error::ApiError(status, _msg)))
667 0 : if status == StatusCode::NOT_FOUND =>
668 0 : {
669 0 : None
670 : }
671 0 : Some(Err(e)) => return Err(e.into()),
672 0 : None => return Err(ReconcileError::Cancel),
673 : };
674 0 : tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
675 0 : match observed_conf {
676 0 : Some(conf) => {
677 0 : // Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
678 0 : // if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
679 0 : self.observed
680 0 : .locations
681 0 : .insert(attached_node.get_id(), ObservedStateLocation { conf });
682 0 : }
683 0 : None => {
684 0 : // Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
685 0 : self.observed.locations.remove(&attached_node.get_id());
686 0 : }
687 : }
688 0 : }
689 :
690 0 : Ok(())
691 0 : }
692 :
693 : /// Reconciling a tenant makes API calls to pageservers until the observed state
694 : /// matches the intended state.
695 : ///
696 : /// First we apply special case handling (e.g. for live migrations), and then a
697 : /// general case reconciliation where we walk through the intent by pageserver
698 : /// and call out to the pageserver to apply the desired state.
699 0 : pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
700 0 : // Prepare: if we have uncertain `observed` state for our would-be attachment location, then refresh it
701 0 : self.maybe_refresh_observed().await?;
702 :
703 : // Special case: live migration
704 0 : self.maybe_live_migrate().await?;
705 :
706 : // If the intended attached location is not yet attached, attach it now.
707 0 : if let Some(node) = self.intent.attached.as_ref() {
708 : // If we are in an attached policy, then generation must have been set (null generations
709 : // are only present when a tenant is initially loaded with a secondary policy)
710 0 : debug_assert!(self.generation.is_some());
711 0 : let Some(generation) = self.generation else {
712 0 : return Err(ReconcileError::Other(anyhow::anyhow!(
713 0 : "Attempted to attach with NULL generation"
714 0 : )));
715 : };
716 :
717 0 : let mut wanted_conf = attached_location_conf(
718 0 : generation,
719 0 : &self.shard,
720 0 : &self.config,
721 0 : &self.placement_policy,
722 0 : );
723 0 : match self.observed.locations.get(&node.get_id()) {
724 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
725 0 : // Nothing to do
726 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
727 : }
728 0 : observed => {
729 : // In all cases other than a matching observed configuration, we will
730 : // reconcile this location. This includes locations with different configurations, as well
731 : // as locations with unknown (None) observed state.
732 :
733 : // Incrementing generation is the safe general case, but is inefficient for changes that only
734 : // modify some details (e.g. the tenant's config).
735 0 : let increment_generation = match observed {
736 0 : None => true,
737 0 : Some(ObservedStateLocation { conf: None }) => true,
738 : Some(ObservedStateLocation {
739 0 : conf: Some(observed),
740 0 : }) => {
741 0 : let generations_match = observed.generation == wanted_conf.generation;
742 0 :
743 0 : // We may skip incrementing the generation if the location is already in the expected mode and
744 0 : // generation. In principle it would also be safe to skip from certain other modes (e.g. AttachedStale),
745 0 : // but such states are handled inside `live_migrate`, and if we see that state here we're cleaning up
746 0 : // after a restart/crash, so fall back to the universally safe path of incrementing generation.
747 0 : !generations_match || (observed.mode != wanted_conf.mode)
748 : }
749 : };
750 :
751 0 : if increment_generation {
752 0 : let generation = self
753 0 : .persistence
754 0 : .increment_generation(self.tenant_shard_id, node.get_id())
755 0 : .await?;
756 0 : self.generation = Some(generation);
757 0 : wanted_conf.generation = generation.into();
758 0 : }
759 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
760 :
761 : // Because `node` comes from a ref to &self, clone it before calling into a &mut self
762 : // function: this could be avoided by refactoring the state mutated by location_config into
763 : // a separate type to Self.
764 0 : let node = node.clone();
765 0 :
766 0 : // Use lazy=true, because we may run many of Self concurrently, and do not want to
767 0 : // overload the pageserver with logical size calculations.
768 0 : self.location_config(&node, wanted_conf, None, true).await?;
769 0 : self.compute_notify().await?;
770 : }
771 : }
772 0 : }
773 :
774 : // Configure secondary locations: if these were previously attached, this
775 : // implicitly downgrades them from attached to secondary.
776 0 : let mut changes = Vec::new();
777 0 : for node in &self.intent.secondary {
778 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
779 0 : match self.observed.locations.get(&node.get_id()) {
780 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
781 0 : // Nothing to do
782 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
783 : }
784 : _ => {
785 : // In all cases other than a matching observed configuration, we will
786 : // reconcile this location.
787 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
788 0 : changes.push((node.clone(), wanted_conf))
789 : }
790 : }
791 : }
792 :
793 : // Detach any extraneous pageservers that are no longer referenced
794 : // by our intent.
795 0 : for node in &self.detach {
796 0 : changes.push((
797 0 : node.clone(),
798 0 : LocationConfig {
799 0 : mode: LocationConfigMode::Detached,
800 0 : generation: None,
801 0 : secondary_conf: None,
802 0 : shard_number: self.shard.number.0,
803 0 : shard_count: self.shard.count.literal(),
804 0 : shard_stripe_size: self.shard.stripe_size.0,
805 0 : tenant_conf: self.config.clone(),
806 0 : },
807 0 : ));
808 0 : }
809 :
810 0 : for (node, conf) in changes {
811 0 : if self.cancel.is_cancelled() {
812 0 : return Err(ReconcileError::Cancel);
813 0 : }
814 0 : self.location_config(&node, conf, None, false).await?;
815 : }
816 :
817 : // The condition below identifies a detach. We must have no attached intent and
818 : // must have been attached to something previously. Pass this information to
819 : // the [`ComputeHook`] such that it can update its tenant-wide state.
820 0 : if self.intent.attached.is_none() && !self.detach.is_empty() {
821 0 : // TODO: Consider notifying control plane about detaches. This would avoid situations
822 0 : // where the compute tries to start up with a stale set of pageservers.
823 0 : self.compute_hook
824 0 : .handle_detach(self.tenant_shard_id, self.shard.stripe_size);
825 0 : }
826 :
827 0 : failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
828 :
829 0 : Ok(())
830 0 : }
831 :
832 0 : pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
833 : // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
834 : // destination.
835 0 : if let Some(node) = &self.intent.attached {
836 0 : let result = self
837 0 : .compute_hook
838 0 : .notify(
839 0 : compute_hook::ShardUpdate {
840 0 : tenant_shard_id: self.tenant_shard_id,
841 0 : node_id: node.get_id(),
842 0 : stripe_size: self.shard.stripe_size,
843 0 : preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
844 0 : },
845 0 : &self.cancel,
846 0 : )
847 0 : .await;
848 0 : if let Err(e) = &result {
849 : // It is up to the caller whether they want to drop out on this error, but they don't have to:
850 : // in general we should avoid letting unavailability of the cloud control plane stop us from
851 : // making progress.
852 0 : if !matches!(e, NotifyError::ShuttingDown) {
853 0 : tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
854 0 : }
855 :
856 : // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
857 : // needs to retry at some point.
858 0 : self.compute_notify_failure = true;
859 0 : }
860 0 : result
861 : } else {
862 0 : Ok(())
863 : }
864 0 : }
865 :
866 : /// Compare the observed state snapshot from when the reconcile was created
867 : /// with the final observed state in order to generate observed state deltas.
868 0 : pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
869 0 : let mut deltas = Vec::default();
870 :
871 0 : for (node_id, location) in &self.observed.locations {
872 0 : let previous_location = self.original_observed.locations.get(node_id);
873 0 : let do_upsert = match previous_location {
874 : // Location config changed for node
875 0 : Some(prev) if location.conf != prev.conf => true,
876 : // New location config for node
877 0 : None => true,
878 : // Location config has not changed for node
879 0 : _ => false,
880 : };
881 :
882 0 : if do_upsert {
883 0 : deltas.push(ObservedStateDelta::Upsert(Box::new((
884 0 : *node_id,
885 0 : location.clone(),
886 0 : ))));
887 0 : }
888 : }
889 :
890 0 : for node_id in self.original_observed.locations.keys() {
891 0 : if !self.observed.locations.contains_key(node_id) {
892 0 : deltas.push(ObservedStateDelta::Delete(*node_id));
893 0 : }
894 : }
895 :
896 0 : deltas
897 0 : }
898 :
899 : /// Keep trying to notify the compute indefinitely, only dropping out if:
900 : /// - the node `origin` becomes unavailable -> Ok(())
901 : /// - the node `origin` no longer has our tenant shard attached -> Ok(())
902 : /// - our cancellation token fires -> Err(ReconcileError::Cancel)
903 : ///
904 : /// This is used during live migration, where we do not wish to detach
905 : /// an origin location until the compute definitely knows about the new
906 : /// location.
907 : ///
908 : /// In cases where the origin node becomes unavailable, we return success, indicating
909 : /// to the caller that they should continue irrespective of whether the compute was notified,
910 : /// because the origin node is unusable anyway. Notification will be retried later via the
911 : /// [`Self::compute_notify_failure`] flag.
912 0 : async fn compute_notify_blocking(&mut self, origin: &Node) -> Result<(), ReconcileError> {
913 0 : let mut notify_attempts = 0;
914 0 : while let Err(e) = self.compute_notify().await {
915 0 : match e {
916 0 : NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
917 0 : NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
918 : _ => {
919 0 : tracing::warn!(
920 0 : "Live migration blocked by compute notification error, retrying: {e}"
921 : );
922 : }
923 : }
924 :
925 : // Did the origin pageserver become unavailable?
926 0 : if !origin.is_available() {
927 0 : tracing::info!("Giving up on compute notification because {origin} is unavailable");
928 0 : break;
929 0 : }
930 0 :
931 0 : // Does the origin pageserver still host the shard we are interested in? We should only
932 0 : // continue waiting for compute notification to be acked if the old location is still usable.
933 0 : let tenant_shard_id = self.tenant_shard_id;
934 0 : match origin
935 0 : .with_client_retries(
936 0 : |client| async move { client.get_location_config(tenant_shard_id).await },
937 0 : &self.service_config.jwt_token,
938 0 : 1,
939 0 : 3,
940 0 : Duration::from_secs(5),
941 0 : &self.cancel,
942 0 : )
943 0 : .await
944 : {
945 0 : Some(Ok(Some(location_conf))) => {
946 0 : if matches!(
947 0 : location_conf.mode,
948 : LocationConfigMode::AttachedMulti
949 : | LocationConfigMode::AttachedSingle
950 : | LocationConfigMode::AttachedStale
951 : ) {
952 0 : tracing::debug!(
953 0 : "Still attached to {origin}, will wait & retry compute notification"
954 : );
955 : } else {
956 0 : tracing::info!(
957 0 : "Giving up on compute notification because {origin} is in state {:?}",
958 : location_conf.mode
959 : );
960 0 : return Ok(());
961 : }
962 : // Fall through
963 : }
964 : Some(Ok(None)) => {
965 0 : tracing::info!(
966 0 : "No longer attached to {origin}, giving up on compute notification"
967 : );
968 0 : return Ok(());
969 : }
970 0 : Some(Err(e)) => {
971 0 : match e {
972 : mgmt_api::Error::Cancelled => {
973 0 : tracing::info!(
974 0 : "Giving up on compute notification because {origin} is unavailable"
975 : );
976 0 : return Ok(());
977 : }
978 : mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _) => {
979 0 : tracing::info!(
980 0 : "No longer attached to {origin}, giving up on compute notification"
981 : );
982 0 : return Ok(());
983 : }
984 0 : e => {
985 0 : // Other API errors are unexpected here.
986 0 : tracing::warn!("Unexpected error checking location on {origin}: {e}");
987 :
988 : // Fall through, we will retry compute notification.
989 : }
990 : }
991 : }
992 0 : None => return Err(ReconcileError::Cancel),
993 : };
994 :
995 0 : exponential_backoff(
996 0 : notify_attempts,
997 0 : // Generous waits: control plane operations which might be blocking us usually complete on the order
998 0 : // of hundreds to thousands of milliseconds, so there is no point in busy-polling.
999 0 : 1.0,
1000 0 : 10.0,
1001 0 : &self.cancel,
1002 0 : )
1003 0 : .await;
1004 0 : notify_attempts += 1;
1005 : }
1006 :
1007 0 : Ok(())
1008 0 : }
1009 : }
1010 :
1011 : /// We tweak the externally-set TenantConfig while configuring
1012 : /// locations, using our awareness of whether secondary locations
1013 : /// are in use to automatically enable/disable heatmap uploads.
1014 0 : fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
1015 0 : let mut config = config.clone();
1016 0 : if has_secondaries {
1017 0 : if config.heatmap_period.is_none() {
1018 0 : config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
1019 0 : }
1020 0 : } else {
1021 0 : config.heatmap_period = None;
1022 0 : }
1023 0 : config
1024 0 : }
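// Illustrative sketch (assumption: `TenantConfig` derives `Default`, i.e. all fields
// including `heatmap_period` start out unset). Demonstrates the behaviour documented
// above: an unset heatmap period is filled in with the default when secondaries exist,
// and heatmap uploads are disabled otherwise. Module and test names are hypothetical.
#[cfg(test)]
mod ha_aware_config_example {
    use super::*;

    #[test]
    fn heatmap_period_follows_secondaries() {
        let base = TenantConfig::default();

        // With secondaries, an unset heatmap period gets the default value.
        let with_secondaries = ha_aware_config(&base, true);
        assert_eq!(
            with_secondaries.heatmap_period.as_deref(),
            Some(DEFAULT_HEATMAP_PERIOD)
        );

        // Without secondaries, heatmap uploads are switched off entirely.
        let without_secondaries = ha_aware_config(&base, false);
        assert!(without_secondaries.heatmap_period.is_none());
    }
}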
1025 :
1026 0 : pub(crate) fn attached_location_conf(
1027 0 : generation: Generation,
1028 0 : shard: &ShardIdentity,
1029 0 : config: &TenantConfig,
1030 0 : policy: &PlacementPolicy,
1031 0 : ) -> LocationConfig {
1032 0 : let has_secondaries = match policy {
1033 : PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
1034 0 : false
1035 : }
1036 0 : PlacementPolicy::Attached(_) => true,
1037 : };
1038 :
1039 0 : LocationConfig {
1040 0 : mode: LocationConfigMode::AttachedSingle,
1041 0 : generation: generation.into(),
1042 0 : secondary_conf: None,
1043 0 : shard_number: shard.number.0,
1044 0 : shard_count: shard.count.literal(),
1045 0 : shard_stripe_size: shard.stripe_size.0,
1046 0 : tenant_conf: ha_aware_config(config, has_secondaries),
1047 0 : }
1048 0 : }
1049 :
1050 0 : pub(crate) fn secondary_location_conf(
1051 0 : shard: &ShardIdentity,
1052 0 : config: &TenantConfig,
1053 0 : ) -> LocationConfig {
1054 0 : LocationConfig {
1055 0 : mode: LocationConfigMode::Secondary,
1056 0 : generation: None,
1057 0 : secondary_conf: Some(LocationConfigSecondary { warm: true }),
1058 0 : shard_number: shard.number.0,
1059 0 : shard_count: shard.count.literal(),
1060 0 : shard_stripe_size: shard.stripe_size.0,
1061 0 : tenant_conf: ha_aware_config(config, true),
1062 0 : }
1063 0 : }