LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - getpage_latest_lsn.rs (source / functions)
Test:       aca806cab4756d7eb6a304846130f4a73a5d5393.info
Test Date:  2025-04-24 20:31:15

                Coverage    Total    Hit
    Lines:         0.0 %      290      0
    Functions:     0.0 %       42      0

            Line data    Source code
       1              : use std::collections::{HashSet, VecDeque};
       2              : use std::future::Future;
       3              : use std::num::NonZeroUsize;
       4              : use std::pin::Pin;
       5              : use std::sync::atomic::{AtomicU64, Ordering};
       6              : use std::sync::{Arc, Mutex};
       7              : use std::time::{Duration, Instant};
       8              : 
       9              : use anyhow::Context;
      10              : use camino::Utf8PathBuf;
      11              : use pageserver_api::key::Key;
      12              : use pageserver_api::keyspace::KeySpaceAccum;
      13              : use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
      14              : use pageserver_api::shard::TenantShardId;
      15              : use rand::prelude::*;
      16              : use tokio::task::JoinSet;
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::info;
      19              : use utils::id::TenantTimelineId;
      20              : use utils::lsn::Lsn;
      21              : 
      22              : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      23              : use crate::util::{request_stats, tokio_thread_local_stats};
      24              : 
      25              : /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
      26              : #[derive(clap::Parser)]
      27              : pub(crate) struct Args {
      28              :     #[clap(long, default_value = "http://localhost:9898")]
      29            0 :     mgmt_api_endpoint: String,
      30              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      31            0 :     page_service_connstring: String,
      32              :     #[clap(long)]
      33              :     pageserver_jwt: Option<String>,
      34              :     #[clap(long, default_value = "1")]
      35            0 :     num_clients: NonZeroUsize,
      36              :     #[clap(long)]
      37              :     runtime: Option<humantime::Duration>,
      38              :     /// Each client sends requests at the given rate.
      39              :     ///
      40              :     /// If a request takes too long and we should be issuing a new request already,
      41              :     /// we skip that request and account it as `MISSED`.
      42              :     #[clap(long)]
      43              :     per_client_rate: Option<usize>,
      44              :     /// Probability for sending `latest=true` in the request (uniform distribution).
      45              :     #[clap(long, default_value = "1")]
      46            0 :     req_latest_probability: f64,
      47              :     #[clap(long)]
      48              :     limit_to_first_n_targets: Option<usize>,
      49              :     /// For large pageserver installations, enumerating the keyspace takes a lot of time.
      50              :     /// If specified, the given path is used to maintain a cache of the keyspace enumeration result.
      51              :     /// The cache is tagged and auto-invalidated by the tenant/timeline IDs only.
      52              :     /// It is not invalidated when the keyspace changes under the hood, e.g., due to newly ingested data or compaction.
      53              :     #[clap(long)]
      54              :     keyspace_cache: Option<Utf8PathBuf>,
      55              :     /// Before starting the benchmark, live-reconfigure the pageserver to use the given
      56              :     /// [`pageserver_api::models::virtual_file::IoEngineKind`].
      57              :     #[clap(long)]
      58              :     set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
      59              : 
      60              :     /// Before starting the benchmark, live-reconfigure the pageserver to use the specified IO mode (buffered vs. direct).
      61              :     #[clap(long)]
      62              :     set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
      63              : 
      64              :     /// Queue depth generated in each client.
      65              :     #[clap(long, default_value = "1")]
      66            0 :     queue_depth: NonZeroUsize,
      67              : 
      68            0 :     targets: Option<Vec<TenantTimelineId>>,
      69              : }
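                      : // Example invocation (a sketch: the `get-page-latest-lsn` subcommand name is an
                      : // assumption, and the long flags are clap's default kebab-case form of the fields above):
                      : //
                      : //   pagebench get-page-latest-lsn \
                      : //       --mgmt-api-endpoint http://localhost:9898 \
                      : //       --page-service-connstring postgres://postgres@localhost:64000 \
                      : //       --num-clients 4 --queue-depth 8 --per-client-rate 100 --runtime 60s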
      70              : 
      71              : /// State shared by all clients
      72              : #[derive(Debug)]
      73              : struct SharedState {
      74              :     start_work_barrier: tokio::sync::Barrier,
      75              :     live_stats: LiveStats,
      76              : }
      77              : 
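                      : /// Per-second throughput counters. The stats-dump task in `main_impl` drains
                      : /// them once a second via `swap(0, Ordering::Relaxed)` and prints the
                      : /// RPS / MISSED line.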
      78              : #[derive(Debug, Default)]
      79              : struct LiveStats {
      80              :     completed_requests: AtomicU64,
      81              :     missed: AtomicU64,
      82              : }
      83              : 
      84              : impl LiveStats {
      85            0 :     fn request_done(&self) {
      86            0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      87            0 :     }
      88            0 :     fn missed(&self, n: u64) {
      89            0 :         self.missed.fetch_add(n, Ordering::Relaxed);
      90            0 :     }
      91              : }
      92              : 
      93            0 : #[derive(Clone, serde::Serialize, serde::Deserialize)]
      94              : struct KeyRange {
      95              :     timeline: TenantTimelineId,
      96              :     timeline_lsn: Lsn,
      97              :     start: i128,
      98              :     end: i128,
      99              : }
     100              : 
     101              : impl KeyRange {
     102            0 :     fn len(&self) -> i128 {
     103            0 :         self.end - self.start
     104            0 :     }
     105              : }
     106              : 
     107              : #[derive(PartialEq, Eq, Hash, Copy, Clone)]
     108              : struct WorkerId {
     109              :     timeline: TenantTimelineId,
     110              :     num_client: usize, // from 0..args.num_clients
     111              : }
     112              : 
     113              : #[derive(serde::Serialize)]
     114              : struct Output {
     115              :     total: request_stats::Output,
     116              : }
     117              : 
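                      : // Each tokio worker thread accumulates request latencies in its own
                      : // thread-local `request_stats::Stats`; `main_impl` locks and merges all of
                      : // them at the end to build the JSON `Output`.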
     118              : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
     119              : 
     120            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     121            0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
     122            0 :         main_impl(args, thread_local_stats)
     123            0 :     })
     124            0 : }
     125              : 
     126            0 : async fn main_impl(
     127            0 :     args: Args,
     128            0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
     129            0 : ) -> anyhow::Result<()> {
     130            0 :     let args: &'static Args = Box::leak(Box::new(args));
     131            0 : 
     132            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
     133            0 :         reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
     134            0 :         args.mgmt_api_endpoint.clone(),
     135            0 :         args.pageserver_jwt.as_deref(),
     136            0 :     ));
     137              : 
     138            0 :     if let Some(engine_str) = &args.set_io_engine {
     139            0 :         mgmt_api_client.put_io_engine(engine_str).await?;
     140            0 :     }
     141              : 
     142            0 :     if let Some(mode) = &args.set_io_mode {
     143            0 :         mgmt_api_client.put_io_mode(mode).await?;
     144            0 :     }
     145              : 
     146              :     // discover targets
     147            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
     148            0 :         &mgmt_api_client,
     149            0 :         crate::util::cli::targets::Spec {
     150            0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
     151            0 :             targets: args.targets.clone(),
     152            0 :         },
     153            0 :     )
     154            0 :     .await?;
     155              : 
     156            0 :     #[derive(serde::Deserialize)]
     157              :     struct KeyspaceCacheDe {
     158              :         tag: Vec<TenantTimelineId>,
     159              :         data: Vec<KeyRange>,
     160              :     }
     161              :     #[derive(serde::Serialize)]
     162              :     struct KeyspaceCacheSer<'a> {
     163              :         tag: &'a [TenantTimelineId],
     164              :         data: &'a [KeyRange],
     165              :     }
     166            0 :     let cache = args
     167            0 :         .keyspace_cache
     168            0 :         .as_ref()
     169            0 :         .map(|keyspace_cache_file| {
     170            0 :             let contents = match std::fs::read(keyspace_cache_file) {
     171            0 :                 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
     172            0 :                     return anyhow::Ok(None);
     173              :                 }
     174            0 :                 x => x.context("read keyspace cache file")?,
     175              :             };
     176            0 :             let cache: KeyspaceCacheDe =
     177            0 :                 serde_json::from_slice(&contents).context("deserialize cache file")?;
     178            0 :             let tag_ok = HashSet::<TenantTimelineId>::from_iter(cache.tag.into_iter())
     179            0 :                 == HashSet::from_iter(timelines.iter().cloned());
     180            0 :             info!("keyspace cache file matches tag: {tag_ok}");
     181            0 :             anyhow::Ok(if tag_ok { Some(cache.data) } else { None })
     182            0 :         })
     183            0 :         .transpose()?
     184            0 :         .flatten();
     185            0 :     let all_ranges: Vec<KeyRange> = if let Some(cached) = cache {
     186            0 :         info!("using keyspace cache file");
     187            0 :         cached
     188              :     } else {
     189            0 :         let mut js = JoinSet::new();
     190            0 :         for timeline in &timelines {
     191            0 :             js.spawn({
     192            0 :                 let mgmt_api_client = Arc::clone(&mgmt_api_client);
     193            0 :                 let timeline = *timeline;
     194            0 :                 async move {
     195            0 :                     let partitioning = mgmt_api_client
     196            0 :                         .keyspace(
     197            0 :                             TenantShardId::unsharded(timeline.tenant_id),
     198            0 :                             timeline.timeline_id,
     199            0 :                         )
     200            0 :                         .await?;
     201            0 :                     let lsn = partitioning.at_lsn;
     202            0 :                     let start = Instant::now();
     203            0 :                     let mut filtered = KeySpaceAccum::new();
     204              :                     // let's hope this is inlined and vectorized...
     205              :                     // TODO: turn this loop into a is_rel_block_range() function.
     206            0 :                     for r in partitioning.keys.ranges.iter() {
     207            0 :                         let mut i = r.start;
     208            0 :                         while i != r.end {
     209            0 :                             if i.is_rel_block_key() {
     210            0 :                                 filtered.add_key(i);
     211            0 :                             }
     212            0 :                             i = i.next();
     213              :                         }
     214              :                     }
     215            0 :                     let filtered = filtered.to_keyspace();
     216            0 :                     let filter_duration = start.elapsed();
     217            0 : 
     218            0 :                     anyhow::Ok((
     219            0 :                         filter_duration,
     220            0 :                         filtered.ranges.into_iter().map(move |r| KeyRange {
     221            0 :                             timeline,
     222            0 :                             timeline_lsn: lsn,
     223            0 :                             start: r.start.to_i128(),
     224            0 :                             end: r.end.to_i128(),
     225            0 :                         }),
     226            0 :                     ))
     227            0 :                 }
     228            0 :             });
     229            0 :         }
     230            0 :         let mut total_filter_duration = Duration::from_secs(0);
     231            0 :         let mut all_ranges: Vec<KeyRange> = Vec::new();
     232            0 :         while let Some(res) = js.join_next().await {
     233            0 :             let (filter_duration, range) = res.unwrap().unwrap();
     234            0 :             all_ranges.extend(range);
     235            0 :             total_filter_duration += filter_duration;
     236            0 :         }
     237            0 :         info!("filter duration: {}", total_filter_duration.as_secs_f64());
     238            0 :         if let Some(cachefile) = args.keyspace_cache.as_ref() {
     239            0 :             let cache = KeyspaceCacheSer {
     240            0 :                 tag: &timelines,
     241            0 :                 data: &all_ranges,
     242            0 :             };
     243            0 :             let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?;
     244            0 :             std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?;
     245            0 :             info!("successfully wrote keyspace cache file");
     246            0 :         }
     247            0 :         all_ranges
     248              :     };
     249              : 
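                      :     // Size the start barrier so the stats-dump task, every worker
                      :     // (num_clients per timeline), and main_impl itself begin at the same
                      :     // instant, once everything is connected and ready.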
     250            0 :     let num_live_stats_dump = 1;
     251            0 :     let num_work_sender_tasks = args.num_clients.get() * timelines.len();
     252            0 :     let num_main_impl = 1;
     253            0 : 
     254            0 :     let shared_state = Arc::new(SharedState {
     255            0 :         start_work_barrier: tokio::sync::Barrier::new(
     256            0 :             num_live_stats_dump + num_work_sender_tasks + num_main_impl,
     257            0 :         ),
     258            0 :         live_stats: LiveStats::default(),
     259            0 :     });
     260            0 :     let cancel = CancellationToken::new();
     261            0 : 
     262            0 :     let ss = shared_state.clone();
     263            0 :     tokio::spawn({
     264            0 :         async move {
     265            0 :             ss.start_work_barrier.wait().await;
     266              :             loop {
     267            0 :                 let start = std::time::Instant::now();
     268            0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     269            0 :                 let stats = &ss.live_stats;
     270            0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     271            0 :                 let missed = stats.missed.swap(0, Ordering::Relaxed);
     272            0 :                 let elapsed = start.elapsed();
     273            0 :                 info!(
     274            0 :                     "RPS: {:.0}   MISSED: {:.0}",
     275            0 :                     completed_requests as f64 / elapsed.as_secs_f64(),
     276            0 :                     missed as f64 / elapsed.as_secs_f64()
     277              :                 );
     278              :             }
     279            0 :         }
     280            0 :     });
     281            0 : 
     282            0 :     let rps_period = args
     283            0 :         .per_client_rate
     284            0 :         .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
     285            0 :     let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
     286            0 :         let ss = shared_state.clone();
     287            0 :         let cancel = cancel.clone();
     288            0 :         let ranges: Vec<KeyRange> = all_ranges
     289            0 :             .iter()
     290            0 :             .filter(|r| r.timeline == worker_id.timeline)
     291            0 :             .cloned()
     292            0 :             .collect();
     293            0 :         let weights =
     294            0 :             rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
     295            0 :                 .unwrap();
     296            0 : 
     297            0 :         Box::pin(async move {
     298            0 :             client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
     299            0 :         })
     300            0 :     };
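                      :     // The `WeightedIndex` above weights each key range by its length, so sampling
                      :     // a range and then a key within it draws keys uniformly across the timeline's
                      :     // rel-block keyspace, as promised in the doc comment on `Args`.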
     301              : 
     302            0 :     info!("spawning workers");
     303            0 :     let mut workers = JoinSet::new();
     304            0 :     for timeline in timelines.iter().cloned() {
     305            0 :         for num_client in 0..args.num_clients.get() {
     306            0 :             let worker_id = WorkerId {
     307            0 :                 timeline,
     308            0 :                 num_client,
     309            0 :             };
     310            0 :             workers.spawn(make_worker(worker_id));
     311            0 :         }
     312              :     }
     313            0 :     let workers = async move {
     314            0 :         while let Some(res) = workers.join_next().await {
     315            0 :             res.unwrap();
     316            0 :         }
     317            0 :     };
     318              : 
     319            0 :     info!("waiting for everything to become ready");
     320            0 :     shared_state.start_work_barrier.wait().await;
     321            0 :     info!("work started");
     322            0 :     if let Some(runtime) = args.runtime {
     323            0 :         tokio::time::sleep(runtime.into()).await;
     324            0 :         info!("runtime over, signalling cancellation");
     325            0 :         cancel.cancel();
     326            0 :         workers.await;
     327            0 :         info!("work sender exited");
     328              :     } else {
     329            0 :         workers.await;
     330            0 :         unreachable!("work sender never terminates");
     331              :     }
     332              : 
     333            0 :     let output = Output {
     334              :         total: {
     335            0 :             let mut agg_stats = request_stats::Stats::new();
     336            0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     337            0 :                 let stats = stats.lock().unwrap();
     338            0 :                 agg_stats.add(&stats);
     339            0 :             }
     340            0 :             agg_stats.output()
     341            0 :         },
     342            0 :     };
     343            0 : 
     344            0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     345            0 :     println!("{output}");
     346            0 : 
     347            0 :     anyhow::Ok(())
     348            0 : }
     349              : 
     350            0 : async fn client_libpq(
     351            0 :     args: &Args,
     352            0 :     worker_id: WorkerId,
     353            0 :     shared_state: Arc<SharedState>,
     354            0 :     cancel: CancellationToken,
     355            0 :     rps_period: Option<Duration>,
     356            0 :     ranges: Vec<KeyRange>,
     357            0 :     weights: rand::distributions::weighted::WeightedIndex<i128>,
     358            0 : ) {
     359            0 :     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
     360            0 :         .await
     361            0 :         .unwrap();
     362            0 :     let mut client = client
     363            0 :         .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
     364            0 :         .await
     365            0 :         .unwrap();
     366            0 : 
     367            0 :     shared_state.start_work_barrier.wait().await;
     368            0 :     let client_start = Instant::now();
     369            0 :     let mut ticks_processed = 0;
     370            0 :     let mut inflight = VecDeque::new();
     371            0 :     while !cancel.is_cancelled() {
     372              :         // Detect if a request took longer than the RPS rate
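                      :         // Worked example: with --per-client-rate 100 the period is 10ms; if 52
                      :         // periods have elapsed since client_start but only 50 ticks were
                      :         // processed, the 2 skipped slots are accounted as MISSED.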
     373            0 :         if let Some(period) = &rps_period {
     374            0 :             let periods_passed_until_now =
     375            0 :                 usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
     376            0 : 
     377            0 :             if periods_passed_until_now > ticks_processed {
     378            0 :                 shared_state
     379            0 :                     .live_stats
     380            0 :                     .missed((periods_passed_until_now - ticks_processed) as u64);
     381            0 :             }
     382            0 :             ticks_processed = periods_passed_until_now;
     383            0 :         }
     384              : 
     385            0 :         while inflight.len() < args.queue_depth.get() {
     386            0 :             let start = Instant::now();
     387            0 :             let req = {
     388            0 :                 let mut rng = rand::thread_rng();
     389            0 :                 let r = &ranges[weights.sample(&mut rng)];
     390            0 :                 let key: i128 = rng.gen_range(r.start..r.end);
     391            0 :                 let key = Key::from_i128(key);
     392            0 :                 assert!(key.is_rel_block_key());
     393            0 :                 let (rel_tag, block_no) = key
     394            0 :                     .to_rel_block()
     395            0 :                     .expect("we filter non-rel-block keys out above");
     396            0 :                 PagestreamGetPageRequest {
     397            0 :                     hdr: PagestreamRequest {
     398            0 :                         reqid: 0,
     399            0 :                         request_lsn: if rng.gen_bool(args.req_latest_probability) {
     400            0 :                             Lsn::MAX
     401              :                         } else {
     402            0 :                             r.timeline_lsn
     403              :                         },
     404            0 :                         not_modified_since: r.timeline_lsn,
     405            0 :                     },
     406            0 :                     rel: rel_tag,
     407            0 :                     blkno: block_no,
     408            0 :                 }
     409            0 :             };
     410            0 :             client.getpage_send(req).await.unwrap();
     411            0 :             inflight.push_back(start);
     412              :         }
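                      :         // The loop above keeps up to `queue_depth` requests in flight; below we
                      :         // await the response to the oldest outstanding request and record its
                      :         // latency into the thread-local stats.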
     413              : 
     414            0 :         let start = inflight.pop_front().unwrap();
     415            0 :         client.getpage_recv().await.unwrap();
     416            0 :         let end = Instant::now();
     417            0 :         shared_state.live_stats.request_done();
     418            0 :         ticks_processed += 1;
     419            0 :         STATS.with(|stats| {
     420            0 :             stats
     421            0 :                 .borrow()
     422            0 :                 .lock()
     423            0 :                 .unwrap()
     424            0 :                 .observe(end.duration_since(start))
     425            0 :                 .unwrap();
     426            0 :         });
     427              : 
     428            0 :         if let Some(period) = &rps_period {
     429            0 :             let next_at = client_start
     430            0 :                 + Duration::from_micros(
     431            0 :                     (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
     432            0 :                 );
     433            0 :             tokio::time::sleep_until(next_at.into()).await;
     434            0 :         }
     435              :     }
     436            0 : }
        

Generated by: LCOV version 2.1-beta