Line data Source code
1 : use clap::{Parser, Subcommand};
2 : use pageserver_compaction::simulator::MockTimeline;
3 : use rand::Rng;
4 : use std::io::Write;
5 : use std::path::{Path, PathBuf};
6 : use std::sync::OnceLock;
7 :
8 : use utils::project_git_version;
9 :
10 : project_git_version!(GIT_VERSION);
11 :
12 0 : #[derive(Parser)]
13 : #[command(
14 : version = GIT_VERSION,
15 : about = "Neon Pageserver compaction simulator",
16 : long_about = "A developer tool to visualize and test compaction"
17 : )]
18 : #[command(propagate_version = true)]
19 : struct CliOpts {
20 : #[command(subcommand)]
21 : command: Commands,
22 : }
23 :
24 0 : #[derive(Subcommand)]
25 : enum Commands {
26 : RunSuite,
27 : Simulate(SimulateCmd),
28 : }
29 :
30 0 : #[derive(Clone, clap::ValueEnum)]
31 : enum Distribution {
32 : Uniform,
33 : HotCold,
34 : }
35 :
36 : /// Read and update pageserver metadata file
37 0 : #[derive(Parser)]
38 : struct SimulateCmd {
39 0 : distribution: Distribution,
40 :
41 : /// Number of records to digest
42 0 : num_records: u64,
43 : /// Record length
44 0 : record_len: u64,
45 :
46 : // Logical database size in MB
47 0 : logical_size: u64,
48 : }
49 :
50 0 : async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()> {
51 0 : let mut executor = MockTimeline::new();
52 0 :
53 0 : // Convert the logical size in MB into a key range.
54 0 : let key_range = 0..((cmd.logical_size * 1024 * 1024) / 8192);
55 0 : //let key_range = u64::MIN..u64::MAX;
56 0 : println!(
57 0 : "starting simulation with key range {:016X}-{:016X}",
58 0 : key_range.start, key_range.end
59 0 : );
60 0 :
61 0 : // helper function to print progress indicator
62 0 : let print_progress = |i| -> anyhow::Result<()> {
63 0 : if i == 0 || (i + 1) % 10000 == 0 || i == cmd.num_records - 1 {
64 0 : print!(
65 0 : "\ringested {} / {} records, {} MiB / {} MiB...",
66 0 : i + 1,
67 0 : cmd.num_records,
68 0 : (i + 1) * cmd.record_len / (1_000_000),
69 0 : cmd.num_records * cmd.record_len / (1_000_000),
70 0 : );
71 0 : std::io::stdout().flush()?;
72 0 : }
73 0 : Ok(())
74 0 : };
75 :
76 0 : match cmd.distribution {
77 : Distribution::Uniform => {
78 0 : for i in 0..cmd.num_records {
79 0 : executor.ingest_uniform(1, cmd.record_len, &key_range)?;
80 0 : executor.compact_if_needed().await?;
81 :
82 0 : print_progress(i)?;
83 : }
84 : }
85 : Distribution::HotCold => {
86 0 : let splitpoint = key_range.start + (key_range.end - key_range.start) / 10;
87 0 : let hot_key_range = 0..splitpoint;
88 0 : let cold_key_range = splitpoint..key_range.end;
89 :
90 0 : for i in 0..cmd.num_records {
91 0 : let chosen_range = if rand::thread_rng().gen_bool(0.9) {
92 0 : &hot_key_range
93 : } else {
94 0 : &cold_key_range
95 : };
96 0 : executor.ingest_uniform(1, cmd.record_len, chosen_range)?;
97 0 : executor.compact_if_needed().await?;
98 :
99 0 : print_progress(i)?;
100 : }
101 : }
102 : }
103 0 : println!("done!");
104 0 : executor.flush_l0();
105 0 : executor.compact_if_needed().await?;
106 0 : let stats = executor.stats()?;
107 :
108 : // Print the stats to stdout, and also to a file
109 0 : print!("{stats}");
110 0 : std::fs::write(results_path.join("stats.txt"), stats)?;
111 :
112 0 : let animation_path = results_path.join("compaction-animation.html");
113 0 : executor.draw_history(std::fs::File::create(&animation_path)?)?;
114 : println!(
115 0 : "animation: file://{}",
116 0 : animation_path.canonicalize()?.display()
117 0 : );
118 0 :
119 0 : Ok(())
120 0 : }
121 :
122 0 : async fn run_suite_cmd(results_path: &Path, workload: &SimulateCmd) -> anyhow::Result<()> {
123 0 : std::fs::create_dir(results_path)?;
124 :
125 0 : set_log_file(File::create(results_path.join("log"))?);
126 0 : let result = simulate(workload, results_path).await;
127 0 : set_log_stdout();
128 0 : result
129 0 : }
130 :
131 0 : async fn run_suite() -> anyhow::Result<()> {
132 0 : let top_results_path = PathBuf::from(format!(
133 0 : "compaction-suite-results.{}",
134 0 : std::time::SystemTime::UNIX_EPOCH.elapsed()?.as_secs()
135 0 : ));
136 0 : std::fs::create_dir(&top_results_path)?;
137 :
138 0 : let workload = SimulateCmd {
139 0 : distribution: Distribution::Uniform,
140 0 : // Generate 20 GB of WAL
141 0 : record_len: 1_000,
142 0 : num_records: 20_000_000,
143 0 : // Logical size 5 GB
144 0 : logical_size: 5_000,
145 0 : };
146 0 :
147 0 : run_suite_cmd(&top_results_path.join("uniform-20GB-5GB"), &workload).await?;
148 :
149 0 : println!(
150 0 : "All tests finished. Results in {}",
151 0 : top_results_path.display()
152 0 : );
153 0 : Ok(())
154 0 : }
155 :
156 : use std::fs::File;
157 : use std::io::Stdout;
158 : use std::sync::Mutex;
159 : use tracing_subscriber::fmt::writer::EitherWriter;
160 : use tracing_subscriber::fmt::MakeWriter;
161 :
162 : static LOG_FILE: OnceLock<Mutex<EitherWriter<File, Stdout>>> = OnceLock::new();
163 0 : fn get_log_output() -> &'static Mutex<EitherWriter<File, Stdout>> {
164 0 : LOG_FILE.get_or_init(|| std::sync::Mutex::new(EitherWriter::B(std::io::stdout())))
165 0 : }
166 :
167 0 : fn set_log_file(f: File) {
168 0 : *get_log_output().lock().unwrap() = EitherWriter::A(f);
169 0 : }
170 :
171 0 : fn set_log_stdout() {
172 0 : *get_log_output().lock().unwrap() = EitherWriter::B(std::io::stdout());
173 0 : }
174 :
175 0 : fn init_logging() -> anyhow::Result<()> {
176 0 : // We fall back to printing all spans at info-level or above if
177 0 : // the RUST_LOG environment variable is not set.
178 0 : let rust_log_env_filter = || {
179 0 : tracing_subscriber::EnvFilter::try_from_default_env()
180 0 : .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
181 0 : };
182 :
183 : // NB: the order of the with() calls does not matter.
184 : // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
185 : use tracing_subscriber::prelude::*;
186 0 : tracing_subscriber::registry()
187 0 : .with({
188 0 : let log_layer = tracing_subscriber::fmt::layer()
189 0 : .with_target(false)
190 0 : .with_ansi(false)
191 0 : .with_writer(|| get_log_output().make_writer());
192 0 : log_layer.with_filter(rust_log_env_filter())
193 0 : })
194 0 : .init();
195 0 :
196 0 : Ok(())
197 0 : }
198 :
199 : #[tokio::main]
200 0 : async fn main() -> anyhow::Result<()> {
201 0 : let cli = CliOpts::parse();
202 0 :
203 0 : init_logging()?;
204 0 :
205 0 : match cli.command {
206 0 : Commands::Simulate(cmd) => {
207 0 : simulate(&cmd, &PathBuf::from("/tmp/compactions.html")).await?;
208 0 : }
209 0 : Commands::RunSuite => {
210 0 : run_suite().await?;
211 0 : }
212 0 : };
213 0 : Ok(())
214 0 : }
|