LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - detach_ancestor.rs (source / functions) Coverage Total Hit
Test: 496e96cdfff2df79370229591d6427cda12fde29.info Lines: 0.0 % 372 0
Test Date: 2024-05-21 18:28:29 Functions: 0.0 % 24 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use super::{layer_manager::LayerManager, Timeline};
       4              : use crate::{
       5              :     context::{DownloadBehavior, RequestContext},
       6              :     task_mgr::TaskKind,
       7              :     tenant::{
       8              :         storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
       9              :         Tenant,
      10              :     },
      11              :     virtual_file::{MaybeFatalIo, VirtualFile},
      12              : };
      13              : use tokio_util::sync::CancellationToken;
      14              : use tracing::Instrument;
      15              : use utils::{completion, generation::Generation, id::TimelineId, lsn::Lsn};
      16              : 
      17            0 : #[derive(Debug, thiserror::Error)]
      18              : pub(crate) enum Error {
      19              :     #[error("no ancestors")]
      20              :     NoAncestor,
      21              :     #[error("too many ancestors")]
      22              :     TooManyAncestors,
      23              :     #[error("shutting down, please retry later")]
      24              :     ShuttingDown,
      25              :     #[error("flushing failed")]
      26              :     FlushAncestor(#[source] anyhow::Error),
      27              :     #[error("layer download failed")]
      28              :     RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
      29              :     #[error("copying LSN prefix locally failed")]
      30              :     CopyDeltaPrefix(#[source] anyhow::Error),
      31              :     #[error("upload rewritten layer")]
      32              :     UploadRewritten(#[source] anyhow::Error),
      33              : 
      34              :     #[error("ancestor is already being detached by: {}", .0)]
      35              :     OtherTimelineDetachOngoing(TimelineId),
      36              : 
      37              :     #[error("remote copying layer failed")]
      38              :     CopyFailed(#[source] anyhow::Error),
      39              : 
      40              :     #[error("unexpected error")]
      41              :     Unexpected(#[source] anyhow::Error),
      42              : }
      43              : 
      44              : pub(crate) struct PreparedTimelineDetach {
      45              :     layers: Vec<Layer>,
      46              : }
      47              : 
      48              : /// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
      49              : #[derive(Debug)]
      50              : pub(crate) struct Options {
      51              :     pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
      52              :     pub(crate) copy_concurrency: std::num::NonZeroUsize,
      53              : }
      54              : 
      55              : impl Default for Options {
      56            0 :     fn default() -> Self {
      57            0 :         Self {
      58            0 :             rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
      59            0 :             copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
      60            0 :         }
      61            0 :     }
      62              : }
      63              : 
      64              : /// See [`Timeline::prepare_to_detach_from_ancestor`]
      65            0 : pub(super) async fn prepare(
      66            0 :     detached: &Arc<Timeline>,
      67            0 :     tenant: &Tenant,
      68            0 :     options: Options,
      69            0 :     ctx: &RequestContext,
      70            0 : ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
      71              :     use Error::*;
      72              : 
      73            0 :     let Some((ancestor, ancestor_lsn)) = detached
      74            0 :         .ancestor_timeline
      75            0 :         .as_ref()
      76            0 :         .map(|tl| (tl.clone(), detached.ancestor_lsn))
      77              :     else {
      78            0 :         return Err(NoAncestor);
      79              :     };
      80              : 
      81            0 :     if !ancestor_lsn.is_valid() {
      82            0 :         return Err(NoAncestor);
      83            0 :     }
      84            0 : 
      85            0 :     if ancestor.ancestor_timeline.is_some() {
      86              :         // non-technical requirement; we could flatten N ancestors just as easily but we chose
      87              :         // not to
      88            0 :         return Err(TooManyAncestors);
      89            0 :     }
      90            0 : 
      91            0 :     // before we acquire the gate, we must mark the ancestor as having a detach operation
      92            0 :     // ongoing which will block other concurrent detach operations so we don't get into awkward
      93            0 :     // situations where there would be two branches trying to reparent earlier branches.
      94            0 :     let (guard, barrier) = completion::channel();
      95            0 : 
      96            0 :     {
      97            0 :         let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
      98            0 :         if let Some((tl, other)) = guard.as_ref() {
      99            0 :             if !other.is_ready() {
     100            0 :                 return Err(OtherTimelineDetachOngoing(*tl));
     101            0 :             }
     102            0 :         }
     103            0 :         *guard = Some((detached.timeline_id, barrier));
     104              :     }
     105              : 
     106            0 :     let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
     107              : 
     108            0 :     if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
     109            0 :         let span =
     110            0 :             tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
     111            0 :         async {
     112            0 :             let started_at = std::time::Instant::now();
     113            0 :             let freeze_and_flush = ancestor.freeze_and_flush0();
     114            0 :             let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
     115              : 
     116            0 :             let res =
     117            0 :                 tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
     118            0 :                     .await;
     119              : 
     120            0 :             let res = match res {
     121            0 :                 Ok(res) => res,
     122            0 :                 Err(_elapsed) => {
     123            0 :                     tracing::info!("freezing and flushing ancestor is still ongoing");
     124            0 :                     freeze_and_flush.await
     125              :                 }
     126              :             };
     127              : 
     128            0 :             res.map_err(FlushAncestor)?;
     129              : 
     130              :             // we do not need to wait for uploads to complete but we do need `struct Layer`,
     131              :             // copying delta prefix is unsupported currently for `InMemoryLayer`.
     132            0 :             tracing::info!(
     133            0 :                 elapsed_ms = started_at.elapsed().as_millis(),
     134            0 :                 "froze and flushed the ancestor"
     135              :             );
     136            0 :             Ok(())
     137            0 :         }
     138            0 :         .instrument(span)
     139            0 :         .await?;
     140            0 :     }
     141              : 
     142            0 :     let end_lsn = ancestor_lsn + 1;
     143              : 
     144            0 :     let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
     145            0 :         // we do not need to start from our layers, because they can only be layers that come
     146            0 :         // *after* ancestor_lsn
     147            0 :         let layers = tokio::select! {
     148              :             guard = ancestor.layers.read() => guard,
     149              :             _ = detached.cancel.cancelled() => {
     150              :                 return Err(ShuttingDown);
     151              :             }
     152              :             _ = ancestor.cancel.cancelled() => {
     153              :                 return Err(ShuttingDown);
     154              :             }
     155            0 :         };
     156            0 : 
     157            0 :         // between retries, these can change if compaction or gc ran in between. this will mean
     158            0 :         // we have to redo work.
     159            0 :         partition_work(ancestor_lsn, &layers)
     160            0 :     };
     161            0 : 
     162            0 :     // TODO: layers are already sorted by something: use that to determine how much of remote
     163            0 :     // copies are already done.
     164            0 :     tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
     165              : 
     166              :     // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
     167            0 :     let mut new_layers: Vec<Layer> =
     168            0 :         Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
     169            0 : 
     170            0 :     {
     171            0 :         tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
     172              : 
     173            0 :         let mut tasks = tokio::task::JoinSet::new();
     174            0 : 
     175            0 :         let mut wrote_any = false;
     176            0 : 
     177            0 :         let limiter = Arc::new(tokio::sync::Semaphore::new(
     178            0 :             options.rewrite_concurrency.get(),
     179            0 :         ));
     180              : 
     181            0 :         for layer in straddling_branchpoint {
     182            0 :             let limiter = limiter.clone();
     183            0 :             let timeline = detached.clone();
     184            0 :             let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
     185            0 : 
     186            0 :             tasks.spawn(async move {
     187            0 :                 let _permit = limiter.acquire().await;
     188            0 :                 let copied =
     189            0 :                     upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
     190            0 :                         .await?;
     191            0 :                 Ok(copied)
     192            0 :             });
     193            0 :         }
     194              : 
     195            0 :         while let Some(res) = tasks.join_next().await {
     196            0 :             match res {
     197            0 :                 Ok(Ok(Some(copied))) => {
     198            0 :                     wrote_any = true;
     199            0 :                     tracing::info!(layer=%copied, "rewrote and uploaded");
     200            0 :                     new_layers.push(copied);
     201              :                 }
     202            0 :                 Ok(Ok(None)) => {}
     203            0 :                 Ok(Err(e)) => return Err(e),
     204            0 :                 Err(je) => return Err(Unexpected(je.into())),
     205              :             }
     206              :         }
     207              : 
     208              :         // FIXME: the fsync should be mandatory, after both rewrites and copies
     209            0 :         if wrote_any {
     210            0 :             let timeline_dir = VirtualFile::open(
     211            0 :                 &detached
     212            0 :                     .conf
     213            0 :                     .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
     214            0 :                 ctx,
     215            0 :             )
     216            0 :             .await
     217            0 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     218            0 :             timeline_dir
     219            0 :                 .sync_all()
     220            0 :                 .await
     221            0 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     222            0 :         }
     223              :     }
     224              : 
     225            0 :     let mut tasks = tokio::task::JoinSet::new();
     226            0 :     let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));
     227              : 
     228            0 :     for adopted in rest_of_historic {
     229            0 :         let limiter = limiter.clone();
     230            0 :         let timeline = detached.clone();
     231            0 : 
     232            0 :         tasks.spawn(
     233            0 :             async move {
     234            0 :                 let _permit = limiter.acquire().await;
     235            0 :                 let owned =
     236            0 :                     remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
     237            0 :                 tracing::info!(layer=%owned, "remote copied");
     238            0 :                 Ok(owned)
     239            0 :             }
     240            0 :             .in_current_span(),
     241            0 :         );
     242            0 :     }
     243              : 
     244            0 :     while let Some(res) = tasks.join_next().await {
     245            0 :         match res {
     246            0 :             Ok(Ok(owned)) => {
     247            0 :                 new_layers.push(owned);
     248            0 :             }
     249            0 :             Ok(Err(failed)) => {
     250            0 :                 return Err(failed);
     251              :             }
     252            0 :             Err(je) => return Err(Unexpected(je.into())),
     253              :         }
     254              :     }
     255              : 
     256              :     // TODO: fsync directory again if we hardlinked something
     257              : 
     258            0 :     let prepared = PreparedTimelineDetach { layers: new_layers };
     259            0 : 
     260            0 :     Ok((guard, prepared))
     261            0 : }
     262              : 
     263            0 : fn partition_work(
     264            0 :     ancestor_lsn: Lsn,
     265            0 :     source_layermap: &LayerManager,
     266            0 : ) -> (usize, Vec<Layer>, Vec<Layer>) {
     267            0 :     let mut straddling_branchpoint = vec![];
     268            0 :     let mut rest_of_historic = vec![];
     269            0 : 
     270            0 :     let mut later_by_lsn = 0;
     271              : 
     272            0 :     for desc in source_layermap.layer_map().iter_historic_layers() {
     273              :         // off by one chances here:
     274              :         // - start is inclusive
     275              :         // - end is exclusive
     276            0 :         if desc.lsn_range.start > ancestor_lsn {
     277            0 :             later_by_lsn += 1;
     278            0 :             continue;
     279            0 :         }
     280              : 
     281            0 :         let target = if desc.lsn_range.start <= ancestor_lsn
     282            0 :             && desc.lsn_range.end > ancestor_lsn
     283            0 :             && desc.is_delta
     284              :         {
     285              :             // TODO: image layer at Lsn optimization
     286            0 :             &mut straddling_branchpoint
     287              :         } else {
     288            0 :             &mut rest_of_historic
     289              :         };
     290              : 
     291            0 :         target.push(source_layermap.get_from_desc(&desc));
     292              :     }
     293              : 
     294            0 :     (later_by_lsn, straddling_branchpoint, rest_of_historic)
     295            0 : }
     296              : 
     297            0 : async fn upload_rewritten_layer(
     298            0 :     end_lsn: Lsn,
     299            0 :     layer: &Layer,
     300            0 :     target: &Arc<Timeline>,
     301            0 :     cancel: &CancellationToken,
     302            0 :     ctx: &RequestContext,
     303            0 : ) -> Result<Option<Layer>, Error> {
     304              :     use Error::UploadRewritten;
     305            0 :     let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
     306              : 
     307            0 :     let Some(copied) = copied else {
     308            0 :         return Ok(None);
     309              :     };
     310              : 
     311              :     // FIXME: better shuttingdown error
     312            0 :     target
     313            0 :         .remote_client
     314            0 :         .upload_layer_file(&copied, cancel)
     315            0 :         .await
     316            0 :         .map_err(UploadRewritten)?;
     317              : 
     318            0 :     Ok(Some(copied.into()))
     319            0 : }
     320              : 
     321            0 : async fn copy_lsn_prefix(
     322            0 :     end_lsn: Lsn,
     323            0 :     layer: &Layer,
     324            0 :     target_timeline: &Arc<Timeline>,
     325            0 :     ctx: &RequestContext,
     326            0 : ) -> Result<Option<ResidentLayer>, Error> {
     327            0 :     use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
     328            0 : 
     329            0 :     tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
     330              : 
     331            0 :     let mut writer = DeltaLayerWriter::new(
     332            0 :         target_timeline.conf,
     333            0 :         target_timeline.timeline_id,
     334            0 :         target_timeline.tenant_shard_id,
     335            0 :         layer.layer_desc().key_range.start,
     336            0 :         layer.layer_desc().lsn_range.start..end_lsn,
     337            0 :         ctx,
     338            0 :     )
     339            0 :     .await
     340            0 :     .map_err(CopyDeltaPrefix)?;
     341              : 
     342            0 :     let resident = layer
     343            0 :         .download_and_keep_resident()
     344            0 :         .await
     345              :         // likely shutdown
     346            0 :         .map_err(RewrittenDeltaDownloadFailed)?;
     347              : 
     348            0 :     let records = resident
     349            0 :         .copy_delta_prefix(&mut writer, end_lsn, ctx)
     350            0 :         .await
     351            0 :         .map_err(CopyDeltaPrefix)?;
     352              : 
     353            0 :     drop(resident);
     354            0 : 
     355            0 :     tracing::debug!(%layer, records, "copied records");
     356              : 
     357            0 :     if records == 0 {
     358            0 :         drop(writer);
     359            0 :         // TODO: we might want to store an empty marker in remote storage for this
     360            0 :         // layer so that we will not needlessly walk `layer` on repeated attempts.
     361            0 :         Ok(None)
     362              :     } else {
     363              :         // reuse the key instead of adding more holes between layers by using the real
     364              :         // highest key in the layer.
     365            0 :         let reused_highest_key = layer.layer_desc().key_range.end;
     366            0 :         let copied = writer
     367            0 :             .finish(reused_highest_key, target_timeline, ctx)
     368            0 :             .await
     369            0 :             .map_err(CopyDeltaPrefix)?;
     370              : 
     371            0 :         tracing::debug!(%layer, %copied, "new layer produced");
     372              : 
     373            0 :         Ok(Some(copied))
     374              :     }
     375            0 : }
     376              : 
     377              : /// Creates a new Layer instance for the adopted layer, and ensures it is found from the remote
     378              : /// storage on successful return without the adopted layer being added to `index_part.json`.
     379            0 : async fn remote_copy(
     380            0 :     adopted: &Layer,
     381            0 :     adoptee: &Arc<Timeline>,
     382            0 :     generation: Generation,
     383            0 :     cancel: &CancellationToken,
     384            0 : ) -> Result<Layer, Error> {
     385            0 :     use Error::CopyFailed;
     386            0 : 
     387            0 :     // depending if Layer::keep_resident we could hardlink
     388            0 : 
     389            0 :     let mut metadata = adopted.metadata();
     390            0 :     debug_assert!(metadata.generation <= generation);
     391            0 :     metadata.generation = generation;
     392            0 : 
     393            0 :     let owned = crate::tenant::storage_layer::Layer::for_evicted(
     394            0 :         adoptee.conf,
     395            0 :         adoptee,
     396            0 :         adopted.layer_desc().layer_name(),
     397            0 :         metadata,
     398            0 :     );
     399            0 : 
     400            0 :     // FIXME: better shuttingdown error
     401            0 :     adoptee
     402            0 :         .remote_client
     403            0 :         .copy_timeline_layer(adopted, &owned, cancel)
     404            0 :         .await
     405            0 :         .map(move |()| owned)
     406            0 :         .map_err(CopyFailed)
     407            0 : }
     408              : 
     409              : /// See [`Timeline::complete_detaching_timeline_ancestor`].
     410            0 : pub(super) async fn complete(
     411            0 :     detached: &Arc<Timeline>,
     412            0 :     tenant: &Tenant,
     413            0 :     prepared: PreparedTimelineDetach,
     414            0 :     _ctx: &RequestContext,
     415            0 : ) -> Result<Vec<TimelineId>, anyhow::Error> {
     416            0 :     let PreparedTimelineDetach { layers } = prepared;
     417            0 : 
     418            0 :     let ancestor = detached
     419            0 :         .get_ancestor_timeline()
     420            0 :         .expect("must still have a ancestor");
     421            0 :     let ancestor_lsn = detached.get_ancestor_lsn();
     422            0 : 
     423            0 :     // publish the prepared layers before we reparent any of the timelines, so that on restart
     424            0 :     // reparented timelines find layers. also do the actual detaching.
     425            0 :     //
     426            0 :     // if we crash after this operation, we will at least come up having detached a timeline, but
     427            0 :     // we cannot go back and reparent the timelines which would have been reparented in normal
     428            0 :     // execution.
     429            0 :     //
     430            0 :     // this is not perfect, but it avoids a retry happening after a compaction or gc on restart
     431            0 :     // which could give us a completely wrong layer combination.
     432            0 :     detached
     433            0 :         .remote_client
     434            0 :         .schedule_adding_existing_layers_to_index_detach_and_wait(
     435            0 :             &layers,
     436            0 :             (ancestor.timeline_id, ancestor_lsn),
     437            0 :         )
     438            0 :         .await?;
     439              : 
     440            0 :     let mut tasks = tokio::task::JoinSet::new();
     441            0 : 
     442            0 :     // because we are now keeping the slot in progress, it is unlikely that there will be any
     443            0 :     // timeline deletions during this time. if we raced one, then we'll just ignore it.
     444            0 :     tenant
     445            0 :         .timelines
     446            0 :         .lock()
     447            0 :         .unwrap()
     448            0 :         .values()
     449            0 :         .filter_map(|tl| {
     450            0 :             if Arc::ptr_eq(tl, detached) {
     451            0 :                 return None;
     452            0 :             }
     453            0 : 
     454            0 :             if !tl.is_active() {
     455            0 :                 return None;
     456            0 :             }
     457              : 
     458            0 :             let tl_ancestor = tl.ancestor_timeline.as_ref()?;
     459            0 :             let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
     460            0 :             let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
     461            0 : 
     462            0 :             let is_deleting = tl
     463            0 :                 .delete_progress
     464            0 :                 .try_lock()
     465            0 :                 .map(|flow| !flow.is_not_started())
     466            0 :                 .unwrap_or(true);
     467            0 : 
     468            0 :             if is_same && is_earlier && !is_deleting {
     469            0 :                 Some(tl.clone())
     470              :             } else {
     471            0 :                 None
     472              :             }
     473            0 :         })
     474            0 :         .for_each(|timeline| {
     475              :             // important in this scope: we are holding the Tenant::timelines lock
     476            0 :             let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
     477            0 :             let new_parent = detached.timeline_id;
     478            0 : 
     479            0 :             tasks.spawn(
     480            0 :                 async move {
     481            0 :                     let res = timeline
     482            0 :                         .remote_client
     483            0 :                         .schedule_reparenting_and_wait(&new_parent)
     484            0 :                         .await;
     485              : 
     486            0 :                     match res {
     487            0 :                         Ok(()) => Some(timeline),
     488            0 :                         Err(e) => {
     489            0 :                             // with the use of tenant slot, we no longer expect these.
     490            0 :                             tracing::warn!("reparenting failed: {e:#}");
     491            0 :                             None
     492              :                         }
     493              :                     }
     494            0 :                 }
     495            0 :                 .instrument(span),
     496            0 :             );
     497            0 :         });
     498            0 : 
     499            0 :     let reparenting_candidates = tasks.len();
     500            0 :     let mut reparented = Vec::with_capacity(tasks.len());
     501              : 
     502            0 :     while let Some(res) = tasks.join_next().await {
     503            0 :         match res {
     504            0 :             Ok(Some(timeline)) => {
     505            0 :                 tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
     506            0 :                 reparented.push(timeline.timeline_id);
     507              :             }
     508            0 :             Ok(None) => {
     509            0 :                 // let's just ignore this for now. one or all reparented timelines could have
     510            0 :                 // started deletion, and that is fine.
     511            0 :             }
     512            0 :             Err(je) if je.is_cancelled() => unreachable!("not used"),
     513            0 :             Err(je) if je.is_panic() => {
     514            0 :                 // ignore; it's better to continue with a single reparenting failing (or even
     515            0 :                 // all of them) in order to get to the goal state.
     516            0 :                 //
     517            0 :                 // these timelines will never be reparentable, but they can always be detached as
     518            0 :                 // separate tree roots.
     519            0 :             }
     520            0 :             Err(je) => tracing::error!("unexpected join error: {je:?}"),
     521              :         }
     522              :     }
     523              : 
     524            0 :     if reparenting_candidates != reparented.len() {
     525            0 :         tracing::info!("failed to reparent some candidates");
     526            0 :     }
     527              : 
     528            0 :     Ok(reparented)
     529            0 : }
        

Generated by: LCOV version 2.1-beta