LCOV - differential code coverage report
Current view: top level - safekeeper/src - timelines_global_map.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 90.8 % 260 236 24 236
Current Date: 2023-10-19 02:04:12 Functions: 76.3 % 38 29 9 29
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
       2                 : //! All timelines should always be present in this map, this is done by loading them
       3                 : //! all from the disk on startup and keeping them in memory.
       4                 : 
       5                 : use crate::safekeeper::ServerInfo;
       6                 : use crate::timeline::{Timeline, TimelineError};
       7                 : use crate::SafeKeeperConf;
       8                 : use anyhow::{bail, Context, Result};
       9                 : use camino::Utf8PathBuf;
      10                 : use once_cell::sync::Lazy;
      11                 : use serde::Serialize;
      12                 : use std::collections::HashMap;
      13                 : use std::str::FromStr;
      14                 : use std::sync::{Arc, Mutex};
      15                 : use tokio::sync::mpsc::Sender;
      16                 : use tracing::*;
      17                 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      18                 : use utils::lsn::Lsn;
      19                 : 
      20                 : struct GlobalTimelinesState {
      21                 :     timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
      22                 :     wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
      23                 :     conf: Option<SafeKeeperConf>,
      24                 : }
      25                 : 
      26                 : impl GlobalTimelinesState {
      27                 :     /// Get configuration, which must be set once during init.
      28 CBC        1080 :     fn get_conf(&self) -> &SafeKeeperConf {
      29            1080 :         self.conf
      30            1080 :             .as_ref()
      31            1080 :             .expect("GlobalTimelinesState conf is not initialized")
      32            1080 :     }
      33                 : 
      34                 :     /// Get dependencies for a timeline constructor.
      35             482 :     fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
      36             482 :         (
      37             482 :             self.get_conf().clone(),
      38             482 :             self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
      39             482 :         )
      40             482 :     }
      41                 : 
      42                 :     /// Insert timeline into the map. Returns error if timeline with the same id already exists.
      43             481 :     fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
      44             481 :         let ttid = timeline.ttid;
      45             481 :         if self.timelines.contains_key(&ttid) {
      46 UBC           0 :             bail!(TimelineError::AlreadyExists(ttid));
      47 CBC         481 :         }
      48             481 :         self.timelines.insert(ttid, timeline);
      49             481 :         Ok(())
      50             481 :     }
      51                 : 
      52                 :     /// Get timeline from the map. Returns error if timeline doesn't exist.
      53           14460 :     fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
      54           14460 :         self.timelines
      55           14460 :             .get(ttid)
      56           14460 :             .cloned()
      57           14460 :             .ok_or(TimelineError::NotFound(*ttid))
      58           14460 :     }
      59                 : }
      60                 : 
      61             500 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
      62             500 :     Mutex::new(GlobalTimelinesState {
      63             500 :         timelines: HashMap::new(),
      64             500 :         wal_backup_launcher_tx: None,
      65             500 :         conf: None,
      66             500 :     })
      67             500 : });
      68                 : 
      69                 : /// A zero-sized struct used to manage access to the global timelines map.
      70                 : pub struct GlobalTimelines;
      71                 : 
      72                 : impl GlobalTimelines {
      73                 :     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
      74             500 :     pub async fn init(
      75             500 :         conf: SafeKeeperConf,
      76             500 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
      77             500 :     ) -> Result<()> {
      78                 :         // clippy isn't smart enough to understand that drop(state) releases the
      79                 :         // lock, so use explicit block
      80             500 :         let tenants_dir = {
      81             500 :             let mut state = TIMELINES_STATE.lock().unwrap();
      82             500 :             assert!(state.wal_backup_launcher_tx.is_none());
      83             500 :             state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
      84             500 :             state.conf = Some(conf);
      85             500 : 
      86             500 :             // Iterate through all directories and load tenants for all directories
      87             500 :             // named as a valid tenant_id.
      88             500 :             state.get_conf().workdir.clone()
      89             500 :         };
      90             500 :         let mut tenant_count = 0;
      91            1589 :         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
      92             500 :             .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
      93                 :         {
      94            1589 :             match &tenants_dir_entry {
      95            1589 :                 Ok(tenants_dir_entry) => {
      96              86 :                     if let Ok(tenant_id) =
      97            1589 :                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
      98                 :                     {
      99              86 :                         tenant_count += 1;
     100              86 :                         GlobalTimelines::load_tenant_timelines(tenant_id).await?;
     101            1503 :                     }
     102                 :                 }
     103 UBC           0 :                 Err(e) => error!(
     104               0 :                     "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
     105               0 :                     tenants_dir_entry, tenants_dir, e
     106               0 :                 ),
     107                 :             }
     108                 :         }
     109                 : 
     110 CBC         500 :         info!(
     111             500 :             "found {} tenants directories, successfully loaded {} timelines",
     112             500 :             tenant_count,
     113             500 :             TIMELINES_STATE.lock().unwrap().timelines.len()
     114             500 :         );
     115             500 :         Ok(())
     116             500 :     }
     117                 : 
     118                 :     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     119                 :     /// errors if any.
     120                 :     ///
     121                 :     /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
     122                 :     /// sync and there is no important reason to make it async (it is always
     123                 :     /// held for a short while) we just lock and unlock it for each timeline --
     124                 :     /// this function is called during init when nothing else is running, so
     125                 :     /// this is fine.
     126              86 :     async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
     127              86 :         let (conf, wal_backup_launcher_tx) = {
     128              86 :             let state = TIMELINES_STATE.lock().unwrap();
     129              86 :             (
     130              86 :                 state.get_conf().clone(),
     131              86 :                 state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
     132              86 :             )
     133              86 :         };
     134              86 : 
     135              86 :         let timelines_dir = conf.tenant_dir(&tenant_id);
     136              86 :         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
     137              86 :             .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
     138                 :         {
     139              83 :             match &timelines_dir_entry {
     140              83 :                 Ok(timeline_dir_entry) => {
     141              83 :                     if let Ok(timeline_id) =
     142              83 :                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
     143                 :                     {
     144              83 :                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     145              83 :                         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
     146              83 :                             Ok(timeline) => {
     147              83 :                                 let tli = Arc::new(timeline);
     148              83 :                                 TIMELINES_STATE
     149              83 :                                     .lock()
     150              83 :                                     .unwrap()
     151              83 :                                     .timelines
     152              83 :                                     .insert(ttid, tli.clone());
     153              83 :                                 tli.bootstrap(&conf);
     154              83 :                                 tli.update_status_notify().await.unwrap();
     155                 :                             }
     156                 :                             // If we can't load a timeline, it's most likely because of a corrupted
     157                 :                             // directory. We log an error and don't allow deleting or recreating
     158                 :                             // this timeline. The only way to fix this timeline is to repair it manually
     159                 :                             // and restart the safekeeper.
     160 UBC           0 :                             Err(e) => error!(
     161               0 :                                 "failed to load timeline {} for tenant {}, reason: {:?}",
     162               0 :                                 timeline_id, tenant_id, e
     163               0 :                             ),
     164                 :                         }
     165               0 :                     }
     166                 :                 }
     167               0 :                 Err(e) => error!(
     168               0 :                     "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
     169               0 :                     timelines_dir_entry, timelines_dir, e
     170               0 :                 ),
     171                 :             }
     172                 :         }
     173                 : 
     174 CBC          86 :         Ok(())
     175              86 :     }
     176                 : 
     177                 :     /// Load timeline from disk to the memory.
     178               1 :     pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
     179               1 :         let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
     180               1 : 
     181               1 :         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
     182               1 :             Ok(timeline) => {
     183               1 :                 let tli = Arc::new(timeline);
     184               1 : 
     185               1 :                 // TODO: prevent concurrent timeline creation/loading
     186               1 :                 TIMELINES_STATE
     187               1 :                     .lock()
     188               1 :                     .unwrap()
     189               1 :                     .timelines
     190               1 :                     .insert(ttid, tli.clone());
     191               1 : 
     192               1 :                 tli.bootstrap(&conf);
     193               1 : 
     194               1 :                 Ok(tli)
     195                 :             }
     196                 :             // If we can't load a timeline, it's bad. Caller will figure it out.
     197 UBC           0 :             Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
     198                 :         }
     199 CBC           1 :     }
     200                 : 
     201                 :     /// Get the number of timelines in the map.
     202               5 :     pub fn timelines_count() -> usize {
     203               5 :         TIMELINES_STATE.lock().unwrap().timelines.len()
     204               5 :     }
     205                 : 
     206                 :     /// Get the global safekeeper config.
     207               6 :     pub fn get_global_config() -> SafeKeeperConf {
     208               6 :         TIMELINES_STATE.lock().unwrap().get_conf().clone()
     209               6 :     }
     210                 : 
     211                 :     /// Create a new timeline with the given id. If the timeline already exists, returns
     212                 :     /// an existing timeline.
     213            2024 :     pub async fn create(
     214            2024 :         ttid: TenantTimelineId,
     215            2024 :         server_info: ServerInfo,
     216            2024 :         commit_lsn: Lsn,
     217            2024 :         local_start_lsn: Lsn,
     218            2024 :     ) -> Result<Arc<Timeline>> {
     219             481 :         let (conf, wal_backup_launcher_tx) = {
     220            2024 :             let state = TIMELINES_STATE.lock().unwrap();
     221            2024 :             if let Ok(timeline) = state.get(&ttid) {
     222                 :                 // Timeline already exists, return it.
     223            1543 :                 return Ok(timeline);
     224             481 :             }
     225             481 :             state.get_dependencies()
     226                 :         };
     227                 : 
     228             481 :         info!("creating new timeline {}", ttid);
     229                 : 
     230             481 :         let timeline = Arc::new(Timeline::create_empty(
     231             481 :             &conf,
     232             481 :             ttid,
     233             481 :             wal_backup_launcher_tx,
     234             481 :             server_info,
     235             481 :             commit_lsn,
     236             481 :             local_start_lsn,
     237             481 :         )?);
     238                 : 
     239                 :         // Take a lock and finish the initialization holding this mutex. No other threads
     240                 :         // can interfere with creation after we insert the timeline into the map.
     241                 :         {
     242             481 :             let mut shared_state = timeline.write_shared_state().await;
     243                 : 
     244                 :             // We can get a race condition here in case of concurrent create calls, but only
     245                 :             // in theory. create() will return valid timeline on the next try.
     246             481 :             TIMELINES_STATE
     247             481 :                 .lock()
     248             481 :                 .unwrap()
     249             481 :                 .try_insert(timeline.clone())?;
     250                 : 
     251                 :             // Write the new timeline to the disk and start background workers.
     252                 :             // Bootstrap is transactional, so if it fails, the timeline will be deleted,
     253                 :             // and the state on disk should remain unchanged.
     254            2602 :             if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
     255                 :                 // Note: the most likely reason for init failure is that the timeline
     256                 :                 // directory already exists on disk. This happens when timeline is corrupted
     257                 :                 // and wasn't loaded from disk on startup because of that. We want to preserve
     258                 :                 // the timeline directory in this case, for further inspection.
     259                 : 
     260                 :                 // TODO: this is an unusual error, perhaps we should send it to sentry
     261                 :                 // TODO: compute will try to create timeline every second, we should add backoff
     262 UBC           0 :                 error!("failed to init new timeline {}: {}", ttid, e);
     263                 : 
     264                 :                 // Timeline failed to init, it cannot be used. Remove it from the map.
     265               0 :                 TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
     266               0 :                 return Err(e);
     267 CBC         481 :             }
     268             481 :             // We are done with bootstrap, release the lock, return the timeline.
     269             481 :             // {} block forces release before .await
     270             481 :         }
     271             481 :         timeline.update_status_notify().await?;
     272             481 :         timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
     273             481 :         Ok(timeline)
     274            2024 :     }
     275                 : 
     276                 :     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
     277                 :     /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
     278                 :     /// i.e. loaded in memory and not cancelled.
     279           12403 :     pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
     280           12403 :         let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
     281           12403 : 
     282           12403 :         match res {
     283           11930 :             Ok(tli) => {
     284           11930 :                 if tli.is_cancelled() {
     285 UBC           0 :                     return Err(TimelineError::Cancelled(ttid));
     286 CBC       11930 :                 }
     287           11930 :                 Ok(tli)
     288                 :             }
     289             473 :             _ => res,
     290                 :         }
     291           12403 :     }
     292                 : 
     293                 :     /// Returns all timelines. This is used for background timeline processes.
     294            8566 :     pub fn get_all() -> Vec<Arc<Timeline>> {
     295            8566 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     296            8566 :         global_lock
     297            8566 :             .timelines
     298            8566 :             .values()
     299            8566 :             .cloned()
     300            8566 :             .filter(|t| !t.is_cancelled())
     301            8566 :             .collect()
     302            8566 :     }
     303                 : 
     304                 :     /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
     305                 :     /// and that's why it can return cancelled timelines, to retry deleting them.
     306               4 :     fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
     307               4 :         let global_lock = TIMELINES_STATE.lock().unwrap();
     308               4 :         global_lock
     309               4 :             .timelines
     310               4 :             .values()
     311              20 :             .filter(|t| t.ttid.tenant_id == tenant_id)
     312               4 :             .cloned()
     313               4 :             .collect()
     314               4 :     }
     315                 : 
     316                 :     /// Cancels timeline, then deletes the corresponding data directory.
     317              33 :     pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
     318              33 :         let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
     319              33 :         match tli_res {
     320              31 :             Ok(timeline) => {
     321                 :                 // Take a lock and finish the deletion holding this mutex.
     322              31 :                 let mut shared_state = timeline.write_shared_state().await;
     323                 : 
     324              31 :                 info!("deleting timeline {}", ttid);
     325              31 :                 let (dir_existed, was_active) =
     326              31 :                     timeline.delete_from_disk(&mut shared_state).await?;
     327                 : 
     328                 :                 // Remove timeline from the map.
     329                 :                 // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
     330                 :                 // https://github.com/neondatabase/neon/issues/3146
     331                 :                 // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
     332                 : 
     333              31 :                 Ok(TimelineDeleteForceResult {
     334              31 :                     dir_existed,
     335              31 :                     was_active,
     336              31 :                 })
     337                 :             }
     338                 :             Err(_) => {
     339                 :                 // Timeline is not in memory, but it may still exist on disk in a broken state.
     340               2 :                 let dir_path = TIMELINES_STATE
     341               2 :                     .lock()
     342               2 :                     .unwrap()
     343               2 :                     .get_conf()
     344               2 :                     .timeline_dir(ttid);
     345               2 :                 let dir_existed = delete_dir(dir_path)?;
     346                 : 
     347               2 :                 Ok(TimelineDeleteForceResult {
     348               2 :                     dir_existed,
     349               2 :                     was_active: false,
     350               2 :                 })
     351                 :             }
     352                 :         }
     353              33 :     }
     354                 : 
     355                 :     /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
     356                 :     /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
     357                 :     /// created simultaneously. In that case the function will return error and the caller should
     358                 :     /// retry tenant deletion again later.
     359               4 :     pub async fn delete_force_all_for_tenant(
     360               4 :         tenant_id: &TenantId,
     361               4 :     ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
     362               4 :         info!("deleting all timelines for tenant {}", tenant_id);
     363               4 :         let to_delete = Self::get_all_for_tenant(*tenant_id);
     364               4 : 
     365               4 :         let mut err = None;
     366               4 : 
     367               4 :         let mut deleted = HashMap::new();
     368              20 :         for tli in &to_delete {
     369              16 :             match Self::delete_force(&tli.ttid).await {
     370              16 :                 Ok(result) => {
     371              16 :                     deleted.insert(tli.ttid, result);
     372              16 :                 }
     373 UBC           0 :                 Err(e) => {
     374               0 :                     error!("failed to delete timeline {}: {}", tli.ttid, e);
     375                 :                     // Save error to return later.
     376               0 :                     err = Some(e);
     377                 :                 }
     378                 :             }
     379                 :         }
     380                 : 
     381                 :         // If there was an error, return it.
     382 CBC           4 :         if let Some(e) = err {
     383 UBC           0 :             return Err(e);
     384 CBC           4 :         }
     385               4 : 
     386               4 :         // There may be broken timelines on disk, so delete the whole tenant dir as well.
     387               4 :         // Note that we could concurrently create new timelines while we were deleting them,
     388               4 :         // so the directory may not be empty. In this case timelines will have bad state
     389               4 :         // and timeline background jobs can panic.
     390               4 :         delete_dir(
     391               4 :             TIMELINES_STATE
     392               4 :                 .lock()
     393               4 :                 .unwrap()
     394               4 :                 .get_conf()
     395               4 :                 .tenant_dir(tenant_id),
     396               4 :         )?;
     397                 : 
     398                 :         // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
     399                 :         // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
     400                 :         // if !tlis_after_delete.is_empty() {
     401                 :         //     // Some timelines were created while we were deleting them, returning error
     402                 :         //     // to the caller, so it can retry later.
     403                 :         //     bail!(
     404                 :         //         "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
     405                 :         //         tenant_id
     406                 :         //     );
     407                 :         // }
     408                 : 
     409               4 :         Ok(deleted)
     410               4 :     }
     411                 : }
     412                 : 
     413              33 : #[derive(Clone, Copy, Serialize)]
     414                 : pub struct TimelineDeleteForceResult {
     415                 :     pub dir_existed: bool,
     416                 :     pub was_active: bool,
     417                 : }
     418                 : 
     419                 : /// Deletes directory and its contents. Returns false if the directory does not exist.
     420                 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
     421               6 :     match std::fs::remove_dir_all(path) {
     422               2 :         Ok(_) => Ok(true),
     423               4 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     424 UBC           0 :         Err(e) => Err(e.into()),
     425                 :     }
     426 CBC           6 : }
        

Generated by: LCOV version 2.1-beta