LCOV - code coverage report
Current view: top level - safekeeper/src - timelines_global_map.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 0.0 % 413 0
Test Date: 2025-07-26 17:20:05 Functions: 0.0 % 49 0

            Line data    Source code
       1              : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
       2              : //! All timelines should always be present in this map, this is done by loading them
       3              : //! all from the disk on startup and keeping them in memory.
       4              : 
       5              : use std::collections::HashMap;
       6              : use std::str::FromStr;
       7              : use std::sync::{Arc, Mutex};
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use anyhow::{Context, Result, bail};
      11              : use camino::Utf8PathBuf;
      12              : use camino_tempfile::Utf8TempDir;
      13              : use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
      14              : use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
      15              : use safekeeper_api::{ServerInfo, membership};
      16              : use tokio::fs;
      17              : use tracing::*;
      18              : use utils::crashsafe::{durable_rename, fsync_async_opt};
      19              : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
      20              : use utils::lsn::Lsn;
      21              : 
      22              : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
      23              : use crate::http::routes::DeleteOrExcludeError;
      24              : use crate::rate_limit::RateLimiter;
      25              : use crate::state::TimelinePersistentState;
      26              : use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
      27              : use crate::timelines_set::TimelinesSet;
      28              : use crate::wal_backup::WalBackup;
      29              : use crate::wal_storage::Storage;
      30              : use crate::{SafeKeeperConf, control_file, wal_storage};
      31              : 
      32              : // Timeline entry in the global map: either a ready timeline, or mark that it is
      33              : // being created.
      34              : #[derive(Clone)]
      35              : enum GlobalMapTimeline {
      36              :     CreationInProgress,
      37              :     Timeline(Arc<Timeline>),
      38              : }
      39              : 
      40              : struct GlobalTimelinesState {
      41              :     timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
      42              : 
      43              :     /// A tombstone indicates that this timeline used to exist but has been deleted. These are used to prevent
      44              :     /// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
      45              :     /// this map is dropped on restart.
      46              :     /// The timeline might also be locally deleted (excluded) via the safekeeper migration algorithm. In that case,
      47              :     /// the tombstone contains the corresponding safekeeper generation. The pull_timeline requests with
      48              :     /// a higher generation ignore such tombstones and can recreate the timeline.
      49              :     timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
      50              :     /// A tombstone indicates that the tenant used to exist but has been deleted.
      51              :     /// These are created only by tenant_delete requests. They are always valid regardless of the
      52              :     /// request generation.
      53              :     /// This is only soft-enforced, as this map is dropped on restart.
      54              :     tenant_tombstones: HashMap<TenantId, Instant>,
      55              : 
      56              :     conf: Arc<SafeKeeperConf>,
      57              :     broker_active_set: Arc<TimelinesSet>,
      58              :     global_rate_limiter: RateLimiter,
      59              :     wal_backup: Arc<WalBackup>,
      60              : }
      61              : 
      62              : impl GlobalTimelinesState {
      63              :     /// Get dependencies for a timeline constructor.
      64            0 :     fn get_dependencies(
      65            0 :         &self,
      66            0 :     ) -> (
      67            0 :         Arc<SafeKeeperConf>,
      68            0 :         Arc<TimelinesSet>,
      69            0 :         RateLimiter,
      70            0 :         Arc<WalBackup>,
      71            0 :     ) {
      72            0 :         (
      73            0 :             self.conf.clone(),
      74            0 :             self.broker_active_set.clone(),
      75            0 :             self.global_rate_limiter.clone(),
      76            0 :             self.wal_backup.clone(),
      77            0 :         )
      78            0 :     }
      79              : 
      80              :     /// Get timeline from the map. Returns error if timeline doesn't exist or
      81              :     /// creation is in progress.
      82            0 :     fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
      83            0 :         match self.timelines.get(ttid).cloned() {
      84            0 :             Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
      85              :             Some(GlobalMapTimeline::CreationInProgress) => {
      86            0 :                 Err(TimelineError::CreationInProgress(*ttid))
      87              :             }
      88              :             None => {
      89            0 :                 if self.has_tombstone(ttid, None) {
      90            0 :                     Err(TimelineError::Deleted(*ttid))
      91              :                 } else {
      92            0 :                     Err(TimelineError::NotFound(*ttid))
      93              :                 }
      94              :             }
      95              :         }
      96            0 :     }
      97              : 
      98            0 :     fn has_timeline_tombstone(
      99            0 :         &self,
     100            0 :         ttid: &TenantTimelineId,
     101            0 :         generation: Option<SafekeeperGeneration>,
     102            0 :     ) -> bool {
     103            0 :         if let Some(generation) = generation {
     104            0 :             self.timeline_tombstones
     105            0 :                 .get(ttid)
     106            0 :                 .is_some_and(|t| t.is_valid(generation))
     107              :         } else {
     108            0 :             self.timeline_tombstones.contains_key(ttid)
     109              :         }
     110            0 :     }
     111              : 
     112            0 :     fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
     113            0 :         self.tenant_tombstones.contains_key(tenant_id)
     114            0 :     }
     115              : 
     116              :     /// Check if the state has a tenant or a timeline tombstone.
     117              :     /// If `generation` is provided, check only for timeline tombstones with the same or higher generation.
     118              :     /// If `generation` is `None`, check for any timeline tombstone.
     119              :     /// Tenant tombstones are checked regardless of the generation.
     120            0 :     fn has_tombstone(
     121            0 :         &self,
     122            0 :         ttid: &TenantTimelineId,
     123            0 :         generation: Option<SafekeeperGeneration>,
     124            0 :     ) -> bool {
     125            0 :         self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
     126            0 :     }
     127              : 
     128              :     /// Removes timeline tombstone for the given timeline ID.
     129              :     /// Returns `true` if there have been actual changes.
     130            0 :     fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
     131            0 :         self.timeline_tombstones.remove(ttid).is_some()
     132            0 :     }
     133              : 
     134            0 :     fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
     135            0 :         self.timelines.remove(&ttid);
     136            0 :         self.timeline_tombstones
     137            0 :             .insert(ttid, TimelineTombstone::new(generation));
     138            0 :     }
     139              : 
     140            0 :     fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
     141            0 :         self.tenant_tombstones.insert(tenant_id, Instant::now());
     142            0 :     }
     143              : }
     144              : 
     145              : /// A struct used to manage access to the global timelines map.
     146              : pub struct GlobalTimelines {
     147              :     state: Mutex<GlobalTimelinesState>,
     148              : }
     149              : 
     150              : impl GlobalTimelines {
     151              :     /// Create a new instance of the global timelines map.
     152            0 :     pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
     153            0 :         Self {
     154            0 :             state: Mutex::new(GlobalTimelinesState {
     155            0 :                 timelines: HashMap::new(),
     156            0 :                 timeline_tombstones: HashMap::new(),
     157            0 :                 tenant_tombstones: HashMap::new(),
     158            0 :                 conf,
     159            0 :                 broker_active_set: Arc::new(TimelinesSet::default()),
     160            0 :                 global_rate_limiter: RateLimiter::new(1, 1),
     161            0 :                 wal_backup,
     162            0 :             }),
     163            0 :         }
     164            0 :     }
     165              : 
     166              :     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
     167            0 :     pub async fn init(&self) -> Result<()> {
     168              :         // clippy isn't smart enough to understand that drop(state) releases the
     169              :         // lock, so use explicit block
     170            0 :         let tenants_dir = {
     171            0 :             let mut state = self.state.lock().unwrap();
     172            0 :             state.global_rate_limiter = RateLimiter::new(
     173            0 :                 state.conf.partial_backup_concurrency,
     174              :                 DEFAULT_EVICTION_CONCURRENCY,
     175              :             );
     176              : 
     177              :             // Iterate through all directories and load tenants for all directories
     178              :             // named as a valid tenant_id.
     179            0 :             state.conf.workdir.clone()
     180              :         };
     181            0 :         let mut tenant_count = 0;
     182            0 :         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
     183            0 :             .with_context(|| format!("failed to list tenants dir {tenants_dir}"))?
     184              :         {
     185            0 :             match &tenants_dir_entry {
     186            0 :                 Ok(tenants_dir_entry) => {
     187            0 :                     if let Ok(tenant_id) =
     188            0 :                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
     189              :                     {
     190            0 :                         tenant_count += 1;
     191            0 :                         self.load_tenant_timelines(tenant_id).await?;
     192            0 :                     }
     193              :                 }
     194            0 :                 Err(e) => error!(
     195            0 :                     "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
     196              :                     tenants_dir_entry, tenants_dir, e
     197              :                 ),
     198              :             }
     199              :         }
     200              : 
     201            0 :         info!(
     202            0 :             "found {} tenants directories, successfully loaded {} timelines",
     203              :             tenant_count,
     204            0 :             self.state.lock().unwrap().timelines.len()
     205              :         );
     206            0 :         Ok(())
     207            0 :     }
     208              : 
     209              :     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     210              :     /// errors if any.
     211              :     ///
     212              :     /// It is async, but self.state lock is sync and there is no important
     213              :     /// reason to make it async (it is always held for a short while), so we
     214              :     /// just lock and unlock it for each timeline -- this function is called
     215              :     /// during init when nothing else is running, so this is fine.
     216            0 :     async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
     217            0 :         let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
     218            0 :             let state = self.state.lock().unwrap();
     219            0 :             state.get_dependencies()
     220            0 :         };
     221              : 
     222            0 :         let timelines_dir = get_tenant_dir(&conf, &tenant_id);
     223            0 :         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
     224            0 :             .with_context(|| format!("failed to list timelines dir {timelines_dir}"))?
     225              :         {
     226            0 :             match &timelines_dir_entry {
     227            0 :                 Ok(timeline_dir_entry) => {
     228            0 :                     if let Ok(timeline_id) =
     229            0 :                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
     230              :                     {
     231            0 :                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     232            0 :                         match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
     233            0 :                             Ok(tli) => {
     234            0 :                                 let mut shared_state = tli.write_shared_state().await;
     235            0 :                                 self.state
     236            0 :                                     .lock()
     237            0 :                                     .unwrap()
     238            0 :                                     .timelines
     239            0 :                                     .insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
     240            0 :                                 tli.bootstrap(
     241            0 :                                     &mut shared_state,
     242            0 :                                     &conf,
     243            0 :                                     broker_active_set.clone(),
     244            0 :                                     partial_backup_rate_limiter.clone(),
     245            0 :                                     wal_backup.clone(),
     246              :                                 );
     247              :                             }
     248              :                             // If we can't load a timeline, it's most likely because of a corrupted
     249              :                             // directory. We will log an error and won't allow deleting/recreating
     250              :                             // this timeline. The only way to fix this timeline is to repair manually
     251              :                             // and restart the safekeeper.
     252            0 :                             Err(e) => error!(
     253            0 :                                 "failed to load timeline {} for tenant {}, reason: {:?}",
     254              :                                 timeline_id, tenant_id, e
     255              :                             ),
     256              :                         }
     257            0 :                     }
     258              :                 }
     259            0 :                 Err(e) => error!(
     260            0 :                     "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
     261              :                     timelines_dir_entry, timelines_dir, e
     262              :                 ),
     263              :             }
     264              :         }
     265              : 
     266            0 :         Ok(())
     267            0 :     }
     268              : 
     269              :     /// Get the number of timelines in the map.
     270            0 :     pub fn timelines_count(&self) -> usize {
     271            0 :         self.state.lock().unwrap().timelines.len()
     272            0 :     }
     273              : 
     274              :     /// Get the global safekeeper config.
     275            0 :     pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
     276            0 :         self.state.lock().unwrap().conf.clone()
     277            0 :     }
     278              : 
     279            0 :     pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
     280            0 :         self.state.lock().unwrap().broker_active_set.clone()
     281            0 :     }
     282              : 
     283            0 :     pub fn get_wal_backup(&self) -> Arc<WalBackup> {
     284            0 :         self.state.lock().unwrap().wal_backup.clone()
     285            0 :     }
     286              : 
     287              :     /// Create a new timeline with the given id. If the timeline already exists, returns
     288              :     /// an existing timeline.
     289            0 :     pub(crate) async fn create(
     290            0 :         &self,
     291            0 :         ttid: TenantTimelineId,
     292            0 :         mconf: Configuration,
     293            0 :         server_info: ServerInfo,
     294            0 :         start_lsn: Lsn,
     295            0 :         commit_lsn: Lsn,
     296            0 :     ) -> Result<Arc<Timeline>> {
     297            0 :         let generation = Some(mconf.generation);
     298              : 
     299            0 :         let (conf, _, _, _) = {
     300            0 :             let state = self.state.lock().unwrap();
     301            0 :             if let Ok(timeline) = state.get(&ttid) {
     302              :                 // Timeline already exists, return it.
     303            0 :                 return Ok(timeline);
     304            0 :             }
     305              : 
     306            0 :             if state.has_tombstone(&ttid, generation) {
     307            0 :                 anyhow::bail!(TimelineError::Deleted(ttid));
     308            0 :             }
     309              : 
     310            0 :             state.get_dependencies()
     311              :         };
     312              : 
     313            0 :         info!("creating new timeline {}", ttid);
     314              : 
     315              :         // Do on disk initialization in tmp dir.
     316            0 :         let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
     317              : 
     318              :         // TODO: currently we create only cfile. It would be reasonable to
     319              :         // immediately initialize first WAL segment as well.
     320            0 :         let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
     321            0 :         control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
     322            0 :         let timeline = self
     323            0 :             .load_temp_timeline(ttid, &tmp_dir_path, generation)
     324            0 :             .await?;
     325            0 :         Ok(timeline)
     326            0 :     }
     327              : 
     328              :     /// Move timeline from a temp directory to the main storage, and load it to
     329              :     /// the global map. Creating timeline in this way ensures atomicity: rename
     330              :     /// is atomic, so either move of the whole datadir succeeds or it doesn't,
     331              :     /// but corrupted data dir shouldn't be possible.
     332              :     ///
     333              :     /// We'd like to avoid holding map lock while doing IO, so it's a 3 step
     334              :     /// process:
     335              :     /// 1) check the global map that timeline doesn't exist and mark that we're
     336              :     ///    creating it;
     337              :     /// 2) move the directory and load the timeline
     338              :     /// 3) take lock again and insert the timeline into the global map.
     339            0 :     pub async fn load_temp_timeline(
     340            0 :         &self,
     341            0 :         ttid: TenantTimelineId,
     342            0 :         tmp_path: &Utf8PathBuf,
     343            0 :         generation: Option<SafekeeperGeneration>,
     344            0 :     ) -> Result<Arc<Timeline>> {
     345              :         // Check for existence and mark that we're creating it.
     346            0 :         let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
     347            0 :             let mut state = self.state.lock().unwrap();
     348            0 :             match state.timelines.get(&ttid) {
     349              :                 Some(GlobalMapTimeline::CreationInProgress) => {
     350            0 :                     bail!(TimelineError::CreationInProgress(ttid));
     351              :                 }
     352              :                 Some(GlobalMapTimeline::Timeline(_)) => {
     353            0 :                     bail!(TimelineError::AlreadyExists(ttid));
     354              :                 }
     355            0 :                 _ => {}
     356              :             }
     357              : 
     358            0 :             if state.has_tombstone(&ttid, generation) {
     359              :                 // If the timeline is deleted, we refuse to recreate it.
     360              :                 // This is a safeguard against accidentally overwriting a timeline that was deleted
     361              :                 // by concurrent request.
     362            0 :                 anyhow::bail!(TimelineError::Deleted(ttid));
     363            0 :             }
     364              : 
     365              :             // We might have an outdated tombstone with the older generation.
     366              :             // Remove it unconditionally.
     367            0 :             state.remove_timeline_tombstone(&ttid);
     368              : 
     369            0 :             state
     370            0 :                 .timelines
     371            0 :                 .insert(ttid, GlobalMapTimeline::CreationInProgress);
     372            0 :             state.get_dependencies()
     373              :         };
     374              : 
     375              :         // Do the actual move and reflect the result in the map.
     376            0 :         match GlobalTimelines::install_temp_timeline(
     377            0 :             ttid,
     378            0 :             tmp_path,
     379            0 :             conf.clone(),
     380            0 :             wal_backup.clone(),
     381              :         )
     382            0 :         .await
     383              :         {
     384            0 :             Ok(timeline) => {
     385            0 :                 let mut timeline_shared_state = timeline.write_shared_state().await;
     386            0 :                 let mut state = self.state.lock().unwrap();
     387            0 :                 assert!(matches!(
     388            0 :                     state.timelines.get(&ttid),
     389              :                     Some(GlobalMapTimeline::CreationInProgress)
     390              :                 ));
     391              : 
     392            0 :                 state
     393            0 :                     .timelines
     394            0 :                     .insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
     395            0 :                 drop(state);
     396            0 :                 timeline.bootstrap(
     397            0 :                     &mut timeline_shared_state,
     398            0 :                     &conf,
     399            0 :                     broker_active_set,
     400            0 :                     partial_backup_rate_limiter,
     401            0 :                     wal_backup,
     402              :                 );
     403            0 :                 drop(timeline_shared_state);
     404            0 :                 Ok(timeline)
     405              :             }
     406            0 :             Err(e) => {
     407              :                 // Init failed, remove the marker from the map
     408            0 :                 let mut state = self.state.lock().unwrap();
     409            0 :                 assert!(matches!(
     410            0 :                     state.timelines.get(&ttid),
     411              :                     Some(GlobalMapTimeline::CreationInProgress)
     412              :                 ));
     413            0 :                 state.timelines.remove(&ttid);
     414            0 :                 Err(e)
     415              :             }
     416              :         }
     417            0 :     }
     418              : 
     419              :     /// Main part of load_temp_timeline: do the move and load.
     420            0 :     async fn install_temp_timeline(
     421            0 :         ttid: TenantTimelineId,
     422            0 :         tmp_path: &Utf8PathBuf,
     423            0 :         conf: Arc<SafeKeeperConf>,
     424            0 :         wal_backup: Arc<WalBackup>,
     425            0 :     ) -> Result<Arc<Timeline>> {
     426            0 :         let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
     427            0 :         let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
     428              : 
     429              :         // We must have already checked that timeline doesn't exist in the map,
     430              :         // but there might be existing datadir: if timeline is corrupted it is
     431              :         // not loaded. We don't want to overwrite such a dir, so check for its
     432              :         // existence.
     433            0 :         match fs::metadata(&timeline_path).await {
     434              :             Ok(_) => {
     435              :                 // Timeline directory exists on disk, we should leave state unchanged
     436              :                 // and return error.
     437            0 :                 bail!(TimelineError::Invalid(ttid));
     438              :             }
     439            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     440            0 :             Err(e) => {
     441            0 :                 return Err(e.into());
     442              :             }
     443              :         }
     444              : 
     445            0 :         info!(
     446            0 :             "moving timeline {} from {} to {}",
     447              :             ttid, tmp_path, timeline_path
     448              :         );
     449              : 
     450              :         // Now it is safe to move the timeline directory to the correct
     451              :         // location. First, create tenant directory. Ignore error if it already
     452              :         // exists.
     453            0 :         if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
     454            0 :             if e.kind() != std::io::ErrorKind::AlreadyExists {
     455            0 :                 return Err(e.into());
     456            0 :             }
     457            0 :         }
     458              :         // fsync it
     459            0 :         fsync_async_opt(&tenant_path, !conf.no_sync).await?;
     460              :         // and its creation
     461            0 :         fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
     462              : 
     463              :         // Do the move.
     464            0 :         durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
     465              : 
     466            0 :         Timeline::load_timeline(conf, ttid, wal_backup)
     467            0 :     }
     468              : 
     469              :     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
     470              :     /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
     471              :     /// i.e. loaded in memory and not cancelled.
     472            0 :     pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
     473            0 :         let tli_res = {
     474            0 :             let state = self.state.lock().unwrap();
     475            0 :             state.get(&ttid)
     476              :         };
     477            0 :         match tli_res {
     478            0 :             Ok(tli) => {
     479            0 :                 if tli.is_cancelled() {
     480            0 :                     return Err(TimelineError::Cancelled(ttid));
     481            0 :                 }
     482            0 :                 Ok(tli)
     483              :             }
     484            0 :             _ => tli_res,
     485              :         }
     486            0 :     }
     487              : 
     488              :     /// Returns all timelines. This is used for background timeline processes.
     489            0 :     pub fn get_all(&self) -> Vec<Arc<Timeline>> {
     490            0 :         let global_lock = self.state.lock().unwrap();
     491            0 :         global_lock
     492            0 :             .timelines
     493            0 :             .values()
     494            0 :             .filter_map(|t| match t {
     495            0 :                 GlobalMapTimeline::Timeline(t) => {
     496            0 :                     if t.is_cancelled() {
     497            0 :                         None
     498              :                     } else {
     499            0 :                         Some(t.clone())
     500              :                     }
     501              :                 }
     502            0 :                 _ => None,
     503            0 :             })
     504            0 :             .collect()
     505            0 :     }
     506              : 
     507              :     /// Returns statistics about timeline counts
     508            0 :     pub fn get_timeline_counts(&self) -> SafekeeperUtilization {
     509            0 :         let global_lock = self.state.lock().unwrap();
     510            0 :         let timeline_count = global_lock
     511            0 :             .timelines
     512            0 :             .values()
     513            0 :             .filter(|t| match t {
     514            0 :                 GlobalMapTimeline::CreationInProgress => false,
     515            0 :                 GlobalMapTimeline::Timeline(t) => !t.is_cancelled(),
     516            0 :             })
     517            0 :             .count() as u64;
     518            0 :         SafekeeperUtilization { timeline_count }
     519            0 :     }
     520              : 
     521              :     /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
     522              :     /// and that's why it can return cancelled timelines, to retry deleting them.
     523            0 :     fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
     524            0 :         let global_lock = self.state.lock().unwrap();
     525            0 :         global_lock
     526            0 :             .timelines
     527            0 :             .values()
     528            0 :             .filter_map(|t| match t {
     529            0 :                 GlobalMapTimeline::Timeline(t) => Some(t.clone()),
     530            0 :                 _ => None,
     531            0 :             })
     532            0 :             .filter(|t| t.ttid.tenant_id == tenant_id)
     533            0 :             .collect()
     534            0 :     }
     535              : 
    /// Delete timeline, only locally on this node or globally (also cleaning
    /// remote storage WAL), depending on `action` value.
    ///
    /// * `DeleteOrExclude::Delete` deletes with `only_local = false`.
    /// * `DeleteOrExclude::DeleteLocal` and `DeleteOrExclude::Exclude` delete with
    ///   `only_local = true`; `Exclude` additionally carries the generation that is
    ///   checked against the timeline and stored with the tombstone.
    ///
    /// Returns `TimelineDeleteResult { dir_existed }`. If a matching timeline tombstone is
    /// already present, returns immediately with `dir_existed: false`.
    ///
    /// # Errors
    ///
    /// * `DeleteOrExcludeError::Conflict` when excluding and the timeline's current
    ///   membership generation is higher than the requested one.
    /// * Any error propagated from `Timeline::delete` or `delete_dir`; in that case the
    ///   function returns early and no tombstone is recorded.
    pub(crate) async fn delete_or_exclude(
        &self,
        ttid: &TenantTimelineId,
        action: DeleteOrExclude,
    ) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
        // Only an exclude request carries a generation; plain deletions use `None`.
        let generation = match &action {
            DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
            DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
        };

        // The map lock is scoped to this block so it is released before any `.await` below.
        let tli_res = {
            let state = self.state.lock().unwrap();

            // Do NOT check tenant tombstones here: those were set earlier
            if state.has_timeline_tombstone(ttid, generation) {
                // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
                info!("Timeline {ttid} was already deleted");
                return Ok(TimelineDeleteResult { dir_existed: false });
            }

            state.get(ttid)
        };

        let result = match tli_res {
            Ok(timeline) => {
                info!("deleting timeline {}, action={:?}", ttid, action);

                // If node is getting excluded, check the generation first.
                // Then, while holding the lock cancel the timeline; it will be
                // unusable after this point, and if node is added back first
                // deletion must be completed and node seeded anew.
                //
                // We would like to avoid holding the lock while waiting for the
                // gate to finish as this is deadlock prone, so for actual
                // deletion will take it second time.
                //
                // Canceling the timeline will block membership switch requests,
                // ensuring that the timeline generation will not increase
                // after this point, and we will not remove a timeline with a generation
                // higher than the requested one.
                if let DeleteOrExclude::Exclude(ref mconf) = action {
                    let shared_state = timeline.read_shared_state().await;
                    if shared_state.sk.state().mconf.generation > mconf.generation {
                        return Err(DeleteOrExcludeError::Conflict {
                            requested: mconf.clone(),
                            current: shared_state.sk.state().mconf.clone(),
                        });
                    }
                    // Cancel while `shared_state` is still held (see the comment above).
                    timeline.cancel();
                } else {
                    timeline.cancel();
                }

                timeline.close().await;

                info!("timeline {ttid} shut down for deletion");

                // Take a lock and finish the deletion holding this mutex.
                let mut shared_state = timeline.write_shared_state().await;

                // Only a global `Delete` is non-local; `DeleteLocal` and `Exclude` are local-only.
                let only_local = !matches!(action, DeleteOrExclude::Delete);
                let dir_existed = timeline.delete(&mut shared_state, only_local).await?;

                Ok(TimelineDeleteResult { dir_existed })
            }
            Err(_) => {
                // Timeline is not in memory, but it may still exist on disk in broken state.
                // The map lock is a temporary of the next statement only, so it is not held
                // across the `.await` on `delete_dir`.
                let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
                let dir_existed = delete_dir(&dir_path).await?;

                Ok(TimelineDeleteResult { dir_existed })
            }
        };

        // Finalize deletion, by dropping Timeline objects and storing smaller tombstones.  The tombstones
        // are used to prevent still-running computes from re-creating the same timeline when they send data,
        // and to speed up repeated deletion calls by avoiding re-listing objects.
        self.state.lock().unwrap().delete(*ttid, generation);

        result
    }
     619              : 
     620              :     /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
     621              :     /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
     622              :     /// created simultaneously. In that case the function will return error and the caller should
     623              :     /// retry tenant deletion again later.
     624              :     ///
     625              :     /// If only_local, doesn't remove WAL segments in remote storage.
     626            0 :     pub async fn delete_all_for_tenant(
     627            0 :         &self,
     628            0 :         tenant_id: &TenantId,
     629            0 :         action: DeleteOrExclude,
     630            0 :     ) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
     631            0 :         info!("deleting all timelines for tenant {}", tenant_id);
     632              : 
     633              :         // Adding a tombstone before getting the timelines to prevent new timeline additions
     634            0 :         self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
     635              : 
     636            0 :         let to_delete = self.get_all_for_tenant(*tenant_id);
     637              : 
     638            0 :         let mut err = None;
     639              : 
     640            0 :         let mut deleted = HashMap::new();
     641            0 :         for tli in &to_delete {
     642            0 :             match self.delete_or_exclude(&tli.ttid, action.clone()).await {
     643            0 :                 Ok(result) => {
     644            0 :                     deleted.insert(tli.ttid, result);
     645            0 :                 }
     646            0 :                 Err(e) => {
     647            0 :                     error!("failed to delete timeline {}: {}", tli.ttid, e);
     648              :                     // Save error to return later.
     649            0 :                     err = Some(e);
     650              :                 }
     651              :             }
     652              :         }
     653              : 
     654              :         // If there was an error, return it.
     655            0 :         if let Some(e) = err {
     656            0 :             return Err(anyhow::Error::from(e));
     657            0 :         }
     658              : 
     659              :         // There may be broken timelines on disk, so delete the whole tenant dir as well.
     660              :         // Note that we could concurrently create new timelines while we were deleting them,
     661              :         // so the directory may be not empty. In this case timelines will have bad state
     662              :         // and timeline background jobs can panic.
     663            0 :         let tenant_dir = get_tenant_dir(self.state.lock().unwrap().conf.as_ref(), tenant_id);
     664            0 :         delete_dir(&tenant_dir).await?;
     665              : 
     666            0 :         Ok(deleted)
     667            0 :     }
     668              : 
     669            0 :     pub fn housekeeping(&self, tombstone_ttl: &Duration) {
     670            0 :         let mut state = self.state.lock().unwrap();
     671              : 
     672              :         // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
     673              :         // timelines.  If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
     674              :         // may recreate a deleted timeline.
     675            0 :         let now = Instant::now();
     676            0 :         state
     677            0 :             .timeline_tombstones
     678            0 :             .retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
     679            0 :         state
     680            0 :             .tenant_tombstones
     681            0 :             .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
     682            0 :     }
     683              : 
     684            0 :     pub fn get_sk_id(&self) -> NodeId {
     685            0 :         self.state.lock().unwrap().conf.my_id
     686            0 :     }
     687              : }
     688              : 
     689              : /// Action for delete_or_exclude.
     690              : #[derive(Clone, Debug)]
     691              : pub enum DeleteOrExclude {
     692              :     /// Delete timeline globally.
     693              :     Delete,
     694              :     /// Legacy mode until we fully migrate to generations: like exclude deletes
     695              :     /// timeline only locally, but ignores generation number.
     696              :     DeleteLocal,
     697              :     /// This node is getting excluded, delete timeline locally.
     698              :     Exclude(membership::Configuration),
     699              : }
     700              : 
     701              : /// Create temp directory for a new timeline. It needs to be located on the same
     702              : /// filesystem as the rest of the timelines. It will be automatically deleted when
     703              : /// Utf8TempDir goes out of scope.
     704            0 : pub async fn create_temp_timeline_dir(
     705            0 :     conf: &SafeKeeperConf,
     706            0 :     ttid: TenantTimelineId,
     707            0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
     708            0 :     let temp_base = conf.workdir.join("tmp");
     709              : 
     710            0 :     tokio::fs::create_dir_all(&temp_base).await?;
     711              : 
     712            0 :     let tli_dir = camino_tempfile::Builder::new()
     713            0 :         .suffix("_temptli")
     714            0 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     715            0 :         .tempdir_in(temp_base)?;
     716              : 
     717            0 :     let tli_dir_path = tli_dir.path().to_path_buf();
     718              : 
     719            0 :     Ok((tli_dir, tli_dir_path))
     720            0 : }
     721              : 
     722              : /// Do basic validation of a temp timeline, before moving it to the global map.
     723            0 : pub async fn validate_temp_timeline(
     724            0 :     conf: &SafeKeeperConf,
     725            0 :     ttid: TenantTimelineId,
     726            0 :     path: &Utf8PathBuf,
     727            0 :     generation: Option<SafekeeperGeneration>,
     728            0 : ) -> Result<(Lsn, Lsn)> {
     729            0 :     let control_path = path.join("safekeeper.control");
     730              : 
     731            0 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     732            0 :     if control_store.server.wal_seg_size == 0 {
     733            0 :         bail!("wal_seg_size is not set");
     734            0 :     }
     735              : 
     736            0 :     if let Some(generation) = generation {
     737            0 :         if control_store.mconf.generation > generation {
     738            0 :             bail!(
     739            0 :                 "tmp timeline generation {} is higher than expected {generation}",
     740              :                 control_store.mconf.generation
     741              :             );
     742            0 :         }
     743            0 :     }
     744              : 
     745            0 :     let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
     746              : 
     747            0 :     let commit_lsn = control_store.commit_lsn;
     748            0 :     let flush_lsn = wal_store.flush_lsn();
     749              : 
     750            0 :     Ok((commit_lsn, flush_lsn))
     751            0 : }
     752              : 
/// A tombstone for a deleted timeline.
/// The generation is passed with "exclude" request and stored in the tombstone.
/// We ignore the tombstone if the request generation is higher than
/// the tombstone generation.
/// If the tombstone doesn't have a generation, it's considered permanent,
/// e.g. after "delete" request.
struct TimelineTombstone {
    /// Instant at which the tombstone was created; housekeeping compares it
    /// against the tombstone TTL to decide when to drop the tombstone.
    timestamp: Instant,
    /// Generation of the "exclude" request that produced the tombstone, or
    /// `None` when the tombstone was created without a generation.
    generation: Option<SafekeeperGeneration>,
}
     763              : 
     764              : impl TimelineTombstone {
     765            0 :     fn new(generation: Option<SafekeeperGeneration>) -> Self {
     766            0 :         TimelineTombstone {
     767            0 :             timestamp: Instant::now(),
     768            0 :             generation,
     769            0 :         }
     770            0 :     }
     771              : 
     772              :     /// Check if the timeline is still valid for the given generation.
     773            0 :     fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
     774            0 :         self.generation.is_none_or(|g| g >= generation)
     775            0 :     }
     776              : }
        

Generated by: LCOV version 2.1-beta