LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - simulation.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 97.7 % 301 294
Test Date: 2024-08-29 11:33:10 Functions: 88.9 % 63 56

            Line data    Source code
       1              : use std::{cell::Cell, str::FromStr, sync::Arc};
       2              : 
       3              : use crate::walproposer_sim::{safekeeper::run_server, walproposer_api::SimulationApi};
       4              : use desim::{
       5              :     executor::{self, ExternalHandle},
       6              :     node_os::NodeOs,
       7              :     options::{Delay, NetworkOptions},
       8              :     proto::{AnyMessage, NodeEvent},
       9              :     world::Node,
      10              :     world::World,
      11              : };
      12              : use rand::{Rng, SeedableRng};
      13              : use tracing::{debug, info_span, warn};
      14              : use utils::{id::TenantTimelineId, lsn::Lsn};
      15              : use walproposer::walproposer::{Config, Wrapper};
      16              : 
      17              : use super::{
      18              :     log::SimClock, safekeeper_disk::SafekeeperDisk, walproposer_api,
      19              :     walproposer_disk::DiskWalProposer,
      20              : };
      21              : 
      22              : /// Simulated safekeeper node.
      23              : pub struct SafekeeperNode {
      24              :     pub node: Arc<Node>,
      25              :     pub id: u32,
      26              :     pub disk: Arc<SafekeeperDisk>,
      27              :     pub thread: Cell<ExternalHandle>,
      28              : }
      29              : 
      30              : impl SafekeeperNode {
      31              :     /// Create and start a safekeeper at the specified Node.
      32        36144 :     pub fn new(node: Arc<Node>) -> Self {
      33        36144 :         let disk = Arc::new(SafekeeperDisk::new());
      34        36144 :         let thread = Cell::new(SafekeeperNode::launch(disk.clone(), node.clone()));
      35        36144 : 
      36        36144 :         Self {
      37        36144 :             id: node.id,
      38        36144 :             node,
      39        36144 :             disk,
      40        36144 :             thread,
      41        36144 :         }
      42        36144 :     }
      43              : 
      44       234543 :     fn launch(disk: Arc<SafekeeperDisk>, node: Arc<Node>) -> ExternalHandle {
      45       234543 :         // start the server thread
      46       234543 :         node.launch(move |os| {
      47       230496 :             run_server(os, disk).expect("server should finish without errors");
      48       234543 :         })
      49       234543 :     }
      50              : 
      51              :     /// Restart the safekeeper.
      52       198399 :     pub fn restart(&self) {
      53       198399 :         let new_thread = SafekeeperNode::launch(self.disk.clone(), self.node.clone());
      54       198399 :         let old_thread = self.thread.replace(new_thread);
      55       198399 :         old_thread.crash_stop();
      56       198399 :     }
      57              : }
      58              : 
      59              : /// Simulated walproposer node.
      60              : pub struct WalProposer {
      61              :     thread: ExternalHandle,
      62              :     node: Arc<Node>,
      63              :     disk: Arc<DiskWalProposer>,
      64              :     sync_safekeepers: bool,
      65              : }
      66              : 
      67              : impl WalProposer {
      68              :     /// Generic start function for both modes.
      69       216618 :     fn start(
      70       216618 :         os: NodeOs,
      71       216618 :         disk: Arc<DiskWalProposer>,
      72       216618 :         ttid: TenantTimelineId,
      73       216618 :         addrs: Vec<String>,
      74       216618 :         lsn: Option<Lsn>,
      75       216618 :     ) {
      76       216618 :         let sync_safekeepers = lsn.is_none();
      77              : 
      78       216618 :         let _enter = if sync_safekeepers {
      79       208189 :             info_span!("sync", started = executor::now()).entered()
      80              :         } else {
      81         8429 :             info_span!("walproposer", started = executor::now()).entered()
      82              :         };
      83              : 
      84       216618 :         os.log_event(format!("started;walproposer;{}", sync_safekeepers as i32));
      85       216618 : 
      86       216618 :         let config = Config {
      87       216618 :             ttid,
      88       216618 :             safekeepers_list: addrs,
      89       216618 :             safekeeper_reconnect_timeout: 1000,
      90       216618 :             safekeeper_connection_timeout: 5000,
      91       216618 :             sync_safekeepers,
      92       216618 :         };
      93       216618 :         let args = walproposer_api::Args {
      94       216618 :             os,
      95       216618 :             config: config.clone(),
      96       216618 :             disk,
      97       216618 :             redo_start_lsn: lsn,
      98       216618 :         };
      99       216618 :         let api = SimulationApi::new(args);
     100       216618 :         let wp = Wrapper::new(Box::new(api), config);
     101       216618 :         wp.start();
     102       216618 :     }
     103              : 
     104              :     /// Start walproposer in a sync_safekeepers mode.
     105       212342 :     pub fn launch_sync(ttid: TenantTimelineId, addrs: Vec<String>, node: Arc<Node>) -> Self {
     106       212342 :         debug!("sync_safekeepers started at node {}", node.id);
     107       212342 :         let disk = DiskWalProposer::new();
     108       212342 :         let disk_wp = disk.clone();
     109       212342 : 
     110       212342 :         // start the client thread
     111       212342 :         let handle = node.launch(move |os| {
     112       208189 :             WalProposer::start(os, disk_wp, ttid, addrs, None);
     113       212342 :         });
     114       212342 : 
     115       212342 :         Self {
     116       212342 :             thread: handle,
     117       212342 :             node,
     118       212342 :             disk,
     119       212342 :             sync_safekeepers: true,
     120       212342 :         }
     121       212342 :     }
     122              : 
     123              :     /// Start walproposer in a normal mode.
     124         8429 :     pub fn launch_walproposer(
     125         8429 :         ttid: TenantTimelineId,
     126         8429 :         addrs: Vec<String>,
     127         8429 :         node: Arc<Node>,
     128         8429 :         lsn: Lsn,
     129         8429 :     ) -> Self {
     130         8429 :         debug!("walproposer started at node {}", node.id);
     131         8429 :         let disk = DiskWalProposer::new();
     132         8429 :         disk.lock().reset_to(lsn);
     133         8429 :         let disk_wp = disk.clone();
     134         8429 : 
     135         8429 :         // start the client thread
     136         8429 :         let handle = node.launch(move |os| {
     137         8429 :             WalProposer::start(os, disk_wp, ttid, addrs, Some(lsn));
     138         8429 :         });
     139         8429 : 
     140         8429 :         Self {
     141         8429 :             thread: handle,
     142         8429 :             node,
     143         8429 :             disk,
     144         8429 :             sync_safekeepers: false,
     145         8429 :         }
     146         8429 :     }
     147              : 
     148         9366 :     pub fn write_tx(&mut self, cnt: usize) {
     149         9366 :         let start_lsn = self.disk.lock().flush_rec_ptr();
     150         9366 : 
     151        99889 :         for _ in 0..cnt {
     152        99889 :             self.disk
     153        99889 :                 .lock()
     154        99889 :                 .insert_logical_message("prefix", b"message")
     155        99889 :                 .expect("failed to generate logical message");
     156        99889 :         }
     157              : 
     158         9366 :         let end_lsn = self.disk.lock().flush_rec_ptr();
     159         9366 : 
     160         9366 :         // log event
     161         9366 :         self.node
     162         9366 :             .log_event(format!("write_wal;{};{};{}", start_lsn.0, end_lsn.0, cnt));
     163         9366 : 
     164         9366 :         // now we need to set "Latch" in walproposer
     165         9366 :         self.node
     166         9366 :             .node_events()
     167         9366 :             .send(NodeEvent::Internal(AnyMessage::Just32(0)));
     168         9366 :     }
     169              : 
     170       198827 :     pub fn stop(&self) {
     171       198827 :         self.thread.crash_stop();
     172       198827 :     }
     173              : }
     174              : 
     175              : /// Holds basic simulation settings, such as network options.
     176              : pub struct TestConfig {
     177              :     pub network: NetworkOptions,
     178              :     pub timeout: u64,
     179              :     pub clock: Option<SimClock>,
     180              : }
     181              : 
     182              : impl TestConfig {
     183              :     /// Create a new TestConfig with default settings.
     184           54 :     pub fn new(clock: Option<SimClock>) -> Self {
     185           54 :         Self {
     186           54 :             network: NetworkOptions {
     187           54 :                 keepalive_timeout: Some(2000),
     188           54 :                 connect_delay: Delay {
     189           54 :                     min: 1,
     190           54 :                     max: 5,
     191           54 :                     fail_prob: 0.0,
     192           54 :                 },
     193           54 :                 send_delay: Delay {
     194           54 :                     min: 1,
     195           54 :                     max: 5,
     196           54 :                     fail_prob: 0.0,
     197           54 :                 },
     198           54 :             },
     199           54 :             timeout: 1_000 * 10,
     200           54 :             clock,
     201           54 :         }
     202           54 :     }
     203              : 
     204              :     /// Start a new simulation with the specified seed.
     205        12048 :     pub fn start(&self, seed: u64) -> Test {
     206        12048 :         let world = Arc::new(World::new(seed, Arc::new(self.network.clone())));
     207              : 
     208        12048 :         if let Some(clock) = &self.clock {
     209        12048 :             clock.set_clock(world.clock());
     210        12048 :         }
     211              : 
     212        12048 :         let servers = [
     213        12048 :             SafekeeperNode::new(world.new_node()),
     214        12048 :             SafekeeperNode::new(world.new_node()),
     215        12048 :             SafekeeperNode::new(world.new_node()),
     216        12048 :         ];
     217        12048 : 
     218        12048 :         let server_ids = [servers[0].id, servers[1].id, servers[2].id];
     219        36144 :         let safekeepers_addrs = server_ids.map(|id| format!("node:{}", id)).to_vec();
     220        12048 : 
     221        12048 :         let ttid = TenantTimelineId::generate();
     222        12048 : 
     223        12048 :         Test {
     224        12048 :             world,
     225        12048 :             servers,
     226        12048 :             sk_list: safekeepers_addrs,
     227        12048 :             ttid,
     228        12048 :             timeout: self.timeout,
     229        12048 :         }
     230        12048 :     }
     231              : }
     232              : 
     233              : /// Holds simulation state.
     234              : pub struct Test {
     235              :     pub world: Arc<World>,
     236              :     pub servers: [SafekeeperNode; 3],
     237              :     pub sk_list: Vec<String>,
     238              :     pub ttid: TenantTimelineId,
     239              :     pub timeout: u64,
     240              : }
     241              : 
     242              : impl Test {
     243              :     /// Start a sync_safekeepers thread and wait for it to finish.
     244           36 :     pub fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
     245           36 :         let wp = self.launch_sync_safekeepers();
     246           36 : 
     247           36 :         // poll until exit or timeout
     248           36 :         let time_limit = self.timeout;
     249         1380 :         while self.world.step() && self.world.now() < time_limit && !wp.thread.is_finished() {}
     250              : 
     251           36 :         if !wp.thread.is_finished() {
     252            0 :             anyhow::bail!("timeout or idle stuck");
     253           36 :         }
     254           36 : 
     255           36 :         let res = wp.thread.result();
     256           36 :         if res.0 != 0 {
     257            0 :             anyhow::bail!("non-zero exitcode: {:?}", res);
     258           36 :         }
     259           36 :         let lsn = Lsn::from_str(&res.1)?;
     260           36 :         Ok(lsn)
     261           36 :     }
     262              : 
     263              :     /// Spawn a new sync_safekeepers thread.
     264       212342 :     pub fn launch_sync_safekeepers(&self) -> WalProposer {
     265       212342 :         WalProposer::launch_sync(self.ttid, self.sk_list.clone(), self.world.new_node())
     266       212342 :     }
     267              : 
     268              :     /// Spawn a new walproposer thread.
     269         8429 :     pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
     270         8429 :         let lsn = if lsn.0 == 0 {
     271              :             // usual LSN after basebackup
     272         4519 :             Lsn(21623024)
     273              :         } else {
     274         3910 :             lsn
     275              :         };
     276              : 
     277         8429 :         WalProposer::launch_walproposer(self.ttid, self.sk_list.clone(), self.world.new_node(), lsn)
     278         8429 :     }
     279              : 
     280              :     /// Execute the simulation for the specified duration.
     281          630 :     pub fn poll_for_duration(&self, duration: u64) {
     282          630 :         let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
     283        10572 :         while self.world.step() && self.world.now() < time_limit {}
     284          630 :     }
     285              : 
     286              :     /// Execute the simulation together with events defined in some schedule.
     287        12024 :     pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> {
     288        12024 :         // scheduling empty events so that world will stop in those points
     289        12024 :         {
     290        12024 :             let clock = self.world.clock();
     291        12024 : 
     292        12024 :             let now = self.world.now();
     293       608186 :             for (time, _) in schedule {
     294       596162 :                 if *time < now {
     295            0 :                     continue;
     296       596162 :                 }
     297       596162 :                 clock.schedule_fake(*time - now);
     298              :             }
     299              :         }
     300              : 
     301        12024 :         let mut wp = self.launch_sync_safekeepers();
     302        12024 : 
     303        12024 :         let mut skipped_tx = 0;
     304        12024 :         let mut started_tx = 0;
     305        12024 : 
     306        12024 :         let mut schedule_ptr = 0;
     307              : 
     308       618555 :         loop {
     309       618555 :             if wp.sync_safekeepers && wp.thread.is_finished() {
     310         9872 :                 let res = wp.thread.result();
     311         9872 :                 if res.0 != 0 {
     312         1461 :                     warn!("sync non-zero exitcode: {:?}", res);
     313         1461 :                     debug!("restarting sync_safekeepers");
     314              :                     // restart the sync_safekeepers
     315         1461 :                     wp = self.launch_sync_safekeepers();
     316         1461 :                     continue;
     317         8411 :                 }
     318         8411 :                 let lsn = Lsn::from_str(&res.1)?;
     319         8411 :                 debug!("sync_safekeepers finished at LSN {}", lsn);
     320         8411 :                 wp = self.launch_walproposer(lsn);
     321         8411 :                 debug!("walproposer started at thread {}", wp.thread.id());
     322       608683 :             }
     323              : 
     324       617094 :             let now = self.world.now();
     325      1213256 :             while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now {
     326       596162 :                 if now != schedule[schedule_ptr].0 {
     327            0 :                     warn!("skipped event {:?} at {}", schedule[schedule_ptr], now);
     328       596162 :                 }
     329              : 
     330       596162 :                 let action = &schedule[schedule_ptr].1;
     331       596162 :                 match action {
     332       198948 :                     TestAction::WriteTx(size) => {
     333       198948 :                         if !wp.sync_safekeepers && !wp.thread.is_finished() {
     334         8754 :                             started_tx += *size;
     335         8754 :                             wp.write_tx(*size);
     336         8754 :                             debug!("written {} transactions", size);
     337              :                         } else {
     338       190194 :                             skipped_tx += size;
     339       190194 :                             debug!("skipped {} transactions", size);
     340              :                         }
     341              :                     }
     342       198393 :                     TestAction::RestartSafekeeper(id) => {
     343       198393 :                         debug!("restarting safekeeper {}", id);
     344       198393 :                         self.servers[*id].restart();
     345              :                     }
     346              :                     TestAction::RestartWalProposer => {
     347       198821 :                         debug!("restarting sync_safekeepers");
     348       198821 :                         wp.stop();
     349       198821 :                         wp = self.launch_sync_safekeepers();
     350              :                     }
     351              :                 }
     352       596162 :                 schedule_ptr += 1;
     353              :             }
     354              : 
     355       617094 :             if schedule_ptr == schedule.len() {
     356        12024 :                 break;
     357       605070 :             }
     358       605070 :             let next_event_time = schedule[schedule_ptr].0;
     359       605070 : 
     360       605070 :             // poll until the next event
     361       605070 :             if wp.thread.is_finished() {
     362         6779 :                 while self.world.step() && self.world.now() < next_event_time {}
     363              :             } else {
     364      9434941 :                 while self.world.step()
     365      9434941 :                     && self.world.now() < next_event_time
     366      8840658 :                     && !wp.thread.is_finished()
     367      8830534 :                 {}
     368              :             }
     369              :         }
     370              : 
     371        12024 :         debug!(
     372            0 :             "finished schedule, total steps: {}",
     373            0 :             self.world.get_thread_step_count()
     374              :         );
     375        12024 :         debug!("skipped_tx: {}", skipped_tx);
     376        12024 :         debug!("started_tx: {}", started_tx);
     377              : 
     378        12024 :         Ok(())
     379        12024 :     }
     380              : }
     381              : 
     382              : #[derive(Debug, Clone)]
     383              : pub enum TestAction {
     384              :     WriteTx(usize),
     385              :     RestartSafekeeper(usize),
     386              :     RestartWalProposer,
     387              : }
     388              : 
     389              : pub type Schedule = Vec<(u64, TestAction)>;
     390              : 
     391        12012 : pub fn generate_schedule(seed: u64) -> Schedule {
     392        12012 :     let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
     393        12012 :     let mut schedule = Vec::new();
     394        12012 :     let mut time = 0;
     395        12012 : 
     396        12012 :     let cnt = rng.gen_range(1..100);
     397        12012 : 
     398        12012 :     for _ in 0..cnt {
     399       595472 :         time += rng.gen_range(0..500);
     400       595472 :         let action = match rng.gen_range(0..3) {
     401       198306 :             0 => TestAction::WriteTx(rng.gen_range(1..10)),
     402       198357 :             1 => TestAction::RestartSafekeeper(rng.gen_range(0..3)),
     403       198809 :             2 => TestAction::RestartWalProposer,
     404            0 :             _ => unreachable!(),
     405              :         };
     406       595472 :         schedule.push((time, action));
     407              :     }
     408              : 
     409        12012 :     schedule
     410        12012 : }
     411              : 
     412        12012 : pub fn generate_network_opts(seed: u64) -> NetworkOptions {
     413        12012 :     let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
     414        12012 : 
     415        12012 :     let timeout = rng.gen_range(100..2000);
     416        12012 :     let max_delay = rng.gen_range(1..2 * timeout);
     417        12012 :     let min_delay = rng.gen_range(1..=max_delay);
     418        12012 : 
     419        12012 :     let max_fail_prob = rng.gen_range(0.0..0.9);
     420        12012 :     let connect_fail_prob = rng.gen_range(0.0..max_fail_prob);
     421        12012 :     let send_fail_prob = rng.gen_range(0.0..connect_fail_prob);
     422        12012 : 
     423        12012 :     NetworkOptions {
     424        12012 :         keepalive_timeout: Some(timeout),
     425        12012 :         connect_delay: Delay {
     426        12012 :             min: min_delay,
     427        12012 :             max: max_delay,
     428        12012 :             fail_prob: connect_fail_prob,
     429        12012 :         },
     430        12012 :         send_delay: Delay {
     431        12012 :             min: min_delay,
     432        12012 :             max: max_delay,
     433        12012 :             fail_prob: send_fail_prob,
     434        12012 :         },
     435        12012 :     }
     436        12012 : }
        

Generated by: LCOV version 2.1-beta