LCOV code coverage report (LCOV 2.1-beta)
Source: pageserver/src/tenant/timeline/detach_ancestor.rs
Test: f081ec316c96fa98335efd15ef501745aa4f015d.info (2024-06-25 15:11:17); coverage: lines 0.0% (0 of 385), functions 0.0% (0 of 25)

use std::sync::Arc;

use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
use crate::{
    context::{DownloadBehavior, RequestContext},
    task_mgr::TaskKind,
    tenant::{
        storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
        Tenant,
    },
    virtual_file::{MaybeFatalIo, VirtualFile},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    #[error("no ancestors")]
    NoAncestor,
    #[error("too many ancestors")]
    TooManyAncestors,
    #[error("shutting down, please retry later")]
    ShuttingDown,
    #[error("flushing failed")]
    FlushAncestor(#[source] FlushLayerError),
    #[error("layer download failed")]
    RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
    #[error("copying LSN prefix locally failed")]
    CopyDeltaPrefix(#[source] anyhow::Error),
    #[error("upload rewritten layer")]
    UploadRewritten(#[source] anyhow::Error),

    #[error("ancestor is already being detached by: {}", .0)]
    OtherTimelineDetachOngoing(TimelineId),

    #[error("remote copying layer failed")]
    CopyFailed(#[source] anyhow::Error),

    #[error("unexpected error")]
    Unexpected(#[source] anyhow::Error),
}

impl From<Error> for ApiError {
    fn from(value: Error) -> Self {
        match value {
            e @ Error::NoAncestor => ApiError::Conflict(e.to_string()),
            // TODO: ApiError converts the anyhow using debug formatting ... just stop using ApiError?
            e @ Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", e)),
            Error::ShuttingDown => ApiError::ShuttingDown,
            Error::OtherTimelineDetachOngoing(_) => {
                ApiError::ResourceUnavailable("other timeline detach is already ongoing".into())
            }
            // All of these can contain shutdown errors; in fact, that is the most common case
            e @ Error::FlushAncestor(_)
            | e @ Error::RewrittenDeltaDownloadFailed(_)
            | e @ Error::CopyDeltaPrefix(_)
            | e @ Error::UploadRewritten(_)
            | e @ Error::CopyFailed(_)
            | e @ Error::Unexpected(_) => ApiError::InternalServerError(e.into()),
        }
    }
}

pub(crate) struct PreparedTimelineDetach {
    layers: Vec<Layer>,
}

/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
    pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
    pub(crate) copy_concurrency: std::num::NonZeroUsize,
}

impl Default for Options {
    fn default() -> Self {
        Self {
            rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
            copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
        }
    }
}
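
// A caller can override the defaults per field; a minimal sketch (the value is
// hypothetical, not a recommendation):
//
//     let options = Options {
//         rewrite_concurrency: std::num::NonZeroUsize::new(4).unwrap(),
//         ..Options::default()
//     };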

/// See [`Timeline::prepare_to_detach_from_ancestor`]
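///
/// A minimal calling sketch (not the actual call site; `timeline`, `tenant` and
/// `ctx` are assumed to be at hand):
///
/// ```ignore
/// let (_completion, prepared) = prepare(&timeline, &tenant, Options::default(), &ctx).await?;
/// let reparented = complete(&timeline, &tenant, prepared, &ctx).await?;
/// ```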
pub(super) async fn prepare(
    detached: &Arc<Timeline>,
    tenant: &Tenant,
    options: Options,
    ctx: &RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
    use Error::*;

    let Some((ancestor, ancestor_lsn)) = detached
        .ancestor_timeline
        .as_ref()
        .map(|tl| (tl.clone(), detached.ancestor_lsn))
    else {
        // TODO: check if we have already been detached; for this we need to read the stored data
        // on remote client, for that we need a follow-up which makes uploads cheaper and maintains
        // a projection of the committed data.
        //
        // the error is wrong per openapi
        return Err(NoAncestor);
    };

    if !ancestor_lsn.is_valid() {
        return Err(NoAncestor);
    }

    if ancestor.ancestor_timeline.is_some() {
        // non-technical requirement; we could flatten N ancestors just as easily but we chose
        // not to, at least initially
        return Err(TooManyAncestors);
    }

    // before we acquire the gate, we must mark the ancestor as having a detach operation
    // ongoing which will block other concurrent detach operations so we don't get into awkward
    // situations where there would be two branches trying to reparent earlier branches.
    let (guard, barrier) = completion::channel();

    {
        let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
        if let Some((tl, other)) = guard.as_ref() {
            if !other.is_ready() {
                return Err(OtherTimelineDetachOngoing(*tl));
            }
        }
        *guard = Some((detached.timeline_id, barrier));
    }
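    // the `guard` half of the channel is returned to the caller; once it is dropped,
    // `barrier.is_ready()` above becomes true and a later attempt may take the slot.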

    let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;

    if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
        let span =
            tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
        async {
            let started_at = std::time::Instant::now();
            let freeze_and_flush = ancestor.freeze_and_flush0();
            let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);

            let res =
                tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
                    .await;

            let res = match res {
                Ok(res) => res,
                Err(_elapsed) => {
                    tracing::info!("freezing and flushing ancestor is still ongoing");
                    freeze_and_flush.await
                }
            };

            res.map_err(FlushAncestor)?;

            // we do not need to wait for uploads to complete, but we do need the `struct Layer`s:
            // copying a delta prefix is currently unsupported for `InMemoryLayer`.
            tracing::info!(
                elapsed_ms = started_at.elapsed().as_millis(),
                "froze and flushed the ancestor"
            );
            Ok(())
        }
        .instrument(span)
        .await?;
    }

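    // lsn ranges are end-exclusive, so `ancestor_lsn + 1` keeps the record at
    // `ancestor_lsn` itself inside the copied prefix.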
    let end_lsn = ancestor_lsn + 1;

    let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
        // we do not need to start from our layers, because they can only be layers that come
        // *after* ancestor_lsn
        let layers = tokio::select! {
            guard = ancestor.layers.read() => guard,
            _ = detached.cancel.cancelled() => {
                return Err(ShuttingDown);
            }
            _ = ancestor.cancel.cancelled() => {
                return Err(ShuttingDown);
            }
        };

        // between retries, these can change if compaction or gc ran in between. this will mean
        // we have to redo work.
        partition_work(ancestor_lsn, &layers)
    };

    // TODO: layers are already sorted by something; use that to determine how many of the
    // remote copies are already done.
    tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");

    // TODO: remote copying and lsn prefix copying could be done at the same time with a single fsync after
    let mut new_layers: Vec<Layer> =
        Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());

    {
        tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");

        let mut tasks = tokio::task::JoinSet::new();

        let mut wrote_any = false;

        let limiter = Arc::new(tokio::sync::Semaphore::new(
            options.rewrite_concurrency.get(),
        ));

        for layer in straddling_branchpoint {
            let limiter = limiter.clone();
            let timeline = detached.clone();
            let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);

            tasks.spawn(async move {
                let _permit = limiter.acquire().await;
                let copied =
                    upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
                        .await?;
                Ok(copied)
            });
        }

        while let Some(res) = tasks.join_next().await {
            match res {
                Ok(Ok(Some(copied))) => {
                    wrote_any = true;
                    tracing::info!(layer=%copied, "rewrote and uploaded");
                    new_layers.push(copied);
                }
                Ok(Ok(None)) => {}
                Ok(Err(e)) => return Err(e),
                Err(je) => return Err(Unexpected(je.into())),
            }
        }

        // FIXME: the fsync should be mandatory, after both rewrites and copies
        if wrote_any {
            let timeline_dir = VirtualFile::open(
                &detached
                    .conf
                    .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
                ctx,
            )
            .await
            .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    }
    let mut tasks = tokio::task::JoinSet::new();
    let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));

    for adopted in rest_of_historic {
        let limiter = limiter.clone();
        let timeline = detached.clone();

        tasks.spawn(
            async move {
                let _permit = limiter.acquire().await;
                let owned =
                    remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
                tracing::info!(layer=%owned, "remote copied");
                Ok(owned)
            }
            .in_current_span(),
        );
    }

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Ok(owned)) => {
                new_layers.push(owned);
            }
            Ok(Err(failed)) => {
                return Err(failed);
            }
            Err(je) => return Err(Unexpected(je.into())),
        }
    }

    // TODO: fsync directory again if we hardlinked something

    let prepared = PreparedTimelineDetach { layers: new_layers };

    Ok((guard, prepared))
}
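
// Both fan-out phases above use the same pattern: a `JoinSet` whose tasks each hold
// a semaphore permit for the duration of their work. A self-contained sketch of that
// pattern (plain tokio, no pageserver types):
//
//     let limiter = std::sync::Arc::new(tokio::sync::Semaphore::new(2));
//     let mut tasks = tokio::task::JoinSet::new();
//     for i in 0..10u32 {
//         let limiter = limiter.clone();
//         tasks.spawn(async move {
//             let _permit = limiter.acquire().await;
//             // at most two of these bodies run concurrently
//             i * 2
//         });
//     }
//     let mut results = Vec::new();
//     while let Some(res) = tasks.join_next().await {
//         results.push(res.expect("task panicked"));
//     }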
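/// Partitions the ancestor's historic layers relative to `ancestor_lsn` into
/// (count of layers filtered out because they start after the branch point,
/// delta layers straddling the branch point whose prefix needs rewriting,
/// layers adopted as they are).
///
/// For example, with `ancestor_lsn` at 0/40: a delta layer covering 0/30..0/50
/// straddles the branch point, a layer fully below it (0/10..0/30) is adopted
/// whole, and a layer starting at 0/48 is only counted as filtered out.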
fn partition_work(
    ancestor_lsn: Lsn,
    source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
    let mut straddling_branchpoint = vec![];
    let mut rest_of_historic = vec![];

    let mut later_by_lsn = 0;

    for desc in source_layermap.layer_map().iter_historic_layers() {
        // off-by-one chances here:
        // - start is inclusive
        // - end is exclusive
        if desc.lsn_range.start > ancestor_lsn {
            later_by_lsn += 1;
            continue;
        }

        let target = if desc.lsn_range.start <= ancestor_lsn
            && desc.lsn_range.end > ancestor_lsn
            && desc.is_delta
        {
            // TODO: image layer at Lsn optimization
            &mut straddling_branchpoint
        } else {
            &mut rest_of_historic
        };

        target.push(source_layermap.get_from_desc(&desc));
    }

    (later_by_lsn, straddling_branchpoint, rest_of_historic)
}

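/// Rewrites the LSN prefix of `layer` (up to `end_lsn`) into a new delta layer on
/// `target` and uploads it, returning `None` when the prefix contains no records.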
async fn upload_rewritten_layer(
    end_lsn: Lsn,
    layer: &Layer,
    target: &Arc<Timeline>,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
    use Error::UploadRewritten;
    let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;

    let Some(copied) = copied else {
        return Ok(None);
    };

    // FIXME: better `ShuttingDown` error
    target
        .remote_client
        .upload_layer_file(&copied, cancel)
        .await
        .map_err(UploadRewritten)?;

    Ok(Some(copied.into()))
}

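/// Copies the records of `layer` below `end_lsn` into a new delta layer of
/// `target_timeline`, downloading the source layer first if necessary.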
async fn copy_lsn_prefix(
    end_lsn: Lsn,
    layer: &Layer,
    target_timeline: &Arc<Timeline>,
    ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
    use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};

    tracing::debug!(%layer, %end_lsn, "copying lsn prefix");

    let mut writer = DeltaLayerWriter::new(
        target_timeline.conf,
        target_timeline.timeline_id,
        target_timeline.tenant_shard_id,
        layer.layer_desc().key_range.start,
        layer.layer_desc().lsn_range.start..end_lsn,
        ctx,
    )
    .await
    .map_err(CopyDeltaPrefix)?;

    let resident = layer
        .download_and_keep_resident()
        .await
        // likely shutdown
        .map_err(RewrittenDeltaDownloadFailed)?;

    let records = resident
        .copy_delta_prefix(&mut writer, end_lsn, ctx)
        .await
        .map_err(CopyDeltaPrefix)?;

    drop(resident);

    tracing::debug!(%layer, records, "copied records");

    if records == 0 {
        drop(writer);
        // TODO: we might want to store an empty marker in remote storage for this
        // layer so that we will not needlessly walk `layer` on repeated attempts.
        Ok(None)
    } else {
        // reuse the source layer's end key rather than the actual highest key in the
        // copied range, to avoid adding more holes between layers.
        let reused_highest_key = layer.layer_desc().key_range.end;
        let copied = writer
            .finish(reused_highest_key, target_timeline, ctx)
            .await
            .map_err(CopyDeltaPrefix)?;

        tracing::debug!(%layer, %copied, "new layer produced");

        Ok(Some(copied))
    }
}

/// Creates a new `Layer` instance for the adopted layer and ensures that, on successful
/// return, it can be found in remote storage without the adopted layer having been added
/// to `index_part.json`.
async fn remote_copy(
    adopted: &Layer,
    adoptee: &Arc<Timeline>,
    generation: Generation,
    cancel: &CancellationToken,
) -> Result<Layer, Error> {
    use Error::CopyFailed;

    // depending on `Layer::keep_resident`, we could hardlink

    let mut metadata = adopted.metadata();
    debug_assert!(metadata.generation <= generation);
    metadata.generation = generation;

    let owned = crate::tenant::storage_layer::Layer::for_evicted(
        adoptee.conf,
        adoptee,
        adopted.layer_desc().layer_name(),
        metadata,
    );

    // FIXME: better `ShuttingDown` error
    adoptee
        .remote_client
        .copy_timeline_layer(adopted, &owned, cancel)
        .await
        .map(move |()| owned)
        .map_err(CopyFailed)
}

/// See [`Timeline::complete_detaching_timeline_ancestor`].
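///
/// Publishes the prepared layers and records the detach in `index_part.json`, then
/// reparents every active timeline that branched off the same ancestor at or before
/// `ancestor_lsn` and is not being deleted, returning the ids of the successfully
/// reparented timelines.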
pub(super) async fn complete(
    detached: &Arc<Timeline>,
    tenant: &Tenant,
    prepared: PreparedTimelineDetach,
    _ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
    let PreparedTimelineDetach { layers } = prepared;

    let ancestor = detached
        .get_ancestor_timeline()
        .expect("must still have an ancestor");
    let ancestor_lsn = detached.get_ancestor_lsn();

    // publish the prepared layers before we reparent any of the timelines, so that on restart
    // reparented timelines find layers. also do the actual detaching.
    //
    // if we crash after this operation, we will at least come up having detached a timeline, but
    // we cannot go back and reparent the timelines which would have been reparented in normal
    // execution.
    //
    // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
    // which could give us a completely wrong layer combination.
    detached
        .remote_client
        .schedule_adding_existing_layers_to_index_detach_and_wait(
            &layers,
            (ancestor.timeline_id, ancestor_lsn),
        )
        .await?;

    let mut tasks = tokio::task::JoinSet::new();

    // because we are now keeping the slot in progress, it is unlikely that there will be any
    // timeline deletions during this time. if we raced one, then we'll just ignore it.
    tenant
        .timelines
        .lock()
        .unwrap()
        .values()
        .filter_map(|tl| {
            if Arc::ptr_eq(tl, detached) {
                return None;
            }

            if !tl.is_active() {
                return None;
            }

            let tl_ancestor = tl.ancestor_timeline.as_ref()?;
            let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
            let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;

            let is_deleting = tl
                .delete_progress
                .try_lock()
                .map(|flow| !flow.is_not_started())
                .unwrap_or(true);

            if is_same && is_earlier && !is_deleting {
                Some(tl.clone())
            } else {
                None
            }
        })
        .for_each(|timeline| {
            // important in this scope: we are holding the Tenant::timelines lock
            let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
            let new_parent = detached.timeline_id;

            tasks.spawn(
                async move {
                    let res = timeline
                        .remote_client
                        .schedule_reparenting_and_wait(&new_parent)
                        .await;

                    match res {
                        Ok(()) => Some(timeline),
                        Err(e) => {
                            // with the use of tenant slot, we no longer expect these.
                            tracing::warn!("reparenting failed: {e:#}");
                            None
                        }
                    }
                }
                .instrument(span),
            );
        });

    let reparenting_candidates = tasks.len();
    let mut reparented = Vec::with_capacity(tasks.len());

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Some(timeline)) => {
                tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
                reparented.push(timeline.timeline_id);
            }
            Ok(None) => {
                // let's just ignore this for now. one or all reparented timelines could have
                // started deletion, and that is fine.
            }
            Err(je) if je.is_cancelled() => unreachable!("not used"),
            Err(je) if je.is_panic() => {
                // ignore; it's better to continue with a single reparenting failing (or even
                // all of them) in order to get to the goal state.
                //
                // these timelines will never be reparentable, but they can always be detached as
                // separate tree roots.
            }
            Err(je) => tracing::error!("unexpected join error: {je:?}"),
        }
    }

    if reparenting_candidates != reparented.len() {
        tracing::info!("failed to reparent some candidates");
    }

    Ok(reparented)
}