Line data Source code
1 : use std::sync::Arc;
2 :
3 : use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
4 : use crate::{
5 : context::{DownloadBehavior, RequestContext},
6 : task_mgr::TaskKind,
7 : tenant::{
8 : storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
9 : Tenant,
10 : },
11 : virtual_file::{MaybeFatalIo, VirtualFile},
12 : };
13 : use pageserver_api::models::detach_ancestor::AncestorDetached;
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::Instrument;
16 : use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};
17 :
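/// Failure modes of detaching a timeline from its ancestor, surfaced to the HTTP handler via the
/// `From<Error> for ApiError` conversion below.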
18 0 : #[derive(Debug, thiserror::Error)]
19 : pub(crate) enum Error {
20 : #[error("no ancestors")]
21 : NoAncestor,
22 : #[error("too many ancestors")]
23 : TooManyAncestors,
24 : #[error("shutting down, please retry later")]
25 : ShuttingDown,
26 : #[error("flushing failed")]
27 : FlushAncestor(#[source] FlushLayerError),
28 : #[error("layer download failed")]
29 : RewrittenDeltaDownloadFailed(#[source] crate::tenant::storage_layer::layer::DownloadError),
30 : #[error("copying LSN prefix locally failed")]
31 : CopyDeltaPrefix(#[source] anyhow::Error),
33 : #[error("uploading rewritten layer failed")]
33 : UploadRewritten(#[source] anyhow::Error),
34 :
35 : #[error("ancestor is already being detached by: {}", .0)]
36 : OtherTimelineDetachOngoing(TimelineId),
37 :
38 : #[error("remote copying layer failed")]
39 : CopyFailed(#[source] anyhow::Error),
40 :
41 : #[error("unexpected error")]
42 : Unexpected(#[source] anyhow::Error),
43 :
44 : #[error("failpoint: {}", .0)]
45 : Failpoint(&'static str),
46 : }
47 :
48 : impl From<Error> for ApiError {
49 0 : fn from(value: Error) -> Self {
50 0 : match value {
51 0 : e @ Error::NoAncestor => ApiError::Conflict(e.to_string()),
52 : // TODO: ApiError converts the anyhow using debug formatting ... just stop using ApiError?
53 0 : e @ Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", e)),
54 0 : Error::ShuttingDown => ApiError::ShuttingDown,
55 : Error::OtherTimelineDetachOngoing(_) => {
56 0 : ApiError::ResourceUnavailable("other timeline detach is already ongoing".into())
57 : }
58 : // All of these can contain shutdown errors; in fact, that is the most common case.
59 0 : e @ Error::FlushAncestor(_)
60 0 : | e @ Error::RewrittenDeltaDownloadFailed(_)
61 0 : | e @ Error::CopyDeltaPrefix(_)
62 0 : | e @ Error::UploadRewritten(_)
63 0 : | e @ Error::CopyFailed(_)
64 0 : | e @ Error::Unexpected(_)
65 0 : | e @ Error::Failpoint(_) => ApiError::InternalServerError(e.into()),
66 : }
67 0 : }
68 : }
69 :
70 : impl From<crate::tenant::upload_queue::NotInitialized> for Error {
71 0 : fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
72 0 : // treat all as shutting down signals, even though that is not entirely correct
73 0 : // (uninitialized state)
74 0 : Error::ShuttingDown
75 0 : }
76 : }
77 :
78 : impl From<FlushLayerError> for Error {
79 0 : fn from(value: FlushLayerError) -> Self {
80 0 : match value {
81 0 : FlushLayerError::Cancelled => Error::ShuttingDown,
82 : FlushLayerError::NotRunning(_) => {
83 : // FIXME(#6424): technically statically unreachable right now, given how we never
84 : // drop the sender
85 0 : Error::ShuttingDown
86 : }
87 : FlushLayerError::CreateImageLayersError(_) | FlushLayerError::Other(_) => {
88 0 : Error::FlushAncestor(value)
89 : }
90 : }
91 0 : }
92 : }
93 :
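/// Outcome of [`prepare`]: either the work needed to detach the timeline, or evidence that the
/// detach was already completed earlier.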
94 : pub(crate) enum Progress {
95 : Prepared(completion::Completion, PreparedTimelineDetach),
96 : Done(AncestorDetached),
97 : }
98 :
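/// Layers produced by [`prepare`] which [`complete`] publishes to `index_part.json` while doing
/// the actual detach.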
99 : pub(crate) struct PreparedTimelineDetach {
100 : layers: Vec<Layer>,
101 : }
102 :
103 : /// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
104 : #[derive(Debug)]
105 : pub(crate) struct Options {
106 : pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
107 : pub(crate) copy_concurrency: std::num::NonZeroUsize,
108 : }
109 :
110 : impl Default for Options {
111 0 : fn default() -> Self {
112 0 : Self {
113 0 : rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
114 0 : copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
115 0 : }
116 0 : }
117 : }
118 :
119 : /// See [`Timeline::prepare_to_detach_from_ancestor`]
120 0 : pub(super) async fn prepare(
121 0 : detached: &Arc<Timeline>,
122 0 : tenant: &Tenant,
123 0 : options: Options,
124 0 : ctx: &RequestContext,
125 0 : ) -> Result<Progress, Error> {
126 : use Error::*;
127 :
128 0 : let Some((ancestor, ancestor_lsn)) = detached
129 0 : .ancestor_timeline
130 0 : .as_ref()
131 0 : .map(|tl| (tl.clone(), detached.ancestor_lsn))
132 : else {
133 : {
134 0 : let accessor = detached.remote_client.initialized_upload_queue()?;
135 :
136 : // we are safe to inspect the latest uploaded index_part, because we can only witness this
137 : // state after the restart is complete and the ancestor is no more.
138 0 : let latest = accessor.latest_uploaded_index_part();
139 0 : if !latest.lineage.is_detached_from_original_ancestor() {
140 0 : return Err(NoAncestor);
141 0 : }
142 0 : }
143 0 :
144 0 : // `detached` has already been detached earlier; let's inspect each of the current timelines
145 0 : // and report back the timelines which have been reparented by that detach
146 0 : let mut all_direct_children = tenant
147 0 : .timelines
148 0 : .lock()
149 0 : .unwrap()
150 0 : .values()
151 0 : .filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)))
152 0 : .map(|tl| (tl.ancestor_lsn, tl.clone()))
153 0 : .collect::<Vec<_>>();
154 0 :
155 0 : let mut any_shutdown = false;
156 0 :
157 0 : all_direct_children.retain(
158 0 : |(_, tl)| match tl.remote_client.initialized_upload_queue() {
159 0 : Ok(accessor) => accessor
160 0 : .latest_uploaded_index_part()
161 0 : .lineage
162 0 : .is_reparented(),
163 0 : Err(_shutdownalike) => {
164 0 : // not 100% a shutdown, but let's bail out early so we don't give inconsistent results in a
165 0 : // sharded environment.
166 0 : any_shutdown = true;
167 0 : true
168 : }
169 0 : },
170 0 : );
171 0 :
172 0 : if any_shutdown {
173 : // it could be one or many being deleted; have client retry
174 0 : return Err(Error::ShuttingDown);
175 0 : }
176 0 :
177 0 : let mut reparented = all_direct_children;
178 0 : // why this instead of hashset? there is a reason, but I've forgotten it many times.
179 0 : //
180 0 : // maybe if this was a hashset we would not be able to distinguish some race condition.
181 0 : reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));
182 0 :
183 0 : return Ok(Progress::Done(AncestorDetached {
184 0 : reparented_timelines: reparented
185 0 : .into_iter()
186 0 : .map(|(_, tl)| tl.timeline_id)
187 0 : .collect(),
188 0 : }));
189 : };
190 :
191 0 : if !ancestor_lsn.is_valid() {
192 : // rare case, probably wouldn't even load
193 0 : tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
194 0 : return Err(NoAncestor);
195 0 : }
196 0 :
197 0 : if ancestor.ancestor_timeline.is_some() {
198 : // non-technical requirement; we could flatten N ancestors just as easily but we chose
199 : // not to, at least initially
200 0 : return Err(TooManyAncestors);
201 0 : }
202 0 :
203 0 : // before we acquire the gate, we must mark the ancestor as having a detach operation
204 0 : // ongoing, which blocks other concurrent detach operations, so we don't get into awkward
205 0 : // situations where two branches would be trying to reparent earlier branches.
206 0 : let (guard, barrier) = completion::channel();
207 0 :
208 0 : {
209 0 : let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
210 0 : if let Some((tl, other)) = guard.as_ref() {
211 0 : if !other.is_ready() {
212 0 : return Err(OtherTimelineDetachOngoing(*tl));
213 0 : }
214 0 : }
215 0 : *guard = Some((detached.timeline_id, barrier));
216 : }
217 :
218 0 : let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
219 :
220 : utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");
221 :
222 0 : fail::fail_point!(
223 0 : "timeline-detach-ancestor::before_starting_after_locking",
224 0 : |_| Err(Error::Failpoint(
225 0 : "timeline-detach-ancestor::before_starting_after_locking"
226 0 : ))
227 0 : );
228 :
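// if the branch point is at or beyond the ancestor's disk_consistent_lsn, freeze and flush first
// so that everything we need is in on-disk layers: copying from `InMemoryLayer` is unsupported.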
229 0 : if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
230 0 : let span =
231 0 : tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
232 0 : async {
233 0 : let started_at = std::time::Instant::now();
234 0 : let freeze_and_flush = ancestor.freeze_and_flush0();
235 0 : let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
236 :
237 0 : let res =
238 0 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
239 0 : .await;
240 :
241 0 : let res = match res {
242 0 : Ok(res) => res,
243 0 : Err(_elapsed) => {
244 0 : tracing::info!("freezing and flushing ancestor is still ongoing");
245 0 : freeze_and_flush.await
246 : }
247 : };
248 :
249 0 : res?;
250 :
251 : // we do not need to wait for uploads to complete, but we do need `struct Layer`:
252 : // copying a delta prefix is currently unsupported for `InMemoryLayer`.
253 0 : tracing::info!(
254 0 : elapsed_ms = started_at.elapsed().as_millis(),
255 0 : "froze and flushed the ancestor"
256 : );
257 0 : Ok::<_, Error>(())
258 0 : }
259 0 : .instrument(span)
260 0 : .await?;
261 0 : }
262 :
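// exclusive end of the LSN range we keep from the ancestor; +1 so that the record at
// ancestor_lsn itself is included in the copied prefix.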
263 0 : let end_lsn = ancestor_lsn + 1;
264 :
265 0 : let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
266 0 : // we do not need to look at our own layers, because they can only be layers that come
267 0 : // *after* ancestor_lsn
268 0 : let layers = tokio::select! {
269 : guard = ancestor.layers.read() => guard,
270 : _ = detached.cancel.cancelled() => {
271 : return Err(ShuttingDown);
272 : }
273 : _ = ancestor.cancel.cancelled() => {
274 : return Err(ShuttingDown);
275 : }
276 0 : };
277 0 :
278 0 : // between retries, these can change if compaction or gc ran in the meantime; this means we
279 0 : // will have to redo the work.
280 0 : partition_work(ancestor_lsn, &layers)
281 0 : };
282 0 :
283 0 : // TODO: layers are already sorted by something: use that to determine how many of the remote
284 0 : // copies are already done.
285 0 : tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
286 :
287 : // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
288 0 : let mut new_layers: Vec<Layer> =
289 0 : Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
290 0 :
291 0 : {
292 0 : tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
293 :
294 0 : let mut tasks = tokio::task::JoinSet::new();
295 0 :
296 0 : let mut wrote_any = false;
297 0 :
298 0 : let limiter = Arc::new(tokio::sync::Semaphore::new(
299 0 : options.rewrite_concurrency.get(),
300 0 : ));
301 :
302 0 : for layer in straddling_branchpoint {
303 0 : let limiter = limiter.clone();
304 0 : let timeline = detached.clone();
305 0 : let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
306 0 :
307 0 : tasks.spawn(async move {
308 0 : let _permit = limiter.acquire().await;
309 0 : let copied =
310 0 : upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
311 0 : .await?;
312 0 : Ok(copied)
313 0 : });
314 0 : }
315 :
316 0 : while let Some(res) = tasks.join_next().await {
317 0 : match res {
318 0 : Ok(Ok(Some(copied))) => {
319 0 : wrote_any = true;
320 0 : tracing::info!(layer=%copied, "rewrote and uploaded");
321 0 : new_layers.push(copied);
322 : }
323 0 : Ok(Ok(None)) => {}
324 0 : Ok(Err(e)) => return Err(e),
325 0 : Err(je) => return Err(Unexpected(je.into())),
326 : }
327 : }
328 :
329 : // FIXME: the fsync should be mandatory, after both rewrites and copies
330 0 : if wrote_any {
331 0 : let timeline_dir = VirtualFile::open(
332 0 : &detached
333 0 : .conf
334 0 : .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
335 0 : ctx,
336 0 : )
337 0 : .await
338 0 : .fatal_err("VirtualFile::open for timeline dir fsync");
339 0 : timeline_dir
340 0 : .sync_all()
341 0 : .await
342 0 : .fatal_err("VirtualFile::sync_all timeline dir");
343 0 : }
344 : }
345 :
346 0 : let mut tasks = tokio::task::JoinSet::new();
347 0 : let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));
348 :
349 0 : for adopted in rest_of_historic {
350 0 : let limiter = limiter.clone();
351 0 : let timeline = detached.clone();
352 0 :
353 0 : tasks.spawn(
354 0 : async move {
355 0 : let _permit = limiter.acquire().await;
356 0 : let owned =
357 0 : remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
358 0 : tracing::info!(layer=%owned, "remote copied");
359 0 : Ok(owned)
360 0 : }
361 0 : .in_current_span(),
362 0 : );
363 0 : }
364 :
365 0 : while let Some(res) = tasks.join_next().await {
366 0 : match res {
367 0 : Ok(Ok(owned)) => {
368 0 : new_layers.push(owned);
369 0 : }
370 0 : Ok(Err(failed)) => {
371 0 : return Err(failed);
372 : }
373 0 : Err(je) => return Err(Unexpected(je.into())),
374 : }
375 : }
376 :
377 : // TODO: fsync directory again if we hardlinked something
378 :
379 0 : let prepared = PreparedTimelineDetach { layers: new_layers };
380 0 :
381 0 : Ok(Progress::Prepared(guard, prepared))
382 0 : }
383 :
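/// Splits the ancestor's historic layers into: the number of layers starting after `ancestor_lsn`
/// (skipped), the delta layers straddling the branch point which need their LSN prefix rewritten,
/// and the remaining layers which can be adopted as-is.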
384 0 : fn partition_work(
385 0 : ancestor_lsn: Lsn,
386 0 : source_layermap: &LayerManager,
387 0 : ) -> (usize, Vec<Layer>, Vec<Layer>) {
388 0 : let mut straddling_branchpoint = vec![];
389 0 : let mut rest_of_historic = vec![];
390 0 :
391 0 : let mut later_by_lsn = 0;
392 :
393 0 : for desc in source_layermap.layer_map().iter_historic_layers() {
394 : // off-by-one chances here:
395 : // - start is inclusive
396 : // - end is exclusive
397 0 : if desc.lsn_range.start > ancestor_lsn {
398 0 : later_by_lsn += 1;
399 0 : continue;
400 0 : }
401 :
402 0 : let target = if desc.lsn_range.start <= ancestor_lsn
403 0 : && desc.lsn_range.end > ancestor_lsn
404 0 : && desc.is_delta
405 : {
406 : // TODO: image layer at Lsn optimization
407 0 : &mut straddling_branchpoint
408 : } else {
409 0 : &mut rest_of_historic
410 : };
411 :
412 0 : target.push(source_layermap.get_from_desc(&desc));
413 : }
414 :
415 0 : (later_by_lsn, straddling_branchpoint, rest_of_historic)
416 0 : }
417 :
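/// Copies the `< end_lsn` prefix of `layer` into a new delta layer on `target` and uploads it.
/// Returns `None` if the prefix contains no records.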
418 0 : async fn upload_rewritten_layer(
419 0 : end_lsn: Lsn,
420 0 : layer: &Layer,
421 0 : target: &Arc<Timeline>,
422 0 : cancel: &CancellationToken,
423 0 : ctx: &RequestContext,
424 0 : ) -> Result<Option<Layer>, Error> {
425 : use Error::UploadRewritten;
426 0 : let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
427 :
428 0 : let Some(copied) = copied else {
429 0 : return Ok(None);
430 : };
431 :
432 : // FIXME: better ShuttingDown error
433 0 : target
434 0 : .remote_client
435 0 : .upload_layer_file(&copied, cancel)
436 0 : .await
437 0 : .map_err(UploadRewritten)?;
438 :
439 0 : Ok(Some(copied.into()))
440 0 : }
441 :
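/// Writes a new local delta layer containing only the records of `layer` below `end_lsn`,
/// reusing the source layer's key range. Returns `None` when there is nothing to copy.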
442 0 : async fn copy_lsn_prefix(
443 0 : end_lsn: Lsn,
444 0 : layer: &Layer,
445 0 : target_timeline: &Arc<Timeline>,
446 0 : ctx: &RequestContext,
447 0 : ) -> Result<Option<ResidentLayer>, Error> {
448 0 : use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed, ShuttingDown};
449 0 :
450 0 : if target_timeline.cancel.is_cancelled() {
451 0 : return Err(ShuttingDown);
452 0 : }
453 0 :
454 0 : tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
455 :
456 0 : let mut writer = DeltaLayerWriter::new(
457 0 : target_timeline.conf,
458 0 : target_timeline.timeline_id,
459 0 : target_timeline.tenant_shard_id,
460 0 : layer.layer_desc().key_range.start,
461 0 : layer.layer_desc().lsn_range.start..end_lsn,
462 0 : ctx,
463 0 : )
464 0 : .await
465 0 : .map_err(CopyDeltaPrefix)?;
466 :
467 0 : let resident = layer
468 0 : .download_and_keep_resident()
469 0 : .await
470 : // likely shutdown
471 0 : .map_err(RewrittenDeltaDownloadFailed)?;
472 :
473 0 : let records = resident
474 0 : .copy_delta_prefix(&mut writer, end_lsn, ctx)
475 0 : .await
476 0 : .map_err(CopyDeltaPrefix)?;
477 :
478 0 : drop(resident);
479 0 :
480 0 : tracing::debug!(%layer, records, "copied records");
481 :
482 0 : if records == 0 {
483 0 : drop(writer);
484 0 : // TODO: we might want to store an empty marker in remote storage for this
485 0 : // layer so that we will not needlessly walk `layer` on repeated attempts.
486 0 : Ok(None)
487 : } else {
488 : // reuse the layer's end key instead of the real highest key in the layer, so that we do
489 : // not add more holes between layers.
490 0 : let reused_highest_key = layer.layer_desc().key_range.end;
491 0 : let copied = writer
492 0 : .finish(reused_highest_key, target_timeline, ctx)
493 0 : .await
494 0 : .map_err(CopyDeltaPrefix)?;
495 :
496 0 : tracing::debug!(%layer, %copied, "new layer produced");
497 :
498 0 : Ok(Some(copied))
499 : }
500 0 : }
501 :
502 : /// Creates a new Layer instance for the adopted layer, and ensures that on successful return
503 : /// the copy can be found in remote storage, without it yet being added to `index_part.json`.
504 0 : async fn remote_copy(
505 0 : adopted: &Layer,
506 0 : adoptee: &Arc<Timeline>,
507 0 : generation: Generation,
508 0 : cancel: &CancellationToken,
509 0 : ) -> Result<Layer, Error> {
510 0 : use Error::CopyFailed;
511 0 :
512 0 : // depending on whether Layer::keep_resident succeeds, we could hardlink instead
513 0 :
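// reuse the adopted layer's metadata, but stamp it with the adoptee's current generation, which
// is presumably where the remote copy will be written.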
514 0 : let mut metadata = adopted.metadata();
515 0 : debug_assert!(metadata.generation <= generation);
516 0 : metadata.generation = generation;
517 0 :
518 0 : let owned = crate::tenant::storage_layer::Layer::for_evicted(
519 0 : adoptee.conf,
520 0 : adoptee,
521 0 : adopted.layer_desc().layer_name(),
522 0 : metadata,
523 0 : );
524 0 :
525 0 : // FIXME: better shuttingdown error
526 0 : adoptee
527 0 : .remote_client
528 0 : .copy_timeline_layer(adopted, &owned, cancel)
529 0 : .await
530 0 : .map(move |()| owned)
531 0 : .map_err(CopyFailed)
532 0 : }
533 :
534 : /// See [`Timeline::complete_detaching_timeline_ancestor`].
535 0 : pub(super) async fn complete(
536 0 : detached: &Arc<Timeline>,
537 0 : tenant: &Tenant,
538 0 : prepared: PreparedTimelineDetach,
539 0 : _ctx: &RequestContext,
540 0 : ) -> Result<Vec<TimelineId>, anyhow::Error> {
541 0 : let PreparedTimelineDetach { layers } = prepared;
542 0 :
543 0 : let ancestor = detached
544 0 : .get_ancestor_timeline()
545 0 : .expect("must still have an ancestor");
546 0 : let ancestor_lsn = detached.get_ancestor_lsn();
547 0 :
548 0 : // publish the prepared layers before we reparent any of the timelines, so that on restart
549 0 : // reparented timelines find layers. also do the actual detaching.
550 0 : //
551 0 : // if we crash after this operation, we will at least come up having detached a timeline, but
552 0 : // we cannot go back and reparent the timelines which would have been reparented in normal
553 0 : // execution.
554 0 : //
555 0 : // this is not perfect, but it avoids a retry happening after a compaction or gc on restart,
556 0 : // which could give us a completely wrong layer combination.
557 0 : detached
558 0 : .remote_client
559 0 : .schedule_adding_existing_layers_to_index_detach_and_wait(
560 0 : &layers,
561 0 : (ancestor.timeline_id, ancestor_lsn),
562 0 : )
563 0 : .await?;
564 :
565 0 : let mut tasks = tokio::task::JoinSet::new();
566 0 :
567 0 : // because we are now keeping the slot in progress, it is unlikely that there will be any
568 0 : // timeline deletions during this time. if we raced one, then we'll just ignore it.
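// reparent only timelines that are active, not being deleted, and branched off the old ancestor
// at or before `ancestor_lsn`; everything else is skipped by the filter below.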
569 0 : tenant
570 0 : .timelines
571 0 : .lock()
572 0 : .unwrap()
573 0 : .values()
574 0 : .filter_map(|tl| {
575 0 : if Arc::ptr_eq(tl, detached) {
576 0 : return None;
577 0 : }
578 0 :
579 0 : if !tl.is_active() {
580 0 : return None;
581 0 : }
582 :
583 0 : let tl_ancestor = tl.ancestor_timeline.as_ref()?;
584 0 : let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
585 0 : let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
586 0 :
587 0 : let is_deleting = tl
588 0 : .delete_progress
589 0 : .try_lock()
590 0 : .map(|flow| !flow.is_not_started())
591 0 : .unwrap_or(true);
592 0 :
593 0 : if is_same && is_earlier && !is_deleting {
594 0 : Some(tl.clone())
595 : } else {
596 0 : None
597 : }
598 0 : })
599 0 : .for_each(|timeline| {
600 : // important in this scope: we are holding the Tenant::timelines lock
601 0 : let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
602 0 : let new_parent = detached.timeline_id;
603 0 :
604 0 : tasks.spawn(
605 0 : async move {
606 0 : let res = timeline
607 0 : .remote_client
608 0 : .schedule_reparenting_and_wait(&new_parent)
609 0 : .await;
610 :
611 0 : match res {
612 0 : Ok(()) => Some(timeline),
613 0 : Err(e) => {
614 0 : // with the use of tenant slot, we no longer expect these.
615 0 : tracing::warn!("reparenting failed: {e:#}");
616 0 : None
617 : }
618 : }
619 0 : }
620 0 : .instrument(span),
621 0 : );
622 0 : });
623 0 :
624 0 : let reparenting_candidates = tasks.len();
625 0 : let mut reparented = Vec::with_capacity(tasks.len());
626 :
627 0 : while let Some(res) = tasks.join_next().await {
628 0 : match res {
629 0 : Ok(Some(timeline)) => {
630 0 : tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
631 0 : reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
632 : }
633 0 : Ok(None) => {
634 0 : // let's just ignore this for now. one or all of the reparented timelines could have
635 0 : // started deletion, and that is fine.
636 0 : }
637 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
638 0 : Err(je) if je.is_panic() => {
639 0 : // ignore; it's better to continue with a single reparenting failing (or even
640 0 : // all of them) in order to get to the goal state.
641 0 : //
642 0 : // these timelines will never be reparentable, but they can always be detached as
643 0 : // separate tree roots.
644 0 : }
645 0 : Err(je) => tracing::error!("unexpected join error: {je:?}"),
646 : }
647 : }
648 :
649 0 : if reparenting_candidates != reparented.len() {
650 0 : tracing::info!("failed to reparent some candidates");
651 0 : }
652 :
653 0 : reparented.sort_unstable();
654 0 :
655 0 : let reparented = reparented
656 0 : .into_iter()
657 0 : .map(|(_, timeline_id)| timeline_id)
658 0 : .collect();
659 0 :
660 0 : Ok(reparented)
661 0 : }
|