LCOV - code coverage report
Current view: top level - safekeeper/src - timelines_global_map.rs (source / functions) Coverage Total Hit
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info Lines: 0.0 % 313 0
Test Date: 2024-09-19 12:04:32 Functions: 0.0 % 33 0

            Line data    Source code
       1              : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
       2              : //! All timelines should always be present in this map, this is done by loading them
       3              : //! all from the disk on startup and keeping them in memory.
       4              : 
       5              : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
       6              : use crate::rate_limit::RateLimiter;
       7              : use crate::safekeeper::ServerInfo;
       8              : use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
       9              : use crate::timelines_set::TimelinesSet;
      10              : use crate::SafeKeeperConf;
      11              : use anyhow::{bail, Context, Result};
      12              : use camino::Utf8PathBuf;
      13              : use once_cell::sync::Lazy;
      14              : use serde::Serialize;
      15              : use std::collections::HashMap;
      16              : use std::str::FromStr;
      17              : use std::sync::atomic::Ordering;
      18              : use std::sync::{Arc, Mutex};
      19              : use std::time::{Duration, Instant};
      20              : use tracing::*;
      21              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      22              : use utils::lsn::Lsn;
      23              : 
      24              : struct GlobalTimelinesState {
      25              :     timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
      26              : 
      27              :     // A tombstone indicates this timeline used to exist has been deleted.  These are used to prevent
      28              :     // on-demand timeline creation from recreating deleted timelines.  This is only soft-enforced, as
      29              :     // this map is dropped on restart.
      30              :     tombstones: HashMap<TenantTimelineId, Instant>,
      31              : 
      32              :     conf: Option<SafeKeeperConf>,
      33              :     broker_active_set: Arc<TimelinesSet>,
      34              :     load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
      35              :     global_rate_limiter: RateLimiter,
      36              : }
      37              : 
      38              : // Used to prevent concurrent timeline loading.
      39              : pub struct TimelineLoadLock;
      40              : 
      41              : impl GlobalTimelinesState {
      42              :     /// Get configuration, which must be set once during init.
      43            0 :     fn get_conf(&self) -> &SafeKeeperConf {
      44            0 :         self.conf
      45            0 :             .as_ref()
      46            0 :             .expect("GlobalTimelinesState conf is not initialized")
      47            0 :     }
      48              : 
      49              :     /// Get dependencies for a timeline constructor.
      50            0 :     fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
      51            0 :         (
      52            0 :             self.get_conf().clone(),
      53            0 :             self.broker_active_set.clone(),
      54            0 :             self.global_rate_limiter.clone(),
      55            0 :         )
      56            0 :     }
      57              : 
      58              :     /// Insert timeline into the map. Returns error if timeline with the same id already exists.
      59            0 :     fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
      60            0 :         let ttid = timeline.ttid;
      61            0 :         if self.timelines.contains_key(&ttid) {
      62            0 :             bail!(TimelineError::AlreadyExists(ttid));
      63            0 :         }
      64            0 :         self.timelines.insert(ttid, timeline);
      65            0 :         Ok(())
      66            0 :     }
      67              : 
      68              :     /// Get timeline from the map. Returns error if timeline doesn't exist.
      69            0 :     fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
      70            0 :         self.timelines
      71            0 :             .get(ttid)
      72            0 :             .cloned()
      73            0 :             .ok_or(TimelineError::NotFound(*ttid))
      74            0 :     }
      75              : 
      76            0 :     fn delete(&mut self, ttid: TenantTimelineId) {
      77            0 :         self.timelines.remove(&ttid);
      78            0 :         self.tombstones.insert(ttid, Instant::now());
      79            0 :     }
      80              : }
      81              : 
      82            0 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
      83            0 :     Mutex::new(GlobalTimelinesState {
      84            0 :         timelines: HashMap::new(),
      85            0 :         tombstones: HashMap::new(),
      86            0 :         conf: None,
      87            0 :         broker_active_set: Arc::new(TimelinesSet::default()),
      88            0 :         load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
      89            0 :         global_rate_limiter: RateLimiter::new(1, 1),
      90            0 :     })
      91            0 : });
      92              : 
      93              : /// A zero-sized struct used to manage access to the global timelines map.
      94              : pub struct GlobalTimelines;
      95              : 
      96              : impl GlobalTimelines {
      97              :     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
      98            0 :     pub async fn init(conf: SafeKeeperConf) -> Result<()> {
      99            0 :         // clippy isn't smart enough to understand that drop(state) releases the
     100            0 :         // lock, so use explicit block
     101            0 :         let tenants_dir = {
     102            0 :             let mut state = TIMELINES_STATE.lock().unwrap();
     103            0 :             state.global_rate_limiter = RateLimiter::new(
     104            0 :                 conf.partial_backup_concurrency,
     105            0 :                 DEFAULT_EVICTION_CONCURRENCY,
     106            0 :             );
     107            0 :             state.conf = Some(conf);
     108            0 : 
     109            0 :             // Iterate through all directories and load tenants for all directories
     110            0 :             // named as a valid tenant_id.
     111            0 :             state.get_conf().workdir.clone()
     112            0 :         };
     113            0 :         let mut tenant_count = 0;
     114            0 :         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
     115            0 :             .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
     116              :         {
     117            0 :             match &tenants_dir_entry {
     118            0 :                 Ok(tenants_dir_entry) => {
     119            0 :                     if let Ok(tenant_id) =
     120            0 :                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
     121              :                     {
     122            0 :                         tenant_count += 1;
     123            0 :                         GlobalTimelines::load_tenant_timelines(tenant_id).await?;
     124            0 :                     }
     125              :                 }
     126            0 :                 Err(e) => error!(
     127            0 :                     "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
     128              :                     tenants_dir_entry, tenants_dir, e
     129              :                 ),
     130              :             }
     131              :         }
     132              : 
     133            0 :         info!(
     134            0 :             "found {} tenants directories, successfully loaded {} timelines",
     135            0 :             tenant_count,
     136            0 :             TIMELINES_STATE.lock().unwrap().timelines.len()
     137              :         );
     138            0 :         Ok(())
     139            0 :     }
     140              : 
     141              :     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     142              :     /// errors if any.
     143              :     ///
     144              :     /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
     145              :     /// sync and there is no important reason to make it async (it is always
     146              :     /// held for a short while) we just lock and unlock it for each timeline --
     147              :     /// this function is called during init when nothing else is running, so
     148              :     /// this is fine.
     149            0 :     async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
     150            0 :         let (conf, broker_active_set, partial_backup_rate_limiter) = {
     151            0 :             let state = TIMELINES_STATE.lock().unwrap();
     152            0 :             state.get_dependencies()
     153            0 :         };
     154            0 : 
     155            0 :         let timelines_dir = get_tenant_dir(&conf, &tenant_id);
     156            0 :         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
     157            0 :             .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
     158              :         {
     159            0 :             match &timelines_dir_entry {
     160            0 :                 Ok(timeline_dir_entry) => {
     161            0 :                     if let Ok(timeline_id) =
     162            0 :                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
     163              :                     {
     164            0 :                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     165            0 :                         match Timeline::load_timeline(&conf, ttid) {
     166            0 :                             Ok(timeline) => {
     167            0 :                                 let tli = Arc::new(timeline);
     168            0 :                                 TIMELINES_STATE
     169            0 :                                     .lock()
     170            0 :                                     .unwrap()
     171            0 :                                     .timelines
     172            0 :                                     .insert(ttid, tli.clone());
     173            0 :                                 tli.bootstrap(
     174            0 :                                     &conf,
     175            0 :                                     broker_active_set.clone(),
     176            0 :                                     partial_backup_rate_limiter.clone(),
     177            0 :                                 );
     178            0 :                             }
     179              :                             // If we can't load a timeline, it's most likely because of a corrupted
     180              :                             // directory. We will log an error and won't allow to delete/recreate
     181              :                             // this timeline. The only way to fix this timeline is to repair manually
     182              :                             // and restart the safekeeper.
     183            0 :                             Err(e) => error!(
     184            0 :                                 "failed to load timeline {} for tenant {}, reason: {:?}",
     185              :                                 timeline_id, tenant_id, e
     186              :                             ),
     187              :                         }
     188            0 :                     }
     189              :                 }
     190            0 :                 Err(e) => error!(
     191            0 :                     "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
     192              :                     timelines_dir_entry, timelines_dir, e
     193              :                 ),
     194              :             }
     195              :         }
     196              : 
     197            0 :         Ok(())
     198            0 :     }
     199              : 
     200              :     /// Take a lock for timeline loading.
     201            0 :     pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
     202            0 :         TIMELINES_STATE.lock().unwrap().load_lock.clone()
     203            0 :     }
     204              : 
     205              :     /// Load timeline from disk to the memory.
     206            0 :     pub async fn load_timeline<'a>(
     207            0 :         _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
     208            0 :         ttid: TenantTimelineId,
     209            0 :     ) -> Result<Arc<Timeline>> {
     210            0 :         let (conf, broker_active_set, partial_backup_rate_limiter) =
     211            0 :             TIMELINES_STATE.lock().unwrap().get_dependencies();
     212            0 : 
     213            0 :         match Timeline::load_timeline(&conf, ttid) {
     214            0 :             Ok(timeline) => {
     215            0 :                 let tli = Arc::new(timeline);
     216            0 : 
     217            0 :                 // TODO: prevent concurrent timeline creation/loading
     218            0 :                 {
     219            0 :                     let mut state = TIMELINES_STATE.lock().unwrap();
     220            0 : 
     221            0 :                     // We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`).  We trust
     222            0 :                     // that the human doing this manual intervention knows what they are doing, and remove its tombstone.
     223            0 :                     if state.tombstones.remove(&ttid).is_some() {
     224            0 :                         warn!("Un-deleted timeline {ttid}");
     225            0 :                     }
     226              : 
     227            0 :                     state.timelines.insert(ttid, tli.clone());
     228            0 :                 }
     229            0 : 
     230            0 :                 tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
     231            0 : 
     232            0 :                 Ok(tli)
     233              :             }
     234              :             // If we can't load a timeline, it's bad. Caller will figure it out.
     235            0 :             Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
     236              :         }
     237            0 :     }
     238              : 
     239              :     /// Get the number of timelines in the map.
     240            0 :     pub fn timelines_count() -> usize {
     241            0 :         TIMELINES_STATE.lock().unwrap().timelines.len()
     242            0 :     }
     243              : 
     244              :     /// Get the global safekeeper config.
     245            0 :     pub fn get_global_config() -> SafeKeeperConf {
     246            0 :         TIMELINES_STATE.lock().unwrap().get_conf().clone()
     247            0 :     }
     248              : 
     249            0 :     pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
     250            0 :         TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
     251            0 :     }
     252              : 
     253              :     /// Create a new timeline with the given id. If the timeline already exists, returns
     254              :     /// an existing timeline.
     255            0 :     pub(crate) async fn create(
     256            0 :         ttid: TenantTimelineId,
     257            0 :         server_info: ServerInfo,
     258            0 :         commit_lsn: Lsn,
     259            0 :         local_start_lsn: Lsn,
     260            0 :     ) -> Result<Arc<Timeline>> {
     261            0 :         let (conf, broker_active_set, partial_backup_rate_limiter) = {
     262            0 :             let state = TIMELINES_STATE.lock().unwrap();
     263            0 :             if let Ok(timeline) = state.get(&ttid) {
     264              :                 // Timeline already exists, return it.
     265            0 :                 return Ok(timeline);
     266            0 :             }
     267            0 : 
     268            0 :             if state.tombstones.contains_key(&ttid) {
     269            0 :                 anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
     270            0 :             }
     271            0 : 
     272            0 :             state.get_dependencies()
     273            0 :         };
     274            0 : 
     275            0 :         info!("creating new timeline {}", ttid);
     276              : 
     277            0 :         let timeline = Arc::new(Timeline::create_empty(
     278            0 :             &conf,
     279            0 :             ttid,
     280            0 :             server_info,
     281            0 :             commit_lsn,
     282            0 :             local_start_lsn,
     283            0 :         )?);
     284              : 
     285              :         // Take a lock and finish the initialization holding this mutex. No other threads
     286              :         // can interfere with creation after we will insert timeline into the map.
     287              :         {
     288            0 :             let mut shared_state = timeline.write_shared_state().await;
     289              : 
     290              :             // We can get a race condition here in case of concurrent create calls, but only
     291              :             // in theory. create() will return valid timeline on the next try.
     292            0 :             TIMELINES_STATE
     293            0 :                 .lock()
     294            0 :                 .unwrap()
     295            0 :                 .try_insert(timeline.clone())?;
     296              : 
     297              :             // Write the new timeline to the disk and start background workers.
     298              :             // Bootstrap is transactional, so if it fails, the timeline will be deleted,
     299              :             // and the state on disk should remain unchanged.
     300            0 :             if let Err(e) = timeline
     301            0 :                 .init_new(
     302            0 :                     &mut shared_state,
     303            0 :                     &conf,
     304            0 :                     broker_active_set,
     305            0 :                     partial_backup_rate_limiter,
     306            0 :                 )
     307            0 :                 .await
     308              :             {
     309              :                 // Note: the most likely reason for init failure is that the timeline
     310              :                 // directory already exists on disk. This happens when timeline is corrupted
     311              :                 // and wasn't loaded from disk on startup because of that. We want to preserve
     312              :                 // the timeline directory in this case, for further inspection.
     313              : 
     314              :                 // TODO: this is an unusual error, perhaps we should send it to sentry
     315              :                 // TODO: compute will try to create timeline every second, we should add backoff
     316            0 :                 error!("failed to init new timeline {}: {}", ttid, e);
     317              : 
     318              :                 // Timeline failed to init, it cannot be used. Remove it from the map.
     319            0 :                 TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
     320            0 :                 return Err(e);
     321            0 :             }
     322            0 :             // We are done with bootstrap, release the lock, return the timeline.
     323            0 :             // {} block forces release before .await
     324            0 :         }
     325            0 :         Ok(timeline)
     326            0 :     }
     327              : 
     328              :     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
     329              :     /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
     330              :     /// i.e. loaded in memory and not cancelled.
     331            0 :     pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
     332            0 :         let tli_res = {
     333            0 :             let state = TIMELINES_STATE.lock().unwrap();
     334            0 :             state.get(&ttid)
     335            0 :         };
     336            0 :         match tli_res {
     337            0 :             Ok(tli) => {
     338            0 :                 if tli.is_cancelled() {
     339            0 :                     return Err(TimelineError::Cancelled(ttid));
     340            0 :                 }
     341            0 :                 Ok(tli)
     342              :             }
     343            0 :             _ => tli_res,
     344              :         }
     345            0 :     }
     346              : 
     347              :     /// Returns all timelines. This is used for background timeline processes.
     348            0 :     pub fn get_all() -> Vec<Arc<Timeline>> {
     349            0 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     350            0 :         global_lock
     351            0 :             .timelines
     352            0 :             .values()
     353            0 :             .filter(|t| !t.is_cancelled())
     354            0 :             .cloned()
     355            0 :             .collect()
     356            0 :     }
     357              : 
     358              :     /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
     359              :     /// and that's why it can return cancelled timelines, to retry deleting them.
     360            0 :     fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
     361            0 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     362            0 :         global_lock
     363            0 :             .timelines
     364            0 :             .values()
     365            0 :             .filter(|t| t.ttid.tenant_id == tenant_id)
     366            0 :             .cloned()
     367            0 :             .collect()
     368            0 :     }
     369              : 
     370              :     /// Cancels timeline, then deletes the corresponding data directory.
     371              :     /// If only_local, doesn't remove WAL segments in remote storage.
     372            0 :     pub(crate) async fn delete(
     373            0 :         ttid: &TenantTimelineId,
     374            0 :         only_local: bool,
     375            0 :     ) -> Result<TimelineDeleteForceResult> {
     376            0 :         let tli_res = {
     377            0 :             let state = TIMELINES_STATE.lock().unwrap();
     378            0 : 
     379            0 :             if state.tombstones.contains_key(ttid) {
     380              :                 // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
     381            0 :                 info!("Timeline {ttid} was already deleted");
     382            0 :                 return Ok(TimelineDeleteForceResult {
     383            0 :                     dir_existed: false,
     384            0 :                     was_active: false,
     385            0 :                 });
     386            0 :             }
     387            0 : 
     388            0 :             state.get(ttid)
     389              :         };
     390              : 
     391            0 :         let result = match tli_res {
     392            0 :             Ok(timeline) => {
     393            0 :                 let was_active = timeline.broker_active.load(Ordering::Relaxed);
     394              : 
     395              :                 // Take a lock and finish the deletion holding this mutex.
     396            0 :                 let mut shared_state = timeline.write_shared_state().await;
     397              : 
     398            0 :                 info!("deleting timeline {}, only_local={}", ttid, only_local);
     399            0 :                 let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
     400              : 
     401            0 :                 Ok(TimelineDeleteForceResult {
     402            0 :                     dir_existed,
     403            0 :                     was_active, // TODO: we probably should remove this field
     404            0 :                 })
     405              :             }
     406              :             Err(_) => {
     407              :                 // Timeline is not memory, but it may still exist on disk in broken state.
     408            0 :                 let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
     409            0 :                 let dir_existed = delete_dir(dir_path)?;
     410              : 
     411            0 :                 Ok(TimelineDeleteForceResult {
     412            0 :                     dir_existed,
     413            0 :                     was_active: false,
     414            0 :                 })
     415              :             }
     416              :         };
     417              : 
     418              :         // Finalize deletion, by dropping Timeline objects and storing smaller tombstones.  The tombstones
     419              :         // are used to prevent still-running computes from re-creating the same timeline when they send data,
     420              :         // and to speed up repeated deletion calls by avoiding re-listing objects.
     421            0 :         TIMELINES_STATE.lock().unwrap().delete(*ttid);
     422            0 : 
     423            0 :         result
     424            0 :     }
     425              : 
     426              :     /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
     427              :     /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
     428              :     /// created simultaneously. In that case the function will return error and the caller should
     429              :     /// retry tenant deletion again later.
     430              :     ///
     431              :     /// If only_local, doesn't remove WAL segments in remote storage.
     432            0 :     pub async fn delete_force_all_for_tenant(
     433            0 :         tenant_id: &TenantId,
     434            0 :         only_local: bool,
     435            0 :     ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
     436            0 :         info!("deleting all timelines for tenant {}", tenant_id);
     437            0 :         let to_delete = Self::get_all_for_tenant(*tenant_id);
     438            0 : 
     439            0 :         let mut err = None;
     440            0 : 
     441            0 :         let mut deleted = HashMap::new();
     442            0 :         for tli in &to_delete {
     443            0 :             match Self::delete(&tli.ttid, only_local).await {
     444            0 :                 Ok(result) => {
     445            0 :                     deleted.insert(tli.ttid, result);
     446            0 :                 }
     447            0 :                 Err(e) => {
     448            0 :                     error!("failed to delete timeline {}: {}", tli.ttid, e);
     449              :                     // Save error to return later.
     450            0 :                     err = Some(e);
     451              :                 }
     452              :             }
     453              :         }
     454              : 
     455              :         // If there was an error, return it.
     456            0 :         if let Some(e) = err {
     457            0 :             return Err(e);
     458            0 :         }
     459            0 : 
     460            0 :         // There may be broken timelines on disk, so delete the whole tenant dir as well.
     461            0 :         // Note that we could concurrently create new timelines while we were deleting them,
     462            0 :         // so the directory may be not empty. In this case timelines will have bad state
     463            0 :         // and timeline background jobs can panic.
     464            0 :         delete_dir(get_tenant_dir(
     465            0 :             TIMELINES_STATE.lock().unwrap().get_conf(),
     466            0 :             tenant_id,
     467            0 :         ))?;
     468              : 
     469            0 :         Ok(deleted)
     470            0 :     }
     471              : 
     472            0 :     pub fn housekeeping(tombstone_ttl: &Duration) {
     473            0 :         let mut state = TIMELINES_STATE.lock().unwrap();
     474            0 : 
     475            0 :         // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
     476            0 :         // timelines.  If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
     477            0 :         // may recreate a deleted timeline.
     478            0 :         let now = Instant::now();
     479            0 :         state
     480            0 :             .tombstones
     481            0 :             .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
     482            0 :     }
     483              : }
     484              : 
     485              : #[derive(Clone, Copy, Serialize)]
     486              : pub struct TimelineDeleteForceResult {
     487              :     pub dir_existed: bool,
     488              :     pub was_active: bool,
     489              : }
     490              : 
     491              : /// Deletes directory and it's contents. Returns false if directory does not exist.
     492            0 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
     493            0 :     match std::fs::remove_dir_all(path) {
     494            0 :         Ok(_) => Ok(true),
     495            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     496            0 :         Err(e) => Err(e.into()),
     497              :     }
     498            0 : }
        

Generated by: LCOV version 2.1-beta