Line data Source code
1 : use std::{cell::Cell, str::FromStr, sync::Arc};
2 :
3 : use crate::walproposer_sim::{safekeeper::run_server, walproposer_api::SimulationApi};
4 : use desim::{
5 : executor::{self, ExternalHandle},
6 : node_os::NodeOs,
7 : options::{Delay, NetworkOptions},
8 : proto::{AnyMessage, NodeEvent},
9 : world::Node,
10 : world::World,
11 : };
12 : use rand::{Rng, SeedableRng};
13 : use tracing::{debug, info_span, warn};
14 : use utils::{id::TenantTimelineId, lsn::Lsn};
15 : use walproposer::walproposer::{Config, Wrapper};
16 :
17 : use super::{
18 : log::SimClock, safekeeper_disk::SafekeeperDisk, walproposer_api,
19 : walproposer_disk::DiskWalProposer,
20 : };
21 :
22 : /// Simulated safekeeper node.
23 : pub struct SafekeeperNode {
24 : pub node: Arc<Node>,
25 : pub id: u32,
26 : pub disk: Arc<SafekeeperDisk>,
27 : pub thread: Cell<ExternalHandle>,
28 : }
29 :
30 : impl SafekeeperNode {
31 : /// Create and start a safekeeper at the specified Node.
32 1524 : pub fn new(node: Arc<Node>) -> Self {
33 1524 : let disk = Arc::new(SafekeeperDisk::new());
34 1524 : let thread = Cell::new(SafekeeperNode::launch(disk.clone(), node.clone()));
35 1524 :
36 1524 : Self {
37 1524 : id: node.id,
38 1524 : node,
39 1524 : disk,
40 1524 : thread,
41 1524 : }
42 1524 : }
43 :
44 10223 : fn launch(disk: Arc<SafekeeperDisk>, node: Arc<Node>) -> ExternalHandle {
45 10223 : // start the server thread
46 10223 : node.launch(move |os| {
47 10063 : run_server(os, disk).expect("server should finish without errors");
48 10223 : })
49 10223 : }
50 :
51 : /// Restart the safekeeper.
52 8699 : pub fn restart(&self) {
53 8699 : let new_thread = SafekeeperNode::launch(self.disk.clone(), self.node.clone());
54 8699 : let old_thread = self.thread.replace(new_thread);
55 8699 : old_thread.crash_stop();
56 8699 : }
57 : }
58 :
59 : /// Simulated walproposer node.
60 : pub struct WalProposer {
61 : thread: ExternalHandle,
62 : node: Arc<Node>,
63 : disk: Arc<DiskWalProposer>,
64 : sync_safekeepers: bool,
65 : }
66 :
67 : impl WalProposer {
68 : /// Generic start function for both modes.
69 9437 : fn start(
70 9437 : os: NodeOs,
71 9437 : disk: Arc<DiskWalProposer>,
72 9437 : ttid: TenantTimelineId,
73 9437 : addrs: Vec<String>,
74 9437 : lsn: Option<Lsn>,
75 9437 : ) {
76 9437 : let sync_safekeepers = lsn.is_none();
77 :
78 9437 : let _enter = if sync_safekeepers {
79 9079 : info_span!("sync", started = executor::now()).entered()
80 : } else {
81 358 : info_span!("walproposer", started = executor::now()).entered()
82 : };
83 :
84 9437 : os.log_event(format!("started;walproposer;{}", sync_safekeepers as i32));
85 9437 :
86 9437 : let config = Config {
87 9437 : ttid,
88 9437 : safekeepers_list: addrs,
89 9437 : safekeeper_reconnect_timeout: 1000,
90 9437 : safekeeper_connection_timeout: 5000,
91 9437 : sync_safekeepers,
92 9437 : };
93 9437 : let args = walproposer_api::Args {
94 9437 : os,
95 9437 : config: config.clone(),
96 9437 : disk,
97 9437 : redo_start_lsn: lsn,
98 9437 : };
99 9437 : let api = SimulationApi::new(args);
100 9437 : let wp = Wrapper::new(Box::new(api), config);
101 9437 : wp.start();
102 9437 : }
103 :
104 : /// Start walproposer in a sync_safekeepers mode.
105 9248 : pub fn launch_sync(ttid: TenantTimelineId, addrs: Vec<String>, node: Arc<Node>) -> Self {
106 9248 : debug!("sync_safekeepers started at node {}", node.id);
107 9248 : let disk = DiskWalProposer::new();
108 9248 : let disk_wp = disk.clone();
109 9248 :
110 9248 : // start the client thread
111 9248 : let handle = node.launch(move |os| {
112 9079 : WalProposer::start(os, disk_wp, ttid, addrs, None);
113 9248 : });
114 9248 :
115 9248 : Self {
116 9248 : thread: handle,
117 9248 : node,
118 9248 : disk,
119 9248 : sync_safekeepers: true,
120 9248 : }
121 9248 : }
122 :
123 : /// Start walproposer in a normal mode.
124 358 : pub fn launch_walproposer(
125 358 : ttid: TenantTimelineId,
126 358 : addrs: Vec<String>,
127 358 : node: Arc<Node>,
128 358 : lsn: Lsn,
129 358 : ) -> Self {
130 358 : debug!("walproposer started at node {}", node.id);
131 358 : let disk = DiskWalProposer::new();
132 358 : disk.lock().reset_to(lsn);
133 358 : let disk_wp = disk.clone();
134 358 :
135 358 : // start the client thread
136 358 : let handle = node.launch(move |os| {
137 358 : WalProposer::start(os, disk_wp, ttid, addrs, Some(lsn));
138 358 : });
139 358 :
140 358 : Self {
141 358 : thread: handle,
142 358 : node,
143 358 : disk,
144 358 : sync_safekeepers: false,
145 358 : }
146 358 : }
147 :
148 568 : pub fn write_tx(&mut self, cnt: usize) {
149 568 : let start_lsn = self.disk.lock().flush_rec_ptr();
150 568 :
151 11726 : for _ in 0..cnt {
152 11726 : self.disk
153 11726 : .lock()
154 11726 : .insert_logical_message(c"prefix", b"message");
155 11726 : }
156 :
157 568 : let end_lsn = self.disk.lock().flush_rec_ptr();
158 568 :
159 568 : // log event
160 568 : self.node
161 568 : .log_event(format!("write_wal;{};{};{}", start_lsn.0, end_lsn.0, cnt));
162 568 :
163 568 : // now we need to set "Latch" in walproposer
164 568 : self.node
165 568 : .node_events()
166 568 : .send(NodeEvent::Internal(AnyMessage::Just32(0)));
167 568 : }
168 :
169 8703 : pub fn stop(&self) {
170 8703 : self.thread.crash_stop();
171 8703 : }
172 : }
173 :
174 : /// Holds basic simulation settings, such as network options.
175 : pub struct TestConfig {
176 : pub network: NetworkOptions,
177 : pub timeout: u64,
178 : pub clock: Option<SimClock>,
179 : }
180 :
181 : impl TestConfig {
182 : /// Create a new TestConfig with default settings.
183 9 : pub fn new(clock: Option<SimClock>) -> Self {
184 9 : Self {
185 9 : network: NetworkOptions {
186 9 : keepalive_timeout: Some(2000),
187 9 : connect_delay: Delay {
188 9 : min: 1,
189 9 : max: 5,
190 9 : fail_prob: 0.0,
191 9 : },
192 9 : send_delay: Delay {
193 9 : min: 1,
194 9 : max: 5,
195 9 : fail_prob: 0.0,
196 9 : },
197 9 : },
198 9 : timeout: 1_000 * 10,
199 9 : clock,
200 9 : }
201 9 : }
202 :
203 : /// Start a new simulation with the specified seed.
204 508 : pub fn start(&self, seed: u64) -> Test {
205 508 : let world = Arc::new(World::new(seed, Arc::new(self.network.clone())));
206 :
207 508 : if let Some(clock) = &self.clock {
208 508 : clock.set_clock(world.clock());
209 508 : }
210 :
211 508 : let servers = [
212 508 : SafekeeperNode::new(world.new_node()),
213 508 : SafekeeperNode::new(world.new_node()),
214 508 : SafekeeperNode::new(world.new_node()),
215 508 : ];
216 508 :
217 508 : let server_ids = [servers[0].id, servers[1].id, servers[2].id];
218 1524 : let safekeepers_addrs = server_ids.map(|id| format!("node:{}", id)).to_vec();
219 508 :
220 508 : let ttid = TenantTimelineId::generate();
221 508 :
222 508 : Test {
223 508 : world,
224 508 : servers,
225 508 : sk_list: safekeepers_addrs,
226 508 : ttid,
227 508 : timeout: self.timeout,
228 508 : }
229 508 : }
230 : }
231 :
232 : /// Holds simulation state.
233 : pub struct Test {
234 : pub world: Arc<World>,
235 : pub servers: [SafekeeperNode; 3],
236 : pub sk_list: Vec<String>,
237 : pub ttid: TenantTimelineId,
238 : pub timeout: u64,
239 : }
240 :
241 : impl Test {
242 : /// Start a sync_safekeepers thread and wait for it to finish.
243 6 : pub fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
244 6 : let wp = self.launch_sync_safekeepers();
245 6 :
246 6 : // poll until exit or timeout
247 6 : let time_limit = self.timeout;
248 230 : while self.world.step() && self.world.now() < time_limit && !wp.thread.is_finished() {}
249 :
250 6 : if !wp.thread.is_finished() {
251 0 : anyhow::bail!("timeout or idle stuck");
252 6 : }
253 6 :
254 6 : let res = wp.thread.result();
255 6 : if res.0 != 0 {
256 0 : anyhow::bail!("non-zero exitcode: {:?}", res);
257 6 : }
258 6 : let lsn = Lsn::from_str(&res.1)?;
259 6 : Ok(lsn)
260 6 : }
261 :
262 : /// Spawn a new sync_safekeepers thread.
263 9248 : pub fn launch_sync_safekeepers(&self) -> WalProposer {
264 9248 : WalProposer::launch_sync(self.ttid, self.sk_list.clone(), self.world.new_node())
265 9248 : }
266 :
267 : /// Spawn a new walproposer thread.
268 358 : pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
269 358 : let lsn = if lsn.0 == 0 {
270 : // usual LSN after basebackup
271 186 : Lsn(21623024)
272 : } else {
273 172 : lsn
274 : };
275 :
276 358 : WalProposer::launch_walproposer(self.ttid, self.sk_list.clone(), self.world.new_node(), lsn)
277 358 : }
278 :
279 : /// Execute the simulation for the specified duration.
280 105 : pub fn poll_for_duration(&self, duration: u64) {
281 105 : let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
282 1762 : while self.world.step() && self.world.now() < time_limit {}
283 105 : }
284 :
285 : /// Execute the simulation together with events defined in some schedule.
286 504 : pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> {
287 504 : // scheduling empty events so that world will stop in those points
288 504 : {
289 504 : let clock = self.world.clock();
290 504 :
291 504 : let now = self.world.now();
292 26583 : for (time, _) in schedule {
293 26079 : if *time < now {
294 0 : continue;
295 26079 : }
296 26079 : clock.schedule_fake(*time - now);
297 : }
298 : }
299 :
300 504 : let mut wp = self.launch_sync_safekeepers();
301 504 :
302 504 : let mut skipped_tx = 0;
303 504 : let mut started_tx = 0;
304 504 :
305 504 : let mut schedule_ptr = 0;
306 :
307 : loop {
308 26958 : if wp.sync_safekeepers && wp.thread.is_finished() {
309 391 : let res = wp.thread.result();
310 391 : if res.0 != 0 {
311 36 : warn!("sync non-zero exitcode: {:?}", res);
312 36 : debug!("restarting sync_safekeepers");
313 : // restart the sync_safekeepers
314 36 : wp = self.launch_sync_safekeepers();
315 36 : continue;
316 355 : }
317 355 : let lsn = Lsn::from_str(&res.1)?;
318 355 : debug!("sync_safekeepers finished at LSN {}", lsn);
319 355 : wp = self.launch_walproposer(lsn);
320 355 : debug!("walproposer started at thread {}", wp.thread.id());
321 26567 : }
322 :
323 26922 : let now = self.world.now();
324 53001 : while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now {
325 26079 : if now != schedule[schedule_ptr].0 {
326 0 : warn!("skipped event {:?} at {}", schedule[schedule_ptr], now);
327 26079 : }
328 :
329 26079 : let action = &schedule[schedule_ptr].1;
330 26079 : match action {
331 8679 : TestAction::WriteTx(size) => {
332 8679 : if !wp.sync_safekeepers && !wp.thread.is_finished() {
333 466 : started_tx += *size;
334 466 : wp.write_tx(*size);
335 466 : debug!("written {} transactions", size);
336 : } else {
337 8213 : skipped_tx += size;
338 8213 : debug!("skipped {} transactions", size);
339 : }
340 : }
341 8698 : TestAction::RestartSafekeeper(id) => {
342 8698 : debug!("restarting safekeeper {}", id);
343 8698 : self.servers[*id].restart();
344 : }
345 : TestAction::RestartWalProposer => {
346 8702 : debug!("restarting sync_safekeepers");
347 8702 : wp.stop();
348 8702 : wp = self.launch_sync_safekeepers();
349 : }
350 : }
351 26079 : schedule_ptr += 1;
352 : }
353 :
354 26922 : if schedule_ptr == schedule.len() {
355 504 : break;
356 26418 : }
357 26418 : let next_event_time = schedule[schedule_ptr].0;
358 26418 :
359 26418 : // poll until the next event
360 26418 : if wp.thread.is_finished() {
361 99 : while self.world.step() && self.world.now() < next_event_time {}
362 : } else {
363 406789 : while self.world.step()
364 406789 : && self.world.now() < next_event_time
365 380783 : && !wp.thread.is_finished()
366 380384 : {}
367 : }
368 : }
369 :
370 504 : debug!(
371 0 : "finished schedule, total steps: {}",
372 0 : self.world.get_thread_step_count()
373 : );
374 504 : debug!("skipped_tx: {}", skipped_tx);
375 504 : debug!("started_tx: {}", started_tx);
376 :
377 504 : Ok(())
378 504 : }
379 : }
380 :
/// An action applied to the simulation by `Test::run_schedule`.
#[derive(Clone, Debug)]
pub enum TestAction {
    /// Write the given number of transactions through the walproposer.
    WriteTx(usize),
    /// Restart the safekeeper at the given index of `Test::servers`.
    RestartSafekeeper(usize),
    /// Stop the walproposer and launch a new sync_safekeepers run.
    RestartWalProposer,
}

/// A list of `(time, action)` pairs, applied in order by `Test::run_schedule`.
pub type Schedule = Vec<(u64, TestAction)>;
389 :
390 502 : pub fn generate_schedule(seed: u64) -> Schedule {
391 502 : let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
392 502 : let mut schedule = Vec::new();
393 502 : let mut time = 0;
394 502 :
395 502 : let cnt = rng.gen_range(1..100);
396 502 :
397 502 : for _ in 0..cnt {
398 25964 : time += rng.gen_range(0..500);
399 25964 : let action = match rng.gen_range(0..3) {
400 8572 : 0 => TestAction::WriteTx(rng.gen_range(1..10)),
401 8692 : 1 => TestAction::RestartSafekeeper(rng.gen_range(0..3)),
402 8700 : 2 => TestAction::RestartWalProposer,
403 0 : _ => unreachable!(),
404 : };
405 25964 : schedule.push((time, action));
406 : }
407 :
408 502 : schedule
409 502 : }
410 :
411 502 : pub fn generate_network_opts(seed: u64) -> NetworkOptions {
412 502 : let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
413 502 :
414 502 : let timeout = rng.gen_range(100..2000);
415 502 : let max_delay = rng.gen_range(1..2 * timeout);
416 502 : let min_delay = rng.gen_range(1..=max_delay);
417 502 :
418 502 : let max_fail_prob = rng.gen_range(0.0..0.9);
419 502 : let connect_fail_prob = rng.gen_range(0.0..max_fail_prob);
420 502 : let send_fail_prob = rng.gen_range(0.0..connect_fail_prob);
421 502 :
422 502 : NetworkOptions {
423 502 : keepalive_timeout: Some(timeout),
424 502 : connect_delay: Delay {
425 502 : min: min_delay,
426 502 : max: max_delay,
427 502 : fail_prob: connect_fail_prob,
428 502 : },
429 502 : send_delay: Delay {
430 502 : min: min_delay,
431 502 : max: max_delay,
432 502 : fail_prob: send_fail_prob,
433 502 : },
434 502 : }
435 502 : }
|