LCOV - differential code coverage report
Current view: top level - safekeeper/src - timelines_global_map.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.4 % 268 245 23 245
Current Date: 2024-01-09 02:06:09 Functions: 80.0 % 40 32 8 32
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
       2                 : //! All timelines should always be present in this map; this is done by loading them
       3                 : //! all from the disk on startup and keeping them in memory.
       4                 : 
       5                 : use crate::safekeeper::ServerInfo;
       6                 : use crate::timeline::{Timeline, TimelineError};
       7                 : use crate::SafeKeeperConf;
       8                 : use anyhow::{bail, Context, Result};
       9                 : use camino::Utf8PathBuf;
      10                 : use once_cell::sync::Lazy;
      11                 : use serde::Serialize;
      12                 : use std::collections::HashMap;
      13                 : use std::str::FromStr;
      14                 : use std::sync::{Arc, Mutex};
      15                 : use tokio::sync::mpsc::Sender;
      16                 : use tracing::*;
      17                 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      18                 : use utils::lsn::Lsn;
      19                 : 
      20                 : struct GlobalTimelinesState {
      21                 :     timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
      22                 :     wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
      23                 :     conf: Option<SafeKeeperConf>,
      24                 :     load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
      25                 : }
      26                 : 
      27                 : // Used to prevent concurrent timeline loading.
      28                 : pub struct TimelineLoadLock;
      29                 : 
      30                 : impl GlobalTimelinesState {
      31                 :     /// Get configuration, which must be set once during init.
      32 CBC        1188 :     fn get_conf(&self) -> &SafeKeeperConf {
      33            1188 :         self.conf
      34            1188 :             .as_ref()
      35            1188 :             .expect("GlobalTimelinesState conf is not initialized")
      36            1188 :     }
      37                 : 
      38                 :     /// Get dependencies for a timeline constructor.
      39             498 :     fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
      40             498 :         (
      41             498 :             self.get_conf().clone(),
      42             498 :             self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
      43             498 :         )
      44             498 :     }
      45                 : 
      46                 :     /// Insert timeline into the map. Returns error if timeline with the same id already exists.
      47             449 :     fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
      48             449 :         let ttid = timeline.ttid;
      49             449 :         if self.timelines.contains_key(&ttid) {
      50 UBC           0 :             bail!(TimelineError::AlreadyExists(ttid));
      51 CBC         449 :         }
      52             449 :         self.timelines.insert(ttid, timeline);
      53             449 :         Ok(())
      54             449 :     }
      55                 : 
      56                 :     /// Get timeline from the map. Returns error if timeline doesn't exist.
      57           16447 :     fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
      58           16447 :         self.timelines
      59           16447 :             .get(ttid)
      60           16447 :             .cloned()
      61           16447 :             .ok_or(TimelineError::NotFound(*ttid))
      62           16447 :     }
      63                 : }
      64                 : 
      65             485 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
      66             485 :     Mutex::new(GlobalTimelinesState {
      67             485 :         timelines: HashMap::new(),
      68             485 :         wal_backup_launcher_tx: None,
      69             485 :         conf: None,
      70             485 :         load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
      71             485 :     })
      72             485 : });
      73                 : 
      74                 : /// A zero-sized struct used to manage access to the global timelines map.
      75                 : pub struct GlobalTimelines;
      76                 : 
      77                 : impl GlobalTimelines {
      78                 :     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
      79             485 :     pub async fn init(
      80             485 :         conf: SafeKeeperConf,
      81             485 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
      82             485 :     ) -> Result<()> {
      83                 :         // clippy isn't smart enough to understand that drop(state) releases the
      84                 :         // lock, so use an explicit block
      85             485 :         let tenants_dir = {
      86             485 :             let mut state = TIMELINES_STATE.lock().unwrap();
      87             485 :             assert!(state.wal_backup_launcher_tx.is_none());
      88             485 :             state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
      89             485 :             state.conf = Some(conf);
      90             485 : 
      91             485 :             // Iterate through all directories and load tenants for all directories
      92             485 :             // named as a valid tenant_id.
      93             485 :             state.get_conf().workdir.clone()
      94             485 :         };
      95             485 :         let mut tenant_count = 0;
      96            1537 :         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
      97             485 :             .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
      98                 :         {
      99            1537 :             match &tenants_dir_entry {
     100            1537 :                 Ok(tenants_dir_entry) => {
     101              81 :                     if let Ok(tenant_id) =
     102            1537 :                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
     103                 :                     {
     104              81 :                         tenant_count += 1;
     105              81 :                         GlobalTimelines::load_tenant_timelines(tenant_id).await?;
     106            1456 :                     }
     107                 :                 }
     108 UBC           0 :                 Err(e) => error!(
     109               0 :                     "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
     110               0 :                     tenants_dir_entry, tenants_dir, e
     111               0 :                 ),
     112                 :             }
     113                 :         }
     114                 : 
     115 CBC         485 :         info!(
     116             485 :             "found {} tenants directories, successfully loaded {} timelines",
     117             485 :             tenant_count,
     118             485 :             TIMELINES_STATE.lock().unwrap().timelines.len()
     119             485 :         );
     120             485 :         Ok(())
     121             485 :     }
     122                 : 
     123                 :     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     124                 :     /// errors if any.
     125                 :     ///
     126                 :     /// It is async for the sake of update_status_notify. Since the TIMELINES_STATE lock is
     127                 :     /// sync and there is no important reason to make it async (it is always
     128                 :     /// held for a short while) we just lock and unlock it for each timeline --
     129                 :     /// this function is called during init when nothing else is running, so
     130                 :     /// this is fine.
     131              81 :     async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
     132              81 :         let (conf, wal_backup_launcher_tx) = {
     133              81 :             let state = TIMELINES_STATE.lock().unwrap();
     134              81 :             (
     135              81 :                 state.get_conf().clone(),
     136              81 :                 state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
     137              81 :             )
     138              81 :         };
     139              81 : 
     140              81 :         let timelines_dir = conf.tenant_dir(&tenant_id);
     141              84 :         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
     142              81 :             .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
     143                 :         {
     144              84 :             match &timelines_dir_entry {
     145              84 :                 Ok(timeline_dir_entry) => {
     146              84 :                     if let Ok(timeline_id) =
     147              84 :                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
     148                 :                     {
     149              84 :                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     150              84 :                         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
     151              84 :                             Ok(timeline) => {
     152              84 :                                 let tli = Arc::new(timeline);
     153              84 :                                 TIMELINES_STATE
     154              84 :                                     .lock()
     155              84 :                                     .unwrap()
     156              84 :                                     .timelines
     157              84 :                                     .insert(ttid, tli.clone());
     158              84 :                                 tli.bootstrap(&conf);
     159              84 :                                 tli.update_status_notify().await.unwrap();
     160                 :                             }
     161                 :                             // If we can't load a timeline, it's most likely because of a corrupted
     162                 :                             // directory. We will log an error and won't allow deleting/recreating
     163                 :                             // this timeline. The only way to fix this timeline is to repair manually
     164                 :                             // and restart the safekeeper.
     165 UBC           0 :                             Err(e) => error!(
     166               0 :                                 "failed to load timeline {} for tenant {}, reason: {:?}",
     167               0 :                                 timeline_id, tenant_id, e
     168               0 :                             ),
     169                 :                         }
     170               0 :                     }
     171                 :                 }
     172               0 :                 Err(e) => error!(
     173               0 :                     "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
     174               0 :                     timelines_dir_entry, timelines_dir, e
     175               0 :                 ),
     176                 :             }
     177                 :         }
     178                 : 
     179 CBC          81 :         Ok(())
     180              81 :     }
     181                 : 
     182                 :     /// Get the lock that serializes timeline loading; callers lock it and pass the guard to `load_timeline`.
     183              49 :     pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
     184              49 :         TIMELINES_STATE.lock().unwrap().load_lock.clone()
     185              49 :     }
     186                 : 
     187                 :     /// Load timeline from disk to the memory.
     188              49 :     pub async fn load_timeline<'a>(
     189              49 :         _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
     190              49 :         ttid: TenantTimelineId,
     191              49 :     ) -> Result<Arc<Timeline>> {
     192              49 :         let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
     193              49 : 
     194              49 :         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
     195              49 :             Ok(timeline) => {
     196              49 :                 let tli = Arc::new(timeline);
     197              49 : 
     198              49 :                 // TODO: prevent concurrent timeline creation/loading
     199              49 :                 TIMELINES_STATE
     200              49 :                     .lock()
     201              49 :                     .unwrap()
     202              49 :                     .timelines
     203              49 :                     .insert(ttid, tli.clone());
     204              49 : 
     205              49 :                 tli.bootstrap(&conf);
     206              49 : 
     207              49 :                 Ok(tli)
     208                 :             }
     209                 :             // If we can't load a timeline, it's bad. Caller will figure it out.
     210 UBC           0 :             Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
     211                 :         }
     212 CBC          49 :     }
     213                 : 
     214                 :     /// Get the number of timelines in the map.
     215               5 :     pub fn timelines_count() -> usize {
     216               5 :         TIMELINES_STATE.lock().unwrap().timelines.len()
     217               5 :     }
     218                 : 
     219                 :     /// Get the global safekeeper config.
     220             118 :     pub fn get_global_config() -> SafeKeeperConf {
     221             118 :         TIMELINES_STATE.lock().unwrap().get_conf().clone()
     222             118 :     }
     223                 : 
     224                 :     /// Create a new timeline with the given id. If the timeline already exists, returns
     225                 :     /// an existing timeline.
     226            1751 :     pub async fn create(
     227            1751 :         ttid: TenantTimelineId,
     228            1751 :         server_info: ServerInfo,
     229            1751 :         commit_lsn: Lsn,
     230            1751 :         local_start_lsn: Lsn,
     231            1751 :     ) -> Result<Arc<Timeline>> {
     232             449 :         let (conf, wal_backup_launcher_tx) = {
     233            1751 :             let state = TIMELINES_STATE.lock().unwrap();
     234            1751 :             if let Ok(timeline) = state.get(&ttid) {
     235                 :                 // Timeline already exists, return it.
     236            1302 :                 return Ok(timeline);
     237             449 :             }
     238             449 :             state.get_dependencies()
     239                 :         };
     240                 : 
     241             449 :         info!("creating new timeline {}", ttid);
     242                 : 
     243             449 :         let timeline = Arc::new(Timeline::create_empty(
     244             449 :             &conf,
     245             449 :             ttid,
     246             449 :             wal_backup_launcher_tx,
     247             449 :             server_info,
     248             449 :             commit_lsn,
     249             449 :             local_start_lsn,
     250             449 :         )?);
     251                 : 
     252                 :         // Take a lock and finish the initialization holding this mutex. No other threads
     253                 :         // can interfere with creation after we insert the timeline into the map.
     254                 :         {
     255             449 :             let mut shared_state = timeline.write_shared_state().await;
     256                 : 
     257                 :             // We can get a race condition here in case of concurrent create calls, but only
     258                 :             // in theory. create() will return valid timeline on the next try.
     259             449 :             TIMELINES_STATE
     260             449 :                 .lock()
     261             449 :                 .unwrap()
     262             449 :                 .try_insert(timeline.clone())?;
     263                 : 
     264                 :             // Write the new timeline to the disk and start background workers.
     265                 :             // Bootstrap is transactional, so if it fails, the timeline will be deleted,
     266                 :             // and the state on disk should remain unchanged.
     267            2496 :             if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
     268                 :                 // Note: the most likely reason for init failure is that the timeline
     269                 :                 // directory already exists on disk. This happens when timeline is corrupted
     270                 :                 // and wasn't loaded from disk on startup because of that. We want to preserve
     271                 :                 // the timeline directory in this case, for further inspection.
     272                 : 
     273                 :                 // TODO: this is an unusual error, perhaps we should send it to sentry
     274                 :                 // TODO: compute will try to create timeline every second, we should add backoff
     275 UBC           0 :                 error!("failed to init new timeline {}: {}", ttid, e);
     276                 : 
     277                 :                 // Timeline failed to init, it cannot be used. Remove it from the map.
     278               0 :                 TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
     279               0 :                 return Err(e);
     280 CBC         449 :             }
     281             449 :             // We are done with bootstrap, release the lock, return the timeline.
     282             449 :             // {} block forces release before .await
     283             449 :         }
     284             449 :         timeline.update_status_notify().await?;
     285             449 :         timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
     286             449 :         Ok(timeline)
     287            1751 :     }
     288                 : 
     289                 :     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
     290                 :     /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
     291                 :     /// i.e. loaded in memory and not cancelled.
     292           14669 :     pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
     293           14669 :         let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
     294           14669 : 
     295           14669 :         match res {
     296           14121 :             Ok(tli) => {
     297           14121 :                 if tli.is_cancelled() {
     298               1 :                     return Err(TimelineError::Cancelled(ttid));
     299           14120 :                 }
     300           14120 :                 Ok(tli)
     301                 :             }
     302             548 :             _ => res,
     303                 :         }
     304           14669 :     }
     305                 : 
     306                 :     /// Returns all timelines. This is used for background timeline processes.
     307           11343 :     pub fn get_all() -> Vec<Arc<Timeline>> {
     308           11343 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     309           11343 :         global_lock
     310           11343 :             .timelines
     311           11343 :             .values()
     312           11343 :             .filter(|t| !t.is_cancelled())
     313           11343 :             .cloned()
     314           11343 :             .collect()
     315           11343 :     }
     316                 : 
     317                 :     /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
     318                 :     /// and that's why it can return cancelled timelines, to retry deleting them.
     319               4 :     fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
     320               4 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     321               4 :         global_lock
     322               4 :             .timelines
     323               4 :             .values()
     324              20 :             .filter(|t| t.ttid.tenant_id == tenant_id)
     325               4 :             .cloned()
     326               4 :             .collect()
     327               4 :     }
     328                 : 
     329                 :     /// Cancels timeline, then deletes the corresponding data directory.
     330              27 :     pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
     331              27 :         let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
     332              27 :         match tli_res {
     333              25 :             Ok(timeline) => {
     334                 :                 // Take a lock and finish the deletion holding this mutex.
     335              25 :                 let mut shared_state = timeline.write_shared_state().await;
     336                 : 
     337              25 :                 info!("deleting timeline {}", ttid);
     338              25 :                 let (dir_existed, was_active) =
     339              25 :                     timeline.delete_from_disk(&mut shared_state).await?;
     340                 : 
     341                 :                 // Remove timeline from the map.
     342                 :                 // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
     343                 :                 // https://github.com/neondatabase/neon/issues/3146
     344                 :                 // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
     345                 : 
     346              25 :                 Ok(TimelineDeleteForceResult {
     347              25 :                     dir_existed,
     348              25 :                     was_active,
     349              25 :                 })
     350                 :             }
     351                 :             Err(_) => {
     352                 :                 // Timeline is not in memory, but it may still exist on disk in a broken state.
     353               2 :                 let dir_path = TIMELINES_STATE
     354               2 :                     .lock()
     355               2 :                     .unwrap()
     356               2 :                     .get_conf()
     357               2 :                     .timeline_dir(ttid);
     358               2 :                 let dir_existed = delete_dir(dir_path)?;
     359                 : 
     360               2 :                 Ok(TimelineDeleteForceResult {
     361               2 :                     dir_existed,
     362               2 :                     was_active: false,
     363               2 :                 })
     364                 :             }
     365                 :         }
     366              27 :     }
     367                 : 
     368                 :     /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
     369                 :     /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
     370                 :     /// created simultaneously. In that case the function will return error and the caller should
     371                 :     /// retry tenant deletion again later.
     372               4 :     pub async fn delete_force_all_for_tenant(
     373               4 :         tenant_id: &TenantId,
     374               4 :     ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
     375               4 :         info!("deleting all timelines for tenant {}", tenant_id);
     376               4 :         let to_delete = Self::get_all_for_tenant(*tenant_id);
     377               4 : 
     378               4 :         let mut err = None;
     379               4 : 
     380               4 :         let mut deleted = HashMap::new();
     381              20 :         for tli in &to_delete {
     382              16 :             match Self::delete_force(&tli.ttid).await {
     383              16 :                 Ok(result) => {
     384              16 :                     deleted.insert(tli.ttid, result);
     385              16 :                 }
     386 UBC           0 :                 Err(e) => {
     387               0 :                     error!("failed to delete timeline {}: {}", tli.ttid, e);
     388                 :                     // Save error to return later.
     389               0 :                     err = Some(e);
     390                 :                 }
     391                 :             }
     392                 :         }
     393                 : 
     394                 :         // If there was an error, return it.
     395 CBC           4 :         if let Some(e) = err {
     396 UBC           0 :             return Err(e);
     397 CBC           4 :         }
     398               4 : 
     399               4 :         // There may be broken timelines on disk, so delete the whole tenant dir as well.
     400               4 :         // Note that we could concurrently create new timelines while we were deleting them,
     401               4 :         // so the directory may not be empty. In this case timelines will have bad state
     402               4 :         // and timeline background jobs can panic.
     403               4 :         delete_dir(
     404               4 :             TIMELINES_STATE
     405               4 :                 .lock()
     406               4 :                 .unwrap()
     407               4 :                 .get_conf()
     408               4 :                 .tenant_dir(tenant_id),
     409               4 :         )?;
     410                 : 
     411                 :         // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
     412                 :         // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
     413                 :         // if !tlis_after_delete.is_empty() {
     414                 :         //     // Some timelines were created while we were deleting them, returning error
     415                 :         //     // to the caller, so it can retry later.
     416                 :         //     bail!(
     417                 :         //         "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
     418                 :         //         tenant_id
     419                 :         //     );
     420                 :         // }
     421                 : 
     422               4 :         Ok(deleted)
     423               4 :     }
     424                 : }
     425                 : 
     426              27 : #[derive(Clone, Copy, Serialize)]
     427                 : pub struct TimelineDeleteForceResult {
     428                 :     pub dir_existed: bool,
     429                 :     pub was_active: bool,
     430                 : }
     431                 : 
     432                 : /// Deletes the directory and its contents. Returns false if the directory does not exist.
     433               6 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
     434               6 :     match std::fs::remove_dir_all(path) {
     435               2 :         Ok(_) => Ok(true),
     436               4 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     437 UBC           0 :         Err(e) => Err(e.into()),
     438                 :     }
     439 CBC           6 : }
        

Generated by: LCOV version 2.1-beta