LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - detach_ancestor.rs (source / functions)
Test:      1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date: 2025-07-16 12:29:03
Coverage:  Lines:     0.0 % (0 of 806)
           Functions: 0.0 % (0 of 77)

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::sync::Arc;
       3              : 
       4              : use anyhow::Context;
       5              : use bytes::Bytes;
       6              : use http_utils::error::ApiError;
       7              : use pageserver_api::key::Key;
       8              : use pageserver_api::keyspace::KeySpace;
       9              : use pageserver_api::models::DetachBehavior;
      10              : use pageserver_api::models::detach_ancestor::AncestorDetached;
      11              : use pageserver_api::shard::ShardIdentity;
      12              : use pageserver_compaction::helpers::overlaps_with;
      13              : use tokio::sync::Semaphore;
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::Instrument;
      16              : use utils::completion;
      17              : use utils::generation::Generation;
      18              : use utils::id::TimelineId;
      19              : use utils::lsn::Lsn;
      20              : use utils::sync::gate::GateError;
      21              : 
      22              : use super::layer_manager::{LayerManager, LayerManagerLockHolder};
      23              : use super::{FlushLayerError, Timeline};
      24              : use crate::context::{DownloadBehavior, RequestContext};
      25              : use crate::task_mgr::TaskKind;
      26              : use crate::tenant::TenantShard;
      27              : use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
      28              : use crate::tenant::storage_layer::layer::local_layer_path;
      29              : use crate::tenant::storage_layer::{
      30              :     AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
      31              :     ValuesReconstructState,
      32              : };
      33              : use crate::tenant::timeline::VersionedKeySpaceQuery;
      34              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      35              : 
      36              : #[derive(Debug, thiserror::Error)]
      37              : pub(crate) enum Error {
      38              :     #[error("no ancestors")]
      39              :     NoAncestor,
      40              : 
      41              :     #[error("too many ancestors")]
      42              :     TooManyAncestors,
      43              : 
      44              :     #[error("ancestor is not empty")]
      45              :     AncestorNotEmpty,
      46              : 
      47              :     #[error("shutting down, please retry later")]
      48              :     ShuttingDown,
      49              : 
      50              :     #[error("archived: {}", .0)]
      51              :     Archived(TimelineId),
      52              : 
      53              :     #[error(transparent)]
      54              :     NotFound(crate::tenant::GetTimelineError),
      55              : 
      56              :     #[error("failed to reparent all candidate timelines, please retry")]
      57              :     FailedToReparentAll,
      58              : 
      59              :     #[error("ancestor is already being detached by: {}", .0)]
      60              :     OtherTimelineDetachOngoing(TimelineId),
      61              : 
      62              :     #[error("preparing timeline ancestor detach failed")]
      63              :     Prepare(#[source] anyhow::Error),
      64              : 
      65              :     #[error("detaching and reparenting failed")]
      66              :     DetachReparent(#[source] anyhow::Error),
      67              : 
      68              :     #[error("completing ancestor detach failed")]
      69              :     Complete(#[source] anyhow::Error),
      70              : 
      71              :     #[error("failpoint: {}", .0)]
      72              :     Failpoint(&'static str),
      73              : }
      74              : 
      75              : impl Error {
       76              :     /// Try to detect cancellation from within the `anyhow::Error` and map it to
       77              :     /// `Error::ShuttingDown`; otherwise wrap the error using the given `or_else` constructor.
      78            0 :     fn launder<F>(e: anyhow::Error, or_else: F) -> Error
      79            0 :     where
      80            0 :         F: Fn(anyhow::Error) -> Error,
      81              :     {
      82              :         use remote_storage::TimeoutOrCancel;
      83              : 
      84              :         use crate::tenant::remote_timeline_client::WaitCompletionError;
      85              :         use crate::tenant::upload_queue::NotInitialized;
      86              : 
      87            0 :         if e.is::<NotInitialized>()
      88            0 :             || TimeoutOrCancel::caused_by_cancel(&e)
      89            0 :             || e.downcast_ref::<remote_storage::DownloadError>()
      90            0 :                 .is_some_and(|e| e.is_cancelled())
      91            0 :             || e.is::<WaitCompletionError>()
      92              :         {
      93            0 :             Error::ShuttingDown
      94              :         } else {
      95            0 :             or_else(e)
      96              :         }
      97            0 :     }
      98              : }
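                       : 
                       : // A minimal usage sketch of `launder`, assuming only the APIs above;
                       : // `some_fallible_step` is a hypothetical placeholder for any anyhow-producing
                       : // operation. Cancellation-like failures become `Error::ShuttingDown`, everything
                       : // else is wrapped in the caller-chosen variant:
                       : //
                       : //     let res: Result<(), Error> = some_fallible_step()
                       : //         .context("step failed")
                       : //         .map_err(|e| Error::launder(e, Error::Prepare));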
      99              : 
     100              : impl From<Error> for ApiError {
     101            0 :     fn from(value: Error) -> Self {
     102            0 :         match value {
     103            0 :             Error::NoAncestor => ApiError::Conflict(value.to_string()),
     104              :             Error::TooManyAncestors | Error::AncestorNotEmpty => {
     105            0 :                 ApiError::BadRequest(anyhow::anyhow!("{value}"))
     106              :             }
     107            0 :             Error::ShuttingDown => ApiError::ShuttingDown,
     108            0 :             Error::Archived(_) => ApiError::BadRequest(anyhow::anyhow!("{value}")),
     109              :             Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
     110            0 :                 ApiError::ResourceUnavailable(value.to_string().into())
     111              :             }
     112            0 :             Error::NotFound(e) => ApiError::from(e),
     113              :             // these variants should have no cancellation errors because of Error::launder
     114              :             Error::Prepare(_)
     115              :             | Error::DetachReparent(_)
     116              :             | Error::Complete(_)
     117            0 :             | Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
     118              :         }
     119            0 :     }
     120              : }
     121              : 
     122              : impl From<crate::tenant::upload_queue::NotInitialized> for Error {
     123            0 :     fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
      124              :         // treat everything as a shutting-down signal, even though that is not entirely
      125              :         // correct (the upload queue may simply be uninitialized)
     126            0 :         Error::ShuttingDown
     127            0 :     }
     128              : }
     129              : impl From<super::layer_manager::Shutdown> for Error {
     130            0 :     fn from(_: super::layer_manager::Shutdown) -> Self {
     131            0 :         Error::ShuttingDown
     132            0 :     }
     133              : }
     134              : 
     135              : pub(crate) enum Progress {
     136              :     Prepared(Attempt, PreparedTimelineDetach),
     137              :     Done(AncestorDetached),
     138              : }
     139              : 
     140              : pub(crate) struct PreparedTimelineDetach {
     141              :     layers: Vec<Layer>,
     142              : }
     143              : 
     144              : // TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
     145              : #[derive(Debug)]
     146              : pub(crate) struct Options {
     147              :     pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
     148              :     pub(crate) copy_concurrency: std::num::NonZeroUsize,
     149              : }
     150              : 
     151              : impl Default for Options {
     152            0 :     fn default() -> Self {
     153            0 :         Self {
     154            0 :             rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
     155            0 :             copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
     156            0 :         }
     157            0 :     }
     158              : }
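                       : 
                       : // Hypothetical override sketch: callers needing different parallelism can build
                       : // their own `Options`; the values below are illustrative, not recommendations.
                       : //
                       : //     let opts = Options {
                       : //         rewrite_concurrency: std::num::NonZeroUsize::new(4).unwrap(),
                       : //         copy_concurrency: std::num::NonZeroUsize::new(50).unwrap(),
                       : //     };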
     159              : 
      160              : /// Represents a single ancestor-detach attempt that remains exclusive across tenant resets.
     161              : #[derive(Debug)]
     162              : pub(crate) struct Attempt {
     163              :     pub(crate) timeline_id: TimelineId,
     164              :     pub(crate) ancestor_timeline_id: TimelineId,
     165              :     pub(crate) ancestor_lsn: Lsn,
     166              :     _guard: completion::Completion,
     167              :     gate_entered: Option<utils::sync::gate::GateGuard>,
     168              : }
     169              : 
     170              : impl Attempt {
     171            0 :     pub(crate) fn before_reset_tenant(&mut self) {
     172            0 :         let taken = self.gate_entered.take();
     173            0 :         assert!(taken.is_some());
     174            0 :     }
     175              : 
     176            0 :     pub(crate) fn new_barrier(&self) -> completion::Barrier {
     177            0 :         self._guard.barrier()
     178            0 :     }
     179              : }
     180              : 
     181            0 : pub(crate) async fn generate_tombstone_image_layer(
     182            0 :     detached: &Arc<Timeline>,
     183            0 :     ancestor: &Arc<Timeline>,
     184            0 :     ancestor_lsn: Lsn,
     185            0 :     historic_layers_to_copy: &Vec<Layer>,
     186            0 :     ctx: &RequestContext,
     187            0 : ) -> Result<Option<ResidentLayer>, Error> {
     188            0 :     tracing::info!(
     189            0 :         "removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
     190              :     );
     191            0 :     let io_concurrency = IoConcurrency::spawn_from_conf(
     192            0 :         detached.conf.get_vectored_concurrent_io,
     193            0 :         detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
     194              :     );
     195            0 :     let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
     196              :     // Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
     197              :     // not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
     198            0 :     let key_range = Key::sparse_non_inherited_keyspace();
     199              :     // avoid generating a "future layer" which will then be removed
     200            0 :     let image_lsn = ancestor_lsn;
     201              : 
     202              :     {
     203            0 :         for layer in historic_layers_to_copy {
     204            0 :             let desc = layer.layer_desc();
     205            0 :             if !desc.is_delta
     206            0 :                 && desc.lsn_range.start == image_lsn
     207            0 :                 && overlaps_with(&key_range, &desc.key_range)
     208              :             {
     209            0 :                 tracing::info!(
     210            0 :                     layer=%layer, "will copy tombstone from ancestor instead of creating a new one"
     211              :                 );
     212              : 
     213            0 :                 return Ok(None);
     214            0 :             }
     215              :         }
     216              : 
     217            0 :         let layers = detached
     218            0 :             .layers
     219            0 :             .read(LayerManagerLockHolder::DetachAncestor)
     220            0 :             .await;
     221            0 :         for layer in layers.all_persistent_layers() {
     222            0 :             if !layer.is_delta
     223            0 :                 && layer.lsn_range.start == image_lsn
     224            0 :                 && overlaps_with(&key_range, &layer.key_range)
     225              :             {
     226            0 :                 tracing::warn!(
      227            0 :                     layer=%layer, "image layer at the detach LSN already exists, skipping aux file removal"
     228              :                 );
     229            0 :                 return Ok(None);
     230            0 :             }
     231              :         }
     232              :     }
     233              : 
     234            0 :     let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key_range.clone()), image_lsn);
     235            0 :     let data = ancestor
     236            0 :         .get_vectored_impl(query, &mut reconstruct_state, ctx)
     237            0 :         .await
     238            0 :         .context("failed to retrieve aux keys")
     239            0 :         .map_err(|e| Error::launder(e, Error::Prepare))?;
     240            0 :     if !data.is_empty() {
     241              :         // TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
     242              :         // upon compaction but theoretically possible.
     243            0 :         let mut image_layer_writer = ImageLayerWriter::new(
     244            0 :             detached.conf,
     245            0 :             detached.timeline_id,
     246            0 :             detached.tenant_shard_id,
     247            0 :             &key_range,
     248            0 :             image_lsn,
     249            0 :             &detached.gate,
     250            0 :             detached.cancel.clone(),
     251            0 :             ctx,
     252            0 :         )
     253            0 :         .await
     254            0 :         .context("failed to create image layer writer")
     255            0 :         .map_err(Error::Prepare)?;
     256            0 :         for key in data.keys() {
     257            0 :             image_layer_writer
     258            0 :                 .put_image(*key, Bytes::new(), ctx)
     259            0 :                 .await
     260            0 :                 .context("failed to write key")
     261            0 :                 .map_err(|e| Error::launder(e, Error::Prepare))?;
     262              :         }
     263            0 :         let (desc, path) = image_layer_writer
     264            0 :             .finish(ctx)
     265            0 :             .await
     266            0 :             .context("failed to finish image layer writer for removing the metadata keys")
     267            0 :             .map_err(|e| Error::launder(e, Error::Prepare))?;
     268            0 :         let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
     269            0 :             .map_err(|e| Error::launder(e, Error::Prepare))?;
     270            0 :         detached
     271            0 :             .remote_client
     272            0 :             .upload_layer_file(&generated, &detached.cancel)
     273            0 :             .await
     274            0 :             .map_err(|e| Error::launder(e, Error::Prepare))?;
     275            0 :         tracing::info!(layer=%generated, "wrote image layer");
     276            0 :         Ok(Some(generated))
     277              :     } else {
     278            0 :         tracing::info!("no aux keys found in ancestor");
     279            0 :         Ok(None)
     280              :     }
     281            0 : }
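                       : 
                       : // Note on the mechanism above: a "tombstone" is simply a zero-length image
                       : // written at the detach LSN for every non-inherited key, so later reads on the
                       : // detached timeline see an empty value instead of the ancestor's data. The write
                       : // itself is the single call already used in the function:
                       : //
                       : //     image_layer_writer.put_image(*key, Bytes::new(), ctx).await?;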
     282              : 
     283              : /// See [`Timeline::prepare_to_detach_from_ancestor`]
     284            0 : pub(super) async fn prepare(
     285            0 :     detached: &Arc<Timeline>,
     286            0 :     tenant: &TenantShard,
     287            0 :     behavior: DetachBehavior,
     288            0 :     options: Options,
     289            0 :     ctx: &RequestContext,
     290            0 : ) -> Result<Progress, Error> {
     291              :     use Error::*;
     292              : 
     293            0 :     let Some((mut ancestor, mut ancestor_lsn)) = detached
     294            0 :         .ancestor_timeline
     295            0 :         .as_ref()
     296            0 :         .map(|tl| (tl.clone(), detached.ancestor_lsn))
     297              :     else {
     298              :         let ancestor_id;
     299              :         let ancestor_lsn;
     300            0 :         let still_in_progress = {
     301            0 :             let accessor = detached.remote_client.initialized_upload_queue()?;
     302              : 
      303              :             // we can safely inspect the latest uploaded index part, because we can only
      304              :             // witness this state after the restart is complete and the ancestor is gone.
     305            0 :             let latest = accessor.latest_uploaded_index_part();
     306            0 :             let Some((id, lsn)) = latest.lineage.detached_previous_ancestor() else {
     307            0 :                 return Err(NoAncestor);
     308              :             };
     309            0 :             ancestor_id = id;
     310            0 :             ancestor_lsn = lsn;
     311              : 
     312            0 :             latest
     313            0 :                 .gc_blocking
     314            0 :                 .as_ref()
     315            0 :                 .is_some_and(|b| b.blocked_by(DetachAncestor))
     316              :         };
     317              : 
     318            0 :         if still_in_progress {
     319              :             // gc is still blocked, we can still reparent and complete.
      320              :             // we can safely reparent the remaining timelines, because they were locked in at the beginning.
     321            0 :             let attempt =
     322            0 :                 continue_with_blocked_gc(detached, tenant, ancestor_id, ancestor_lsn).await?;
     323              : 
     324              :             // because the ancestor of detached is already set to none, we have published all
     325              :             // of the layers, so we are still "prepared."
     326            0 :             return Ok(Progress::Prepared(
     327            0 :                 attempt,
     328            0 :                 PreparedTimelineDetach { layers: Vec::new() },
     329            0 :             ));
     330            0 :         }
     331              : 
     332            0 :         let reparented_timelines = reparented_direct_children(detached, tenant)?;
     333            0 :         return Ok(Progress::Done(AncestorDetached {
     334            0 :             reparented_timelines,
     335            0 :         }));
     336              :     };
     337              : 
     338            0 :     if detached.is_archived() != Some(false) {
     339            0 :         return Err(Archived(detached.timeline_id));
     340            0 :     }
     341              : 
     342            0 :     if !ancestor_lsn.is_valid() {
     343              :         // rare case, probably wouldn't even load
     344            0 :         tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
     345            0 :         return Err(NoAncestor);
     346            0 :     }
     347              : 
     348            0 :     check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn, behavior)?;
     349              : 
     350            0 :     if let DetachBehavior::MultiLevelAndNoReparent = behavior {
     351              :         // If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
     352            0 :         while let Some(ancestor_of_ancestor) = ancestor.ancestor_timeline.clone() {
     353            0 :             if ancestor_lsn != ancestor.ancestor_lsn {
      354              :                 // non-technical requirement; we could still flatten even if the ancestor LSNs
      355              :                 // do not match, but that would require copying and cutting more layers.
     356            0 :                 return Err(AncestorNotEmpty);
     357            0 :             }
     358              :             // Use the ancestor of the ancestor as the new ancestor (only when the ancestor LSNs are the same)
     359            0 :             ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
     360            0 :             ancestor = ancestor_of_ancestor;
     361              :             // TODO: do we still need to check if we don't want to reparent?
     362            0 :             check_no_archived_children_of_ancestor(
     363            0 :                 tenant,
     364            0 :                 detached,
     365            0 :                 &ancestor,
     366            0 :                 ancestor_lsn,
     367            0 :                 behavior,
     368            0 :             )?;
     369              :         }
     370            0 :     } else if ancestor.ancestor_timeline.is_some() {
     371              :         // non-technical requirement; we could flatten N ancestors just as easily but we chose
     372              :         // not to, at least initially
     373            0 :         return Err(TooManyAncestors);
     374            0 :     }
     375              : 
     376            0 :     tracing::info!(
     377            0 :         "attempt to detach the timeline from the ancestor: {}@{}, behavior={:?}",
     378            0 :         ancestor.timeline_id,
     379              :         ancestor_lsn,
     380              :         behavior
     381              :     );
     382              : 
     383            0 :     let attempt = start_new_attempt(detached, tenant, ancestor.timeline_id, ancestor_lsn).await?;
     384              : 
     385            0 :     utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
     386              : 
     387            0 :     fail::fail_point!(
     388            0 :         "timeline-detach-ancestor::before_starting_after_locking",
     389            0 :         |_| Err(Error::Failpoint(
     390            0 :             "timeline-detach-ancestor::before_starting_after_locking"
     391            0 :         ))
     392              :     );
     393              : 
     394            0 :     if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
     395            0 :         let span =
     396            0 :             tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
     397            0 :         async {
     398            0 :             let started_at = std::time::Instant::now();
     399            0 :             let freeze_and_flush = ancestor.freeze_and_flush0();
     400            0 :             let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
     401              : 
     402            0 :             let res =
     403            0 :                 tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
     404            0 :                     .await;
     405              : 
     406            0 :             let res = match res {
     407            0 :                 Ok(res) => res,
     408            0 :                 Err(_elapsed) => {
     409            0 :                     tracing::info!("freezing and flushing ancestor is still ongoing");
     410            0 :                     freeze_and_flush.await
     411              :                 }
     412              :             };
     413              : 
     414            0 :             res.map_err(|e| {
     415              :                 use FlushLayerError::*;
     416            0 :                 match e {
     417              :                     Cancelled | NotRunning(_) => {
     418              :                         // FIXME(#6424): technically statically unreachable right now, given how we never
     419              :                         // drop the sender
     420            0 :                         Error::ShuttingDown
     421              :                     }
     422            0 :                     CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
     423              :                 }
     424            0 :             })?;
     425              : 
     426              :             // we do not need to wait for uploads to complete but we do need `struct Layer`,
     427              :             // copying delta prefix is unsupported currently for `InMemoryLayer`.
     428            0 :             tracing::info!(
     429            0 :                 elapsed_ms = started_at.elapsed().as_millis(),
     430            0 :                 "froze and flushed the ancestor"
     431              :             );
     432            0 :             Ok::<_, Error>(())
     433            0 :         }
     434            0 :         .instrument(span)
     435            0 :         .await?;
     436            0 :     }
     437              : 
     438            0 :     let end_lsn = ancestor_lsn + 1;
     439              : 
     440            0 :     let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
     441              :         // we do not need to start from our layers, because they can only be layers that come
     442              :         // *after* ancestor_lsn
     443            0 :         let layers = tokio::select! {
     444            0 :             guard = ancestor.layers.read(LayerManagerLockHolder::DetachAncestor) => guard,
     445            0 :             _ = detached.cancel.cancelled() => {
     446            0 :                 return Err(ShuttingDown);
     447              :             }
     448            0 :             _ = ancestor.cancel.cancelled() => {
     449            0 :                 return Err(ShuttingDown);
     450              :             }
     451              :         };
     452              : 
     453              :         // between retries, these can change if compaction or gc ran in between. this will mean
     454              :         // we have to redo work.
     455            0 :         partition_work(ancestor_lsn, &layers)?
     456              :     };
     457              : 
      458              :     // TODO: layers are already sorted by something: use that to determine how many of the
      459              :     // remote copies are already done -- gc is blocked, but a compaction could have happened
      460              :     // on the ancestor, which is something to keep in mind if copy skipping is implemented.
     461            0 :     tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
     462              : 
     463              :     // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
     464            0 :     let mut new_layers: Vec<Layer> =
     465            0 :         Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
     466              : 
     467            0 :     if let Some(tombstone_layer) =
     468            0 :         generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, &rest_of_historic, ctx)
     469            0 :             .await?
     470            0 :     {
     471            0 :         new_layers.push(tombstone_layer.into());
     472            0 :     }
     473              : 
     474              :     {
     475            0 :         tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
     476              : 
     477            0 :         let mut tasks = tokio::task::JoinSet::new();
     478              : 
     479            0 :         let mut wrote_any = false;
     480              : 
     481            0 :         let limiter = Arc::new(Semaphore::new(options.rewrite_concurrency.get()));
     482              : 
     483            0 :         for layer in straddling_branchpoint {
     484            0 :             let limiter = limiter.clone();
     485            0 :             let timeline = detached.clone();
     486            0 :             let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
     487              : 
     488            0 :             let span = tracing::info_span!("upload_rewritten_layer", %layer);
     489            0 :             tasks.spawn(
     490            0 :                 async move {
     491            0 :                     let _permit = limiter.acquire().await;
     492            0 :                     let copied =
     493            0 :                         upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
     494            0 :                             .await?;
     495            0 :                     if let Some(copied) = copied.as_ref() {
     496            0 :                         tracing::info!(%copied, "rewrote and uploaded");
     497            0 :                     }
     498            0 :                     Ok(copied)
     499            0 :                 }
     500            0 :                 .instrument(span),
     501              :             );
     502              :         }
     503              : 
     504            0 :         while let Some(res) = tasks.join_next().await {
     505            0 :             match res {
     506            0 :                 Ok(Ok(Some(copied))) => {
     507            0 :                     wrote_any = true;
     508            0 :                     new_layers.push(copied);
     509            0 :                 }
     510            0 :                 Ok(Ok(None)) => {}
     511            0 :                 Ok(Err(e)) => return Err(e),
     512            0 :                 Err(je) => return Err(Error::Prepare(je.into())),
     513              :             }
     514              :         }
     515              : 
     516              :         // FIXME: the fsync should be mandatory, after both rewrites and copies
     517            0 :         if wrote_any {
     518            0 :             fsync_timeline_dir(detached, ctx).await;
     519            0 :         }
     520              :     }
     521              : 
     522            0 :     let mut tasks = tokio::task::JoinSet::new();
     523            0 :     let limiter = Arc::new(Semaphore::new(options.copy_concurrency.get()));
     524            0 :     let cancel_eval = CancellationToken::new();
     525              : 
     526            0 :     for adopted in rest_of_historic {
     527            0 :         let limiter = limiter.clone();
     528            0 :         let timeline = detached.clone();
     529            0 :         let cancel_eval = cancel_eval.clone();
     530              : 
     531            0 :         tasks.spawn(
     532            0 :             async move {
     533            0 :                 let _permit = tokio::select! {
     534            0 :                     permit = limiter.acquire() => {
     535            0 :                         permit
     536              :                     }
     537              :                     // Wait for the cancellation here instead of letting the entire task be cancelled.
     538              :                     // Cancellations are racy in that they might leave layers on disk.
     539            0 :                     _ = cancel_eval.cancelled() => {
     540            0 :                         Err(Error::ShuttingDown)?
     541              :                     }
     542              :                 };
     543            0 :                 let (owned, did_hardlink) = remote_copy(
     544            0 :                     &adopted,
     545            0 :                     &timeline,
     546            0 :                     timeline.generation,
     547            0 :                     timeline.shard_identity,
     548            0 :                     &timeline.cancel,
     549            0 :                 )
     550            0 :                 .await?;
     551            0 :                 tracing::info!(layer=%owned, did_hard_link=%did_hardlink, "remote copied");
     552            0 :                 Ok((owned, did_hardlink))
     553            0 :             }
     554            0 :             .in_current_span(),
     555              :         );
     556              :     }
     557              : 
     558            0 :     fn delete_layers(timeline: &Timeline, layers: Vec<Layer>) -> Result<(), Error> {
     559              :         // We are deleting layers, so we must hold the gate
     560            0 :         let _gate = timeline.gate.enter().map_err(|e| match e {
     561            0 :             GateError::GateClosed => Error::ShuttingDown,
     562            0 :         })?;
     563              :         {
     564            0 :             layers.into_iter().for_each(|l: Layer| {
     565            0 :                 l.delete_on_drop();
     566            0 :                 std::mem::drop(l);
     567            0 :             });
     568              :         }
     569            0 :         Ok(())
     570            0 :     }
     571              : 
     572            0 :     let mut should_fsync = false;
     573            0 :     let mut first_err = None;
     574            0 :     while let Some(res) = tasks.join_next().await {
     575            0 :         match res {
     576            0 :             Ok(Ok((owned, did_hardlink))) => {
     577            0 :                 if did_hardlink {
     578            0 :                     should_fsync = true;
     579            0 :                 }
     580            0 :                 new_layers.push(owned);
     581              :             }
     582              : 
     583              :             // Don't stop the evaluation on errors, so that we get the full set of hardlinked layers to delete.
     584            0 :             Ok(Err(failed)) => {
     585            0 :                 cancel_eval.cancel();
     586            0 :                 first_err.get_or_insert(failed);
     587            0 :             }
     588            0 :             Err(je) => {
     589            0 :                 cancel_eval.cancel();
     590            0 :                 first_err.get_or_insert(Error::Prepare(je.into()));
     591            0 :             }
     592              :         }
     593              :     }
     594              : 
     595            0 :     if let Some(failed) = first_err {
     596            0 :         delete_layers(detached, new_layers)?;
     597            0 :         return Err(failed);
     598            0 :     }
     599              : 
     600              :     // fsync directory again if we hardlinked something
     601            0 :     if should_fsync {
     602            0 :         fsync_timeline_dir(detached, ctx).await;
     603            0 :     }
     604              : 
     605            0 :     let prepared = PreparedTimelineDetach { layers: new_layers };
     606              : 
     607            0 :     Ok(Progress::Prepared(attempt, prepared))
     608            0 : }
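                       : 
                       : // Recap of the prepare pipeline above, for orientation:
                       : //   1. resolve the ancestor (or fast-path an already-detached timeline) and take
                       : //      the tenant-exclusive attempt plus the gc block;
                       : //   2. freeze and flush the ancestor (when needed) so all data lives in `struct Layer`s;
                       : //   3. partition the ancestor's layers at `ancestor_lsn`;
                       : //   4. write a tombstone image layer for non-inherited keys, if one is needed;
                       : //   5. rewrite the LSN prefix of straddling delta layers and upload them;
                       : //   6. remote-copy (hardlinking when resident) the remaining historic layers,
                       : //      deleting everything on the first error so a retry starts clean.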
     609              : 
     610            0 : async fn start_new_attempt(
     611            0 :     detached: &Timeline,
     612            0 :     tenant: &TenantShard,
     613            0 :     ancestor_timeline_id: TimelineId,
     614            0 :     ancestor_lsn: Lsn,
     615            0 : ) -> Result<Attempt, Error> {
     616            0 :     let attempt = obtain_exclusive_attempt(detached, tenant, ancestor_timeline_id, ancestor_lsn)?;
     617              : 
     618              :     // insert the block in the index_part.json, if not already there.
     619            0 :     let _dont_care = tenant
     620            0 :         .gc_block
     621            0 :         .insert(
     622            0 :             detached,
     623            0 :             crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
     624            0 :         )
     625            0 :         .await
     626            0 :         .map_err(|e| Error::launder(e, Error::Prepare))?;
     627              : 
     628            0 :     Ok(attempt)
     629            0 : }
     630              : 
     631            0 : async fn continue_with_blocked_gc(
     632            0 :     detached: &Timeline,
     633            0 :     tenant: &TenantShard,
     634            0 :     ancestor_timeline_id: TimelineId,
     635            0 :     ancestor_lsn: Lsn,
     636            0 : ) -> Result<Attempt, Error> {
     637              :     // FIXME: it would be nice to confirm that there is an in-memory version, since we've just
     638              :     // verified there is a persistent one?
     639            0 :     obtain_exclusive_attempt(detached, tenant, ancestor_timeline_id, ancestor_lsn)
     640            0 : }
     641              : 
     642            0 : fn obtain_exclusive_attempt(
     643            0 :     detached: &Timeline,
     644            0 :     tenant: &TenantShard,
     645            0 :     ancestor_timeline_id: TimelineId,
     646            0 :     ancestor_lsn: Lsn,
     647            0 : ) -> Result<Attempt, Error> {
     648              :     use Error::{OtherTimelineDetachOngoing, ShuttingDown};
     649              : 
     650              :     // ensure we are the only active attempt for this tenant
     651            0 :     let (guard, barrier) = completion::channel();
     652              :     {
     653            0 :         let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
     654            0 :         if let Some((tl, other)) = guard.as_ref() {
     655            0 :             if !other.is_ready() {
     656            0 :                 return Err(OtherTimelineDetachOngoing(*tl));
     657            0 :             }
     658              :             // FIXME: no test enters here
     659            0 :         }
     660            0 :         *guard = Some((detached.timeline_id, barrier));
     661              :     }
     662              : 
     663              :     // ensure the gate is still open
     664            0 :     let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
     665              : 
     666            0 :     Ok(Attempt {
     667            0 :         timeline_id: detached.timeline_id,
     668            0 :         ancestor_timeline_id,
     669            0 :         ancestor_lsn,
     670            0 :         _guard: guard,
     671            0 :         gate_entered: Some(_gate_entered),
     672            0 :     })
     673            0 : }
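                       : 
                       : // The exclusivity scheme in a nutshell, using only the completion APIs seen in
                       : // this file: `completion::channel()` yields a guard and a barrier, and the
                       : // barrier only becomes ready once the guard (held in `Attempt::_guard`) is
                       : // dropped. A sketch:
                       : //
                       : //     let (guard, barrier) = completion::channel();
                       : //     assert!(!barrier.is_ready()); // an attempt is still ongoing
                       : //     drop(guard);
                       : //     assert!(barrier.is_ready());  // a new attempt may now take the slot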
     674              : 
     675            0 : fn reparented_direct_children(
     676            0 :     detached: &Arc<Timeline>,
     677            0 :     tenant: &TenantShard,
     678            0 : ) -> Result<HashSet<TimelineId>, Error> {
     679            0 :     let mut all_direct_children = tenant
     680            0 :         .timelines
     681            0 :         .lock()
     682            0 :         .unwrap()
     683            0 :         .values()
     684            0 :         .filter_map(|tl| {
     685            0 :             let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
     686              : 
     687            0 :             if is_direct_child {
     688            0 :                 Some(tl.clone())
     689              :             } else {
     690            0 :                 if let Some(timeline) = tl.ancestor_timeline.as_ref() {
     691            0 :                     assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
     692            0 :                 }
     693            0 :                 None
     694              :             }
     695            0 :         })
     696              :         // Collect to avoid lock taking order problem with Tenant::timelines and
      697              :         // Collect to avoid a lock-ordering problem between Tenant::timelines and
      698              :         // Timeline::remote_client
     699              : 
     700            0 :     let mut any_shutdown = false;
     701              : 
     702            0 :     all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
     703            0 :         Ok(accessor) => accessor
     704            0 :             .latest_uploaded_index_part()
     705            0 :             .lineage
     706            0 :             .is_reparented(),
     707            0 :         Err(_shutdownalike) => {
      708              :             // not 100% a shutdown, but bail out early so as not to give inconsistent
      709              :             // results in a sharded environment.
     710            0 :             any_shutdown = true;
     711            0 :             true
     712              :         }
     713            0 :     });
     714              : 
     715            0 :     if any_shutdown {
     716              :         // it could be one or many being deleted; have client retry
     717            0 :         return Err(Error::ShuttingDown);
     718            0 :     }
     719              : 
     720            0 :     Ok(all_direct_children
     721            0 :         .into_iter()
     722            0 :         .map(|tl| tl.timeline_id)
     723            0 :         .collect())
     724            0 : }
     725              : 
     726            0 : fn partition_work(
     727            0 :     ancestor_lsn: Lsn,
     728            0 :     source: &LayerManager,
     729            0 : ) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
     730            0 :     let mut straddling_branchpoint = vec![];
     731            0 :     let mut rest_of_historic = vec![];
     732              : 
     733            0 :     let mut later_by_lsn = 0;
     734              : 
     735            0 :     for desc in source.layer_map()?.iter_historic_layers() {
      736              :         // off-by-one considerations here:
     737              :         // - start is inclusive
     738              :         // - end is exclusive
     739            0 :         if desc.lsn_range.start > ancestor_lsn {
     740            0 :             later_by_lsn += 1;
     741            0 :             continue;
     742            0 :         }
     743              : 
     744            0 :         let target = if desc.lsn_range.start <= ancestor_lsn
     745            0 :             && desc.lsn_range.end > ancestor_lsn
     746            0 :             && desc.is_delta
     747              :         {
     748              :             // TODO: image layer at Lsn optimization
     749            0 :             &mut straddling_branchpoint
     750              :         } else {
     751            0 :             &mut rest_of_historic
     752              :         };
     753              : 
     754            0 :         target.push(source.get_from_desc(&desc));
     755              :     }
     756              : 
     757            0 :     Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
     758            0 : }
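                       : 
                       : // The partition predicate in isolation (a hypothetical helper, shown only to
                       : // make the ranges explicit): with start-inclusive/end-exclusive LSN ranges, a
                       : // delta layer "straddles" the branch point exactly when its range contains
                       : // ancestor_lsn.
                       : //
                       : //     fn straddles(lsn_range: &std::ops::Range<Lsn>, is_delta: bool, ancestor_lsn: Lsn) -> bool {
                       : //         is_delta && lsn_range.start <= ancestor_lsn && lsn_range.end > ancestor_lsn
                       : //     }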
     759              : 
     760            0 : async fn upload_rewritten_layer(
     761            0 :     end_lsn: Lsn,
     762            0 :     layer: &Layer,
     763            0 :     target: &Arc<Timeline>,
     764            0 :     cancel: &CancellationToken,
     765            0 :     ctx: &RequestContext,
     766            0 : ) -> Result<Option<Layer>, Error> {
     767            0 :     let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
     768              : 
     769            0 :     let Some(copied) = copied else {
     770            0 :         return Ok(None);
     771              :     };
     772              : 
     773            0 :     target
     774            0 :         .remote_client
     775            0 :         .upload_layer_file(&copied, cancel)
     776            0 :         .await
     777            0 :         .map_err(|e| Error::launder(e, Error::Prepare))?;
     778              : 
     779            0 :     Ok(Some(copied.into()))
     780            0 : }
     781              : 
     782            0 : async fn copy_lsn_prefix(
     783            0 :     end_lsn: Lsn,
     784            0 :     layer: &Layer,
     785            0 :     target_timeline: &Arc<Timeline>,
     786            0 :     ctx: &RequestContext,
     787            0 : ) -> Result<Option<ResidentLayer>, Error> {
     788            0 :     if target_timeline.cancel.is_cancelled() {
     789            0 :         return Err(Error::ShuttingDown);
     790            0 :     }
     791              : 
     792            0 :     tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
     793              : 
     794            0 :     let mut writer = DeltaLayerWriter::new(
     795            0 :         target_timeline.conf,
     796            0 :         target_timeline.timeline_id,
     797            0 :         target_timeline.tenant_shard_id,
     798            0 :         layer.layer_desc().key_range.start,
     799            0 :         layer.layer_desc().lsn_range.start..end_lsn,
     800            0 :         &target_timeline.gate,
     801            0 :         target_timeline.cancel.clone(),
     802            0 :         ctx,
     803            0 :     )
     804            0 :     .await
     805            0 :     .with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
     806            0 :     .map_err(Error::Prepare)?;
     807              : 
     808            0 :     let resident = layer.download_and_keep_resident(ctx).await.map_err(|e| {
     809            0 :         if e.is_cancelled() {
     810            0 :             Error::ShuttingDown
     811              :         } else {
     812            0 :             Error::Prepare(e.into())
     813              :         }
     814            0 :     })?;
     815              : 
     816            0 :     let records = resident
     817            0 :         .copy_delta_prefix(&mut writer, end_lsn, ctx)
     818            0 :         .await
     819            0 :         .with_context(|| format!("copy lsn prefix of ancestors {layer}"))
     820            0 :         .map_err(Error::Prepare)?;
     821              : 
     822            0 :     drop(resident);
     823              : 
     824            0 :     tracing::debug!(%layer, records, "copied records");
     825              : 
     826            0 :     if records == 0 {
     827            0 :         drop(writer);
     828              :         // TODO: we might want to store an empty marker in remote storage for this
     829              :         // layer so that we will not needlessly walk `layer` on repeated attempts.
     830            0 :         Ok(None)
     831              :     } else {
      832              :         // reuse the original layer's end key instead of the real highest key written;
      833              :         // this avoids adding more holes between layers.
     834            0 :         let reused_highest_key = layer.layer_desc().key_range.end;
     835            0 :         let (desc, path) = writer
     836            0 :             .finish(reused_highest_key, ctx)
     837            0 :             .await
     838            0 :             .map_err(Error::Prepare)?;
     839            0 :         let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
     840            0 :             .map_err(Error::Prepare)?;
     841              : 
     842            0 :         tracing::debug!(%layer, %copied, "new layer produced");
     843              : 
     844            0 :         Ok(Some(copied))
     845              :     }
     846            0 : }
     847              : 
      848              : /// Creates a new `Layer` instance for the adopted layer and, on successful return, ensures it
      849              : /// is present in remote storage, without the adopted layer being added to `index_part.json`.
      850              : /// Returns `(Layer, did_hardlink)`.
     851            0 : async fn remote_copy(
     852            0 :     adopted: &Layer,
     853            0 :     adoptee: &Arc<Timeline>,
     854            0 :     generation: Generation,
     855            0 :     shard_identity: ShardIdentity,
     856            0 :     cancel: &CancellationToken,
     857            0 : ) -> Result<(Layer, bool), Error> {
     858            0 :     let mut metadata = adopted.metadata();
     859            0 :     debug_assert!(metadata.generation <= generation);
     860            0 :     metadata.generation = generation;
     861            0 :     metadata.shard = shard_identity.shard_index();
     862              : 
     863            0 :     let conf = adoptee.conf;
     864            0 :     let file_name = adopted.layer_desc().layer_name();
     865              : 
     866              :     // We don't want to shut the timeline down during this operation because we do `delete_on_drop` below
     867            0 :     let _gate = adoptee.gate.enter().map_err(|e| match e {
     868            0 :         GateError::GateClosed => Error::ShuttingDown,
     869            0 :     })?;
     870              : 
      871              :     // depending on whether the layer is resident (`Layer::keep_resident`), do a hardlink
     872              :     let did_hardlink;
     873            0 :     let owned = if let Some(adopted_resident) = adopted.keep_resident().await {
     874            0 :         let adopted_path = adopted_resident.local_path();
     875            0 :         let adoptee_path = local_layer_path(
     876            0 :             conf,
     877            0 :             &adoptee.tenant_shard_id,
     878            0 :             &adoptee.timeline_id,
     879            0 :             &file_name,
     880            0 :             &metadata.generation,
     881              :         );
     882              : 
     883            0 :         match std::fs::hard_link(adopted_path, &adoptee_path) {
     884            0 :             Ok(()) => {}
     885            0 :             Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
     886              :                 // In theory we should not get into this situation as we are doing cleanups of the layer file after errors.
      887              :                 // However, we don't do cleanups for errors past `prepare`, so there is a slight chance of reaching this branch.
     888              : 
      889              :                 // Double-check that the file is an orphan (probably from an earlier attempt), then delete it
     890            0 :                 let key = file_name.clone().into();
     891            0 :                 if adoptee
     892            0 :                     .layers
     893            0 :                     .read(LayerManagerLockHolder::DetachAncestor)
     894            0 :                     .await
     895            0 :                     .contains_key(&key)
     896              :                 {
     897              :                     // We are supposed to filter out such cases before coming to this function
     898            0 :                     return Err(Error::Prepare(anyhow::anyhow!(
     899            0 :                         "layer file {file_name} already present and inside layer map"
     900            0 :                     )));
     901            0 :                 }
     902            0 :                 tracing::info!("Deleting orphan layer file to make way for hard linking");
      903              :                 // Delete the orphan layer file and try again, to ensure this layer has a well-understood source
     904            0 :                 std::fs::remove_file(&adoptee_path)
     905            0 :                     .map_err(|e| Error::launder(e.into(), Error::Prepare))?;
     906            0 :                 std::fs::hard_link(adopted_path, &adoptee_path)
     907            0 :                     .map_err(|e| Error::launder(e.into(), Error::Prepare))?;
     908              :             }
     909            0 :             Err(e) => {
     910            0 :                 return Err(Error::launder(e.into(), Error::Prepare));
     911              :             }
     912              :         };
     913            0 :         did_hardlink = true;
     914            0 :         Layer::for_resident(conf, adoptee, adoptee_path, file_name, metadata).drop_eviction_guard()
     915              :     } else {
     916            0 :         did_hardlink = false;
     917            0 :         Layer::for_evicted(conf, adoptee, file_name, metadata)
     918              :     };
     919              : 
     920            0 :     let layer = match adoptee
     921            0 :         .remote_client
     922            0 :         .copy_timeline_layer(adopted, &owned, cancel)
     923            0 :         .await
     924              :     {
     925            0 :         Ok(()) => owned,
     926            0 :         Err(e) => {
     927            0 :             {
      928            0 :                 // Clean up the layer so that a retry doesn't fail because the file already exists
     929            0 :                 owned.delete_on_drop();
     930            0 :                 std::mem::drop(owned);
     931            0 :             }
     932            0 :             return Err(Error::launder(e, Error::Prepare));
     933              :         }
     934              :     };
     935              : 
     936            0 :     Ok((layer, did_hardlink))
     937            0 : }
     938              : 
     939              : pub(crate) enum DetachingAndReparenting {
     940              :     /// All of the following timeline ids were reparented and the timeline ancestor detach must be
     941              :     /// marked as completed.
     942              :     Reparented(HashSet<TimelineId>),
     943              : 
     944              :     /// Some of the reparentings failed. The timeline ancestor detach must **not** be marked as
     945              :     /// completed.
     946              :     ///
      947              :     /// Nested `must_reset_tenant` is set to true when any restart-requiring changes were made.
     948              :     SomeReparentingFailed { must_reset_tenant: bool },
     949              : 
     950              :     /// Detaching and reparentings were completed in a previous attempt. Timeline ancestor detach
     951              :     /// must be marked as completed.
     952              :     AlreadyDone(HashSet<TimelineId>),
     953              : }
     954              : 
     955              : impl DetachingAndReparenting {
     956            0 :     pub(crate) fn reset_tenant_required(&self) -> bool {
     957              :         use DetachingAndReparenting::*;
     958            0 :         match self {
     959            0 :             Reparented(_) => true,
     960            0 :             SomeReparentingFailed { must_reset_tenant } => *must_reset_tenant,
     961            0 :             AlreadyDone(_) => false,
     962              :         }
     963            0 :     }
     964              : 
     965            0 :     pub(crate) fn completed(self) -> Option<HashSet<TimelineId>> {
     966              :         use DetachingAndReparenting::*;
     967            0 :         match self {
     968            0 :             Reparented(x) | AlreadyDone(x) => Some(x),
     969            0 :             SomeReparentingFailed { .. } => None,
     970              :         }
     971            0 :     }
     972              : }
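                       : 
                       : // A minimal consumption sketch (hypothetical caller, for illustration only):
                       : // reset the tenant first if restart-requiring changes were made, and mark the
                       : // ancestor detach completed only when a set of timeline ids was returned.
                       : //
                       : //     let result = detach_and_reparent(/* ... */).await?;
                       : //     if result.reset_tenant_required() {
                       : //         /* tenant reset */
                       : //     }
                       : //     match result.completed() {
                       : //         Some(reparented) => { /* mark the detach completed for `reparented` */ }
                       : //         None => { /* not done yet; the operation can be retried */ }
                       : //     }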
     973              : 
     974              : /// See [`Timeline::detach_from_ancestor_and_reparent`].
     975            0 : pub(super) async fn detach_and_reparent(
     976            0 :     detached: &Arc<Timeline>,
     977            0 :     tenant: &TenantShard,
     978            0 :     prepared: PreparedTimelineDetach,
     979            0 :     ancestor_timeline_id: TimelineId,
     980            0 :     ancestor_lsn: Lsn,
     981            0 :     behavior: DetachBehavior,
     982            0 :     _ctx: &RequestContext,
     983            0 : ) -> Result<DetachingAndReparenting, Error> {
     984            0 :     let PreparedTimelineDetach { layers } = prepared;
     985              : 
     986              :     #[derive(Debug)]
     987              :     enum Ancestor {
     988              :         NotDetached(Arc<Timeline>, Lsn),
     989              :         Detached(Arc<Timeline>, Lsn),
     990              :     }
     991              : 
     992            0 :     let (recorded_branchpoint, still_ongoing) = {
     993            0 :         let access = detached.remote_client.initialized_upload_queue()?;
     994            0 :         let latest = access.latest_uploaded_index_part();
     995              : 
     996              :         (
     997            0 :             latest.lineage.detached_previous_ancestor(),
     998            0 :             latest
     999            0 :                 .gc_blocking
    1000            0 :                 .as_ref()
    1001            0 :                 .is_some_and(|b| b.blocked_by(DetachAncestor)),
    1002              :         )
    1003              :     };
    1004            0 :     assert!(
    1005            0 :         still_ongoing,
     1006            0 :         "cannot detach, reparent, or complete if the operation is not still ongoing"
    1007              :     );
    1008              : 
    1009            0 :     let ancestor_to_detach = match detached.ancestor_timeline.as_ref() {
    1010            0 :         Some(mut ancestor) => {
    1011            0 :             while ancestor.timeline_id != ancestor_timeline_id {
    1012            0 :                 match ancestor.ancestor_timeline.as_ref() {
    1013            0 :                     Some(found) => {
    1014            0 :                         if ancestor_lsn != ancestor.ancestor_lsn {
    1015            0 :                             return Err(Error::DetachReparent(anyhow::anyhow!(
    1016            0 :                                 "cannot find the ancestor timeline to detach from: wrong ancestor lsn"
    1017            0 :                             )));
    1018            0 :                         }
    1019            0 :                         ancestor = found;
    1020              :                     }
    1021              :                     None => {
    1022            0 :                         return Err(Error::DetachReparent(anyhow::anyhow!(
    1023            0 :                             "cannot find the ancestor timeline to detach from"
    1024            0 :                         )));
    1025              :                     }
    1026              :                 }
    1027              :             }
    1028            0 :             Some(ancestor)
    1029              :         }
    1030            0 :         None => None,
    1031              :     };
    1032            0 :     let ancestor = match (ancestor_to_detach, recorded_branchpoint) {
    1033            0 :         (Some(ancestor), None) => {
    1034            0 :             assert!(
    1035            0 :                 !layers.is_empty(),
    1036            0 :                 "there should always be at least one layer to inherit"
    1037              :             );
    1038            0 :             Ancestor::NotDetached(ancestor.clone(), detached.ancestor_lsn)
    1039              :         }
    1040              :         (Some(_), Some(_)) => {
    1041            0 :             panic!(
     1042            0 :                 "it should be impossible to get here without having gone through a tenant reset; if the tenant was reset, the ancestor_timeline would be None"
    1043              :             );
    1044              :         }
    1045            0 :         (None, Some((ancestor_id, ancestor_lsn))) => {
    1046              :             // it has been either:
    1047              :             // - detached but still exists => we can try reparenting
    1048              :             // - detached and deleted
    1049              :             //
    1050              :             // either way, we must complete
    1051            0 :             assert!(
    1052            0 :                 layers.is_empty(),
     1053            0 :                 "no layers should have been copied as detach is done"
    1054              :             );
    1055              : 
    1056            0 :             let existing = tenant.timelines.lock().unwrap().get(&ancestor_id).cloned();
    1057              : 
    1058            0 :             if let Some(ancestor) = existing {
    1059            0 :                 Ancestor::Detached(ancestor, ancestor_lsn)
    1060              :             } else {
    1061            0 :                 let direct_children = reparented_direct_children(detached, tenant)?;
    1062            0 :                 return Ok(DetachingAndReparenting::AlreadyDone(direct_children));
    1063              :             }
    1064              :         }
    1065              :         (None, None) => {
     1066              :             // TODO: make sure there are no `?` early returns between here and
     1067              :             // the tenant_reset.
    1068            0 :             panic!(
    1069            0 :                 "bug: detach_and_reparent called on a timeline which has not been detached or which has no live ancestor"
    1070              :             );
    1071              :         }
    1072              :     };
    1073              : 
    1074              :     // publish the prepared layers before we reparent any of the timelines, so that on restart
    1075              :     // reparented timelines find layers. also do the actual detaching.
    1076              :     //
    1077              :     // if we crash after this operation, a retry will allow reparenting the remaining timelines as
    1078              :     // gc is blocked.
    1079              : 
    1080            0 :     let (ancestor, ancestor_lsn, was_detached) = match ancestor {
    1081            0 :         Ancestor::NotDetached(ancestor, ancestor_lsn) => {
    1082              :             // this has to complete before any reparentings because otherwise they would not have
    1083              :             // layers on the new parent.
    1084            0 :             detached
    1085            0 :                 .remote_client
    1086            0 :                 .schedule_adding_existing_layers_to_index_detach_and_wait(
    1087            0 :                     &layers,
    1088            0 :                     (ancestor.timeline_id, ancestor_lsn),
    1089            0 :                 )
    1090            0 :                 .await
    1091            0 :                 .context("publish layers and detach ancestor")
    1092            0 :                 .map_err(|e| Error::launder(e, Error::DetachReparent))?;
    1093              : 
    1094            0 :             tracing::info!(
    1095            0 :                 ancestor=%ancestor.timeline_id,
    1096              :                 %ancestor_lsn,
    1097            0 :                 inherited_layers=%layers.len(),
    1098            0 :                 "detached from ancestor"
    1099              :             );
    1100            0 :             (ancestor, ancestor_lsn, true)
    1101              :         }
    1102            0 :         Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn, false),
    1103              :     };
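                       : 
                       :     // Note: on a retry after a crash, the `Ancestor::Detached` arm is taken
                       :     // instead: the index already records the detach, no layers are re-published,
                       :     // and `was_detached` stays false, so a tenant reset below is only forced by
                       :     // a partial reparenting.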
    1104              : 
    1105            0 :     if let DetachBehavior::MultiLevelAndNoReparent = behavior {
    1106              :         // Do not reparent if the user requests to behave so.
    1107            0 :         return Ok(DetachingAndReparenting::Reparented(HashSet::new()));
    1108            0 :     }
    1109              : 
    1110            0 :     let mut tasks = tokio::task::JoinSet::new();
    1111              : 
     1112              :     // Returns a single-permit semaphore used to make exactly one reparenting succeed;
     1113              :     // the others will fail as if those timelines had been stopped for whatever reason.
    1114              :     #[cfg(feature = "testing")]
    1115            0 :     let failpoint_sem = || -> Option<Arc<Semaphore>> {
    1116            0 :         fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
    1117            0 :             Arc::new(Semaphore::new(1))
    1118              :         ));
    1119            0 :         None
    1120            0 :     }();
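                       : 
                       :     // In tests this failpoint is armed before calling in, e.g. with
                       :     // `fail::cfg("timeline-detach-ancestor::allow_one_reparented", "return")`
                       :     // (the fail crate also honors the FAILPOINTS environment variable): exactly
                       :     // one task then acquires the single permit, after which the semaphore is
                       :     // closed and every other acquire fails.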
    1121              : 
    1122              :     // because we are now keeping the slot in progress, it is unlikely that there will be any
    1123              :     // timeline deletions during this time. if we raced one, then we'll just ignore it.
    1124              :     {
    1125            0 :         let g = tenant.timelines.lock().unwrap();
    1126            0 :         reparentable_timelines(g.values(), detached, &ancestor, ancestor_lsn)
    1127            0 :             .cloned()
    1128            0 :             .for_each(|timeline| {
    1129              :                 // important in this scope: we are holding the Tenant::timelines lock
    1130            0 :                 let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
    1131            0 :                 let new_parent = detached.timeline_id;
    1132              :                 #[cfg(feature = "testing")]
    1133            0 :                 let failpoint_sem = failpoint_sem.clone();
    1134              : 
    1135            0 :                 tasks.spawn(
    1136            0 :                     async move {
    1137            0 :                         let res = async {
    1138              :                             #[cfg(feature = "testing")]
    1139            0 :                             if let Some(failpoint_sem) = failpoint_sem {
    1140            0 :                                 let _permit = failpoint_sem.acquire().await.map_err(|_| {
    1141            0 :                                     anyhow::anyhow!(
    1142            0 :                                         "failpoint: timeline-detach-ancestor::allow_one_reparented",
    1143              :                                     )
    1144            0 :                                 })?;
    1145            0 :                                 failpoint_sem.close();
    1146            0 :                             }
    1147              : 
    1148            0 :                             timeline
    1149            0 :                                 .remote_client
    1150            0 :                                 .schedule_reparenting_and_wait(&new_parent)
    1151            0 :                                 .await
    1152            0 :                         }
    1153            0 :                         .await;
    1154              : 
    1155            0 :                         match res {
    1156              :                             Ok(()) => {
    1157            0 :                                 tracing::info!("reparented");
    1158            0 :                                 Some(timeline)
    1159              :                             }
    1160            0 :                             Err(e) => {
     1161              :                                 // with the use of the tenant slot, a raced timeline
     1162              :                                 // deletion is the most likely reason.
    1163            0 :                                 tracing::warn!("reparenting failed: {e:#}");
    1164            0 :                                 None
    1165              :                             }
    1166              :                         }
    1167            0 :                     }
    1168            0 :                     .instrument(span),
    1169              :                 );
    1170            0 :             });
    1171              :     }
    1172              : 
    1173            0 :     let reparenting_candidates = tasks.len();
    1174            0 :     let mut reparented = HashSet::with_capacity(tasks.len());
    1175              : 
    1176            0 :     while let Some(res) = tasks.join_next().await {
    1177            0 :         match res {
    1178            0 :             Ok(Some(timeline)) => {
    1179            0 :                 assert!(
    1180            0 :                     reparented.insert(timeline.timeline_id),
    1181            0 :                     "duplicate reparenting? timeline_id={}",
    1182            0 :                     timeline.timeline_id
    1183              :                 );
    1184              :             }
    1185            0 :             Err(je) if je.is_cancelled() => unreachable!("not used"),
    1186              :             // just ignore failures now, we can retry
    1187            0 :             Ok(None) => {}
    1188            0 :             Err(je) if je.is_panic() => {}
    1189            0 :             Err(je) => tracing::error!("unexpected join error: {je:?}"),
    1190              :         }
    1191              :     }
    1192              : 
    1193            0 :     let reparented_all = reparenting_candidates == reparented.len();
    1194              : 
    1195            0 :     if reparented_all {
    1196            0 :         Ok(DetachingAndReparenting::Reparented(reparented))
    1197              :     } else {
    1198            0 :         tracing::info!(
    1199            0 :             reparented = reparented.len(),
    1200              :             candidates = reparenting_candidates,
    1201            0 :             "failed to reparent all candidates; they can be retried after the tenant_reset",
    1202              :         );
    1203              : 
    1204            0 :         let must_reset_tenant = !reparented.is_empty() || was_detached;
    1205            0 :         Ok(DetachingAndReparenting::SomeReparentingFailed { must_reset_tenant })
    1206              :     }
    1207            0 : }
    1208              : 
    1209            0 : pub(super) async fn complete(
    1210            0 :     detached: &Arc<Timeline>,
    1211            0 :     tenant: &TenantShard,
    1212            0 :     mut attempt: Attempt,
    1213            0 :     _ctx: &RequestContext,
    1214            0 : ) -> Result<(), Error> {
    1215            0 :     assert_eq!(detached.timeline_id, attempt.timeline_id);
    1216              : 
    1217            0 :     if attempt.gate_entered.is_none() {
    1218            0 :         let entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
    1219            0 :         attempt.gate_entered = Some(entered);
    1220            0 :     } else {
    1221            0 :         // Some(gate_entered) means the tenant was not restarted, as is not required
     1222            0 :         // Some(gate_entered) means the tenant was not restarted; a restart is not required here
    1223              : 
    1224            0 :     assert!(detached.ancestor_timeline.is_none());
    1225              : 
     1226              :     // this should be a 503 at least...?
    1227            0 :     fail::fail_point!(
    1228            0 :         "timeline-detach-ancestor::complete_before_uploading",
    1229            0 :         |_| Err(Error::Failpoint(
    1230            0 :             "timeline-detach-ancestor::complete_before_uploading"
    1231            0 :         ))
    1232              :     );
    1233              : 
    1234            0 :     tenant
    1235            0 :         .gc_block
    1236            0 :         .remove(
    1237            0 :             detached,
    1238            0 :             crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
    1239            0 :         )
    1240            0 :         .await
    1241            0 :         .map_err(|e| Error::launder(e, Error::Complete))?;
    1242              : 
    1243            0 :     Ok(())
    1244            0 : }
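                       : 
                       : // A rough retry sketch (hypothetical driver code, for illustration): `complete`
                       : // can be called again after a tenant reset, because it re-enters the timeline
                       : // gate whenever `attempt.gate_entered` is None, and `gc_block.remove` just
                       : // removes the DetachAncestor blocking reason from the index.
                       : //
                       : //     match complete(&detached, tenant, attempt, ctx).await {
                       : //         Ok(()) => { /* detach fully finished, gc unblocked */ }
                       : //         Err(Error::ShuttingDown) => { /* retry later with a fresh attempt */ }
                       : //         Err(e) => return Err(e),
                       : //     }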
    1245              : 
    1246              : /// Query against a locked `Tenant::timelines`.
    1247              : ///
    1248              : /// A timeline is reparentable if:
    1249              : ///
    1250              : /// - It is not the timeline being detached.
     1251              : /// - It has the same ancestor as the timeline being detached. Note that the ancestor might not be the direct ancestor.
                       : /// - Its branch point (ancestor LSN) is at or before the detach point, matching the `tl.get_ancestor_lsn() <= ancestor_lsn` check.
                       : /// - It is not already being deleted.
    1252            0 : fn reparentable_timelines<'a, I>(
    1253            0 :     timelines: I,
    1254            0 :     detached: &'a Arc<Timeline>,
    1255            0 :     ancestor: &'a Arc<Timeline>,
    1256            0 :     ancestor_lsn: Lsn,
    1257            0 : ) -> impl Iterator<Item = &'a Arc<Timeline>> + 'a
    1258            0 : where
    1259            0 :     I: Iterator<Item = &'a Arc<Timeline>> + 'a,
    1260              : {
    1261            0 :     timelines.filter_map(move |tl| {
    1262            0 :         if Arc::ptr_eq(tl, detached) {
    1263            0 :             return None;
    1264            0 :         }
    1265              : 
    1266            0 :         let tl_ancestor = tl.ancestor_timeline.as_ref()?;
    1267            0 :         let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
    1268            0 :         let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
    1269              : 
    1270            0 :         let is_deleting = tl
    1271            0 :             .delete_progress
    1272            0 :             .try_lock()
    1273            0 :             .map(|flow| !flow.is_not_started())
    1274            0 :             .unwrap_or(true);
    1275              : 
    1276            0 :         if is_same && is_earlier && !is_deleting {
    1277            0 :             Some(tl)
    1278              :         } else {
    1279            0 :             None
    1280              :         }
    1281            0 :     })
    1282            0 : }
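                       : 
                       : // Worked example (LSNs made up for illustration): if the detached timeline
                       : // branched off its ancestor at 0/200, a sibling branched off the same ancestor
                       : // at 0/100 is reparentable, since its history is fully contained in the
                       : // inherited layers, while a sibling branched at 0/300 is not, because
                       : // `tl.get_ancestor_lsn() <= ancestor_lsn` does not hold.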
    1283              : 
    1284            0 : fn check_no_archived_children_of_ancestor(
    1285            0 :     tenant: &TenantShard,
    1286            0 :     detached: &Arc<Timeline>,
    1287            0 :     ancestor: &Arc<Timeline>,
    1288            0 :     ancestor_lsn: Lsn,
    1289            0 :     detach_behavior: DetachBehavior,
    1290            0 : ) -> Result<(), Error> {
    1291            0 :     match detach_behavior {
    1292              :         DetachBehavior::NoAncestorAndReparent => {
    1293            0 :             let timelines = tenant.timelines.lock().unwrap();
    1294            0 :             let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
    1295              : 
    1296            0 :             for timeline in
    1297            0 :                 reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn)
    1298              :             {
    1299            0 :                 if timeline.is_archived() == Some(true) {
    1300            0 :                     return Err(Error::Archived(timeline.timeline_id));
    1301            0 :                 }
    1302              :             }
    1303              : 
    1304            0 :             for timeline_offloaded in timelines_offloaded.values() {
    1305            0 :                 if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
    1306            0 :                     continue;
    1307            0 :                 }
    1308              :                 // This forbids the detach ancestor feature if flattened timelines are present,
    1309              :                 // even if the ancestor_lsn is from after the branchpoint of the detached timeline.
    1310              :                 // But as per current design, we don't record the ancestor_lsn of flattened timelines.
    1311              :                 // This is a bit unfortunate, but as of writing this we don't support flattening
    1312              :                 // anyway. Maybe we can evolve the data model in the future.
    1313            0 :                 if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
    1314            0 :                     let is_earlier = retain_lsn <= ancestor_lsn;
    1315            0 :                     if !is_earlier {
    1316            0 :                         continue;
    1317            0 :                     }
    1318            0 :                 }
    1319            0 :                 return Err(Error::Archived(timeline_offloaded.timeline_id));
    1320              :             }
    1321              :         }
    1322            0 :         DetachBehavior::MultiLevelAndNoReparent => {
    1323            0 :             // We don't need to check anything if the user requested to not reparent.
    1324            0 :         }
    1325              :     }
    1326              : 
    1327            0 :     Ok(())
    1328            0 : }
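                       : 
                       : // For example (illustrative): an offloaded child of the ancestor whose
                       : // retain_lsn is at or below the detach point would have to be reparented, but
                       : // offloaded timelines cannot be reparented in place, so the operation is
                       : // refused with `Error::Archived` until that child is unarchived.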
    1329              : 
    1330            0 : async fn fsync_timeline_dir(timeline: &Timeline, ctx: &RequestContext) {
    1331            0 :     let path = &timeline
    1332            0 :         .conf
    1333            0 :         .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id);
    1334            0 :     let timeline_dir = VirtualFile::open(&path, ctx)
    1335            0 :         .await
    1336            0 :         .fatal_err("VirtualFile::open for timeline dir fsync");
    1337            0 :     timeline_dir
    1338            0 :         .sync_all()
    1339            0 :         .await
    1340            0 :         .fatal_err("VirtualFile::sync_all timeline dir");
    1341            0 : }
        

Generated by: LCOV version 2.1-beta