LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - uninit.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 68.2 % 195 133
Test Date: 2025-03-12 16:10:49 Functions: 47.8 % 23 11

            Line data    Source code
       1              : use std::collections::hash_map::Entry;
       2              : use std::fs;
       3              : use std::future::Future;
       4              : use std::sync::Arc;
       5              : 
       6              : use anyhow::Context;
       7              : use camino::Utf8PathBuf;
       8              : use tracing::{error, info, info_span};
       9              : use utils::fs_ext;
      10              : use utils::id::TimelineId;
      11              : use utils::lsn::Lsn;
      12              : use utils::sync::gate::GateGuard;
      13              : 
      14              : use super::Timeline;
      15              : use crate::context::RequestContext;
      16              : use crate::import_datadir;
      17              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      18              : use crate::tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded};
      19              : 
/// A timeline with some of its files on disk, being initialized.
///
/// This struct ensures the atomicity of the timeline init: it's either properly created and
/// inserted into pageserver's memory, or its local files are removed. If we crash while this
/// struct exists, then the timeline's local state is cleaned up during
/// [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
    /// Tenant the timeline is being created in; its `timelines` map receives the timeline
    /// when creation finishes.
    pub(crate) owning_tenant: &'t Tenant,
    /// ID of the timeline being created.
    timeline_id: TimelineId,
    /// The in-memory timeline together with the guard that reserves its ID. `None` if nothing
    /// was supplied at construction, or once the pair has been taken out (inserted into the
    /// tenant map or handed to the caller). While this is `Some`, dropping the struct may
    /// remove the timeline's local directory.
    raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
    /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
    /// if aborting the timeline creation
    needs_shutdown: bool,
}
      35              : 
      36              : impl<'t> UninitializedTimeline<'t> {
      37          892 :     pub(crate) fn new(
      38          892 :         owning_tenant: &'t Tenant,
      39          892 :         timeline_id: TimelineId,
      40          892 :         raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
      41          892 :     ) -> Self {
      42          892 :         Self {
      43          892 :             owning_tenant,
      44          892 :             timeline_id,
      45          892 :             raw_timeline,
      46          892 :             needs_shutdown: false,
      47          892 :         }
      48          892 :     }
      49              : 
      50              :     /// When writing data to this timeline during creation, use this wrapper: it will take care of
      51              :     /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
      52              :     /// later.
      53            4 :     pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
      54            4 :     where
      55            4 :         F: FnOnce(Arc<Timeline>) -> Fut,
      56            4 :         Fut: Future<Output = Result<(), CreateTimelineError>>,
      57            4 :     {
      58            4 :         debug_assert_current_span_has_tenant_and_timeline_id();
      59            4 : 
      60            4 :         // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
      61            4 :         self.needs_shutdown = true;
      62              : 
      63            4 :         let timeline = self.raw_timeline()?;
      64              : 
      65              :         // Spawn flush loop so that the Timeline is ready to accept writes
      66            4 :         timeline.maybe_spawn_flush_loop();
      67              : 
      68              :         // Invoke the provided function, which will write some data into the new timeline
      69            4 :         if let Err(e) = f(timeline.clone()).await {
      70            0 :             self.abort().await;
      71            0 :             return Err(e.into());
      72            4 :         }
      73              : 
      74              :         // Flush the underlying timeline's ephemeral layers to disk
      75            4 :         if let Err(e) = timeline
      76            4 :             .freeze_and_flush()
      77            4 :             .await
      78            4 :             .context("Failed to flush after timeline creation writes")
      79              :         {
      80            0 :             self.abort().await;
      81            0 :             return Err(e);
      82            4 :         }
      83            4 : 
      84            4 :         Ok(())
      85            4 :     }
      86              : 
      87            0 :     pub(crate) async fn abort(&self) {
      88            0 :         if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
      89            0 :             raw_timeline.shutdown(super::ShutdownMode::Hard).await;
      90            0 :         }
      91            0 :     }
      92              : 
      93              :     /// Finish timeline creation: insert it into the Tenant's timelines map
      94              :     ///
      95              :     /// This function launches the flush loop if not already done.
      96              :     ///
      97              :     /// The caller is responsible for activating the timeline (function `.activate()`).
      98          876 :     pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
      99          876 :         let timeline_id = self.timeline_id;
     100          876 :         let tenant_shard_id = self.owning_tenant.tenant_shard_id;
     101          876 : 
     102          876 :         if self.raw_timeline.is_none() {
     103            0 :             self.abort().await;
     104              : 
     105            0 :             return Err(anyhow::anyhow!(
     106            0 :                 "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
     107            0 :             ));
     108          876 :         }
     109          876 : 
     110          876 :         // Check that the caller initialized disk_consistent_lsn
     111          876 :         let new_disk_consistent_lsn = self
     112          876 :             .raw_timeline
     113          876 :             .as_ref()
     114          876 :             .expect("checked above")
     115          876 :             .0
     116          876 :             .get_disk_consistent_lsn();
     117          876 : 
     118          876 :         if !new_disk_consistent_lsn.is_valid() {
     119            0 :             self.abort().await;
     120              : 
     121            0 :             return Err(anyhow::anyhow!(
     122            0 :                 "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
     123            0 :             ));
     124          876 :         }
     125          876 : 
     126          876 :         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
     127          876 :         match timelines.entry(timeline_id) {
     128              :             Entry::Occupied(_) => {
     129              :                 // Unexpected, bug in the caller.  Tenant is responsible for preventing concurrent creation of the same timeline.
     130              :                 //
     131              :                 // We do not call Self::abort here.  Because we don't cleanly shut down our Timeline, [`Self::drop`] should
     132              :                 // skip trying to delete the timeline directory too.
     133            0 :                 anyhow::bail!(
     134            0 :                     "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
     135            0 :                 )
     136              :             }
     137          876 :             Entry::Vacant(v) => {
     138          876 :                 // after taking here should be no fallible operations, because the drop guard will not
     139          876 :                 // cleanup after and would block for example the tenant deletion
     140          876 :                 let (new_timeline, _create_guard) =
     141          876 :                     self.raw_timeline.take().expect("already checked");
     142          876 : 
     143          876 :                 v.insert(Arc::clone(&new_timeline));
     144          876 : 
     145          876 :                 new_timeline.maybe_spawn_flush_loop();
     146          876 : 
     147          876 :                 Ok(new_timeline)
     148              :             }
     149              :         }
     150          876 :     }
     151              : 
     152            0 :     pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
     153            0 :         self.raw_timeline.take().expect("already checked")
     154            0 :     }
     155              : 
     156              :     /// Prepares timeline data by loading it from the basebackup archive.
     157            0 :     pub(crate) async fn import_basebackup_from_tar(
     158            0 :         mut self,
     159            0 :         tenant: Arc<Tenant>,
     160            0 :         copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
     161            0 :         base_lsn: Lsn,
     162            0 :         broker_client: storage_broker::BrokerClientChannel,
     163            0 :         ctx: &RequestContext,
     164            0 :     ) -> anyhow::Result<Arc<Timeline>> {
     165            0 :         self.write(|raw_timeline| async move {
     166            0 :             import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
     167            0 :                 .await
     168            0 :                 .context("Failed to import basebackup")
     169            0 :                 .map_err(CreateTimelineError::Other)?;
     170              : 
     171            0 :             fail::fail_point!("before-checkpoint-new-timeline", |_| {
     172            0 :                 Err(CreateTimelineError::Other(anyhow::anyhow!(
     173            0 :                     "failpoint before-checkpoint-new-timeline"
     174            0 :                 )))
     175            0 :             });
     176              : 
     177            0 :             Ok(())
     178            0 :         })
     179            0 :         .await?;
     180              : 
     181              :         // All the data has been imported. Insert the Timeline into the tenant's timelines map
     182            0 :         let tl = self.finish_creation().await?;
     183            0 :         tl.activate(tenant, broker_client, None, ctx);
     184            0 :         Ok(tl)
     185            0 :     }
     186              : 
     187          436 :     pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
     188          436 :         Ok(&self
     189          436 :             .raw_timeline
     190          436 :             .as_ref()
     191          436 :             .with_context(|| {
     192            0 :                 format!(
     193            0 :                     "No raw timeline {}/{} found",
     194            0 :                     self.owning_tenant.tenant_shard_id, self.timeline_id
     195            0 :                 )
     196          436 :             })?
     197              :             .0)
     198          436 :     }
     199              : }
     200              : 
     201              : impl Drop for UninitializedTimeline<'_> {
     202          888 :     fn drop(&mut self) {
     203          888 :         if let Some((timeline, create_guard)) = self.raw_timeline.take() {
     204           12 :             let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
     205           12 :             if self.needs_shutdown && !timeline.gate.close_complete() {
     206              :                 // This should not happen: caller should call [`Self::abort`] on failures
     207            0 :                 tracing::warn!(
     208            0 :                     "Timeline not shut down after initialization failure, cannot clean up files"
     209              :                 );
     210              :             } else {
     211              :                 // This is unusual, but can happen harmlessly if the pageserver is stopped while
     212              :                 // creating a timeline.
     213           12 :                 info!("Timeline got dropped without initializing, cleaning its files");
     214           12 :                 cleanup_timeline_directory(create_guard);
     215              :             }
     216          876 :         }
     217          888 :     }
     218              : }
     219              : 
     220           12 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
     221           12 :     let timeline_path = &create_guard.timeline_path;
     222           12 :     match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
     223              :         Ok(()) => {
     224           12 :             info!("Timeline dir {timeline_path:?} removed successfully")
     225              :         }
     226            0 :         Err(e) => {
     227            0 :             error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
     228              :         }
     229              :     }
     230              :     // Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
     231              :     // timeline creation attempts under this TimelineId to proceed
     232           12 :     drop(create_guard);
     233           12 : }
     234              : 
/// A guard for timeline creations in progress: as long as this object exists, the timeline ID
/// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
#[must_use]
pub(crate) struct TimelineCreateGuard {
    /// Entered on the owning tenant's gate when the guard is created, and held for the guard's
    /// whole lifetime.
    pub(crate) _tenant_gate_guard: GateGuard,
    /// Tenant whose `timelines_creating` set holds `timeline_id` while this guard lives.
    pub(crate) owning_tenant: Arc<Tenant>,
    /// The reserved timeline ID.
    pub(crate) timeline_id: TimelineId,
    /// Local directory of the timeline; this is the path `cleanup_timeline_directory` removes.
    pub(crate) timeline_path: Utf8PathBuf,
    /// Idempotency parameters of the creation request this guard was taken for.
    pub(crate) idempotency: CreateTimelineIdempotency,
}
     245              : 
/// Errors when acquiring exclusive access to a timeline ID for creation
///
/// Returned by `TimelineCreateGuard::new`.
#[derive(thiserror::Error, Debug)]
pub(crate) enum TimelineExclusionError {
    /// A timeline with the ID is already in the tenant's timelines map, or in its offloaded
    /// timelines map when the caller did not allow offloaded timelines.
    #[error("Already exists")]
    AlreadyExists {
        /// The timeline that already holds the ID.
        existing: TimelineOrOffloaded,
        /// The idempotency parameters of the rejected creation request.
        arg: CreateTimelineIdempotency,
    },
    /// The ID is already reserved by another in-progress creation.
    #[error("Already creating")]
    AlreadyCreating,
    /// The tenant's gate could not be entered (presumably the tenant is shutting down).
    #[error("Shutting down")]
    ShuttingDown,

    /// Any other failure, e.g. I/O errors, or some failure deep in postgres initdb.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
     263              : 
     264              : impl TimelineCreateGuard {
     265          904 :     pub(crate) fn new(
     266          904 :         owning_tenant: &Arc<Tenant>,
     267          904 :         timeline_id: TimelineId,
     268          904 :         timeline_path: Utf8PathBuf,
     269          904 :         idempotency: CreateTimelineIdempotency,
     270          904 :         allow_offloaded: bool,
     271          904 :     ) -> Result<Self, TimelineExclusionError> {
     272          904 :         let _tenant_gate_guard = owning_tenant
     273          904 :             .gate
     274          904 :             .enter()
     275          904 :             .map_err(|_| TimelineExclusionError::ShuttingDown)?;
     276              : 
     277              :         // Lock order: this is the only place we take both locks.  During drop() we only
     278              :         // lock creating_timelines
     279          904 :         let timelines = owning_tenant.timelines.lock().unwrap();
     280          904 :         let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
     281          904 :         let mut creating_timelines: std::sync::MutexGuard<
     282          904 :             '_,
     283          904 :             std::collections::HashSet<TimelineId>,
     284          904 :         > = owning_tenant.timelines_creating.lock().unwrap();
     285              : 
     286          904 :         if let Some(existing) = timelines.get(&timeline_id) {
     287            4 :             return Err(TimelineExclusionError::AlreadyExists {
     288            4 :                 existing: TimelineOrOffloaded::Timeline(existing.clone()),
     289            4 :                 arg: idempotency,
     290            4 :             });
     291          900 :         }
     292          900 :         if !allow_offloaded {
     293          900 :             if let Some(existing) = timelines_offloaded.get(&timeline_id) {
     294            0 :                 return Err(TimelineExclusionError::AlreadyExists {
     295            0 :                     existing: TimelineOrOffloaded::Offloaded(existing.clone()),
     296            0 :                     arg: idempotency,
     297            0 :                 });
     298          900 :             }
     299            0 :         }
     300          900 :         if creating_timelines.contains(&timeline_id) {
     301            0 :             return Err(TimelineExclusionError::AlreadyCreating);
     302          900 :         }
     303          900 :         creating_timelines.insert(timeline_id);
     304          900 :         drop(creating_timelines);
     305          900 :         drop(timelines_offloaded);
     306          900 :         drop(timelines);
     307          900 :         Ok(Self {
     308          900 :             _tenant_gate_guard,
     309          900 :             owning_tenant: Arc::clone(owning_tenant),
     310          900 :             timeline_id,
     311          900 :             timeline_path,
     312          900 :             idempotency,
     313          900 :         })
     314          904 :     }
     315              : }
     316              : 
     317              : impl Drop for TimelineCreateGuard {
     318          896 :     fn drop(&mut self) {
     319          896 :         self.owning_tenant
     320          896 :             .timelines_creating
     321          896 :             .lock()
     322          896 :             .unwrap()
     323          896 :             .remove(&self.timeline_id);
     324          896 :     }
     325              : }
        

Generated by: LCOV version 2.1-beta