Line data Source code
1 : use crate::pageserver_client::PageserverClient;
2 : use crate::persistence::Persistence;
3 : use crate::service;
4 : use pageserver_api::models::{
5 : LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
6 : };
7 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
8 : use pageserver_client::mgmt_api;
9 : use reqwest::StatusCode;
10 : use std::collections::HashMap;
11 : use std::sync::Arc;
12 : use std::time::{Duration, Instant};
13 : use tokio_util::sync::CancellationToken;
14 : use utils::generation::Generation;
15 : use utils::id::{NodeId, TimelineId};
16 : use utils::lsn::Lsn;
17 : use utils::sync::gate::GateGuard;
18 :
19 : use crate::compute_hook::{ComputeHook, NotifyError};
20 : use crate::node::Node;
21 : use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
22 :
23 : const DEFAULT_HEATMAP_PERIOD: &str = "60s";
24 :
25 : /// Object with the lifetime of the background reconcile task that is created
26 : /// for tenants which have a difference between their intent and observed states.
27 : pub(super) struct Reconciler {
28 : /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
29 : /// of a tenant's state from when we spawned a reconcile task.
30 : pub(super) tenant_shard_id: TenantShardId,
31 : pub(crate) shard: ShardIdentity,
32 : pub(crate) generation: Option<Generation>,
33 : pub(crate) intent: TargetState,
34 :
35 : /// Nodes not referenced by [`Self::intent`], from which we should try
36 : /// to detach this tenant shard.
37 : pub(crate) detach: Vec<Node>,
38 :
39 : pub(crate) config: TenantConfig,
40 : pub(crate) observed: ObservedState,
41 :
42 : pub(crate) service_config: service::Config,
43 :
44 : /// A hook to notify the running postgres instances when we change the location
45 : /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
46 : /// and guarantee eventual retries.
47 : pub(crate) compute_hook: Arc<ComputeHook>,
48 :
49 : /// To avoid stalling if the cloud control plane is unavailable, we may proceed
50 : /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
51 : /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
52 : pub(crate) compute_notify_failure: bool,
53 :
54 : /// The Reconciler is responsible for keeping alive the semaphore units that limit
55 : /// how many reconcile tasks may run concurrently.
56 : pub(crate) _resource_units: ReconcileUnits,
57 :
58 : /// A means to abort background reconciliation: it is essential to
59 : /// call this when something changes in the original TenantShard that
60 : /// will make this reconciliation impossible or unnecessary, for
61 : /// example when a pageserver node goes offline, or the PlacementPolicy for
62 : /// the tenant is changed.
63 : pub(crate) cancel: CancellationToken,
64 :
65 : /// Reconcilers are registered with a Gate so that during a graceful shutdown we
66 : /// can wait for all the reconcilers to respond to their cancellation tokens.
67 : pub(crate) _gate_guard: GateGuard,
68 :
69 : /// Access to persistent storage for updating generation numbers
70 : pub(crate) persistence: Arc<Persistence>,
71 : }
72 :
73 : /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
74 : pub(crate) struct ReconcileUnits {
75 : _sem_units: tokio::sync::OwnedSemaphorePermit,
76 : }
77 :
78 : impl ReconcileUnits {
79 0 : pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
80 0 : Self {
81 0 : _sem_units: sem_units,
82 0 : }
83 0 : }
84 : }
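// A minimal usage sketch (an assumption, not code from this file): the service holds a
// `tokio::sync::Semaphore` sized to the desired reconcile concurrency, and acquires an
// owned permit before spawning each Reconciler:
//
//     let permit = semaphore.clone().acquire_owned().await?;
//     let units = ReconcileUnits::new(permit);
//
// Dropping the Reconciler drops `_resource_units`, releasing the permit and freeing a
// concurrency slot for the next reconcile task.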
85 :
86 : /// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
87 : /// reference counting for the Scheduler. The IntentState is what the scheduler works with,
88 : /// and the TargetState is just the instruction for a particular Reconciler run.
89 : #[derive(Debug)]
90 : pub(crate) struct TargetState {
91 : pub(crate) attached: Option<Node>,
92 : pub(crate) secondary: Vec<Node>,
93 : }
94 :
95 : impl TargetState {
96 0 : pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
97 0 : Self {
98 0 : attached: intent.get_attached().map(|n| {
99 0 : nodes
100 0 : .get(&n)
101 0 : .expect("Intent attached referenced non-existent node")
102 0 : .clone()
103 0 : }),
104 0 : secondary: intent
105 0 : .get_secondary()
106 0 : .iter()
107 0 : .map(|n| {
108 0 : nodes
109 0 : .get(n)
110 0 : .expect("Intent secondary referenced non-existent node")
111 0 : .clone()
112 0 : })
113 0 : .collect(),
114 0 : }
115 0 : }
116 : }
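// Note: `from_intent` panics (via `expect`) if the intent references a node id that is
// absent from `nodes`, so callers must pass a node map taken from the same snapshot of
// cluster state as the IntentState itself.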
117 :
118 0 : #[derive(thiserror::Error, Debug)]
119 : pub(crate) enum ReconcileError {
120 : #[error(transparent)]
121 : Remote(#[from] mgmt_api::Error),
122 : #[error(transparent)]
123 : Notify(#[from] NotifyError),
124 : #[error("Cancelled")]
125 : Cancel,
126 : #[error(transparent)]
127 : Other(#[from] anyhow::Error),
128 : }
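// Note: the `#[from]` attributes above let the `?` operator convert `mgmt_api::Error`,
// `NotifyError` and `anyhow::Error` into `ReconcileError` implicitly, which is how the
// fallible calls below propagate errors without explicit `map_err`.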
129 :
130 : impl Reconciler {
131 0 : async fn location_config(
132 0 : &mut self,
133 0 : node: &Node,
134 0 : config: LocationConfig,
135 0 : flush_ms: Option<Duration>,
136 0 : lazy: bool,
137 0 : ) -> Result<(), ReconcileError> {
138 0 : if !node.is_available() && config.mode == LocationConfigMode::Detached {
139 : // Detaching from an offline node can be simulated without doing any I/O: an offline node
140 : // will be fully reconciled against the shard's intent state when it is reactivated (in
141 : // [`crate::service::Service::node_activate_reconcile`]), irrespective of what we put into `observed`.
142 0 : tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
143 0 : self.observed.locations.remove(&node.get_id());
144 0 : return Ok(());
145 0 : }
146 0 :
147 0 : self.observed
148 0 : .locations
149 0 : .insert(node.get_id(), ObservedStateLocation { conf: None });
150 0 :
151 0 : // TODO: amend locations that use long-polling: they will hit this timeout.
152 0 : let timeout = Duration::from_secs(25);
153 0 :
154 0 : tracing::info!("location_config({node}) calling: {:?}", config);
155 0 : let tenant_shard_id = self.tenant_shard_id;
156 0 : let config_ref = &config;
157 0 : match node
158 0 : .with_client_retries(
159 0 : |client| async move {
160 0 : let config = config_ref.clone();
161 0 : client
162 0 : .location_config(tenant_shard_id, config, flush_ms, lazy)
163 0 : .await
164 0 : },
165 0 : &self.service_config.jwt_token,
166 0 : 1,
167 0 : 3,
168 0 : timeout,
169 0 : &self.cancel,
170 0 : )
171 0 : .await
172 : {
173 0 : Some(Ok(_)) => {}
174 0 : Some(Err(e)) => return Err(e.into()),
175 0 : None => return Err(ReconcileError::Cancel),
176 : };
177 0 : tracing::info!("location_config({node}) complete: {:?}", config);
178 :
179 0 : match config.mode {
180 0 : LocationConfigMode::Detached => {
181 0 : self.observed.locations.remove(&node.get_id());
182 0 : }
183 0 : _ => {
184 0 : self.observed
185 0 : .locations
186 0 : .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
187 0 : }
188 : }
189 :
190 0 : Ok(())
191 0 : }
192 :
193 0 : fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
194 0 : if let Some(node) = self.intent.attached.as_ref() {
195 0 : if node.get_id() == *node_id {
196 0 : return Some(node);
197 0 : }
198 0 : }
199 :
200 0 : if let Some(node) = self
201 0 : .intent
202 0 : .secondary
203 0 : .iter()
204 0 : .find(|n| n.get_id() == *node_id)
205 : {
206 0 : return Some(node);
207 0 : }
208 :
209 0 : if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
210 0 : return Some(node);
211 0 : }
212 0 :
213 0 : None
214 0 : }
215 :
216 0 : async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
217 0 : let destination = if let Some(node) = &self.intent.attached {
218 0 : match self.observed.locations.get(&node.get_id()) {
219 0 : Some(conf) => {
220 : // We will do a live migration only if the intended destination is not
221 : // currently in an attached state.
222 0 : match &conf.conf {
223 0 : Some(conf) if conf.mode == LocationConfigMode::Secondary => {
224 0 : // Fall through to do a live migration
225 0 : node
226 : }
227 : None | Some(_) => {
228 : // Attached or uncertain: don't do a live migration, proceed
229 : // with a general-case reconciliation
230 0 : tracing::info!("maybe_live_migrate: destination is None or attached");
231 0 : return Ok(());
232 : }
233 : }
234 : }
235 : None => {
236 : // Our destination is not attached: maybe live migrate if some other
237 : // node is currently attached. Fall through.
238 0 : node
239 : }
240 : }
241 : } else {
242 : // No intent to be attached
243 0 : tracing::info!("maybe_live_migrate: no attached intent");
244 0 : return Ok(());
245 : };
246 :
247 0 : let mut origin = None;
248 0 : for (node_id, state) in &self.observed.locations {
249 0 : if let Some(observed_conf) = &state.conf {
250 0 : if observed_conf.mode == LocationConfigMode::AttachedSingle {
251 : // We will only attempt live migration if the origin is not offline: this
252 : // avoids trying to do it while reconciling after responding to an HA failover.
253 0 : if let Some(node) = self.get_node(node_id) {
254 0 : if node.is_available() {
255 0 : origin = Some(node.clone());
256 0 : break;
257 0 : }
258 0 : }
259 0 : }
260 0 : }
261 : }
262 :
263 0 : let Some(origin) = origin else {
264 0 : tracing::info!("maybe_live_migrate: no origin found");
265 0 : return Ok(());
266 : };
267 :
268 : // We have an origin and a destination: proceed to do the live migration
269 0 : tracing::info!("Live migrating {}->{}", origin, destination);
270 0 : self.live_migrate(origin, destination.clone()).await?;
271 :
272 0 : Ok(())
273 0 : }
274 :
275 0 : async fn get_lsns(
276 0 : &self,
277 0 : tenant_shard_id: TenantShardId,
278 0 : node: &Node,
279 0 : ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
280 0 : let client = PageserverClient::new(
281 0 : node.get_id(),
282 0 : node.base_url(),
283 0 : self.service_config.jwt_token.as_deref(),
284 0 : );
285 :
286 0 : let timelines = client.timeline_list(&tenant_shard_id).await?;
287 0 : Ok(timelines
288 0 : .into_iter()
289 0 : .map(|t| (t.timeline_id, t.last_record_lsn))
290 0 : .collect())
291 0 : }
292 :
293 0 : async fn secondary_download(
294 0 : &self,
295 0 : tenant_shard_id: TenantShardId,
296 0 : node: &Node,
297 0 : ) -> Result<(), ReconcileError> {
298 0 : // This is not the timeout for a request, but the total amount of time we're willing to wait
299 0 : // for a secondary location to get up to date before we give up and proceed with the migration anyway.
300 0 : const TOTAL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(300);
301 0 :
302 0 : // This is the long-polling interval for the secondary download requests we send to the destination pageserver
303 0 : // during a migration.
304 0 : const REQUEST_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
305 0 :
306 0 : let started_at = Instant::now();
307 :
308 : loop {
309 0 : let (status, progress) = match node
310 0 : .with_client_retries(
311 0 : |client| async move {
312 0 : client
313 0 : .tenant_secondary_download(
314 0 : tenant_shard_id,
315 0 : Some(REQUEST_DOWNLOAD_TIMEOUT),
316 0 : )
317 0 : .await
318 0 : },
319 0 : &self.service_config.jwt_token,
320 0 : 1,
321 0 : 3,
322 0 : REQUEST_DOWNLOAD_TIMEOUT * 2,
323 0 : &self.cancel,
324 0 : )
325 0 : .await
326 : {
327 0 : None => Err(ReconcileError::Cancel),
328 0 : Some(Ok(v)) => Ok(v),
329 0 : Some(Err(e)) => {
330 0 : // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
331 0 : // attaching, but we should not let an issue with a secondary location stop us proceeding
332 0 : // with a live migration.
333 0 : tracing::warn!("Failed to prepare by downloading layers on node {node}: {e})");
334 0 : return Ok(());
335 : }
336 0 : }?;
337 :
338 0 : if status == StatusCode::OK {
339 0 : tracing::info!(
340 0 : "Downloads to {} complete: {}/{} layers, {}/{} bytes",
341 : node,
342 : progress.layers_downloaded,
343 : progress.layers_total,
344 : progress.bytes_downloaded,
345 : progress.bytes_total
346 : );
347 0 : return Ok(());
348 0 : } else if status == StatusCode::ACCEPTED {
349 0 : let total_runtime = started_at.elapsed();
350 0 : if total_runtime > TOTAL_DOWNLOAD_TIMEOUT {
351 0 : tracing::warn!("Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
352 0 : total_runtime.as_millis(),
353 : progress.layers_downloaded,
354 : progress.layers_total,
355 : progress.bytes_downloaded,
356 : progress.bytes_total
357 : );
358 : // Give up, but proceed: an incompletely warmed destination doesn't prevent migration working,
359 : // it just makes the I/O performance for users less good.
360 0 : return Ok(());
361 0 : }
362 0 :
363 0 : // Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call
364 0 : // to the pageserver is a long-poll.
365 0 : tracing::info!(
366 0 : "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
367 : node,
368 : progress.layers_downloaded,
369 : progress.layers_total,
370 : progress.bytes_downloaded,
371 : progress.bytes_total
372 : );
373 0 : }
374 : }
375 0 : }
376 :
377 0 : async fn await_lsn(
378 0 : &self,
379 0 : tenant_shard_id: TenantShardId,
380 0 : node: &Node,
381 0 : baseline: HashMap<TimelineId, Lsn>,
382 0 : ) -> anyhow::Result<()> {
383 : loop {
384 0 : let latest = match self.get_lsns(tenant_shard_id, node).await {
385 0 : Ok(l) => l,
386 0 : Err(e) => {
387 0 : tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
388 0 : tokio::time::sleep(Duration::from_millis(500)).await;
389 0 : continue;
390 : }
391 : };
392 :
393 0 : let mut any_behind: bool = false;
394 0 : for (timeline_id, baseline_lsn) in &baseline {
395 0 : match latest.get(timeline_id) {
396 0 : Some(latest_lsn) => {
397 0 : tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
398 0 : if latest_lsn < baseline_lsn {
399 0 : any_behind = true;
400 0 : }
401 : }
402 0 : None => {
403 0 : // Expected timeline isn't yet visible on migration destination.
404 0 : // (IRL we would have to account for timeline deletion, but this
405 0 : // is just a test helper)
406 0 : any_behind = true;
407 0 : }
408 : }
409 : }
410 :
411 0 : if !any_behind {
412 0 : tracing::info!("✅ LSN caught up. Proceeding...");
413 0 : break;
414 0 : } else {
415 0 : tokio::time::sleep(Duration::from_millis(500)).await;
416 0 : }
417 : }
418 :
419 0 : Ok(())
420 0 : }
421 :
422 0 : pub async fn live_migrate(
423 0 : &mut self,
424 0 : origin_ps: Node,
425 0 : dest_ps: Node,
426 0 : ) -> Result<(), ReconcileError> {
427 0 : // `maybe_live_migrate` is responsible for checking the sanity of the inputs
428 0 : assert!(origin_ps.get_id() != dest_ps.get_id());
429 :
430 0 : fn build_location_config(
431 0 : shard: &ShardIdentity,
432 0 : config: &TenantConfig,
433 0 : mode: LocationConfigMode,
434 0 : generation: Option<Generation>,
435 0 : secondary_conf: Option<LocationConfigSecondary>,
436 0 : ) -> LocationConfig {
437 0 : LocationConfig {
438 0 : mode,
439 0 : generation: generation.map(|g| g.into().unwrap()),
440 0 : secondary_conf,
441 0 : tenant_conf: config.clone(),
442 0 : shard_number: shard.number.0,
443 0 : shard_count: shard.count.literal(),
444 0 : shard_stripe_size: shard.stripe_size.0,
445 0 : }
446 0 : }
447 :
448 0 : tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);
449 :
450 : // FIXME: it is incorrect to use self.generation here, we should use the generation
451 : // from the ObservedState of the origin pageserver (it might be older than self.generation)
452 0 : let stale_conf = build_location_config(
453 0 : &self.shard,
454 0 : &self.config,
455 0 : LocationConfigMode::AttachedStale,
456 0 : self.generation,
457 0 : None,
458 0 : );
459 0 : self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
460 0 : .await?;
461 :
462 0 : let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);
463 :
464 : // If we are migrating to a destination that has a secondary location, warm it up first
465 0 : if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
466 0 : if let Some(destination_conf) = &destination_conf.conf {
467 0 : if destination_conf.mode == LocationConfigMode::Secondary {
468 0 : tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
469 0 : self.secondary_download(self.tenant_shard_id, &dest_ps)
470 0 : .await?;
471 0 : }
472 0 : }
473 0 : }
474 :
475 : // Increment generation before attaching to new pageserver
476 : self.generation = Some(
477 0 : self.persistence
478 0 : .increment_generation(self.tenant_shard_id, dest_ps.get_id())
479 0 : .await?,
480 : );
481 :
482 0 : let dest_conf = build_location_config(
483 0 : &self.shard,
484 0 : &self.config,
485 0 : LocationConfigMode::AttachedMulti,
486 0 : self.generation,
487 0 : None,
488 0 : );
489 0 :
490 0 : tracing::info!("🔁 Attaching to pageserver {dest_ps}");
491 0 : self.location_config(&dest_ps, dest_conf, None, false)
492 0 : .await?;
493 :
494 0 : if let Some(baseline) = baseline_lsns {
495 0 : tracing::info!("🕑 Waiting for LSN to catch up...");
496 0 : self.await_lsn(self.tenant_shard_id, &dest_ps, baseline)
497 0 : .await?;
498 0 : }
499 :
500 0 : tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");
501 :
502 : // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
503 : // the origin without notifying compute, we will render the tenant unavailable.
504 0 : while let Err(e) = self.compute_notify().await {
505 0 : match e {
506 0 : NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
507 0 : NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
508 : _ => {
509 0 : tracing::warn!(
510 0 : "Live migration blocked by compute notification error, retrying: {e}"
511 : );
512 : }
513 : }
514 : }
515 :
516 : // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
517 : // this location will be deleted in the general case reconciliation that runs after this.
518 0 : let origin_secondary_conf = build_location_config(
519 0 : &self.shard,
520 0 : &self.config,
521 0 : LocationConfigMode::Secondary,
522 0 : None,
523 0 : Some(LocationConfigSecondary { warm: true }),
524 0 : );
525 0 : self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
526 0 : .await?;
527 : // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
528 : // partway through. In fact, all location conf API calls should be in a wrapper that sets
529 : // the observed state to None, then runs, then sets it to what we wrote.
530 0 : self.observed.locations.insert(
531 0 : origin_ps.get_id(),
532 0 : ObservedStateLocation {
533 0 : conf: Some(origin_secondary_conf),
534 0 : },
535 0 : );
536 0 :
537 0 : tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
538 0 : let dest_final_conf = build_location_config(
539 0 : &self.shard,
540 0 : &self.config,
541 0 : LocationConfigMode::AttachedSingle,
542 0 : self.generation,
543 0 : None,
544 0 : );
545 0 : self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
546 0 : .await?;
547 0 : self.observed.locations.insert(
548 0 : dest_ps.get_id(),
549 0 : ObservedStateLocation {
550 0 : conf: Some(dest_final_conf),
551 0 : },
552 0 : );
553 0 :
554 0 : tracing::info!("✅ Migration complete");
555 :
556 0 : Ok(())
557 0 : }
558 :
559 0 : async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
560 : // If the attached node has uncertain state, read it from the pageserver before proceeding: this
561 : // is important to avoid spurious generation increments.
562 : //
563 : // We don't need to do this for secondary/detach locations because it's harmless to just PUT their
564 : // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
565 : // the `Timeline` object in the pageserver.
566 :
567 0 : let Some(attached_node) = self.intent.attached.as_ref() else {
568 : // Nothing to do
569 0 : return Ok(());
570 : };
571 :
572 0 : if matches!(
573 0 : self.observed.locations.get(&attached_node.get_id()),
574 : Some(ObservedStateLocation { conf: None })
575 : ) {
576 0 : let tenant_shard_id = self.tenant_shard_id;
577 0 : let observed_conf = match attached_node
578 0 : .with_client_retries(
579 0 : |client| async move { client.get_location_config(tenant_shard_id).await },
580 0 : &self.service_config.jwt_token,
581 0 : 1,
582 0 : 1,
583 0 : Duration::from_secs(5),
584 0 : &self.cancel,
585 0 : )
586 0 : .await
587 : {
588 0 : Some(Ok(observed)) => Some(observed),
589 0 : Some(Err(mgmt_api::Error::ApiError(status, _msg)))
590 0 : if status == StatusCode::NOT_FOUND =>
591 0 : {
592 0 : None
593 : }
594 0 : Some(Err(e)) => return Err(e.into()),
595 0 : None => return Err(ReconcileError::Cancel),
596 : };
597 0 : tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
598 0 : match observed_conf {
599 0 : Some(conf) => {
600 0 : // Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
601 0 : // if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
602 0 : self.observed
603 0 : .locations
604 0 : .insert(attached_node.get_id(), ObservedStateLocation { conf });
605 0 : }
606 0 : None => {
607 0 : // Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
608 0 : self.observed.locations.remove(&attached_node.get_id());
609 0 : }
610 : }
611 0 : }
612 :
613 0 : Ok(())
614 0 : }
615 :
616 : /// Reconciling a tenant makes API calls to pageservers until the observed state
617 : /// matches the intended state.
618 : ///
619 : /// First we apply special case handling (e.g. for live migrations), and then a
620 : /// general case reconciliation where we walk through the intent by pageserver
621 : /// and call out to the pageserver to apply the desired state.
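///
/// The sequence implemented below is:
/// 1. Refresh uncertain observed state for the intended attached location.
/// 2. Special case: live-migrate if another available node is currently attached.
/// 3. Configure the intended attached location, incrementing the generation where required.
/// 4. Configure secondary locations, implicitly downgrading any that were attached.
/// 5. Detach locations no longer referenced by the intent.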
622 0 : pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
623 0 : // Prepare: if we have uncertain `observed` state for our would-be attachment location, then refresh it
624 0 : self.maybe_refresh_observed().await?;
625 :
626 : // Special case: live migration
627 0 : self.maybe_live_migrate().await?;
628 :
629 : // If the intended pageserver does not yet have this tenant attached, attach it now.
630 0 : if let Some(node) = self.intent.attached.as_ref() {
631 : // If we are in an attached policy, then generation must have been set (null generations
632 : // are only present when a tenant is initially loaded with a secondary policy)
633 0 : debug_assert!(self.generation.is_some());
634 0 : let Some(generation) = self.generation else {
635 0 : return Err(ReconcileError::Other(anyhow::anyhow!(
636 0 : "Attempted to attach with NULL generation"
637 0 : )));
638 : };
639 :
640 0 : let mut wanted_conf = attached_location_conf(
641 0 : generation,
642 0 : &self.shard,
643 0 : &self.config,
644 0 : !self.intent.secondary.is_empty(),
645 0 : );
646 0 : match self.observed.locations.get(&node.get_id()) {
647 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
648 0 : // Nothing to do
649 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
650 : }
651 0 : observed => {
652 : // In all cases other than a matching observed configuration, we will
653 : // reconcile this location. This includes locations with different configurations, as well
654 : // as locations with unknown (None) observed state.
655 :
656 : // The general case is to increment the generation. However, there are cases
657 : // where this is not necessary:
658 : // - if we are only updating the TenantConf part of the location
659 : // - if we are only changing the attachment mode (e.g. going to AttachedMulti or AttachedStale)
660 : // and the location was already in the correct generation
661 0 : let increment_generation = match observed {
662 0 : None => true,
663 0 : Some(ObservedStateLocation { conf: None }) => true,
664 : Some(ObservedStateLocation {
665 0 : conf: Some(observed),
666 0 : }) => {
667 0 : let generations_match = observed.generation == wanted_conf.generation;
668 :
669 : use LocationConfigMode::*;
670 0 : let mode_transition_requires_gen_inc =
671 0 : match (observed.mode, wanted_conf.mode) {
672 : // Usually the short-lived attachment modes (multi and stale) are only used
673 : // in the case of [`Self::live_migrate`], but it is simple to handle them correctly
674 : // here too. Locations are allowed to go Single->Stale and Multi->Single within the same generation.
675 0 : (AttachedSingle, AttachedStale) => false,
676 0 : (AttachedMulti, AttachedSingle) => false,
677 0 : (lhs, rhs) => lhs != rhs,
678 : };
679 :
680 0 : !generations_match || mode_transition_requires_gen_inc
681 : }
682 : };
683 :
684 0 : if increment_generation {
685 0 : let generation = self
686 0 : .persistence
687 0 : .increment_generation(self.tenant_shard_id, node.get_id())
688 0 : .await?;
689 0 : self.generation = Some(generation);
690 0 : wanted_conf.generation = generation.into();
691 0 : }
692 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
693 :
694 : // Because `node` comes from a ref to &self, clone it before calling into a &mut self
695 : // function: this could be avoided by refactoring the state mutated by location_config into
696 : // a separate type from Self.
697 0 : let node = node.clone();
698 0 :
699 0 : // Use lazy=true, because we may run many reconcilers concurrently, and do not want to
700 0 : // overload the pageserver with logical size calculations.
701 0 : self.location_config(&node, wanted_conf, None, true).await?;
702 0 : self.compute_notify().await?;
703 : }
704 : }
705 0 : }
706 :
707 : // Configure secondary locations: if these were previously attached this
708 : // implicitly downgrades them from attached to secondary.
709 0 : let mut changes = Vec::new();
710 0 : for node in &self.intent.secondary {
711 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
712 0 : match self.observed.locations.get(&node.get_id()) {
713 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
714 0 : // Nothing to do
715 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
716 : }
717 : _ => {
718 : // In all cases other than a matching observed configuration, we will
719 : // reconcile this location.
720 0 : tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
721 0 : changes.push((node.clone(), wanted_conf))
722 : }
723 : }
724 : }
725 :
726 : // Detach any extraneous pageservers that are no longer referenced
727 : // by our intent.
728 0 : for node in &self.detach {
729 0 : changes.push((
730 0 : node.clone(),
731 0 : LocationConfig {
732 0 : mode: LocationConfigMode::Detached,
733 0 : generation: None,
734 0 : secondary_conf: None,
735 0 : shard_number: self.shard.number.0,
736 0 : shard_count: self.shard.count.literal(),
737 0 : shard_stripe_size: self.shard.stripe_size.0,
738 0 : tenant_conf: self.config.clone(),
739 0 : },
740 0 : ));
741 0 : }
742 :
743 0 : for (node, conf) in changes {
744 0 : if self.cancel.is_cancelled() {
745 0 : return Err(ReconcileError::Cancel);
746 0 : }
747 0 : self.location_config(&node, conf, None, false).await?;
748 : }
749 :
750 0 : Ok(())
751 0 : }
752 :
753 0 : pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
754 : // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
755 : // destination.
756 0 : if let Some(node) = &self.intent.attached {
757 0 : let result = self
758 0 : .compute_hook
759 0 : .notify(
760 0 : self.tenant_shard_id,
761 0 : node.get_id(),
762 0 : self.shard.stripe_size,
763 0 : &self.cancel,
764 0 : )
765 0 : .await;
766 0 : if let Err(e) = &result {
767 : // It is up to the caller whether they want to drop out on this error, but they don't have to:
768 : // in general we should avoid letting unavailability of the cloud control plane stop us from
769 : // making progress.
770 0 : if !matches!(e, NotifyError::ShuttingDown) {
771 0 : tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
772 0 : }
773 :
774 : // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
775 : // needs to retry at some point.
776 0 : self.compute_notify_failure = true;
777 0 : }
778 0 : result
779 : } else {
780 0 : Ok(())
781 : }
782 0 : }
783 : }
784 :
785 : /// We tweak the externally-set TenantConfig while configuring
786 : /// locations, using our awareness of whether secondary locations
787 : /// are in use to automatically enable/disable heatmap uploads.
788 0 : fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
789 0 : let mut config = config.clone();
790 0 : if has_secondaries {
791 0 : if config.heatmap_period.is_none() {
792 0 : config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
793 0 : }
794 0 : } else {
795 0 : config.heatmap_period = None;
796 0 : }
797 0 : config
798 0 : }
799 :
800 0 : pub(crate) fn attached_location_conf(
801 0 : generation: Generation,
802 0 : shard: &ShardIdentity,
803 0 : config: &TenantConfig,
804 0 : has_secondaries: bool,
805 0 : ) -> LocationConfig {
806 0 : LocationConfig {
807 0 : mode: LocationConfigMode::AttachedSingle,
808 0 : generation: generation.into(),
809 0 : secondary_conf: None,
810 0 : shard_number: shard.number.0,
811 0 : shard_count: shard.count.literal(),
812 0 : shard_stripe_size: shard.stripe_size.0,
813 0 : tenant_conf: ha_aware_config(config, has_secondaries),
814 0 : }
815 0 : }
816 :
817 0 : pub(crate) fn secondary_location_conf(
818 0 : shard: &ShardIdentity,
819 0 : config: &TenantConfig,
820 0 : ) -> LocationConfig {
821 0 : LocationConfig {
822 0 : mode: LocationConfigMode::Secondary,
823 0 : generation: None,
824 0 : secondary_conf: Some(LocationConfigSecondary { warm: true }),
825 0 : shard_number: shard.number.0,
826 0 : shard_count: shard.count.literal(),
827 0 : shard_stripe_size: shard.stripe_size.0,
828 0 : tenant_conf: ha_aware_config(config, true),
829 0 : }
830 0 : }
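// A hedged unit-test sketch of `ha_aware_config`: it assumes `TenantConfig` implements
// `Default` (all fields unset); if it does not, construct the config explicitly.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ha_aware_config_toggles_heatmap_period() {
        let config = TenantConfig::default();

        // With secondaries present, an unset heatmap period gets the default...
        let with_secondaries = ha_aware_config(&config, true);
        assert_eq!(
            with_secondaries.heatmap_period.as_deref(),
            Some(DEFAULT_HEATMAP_PERIOD)
        );

        // ...and with no secondaries, any configured period is cleared.
        let without_secondaries = ha_aware_config(&with_secondaries, false);
        assert_eq!(without_secondaries.heatmap_period, None);
    }
}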