LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - getpage_latest_lsn.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 442 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 46 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet, VecDeque};
       2              : use std::future::Future;
       3              : use std::num::NonZeroUsize;
       4              : use std::pin::Pin;
       5              : use std::sync::atomic::{AtomicU64, Ordering};
       6              : use std::sync::{Arc, Mutex};
       7              : use std::time::{Duration, Instant};
       8              : 
       9              : use anyhow::Context;
      10              : use async_trait::async_trait;
      11              : use bytes::Bytes;
      12              : use camino::Utf8PathBuf;
      13              : use futures::stream::FuturesUnordered;
      14              : use futures::{Stream, StreamExt as _};
      15              : use pageserver_api::key::Key;
      16              : use pageserver_api::keyspace::KeySpaceAccum;
      17              : use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
      18              : use pageserver_api::reltag::RelTag;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use pageserver_client_grpc::{self as client_grpc, ShardSpec};
      21              : use pageserver_page_api as page_api;
      22              : use rand::prelude::*;
      23              : use tokio::task::JoinSet;
      24              : use tokio_util::sync::CancellationToken;
      25              : use tracing::info;
      26              : use url::Url;
      27              : use utils::id::TenantTimelineId;
      28              : use utils::lsn::Lsn;
      29              : use utils::shard::ShardIndex;
      30              : 
      31              : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      32              : use crate::util::{request_stats, tokio_thread_local_stats};
      33              : 
      34              : /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
      35              : #[derive(clap::Parser)]
      36              : pub(crate) struct Args {
      37              :     #[clap(long, default_value = "http://localhost:9898")]
      38              :     mgmt_api_endpoint: String,
      39              :     /// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
      40              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      41              :     page_service_connstring: String,
      42              :     /// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
      43              :     /// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
      44              :     #[clap(long)]
      45              :     rich_client: bool,
      46              :     #[clap(long)]
      47              :     pageserver_jwt: Option<String>,
      48              :     #[clap(long, default_value = "1")]
      49              :     num_clients: NonZeroUsize,
      50              :     #[clap(long)]
      51              :     runtime: Option<humantime::Duration>,
      52              :     /// If true, enable compression (only for gRPC).
      53              :     #[clap(long)]
      54              :     compression: bool,
      55              :     /// Each client sends requests at the given rate.
      56              :     ///
      57              :     /// If a request takes too long and we should be issuing a new request already,
      58              :     /// we skip that request and account it as `MISSED`.
      59              :     #[clap(long)]
      60              :     per_client_rate: Option<usize>,
      61              :     /// Probability for sending `latest=true` in the request (uniform distribution).
      62              :     #[clap(long, default_value = "1")]
      63              :     req_latest_probability: f64,
      64              :     #[clap(long)]
      65              :     limit_to_first_n_targets: Option<usize>,
      66              :     /// For large pageserver installations, enumerating the keyspace takes a lot of time.
      67              :     /// If specified, the specified path is used to maintain a cache of the keyspace enumeration result.
      68              :     /// The cache is tagged and auto-invalided by the tenant/timeline ids only.
      69              :     /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
      70              :     #[clap(long)]
      71              :     keyspace_cache: Option<Utf8PathBuf>,
      72              :     /// Before starting the benchmark, live-reconfigure the pageserver to use the given
      73              :     /// [`pageserver_api::models::virtual_file::IoEngineKind`].
      74              :     #[clap(long)]
      75              :     set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
      76              : 
      77              :     /// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
      78              :     #[clap(long)]
      79              :     set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
      80              : 
      81              :     /// Queue depth generated in each client.
      82              :     #[clap(long, default_value = "1")]
      83              :     queue_depth: NonZeroUsize,
      84              : 
      85              :     /// Batch size of contiguous pages generated by each client. This is equivalent to how Postgres
      86              :     /// will request page batches (e.g. prefetches or vectored reads). A batch counts as 1 RPS and
      87              :     /// 1 queue depth.
      88              :     ///
      89              :     /// The libpq protocol does not support client-side batching, and will submit batches as many
      90              :     /// individual requests, in the hope that the server will batch them. Each batch still counts as
      91              :     /// 1 RPS and 1 queue depth.
      92              :     #[clap(long, default_value = "1")]
      93              :     batch_size: NonZeroUsize,
      94              : 
      95              :     #[clap(long)]
      96              :     only_relnode: Option<u32>,
      97              : 
      98              :     targets: Option<Vec<TenantTimelineId>>,
      99              : }
     100              : 
     101              : /// State shared by all clients
     102              : #[derive(Debug)]
     103              : struct SharedState {
     104              :     start_work_barrier: tokio::sync::Barrier,
     105              :     live_stats: LiveStats,
     106              : }
     107              : 
     108              : #[derive(Debug, Default)]
     109              : struct LiveStats {
     110              :     completed_requests: AtomicU64,
     111              :     missed: AtomicU64,
     112              : }
     113              : 
     114              : impl LiveStats {
     115            0 :     fn request_done(&self) {
     116            0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
     117            0 :     }
     118            0 :     fn missed(&self, n: u64) {
     119            0 :         self.missed.fetch_add(n, Ordering::Relaxed);
     120            0 :     }
     121              : }
     122              : 
     123            0 : #[derive(Clone, serde::Serialize, serde::Deserialize)]
     124              : struct KeyRange {
     125              :     timeline: TenantTimelineId,
     126              :     timeline_lsn: Lsn,
     127              :     start: i128,
     128              :     end: i128,
     129              : }
     130              : 
     131              : impl KeyRange {
     132            0 :     fn len(&self) -> i128 {
     133            0 :         self.end - self.start
     134            0 :     }
     135              : }
     136              : 
     137              : #[derive(PartialEq, Eq, Hash, Copy, Clone)]
     138              : struct WorkerId {
     139              :     timeline: TenantTimelineId,
     140              :     num_client: usize, // from 0..args.num_clients
     141              : }
     142              : 
     143              : #[derive(serde::Serialize)]
     144              : struct Output {
     145              :     total: request_stats::Output,
     146              : }
     147              : 
     148              : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
     149              : 
     150            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     151            0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
     152            0 :         main_impl(args, thread_local_stats)
     153            0 :     })
     154            0 : }
     155              : 
     156            0 : async fn main_impl(
     157            0 :     args: Args,
     158            0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
     159            0 : ) -> anyhow::Result<()> {
     160            0 :     let args: &'static Args = Box::leak(Box::new(args));
     161              : 
     162            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
     163            0 :         reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
     164            0 :         args.mgmt_api_endpoint.clone(),
     165            0 :         args.pageserver_jwt.as_deref(),
     166              :     ));
     167              : 
     168            0 :     if let Some(engine_str) = &args.set_io_engine {
     169            0 :         mgmt_api_client.put_io_engine(engine_str).await?;
     170            0 :     }
     171              : 
     172            0 :     if let Some(mode) = &args.set_io_mode {
     173            0 :         mgmt_api_client.put_io_mode(mode).await?;
     174            0 :     }
     175              : 
     176              :     // discover targets
     177            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
     178            0 :         &mgmt_api_client,
     179            0 :         crate::util::cli::targets::Spec {
     180            0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
     181            0 :             targets: args.targets.clone(),
     182            0 :         },
     183            0 :     )
     184            0 :     .await?;
     185              : 
     186            0 :     #[derive(serde::Deserialize)]
     187              :     struct KeyspaceCacheDe {
     188              :         tag: Vec<TenantTimelineId>,
     189              :         data: Vec<KeyRange>,
     190              :     }
     191              :     #[derive(serde::Serialize)]
     192              :     struct KeyspaceCacheSer<'a> {
     193              :         tag: &'a [TenantTimelineId],
     194              :         data: &'a [KeyRange],
     195              :     }
     196            0 :     let cache = args
     197            0 :         .keyspace_cache
     198            0 :         .as_ref()
     199            0 :         .map(|keyspace_cache_file| {
     200            0 :             let contents = match std::fs::read(keyspace_cache_file) {
     201            0 :                 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
     202            0 :                     return anyhow::Ok(None);
     203              :                 }
     204            0 :                 x => x.context("read keyspace cache file")?,
     205              :             };
     206            0 :             let cache: KeyspaceCacheDe =
     207            0 :                 serde_json::from_slice(&contents).context("deserialize cache file")?;
     208            0 :             let tag_ok = HashSet::<TenantTimelineId>::from_iter(cache.tag.into_iter())
     209            0 :                 == HashSet::from_iter(timelines.iter().cloned());
     210            0 :             info!("keyspace cache file matches tag: {tag_ok}");
     211            0 :             anyhow::Ok(if tag_ok { Some(cache.data) } else { None })
     212            0 :         })
     213            0 :         .transpose()?
     214            0 :         .flatten();
     215            0 :     let all_ranges: Vec<KeyRange> = if let Some(cached) = cache {
     216            0 :         info!("using keyspace cache file");
     217            0 :         cached
     218              :     } else {
     219            0 :         let mut js = JoinSet::new();
     220            0 :         for timeline in &timelines {
     221            0 :             js.spawn({
     222            0 :                 let mgmt_api_client = Arc::clone(&mgmt_api_client);
     223            0 :                 let timeline = *timeline;
     224            0 :                 async move {
     225            0 :                     let partitioning = mgmt_api_client
     226            0 :                         .keyspace(
     227            0 :                             TenantShardId::unsharded(timeline.tenant_id),
     228            0 :                             timeline.timeline_id,
     229            0 :                         )
     230            0 :                         .await?;
     231            0 :                     let lsn = partitioning.at_lsn;
     232            0 :                     let start = Instant::now();
     233            0 :                     let mut filtered = KeySpaceAccum::new();
     234              :                     // let's hope this is inlined and vectorized...
     235              :                     // TODO: turn this loop into a is_rel_block_range() function.
     236            0 :                     for r in partitioning.keys.ranges.iter() {
     237            0 :                         let mut i = r.start;
     238            0 :                         while i != r.end {
     239            0 :                             let mut include = true;
     240            0 :                             include &= i.is_rel_block_key();
     241            0 :                             if let Some(only_relnode) = args.only_relnode {
     242            0 :                                 include &= i.is_rel_block_of_rel(only_relnode);
     243            0 :                             }
     244            0 :                             if include {
     245            0 :                                 filtered.add_key(i);
     246            0 :                             }
     247            0 :                             i = i.next();
     248              :                         }
     249              :                     }
     250            0 :                     let filtered = filtered.to_keyspace();
     251            0 :                     let filter_duration = start.elapsed();
     252              : 
     253            0 :                     anyhow::Ok((
     254            0 :                         filter_duration,
     255            0 :                         filtered.ranges.into_iter().map(move |r| KeyRange {
     256            0 :                             timeline,
     257            0 :                             timeline_lsn: lsn,
     258            0 :                             start: r.start.to_i128(),
     259            0 :                             end: r.end.to_i128(),
     260            0 :                         }),
     261              :                     ))
     262            0 :                 }
     263              :             });
     264              :         }
     265            0 :         let mut total_filter_duration = Duration::from_secs(0);
     266            0 :         let mut all_ranges: Vec<KeyRange> = Vec::new();
     267            0 :         while let Some(res) = js.join_next().await {
     268            0 :             let (filter_duration, range) = res.unwrap().unwrap();
     269            0 :             all_ranges.extend(range);
     270            0 :             total_filter_duration += filter_duration;
     271            0 :         }
     272            0 :         info!("filter duration: {}", total_filter_duration.as_secs_f64());
     273            0 :         if let Some(cachefile) = args.keyspace_cache.as_ref() {
     274            0 :             let cache = KeyspaceCacheSer {
     275            0 :                 tag: &timelines,
     276            0 :                 data: &all_ranges,
     277            0 :             };
     278            0 :             let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?;
     279            0 :             std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?;
     280            0 :             info!("successfully wrote keyspace cache file");
     281            0 :         }
     282            0 :         all_ranges
     283              :     };
     284              : 
     285            0 :     let num_live_stats_dump = 1;
     286            0 :     let num_work_sender_tasks = args.num_clients.get() * timelines.len();
     287            0 :     let num_main_impl = 1;
     288              : 
     289            0 :     let shared_state = Arc::new(SharedState {
     290            0 :         start_work_barrier: tokio::sync::Barrier::new(
     291            0 :             num_live_stats_dump + num_work_sender_tasks + num_main_impl,
     292            0 :         ),
     293            0 :         live_stats: LiveStats::default(),
     294            0 :     });
     295            0 :     let cancel = CancellationToken::new();
     296              : 
     297            0 :     let ss = shared_state.clone();
     298            0 :     tokio::spawn({
     299            0 :         async move {
     300            0 :             ss.start_work_barrier.wait().await;
     301              :             loop {
     302            0 :                 let start = std::time::Instant::now();
     303            0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     304            0 :                 let stats = &ss.live_stats;
     305            0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     306            0 :                 let missed = stats.missed.swap(0, Ordering::Relaxed);
     307            0 :                 let elapsed = start.elapsed();
     308            0 :                 info!(
     309            0 :                     "RPS: {:.0}   MISSED: {:.0}",
     310            0 :                     completed_requests as f64 / elapsed.as_secs_f64(),
     311            0 :                     missed as f64 / elapsed.as_secs_f64()
     312              :                 );
     313              :             }
     314              :         }
     315              :     });
     316              : 
     317            0 :     let rps_period = args
     318            0 :         .per_client_rate
     319            0 :         .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
     320            0 :     let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
     321            0 :         let ss = shared_state.clone();
     322            0 :         let cancel = cancel.clone();
     323            0 :         let ranges: Vec<KeyRange> = all_ranges
     324            0 :             .iter()
     325            0 :             .filter(|r| r.timeline == worker_id.timeline)
     326            0 :             .cloned()
     327            0 :             .collect();
     328            0 :         let weights =
     329            0 :             rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
     330            0 :                 .unwrap();
     331              : 
     332            0 :         Box::pin(async move {
     333            0 :             let scheme = match Url::parse(&args.page_service_connstring) {
     334            0 :                 Ok(url) => url.scheme().to_lowercase().to_string(),
     335            0 :                 Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
     336            0 :                 Err(err) => panic!("invalid connstring: {err}"),
     337              :             };
     338            0 :             let client: Box<dyn Client> = match scheme.as_str() {
     339            0 :                 "postgresql" | "postgres" => {
     340            0 :                     assert!(!args.compression, "libpq does not support compression");
     341            0 :                     assert!(!args.rich_client, "rich client requires grpc://");
     342            0 :                     Box::new(
     343            0 :                         LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
     344            0 :                             .await
     345            0 :                             .unwrap(),
     346              :                     )
     347              :                 }
     348              : 
     349            0 :                 "grpc" if args.rich_client => Box::new(
     350            0 :                     RichGrpcClient::new(
     351            0 :                         &args.page_service_connstring,
     352            0 :                         worker_id.timeline,
     353            0 :                         args.compression,
     354            0 :                     )
     355            0 :                     .await
     356            0 :                     .unwrap(),
     357              :                 ),
     358              : 
     359            0 :                 "grpc" => Box::new(
     360            0 :                     GrpcClient::new(
     361            0 :                         &args.page_service_connstring,
     362            0 :                         worker_id.timeline,
     363            0 :                         args.compression,
     364            0 :                     )
     365            0 :                     .await
     366            0 :                     .unwrap(),
     367              :                 ),
     368              : 
     369            0 :                 scheme => panic!("unsupported scheme {scheme}"),
     370              :             };
     371            0 :             run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
     372            0 :         })
     373            0 :     };
     374              : 
     375            0 :     info!("spawning workers");
     376            0 :     let mut workers = JoinSet::new();
     377            0 :     for timeline in timelines.iter().cloned() {
     378            0 :         for num_client in 0..args.num_clients.get() {
     379            0 :             let worker_id = WorkerId {
     380            0 :                 timeline,
     381            0 :                 num_client,
     382            0 :             };
     383            0 :             workers.spawn(make_worker(worker_id));
     384            0 :         }
     385              :     }
     386            0 :     let workers = async move {
     387            0 :         while let Some(res) = workers.join_next().await {
     388            0 :             res.unwrap();
     389            0 :         }
     390            0 :     };
     391              : 
     392            0 :     info!("waiting for everything to become ready");
     393            0 :     shared_state.start_work_barrier.wait().await;
     394            0 :     info!("work started");
     395            0 :     if let Some(runtime) = args.runtime {
     396            0 :         tokio::time::sleep(runtime.into()).await;
     397            0 :         info!("runtime over, signalling cancellation");
     398            0 :         cancel.cancel();
     399            0 :         workers.await;
     400            0 :         info!("work sender exited");
     401              :     } else {
     402            0 :         workers.await;
     403            0 :         unreachable!("work sender never terminates");
     404              :     }
     405              : 
     406            0 :     let output = Output {
     407              :         total: {
     408            0 :             let mut agg_stats = request_stats::Stats::new();
     409            0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     410            0 :                 let stats = stats.lock().unwrap();
     411            0 :                 agg_stats.add(&stats);
     412            0 :             }
     413            0 :             agg_stats.output()
     414              :         },
     415              :     };
     416              : 
     417            0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     418            0 :     println!("{output}");
     419              : 
     420            0 :     anyhow::Ok(())
     421            0 : }
     422              : 
     423            0 : async fn run_worker(
     424            0 :     args: &Args,
     425            0 :     mut client: Box<dyn Client>,
     426            0 :     shared_state: Arc<SharedState>,
     427            0 :     cancel: CancellationToken,
     428            0 :     rps_period: Option<Duration>,
     429            0 :     ranges: Vec<KeyRange>,
     430            0 :     weights: rand::distributions::weighted::WeightedIndex<i128>,
     431            0 : ) {
     432            0 :     shared_state.start_work_barrier.wait().await;
     433            0 :     let client_start = Instant::now();
     434            0 :     let mut ticks_processed = 0;
     435            0 :     let mut req_id = 0;
     436            0 :     let batch_size: usize = args.batch_size.into();
     437              : 
     438              :     // Track inflight requests by request ID and start time. This times the request duration, and
     439              :     // ensures responses match requests. We don't expect responses back in any particular order.
     440              :     //
     441              :     // NB: this does not check that all requests received a response, because we don't wait for the
     442              :     // inflight requests to complete when the duration elapses.
     443            0 :     let mut inflight: HashMap<u64, Instant> = HashMap::new();
     444              : 
     445            0 :     while !cancel.is_cancelled() {
     446              :         // Detect if a request took longer than the RPS rate
     447            0 :         if let Some(period) = &rps_period {
     448            0 :             let periods_passed_until_now =
     449            0 :                 usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
     450              : 
     451            0 :             if periods_passed_until_now > ticks_processed {
     452            0 :                 shared_state
     453            0 :                     .live_stats
     454            0 :                     .missed((periods_passed_until_now - ticks_processed) as u64);
     455            0 :             }
     456            0 :             ticks_processed = periods_passed_until_now;
     457            0 :         }
     458              : 
     459            0 :         while inflight.len() < args.queue_depth.get() {
     460            0 :             req_id += 1;
     461            0 :             let start = Instant::now();
     462            0 :             let (req_lsn, mod_lsn, rel, blks) = {
     463              :                 /// Converts a compact i128 key to a relation tag and block number.
     464            0 :                 fn key_to_block(key: i128) -> (RelTag, u32) {
     465            0 :                     let key = Key::from_i128(key);
     466            0 :                     assert!(key.is_rel_block_key());
     467            0 :                     key.to_rel_block()
     468            0 :                         .expect("we filter non-rel-block keys out above")
     469            0 :                 }
     470              : 
     471              :                 // Pick a random page from a random relation.
     472            0 :                 let mut rng = rand::thread_rng();
     473            0 :                 let r = &ranges[weights.sample(&mut rng)];
     474            0 :                 let key: i128 = rng.gen_range(r.start..r.end);
     475            0 :                 let (rel_tag, block_no) = key_to_block(key);
     476              : 
     477            0 :                 let mut blks = VecDeque::with_capacity(batch_size);
     478            0 :                 blks.push_back(block_no);
     479              : 
     480              :                 // If requested, populate a batch of sequential pages. This is how Postgres will
     481              :                 // request page batches (e.g. prefetches). If we hit the end of the relation, we
     482              :                 // grow the batch towards the start too.
     483            0 :                 for i in 1..batch_size {
     484            0 :                     let (r, b) = key_to_block(key + i as i128);
     485            0 :                     if r != rel_tag {
     486            0 :                         break; // went outside relation
     487            0 :                     }
     488            0 :                     blks.push_back(b)
     489              :                 }
     490              : 
     491            0 :                 if blks.len() < batch_size {
     492              :                     // Grow batch backwards if needed.
     493            0 :                     for i in 1..batch_size {
     494            0 :                         let (r, b) = key_to_block(key - i as i128);
     495            0 :                         if r != rel_tag {
     496            0 :                             break; // went outside relation
     497            0 :                         }
     498            0 :                         blks.push_front(b)
     499              :                     }
     500            0 :                 }
     501              : 
     502              :                 // We assume that the entire batch can fit within the relation.
     503            0 :                 assert_eq!(blks.len(), batch_size, "incomplete batch");
     504              : 
     505            0 :                 let req_lsn = if rng.gen_bool(args.req_latest_probability) {
     506            0 :                     Lsn::MAX
     507              :                 } else {
     508            0 :                     r.timeline_lsn
     509              :                 };
     510            0 :                 (req_lsn, r.timeline_lsn, rel_tag, blks.into())
     511              :             };
     512            0 :             client
     513            0 :                 .send_get_page(req_id, req_lsn, mod_lsn, rel, blks)
     514            0 :                 .await
     515            0 :                 .unwrap();
     516            0 :             let old = inflight.insert(req_id, start);
     517            0 :             assert!(old.is_none(), "duplicate request ID {req_id}");
     518              :         }
     519              : 
     520            0 :         let (req_id, pages) = client.recv_get_page().await.unwrap();
     521            0 :         assert_eq!(pages.len(), batch_size, "unexpected page count");
     522            0 :         assert!(pages.iter().all(|p| !p.is_empty()), "empty page");
     523            0 :         let start = inflight
     524            0 :             .remove(&req_id)
     525            0 :             .expect("response for unknown request ID");
     526            0 :         let end = Instant::now();
     527            0 :         shared_state.live_stats.request_done();
     528            0 :         ticks_processed += 1;
     529            0 :         STATS.with(|stats| {
     530            0 :             stats
     531            0 :                 .borrow()
     532            0 :                 .lock()
     533            0 :                 .unwrap()
     534            0 :                 .observe(end.duration_since(start))
     535            0 :                 .unwrap();
     536            0 :         });
     537              : 
     538            0 :         if let Some(period) = &rps_period {
     539            0 :             let next_at = client_start
     540            0 :                 + Duration::from_micros(
     541            0 :                     (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
     542            0 :                 );
     543            0 :             tokio::time::sleep_until(next_at.into()).await;
     544            0 :         }
     545              :     }
     546            0 : }
     547              : 
     548              : /// A benchmark client, to allow switching out the transport protocol.
     549              : ///
     550              : /// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
     551              : /// return a future that resolves when the response is received, but we don't really need it.
     552              : #[async_trait]
     553              : trait Client: Send {
     554              :     /// Sends an asynchronous GetPage request to the pageserver.
     555              :     async fn send_get_page(
     556              :         &mut self,
     557              :         req_id: u64,
     558              :         req_lsn: Lsn,
     559              :         mod_lsn: Lsn,
     560              :         rel: RelTag,
     561              :         blks: Vec<u32>,
     562              :     ) -> anyhow::Result<()>;
     563              : 
     564              :     /// Receives the next GetPage response from the pageserver.
     565              :     async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)>;
     566              : }
     567              : 
     568              : /// A libpq-based Pageserver client.
     569              : struct LibpqClient {
     570              :     inner: pageserver_client::page_service::PagestreamClient,
     571              :     // Track sent batches, so we know how many responses to expect.
     572              :     batch_sizes: VecDeque<usize>,
     573              : }
     574              : 
     575              : impl LibpqClient {
     576            0 :     async fn new(connstring: &str, ttid: TenantTimelineId) -> anyhow::Result<Self> {
     577            0 :         let inner = pageserver_client::page_service::Client::new(connstring.to_string())
     578            0 :             .await?
     579            0 :             .pagestream(ttid.tenant_id, ttid.timeline_id)
     580            0 :             .await?;
     581            0 :         Ok(Self {
     582            0 :             inner,
     583            0 :             batch_sizes: VecDeque::new(),
     584            0 :         })
     585            0 :     }
     586              : }
     587              : 
     588              : #[async_trait]
     589              : impl Client for LibpqClient {
     590            0 :     async fn send_get_page(
     591              :         &mut self,
     592              :         req_id: u64,
     593              :         req_lsn: Lsn,
     594              :         mod_lsn: Lsn,
     595              :         rel: RelTag,
     596              :         blks: Vec<u32>,
     597            0 :     ) -> anyhow::Result<()> {
     598              :         // libpq doesn't support client-side batches, so we send a bunch of individual requests
     599              :         // instead in the hope that the server will batch them for us. We use the same request ID
     600              :         // for all, because we'll return a single batch response.
     601            0 :         self.batch_sizes.push_back(blks.len());
     602            0 :         for blkno in blks {
     603            0 :             let req = PagestreamGetPageRequest {
     604            0 :                 hdr: PagestreamRequest {
     605            0 :                     reqid: req_id,
     606            0 :                     request_lsn: req_lsn,
     607            0 :                     not_modified_since: mod_lsn,
     608            0 :                 },
     609            0 :                 rel,
     610            0 :                 blkno,
     611            0 :             };
     612            0 :             self.inner.getpage_send(req).await?;
     613              :         }
     614            0 :         Ok(())
     615            0 :     }
     616              : 
     617            0 :     async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
     618            0 :         let batch_size = self.batch_sizes.pop_front().unwrap();
     619            0 :         let mut batch = Vec::with_capacity(batch_size);
     620            0 :         let mut req_id = None;
     621            0 :         for _ in 0..batch_size {
     622            0 :             let resp = self.inner.getpage_recv().await?;
     623            0 :             if req_id.is_none() {
     624            0 :                 req_id = Some(resp.req.hdr.reqid);
     625            0 :             }
     626            0 :             assert_eq!(req_id, Some(resp.req.hdr.reqid), "request ID mismatch");
     627            0 :             batch.push(resp.page);
     628              :         }
     629            0 :         Ok((req_id.unwrap(), batch))
     630            0 :     }
     631              : }
     632              : 
     633              : /// A gRPC Pageserver client.
     634              : struct GrpcClient {
     635              :     req_tx: tokio::sync::mpsc::Sender<page_api::GetPageRequest>,
     636              :     resp_rx: Pin<Box<dyn Stream<Item = Result<page_api::GetPageResponse, tonic::Status>> + Send>>,
     637              : }
     638              : 
     639              : impl GrpcClient {
     640            0 :     async fn new(
     641            0 :         connstring: &str,
     642            0 :         ttid: TenantTimelineId,
     643            0 :         compression: bool,
     644            0 :     ) -> anyhow::Result<Self> {
     645            0 :         let mut client = page_api::Client::connect(
     646            0 :             connstring.to_string(),
     647            0 :             ttid.tenant_id,
     648            0 :             ttid.timeline_id,
     649            0 :             ShardIndex::unsharded(),
     650            0 :             None,
     651            0 :             compression.then_some(tonic::codec::CompressionEncoding::Zstd),
     652            0 :         )
     653            0 :         .await?;
     654              : 
     655              :         // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the
     656              :         // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are
     657              :         // buffered by Tonic and the OS too.
     658            0 :         let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
     659            0 :         let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
     660            0 :         let resp_rx = Box::pin(client.get_pages(req_stream).await?);
     661              : 
     662            0 :         Ok(Self { req_tx, resp_rx })
     663            0 :     }
     664              : }
     665              : 
     666              : #[async_trait]
     667              : impl Client for GrpcClient {
     668            0 :     async fn send_get_page(
     669              :         &mut self,
     670              :         req_id: u64,
     671              :         req_lsn: Lsn,
     672              :         mod_lsn: Lsn,
     673              :         rel: RelTag,
     674              :         blks: Vec<u32>,
     675            0 :     ) -> anyhow::Result<()> {
     676            0 :         let req = page_api::GetPageRequest {
     677            0 :             request_id: req_id.into(),
     678            0 :             request_class: page_api::GetPageClass::Normal,
     679            0 :             read_lsn: page_api::ReadLsn {
     680            0 :                 request_lsn: req_lsn,
     681            0 :                 not_modified_since_lsn: Some(mod_lsn),
     682            0 :             },
     683            0 :             rel,
     684            0 :             block_numbers: blks,
     685            0 :         };
     686            0 :         self.req_tx.send(req).await?;
     687            0 :         Ok(())
     688            0 :     }
     689              : 
     690            0 :     async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
     691            0 :         let resp = self.resp_rx.next().await.unwrap().unwrap();
     692            0 :         anyhow::ensure!(
     693            0 :             resp.status_code == page_api::GetPageStatusCode::Ok,
     694            0 :             "unexpected status code: {}",
     695              :             resp.status_code,
     696              :         );
     697              :         Ok((
     698            0 :             resp.request_id.id,
     699            0 :             resp.pages.into_iter().map(|p| p.image).collect(),
     700              :         ))
     701            0 :     }
     702              : }
     703              : 
     704              : /// A rich gRPC Pageserver client.
     705              : struct RichGrpcClient {
     706              :     inner: Arc<client_grpc::PageserverClient>,
     707              :     requests: FuturesUnordered<
     708              :         Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
     709              :     >,
     710              : }
     711              : 
     712              : impl RichGrpcClient {
     713            0 :     async fn new(
     714            0 :         connstring: &str,
     715            0 :         ttid: TenantTimelineId,
     716            0 :         compression: bool,
     717            0 :     ) -> anyhow::Result<Self> {
     718            0 :         let inner = Arc::new(client_grpc::PageserverClient::new(
     719            0 :             ttid.tenant_id,
     720            0 :             ttid.timeline_id,
     721            0 :             ShardSpec::new(
     722            0 :                 [(ShardIndex::unsharded(), connstring.to_string())].into(),
     723            0 :                 None,
     724            0 :             )?,
     725            0 :             None,
     726            0 :             compression.then_some(tonic::codec::CompressionEncoding::Zstd),
     727            0 :         )?);
     728            0 :         Ok(Self {
     729            0 :             inner,
     730            0 :             requests: FuturesUnordered::new(),
     731            0 :         })
     732            0 :     }
     733              : }
     734              : 
     735              : #[async_trait]
     736              : impl Client for RichGrpcClient {
     737            0 :     async fn send_get_page(
     738              :         &mut self,
     739              :         req_id: u64,
     740              :         req_lsn: Lsn,
     741              :         mod_lsn: Lsn,
     742              :         rel: RelTag,
     743              :         blks: Vec<u32>,
     744            0 :     ) -> anyhow::Result<()> {
     745            0 :         let req = page_api::GetPageRequest {
     746            0 :             request_id: req_id.into(),
     747            0 :             request_class: page_api::GetPageClass::Normal,
     748            0 :             read_lsn: page_api::ReadLsn {
     749            0 :                 request_lsn: req_lsn,
     750            0 :                 not_modified_since_lsn: Some(mod_lsn),
     751            0 :             },
     752            0 :             rel,
     753            0 :             block_numbers: blks,
     754            0 :         };
     755            0 :         let inner = self.inner.clone();
     756            0 :         self.requests.push(Box::pin(async move {
     757            0 :             inner
     758            0 :                 .get_page(req)
     759            0 :                 .await
     760            0 :                 .map_err(|err| anyhow::anyhow!("{err}"))
     761            0 :         }));
     762            0 :         Ok(())
     763            0 :     }
     764              : 
     765            0 :     async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
     766            0 :         let resp = self.requests.next().await.unwrap()?;
     767              :         Ok((
     768            0 :             resp.request_id.id,
     769            0 :             resp.pages.into_iter().map(|p| p.image).collect(),
     770              :         ))
     771            0 :     }
     772              : }
        

Generated by: LCOV version 2.1-beta