LCOV - code coverage report
Current view: top level - pageserver/compaction/src/bin - compaction-simulator.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 0.0 % 135 0
Test Date: 2024-10-22 22:13:45 Functions: 0.0 % 52 0

            Line data    Source code
       1              : use clap::{Parser, Subcommand};
       2              : use pageserver_compaction::helpers::PAGE_SZ;
       3              : use pageserver_compaction::simulator::MockTimeline;
       4              : use rand::Rng;
       5              : use std::io::Write;
       6              : use std::path::{Path, PathBuf};
       7              : use std::sync::OnceLock;
       8              : 
       9              : use utils::project_git_version;
      10              : 
      11              : project_git_version!(GIT_VERSION);
      12              : 
      13            0 : #[derive(Parser)]
      14              : #[command(
      15              :     version = GIT_VERSION,
      16              :     about = "Neon Pageserver compaction simulator",
      17              :     long_about = "A developer tool to visualize and test compaction"
      18              : )]
      19              : #[command(propagate_version = true)]
      20              : struct CliOpts {
      21              :     #[command(subcommand)]
      22              :     command: Commands,
      23              : }
      24              : 
      25            0 : #[derive(Subcommand)]
      26              : enum Commands {
      27              :     RunSuite,
      28              :     Simulate(SimulateCmd),
      29              : }
      30              : 
      31            0 : #[derive(Clone, clap::ValueEnum)]
      32              : enum Distribution {
      33              :     Uniform,
      34              :     HotCold,
      35              : }
      36              : 
      37              : /// Read and update pageserver metadata file
      38            0 : #[derive(Parser)]
      39              : struct SimulateCmd {
      40            0 :     distribution: Distribution,
      41              : 
      42              :     /// Number of records to digest
      43            0 :     num_records: u64,
      44              :     /// Record length
      45            0 :     record_len: u64,
      46              : 
      47              :     // Logical database size in MB
      48            0 :     logical_size: u64,
      49              : }
      50              : 
      51            0 : async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()> {
      52            0 :     let mut executor = MockTimeline::new();
      53            0 : 
      54            0 :     // Convert the logical size in MB into a key range.
      55            0 :     let key_range = 0..((cmd.logical_size * 1024 * 1024) / PAGE_SZ);
      56            0 :     //let key_range = u64::MIN..u64::MAX;
      57            0 :     println!(
      58            0 :         "starting simulation with key range {:016X}-{:016X}",
      59            0 :         key_range.start, key_range.end
      60            0 :     );
      61            0 : 
      62            0 :     // helper function to print progress indicator
      63            0 :     let print_progress = |i| -> anyhow::Result<()> {
      64            0 :         if i == 0 || (i + 1) % 10000 == 0 || i == cmd.num_records - 1 {
      65            0 :             print!(
      66            0 :                 "\ringested {} / {} records, {} MiB / {} MiB...",
      67            0 :                 i + 1,
      68            0 :                 cmd.num_records,
      69            0 :                 (i + 1) * cmd.record_len / (1_000_000),
      70            0 :                 cmd.num_records * cmd.record_len / (1_000_000),
      71            0 :             );
      72            0 :             std::io::stdout().flush()?;
      73            0 :         }
      74            0 :         Ok(())
      75            0 :     };
      76              : 
      77            0 :     match cmd.distribution {
      78              :         Distribution::Uniform => {
      79            0 :             for i in 0..cmd.num_records {
      80            0 :                 executor.ingest_uniform(1, cmd.record_len, &key_range)?;
      81            0 :                 executor.compact_if_needed().await?;
      82              : 
      83            0 :                 print_progress(i)?;
      84              :             }
      85              :         }
      86              :         Distribution::HotCold => {
      87            0 :             let splitpoint = key_range.start + (key_range.end - key_range.start) / 10;
      88            0 :             let hot_key_range = 0..splitpoint;
      89            0 :             let cold_key_range = splitpoint..key_range.end;
      90              : 
      91            0 :             for i in 0..cmd.num_records {
      92            0 :                 let chosen_range = if rand::thread_rng().gen_bool(0.9) {
      93            0 :                     &hot_key_range
      94              :                 } else {
      95            0 :                     &cold_key_range
      96              :                 };
      97            0 :                 executor.ingest_uniform(1, cmd.record_len, chosen_range)?;
      98            0 :                 executor.compact_if_needed().await?;
      99              : 
     100            0 :                 print_progress(i)?;
     101              :             }
     102              :         }
     103              :     }
     104            0 :     println!("done!");
     105            0 :     executor.flush_l0();
     106            0 :     executor.compact_if_needed().await?;
     107            0 :     let stats = executor.stats()?;
     108              : 
     109              :     // Print the stats to stdout, and also to a file
     110            0 :     print!("{stats}");
     111            0 :     std::fs::write(results_path.join("stats.txt"), stats)?;
     112              : 
     113            0 :     let animation_path = results_path.join("compaction-animation.html");
     114            0 :     executor.draw_history(std::fs::File::create(&animation_path)?)?;
     115            0 :     println!(
     116            0 :         "animation: file://{}",
     117            0 :         animation_path.canonicalize()?.display()
     118            0 :     );
     119            0 : 
     120            0 :     Ok(())
     121            0 : }
     122              : 
     123            0 : async fn run_suite_cmd(results_path: &Path, workload: &SimulateCmd) -> anyhow::Result<()> {
     124            0 :     std::fs::create_dir(results_path)?;
     125              : 
     126            0 :     set_log_file(File::create(results_path.join("log"))?);
     127            0 :     let result = simulate(workload, results_path).await;
     128            0 :     set_log_stdout();
     129            0 :     result
     130            0 : }
     131              : 
     132            0 : async fn run_suite() -> anyhow::Result<()> {
     133            0 :     let top_results_path = PathBuf::from(format!(
     134            0 :         "compaction-suite-results.{}",
     135            0 :         std::time::SystemTime::UNIX_EPOCH.elapsed()?.as_secs()
     136            0 :     ));
     137            0 :     std::fs::create_dir(&top_results_path)?;
     138              : 
     139            0 :     let workload = SimulateCmd {
     140            0 :         distribution: Distribution::Uniform,
     141            0 :         // Generate 20 GB of WAL
     142            0 :         record_len: 1_000,
     143            0 :         num_records: 20_000_000,
     144            0 :         // Logical size 5 GB
     145            0 :         logical_size: 5_000,
     146            0 :     };
     147            0 : 
     148            0 :     run_suite_cmd(&top_results_path.join("uniform-20GB-5GB"), &workload).await?;
     149              : 
     150            0 :     println!(
     151            0 :         "All tests finished. Results in {}",
     152            0 :         top_results_path.display()
     153            0 :     );
     154            0 :     Ok(())
     155            0 : }
     156              : 
     157              : use std::fs::File;
     158              : use std::io::Stdout;
     159              : use std::sync::Mutex;
     160              : use tracing_subscriber::fmt::writer::EitherWriter;
     161              : use tracing_subscriber::fmt::MakeWriter;
     162              : 
     163              : static LOG_FILE: OnceLock<Mutex<EitherWriter<File, Stdout>>> = OnceLock::new();
     164            0 : fn get_log_output() -> &'static Mutex<EitherWriter<File, Stdout>> {
     165            0 :     LOG_FILE.get_or_init(|| std::sync::Mutex::new(EitherWriter::B(std::io::stdout())))
     166            0 : }
     167              : 
     168            0 : fn set_log_file(f: File) {
     169            0 :     *get_log_output().lock().unwrap() = EitherWriter::A(f);
     170            0 : }
     171              : 
     172            0 : fn set_log_stdout() {
     173            0 :     *get_log_output().lock().unwrap() = EitherWriter::B(std::io::stdout());
     174            0 : }
     175              : 
     176            0 : fn init_logging() -> anyhow::Result<()> {
     177            0 :     // We fall back to printing all spans at info-level or above if
     178            0 :     // the RUST_LOG environment variable is not set.
     179            0 :     let rust_log_env_filter = || {
     180            0 :         tracing_subscriber::EnvFilter::try_from_default_env()
     181            0 :             .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
     182            0 :     };
     183              : 
     184              :     // NB: the order of the with() calls does not matter.
     185              :     // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
     186              :     use tracing_subscriber::prelude::*;
     187            0 :     tracing_subscriber::registry()
     188            0 :         .with({
     189            0 :             let log_layer = tracing_subscriber::fmt::layer()
     190            0 :                 .with_target(false)
     191            0 :                 .with_ansi(false)
     192            0 :                 .with_writer(|| get_log_output().make_writer());
     193            0 :             log_layer.with_filter(rust_log_env_filter())
     194            0 :         })
     195            0 :         .init();
     196            0 : 
     197            0 :     Ok(())
     198            0 : }
     199              : 
     200              : #[tokio::main]
     201            0 : async fn main() -> anyhow::Result<()> {
     202            0 :     let cli = CliOpts::parse();
     203            0 : 
     204            0 :     init_logging()?;
     205            0 : 
     206            0 :     match cli.command {
     207            0 :         Commands::Simulate(cmd) => {
     208            0 :             simulate(&cmd, &PathBuf::from("/tmp/compactions.html")).await?;
     209            0 :         }
     210            0 :         Commands::RunSuite => {
     211            0 :             run_suite().await?;
     212            0 :         }
     213            0 :     };
     214            0 :     Ok(())
     215            0 : }
        

Generated by: LCOV version 2.1-beta