use crate::persistence::Persistence;
use crate::service;
use control_plane::attachment_service::NodeAvailability;
use pageserver_api::models::{
    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;

use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};

/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
    /// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot
    /// of a tenant's state from when we spawned a reconcile task.
    pub(super) tenant_shard_id: TenantShardId,
    pub(crate) shard: ShardIdentity,
    pub(crate) generation: Generation,
    pub(crate) intent: IntentState,
    pub(crate) config: TenantConfig,
    pub(crate) observed: ObservedState,

    pub(crate) service_config: service::Config,

    /// A snapshot of the pageservers as they were when we were asked
    /// to reconcile.
    pub(crate) pageservers: Arc<HashMap<NodeId, Node>>,

    /// A hook to notify the running postgres instances when we change the location
    /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
    /// and guarantee eventual retries.
    pub(crate) compute_hook: Arc<ComputeHook>,

    /// To avoid stalling if the cloud control plane is unavailable, we may proceed
    /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
    /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`]
    /// to ensure a later retry.
    pub(crate) compute_notify_failure: bool,

    /// A means to abort background reconciliation: it is essential to
    /// call this when something changes in the original TenantState that
    /// will make this reconciliation impossible or unnecessary, for
    /// example when a pageserver node goes offline, or the PlacementPolicy for
    /// the tenant is changed.
    pub(crate) cancel: CancellationToken,

    /// Access to persistent storage for updating generation numbers
    pub(crate) persistence: Arc<Persistence>,
}

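/// Errors from a reconcile run: failures to notify compute are kept distinct from all
/// other errors, so that callers can schedule a retry of the notification.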
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileError {
    #[error(transparent)]
    Notify(#[from] NotifyError),
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl Reconciler {
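    /// Apply a location configuration on one pageserver via its management API. The observed
    /// state for that node is set to unknown (`None`) before the call, and to the new
    /// configuration once the call succeeds.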
    async fn location_config(
        &mut self,
        node_id: NodeId,
        config: LocationConfig,
        flush_ms: Option<Duration>,
    ) -> anyhow::Result<()> {
        let node = self
            .pageservers
            .get(&node_id)
            .expect("Pageserver may not be removed while referenced");

        self.observed
            .locations
            .insert(node.id, ObservedStateLocation { conf: None });

        tracing::info!("location_config({}) calling: {:?}", node_id, config);
        let client =
            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
        client
            .location_config(self.tenant_shard_id, config.clone(), flush_ms)
            .await?;
        tracing::info!("location_config({}) complete: {:?}", node_id, config);

        self.observed
            .locations
            .insert(node.id, ObservedStateLocation { conf: Some(config) });

        Ok(())
    }

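    /// Special-case handling for live migration: if we intend to be attached to a node on which
    /// we are not currently attached, and some other (non-offline) node currently holds the
    /// tenant in `AttachedSingle` mode, live migrate from that origin to the intended destination.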
    async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
        let destination = if let Some(node_id) = self.intent.attached {
            match self.observed.locations.get(&node_id) {
                Some(conf) => {
                    // We will do a live migration only if the intended destination is not
                    // currently in an attached state.
                    match &conf.conf {
                        Some(conf) if conf.mode == LocationConfigMode::Secondary => {
                            // Fall through to do a live migration
                            node_id
                        }
                        None | Some(_) => {
                            // Attached or uncertain: don't do a live migration, proceed
                            // with a general-case reconciliation
                            tracing::info!("maybe_live_migrate: destination is None or attached");
                            return Ok(());
                        }
                    }
                }
                None => {
                    // No observed state for our destination: maybe live migrate if some other
                    // node is currently attached. Fall through.
                    node_id
                }
            }
        } else {
            // No intent to be attached
            tracing::info!("maybe_live_migrate: no attached intent");
            return Ok(());
        };

        let mut origin = None;
        for (node_id, state) in &self.observed.locations {
            if let Some(observed_conf) = &state.conf {
                if observed_conf.mode == LocationConfigMode::AttachedSingle {
                    let node = self
                        .pageservers
                        .get(node_id)
                        .expect("Nodes may not be removed while referenced");
                    // We will only attempt live migration if the origin is not offline: this
                    // avoids trying to do it while reconciling after responding to an HA failover.
                    if !matches!(node.availability, NodeAvailability::Offline) {
                        origin = Some(*node_id);
                        break;
                    }
                }
            }
        }

        let Some(origin) = origin else {
            tracing::info!("maybe_live_migrate: no origin found");
            return Ok(());
        };

        // We have an origin and a destination: proceed to do the live migration
        tracing::info!("Live migrating {}->{}", origin, destination);
        self.live_migrate(origin, destination).await?;

        Ok(())
    }

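    /// Fetch the last-record LSN of every timeline of this tenant shard on the given pageserver.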
    async fn get_lsns(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: &NodeId,
    ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
        let node = self
            .pageservers
            .get(node_id)
            .expect("Pageserver may not be removed while referenced");

        let client =
            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());

        let timelines = client.timeline_list(&tenant_shard_id).await?;
        Ok(timelines
            .into_iter()
            .map(|t| (t.timeline_id, t.last_record_lsn))
            .collect())
    }

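    /// Ask the given pageserver to download the latest layers for its secondary location of
    /// this tenant shard. Errors are ignored, e.g. if the destination is not in secondary mode.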
    async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) {
        let node = self
            .pageservers
            .get(node_id)
            .expect("Pageserver may not be removed while referenced");

        let client =
            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());

        match client.tenant_secondary_download(tenant_shard_id).await {
            Ok(()) => {}
            Err(_) => {
                tracing::info!(" (skipping, destination wasn't in secondary mode)")
            }
        }
    }

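    /// Poll the given pageserver until every timeline's LSN has caught up with the `baseline`
    /// LSNs captured on the migration origin.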
    async fn await_lsn(
        &self,
        tenant_shard_id: TenantShardId,
        pageserver_id: &NodeId,
        baseline: HashMap<TimelineId, Lsn>,
    ) -> anyhow::Result<()> {
        loop {
            let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await {
                Ok(l) => l,
                Err(e) => {
                    println!(
                        "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
                        pageserver_id
                    );
                    tokio::time::sleep(Duration::from_millis(500)).await;
                    continue;
                }
            };

            let mut any_behind: bool = false;
            for (timeline_id, baseline_lsn) in &baseline {
                match latest.get(timeline_id) {
                    Some(latest_lsn) => {
                        println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
                        if latest_lsn < baseline_lsn {
                            any_behind = true;
                        }
                    }
                    None => {
                        // Expected timeline isn't yet visible on the migration destination.
                        // (IRL we would have to account for timeline deletion, but this
                        // is just a test helper.)
                        any_behind = true;
                    }
                }
            }

            if !any_behind {
                println!("✅ LSN caught up. Proceeding...");
                break;
            } else {
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        }

        Ok(())
    }

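    /// Perform a live migration: mark the origin `AttachedStale`, warm up a secondary location
    /// on the destination if one exists, increment the generation, attach the destination as
    /// `AttachedMulti`, wait for its LSNs to catch up, notify compute, then downgrade the origin
    /// to `Secondary` and finally switch the destination to `AttachedSingle`.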
    pub async fn live_migrate(
        &mut self,
        origin_ps_id: NodeId,
        dest_ps_id: NodeId,
    ) -> anyhow::Result<()> {
        // `maybe_live_migrate` is responsible for sanity of inputs
        assert!(origin_ps_id != dest_ps_id);

        fn build_location_config(
            shard: &ShardIdentity,
            config: &TenantConfig,
            mode: LocationConfigMode,
            generation: Option<Generation>,
            secondary_conf: Option<LocationConfigSecondary>,
        ) -> LocationConfig {
            LocationConfig {
                mode,
                generation: generation.map(|g| g.into().unwrap()),
                secondary_conf,
                tenant_conf: config.clone(),
                shard_number: shard.number.0,
                shard_count: shard.count.0,
                shard_stripe_size: shard.stripe_size.0,
            }
        }

        tracing::info!(
            "🔁 Switching origin pageserver {} to stale mode",
            origin_ps_id
        );

        // FIXME: it is incorrect to use self.generation here, we should use the generation
        // from the ObservedState of the origin pageserver (it might be older than self.generation)
        let stale_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedStale,
            Some(self.generation),
            None,
        );
        self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
            .await?;

        let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?);

        // If we are migrating to a destination that has a secondary location, warm it up first
        if let Some(destination_conf) = self.observed.locations.get(&dest_ps_id) {
            if let Some(destination_conf) = &destination_conf.conf {
                if destination_conf.mode == LocationConfigMode::Secondary {
                    tracing::info!(
                        "🔁 Downloading latest layers to destination pageserver {}",
                        dest_ps_id,
                    );
                    self.secondary_download(self.tenant_shard_id, &dest_ps_id)
                        .await;
                }
            }
        }

        // Increment generation before attaching to new pageserver
        self.generation = self
            .persistence
            .increment_generation(self.tenant_shard_id, dest_ps_id)
            .await?;

        let dest_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedMulti,
            Some(self.generation),
            None,
        );

        tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id);
        self.location_config(dest_ps_id, dest_conf, None).await?;

        if let Some(baseline) = baseline_lsns {
            tracing::info!("🕑 Waiting for LSN to catch up...");
            self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline)
                .await?;
        }

        tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);

        // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
        // the origin without notifying compute, we will render the tenant unavailable.
        while let Err(e) = self.compute_notify().await {
            match e {
                NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)),
                _ => {
                    tracing::warn!(
                        "Live migration blocked by compute notification error, retrying: {e}"
                    );
                }
            }
        }

        // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then
        // this location will be deleted in the general-case reconciliation that runs after this.
        let origin_secondary_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::Secondary,
            None,
            Some(LocationConfigSecondary { warm: true }),
        );
        self.location_config(origin_ps_id, origin_secondary_conf.clone(), None)
            .await?;
        // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
        // partway through. In fact, all location conf API calls should be in a wrapper that sets
        // the observed state to None, then runs, then sets it to what we wrote.
        self.observed.locations.insert(
            origin_ps_id,
            ObservedStateLocation {
                conf: Some(origin_secondary_conf),
            },
        );

        println!(
            "🔁 Switching to AttachedSingle mode on pageserver {}",
            dest_ps_id
        );
        let dest_final_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedSingle,
            Some(self.generation),
            None,
        );
        self.location_config(dest_ps_id, dest_final_conf.clone(), None)
            .await?;
        self.observed.locations.insert(
            dest_ps_id,
            ObservedStateLocation {
                conf: Some(dest_final_conf),
            },
        );

        println!("✅ Migration complete");

        Ok(())
    }

    /// Reconciling a tenant makes API calls to pageservers until the observed state
    /// matches the intended state.
    ///
    /// First we apply special case handling (e.g. for live migrations), and then a
    /// general case reconciliation where we walk through the intent by pageserver
    /// and call out to the pageserver to apply the desired state.
    pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
        // TODO: if any of self.observed is None, call to remote pageservers
        // to learn correct state.

        // Special case: live migration
        self.maybe_live_migrate().await?;

        // If we are not currently attached to the intended pageserver, attach there now.
        if let Some(node_id) = self.intent.attached {
            let mut wanted_conf =
                attached_location_conf(self.generation, &self.shard, &self.config);
            match self.observed.locations.get(&node_id) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                    // Nothing to do
                    tracing::info!("Observed configuration already correct.")
                }
                _ => {
                    // In all cases other than a matching observed configuration, we will
                    // reconcile this location. This includes locations with different
                    // configurations, as well as locations with unknown (None) observed state.
                    self.generation = self
                        .persistence
                        .increment_generation(self.tenant_shard_id, node_id)
                        .await?;
                    wanted_conf.generation = self.generation.into();
                    tracing::info!("Observed configuration requires update.");
                    self.location_config(node_id, wanted_conf, None).await?;
                    self.compute_notify().await?;
                }
            }
        }

        // Configure secondary locations: if these were previously attached this
        // implicitly downgrades them from attached to secondary.
        let mut changes = Vec::new();
        for node_id in &self.intent.secondary {
            let wanted_conf = secondary_location_conf(&self.shard, &self.config);
            match self.observed.locations.get(node_id) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                    // Nothing to do
                    tracing::info!(%node_id, "Observed configuration already correct.")
                }
                _ => {
                    // In all cases other than a matching observed configuration, we will
                    // reconcile this location.
                    tracing::info!(%node_id, "Observed configuration requires update.");
                    changes.push((*node_id, wanted_conf))
                }
            }
        }

        // Detach any extraneous pageservers that are no longer referenced
        // by our intent.
        let all_pageservers = self.intent.all_pageservers();
        for node_id in self.observed.locations.keys() {
            if all_pageservers.contains(node_id) {
                // We are only detaching pageservers that aren't used at all.
                continue;
            }

            changes.push((
                *node_id,
                LocationConfig {
                    mode: LocationConfigMode::Detached,
                    generation: None,
                    secondary_conf: None,
                    shard_number: self.shard.number.0,
                    shard_count: self.shard.count.0,
                    shard_stripe_size: self.shard.stripe_size.0,
                    tenant_conf: self.config.clone(),
                },
            ));
        }

        for (node_id, conf) in changes {
            self.location_config(node_id, conf, None).await?;
        }

        Ok(())
    }

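    /// Notify compute of the intended attached pageserver, if any. On failure we set
    /// [`Self::compute_notify_failure`] so that the shard is flagged for a later retry.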
    pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
        // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
        // destination.
        if let Some(node_id) = self.intent.attached {
            let result = self
                .compute_hook
                .notify(self.tenant_shard_id, node_id, &self.cancel)
                .await;
            if let Err(e) = &result {
                // It is up to the caller whether they want to drop out on this error, but they don't have to:
                // in general we should avoid letting unavailability of the cloud control plane stop us from
                // making progress.
                tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}");
                // Set this flag so that when our ReconcileResult is processed, the shard is
                // flagged to retry the notification at some point.
                self.compute_notify_failure = true;
            }
            result
        } else {
            Ok(())
        }
    }
}

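/// Build the [`LocationConfig`] for attaching a shard in `AttachedSingle` mode with the given generation.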
pub(crate) fn attached_location_conf(
    generation: Generation,
    shard: &ShardIdentity,
    config: &TenantConfig,
) -> LocationConfig {
    LocationConfig {
        mode: LocationConfigMode::AttachedSingle,
        generation: generation.into(),
        secondary_conf: None,
        shard_number: shard.number.0,
        shard_count: shard.count.0,
        shard_stripe_size: shard.stripe_size.0,
        tenant_conf: config.clone(),
    }
}

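/// Build the [`LocationConfig`] for a warm secondary location of a shard.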
pub(crate) fn secondary_location_conf(
    shard: &ShardIdentity,
    config: &TenantConfig,
) -> LocationConfig {
    LocationConfig {
        mode: LocationConfigMode::Secondary,
        generation: None,
        secondary_conf: Some(LocationConfigSecondary { warm: true }),
        shard_number: shard.number.0,
        shard_count: shard.count.0,
        shard_stripe_size: shard.stripe_size.0,
        tenant_conf: config.clone(),
    }
}