LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - uninit.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 68.2 % 195 133
Test Date: 2025-04-24 20:31:15 Functions: 47.8 % 23 11

            Line data    Source code
       1              : use std::collections::hash_map::Entry;
       2              : use std::fs;
       3              : use std::future::Future;
       4              : use std::sync::Arc;
       5              : 
       6              : use anyhow::Context;
       7              : use camino::Utf8PathBuf;
       8              : use tracing::{error, info, info_span};
       9              : use utils::fs_ext;
      10              : use utils::id::TimelineId;
      11              : use utils::lsn::Lsn;
      12              : use utils::sync::gate::GateGuard;
      13              : 
      14              : use super::Timeline;
      15              : use crate::context::RequestContext;
      16              : use crate::import_datadir;
      17              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      18              : use crate::tenant::{
      19              :     CreateTimelineError, CreateTimelineIdempotency, TenantShard, TimelineOrOffloaded,
      20              : };
      21              : 
      22              : /// A timeline with some of its files on disk, being initialized.
      23              : /// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
      24              : /// its local files are removed.  If we crash while this struct exists, then the timeline's local
      25              : /// state is cleaned up during [`TenantShard::clean_up_timelines`], because the timeline's content isn't in remote storage.
      26              : ///
      27              : /// The caller is responsible for proper timeline data filling before the final init.
      28              : #[must_use]
      29              : pub struct UninitializedTimeline<'t> {
      30              :     pub(crate) owning_tenant: &'t TenantShard,
      31              :     timeline_id: TimelineId,
      32              :     raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
      33              :     /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
      34              :     /// if aborting the timeline creation
      35              :     needs_shutdown: bool,
      36              : }
      37              : 
      38              : impl<'t> UninitializedTimeline<'t> {
      39         2748 :     pub(crate) fn new(
      40         2748 :         owning_tenant: &'t TenantShard,
      41         2748 :         timeline_id: TimelineId,
      42         2748 :         raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
      43         2748 :     ) -> Self {
      44         2748 :         Self {
      45         2748 :             owning_tenant,
      46         2748 :             timeline_id,
      47         2748 :             raw_timeline,
      48         2748 :             needs_shutdown: false,
      49         2748 :         }
      50         2748 :     }
      51              : 
      52              :     /// When writing data to this timeline during creation, use this wrapper: it will take care of
      53              :     /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
      54              :     /// later.
      55           12 :     pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
      56           12 :     where
      57           12 :         F: FnOnce(Arc<Timeline>) -> Fut,
      58           12 :         Fut: Future<Output = Result<(), CreateTimelineError>>,
      59           12 :     {
      60           12 :         debug_assert_current_span_has_tenant_and_timeline_id();
      61           12 : 
      62           12 :         // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
      63           12 :         self.needs_shutdown = true;
      64              : 
      65           12 :         let timeline = self.raw_timeline()?;
      66              : 
      67              :         // Spawn flush loop so that the Timeline is ready to accept writes
      68           12 :         timeline.maybe_spawn_flush_loop();
      69              : 
      70              :         // Invoke the provided function, which will write some data into the new timeline
      71           12 :         if let Err(e) = f(timeline.clone()).await {
      72            0 :             self.abort().await;
      73            0 :             return Err(e.into());
      74           12 :         }
      75              : 
      76              :         // Flush the underlying timeline's ephemeral layers to disk
      77           12 :         if let Err(e) = timeline
      78           12 :             .freeze_and_flush()
      79           12 :             .await
      80           12 :             .context("Failed to flush after timeline creation writes")
      81              :         {
      82            0 :             self.abort().await;
      83            0 :             return Err(e);
      84           12 :         }
      85           12 : 
      86           12 :         Ok(())
      87           12 :     }
      88              : 
      89            0 :     pub(crate) async fn abort(&self) {
      90            0 :         if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
      91            0 :             raw_timeline.shutdown(super::ShutdownMode::Hard).await;
      92            0 :         }
      93            0 :     }
      94              : 
      95              :     /// Finish timeline creation: insert it into the Tenant's timelines map
      96              :     ///
      97              :     /// This function launches the flush loop if not already done.
      98              :     ///
      99              :     /// The caller is responsible for activating the timeline (function `.activate()`).
     100         2700 :     pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
     101         2700 :         let timeline_id = self.timeline_id;
     102         2700 :         let tenant_shard_id = self.owning_tenant.tenant_shard_id;
     103         2700 : 
     104         2700 :         if self.raw_timeline.is_none() {
     105            0 :             self.abort().await;
     106              : 
     107            0 :             return Err(anyhow::anyhow!(
     108            0 :                 "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
     109            0 :             ));
     110         2700 :         }
     111         2700 : 
     112         2700 :         // Check that the caller initialized disk_consistent_lsn
     113         2700 :         let new_disk_consistent_lsn = self
     114         2700 :             .raw_timeline
     115         2700 :             .as_ref()
     116         2700 :             .expect("checked above")
     117         2700 :             .0
     118         2700 :             .get_disk_consistent_lsn();
     119         2700 : 
     120         2700 :         if !new_disk_consistent_lsn.is_valid() {
     121            0 :             self.abort().await;
     122              : 
     123            0 :             return Err(anyhow::anyhow!(
     124            0 :                 "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
     125            0 :             ));
     126         2700 :         }
     127         2700 : 
     128         2700 :         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
     129         2700 :         match timelines.entry(timeline_id) {
     130              :             Entry::Occupied(_) => {
     131              :                 // Unexpected, bug in the caller.  Tenant is responsible for preventing concurrent creation of the same timeline.
     132              :                 //
     133              :                 // We do not call Self::abort here.  Because we don't cleanly shut down our Timeline, [`Self::drop`] should
     134              :                 // skip trying to delete the timeline directory too.
     135            0 :                 anyhow::bail!(
     136            0 :                     "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
     137            0 :                 )
     138              :             }
     139         2700 :             Entry::Vacant(v) => {
     140         2700 :                 // After taking `raw_timeline` there should be no fallible operations here, because the
     141         2700 :                 // drop guard will not clean up afterwards, which would block, for example, tenant deletion
     142         2700 :                 let (new_timeline, _create_guard) =
     143         2700 :                     self.raw_timeline.take().expect("already checked");
     144         2700 : 
     145         2700 :                 v.insert(Arc::clone(&new_timeline));
     146         2700 : 
     147         2700 :                 new_timeline.maybe_spawn_flush_loop();
     148         2700 : 
     149         2700 :                 Ok(new_timeline)
     150              :             }
     151              :         }
     152         2700 :     }
     153              : 
     154            0 :     pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
     155            0 :         self.raw_timeline.take().expect("already checked")
     156            0 :     }
     157              : 
     158              :     /// Prepares timeline data by loading it from the basebackup archive.
     159            0 :     pub(crate) async fn import_basebackup_from_tar(
     160            0 :         mut self,
     161            0 :         tenant: Arc<TenantShard>,
     162            0 :         copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
     163            0 :         base_lsn: Lsn,
     164            0 :         broker_client: storage_broker::BrokerClientChannel,
     165            0 :         ctx: &RequestContext,
     166            0 :     ) -> anyhow::Result<Arc<Timeline>> {
     167            0 :         self.write(|raw_timeline| async move {
     168            0 :             import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
     169            0 :                 .await
     170            0 :                 .context("Failed to import basebackup")
     171            0 :                 .map_err(CreateTimelineError::Other)?;
     172              : 
     173            0 :             fail::fail_point!("before-checkpoint-new-timeline", |_| {
     174            0 :                 Err(CreateTimelineError::Other(anyhow::anyhow!(
     175            0 :                     "failpoint before-checkpoint-new-timeline"
     176            0 :                 )))
     177            0 :             });
     178              : 
     179            0 :             Ok(())
     180            0 :         })
     181            0 :         .await?;
     182              : 
     183              :         // All the data has been imported. Insert the Timeline into the tenant's timelines map
     184            0 :         let tl = self.finish_creation().await?;
     185            0 :         tl.activate(tenant, broker_client, None, ctx);
     186            0 :         Ok(tl)
     187            0 :     }
     188              : 
     189         1344 :     pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
     190         1344 :         Ok(&self
     191         1344 :             .raw_timeline
     192         1344 :             .as_ref()
     193         1344 :             .with_context(|| {
     194            0 :                 format!(
     195            0 :                     "No raw timeline {}/{} found",
     196            0 :                     self.owning_tenant.tenant_shard_id, self.timeline_id
     197            0 :                 )
     198         1344 :             })?
     199              :             .0)
     200         1344 :     }
     201              : }
     202              : 
     203              : impl Drop for UninitializedTimeline<'_> {
     204         2736 :     fn drop(&mut self) {
     205         2736 :         if let Some((timeline, create_guard)) = self.raw_timeline.take() {
     206           36 :             let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
     207           36 :             if self.needs_shutdown && !timeline.gate.close_complete() {
     208              :                 // This should not happen: caller should call [`Self::abort`] on failures
     209            0 :                 tracing::warn!(
     210            0 :                     "Timeline not shut down after initialization failure, cannot clean up files"
     211              :                 );
     212              :             } else {
     213              :                 // This is unusual, but can happen harmlessly if the pageserver is stopped while
     214              :                 // creating a timeline.
     215           36 :                 info!("Timeline got dropped without initializing, cleaning its files");
     216           36 :                 cleanup_timeline_directory(create_guard);
     217              :             }
     218         2700 :         }
     219         2736 :     }
     220              : }
     221              : 
     222           36 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
     223           36 :     let timeline_path = &create_guard.timeline_path;
     224           36 :     match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
     225              :         Ok(()) => {
     226           36 :             info!("Timeline dir {timeline_path:?} removed successfully")
     227              :         }
     228            0 :         Err(e) => {
     229            0 :             error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
     230              :         }
     231              :     }
     232              :     // Having cleaned up, we can release this TimelineId in [`TenantShard::timelines_creating`] to allow other
     233              :     // timeline creation attempts under this TimelineId to proceed
     234           36 :     drop(create_guard);
     235           36 : }
     236              : 
     237              : /// A guard for timeline creations in process: as long as this object exists, the timeline ID
     238              : /// is kept in [`TenantShard::timelines_creating`] to exclude concurrent attempts to create the same timeline.
     239              : #[must_use]
     240              : pub(crate) struct TimelineCreateGuard {
     241              :     pub(crate) _tenant_gate_guard: GateGuard,
     242              :     pub(crate) owning_tenant: Arc<TenantShard>,
     243              :     pub(crate) timeline_id: TimelineId,
     244              :     pub(crate) timeline_path: Utf8PathBuf,
     245              :     pub(crate) idempotency: CreateTimelineIdempotency,
     246              : }
     247              : 
     248              : /// Errors when acquiring exclusive access to a timeline ID for creation
     249              : #[derive(thiserror::Error, Debug)]
     250              : pub(crate) enum TimelineExclusionError {
     251              :     #[error("Already exists")]
     252              :     AlreadyExists {
     253              :         existing: TimelineOrOffloaded,
     254              :         arg: CreateTimelineIdempotency,
     255              :     },
     256              :     #[error("Already creating")]
     257              :     AlreadyCreating,
     258              :     #[error("Shutting down")]
     259              :     ShuttingDown,
     260              : 
     261              :     // e.g. I/O errors, or some failure deep in postgres initdb
     262              :     #[error(transparent)]
     263              :     Other(#[from] anyhow::Error),
     264              : }
     265              : 
     266              : impl TimelineCreateGuard {
     267         2784 :     pub(crate) fn new(
     268         2784 :         owning_tenant: &Arc<TenantShard>,
     269         2784 :         timeline_id: TimelineId,
     270         2784 :         timeline_path: Utf8PathBuf,
     271         2784 :         idempotency: CreateTimelineIdempotency,
     272         2784 :         allow_offloaded: bool,
     273         2784 :     ) -> Result<Self, TimelineExclusionError> {
     274         2784 :         let _tenant_gate_guard = owning_tenant
     275         2784 :             .gate
     276         2784 :             .enter()
     277         2784 :             .map_err(|_| TimelineExclusionError::ShuttingDown)?;
     278              : 
     279              :         // Lock order: timelines, then timelines_offloaded, then timelines_creating.  This is the only
     280              :         // place we hold these locks together.  During drop() we only lock timelines_creating
     281         2784 :         let timelines = owning_tenant.timelines.lock().unwrap();
     282         2784 :         let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
     283         2784 :         let mut creating_timelines: std::sync::MutexGuard<
     284         2784 :             '_,
     285         2784 :             std::collections::HashSet<TimelineId>,
     286         2784 :         > = owning_tenant.timelines_creating.lock().unwrap();
     287              : 
     288         2784 :         if let Some(existing) = timelines.get(&timeline_id) {
     289           12 :             return Err(TimelineExclusionError::AlreadyExists {
     290           12 :                 existing: TimelineOrOffloaded::Timeline(existing.clone()),
     291           12 :                 arg: idempotency,
     292           12 :             });
     293         2772 :         }
     294         2772 :         if !allow_offloaded {
     295         2772 :             if let Some(existing) = timelines_offloaded.get(&timeline_id) {
     296            0 :                 return Err(TimelineExclusionError::AlreadyExists {
     297            0 :                     existing: TimelineOrOffloaded::Offloaded(existing.clone()),
     298            0 :                     arg: idempotency,
     299            0 :                 });
     300         2772 :             }
     301            0 :         }
     302         2772 :         if creating_timelines.contains(&timeline_id) {
     303            0 :             return Err(TimelineExclusionError::AlreadyCreating);
     304         2772 :         }
     305         2772 :         creating_timelines.insert(timeline_id);
     306         2772 :         drop(creating_timelines);
     307         2772 :         drop(timelines_offloaded);
     308         2772 :         drop(timelines);
     309         2772 :         Ok(Self {
     310         2772 :             _tenant_gate_guard,
     311         2772 :             owning_tenant: Arc::clone(owning_tenant),
     312         2772 :             timeline_id,
     313         2772 :             timeline_path,
     314         2772 :             idempotency,
     315         2772 :         })
     316         2784 :     }
     317              : }
     318              : 
     319              : impl Drop for TimelineCreateGuard {
     320         2760 :     fn drop(&mut self) {
     321         2760 :         self.owning_tenant
     322         2760 :             .timelines_creating
     323         2760 :             .lock()
     324         2760 :             .unwrap()
     325         2760 :             .remove(&self.timeline_id);
     326         2760 :     }
     327              : }
        

Generated by: LCOV version 2.1-beta