Line data Source code
1 : use std::collections::{HashSet, VecDeque};
2 : use std::future::Future;
3 : use std::num::NonZeroUsize;
4 : use std::pin::Pin;
5 : use std::sync::atomic::{AtomicU64, Ordering};
6 : use std::sync::{Arc, Mutex};
7 : use std::time::{Duration, Instant};
8 :
9 : use anyhow::Context;
10 : use camino::Utf8PathBuf;
11 : use pageserver_api::key::Key;
12 : use pageserver_api::keyspace::KeySpaceAccum;
13 : use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
14 : use pageserver_api::shard::TenantShardId;
15 : use rand::prelude::*;
16 : use tokio::task::JoinSet;
17 : use tokio_util::sync::CancellationToken;
18 : use tracing::info;
19 : use utils::id::TenantTimelineId;
20 : use utils::lsn::Lsn;
21 :
22 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
23 : use crate::util::{request_stats, tokio_thread_local_stats};
24 :
25 : /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
26 : #[derive(clap::Parser)]
27 : pub(crate) struct Args {
28 : #[clap(long, default_value = "http://localhost:9898")]
29 0 : mgmt_api_endpoint: String,
30 : #[clap(long, default_value = "postgres://postgres@localhost:64000")]
31 0 : page_service_connstring: String,
32 : #[clap(long)]
33 : pageserver_jwt: Option<String>,
34 : #[clap(long, default_value = "1")]
35 0 : num_clients: NonZeroUsize,
36 : #[clap(long)]
37 : runtime: Option<humantime::Duration>,
38 : /// Each client sends requests at the given rate.
39 : ///
40 : /// If a request takes too long and we should be issuing a new request already,
41 : /// we skip that request and account it as `MISSED`.
42 : #[clap(long)]
43 : per_client_rate: Option<usize>,
44 : /// Probability for sending `latest=true` in the request (uniform distribution).
45 : #[clap(long, default_value = "1")]
46 0 : req_latest_probability: f64,
47 : #[clap(long)]
48 : limit_to_first_n_targets: Option<usize>,
49 : /// For large pageserver installations, enumerating the keyspace takes a lot of time.
50 : /// If specified, the specified path is used to maintain a cache of the keyspace enumeration result.
51 : /// The cache is tagged and auto-invalided by the tenant/timeline ids only.
52 : /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
53 : #[clap(long)]
54 : keyspace_cache: Option<Utf8PathBuf>,
55 : /// Before starting the benchmark, live-reconfigure the pageserver to use the given
56 : /// [`pageserver_api::models::virtual_file::IoEngineKind`].
57 : #[clap(long)]
58 : set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
59 :
60 : /// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
61 : #[clap(long)]
62 : set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
63 :
64 : /// Queue depth generated in each client.
65 : #[clap(long, default_value = "1")]
66 0 : queue_depth: NonZeroUsize,
67 :
68 : #[clap(long)]
69 : only_relnode: Option<u32>,
70 :
71 0 : targets: Option<Vec<TenantTimelineId>>,
72 : }
73 :
74 : /// State shared by all clients
75 : #[derive(Debug)]
76 : struct SharedState {
77 : start_work_barrier: tokio::sync::Barrier,
78 : live_stats: LiveStats,
79 : }
80 :
81 : #[derive(Debug, Default)]
82 : struct LiveStats {
83 : completed_requests: AtomicU64,
84 : missed: AtomicU64,
85 : }
86 :
87 : impl LiveStats {
88 0 : fn request_done(&self) {
89 0 : self.completed_requests.fetch_add(1, Ordering::Relaxed);
90 0 : }
91 0 : fn missed(&self, n: u64) {
92 0 : self.missed.fetch_add(n, Ordering::Relaxed);
93 0 : }
94 : }
95 :
96 0 : #[derive(Clone, serde::Serialize, serde::Deserialize)]
97 : struct KeyRange {
98 : timeline: TenantTimelineId,
99 : timeline_lsn: Lsn,
100 : start: i128,
101 : end: i128,
102 : }
103 :
104 : impl KeyRange {
105 0 : fn len(&self) -> i128 {
106 0 : self.end - self.start
107 0 : }
108 : }
109 :
110 : #[derive(PartialEq, Eq, Hash, Copy, Clone)]
111 : struct WorkerId {
112 : timeline: TenantTimelineId,
113 : num_client: usize, // from 0..args.num_clients
114 : }
115 :
116 : #[derive(serde::Serialize)]
117 : struct Output {
118 : total: request_stats::Output,
119 : }
120 :
121 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
122 :
123 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
124 0 : tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
125 0 : main_impl(args, thread_local_stats)
126 0 : })
127 0 : }
128 :
129 0 : async fn main_impl(
130 0 : args: Args,
131 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
132 0 : ) -> anyhow::Result<()> {
133 0 : let args: &'static Args = Box::leak(Box::new(args));
134 0 :
135 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
136 0 : reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
137 0 : args.mgmt_api_endpoint.clone(),
138 0 : args.pageserver_jwt.as_deref(),
139 0 : ));
140 :
141 0 : if let Some(engine_str) = &args.set_io_engine {
142 0 : mgmt_api_client.put_io_engine(engine_str).await?;
143 0 : }
144 :
145 0 : if let Some(mode) = &args.set_io_mode {
146 0 : mgmt_api_client.put_io_mode(mode).await?;
147 0 : }
148 :
149 : // discover targets
150 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
151 0 : &mgmt_api_client,
152 0 : crate::util::cli::targets::Spec {
153 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
154 0 : targets: args.targets.clone(),
155 0 : },
156 0 : )
157 0 : .await?;
158 :
159 0 : #[derive(serde::Deserialize)]
160 : struct KeyspaceCacheDe {
161 : tag: Vec<TenantTimelineId>,
162 : data: Vec<KeyRange>,
163 : }
164 : #[derive(serde::Serialize)]
165 : struct KeyspaceCacheSer<'a> {
166 : tag: &'a [TenantTimelineId],
167 : data: &'a [KeyRange],
168 : }
169 0 : let cache = args
170 0 : .keyspace_cache
171 0 : .as_ref()
172 0 : .map(|keyspace_cache_file| {
173 0 : let contents = match std::fs::read(keyspace_cache_file) {
174 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
175 0 : return anyhow::Ok(None);
176 : }
177 0 : x => x.context("read keyspace cache file")?,
178 : };
179 0 : let cache: KeyspaceCacheDe =
180 0 : serde_json::from_slice(&contents).context("deserialize cache file")?;
181 0 : let tag_ok = HashSet::<TenantTimelineId>::from_iter(cache.tag.into_iter())
182 0 : == HashSet::from_iter(timelines.iter().cloned());
183 0 : info!("keyspace cache file matches tag: {tag_ok}");
184 0 : anyhow::Ok(if tag_ok { Some(cache.data) } else { None })
185 0 : })
186 0 : .transpose()?
187 0 : .flatten();
188 0 : let all_ranges: Vec<KeyRange> = if let Some(cached) = cache {
189 0 : info!("using keyspace cache file");
190 0 : cached
191 : } else {
192 0 : let mut js = JoinSet::new();
193 0 : for timeline in &timelines {
194 0 : js.spawn({
195 0 : let mgmt_api_client = Arc::clone(&mgmt_api_client);
196 0 : let timeline = *timeline;
197 0 : async move {
198 0 : let partitioning = mgmt_api_client
199 0 : .keyspace(
200 0 : TenantShardId::unsharded(timeline.tenant_id),
201 0 : timeline.timeline_id,
202 0 : )
203 0 : .await?;
204 0 : let lsn = partitioning.at_lsn;
205 0 : let start = Instant::now();
206 0 : let mut filtered = KeySpaceAccum::new();
207 : // let's hope this is inlined and vectorized...
208 : // TODO: turn this loop into a is_rel_block_range() function.
209 0 : for r in partitioning.keys.ranges.iter() {
210 0 : let mut i = r.start;
211 0 : while i != r.end {
212 0 : let mut include = true;
213 0 : include &= i.is_rel_block_key();
214 0 : if let Some(only_relnode) = args.only_relnode {
215 0 : include &= i.is_rel_block_of_rel(only_relnode);
216 0 : }
217 0 : if include {
218 0 : filtered.add_key(i);
219 0 : }
220 0 : i = i.next();
221 : }
222 : }
223 0 : let filtered = filtered.to_keyspace();
224 0 : let filter_duration = start.elapsed();
225 0 :
226 0 : anyhow::Ok((
227 0 : filter_duration,
228 0 : filtered.ranges.into_iter().map(move |r| KeyRange {
229 0 : timeline,
230 0 : timeline_lsn: lsn,
231 0 : start: r.start.to_i128(),
232 0 : end: r.end.to_i128(),
233 0 : }),
234 0 : ))
235 0 : }
236 0 : });
237 0 : }
238 0 : let mut total_filter_duration = Duration::from_secs(0);
239 0 : let mut all_ranges: Vec<KeyRange> = Vec::new();
240 0 : while let Some(res) = js.join_next().await {
241 0 : let (filter_duration, range) = res.unwrap().unwrap();
242 0 : all_ranges.extend(range);
243 0 : total_filter_duration += filter_duration;
244 0 : }
245 0 : info!("filter duration: {}", total_filter_duration.as_secs_f64());
246 0 : if let Some(cachefile) = args.keyspace_cache.as_ref() {
247 0 : let cache = KeyspaceCacheSer {
248 0 : tag: &timelines,
249 0 : data: &all_ranges,
250 0 : };
251 0 : let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?;
252 0 : std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?;
253 0 : info!("successfully wrote keyspace cache file");
254 0 : }
255 0 : all_ranges
256 : };
257 :
258 0 : let num_live_stats_dump = 1;
259 0 : let num_work_sender_tasks = args.num_clients.get() * timelines.len();
260 0 : let num_main_impl = 1;
261 0 :
262 0 : let shared_state = Arc::new(SharedState {
263 0 : start_work_barrier: tokio::sync::Barrier::new(
264 0 : num_live_stats_dump + num_work_sender_tasks + num_main_impl,
265 0 : ),
266 0 : live_stats: LiveStats::default(),
267 0 : });
268 0 : let cancel = CancellationToken::new();
269 0 :
270 0 : let ss = shared_state.clone();
271 0 : tokio::spawn({
272 0 : async move {
273 0 : ss.start_work_barrier.wait().await;
274 : loop {
275 0 : let start = std::time::Instant::now();
276 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
277 0 : let stats = &ss.live_stats;
278 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
279 0 : let missed = stats.missed.swap(0, Ordering::Relaxed);
280 0 : let elapsed = start.elapsed();
281 0 : info!(
282 0 : "RPS: {:.0} MISSED: {:.0}",
283 0 : completed_requests as f64 / elapsed.as_secs_f64(),
284 0 : missed as f64 / elapsed.as_secs_f64()
285 : );
286 : }
287 0 : }
288 0 : });
289 0 :
290 0 : let rps_period = args
291 0 : .per_client_rate
292 0 : .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
293 0 : let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
294 0 : let ss = shared_state.clone();
295 0 : let cancel = cancel.clone();
296 0 : let ranges: Vec<KeyRange> = all_ranges
297 0 : .iter()
298 0 : .filter(|r| r.timeline == worker_id.timeline)
299 0 : .cloned()
300 0 : .collect();
301 0 : let weights =
302 0 : rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
303 0 : .unwrap();
304 0 :
305 0 : Box::pin(async move {
306 0 : client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
307 0 : })
308 0 : };
309 :
310 0 : info!("spawning workers");
311 0 : let mut workers = JoinSet::new();
312 0 : for timeline in timelines.iter().cloned() {
313 0 : for num_client in 0..args.num_clients.get() {
314 0 : let worker_id = WorkerId {
315 0 : timeline,
316 0 : num_client,
317 0 : };
318 0 : workers.spawn(make_worker(worker_id));
319 0 : }
320 : }
321 0 : let workers = async move {
322 0 : while let Some(res) = workers.join_next().await {
323 0 : res.unwrap();
324 0 : }
325 0 : };
326 :
327 0 : info!("waiting for everything to become ready");
328 0 : shared_state.start_work_barrier.wait().await;
329 0 : info!("work started");
330 0 : if let Some(runtime) = args.runtime {
331 0 : tokio::time::sleep(runtime.into()).await;
332 0 : info!("runtime over, signalling cancellation");
333 0 : cancel.cancel();
334 0 : workers.await;
335 0 : info!("work sender exited");
336 : } else {
337 0 : workers.await;
338 0 : unreachable!("work sender never terminates");
339 : }
340 :
341 0 : let output = Output {
342 : total: {
343 0 : let mut agg_stats = request_stats::Stats::new();
344 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
345 0 : let stats = stats.lock().unwrap();
346 0 : agg_stats.add(&stats);
347 0 : }
348 0 : agg_stats.output()
349 0 : },
350 0 : };
351 0 :
352 0 : let output = serde_json::to_string_pretty(&output).unwrap();
353 0 : println!("{output}");
354 0 :
355 0 : anyhow::Ok(())
356 0 : }
357 :
358 0 : async fn client_libpq(
359 0 : args: &Args,
360 0 : worker_id: WorkerId,
361 0 : shared_state: Arc<SharedState>,
362 0 : cancel: CancellationToken,
363 0 : rps_period: Option<Duration>,
364 0 : ranges: Vec<KeyRange>,
365 0 : weights: rand::distributions::weighted::WeightedIndex<i128>,
366 0 : ) {
367 0 : let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
368 0 : .await
369 0 : .unwrap();
370 0 : let mut client = client
371 0 : .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
372 0 : .await
373 0 : .unwrap();
374 0 :
375 0 : shared_state.start_work_barrier.wait().await;
376 0 : let client_start = Instant::now();
377 0 : let mut ticks_processed = 0;
378 0 : let mut inflight = VecDeque::new();
379 0 : while !cancel.is_cancelled() {
380 : // Detect if a request took longer than the RPS rate
381 0 : if let Some(period) = &rps_period {
382 0 : let periods_passed_until_now =
383 0 : usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
384 0 :
385 0 : if periods_passed_until_now > ticks_processed {
386 0 : shared_state
387 0 : .live_stats
388 0 : .missed((periods_passed_until_now - ticks_processed) as u64);
389 0 : }
390 0 : ticks_processed = periods_passed_until_now;
391 0 : }
392 :
393 0 : while inflight.len() < args.queue_depth.get() {
394 0 : let start = Instant::now();
395 0 : let req = {
396 0 : let mut rng = rand::thread_rng();
397 0 : let r = &ranges[weights.sample(&mut rng)];
398 0 : let key: i128 = rng.gen_range(r.start..r.end);
399 0 : let key = Key::from_i128(key);
400 0 : assert!(key.is_rel_block_key());
401 0 : let (rel_tag, block_no) = key
402 0 : .to_rel_block()
403 0 : .expect("we filter non-rel-block keys out above");
404 0 : PagestreamGetPageRequest {
405 0 : hdr: PagestreamRequest {
406 0 : reqid: 0,
407 0 : request_lsn: if rng.gen_bool(args.req_latest_probability) {
408 0 : Lsn::MAX
409 : } else {
410 0 : r.timeline_lsn
411 : },
412 0 : not_modified_since: r.timeline_lsn,
413 0 : },
414 0 : rel: rel_tag,
415 0 : blkno: block_no,
416 0 : }
417 0 : };
418 0 : client.getpage_send(req).await.unwrap();
419 0 : inflight.push_back(start);
420 : }
421 :
422 0 : let start = inflight.pop_front().unwrap();
423 0 : client.getpage_recv().await.unwrap();
424 0 : let end = Instant::now();
425 0 : shared_state.live_stats.request_done();
426 0 : ticks_processed += 1;
427 0 : STATS.with(|stats| {
428 0 : stats
429 0 : .borrow()
430 0 : .lock()
431 0 : .unwrap()
432 0 : .observe(end.duration_since(start))
433 0 : .unwrap();
434 0 : });
435 :
436 0 : if let Some(period) = &rps_period {
437 0 : let next_at = client_start
438 0 : + Duration::from_micros(
439 0 : (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
440 0 : );
441 0 : tokio::time::sleep_until(next_at.into()).await;
442 0 : }
443 : }
444 0 : }
|