LCOV - code coverage report
Current view: top level - safekeeper/src - test_utils.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 98.3 % 116 114
Test Date: 2025-03-12 16:10:49 Functions: 100.0 % 8 8

            Line data    Source code
       1              : use std::ffi::CStr;
       2              : use std::sync::Arc;
       3              : 
       4              : use camino_tempfile::Utf8TempDir;
       5              : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
       6              : use safekeeper_api::membership::SafekeeperGeneration as Generation;
       7              : use tokio::fs::create_dir_all;
       8              : use utils::id::{NodeId, TenantTimelineId};
       9              : use utils::lsn::Lsn;
      10              : 
      11              : use crate::rate_limit::RateLimiter;
      12              : use crate::receive_wal::WalAcceptor;
      13              : use crate::safekeeper::{
      14              :     AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
      15              :     ProposerElected, SafeKeeper, TermHistory,
      16              : };
      17              : use crate::send_wal::EndWatch;
      18              : use crate::state::{TimelinePersistentState, TimelineState};
      19              : use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
      20              : use crate::timelines_set::TimelinesSet;
      21              : use crate::wal_backup::remote_timeline_path;
      22              : use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
      23              : 
      24              : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
      25              : pub struct Env {
      26              :     /// Whether to enable fsync.
      27              :     pub fsync: bool,
      28              :     /// Benchmark directory. Deleted when dropped.
      29              :     pub tempdir: Utf8TempDir,
      30              : }
      31              : 
      32              : impl Env {
      33              :     /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
      34              :     /// enable fsyncing.
      35            5 :     pub fn new(fsync: bool) -> anyhow::Result<Self> {
      36            5 :         let tempdir = camino_tempfile::tempdir()?;
      37            5 :         Ok(Self { fsync, tempdir })
      38            5 :     }
      39              : 
      40              :     /// Constructs a Safekeeper config for the given node ID.
      41           10 :     fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
      42           10 :         let mut conf = SafeKeeperConf::dummy();
      43           10 :         conf.my_id = node_id;
      44           10 :         conf.no_sync = !self.fsync;
      45           10 :         conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
      46           10 :         conf
      47           10 :     }
      48              : 
      49              :     /// Constructs a Safekeeper with the given node and tenant/timeline ID.
      50              :     ///
      51              :     /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
      52              :     /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
      53              :     /// currently possible to construct a timeline using non-file storage since StateSK only accepts
      54              :     /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
      55            5 :     pub async fn make_safekeeper(
      56            5 :         &self,
      57            5 :         node_id: NodeId,
      58            5 :         ttid: TenantTimelineId,
      59            5 :         start_lsn: Lsn,
      60            5 :     ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
      61            5 :         let conf = self.make_conf(node_id);
      62            5 : 
      63            5 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      64            5 :         create_dir_all(&timeline_dir).await?;
      65              : 
      66            5 :         let mut pstate = TimelinePersistentState::empty();
      67            5 :         pstate.tenant_id = ttid.tenant_id;
      68            5 :         pstate.timeline_id = ttid.timeline_id;
      69              : 
      70            5 :         let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
      71            5 :         let ctrl =
      72            5 :             control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
      73            5 :         let state = TimelineState::new(ctrl);
      74            5 :         let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
      75              : 
      76              :         // Emulate an initial election.
      77            5 :         safekeeper
      78            5 :             .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
      79            5 :                 generation: Generation::new(0),
      80            5 :                 term: 1,
      81            5 :                 start_streaming_at: start_lsn,
      82            5 :                 term_history: TermHistory(vec![(1, start_lsn).into()]),
      83            5 :             }))
      84            5 :             .await?;
      85              : 
      86            5 :         Ok(safekeeper)
      87            5 :     }
      88              : 
      89              :     /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
      90              :     /// manager task.
      91            5 :     pub async fn make_timeline(
      92            5 :         &self,
      93            5 :         node_id: NodeId,
      94            5 :         ttid: TenantTimelineId,
      95            5 :         start_lsn: Lsn,
      96            5 :     ) -> anyhow::Result<Arc<Timeline>> {
      97            5 :         let conf = Arc::new(self.make_conf(node_id));
      98            5 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      99            5 :         let remote_path = remote_timeline_path(&ttid)?;
     100              : 
     101            5 :         let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
     102            5 :         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
     103            5 : 
     104            5 :         let timeline = Timeline::new(
     105            5 :             ttid,
     106            5 :             &timeline_dir,
     107            5 :             &remote_path,
     108            5 :             shared_state,
     109            5 :             conf.clone(),
     110            5 :         );
     111            5 :         timeline.bootstrap(
     112            5 :             &mut timeline.write_shared_state().await,
     113            5 :             &conf,
     114            5 :             Arc::new(TimelinesSet::default()), // ignored for now
     115            5 :             RateLimiter::new(0, 0),
     116            5 :         );
     117            5 :         Ok(timeline)
     118            5 :     }
     119              : 
     120              :     // This will be dead code when building a non-benchmark target with the
     121              :     // benchmarking feature enabled.
     122              :     #[allow(dead_code)]
     123            5 :     pub(crate) async fn write_wal(
     124            5 :         tli: Arc<Timeline>,
     125            5 :         start_lsn: Lsn,
     126            5 :         msg_size: usize,
     127            5 :         msg_count: usize,
     128            5 :         prefix: &CStr,
     129            5 :         mut next_record_lsns: Option<&mut Vec<Lsn>>,
     130            5 :     ) -> anyhow::Result<EndWatch> {
     131            5 :         let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
     132            5 :         let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
     133            5 : 
     134            5 :         let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
     135            5 : 
     136            5 :         WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
     137            5 : 
     138            5 :         let prefixlen = prefix.to_bytes_with_nul().len();
     139            5 :         assert!(msg_size >= prefixlen);
     140            5 :         let message = vec![0; msg_size - prefixlen];
     141            5 : 
     142            5 :         let walgen =
     143            5 :             &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
     144            5 :         for _ in 0..msg_count {
     145          620 :             let (lsn, record) = walgen.next().unwrap();
     146          620 :             if let Some(ref mut lsns) = next_record_lsns {
     147          220 :                 lsns.push(lsn);
     148          400 :             }
     149              : 
     150          620 :             let req = AppendRequest {
     151          620 :                 h: AppendRequestHeader {
     152          620 :                     generation: Generation::new(0),
     153          620 :                     term: 1,
     154          620 :                     begin_lsn: lsn,
     155          620 :                     end_lsn: lsn + record.len() as u64,
     156          620 :                     commit_lsn: lsn,
     157          620 :                     truncate_lsn: Lsn(0),
     158          620 :                 },
     159          620 :                 wal_data: record,
     160          620 :             };
     161          620 : 
     162          620 :             let end_lsn = req.h.end_lsn;
     163          620 : 
     164          620 :             let msg = ProposerAcceptorMessage::AppendRequest(req);
     165          620 :             msg_tx.send(msg).await?;
     166          620 :             while let Some(reply) = reply_rx.recv().await {
     167          620 :                 if let AcceptorProposerMessage::AppendResponse(resp) = reply {
     168          620 :                     if resp.flush_lsn >= end_lsn {
     169          620 :                         break;
     170            0 :                     }
     171            0 :                 }
     172              :             }
     173              :         }
     174              : 
     175            5 :         Ok(end_watch)
     176            5 :     }
     177              : }
        

Generated by: LCOV version 2.1-beta