Line data Source code
1 : use clap::{Parser, Subcommand};
2 : use pageserver_compaction::helpers::PAGE_SZ;
3 : use pageserver_compaction::simulator::MockTimeline;
4 : use rand::Rng;
5 : use std::io::Write;
6 : use std::path::{Path, PathBuf};
7 : use std::sync::OnceLock;
8 :
9 : use utils::project_git_version;
10 :
// Provides `GIT_VERSION`, which `CliOpts` uses as the `--version` string.
project_git_version!(GIT_VERSION);
12 :
13 0 : #[derive(Parser)]
14 : #[command(
15 : version = GIT_VERSION,
16 : about = "Neon Pageserver compaction simulator",
17 : long_about = "A developer tool to visualize and test compaction"
18 : )]
19 : #[command(propagate_version = true)]
20 : struct CliOpts {
21 : #[command(subcommand)]
22 : command: Commands,
23 : }
24 :
25 0 : #[derive(Subcommand)]
26 : enum Commands {
27 : RunSuite,
28 : Simulate(SimulateCmd),
29 : }
30 :
31 0 : #[derive(Clone, clap::ValueEnum)]
32 : enum Distribution {
33 : Uniform,
34 : HotCold,
35 : }
36 :
37 : /// Read and update pageserver metadata file
38 0 : #[derive(Parser)]
39 : struct SimulateCmd {
40 0 : distribution: Distribution,
41 :
42 : /// Number of records to digest
43 0 : num_records: u64,
44 : /// Record length
45 0 : record_len: u64,
46 :
47 : // Logical database size in MB
48 0 : logical_size: u64,
49 : }
50 :
51 0 : async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()> {
52 0 : let mut executor = MockTimeline::new();
53 0 :
54 0 : // Convert the logical size in MB into a key range.
55 0 : let key_range = 0..((cmd.logical_size * 1024 * 1024) / PAGE_SZ);
56 0 : //let key_range = u64::MIN..u64::MAX;
57 0 : println!(
58 0 : "starting simulation with key range {:016X}-{:016X}",
59 0 : key_range.start, key_range.end
60 0 : );
61 0 :
62 0 : // helper function to print progress indicator
63 0 : let print_progress = |i| -> anyhow::Result<()> {
64 0 : if i == 0 || (i + 1) % 10000 == 0 || i == cmd.num_records - 1 {
65 0 : print!(
66 0 : "\ringested {} / {} records, {} MiB / {} MiB...",
67 0 : i + 1,
68 0 : cmd.num_records,
69 0 : (i + 1) * cmd.record_len / (1_000_000),
70 0 : cmd.num_records * cmd.record_len / (1_000_000),
71 0 : );
72 0 : std::io::stdout().flush()?;
73 0 : }
74 0 : Ok(())
75 0 : };
76 :
77 0 : match cmd.distribution {
78 : Distribution::Uniform => {
79 0 : for i in 0..cmd.num_records {
80 0 : executor.ingest_uniform(1, cmd.record_len, &key_range)?;
81 0 : executor.compact_if_needed().await?;
82 :
83 0 : print_progress(i)?;
84 : }
85 : }
86 : Distribution::HotCold => {
87 0 : let splitpoint = key_range.start + (key_range.end - key_range.start) / 10;
88 0 : let hot_key_range = 0..splitpoint;
89 0 : let cold_key_range = splitpoint..key_range.end;
90 :
91 0 : for i in 0..cmd.num_records {
92 0 : let chosen_range = if rand::thread_rng().gen_bool(0.9) {
93 0 : &hot_key_range
94 : } else {
95 0 : &cold_key_range
96 : };
97 0 : executor.ingest_uniform(1, cmd.record_len, chosen_range)?;
98 0 : executor.compact_if_needed().await?;
99 :
100 0 : print_progress(i)?;
101 : }
102 : }
103 : }
104 0 : println!("done!");
105 0 : executor.flush_l0();
106 0 : executor.compact_if_needed().await?;
107 0 : let stats = executor.stats()?;
108 :
109 : // Print the stats to stdout, and also to a file
110 0 : print!("{stats}");
111 0 : std::fs::write(results_path.join("stats.txt"), stats)?;
112 :
113 0 : let animation_path = results_path.join("compaction-animation.html");
114 0 : executor.draw_history(std::fs::File::create(&animation_path)?)?;
115 0 : println!(
116 0 : "animation: file://{}",
117 0 : animation_path.canonicalize()?.display()
118 0 : );
119 0 :
120 0 : Ok(())
121 0 : }
122 :
123 0 : async fn run_suite_cmd(results_path: &Path, workload: &SimulateCmd) -> anyhow::Result<()> {
124 0 : std::fs::create_dir(results_path)?;
125 :
126 0 : set_log_file(File::create(results_path.join("log"))?);
127 0 : let result = simulate(workload, results_path).await;
128 0 : set_log_stdout();
129 0 : result
130 0 : }
131 :
132 0 : async fn run_suite() -> anyhow::Result<()> {
133 0 : let top_results_path = PathBuf::from(format!(
134 0 : "compaction-suite-results.{}",
135 0 : std::time::SystemTime::UNIX_EPOCH.elapsed()?.as_secs()
136 0 : ));
137 0 : std::fs::create_dir(&top_results_path)?;
138 :
139 0 : let workload = SimulateCmd {
140 0 : distribution: Distribution::Uniform,
141 0 : // Generate 20 GB of WAL
142 0 : record_len: 1_000,
143 0 : num_records: 20_000_000,
144 0 : // Logical size 5 GB
145 0 : logical_size: 5_000,
146 0 : };
147 0 :
148 0 : run_suite_cmd(&top_results_path.join("uniform-20GB-5GB"), &workload).await?;
149 :
150 0 : println!(
151 0 : "All tests finished. Results in {}",
152 0 : top_results_path.display()
153 0 : );
154 0 : Ok(())
155 0 : }
156 :
157 : use std::fs::File;
158 : use std::io::Stdout;
159 : use std::sync::Mutex;
160 : use tracing_subscriber::fmt::writer::EitherWriter;
161 : use tracing_subscriber::fmt::MakeWriter;
162 :
163 : static LOG_FILE: OnceLock<Mutex<EitherWriter<File, Stdout>>> = OnceLock::new();
164 0 : fn get_log_output() -> &'static Mutex<EitherWriter<File, Stdout>> {
165 0 : LOG_FILE.get_or_init(|| std::sync::Mutex::new(EitherWriter::B(std::io::stdout())))
166 0 : }
167 :
168 0 : fn set_log_file(f: File) {
169 0 : *get_log_output().lock().unwrap() = EitherWriter::A(f);
170 0 : }
171 :
172 0 : fn set_log_stdout() {
173 0 : *get_log_output().lock().unwrap() = EitherWriter::B(std::io::stdout());
174 0 : }
175 :
176 0 : fn init_logging() -> anyhow::Result<()> {
177 0 : // We fall back to printing all spans at info-level or above if
178 0 : // the RUST_LOG environment variable is not set.
179 0 : let rust_log_env_filter = || {
180 0 : tracing_subscriber::EnvFilter::try_from_default_env()
181 0 : .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
182 0 : };
183 :
184 : // NB: the order of the with() calls does not matter.
185 : // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
186 : use tracing_subscriber::prelude::*;
187 0 : tracing_subscriber::registry()
188 0 : .with({
189 0 : let log_layer = tracing_subscriber::fmt::layer()
190 0 : .with_target(false)
191 0 : .with_ansi(false)
192 0 : .with_writer(|| get_log_output().make_writer());
193 0 : log_layer.with_filter(rust_log_env_filter())
194 0 : })
195 0 : .init();
196 0 :
197 0 : Ok(())
198 0 : }
199 :
200 : #[tokio::main]
201 0 : async fn main() -> anyhow::Result<()> {
202 0 : let cli = CliOpts::parse();
203 0 :
204 0 : init_logging()?;
205 0 :
206 0 : match cli.command {
207 0 : Commands::Simulate(cmd) => {
208 0 : simulate(&cmd, &PathBuf::from("/tmp/compactions.html")).await?;
209 0 : }
210 0 : Commands::RunSuite => {
211 0 : run_suite().await?;
212 0 : }
213 0 : };
214 0 : Ok(())
215 0 : }
|