Line data Source code
1 : use anyhow::Context;
2 : use pageserver_api::shard::TenantShardId;
3 : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
4 : use pageserver_client::page_service::BasebackupRequest;
5 :
6 : use utils::id::TenantTimelineId;
7 : use utils::lsn::Lsn;
8 :
9 : use rand::prelude::*;
10 : use tokio::sync::Barrier;
11 : use tokio::task::JoinSet;
12 : use tracing::{info, instrument};
13 :
14 : use std::collections::HashMap;
15 : use std::num::NonZeroUsize;
16 : use std::ops::Range;
17 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18 : use std::sync::{Arc, Mutex};
19 : use std::time::Instant;
20 :
21 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
22 : use crate::util::{request_stats, tokio_thread_local_stats};
23 :
24 : /// basebackup@LatestLSN
25 0 : #[derive(clap::Parser)]
26 : pub(crate) struct Args {
27 : #[clap(long, default_value = "http://localhost:9898")]
28 0 : mgmt_api_endpoint: String,
29 : #[clap(long, default_value = "postgres://postgres@localhost:64000")]
30 0 : page_service_connstring: String,
31 : #[clap(long)]
32 : pageserver_jwt: Option<String>,
33 : #[clap(long, default_value = "1")]
34 0 : num_clients: NonZeroUsize,
35 : #[clap(long, default_value = "1.0")]
36 0 : gzip_probability: f64,
37 : #[clap(long)]
38 : runtime: Option<humantime::Duration>,
39 : #[clap(long)]
40 : limit_to_first_n_targets: Option<usize>,
41 0 : targets: Option<Vec<TenantTimelineId>>,
42 : }
43 :
/// Request counter shared between the client tasks and the periodic RPS reporter.
///
/// Clients bump it once per finished basebackup; the reporter swaps it back to
/// zero each time it logs.
#[derive(Debug, Default)]
struct LiveStats {
    /// Number of requests completed since the counter was last reset.
    completed_requests: AtomicU64,
}

impl LiveStats {
    /// Records one completed request.
    fn inc(&self) {
        let counter = &self.completed_requests;
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
54 :
55 : struct Target {
56 : timeline: TenantTimelineId,
57 : lsn_range: Option<Range<Lsn>>,
58 : }
59 :
60 : #[derive(serde::Serialize)]
61 : struct Output {
62 : total: request_stats::Output,
63 : }
64 :
65 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
66 :
67 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
68 0 : tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
69 0 : main_impl(args, thread_local_stats)
70 0 : })
71 0 : }
72 :
73 0 : async fn main_impl(
74 0 : args: Args,
75 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
76 0 : ) -> anyhow::Result<()> {
77 0 : let args: &'static Args = Box::leak(Box::new(args));
78 0 :
79 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
80 0 : args.mgmt_api_endpoint.clone(),
81 0 : args.pageserver_jwt.as_deref(),
82 0 : ));
83 :
84 : // discover targets
85 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
86 0 : &mgmt_api_client,
87 0 : crate::util::cli::targets::Spec {
88 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
89 0 : targets: args.targets.clone(),
90 0 : },
91 0 : )
92 0 : .await?;
93 0 : let mut js = JoinSet::new();
94 0 : for timeline in &timelines {
95 0 : js.spawn({
96 0 : let timeline = *timeline;
97 0 : let info = mgmt_api_client
98 0 : .timeline_info(
99 0 : TenantShardId::unsharded(timeline.tenant_id),
100 0 : timeline.timeline_id,
101 0 : ForceAwaitLogicalSize::No,
102 0 : )
103 0 : .await
104 0 : .unwrap();
105 0 : async move {
106 0 : anyhow::Ok(Target {
107 0 : timeline,
108 0 : // TODO: support lsn_range != latest LSN
109 0 : lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
110 0 : })
111 0 : }
112 0 : });
113 0 : }
114 0 : let mut all_targets: Vec<Target> = Vec::new();
115 0 : while let Some(res) = js.join_next().await {
116 0 : all_targets.push(res.unwrap().unwrap());
117 0 : }
118 :
119 0 : let live_stats = Arc::new(LiveStats::default());
120 0 :
121 0 : let num_client_tasks = timelines.len();
122 0 : let num_live_stats_dump = 1;
123 0 : let num_work_sender_tasks = 1;
124 0 :
125 0 : let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
126 0 : num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
127 0 : ));
128 0 : let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
129 0 :
130 0 : tokio::spawn({
131 0 : let stats = Arc::clone(&live_stats);
132 0 : let start_work_barrier = Arc::clone(&start_work_barrier);
133 0 : async move {
134 0 : start_work_barrier.wait().await;
135 : loop {
136 0 : let start = std::time::Instant::now();
137 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
138 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
139 0 : let elapsed = start.elapsed();
140 0 : info!(
141 0 : "RPS: {:.0}",
142 0 : completed_requests as f64 / elapsed.as_secs_f64()
143 : );
144 : }
145 0 : }
146 0 : });
147 0 :
148 0 : let mut work_senders = HashMap::new();
149 0 : let mut tasks = Vec::new();
150 0 : for tl in &timelines {
151 0 : let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
152 0 : work_senders.insert(tl, sender);
153 0 : tasks.push(tokio::spawn(client(
154 0 : args,
155 0 : *tl,
156 0 : Arc::clone(&start_work_barrier),
157 0 : receiver,
158 0 : Arc::clone(&all_work_done_barrier),
159 0 : Arc::clone(&live_stats),
160 0 : )));
161 0 : }
162 :
163 0 : let work_sender = async move {
164 0 : start_work_barrier.wait().await;
165 : loop {
166 0 : let (timeline, work) = {
167 0 : let mut rng = rand::thread_rng();
168 0 : let target = all_targets.choose(&mut rng).unwrap();
169 0 : let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
170 0 : (
171 0 : target.timeline,
172 0 : Work {
173 0 : lsn,
174 0 : gzip: rng.gen_bool(args.gzip_probability),
175 0 : },
176 0 : )
177 0 : };
178 0 : let sender = work_senders.get(&timeline).unwrap();
179 0 : // TODO: what if this blocks?
180 0 : sender.send(work).await.ok().unwrap();
181 : }
182 : };
183 :
184 0 : if let Some(runtime) = args.runtime {
185 0 : match tokio::time::timeout(runtime.into(), work_sender).await {
186 0 : Ok(()) => unreachable!("work sender never terminates"),
187 0 : Err(_timeout) => {
188 0 : // this implicitly drops the work_senders, making all the clients exit
189 0 : }
190 : }
191 : } else {
192 0 : work_sender.await;
193 0 : unreachable!("work sender never terminates");
194 : }
195 :
196 0 : for t in tasks {
197 0 : t.await.unwrap();
198 : }
199 :
200 0 : let output = Output {
201 : total: {
202 0 : let mut agg_stats = request_stats::Stats::new();
203 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
204 0 : let stats = stats.lock().unwrap();
205 0 : agg_stats.add(&stats);
206 0 : }
207 0 : agg_stats.output()
208 0 : },
209 0 : };
210 0 :
211 0 : let output = serde_json::to_string_pretty(&output).unwrap();
212 0 : println!("{output}");
213 0 :
214 0 : anyhow::Ok(())
215 0 : }
216 :
217 : #[derive(Copy, Clone)]
218 : struct Work {
219 : lsn: Option<Lsn>,
220 : gzip: bool,
221 : }
222 :
223 0 : #[instrument(skip_all)]
224 : async fn client(
225 : args: &'static Args,
226 : timeline: TenantTimelineId,
227 : start_work_barrier: Arc<Barrier>,
228 : mut work: tokio::sync::mpsc::Receiver<Work>,
229 : all_work_done_barrier: Arc<Barrier>,
230 : live_stats: Arc<LiveStats>,
231 : ) {
232 : start_work_barrier.wait().await;
233 :
234 : let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
235 : .await
236 : .unwrap();
237 :
238 : while let Some(Work { lsn, gzip }) = work.recv().await {
239 : let start = Instant::now();
240 : let copy_out_stream = client
241 : .basebackup(&BasebackupRequest {
242 : tenant_id: timeline.tenant_id,
243 : timeline_id: timeline.timeline_id,
244 : lsn,
245 : gzip,
246 : })
247 : .await
248 0 : .with_context(|| format!("start basebackup for {timeline}"))
249 : .unwrap();
250 :
251 : use futures::StreamExt;
252 : let size = Arc::new(AtomicUsize::new(0));
253 : copy_out_stream
254 : .for_each({
255 0 : |r| {
256 0 : let size = Arc::clone(&size);
257 0 : async move {
258 0 : let size = Arc::clone(&size);
259 0 : size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
260 0 : }
261 0 : }
262 : })
263 : .await;
264 : info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
265 : let elapsed = start.elapsed();
266 : live_stats.inc();
267 0 : STATS.with(|stats| {
268 0 : stats.borrow().lock().unwrap().observe(elapsed).unwrap();
269 0 : });
270 : }
271 :
272 : all_work_done_barrier.wait().await;
273 : }
|