Line data Source code
1 : use anyhow::Context;
2 : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
3 : use pageserver_client::page_service::BasebackupRequest;
4 :
5 : use utils::id::TenantTimelineId;
6 : use utils::lsn::Lsn;
7 :
8 : use rand::prelude::*;
9 : use tokio::sync::Barrier;
10 : use tokio::task::JoinSet;
11 : use tracing::{debug, info, instrument};
12 :
13 : use std::collections::HashMap;
14 : use std::num::NonZeroUsize;
15 : use std::ops::Range;
16 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17 : use std::sync::{Arc, Mutex};
18 : use std::time::Instant;
19 :
20 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
21 : use crate::util::{request_stats, tokio_thread_local_stats};
22 :
// Command-line arguments of the basebackup benchmark, parsed through `clap::Parser`.
//
// NOTE(review): the field notes below are deliberately plain `//` comments. clap's derive
// presumably turns `///` doc comments into help text, so adding those would change the
// user-visible `--help` output -- confirm before converting them.
/// basebackup@LatestLSN
#[derive(clap::Parser)]
pub(crate) struct Args {
    // Passed to `pageserver_client::mgmt_api::Client::new` in `main_impl`.
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    // Passed to `crate::util::connstring::connstring` when each client task connects.
    #[clap(long, default_value = "localhost:64000")]
    page_service_host_port: String,
    // Optional token handed to both the management API client and the connection string.
    #[clap(long)]
    pageserver_jwt: Option<String>,
    // NOTE(review): not read anywhere in the visible part of this file; `main_impl`
    // spawns one client task per discovered timeline instead -- confirm intent.
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
    // Fed to `rng.gen_bool(..)` for every generated work item to decide `Work::gzip`.
    #[clap(long, default_value = "1.0")]
    gzip_probability: f64,
    // When set, the work sender is wrapped in `tokio::time::timeout` with this duration;
    // when unset, the work sender is awaited without a timeout.
    #[clap(long)]
    runtime: Option<humantime::Duration>,
    // Forwarded unchanged to `crate::util::cli::targets::Spec`.
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    // Forwarded (cloned) to `crate::util::cli::targets::Spec`.
    targets: Option<Vec<TenantTimelineId>>,
}
42 :
/// Counter shared between the client tasks and the periodic RPS reporter.
#[derive(Debug, Default)]
struct LiveStats {
    /// Number of requests finished since the counter was last swapped back to zero.
    completed_requests: AtomicU64,
}

impl LiveStats {
    /// Records one finished request.
    ///
    /// Uses a relaxed atomic add; the value the counter held before is not needed.
    fn inc(&self) {
        let counter = &self.completed_requests;
        let _before = counter.fetch_add(1, Ordering::Relaxed);
    }
}
53 :
/// One benchmark target: a timeline plus the LSN range requests are drawn from.
struct Target {
    /// Tenant/timeline the basebackup requests are issued against.
    timeline: TenantTimelineId,
    /// Range from which the work sender samples the request LSN with `gen_range`;
    /// `None` results in `Work::lsn` being `None`.
    lsn_range: Option<Range<Lsn>>,
}
58 :
/// Final benchmark result; `main_impl` serializes it to pretty JSON and prints it.
#[derive(serde::Serialize)]
struct Output {
    /// Request statistics aggregated over all thread-local `request_stats::Stats`.
    total: request_stats::Output,
}
63 :
// Declares `STATS` through the project's `tokio_thread_local_stats::declare!` macro.
// `client` records latencies through `STATS.with(..)`, and `main` passes the same name
// to `tokio_thread_local_stats::main!`.
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
65 :
/// Entry point of the basebackup benchmark.
///
/// Hands `STATS` and a closure that calls [`main_impl`] to the project's
/// `tokio_thread_local_stats::main!` macro and returns whatever that yields.
// NOTE(review): the macro presumably builds the tokio runtime and collects the per-thread
// stats handed to the closure -- confirm in `crate::util::tokio_thread_local_stats`.
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
        main_impl(args, thread_local_stats)
    })
}
71 :
72 0 : async fn main_impl(
73 0 : args: Args,
74 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
75 0 : ) -> anyhow::Result<()> {
76 0 : let args: &'static Args = Box::leak(Box::new(args));
77 0 :
78 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
79 0 : args.mgmt_api_endpoint.clone(),
80 0 : args.pageserver_jwt.as_deref(),
81 0 : ));
82 :
83 : // discover targets
84 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
85 0 : &mgmt_api_client,
86 0 : crate::util::cli::targets::Spec {
87 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
88 0 : targets: args.targets.clone(),
89 0 : },
90 0 : )
91 0 : .await?;
92 0 : let mut js = JoinSet::new();
93 0 : for timeline in &timelines {
94 0 : js.spawn({
95 0 : let timeline = *timeline;
96 0 : let info = mgmt_api_client
97 0 : .timeline_info(
98 0 : timeline.tenant_id,
99 0 : timeline.timeline_id,
100 0 : ForceAwaitLogicalSize::No,
101 0 : )
102 0 : .await
103 0 : .unwrap();
104 0 : async move {
105 0 : anyhow::Ok(Target {
106 0 : timeline,
107 0 : // TODO: support lsn_range != latest LSN
108 0 : lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
109 0 : })
110 0 : }
111 : });
112 : }
113 0 : let mut all_targets: Vec<Target> = Vec::new();
114 0 : while let Some(res) = js.join_next().await {
115 0 : all_targets.push(res.unwrap().unwrap());
116 0 : }
117 :
118 0 : let live_stats = Arc::new(LiveStats::default());
119 0 :
120 0 : let num_client_tasks = timelines.len();
121 0 : let num_live_stats_dump = 1;
122 0 : let num_work_sender_tasks = 1;
123 0 :
124 0 : let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
125 0 : num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
126 0 : ));
127 0 : let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
128 0 :
129 0 : tokio::spawn({
130 0 : let stats = Arc::clone(&live_stats);
131 0 : let start_work_barrier = Arc::clone(&start_work_barrier);
132 0 : async move {
133 0 : start_work_barrier.wait().await;
134 : loop {
135 0 : let start = std::time::Instant::now();
136 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
137 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
138 0 : let elapsed = start.elapsed();
139 0 : info!(
140 0 : "RPS: {:.0}",
141 0 : completed_requests as f64 / elapsed.as_secs_f64()
142 0 : );
143 : }
144 0 : }
145 0 : });
146 0 :
147 0 : let mut work_senders = HashMap::new();
148 0 : let mut tasks = Vec::new();
149 0 : for tl in &timelines {
150 0 : let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
151 0 : work_senders.insert(tl, sender);
152 0 : tasks.push(tokio::spawn(client(
153 0 : args,
154 0 : *tl,
155 0 : Arc::clone(&start_work_barrier),
156 0 : receiver,
157 0 : Arc::clone(&all_work_done_barrier),
158 0 : Arc::clone(&live_stats),
159 0 : )));
160 0 : }
161 :
162 0 : let work_sender = async move {
163 0 : start_work_barrier.wait().await;
164 : loop {
165 0 : let (timeline, work) = {
166 0 : let mut rng = rand::thread_rng();
167 0 : let target = all_targets.choose(&mut rng).unwrap();
168 0 : let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
169 0 : (
170 0 : target.timeline,
171 0 : Work {
172 0 : lsn,
173 0 : gzip: rng.gen_bool(args.gzip_probability),
174 0 : },
175 0 : )
176 0 : };
177 0 : let sender = work_senders.get(&timeline).unwrap();
178 0 : // TODO: what if this blocks?
179 0 : sender.send(work).await.ok().unwrap();
180 : }
181 : };
182 :
183 0 : if let Some(runtime) = args.runtime {
184 0 : match tokio::time::timeout(runtime.into(), work_sender).await {
185 0 : Ok(()) => unreachable!("work sender never terminates"),
186 0 : Err(_timeout) => {
187 0 : // this implicitly drops the work_senders, making all the clients exit
188 0 : }
189 : }
190 : } else {
191 0 : work_sender.await;
192 0 : unreachable!("work sender never terminates");
193 : }
194 :
195 0 : for t in tasks {
196 0 : t.await.unwrap();
197 : }
198 :
199 0 : let output = Output {
200 : total: {
201 0 : let mut agg_stats = request_stats::Stats::new();
202 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
203 0 : let stats = stats.lock().unwrap();
204 0 : agg_stats.add(&stats);
205 0 : }
206 0 : agg_stats.output()
207 0 : },
208 0 : };
209 0 :
210 0 : let output = serde_json::to_string_pretty(&output).unwrap();
211 0 : println!("{output}");
212 0 :
213 0 : anyhow::Ok(())
214 0 : }
215 :
/// One unit of work sent from the work sender in `main_impl` to a [`client`] task.
#[derive(Copy, Clone)]
struct Work {
    /// Copied into `BasebackupRequest::lsn`.
    lsn: Option<Lsn>,
    /// Copied into `BasebackupRequest::gzip`.
    gzip: bool,
}
221 :
222 0 : #[instrument(skip_all)]
223 : async fn client(
224 : args: &'static Args,
225 : timeline: TenantTimelineId,
226 : start_work_barrier: Arc<Barrier>,
227 : mut work: tokio::sync::mpsc::Receiver<Work>,
228 : all_work_done_barrier: Arc<Barrier>,
229 : live_stats: Arc<LiveStats>,
230 : ) {
231 : start_work_barrier.wait().await;
232 :
233 : let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
234 : &args.page_service_host_port,
235 : args.pageserver_jwt.as_deref(),
236 : ))
237 : .await
238 : .unwrap();
239 :
240 : while let Some(Work { lsn, gzip }) = work.recv().await {
241 : let start = Instant::now();
242 : let copy_out_stream = client
243 : .basebackup(&BasebackupRequest {
244 : tenant_id: timeline.tenant_id,
245 : timeline_id: timeline.timeline_id,
246 : lsn,
247 : gzip,
248 : })
249 : .await
250 0 : .with_context(|| format!("start basebackup for {timeline}"))
251 : .unwrap();
252 :
253 : use futures::StreamExt;
254 : let size = Arc::new(AtomicUsize::new(0));
255 : copy_out_stream
256 : .for_each({
257 0 : |r| {
258 0 : let size = Arc::clone(&size);
259 0 : async move {
260 0 : let size = Arc::clone(&size);
261 0 : size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
262 0 : }
263 0 : }
264 : })
265 : .await;
266 0 : debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
267 : let elapsed = start.elapsed();
268 : live_stats.inc();
269 0 : STATS.with(|stats| {
270 0 : stats.borrow().lock().unwrap().observe(elapsed).unwrap();
271 0 : });
272 : }
273 :
274 : all_work_done_barrier.wait().await;
275 : }
|