LCOV - code coverage report
Current view: top level - safekeeper/src - test_utils.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 0.0 % 66 0
Test Date: 2025-01-07 20:58:07 Functions: 0.0 % 6 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use crate::rate_limit::RateLimiter;
       4              : use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
       5              : use crate::state::{TimelinePersistentState, TimelineState};
       6              : use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
       7              : use crate::timelines_set::TimelinesSet;
       8              : use crate::wal_backup::remote_timeline_path;
       9              : use crate::{control_file, wal_storage, SafeKeeperConf};
      10              : use camino_tempfile::Utf8TempDir;
      11              : use tokio::fs::create_dir_all;
      12              : use utils::id::{NodeId, TenantTimelineId};
      13              : use utils::lsn::Lsn;
      14              : 
      15              : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
      16              : pub struct Env {
      17              :     /// Whether to enable fsync.
      18              :     pub fsync: bool,
      19              :     /// Benchmark directory. Deleted when dropped.
      20              :     pub tempdir: Utf8TempDir,
      21              : }
      22              : 
      23              : impl Env {
      24              :     /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
      25              :     /// enable fsyncing.
      26            0 :     pub fn new(fsync: bool) -> anyhow::Result<Self> {
      27            0 :         let tempdir = camino_tempfile::tempdir()?;
      28            0 :         Ok(Self { fsync, tempdir })
      29            0 :     }
      30              : 
      31              :     /// Constructs a Safekeeper config for the given node ID.
      32            0 :     fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
      33            0 :         let mut conf = SafeKeeperConf::dummy();
      34            0 :         conf.my_id = node_id;
      35            0 :         conf.no_sync = !self.fsync;
      36            0 :         conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
      37            0 :         conf
      38            0 :     }
      39              : 
      40              :     /// Constructs a Safekeeper with the given node and tenant/timeline ID.
      41              :     ///
      42              :     /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
      43              :     /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
      44              :     /// currently possible to construct a timeline using non-file storage since StateSK only accepts
      45              :     /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
      46            0 :     pub async fn make_safekeeper(
      47            0 :         &self,
      48            0 :         node_id: NodeId,
      49            0 :         ttid: TenantTimelineId,
      50            0 :         start_lsn: Lsn,
      51            0 :     ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
      52            0 :         let conf = self.make_conf(node_id);
      53            0 : 
      54            0 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      55            0 :         create_dir_all(&timeline_dir).await?;
      56              : 
      57            0 :         let mut pstate = TimelinePersistentState::empty();
      58            0 :         pstate.tenant_id = ttid.tenant_id;
      59            0 :         pstate.timeline_id = ttid.timeline_id;
      60              : 
      61            0 :         let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
      62            0 :         let ctrl =
      63            0 :             control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
      64            0 :         let state = TimelineState::new(ctrl);
      65            0 :         let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
      66              : 
      67              :         // Emulate an initial election.
      68            0 :         safekeeper
      69            0 :             .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
      70            0 :                 term: 1,
      71            0 :                 start_streaming_at: start_lsn,
      72            0 :                 term_history: TermHistory(vec![(1, start_lsn).into()]),
      73            0 :                 timeline_start_lsn: start_lsn,
      74            0 :             }))
      75            0 :             .await?;
      76              : 
      77            0 :         Ok(safekeeper)
      78            0 :     }
      79              : 
      80              :     /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
      81              :     /// manager task.
      82            0 :     pub async fn make_timeline(
      83            0 :         &self,
      84            0 :         node_id: NodeId,
      85            0 :         ttid: TenantTimelineId,
      86            0 :         start_lsn: Lsn,
      87            0 :     ) -> anyhow::Result<Arc<Timeline>> {
      88            0 :         let conf = Arc::new(self.make_conf(node_id));
      89            0 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      90            0 :         let remote_path = remote_timeline_path(&ttid)?;
      91              : 
      92            0 :         let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
      93            0 :         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
      94            0 : 
      95            0 :         let timeline = Timeline::new(
      96            0 :             ttid,
      97            0 :             &timeline_dir,
      98            0 :             &remote_path,
      99            0 :             shared_state,
     100            0 :             conf.clone(),
     101            0 :         );
     102            0 :         timeline.bootstrap(
     103            0 :             &mut timeline.write_shared_state().await,
     104            0 :             &conf,
     105            0 :             Arc::new(TimelinesSet::default()), // ignored for now
     106            0 :             RateLimiter::new(0, 0),
     107            0 :         );
     108            0 :         Ok(timeline)
     109            0 :     }
     110              : }
        

Generated by: LCOV version 2.1-beta