use crate::persistence::Persistence;
use crate::service;
use pageserver_api::controller_api::NodeAvailability;
use pageserver_api::models::{
    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;

use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};

/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
    /// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot
    /// of a tenant's state from when we spawned a reconcile task.
    pub(super) tenant_shard_id: TenantShardId,
    pub(crate) shard: ShardIdentity,
    pub(crate) generation: Generation,
    pub(crate) intent: TargetState,
    pub(crate) config: TenantConfig,
    pub(crate) observed: ObservedState,

    pub(crate) service_config: service::Config,

    /// A snapshot of the pageservers as they were when we were asked
    /// to reconcile.
    pub(crate) pageservers: Arc<HashMap<NodeId, Node>>,

    /// A hook to notify the running postgres instances when we change the location
    /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
    /// and guarantee eventual retries.
    pub(crate) compute_hook: Arc<ComputeHook>,

    /// To avoid stalling if the cloud control plane is unavailable, we may proceed
    /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
    /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry.
    pub(crate) compute_notify_failure: bool,

    /// A means to abort background reconciliation: it is essential to
    /// call this when something changes in the original TenantState that
    /// will make this reconciliation impossible or unnecessary, for
    /// example when a pageserver node goes offline, or the PlacementPolicy for
    /// the tenant is changed.
    pub(crate) cancel: CancellationToken,

    /// Reconcilers are registered with a Gate so that during a graceful shutdown we
    /// can wait for all the reconcilers to respond to their cancellation tokens.
    pub(crate) _gate_guard: GateGuard,

    /// Access to persistent storage for updating generation numbers
    pub(crate) persistence: Arc<Persistence>,
}

/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
/// and the TargetState is just the instruction for a particular Reconciler run.
#[derive(Debug)]
pub(crate) struct TargetState {
    pub(crate) attached: Option<NodeId>,
    pub(crate) secondary: Vec<NodeId>,
}

impl TargetState {
    pub(crate) fn from_intent(intent: &IntentState) -> Self {
        Self {
            attached: *intent.get_attached(),
            secondary: intent.get_secondary().clone(),
        }
    }

    fn all_pageservers(&self) -> Vec<NodeId> {
        let mut result = self.secondary.clone();
        if let Some(node_id) = &self.attached {
            result.push(*node_id);
        }
        result
    }
}

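/// Errors that can interrupt a reconcile pass: a failed compute notification,
/// explicit cancellation, or any other failure from the underlying API calls.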
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileError {
    #[error(transparent)]
    Notify(#[from] NotifyError),
    #[error("Cancelled")]
    Cancel,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl Reconciler {
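    /// Apply a [`LocationConfig`] to a pageserver via the management API and record the
    /// outcome in `self.observed`. The observed entry is set to `None` before the call,
    /// so that a failure partway through leaves the location in an "unknown" state
    /// rather than a stale one.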
    async fn location_config(
        &mut self,
        node_id: NodeId,
        config: LocationConfig,
        flush_ms: Option<Duration>,
    ) -> anyhow::Result<()> {
        let node = self
            .pageservers
            .get(&node_id)
            .expect("Pageserver may not be removed while referenced");

        self.observed
            .locations
            .insert(node.id, ObservedStateLocation { conf: None });

        tracing::info!("location_config({}) calling: {:?}", node_id, config);
        let client =
            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
        client
            .location_config(self.tenant_shard_id, config.clone(), flush_ms)
            .await?;
        tracing::info!("location_config({}) complete: {:?}", node_id, config);

        self.observed
            .locations
            .insert(node.id, ObservedStateLocation { conf: Some(config) });

        Ok(())
    }

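    /// Decide whether this reconcile pass should be done as a live migration: that is the
    /// case when our intent is to be attached on a node that is currently a secondary (or
    /// has no observed location), while another node is observed as attached and online.
    /// Otherwise this is a no-op and the general-case reconciliation handles the change.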
    async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
        let destination = if let Some(node_id) = self.intent.attached {
            match self.observed.locations.get(&node_id) {
                Some(conf) => {
                    // We will do a live migration only if the intended destination is not
                    // currently in an attached state.
                    match &conf.conf {
                        Some(conf) if conf.mode == LocationConfigMode::Secondary => {
                            // Fall through to do a live migration
                            node_id
                        }
                        None | Some(_) => {
                            // Attached or uncertain: don't do a live migration, proceed
                            // with a general-case reconciliation
                            tracing::info!("maybe_live_migrate: destination is None or attached");
                            return Ok(());
                        }
                    }
                }
                None => {
                    // Our destination is not attached: maybe live migrate if some other
                    // node is currently attached. Fall through.
                    node_id
                }
            }
        } else {
            // No intent to be attached
            tracing::info!("maybe_live_migrate: no attached intent");
            return Ok(());
        };

        let mut origin = None;
        for (node_id, state) in &self.observed.locations {
            if let Some(observed_conf) = &state.conf {
                if observed_conf.mode == LocationConfigMode::AttachedSingle {
                    let node = self
                        .pageservers
                        .get(node_id)
                        .expect("Nodes may not be removed while referenced");
                    // We will only attempt live migration if the origin is not offline: this
                    // avoids trying to do it while reconciling after responding to an HA failover.
                    if !matches!(node.availability, NodeAvailability::Offline) {
                        origin = Some(*node_id);
                        break;
                    }
                }
            }
        }

        let Some(origin) = origin else {
            tracing::info!("maybe_live_migrate: no origin found");
            return Ok(());
        };

        // We have an origin and a destination: proceed to do the live migration
        tracing::info!("Live migrating {}->{}", origin, destination);
        self.live_migrate(origin, destination).await?;

        Ok(())
    }

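    /// Fetch the last-record LSN of every timeline of this tenant shard on the given
    /// pageserver, keyed by timeline ID. Used to compare origin and destination during
    /// a live migration.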
    async fn get_lsns(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: &NodeId,
    ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
        let node = self
            .pageservers
            .get(node_id)
            .expect("Pageserver may not be removed while referenced");

        let client =
            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());

        let timelines = client.timeline_list(&tenant_shard_id).await?;
        Ok(timelines
            .into_iter()
            .map(|t| (t.timeline_id, t.last_record_lsn))
            .collect())
    }

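    /// Ask a pageserver to download the latest layers for a secondary location of this
    /// tenant shard, to warm it up ahead of attachment. Errors are swallowed: if the
    /// node isn't actually in secondary mode the download is simply skipped.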
    async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) {
        let node = self
            .pageservers
            .get(node_id)
            .expect("Pageserver may not be removed while referenced");

        let client =
            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());

        match client.tenant_secondary_download(tenant_shard_id).await {
            Ok(()) => {}
            Err(_) => {
                tracing::info!(" (skipping, destination wasn't in secondary mode)")
            }
        }
    }

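    /// Poll the destination pageserver until every timeline in `baseline` has caught up
    /// to the LSN recorded on the origin, so that the destination has ingested at least
    /// as much WAL as the origin had when the migration started.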
    async fn await_lsn(
        &self,
        tenant_shard_id: TenantShardId,
        pageserver_id: &NodeId,
        baseline: HashMap<TimelineId, Lsn>,
    ) -> anyhow::Result<()> {
        loop {
            let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await {
                Ok(l) => l,
                Err(e) => {
                    println!(
                        "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
                        pageserver_id
                    );
                    // Use the async (tokio) sleep rather than std::thread::sleep, so we
                    // don't block the executor thread while polling.
                    tokio::time::sleep(Duration::from_millis(500)).await;
                    continue;
                }
            };

            let mut any_behind: bool = false;
            for (timeline_id, baseline_lsn) in &baseline {
                match latest.get(timeline_id) {
                    Some(latest_lsn) => {
                        println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
                        if latest_lsn < baseline_lsn {
                            any_behind = true;
                        }
                    }
                    None => {
                        // Expected timeline isn't yet visible on migration destination.
                        // (IRL we would have to account for timeline deletion, but this
                        // is just a test helper)
                        any_behind = true;
                    }
                }
            }

            if !any_behind {
                println!("✅ LSN caught up. Proceeding...");
                break;
            } else {
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        }

        Ok(())
    }

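    /// Migrate a tenant shard from `origin_ps_id` to `dest_ps_id` while keeping it
    /// available to computes: mark the origin stale, warm up and attach the destination
    /// under an incremented generation, wait for it to catch up to the origin's LSNs,
    /// notify compute of the new location, and finally downgrade the origin to a secondary.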
    pub async fn live_migrate(
        &mut self,
        origin_ps_id: NodeId,
        dest_ps_id: NodeId,
    ) -> anyhow::Result<()> {
        // `maybe_live_migrate` is responsible for sanity of inputs
        assert!(origin_ps_id != dest_ps_id);

        fn build_location_config(
            shard: &ShardIdentity,
            config: &TenantConfig,
            mode: LocationConfigMode,
            generation: Option<Generation>,
            secondary_conf: Option<LocationConfigSecondary>,
        ) -> LocationConfig {
            LocationConfig {
                mode,
                generation: generation.map(|g| g.into().unwrap()),
                secondary_conf,
                tenant_conf: config.clone(),
                shard_number: shard.number.0,
                shard_count: shard.count.literal(),
                shard_stripe_size: shard.stripe_size.0,
            }
        }

        tracing::info!(
            "🔁 Switching origin pageserver {} to stale mode",
            origin_ps_id
        );

        // FIXME: it is incorrect to use self.generation here, we should use the generation
        // from the ObservedState of the origin pageserver (it might be older than self.generation)
        let stale_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedStale,
            Some(self.generation),
            None,
        );
        self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
            .await?;

        let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?);

        // If we are migrating to a destination that has a secondary location, warm it up first
        if let Some(destination_conf) = self.observed.locations.get(&dest_ps_id) {
            if let Some(destination_conf) = &destination_conf.conf {
                if destination_conf.mode == LocationConfigMode::Secondary {
                    tracing::info!(
                        "🔁 Downloading latest layers to destination pageserver {}",
                        dest_ps_id,
                    );
                    self.secondary_download(self.tenant_shard_id, &dest_ps_id)
                        .await;
                }
            }
        }

        // Increment generation before attaching to new pageserver
        self.generation = self
            .persistence
            .increment_generation(self.tenant_shard_id, dest_ps_id)
            .await?;

        let dest_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedMulti,
            Some(self.generation),
            None,
        );

        tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id);
        self.location_config(dest_ps_id, dest_conf, None).await?;

        if let Some(baseline) = baseline_lsns {
            tracing::info!("🕑 Waiting for LSN to catch up...");
            self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline)
                .await?;
        }

        tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);

        // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
        // the origin without notifying compute, we will render the tenant unavailable.
        while let Err(e) = self.compute_notify().await {
            match e {
                NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)),
                _ => {
                    tracing::warn!(
                        "Live migration blocked by compute notification error, retrying: {e}"
                    );
                }
            }
        }

        // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then
        // this location will be deleted in the general case reconciliation that runs after this.
        let origin_secondary_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::Secondary,
            None,
            Some(LocationConfigSecondary { warm: true }),
        );
        self.location_config(origin_ps_id, origin_secondary_conf.clone(), None)
            .await?;
        // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
        // partway through. In fact, all location conf API calls should be in a wrapper that sets
        // the observed state to None, then runs, then sets it to what we wrote.
        self.observed.locations.insert(
            origin_ps_id,
            ObservedStateLocation {
                conf: Some(origin_secondary_conf),
            },
        );

        tracing::info!(
            "🔁 Switching to AttachedSingle mode on pageserver {}",
            dest_ps_id
        );
        let dest_final_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedSingle,
            Some(self.generation),
            None,
        );
        self.location_config(dest_ps_id, dest_final_conf.clone(), None)
            .await?;
        self.observed.locations.insert(
            dest_ps_id,
            ObservedStateLocation {
                conf: Some(dest_final_conf),
            },
        );

        tracing::info!("✅ Migration complete");

        Ok(())
    }

    /// Reconciling a tenant makes API calls to pageservers until the observed state
    /// matches the intended state.
    ///
    /// First we apply special case handling (e.g. for live migrations), and then a
    /// general case reconciliation where we walk through the intent by pageserver
    /// and call out to the pageserver to apply the desired state.
    pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
        // TODO: if any of self.observed is None, call to remote pageservers
        // to learn correct state.

        // Special case: live migration
        self.maybe_live_migrate().await?;

        // If the intent is to be attached to some pageserver and the observed state
        // doesn't already match, attach there now.
        if let Some(node_id) = self.intent.attached {
            let mut wanted_conf =
                attached_location_conf(self.generation, &self.shard, &self.config);
            match self.observed.locations.get(&node_id) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                    // Nothing to do
                    tracing::info!(%node_id, "Observed configuration already correct.")
                }
                _ => {
                    // In all cases other than a matching observed configuration, we will
                    // reconcile this location. This includes locations with different configurations, as well
                    // as locations with unknown (None) observed state.
                    self.generation = self
                        .persistence
                        .increment_generation(self.tenant_shard_id, node_id)
                        .await?;
                    wanted_conf.generation = self.generation.into();
                    tracing::info!(%node_id, "Observed configuration requires update.");
                    self.location_config(node_id, wanted_conf, None).await?;
                    self.compute_notify().await?;
                }
            }
        }

        // Configure secondary locations: if these were previously attached this
        // implicitly downgrades them from attached to secondary.
        let mut changes = Vec::new();
        for node_id in &self.intent.secondary {
            let wanted_conf = secondary_location_conf(&self.shard, &self.config);
            match self.observed.locations.get(node_id) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                    // Nothing to do
                    tracing::info!(%node_id, "Observed configuration already correct.")
                }
                _ => {
                    // In all cases other than a matching observed configuration, we will
                    // reconcile this location.
                    tracing::info!(%node_id, "Observed configuration requires update.");
                    changes.push((*node_id, wanted_conf))
                }
            }
        }

        // Detach any extraneous pageservers that are no longer referenced
        // by our intent.
        let all_pageservers = self.intent.all_pageservers();
        for node_id in self.observed.locations.keys() {
            if all_pageservers.contains(node_id) {
                // We are only detaching pageservers that aren't used at all.
                continue;
            }

            changes.push((
                *node_id,
                LocationConfig {
                    mode: LocationConfigMode::Detached,
                    generation: None,
                    secondary_conf: None,
                    shard_number: self.shard.number.0,
                    shard_count: self.shard.count.literal(),
                    shard_stripe_size: self.shard.stripe_size.0,
                    tenant_conf: self.config.clone(),
                },
            ));
        }

        for (node_id, conf) in changes {
            if self.cancel.is_cancelled() {
                return Err(ReconcileError::Cancel);
            }
            self.location_config(node_id, conf, None).await?;
        }

        Ok(())
    }

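    /// Notify the compute hook of the node we intend to be attached to, if any. On
    /// failure we record it in `compute_notify_failure` (so the shard can retry later)
    /// and return the error to the caller, which may choose to tolerate it.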
    pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
        // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
        // destination.
        if let Some(node_id) = self.intent.attached {
            let result = self
                .compute_hook
                .notify(self.tenant_shard_id, node_id, &self.cancel)
                .await;
            if let Err(e) = &result {
                // It is up to the caller whether they want to drop out on this error, but they don't have to:
                // in general we should avoid letting unavailability of the cloud control plane stop us from
                // making progress.
                tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}");
                // Set this flag so that when our ReconcileResult is processed, the shard is
                // marked as needing to retry the notification at some point.
                self.compute_notify_failure = true;
            }
            result
        } else {
            Ok(())
        }
    }
}

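/// Build the [`LocationConfig`] for running a tenant shard as the single attached
/// location (`AttachedSingle`) with the given generation.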
pub(crate) fn attached_location_conf(
    generation: Generation,
    shard: &ShardIdentity,
    config: &TenantConfig,
) -> LocationConfig {
    LocationConfig {
        mode: LocationConfigMode::AttachedSingle,
        generation: generation.into(),
        secondary_conf: None,
        shard_number: shard.number.0,
        shard_count: shard.count.literal(),
        shard_stripe_size: shard.stripe_size.0,
        tenant_conf: config.clone(),
    }
}

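/// Build the [`LocationConfig`] for running a tenant shard as a warm secondary
/// location: no generation, and a secondary config requesting layer downloads.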
pub(crate) fn secondary_location_conf(
    shard: &ShardIdentity,
    config: &TenantConfig,
) -> LocationConfig {
    LocationConfig {
        mode: LocationConfigMode::Secondary,
        generation: None,
        secondary_conf: Some(LocationConfigSecondary { warm: true }),
        shard_number: shard.number.0,
        shard_count: shard.count.literal(),
        shard_stripe_size: shard.stripe_size.0,
        tenant_conf: config.clone(),
    }
}