Line data Source code
1 : use crate::pageserver_client::PageserverClient;
2 : use crate::persistence::Persistence;
3 : use crate::service;
4 : use pageserver_api::controller_api::PlacementPolicy;
5 : use pageserver_api::models::{
6 : LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
7 : };
8 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
9 : use pageserver_client::mgmt_api;
10 : use reqwest::StatusCode;
11 : use std::collections::HashMap;
12 : use std::sync::Arc;
13 : use std::time::{Duration, Instant};
14 : use tokio_util::sync::CancellationToken;
15 : use utils::backoff::exponential_backoff;
16 : use utils::failpoint_support;
17 : use utils::generation::Generation;
18 : use utils::id::{NodeId, TimelineId};
19 : use utils::lsn::Lsn;
20 : use utils::sync::gate::GateGuard;
21 :
22 : use crate::compute_hook::{ComputeHook, NotifyError};
23 : use crate::node::Node;
24 : use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
25 :
26 : const DEFAULT_HEATMAP_PERIOD: &str = "60s";
27 :
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
///
/// The entry point is [`Self::reconcile`]; the fields below are the inputs to, and the
/// mutable working state of, that single run.
pub(super) struct Reconciler {
    /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
    /// of a tenant's state from when we spawned a reconcile task.
    pub(super) tenant_shard_id: TenantShardId,
    pub(crate) shard: ShardIdentity,
    pub(crate) placement_policy: PlacementPolicy,
    /// Updated in place whenever this reconciler increments the generation via
    /// [`Self::persistence`].
    pub(crate) generation: Option<Generation>,
    pub(crate) intent: TargetState,

    /// Nodes not referenced by [`Self::intent`], from which we should try
    /// to detach this tenant shard.
    pub(crate) detach: Vec<Node>,

    /// Configuration specific to this reconciler
    pub(crate) reconciler_config: ReconcilerConfig,

    /// Tenant configuration that is embedded in every location config this
    /// reconciler sends to pageservers.
    pub(crate) config: TenantConfig,

    /// Our view of what each pageserver currently holds for this shard. It is
    /// updated as location config calls are issued: set to an unknown (`conf: None`)
    /// state before a call, and to the written config (or removed, for detaches)
    /// once the call succeeds.
    pub(crate) observed: ObservedState,

    /// Service-wide configuration; the `jwt_token` used for pageserver API calls
    /// is read from here.
    pub(crate) service_config: service::Config,

    /// A hook to notify the running postgres instances when we change the location
    /// of a tenant.  Use this via [`Self::compute_notify`] to update our failure flag
    /// and guarantee eventual retries.
    pub(crate) compute_hook: Arc<ComputeHook>,

    /// To avoid stalling if the cloud control plane is unavailable, we may proceed
    /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
    /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
    pub(crate) compute_notify_failure: bool,

    /// Reconciler is responsible for keeping alive semaphore units that limit concurrency on how many
    /// we will spawn.
    pub(crate) _resource_units: ReconcileUnits,

    /// A means to abort background reconciliation: it is essential to
    /// call this when something changes in the original TenantShard that
    /// will make this reconciliation impossible or unnecessary, for
    /// example when a pageserver node goes offline, or the PlacementPolicy for
    /// the tenant is changed.
    pub(crate) cancel: CancellationToken,

    /// Reconcilers are registered with a Gate so that during a graceful shutdown we
    /// can wait for all the reconcilers to respond to their cancellation tokens.
    pub(crate) _gate_guard: GateGuard,

    /// Access to persistent storage for updating generation numbers
    pub(crate) persistence: Arc<Persistence>,
}
79 :
80 : pub(crate) struct ReconcilerConfigBuilder {
81 : config: ReconcilerConfig,
82 : }
83 :
84 : impl ReconcilerConfigBuilder {
85 0 : pub(crate) fn new() -> Self {
86 0 : Self {
87 0 : config: ReconcilerConfig::default(),
88 0 : }
89 0 : }
90 :
91 0 : pub(crate) fn secondary_warmup_timeout(self, value: Duration) -> Self {
92 0 : Self {
93 0 : config: ReconcilerConfig {
94 0 : secondary_warmup_timeout: Some(value),
95 0 : ..self.config
96 0 : },
97 0 : }
98 0 : }
99 :
100 0 : pub(crate) fn secondary_download_request_timeout(self, value: Duration) -> Self {
101 0 : Self {
102 0 : config: ReconcilerConfig {
103 0 : secondary_download_request_timeout: Some(value),
104 0 : ..self.config
105 0 : },
106 0 : }
107 0 : }
108 :
109 0 : pub(crate) fn build(self) -> ReconcilerConfig {
110 0 : self.config
111 0 : }
112 : }
113 :
/// Per-reconciler tunables. A field left as `None` means "use the built-in
/// default", which is resolved by the corresponding getter.
#[derive(Default, Debug, Copy, Clone)]
pub(crate) struct ReconcilerConfig {
    // During live migration give up on warming-up the secondary
    // after this timeout.
    secondary_warmup_timeout: Option<Duration>,

    // During live migrations this is the amount of time that
    // the pageserver will hold our poll.
    secondary_download_request_timeout: Option<Duration>,
}

impl ReconcilerConfig {
    /// Used when no explicit secondary warm-up timeout was configured.
    const DEFAULT_SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(300);

    /// Used when no explicit secondary download request timeout was configured.
    const DEFAULT_SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Total time we are willing to spend warming up a secondary location.
    pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
        match self.secondary_warmup_timeout {
            Some(configured) => configured,
            None => Self::DEFAULT_SECONDARY_WARMUP_TIMEOUT,
        }
    }

    /// Long-poll duration passed with each secondary download request.
    pub(crate) fn get_secondary_download_request_timeout(&self) -> Duration {
        match self.secondary_download_request_timeout {
            Some(configured) => configured,
            None => Self::DEFAULT_SECONDARY_DOWNLOAD_REQUEST_TIMEOUT,
        }
    }
}
138 :
139 : /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
140 : pub(crate) struct ReconcileUnits {
141 : _sem_units: tokio::sync::OwnedSemaphorePermit,
142 : }
143 :
144 : impl ReconcileUnits {
145 0 : pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
146 0 : Self {
147 0 : _sem_units: sem_units,
148 0 : }
149 0 : }
150 : }
151 :
152 : /// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
153 : /// reference counting for Scheduler. The IntentState is what the scheduler works with,
154 : /// and the TargetState is just the instruction for a particular Reconciler run.
155 : #[derive(Debug)]
156 : pub(crate) struct TargetState {
157 : pub(crate) attached: Option<Node>,
158 : pub(crate) secondary: Vec<Node>,
159 : }
160 :
161 : impl TargetState {
162 0 : pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
163 0 : Self {
164 0 : attached: intent.get_attached().map(|n| {
165 0 : nodes
166 0 : .get(&n)
167 0 : .expect("Intent attached referenced non-existent node")
168 0 : .clone()
169 0 : }),
170 0 : secondary: intent
171 0 : .get_secondary()
172 0 : .iter()
173 0 : .map(|n| {
174 0 : nodes
175 0 : .get(n)
176 0 : .expect("Intent secondary referenced non-existent node")
177 0 : .clone()
178 0 : })
179 0 : .collect(),
180 0 : }
181 0 : }
182 : }
183 :
/// Errors that can terminate a reconcile run.
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileError {
    /// A pageserver management API call failed.
    #[error(transparent)]
    Remote(#[from] mgmt_api::Error),
    /// Notifying compute of the new attached location failed.
    #[error(transparent)]
    Notify(#[from] NotifyError),
    /// The reconciler was cancelled (e.g. a retried pageserver call was
    /// interrupted by the cancellation token, or the notifier is shutting down).
    #[error("Cancelled")]
    Cancel,
    /// Any other failure, carried as an [`anyhow::Error`].
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
195 :
196 : impl Reconciler {
197 0 : async fn location_config(
198 0 : &mut self,
199 0 : node: &Node,
200 0 : config: LocationConfig,
201 0 : flush_ms: Option<Duration>,
202 0 : lazy: bool,
203 0 : ) -> Result<(), ReconcileError> {
204 0 : if !node.is_available() && config.mode == LocationConfigMode::Detached {
205 : // Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
206 : // will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
207 : // what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
208 0 : tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
209 0 : self.observed.locations.remove(&node.get_id());
210 0 : return Ok(());
211 0 : }
212 0 :
213 0 : self.observed
214 0 : .locations
215 0 : .insert(node.get_id(), ObservedStateLocation { conf: None });
216 0 :
217 0 : // TODO: amend locations that use long-polling: they will hit this timeout.
218 0 : let timeout = Duration::from_secs(25);
219 0 :
220 0 : tracing::info!("location_config({node}) calling: {:?}", config);
221 0 : let tenant_shard_id = self.tenant_shard_id;
222 0 : let config_ref = &config;
223 0 : match node
224 0 : .with_client_retries(
225 0 : |client| async move {
226 0 : let config = config_ref.clone();
227 0 : client
228 0 : .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
229 0 : .await
230 0 : },
231 0 : &self.service_config.jwt_token,
232 0 : 1,
233 0 : 3,
234 0 : timeout,
235 0 : &self.cancel,
236 0 : )
237 0 : .await
238 : {
239 0 : Some(Ok(_)) => {}
240 0 : Some(Err(e)) => return Err(e.into()),
241 0 : None => return Err(ReconcileError::Cancel),
242 : };
243 0 : tracing::info!("location_config({node}) complete: {:?}", config);
244 :
245 0 : match config.mode {
246 0 : LocationConfigMode::Detached => {
247 0 : self.observed.locations.remove(&node.get_id());
248 0 : }
249 0 : _ => {
250 0 : self.observed
251 0 : .locations
252 0 : .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
253 0 : }
254 : }
255 :
256 0 : Ok(())
257 0 : }
258 :
259 0 : fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
260 0 : if let Some(node) = self.intent.attached.as_ref() {
261 0 : if node.get_id() == *node_id {
262 0 : return Some(node);
263 0 : }
264 0 : }
265 :
266 0 : if let Some(node) = self
267 0 : .intent
268 0 : .secondary
269 0 : .iter()
270 0 : .find(|n| n.get_id() == *node_id)
271 : {
272 0 : return Some(node);
273 0 : }
274 :
275 0 : if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
276 0 : return Some(node);
277 0 : }
278 0 :
279 0 : None
280 0 : }
281 :
282 0 : async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
283 0 : let destination = if let Some(node) = &self.intent.attached {
284 0 : match self.observed.locations.get(&node.get_id()) {
285 0 : Some(conf) => {
286 : // We will do a live migration only if the intended destination is not
287 : // currently in an attached state.
288 0 : match &conf.conf {
289 0 : Some(conf) if conf.mode == LocationConfigMode::Secondary => {
290 0 : // Fall through to do a live migration
291 0 : node
292 : }
293 : None | Some(_) => {
294 : // Attached or uncertain: don't do a live migration, proceed
295 : // with a general-case reconciliation
296 0 : tracing::info!("maybe_live_migrate: destination is None or attached");
297 0 : return Ok(());
298 : }
299 : }
300 : }
301 : None => {
302 : // Our destination is not attached: maybe live migrate if some other
303 : // node is currently attached. Fall through.
304 0 : node
305 : }
306 : }
307 : } else {
308 : // No intent to be attached
309 0 : tracing::info!("maybe_live_migrate: no attached intent");
310 0 : return Ok(());
311 : };
312 :
313 0 : let mut origin = None;
314 0 : for (node_id, state) in &self.observed.locations {
315 0 : if let Some(observed_conf) = &state.conf {
316 0 : if observed_conf.mode == LocationConfigMode::AttachedSingle {
317 : // We will only attempt live migration if the origin is not offline: this
318 : // avoids trying to do it while reconciling after responding to an HA failover.
319 0 : if let Some(node) = self.get_node(node_id) {
320 0 : if node.is_available() {
321 0 : origin = Some(node.clone());
322 0 : break;
323 0 : }
324 0 : }
325 0 : }
326 0 : }
327 : }
328 :
329 0 : let Some(origin) = origin else {
330 0 : tracing::info!("maybe_live_migrate: no origin found");
331 0 : return Ok(());
332 : };
333 :
334 : // We have an origin and a destination: proceed to do the live migration
335 0 : tracing::info!("Live migrating {}->{}", origin, destination);
336 0 : self.live_migrate(origin, destination.clone()).await?;
337 :
338 0 : Ok(())
339 0 : }
340 :
341 0 : async fn get_lsns(
342 0 : &self,
343 0 : tenant_shard_id: TenantShardId,
344 0 : node: &Node,
345 0 : ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
346 0 : let client = PageserverClient::new(
347 0 : node.get_id(),
348 0 : node.base_url(),
349 0 : self.service_config.jwt_token.as_deref(),
350 0 : );
351 :
352 0 : let timelines = client.timeline_list(&tenant_shard_id).await?;
353 0 : Ok(timelines
354 0 : .into_iter()
355 0 : .map(|t| (t.timeline_id, t.last_record_lsn))
356 0 : .collect())
357 0 : }
358 :
359 0 : async fn secondary_download(
360 0 : &self,
361 0 : tenant_shard_id: TenantShardId,
362 0 : node: &Node,
363 0 : ) -> Result<(), ReconcileError> {
364 0 : // This is not the timeout for a request, but the total amount of time we're willing to wait
365 0 : // for a secondary location to get up to date before
366 0 : let total_download_timeout = self.reconciler_config.get_secondary_warmup_timeout();
367 0 :
368 0 : // This the long-polling interval for the secondary download requests we send to destination pageserver
369 0 : // during a migration.
370 0 : let request_download_timeout = self
371 0 : .reconciler_config
372 0 : .get_secondary_download_request_timeout();
373 0 :
374 0 : let started_at = Instant::now();
375 :
376 : loop {
377 0 : let (status, progress) = match node
378 0 : .with_client_retries(
379 0 : |client| async move {
380 0 : client
381 0 : .tenant_secondary_download(
382 0 : tenant_shard_id,
383 0 : Some(request_download_timeout),
384 0 : )
385 0 : .await
386 0 : },
387 0 : &self.service_config.jwt_token,
388 0 : 1,
389 0 : 3,
390 0 : request_download_timeout * 2,
391 0 : &self.cancel,
392 0 : )
393 0 : .await
394 : {
395 0 : None => Err(ReconcileError::Cancel),
396 0 : Some(Ok(v)) => Ok(v),
397 0 : Some(Err(e)) => {
398 0 : // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
399 0 : // attaching, but we should not let an issue with a secondary location stop us proceeding
400 0 : // with a live migration.
401 0 : tracing::warn!("Failed to prepare by downloading layers on node {node}: {e})");
402 0 : return Ok(());
403 : }
404 0 : }?;
405 :
406 0 : if status == StatusCode::OK {
407 0 : tracing::info!(
408 0 : "Downloads to {} complete: {}/{} layers, {}/{} bytes",
409 : node,
410 : progress.layers_downloaded,
411 : progress.layers_total,
412 : progress.bytes_downloaded,
413 : progress.bytes_total
414 : );
415 0 : return Ok(());
416 0 : } else if status == StatusCode::ACCEPTED {
417 0 : let total_runtime = started_at.elapsed();
418 0 : if total_runtime > total_download_timeout {
419 0 : tracing::warn!("Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
420 0 : total_runtime.as_millis(),
421 : progress.layers_downloaded,
422 : progress.layers_total,
423 : progress.bytes_downloaded,
424 : progress.bytes_total
425 : );
426 : // Give up, but proceed: an incompletely warmed destination doesn't prevent migration working,
427 : // it just makes the I/O performance for users less good.
428 0 : return Ok(());
429 0 : }
430 0 :
431 0 : // Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call
432 0 : // to the pageserver is a long-poll.
433 0 : tracing::info!(
434 0 : "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
435 : node,
436 : progress.layers_downloaded,
437 : progress.layers_total,
438 : progress.bytes_downloaded,
439 : progress.bytes_total
440 : );
441 0 : }
442 : }
443 0 : }
444 :
445 0 : async fn await_lsn(
446 0 : &self,
447 0 : tenant_shard_id: TenantShardId,
448 0 : node: &Node,
449 0 : baseline: HashMap<TimelineId, Lsn>,
450 0 : ) -> anyhow::Result<()> {
451 : loop {
452 0 : let latest = match self.get_lsns(tenant_shard_id, node).await {
453 0 : Ok(l) => l,
454 0 : Err(e) => {
455 0 : tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
456 0 : std::thread::sleep(Duration::from_millis(500));
457 0 : continue;
458 : }
459 : };
460 :
461 0 : let mut any_behind: bool = false;
462 0 : for (timeline_id, baseline_lsn) in &baseline {
463 0 : match latest.get(timeline_id) {
464 0 : Some(latest_lsn) => {
465 0 : tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
466 0 : if latest_lsn < baseline_lsn {
467 0 : any_behind = true;
468 0 : }
469 : }
470 0 : None => {
471 0 : // Expected timeline isn't yet visible on migration destination.
472 0 : // (IRL we would have to account for timeline deletion, but this
473 0 : // is just test helper)
474 0 : any_behind = true;
475 0 : }
476 : }
477 : }
478 :
479 0 : if !any_behind {
480 0 : tracing::info!("✅ LSN caught up. Proceeding...");
481 0 : break;
482 0 : } else {
483 0 : std::thread::sleep(Duration::from_millis(500));
484 0 : }
485 : }
486 :
487 0 : Ok(())
488 0 : }
489 :
490 0 : pub async fn live_migrate(
491 0 : &mut self,
492 0 : origin_ps: Node,
493 0 : dest_ps: Node,
494 0 : ) -> Result<(), ReconcileError> {
495 0 : // `maybe_live_migrate` is responsibble for sanity of inputs
496 0 : assert!(origin_ps.get_id() != dest_ps.get_id());
497 :
498 0 : fn build_location_config(
499 0 : shard: &ShardIdentity,
500 0 : config: &TenantConfig,
501 0 : mode: LocationConfigMode,
502 0 : generation: Option<Generation>,
503 0 : secondary_conf: Option<LocationConfigSecondary>,
504 0 : ) -> LocationConfig {
505 0 : LocationConfig {
506 0 : mode,
507 0 : generation: generation.map(|g| g.into().unwrap()),
508 0 : secondary_conf,
509 0 : tenant_conf: config.clone(),
510 0 : shard_number: shard.number.0,
511 0 : shard_count: shard.count.literal(),
512 0 : shard_stripe_size: shard.stripe_size.0,
513 0 : }
514 0 : }
515 :
516 0 : tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);
517 :
518 : // FIXME: it is incorrect to use self.generation here, we should use the generation
519 : // from the ObservedState of the origin pageserver (it might be older than self.generation)
520 0 : let stale_conf = build_location_config(
521 0 : &self.shard,
522 0 : &self.config,
523 0 : LocationConfigMode::AttachedStale,
524 0 : self.generation,
525 0 : None,
526 0 : );
527 0 : self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
528 0 : .await?;
529 :
530 0 : let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);
531 :
532 : // If we are migrating to a destination that has a secondary location, warm it up first
533 0 : if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
534 0 : if let Some(destination_conf) = &destination_conf.conf {
535 0 : if destination_conf.mode == LocationConfigMode::Secondary {
536 0 : tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
537 0 : self.secondary_download(self.tenant_shard_id, &dest_ps)
538 0 : .await?;
539 0 : }
540 0 : }
541 0 : }
542 :
543 : // Increment generation before attaching to new pageserver
544 : self.generation = Some(
545 0 : self.persistence
546 0 : .increment_generation(self.tenant_shard_id, dest_ps.get_id())
547 0 : .await?,
548 : );
549 :
550 0 : let dest_conf = build_location_config(
551 0 : &self.shard,
552 0 : &self.config,
553 0 : LocationConfigMode::AttachedMulti,
554 0 : self.generation,
555 0 : None,
556 0 : );
557 0 :
558 0 : tracing::info!("🔁 Attaching to pageserver {dest_ps}");
559 0 : self.location_config(&dest_ps, dest_conf, None, false)
560 0 : .await?;
561 :
562 0 : if let Some(baseline) = baseline_lsns {
563 0 : tracing::info!("🕑 Waiting for LSN to catch up...");
564 0 : self.await_lsn(self.tenant_shard_id, &dest_ps, baseline)
565 0 : .await?;
566 0 : }
567 :
568 0 : tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");
569 :
570 : // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
571 : // the origin without notifying compute, we will render the tenant unavailable.
572 0 : let mut notify_attempts = 0;
573 0 : while let Err(e) = self.compute_notify().await {
574 0 : match e {
575 0 : NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
576 0 : NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
577 : _ => {
578 0 : tracing::warn!(
579 0 : "Live migration blocked by compute notification error, retrying: {e}"
580 : );
581 : }
582 : }
583 :
584 0 : exponential_backoff(
585 0 : notify_attempts,
586 0 : // Generous waits: control plane operations which might be blocking us usually complete on the order
587 0 : // of hundreds to thousands of milliseconds, so no point busy polling.
588 0 : 1.0,
589 0 : 10.0,
590 0 : &self.cancel,
591 0 : )
592 0 : .await;
593 0 : notify_attempts += 1;
594 : }
595 :
596 : // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
597 : // this location will be deleted in the general case reconciliation that runs after this.
598 0 : let origin_secondary_conf = build_location_config(
599 0 : &self.shard,
600 0 : &self.config,
601 0 : LocationConfigMode::Secondary,
602 0 : None,
603 0 : Some(LocationConfigSecondary { warm: true }),
604 0 : );
605 0 : self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
606 0 : .await?;
607 : // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
608 : // partway through. In fact, all location conf API calls should be in a wrapper that sets
609 : // the observed state to None, then runs, then sets it to what we wrote.
610 0 : self.observed.locations.insert(
611 0 : origin_ps.get_id(),
612 0 : ObservedStateLocation {
613 0 : conf: Some(origin_secondary_conf),
614 0 : },
615 0 : );
616 0 :
617 0 : tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
618 0 : let dest_final_conf = build_location_config(
619 0 : &self.shard,
620 0 : &self.config,
621 0 : LocationConfigMode::AttachedSingle,
622 0 : self.generation,
623 0 : None,
624 0 : );
625 0 : self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
626 0 : .await?;
627 0 : self.observed.locations.insert(
628 0 : dest_ps.get_id(),
629 0 : ObservedStateLocation {
630 0 : conf: Some(dest_final_conf),
631 0 : },
632 0 : );
633 0 :
634 0 : tracing::info!("✅ Migration complete");
635 :
636 0 : Ok(())
637 0 : }
638 :
639 0 : async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
640 : // If the attached node has uncertain state, read it from the pageserver before proceeding: this
641 : // is important to avoid spurious generation increments.
642 : //
643 : // We don't need to do this for secondary/detach locations because it's harmless to just PUT their
644 : // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
645 : // the `Timeline` object in the pageserver.
646 :
647 0 : let Some(attached_node) = self.intent.attached.as_ref() else {
648 : // Nothing to do
649 0 : return Ok(());
650 : };
651 :
652 0 : if matches!(
653 0 : self.observed.locations.get(&attached_node.get_id()),
654 : Some(ObservedStateLocation { conf: None })
655 : ) {
656 0 : let tenant_shard_id = self.tenant_shard_id;
657 0 : let observed_conf = match attached_node
658 0 : .with_client_retries(
659 0 : |client| async move { client.get_location_config(tenant_shard_id).await },
660 0 : &self.service_config.jwt_token,
661 0 : 1,
662 0 : 1,
663 0 : Duration::from_secs(5),
664 0 : &self.cancel,
665 0 : )
666 0 : .await
667 : {
668 0 : Some(Ok(observed)) => Some(observed),
669 0 : Some(Err(mgmt_api::Error::ApiError(status, _msg)))
670 0 : if status == StatusCode::NOT_FOUND =>
671 0 : {
672 0 : None
673 : }
674 0 : Some(Err(e)) => return Err(e.into()),
675 0 : None => return Err(ReconcileError::Cancel),
676 : };
677 0 : tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
678 0 : match observed_conf {
679 0 : Some(conf) => {
680 0 : // Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
681 0 : // if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
682 0 : self.observed
683 0 : .locations
684 0 : .insert(attached_node.get_id(), ObservedStateLocation { conf });
685 0 : }
686 0 : None => {
687 0 : // Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
688 0 : self.observed.locations.remove(&attached_node.get_id());
689 0 : }
690 : }
691 0 : }
692 :
693 0 : Ok(())
694 0 : }
695 :
696 : /// Reconciling a tenant makes API calls to pageservers until the observed state
697 : /// matches the intended state.
698 : ///
699 : /// First we apply special case handling (e.g. for live migrations), and then a
700 : /// general case reconciliation where we walk through the intent by pageserver
701 : /// and call out to the pageserver to apply the desired state.
702 0 : pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
703 0 : // Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
704 0 : self.maybe_refresh_observed().await?;
705 :
706 : // Special case: live migration
707 0 : self.maybe_live_migrate().await?;
708 :
709 : // If the attached pageserver is not attached, do so now.
710 0 : if let Some(node) = self.intent.attached.as_ref() {
711 : // If we are in an attached policy, then generation must have been set (null generations
712 : // are only present when a tenant is initially loaded with a secondary policy)
713 0 : debug_assert!(self.generation.is_some());
714 0 : let Some(generation) = self.generation else {
715 0 : return Err(ReconcileError::Other(anyhow::anyhow!(
716 0 : "Attempted to attach with NULL generation"
717 0 : )));
718 : };
719 :
720 0 : let mut wanted_conf = attached_location_conf(
721 0 : generation,
722 0 : &self.shard,
723 0 : &self.config,
724 0 : &self.placement_policy,
725 0 : );
726 0 : match self.observed.locations.get(&node.get_id()) {
727 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
728 0 : // Nothing to do
729 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
730 : }
731 0 : observed => {
732 : // In all cases other than a matching observed configuration, we will
733 : // reconcile this location. This includes locations with different configurations, as well
734 : // as locations with unknown (None) observed state.
735 :
736 : // Incrementing generation is the safe general case, but is inefficient for changes that only
737 : // modify some details (e.g. the tenant's config).
738 0 : let increment_generation = match observed {
739 0 : None => true,
740 0 : Some(ObservedStateLocation { conf: None }) => true,
741 : Some(ObservedStateLocation {
742 0 : conf: Some(observed),
743 0 : }) => {
744 0 : let generations_match = observed.generation == wanted_conf.generation;
745 0 :
746 0 : // We may skip incrementing the generation if the location is already in the expected mode and
747 0 : // generation. In principle it would also be safe to skip from certain other modes (e.g. AttachedStale),
748 0 : // but such states are handled inside `live_migrate`, and if we see that state here we're cleaning up
749 0 : // after a restart/crash, so fall back to the universally safe path of incrementing generation.
750 0 : !generations_match || (observed.mode != wanted_conf.mode)
751 : }
752 : };
753 :
754 0 : if increment_generation {
755 0 : let generation = self
756 0 : .persistence
757 0 : .increment_generation(self.tenant_shard_id, node.get_id())
758 0 : .await?;
759 0 : self.generation = Some(generation);
760 0 : wanted_conf.generation = generation.into();
761 0 : }
762 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
763 :
764 : // Because `node` comes from a ref to &self, clone it before calling into a &mut self
765 : // function: this could be avoided by refactoring the state mutated by location_config into
766 : // a separate type to Self.
767 0 : let node = node.clone();
768 0 :
769 0 : // Use lazy=true, because we may run many of Self concurrently, and do not want to
770 0 : // overload the pageserver with logical size calculations.
771 0 : self.location_config(&node, wanted_conf, None, true).await?;
772 0 : self.compute_notify().await?;
773 : }
774 : }
775 0 : }
776 :
777 : // Configure secondary locations: if these were previously attached this
778 : // implicitly downgrades them from attached to secondary.
779 0 : let mut changes = Vec::new();
780 0 : for node in &self.intent.secondary {
781 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
782 0 : match self.observed.locations.get(&node.get_id()) {
783 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
784 0 : // Nothing to do
785 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
786 : }
787 : _ => {
788 : // In all cases other than a matching observed configuration, we will
789 : // reconcile this location.
790 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
791 0 : changes.push((node.clone(), wanted_conf))
792 : }
793 : }
794 : }
795 :
796 : // Detach any extraneous pageservers that are no longer referenced
797 : // by our intent.
798 0 : for node in &self.detach {
799 0 : changes.push((
800 0 : node.clone(),
801 0 : LocationConfig {
802 0 : mode: LocationConfigMode::Detached,
803 0 : generation: None,
804 0 : secondary_conf: None,
805 0 : shard_number: self.shard.number.0,
806 0 : shard_count: self.shard.count.literal(),
807 0 : shard_stripe_size: self.shard.stripe_size.0,
808 0 : tenant_conf: self.config.clone(),
809 0 : },
810 0 : ));
811 0 : }
812 :
813 0 : for (node, conf) in changes {
814 0 : if self.cancel.is_cancelled() {
815 0 : return Err(ReconcileError::Cancel);
816 0 : }
817 0 : self.location_config(&node, conf, None, false).await?;
818 : }
819 :
820 0 : failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
821 :
822 0 : Ok(())
823 0 : }
824 :
825 0 : pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
826 : // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
827 : // destination.
828 0 : if let Some(node) = &self.intent.attached {
829 0 : let result = self
830 0 : .compute_hook
831 0 : .notify(
832 0 : self.tenant_shard_id,
833 0 : node.get_id(),
834 0 : self.shard.stripe_size,
835 0 : &self.cancel,
836 0 : )
837 0 : .await;
838 0 : if let Err(e) = &result {
839 : // It is up to the caller whether they want to drop out on this error, but they don't have to:
840 : // in general we should avoid letting unavailability of the cloud control plane stop us from
841 : // making progress.
842 0 : if !matches!(e, NotifyError::ShuttingDown) {
843 0 : tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
844 0 : }
845 :
846 : // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
847 : // needs to retry at some point.
848 0 : self.compute_notify_failure = true;
849 0 : }
850 0 : result
851 : } else {
852 0 : Ok(())
853 : }
854 0 : }
855 : }
856 :
857 : /// We tweak the externally-set TenantConfig while configuring
858 : /// locations, using our awareness of whether secondary locations
859 : /// are in use to automatically enable/disable heatmap uploads.
860 0 : fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
861 0 : let mut config = config.clone();
862 0 : if has_secondaries {
863 0 : if config.heatmap_period.is_none() {
864 0 : config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
865 0 : }
866 0 : } else {
867 0 : config.heatmap_period = None;
868 0 : }
869 0 : config
870 0 : }
871 :
872 0 : pub(crate) fn attached_location_conf(
873 0 : generation: Generation,
874 0 : shard: &ShardIdentity,
875 0 : config: &TenantConfig,
876 0 : policy: &PlacementPolicy,
877 0 : ) -> LocationConfig {
878 0 : let has_secondaries = match policy {
879 : PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
880 0 : false
881 : }
882 0 : PlacementPolicy::Attached(_) => true,
883 : };
884 :
885 0 : LocationConfig {
886 0 : mode: LocationConfigMode::AttachedSingle,
887 0 : generation: generation.into(),
888 0 : secondary_conf: None,
889 0 : shard_number: shard.number.0,
890 0 : shard_count: shard.count.literal(),
891 0 : shard_stripe_size: shard.stripe_size.0,
892 0 : tenant_conf: ha_aware_config(config, has_secondaries),
893 0 : }
894 0 : }
895 :
896 0 : pub(crate) fn secondary_location_conf(
897 0 : shard: &ShardIdentity,
898 0 : config: &TenantConfig,
899 0 : ) -> LocationConfig {
900 0 : LocationConfig {
901 0 : mode: LocationConfigMode::Secondary,
902 0 : generation: None,
903 0 : secondary_conf: Some(LocationConfigSecondary { warm: true }),
904 0 : shard_number: shard.number.0,
905 0 : shard_count: shard.count.literal(),
906 0 : shard_stripe_size: shard.stripe_size.0,
907 0 : tenant_conf: ha_aware_config(config, true),
908 0 : }
909 0 : }
|