// pageserver/src/tenant/timeline/detach_ancestor.rs
//
// Coverage snapshot of 2024-08-02: 0.0% of 464 lines and 0 of 34 functions hit.
use std::sync::Arc;

use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
use crate::{
    context::{DownloadBehavior, RequestContext},
    task_mgr::TaskKind,
    tenant::{
        storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
        Tenant,
    },
    virtual_file::{MaybeFatalIo, VirtualFile},
};
use pageserver_api::models::detach_ancestor::AncestorDetached;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    #[error("no ancestors")]
    NoAncestor,
    #[error("too many ancestors")]
    TooManyAncestors,
    #[error("shutting down, please retry later")]
    ShuttingDown,
    #[error("flushing failed")]
    FlushAncestor(#[source] FlushLayerError),
    #[error("layer download failed")]
    RewrittenDeltaDownloadFailed(#[source] crate::tenant::storage_layer::layer::DownloadError),
    #[error("copying LSN prefix locally failed")]
    CopyDeltaPrefix(#[source] anyhow::Error),
    #[error("upload rewritten layer")]
    UploadRewritten(#[source] anyhow::Error),

    #[error("ancestor is already being detached by: {}", .0)]
    OtherTimelineDetachOngoing(TimelineId),

    #[error("remote copying layer failed")]
    CopyFailed(#[source] anyhow::Error),

    #[error("unexpected error")]
    Unexpected(#[source] anyhow::Error),

    #[error("failpoint: {}", .0)]
    Failpoint(&'static str),
}

impl From<Error> for ApiError {
    fn from(value: Error) -> Self {
        match value {
            e @ Error::NoAncestor => ApiError::Conflict(e.to_string()),
            // TODO: ApiError converts the anyhow using debug formatting ... just stop using ApiError?
            e @ Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", e)),
            Error::ShuttingDown => ApiError::ShuttingDown,
            Error::OtherTimelineDetachOngoing(_) => {
                ApiError::ResourceUnavailable("other timeline detach is already ongoing".into())
            }
            // all of these can wrap shutdown errors; in fact, that is their most common cause
            e @ Error::FlushAncestor(_)
            | e @ Error::RewrittenDeltaDownloadFailed(_)
            | e @ Error::CopyDeltaPrefix(_)
            | e @ Error::UploadRewritten(_)
            | e @ Error::CopyFailed(_)
            | e @ Error::Unexpected(_)
            | e @ Error::Failpoint(_) => ApiError::InternalServerError(e.into()),
        }
    }
}

impl From<crate::tenant::upload_queue::NotInitialized> for Error {
    fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
        // treat all of these as shutdown signals, even though that is not entirely
        // correct (the queue may simply be uninitialized)
        Error::ShuttingDown
    }
}

impl From<FlushLayerError> for Error {
    fn from(value: FlushLayerError) -> Self {
        match value {
            FlushLayerError::Cancelled => Error::ShuttingDown,
            FlushLayerError::NotRunning(_) => {
                // FIXME(#6424): technically statically unreachable right now, given how we never
                // drop the sender
                Error::ShuttingDown
            }
            FlushLayerError::CreateImageLayersError(_) | FlushLayerError::Other(_) => {
                Error::FlushAncestor(value)
            }
        }
    }
}

pub(crate) enum Progress {
    Prepared(completion::Completion, PreparedTimelineDetach),
    Done(AncestorDetached),
}

pub(crate) struct PreparedTimelineDetach {
    layers: Vec<Layer>,
}
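
// The intended call sequence of this module, as a sketch (names are from this
// module; error handling and holding the tenant slot are the caller's concern):
//
//     match prepare(&detached, tenant, Options::default(), ctx).await? {
//         // the timeline was already detached by an earlier, completed attempt
//         Progress::Done(resp) => resp,
//         Progress::Prepared(_guard, prepared) => {
//             let reparented = complete(&detached, tenant, prepared, ctx).await?;
//             AncestorDetached { reparented_timelines: reparented }
//         }
//     }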

/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
    pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
    pub(crate) copy_concurrency: std::num::NonZeroUsize,
}

impl Default for Options {
    fn default() -> Self {
        Self {
            rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
            copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
        }
    }
}
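
// Until the TODO above is addressed, callers can tune the concurrency per call;
// a minimal sketch (the value 4 is illustrative only):
//
//     let options = Options {
//         rewrite_concurrency: std::num::NonZeroUsize::new(4).unwrap(),
//         ..Options::default()
//     };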

/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
    detached: &Arc<Timeline>,
    tenant: &Tenant,
    options: Options,
    ctx: &RequestContext,
) -> Result<Progress, Error> {
    use Error::*;

    let Some((ancestor, ancestor_lsn)) = detached
        .ancestor_timeline
        .as_ref()
        .map(|tl| (tl.clone(), detached.ancestor_lsn))
    else {
        {
            let accessor = detached.remote_client.initialized_upload_queue()?;

            // it is safe to inspect the latest uploaded index part, because we can only
            // witness this state after the restart has completed and the ancestor is gone.
            let latest = accessor.latest_uploaded_index_part();
            if !latest.lineage.is_detached_from_original_ancestor() {
                return Err(NoAncestor);
            }
        }

        // `detached` has already been detached from its ancestor; inspect each of the
        // current timelines and report back those which were reparented by our detach
        let mut all_direct_children = tenant
            .timelines
            .lock()
            .unwrap()
            .values()
            .filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)))
            .map(|tl| (tl.ancestor_lsn, tl.clone()))
            .collect::<Vec<_>>();

        let mut any_shutdown = false;

        all_direct_children.retain(
            |(_, tl)| match tl.remote_client.initialized_upload_queue() {
                Ok(accessor) => accessor
                    .latest_uploaded_index_part()
                    .lineage
                    .is_reparented(),
                Err(_shutdownalike) => {
                    // not necessarily a shutdown, but bail early so as not to give
                    // inconsistent results in a sharded environment.
                    any_shutdown = true;
                    true
                }
            },
        );

        if any_shutdown {
            // one or many timelines could be being deleted; have the client retry
            return Err(Error::ShuttingDown);
        }

        let mut reparented = all_direct_children;
        // why a sorted Vec instead of a hashset? there is a reason, but it has been
        // forgotten many times; perhaps with a hashset we could not distinguish some
        // race condition.
        reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));

        return Ok(Progress::Done(AncestorDetached {
            reparented_timelines: reparented
                .into_iter()
                .map(|(_, tl)| tl.timeline_id)
                .collect(),
        }));
    };

    if !ancestor_lsn.is_valid() {
        // rare case, probably wouldn't even load
        tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
        return Err(NoAncestor);
    }

    if ancestor.ancestor_timeline.is_some() {
        // non-technical requirement; we could flatten N ancestors just as easily but we chose
        // not to, at least initially
        return Err(TooManyAncestors);
    }

    // before we acquire the gate, we must mark the ancestor as having a detach operation
    // ongoing, which will block other concurrent detach operations so we don't get into
    // awkward situations where two branches are trying to reparent earlier branches.
    let (guard, barrier) = completion::channel();

    {
        let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
        if let Some((tl, other)) = guard.as_ref() {
            if !other.is_ready() {
                return Err(OtherTimelineDetachOngoing(*tl));
            }
        }
        *guard = Some((detached.timeline_id, barrier));
    }

    let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;

    utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking_pausable");

    fail::fail_point!(
        "timeline-detach-ancestor::before_starting_after_locking",
        |_| Err(Error::Failpoint(
            "timeline-detach-ancestor::before_starting_after_locking"
        ))
    );
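
    // A sketch of how a test could arm the failpoint above through the `fail`
    // crate; the "return" action makes the registered closure run:
    //
    //     fail::cfg("timeline-detach-ancestor::before_starting_after_locking", "return")
    //         .unwrap();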

    if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
        let span =
            tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
        async {
            let started_at = std::time::Instant::now();
            let freeze_and_flush = ancestor.freeze_and_flush0();
            let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);

            let res =
                tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
                    .await;

            let res = match res {
                Ok(res) => res,
                Err(_elapsed) => {
                    tracing::info!("freezing and flushing ancestor is still ongoing");
                    freeze_and_flush.await
                }
            };

            res?;

            // we do not need to wait for uploads to complete, but we do need `struct Layer`:
            // copying a delta prefix is currently unsupported for `InMemoryLayer`.
            tracing::info!(
                elapsed_ms = started_at.elapsed().as_millis(),
                "froze and flushed the ancestor"
            );
            Ok::<_, Error>(())
        }
        .instrument(span)
        .await?;
    }
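
    // The timeout-then-continue idiom above, in isolation (sketch; `slow_operation`
    // is a stand-in): race the pinned future against a short timeout to get a log
    // line out, then keep awaiting the very same future instead of restarting it.
    //
    //     let mut fut = std::pin::pin!(slow_operation());
    //     let res = match tokio::time::timeout(Duration::from_secs(1), &mut fut).await {
    //         Ok(res) => res,
    //         Err(_elapsed) => {
    //             tracing::info!("still ongoing");
    //             fut.await
    //         }
    //     };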
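    // LSN range ends are exclusive (see `partition_work` below), so `+ 1` makes the
    // branch point itself the last LSN included in the copied prefix.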
    let end_lsn = ancestor_lsn + 1;

    let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
        // we do not need to start from our own layers, because they can only be layers
        // that come *after* ancestor_lsn
        let layers = tokio::select! {
            guard = ancestor.layers.read() => guard,
            _ = detached.cancel.cancelled() => {
                return Err(ShuttingDown);
            }
            _ = ancestor.cancel.cancelled() => {
                return Err(ShuttingDown);
            }
        };

        // these can change between retries if compaction or gc ran in between, which
        // means we will have to redo work.
        partition_work(ancestor_lsn, &layers)
    };

    // TODO: layers are already sorted by something: use that to determine how many of
    // the remote copies are already done.
    tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");

    // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
    let mut new_layers: Vec<Layer> =
        Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());

    {
        tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");

        let mut tasks = tokio::task::JoinSet::new();

        let mut wrote_any = false;

        let limiter = Arc::new(tokio::sync::Semaphore::new(
            options.rewrite_concurrency.get(),
        ));

        for layer in straddling_branchpoint {
            let limiter = limiter.clone();
            let timeline = detached.clone();
            let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);

            tasks.spawn(async move {
                let _permit = limiter.acquire().await;
                let copied =
                    upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
                        .await?;
                Ok(copied)
            });
        }

        while let Some(res) = tasks.join_next().await {
            match res {
                Ok(Ok(Some(copied))) => {
                    wrote_any = true;
                    tracing::info!(layer=%copied, "rewrote and uploaded");
                    new_layers.push(copied);
                }
                Ok(Ok(None)) => {}
                Ok(Err(e)) => return Err(e),
                Err(je) => return Err(Unexpected(je.into())),
            }
        }

        // FIXME: the fsync should be mandatory, after both rewrites and copies
        if wrote_any {
            let timeline_dir = VirtualFile::open(
                &detached
                    .conf
                    .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
                ctx,
            )
            .await
            .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    }
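
    // The bounded fan-out used above for rewrites and below for remote copies, in
    // isolation (sketch; `process` and `items` are stand-ins): a shared semaphore
    // caps how many tasks do work at once, while the JoinSet collects every result.
    //
    //     let limiter = Arc::new(tokio::sync::Semaphore::new(concurrency));
    //     let mut tasks = tokio::task::JoinSet::new();
    //     for item in items {
    //         let limiter = limiter.clone();
    //         tasks.spawn(async move {
    //             let _permit = limiter.acquire().await;
    //             process(item).await
    //         });
    //     }
    //     while let Some(res) = tasks.join_next().await {
    //         // handle Ok(Ok(_)), Ok(Err(_)) and JoinError cases
    //     }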

    let mut tasks = tokio::task::JoinSet::new();
    let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));

    for adopted in rest_of_historic {
        let limiter = limiter.clone();
        let timeline = detached.clone();

        tasks.spawn(
            async move {
                let _permit = limiter.acquire().await;
                let owned =
                    remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
                tracing::info!(layer=%owned, "remote copied");
                Ok(owned)
            }
            .in_current_span(),
        );
    }

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Ok(owned)) => {
                new_layers.push(owned);
            }
            Ok(Err(failed)) => {
                return Err(failed);
            }
            Err(je) => return Err(Unexpected(je.into())),
        }
    }

    // TODO: fsync directory again if we hardlinked something

    let prepared = PreparedTimelineDetach { layers: new_layers };

    Ok(Progress::Prepared(guard, prepared))
}
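
/// Partitions the ancestor's historic layers by how they relate to `ancestor_lsn`:
/// the count of layers starting above it, the delta layers straddling it (whose
/// prefixes must be rewritten), and the rest, which can be adopted as-is.
///
/// A sketch of the outcome, with illustrative LSNs and `ancestor_lsn = 20`:
///
/// ```text
/// delta 10..30  -> straddling_branchpoint (prefix up to the branch point is rewritten)
/// delta  5..15  -> rest_of_historic       (adopted via remote copy)
/// image @ 15    -> rest_of_historic       (adopted via remote copy)
/// delta 25..40  -> later_by_lsn += 1      (filtered out; later than the branch point)
/// ```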
fn partition_work(
    ancestor_lsn: Lsn,
    source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
    let mut straddling_branchpoint = vec![];
    let mut rest_of_historic = vec![];

    let mut later_by_lsn = 0;

    for desc in source_layermap.layer_map().iter_historic_layers() {
        // off-by-one hazards here:
        // - start is inclusive
        // - end is exclusive
        if desc.lsn_range.start > ancestor_lsn {
            later_by_lsn += 1;
            continue;
        }

        let target = if desc.lsn_range.start <= ancestor_lsn
            && desc.lsn_range.end > ancestor_lsn
            && desc.is_delta
        {
            // TODO: image layer at Lsn optimization
            &mut straddling_branchpoint
        } else {
            &mut rest_of_historic
        };

        target.push(source_layermap.get_from_desc(&desc));
    }

    (later_by_lsn, straddling_branchpoint, rest_of_historic)
}

async fn upload_rewritten_layer(
    end_lsn: Lsn,
    layer: &Layer,
    target: &Arc<Timeline>,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
    use Error::UploadRewritten;
    let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;

    let Some(copied) = copied else {
        return Ok(None);
    };

    // FIXME: use a better `ShuttingDown` error here
    target
        .remote_client
        .upload_layer_file(&copied, cancel)
        .await
        .map_err(UploadRewritten)?;

    Ok(Some(copied.into()))
}
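
/// Copies the records of `layer` that fall below `end_lsn` into a new delta layer
/// on `target_timeline`; returns `None` when the prefix turns out to contain no
/// records.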
async fn copy_lsn_prefix(
    end_lsn: Lsn,
    layer: &Layer,
    target_timeline: &Arc<Timeline>,
    ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
    use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed, ShuttingDown};

    if target_timeline.cancel.is_cancelled() {
        return Err(ShuttingDown);
    }

    tracing::debug!(%layer, %end_lsn, "copying lsn prefix");

    let mut writer = DeltaLayerWriter::new(
        target_timeline.conf,
        target_timeline.timeline_id,
        target_timeline.tenant_shard_id,
        layer.layer_desc().key_range.start,
        layer.layer_desc().lsn_range.start..end_lsn,
        ctx,
    )
    .await
    .map_err(CopyDeltaPrefix)?;

    let resident = layer
        .download_and_keep_resident()
        .await
        // likely shutdown
        .map_err(RewrittenDeltaDownloadFailed)?;

    let records = resident
        .copy_delta_prefix(&mut writer, end_lsn, ctx)
        .await
        .map_err(CopyDeltaPrefix)?;

    drop(resident);

    tracing::debug!(%layer, records, "copied records");

    if records == 0 {
        drop(writer);
        // TODO: we might want to store an empty marker in remote storage for this
        // layer so that we will not needlessly walk `layer` on repeated attempts.
        Ok(None)
    } else {
        // reuse the source layer's end key instead of the actual highest written key,
        // so that we do not add more holes between layers.
        let reused_highest_key = layer.layer_desc().key_range.end;
        let copied = writer
            .finish(reused_highest_key, target_timeline, ctx)
            .await
            .map_err(CopyDeltaPrefix)?;

        tracing::debug!(%layer, %copied, "new layer produced");

        Ok(Some(copied))
    }
}

/// Creates a new `Layer` instance for the adopted layer and ensures that, on
/// successful return, the layer is found in remote storage, without it having
/// been added to `index_part.json`.
async fn remote_copy(
    adopted: &Layer,
    adoptee: &Arc<Timeline>,
    generation: Generation,
    cancel: &CancellationToken,
) -> Result<Layer, Error> {
    use Error::CopyFailed;

    // depending on `Layer::keep_resident`, we could hardlink instead of copying

    let mut metadata = adopted.metadata();
    debug_assert!(metadata.generation <= generation);
    metadata.generation = generation;

    let owned = crate::tenant::storage_layer::Layer::for_evicted(
        adoptee.conf,
        adoptee,
        adopted.layer_desc().layer_name(),
        metadata,
    );

    // FIXME: use a better `ShuttingDown` error here
    adoptee
        .remote_client
        .copy_timeline_layer(adopted, &owned, cancel)
        .await
        .map(move |()| owned)
        .map_err(CopyFailed)
}

/// See [`Timeline::complete_detaching_timeline_ancestor`].
pub(super) async fn complete(
    detached: &Arc<Timeline>,
    tenant: &Tenant,
    prepared: PreparedTimelineDetach,
    _ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
    let PreparedTimelineDetach { layers } = prepared;

    let ancestor = detached
        .get_ancestor_timeline()
        .expect("must still have an ancestor");
    let ancestor_lsn = detached.get_ancestor_lsn();

    // publish the prepared layers before we reparent any of the timelines, so that on restart
    // the reparented timelines find the layers. also do the actual detaching.
    //
    // if we crash after this operation, we will at least come up having detached a timeline, but
    // we cannot go back and reparent the timelines which would have been reparented in normal
    // execution.
    //
    // this is not perfect, but it avoids a retry happening after a compaction or gc on restart,
    // which could give us a completely wrong layer combination.
    detached
        .remote_client
        .schedule_adding_existing_layers_to_index_detach_and_wait(
            &layers,
            (ancestor.timeline_id, ancestor_lsn),
        )
        .await?;

    let mut tasks = tokio::task::JoinSet::new();

    // because we are now keeping the slot in progress, it is unlikely that there will be any
    // timeline deletions during this time. if we race one, we'll just ignore it.
    tenant
        .timelines
        .lock()
        .unwrap()
        .values()
        .filter_map(|tl| {
            if Arc::ptr_eq(tl, detached) {
                return None;
            }

            if !tl.is_active() {
                return None;
            }

            let tl_ancestor = tl.ancestor_timeline.as_ref()?;
            let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
            let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;

            let is_deleting = tl
                .delete_progress
                .try_lock()
                .map(|flow| !flow.is_not_started())
                .unwrap_or(true);

            if is_same && is_earlier && !is_deleting {
                Some(tl.clone())
            } else {
                None
            }
        })
        .for_each(|timeline| {
            // important in this scope: we are holding the Tenant::timelines lock
            let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
            let new_parent = detached.timeline_id;

            tasks.spawn(
                async move {
                    let res = timeline
                        .remote_client
                        .schedule_reparenting_and_wait(&new_parent)
                        .await;

                    match res {
                        Ok(()) => Some(timeline),
                        Err(e) => {
                            // with the use of tenant slot, we no longer expect these.
                            tracing::warn!("reparenting failed: {e:#}");
                            None
                        }
                    }
                }
                .instrument(span),
            );
        });

    let reparenting_candidates = tasks.len();
    let mut reparented = Vec::with_capacity(tasks.len());

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Some(timeline)) => {
                tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
                reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
            }
            Ok(None) => {
                // let's just ignore this for now. one or all of the reparented timelines
                // could have started deletion, and that is fine.
            }
            Err(je) if je.is_cancelled() => unreachable!("not used"),
            Err(je) if je.is_panic() => {
                // ignore; it's better to continue with a single reparenting failing (or even
                // all of them) in order to get to the goal state.
                //
                // these timelines will never be reparentable, but they can always be detached
                // as separate tree roots.
            }
            Err(je) => tracing::error!("unexpected join error: {je:?}"),
        }
    }

    if reparenting_candidates != reparented.len() {
        tracing::info!("failed to reparent some candidates");
    }

    reparented.sort_unstable();

    let reparented = reparented
        .into_iter()
        .map(|(_, timeline_id)| timeline_id)
        .collect();

    Ok(reparented)
}