LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - getpage_latest_lsn.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 0.0 % 303 0
Test Date: 2024-02-14 18:05:35 Functions: 0.0 % 84 0

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::Utf8PathBuf;
       3              : use futures::future::join_all;
       4              : use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
       5              : use pageserver_api::keyspace::KeySpaceAccum;
       6              : use pageserver_api::models::PagestreamGetPageRequest;
       7              : 
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::TenantTimelineId;
      10              : use utils::lsn::Lsn;
      11              : 
      12              : use rand::prelude::*;
      13              : use tokio::sync::Barrier;
      14              : use tokio::task::JoinSet;
      15              : use tracing::{info, instrument};
      16              : 
      17              : use std::collections::{HashMap, HashSet};
      18              : use std::future::Future;
      19              : use std::num::NonZeroUsize;
      20              : use std::pin::Pin;
      21              : use std::sync::atomic::{AtomicU64, Ordering};
      22              : use std::sync::{Arc, Mutex};
      23              : use std::time::{Duration, Instant};
      24              : 
      25              : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      26              : use crate::util::{request_stats, tokio_thread_local_stats};
      27              : 
      28              : /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
      29            0 : #[derive(clap::Parser)]
      30              : pub(crate) struct Args {
      31              :     #[clap(long, default_value = "http://localhost:9898")]
      32            0 :     mgmt_api_endpoint: String,
      33              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      34            0 :     page_service_connstring: String,
      35              :     #[clap(long)]
      36              :     pageserver_jwt: Option<String>,
      37              :     #[clap(long, default_value = "1")]
      38            0 :     num_clients: NonZeroUsize,
      39              :     #[clap(long)]
      40              :     runtime: Option<humantime::Duration>,
      41              :     #[clap(long)]
      42              :     per_target_rate_limit: Option<usize>,
      43              :     /// Probability for sending `latest=true` in the request (uniform distribution).
      44              :     #[clap(long, default_value = "1")]
      45            0 :     req_latest_probability: f64,
      46              :     #[clap(long)]
      47              :     limit_to_first_n_targets: Option<usize>,
      48              :     /// For large pageserver installations, enumerating the keyspace takes a lot of time.
      49              :     /// If specified, the specified path is used to maintain a cache of the keyspace enumeration result.
      50              :     /// The cache is tagged and auto-invalided by the tenant/timeline ids only.
      51              :     /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
      52              :     #[clap(long)]
      53              :     keyspace_cache: Option<Utf8PathBuf>,
      54              :     /// Before starting the benchmark, live-reconfigure the pageserver to use the given
      55              :     /// [`pageserver_api::models::virtual_file::IoEngineKind`].
      56              :     #[clap(long)]
      57              :     set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
      58            0 :     targets: Option<Vec<TenantTimelineId>>,
      59              : }
      60              : 
      61            0 : #[derive(Debug, Default)]
      62              : struct LiveStats {
      63              :     completed_requests: AtomicU64,
      64              : }
      65              : 
      66              : impl LiveStats {
      67            0 :     fn inc(&self) {
      68            0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      69            0 :     }
      70              : }
      71              : 
      72            0 : #[derive(Clone, serde::Serialize, serde::Deserialize)]
      73              : struct KeyRange {
      74              :     timeline: TenantTimelineId,
      75              :     timeline_lsn: Lsn,
      76              :     start: i128,
      77              :     end: i128,
      78              : }
      79              : 
      80              : impl KeyRange {
      81            0 :     fn len(&self) -> i128 {
      82            0 :         self.end - self.start
      83            0 :     }
      84              : }
      85              : 
      86            0 : #[derive(PartialEq, Eq, Hash, Copy, Clone)]
      87              : struct WorkerId {
      88              :     timeline: TenantTimelineId,
      89              :     num_client: usize, // from 0..args.num_clients
      90              : }
      91              : 
      92            0 : #[derive(serde::Serialize)]
      93              : struct Output {
      94              :     total: request_stats::Output,
      95              : }
      96              : 
      97            0 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
      98              : 
      99            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     100            0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
     101            0 :         main_impl(args, thread_local_stats)
     102            0 :     })
     103            0 : }
     104              : 
     105            0 : async fn main_impl(
     106            0 :     args: Args,
     107            0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
     108            0 : ) -> anyhow::Result<()> {
     109            0 :     let args: &'static Args = Box::leak(Box::new(args));
     110            0 : 
     111            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
     112            0 :         args.mgmt_api_endpoint.clone(),
     113            0 :         args.pageserver_jwt.as_deref(),
     114            0 :     ));
     115              : 
     116            0 :     if let Some(engine_str) = &args.set_io_engine {
     117            0 :         mgmt_api_client.put_io_engine(engine_str).await?;
     118            0 :     }
     119              : 
     120              :     // discover targets
     121            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
     122            0 :         &mgmt_api_client,
     123            0 :         crate::util::cli::targets::Spec {
     124            0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
     125            0 :             targets: args.targets.clone(),
     126            0 :         },
     127            0 :     )
     128            0 :     .await?;
     129              : 
     130            0 :     #[derive(serde::Deserialize)]
     131              :     struct KeyspaceCacheDe {
     132              :         tag: Vec<TenantTimelineId>,
     133              :         data: Vec<KeyRange>,
     134              :     }
     135            0 :     #[derive(serde::Serialize)]
     136              :     struct KeyspaceCacheSer<'a> {
     137              :         tag: &'a [TenantTimelineId],
     138              :         data: &'a [KeyRange],
     139              :     }
     140            0 :     let cache = args
     141            0 :         .keyspace_cache
     142            0 :         .as_ref()
     143            0 :         .map(|keyspace_cache_file| {
     144            0 :             let contents = match std::fs::read(keyspace_cache_file) {
     145            0 :                 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
     146            0 :                     return anyhow::Ok(None);
     147              :                 }
     148            0 :                 x => x.context("read keyspace cache file")?,
     149              :             };
     150            0 :             let cache: KeyspaceCacheDe =
     151            0 :                 serde_json::from_slice(&contents).context("deserialize cache file")?;
     152            0 :             let tag_ok = HashSet::<TenantTimelineId>::from_iter(cache.tag.into_iter())
     153            0 :                 == HashSet::from_iter(timelines.iter().cloned());
     154            0 :             info!("keyspace cache file matches tag: {tag_ok}");
     155            0 :             anyhow::Ok(if tag_ok { Some(cache.data) } else { None })
     156            0 :         })
     157            0 :         .transpose()?
     158            0 :         .flatten();
     159            0 :     let all_ranges: Vec<KeyRange> = if let Some(cached) = cache {
     160            0 :         info!("using keyspace cache file");
     161            0 :         cached
     162              :     } else {
     163            0 :         let mut js = JoinSet::new();
     164            0 :         for timeline in &timelines {
     165            0 :             js.spawn({
     166            0 :                 let mgmt_api_client = Arc::clone(&mgmt_api_client);
     167            0 :                 let timeline = *timeline;
     168            0 :                 async move {
     169            0 :                     let partitioning = mgmt_api_client
     170            0 :                         .keyspace(timeline.tenant_id, timeline.timeline_id)
     171            0 :                         .await?;
     172            0 :                     let lsn = partitioning.at_lsn;
     173            0 :                     let start = Instant::now();
     174            0 :                     let mut filtered = KeySpaceAccum::new();
     175              :                     // let's hope this is inlined and vectorized...
     176              :                     // TODO: turn this loop into a is_rel_block_range() function.
     177            0 :                     for r in partitioning.keys.ranges.iter() {
     178            0 :                         let mut i = r.start;
     179            0 :                         while i != r.end {
     180            0 :                             if is_rel_block_key(&i) {
     181            0 :                                 filtered.add_key(i);
     182            0 :                             }
     183            0 :                             i = i.next();
     184              :                         }
     185              :                     }
     186            0 :                     let filtered = filtered.to_keyspace();
     187            0 :                     let filter_duration = start.elapsed();
     188            0 : 
     189            0 :                     anyhow::Ok((
     190            0 :                         filter_duration,
     191            0 :                         filtered.ranges.into_iter().map(move |r| KeyRange {
     192            0 :                             timeline,
     193            0 :                             timeline_lsn: lsn,
     194            0 :                             start: r.start.to_i128(),
     195            0 :                             end: r.end.to_i128(),
     196            0 :                         }),
     197            0 :                     ))
     198            0 :                 }
     199            0 :             });
     200            0 :         }
     201            0 :         let mut total_filter_duration = Duration::from_secs(0);
     202            0 :         let mut all_ranges: Vec<KeyRange> = Vec::new();
     203            0 :         while let Some(res) = js.join_next().await {
     204            0 :             let (filter_duration, range) = res.unwrap().unwrap();
     205            0 :             all_ranges.extend(range);
     206            0 :             total_filter_duration += filter_duration;
     207            0 :         }
     208            0 :         info!("filter duration: {}", total_filter_duration.as_secs_f64());
     209            0 :         if let Some(cachefile) = args.keyspace_cache.as_ref() {
     210            0 :             let cache = KeyspaceCacheSer {
     211            0 :                 tag: &timelines,
     212            0 :                 data: &all_ranges,
     213            0 :             };
     214            0 :             let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?;
     215            0 :             std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?;
     216            0 :             info!("successfully wrote keyspace cache file");
     217            0 :         }
     218            0 :         all_ranges
     219              :     };
     220              : 
     221            0 :     let live_stats = Arc::new(LiveStats::default());
     222            0 : 
     223            0 :     let num_client_tasks = args.num_clients.get() * timelines.len();
     224            0 :     let num_live_stats_dump = 1;
     225            0 :     let num_work_sender_tasks = 1;
     226            0 :     let num_main_impl = 1;
     227            0 : 
     228            0 :     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
     229            0 :         num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl,
     230            0 :     ));
     231            0 : 
     232            0 :     tokio::spawn({
     233            0 :         let stats = Arc::clone(&live_stats);
     234            0 :         let start_work_barrier = Arc::clone(&start_work_barrier);
     235            0 :         async move {
     236            0 :             start_work_barrier.wait().await;
     237              :             loop {
     238            0 :                 let start = std::time::Instant::now();
     239            0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     240            0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     241            0 :                 let elapsed = start.elapsed();
     242            0 :                 info!(
     243            0 :                     "RPS: {:.0}",
     244            0 :                     completed_requests as f64 / elapsed.as_secs_f64()
     245            0 :                 );
     246              :             }
     247            0 :         }
     248            0 :     });
     249            0 : 
     250            0 :     let cancel = CancellationToken::new();
     251            0 : 
     252            0 :     let mut work_senders: HashMap<WorkerId, _> = HashMap::new();
     253            0 :     let mut tasks = Vec::new();
     254            0 :     for timeline in timelines.iter().cloned() {
     255            0 :         for num_client in 0..args.num_clients.get() {
     256            0 :             let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
     257            0 :             let worker_id = WorkerId {
     258            0 :                 timeline,
     259            0 :                 num_client,
     260            0 :             };
     261            0 :             work_senders.insert(worker_id, sender);
     262            0 :             tasks.push(tokio::spawn(client(
     263            0 :                 args,
     264            0 :                 worker_id,
     265            0 :                 Arc::clone(&start_work_barrier),
     266            0 :                 receiver,
     267            0 :                 Arc::clone(&live_stats),
     268            0 :                 cancel.clone(),
     269            0 :             )));
     270            0 :         }
     271              :     }
     272              : 
     273            0 :     let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
     274            0 :         let start_work_barrier = start_work_barrier.clone();
     275            0 :         let cancel = cancel.clone();
     276            0 :         match args.per_target_rate_limit {
     277            0 :             None => Box::pin(async move {
     278            0 :                 let weights = rand::distributions::weighted::WeightedIndex::new(
     279            0 :                     all_ranges.iter().map(|v| v.len()),
     280            0 :                 )
     281            0 :                 .unwrap();
     282            0 : 
     283            0 :                 start_work_barrier.wait().await;
     284              : 
     285            0 :                 while !cancel.is_cancelled() {
     286            0 :                     let (timeline, req) = {
     287            0 :                         let mut rng = rand::thread_rng();
     288            0 :                         let r = &all_ranges[weights.sample(&mut rng)];
     289            0 :                         let key: i128 = rng.gen_range(r.start..r.end);
     290            0 :                         let key = Key::from_i128(key);
     291            0 :                         let (rel_tag, block_no) =
     292            0 :                             key_to_rel_block(key).expect("we filter non-rel-block keys out above");
     293            0 :                         (
     294            0 :                             WorkerId {
     295            0 :                                 timeline: r.timeline,
     296            0 :                                 num_client: rng.gen_range(0..args.num_clients.get()),
     297            0 :                             },
     298            0 :                             PagestreamGetPageRequest {
     299            0 :                                 latest: rng.gen_bool(args.req_latest_probability),
     300            0 :                                 lsn: r.timeline_lsn,
     301            0 :                                 rel: rel_tag,
     302            0 :                                 blkno: block_no,
     303            0 :                             },
     304            0 :                         )
     305            0 :                     };
     306            0 :                     let sender = work_senders.get(&timeline).unwrap();
     307            0 :                     // TODO: what if this blocks?
     308            0 :                     if sender.send(req).await.is_err() {
     309            0 :                         assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
     310            0 :                     }
     311              :                 }
     312            0 :             }),
     313            0 :             Some(rps_limit) => Box::pin(async move {
     314            0 :                 let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
     315            0 :                 let make_task: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
     316            0 :                     &|worker_id| {
     317            0 :                         let sender = work_senders.get(&worker_id).unwrap();
     318            0 :                         let ranges: Vec<KeyRange> = all_ranges
     319            0 :                             .iter()
     320            0 :                             .filter(|r| r.timeline == worker_id.timeline)
     321            0 :                             .cloned()
     322            0 :                             .collect();
     323            0 :                         let weights = rand::distributions::weighted::WeightedIndex::new(
     324            0 :                             ranges.iter().map(|v| v.len()),
     325            0 :                         )
     326            0 :                         .unwrap();
     327            0 : 
     328            0 :                         let cancel = cancel.clone();
     329            0 :                         Box::pin(async move {
     330            0 :                             let mut ticker = tokio::time::interval(period);
     331            0 :                             ticker.set_missed_tick_behavior(
     332            0 :                                 /* TODO review this choice */
     333            0 :                                 tokio::time::MissedTickBehavior::Burst,
     334            0 :                             );
     335            0 :                             while !cancel.is_cancelled() {
     336            0 :                                 ticker.tick().await;
     337            0 :                                 let req = {
     338            0 :                                     let mut rng = rand::thread_rng();
     339            0 :                                     let r = &ranges[weights.sample(&mut rng)];
     340            0 :                                     let key: i128 = rng.gen_range(r.start..r.end);
     341            0 :                                     let key = Key::from_i128(key);
     342            0 :                                     assert!(is_rel_block_key(&key));
     343            0 :                                     let (rel_tag, block_no) = key_to_rel_block(key)
     344            0 :                                         .expect("we filter non-rel-block keys out above");
     345            0 :                                     PagestreamGetPageRequest {
     346            0 :                                         latest: rng.gen_bool(args.req_latest_probability),
     347            0 :                                         lsn: r.timeline_lsn,
     348            0 :                                         rel: rel_tag,
     349            0 :                                         blkno: block_no,
     350            0 :                                     }
     351            0 :                                 };
     352            0 :                                 if sender.send(req).await.is_err() {
     353            0 :                                     assert!(
     354            0 :                                         cancel.is_cancelled(),
     355            0 :                                         "client has gone away unexpectedly"
     356              :                                     );
     357            0 :                                 }
     358              :                             }
     359            0 :                         })
     360            0 :                     };
     361              : 
     362            0 :                 let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect();
     363            0 : 
     364            0 :                 start_work_barrier.wait().await;
     365              : 
     366            0 :                 join_all(tasks).await;
     367            0 :             }),
     368              :         }
     369              :     };
     370              : 
     371            0 :     let work_sender_task = tokio::spawn(work_sender);
     372              : 
     373            0 :     info!("waiting for everything to become ready");
     374            0 :     start_work_barrier.wait().await;
     375            0 :     info!("work started");
     376            0 :     if let Some(runtime) = args.runtime {
     377            0 :         tokio::time::sleep(runtime.into()).await;
     378            0 :         info!("runtime over, signalling cancellation");
     379            0 :         cancel.cancel();
     380            0 :         work_sender_task.await.unwrap();
     381            0 :         info!("work sender exited");
     382              :     } else {
     383            0 :         work_sender_task.await.unwrap();
     384            0 :         unreachable!("work sender never terminates");
     385              :     }
     386              : 
     387            0 :     info!("joining clients");
     388            0 :     for t in tasks {
     389            0 :         t.await.unwrap();
     390              :     }
     391              : 
     392            0 :     info!("all clients stopped");
     393              : 
     394            0 :     let output = Output {
     395              :         total: {
     396            0 :             let mut agg_stats = request_stats::Stats::new();
     397            0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     398            0 :                 let stats = stats.lock().unwrap();
     399            0 :                 agg_stats.add(&stats);
     400            0 :             }
     401            0 :             agg_stats.output()
     402            0 :         },
     403            0 :     };
     404            0 : 
     405            0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     406            0 :     println!("{output}");
     407            0 : 
     408            0 :     anyhow::Ok(())
     409            0 : }
     410              : 
     411            0 : #[instrument(skip_all)]
     412              : async fn client(
     413              :     args: &'static Args,
     414              :     id: WorkerId,
     415              :     start_work_barrier: Arc<Barrier>,
     416              :     mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
     417              :     live_stats: Arc<LiveStats>,
     418              :     cancel: CancellationToken,
     419              : ) {
     420              :     let WorkerId {
     421              :         timeline,
     422              :         num_client: _,
     423              :     } = id;
     424              :     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
     425              :         .await
     426              :         .unwrap();
     427              :     let mut client = client
     428              :         .pagestream(timeline.tenant_id, timeline.timeline_id)
     429              :         .await
     430              :         .unwrap();
     431              : 
     432            0 :     let do_requests = async {
     433            0 :         start_work_barrier.wait().await;
     434            0 :         while let Some(req) = work.recv().await {
     435            0 :             let start = Instant::now();
     436            0 :             client
     437            0 :                 .getpage(req)
     438            0 :                 .await
     439            0 :                 .with_context(|| format!("getpage for {timeline}"))
     440            0 :                 .unwrap();
     441            0 :             let elapsed = start.elapsed();
     442            0 :             live_stats.inc();
     443            0 :             STATS.with(|stats| {
     444            0 :                 stats.borrow().lock().unwrap().observe(elapsed).unwrap();
     445            0 :             });
     446              :         }
     447            0 :     };
     448            0 :     tokio::select! {
     449            0 :         res = do_requests => { res },
     450              :         _ = cancel.cancelled() => {
     451              :             // fallthrough to shutdown
     452              :         }
     453              :     }
     454              :     client.shutdown().await;
     455              : }
        

Generated by: LCOV version 2.1-beta