|             Line data    Source code 
       1              : use std::cell::Cell;
       2              : use std::str::FromStr;
       3              : use std::sync::Arc;
       4              : 
       5              : use desim::executor::{self, ExternalHandle};
       6              : use desim::node_os::NodeOs;
       7              : use desim::options::{Delay, NetworkOptions};
       8              : use desim::proto::{AnyMessage, NodeEvent};
       9              : use desim::world::{Node, World};
      10              : use rand::{Rng, SeedableRng};
      11              : use tracing::{debug, info_span, warn};
      12              : use utils::id::TenantTimelineId;
      13              : use utils::lsn::Lsn;
      14              : use walproposer::walproposer::{Config, Wrapper};
      15              : 
      16              : use super::log::SimClock;
      17              : use super::safekeeper_disk::SafekeeperDisk;
      18              : use super::walproposer_api;
      19              : use super::walproposer_disk::DiskWalProposer;
      20              : use crate::walproposer_sim::safekeeper::run_server;
      21              : use crate::walproposer_sim::walproposer_api::SimulationApi;
      22              : 
      23              : /// Simulated safekeeper node.
      24              : pub struct SafekeeperNode {
      25              :     pub node: Arc<Node>,
      26              :     pub id: u32,
      27              :     pub disk: Arc<SafekeeperDisk>,
      28              :     pub thread: Cell<ExternalHandle>,
      29              : }
      30              : 
      31              : impl SafekeeperNode {
      32              :     /// Create and start a safekeeper at the specified Node.
      33         1524 :     pub fn new(node: Arc<Node>) -> Self {
      34         1524 :         let disk = Arc::new(SafekeeperDisk::new());
      35         1524 :         let thread = Cell::new(SafekeeperNode::launch(disk.clone(), node.clone()));
      36              : 
      37         1524 :         Self {
      38         1524 :             id: node.id,
      39         1524 :             node,
      40         1524 :             disk,
      41         1524 :             thread,
      42         1524 :         }
      43         1524 :     }
      44              : 
      45         9760 :     fn launch(disk: Arc<SafekeeperDisk>, node: Arc<Node>) -> ExternalHandle {
      46              :         // start the server thread
      47         9760 :         node.launch(move |os| {
      48         9562 :             run_server(os, disk).expect("server should finish without errors");
      49         9562 :         })
      50         9760 :     }
      51              : 
      52              :     /// Restart the safekeeper.
      53         8236 :     pub fn restart(&self) {
      54         8236 :         let new_thread = SafekeeperNode::launch(self.disk.clone(), self.node.clone());
      55         8236 :         let old_thread = self.thread.replace(new_thread);
      56         8236 :         old_thread.crash_stop();
      57         8236 :     }
      58              : }
      59              : 
      60              : /// Simulated walproposer node.
      61              : pub struct WalProposer {
      62              :     thread: ExternalHandle,
      63              :     node: Arc<Node>,
      64              :     disk: Arc<DiskWalProposer>,
      65              :     sync_safekeepers: bool,
      66              : }
      67              : 
      68              : impl WalProposer {
      69              :     /// Generic start function for both modes.
      70         8855 :     fn start(
      71         8855 :         os: NodeOs,
      72         8855 :         disk: Arc<DiskWalProposer>,
      73         8855 :         ttid: TenantTimelineId,
      74         8855 :         addrs: Vec<String>,
      75         8855 :         lsn: Option<Lsn>,
      76         8855 :     ) {
      77         8855 :         let sync_safekeepers = lsn.is_none();
      78              : 
      79         8855 :         let _enter = if sync_safekeepers {
      80         8520 :             info_span!("sync", started = executor::now()).entered()
      81              :         } else {
      82          335 :             info_span!("walproposer", started = executor::now()).entered()
      83              :         };
      84              : 
      85         8855 :         os.log_event(format!("started;walproposer;{}", sync_safekeepers as i32));
      86              : 
      87         8855 :         let config = Config {
      88         8855 :             ttid,
      89         8855 :             safekeepers_list: addrs,
      90         8855 :             safekeeper_conninfo_options: String::new(),
      91         8855 :             safekeeper_reconnect_timeout: 1000,
      92         8855 :             safekeeper_connection_timeout: 5000,
      93         8855 :             sync_safekeepers,
      94         8855 :         };
      95         8855 :         let args = walproposer_api::Args {
      96         8855 :             os,
      97         8855 :             config: config.clone(),
      98         8855 :             disk,
      99         8855 :             redo_start_lsn: lsn,
     100         8855 :         };
     101         8855 :         let api = SimulationApi::new(args);
     102         8855 :         let wp = Wrapper::new(Box::new(api), config);
     103         8855 :         wp.start();
     104         8855 :     }
     105              : 
     106              :     /// Start walproposer in a sync_safekeepers mode.
     107         8682 :     pub fn launch_sync(ttid: TenantTimelineId, addrs: Vec<String>, node: Arc<Node>) -> Self {
     108         8682 :         debug!("sync_safekeepers started at node {}", node.id);
     109         8682 :         let disk = DiskWalProposer::new();
     110         8682 :         let disk_wp = disk.clone();
     111              : 
     112              :         // start the client thread
     113         8682 :         let handle = node.launch(move |os| {
     114         8520 :             WalProposer::start(os, disk_wp, ttid, addrs, None);
     115         8520 :         });
     116              : 
     117         8682 :         Self {
     118         8682 :             thread: handle,
     119         8682 :             node,
     120         8682 :             disk,
     121         8682 :             sync_safekeepers: true,
     122         8682 :         }
     123         8682 :     }
     124              : 
     125              :     /// Start walproposer in a normal mode.
     126          335 :     pub fn launch_walproposer(
     127          335 :         ttid: TenantTimelineId,
     128          335 :         addrs: Vec<String>,
     129          335 :         node: Arc<Node>,
     130          335 :         lsn: Lsn,
     131          335 :     ) -> Self {
     132          335 :         debug!("walproposer started at node {}", node.id);
     133          335 :         let disk = DiskWalProposer::new();
     134          335 :         disk.lock().reset_to(lsn);
     135          335 :         let disk_wp = disk.clone();
     136              : 
     137              :         // start the client thread
     138          335 :         let handle = node.launch(move |os| {
     139          335 :             WalProposer::start(os, disk_wp, ttid, addrs, Some(lsn));
     140          335 :         });
     141              : 
     142          335 :         Self {
     143          335 :             thread: handle,
     144          335 :             node,
     145          335 :             disk,
     146          335 :             sync_safekeepers: false,
     147          335 :         }
     148          335 :     }
     149              : 
     150          521 :     pub fn write_tx(&mut self, cnt: usize) {
     151          521 :         let start_lsn = self.disk.lock().flush_rec_ptr();
     152              : 
     153        11552 :         for _ in 0..cnt {
     154        11552 :             self.disk
     155        11552 :                 .lock()
     156        11552 :                 .insert_logical_message(c"prefix", b"message");
     157        11552 :         }
     158              : 
     159          521 :         let end_lsn = self.disk.lock().flush_rec_ptr();
     160              : 
     161              :         // log event
     162          521 :         self.node
     163          521 :             .log_event(format!("write_wal;{};{};{}", start_lsn.0, end_lsn.0, cnt));
     164              : 
     165              :         // now we need to set "Latch" in walproposer
     166          521 :         self.node
     167          521 :             .node_events()
     168          521 :             .send(NodeEvent::Internal(AnyMessage::Just32(0)));
     169          521 :     }
     170              : 
     171         8110 :     pub fn stop(&self) {
     172         8110 :         self.thread.crash_stop();
     173         8110 :     }
     174              : }
     175              : 
     176              : /// Holds basic simulation settings, such as network options.
     177              : pub struct TestConfig {
     178              :     pub network: NetworkOptions,
     179              :     pub timeout: u64,
     180              :     pub clock: Option<SimClock>,
     181              : }
     182              : 
     183              : impl TestConfig {
     184              :     /// Create a new TestConfig with default settings.
     185            9 :     pub fn new(clock: Option<SimClock>) -> Self {
     186            9 :         Self {
     187            9 :             network: NetworkOptions {
     188            9 :                 keepalive_timeout: Some(2000),
     189            9 :                 connect_delay: Delay {
     190            9 :                     min: 1,
     191            9 :                     max: 5,
     192            9 :                     fail_prob: 0.0,
     193            9 :                 },
     194            9 :                 send_delay: Delay {
     195            9 :                     min: 1,
     196            9 :                     max: 5,
     197            9 :                     fail_prob: 0.0,
     198            9 :                 },
     199            9 :             },
     200            9 :             timeout: 1_000 * 10,
     201            9 :             clock,
     202            9 :         }
     203            9 :     }
     204              : 
     205              :     /// Start a new simulation with the specified seed.
     206          508 :     pub fn start(&self, seed: u64) -> Test {
     207          508 :         let world = Arc::new(World::new(seed, Arc::new(self.network.clone())));
     208              : 
     209          508 :         if let Some(clock) = &self.clock {
     210          508 :             clock.set_clock(world.clock());
     211          508 :         }
     212              : 
     213          508 :         let servers = [
     214          508 :             SafekeeperNode::new(world.new_node()),
     215          508 :             SafekeeperNode::new(world.new_node()),
     216          508 :             SafekeeperNode::new(world.new_node()),
     217          508 :         ];
     218              : 
     219          508 :         let server_ids = [servers[0].id, servers[1].id, servers[2].id];
     220         1524 :         let safekeepers_addrs = server_ids.map(|id| format!("node:{id}")).to_vec();
     221              : 
     222          508 :         let ttid = TenantTimelineId::generate();
     223              : 
     224          508 :         Test {
     225          508 :             world,
     226          508 :             servers,
     227          508 :             sk_list: safekeepers_addrs,
     228          508 :             ttid,
     229          508 :             timeout: self.timeout,
     230          508 :         }
     231          508 :     }
     232              : }
     233              : 
     234              : /// Holds simulation state.
     235              : pub struct Test {
     236              :     pub world: Arc<World>,
     237              :     pub servers: [SafekeeperNode; 3],
     238              :     pub sk_list: Vec<String>,
     239              :     pub ttid: TenantTimelineId,
     240              :     pub timeout: u64,
     241              : }
     242              : 
     243              : impl Test {
     244              :     /// Start a sync_safekeepers thread and wait for it to finish.
     245            6 :     pub fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
     246            6 :         let wp = self.launch_sync_safekeepers();
     247              : 
     248              :         // poll until exit or timeout
     249            6 :         let time_limit = self.timeout;
     250          286 :         while self.world.step() && self.world.now() < time_limit && !wp.thread.is_finished() {}
     251              : 
     252            6 :         if !wp.thread.is_finished() {
     253            0 :             anyhow::bail!("timeout or idle stuck");
     254            6 :         }
     255              : 
     256            6 :         let res = wp.thread.result();
     257            6 :         if res.0 != 0 {
     258            0 :             anyhow::bail!("non-zero exitcode: {:?}", res);
     259            6 :         }
     260            6 :         let lsn = Lsn::from_str(&res.1)?;
     261            6 :         Ok(lsn)
     262            6 :     }
     263              : 
     264              :     /// Spawn a new sync_safekeepers thread.
     265         8682 :     pub fn launch_sync_safekeepers(&self) -> WalProposer {
     266         8682 :         WalProposer::launch_sync(self.ttid, self.sk_list.clone(), self.world.new_node())
     267         8682 :     }
     268              : 
     269              :     /// Spawn a new walproposer thread.
     270          335 :     pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
     271          335 :         let lsn = if lsn.0 == 0 {
     272              :             // usual LSN after basebackup
     273          208 :             Lsn(21623024)
     274              :         } else {
     275          127 :             lsn
     276              :         };
     277              : 
     278          335 :         WalProposer::launch_walproposer(self.ttid, self.sk_list.clone(), self.world.new_node(), lsn)
     279          335 :     }
     280              : 
     281              :     /// Execute the simulation for the specified duration.
     282          105 :     pub fn poll_for_duration(&self, duration: u64) {
     283          105 :         let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
     284         2625 :         while self.world.step() && self.world.now() < time_limit {}
     285          105 :     }
     286              : 
     287              :     /// Execute the simulation together with events defined in some schedule.
     288          504 :     pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> {
     289              :         // scheduling empty events so that world will stop in those points
     290              :         {
     291          504 :             let clock = self.world.clock();
     292              : 
     293          504 :             let now = self.world.now();
     294        25015 :             for (time, _) in schedule {
     295        24511 :                 if *time < now {
     296            0 :                     continue;
     297        24511 :                 }
     298        24511 :                 clock.schedule_fake(*time - now);
     299              :             }
     300              :         }
     301              : 
     302          504 :         let mut wp = self.launch_sync_safekeepers();
     303              : 
     304          504 :         let mut skipped_tx = 0;
     305          504 :         let mut started_tx = 0;
     306              : 
     307          504 :         let mut schedule_ptr = 0;
     308              : 
     309              :         loop {
     310        25429 :             if wp.sync_safekeepers && wp.thread.is_finished() {
     311          395 :                 let res = wp.thread.result();
     312          395 :                 if res.0 != 0 {
     313           63 :                     warn!("sync non-zero exitcode: {:?}", res);
     314           63 :                     debug!("restarting sync_safekeepers");
     315              :                     // restart the sync_safekeepers
     316           63 :                     wp = self.launch_sync_safekeepers();
     317           63 :                     continue;
     318          332 :                 }
     319          332 :                 let lsn = Lsn::from_str(&res.1)?;
     320          332 :                 debug!("sync_safekeepers finished at LSN {}", lsn);
     321          332 :                 wp = self.launch_walproposer(lsn);
     322          332 :                 debug!("walproposer started at thread {}", wp.thread.id());
     323        25034 :             }
     324              : 
     325        25366 :             let now = self.world.now();
     326        49877 :             while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now {
     327        24511 :                 if now != schedule[schedule_ptr].0 {
     328            0 :                     warn!("skipped event {:?} at {}", schedule[schedule_ptr], now);
     329        24511 :                 }
     330              : 
     331        24511 :                 let action = &schedule[schedule_ptr].1;
     332        24511 :                 match action {
     333         8167 :                     TestAction::WriteTx(size) => {
     334         8167 :                         if !wp.sync_safekeepers && !wp.thread.is_finished() {
     335          419 :                             started_tx += *size;
     336          419 :                             wp.write_tx(*size);
     337          419 :                             debug!("written {} transactions", size);
     338              :                         } else {
     339         7748 :                             skipped_tx += size;
     340         7748 :                             debug!("skipped {} transactions", size);
     341              :                         }
     342              :                     }
     343         8235 :                     TestAction::RestartSafekeeper(id) => {
     344         8235 :                         debug!("restarting safekeeper {}", id);
     345         8235 :                         self.servers[*id].restart();
     346              :                     }
     347              :                     TestAction::RestartWalProposer => {
     348         8109 :                         debug!("restarting sync_safekeepers");
     349         8109 :                         wp.stop();
     350         8109 :                         wp = self.launch_sync_safekeepers();
     351              :                     }
     352              :                 }
     353        24511 :                 schedule_ptr += 1;
     354              :             }
     355              : 
     356        25366 :             if schedule_ptr == schedule.len() {
     357          504 :                 break;
     358        24862 :             }
     359        24862 :             let next_event_time = schedule[schedule_ptr].0;
     360              : 
     361              :             // poll until the next event
     362        24862 :             if wp.thread.is_finished() {
     363          374 :                 while self.world.step() && self.world.now() < next_event_time {}
     364              :             } else {
     365       385453 :                 while self.world.step()
     366       385453 :                     && self.world.now() < next_event_time
     367       361031 :                     && !wp.thread.is_finished()
     368       360623 :                 {}
     369              :             }
     370              :         }
     371              : 
     372          504 :         debug!(
     373            0 :             "finished schedule, total steps: {}",
     374            0 :             self.world.get_thread_step_count()
     375              :         );
     376          504 :         debug!("skipped_tx: {}", skipped_tx);
     377          504 :         debug!("started_tx: {}", started_tx);
     378              : 
     379          504 :         Ok(())
     380          504 :     }
     381              : }
     382              : 
     383              : #[derive(Debug, Clone)]
     384              : pub enum TestAction {
     385              :     WriteTx(usize),
     386              :     RestartSafekeeper(usize),
     387              :     RestartWalProposer,
     388              : }
     389              : 
     390              : pub type Schedule = Vec<(u64, TestAction)>;
     391              : 
     392          502 : pub fn generate_schedule(seed: u64) -> Schedule {
     393          502 :     let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
     394          502 :     let mut schedule = Vec::new();
     395          502 :     let mut time = 0;
     396              : 
     397          502 :     let cnt = rng.random_range(1..100);
     398              : 
     399          502 :     for _ in 0..cnt {
     400        24396 :         time += rng.random_range(0..500);
     401        24396 :         let action = match rng.random_range(0..3) {
     402         8060 :             0 => TestAction::WriteTx(rng.random_range(1..10)),
     403         8229 :             1 => TestAction::RestartSafekeeper(rng.random_range(0..3)),
     404         8107 :             2 => TestAction::RestartWalProposer,
     405            0 :             _ => unreachable!(),
     406              :         };
     407        24396 :         schedule.push((time, action));
     408              :     }
     409              : 
     410          502 :     schedule
     411          502 : }
     412              : 
     413          502 : pub fn generate_network_opts(seed: u64) -> NetworkOptions {
     414          502 :     let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
     415              : 
     416          502 :     let timeout = rng.random_range(100..2000);
     417          502 :     let max_delay = rng.random_range(1..2 * timeout);
     418          502 :     let min_delay = rng.random_range(1..=max_delay);
     419              : 
     420          502 :     let max_fail_prob = rng.random_range(0.0..0.9);
     421          502 :     let connect_fail_prob = rng.random_range(0.0..max_fail_prob);
     422          502 :     let send_fail_prob = rng.random_range(0.0..connect_fail_prob);
     423              : 
     424          502 :     NetworkOptions {
     425          502 :         keepalive_timeout: Some(timeout),
     426          502 :         connect_delay: Delay {
     427          502 :             min: min_delay,
     428          502 :             max: max_delay,
     429          502 :             fail_prob: connect_fail_prob,
     430          502 :         },
     431          502 :         send_delay: Delay {
     432          502 :             min: min_delay,
     433          502 :             max: max_delay,
     434          502 :             fail_prob: send_fail_prob,
     435          502 :         },
     436          502 :     }
     437          502 : }
         |