LCOV - code coverage report
Current view: top level - safekeeper/src - timelines_global_map.rs (source / functions) Coverage Total Hit
Test: 7179b4db0d82ca8088cc95c44c4be4232078509c.info Lines: 0.0 % 373 0
Test Date: 2024-11-21 16:46:58 Functions: 0.0 % 37 0

            Line data    Source code
       1              : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
       2              : //! All timelines should always be present in this map, this is done by loading them
       3              : //! all from the disk on startup and keeping them in memory.
       4              : 
       5              : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
       6              : use crate::rate_limit::RateLimiter;
       7              : use crate::safekeeper::ServerInfo;
       8              : use crate::state::TimelinePersistentState;
       9              : use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
      10              : use crate::timelines_set::TimelinesSet;
      11              : use crate::wal_storage::Storage;
      12              : use crate::{control_file, wal_storage, SafeKeeperConf};
      13              : use anyhow::{bail, Context, Result};
      14              : use camino::Utf8PathBuf;
      15              : use camino_tempfile::Utf8TempDir;
      16              : use once_cell::sync::Lazy;
      17              : use serde::Serialize;
      18              : use std::collections::HashMap;
      19              : use std::str::FromStr;
      20              : use std::sync::atomic::Ordering;
      21              : use std::sync::{Arc, Mutex};
      22              : use std::time::{Duration, Instant};
      23              : use tokio::fs;
      24              : use tracing::*;
      25              : use utils::crashsafe::{durable_rename, fsync_async_opt};
      26              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      27              : use utils::lsn::Lsn;
      28              : 
      29              : // Timeline entry in the global map: either a ready timeline, or mark that it is
      30              : // being created.
      31              : #[derive(Clone)]
      32              : enum GlobalMapTimeline {
      33              :     CreationInProgress,
      34              :     Timeline(Arc<Timeline>),
      35              : }
      36              : 
      37              : struct GlobalTimelinesState {
      38              :     timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
      39              : 
      40              :     // A tombstone indicates this timeline used to exist has been deleted.  These are used to prevent
      41              :     // on-demand timeline creation from recreating deleted timelines.  This is only soft-enforced, as
      42              :     // this map is dropped on restart.
      43              :     tombstones: HashMap<TenantTimelineId, Instant>,
      44              : 
      45              :     conf: Option<SafeKeeperConf>,
      46              :     broker_active_set: Arc<TimelinesSet>,
      47              :     global_rate_limiter: RateLimiter,
      48              : }
      49              : 
      50              : impl GlobalTimelinesState {
      51              :     /// Get configuration, which must be set once during init.
      52            0 :     fn get_conf(&self) -> &SafeKeeperConf {
      53            0 :         self.conf
      54            0 :             .as_ref()
      55            0 :             .expect("GlobalTimelinesState conf is not initialized")
      56            0 :     }
      57              : 
      58              :     /// Get dependencies for a timeline constructor.
      59            0 :     fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
      60            0 :         (
      61            0 :             self.get_conf().clone(),
      62            0 :             self.broker_active_set.clone(),
      63            0 :             self.global_rate_limiter.clone(),
      64            0 :         )
      65            0 :     }
      66              : 
      67              :     /// Get timeline from the map. Returns error if timeline doesn't exist or
      68              :     /// creation is in progress.
      69            0 :     fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
      70            0 :         match self.timelines.get(ttid).cloned() {
      71            0 :             Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
      72              :             Some(GlobalMapTimeline::CreationInProgress) => {
      73            0 :                 Err(TimelineError::CreationInProgress(*ttid))
      74              :             }
      75            0 :             None => Err(TimelineError::NotFound(*ttid)),
      76              :         }
      77            0 :     }
      78              : 
      79            0 :     fn delete(&mut self, ttid: TenantTimelineId) {
      80            0 :         self.timelines.remove(&ttid);
      81            0 :         self.tombstones.insert(ttid, Instant::now());
      82            0 :     }
      83              : }
      84              : 
      85            0 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
      86            0 :     Mutex::new(GlobalTimelinesState {
      87            0 :         timelines: HashMap::new(),
      88            0 :         tombstones: HashMap::new(),
      89            0 :         conf: None,
      90            0 :         broker_active_set: Arc::new(TimelinesSet::default()),
      91            0 :         global_rate_limiter: RateLimiter::new(1, 1),
      92            0 :     })
      93            0 : });
      94              : 
      95              : /// A zero-sized struct used to manage access to the global timelines map.
      96              : pub struct GlobalTimelines;
      97              : 
      98              : impl GlobalTimelines {
      99              :     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
     100            0 :     pub async fn init(conf: SafeKeeperConf) -> Result<()> {
     101            0 :         // clippy isn't smart enough to understand that drop(state) releases the
     102            0 :         // lock, so use explicit block
     103            0 :         let tenants_dir = {
     104            0 :             let mut state = TIMELINES_STATE.lock().unwrap();
     105            0 :             state.global_rate_limiter = RateLimiter::new(
     106            0 :                 conf.partial_backup_concurrency,
     107            0 :                 DEFAULT_EVICTION_CONCURRENCY,
     108            0 :             );
     109            0 :             state.conf = Some(conf);
     110            0 : 
     111            0 :             // Iterate through all directories and load tenants for all directories
     112            0 :             // named as a valid tenant_id.
     113            0 :             state.get_conf().workdir.clone()
     114            0 :         };
     115            0 :         let mut tenant_count = 0;
     116            0 :         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
     117            0 :             .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
     118              :         {
     119            0 :             match &tenants_dir_entry {
     120            0 :                 Ok(tenants_dir_entry) => {
     121            0 :                     if let Ok(tenant_id) =
     122            0 :                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
     123              :                     {
     124            0 :                         tenant_count += 1;
     125            0 :                         GlobalTimelines::load_tenant_timelines(tenant_id).await?;
     126            0 :                     }
     127              :                 }
     128            0 :                 Err(e) => error!(
     129            0 :                     "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
     130              :                     tenants_dir_entry, tenants_dir, e
     131              :                 ),
     132              :             }
     133              :         }
     134              : 
     135            0 :         info!(
     136            0 :             "found {} tenants directories, successfully loaded {} timelines",
     137            0 :             tenant_count,
     138            0 :             TIMELINES_STATE.lock().unwrap().timelines.len()
     139              :         );
     140            0 :         Ok(())
     141            0 :     }
     142              : 
     143              :     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     144              :     /// errors if any.
     145              :     ///
     146              :     /// It is async, but TIMELINES_STATE lock is sync and there is no important
     147              :     /// reason to make it async (it is always held for a short while), so we
     148              :     /// just lock and unlock it for each timeline -- this function is called
     149              :     /// during init when nothing else is running, so this is fine.
     150            0 :     async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
     151            0 :         let (conf, broker_active_set, partial_backup_rate_limiter) = {
     152            0 :             let state = TIMELINES_STATE.lock().unwrap();
     153            0 :             state.get_dependencies()
     154            0 :         };
     155            0 : 
     156            0 :         let timelines_dir = get_tenant_dir(&conf, &tenant_id);
     157            0 :         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
     158            0 :             .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
     159              :         {
     160            0 :             match &timelines_dir_entry {
     161            0 :                 Ok(timeline_dir_entry) => {
     162            0 :                     if let Ok(timeline_id) =
     163            0 :                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
     164              :                     {
     165            0 :                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     166            0 :                         match Timeline::load_timeline(&conf, ttid) {
     167            0 :                             Ok(tli) => {
     168            0 :                                 let mut shared_state = tli.write_shared_state().await;
     169            0 :                                 TIMELINES_STATE
     170            0 :                                     .lock()
     171            0 :                                     .unwrap()
     172            0 :                                     .timelines
     173            0 :                                     .insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
     174            0 :                                 tli.bootstrap(
     175            0 :                                     &mut shared_state,
     176            0 :                                     &conf,
     177            0 :                                     broker_active_set.clone(),
     178            0 :                                     partial_backup_rate_limiter.clone(),
     179            0 :                                 );
     180              :                             }
     181              :                             // If we can't load a timeline, it's most likely because of a corrupted
     182              :                             // directory. We will log an error and won't allow to delete/recreate
     183              :                             // this timeline. The only way to fix this timeline is to repair manually
     184              :                             // and restart the safekeeper.
     185            0 :                             Err(e) => error!(
     186            0 :                                 "failed to load timeline {} for tenant {}, reason: {:?}",
     187              :                                 timeline_id, tenant_id, e
     188              :                             ),
     189              :                         }
     190            0 :                     }
     191              :                 }
     192            0 :                 Err(e) => error!(
     193            0 :                     "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
     194              :                     timelines_dir_entry, timelines_dir, e
     195              :                 ),
     196              :             }
     197              :         }
     198              : 
     199            0 :         Ok(())
     200            0 :     }
     201              : 
     202              :     /// Get the number of timelines in the map.
     203            0 :     pub fn timelines_count() -> usize {
     204            0 :         TIMELINES_STATE.lock().unwrap().timelines.len()
     205            0 :     }
     206              : 
     207              :     /// Get the global safekeeper config.
     208            0 :     pub fn get_global_config() -> SafeKeeperConf {
     209            0 :         TIMELINES_STATE.lock().unwrap().get_conf().clone()
     210            0 :     }
     211              : 
     212            0 :     pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
     213            0 :         TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
     214            0 :     }
     215              : 
     216              :     /// Create a new timeline with the given id. If the timeline already exists, returns
     217              :     /// an existing timeline.
     218            0 :     pub(crate) async fn create(
     219            0 :         ttid: TenantTimelineId,
     220            0 :         server_info: ServerInfo,
     221            0 :         commit_lsn: Lsn,
     222            0 :         local_start_lsn: Lsn,
     223            0 :     ) -> Result<Arc<Timeline>> {
     224            0 :         let (conf, _, _) = {
     225            0 :             let state = TIMELINES_STATE.lock().unwrap();
     226            0 :             if let Ok(timeline) = state.get(&ttid) {
     227              :                 // Timeline already exists, return it.
     228            0 :                 return Ok(timeline);
     229            0 :             }
     230            0 : 
     231            0 :             if state.tombstones.contains_key(&ttid) {
     232            0 :                 anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
     233            0 :             }
     234            0 : 
     235            0 :             state.get_dependencies()
     236            0 :         };
     237            0 : 
     238            0 :         info!("creating new timeline {}", ttid);
     239              : 
     240              :         // Do on disk initialization in tmp dir.
     241            0 :         let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
     242              : 
     243              :         // TODO: currently we create only cfile. It would be reasonable to
     244              :         // immediately initialize first WAL segment as well.
     245            0 :         let state =
     246            0 :             TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
     247            0 :         control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
     248            0 :         let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
     249            0 :         Ok(timeline)
     250            0 :     }
     251              : 
     252              :     /// Move timeline from a temp directory to the main storage, and load it to
     253              :     /// the global map. Creating timeline in this way ensures atomicity: rename
     254              :     /// is atomic, so either move of the whole datadir succeeds or it doesn't,
     255              :     /// but corrupted data dir shouldn't be possible.
     256              :     ///
     257              :     /// We'd like to avoid holding map lock while doing IO, so it's a 3 step
     258              :     /// process:
     259              :     /// 1) check the global map that timeline doesn't exist and mark that we're
     260              :     ///    creating it;
     261              :     /// 2) move the directory and load the timeline
     262              :     /// 3) take lock again and insert the timeline into the global map.
     263            0 :     pub async fn load_temp_timeline(
     264            0 :         ttid: TenantTimelineId,
     265            0 :         tmp_path: &Utf8PathBuf,
     266            0 :         check_tombstone: bool,
     267            0 :     ) -> Result<Arc<Timeline>> {
     268              :         // Check for existence and mark that we're creating it.
     269            0 :         let (conf, broker_active_set, partial_backup_rate_limiter) = {
     270            0 :             let mut state = TIMELINES_STATE.lock().unwrap();
     271            0 :             match state.timelines.get(&ttid) {
     272              :                 Some(GlobalMapTimeline::CreationInProgress) => {
     273            0 :                     bail!(TimelineError::CreationInProgress(ttid));
     274              :                 }
     275              :                 Some(GlobalMapTimeline::Timeline(_)) => {
     276            0 :                     bail!(TimelineError::AlreadyExists(ttid));
     277              :                 }
     278            0 :                 _ => {}
     279            0 :             }
     280            0 :             if check_tombstone {
     281            0 :                 if state.tombstones.contains_key(&ttid) {
     282            0 :                     anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
     283            0 :                 }
     284              :             } else {
     285              :                 // We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`).  We trust
     286              :                 // that the human doing this manual intervention knows what they are doing, and remove its tombstone.
     287            0 :                 if state.tombstones.remove(&ttid).is_some() {
     288            0 :                     warn!("un-deleted timeline {ttid}");
     289            0 :                 }
     290              :             }
     291            0 :             state
     292            0 :                 .timelines
     293            0 :                 .insert(ttid, GlobalMapTimeline::CreationInProgress);
     294            0 :             state.get_dependencies()
     295            0 :         };
     296            0 : 
     297            0 :         // Do the actual move and reflect the result in the map.
     298            0 :         match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
     299            0 :             Ok(timeline) => {
     300            0 :                 let mut timeline_shared_state = timeline.write_shared_state().await;
     301            0 :                 let mut state = TIMELINES_STATE.lock().unwrap();
     302            0 :                 assert!(matches!(
     303            0 :                     state.timelines.get(&ttid),
     304              :                     Some(GlobalMapTimeline::CreationInProgress)
     305              :                 ));
     306              : 
     307            0 :                 state
     308            0 :                     .timelines
     309            0 :                     .insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
     310            0 :                 drop(state);
     311            0 :                 timeline.bootstrap(
     312            0 :                     &mut timeline_shared_state,
     313            0 :                     &conf,
     314            0 :                     broker_active_set,
     315            0 :                     partial_backup_rate_limiter,
     316            0 :                 );
     317            0 :                 drop(timeline_shared_state);
     318            0 :                 Ok(timeline)
     319              :             }
     320            0 :             Err(e) => {
     321            0 :                 // Init failed, remove the marker from the map
     322            0 :                 let mut state = TIMELINES_STATE.lock().unwrap();
     323            0 :                 assert!(matches!(
     324            0 :                     state.timelines.get(&ttid),
     325              :                     Some(GlobalMapTimeline::CreationInProgress)
     326              :                 ));
     327            0 :                 state.timelines.remove(&ttid);
     328            0 :                 Err(e)
     329              :             }
     330              :         }
     331            0 :     }
     332              : 
     333              :     /// Main part of load_temp_timeline: do the move and load.
     334            0 :     async fn install_temp_timeline(
     335            0 :         ttid: TenantTimelineId,
     336            0 :         tmp_path: &Utf8PathBuf,
     337            0 :         conf: &SafeKeeperConf,
     338            0 :     ) -> Result<Arc<Timeline>> {
     339            0 :         let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
     340            0 :         let timeline_path = get_timeline_dir(conf, &ttid);
     341            0 : 
     342            0 :         // We must have already checked that timeline doesn't exist in the map,
     343            0 :         // but there might be existing datadir: if timeline is corrupted it is
     344            0 :         // not loaded. We don't want to overwrite such a dir, so check for its
     345            0 :         // existence.
     346            0 :         match fs::metadata(&timeline_path).await {
     347              :             Ok(_) => {
     348              :                 // Timeline directory exists on disk, we should leave state unchanged
     349              :                 // and return error.
     350            0 :                 bail!(TimelineError::Invalid(ttid));
     351              :             }
     352            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     353            0 :             Err(e) => {
     354            0 :                 return Err(e.into());
     355              :             }
     356              :         }
     357              : 
     358            0 :         info!(
     359            0 :             "moving timeline {} from {} to {}",
     360              :             ttid, tmp_path, timeline_path
     361              :         );
     362              : 
     363              :         // Now it is safe to move the timeline directory to the correct
     364              :         // location. First, create tenant directory. Ignore error if it already
     365              :         // exists.
     366            0 :         if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
     367            0 :             if e.kind() != std::io::ErrorKind::AlreadyExists {
     368            0 :                 return Err(e.into());
     369            0 :             }
     370            0 :         }
     371              :         // fsync it
     372            0 :         fsync_async_opt(&tenant_path, !conf.no_sync).await?;
     373              :         // and its creation
     374            0 :         fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
     375              : 
     376              :         // Do the move.
     377            0 :         durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
     378              : 
     379            0 :         Timeline::load_timeline(conf, ttid)
     380            0 :     }
     381              : 
     382              :     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
     383              :     /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
     384              :     /// i.e. loaded in memory and not cancelled.
     385            0 :     pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
     386            0 :         let tli_res = {
     387            0 :             let state = TIMELINES_STATE.lock().unwrap();
     388            0 :             state.get(&ttid)
     389            0 :         };
     390            0 :         match tli_res {
     391            0 :             Ok(tli) => {
     392            0 :                 if tli.is_cancelled() {
     393            0 :                     return Err(TimelineError::Cancelled(ttid));
     394            0 :                 }
     395            0 :                 Ok(tli)
     396              :             }
     397            0 :             _ => tli_res,
     398              :         }
     399            0 :     }
     400              : 
     401              :     /// Returns all timelines. This is used for background timeline processes.
     402            0 :     pub fn get_all() -> Vec<Arc<Timeline>> {
     403            0 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     404            0 :         global_lock
     405            0 :             .timelines
     406            0 :             .values()
     407            0 :             .filter_map(|t| match t {
     408            0 :                 GlobalMapTimeline::Timeline(t) => {
     409            0 :                     if t.is_cancelled() {
     410            0 :                         None
     411              :                     } else {
     412            0 :                         Some(t.clone())
     413              :                     }
     414              :                 }
     415            0 :                 _ => None,
     416            0 :             })
     417            0 :             .collect()
     418            0 :     }
     419              : 
     420              :     /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
     421              :     /// and that's why it can return cancelled timelines, to retry deleting them.
     422            0 :     fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
     423            0 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     424            0 :         global_lock
     425            0 :             .timelines
     426            0 :             .values()
     427            0 :             .filter_map(|t| match t {
     428            0 :                 GlobalMapTimeline::Timeline(t) => Some(t.clone()),
     429            0 :                 _ => None,
     430            0 :             })
     431            0 :             .filter(|t| t.ttid.tenant_id == tenant_id)
     432            0 :             .collect()
     433            0 :     }
     434              : 
     435              :     /// Cancels timeline, then deletes the corresponding data directory.
     436              :     /// If only_local, doesn't remove WAL segments in remote storage.
     437            0 :     pub(crate) async fn delete(
     438            0 :         ttid: &TenantTimelineId,
     439            0 :         only_local: bool,
     440            0 :     ) -> Result<TimelineDeleteForceResult> {
     441            0 :         let tli_res = {
     442            0 :             let state = TIMELINES_STATE.lock().unwrap();
     443            0 : 
     444            0 :             if state.tombstones.contains_key(ttid) {
     445              :                 // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
     446            0 :                 info!("Timeline {ttid} was already deleted");
     447            0 :                 return Ok(TimelineDeleteForceResult {
     448            0 :                     dir_existed: false,
     449            0 :                     was_active: false,
     450            0 :                 });
     451            0 :             }
     452            0 : 
     453            0 :             state.get(ttid)
     454              :         };
     455              : 
     456            0 :         let result = match tli_res {
     457            0 :             Ok(timeline) => {
     458            0 :                 let was_active = timeline.broker_active.load(Ordering::Relaxed);
     459            0 : 
     460            0 :                 info!("deleting timeline {}, only_local={}", ttid, only_local);
     461            0 :                 timeline.shutdown().await;
     462              : 
     463              :                 // Take a lock and finish the deletion holding this mutex.
     464            0 :                 let mut shared_state = timeline.write_shared_state().await;
     465              : 
     466            0 :                 let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
     467              : 
     468            0 :                 Ok(TimelineDeleteForceResult {
     469            0 :                     dir_existed,
     470            0 :                     was_active, // TODO: we probably should remove this field
     471            0 :                 })
     472              :             }
     473              :             Err(_) => {
     474              :                 // Timeline is not memory, but it may still exist on disk in broken state.
     475            0 :                 let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
     476            0 :                 let dir_existed = delete_dir(dir_path)?;
     477              : 
     478            0 :                 Ok(TimelineDeleteForceResult {
     479            0 :                     dir_existed,
     480            0 :                     was_active: false,
     481            0 :                 })
     482              :             }
     483              :         };
     484              : 
     485              :         // Finalize deletion, by dropping Timeline objects and storing smaller tombstones.  The tombstones
     486              :         // are used to prevent still-running computes from re-creating the same timeline when they send data,
     487              :         // and to speed up repeated deletion calls by avoiding re-listing objects.
     488            0 :         TIMELINES_STATE.lock().unwrap().delete(*ttid);
     489            0 : 
     490            0 :         result
     491            0 :     }
     492              : 
     493              :     /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
     494              :     /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
     495              :     /// created simultaneously. In that case the function will return error and the caller should
     496              :     /// retry tenant deletion again later.
     497              :     ///
     498              :     /// If only_local, doesn't remove WAL segments in remote storage.
     499            0 :     pub async fn delete_force_all_for_tenant(
     500            0 :         tenant_id: &TenantId,
     501            0 :         only_local: bool,
     502            0 :     ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
     503            0 :         info!("deleting all timelines for tenant {}", tenant_id);
     504            0 :         let to_delete = Self::get_all_for_tenant(*tenant_id);
     505            0 : 
     506            0 :         let mut err = None;
     507            0 : 
     508            0 :         let mut deleted = HashMap::new();
     509            0 :         for tli in &to_delete {
     510            0 :             match Self::delete(&tli.ttid, only_local).await {
     511            0 :                 Ok(result) => {
     512            0 :                     deleted.insert(tli.ttid, result);
     513            0 :                 }
     514            0 :                 Err(e) => {
     515            0 :                     error!("failed to delete timeline {}: {}", tli.ttid, e);
     516              :                     // Save error to return later.
     517            0 :                     err = Some(e);
     518              :                 }
     519              :             }
     520              :         }
     521              : 
     522              :         // If there was an error, return it.
     523            0 :         if let Some(e) = err {
     524            0 :             return Err(e);
     525            0 :         }
     526            0 : 
     527            0 :         // There may be broken timelines on disk, so delete the whole tenant dir as well.
     528            0 :         // Note that we could concurrently create new timelines while we were deleting them,
     529            0 :         // so the directory may be not empty. In this case timelines will have bad state
     530            0 :         // and timeline background jobs can panic.
     531            0 :         delete_dir(get_tenant_dir(
     532            0 :             TIMELINES_STATE.lock().unwrap().get_conf(),
     533            0 :             tenant_id,
     534            0 :         ))?;
     535              : 
     536            0 :         Ok(deleted)
     537            0 :     }
     538              : 
     539            0 :     pub fn housekeeping(tombstone_ttl: &Duration) {
     540            0 :         let mut state = TIMELINES_STATE.lock().unwrap();
     541            0 : 
     542            0 :         // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
     543            0 :         // timelines.  If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
     544            0 :         // may recreate a deleted timeline.
     545            0 :         let now = Instant::now();
     546            0 :         state
     547            0 :             .tombstones
     548            0 :             .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
     549            0 :     }
     550              : }
     551              : 
     552              : #[derive(Clone, Copy, Serialize)]
     553              : pub struct TimelineDeleteForceResult {
     554              :     pub dir_existed: bool,
     555              :     pub was_active: bool,
     556              : }
     557              : 
     558              : /// Deletes directory and it's contents. Returns false if directory does not exist.
     559            0 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
     560            0 :     match std::fs::remove_dir_all(path) {
     561            0 :         Ok(_) => Ok(true),
     562            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     563            0 :         Err(e) => Err(e.into()),
     564              :     }
     565            0 : }
     566              : 
     567              : /// Create temp directory for a new timeline. It needs to be located on the same
     568              : /// filesystem as the rest of the timelines. It will be automatically deleted when
     569              : /// Utf8TempDir goes out of scope.
     570            0 : pub async fn create_temp_timeline_dir(
     571            0 :     conf: &SafeKeeperConf,
     572            0 :     ttid: TenantTimelineId,
     573            0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
     574            0 :     let temp_base = conf.workdir.join("tmp");
     575            0 : 
     576            0 :     tokio::fs::create_dir_all(&temp_base).await?;
     577              : 
     578            0 :     let tli_dir = camino_tempfile::Builder::new()
     579            0 :         .suffix("_temptli")
     580            0 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     581            0 :         .tempdir_in(temp_base)?;
     582              : 
     583            0 :     let tli_dir_path = tli_dir.path().to_path_buf();
     584            0 : 
     585            0 :     Ok((tli_dir, tli_dir_path))
     586            0 : }
     587              : 
     588              : /// Do basic validation of a temp timeline, before moving it to the global map.
     589            0 : pub async fn validate_temp_timeline(
     590            0 :     conf: &SafeKeeperConf,
     591            0 :     ttid: TenantTimelineId,
     592            0 :     path: &Utf8PathBuf,
     593            0 : ) -> Result<(Lsn, Lsn)> {
     594            0 :     let control_path = path.join("safekeeper.control");
     595              : 
     596            0 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     597            0 :     if control_store.server.wal_seg_size == 0 {
     598            0 :         bail!("wal_seg_size is not set");
     599            0 :     }
     600              : 
     601            0 :     let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
     602              : 
     603            0 :     let commit_lsn = control_store.commit_lsn;
     604            0 :     let flush_lsn = wal_store.flush_lsn();
     605            0 : 
     606            0 :     Ok((commit_lsn, flush_lsn))
     607            0 : }
        

Generated by: LCOV version 2.1-beta