Line data Source code
1 : use std::cell::Cell;
2 : use std::str::FromStr;
3 : use std::sync::Arc;
4 :
5 : use desim::executor::{self, ExternalHandle};
6 : use desim::node_os::NodeOs;
7 : use desim::options::{Delay, NetworkOptions};
8 : use desim::proto::{AnyMessage, NodeEvent};
9 : use desim::world::{Node, World};
10 : use rand::{Rng, SeedableRng};
11 : use tracing::{debug, info_span, warn};
12 : use utils::id::TenantTimelineId;
13 : use utils::lsn::Lsn;
14 : use walproposer::walproposer::{Config, Wrapper};
15 :
16 : use super::log::SimClock;
17 : use super::safekeeper_disk::SafekeeperDisk;
18 : use super::walproposer_api;
19 : use super::walproposer_disk::DiskWalProposer;
20 : use crate::walproposer_sim::safekeeper::run_server;
21 : use crate::walproposer_sim::walproposer_api::SimulationApi;
22 :
23 : /// Simulated safekeeper node.
24 : pub struct SafekeeperNode {
25 : pub node: Arc<Node>,
26 : pub id: u32,
27 : pub disk: Arc<SafekeeperDisk>,
28 : pub thread: Cell<ExternalHandle>,
29 : }
30 :
31 : impl SafekeeperNode {
32 : /// Create and start a safekeeper at the specified Node.
33 1524 : pub fn new(node: Arc<Node>) -> Self {
34 1524 : let disk = Arc::new(SafekeeperDisk::new());
35 1524 : let thread = Cell::new(SafekeeperNode::launch(disk.clone(), node.clone()));
36 :
37 1524 : Self {
38 1524 : id: node.id,
39 1524 : node,
40 1524 : disk,
41 1524 : thread,
42 1524 : }
43 1524 : }
44 :
45 10264 : fn launch(disk: Arc<SafekeeperDisk>, node: Arc<Node>) -> ExternalHandle {
46 : // start the server thread
47 10264 : node.launch(move |os| {
48 10095 : run_server(os, disk).expect("server should finish without errors");
49 10095 : })
50 10264 : }
51 :
52 : /// Restart the safekeeper.
53 8740 : pub fn restart(&self) {
54 8740 : let new_thread = SafekeeperNode::launch(self.disk.clone(), self.node.clone());
55 8740 : let old_thread = self.thread.replace(new_thread);
56 8740 : old_thread.crash_stop();
57 8740 : }
58 : }
59 :
60 : /// Simulated walproposer node.
61 : pub struct WalProposer {
62 : thread: ExternalHandle,
63 : node: Arc<Node>,
64 : disk: Arc<DiskWalProposer>,
65 : sync_safekeepers: bool,
66 : }
67 :
68 : impl WalProposer {
69 : /// Generic start function for both modes.
70 9426 : fn start(
71 9426 : os: NodeOs,
72 9426 : disk: Arc<DiskWalProposer>,
73 9426 : ttid: TenantTimelineId,
74 9426 : addrs: Vec<String>,
75 9426 : lsn: Option<Lsn>,
76 9426 : ) {
77 9426 : let sync_safekeepers = lsn.is_none();
78 :
79 9426 : let _enter = if sync_safekeepers {
80 9128 : info_span!("sync", started = executor::now()).entered()
81 : } else {
82 298 : info_span!("walproposer", started = executor::now()).entered()
83 : };
84 :
85 9426 : os.log_event(format!("started;walproposer;{}", sync_safekeepers as i32));
86 :
87 9426 : let config = Config {
88 9426 : ttid,
89 9426 : safekeepers_list: addrs,
90 9426 : safekeeper_conninfo_options: String::new(),
91 9426 : safekeeper_reconnect_timeout: 1000,
92 9426 : safekeeper_connection_timeout: 5000,
93 9426 : sync_safekeepers,
94 9426 : };
95 9426 : let args = walproposer_api::Args {
96 9426 : os,
97 9426 : config: config.clone(),
98 9426 : disk,
99 9426 : redo_start_lsn: lsn,
100 9426 : };
101 9426 : let api = SimulationApi::new(args);
102 9426 : let wp = Wrapper::new(Box::new(api), config);
103 9426 : wp.start();
104 9426 : }
105 :
106 : /// Start walproposer in a sync_safekeepers mode.
107 9303 : pub fn launch_sync(ttid: TenantTimelineId, addrs: Vec<String>, node: Arc<Node>) -> Self {
108 9303 : debug!("sync_safekeepers started at node {}", node.id);
109 9303 : let disk = DiskWalProposer::new();
110 9303 : let disk_wp = disk.clone();
111 :
112 : // start the client thread
113 9303 : let handle = node.launch(move |os| {
114 9128 : WalProposer::start(os, disk_wp, ttid, addrs, None);
115 9128 : });
116 :
117 9303 : Self {
118 9303 : thread: handle,
119 9303 : node,
120 9303 : disk,
121 9303 : sync_safekeepers: true,
122 9303 : }
123 9303 : }
124 :
125 : /// Start walproposer in a normal mode.
126 298 : pub fn launch_walproposer(
127 298 : ttid: TenantTimelineId,
128 298 : addrs: Vec<String>,
129 298 : node: Arc<Node>,
130 298 : lsn: Lsn,
131 298 : ) -> Self {
132 298 : debug!("walproposer started at node {}", node.id);
133 298 : let disk = DiskWalProposer::new();
134 298 : disk.lock().reset_to(lsn);
135 298 : let disk_wp = disk.clone();
136 :
137 : // start the client thread
138 298 : let handle = node.launch(move |os| {
139 298 : WalProposer::start(os, disk_wp, ttid, addrs, Some(lsn));
140 298 : });
141 :
142 298 : Self {
143 298 : thread: handle,
144 298 : node,
145 298 : disk,
146 298 : sync_safekeepers: false,
147 298 : }
148 298 : }
149 :
150 486 : pub fn write_tx(&mut self, cnt: usize) {
151 486 : let start_lsn = self.disk.lock().flush_rec_ptr();
152 :
153 11334 : for _ in 0..cnt {
154 11334 : self.disk
155 11334 : .lock()
156 11334 : .insert_logical_message(c"prefix", b"message");
157 11334 : }
158 :
159 486 : let end_lsn = self.disk.lock().flush_rec_ptr();
160 :
161 : // log event
162 486 : self.node
163 486 : .log_event(format!("write_wal;{};{};{}", start_lsn.0, end_lsn.0, cnt));
164 :
165 : // now we need to set "Latch" in walproposer
166 486 : self.node
167 486 : .node_events()
168 486 : .send(NodeEvent::Internal(AnyMessage::Just32(0)));
169 486 : }
170 :
171 8759 : pub fn stop(&self) {
172 8759 : self.thread.crash_stop();
173 8759 : }
174 : }
175 :
176 : /// Holds basic simulation settings, such as network options.
177 : pub struct TestConfig {
178 : pub network: NetworkOptions,
179 : pub timeout: u64,
180 : pub clock: Option<SimClock>,
181 : }
182 :
183 : impl TestConfig {
184 : /// Create a new TestConfig with default settings.
185 9 : pub fn new(clock: Option<SimClock>) -> Self {
186 9 : Self {
187 9 : network: NetworkOptions {
188 9 : keepalive_timeout: Some(2000),
189 9 : connect_delay: Delay {
190 9 : min: 1,
191 9 : max: 5,
192 9 : fail_prob: 0.0,
193 9 : },
194 9 : send_delay: Delay {
195 9 : min: 1,
196 9 : max: 5,
197 9 : fail_prob: 0.0,
198 9 : },
199 9 : },
200 9 : timeout: 1_000 * 10,
201 9 : clock,
202 9 : }
203 9 : }
204 :
205 : /// Start a new simulation with the specified seed.
206 508 : pub fn start(&self, seed: u64) -> Test {
207 508 : let world = Arc::new(World::new(seed, Arc::new(self.network.clone())));
208 :
209 508 : if let Some(clock) = &self.clock {
210 508 : clock.set_clock(world.clock());
211 508 : }
212 :
213 508 : let servers = [
214 508 : SafekeeperNode::new(world.new_node()),
215 508 : SafekeeperNode::new(world.new_node()),
216 508 : SafekeeperNode::new(world.new_node()),
217 508 : ];
218 :
219 508 : let server_ids = [servers[0].id, servers[1].id, servers[2].id];
220 1524 : let safekeepers_addrs = server_ids.map(|id| format!("node:{id}")).to_vec();
221 :
222 508 : let ttid = TenantTimelineId::generate();
223 :
224 508 : Test {
225 508 : world,
226 508 : servers,
227 508 : sk_list: safekeepers_addrs,
228 508 : ttid,
229 508 : timeout: self.timeout,
230 508 : }
231 508 : }
232 : }
233 :
234 : /// Holds simulation state.
235 : pub struct Test {
236 : pub world: Arc<World>,
237 : pub servers: [SafekeeperNode; 3],
238 : pub sk_list: Vec<String>,
239 : pub ttid: TenantTimelineId,
240 : pub timeout: u64,
241 : }
242 :
243 : impl Test {
244 : /// Start a sync_safekeepers thread and wait for it to finish.
245 6 : pub fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
246 6 : let wp = self.launch_sync_safekeepers();
247 :
248 : // poll until exit or timeout
249 6 : let time_limit = self.timeout;
250 230 : while self.world.step() && self.world.now() < time_limit && !wp.thread.is_finished() {}
251 :
252 6 : if !wp.thread.is_finished() {
253 0 : anyhow::bail!("timeout or idle stuck");
254 6 : }
255 :
256 6 : let res = wp.thread.result();
257 6 : if res.0 != 0 {
258 0 : anyhow::bail!("non-zero exitcode: {:?}", res);
259 6 : }
260 6 : let lsn = Lsn::from_str(&res.1)?;
261 6 : Ok(lsn)
262 6 : }
263 :
264 : /// Spawn a new sync_safekeepers thread.
265 9303 : pub fn launch_sync_safekeepers(&self) -> WalProposer {
266 9303 : WalProposer::launch_sync(self.ttid, self.sk_list.clone(), self.world.new_node())
267 9303 : }
268 :
269 : /// Spawn a new walproposer thread.
270 298 : pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
271 298 : let lsn = if lsn.0 == 0 {
272 : // usual LSN after basebackup
273 184 : Lsn(21623024)
274 : } else {
275 114 : lsn
276 : };
277 :
278 298 : WalProposer::launch_walproposer(self.ttid, self.sk_list.clone(), self.world.new_node(), lsn)
279 298 : }
280 :
281 : /// Execute the simulation for the specified duration.
282 105 : pub fn poll_for_duration(&self, duration: u64) {
283 105 : let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
284 1762 : while self.world.step() && self.world.now() < time_limit {}
285 105 : }
286 :
287 : /// Execute the simulation together with events defined in some schedule.
288 504 : pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> {
289 : // scheduling empty events so that world will stop in those points
290 : {
291 504 : let clock = self.world.clock();
292 :
293 504 : let now = self.world.now();
294 26753 : for (time, _) in schedule {
295 26249 : if *time < now {
296 0 : continue;
297 26249 : }
298 26249 : clock.schedule_fake(*time - now);
299 : }
300 : }
301 :
302 504 : let mut wp = self.launch_sync_safekeepers();
303 :
304 504 : let mut skipped_tx = 0;
305 504 : let mut started_tx = 0;
306 :
307 504 : let mut schedule_ptr = 0;
308 :
309 : loop {
310 27065 : if wp.sync_safekeepers && wp.thread.is_finished() {
311 330 : let res = wp.thread.result();
312 330 : if res.0 != 0 {
313 35 : warn!("sync non-zero exitcode: {:?}", res);
314 35 : debug!("restarting sync_safekeepers");
315 : // restart the sync_safekeepers
316 35 : wp = self.launch_sync_safekeepers();
317 35 : continue;
318 295 : }
319 295 : let lsn = Lsn::from_str(&res.1)?;
320 295 : debug!("sync_safekeepers finished at LSN {}", lsn);
321 295 : wp = self.launch_walproposer(lsn);
322 295 : debug!("walproposer started at thread {}", wp.thread.id());
323 26735 : }
324 :
325 27030 : let now = self.world.now();
326 53279 : while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now {
327 26249 : if now != schedule[schedule_ptr].0 {
328 0 : warn!("skipped event {:?} at {}", schedule[schedule_ptr], now);
329 26249 : }
330 :
331 26249 : let action = &schedule[schedule_ptr].1;
332 26249 : match action {
333 8752 : TestAction::WriteTx(size) => {
334 8752 : if !wp.sync_safekeepers && !wp.thread.is_finished() {
335 384 : started_tx += *size;
336 384 : wp.write_tx(*size);
337 384 : debug!("written {} transactions", size);
338 : } else {
339 8368 : skipped_tx += size;
340 8368 : debug!("skipped {} transactions", size);
341 : }
342 : }
343 8739 : TestAction::RestartSafekeeper(id) => {
344 8739 : debug!("restarting safekeeper {}", id);
345 8739 : self.servers[*id].restart();
346 : }
347 : TestAction::RestartWalProposer => {
348 8758 : debug!("restarting sync_safekeepers");
349 8758 : wp.stop();
350 8758 : wp = self.launch_sync_safekeepers();
351 : }
352 : }
353 26249 : schedule_ptr += 1;
354 : }
355 :
356 27030 : if schedule_ptr == schedule.len() {
357 504 : break;
358 26526 : }
359 26526 : let next_event_time = schedule[schedule_ptr].0;
360 :
361 : // poll until the next event
362 26526 : if wp.thread.is_finished() {
363 171 : while self.world.step() && self.world.now() < next_event_time {}
364 : } else {
365 394002 : while self.world.step()
366 394002 : && self.world.now() < next_event_time
367 367826 : && !wp.thread.is_finished()
368 367490 : {}
369 : }
370 : }
371 :
372 504 : debug!(
373 0 : "finished schedule, total steps: {}",
374 0 : self.world.get_thread_step_count()
375 : );
376 504 : debug!("skipped_tx: {}", skipped_tx);
377 504 : debug!("started_tx: {}", started_tx);
378 :
379 504 : Ok(())
380 504 : }
381 : }
382 :
/// A single action that a schedule can apply to the running simulation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TestAction {
    /// Write the given number of transactions through the walproposer.
    WriteTx(usize),
    /// Restart the safekeeper at the given index.
    RestartSafekeeper(usize),
    /// Stop the current walproposer and launch sync_safekeepers again.
    RestartWalProposer,
}

/// List of `(time, action)` pairs.
pub type Schedule = Vec<(u64, TestAction)>;
391 :
392 502 : pub fn generate_schedule(seed: u64) -> Schedule {
393 502 : let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
394 502 : let mut schedule = Vec::new();
395 502 : let mut time = 0;
396 :
397 502 : let cnt = rng.gen_range(1..100);
398 :
399 502 : for _ in 0..cnt {
400 26134 : time += rng.gen_range(0..500);
401 26134 : let action = match rng.gen_range(0..3) {
402 8645 : 0 => TestAction::WriteTx(rng.gen_range(1..10)),
403 8733 : 1 => TestAction::RestartSafekeeper(rng.gen_range(0..3)),
404 8756 : 2 => TestAction::RestartWalProposer,
405 0 : _ => unreachable!(),
406 : };
407 26134 : schedule.push((time, action));
408 : }
409 :
410 502 : schedule
411 502 : }
412 :
413 502 : pub fn generate_network_opts(seed: u64) -> NetworkOptions {
414 502 : let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
415 :
416 502 : let timeout = rng.gen_range(100..2000);
417 502 : let max_delay = rng.gen_range(1..2 * timeout);
418 502 : let min_delay = rng.gen_range(1..=max_delay);
419 :
420 502 : let max_fail_prob = rng.gen_range(0.0..0.9);
421 502 : let connect_fail_prob = rng.gen_range(0.0..max_fail_prob);
422 502 : let send_fail_prob = rng.gen_range(0.0..connect_fail_prob);
423 :
424 502 : NetworkOptions {
425 502 : keepalive_timeout: Some(timeout),
426 502 : connect_delay: Delay {
427 502 : min: min_delay,
428 502 : max: max_delay,
429 502 : fail_prob: connect_fail_prob,
430 502 : },
431 502 : send_delay: Delay {
432 502 : min: min_delay,
433 502 : max: max_delay,
434 502 : fail_prob: send_fail_prob,
435 502 : },
436 502 : }
437 502 : }
|