|             Line data    Source code 
       1              : use std::sync::Arc;
       2              : 
       3              : use tracing::{info, warn};
       4              : use utils::lsn::Lsn;
       5              : 
       6              : use crate::walproposer_sim::{
       7              :     log::{init_logger, init_tracing_logger},
       8              :     simulation::{generate_network_opts, generate_schedule, Schedule, TestAction, TestConfig},
       9              : };
      10              : 
      11              : pub mod walproposer_sim;
      12              : 
      13              : // Test that simulation supports restarting (crashing) safekeepers.
      14              : #[test]
      15            2 : fn crash_safekeeper() {
      16            2 :     let clock = init_logger();
      17            2 :     let config = TestConfig::new(Some(clock));
      18            2 :     let test = config.start(1337);
      19            2 : 
      20            2 :     let lsn = test.sync_safekeepers().unwrap();
      21            2 :     assert_eq!(lsn, Lsn(0));
      22            2 :     info!("Sucessfully synced empty safekeepers at 0/0");
      23              : 
      24            2 :     let mut wp = test.launch_walproposer(lsn);
      25            2 : 
      26            2 :     // Write some WAL and crash safekeeper 0 without waiting for replication.
      27            2 :     test.poll_for_duration(30);
      28            2 :     wp.write_tx(3);
      29            2 :     test.servers[0].restart();
      30            2 : 
      31            2 :     // Wait some time, so that walproposer can reconnect.
      32            2 :     test.poll_for_duration(2000);
      33            2 : }
      34              : 
      35              : // Test that walproposer can be crashed (stopped).
      36              : #[test]
      37            2 : fn test_simple_restart() {
      38            2 :     let clock = init_logger();
      39            2 :     let config = TestConfig::new(Some(clock));
      40            2 :     let test = config.start(1337);
      41            2 : 
      42            2 :     let lsn = test.sync_safekeepers().unwrap();
      43            2 :     assert_eq!(lsn, Lsn(0));
      44            2 :     info!("Sucessfully synced empty safekeepers at 0/0");
      45              : 
      46            2 :     let mut wp = test.launch_walproposer(lsn);
      47            2 : 
      48            2 :     test.poll_for_duration(30);
      49            2 :     wp.write_tx(3);
      50            2 :     test.poll_for_duration(100);
      51            2 : 
      52            2 :     wp.stop();
      53            2 :     drop(wp);
      54            2 : 
      55            2 :     let lsn = test.sync_safekeepers().unwrap();
      56            2 :     info!("Sucessfully synced safekeepers at {}", lsn);
      57            2 : }
      58              : 
      59              : // Test runnning a simple schedule, restarting everything a several times.
      60              : #[test]
      61            2 : fn test_simple_schedule() -> anyhow::Result<()> {
      62            2 :     let clock = init_logger();
      63            2 :     let mut config = TestConfig::new(Some(clock));
      64            2 :     config.network.keepalive_timeout = Some(100);
      65            2 :     let test = config.start(1337);
      66            2 : 
      67            2 :     let schedule: Schedule = vec![
      68            2 :         (0, TestAction::RestartWalProposer),
      69            2 :         (50, TestAction::WriteTx(5)),
      70            2 :         (100, TestAction::RestartSafekeeper(0)),
      71            2 :         (100, TestAction::WriteTx(5)),
      72            2 :         (110, TestAction::RestartSafekeeper(1)),
      73            2 :         (110, TestAction::WriteTx(5)),
      74            2 :         (120, TestAction::RestartSafekeeper(2)),
      75            2 :         (120, TestAction::WriteTx(5)),
      76            2 :         (201, TestAction::RestartWalProposer),
      77            2 :         (251, TestAction::RestartSafekeeper(0)),
      78            2 :         (251, TestAction::RestartSafekeeper(1)),
      79            2 :         (251, TestAction::RestartSafekeeper(2)),
      80            2 :         (251, TestAction::WriteTx(5)),
      81            2 :         (255, TestAction::WriteTx(5)),
      82            2 :         (1000, TestAction::WriteTx(5)),
      83            2 :     ];
      84            2 : 
      85            2 :     test.run_schedule(&schedule)?;
      86            2 :     info!("Test finished, stopping all threads");
      87            2 :     test.world.deallocate();
      88            2 : 
      89            2 :     Ok(())
      90            2 : }
      91              : 
      92              : // Test that simulation can process 10^4 transactions.
      93              : #[test]
      94            2 : fn test_many_tx() -> anyhow::Result<()> {
      95            2 :     let clock = init_logger();
      96            2 :     let config = TestConfig::new(Some(clock));
      97            2 :     let test = config.start(1337);
      98            2 : 
      99            2 :     let mut schedule: Schedule = vec![];
     100          202 :     for i in 0..100 {
     101          200 :         schedule.push((i * 10, TestAction::WriteTx(100)));
     102          200 :     }
     103              : 
     104            2 :     test.run_schedule(&schedule)?;
     105            2 :     info!("Test finished, stopping all threads");
     106            2 :     test.world.stop_all();
     107            2 : 
     108            2 :     let events = test.world.take_events();
     109            2 :     info!("Events: {:?}", events);
     110            2 :     let last_commit_lsn = events
     111            2 :         .iter()
     112          402 :         .filter_map(|event| {
     113          402 :             if event.data.starts_with("commit_lsn;") {
     114          192 :                 let lsn: u64 = event.data.split(';').nth(1).unwrap().parse().unwrap();
     115          192 :                 return Some(lsn);
     116          210 :             }
     117          210 :             None
     118          402 :         })
     119            2 :         .last()
     120            2 :         .unwrap();
     121            2 : 
     122            2 :     let initdb_lsn = 21623024;
     123            2 :     let diff = last_commit_lsn - initdb_lsn;
     124            2 :     info!("Last commit lsn: {}, diff: {}", last_commit_lsn, diff);
     125              :     // each tx is at least 8 bytes, it's written a 100 times for in a loop for 100 times
     126            2 :     assert!(diff > 100 * 100 * 8);
     127            2 :     Ok(())
     128            2 : }
     129              : 
     130              : // Checks that we don't have nasty circular dependencies, preventing Arc from deallocating.
     131              : // This test doesn't really assert anything, you need to run it manually to check if there
     132              : // is any issue.
     133              : #[test]
     134            2 : fn test_res_dealloc() -> anyhow::Result<()> {
     135            2 :     let clock = init_tracing_logger(true);
     136            2 :     let mut config = TestConfig::new(Some(clock));
     137            2 : 
     138            2 :     let seed = 123456;
     139            2 :     config.network = generate_network_opts(seed);
     140            2 :     let test = config.start(seed);
     141            2 :     warn!("Running test with seed {}", seed);
     142              : 
     143            2 :     let schedule = generate_schedule(seed);
     144            2 :     info!("schedule: {:?}", schedule);
     145            2 :     test.run_schedule(&schedule).unwrap();
     146            2 :     test.world.stop_all();
     147            2 : 
     148            2 :     let world = test.world.clone();
     149            2 :     drop(test);
     150            2 :     info!("world strong count: {}", Arc::strong_count(&world));
     151            2 :     world.deallocate();
     152            2 :     info!("world strong count: {}", Arc::strong_count(&world));
     153              : 
     154            2 :     Ok(())
     155            2 : }
         |