Line data Source code
1 : use std::sync::Arc;
2 :
3 : use tracing::{info, warn};
4 : use utils::lsn::Lsn;
5 :
6 : use crate::walproposer_sim::{
7 : log::{init_logger, init_tracing_logger},
8 : simulation::{generate_network_opts, generate_schedule, Schedule, TestAction, TestConfig},
9 : };
10 :
11 : pub mod walproposer_sim;
12 :
// Verifies that the simulation supports restarting (crashing) a safekeeper
// while freshly written WAL may not have been replicated yet.
#[test]
fn crash_safekeeper() {
    let sim_clock = init_logger();
    let cfg = TestConfig::new(Some(sim_clock));
    let sim = cfg.start(1337);

    // Nothing has been written yet, so the initial sync must report LSN 0.
    let start_lsn = sim.sync_safekeepers().unwrap();
    assert_eq!(start_lsn, Lsn(0));
    info!("Sucessfully synced empty safekeepers at 0/0");

    let mut proposer = sim.launch_walproposer(start_lsn);

    // Poll briefly, write some WAL, then restart safekeeper 0 immediately,
    // without waiting for replication.
    sim.poll_for_duration(30);
    proposer.write_tx(3);
    sim.servers[0].restart();

    // Keep polling for a while so that walproposer can reconnect.
    sim.poll_for_duration(2000);
}
34 :
// Verifies that walproposer can be crashed (stopped) and that the safekeepers
// can still be synced afterwards.
#[test]
fn test_simple_restart() {
    let sim_clock = init_logger();
    let cfg = TestConfig::new(Some(sim_clock));
    let sim = cfg.start(1337);

    // Nothing has been written yet, so the initial sync must report LSN 0.
    let start_lsn = sim.sync_safekeepers().unwrap();
    assert_eq!(start_lsn, Lsn(0));
    info!("Sucessfully synced empty safekeepers at 0/0");

    let mut proposer = sim.launch_walproposer(start_lsn);

    // Write some WAL and keep polling for a while after the write.
    sim.poll_for_duration(30);
    proposer.write_tx(3);
    sim.poll_for_duration(100);

    // Stop walproposer and release its handle before syncing again.
    proposer.stop();
    drop(proposer);

    let synced_lsn = sim.sync_safekeepers().unwrap();
    info!("Sucessfully synced safekeepers at {}", synced_lsn);
}
58 :
// Test running a simple schedule that restarts everything several times.
#[test]
fn test_simple_schedule() -> anyhow::Result<()> {
    let sim_clock = init_logger();
    let mut cfg = TestConfig::new(Some(sim_clock));
    cfg.network.keepalive_timeout = Some(100);
    let sim = cfg.start(1337);

    // Each entry pairs a number with an action; the number is presumably the
    // simulated time at which the action fires (confirm against `Schedule`).
    let plan: Schedule = vec![
        // Start by restarting walproposer, then write.
        (0, TestAction::RestartWalProposer),
        (50, TestAction::WriteTx(5)),
        // Restart each safekeeper in turn, writing alongside every restart.
        (100, TestAction::RestartSafekeeper(0)),
        (100, TestAction::WriteTx(5)),
        (110, TestAction::RestartSafekeeper(1)),
        (110, TestAction::WriteTx(5)),
        (120, TestAction::RestartSafekeeper(2)),
        (120, TestAction::WriteTx(5)),
        // Restart walproposer again, then all safekeepers together.
        (201, TestAction::RestartWalProposer),
        (251, TestAction::RestartSafekeeper(0)),
        (251, TestAction::RestartSafekeeper(1)),
        (251, TestAction::RestartSafekeeper(2)),
        (251, TestAction::WriteTx(5)),
        (255, TestAction::WriteTx(5)),
        // One final, much later write.
        (1000, TestAction::WriteTx(5)),
    ];

    sim.run_schedule(&plan)?;
    info!("Test finished, stopping all threads");
    sim.world.deallocate();

    Ok(())
}
91 :
// Test that simulation can process 10^4 transactions.
#[test]
fn test_many_tx() -> anyhow::Result<()> {
    let sim_clock = init_logger();
    let cfg = TestConfig::new(Some(sim_clock));
    let sim = cfg.start(1337);

    // 100 schedule entries, spaced 10 apart, each writing 100 transactions.
    let plan: Schedule = (0..100)
        .map(|step| (step * 10, TestAction::WriteTx(100)))
        .collect();

    sim.run_schedule(&plan)?;
    info!("Test finished, stopping all threads");
    sim.world.stop_all();

    let events = sim.world.take_events();
    info!("Events: {:?}", events);

    // Commit events look like "commit_lsn;<lsn>[;...]"; keep the LSN field of
    // every such event and take the value from the last one.
    let last_commit_lsn = events
        .iter()
        .filter_map(|event| {
            let payload = event.data.strip_prefix("commit_lsn;")?;
            let lsn_field = payload.split(';').next().unwrap();
            let parsed: u64 = lsn_field.parse().unwrap();
            Some(parsed)
        })
        .last()
        .unwrap();

    let initdb_lsn = 21623024;
    let diff = last_commit_lsn - initdb_lsn;
    info!("Last commit lsn: {}, diff: {}", last_commit_lsn, diff);
    // Each tx is at least 8 bytes, and 100 txs are written in each of the 100 schedule steps.
    assert!(diff > 100 * 100 * 8);
    Ok(())
}
129 :
// Checks that we don't have nasty circular dependencies preventing Arc from deallocating.
// This test doesn't really assert anything: you need to run it manually and inspect the
// logged strong counts to check if there is any issue.
#[test]
fn test_res_dealloc() -> anyhow::Result<()> {
    let clock = init_tracing_logger(true);
    let mut config = TestConfig::new(Some(clock));

    // The same seed drives the network options, the simulation and the schedule.
    let seed = 123456;
    config.network = generate_network_opts(seed);
    let test = config.start(seed);
    warn!("Running test with seed {}", seed);

    let schedule = generate_schedule(seed);
    info!("schedule: {:?}", schedule);
    // Propagate a schedule failure as the test's error instead of panicking.
    test.run_schedule(&schedule)?;
    test.world.stop_all();

    // Hold one extra handle to the world so that, after `test` is dropped, the
    // strong count can be logged both before and after `deallocate()`.
    let world = test.world.clone();
    drop(test);
    info!("world strong count: {}", Arc::strong_count(&world));
    world.deallocate();
    info!("world strong count: {}", Arc::strong_count(&world));

    Ok(())
}
|