LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - basebackup.rs (source / functions) Coverage Total Hit
Test: 91bf6c8f32e5e69adde6241313e732fdd6d6e277.info Lines: 0.0 % 149 0
Test Date: 2025-03-04 12:19:20 Functions: 0.0 % 23 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::num::NonZeroUsize;
       3              : use std::ops::Range;
       4              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
       5              : use std::sync::{Arc, Mutex};
       6              : use std::time::Instant;
       7              : 
       8              : use anyhow::Context;
       9              : use pageserver_api::shard::TenantShardId;
      10              : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
      11              : use pageserver_client::page_service::BasebackupRequest;
      12              : use rand::prelude::*;
      13              : use tokio::sync::Barrier;
      14              : use tokio::task::JoinSet;
      15              : use tracing::{info, instrument};
      16              : use utils::id::TenantTimelineId;
      17              : use utils::lsn::Lsn;
      18              : 
      19              : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      20              : use crate::util::{request_stats, tokio_thread_local_stats};
      21              : 
      22              : /// basebackup@LatestLSN
      23              : #[derive(clap::Parser)]
      24              : pub(crate) struct Args {
      25              :     #[clap(long, default_value = "http://localhost:9898")]
      26            0 :     mgmt_api_endpoint: String,
      27              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      28            0 :     page_service_connstring: String,
      29              :     #[clap(long)]
      30              :     pageserver_jwt: Option<String>,
      31              :     #[clap(long, default_value = "1")]
      32            0 :     num_clients: NonZeroUsize,
      33              :     #[clap(long, default_value = "1.0")]
      34            0 :     gzip_probability: f64,
      35              :     #[clap(long)]
      36              :     runtime: Option<humantime::Duration>,
      37              :     #[clap(long)]
      38              :     limit_to_first_n_targets: Option<usize>,
      39            0 :     targets: Option<Vec<TenantTimelineId>>,
      40              : }
      41              : 
      42              : #[derive(Debug, Default)]
      43              : struct LiveStats {
      44              :     completed_requests: AtomicU64,
      45              : }
      46              : 
      47              : impl LiveStats {
      48            0 :     fn inc(&self) {
      49            0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      50            0 :     }
      51              : }
      52              : 
      53              : struct Target {
      54              :     timeline: TenantTimelineId,
      55              :     lsn_range: Option<Range<Lsn>>,
      56              : }
      57              : 
      58              : #[derive(serde::Serialize)]
      59              : struct Output {
      60              :     total: request_stats::Output,
      61              : }
      62              : 
      63              : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
      64              : 
      65            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      66            0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
      67            0 :         main_impl(args, thread_local_stats)
      68            0 :     })
      69            0 : }
      70              : 
      71            0 : async fn main_impl(
      72            0 :     args: Args,
      73            0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
      74            0 : ) -> anyhow::Result<()> {
      75            0 :     let args: &'static Args = Box::leak(Box::new(args));
      76            0 : 
      77            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      78            0 :         args.mgmt_api_endpoint.clone(),
      79            0 :         args.pageserver_jwt.as_deref(),
      80            0 :     ));
      81              : 
      82              :     // discover targets
      83            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
      84            0 :         &mgmt_api_client,
      85            0 :         crate::util::cli::targets::Spec {
      86            0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
      87            0 :             targets: args.targets.clone(),
      88            0 :         },
      89            0 :     )
      90            0 :     .await?;
      91            0 :     let mut js = JoinSet::new();
      92            0 :     for timeline in &timelines {
      93            0 :         js.spawn({
      94            0 :             let timeline = *timeline;
      95            0 :             let info = mgmt_api_client
      96            0 :                 .timeline_info(
      97            0 :                     TenantShardId::unsharded(timeline.tenant_id),
      98            0 :                     timeline.timeline_id,
      99            0 :                     ForceAwaitLogicalSize::No,
     100            0 :                 )
     101            0 :                 .await
     102            0 :                 .unwrap();
     103            0 :             async move {
     104            0 :                 anyhow::Ok(Target {
     105            0 :                     timeline,
     106            0 :                     // TODO: support lsn_range != latest LSN
     107            0 :                     lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
     108            0 :                 })
     109            0 :             }
     110            0 :         });
     111            0 :     }
     112            0 :     let mut all_targets: Vec<Target> = Vec::new();
     113            0 :     while let Some(res) = js.join_next().await {
     114            0 :         all_targets.push(res.unwrap().unwrap());
     115            0 :     }
     116              : 
     117            0 :     let live_stats = Arc::new(LiveStats::default());
     118            0 : 
     119            0 :     let num_client_tasks = timelines.len();
     120            0 :     let num_live_stats_dump = 1;
     121            0 :     let num_work_sender_tasks = 1;
     122            0 : 
     123            0 :     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
     124            0 :         num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
     125            0 :     ));
     126            0 :     let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
     127            0 : 
     128            0 :     tokio::spawn({
     129            0 :         let stats = Arc::clone(&live_stats);
     130            0 :         let start_work_barrier = Arc::clone(&start_work_barrier);
     131            0 :         async move {
     132            0 :             start_work_barrier.wait().await;
     133              :             loop {
     134            0 :                 let start = std::time::Instant::now();
     135            0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     136            0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     137            0 :                 let elapsed = start.elapsed();
     138            0 :                 info!(
     139            0 :                     "RPS: {:.0}",
     140            0 :                     completed_requests as f64 / elapsed.as_secs_f64()
     141              :                 );
     142              :             }
     143            0 :         }
     144            0 :     });
     145            0 : 
     146            0 :     let mut work_senders = HashMap::new();
     147            0 :     let mut tasks = Vec::new();
     148            0 :     for tl in &timelines {
     149            0 :         let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
     150            0 :         work_senders.insert(tl, sender);
     151            0 :         tasks.push(tokio::spawn(client(
     152            0 :             args,
     153            0 :             *tl,
     154            0 :             Arc::clone(&start_work_barrier),
     155            0 :             receiver,
     156            0 :             Arc::clone(&all_work_done_barrier),
     157            0 :             Arc::clone(&live_stats),
     158            0 :         )));
     159            0 :     }
     160              : 
     161            0 :     let work_sender = async move {
     162            0 :         start_work_barrier.wait().await;
     163              :         loop {
     164            0 :             let (timeline, work) = {
     165            0 :                 let mut rng = rand::thread_rng();
     166            0 :                 let target = all_targets.choose(&mut rng).unwrap();
     167            0 :                 let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
     168            0 :                 (
     169            0 :                     target.timeline,
     170            0 :                     Work {
     171            0 :                         lsn,
     172            0 :                         gzip: rng.gen_bool(args.gzip_probability),
     173            0 :                     },
     174            0 :                 )
     175            0 :             };
     176            0 :             let sender = work_senders.get(&timeline).unwrap();
     177            0 :             // TODO: what if this blocks?
     178            0 :             sender.send(work).await.ok().unwrap();
     179              :         }
     180              :     };
     181              : 
     182            0 :     if let Some(runtime) = args.runtime {
     183            0 :         match tokio::time::timeout(runtime.into(), work_sender).await {
     184            0 :             Ok(()) => unreachable!("work sender never terminates"),
     185            0 :             Err(_timeout) => {
     186            0 :                 // this implicitly drops the work_senders, making all the clients exit
     187            0 :             }
     188              :         }
     189              :     } else {
     190            0 :         work_sender.await;
     191            0 :         unreachable!("work sender never terminates");
     192              :     }
     193              : 
     194            0 :     for t in tasks {
     195            0 :         t.await.unwrap();
     196              :     }
     197              : 
     198            0 :     let output = Output {
     199              :         total: {
     200            0 :             let mut agg_stats = request_stats::Stats::new();
     201            0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     202            0 :                 let stats = stats.lock().unwrap();
     203            0 :                 agg_stats.add(&stats);
     204            0 :             }
     205            0 :             agg_stats.output()
     206            0 :         },
     207            0 :     };
     208            0 : 
     209            0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     210            0 :     println!("{output}");
     211            0 : 
     212            0 :     anyhow::Ok(())
     213            0 : }
     214              : 
     215              : #[derive(Copy, Clone)]
     216              : struct Work {
     217              :     lsn: Option<Lsn>,
     218              :     gzip: bool,
     219              : }
     220              : 
     221              : #[instrument(skip_all)]
     222              : async fn client(
     223              :     args: &'static Args,
     224              :     timeline: TenantTimelineId,
     225              :     start_work_barrier: Arc<Barrier>,
     226              :     mut work: tokio::sync::mpsc::Receiver<Work>,
     227              :     all_work_done_barrier: Arc<Barrier>,
     228              :     live_stats: Arc<LiveStats>,
     229              : ) {
     230              :     start_work_barrier.wait().await;
     231              : 
     232              :     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
     233              :         .await
     234              :         .unwrap();
     235              : 
     236              :     while let Some(Work { lsn, gzip }) = work.recv().await {
     237              :         let start = Instant::now();
     238              :         let copy_out_stream = client
     239              :             .basebackup(&BasebackupRequest {
     240              :                 tenant_id: timeline.tenant_id,
     241              :                 timeline_id: timeline.timeline_id,
     242              :                 lsn,
     243              :                 gzip,
     244              :             })
     245              :             .await
     246            0 :             .with_context(|| format!("start basebackup for {timeline}"))
     247              :             .unwrap();
     248              : 
     249              :         use futures::StreamExt;
     250              :         let size = Arc::new(AtomicUsize::new(0));
     251              :         copy_out_stream
     252              :             .for_each({
     253            0 :                 |r| {
     254            0 :                     let size = Arc::clone(&size);
     255            0 :                     async move {
     256            0 :                         let size = Arc::clone(&size);
     257            0 :                         size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
     258            0 :                     }
     259            0 :                 }
     260              :             })
     261              :             .await;
     262              :         info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
     263              :         let elapsed = start.elapsed();
     264              :         live_stats.inc();
     265            0 :         STATS.with(|stats| {
     266            0 :             stats.borrow().lock().unwrap().observe(elapsed).unwrap();
     267            0 :         });
     268              :     }
     269              : 
     270              :     all_work_done_barrier.wait().await;
     271              : }
        

Generated by: LCOV version 2.1-beta