Line data Source code
1 : use crate::persistence::Persistence;
2 : use crate::service;
3 : use control_plane::attachment_service::NodeAvailability;
4 : use pageserver_api::models::{
5 : LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
6 : };
7 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
8 : use pageserver_client::mgmt_api;
9 : use std::collections::HashMap;
10 : use std::sync::Arc;
11 : use std::time::Duration;
12 : use tokio_util::sync::CancellationToken;
13 : use utils::generation::Generation;
14 : use utils::id::{NodeId, TimelineId};
15 : use utils::lsn::Lsn;
16 :
17 : use crate::compute_hook::{ComputeHook, NotifyError};
18 : use crate::node::Node;
19 : use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
20 :
21 : /// An object scoped to the lifetime of the background reconcile task that is spawned
22 : /// for a tenant whose intent and observed states differ.
23 : pub(super) struct Reconciler {
24 : /// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot
25 : /// of a tenant's state from when we spawned a reconcile task.
26 : pub(super) tenant_shard_id: TenantShardId,
27 : pub(crate) shard: ShardIdentity,
28 : pub(crate) generation: Generation,
29 : pub(crate) intent: IntentState,
30 : pub(crate) config: TenantConfig,
31 : pub(crate) observed: ObservedState,
32 :
33 : pub(crate) service_config: service::Config,
34 :
35 : /// A snapshot of the pageservers as they were when we were asked
36 : /// to reconcile.
37 : pub(crate) pageservers: Arc<HashMap<NodeId, Node>>,
38 :
39 : /// A hook to notify the running postgres instances when we change the location
40 : /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
41 : /// and guarantee eventual retries.
42 : pub(crate) compute_hook: Arc<ComputeHook>,
43 :
44 : /// To avoid stalling if the cloud control plane is unavailable, we may proceed
45 : /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
46 : /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry.
47 : pub(crate) compute_notify_failure: bool,
48 :
49 : /// A means to abort background reconciliation: it is essential to
50 : /// cancel this token when something changes in the original TenantState that
51 : /// will make this reconciliation impossible or unnecessary, for
52 : /// example when a pageserver node goes offline, or the PlacementPolicy for
53 : /// the tenant is changed.
54 : pub(crate) cancel: CancellationToken,
55 :
56 : /// Access to persistent storage for updating generation numbers
57 : pub(crate) persistence: Arc<Persistence>,
58 : }
59 :
60 6 : #[derive(thiserror::Error, Debug)]
61 : pub(crate) enum ReconcileError {
62 : #[error(transparent)]
63 : Notify(#[from] NotifyError),
64 : #[error(transparent)]
65 : Other(#[from] anyhow::Error),
66 : }
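
// Illustrative sketch: the `#[from]` attributes above are what let the reconcile code use
// `?` on both `Result<_, NotifyError>` and `anyhow::Result<_>` without explicit `map_err`
// calls. `hypothetical_step` below is a made-up helper, present only to show the conversion.
#[allow(dead_code)]
fn error_conversion_sketch() -> Result<(), ReconcileError> {
    fn hypothetical_step() -> anyhow::Result<()> {
        Ok(())
    }
    // The `?` converts anyhow::Error into ReconcileError::Other via the `#[from]` impl.
    hypothetical_step()?;
    Ok(())
}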
67 :
68 : impl Reconciler {
69 498 : async fn location_config(
70 498 : &mut self,
71 498 : node_id: NodeId,
72 498 : config: LocationConfig,
73 498 : flush_ms: Option<Duration>,
74 498 : ) -> anyhow::Result<()> {
75 498 : let node = self
76 498 : .pageservers
77 498 : .get(&node_id)
78 498 : .expect("Pageserver may not be removed while referenced");
79 498 :
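        // Record the location as uncertain (conf: None) before issuing the remote call:
        // if the call fails partway through, we must not assume we still know what state
        // this pageserver is in.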
80 498 : self.observed
81 498 : .locations
82 498 : .insert(node.id, ObservedStateLocation { conf: None });
83 :
84 496 : tracing::info!("location_config({}) calling: {:?}", node_id, config);
85 496 : let client =
86 496 : mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
87 496 : client
88 496 : .location_config(self.tenant_shard_id, config.clone(), flush_ms)
89 1860 : .await?;
90 493 : tracing::info!("location_config({}) complete: {:?}", node_id, config);
91 :
92 493 : self.observed
93 493 : .locations
94 493 : .insert(node.id, ObservedStateLocation { conf: Some(config) });
95 493 :
96 493 : Ok(())
97 496 : }
98 :
99 493 : async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
100 493 : let destination = if let Some(node_id) = self.intent.attached {
101 493 : match self.observed.locations.get(&node_id) {
102 0 : Some(conf) => {
103 : // We will do a live migration only if the intended destination is not
104 : // currently in an attached state.
105 0 : match &conf.conf {
106 0 : Some(conf) if conf.mode == LocationConfigMode::Secondary => {
107 0 : // Fall through to do a live migration
108 0 : node_id
109 : }
110 : None | Some(_) => {
111 : // Attached or uncertain: don't do a live migration, proceed
112 : // with a general-case reconciliation
113 0 : tracing::info!("maybe_live_migrate: destination is None or attached");
114 0 : return Ok(());
115 : }
116 : }
117 : }
118 : None => {
119 : // Our destination is not attached: maybe live migrate if some other
120 : // node is currently attached. Fall through.
121 493 : node_id
122 : }
123 : }
124 : } else {
125 : // No intent to be attached
126 0 : tracing::info!("maybe_live_migrate: no attached intent");
127 0 : return Ok(());
128 : };
129 :
130 493 : let mut origin = None;
131 498 : for (node_id, state) in &self.observed.locations {
132 5 : if let Some(observed_conf) = &state.conf {
133 0 : if observed_conf.mode == LocationConfigMode::AttachedSingle {
134 0 : let node = self
135 0 : .pageservers
136 0 : .get(node_id)
137 0 : .expect("Nodes may not be removed while referenced");
138 0 : // We will only attempt live migration if the origin is not offline: this
139 0 : // avoids trying to do it while reconciling after responding to an HA failover.
140 0 : if !matches!(node.availability, NodeAvailability::Offline) {
141 0 : origin = Some(*node_id);
142 0 : break;
143 0 : }
144 0 : }
145 5 : }
146 : }
147 :
148 493 : let Some(origin) = origin else {
149 493 : tracing::info!("maybe_live_migrate: no origin found");
150 493 : return Ok(());
151 : };
152 :
153 : // We have an origin and a destination: proceed to do the live migration
154 0 : tracing::info!("Live migrating {}->{}", origin, destination);
155 0 : self.live_migrate(origin, destination).await?;
156 :
157 0 : Ok(())
158 493 : }
159 :
160 0 : async fn get_lsns(
161 0 : &self,
162 0 : tenant_shard_id: TenantShardId,
163 0 : node_id: &NodeId,
164 0 : ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
165 0 : let node = self
166 0 : .pageservers
167 0 : .get(node_id)
168 0 : .expect("Pageserver may not be removed while referenced");
169 0 :
170 0 : let client =
171 0 : mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
172 :
173 0 : let timelines = client.timeline_list(&tenant_shard_id).await?;
174 0 : Ok(timelines
175 0 : .into_iter()
176 0 : .map(|t| (t.timeline_id, t.last_record_lsn))
177 0 : .collect())
178 0 : }
179 :
180 0 : async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) {
181 0 : let node = self
182 0 : .pageservers
183 0 : .get(node_id)
184 0 : .expect("Pageserver may not be removed while referenced");
185 0 :
186 0 : let client =
187 0 : mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
188 0 :
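        // Warming the destination is best-effort: any error from the download call is
        // swallowed below (the common case being that the destination is not currently
        // running a secondary location), and the migration proceeds without a warm-up.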
189 0 : match client.tenant_secondary_download(tenant_shard_id).await {
190 0 : Ok(()) => {}
191 : Err(_) => {
192 0 : tracing::info!(" (skipping, destination wasn't in secondary mode)")
193 : }
194 : }
195 0 : }
196 :
197 0 : async fn await_lsn(
198 0 : &self,
199 0 : tenant_shard_id: TenantShardId,
200 0 : pageserver_id: &NodeId,
201 0 : baseline: HashMap<TimelineId, Lsn>,
202 0 : ) -> anyhow::Result<()> {
203 : loop {
204 0 : let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await {
205 0 : Ok(l) => l,
206 0 : Err(e) => {
207 0 : tracing::info!(
208 0 : "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
209 0 : pageserver_id
210 0 : );
211 0 : tokio::time::sleep(Duration::from_millis(500)).await;
212 0 : continue;
213 : }
214 : };
215 :
216 0 : let mut any_behind: bool = false;
217 0 : for (timeline_id, baseline_lsn) in &baseline {
218 0 : match latest.get(timeline_id) {
219 0 : Some(latest_lsn) => {
220 0 : println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
221 0 : if latest_lsn < baseline_lsn {
222 0 : any_behind = true;
223 0 : }
224 : }
225 0 : None => {
226 0 : // Expected timeline isn't yet visible on migration destination.
227 0 : // (IRL we would have to account for timeline deletion, but this
228 0 : // is just a test helper)
229 0 : any_behind = true;
230 0 : }
231 : }
232 : }
233 :
234 0 : if !any_behind {
235 0 : tracing::info!("✅ LSN caught up. Proceeding...");
236 0 : break;
237 0 : } else {
238 0 : tokio::time::sleep(Duration::from_millis(500)).await;
239 0 : }
240 : }
241 :
242 0 : Ok(())
243 0 : }
244 :
245 0 : pub async fn live_migrate(
246 0 : &mut self,
247 0 : origin_ps_id: NodeId,
248 0 : dest_ps_id: NodeId,
249 0 : ) -> anyhow::Result<()> {
250 0 : // `maybe_live_migrate` is responsibble for sanity of inputs
251 0 : assert!(origin_ps_id != dest_ps_id);
252 :
253 0 : fn build_location_config(
254 0 : shard: &ShardIdentity,
255 0 : config: &TenantConfig,
256 0 : mode: LocationConfigMode,
257 0 : generation: Option<Generation>,
258 0 : secondary_conf: Option<LocationConfigSecondary>,
259 0 : ) -> LocationConfig {
260 0 : LocationConfig {
261 0 : mode,
262 0 : generation: generation.map(|g| g.into().unwrap()),
263 0 : secondary_conf,
264 0 : tenant_conf: config.clone(),
265 0 : shard_number: shard.number.0,
266 0 : shard_count: shard.count.0,
267 0 : shard_stripe_size: shard.stripe_size.0,
268 0 : }
269 0 : }
270 :
271 0 : tracing::info!(
272 0 : "🔁 Switching origin pageserver {} to stale mode",
273 0 : origin_ps_id
274 0 : );
275 :
276 : // FIXME: it is incorrect to use self.generation here; we should use the generation
277 : // from the ObservedState of the origin pageserver (it might be older than self.generation)
278 0 : let stale_conf = build_location_config(
279 0 : &self.shard,
280 0 : &self.config,
281 0 : LocationConfigMode::AttachedStale,
282 0 : Some(self.generation),
283 0 : None,
284 0 : );
285 0 : self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
286 0 : .await?;
287 :
288 0 : let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?);
289 :
290 : // If we are migrating to a destination that has a secondary location, warm it up first
291 0 : if let Some(destination_conf) = self.observed.locations.get(&dest_ps_id) {
292 0 : if let Some(destination_conf) = &destination_conf.conf {
293 0 : if destination_conf.mode == LocationConfigMode::Secondary {
294 0 : tracing::info!(
295 0 : "🔁 Downloading latest layers to destination pageserver {}",
296 0 : dest_ps_id,
297 0 : );
298 0 : self.secondary_download(self.tenant_shard_id, &dest_ps_id)
299 0 : .await;
300 0 : }
301 0 : }
302 0 : }
303 :
304 : // Increment generation before attaching to new pageserver
305 0 : self.generation = self
306 0 : .persistence
307 0 : .increment_generation(self.tenant_shard_id, dest_ps_id)
308 0 : .await?;
309 :
310 0 : let dest_conf = build_location_config(
311 0 : &self.shard,
312 0 : &self.config,
313 0 : LocationConfigMode::AttachedMulti,
314 0 : Some(self.generation),
315 0 : None,
316 0 : );
317 :
318 0 : tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id);
319 0 : self.location_config(dest_ps_id, dest_conf, None).await?;
320 :
321 0 : if let Some(baseline) = baseline_lsns {
322 0 : tracing::info!("🕑 Waiting for LSN to catch up...");
323 0 : self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline)
324 0 : .await?;
325 0 : }
326 :
327 0 : tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);
328 :
329 : // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
330 : // the origin without notifying compute, we will render the tenant unavailable.
331 0 : while let Err(e) = self.compute_notify().await {
332 0 : match e {
333 0 : NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)),
334 : _ => {
335 0 : tracing::warn!(
336 0 : "Live migration blocked by compute notification error, retrying: {e}"
337 0 : );
338 : }
339 : }
340 : }
341 :
342 : // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then
343 : // this location will be deleted in the general case reconciliation that runs after this.
344 0 : let origin_secondary_conf = build_location_config(
345 0 : &self.shard,
346 0 : &self.config,
347 0 : LocationConfigMode::Secondary,
348 0 : None,
349 0 : Some(LocationConfigSecondary { warm: true }),
350 0 : );
351 0 : self.location_config(origin_ps_id, origin_secondary_conf.clone(), None)
352 0 : .await?;
353 : // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
354 : // partway through. In fact, all location conf API calls should be in a wrapper that sets
355 : // the observed state to None, then runs, then sets it to what we wrote.
356 0 : self.observed.locations.insert(
357 0 : origin_ps_id,
358 0 : ObservedStateLocation {
359 0 : conf: Some(origin_secondary_conf),
360 0 : },
361 0 : );
362 0 :
363 0 : tracing::info!(
364 0 : "🔁 Switching to AttachedSingle mode on pageserver {}",
365 0 : dest_ps_id
366 0 : );
367 0 : let dest_final_conf = build_location_config(
368 0 : &self.shard,
369 0 : &self.config,
370 0 : LocationConfigMode::AttachedSingle,
371 0 : Some(self.generation),
372 0 : None,
373 0 : );
374 0 : self.location_config(dest_ps_id, dest_final_conf.clone(), None)
375 0 : .await?;
376 0 : self.observed.locations.insert(
377 0 : dest_ps_id,
378 0 : ObservedStateLocation {
379 0 : conf: Some(dest_final_conf),
380 0 : },
381 0 : );
382 0 :
383 0 : tracing::info!("✅ Migration complete");
384 0 :
385 0 : Ok(())
386 0 : }
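
    // Illustrative sketch: a compact summary of the location-mode transitions that
    // live_migrate() above drives, in order. The authoritative sequence is the function
    // body; this constant exists only for documentation.
    #[allow(dead_code)]
    const LIVE_MIGRATION_MODE_SEQUENCE: &[(&str, LocationConfigMode)] = &[
        // 1. Put the origin into stale mode (with a flush deadline) before attaching elsewhere.
        ("origin", LocationConfigMode::AttachedStale),
        // 2. Attach the destination in multi mode under a freshly incremented generation.
        ("destination", LocationConfigMode::AttachedMulti),
        // 3. After LSNs catch up and compute has been notified, downgrade the origin.
        ("origin", LocationConfigMode::Secondary),
        // 4. Promote the destination to the steady-state attached mode.
        ("destination", LocationConfigMode::AttachedSingle),
    ];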
387 :
388 : /// Reconciling a tenant makes API calls to pageservers until the observed state
389 : /// matches the intended state.
390 : ///
391 : /// First we apply special case handling (e.g. for live migrations), and then a
392 : /// general case reconciliation where we walk through the intent by pageserver
393 : /// and call out to the pageserver to apply the desired state.
394 493 : pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
395 493 : // TODO: if any of self.observed is None, call to remote pageservers
396 493 : // to learn correct state.
397 493 :
398 493 : // Special case: live migration
399 493 : self.maybe_live_migrate().await?;
400 :
401 : // If the node we intend to have attached is not already attached with the wanted configuration, attach it now.
402 493 : if let Some(node_id) = self.intent.attached {
403 493 : let mut wanted_conf =
404 493 : attached_location_conf(self.generation, &self.shard, &self.config);
405 493 : match self.observed.locations.get(&node_id) {
406 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
407 : // Nothing to do
408 0 : tracing::info!("Observed configuration already correct.")
409 : }
410 : _ => {
411 : // In all cases other than a matching observed configuration, we will
412 : // reconcile this location. This includes locations with different configurations, as well
413 : // as locations with unknown (None) observed state.
414 493 : self.generation = self
415 493 : .persistence
416 493 : .increment_generation(self.tenant_shard_id, node_id)
417 493 : .await?;
418 493 : wanted_conf.generation = self.generation.into();
419 493 : tracing::info!("Observed configuration requires update.");
420 1841 : self.location_config(node_id, wanted_conf, None).await?;
421 488 : self.compute_notify().await?;
422 : }
423 : }
424 0 : }
425 :
426 : // Configure secondary locations: if these were previously attached this
427 : // implicitly downgrades them from attached to secondary.
428 488 : let mut changes = Vec::new();
429 488 : for node_id in &self.intent.secondary {
430 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
431 0 : match self.observed.locations.get(node_id) {
432 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
433 : // Nothing to do
434 0 : tracing::info!(%node_id, "Observed configuration already correct.")
435 : }
436 : _ => {
437 : // In all cases other than a matching observed configuration, we will
438 : // reconcile this location.
439 0 : tracing::info!(%node_id, "Observed configuration requires update.");
440 0 : changes.push((*node_id, wanted_conf))
441 : }
442 : }
443 : }
444 :
445 : // Detach any extraneous pageservers that are no longer referenced
446 : // by our intent.
447 488 : let all_pageservers = self.intent.all_pageservers();
448 493 : for node_id in self.observed.locations.keys() {
449 493 : if all_pageservers.contains(node_id) {
450 : // We are only detaching pageservers that aren't used at all.
451 488 : continue;
452 5 : }
453 5 :
454 5 : changes.push((
455 5 : *node_id,
456 5 : LocationConfig {
457 5 : mode: LocationConfigMode::Detached,
458 5 : generation: None,
459 5 : secondary_conf: None,
460 5 : shard_number: self.shard.number.0,
461 5 : shard_count: self.shard.count.0,
462 5 : shard_stripe_size: self.shard.stripe_size.0,
463 5 : tenant_conf: self.config.clone(),
464 5 : },
465 5 : ));
466 : }
467 :
468 493 : for (node_id, conf) in changes {
469 19 : self.location_config(node_id, conf, None).await?;
470 : }
471 :
472 488 : Ok(())
473 491 : }
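
    // Illustrative sketch: the general case of reconcile() above is essentially an
    // intent-vs-observed diff. The helper below shows that shape over plain maps; it is
    // not used by the reconciler itself and its name and types are hypothetical.
    #[allow(dead_code)]
    fn diff_intent_observed<K, V>(
        intent: &HashMap<K, V>,
        observed: &HashMap<K, Option<V>>,
    ) -> Vec<(K, Option<V>)>
    where
        K: std::hash::Hash + Eq + Copy,
        V: PartialEq + Clone,
    {
        let mut changes = Vec::new();
        // Anything intended whose observed value is missing or different must be (re)configured.
        for (key, wanted) in intent {
            match observed.get(key) {
                Some(Some(seen)) if seen == wanted => {}
                _ => changes.push((*key, Some(wanted.clone()))),
            }
        }
        // Anything observed that is no longer intended must be removed (None = detach).
        for key in observed.keys() {
            if !intent.contains_key(key) {
                changes.push((*key, None));
            }
        }
        changes
    }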
474 :
475 488 : pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
476 : // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
477 : // destination.
478 488 : if let Some(node_id) = self.intent.attached {
479 488 : let result = self
480 488 : .compute_hook
481 488 : .notify(self.tenant_shard_id, node_id, &self.cancel)
482 13 : .await;
483 488 : if let Err(e) = &result {
484 : // It is up to the caller whether they want to drop out on this error, but they don't have to:
485 : // in general we should avoid letting unavailability of the cloud control plane stop us from
486 : // making progress.
487 0 : tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}");
488 : // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
489 : // needs to retry at some point.
490 0 : self.compute_notify_failure = true;
491 488 : }
492 488 : result
493 : } else {
494 0 : Ok(())
495 : }
496 488 : }
497 : }
498 :
499 1824 : pub(crate) fn attached_location_conf(
500 1824 : generation: Generation,
501 1824 : shard: &ShardIdentity,
502 1824 : config: &TenantConfig,
503 1824 : ) -> LocationConfig {
504 1824 : LocationConfig {
505 1824 : mode: LocationConfigMode::AttachedSingle,
506 1824 : generation: generation.into(),
507 1824 : secondary_conf: None,
508 1824 : shard_number: shard.number.0,
509 1824 : shard_count: shard.count.0,
510 1824 : shard_stripe_size: shard.stripe_size.0,
511 1824 : tenant_conf: config.clone(),
512 1824 : }
513 1824 : }
514 :
515 0 : pub(crate) fn secondary_location_conf(
516 0 : shard: &ShardIdentity,
517 0 : config: &TenantConfig,
518 0 : ) -> LocationConfig {
519 0 : LocationConfig {
520 0 : mode: LocationConfigMode::Secondary,
521 0 : generation: None,
522 0 : secondary_conf: Some(LocationConfigSecondary { warm: true }),
523 0 : shard_number: shard.number.0,
524 0 : shard_count: shard.count.0,
525 0 : shard_stripe_size: shard.stripe_size.0,
526 0 : tenant_conf: config.clone(),
527 0 : }
528 0 : }