Line data Source code
1 : use std::sync::Arc;
2 :
3 : use tracing::{info, warn};
4 : use utils::lsn::Lsn;
5 :
6 : use crate::walproposer_sim::{
7 : log::{init_logger, init_tracing_logger},
8 : simulation::{generate_network_opts, generate_schedule, Schedule, TestAction, TestConfig},
9 : };
10 :
11 : pub mod walproposer_sim;
12 :
13 : // Test that simulation supports restarting (crashing) safekeepers.
14 : #[test]
15 6 : fn crash_safekeeper() {
16 6 : let clock = init_logger();
17 6 : let config = TestConfig::new(Some(clock));
18 6 : let test = config.start(1337);
19 6 :
20 6 : let lsn = test.sync_safekeepers().unwrap();
21 6 : assert_eq!(lsn, Lsn(0));
22 6 : info!("Sucessfully synced empty safekeepers at 0/0");
23 :
24 6 : let mut wp = test.launch_walproposer(lsn);
25 6 :
26 6 : // Write some WAL and crash safekeeper 0 without waiting for replication.
27 6 : test.poll_for_duration(30);
28 6 : wp.write_tx(3);
29 6 : test.servers[0].restart();
30 6 :
31 6 : // Wait some time, so that walproposer can reconnect.
32 6 : test.poll_for_duration(2000);
33 6 : }
34 :
35 : // Test that walproposer can be crashed (stopped).
36 : #[test]
37 6 : fn test_simple_restart() {
38 6 : let clock = init_logger();
39 6 : let config = TestConfig::new(Some(clock));
40 6 : let test = config.start(1337);
41 6 :
42 6 : let lsn = test.sync_safekeepers().unwrap();
43 6 : assert_eq!(lsn, Lsn(0));
44 6 : info!("Sucessfully synced empty safekeepers at 0/0");
45 :
46 6 : let mut wp = test.launch_walproposer(lsn);
47 6 :
48 6 : test.poll_for_duration(30);
49 6 : wp.write_tx(3);
50 6 : test.poll_for_duration(100);
51 6 :
52 6 : wp.stop();
53 6 : drop(wp);
54 6 :
55 6 : let lsn = test.sync_safekeepers().unwrap();
56 6 : info!("Sucessfully synced safekeepers at {}", lsn);
57 6 : }
58 :
59 : // Test runnning a simple schedule, restarting everything a several times.
60 : #[test]
61 6 : fn test_simple_schedule() -> anyhow::Result<()> {
62 6 : let clock = init_logger();
63 6 : let mut config = TestConfig::new(Some(clock));
64 6 : config.network.keepalive_timeout = Some(100);
65 6 : let test = config.start(1337);
66 6 :
67 6 : let schedule: Schedule = vec![
68 6 : (0, TestAction::RestartWalProposer),
69 6 : (50, TestAction::WriteTx(5)),
70 6 : (100, TestAction::RestartSafekeeper(0)),
71 6 : (100, TestAction::WriteTx(5)),
72 6 : (110, TestAction::RestartSafekeeper(1)),
73 6 : (110, TestAction::WriteTx(5)),
74 6 : (120, TestAction::RestartSafekeeper(2)),
75 6 : (120, TestAction::WriteTx(5)),
76 6 : (201, TestAction::RestartWalProposer),
77 6 : (251, TestAction::RestartSafekeeper(0)),
78 6 : (251, TestAction::RestartSafekeeper(1)),
79 6 : (251, TestAction::RestartSafekeeper(2)),
80 6 : (251, TestAction::WriteTx(5)),
81 6 : (255, TestAction::WriteTx(5)),
82 6 : (1000, TestAction::WriteTx(5)),
83 6 : ];
84 6 :
85 6 : test.run_schedule(&schedule)?;
86 6 : info!("Test finished, stopping all threads");
87 6 : test.world.deallocate();
88 6 :
89 6 : Ok(())
90 6 : }
91 :
92 : // Test that simulation can process 10^4 transactions.
93 : #[test]
94 6 : fn test_many_tx() -> anyhow::Result<()> {
95 6 : let clock = init_logger();
96 6 : let config = TestConfig::new(Some(clock));
97 6 : let test = config.start(1337);
98 6 :
99 6 : let mut schedule: Schedule = vec![];
100 606 : for i in 0..100 {
101 600 : schedule.push((i * 10, TestAction::WriteTx(100)));
102 600 : }
103 :
104 6 : test.run_schedule(&schedule)?;
105 6 : info!("Test finished, stopping all threads");
106 6 : test.world.stop_all();
107 6 :
108 6 : let events = test.world.take_events();
109 6 : info!("Events: {:?}", events);
110 6 : let last_commit_lsn = events
111 6 : .iter()
112 1206 : .filter_map(|event| {
113 1206 : if event.data.starts_with("commit_lsn;") {
114 576 : let lsn: u64 = event.data.split(';').nth(1).unwrap().parse().unwrap();
115 576 : return Some(lsn);
116 630 : }
117 630 : None
118 1206 : })
119 6 : .last()
120 6 : .unwrap();
121 6 :
122 6 : let initdb_lsn = 21623024;
123 6 : let diff = last_commit_lsn - initdb_lsn;
124 6 : info!("Last commit lsn: {}, diff: {}", last_commit_lsn, diff);
125 : // each tx is at least 8 bytes, it's written a 100 times for in a loop for 100 times
126 6 : assert!(diff > 100 * 100 * 8);
127 6 : Ok(())
128 6 : }
129 :
130 : // Checks that we don't have nasty circular dependencies, preventing Arc from deallocating.
131 : // This test doesn't really assert anything, you need to run it manually to check if there
132 : // is any issue.
133 : #[test]
134 6 : fn test_res_dealloc() -> anyhow::Result<()> {
135 6 : let clock = init_tracing_logger(true);
136 6 : let mut config = TestConfig::new(Some(clock));
137 6 :
138 6 : let seed = 123456;
139 6 : config.network = generate_network_opts(seed);
140 6 : let test = config.start(seed);
141 6 : warn!("Running test with seed {}", seed);
142 :
143 6 : let schedule = generate_schedule(seed);
144 6 : info!("schedule: {:?}", schedule);
145 6 : test.run_schedule(&schedule).unwrap();
146 6 : test.world.stop_all();
147 6 :
148 6 : let world = test.world.clone();
149 6 : drop(test);
150 6 : info!("world strong count: {}", Arc::strong_count(&world));
151 6 : world.deallocate();
152 6 : info!("world strong count: {}", Arc::strong_count(&world));
153 :
154 6 : Ok(())
155 6 : }
|