LCOV - code coverage report
Current view: top level - safekeeper/src - test_utils.rs (source / functions) Coverage Total Hit
Test: 91bf6c8f32e5e69adde6241313e732fdd6d6e277.info Lines: 98.3 % 116 114
Test Date: 2025-03-04 12:19:20 Functions: 100.0 % 8 8

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use camino_tempfile::Utf8TempDir;
       4              : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
       5              : use safekeeper_api::membership::SafekeeperGeneration as Generation;
       6              : use tokio::fs::create_dir_all;
       7              : use utils::id::{NodeId, TenantTimelineId};
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use crate::rate_limit::RateLimiter;
      11              : use crate::receive_wal::WalAcceptor;
      12              : use crate::safekeeper::{
      13              :     AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
      14              :     ProposerElected, SafeKeeper, TermHistory,
      15              : };
      16              : use crate::send_wal::EndWatch;
      17              : use crate::state::{TimelinePersistentState, TimelineState};
      18              : use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
      19              : use crate::timelines_set::TimelinesSet;
      20              : use crate::wal_backup::remote_timeline_path;
      21              : use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
      22              : 
      23              : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
      24              : pub struct Env {
      25              :     /// Whether to enable fsync.
      26              :     pub fsync: bool,
      27              :     /// Benchmark directory. Deleted when dropped.
      28              :     pub tempdir: Utf8TempDir,
      29              : }
      30              : 
      31              : impl Env {
      32              :     /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
      33              :     /// enable fsyncing.
      34            4 :     pub fn new(fsync: bool) -> anyhow::Result<Self> {
      35            4 :         let tempdir = camino_tempfile::tempdir()?;
      36            4 :         Ok(Self { fsync, tempdir })
      37            4 :     }
      38              : 
      39              :     /// Constructs a Safekeeper config for the given node ID.
      40            8 :     fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
      41            8 :         let mut conf = SafeKeeperConf::dummy();
      42            8 :         conf.my_id = node_id;
      43            8 :         conf.no_sync = !self.fsync;
      44            8 :         conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
      45            8 :         conf
      46            8 :     }
      47              : 
      48              :     /// Constructs a Safekeeper with the given node and tenant/timeline ID.
      49              :     ///
      50              :     /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
      51              :     /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
      52              :     /// currently possible to construct a timeline using non-file storage since StateSK only accepts
      53              :     /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
      54            4 :     pub async fn make_safekeeper(
      55            4 :         &self,
      56            4 :         node_id: NodeId,
      57            4 :         ttid: TenantTimelineId,
      58            4 :         start_lsn: Lsn,
      59            4 :     ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
      60            4 :         let conf = self.make_conf(node_id);
      61            4 : 
      62            4 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      63            4 :         create_dir_all(&timeline_dir).await?;
      64              : 
      65            4 :         let mut pstate = TimelinePersistentState::empty();
      66            4 :         pstate.tenant_id = ttid.tenant_id;
      67            4 :         pstate.timeline_id = ttid.timeline_id;
      68              : 
      69            4 :         let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
      70            4 :         let ctrl =
      71            4 :             control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
      72            4 :         let state = TimelineState::new(ctrl);
      73            4 :         let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
      74              : 
      75              :         // Emulate an initial election.
      76            4 :         safekeeper
      77            4 :             .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
      78            4 :                 generation: Generation::new(0),
      79            4 :                 term: 1,
      80            4 :                 start_streaming_at: start_lsn,
      81            4 :                 term_history: TermHistory(vec![(1, start_lsn).into()]),
      82            4 :             }))
      83            4 :             .await?;
      84              : 
      85            4 :         Ok(safekeeper)
      86            4 :     }
      87              : 
      88              :     /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
      89              :     /// manager task.
      90            4 :     pub async fn make_timeline(
      91            4 :         &self,
      92            4 :         node_id: NodeId,
      93            4 :         ttid: TenantTimelineId,
      94            4 :         start_lsn: Lsn,
      95            4 :     ) -> anyhow::Result<Arc<Timeline>> {
      96            4 :         let conf = Arc::new(self.make_conf(node_id));
      97            4 :         let timeline_dir = get_timeline_dir(&conf, &ttid);
      98            4 :         let remote_path = remote_timeline_path(&ttid)?;
      99              : 
     100            4 :         let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
     101            4 :         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
     102            4 : 
     103            4 :         let timeline = Timeline::new(
     104            4 :             ttid,
     105            4 :             &timeline_dir,
     106            4 :             &remote_path,
     107            4 :             shared_state,
     108            4 :             conf.clone(),
     109            4 :         );
     110            4 :         timeline.bootstrap(
     111            4 :             &mut timeline.write_shared_state().await,
     112            4 :             &conf,
     113            4 :             Arc::new(TimelinesSet::default()), // ignored for now
     114            4 :             RateLimiter::new(0, 0),
     115            4 :         );
     116            4 :         Ok(timeline)
     117            4 :     }
     118              : 
     119              :     // This will be dead code when building a non-benchmark target with the
     120              :     // benchmarking feature enabled.
     121              :     #[allow(dead_code)]
     122            4 :     pub(crate) async fn write_wal(
     123            4 :         tli: Arc<Timeline>,
     124            4 :         start_lsn: Lsn,
     125            4 :         msg_size: usize,
     126            4 :         msg_count: usize,
     127            4 :         mut next_record_lsns: Option<&mut Vec<Lsn>>,
     128            4 :     ) -> anyhow::Result<EndWatch> {
     129            4 :         let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
     130            4 :         let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
     131            4 : 
     132            4 :         let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
     133            4 : 
     134            4 :         WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
     135            4 : 
     136            4 :         let prefix = c"neon-file:";
     137            4 :         let prefixlen = prefix.to_bytes_with_nul().len();
     138            4 :         assert!(msg_size >= prefixlen);
     139            4 :         let message = vec![0; msg_size - prefixlen];
     140            4 : 
     141            4 :         let walgen =
     142            4 :             &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
     143            4 :         for _ in 0..msg_count {
     144          610 :             let (lsn, record) = walgen.next().unwrap();
     145          610 :             if let Some(ref mut lsns) = next_record_lsns {
     146          210 :                 lsns.push(lsn);
     147          400 :             }
     148              : 
     149          610 :             let req = AppendRequest {
     150          610 :                 h: AppendRequestHeader {
     151          610 :                     generation: Generation::new(0),
     152          610 :                     term: 1,
     153          610 :                     begin_lsn: lsn,
     154          610 :                     end_lsn: lsn + record.len() as u64,
     155          610 :                     commit_lsn: lsn,
     156          610 :                     truncate_lsn: Lsn(0),
     157          610 :                 },
     158          610 :                 wal_data: record,
     159          610 :             };
     160          610 : 
     161          610 :             let end_lsn = req.h.end_lsn;
     162          610 : 
     163          610 :             let msg = ProposerAcceptorMessage::AppendRequest(req);
     164          610 :             msg_tx.send(msg).await?;
     165          610 :             while let Some(reply) = reply_rx.recv().await {
     166          610 :                 if let AcceptorProposerMessage::AppendResponse(resp) = reply {
     167          610 :                     if resp.flush_lsn >= end_lsn {
     168          610 :                         break;
     169            0 :                     }
     170            0 :                 }
     171              :             }
     172              :         }
     173              : 
     174            4 :         Ok(end_watch)
     175            4 :     }
     176              : }
        

Generated by: LCOV version 2.1-beta