LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - detach_ancestor.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 0.0 % 621 0
Test Date: 2024-09-25 14:04:07 Functions: 0.0 % 67 0

            Line data    Source code
       1              : use std::{collections::HashSet, sync::Arc};
       2              : 
       3              : use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
       4              : use crate::{
       5              :     context::{DownloadBehavior, RequestContext},
       6              :     task_mgr::TaskKind,
       7              :     tenant::{
       8              :         remote_timeline_client::index::GcBlockingReason::DetachAncestor,
       9              :         storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
      10              :         Tenant,
      11              :     },
      12              :     virtual_file::{MaybeFatalIo, VirtualFile},
      13              : };
      14              : use anyhow::Context;
      15              : use pageserver_api::models::detach_ancestor::AncestorDetached;
      16              : use tokio::sync::Semaphore;
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::Instrument;
      19              : use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};
      20              : 
      21            0 : #[derive(Debug, thiserror::Error)]
      22              : pub(crate) enum Error {
      23              :     #[error("no ancestors")]
      24              :     NoAncestor,
      25              : 
      26              :     #[error("too many ancestors")]
      27              :     TooManyAncestors,
      28              : 
      29              :     #[error("shutting down, please retry later")]
      30              :     ShuttingDown,
      31              : 
      32              :     #[error(transparent)]
      33              :     NotFound(crate::tenant::GetTimelineError),
      34              : 
      35              :     #[error("failed to reparent all candidate timelines, please retry")]
      36              :     FailedToReparentAll,
      37              : 
      38              :     #[error("ancestor is already being detached by: {}", .0)]
      39              :     OtherTimelineDetachOngoing(TimelineId),
      40              : 
      41              :     #[error("preparing to timeline ancestor detach failed")]
      42              :     Prepare(#[source] anyhow::Error),
      43              : 
      44              :     #[error("detaching and reparenting failed")]
      45              :     DetachReparent(#[source] anyhow::Error),
      46              : 
      47              :     #[error("completing ancestor detach failed")]
      48              :     Complete(#[source] anyhow::Error),
      49              : 
      50              :     #[error("failpoint: {}", .0)]
      51              :     Failpoint(&'static str),
      52              : }
      53              : 
      54              : impl Error {
      55              :     /// Try to catch cancellation from within the `anyhow::Error`, or wrap the anyhow as the given
      56              :     /// variant or fancier `or_else`.
      57            0 :     fn launder<F>(e: anyhow::Error, or_else: F) -> Error
      58            0 :     where
      59            0 :         F: Fn(anyhow::Error) -> Error,
      60            0 :     {
      61              :         use crate::tenant::remote_timeline_client::WaitCompletionError;
      62              :         use crate::tenant::upload_queue::NotInitialized;
      63              :         use remote_storage::TimeoutOrCancel;
      64              : 
      65            0 :         if e.is::<NotInitialized>()
      66            0 :             || TimeoutOrCancel::caused_by_cancel(&e)
      67            0 :             || e.downcast_ref::<remote_storage::DownloadError>()
      68            0 :                 .is_some_and(|e| e.is_cancelled())
      69            0 :             || e.is::<WaitCompletionError>()
      70              :         {
      71            0 :             Error::ShuttingDown
      72              :         } else {
      73            0 :             or_else(e)
      74              :         }
      75            0 :     }
      76              : }
      77              : 
      78              : impl From<Error> for ApiError {
      79            0 :     fn from(value: Error) -> Self {
      80            0 :         match value {
      81            0 :             Error::NoAncestor => ApiError::Conflict(value.to_string()),
      82            0 :             Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", value)),
      83            0 :             Error::ShuttingDown => ApiError::ShuttingDown,
      84              :             Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
      85            0 :                 ApiError::ResourceUnavailable(value.to_string().into())
      86              :             }
      87            0 :             Error::NotFound(e) => ApiError::from(e),
      88              :             // these variants should have no cancellation errors because of Error::launder
      89              :             Error::Prepare(_)
      90              :             | Error::DetachReparent(_)
      91              :             | Error::Complete(_)
      92            0 :             | Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
      93              :         }
      94            0 :     }
      95              : }
      96              : 
      97              : impl From<crate::tenant::upload_queue::NotInitialized> for Error {
      98            0 :     fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
      99            0 :         // treat all as shutting down signals, even though that is not entirely correct
     100            0 :         // (uninitialized state)
     101            0 :         Error::ShuttingDown
     102            0 :     }
     103              : }
     104              : impl From<super::layer_manager::Shutdown> for Error {
     105            0 :     fn from(_: super::layer_manager::Shutdown) -> Self {
     106            0 :         Error::ShuttingDown
     107            0 :     }
     108              : }
     109              : 
     110              : pub(crate) enum Progress {
     111              :     Prepared(Attempt, PreparedTimelineDetach),
     112              :     Done(AncestorDetached),
     113              : }
     114              : 
     115              : pub(crate) struct PreparedTimelineDetach {
     116              :     layers: Vec<Layer>,
     117              : }
     118              : 
     119              : /// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
     120              : #[derive(Debug)]
     121              : pub(crate) struct Options {
     122              :     pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
     123              :     pub(crate) copy_concurrency: std::num::NonZeroUsize,
     124              : }
     125              : 
     126              : impl Default for Options {
     127            0 :     fn default() -> Self {
     128            0 :         Self {
     129            0 :             rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
     130            0 :             copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
     131            0 :         }
     132            0 :     }
     133              : }
     134              : 
     135              : /// Represents an across tenant reset exclusive single attempt to detach ancestor.
     136              : #[derive(Debug)]
     137              : pub(crate) struct Attempt {
     138              :     pub(crate) timeline_id: TimelineId,
     139              : 
     140              :     _guard: completion::Completion,
     141              :     gate_entered: Option<utils::sync::gate::GateGuard>,
     142              : }
     143              : 
     144              : impl Attempt {
     145            0 :     pub(crate) fn before_reset_tenant(&mut self) {
     146            0 :         let taken = self.gate_entered.take();
     147            0 :         assert!(taken.is_some());
     148            0 :     }
     149              : 
     150            0 :     pub(crate) fn new_barrier(&self) -> completion::Barrier {
     151            0 :         self._guard.barrier()
     152            0 :     }
     153              : }
     154              : 
     155              : /// See [`Timeline::prepare_to_detach_from_ancestor`]
     156            0 : pub(super) async fn prepare(
     157            0 :     detached: &Arc<Timeline>,
     158            0 :     tenant: &Tenant,
     159            0 :     options: Options,
     160            0 :     ctx: &RequestContext,
     161            0 : ) -> Result<Progress, Error> {
     162              :     use Error::*;
     163              : 
     164            0 :     let Some((ancestor, ancestor_lsn)) = detached
     165            0 :         .ancestor_timeline
     166            0 :         .as_ref()
     167            0 :         .map(|tl| (tl.clone(), detached.ancestor_lsn))
     168              :     else {
     169            0 :         let still_in_progress = {
     170            0 :             let accessor = detached.remote_client.initialized_upload_queue()?;
     171              : 
     172              :             // we are safe to inspect the latest uploaded, because we can only witness this after
     173              :             // restart is complete and ancestor is no more.
     174            0 :             let latest = accessor.latest_uploaded_index_part();
     175            0 :             if latest.lineage.detached_previous_ancestor().is_none() {
     176            0 :                 return Err(NoAncestor);
     177            0 :             };
     178            0 : 
     179            0 :             latest
     180            0 :                 .gc_blocking
     181            0 :                 .as_ref()
     182            0 :                 .is_some_and(|b| b.blocked_by(DetachAncestor))
     183            0 :         };
     184            0 : 
     185            0 :         if still_in_progress {
     186              :             // gc is still blocked, we can still reparent and complete.
     187              :             // we are safe to reparent remaining, because they were locked in in the beginning.
     188            0 :             let attempt = continue_with_blocked_gc(detached, tenant).await?;
     189              : 
     190              :             // because the ancestor of detached is already set to none, we have published all
     191              :             // of the layers, so we are still "prepared."
     192            0 :             return Ok(Progress::Prepared(
     193            0 :                 attempt,
     194            0 :                 PreparedTimelineDetach { layers: Vec::new() },
     195            0 :             ));
     196            0 :         }
     197              : 
     198            0 :         let reparented_timelines = reparented_direct_children(detached, tenant)?;
     199            0 :         return Ok(Progress::Done(AncestorDetached {
     200            0 :             reparented_timelines,
     201            0 :         }));
     202              :     };
     203              : 
     204            0 :     if !ancestor_lsn.is_valid() {
     205              :         // rare case, probably wouldn't even load
     206            0 :         tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
     207            0 :         return Err(NoAncestor);
     208            0 :     }
     209            0 : 
     210            0 :     if ancestor.ancestor_timeline.is_some() {
     211              :         // non-technical requirement; we could flatten N ancestors just as easily but we chose
     212              :         // not to, at least initially
     213            0 :         return Err(TooManyAncestors);
     214            0 :     }
     215              : 
     216            0 :     let attempt = start_new_attempt(detached, tenant).await?;
     217              : 
     218            0 :     utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
     219              : 
     220            0 :     fail::fail_point!(
     221            0 :         "timeline-detach-ancestor::before_starting_after_locking",
     222            0 :         |_| Err(Error::Failpoint(
     223            0 :             "timeline-detach-ancestor::before_starting_after_locking"
     224            0 :         ))
     225            0 :     );
     226              : 
     227            0 :     if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
     228            0 :         let span =
     229            0 :             tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
     230            0 :         async {
     231            0 :             let started_at = std::time::Instant::now();
     232            0 :             let freeze_and_flush = ancestor.freeze_and_flush0();
     233            0 :             let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
     234              : 
     235            0 :             let res =
     236            0 :                 tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
     237            0 :                     .await;
     238              : 
     239            0 :             let res = match res {
     240            0 :                 Ok(res) => res,
     241            0 :                 Err(_elapsed) => {
     242            0 :                     tracing::info!("freezing and flushing ancestor is still ongoing");
     243            0 :                     freeze_and_flush.await
     244              :                 }
     245              :             };
     246              : 
     247            0 :             res.map_err(|e| {
     248              :                 use FlushLayerError::*;
     249            0 :                 match e {
     250              :                     Cancelled | NotRunning(_) => {
     251              :                         // FIXME(#6424): technically statically unreachable right now, given how we never
     252              :                         // drop the sender
     253            0 :                         Error::ShuttingDown
     254              :                     }
     255            0 :                     CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
     256              :                 }
     257            0 :             })?;
     258              : 
     259              :             // we do not need to wait for uploads to complete but we do need `struct Layer`,
     260              :             // copying delta prefix is unsupported currently for `InMemoryLayer`.
     261            0 :             tracing::info!(
     262            0 :                 elapsed_ms = started_at.elapsed().as_millis(),
     263            0 :                 "froze and flushed the ancestor"
     264              :             );
     265            0 :             Ok::<_, Error>(())
     266            0 :         }
     267            0 :         .instrument(span)
     268            0 :         .await?;
     269            0 :     }
     270              : 
     271            0 :     let end_lsn = ancestor_lsn + 1;
     272              : 
     273            0 :     let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
     274              :         // we do not need to start from our layers, because they can only be layers that come
     275              :         // *after* ancestor_lsn
     276            0 :         let layers = tokio::select! {
     277            0 :             guard = ancestor.layers.read() => guard,
     278            0 :             _ = detached.cancel.cancelled() => {
     279            0 :                 return Err(ShuttingDown);
     280              :             }
     281            0 :             _ = ancestor.cancel.cancelled() => {
     282            0 :                 return Err(ShuttingDown);
     283              :             }
     284              :         };
     285              : 
     286              :         // between retries, these can change if compaction or gc ran in between. this will mean
     287              :         // we have to redo work.
     288            0 :         partition_work(ancestor_lsn, &layers)?
     289              :     };
     290              : 
     291              :     // TODO: layers are already sorted by something: use that to determine how much of remote
     292              :     // copies are already done -- gc is blocked, but a compaction could had happened on ancestor,
     293              :     // which is something to keep in mind if copy skipping is implemented.
     294            0 :     tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
     295              : 
     296              :     // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
     297            0 :     let mut new_layers: Vec<Layer> =
     298            0 :         Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
     299            0 : 
     300            0 :     {
     301            0 :         tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
     302              : 
     303            0 :         let mut tasks = tokio::task::JoinSet::new();
     304            0 : 
     305            0 :         let mut wrote_any = false;
     306            0 : 
     307            0 :         let limiter = Arc::new(Semaphore::new(options.rewrite_concurrency.get()));
     308              : 
     309            0 :         for layer in straddling_branchpoint {
     310            0 :             let limiter = limiter.clone();
     311            0 :             let timeline = detached.clone();
     312            0 :             let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
     313              : 
     314            0 :             let span = tracing::info_span!("upload_rewritten_layer", %layer);
     315            0 :             tasks.spawn(
     316            0 :                 async move {
     317            0 :                     let _permit = limiter.acquire().await;
     318            0 :                     let copied =
     319            0 :                         upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
     320            0 :                             .await?;
     321            0 :                     if let Some(copied) = copied.as_ref() {
     322            0 :                         tracing::info!(%copied, "rewrote and uploaded");
     323            0 :                     }
     324            0 :                     Ok(copied)
     325            0 :                 }
     326            0 :                 .instrument(span),
     327            0 :             );
     328            0 :         }
     329              : 
     330            0 :         while let Some(res) = tasks.join_next().await {
     331            0 :             match res {
     332            0 :                 Ok(Ok(Some(copied))) => {
     333            0 :                     wrote_any = true;
     334            0 :                     new_layers.push(copied);
     335            0 :                 }
     336            0 :                 Ok(Ok(None)) => {}
     337            0 :                 Ok(Err(e)) => return Err(e),
     338            0 :                 Err(je) => return Err(Error::Prepare(je.into())),
     339              :             }
     340              :         }
     341              : 
     342              :         // FIXME: the fsync should be mandatory, after both rewrites and copies
     343            0 :         if wrote_any {
     344            0 :             let timeline_dir = VirtualFile::open(
     345            0 :                 &detached
     346            0 :                     .conf
     347            0 :                     .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
     348            0 :                 ctx,
     349            0 :             )
     350            0 :             .await
     351            0 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     352            0 :             timeline_dir
     353            0 :                 .sync_all()
     354            0 :                 .await
     355            0 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     356            0 :         }
     357              :     }
     358              : 
     359            0 :     let mut tasks = tokio::task::JoinSet::new();
     360            0 :     let limiter = Arc::new(Semaphore::new(options.copy_concurrency.get()));
     361              : 
     362            0 :     for adopted in rest_of_historic {
     363            0 :         let limiter = limiter.clone();
     364            0 :         let timeline = detached.clone();
     365            0 : 
     366            0 :         tasks.spawn(
     367            0 :             async move {
     368            0 :                 let _permit = limiter.acquire().await;
     369            0 :                 let owned =
     370            0 :                     remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
     371            0 :                 tracing::info!(layer=%owned, "remote copied");
     372            0 :                 Ok(owned)
     373            0 :             }
     374            0 :             .in_current_span(),
     375            0 :         );
     376            0 :     }
     377              : 
     378            0 :     while let Some(res) = tasks.join_next().await {
     379            0 :         match res {
     380            0 :             Ok(Ok(owned)) => {
     381            0 :                 new_layers.push(owned);
     382            0 :             }
     383            0 :             Ok(Err(failed)) => {
     384            0 :                 return Err(failed);
     385              :             }
     386            0 :             Err(je) => return Err(Error::Prepare(je.into())),
     387              :         }
     388              :     }
     389              : 
     390              :     // TODO: fsync directory again if we hardlinked something
     391              : 
     392            0 :     let prepared = PreparedTimelineDetach { layers: new_layers };
     393            0 : 
     394            0 :     Ok(Progress::Prepared(attempt, prepared))
     395            0 : }
     396              : 
     397            0 : async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
     398            0 :     let attempt = obtain_exclusive_attempt(detached, tenant)?;
     399              : 
     400              :     // insert the block in the index_part.json, if not already there.
     401            0 :     let _dont_care = tenant
     402            0 :         .gc_block
     403            0 :         .insert(
     404            0 :             detached,
     405            0 :             crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
     406            0 :         )
     407            0 :         .await
     408            0 :         .map_err(|e| Error::launder(e, Error::Prepare))?;
     409              : 
     410            0 :     Ok(attempt)
     411            0 : }
     412              : 
     413            0 : async fn continue_with_blocked_gc(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
     414            0 :     // FIXME: it would be nice to confirm that there is an in-memory version, since we've just
     415            0 :     // verified there is a persistent one?
     416            0 :     obtain_exclusive_attempt(detached, tenant)
     417            0 : }
     418              : 
     419            0 : fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
     420              :     use Error::{OtherTimelineDetachOngoing, ShuttingDown};
     421              : 
     422              :     // ensure we are the only active attempt for this tenant
     423            0 :     let (guard, barrier) = completion::channel();
     424            0 :     {
     425            0 :         let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
     426            0 :         if let Some((tl, other)) = guard.as_ref() {
     427            0 :             if !other.is_ready() {
     428            0 :                 return Err(OtherTimelineDetachOngoing(*tl));
     429            0 :             }
     430              :             // FIXME: no test enters here
     431            0 :         }
     432            0 :         *guard = Some((detached.timeline_id, barrier));
     433              :     }
     434              : 
     435              :     // ensure the gate is still open
     436            0 :     let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
     437              : 
     438            0 :     Ok(Attempt {
     439            0 :         timeline_id: detached.timeline_id,
     440            0 :         _guard: guard,
     441            0 :         gate_entered: Some(_gate_entered),
     442            0 :     })
     443            0 : }
     444              : 
     445            0 : fn reparented_direct_children(
     446            0 :     detached: &Arc<Timeline>,
     447            0 :     tenant: &Tenant,
     448            0 : ) -> Result<HashSet<TimelineId>, Error> {
     449            0 :     let mut all_direct_children = tenant
     450            0 :         .timelines
     451            0 :         .lock()
     452            0 :         .unwrap()
     453            0 :         .values()
     454            0 :         .filter_map(|tl| {
     455            0 :             let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
     456              : 
     457            0 :             if is_direct_child {
     458            0 :                 Some(tl.clone())
     459              :             } else {
     460            0 :                 if let Some(timeline) = tl.ancestor_timeline.as_ref() {
     461            0 :                     assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
     462            0 :                 }
     463            0 :                 None
     464              :             }
     465            0 :         })
     466            0 :         // Collect to avoid lock taking order problem with Tenant::timelines and
     467            0 :         // Timeline::remote_client
     468            0 :         .collect::<Vec<_>>();
     469            0 : 
     470            0 :     let mut any_shutdown = false;
     471            0 : 
     472            0 :     all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
     473            0 :         Ok(accessor) => accessor
     474            0 :             .latest_uploaded_index_part()
     475            0 :             .lineage
     476            0 :             .is_reparented(),
     477            0 :         Err(_shutdownalike) => {
     478            0 :             // not 100% a shutdown, but let's bail early not to give inconsistent results in
     479            0 :             // sharded enviroment.
     480            0 :             any_shutdown = true;
     481            0 :             true
     482              :         }
     483            0 :     });
     484            0 : 
     485            0 :     if any_shutdown {
     486              :         // it could be one or many being deleted; have client retry
     487            0 :         return Err(Error::ShuttingDown);
     488            0 :     }
     489            0 : 
     490            0 :     Ok(all_direct_children
     491            0 :         .into_iter()
     492            0 :         .map(|tl| tl.timeline_id)
     493            0 :         .collect())
     494            0 : }
     495              : 
     496            0 : fn partition_work(
     497            0 :     ancestor_lsn: Lsn,
     498            0 :     source: &LayerManager,
     499            0 : ) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
     500            0 :     let mut straddling_branchpoint = vec![];
     501            0 :     let mut rest_of_historic = vec![];
     502            0 : 
     503            0 :     let mut later_by_lsn = 0;
     504              : 
     505            0 :     for desc in source.layer_map()?.iter_historic_layers() {
     506              :         // off by one chances here:
     507              :         // - start is inclusive
     508              :         // - end is exclusive
     509            0 :         if desc.lsn_range.start > ancestor_lsn {
     510            0 :             later_by_lsn += 1;
     511            0 :             continue;
     512            0 :         }
     513              : 
     514            0 :         let target = if desc.lsn_range.start <= ancestor_lsn
     515            0 :             && desc.lsn_range.end > ancestor_lsn
     516            0 :             && desc.is_delta
     517              :         {
     518              :             // TODO: image layer at Lsn optimization
     519            0 :             &mut straddling_branchpoint
     520              :         } else {
     521            0 :             &mut rest_of_historic
     522              :         };
     523              : 
     524            0 :         target.push(source.get_from_desc(&desc));
     525              :     }
     526              : 
     527            0 :     Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
     528            0 : }
     529              : 
     530            0 : async fn upload_rewritten_layer(
     531            0 :     end_lsn: Lsn,
     532            0 :     layer: &Layer,
     533            0 :     target: &Arc<Timeline>,
     534            0 :     cancel: &CancellationToken,
     535            0 :     ctx: &RequestContext,
     536            0 : ) -> Result<Option<Layer>, Error> {
     537            0 :     let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
     538              : 
     539            0 :     let Some(copied) = copied else {
     540            0 :         return Ok(None);
     541              :     };
     542              : 
     543            0 :     target
     544            0 :         .remote_client
     545            0 :         .upload_layer_file(&copied, cancel)
     546            0 :         .await
     547            0 :         .map_err(|e| Error::launder(e, Error::Prepare))?;
     548              : 
     549            0 :     Ok(Some(copied.into()))
     550            0 : }
     551              : 
     552            0 : async fn copy_lsn_prefix(
     553            0 :     end_lsn: Lsn,
     554            0 :     layer: &Layer,
     555            0 :     target_timeline: &Arc<Timeline>,
     556            0 :     ctx: &RequestContext,
     557            0 : ) -> Result<Option<ResidentLayer>, Error> {
     558            0 :     if target_timeline.cancel.is_cancelled() {
     559            0 :         return Err(Error::ShuttingDown);
     560            0 :     }
     561            0 : 
     562            0 :     tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
     563              : 
     564            0 :     let mut writer = DeltaLayerWriter::new(
     565            0 :         target_timeline.conf,
     566            0 :         target_timeline.timeline_id,
     567            0 :         target_timeline.tenant_shard_id,
     568            0 :         layer.layer_desc().key_range.start,
     569            0 :         layer.layer_desc().lsn_range.start..end_lsn,
     570            0 :         ctx,
     571            0 :     )
     572            0 :     .await
     573            0 :     .with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
     574            0 :     .map_err(Error::Prepare)?;
     575              : 
     576            0 :     let resident = layer.download_and_keep_resident().await.map_err(|e| {
     577            0 :         if e.is_cancelled() {
     578            0 :             Error::ShuttingDown
     579              :         } else {
     580            0 :             Error::Prepare(e.into())
     581              :         }
     582            0 :     })?;
     583              : 
     584            0 :     let records = resident
     585            0 :         .copy_delta_prefix(&mut writer, end_lsn, ctx)
     586            0 :         .await
     587            0 :         .with_context(|| format!("copy lsn prefix of ancestors {layer}"))
     588            0 :         .map_err(Error::Prepare)?;
     589              : 
     590            0 :     drop(resident);
     591            0 : 
     592            0 :     tracing::debug!(%layer, records, "copied records");
     593              : 
     594            0 :     if records == 0 {
     595            0 :         drop(writer);
     596            0 :         // TODO: we might want to store an empty marker in remote storage for this
     597            0 :         // layer so that we will not needlessly walk `layer` on repeated attempts.
     598            0 :         Ok(None)
     599              :     } else {
     600              :         // reuse the key instead of adding more holes between layers by using the real
     601              :         // highest key in the layer.
     602            0 :         let reused_highest_key = layer.layer_desc().key_range.end;
     603            0 :         let (desc, path) = writer
     604            0 :             .finish(reused_highest_key, ctx)
     605            0 :             .await
     606            0 :             .map_err(Error::Prepare)?;
     607            0 :         let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
     608            0 :             .map_err(Error::Prepare)?;
     609              : 
     610            0 :         tracing::debug!(%layer, %copied, "new layer produced");
     611              : 
     612            0 :         Ok(Some(copied))
     613              :     }
     614            0 : }
     615              : 
     616              : /// Creates a new Layer instance for the adopted layer, and ensures it is found from the remote
     617              : /// storage on successful return without the adopted layer being added to `index_part.json`.
     618            0 : async fn remote_copy(
     619            0 :     adopted: &Layer,
     620            0 :     adoptee: &Arc<Timeline>,
     621            0 :     generation: Generation,
     622            0 :     cancel: &CancellationToken,
     623            0 : ) -> Result<Layer, Error> {
     624            0 :     // depending if Layer::keep_resident we could hardlink
     625            0 : 
     626            0 :     let mut metadata = adopted.metadata();
     627            0 :     debug_assert!(metadata.generation <= generation);
     628            0 :     metadata.generation = generation;
     629            0 : 
     630            0 :     let owned = crate::tenant::storage_layer::Layer::for_evicted(
     631            0 :         adoptee.conf,
     632            0 :         adoptee,
     633            0 :         adopted.layer_desc().layer_name(),
     634            0 :         metadata,
     635            0 :     );
     636            0 : 
     637            0 :     adoptee
     638            0 :         .remote_client
     639            0 :         .copy_timeline_layer(adopted, &owned, cancel)
     640            0 :         .await
     641            0 :         .map(move |()| owned)
     642            0 :         .map_err(|e| Error::launder(e, Error::Prepare))
     643            0 : }
     644              : 
     645              : pub(crate) enum DetachingAndReparenting {
     646              :     /// All of the following timeline ids were reparented and the timeline ancestor detach must be
     647              :     /// marked as completed.
     648              :     Reparented(HashSet<TimelineId>),
     649              : 
     650              :     /// Some of the reparentings failed. The timeline ancestor detach must **not** be marked as
     651              :     /// completed.
     652              :     ///
     653              :     /// Nested `must_reset_tenant` is set to true when any restart requiring changes were made.
     654              :     SomeReparentingFailed { must_reset_tenant: bool },
     655              : 
     656              :     /// Detaching and reparentings were completed in a previous attempt. Timeline ancestor detach
     657              :     /// must be marked as completed.
     658              :     AlreadyDone(HashSet<TimelineId>),
     659              : }
     660              : 
     661              : impl DetachingAndReparenting {
     662            0 :     pub(crate) fn reset_tenant_required(&self) -> bool {
     663              :         use DetachingAndReparenting::*;
     664            0 :         match self {
     665            0 :             Reparented(_) => true,
     666            0 :             SomeReparentingFailed { must_reset_tenant } => *must_reset_tenant,
     667            0 :             AlreadyDone(_) => false,
     668              :         }
     669            0 :     }
     670              : 
     671            0 :     pub(crate) fn completed(self) -> Option<HashSet<TimelineId>> {
     672              :         use DetachingAndReparenting::*;
     673            0 :         match self {
     674            0 :             Reparented(x) | AlreadyDone(x) => Some(x),
     675            0 :             SomeReparentingFailed { .. } => None,
     676              :         }
     677            0 :     }
     678              : }
     679              : 
     680              : /// See [`Timeline::detach_from_ancestor_and_reparent`].
     681            0 : pub(super) async fn detach_and_reparent(
     682            0 :     detached: &Arc<Timeline>,
     683            0 :     tenant: &Tenant,
     684            0 :     prepared: PreparedTimelineDetach,
     685            0 :     _ctx: &RequestContext,
     686            0 : ) -> Result<DetachingAndReparenting, Error> {
     687            0 :     let PreparedTimelineDetach { layers } = prepared;
     688              : 
     689              :     #[derive(Debug)]
     690              :     enum Ancestor {
     691              :         NotDetached(Arc<Timeline>, Lsn),
     692              :         Detached(Arc<Timeline>, Lsn),
     693              :     }
     694              : 
     695            0 :     let (recorded_branchpoint, still_ongoing) = {
     696            0 :         let access = detached.remote_client.initialized_upload_queue()?;
     697            0 :         let latest = access.latest_uploaded_index_part();
     698            0 : 
     699            0 :         (
     700            0 :             latest.lineage.detached_previous_ancestor(),
     701            0 :             latest
     702            0 :                 .gc_blocking
     703            0 :                 .as_ref()
     704            0 :                 .is_some_and(|b| b.blocked_by(DetachAncestor)),
     705            0 :         )
     706            0 :     };
     707            0 :     assert!(
     708            0 :         still_ongoing,
     709            0 :         "cannot (detach? reparent)? complete if the operation is not still ongoing"
     710              :     );
     711              : 
     712            0 :     let ancestor = match (detached.ancestor_timeline.as_ref(), recorded_branchpoint) {
     713            0 :         (Some(ancestor), None) => {
     714            0 :             assert!(
     715            0 :                 !layers.is_empty(),
     716            0 :                 "there should always be at least one layer to inherit"
     717              :             );
     718            0 :             Ancestor::NotDetached(ancestor.clone(), detached.ancestor_lsn)
     719              :         }
     720              :         (Some(_), Some(_)) => {
     721            0 :             panic!(
     722            0 :                 "it should be impossible to get to here without having gone through the tenant reset; if the tenant was reset, then the ancestor_timeline would be None"
     723            0 :             );
     724              :         }
     725            0 :         (None, Some((ancestor_id, ancestor_lsn))) => {
     726            0 :             // it has been either:
     727            0 :             // - detached but still exists => we can try reparenting
     728            0 :             // - detached and deleted
     729            0 :             //
     730            0 :             // either way, we must complete
     731            0 :             assert!(
     732            0 :                 layers.is_empty(),
     733            0 :                 "no layers should had been copied as detach is done"
     734              :             );
     735              : 
     736            0 :             let existing = tenant.timelines.lock().unwrap().get(&ancestor_id).cloned();
     737              : 
     738            0 :             if let Some(ancestor) = existing {
     739            0 :                 Ancestor::Detached(ancestor, ancestor_lsn)
     740              :             } else {
     741            0 :                 let direct_children = reparented_direct_children(detached, tenant)?;
     742            0 :                 return Ok(DetachingAndReparenting::AlreadyDone(direct_children));
     743              :             }
     744              :         }
     745              :         (None, None) => {
     746              :             // TODO: make sure there are no `?` before tenant_reset from after a questionmark from
     747              :             // here.
     748            0 :             panic!(
     749            0 :             "bug: detach_and_reparent called on a timeline which has not been detached or which has no live ancestor"
     750            0 :             );
     751              :         }
     752              :     };
     753              : 
     754              :     // publish the prepared layers before we reparent any of the timelines, so that on restart
     755              :     // reparented timelines find layers. also do the actual detaching.
     756              :     //
     757              :     // if we crash after this operation, a retry will allow reparenting the remaining timelines as
     758              :     // gc is blocked.
     759              : 
     760            0 :     let (ancestor, ancestor_lsn, was_detached) = match ancestor {
     761            0 :         Ancestor::NotDetached(ancestor, ancestor_lsn) => {
     762            0 :             // this has to complete before any reparentings because otherwise they would not have
     763            0 :             // layers on the new parent.
     764            0 :             detached
     765            0 :                 .remote_client
     766            0 :                 .schedule_adding_existing_layers_to_index_detach_and_wait(
     767            0 :                     &layers,
     768            0 :                     (ancestor.timeline_id, ancestor_lsn),
     769            0 :                 )
     770            0 :                 .await
     771            0 :                 .context("publish layers and detach ancestor")
     772            0 :                 .map_err(|e| Error::launder(e, Error::DetachReparent))?;
     773              : 
     774            0 :             tracing::info!(
     775            0 :                 ancestor=%ancestor.timeline_id,
     776            0 :                 %ancestor_lsn,
     777            0 :                 inherited_layers=%layers.len(),
     778            0 :                 "detached from ancestor"
     779              :             );
     780            0 :             (ancestor, ancestor_lsn, true)
     781              :         }
     782            0 :         Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn, false),
     783              :     };
     784              : 
     785            0 :     let mut tasks = tokio::task::JoinSet::new();
     786            0 : 
     787            0 :     // Returns a single permit semaphore which will be used to make one reparenting succeed,
     788            0 :     // others will fail as if those timelines had been stopped for whatever reason.
     789            0 :     #[cfg(feature = "testing")]
     790            0 :     let failpoint_sem = || -> Option<Arc<Semaphore>> {
     791            0 :         fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
     792            0 :             Arc::new(Semaphore::new(1))
     793            0 :         ));
     794            0 :         None
     795            0 :     }();
     796            0 : 
     797            0 :     // because we are now keeping the slot in progress, it is unlikely that there will be any
     798            0 :     // timeline deletions during this time. if we raced one, then we'll just ignore it.
     799            0 :     {
     800            0 :         let g = tenant.timelines.lock().unwrap();
     801            0 :         reparentable_timelines(g.values(), detached, &ancestor, ancestor_lsn)
     802            0 :             .cloned()
     803            0 :             .for_each(|timeline| {
     804              :                 // important in this scope: we are holding the Tenant::timelines lock
     805            0 :                 let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
     806            0 :                 let new_parent = detached.timeline_id;
     807            0 :                 #[cfg(feature = "testing")]
     808            0 :                 let failpoint_sem = failpoint_sem.clone();
     809            0 : 
     810            0 :                 tasks.spawn(
     811            0 :                     async move {
     812            0 :                         let res = async {
     813              :                             #[cfg(feature = "testing")]
     814            0 :                             if let Some(failpoint_sem) = failpoint_sem {
     815            0 :                                 let _permit = failpoint_sem.acquire().await.map_err(|_| {
     816            0 :                                     anyhow::anyhow!(
     817            0 :                                         "failpoint: timeline-detach-ancestor::allow_one_reparented",
     818            0 :                                     )
     819            0 :                                 })?;
     820            0 :                                 failpoint_sem.close();
     821            0 :                             }
     822              : 
     823            0 :                             timeline
     824            0 :                                 .remote_client
     825            0 :                                 .schedule_reparenting_and_wait(&new_parent)
     826            0 :                                 .await
     827            0 :                         }
     828            0 :                         .await;
     829              : 
     830            0 :                         match res {
     831              :                             Ok(()) => {
     832            0 :                                 tracing::info!("reparented");
     833            0 :                                 Some(timeline)
     834              :                             }
     835            0 :                             Err(e) => {
     836            0 :                                 // with the use of tenant slot, raced timeline deletion is the most
     837            0 :                                 // likely reason.
     838            0 :                                 tracing::warn!("reparenting failed: {e:#}");
     839            0 :                                 None
     840              :                             }
     841              :                         }
     842            0 :                     }
     843            0 :                     .instrument(span),
     844            0 :                 );
     845            0 :             });
     846            0 :     }
     847            0 : 
     848            0 :     let reparenting_candidates = tasks.len();
     849            0 :     let mut reparented = HashSet::with_capacity(tasks.len());
     850              : 
     851            0 :     while let Some(res) = tasks.join_next().await {
     852            0 :         match res {
     853            0 :             Ok(Some(timeline)) => {
     854            0 :                 assert!(
     855            0 :                     reparented.insert(timeline.timeline_id),
     856            0 :                     "duplicate reparenting? timeline_id={}",
     857            0 :                     timeline.timeline_id
     858              :                 );
     859              :             }
     860            0 :             Err(je) if je.is_cancelled() => unreachable!("not used"),
     861              :             // just ignore failures now, we can retry
     862            0 :             Ok(None) => {}
     863            0 :             Err(je) if je.is_panic() => {}
     864            0 :             Err(je) => tracing::error!("unexpected join error: {je:?}"),
     865              :         }
     866              :     }
     867              : 
     868            0 :     let reparented_all = reparenting_candidates == reparented.len();
     869            0 : 
     870            0 :     if reparented_all {
     871            0 :         Ok(DetachingAndReparenting::Reparented(reparented))
     872              :     } else {
     873            0 :         tracing::info!(
     874            0 :             reparented = reparented.len(),
     875            0 :             candidates = reparenting_candidates,
     876            0 :             "failed to reparent all candidates; they can be retried after the tenant_reset",
     877              :         );
     878              : 
     879            0 :         let must_reset_tenant = !reparented.is_empty() || was_detached;
     880            0 :         Ok(DetachingAndReparenting::SomeReparentingFailed { must_reset_tenant })
     881              :     }
     882            0 : }
     883              : 
     884            0 : pub(super) async fn complete(
     885            0 :     detached: &Arc<Timeline>,
     886            0 :     tenant: &Tenant,
     887            0 :     mut attempt: Attempt,
     888            0 :     _ctx: &RequestContext,
     889            0 : ) -> Result<(), Error> {
     890            0 :     assert_eq!(detached.timeline_id, attempt.timeline_id);
     891              : 
     892            0 :     if attempt.gate_entered.is_none() {
     893            0 :         let entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
     894            0 :         attempt.gate_entered = Some(entered);
     895            0 :     } else {
     896            0 :         // Some(gate_entered) means the tenant was not restarted, as is not required
     897            0 :     }
     898              : 
     899            0 :     assert!(detached.ancestor_timeline.is_none());
     900              : 
     901              :     // this should be an 503 at least...?
     902            0 :     fail::fail_point!(
     903            0 :         "timeline-detach-ancestor::complete_before_uploading",
     904            0 :         |_| Err(Error::Failpoint(
     905            0 :             "timeline-detach-ancestor::complete_before_uploading"
     906            0 :         ))
     907            0 :     );
     908              : 
     909            0 :     tenant
     910            0 :         .gc_block
     911            0 :         .remove(
     912            0 :             detached,
     913            0 :             crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
     914            0 :         )
     915            0 :         .await
     916            0 :         .map_err(|e| Error::launder(e, Error::Complete))?;
     917              : 
     918            0 :     Ok(())
     919            0 : }
     920              : 
     921              : /// Query against a locked `Tenant::timelines`.
     922            0 : fn reparentable_timelines<'a, I>(
     923            0 :     timelines: I,
     924            0 :     detached: &'a Arc<Timeline>,
     925            0 :     ancestor: &'a Arc<Timeline>,
     926            0 :     ancestor_lsn: Lsn,
     927            0 : ) -> impl Iterator<Item = &'a Arc<Timeline>> + 'a
     928            0 : where
     929            0 :     I: Iterator<Item = &'a Arc<Timeline>> + 'a,
     930            0 : {
     931            0 :     timelines.filter_map(move |tl| {
     932            0 :         if Arc::ptr_eq(tl, detached) {
     933            0 :             return None;
     934            0 :         }
     935              : 
     936            0 :         let tl_ancestor = tl.ancestor_timeline.as_ref()?;
     937            0 :         let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
     938            0 :         let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
     939            0 : 
     940            0 :         let is_deleting = tl
     941            0 :             .delete_progress
     942            0 :             .try_lock()
     943            0 :             .map(|flow| !flow.is_not_started())
     944            0 :             .unwrap_or(true);
     945            0 : 
     946            0 :         if is_same && is_earlier && !is_deleting {
     947            0 :             Some(tl)
     948              :         } else {
     949            0 :             None
     950              :         }
     951            0 :     })
     952            0 : }
        

Generated by: LCOV version 2.1-beta