LCOV - differential code coverage report
Current view: top level - pageserver/pagebench/src/cmd - getpage_latest_lsn.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 228 0 228
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 46 0 46
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use anyhow::Context;
       2                 : use futures::future::join_all;
       3                 : use pageserver::pgdatadir_mapping::key_to_rel_block;
       4                 : use pageserver::repository;
       5                 : use pageserver_api::key::is_rel_block_key;
       6                 : use pageserver_api::models::PagestreamGetPageRequest;
       7                 : 
       8                 : use utils::id::TenantTimelineId;
       9                 : use utils::lsn::Lsn;
      10                 : 
      11                 : use rand::prelude::*;
      12                 : use tokio::sync::Barrier;
      13                 : use tokio::task::JoinSet;
      14                 : use tracing::{info, instrument};
      15                 : 
      16                 : use std::collections::HashMap;
      17                 : use std::future::Future;
      18                 : use std::num::NonZeroUsize;
      19                 : use std::pin::Pin;
      20                 : use std::sync::atomic::{AtomicU64, Ordering};
      21                 : use std::sync::{Arc, Mutex};
      22                 : use std::time::{Duration, Instant};
      23                 : 
      24                 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      25                 : use crate::util::{request_stats, tokio_thread_local_stats};
      26                 : 
      27                 : /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
      28 UBC           0 : #[derive(clap::Parser)]
      29                 : pub(crate) struct Args {
      30                 :     #[clap(long, default_value = "http://localhost:9898")]
      31               0 :     mgmt_api_endpoint: String,
      32                 :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      33               0 :     page_service_connstring: String,
      34                 :     #[clap(long)]
      35                 :     pageserver_jwt: Option<String>,
      36                 :     #[clap(long, default_value = "1")]
      37               0 :     num_clients: NonZeroUsize,
      38                 :     #[clap(long)]
      39                 :     runtime: Option<humantime::Duration>,
      40                 :     #[clap(long)]
      41                 :     per_target_rate_limit: Option<usize>,
      42                 :     /// Probability for sending `latest=true` in the request (uniform distribution).
      43                 :     #[clap(long, default_value = "1")]
      44               0 :     req_latest_probability: f64,
      45                 :     #[clap(long)]
      46                 :     limit_to_first_n_targets: Option<usize>,
      47               0 :     targets: Option<Vec<TenantTimelineId>>,
      48                 : }
      49                 : 
      50               0 : #[derive(Debug, Default)]
      51                 : struct LiveStats {
      52                 :     completed_requests: AtomicU64,
      53                 : }
      54                 : 
      55                 : impl LiveStats {
      56               0 :     fn inc(&self) {
      57               0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      58               0 :     }
      59                 : }
      60                 : 
      61               0 : #[derive(Clone)]
      62                 : struct KeyRange {
      63                 :     timeline: TenantTimelineId,
      64                 :     timeline_lsn: Lsn,
      65                 :     start: i128,
      66                 :     end: i128,
      67                 : }
      68                 : 
      69                 : impl KeyRange {
      70               0 :     fn len(&self) -> i128 {
      71               0 :         self.end - self.start
      72               0 :     }
      73                 : }
      74                 : 
      75               0 : #[derive(serde::Serialize)]
      76                 : struct Output {
      77                 :     total: request_stats::Output,
      78                 : }
      79                 : 
      80               0 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
      81                 : 
      82               0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      83               0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
      84               0 :         main_impl(args, thread_local_stats)
      85               0 :     })
      86               0 : }
      87                 : 
      88               0 : async fn main_impl(
      89               0 :     args: Args,
      90               0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
      91               0 : ) -> anyhow::Result<()> {
      92               0 :     let args: &'static Args = Box::leak(Box::new(args));
      93               0 : 
      94               0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      95               0 :         args.mgmt_api_endpoint.clone(),
      96               0 :         args.pageserver_jwt.as_deref(),
      97               0 :     ));
      98                 : 
      99                 :     // discover targets
     100               0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
     101               0 :         &mgmt_api_client,
     102               0 :         crate::util::cli::targets::Spec {
     103               0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
     104               0 :             targets: args.targets.clone(),
     105               0 :         },
     106               0 :     )
     107               0 :     .await?;
     108                 : 
     109               0 :     let mut js = JoinSet::new();
     110               0 :     for timeline in &timelines {
     111               0 :         js.spawn({
     112               0 :             let mgmt_api_client = Arc::clone(&mgmt_api_client);
     113               0 :             let timeline = *timeline;
     114               0 :             async move {
     115               0 :                 let partitioning = mgmt_api_client
     116               0 :                     .keyspace(timeline.tenant_id, timeline.timeline_id)
     117               0 :                     .await?;
     118               0 :                 let lsn = partitioning.at_lsn;
     119               0 : 
     120               0 :                 let ranges = partitioning
     121               0 :                     .keys
     122               0 :                     .ranges
     123               0 :                     .iter()
     124               0 :                     .filter_map(|r| {
     125               0 :                         let start = r.start;
     126               0 :                         let end = r.end;
     127               0 :                         // filter out non-relblock keys
     128               0 :                         match (is_rel_block_key(&start), is_rel_block_key(&end)) {
     129               0 :                             (true, true) => Some(KeyRange {
     130               0 :                                 timeline,
     131               0 :                                 timeline_lsn: lsn,
     132               0 :                                 start: start.to_i128(),
     133               0 :                                 end: end.to_i128(),
     134               0 :                             }),
     135                 :                             (true, false) | (false, true) => {
     136               0 :                                 unimplemented!("split up range")
     137                 :                             }
     138               0 :                             (false, false) => None,
     139                 :                         }
     140               0 :                     })
     141               0 :                     .collect::<Vec<_>>();
     142               0 : 
     143               0 :                 anyhow::Ok(ranges)
     144               0 :             }
     145               0 :         });
     146               0 :     }
     147               0 :     let mut all_ranges: Vec<KeyRange> = Vec::new();
     148               0 :     while let Some(res) = js.join_next().await {
     149               0 :         all_ranges.extend(res.unwrap().unwrap());
     150               0 :     }
     151                 : 
     152               0 :     let live_stats = Arc::new(LiveStats::default());
     153               0 : 
     154               0 :     let num_client_tasks = timelines.len();
     155               0 :     let num_live_stats_dump = 1;
     156               0 :     let num_work_sender_tasks = 1;
     157               0 : 
     158               0 :     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
     159               0 :         num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
     160               0 :     ));
     161               0 :     let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
     162               0 : 
     163               0 :     tokio::spawn({
     164               0 :         let stats = Arc::clone(&live_stats);
     165               0 :         let start_work_barrier = Arc::clone(&start_work_barrier);
     166               0 :         async move {
     167               0 :             start_work_barrier.wait().await;
     168                 :             loop {
     169               0 :                 let start = std::time::Instant::now();
     170               0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     171               0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     172               0 :                 let elapsed = start.elapsed();
     173               0 :                 info!(
     174               0 :                     "RPS: {:.0}",
     175               0 :                     completed_requests as f64 / elapsed.as_secs_f64()
     176               0 :                 );
     177                 :             }
     178               0 :         }
     179               0 :     });
     180               0 : 
     181               0 :     let mut work_senders = HashMap::new();
     182               0 :     let mut tasks = Vec::new();
     183               0 :     for tl in &timelines {
     184               0 :         let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
     185               0 :         work_senders.insert(tl, sender);
     186               0 :         tasks.push(tokio::spawn(client(
     187               0 :             args,
     188               0 :             *tl,
     189               0 :             Arc::clone(&start_work_barrier),
     190               0 :             receiver,
     191               0 :             Arc::clone(&all_work_done_barrier),
     192               0 :             Arc::clone(&live_stats),
     193               0 :         )));
     194               0 :     }
     195                 : 
     196               0 :     let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
     197               0 :         None => Box::pin(async move {
     198               0 :             let weights = rand::distributions::weighted::WeightedIndex::new(
     199               0 :                 all_ranges.iter().map(|v| v.len()),
     200               0 :             )
     201               0 :             .unwrap();
     202               0 : 
     203               0 :             start_work_barrier.wait().await;
     204                 : 
     205                 :             loop {
     206               0 :                 let (timeline, req) = {
     207               0 :                     let mut rng = rand::thread_rng();
     208               0 :                     let r = &all_ranges[weights.sample(&mut rng)];
     209               0 :                     let key: i128 = rng.gen_range(r.start..r.end);
     210               0 :                     let key = repository::Key::from_i128(key);
     211               0 :                     let (rel_tag, block_no) =
     212               0 :                         key_to_rel_block(key).expect("we filter non-rel-block keys out above");
     213               0 :                     (
     214               0 :                         r.timeline,
     215               0 :                         PagestreamGetPageRequest {
     216               0 :                             latest: rng.gen_bool(args.req_latest_probability),
     217               0 :                             lsn: r.timeline_lsn,
     218               0 :                             rel: rel_tag,
     219               0 :                             blkno: block_no,
     220               0 :                         },
     221               0 :                     )
     222               0 :                 };
     223               0 :                 let sender = work_senders.get(&timeline).unwrap();
     224               0 :                 // TODO: what if this blocks?
     225               0 :                 sender.send(req).await.ok().unwrap();
     226                 :             }
     227               0 :         }),
     228               0 :         Some(rps_limit) => Box::pin(async move {
     229               0 :             let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
     230               0 : 
     231               0 :             let make_timeline_task: &dyn Fn(
     232               0 :                 TenantTimelineId,
     233               0 :             )
     234               0 :                 -> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
     235               0 :                 let sender = work_senders.get(&timeline).unwrap();
     236               0 :                 let ranges: Vec<KeyRange> = all_ranges
     237               0 :                     .iter()
     238               0 :                     .filter(|r| r.timeline == timeline)
     239               0 :                     .cloned()
     240               0 :                     .collect();
     241               0 :                 let weights = rand::distributions::weighted::WeightedIndex::new(
     242               0 :                     ranges.iter().map(|v| v.len()),
     243               0 :                 )
     244               0 :                 .unwrap();
     245               0 : 
     246               0 :                 Box::pin(async move {
     247               0 :                     let mut ticker = tokio::time::interval(period);
     248               0 :                     ticker.set_missed_tick_behavior(
     249               0 :                         /* TODO review this choice */
     250               0 :                         tokio::time::MissedTickBehavior::Burst,
     251               0 :                     );
     252                 :                     loop {
     253               0 :                         ticker.tick().await;
     254               0 :                         let req = {
     255               0 :                             let mut rng = rand::thread_rng();
     256               0 :                             let r = &ranges[weights.sample(&mut rng)];
     257               0 :                             let key: i128 = rng.gen_range(r.start..r.end);
     258               0 :                             let key = repository::Key::from_i128(key);
     259               0 :                             let (rel_tag, block_no) = key_to_rel_block(key)
     260               0 :                                 .expect("we filter non-rel-block keys out above");
     261               0 :                             PagestreamGetPageRequest {
     262               0 :                                 latest: rng.gen_bool(args.req_latest_probability),
     263               0 :                                 lsn: r.timeline_lsn,
     264               0 :                                 rel: rel_tag,
     265               0 :                                 blkno: block_no,
     266               0 :                             }
     267               0 :                         };
     268               0 :                         sender.send(req).await.ok().unwrap();
     269                 :                     }
     270               0 :                 })
     271               0 :             };
     272                 : 
     273               0 :             let tasks: Vec<_> = work_senders
     274               0 :                 .keys()
     275               0 :                 .map(|tl| make_timeline_task(**tl))
     276               0 :                 .collect();
     277               0 : 
     278               0 :             start_work_barrier.wait().await;
     279                 : 
     280               0 :             join_all(tasks).await;
     281               0 :         }),
     282                 :     };
     283                 : 
     284               0 :     if let Some(runtime) = args.runtime {
     285               0 :         match tokio::time::timeout(runtime.into(), work_sender).await {
     286               0 :             Ok(()) => unreachable!("work sender never terminates"),
     287               0 :             Err(_timeout) => {
     288               0 :                 // this implicitly drops the work_senders, making all the clients exit
     289               0 :             }
     290                 :         }
     291                 :     } else {
     292               0 :         work_sender.await;
     293               0 :         unreachable!("work sender never terminates");
     294                 :     }
     295                 : 
     296               0 :     for t in tasks {
     297               0 :         t.await.unwrap();
     298                 :     }
     299                 : 
     300               0 :     let output = Output {
     301                 :         total: {
     302               0 :             let mut agg_stats = request_stats::Stats::new();
     303               0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     304               0 :                 let stats = stats.lock().unwrap();
     305               0 :                 agg_stats.add(&stats);
     306               0 :             }
     307               0 :             agg_stats.output()
     308               0 :         },
     309               0 :     };
     310               0 : 
     311               0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     312               0 :     println!("{output}");
     313               0 : 
     314               0 :     anyhow::Ok(())
     315               0 : }
     316                 : 
     317               0 : #[instrument(skip_all)]
     318                 : async fn client(
     319                 :     args: &'static Args,
     320                 :     timeline: TenantTimelineId,
     321                 :     start_work_barrier: Arc<Barrier>,
     322                 :     mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
     323                 :     all_work_done_barrier: Arc<Barrier>,
     324                 :     live_stats: Arc<LiveStats>,
     325                 : ) {
     326                 :     start_work_barrier.wait().await;
     327                 : 
     328                 :     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
     329                 :         .await
     330                 :         .unwrap();
     331                 :     let mut client = client
     332                 :         .pagestream(timeline.tenant_id, timeline.timeline_id)
     333                 :         .await
     334                 :         .unwrap();
     335                 : 
     336                 :     while let Some(req) = work.recv().await {
     337                 :         let start = Instant::now();
     338                 :         client
     339                 :             .getpage(req)
     340                 :             .await
     341               0 :             .with_context(|| format!("getpage for {timeline}"))
     342                 :             .unwrap();
     343                 :         let elapsed = start.elapsed();
     344                 :         live_stats.inc();
     345               0 :         STATS.with(|stats| {
     346               0 :             stats.borrow().lock().unwrap().observe(elapsed).unwrap();
     347               0 :         });
     348                 :     }
     349                 : 
     350                 :     all_work_done_barrier.wait().await;
     351                 : }
        

Generated by: LCOV version 2.1-beta