Line data Source code
1 : use std::sync::Arc;
2 :
3 : use tracing::{info, warn};
4 : use utils::lsn::Lsn;
5 :
6 : use crate::walproposer_sim::{
7 : log::{init_logger, init_tracing_logger},
8 : simulation::{generate_network_opts, generate_schedule, Schedule, TestAction, TestConfig},
9 : };
10 :
11 : pub mod walproposer_sim;
12 :
13 : // Test that simulation supports restarting (crashing) safekeepers.
14 2 : #[test]
15 2 : fn crash_safekeeper() {
16 2 : let clock = init_logger();
17 2 : let config = TestConfig::new(Some(clock));
18 2 : let test = config.start(1337);
19 2 :
20 2 : let lsn = test.sync_safekeepers().unwrap();
21 2 : assert_eq!(lsn, Lsn(0));
22 2 : info!("Sucessfully synced empty safekeepers at 0/0");
23 :
24 2 : let mut wp = test.launch_walproposer(lsn);
25 2 :
26 2 : // Write some WAL and crash safekeeper 0 without waiting for replication.
27 2 : test.poll_for_duration(30);
28 2 : wp.write_tx(3);
29 2 : test.servers[0].restart();
30 2 :
31 2 : // Wait some time, so that walproposer can reconnect.
32 2 : test.poll_for_duration(2000);
33 2 : }
34 :
35 : // Test that walproposer can be crashed (stopped).
36 2 : #[test]
37 2 : fn test_simple_restart() {
38 2 : let clock = init_logger();
39 2 : let config = TestConfig::new(Some(clock));
40 2 : let test = config.start(1337);
41 2 :
42 2 : let lsn = test.sync_safekeepers().unwrap();
43 2 : assert_eq!(lsn, Lsn(0));
44 2 : info!("Sucessfully synced empty safekeepers at 0/0");
45 :
46 2 : let mut wp = test.launch_walproposer(lsn);
47 2 :
48 2 : test.poll_for_duration(30);
49 2 : wp.write_tx(3);
50 2 : test.poll_for_duration(100);
51 2 :
52 2 : wp.stop();
53 2 : drop(wp);
54 2 :
55 2 : let lsn = test.sync_safekeepers().unwrap();
56 2 : info!("Sucessfully synced safekeepers at {}", lsn);
57 2 : }
58 :
59 : // Test runnning a simple schedule, restarting everything a several times.
60 2 : #[test]
61 2 : fn test_simple_schedule() -> anyhow::Result<()> {
62 2 : let clock = init_logger();
63 2 : let mut config = TestConfig::new(Some(clock));
64 2 : config.network.keepalive_timeout = Some(100);
65 2 : let test = config.start(1337);
66 2 :
67 2 : let schedule: Schedule = vec![
68 2 : (0, TestAction::RestartWalProposer),
69 2 : (50, TestAction::WriteTx(5)),
70 2 : (100, TestAction::RestartSafekeeper(0)),
71 2 : (100, TestAction::WriteTx(5)),
72 2 : (110, TestAction::RestartSafekeeper(1)),
73 2 : (110, TestAction::WriteTx(5)),
74 2 : (120, TestAction::RestartSafekeeper(2)),
75 2 : (120, TestAction::WriteTx(5)),
76 2 : (201, TestAction::RestartWalProposer),
77 2 : (251, TestAction::RestartSafekeeper(0)),
78 2 : (251, TestAction::RestartSafekeeper(1)),
79 2 : (251, TestAction::RestartSafekeeper(2)),
80 2 : (251, TestAction::WriteTx(5)),
81 2 : (255, TestAction::WriteTx(5)),
82 2 : (1000, TestAction::WriteTx(5)),
83 2 : ];
84 2 :
85 2 : test.run_schedule(&schedule)?;
86 2 : info!("Test finished, stopping all threads");
87 2 : test.world.deallocate();
88 2 :
89 2 : Ok(())
90 2 : }
91 :
92 : // Test that simulation can process 10^4 transactions.
93 2 : #[test]
94 2 : fn test_many_tx() -> anyhow::Result<()> {
95 2 : let clock = init_logger();
96 2 : let config = TestConfig::new(Some(clock));
97 2 : let test = config.start(1337);
98 2 :
99 2 : let mut schedule: Schedule = vec![];
100 202 : for i in 0..100 {
101 200 : schedule.push((i * 10, TestAction::WriteTx(100)));
102 200 : }
103 :
104 2 : test.run_schedule(&schedule)?;
105 2 : info!("Test finished, stopping all threads");
106 2 : test.world.stop_all();
107 2 :
108 2 : let events = test.world.take_events();
109 2 : info!("Events: {:?}", events);
110 2 : let last_commit_lsn = events
111 2 : .iter()
112 402 : .filter_map(|event| {
113 402 : if event.data.starts_with("commit_lsn;") {
114 192 : let lsn: u64 = event.data.split(';').nth(1).unwrap().parse().unwrap();
115 192 : return Some(lsn);
116 210 : }
117 210 : None
118 402 : })
119 2 : .last()
120 2 : .unwrap();
121 2 :
122 2 : let initdb_lsn = 21623024;
123 2 : let diff = last_commit_lsn - initdb_lsn;
124 2 : info!("Last commit lsn: {}, diff: {}", last_commit_lsn, diff);
125 : // each tx is at least 8 bytes, it's written a 100 times for in a loop for 100 times
126 2 : assert!(diff > 100 * 100 * 8);
127 2 : Ok(())
128 2 : }
129 :
130 : // Checks that we don't have nasty circular dependencies, preventing Arc from deallocating.
131 : // This test doesn't really assert anything, you need to run it manually to check if there
132 : // is any issue.
133 2 : #[test]
134 2 : fn test_res_dealloc() -> anyhow::Result<()> {
135 2 : let clock = init_tracing_logger(true);
136 2 : let mut config = TestConfig::new(Some(clock));
137 2 :
138 2 : let seed = 123456;
139 2 : config.network = generate_network_opts(seed);
140 2 : let test = config.start(seed);
141 2 : warn!("Running test with seed {}", seed);
142 :
143 2 : let schedule = generate_schedule(seed);
144 2 : info!("schedule: {:?}", schedule);
145 2 : test.run_schedule(&schedule).unwrap();
146 2 : test.world.stop_all();
147 2 :
148 2 : let world = test.world.clone();
149 2 : drop(test);
150 2 : info!("world strong count: {}", Arc::strong_count(&world));
151 2 : world.deallocate();
152 2 : info!("world strong count: {}", Arc::strong_count(&world));
153 :
154 2 : Ok(())
155 2 : }
|