LCOV - code coverage report
Current view: top level - safekeeper/src - test_utils.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 98.2 % 114 112
Test Date: 2025-01-30 15:18:43 Functions: 100.0 % 8 8

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use crate::rate_limit::RateLimiter;
       4              : use crate::receive_wal::WalAcceptor;
       5              : use crate::safekeeper::{
       6              :     AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
       7              :     ProposerElected, SafeKeeper, TermHistory,
       8              : };
       9              : use crate::send_wal::EndWatch;
      10              : use crate::state::{TimelinePersistentState, TimelineState};
      11              : use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
      12              : use crate::timelines_set::TimelinesSet;
      13              : use crate::wal_backup::remote_timeline_path;
      14              : use crate::{control_file, receive_wal, wal_storage, SafeKeeperConf};
      15              : use camino_tempfile::Utf8TempDir;
      16              : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
      17              : use tokio::fs::create_dir_all;
      18              : use utils::id::{NodeId, TenantTimelineId};
      19              : use utils::lsn::Lsn;
      20              : 
      21              : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
      22              : pub struct Env {
      23              :     /// Whether to enable fsync.
      24              :     pub fsync: bool,
      25              :     /// Benchmark directory. Deleted when dropped.
      26              :     pub tempdir: Utf8TempDir,
      27              : }
      28              : 
      29              : impl Env {
      30              :     /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
      31              :     /// enable fsyncing.
      32            3 :     pub fn new(fsync: bool) -> anyhow::Result<Self> {
      33            3 :         let tempdir = camino_tempfile::tempdir()?;
      34            3 :         Ok(Self { fsync, tempdir })
      35            3 :     }
      36              : 
      37              :     /// Constructs a Safekeeper config for the given node ID.
      38            6 :     fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
      39            6 :         let mut conf = SafeKeeperConf::dummy();
      40            6 :         conf.my_id = node_id;
      41            6 :         conf.no_sync = !self.fsync;
      42            6 :         conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
      43            6 :         conf
      44            6 :     }
      45              : 
      46              :     /// Constructs a Safekeeper with the given node and tenant/timeline ID.
      47              :     ///
      48              :     /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
      49              :     /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
      50              :     /// currently possible to construct a timeline using non-file storage since StateSK only accepts
      51              :     /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
      52            3 :     pub async fn make_safekeeper(
      53            3 :         &self,
      54            3 :         node_id: NodeId,
      55            3 :         ttid: TenantTimelineId,
      56            3 :         start_lsn: Lsn,
      57            3 :     ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
      58            3 :         let conf = self.make_conf(node_id);
      59            3 : 
      60            3 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      61            3 :         create_dir_all(&timeline_dir).await?;
      62              : 
      63            3 :         let mut pstate = TimelinePersistentState::empty();
      64            3 :         pstate.tenant_id = ttid.tenant_id;
      65            3 :         pstate.timeline_id = ttid.timeline_id;
      66              : 
      67            3 :         let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
      68            3 :         let ctrl =
      69            3 :             control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
      70            3 :         let state = TimelineState::new(ctrl);
      71            3 :         let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
      72              : 
      73              :         // Emulate an initial election.
      74            3 :         safekeeper
      75            3 :             .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
      76            3 :                 term: 1,
      77            3 :                 start_streaming_at: start_lsn,
      78            3 :                 term_history: TermHistory(vec![(1, start_lsn).into()]),
      79            3 :                 timeline_start_lsn: start_lsn,
      80            3 :             }))
      81            3 :             .await?;
      82              : 
      83            3 :         Ok(safekeeper)
      84            3 :     }
      85              : 
      86              :     /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
      87              :     /// manager task.
      88            3 :     pub async fn make_timeline(
      89            3 :         &self,
      90            3 :         node_id: NodeId,
      91            3 :         ttid: TenantTimelineId,
      92            3 :         start_lsn: Lsn,
      93            3 :     ) -> anyhow::Result<Arc<Timeline>> {
      94            3 :         let conf = Arc::new(self.make_conf(node_id));
      95            3 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      96            3 :         let remote_path = remote_timeline_path(&ttid)?;
      97              : 
      98            3 :         let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
      99            3 :         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
     100            3 : 
     101            3 :         let timeline = Timeline::new(
     102            3 :             ttid,
     103            3 :             &timeline_dir,
     104            3 :             &remote_path,
     105            3 :             shared_state,
     106            3 :             conf.clone(),
     107            3 :         );
     108            3 :         timeline.bootstrap(
     109            3 :             &mut timeline.write_shared_state().await,
     110            3 :             &conf,
     111            3 :             Arc::new(TimelinesSet::default()), // ignored for now
     112            3 :             RateLimiter::new(0, 0),
     113            3 :         );
     114            3 :         Ok(timeline)
     115            3 :     }
     116              : 
     117              :     // This will be dead code when building a non-benchmark target with the
     118              :     // benchmarking feature enabled.
     119              :     #[allow(dead_code)]
     120            3 :     pub(crate) async fn write_wal(
     121            3 :         tli: Arc<Timeline>,
     122            3 :         start_lsn: Lsn,
     123            3 :         msg_size: usize,
     124            3 :         msg_count: usize,
     125            3 :     ) -> anyhow::Result<EndWatch> {
     126            3 :         let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
     127            3 :         let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
     128            3 : 
     129            3 :         let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
     130            3 : 
     131            3 :         WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
     132            3 : 
     133            3 :         let prefix = c"p";
     134            3 :         let prefixlen = prefix.to_bytes_with_nul().len();
     135            3 :         assert!(msg_size >= prefixlen);
     136            3 :         let message = vec![0; msg_size - prefixlen];
     137            3 : 
     138            3 :         let walgen =
     139            3 :             &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
     140            3 :         for _ in 0..msg_count {
     141          600 :             let (lsn, record) = walgen.next().unwrap();
     142          600 : 
     143          600 :             let req = AppendRequest {
     144          600 :                 h: AppendRequestHeader {
     145          600 :                     term: 1,
     146          600 :                     term_start_lsn: start_lsn,
     147          600 :                     begin_lsn: lsn,
     148          600 :                     end_lsn: lsn + record.len() as u64,
     149          600 :                     commit_lsn: lsn,
     150          600 :                     truncate_lsn: Lsn(0),
     151          600 :                     proposer_uuid: [0; 16],
     152          600 :                 },
     153          600 :                 wal_data: record,
     154          600 :             };
     155          600 : 
     156          600 :             let end_lsn = req.h.end_lsn;
     157          600 : 
     158          600 :             let msg = ProposerAcceptorMessage::AppendRequest(req);
     159          600 :             msg_tx.send(msg).await?;
     160          600 :             while let Some(reply) = reply_rx.recv().await {
     161          600 :                 if let AcceptorProposerMessage::AppendResponse(resp) = reply {
     162          600 :                     if resp.flush_lsn >= end_lsn {
     163          600 :                         break;
     164            0 :                     }
     165            0 :                 }
     166              :             }
     167              :         }
     168              : 
     169            3 :         Ok(end_watch)
     170            3 :     }
     171              : }
        

Generated by: LCOV version 2.1-beta