LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - uninit.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 68.2 % 195 133
Test Date: 2025-02-20 13:11:02 Functions: 47.8 % 23 11

            Line data    Source code
       1              : use std::{collections::hash_map::Entry, fs, future::Future, sync::Arc};
       2              : 
       3              : use anyhow::Context;
       4              : use camino::Utf8PathBuf;
       5              : use tracing::{error, info, info_span};
       6              : use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
       7              : 
       8              : use crate::{
       9              :     context::RequestContext,
      10              :     import_datadir,
      11              :     span::debug_assert_current_span_has_tenant_and_timeline_id,
      12              :     tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
      13              : };
      14              : 
      15              : use super::Timeline;
      16              : 
      17              : /// A timeline with some of its files on disk, being initialized.
      18              : /// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
      19              : /// its local files are removed.  If we crash while this struct exists, then the timeline's local
      20              : /// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
      21              : ///
      22              : /// The caller is responsible for proper timeline data filling before the final init.
      23              : #[must_use]
      24              : pub struct UninitializedTimeline<'t> {
      25              :     pub(crate) owning_tenant: &'t Tenant,
      26              :     timeline_id: TimelineId,
      27              :     raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
      28              :     /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
      29              :     /// if aborting the timeline creation
      30              :     needs_shutdown: bool,
      31              : }
      32              : 
      33              : impl<'t> UninitializedTimeline<'t> {
      34          884 :     pub(crate) fn new(
      35          884 :         owning_tenant: &'t Tenant,
      36          884 :         timeline_id: TimelineId,
      37          884 :         raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
      38          884 :     ) -> Self {
      39          884 :         Self {
      40          884 :             owning_tenant,
      41          884 :             timeline_id,
      42          884 :             raw_timeline,
      43          884 :             needs_shutdown: false,
      44          884 :         }
      45          884 :     }
      46              : 
      47              :     /// When writing data to this timeline during creation, use this wrapper: it will take care of
      48              :     /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
      49              :     /// later.
      50            4 :     pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
      51            4 :     where
      52            4 :         F: FnOnce(Arc<Timeline>) -> Fut,
      53            4 :         Fut: Future<Output = Result<(), CreateTimelineError>>,
      54            4 :     {
      55            4 :         debug_assert_current_span_has_tenant_and_timeline_id();
      56            4 : 
      57            4 :         // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
      58            4 :         self.needs_shutdown = true;
      59              : 
      60            4 :         let timeline = self.raw_timeline()?;
      61              : 
      62              :         // Spawn flush loop so that the Timeline is ready to accept writes
      63            4 :         timeline.maybe_spawn_flush_loop();
      64              : 
      65              :         // Invoke the provided function, which will write some data into the new timeline
      66            4 :         if let Err(e) = f(timeline.clone()).await {
      67            0 :             self.abort().await;
      68            0 :             return Err(e.into());
      69            4 :         }
      70              : 
      71              :         // Flush the underlying timeline's ephemeral layers to disk
      72            4 :         if let Err(e) = timeline
      73            4 :             .freeze_and_flush()
      74            4 :             .await
      75            4 :             .context("Failed to flush after timeline creation writes")
      76              :         {
      77            0 :             self.abort().await;
      78            0 :             return Err(e);
      79            4 :         }
      80            4 : 
      81            4 :         Ok(())
      82            4 :     }
      83              : 
      84            0 :     pub(crate) async fn abort(&self) {
      85            0 :         if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
      86            0 :             raw_timeline.shutdown(super::ShutdownMode::Hard).await;
      87            0 :         }
      88            0 :     }
      89              : 
      90              :     /// Finish timeline creation: insert it into the Tenant's timelines map
      91              :     ///
      92              :     /// This function launches the flush loop if not already done.
      93              :     ///
      94              :     /// The caller is responsible for activating the timeline (function `.activate()`).
      95          868 :     pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
      96          868 :         let timeline_id = self.timeline_id;
      97          868 :         let tenant_shard_id = self.owning_tenant.tenant_shard_id;
      98          868 : 
      99          868 :         if self.raw_timeline.is_none() {
     100            0 :             self.abort().await;
     101              : 
     102            0 :             return Err(anyhow::anyhow!(
     103            0 :                 "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
     104            0 :             ));
     105          868 :         }
     106          868 : 
     107          868 :         // Check that the caller initialized disk_consistent_lsn
     108          868 :         let new_disk_consistent_lsn = self
     109          868 :             .raw_timeline
     110          868 :             .as_ref()
     111          868 :             .expect("checked above")
     112          868 :             .0
     113          868 :             .get_disk_consistent_lsn();
     114          868 : 
     115          868 :         if !new_disk_consistent_lsn.is_valid() {
     116            0 :             self.abort().await;
     117              : 
     118            0 :             return Err(anyhow::anyhow!(
     119            0 :                 "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
     120            0 :             ));
     121          868 :         }
     122          868 : 
     123          868 :         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
     124          868 :         match timelines.entry(timeline_id) {
     125              :             Entry::Occupied(_) => {
     126              :                 // Unexpected, bug in the caller.  Tenant is responsible for preventing concurrent creation of the same timeline.
     127              :                 //
     128              :                 // We do not call Self::abort here.  Because we don't cleanly shut down our Timeline, [`Self::drop`] should
     129              :                 // skip trying to delete the timeline directory too.
     130            0 :                 anyhow::bail!(
     131            0 :                 "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
     132            0 :                 )
     133              :             }
     134          868 :             Entry::Vacant(v) => {
     135          868 :                 // After taking the raw timeline here there should be no fallible operations, because the
     136          868 :                 // drop guard will not clean up afterwards, which would block, for example, tenant deletion.
     137          868 :                 let (new_timeline, _create_guard) =
     138          868 :                     self.raw_timeline.take().expect("already checked");
     139          868 : 
     140          868 :                 v.insert(Arc::clone(&new_timeline));
     141          868 : 
     142          868 :                 new_timeline.maybe_spawn_flush_loop();
     143          868 : 
     144          868 :                 Ok(new_timeline)
     145              :             }
     146              :         }
     147          868 :     }
     148              : 
     149            0 :     pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
     150            0 :         self.raw_timeline.take().expect("already checked")
     151            0 :     }
     152              : 
     153              :     /// Prepares timeline data by loading it from the basebackup archive.
     154            0 :     pub(crate) async fn import_basebackup_from_tar(
     155            0 :         mut self,
     156            0 :         tenant: Arc<Tenant>,
     157            0 :         copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
     158            0 :         base_lsn: Lsn,
     159            0 :         broker_client: storage_broker::BrokerClientChannel,
     160            0 :         ctx: &RequestContext,
     161            0 :     ) -> anyhow::Result<Arc<Timeline>> {
     162            0 :         self.write(|raw_timeline| async move {
     163            0 :             import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
     164            0 :                 .await
     165            0 :                 .context("Failed to import basebackup")
     166            0 :                 .map_err(CreateTimelineError::Other)?;
     167              : 
     168            0 :             fail::fail_point!("before-checkpoint-new-timeline", |_| {
     169            0 :                 Err(CreateTimelineError::Other(anyhow::anyhow!(
     170            0 :                     "failpoint before-checkpoint-new-timeline"
     171            0 :                 )))
     172            0 :             });
     173              : 
     174            0 :             Ok(())
     175            0 :         })
     176            0 :         .await?;
     177              : 
     178              :         // All the data has been imported. Insert the Timeline into the tenant's timelines map
     179            0 :         let tl = self.finish_creation().await?;
     180            0 :         tl.activate(tenant, broker_client, None, ctx);
     181            0 :         Ok(tl)
     182            0 :     }
     183              : 
     184          428 :     pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
     185          428 :         Ok(&self
     186          428 :             .raw_timeline
     187          428 :             .as_ref()
     188          428 :             .with_context(|| {
     189            0 :                 format!(
     190            0 :                     "No raw timeline {}/{} found",
     191            0 :                     self.owning_tenant.tenant_shard_id, self.timeline_id
     192            0 :                 )
     193          428 :             })?
     194              :             .0)
     195          428 :     }
     196              : }
     197              : 
     198              : impl Drop for UninitializedTimeline<'_> {
     199          880 :     fn drop(&mut self) {
     200          880 :         if let Some((timeline, create_guard)) = self.raw_timeline.take() {
     201           12 :             let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
     202           12 :             if self.needs_shutdown && !timeline.gate.close_complete() {
     203              :                 // This should not happen: caller should call [`Self::abort`] on failures
     204            0 :                 tracing::warn!(
     205            0 :                     "Timeline not shut down after initialization failure, cannot clean up files"
     206              :                 );
     207              :             } else {
     208              :                 // This is unusual, but can happen harmlessly if the pageserver is stopped while
     209              :                 // creating a timeline.
     210           12 :                 info!("Timeline got dropped without initializing, cleaning its files");
     211           12 :                 cleanup_timeline_directory(create_guard);
     212              :             }
     213          868 :         }
     214          880 :     }
     215              : }
     216              : 
     217           12 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
     218           12 :     let timeline_path = &create_guard.timeline_path;
     219           12 :     match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
     220              :         Ok(()) => {
     221           12 :             info!("Timeline dir {timeline_path:?} removed successfully")
     222              :         }
     223            0 :         Err(e) => {
     224            0 :             error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
     225              :         }
     226              :     }
     227              :     // Having cleaned up, we can release this TimelineId in [`Tenant::timelines_creating`] to allow other
     228              :     // timeline creation attempts under this TimelineId to proceed
     229           12 :     drop(create_guard);
     230           12 : }
     231              : 
     232              : /// A guard for timeline creations in progress: as long as this object exists, the timeline ID
     233              : /// is kept in [`Tenant::timelines_creating`] to exclude concurrent attempts to create the same timeline.
     234              : #[must_use]
     235              : pub(crate) struct TimelineCreateGuard {
     236              :     pub(crate) _tenant_gate_guard: GateGuard,
     237              :     pub(crate) owning_tenant: Arc<Tenant>,
     238              :     pub(crate) timeline_id: TimelineId,
     239              :     pub(crate) timeline_path: Utf8PathBuf,
     240              :     pub(crate) idempotency: CreateTimelineIdempotency,
     241              : }
     242              : 
     243              : /// Errors when acquiring exclusive access to a timeline ID for creation
     244              : #[derive(thiserror::Error, Debug)]
     245              : pub(crate) enum TimelineExclusionError {
     246              :     #[error("Already exists")]
     247              :     AlreadyExists {
     248              :         existing: TimelineOrOffloaded,
     249              :         arg: CreateTimelineIdempotency,
     250              :     },
     251              :     #[error("Already creating")]
     252              :     AlreadyCreating,
     253              :     #[error("Shutting down")]
     254              :     ShuttingDown,
     255              : 
     256              :     // e.g. I/O errors, or some failure deep in postgres initdb
     257              :     #[error(transparent)]
     258              :     Other(#[from] anyhow::Error),
     259              : }
     260              : 
     261              : impl TimelineCreateGuard {
     262          896 :     pub(crate) fn new(
     263          896 :         owning_tenant: &Arc<Tenant>,
     264          896 :         timeline_id: TimelineId,
     265          896 :         timeline_path: Utf8PathBuf,
     266          896 :         idempotency: CreateTimelineIdempotency,
     267          896 :         allow_offloaded: bool,
     268          896 :     ) -> Result<Self, TimelineExclusionError> {
     269          896 :         let _tenant_gate_guard = owning_tenant
     270          896 :             .gate
     271          896 :             .enter()
     272          896 :             .map_err(|_| TimelineExclusionError::ShuttingDown)?;
     273              : 
     274              :         // Lock order: this is the only place we take these locks together (timelines, then
     275              :         // timelines_offloaded, then timelines_creating).  During drop() we only lock timelines_creating.
     276          896 :         let timelines = owning_tenant.timelines.lock().unwrap();
     277          896 :         let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
     278          896 :         let mut creating_timelines: std::sync::MutexGuard<
     279          896 :             '_,
     280          896 :             std::collections::HashSet<TimelineId>,
     281          896 :         > = owning_tenant.timelines_creating.lock().unwrap();
     282              : 
     283          896 :         if let Some(existing) = timelines.get(&timeline_id) {
     284            4 :             return Err(TimelineExclusionError::AlreadyExists {
     285            4 :                 existing: TimelineOrOffloaded::Timeline(existing.clone()),
     286            4 :                 arg: idempotency,
     287            4 :             });
     288          892 :         }
     289          892 :         if !allow_offloaded {
     290          892 :             if let Some(existing) = timelines_offloaded.get(&timeline_id) {
     291            0 :                 return Err(TimelineExclusionError::AlreadyExists {
     292            0 :                     existing: TimelineOrOffloaded::Offloaded(existing.clone()),
     293            0 :                     arg: idempotency,
     294            0 :                 });
     295          892 :             }
     296            0 :         }
     297          892 :         if creating_timelines.contains(&timeline_id) {
     298            0 :             return Err(TimelineExclusionError::AlreadyCreating);
     299          892 :         }
     300          892 :         creating_timelines.insert(timeline_id);
     301          892 :         drop(creating_timelines);
     302          892 :         drop(timelines_offloaded);
     303          892 :         drop(timelines);
     304          892 :         Ok(Self {
     305          892 :             _tenant_gate_guard,
     306          892 :             owning_tenant: Arc::clone(owning_tenant),
     307          892 :             timeline_id,
     308          892 :             timeline_path,
     309          892 :             idempotency,
     310          892 :         })
     311          896 :     }
     312              : }
     313              : 
     314              : impl Drop for TimelineCreateGuard {
     315          888 :     fn drop(&mut self) {
     316          888 :         self.owning_tenant
     317          888 :             .timelines_creating
     318          888 :             .lock()
     319          888 :             .unwrap()
     320          888 :             .remove(&self.timeline_id);
     321          888 :     }
     322              : }
        

Generated by: LCOV version 2.1-beta