LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - basebackup.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 149 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 23 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::num::NonZeroUsize;
       3              : use std::ops::Range;
       4              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
       5              : use std::sync::{Arc, Mutex};
       6              : use std::time::Instant;
       7              : 
       8              : use anyhow::Context;
       9              : use pageserver_api::shard::TenantShardId;
      10              : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
      11              : use pageserver_client::page_service::BasebackupRequest;
      12              : use rand::prelude::*;
      13              : use tokio::sync::Barrier;
      14              : use tokio::task::JoinSet;
      15              : use tracing::{info, instrument};
      16              : use utils::id::TenantTimelineId;
      17              : use utils::lsn::Lsn;
      18              : 
      19              : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      20              : use crate::util::{request_stats, tokio_thread_local_stats};
      21              : 
      22              : /// basebackup@LatestLSN
                        // Command-line arguments of the basebackup benchmark.
                        // NOTE: plain `//` comments are used on purpose; `///` doc comments on a
                        // clap-derived struct or its fields are picked up as help text.
      23              : #[derive(clap::Parser)]
      24              : pub(crate) struct Args {
                            // Endpoint passed to the management API client built in `main_impl`.
      25              :     #[clap(long, default_value = "http://localhost:9898")]
      26            0 :     mgmt_api_endpoint: String,
                            // Connection string each `client` task uses to open its page service client.
      27              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      28            0 :     page_service_connstring: String,
                            // Optional JWT handed to the management API client.
      29              :     #[clap(long)]
      30              :     pageserver_jwt: Option<String>,
                            // NOTE(review): not read anywhere in the code visible here; `main_impl`
                            // spawns one client task per discovered timeline instead.
      31              :     #[clap(long, default_value = "1")]
      32            0 :     num_clients: NonZeroUsize,
                            // Probability, passed to `gen_bool`, that a generated request asks for gzip.
      33              :     #[clap(long, default_value = "1.0")]
      34            0 :     gzip_probability: f64,
                            // When set, the work sender is stopped after this duration; when absent it
                            // runs forever.
      35              :     #[clap(long)]
      36              :     runtime: Option<humantime::Duration>,
                            // Forwarded unchanged to target discovery (`util::cli::targets::Spec`).
      37              :     #[clap(long)]
      38              :     limit_to_first_n_targets: Option<usize>,
                            // Explicit target list, forwarded unchanged to target discovery.
      39            0 :     targets: Option<Vec<TenantTimelineId>>,
      40              : }
      41              : 
      42              : #[derive(Debug, Default)]
                        /// Counter shared between the client tasks and the periodic "RPS" reporter.
      43              : struct LiveStats {
                            /// Number of requests finished since the reporter last reset it to zero
                            /// (the reporter in `main_impl` reads it with `swap(0, ..)`).
      44              :     completed_requests: AtomicU64,
      45              : }
      46              : 
      47              : impl LiveStats {
                            /// Records one completed request.
                            ///
                            /// Uses `Ordering::Relaxed`: the value is only a counter and is not used
                            /// here to order any other memory accesses.
      48            0 :     fn inc(&self) {
      49            0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      50            0 :     }
      51              : }
      52              : 
                        /// A timeline to request basebackups for, plus the LSNs to pick from.
      53              : struct Target {
                            /// Tenant/timeline pair the basebackup requests are issued against.
      54              :     timeline: TenantTimelineId,
                            /// Range the work sender draws a random LSN from; `None` means the request
                            /// is sent without an LSN. `main_impl` currently always fills it with the
                            /// one-element range starting at the timeline's `last_record_lsn`.
      55              :     lsn_range: Option<Range<Lsn>>,
      56              : }
      57              : 
      58              : #[derive(serde::Serialize)]
                        /// Final report; `main_impl` serializes it as pretty JSON and prints it to stdout.
      59              : struct Output {
                            /// Request stats aggregated over all thread-local stats instances.
      60              :     total: request_stats::Output,
      61              : }
      62              : 
                        // Declares `STATS` through the project's `tokio_thread_local_stats` helper.
                        // `client` records latencies via `STATS.with(..)`, and `main` passes `STATS`
                        // to `tokio_thread_local_stats::main!`.
                        // NOTE(review): presumably one stats instance per runtime thread — confirm
                        // against the macro definition.
      63              : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
      64              : 
                        /// Entry point of the `basebackup` subcommand.
                        ///
                        /// Hands `args` to `main_impl` through `tokio_thread_local_stats::main!`, which
                        /// supplies the `thread_local_stats` collection that `main_impl` aggregates at
                        /// the end of the run.
                        /// NOTE(review): runtime setup is presumably done inside the macro — confirm.
      65            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      66            0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
      67            0 :         main_impl(args, thread_local_stats)
      68            0 :     })
      69            0 : }
      70              : 
                        /// Runs the benchmark end to end.
                        ///
                        /// Steps:
                        /// 1. discover the target timelines and look up each one's `last_record_lsn`;
                        /// 2. spawn a live "RPS" reporter and one `client` task per timeline;
                        /// 3. feed the clients randomly generated `Work` items until `args.runtime`
                        ///    elapses (forever when it is `None`, in which case nothing below the
                        ///    sender is ever reached);
                        /// 4. join the clients, aggregate the thread-local stats and print them as
                        ///    pretty JSON on stdout.
                        ///
                        /// # Errors
                        /// Only management-client construction and target discovery are propagated
                        /// with `?`.
                        ///
                        /// # Panics
                        /// Timeline-info lookups, task joins, channel sends, stats locks and JSON
                        /// serialization are all `unwrap`ped.
                        /// NOTE(review): `all_targets.choose(..).unwrap()` panics if discovery
                        /// returns no timelines.
      71            0 : async fn main_impl(
      72            0 :     args: Args,
      73            0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
      74            0 : ) -> anyhow::Result<()> {
                            // Leak the args to get a `&'static Args` that can be passed to every
                            // spawned client task.
      75            0 :     let args: &'static Args = Box::leak(Box::new(args));
      76              : 
      77            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      78            0 :         args.mgmt_api_endpoint.clone(),
      79            0 :         args.pageserver_jwt.as_deref(),
      80            0 :         None, // TODO: support ssl_ca_file for https APIs in pagebench.
      81            0 :     )?);
      82              : 
      83              :     // discover targets
      84            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
      85            0 :         &mgmt_api_client,
      86            0 :         crate::util::cli::targets::Spec {
      87            0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
      88            0 :             targets: args.targets.clone(),
      89            0 :         },
      90            0 :     )
      91            0 :     .await?;
                            // Turn every discovered timeline into a `Target` carrying its latest LSN.
                            // NOTE(review): the `timeline_info(..).await` below sits in the block
                            // expression, outside the `async move`, so it completes here in the loop
                            // before `js.spawn` is called. The lookups therefore run one after another,
                            // and the spawned task only wraps the already-fetched result.
      92            0 :     let mut js = JoinSet::new();
      93            0 :     for timeline in &timelines {
      94            0 :         js.spawn({
      95            0 :             let timeline = *timeline;
      96            0 :             let info = mgmt_api_client
      97            0 :                 .timeline_info(
      98            0 :                     TenantShardId::unsharded(timeline.tenant_id),
      99            0 :                     timeline.timeline_id,
     100            0 :                     ForceAwaitLogicalSize::No,
     101            0 :                 )
     102            0 :                 .await
     103            0 :                 .unwrap();
     104            0 :             async move {
     105            0 :                 anyhow::Ok(Target {
     106            0 :                     timeline,
     107            0 :                     // TODO: support lsn_range != latest LSN
     108            0 :                     lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
     109            0 :                 })
     110            0 :             }
     111            0 :         });
     112            0 :     }
                            // Collect the targets in completion order; a join error or an `Err` result
                            // panics.
     113            0 :     let mut all_targets: Vec<Target> = Vec::new();
     114            0 :     while let Some(res) = js.join_next().await {
     115            0 :         all_targets.push(res.unwrap().unwrap());
     116            0 :     }
     117              : 
     118            0 :     let live_stats = Arc::new(LiveStats::default());
     119            0 : 
                            // One client task per timeline (`args.num_clients` is not consulted).
     120            0 :     let num_client_tasks = timelines.len();
     121            0 :     let num_live_stats_dump = 1;
     122            0 :     let num_work_sender_tasks = 1;
     123            0 : 
                            // The start barrier is sized for every party that calls `wait()` on it: all
                            // clients, the live-stats reporter and the work sender.
     124            0 :     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
     125            0 :         num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
     126            0 :     ));
                            // Each client waits on this barrier once its work channel has closed.
     127            0 :     let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
     128            0 : 
                            // Live-stats reporter: about once per second, take and reset the completed
                            // request counter and log it as requests per second. The task is detached
                            // and never joined.
     129            0 :     tokio::spawn({
     130            0 :         let stats = Arc::clone(&live_stats);
     131            0 :         let start_work_barrier = Arc::clone(&start_work_barrier);
     132            0 :         async move {
     133            0 :             start_work_barrier.wait().await;
     134              :             loop {
     135            0 :                 let start = std::time::Instant::now();
     136            0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     137            0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     138            0 :                 let elapsed = start.elapsed();
     139            0 :                 info!(
     140            0 :                     "RPS: {:.0}",
     141            0 :                     completed_requests as f64 / elapsed.as_secs_f64()
     142              :                 );
     143              :             }
     144            0 :         }
     145            0 :     });
     146            0 : 
                            // Spawn the clients, keeping the sending half of each client's work channel
                            // keyed by timeline.
     147            0 :     let mut work_senders = HashMap::new();
     148            0 :     let mut tasks = Vec::new();
     149            0 :     for tl in &timelines {
     150            0 :         let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
     151            0 :         work_senders.insert(tl, sender);
     152            0 :         tasks.push(tokio::spawn(client(
     153            0 :             args,
     154            0 :             *tl,
     155            0 :             Arc::clone(&start_work_barrier),
     156            0 :             receiver,
     157            0 :             Arc::clone(&all_work_done_barrier),
     158            0 :             Arc::clone(&live_stats),
     159            0 :         )));
     160            0 :     }
     161              : 
                            // Work sender: endlessly pick a random target, a random LSN from its range
                            // and a gzip flag, then push the request to that target's client. The
                            // future takes ownership of `work_senders` and `all_targets` (`async move`).
     162            0 :     let work_sender = async move {
     163            0 :         start_work_barrier.wait().await;
     164              :         loop {
     165            0 :             let (timeline, work) = {
     166            0 :                 let mut rng = rand::thread_rng();
     167            0 :                 let target = all_targets.choose(&mut rng).unwrap();
     168            0 :                 let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
     169            0 :                 (
     170            0 :                     target.timeline,
     171            0 :                     Work {
     172            0 :                         lsn,
     173            0 :                         gzip: rng.gen_bool(args.gzip_probability),
     174            0 :                     },
     175            0 :                 )
     176            0 :             };
     177            0 :             let sender = work_senders.get(&timeline).unwrap();
     178            0 :             // TODO: what if this blocks?
     179            0 :             sender.send(work).await.ok().unwrap();
     180              :         }
     181              :     };
     182              : 
                            // Drive the sender, optionally bounded by the configured runtime.
     183            0 :     if let Some(runtime) = args.runtime {
     184            0 :         match tokio::time::timeout(runtime.into(), work_sender).await {
     185            0 :             Ok(()) => unreachable!("work sender never terminates"),
     186            0 :             Err(_timeout) => {
     187            0 :                 // this implicitly drops the work_senders, making all the clients exit
     188            0 :             }
     189              :         }
     190              :     } else {
     191            0 :         work_sender.await;
     192            0 :         unreachable!("work sender never terminates");
     193              :     }
     194              : 
                            // Wait for every client task; a panicked client surfaces here as a panic.
     195            0 :     for t in tasks {
     196            0 :         t.await.unwrap();
     197              :     }
     198              : 
                            // Merge all thread-local stats into one aggregate for the report.
     199            0 :     let output = Output {
     200              :         total: {
     201            0 :             let mut agg_stats = request_stats::Stats::new();
     202            0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     203            0 :                 let stats = stats.lock().unwrap();
     204            0 :                 agg_stats.add(&stats);
     205            0 :             }
     206            0 :             agg_stats.output()
     207            0 :         },
     208            0 :     };
     209            0 : 
     210            0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     211            0 :     println!("{output}");
     212            0 : 
     213            0 :     anyhow::Ok(())
     214            0 : }
     215              : 
     216              : #[derive(Copy, Clone)]
                        /// One basebackup request, sent from the work sender to a `client` task.
     217              : struct Work {
                            /// LSN to request, or `None` to send the request without an LSN.
     218              :     lsn: Option<Lsn>,
                            /// Value for the request's `gzip` field.
     219              :     gzip: bool,
     220              : }
     221              : 
     222              : #[instrument(skip_all)]
                        /// Worker task that issues basebackup requests for a single timeline.
                        ///
                        /// Waits on `start_work_barrier`, opens a page service client using
                        /// `args.page_service_connstring`, then handles `Work` items from `work` until
                        /// the channel is closed. For each item it requests a basebackup for `timeline`,
                        /// drains the returned stream while summing the chunk lengths, logs the total
                        /// size, bumps `live_stats` and records the elapsed time in the thread-local
                        /// `STATS`. Finally it waits on `all_work_done_barrier`.
                        ///
                        /// # Panics
                        /// Connection failures, request failures, stream errors and stats errors are all
                        /// `unwrap`ped.
     223              : async fn client(
     224              :     args: &'static Args,
     225              :     timeline: TenantTimelineId,
     226              :     start_work_barrier: Arc<Barrier>,
     227              :     mut work: tokio::sync::mpsc::Receiver<Work>,
     228              :     all_work_done_barrier: Arc<Barrier>,
     229              :     live_stats: Arc<LiveStats>,
     230              : ) {
     231              :     start_work_barrier.wait().await;
     232              : 
     233              :     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
     234              :         .await
     235              :         .unwrap();
     236              : 
                            // `recv()` yields `None` once every sender has been dropped, ending the loop.
     237              :     while let Some(Work { lsn, gzip }) = work.recv().await {
     238              :         let start = Instant::now();
     239              :         let copy_out_stream = client
     240              :             .basebackup(&BasebackupRequest {
     241              :                 tenant_id: timeline.tenant_id,
     242              :                 timeline_id: timeline.timeline_id,
     243              :                 lsn,
     244              :                 gzip,
     245              :             })
     246              :             .await
     247            0 :             .with_context(|| format!("start basebackup for {timeline}"))
     248              :             .unwrap();
     249              : 
                                // Consume the whole stream, adding up the length of every chunk.
                                // NOTE(review): the `Arc::clone` inside the `async move` block clones a
                                // value that was already cloned one line above; it looks redundant.
     250              :         use futures::StreamExt;
     251              :         let size = Arc::new(AtomicUsize::new(0));
     252              :         copy_out_stream
     253              :             .for_each({
     254            0 :                 |r| {
     255            0 :                     let size = Arc::clone(&size);
     256            0 :                     async move {
     257            0 :                         let size = Arc::clone(&size);
     258            0 :                         size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
     259            0 :                     }
     260            0 :                 }
     261              :             })
     262              :             .await;
     263              :         info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
                                // NOTE(review): `elapsed` is taken after the log call above, so the
                                // recorded latency includes the time spent logging.
     264              :         let elapsed = start.elapsed();
     265              :         live_stats.inc();
     266            0 :         STATS.with(|stats| {
     267            0 :             stats.borrow().lock().unwrap().observe(elapsed).unwrap();
     268            0 :         });
     269              :     }
     270              : 
     271              :     all_work_done_barrier.wait().await;
     272              : }
        

Generated by: LCOV version 2.1-beta