Line data Source code
1 : use std::collections::{HashMap, HashSet, VecDeque};
2 : use std::future::Future;
3 : use std::num::NonZeroUsize;
4 : use std::pin::Pin;
5 : use std::sync::atomic::{AtomicU64, Ordering};
6 : use std::sync::{Arc, Mutex};
7 : use std::time::{Duration, Instant};
8 :
9 : use anyhow::Context;
10 : use async_trait::async_trait;
11 : use bytes::Bytes;
12 : use camino::Utf8PathBuf;
13 : use futures::stream::FuturesUnordered;
14 : use futures::{Stream, StreamExt as _};
15 : use pageserver_api::key::Key;
16 : use pageserver_api::keyspace::KeySpaceAccum;
17 : use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
18 : use pageserver_api::reltag::RelTag;
19 : use pageserver_api::shard::TenantShardId;
20 : use pageserver_client_grpc::{self as client_grpc, ShardSpec};
21 : use pageserver_page_api as page_api;
22 : use rand::prelude::*;
23 : use tokio::task::JoinSet;
24 : use tokio_util::sync::CancellationToken;
25 : use tracing::info;
26 : use url::Url;
27 : use utils::id::TenantTimelineId;
28 : use utils::lsn::Lsn;
29 : use utils::shard::ShardIndex;
30 :
31 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
32 : use crate::util::{request_stats, tokio_thread_local_stats};
33 :
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
    // NB: `///` comments on these fields become the CLI help text, so explanatory notes for
    // maintainers are written as plain `//` comments instead.

    // Endpoint handed to the pageserver management API client in `main_impl`.
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    /// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
    page_service_connstring: String,
    /// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
    /// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
    #[clap(long)]
    rich_client: bool,
    // Token handed to the management API client in `main_impl`.
    #[clap(long)]
    pageserver_jwt: Option<String>,
    // Number of workers spawned per target timeline.
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
    // If set, workers are cancelled once this much time has passed after the start barrier.
    #[clap(long)]
    runtime: Option<humantime::Duration>,
    /// If true, enable compression (only for gRPC).
    #[clap(long)]
    compression: bool,
    /// Each client sends requests at the given rate.
    ///
    /// If a request takes too long and we should be issuing a new request already,
    /// we skip that request and account it as `MISSED`.
    #[clap(long)]
    per_client_rate: Option<usize>,
    /// Probability for sending `latest=true` in the request (uniform distribution).
    #[clap(long, default_value = "1")]
    req_latest_probability: f64,
    // Forwarded to target discovery (`crate::util::cli::targets::Spec`).
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    /// For large pageserver installations, enumerating the keyspace takes a lot of time.
    /// If specified, the specified path is used to maintain a cache of the keyspace enumeration result.
    /// The cache is tagged and auto-invalided by the tenant/timeline ids only.
    /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
    #[clap(long)]
    keyspace_cache: Option<Utf8PathBuf>,
    /// Before starting the benchmark, live-reconfigure the pageserver to use the given
    /// [`pageserver_api::models::virtual_file::IoEngineKind`].
    #[clap(long)]
    set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,

    /// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
    #[clap(long)]
    set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,

    /// Queue depth generated in each client.
    #[clap(long, default_value = "1")]
    queue_depth: NonZeroUsize,

    /// Batch size of contiguous pages generated by each client. This is equivalent to how Postgres
    /// will request page batches (e.g. prefetches or vectored reads). A batch counts as 1 RPS and
    /// 1 queue depth.
    ///
    /// The libpq protocol does not support client-side batching, and will submit batches as many
    /// individual requests, in the hope that the server will batch them. Each batch still counts as
    /// 1 RPS and 1 queue depth.
    #[clap(long, default_value = "1")]
    batch_size: NonZeroUsize,

    // If set, only keys accepted by `is_rel_block_of_rel(only_relnode)` are kept when the
    // keyspace is filtered in `main_impl`.
    #[clap(long)]
    only_relnode: Option<u32>,

    // Explicit list of tenant/timeline targets; forwarded to target discovery.
    targets: Option<Vec<TenantTimelineId>>,
}
100 :
/// State shared by all clients
#[derive(Debug)]
struct SharedState {
    /// Barrier awaited by every worker, by the live-stats task and by `main_impl` before any
    /// request is issued.
    start_work_barrier: tokio::sync::Barrier,
    /// Counters that the live-stats task reads and resets to zero on every reporting interval.
    live_stats: LiveStats,
}
107 :
/// Counters for the periodic "RPS / MISSED" log line.
#[derive(Debug, Default)]
struct LiveStats {
    /// Number of responses received since the counter was last reset.
    completed_requests: AtomicU64,
    /// Number of rate-limit periods that passed without a request being issued, since the
    /// counter was last reset.
    missed: AtomicU64,
}
113 :
114 : impl LiveStats {
115 0 : fn request_done(&self) {
116 0 : self.completed_requests.fetch_add(1, Ordering::Relaxed);
117 0 : }
118 0 : fn missed(&self, n: u64) {
119 0 : self.missed.fetch_add(n, Ordering::Relaxed);
120 0 : }
121 : }
122 :
/// A contiguous range of keys of one timeline that requests are sampled from.
///
/// Serialized to and deserialized from the keyspace cache file.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct KeyRange {
    /// Timeline the range belongs to.
    timeline: TenantTimelineId,
    /// LSN at which the keyspace was obtained (`at_lsn` of the keyspace response).
    timeline_lsn: Lsn,
    /// First key of the range, in `Key::to_i128` representation.
    start: i128,
    /// End of the range, in `Key::to_i128` representation; treated as exclusive when sampling.
    end: i128,
}
130 :
131 : impl KeyRange {
132 0 : fn len(&self) -> i128 {
133 0 : self.end - self.start
134 0 : }
135 : }
136 :
/// Identifies one benchmark worker: the timeline it targets plus its index among the clients of
/// that timeline.
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
struct WorkerId {
    /// Timeline this worker sends requests to.
    timeline: TenantTimelineId,
    num_client: usize, // from 0..args.num_clients
}
142 :
/// Final benchmark result, printed as pretty JSON on stdout at the end of `main_impl`.
#[derive(serde::Serialize)]
struct Output {
    /// Request statistics aggregated over all per-thread stats.
    total: request_stats::Output,
}
147 :
// Declares `STATS`. `run_worker` records request latencies into it through `STATS.with(...)`,
// and `main` passes it to `tokio_thread_local_stats::main!`.
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
149 :
/// Entry point of this benchmark.
///
/// Hands the `STATS` declaration and a closure to `tokio_thread_local_stats::main!`; the closure
/// runs [`main_impl`] with the parsed arguments and the stats handle supplied by the macro.
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
        main_impl(args, thread_local_stats)
    })
}
155 :
156 0 : async fn main_impl(
157 0 : args: Args,
158 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
159 0 : ) -> anyhow::Result<()> {
160 0 : let args: &'static Args = Box::leak(Box::new(args));
161 :
162 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
163 0 : reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
164 0 : args.mgmt_api_endpoint.clone(),
165 0 : args.pageserver_jwt.as_deref(),
166 : ));
167 :
168 0 : if let Some(engine_str) = &args.set_io_engine {
169 0 : mgmt_api_client.put_io_engine(engine_str).await?;
170 0 : }
171 :
172 0 : if let Some(mode) = &args.set_io_mode {
173 0 : mgmt_api_client.put_io_mode(mode).await?;
174 0 : }
175 :
176 : // discover targets
177 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
178 0 : &mgmt_api_client,
179 0 : crate::util::cli::targets::Spec {
180 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
181 0 : targets: args.targets.clone(),
182 0 : },
183 0 : )
184 0 : .await?;
185 :
186 0 : #[derive(serde::Deserialize)]
187 : struct KeyspaceCacheDe {
188 : tag: Vec<TenantTimelineId>,
189 : data: Vec<KeyRange>,
190 : }
191 : #[derive(serde::Serialize)]
192 : struct KeyspaceCacheSer<'a> {
193 : tag: &'a [TenantTimelineId],
194 : data: &'a [KeyRange],
195 : }
196 0 : let cache = args
197 0 : .keyspace_cache
198 0 : .as_ref()
199 0 : .map(|keyspace_cache_file| {
200 0 : let contents = match std::fs::read(keyspace_cache_file) {
201 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
202 0 : return anyhow::Ok(None);
203 : }
204 0 : x => x.context("read keyspace cache file")?,
205 : };
206 0 : let cache: KeyspaceCacheDe =
207 0 : serde_json::from_slice(&contents).context("deserialize cache file")?;
208 0 : let tag_ok = HashSet::<TenantTimelineId>::from_iter(cache.tag.into_iter())
209 0 : == HashSet::from_iter(timelines.iter().cloned());
210 0 : info!("keyspace cache file matches tag: {tag_ok}");
211 0 : anyhow::Ok(if tag_ok { Some(cache.data) } else { None })
212 0 : })
213 0 : .transpose()?
214 0 : .flatten();
215 0 : let all_ranges: Vec<KeyRange> = if let Some(cached) = cache {
216 0 : info!("using keyspace cache file");
217 0 : cached
218 : } else {
219 0 : let mut js = JoinSet::new();
220 0 : for timeline in &timelines {
221 0 : js.spawn({
222 0 : let mgmt_api_client = Arc::clone(&mgmt_api_client);
223 0 : let timeline = *timeline;
224 0 : async move {
225 0 : let partitioning = mgmt_api_client
226 0 : .keyspace(
227 0 : TenantShardId::unsharded(timeline.tenant_id),
228 0 : timeline.timeline_id,
229 0 : )
230 0 : .await?;
231 0 : let lsn = partitioning.at_lsn;
232 0 : let start = Instant::now();
233 0 : let mut filtered = KeySpaceAccum::new();
234 : // let's hope this is inlined and vectorized...
235 : // TODO: turn this loop into a is_rel_block_range() function.
236 0 : for r in partitioning.keys.ranges.iter() {
237 0 : let mut i = r.start;
238 0 : while i != r.end {
239 0 : let mut include = true;
240 0 : include &= i.is_rel_block_key();
241 0 : if let Some(only_relnode) = args.only_relnode {
242 0 : include &= i.is_rel_block_of_rel(only_relnode);
243 0 : }
244 0 : if include {
245 0 : filtered.add_key(i);
246 0 : }
247 0 : i = i.next();
248 : }
249 : }
250 0 : let filtered = filtered.to_keyspace();
251 0 : let filter_duration = start.elapsed();
252 :
253 0 : anyhow::Ok((
254 0 : filter_duration,
255 0 : filtered.ranges.into_iter().map(move |r| KeyRange {
256 0 : timeline,
257 0 : timeline_lsn: lsn,
258 0 : start: r.start.to_i128(),
259 0 : end: r.end.to_i128(),
260 0 : }),
261 : ))
262 0 : }
263 : });
264 : }
265 0 : let mut total_filter_duration = Duration::from_secs(0);
266 0 : let mut all_ranges: Vec<KeyRange> = Vec::new();
267 0 : while let Some(res) = js.join_next().await {
268 0 : let (filter_duration, range) = res.unwrap().unwrap();
269 0 : all_ranges.extend(range);
270 0 : total_filter_duration += filter_duration;
271 0 : }
272 0 : info!("filter duration: {}", total_filter_duration.as_secs_f64());
273 0 : if let Some(cachefile) = args.keyspace_cache.as_ref() {
274 0 : let cache = KeyspaceCacheSer {
275 0 : tag: &timelines,
276 0 : data: &all_ranges,
277 0 : };
278 0 : let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?;
279 0 : std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?;
280 0 : info!("successfully wrote keyspace cache file");
281 0 : }
282 0 : all_ranges
283 : };
284 :
285 0 : let num_live_stats_dump = 1;
286 0 : let num_work_sender_tasks = args.num_clients.get() * timelines.len();
287 0 : let num_main_impl = 1;
288 :
289 0 : let shared_state = Arc::new(SharedState {
290 0 : start_work_barrier: tokio::sync::Barrier::new(
291 0 : num_live_stats_dump + num_work_sender_tasks + num_main_impl,
292 0 : ),
293 0 : live_stats: LiveStats::default(),
294 0 : });
295 0 : let cancel = CancellationToken::new();
296 :
297 0 : let ss = shared_state.clone();
298 0 : tokio::spawn({
299 0 : async move {
300 0 : ss.start_work_barrier.wait().await;
301 : loop {
302 0 : let start = std::time::Instant::now();
303 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
304 0 : let stats = &ss.live_stats;
305 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
306 0 : let missed = stats.missed.swap(0, Ordering::Relaxed);
307 0 : let elapsed = start.elapsed();
308 0 : info!(
309 0 : "RPS: {:.0} MISSED: {:.0}",
310 0 : completed_requests as f64 / elapsed.as_secs_f64(),
311 0 : missed as f64 / elapsed.as_secs_f64()
312 : );
313 : }
314 : }
315 : });
316 :
317 0 : let rps_period = args
318 0 : .per_client_rate
319 0 : .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
320 0 : let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
321 0 : let ss = shared_state.clone();
322 0 : let cancel = cancel.clone();
323 0 : let ranges: Vec<KeyRange> = all_ranges
324 0 : .iter()
325 0 : .filter(|r| r.timeline == worker_id.timeline)
326 0 : .cloned()
327 0 : .collect();
328 0 : let weights =
329 0 : rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
330 0 : .unwrap();
331 :
332 0 : Box::pin(async move {
333 0 : let scheme = match Url::parse(&args.page_service_connstring) {
334 0 : Ok(url) => url.scheme().to_lowercase().to_string(),
335 0 : Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
336 0 : Err(err) => panic!("invalid connstring: {err}"),
337 : };
338 0 : let client: Box<dyn Client> = match scheme.as_str() {
339 0 : "postgresql" | "postgres" => {
340 0 : assert!(!args.compression, "libpq does not support compression");
341 0 : assert!(!args.rich_client, "rich client requires grpc://");
342 0 : Box::new(
343 0 : LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
344 0 : .await
345 0 : .unwrap(),
346 : )
347 : }
348 :
349 0 : "grpc" if args.rich_client => Box::new(
350 0 : RichGrpcClient::new(
351 0 : &args.page_service_connstring,
352 0 : worker_id.timeline,
353 0 : args.compression,
354 0 : )
355 0 : .await
356 0 : .unwrap(),
357 : ),
358 :
359 0 : "grpc" => Box::new(
360 0 : GrpcClient::new(
361 0 : &args.page_service_connstring,
362 0 : worker_id.timeline,
363 0 : args.compression,
364 0 : )
365 0 : .await
366 0 : .unwrap(),
367 : ),
368 :
369 0 : scheme => panic!("unsupported scheme {scheme}"),
370 : };
371 0 : run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
372 0 : })
373 0 : };
374 :
375 0 : info!("spawning workers");
376 0 : let mut workers = JoinSet::new();
377 0 : for timeline in timelines.iter().cloned() {
378 0 : for num_client in 0..args.num_clients.get() {
379 0 : let worker_id = WorkerId {
380 0 : timeline,
381 0 : num_client,
382 0 : };
383 0 : workers.spawn(make_worker(worker_id));
384 0 : }
385 : }
386 0 : let workers = async move {
387 0 : while let Some(res) = workers.join_next().await {
388 0 : res.unwrap();
389 0 : }
390 0 : };
391 :
392 0 : info!("waiting for everything to become ready");
393 0 : shared_state.start_work_barrier.wait().await;
394 0 : info!("work started");
395 0 : if let Some(runtime) = args.runtime {
396 0 : tokio::time::sleep(runtime.into()).await;
397 0 : info!("runtime over, signalling cancellation");
398 0 : cancel.cancel();
399 0 : workers.await;
400 0 : info!("work sender exited");
401 : } else {
402 0 : workers.await;
403 0 : unreachable!("work sender never terminates");
404 : }
405 :
406 0 : let output = Output {
407 : total: {
408 0 : let mut agg_stats = request_stats::Stats::new();
409 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
410 0 : let stats = stats.lock().unwrap();
411 0 : agg_stats.add(&stats);
412 0 : }
413 0 : agg_stats.output()
414 : },
415 : };
416 :
417 0 : let output = serde_json::to_string_pretty(&output).unwrap();
418 0 : println!("{output}");
419 :
420 0 : anyhow::Ok(())
421 0 : }
422 :
423 0 : async fn run_worker(
424 0 : args: &Args,
425 0 : mut client: Box<dyn Client>,
426 0 : shared_state: Arc<SharedState>,
427 0 : cancel: CancellationToken,
428 0 : rps_period: Option<Duration>,
429 0 : ranges: Vec<KeyRange>,
430 0 : weights: rand::distributions::weighted::WeightedIndex<i128>,
431 0 : ) {
432 0 : shared_state.start_work_barrier.wait().await;
433 0 : let client_start = Instant::now();
434 0 : let mut ticks_processed = 0;
435 0 : let mut req_id = 0;
436 0 : let batch_size: usize = args.batch_size.into();
437 :
438 : // Track inflight requests by request ID and start time. This times the request duration, and
439 : // ensures responses match requests. We don't expect responses back in any particular order.
440 : //
441 : // NB: this does not check that all requests received a response, because we don't wait for the
442 : // inflight requests to complete when the duration elapses.
443 0 : let mut inflight: HashMap<u64, Instant> = HashMap::new();
444 :
445 0 : while !cancel.is_cancelled() {
446 : // Detect if a request took longer than the RPS rate
447 0 : if let Some(period) = &rps_period {
448 0 : let periods_passed_until_now =
449 0 : usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
450 :
451 0 : if periods_passed_until_now > ticks_processed {
452 0 : shared_state
453 0 : .live_stats
454 0 : .missed((periods_passed_until_now - ticks_processed) as u64);
455 0 : }
456 0 : ticks_processed = periods_passed_until_now;
457 0 : }
458 :
459 0 : while inflight.len() < args.queue_depth.get() {
460 0 : req_id += 1;
461 0 : let start = Instant::now();
462 0 : let (req_lsn, mod_lsn, rel, blks) = {
463 : /// Converts a compact i128 key to a relation tag and block number.
464 0 : fn key_to_block(key: i128) -> (RelTag, u32) {
465 0 : let key = Key::from_i128(key);
466 0 : assert!(key.is_rel_block_key());
467 0 : key.to_rel_block()
468 0 : .expect("we filter non-rel-block keys out above")
469 0 : }
470 :
471 : // Pick a random page from a random relation.
472 0 : let mut rng = rand::thread_rng();
473 0 : let r = &ranges[weights.sample(&mut rng)];
474 0 : let key: i128 = rng.gen_range(r.start..r.end);
475 0 : let (rel_tag, block_no) = key_to_block(key);
476 :
477 0 : let mut blks = VecDeque::with_capacity(batch_size);
478 0 : blks.push_back(block_no);
479 :
480 : // If requested, populate a batch of sequential pages. This is how Postgres will
481 : // request page batches (e.g. prefetches). If we hit the end of the relation, we
482 : // grow the batch towards the start too.
483 0 : for i in 1..batch_size {
484 0 : let (r, b) = key_to_block(key + i as i128);
485 0 : if r != rel_tag {
486 0 : break; // went outside relation
487 0 : }
488 0 : blks.push_back(b)
489 : }
490 :
491 0 : if blks.len() < batch_size {
492 : // Grow batch backwards if needed.
493 0 : for i in 1..batch_size {
494 0 : let (r, b) = key_to_block(key - i as i128);
495 0 : if r != rel_tag {
496 0 : break; // went outside relation
497 0 : }
498 0 : blks.push_front(b)
499 : }
500 0 : }
501 :
502 : // We assume that the entire batch can fit within the relation.
503 0 : assert_eq!(blks.len(), batch_size, "incomplete batch");
504 :
505 0 : let req_lsn = if rng.gen_bool(args.req_latest_probability) {
506 0 : Lsn::MAX
507 : } else {
508 0 : r.timeline_lsn
509 : };
510 0 : (req_lsn, r.timeline_lsn, rel_tag, blks.into())
511 : };
512 0 : client
513 0 : .send_get_page(req_id, req_lsn, mod_lsn, rel, blks)
514 0 : .await
515 0 : .unwrap();
516 0 : let old = inflight.insert(req_id, start);
517 0 : assert!(old.is_none(), "duplicate request ID {req_id}");
518 : }
519 :
520 0 : let (req_id, pages) = client.recv_get_page().await.unwrap();
521 0 : assert_eq!(pages.len(), batch_size, "unexpected page count");
522 0 : assert!(pages.iter().all(|p| !p.is_empty()), "empty page");
523 0 : let start = inflight
524 0 : .remove(&req_id)
525 0 : .expect("response for unknown request ID");
526 0 : let end = Instant::now();
527 0 : shared_state.live_stats.request_done();
528 0 : ticks_processed += 1;
529 0 : STATS.with(|stats| {
530 0 : stats
531 0 : .borrow()
532 0 : .lock()
533 0 : .unwrap()
534 0 : .observe(end.duration_since(start))
535 0 : .unwrap();
536 0 : });
537 :
538 0 : if let Some(period) = &rps_period {
539 0 : let next_at = client_start
540 0 : + Duration::from_micros(
541 0 : (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
542 0 : );
543 0 : tokio::time::sleep_until(next_at.into()).await;
544 0 : }
545 : }
546 0 : }
547 :
/// A benchmark client, to allow switching out the transport protocol.
///
/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
/// return a future that resolves when the response is received, but we don't really need it.
#[async_trait]
trait Client: Send {
    /// Sends an asynchronous GetPage request to the pageserver.
    ///
    /// `req_id` identifies the request and is returned by the matching `recv_get_page` call.
    /// `req_lsn` and `mod_lsn` are the request LSN and the not-modified-since LSN, `rel` is the
    /// relation to read from, and `blks` are the block numbers of the batch.
    async fn send_get_page(
        &mut self,
        req_id: u64,
        req_lsn: Lsn,
        mod_lsn: Lsn,
        rel: RelTag,
        blks: Vec<u32>,
    ) -> anyhow::Result<()>;

    /// Receives the next GetPage response from the pageserver.
    ///
    /// Returns the request ID of the response together with the pages of the batch.
    async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)>;
}
567 :
/// A libpq-based Pageserver client.
struct LibpqClient {
    /// The underlying pagestream connection.
    inner: pageserver_client::page_service::PagestreamClient,
    // Track sent batches, so we know how many responses to expect.
    // Sizes are queued in send order and consumed front-first by `recv_get_page`.
    batch_sizes: VecDeque<usize>,
}
574 :
575 : impl LibpqClient {
576 0 : async fn new(connstring: &str, ttid: TenantTimelineId) -> anyhow::Result<Self> {
577 0 : let inner = pageserver_client::page_service::Client::new(connstring.to_string())
578 0 : .await?
579 0 : .pagestream(ttid.tenant_id, ttid.timeline_id)
580 0 : .await?;
581 0 : Ok(Self {
582 0 : inner,
583 0 : batch_sizes: VecDeque::new(),
584 0 : })
585 0 : }
586 : }
587 :
588 : #[async_trait]
589 : impl Client for LibpqClient {
590 0 : async fn send_get_page(
591 : &mut self,
592 : req_id: u64,
593 : req_lsn: Lsn,
594 : mod_lsn: Lsn,
595 : rel: RelTag,
596 : blks: Vec<u32>,
597 0 : ) -> anyhow::Result<()> {
598 : // libpq doesn't support client-side batches, so we send a bunch of individual requests
599 : // instead in the hope that the server will batch them for us. We use the same request ID
600 : // for all, because we'll return a single batch response.
601 0 : self.batch_sizes.push_back(blks.len());
602 0 : for blkno in blks {
603 0 : let req = PagestreamGetPageRequest {
604 0 : hdr: PagestreamRequest {
605 0 : reqid: req_id,
606 0 : request_lsn: req_lsn,
607 0 : not_modified_since: mod_lsn,
608 0 : },
609 0 : rel,
610 0 : blkno,
611 0 : };
612 0 : self.inner.getpage_send(req).await?;
613 : }
614 0 : Ok(())
615 0 : }
616 :
617 0 : async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
618 0 : let batch_size = self.batch_sizes.pop_front().unwrap();
619 0 : let mut batch = Vec::with_capacity(batch_size);
620 0 : let mut req_id = None;
621 0 : for _ in 0..batch_size {
622 0 : let resp = self.inner.getpage_recv().await?;
623 0 : if req_id.is_none() {
624 0 : req_id = Some(resp.req.hdr.reqid);
625 0 : }
626 0 : assert_eq!(req_id, Some(resp.req.hdr.reqid), "request ID mismatch");
627 0 : batch.push(resp.page);
628 : }
629 0 : Ok((req_id.unwrap(), batch))
630 0 : }
631 : }
632 :
/// A gRPC Pageserver client.
struct GrpcClient {
    /// Sender side of the channel that feeds the request stream passed to `get_pages`.
    req_tx: tokio::sync::mpsc::Sender<page_api::GetPageRequest>,
    /// Response stream returned by `get_pages`.
    resp_rx: Pin<Box<dyn Stream<Item = Result<page_api::GetPageResponse, tonic::Status>> + Send>>,
}
638 :
639 : impl GrpcClient {
640 0 : async fn new(
641 0 : connstring: &str,
642 0 : ttid: TenantTimelineId,
643 0 : compression: bool,
644 0 : ) -> anyhow::Result<Self> {
645 0 : let mut client = page_api::Client::connect(
646 0 : connstring.to_string(),
647 0 : ttid.tenant_id,
648 0 : ttid.timeline_id,
649 0 : ShardIndex::unsharded(),
650 0 : None,
651 0 : compression.then_some(tonic::codec::CompressionEncoding::Zstd),
652 0 : )
653 0 : .await?;
654 :
655 : // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the
656 : // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are
657 : // buffered by Tonic and the OS too.
658 0 : let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
659 0 : let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
660 0 : let resp_rx = Box::pin(client.get_pages(req_stream).await?);
661 :
662 0 : Ok(Self { req_tx, resp_rx })
663 0 : }
664 : }
665 :
666 : #[async_trait]
667 : impl Client for GrpcClient {
668 0 : async fn send_get_page(
669 : &mut self,
670 : req_id: u64,
671 : req_lsn: Lsn,
672 : mod_lsn: Lsn,
673 : rel: RelTag,
674 : blks: Vec<u32>,
675 0 : ) -> anyhow::Result<()> {
676 0 : let req = page_api::GetPageRequest {
677 0 : request_id: req_id.into(),
678 0 : request_class: page_api::GetPageClass::Normal,
679 0 : read_lsn: page_api::ReadLsn {
680 0 : request_lsn: req_lsn,
681 0 : not_modified_since_lsn: Some(mod_lsn),
682 0 : },
683 0 : rel,
684 0 : block_numbers: blks,
685 0 : };
686 0 : self.req_tx.send(req).await?;
687 0 : Ok(())
688 0 : }
689 :
690 0 : async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
691 0 : let resp = self.resp_rx.next().await.unwrap().unwrap();
692 0 : anyhow::ensure!(
693 0 : resp.status_code == page_api::GetPageStatusCode::Ok,
694 0 : "unexpected status code: {}",
695 : resp.status_code,
696 : );
697 : Ok((
698 0 : resp.request_id.id,
699 0 : resp.pages.into_iter().map(|p| p.image).collect(),
700 : ))
701 0 : }
702 : }
703 :
/// A rich gRPC Pageserver client.
struct RichGrpcClient {
    /// The shared client; a clone of the `Arc` is moved into every request future.
    inner: Arc<client_grpc::PageserverClient>,
    /// Futures of the requests that were sent but whose responses have not been received yet.
    requests: FuturesUnordered<
        Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
    >,
}
711 :
712 : impl RichGrpcClient {
713 0 : async fn new(
714 0 : connstring: &str,
715 0 : ttid: TenantTimelineId,
716 0 : compression: bool,
717 0 : ) -> anyhow::Result<Self> {
718 0 : let inner = Arc::new(client_grpc::PageserverClient::new(
719 0 : ttid.tenant_id,
720 0 : ttid.timeline_id,
721 0 : ShardSpec::new(
722 0 : [(ShardIndex::unsharded(), connstring.to_string())].into(),
723 0 : None,
724 0 : )?,
725 0 : None,
726 0 : compression.then_some(tonic::codec::CompressionEncoding::Zstd),
727 0 : )?);
728 0 : Ok(Self {
729 0 : inner,
730 0 : requests: FuturesUnordered::new(),
731 0 : })
732 0 : }
733 : }
734 :
735 : #[async_trait]
736 : impl Client for RichGrpcClient {
737 0 : async fn send_get_page(
738 : &mut self,
739 : req_id: u64,
740 : req_lsn: Lsn,
741 : mod_lsn: Lsn,
742 : rel: RelTag,
743 : blks: Vec<u32>,
744 0 : ) -> anyhow::Result<()> {
745 0 : let req = page_api::GetPageRequest {
746 0 : request_id: req_id.into(),
747 0 : request_class: page_api::GetPageClass::Normal,
748 0 : read_lsn: page_api::ReadLsn {
749 0 : request_lsn: req_lsn,
750 0 : not_modified_since_lsn: Some(mod_lsn),
751 0 : },
752 0 : rel,
753 0 : block_numbers: blks,
754 0 : };
755 0 : let inner = self.inner.clone();
756 0 : self.requests.push(Box::pin(async move {
757 0 : inner
758 0 : .get_page(req)
759 0 : .await
760 0 : .map_err(|err| anyhow::anyhow!("{err}"))
761 0 : }));
762 0 : Ok(())
763 0 : }
764 :
765 0 : async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
766 0 : let resp = self.requests.next().await.unwrap()?;
767 : Ok((
768 0 : resp.request_id.id,
769 0 : resp.pages.into_iter().map(|p| p.image).collect(),
770 : ))
771 0 : }
772 : }
|