Line data Source code
1 : use std::{cell::Cell, str::FromStr, sync::Arc};
2 :
3 : use crate::walproposer_sim::{safekeeper::run_server, walproposer_api::SimulationApi};
4 : use desim::{
5 : executor::{self, ExternalHandle},
6 : node_os::NodeOs,
7 : options::{Delay, NetworkOptions},
8 : proto::{AnyMessage, NodeEvent},
9 : world::Node,
10 : world::World,
11 : };
12 : use rand::{Rng, SeedableRng};
13 : use tracing::{debug, info_span, warn};
14 : use utils::{id::TenantTimelineId, lsn::Lsn};
15 : use walproposer::walproposer::{Config, Wrapper};
16 :
17 : use super::{
18 : log::SimClock, safekeeper_disk::SafekeeperDisk, walproposer_api,
19 : walproposer_disk::DiskWalProposer,
20 : };
21 :
22 : /// Simulated safekeeper node.
23 : pub struct SafekeeperNode {
24 : pub node: Arc<Node>,
25 : pub id: u32,
26 : pub disk: Arc<SafekeeperDisk>,
27 : pub thread: Cell<ExternalHandle>,
28 : }
29 :
30 : impl SafekeeperNode {
31 : /// Create and start a safekeeper at the specified Node.
32 12048 : pub fn new(node: Arc<Node>) -> Self {
33 12048 : let disk = Arc::new(SafekeeperDisk::new());
34 12048 : let thread = Cell::new(SafekeeperNode::launch(disk.clone(), node.clone()));
35 12048 :
36 12048 : Self {
37 12048 : id: node.id,
38 12048 : node,
39 12048 : disk,
40 12048 : thread,
41 12048 : }
42 12048 : }
43 :
44 79394 : fn launch(disk: Arc<SafekeeperDisk>, node: Arc<Node>) -> ExternalHandle {
45 79394 : // start the server thread
46 79394 : node.launch(move |os| {
47 78103 : run_server(os, disk).expect("server should finish without errors");
48 79394 : })
49 79394 : }
50 :
51 : /// Restart the safekeeper.
52 67346 : pub fn restart(&self) {
53 67346 : let new_thread = SafekeeperNode::launch(self.disk.clone(), self.node.clone());
54 67346 : let old_thread = self.thread.replace(new_thread);
55 67346 : old_thread.crash_stop();
56 67346 : }
57 : }
58 :
59 : /// Simulated walproposer node.
60 : pub struct WalProposer {
61 : thread: ExternalHandle,
62 : node: Arc<Node>,
63 : disk: Arc<DiskWalProposer>,
64 : sync_safekeepers: bool,
65 : }
66 :
67 : impl WalProposer {
68 : /// Generic start function for both modes.
69 73354 : fn start(
70 73354 : os: NodeOs,
71 73354 : disk: Arc<DiskWalProposer>,
72 73354 : ttid: TenantTimelineId,
73 73354 : addrs: Vec<String>,
74 73354 : lsn: Option<Lsn>,
75 73354 : ) {
76 73354 : let sync_safekeepers = lsn.is_none();
77 :
78 73354 : let _enter = if sync_safekeepers {
79 70603 : info_span!("sync", started = executor::now()).entered()
80 : } else {
81 2751 : info_span!("walproposer", started = executor::now()).entered()
82 : };
83 :
84 73354 : os.log_event(format!("started;walproposer;{}", sync_safekeepers as i32));
85 73354 :
86 73354 : let config = Config {
87 73354 : ttid,
88 73354 : safekeepers_list: addrs,
89 73354 : safekeeper_reconnect_timeout: 1000,
90 73354 : safekeeper_connection_timeout: 5000,
91 73354 : sync_safekeepers,
92 73354 : };
93 73354 : let args = walproposer_api::Args {
94 73354 : os,
95 73354 : config: config.clone(),
96 73354 : disk,
97 73354 : redo_start_lsn: lsn,
98 73354 : };
99 73354 : let api = SimulationApi::new(args);
100 73354 : let wp = Wrapper::new(Box::new(api), config);
101 73354 : wp.start();
102 73354 : }
103 :
104 : /// Start walproposer in a sync_safekeepers mode.
105 72015 : pub fn launch_sync(ttid: TenantTimelineId, addrs: Vec<String>, node: Arc<Node>) -> Self {
106 72015 : debug!("sync_safekeepers started at node {}", node.id);
107 72015 : let disk = DiskWalProposer::new();
108 72015 : let disk_wp = disk.clone();
109 72015 :
110 72015 : // start the client thread
111 72015 : let handle = node.launch(move |os| {
112 70603 : WalProposer::start(os, disk_wp, ttid, addrs, None);
113 72015 : });
114 72015 :
115 72015 : Self {
116 72015 : thread: handle,
117 72015 : node,
118 72015 : disk,
119 72015 : sync_safekeepers: true,
120 72015 : }
121 72015 : }
122 :
123 : /// Start walproposer in a normal mode.
124 2751 : pub fn launch_walproposer(
125 2751 : ttid: TenantTimelineId,
126 2751 : addrs: Vec<String>,
127 2751 : node: Arc<Node>,
128 2751 : lsn: Lsn,
129 2751 : ) -> Self {
130 2751 : debug!("walproposer started at node {}", node.id);
131 2751 : let disk = DiskWalProposer::new();
132 2751 : disk.lock().reset_to(lsn);
133 2751 : let disk_wp = disk.clone();
134 2751 :
135 2751 : // start the client thread
136 2751 : let handle = node.launch(move |os| {
137 2751 : WalProposer::start(os, disk_wp, ttid, addrs, Some(lsn));
138 2751 : });
139 2751 :
140 2751 : Self {
141 2751 : thread: handle,
142 2751 : node,
143 2751 : disk,
144 2751 : sync_safekeepers: false,
145 2751 : }
146 2751 : }
147 :
148 3029 : pub fn write_tx(&mut self, cnt: usize) {
149 3029 : let start_lsn = self.disk.lock().flush_rec_ptr();
150 3029 :
151 32806 : for _ in 0..cnt {
152 32806 : self.disk
153 32806 : .lock()
154 32806 : .insert_logical_message("prefix", b"message")
155 32806 : .expect("failed to generate logical message");
156 32806 : }
157 :
158 3029 : let end_lsn = self.disk.lock().flush_rec_ptr();
159 3029 :
160 3029 : // log event
161 3029 : self.node
162 3029 : .log_event(format!("write_wal;{};{};{}", start_lsn.0, end_lsn.0, cnt));
163 3029 :
164 3029 : // now we need to set "Latch" in walproposer
165 3029 : self.node
166 3029 : .node_events()
167 3029 : .send(NodeEvent::Internal(AnyMessage::Just32(0)));
168 3029 : }
169 :
170 67589 : pub fn stop(&self) {
171 67589 : self.thread.crash_stop();
172 67589 : }
173 : }
174 :
175 : /// Holds basic simulation settings, such as network options.
176 : pub struct TestConfig {
177 : pub network: NetworkOptions,
178 : pub timeout: u64,
179 : pub clock: Option<SimClock>,
180 : }
181 :
182 : impl TestConfig {
183 : /// Create a new TestConfig with default settings.
184 18 : pub fn new(clock: Option<SimClock>) -> Self {
185 18 : Self {
186 18 : network: NetworkOptions {
187 18 : keepalive_timeout: Some(2000),
188 18 : connect_delay: Delay {
189 18 : min: 1,
190 18 : max: 5,
191 18 : fail_prob: 0.0,
192 18 : },
193 18 : send_delay: Delay {
194 18 : min: 1,
195 18 : max: 5,
196 18 : fail_prob: 0.0,
197 18 : },
198 18 : },
199 18 : timeout: 1_000 * 10,
200 18 : clock,
201 18 : }
202 18 : }
203 :
204 : /// Start a new simulation with the specified seed.
205 4016 : pub fn start(&self, seed: u64) -> Test {
206 4016 : let world = Arc::new(World::new(seed, Arc::new(self.network.clone())));
207 :
208 4016 : if let Some(clock) = &self.clock {
209 4016 : clock.set_clock(world.clock());
210 4016 : }
211 :
212 4016 : let servers = [
213 4016 : SafekeeperNode::new(world.new_node()),
214 4016 : SafekeeperNode::new(world.new_node()),
215 4016 : SafekeeperNode::new(world.new_node()),
216 4016 : ];
217 4016 :
218 4016 : let server_ids = [servers[0].id, servers[1].id, servers[2].id];
219 12048 : let safekeepers_addrs = server_ids.map(|id| format!("node:{}", id)).to_vec();
220 4016 :
221 4016 : let ttid = TenantTimelineId::generate();
222 4016 :
223 4016 : Test {
224 4016 : world,
225 4016 : servers,
226 4016 : sk_list: safekeepers_addrs,
227 4016 : ttid,
228 4016 : timeout: self.timeout,
229 4016 : }
230 4016 : }
231 : }
232 :
233 : /// Holds simulation state.
234 : pub struct Test {
235 : pub world: Arc<World>,
236 : pub servers: [SafekeeperNode; 3],
237 : pub sk_list: Vec<String>,
238 : pub ttid: TenantTimelineId,
239 : pub timeout: u64,
240 : }
241 :
242 : impl Test {
243 : /// Start a sync_safekeepers thread and wait for it to finish.
244 12 : pub fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
245 12 : let wp = self.launch_sync_safekeepers();
246 12 :
247 12 : // poll until exit or timeout
248 12 : let time_limit = self.timeout;
249 460 : while self.world.step() && self.world.now() < time_limit && !wp.thread.is_finished() {}
250 :
251 12 : if !wp.thread.is_finished() {
252 0 : anyhow::bail!("timeout or idle stuck");
253 12 : }
254 12 :
255 12 : let res = wp.thread.result();
256 12 : if res.0 != 0 {
257 0 : anyhow::bail!("non-zero exitcode: {:?}", res);
258 12 : }
259 12 : let lsn = Lsn::from_str(&res.1)?;
260 12 : Ok(lsn)
261 12 : }
262 :
263 : /// Spawn a new sync_safekeepers thread.
264 72015 : pub fn launch_sync_safekeepers(&self) -> WalProposer {
265 72015 : WalProposer::launch_sync(self.ttid, self.sk_list.clone(), self.world.new_node())
266 72015 : }
267 :
268 : /// Spawn a new walproposer thread.
269 2751 : pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
270 2751 : let lsn = if lsn.0 == 0 {
271 : // usual LSN after basebackup
272 1483 : Lsn(21623024)
273 : } else {
274 1268 : lsn
275 : };
276 :
277 2751 : WalProposer::launch_walproposer(self.ttid, self.sk_list.clone(), self.world.new_node(), lsn)
278 2751 : }
279 :
280 : /// Execute the simulation for the specified duration.
281 210 : pub fn poll_for_duration(&self, duration: u64) {
282 210 : let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
283 3552 : while self.world.step() && self.world.now() < time_limit {}
284 210 : }
285 :
286 : /// Execute the simulation together with events defined in some schedule.
287 4008 : pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> {
288 4008 : // scheduling empty events so that world will stop in those points
289 4008 : {
290 4008 : let clock = self.world.clock();
291 4008 :
292 4008 : let now = self.world.now();
293 205913 : for (time, _) in schedule {
294 201905 : if *time < now {
295 0 : continue;
296 201905 : }
297 201905 : clock.schedule_fake(*time - now);
298 : }
299 : }
300 :
301 4008 : let mut wp = self.launch_sync_safekeepers();
302 4008 :
303 4008 : let mut skipped_tx = 0;
304 4008 : let mut started_tx = 0;
305 4008 :
306 4008 : let mut schedule_ptr = 0;
307 :
308 209160 : loop {
309 209160 : if wp.sync_safekeepers && wp.thread.is_finished() {
310 3153 : let res = wp.thread.result();
311 3153 : if res.0 != 0 {
312 408 : warn!("sync non-zero exitcode: {:?}", res);
313 408 : debug!("restarting sync_safekeepers");
314 : // restart the sync_safekeepers
315 408 : wp = self.launch_sync_safekeepers();
316 408 : continue;
317 2745 : }
318 2745 : let lsn = Lsn::from_str(&res.1)?;
319 2745 : debug!("sync_safekeepers finished at LSN {}", lsn);
320 2745 : wp = self.launch_walproposer(lsn);
321 2745 : debug!("walproposer started at thread {}", wp.thread.id());
322 206007 : }
323 :
324 208752 : let now = self.world.now();
325 410657 : while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now {
326 201905 : if now != schedule[schedule_ptr].0 {
327 0 : warn!("skipped event {:?} at {}", schedule[schedule_ptr], now);
328 201905 : }
329 :
330 201905 : let action = &schedule[schedule_ptr].1;
331 201905 : match action {
332 66974 : TestAction::WriteTx(size) => {
333 66974 : if !wp.sync_safekeepers && !wp.thread.is_finished() {
334 2825 : started_tx += *size;
335 2825 : wp.write_tx(*size);
336 2825 : debug!("written {} transactions", size);
337 : } else {
338 64149 : skipped_tx += size;
339 64149 : debug!("skipped {} transactions", size);
340 : }
341 : }
342 67344 : TestAction::RestartSafekeeper(id) => {
343 67344 : debug!("restarting safekeeper {}", id);
344 67344 : self.servers[*id].restart();
345 : }
346 : TestAction::RestartWalProposer => {
347 67587 : debug!("restarting sync_safekeepers");
348 67587 : wp.stop();
349 67587 : wp = self.launch_sync_safekeepers();
350 : }
351 : }
352 201905 : schedule_ptr += 1;
353 : }
354 :
355 208752 : if schedule_ptr == schedule.len() {
356 4008 : break;
357 204744 : }
358 204744 : let next_event_time = schedule[schedule_ptr].0;
359 204744 :
360 204744 : // poll until the next event
361 204744 : if wp.thread.is_finished() {
362 3527 : while self.world.step() && self.world.now() < next_event_time {}
363 : } else {
364 3175218 : while self.world.step()
365 3175218 : && self.world.now() < next_event_time
366 2973951 : && !wp.thread.is_finished()
367 2970709 : {}
368 : }
369 : }
370 :
371 4008 : debug!(
372 4 : "finished schedule, total steps: {}",
373 4 : self.world.get_thread_step_count()
374 4 : );
375 4008 : debug!("skipped_tx: {}", skipped_tx);
376 4008 : debug!("started_tx: {}", started_tx);
377 :
378 4008 : Ok(())
379 4008 : }
380 : }
381 :
/// A single action that `Test::run_schedule` can apply to the running simulation.
#[derive(Debug, Clone)]
pub enum TestAction {
    /// Write the given number of transactions through the walproposer. The
    /// write is skipped when the walproposer is in sync_safekeepers mode or its
    /// thread has already finished.
    WriteTx(usize),
    /// Restart the safekeeper at the given index of `Test::servers`.
    RestartSafekeeper(usize),
    /// Stop the current walproposer thread and launch sync_safekeepers again.
    RestartWalProposer,
}
388 :
/// List of `(time, action)` pairs. `Test::run_schedule` walks it front to back
/// and applies an action once the simulated time has reached its `time`.
pub type Schedule = Vec<(u64, TestAction)>;
390 :
391 4004 : pub fn generate_schedule(seed: u64) -> Schedule {
392 4004 : let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
393 4004 : let mut schedule = Vec::new();
394 4004 : let mut time = 0;
395 4004 :
396 4004 : let cnt = rng.gen_range(1..100);
397 4004 :
398 4004 : for _ in 0..cnt {
399 201675 : time += rng.gen_range(0..500);
400 201675 : let action = match rng.gen_range(0..3) {
401 66760 : 0 => TestAction::WriteTx(rng.gen_range(1..10)),
402 67332 : 1 => TestAction::RestartSafekeeper(rng.gen_range(0..3)),
403 67583 : 2 => TestAction::RestartWalProposer,
404 0 : _ => unreachable!(),
405 : };
406 201675 : schedule.push((time, action));
407 : }
408 :
409 4004 : schedule
410 4004 : }
411 :
412 4004 : pub fn generate_network_opts(seed: u64) -> NetworkOptions {
413 4004 : let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
414 4004 :
415 4004 : let timeout = rng.gen_range(100..2000);
416 4004 : let max_delay = rng.gen_range(1..2 * timeout);
417 4004 : let min_delay = rng.gen_range(1..=max_delay);
418 4004 :
419 4004 : let max_fail_prob = rng.gen_range(0.0..0.9);
420 4004 : let connect_fail_prob = rng.gen_range(0.0..max_fail_prob);
421 4004 : let send_fail_prob = rng.gen_range(0.0..connect_fail_prob);
422 4004 :
423 4004 : NetworkOptions {
424 4004 : keepalive_timeout: Some(timeout),
425 4004 : connect_delay: Delay {
426 4004 : min: min_delay,
427 4004 : max: max_delay,
428 4004 : fail_prob: connect_fail_prob,
429 4004 : },
430 4004 : send_delay: Delay {
431 4004 : min: min_delay,
432 4004 : max: max_delay,
433 4004 : fail_prob: send_fail_prob,
434 4004 : },
435 4004 : }
436 4004 : }
|