LCOV - code coverage report
Current view: top level - safekeeper/src - timelines_global_map.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 89.4 % 264 236
Test Date: 2023-09-06 10:18:01 Functions: 76.3 % 38 29

            Line data    Source code
       1              : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
       2              : //! All timelines should always be present in this map; this is done by loading them
       3              : //! all from the disk on startup and keeping them in memory.
       4              : 
       5              : use crate::safekeeper::ServerInfo;
       6              : use crate::timeline::{Timeline, TimelineError};
       7              : use crate::SafeKeeperConf;
       8              : use anyhow::{bail, Context, Result};
       9              : use once_cell::sync::Lazy;
      10              : use serde::Serialize;
      11              : use std::collections::HashMap;
      12              : use std::path::PathBuf;
      13              : use std::str::FromStr;
      14              : use std::sync::{Arc, Mutex};
      15              : use tokio::sync::mpsc::Sender;
      16              : use tracing::*;
      17              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      18              : use utils::lsn::Lsn;
      19              : 
      20              : struct GlobalTimelinesState {
      21              :     timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
      22              :     wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
      23              :     conf: Option<SafeKeeperConf>,
      24              : }
      25              : 
      26              : impl GlobalTimelinesState {
      27              :     /// Get configuration, which must be set once during init.
      28         1136 :     fn get_conf(&self) -> &SafeKeeperConf {
      29         1136 :         self.conf
      30         1136 :             .as_ref()
      31         1136 :             .expect("GlobalTimelinesState conf is not initialized")
      32         1136 :     }
      33              : 
      34              :     /// Get dependencies for a timeline constructor.
      35          524 :     fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
      36          524 :         (
      37          524 :             self.get_conf().clone(),
      38          524 :             self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
      39          524 :         )
      40          524 :     }
      41              : 
      42              :     /// Insert timeline into the map. Returns error if timeline with the same id already exists.
      43          523 :     fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
      44          523 :         let ttid = timeline.ttid;
      45          523 :         if self.timelines.contains_key(&ttid) {
      46            0 :             bail!(TimelineError::AlreadyExists(ttid));
      47          523 :         }
      48          523 :         self.timelines.insert(ttid, timeline);
      49          523 :         Ok(())
      50          523 :     }
      51              : 
      52              :     /// Get timeline from the map. Returns error if timeline doesn't exist.
      53        14020 :     fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
      54        14020 :         self.timelines
      55        14020 :             .get(ttid)
      56        14020 :             .cloned()
      57        14020 :             .ok_or(TimelineError::NotFound(*ttid))
      58        14020 :     }
      59              : }
      60              : 
      61          517 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
      62          517 :     Mutex::new(GlobalTimelinesState {
      63          517 :         timelines: HashMap::new(),
      64          517 :         wal_backup_launcher_tx: None,
      65          517 :         conf: None,
      66          517 :     })
      67          517 : });
      68              : 
      69              : /// A zero-sized struct used to manage access to the global timelines map.
      70              : pub struct GlobalTimelines;
      71              : 
      72              : impl GlobalTimelines {
      73              :     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
      74          517 :     pub async fn init(
      75          517 :         conf: SafeKeeperConf,
      76          517 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
      77          517 :     ) -> Result<()> {
      78              :         // clippy isn't smart enough to understand that drop(state) releases the
      79              :         // lock, so use an explicit block
      80          517 :         let tenants_dir = {
      81          517 :             let mut state = TIMELINES_STATE.lock().unwrap();
      82          517 :             assert!(state.wal_backup_launcher_tx.is_none());
      83          517 :             state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
      84          517 :             state.conf = Some(conf);
      85          517 : 
      86          517 :             // Iterate through all directories and load tenants for all directories
      87          517 :             // named as a valid tenant_id.
      88          517 :             state.get_conf().workdir.clone()
      89          517 :         };
      90          517 :         let mut tenant_count = 0;
      91         1637 :         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
      92          517 :             .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
      93              :         {
      94         1637 :             match &tenants_dir_entry {
      95         1637 :                 Ok(tenants_dir_entry) => {
      96           83 :                     if let Ok(tenant_id) =
      97         1637 :                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
      98              :                     {
      99           83 :                         tenant_count += 1;
     100           83 :                         GlobalTimelines::load_tenant_timelines(tenant_id).await?;
     101         1554 :                     }
     102              :                 }
     103            0 :                 Err(e) => error!(
     104            0 :                     "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
     105            0 :                     tenants_dir_entry,
     106            0 :                     tenants_dir.display(),
     107            0 :                     e
     108            0 :                 ),
     109              :             }
     110              :         }
     111              : 
     112          517 :         info!(
     113          517 :             "found {} tenants directories, successfully loaded {} timelines",
     114          517 :             tenant_count,
     115          517 :             TIMELINES_STATE.lock().unwrap().timelines.len()
     116          517 :         );
     117          517 :         Ok(())
     118          517 :     }
     119              : 
     120              :     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     121              :     /// errors if any.
     122              :     ///
     123              :     /// It is async for the sake of update_status_notify. Since the TIMELINES_STATE lock is
     124              :     /// sync and there is no important reason to make it async (it is always
     125              :     /// held for a short while) we just lock and unlock it for each timeline --
     126              :     /// this function is called during init when nothing else is running, so
     127              :     /// this is fine.
     128           83 :     async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
     129           83 :         let (conf, wal_backup_launcher_tx) = {
     130           83 :             let state = TIMELINES_STATE.lock().unwrap();
     131           83 :             (
     132           83 :                 state.get_conf().clone(),
     133           83 :                 state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
     134           83 :             )
     135           83 :         };
     136           83 : 
     137           83 :         let timelines_dir = conf.tenant_dir(&tenant_id);
     138           83 :         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
     139           83 :             .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
     140              :         {
     141           80 :             match &timelines_dir_entry {
     142           80 :                 Ok(timeline_dir_entry) => {
     143           80 :                     if let Ok(timeline_id) =
     144           80 :                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
     145              :                     {
     146           80 :                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     147           80 :                         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
     148           80 :                             Ok(timeline) => {
     149           80 :                                 let tli = Arc::new(timeline);
     150           80 :                                 TIMELINES_STATE
     151           80 :                                     .lock()
     152           80 :                                     .unwrap()
     153           80 :                                     .timelines
     154           80 :                                     .insert(ttid, tli.clone());
     155           80 :                                 tli.bootstrap(&conf);
     156           80 :                                 tli.update_status_notify().await.unwrap();
     157              :                             }
     158              :                             // If we can't load a timeline, it's most likely because of a corrupted
     159              :                             // directory. We will log an error and won't allow deleting/recreating
     160              :                             // this timeline. The only way to fix this timeline is to repair it manually
     161              :                             // and restart the safekeeper.
     162            0 :                             Err(e) => error!(
     163            0 :                                 "failed to load timeline {} for tenant {}, reason: {:?}",
     164            0 :                                 timeline_id, tenant_id, e
     165            0 :                             ),
     166              :                         }
     167            0 :                     }
     168              :                 }
     169            0 :                 Err(e) => error!(
     170            0 :                     "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
     171            0 :                     timelines_dir_entry,
     172            0 :                     timelines_dir.display(),
     173            0 :                     e
     174            0 :                 ),
     175              :             }
     176              :         }
     177              : 
     178           83 :         Ok(())
     179           83 :     }
     180              : 
     181              :     /// Load timeline from disk to the memory.
     182            1 :     pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
     183            1 :         let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
     184            1 : 
     185            1 :         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
     186            1 :             Ok(timeline) => {
     187            1 :                 let tli = Arc::new(timeline);
     188            1 : 
     189            1 :                 // TODO: prevent concurrent timeline creation/loading
     190            1 :                 TIMELINES_STATE
     191            1 :                     .lock()
     192            1 :                     .unwrap()
     193            1 :                     .timelines
     194            1 :                     .insert(ttid, tli.clone());
     195            1 : 
     196            1 :                 tli.bootstrap(&conf);
     197            1 : 
     198            1 :                 Ok(tli)
     199              :             }
     200              :             // If we can't load a timeline, it's bad. Caller will figure it out.
     201            0 :             Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
     202              :         }
     203            1 :     }
     204              : 
     205              :     /// Get the number of timelines in the map.
     206            5 :     pub fn timelines_count() -> usize {
     207            5 :         TIMELINES_STATE.lock().unwrap().timelines.len()
     208            5 :     }
     209              : 
     210              :     /// Get the global safekeeper config.
     211            6 :     pub fn get_global_config() -> SafeKeeperConf {
     212            6 :         TIMELINES_STATE.lock().unwrap().get_conf().clone()
     213            6 :     }
     214              : 
     215              :     /// Create a new timeline with the given id. If the timeline already exists, returns
     216              :     /// the existing timeline.
     217         2117 :     pub async fn create(
     218         2117 :         ttid: TenantTimelineId,
     219         2117 :         server_info: ServerInfo,
     220         2117 :         commit_lsn: Lsn,
     221         2117 :         local_start_lsn: Lsn,
     222         2117 :     ) -> Result<Arc<Timeline>> {
     223          523 :         let (conf, wal_backup_launcher_tx) = {
     224         2117 :             let state = TIMELINES_STATE.lock().unwrap();
     225         2117 :             if let Ok(timeline) = state.get(&ttid) {
     226              :                 // Timeline already exists, return it.
     227         1594 :                 return Ok(timeline);
     228          523 :             }
     229          523 :             state.get_dependencies()
     230              :         };
     231              : 
     232          523 :         info!("creating new timeline {}", ttid);
     233              : 
     234          523 :         let timeline = Arc::new(Timeline::create_empty(
     235          523 :             &conf,
     236          523 :             ttid,
     237          523 :             wal_backup_launcher_tx,
     238          523 :             server_info,
     239          523 :             commit_lsn,
     240          523 :             local_start_lsn,
     241          523 :         )?);
     242              : 
     243              :         // Take a lock and finish the initialization holding this mutex. No other threads
     244              :         // can interfere with creation after we insert the timeline into the map.
     245              :         {
     246          523 :             let mut shared_state = timeline.write_shared_state().await;
     247              : 
     248              :             // We can get a race condition here in case of concurrent create calls, but only
     249              :             // in theory. create() will return a valid timeline on the next try.
     250          523 :             TIMELINES_STATE
     251          523 :                 .lock()
     252          523 :                 .unwrap()
     253          523 :                 .try_insert(timeline.clone())?;
     254              : 
     255              :             // Write the new timeline to the disk and start background workers.
     256              :             // Bootstrap is transactional, so if it fails, the timeline will be deleted,
     257              :             // and the state on disk should remain unchanged.
     258         2818 :             if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
     259              :                 // Note: the most likely reason for init failure is that the timeline
     260              :                 // directory already exists on disk. This happens when timeline is corrupted
     261              :                 // and wasn't loaded from disk on startup because of that. We want to preserve
     262              :                 // the timeline directory in this case, for further inspection.
     263              : 
     264              :                 // TODO: this is an unusual error, perhaps we should send it to sentry
     265              :                 // TODO: compute will try to create timeline every second, we should add backoff
     266            0 :                 error!("failed to init new timeline {}: {}", ttid, e);
     267              : 
     268              :                 // Timeline failed to init, it cannot be used. Remove it from the map.
     269            0 :                 TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
     270            0 :                 return Err(e);
     271          523 :             }
     272          523 :             // We are done with bootstrap, release the lock, return the timeline.
     273          523 :             // {} block forces release before .await
     274          523 :         }
     275          523 :         timeline.update_status_notify().await?;
     276          523 :         timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
     277          523 :         Ok(timeline)
     278         2117 :     }
     279              : 
     280              :     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
     281              :     /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
     282              :     /// i.e. loaded in memory and not cancelled.
     283        11870 :     pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
     284        11870 :         let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
     285        11870 : 
     286        11870 :         match res {
     287        11357 :             Ok(tli) => {
     288        11357 :                 if tli.is_cancelled() {
     289            0 :                     return Err(TimelineError::Cancelled(ttid));
     290        11357 :                 }
     291        11357 :                 Ok(tli)
     292              :             }
     293          513 :             _ => res,
     294              :         }
     295        11870 :     }
     296              : 
     297              :     /// Returns all timelines. This is used for background timeline processes.
     298         8117 :     pub fn get_all() -> Vec<Arc<Timeline>> {
     299         8117 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     300         8117 :         global_lock
     301         8117 :             .timelines
     302         8117 :             .values()
     303         8117 :             .cloned()
     304         8117 :             .filter(|t| !t.is_cancelled())
     305         8117 :             .collect()
     306         8117 :     }
     307              : 
     308              :     /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
     309              :     /// and that's why it can return cancelled timelines, to retry deleting them.
     310            4 :     fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
     311            4 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     312            4 :         global_lock
     313            4 :             .timelines
     314            4 :             .values()
     315           20 :             .filter(|t| t.ttid.tenant_id == tenant_id)
     316            4 :             .cloned()
     317            4 :             .collect()
     318            4 :     }
     319              : 
     320              :     /// Cancels timeline, then deletes the corresponding data directory.
     321           33 :     pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
     322           33 :         let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
     323           33 :         match tli_res {
     324           31 :             Ok(timeline) => {
     325              :                 // Take a lock and finish the deletion holding this mutex.
     326           31 :                 let mut shared_state = timeline.write_shared_state().await;
     327              : 
     328           31 :                 info!("deleting timeline {}", ttid);
     329           31 :                 let (dir_existed, was_active) =
     330           31 :                     timeline.delete_from_disk(&mut shared_state).await?;
     331              : 
     332              :                 // Remove timeline from the map.
     333              :                 // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
     334              :                 // https://github.com/neondatabase/neon/issues/3146
     335              :                 // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
     336              : 
     337           31 :                 Ok(TimelineDeleteForceResult {
     338           31 :                     dir_existed,
     339           31 :                     was_active,
     340           31 :                 })
     341              :             }
     342              :             Err(_) => {
     343              :                 // Timeline is not in memory, but it may still exist on disk in a broken state.
     344            2 :                 let dir_path = TIMELINES_STATE
     345            2 :                     .lock()
     346            2 :                     .unwrap()
     347            2 :                     .get_conf()
     348            2 :                     .timeline_dir(ttid);
     349            2 :                 let dir_existed = delete_dir(dir_path)?;
     350              : 
     351            2 :                 Ok(TimelineDeleteForceResult {
     352            2 :                     dir_existed,
     353            2 :                     was_active: false,
     354            2 :                 })
     355              :             }
     356              :         }
     357           33 :     }
     358              : 
     359              :     /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
     360              :     /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
     361              :     /// created simultaneously. In that case the function will return an error and the caller should
     362              :     /// retry tenant deletion later.
     363            4 :     pub async fn delete_force_all_for_tenant(
     364            4 :         tenant_id: &TenantId,
     365            4 :     ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
     366            4 :         info!("deleting all timelines for tenant {}", tenant_id);
     367            4 :         let to_delete = Self::get_all_for_tenant(*tenant_id);
     368            4 : 
     369            4 :         let mut err = None;
     370            4 : 
     371            4 :         let mut deleted = HashMap::new();
     372           20 :         for tli in &to_delete {
     373           16 :             match Self::delete_force(&tli.ttid).await {
     374           16 :                 Ok(result) => {
     375           16 :                     deleted.insert(tli.ttid, result);
     376           16 :                 }
     377            0 :                 Err(e) => {
     378            0 :                     error!("failed to delete timeline {}: {}", tli.ttid, e);
     379              :                     // Save error to return later.
     380            0 :                     err = Some(e);
     381              :                 }
     382              :             }
     383              :         }
     384              : 
     385              :         // If there was an error, return it.
     386            4 :         if let Some(e) = err {
     387            0 :             return Err(e);
     388            4 :         }
     389            4 : 
     390            4 :         // There may be broken timelines on disk, so delete the whole tenant dir as well.
     391            4 :         // Note that we could concurrently create new timelines while we were deleting them,
     392            4 :         // so the directory may not be empty. In this case timelines will have bad state
     393            4 :         // and timeline background jobs can panic.
     394            4 :         delete_dir(
     395            4 :             TIMELINES_STATE
     396            4 :                 .lock()
     397            4 :                 .unwrap()
     398            4 :                 .get_conf()
     399            4 :                 .tenant_dir(tenant_id),
     400            4 :         )?;
     401              : 
     402              :         // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
     403              :         // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
     404              :         // if !tlis_after_delete.is_empty() {
     405              :         //     // Some timelines were created while we were deleting them, returning error
     406              :         //     // to the caller, so it can retry later.
     407              :         //     bail!(
     408              :         //         "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
     409              :         //         tenant_id
     410              :         //     );
     411              :         // }
     412              : 
     413            4 :         Ok(deleted)
     414            4 :     }
     415              : }
     416              : 
     417           33 : #[derive(Clone, Copy, Serialize)]
     418              : pub struct TimelineDeleteForceResult {
     419              :     pub dir_existed: bool,
     420              :     pub was_active: bool,
     421              : }
     422              : 
     423              : /// Deletes directory and its contents. Returns false if directory does not exist.
     424              : fn delete_dir(path: PathBuf) -> Result<bool> {
     425            6 :     match std::fs::remove_dir_all(path) {
     426            2 :         Ok(_) => Ok(true),
     427            4 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     428            0 :         Err(e) => Err(e.into()),
     429              :     }
     430            6 : }
        

Generated by: LCOV version 2.1-beta