LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - basebackup.rs (source / functions) Coverage Total Hit
Test: fabb29a6339542ee130cd1d32b534fafdc0be240.info Lines: 0.0 % 149 0
Test Date: 2024-06-25 13:20:00 Functions: 0.0 % 34 0

            Line data    Source code
       1              : use anyhow::Context;
       2              : use pageserver_api::shard::TenantShardId;
       3              : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
       4              : use pageserver_client::page_service::BasebackupRequest;
       5              : 
       6              : use utils::id::TenantTimelineId;
       7              : use utils::lsn::Lsn;
       8              : 
       9              : use rand::prelude::*;
      10              : use tokio::sync::Barrier;
      11              : use tokio::task::JoinSet;
      12              : use tracing::{info, instrument};
      13              : 
      14              : use std::collections::HashMap;
      15              : use std::num::NonZeroUsize;
      16              : use std::ops::Range;
      17              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
      18              : use std::sync::{Arc, Mutex};
      19              : use std::time::Instant;
      20              : 
      21              : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      22              : use crate::util::{request_stats, tokio_thread_local_stats};
      23              : 
      24              : /// basebackup@LatestLSN
      25            0 : #[derive(clap::Parser)]
      26              : pub(crate) struct Args {
      27              :     #[clap(long, default_value = "http://localhost:9898")]
      28            0 :     mgmt_api_endpoint: String,
      29              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      30            0 :     page_service_connstring: String,
      31              :     #[clap(long)]
      32              :     pageserver_jwt: Option<String>,
      33              :     #[clap(long, default_value = "1")]
      34            0 :     num_clients: NonZeroUsize,
      35              :     #[clap(long, default_value = "1.0")]
      36            0 :     gzip_probability: f64,
      37              :     #[clap(long)]
      38              :     runtime: Option<humantime::Duration>,
      39              :     #[clap(long)]
      40              :     limit_to_first_n_targets: Option<usize>,
      41            0 :     targets: Option<Vec<TenantTimelineId>>,
      42              : }
      43              : 
      44              : #[derive(Debug, Default)]
      45              : struct LiveStats {
      46              :     completed_requests: AtomicU64,
      47              : }
      48              : 
      49              : impl LiveStats {
      50            0 :     fn inc(&self) {
      51            0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      52            0 :     }
      53              : }
      54              : 
      55              : struct Target {
      56              :     timeline: TenantTimelineId,
      57              :     lsn_range: Option<Range<Lsn>>,
      58              : }
      59              : 
      60              : #[derive(serde::Serialize)]
      61              : struct Output {
      62              :     total: request_stats::Output,
      63              : }
      64              : 
      65              : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
      66              : 
      67            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      68            0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
      69            0 :         main_impl(args, thread_local_stats)
      70            0 :     })
      71            0 : }
      72              : 
      73            0 : async fn main_impl(
      74            0 :     args: Args,
      75            0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
      76            0 : ) -> anyhow::Result<()> {
      77            0 :     let args: &'static Args = Box::leak(Box::new(args));
      78            0 : 
      79            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      80            0 :         args.mgmt_api_endpoint.clone(),
      81            0 :         args.pageserver_jwt.as_deref(),
      82            0 :     ));
      83              : 
      84              :     // discover targets
      85            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
      86            0 :         &mgmt_api_client,
      87            0 :         crate::util::cli::targets::Spec {
      88            0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
      89            0 :             targets: args.targets.clone(),
      90            0 :         },
      91            0 :     )
      92            0 :     .await?;
      93            0 :     let mut js = JoinSet::new();
      94            0 :     for timeline in &timelines {
      95            0 :         js.spawn({
      96            0 :             let timeline = *timeline;
      97            0 :             let info = mgmt_api_client
      98            0 :                 .timeline_info(
      99            0 :                     TenantShardId::unsharded(timeline.tenant_id),
     100            0 :                     timeline.timeline_id,
     101            0 :                     ForceAwaitLogicalSize::No,
     102            0 :                 )
     103            0 :                 .await
     104            0 :                 .unwrap();
     105            0 :             async move {
     106            0 :                 anyhow::Ok(Target {
     107            0 :                     timeline,
     108            0 :                     // TODO: support lsn_range != latest LSN
     109            0 :                     lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
     110            0 :                 })
     111            0 :             }
     112              :         });
     113              :     }
     114            0 :     let mut all_targets: Vec<Target> = Vec::new();
     115            0 :     while let Some(res) = js.join_next().await {
     116            0 :         all_targets.push(res.unwrap().unwrap());
     117            0 :     }
     118              : 
     119            0 :     let live_stats = Arc::new(LiveStats::default());
     120            0 : 
     121            0 :     let num_client_tasks = timelines.len();
     122            0 :     let num_live_stats_dump = 1;
     123            0 :     let num_work_sender_tasks = 1;
     124            0 : 
     125            0 :     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
     126            0 :         num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
     127            0 :     ));
     128            0 :     let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
     129            0 : 
     130            0 :     tokio::spawn({
     131            0 :         let stats = Arc::clone(&live_stats);
     132            0 :         let start_work_barrier = Arc::clone(&start_work_barrier);
     133            0 :         async move {
     134            0 :             start_work_barrier.wait().await;
     135              :             loop {
     136            0 :                 let start = std::time::Instant::now();
     137            0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     138            0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     139            0 :                 let elapsed = start.elapsed();
     140            0 :                 info!(
     141            0 :                     "RPS: {:.0}",
     142            0 :                     completed_requests as f64 / elapsed.as_secs_f64()
     143              :                 );
     144              :             }
     145            0 :         }
     146            0 :     });
     147            0 : 
     148            0 :     let mut work_senders = HashMap::new();
     149            0 :     let mut tasks = Vec::new();
     150            0 :     for tl in &timelines {
     151            0 :         let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
     152            0 :         work_senders.insert(tl, sender);
     153            0 :         tasks.push(tokio::spawn(client(
     154            0 :             args,
     155            0 :             *tl,
     156            0 :             Arc::clone(&start_work_barrier),
     157            0 :             receiver,
     158            0 :             Arc::clone(&all_work_done_barrier),
     159            0 :             Arc::clone(&live_stats),
     160            0 :         )));
     161            0 :     }
     162              : 
     163            0 :     let work_sender = async move {
     164            0 :         start_work_barrier.wait().await;
     165              :         loop {
     166            0 :             let (timeline, work) = {
     167            0 :                 let mut rng = rand::thread_rng();
     168            0 :                 let target = all_targets.choose(&mut rng).unwrap();
     169            0 :                 let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
     170            0 :                 (
     171            0 :                     target.timeline,
     172            0 :                     Work {
     173            0 :                         lsn,
     174            0 :                         gzip: rng.gen_bool(args.gzip_probability),
     175            0 :                     },
     176            0 :                 )
     177            0 :             };
     178            0 :             let sender = work_senders.get(&timeline).unwrap();
     179            0 :             // TODO: what if this blocks?
     180            0 :             sender.send(work).await.ok().unwrap();
     181              :         }
     182              :     };
     183              : 
     184            0 :     if let Some(runtime) = args.runtime {
     185            0 :         match tokio::time::timeout(runtime.into(), work_sender).await {
     186            0 :             Ok(()) => unreachable!("work sender never terminates"),
     187            0 :             Err(_timeout) => {
     188            0 :                 // this implicitly drops the work_senders, making all the clients exit
     189            0 :             }
     190              :         }
     191              :     } else {
     192            0 :         work_sender.await;
     193            0 :         unreachable!("work sender never terminates");
     194              :     }
     195              : 
     196            0 :     for t in tasks {
     197            0 :         t.await.unwrap();
     198              :     }
     199              : 
     200            0 :     let output = Output {
     201              :         total: {
     202            0 :             let mut agg_stats = request_stats::Stats::new();
     203            0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     204            0 :                 let stats = stats.lock().unwrap();
     205            0 :                 agg_stats.add(&stats);
     206            0 :             }
     207            0 :             agg_stats.output()
     208            0 :         },
     209            0 :     };
     210            0 : 
     211            0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     212            0 :     println!("{output}");
     213            0 : 
     214            0 :     anyhow::Ok(())
     215            0 : }
     216              : 
     217              : #[derive(Copy, Clone)]
     218              : struct Work {
     219              :     lsn: Option<Lsn>,
     220              :     gzip: bool,
     221              : }
     222              : 
     223            0 : #[instrument(skip_all)]
     224              : async fn client(
     225              :     args: &'static Args,
     226              :     timeline: TenantTimelineId,
     227              :     start_work_barrier: Arc<Barrier>,
     228              :     mut work: tokio::sync::mpsc::Receiver<Work>,
     229              :     all_work_done_barrier: Arc<Barrier>,
     230              :     live_stats: Arc<LiveStats>,
     231              : ) {
     232              :     start_work_barrier.wait().await;
     233              : 
     234              :     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
     235              :         .await
     236              :         .unwrap();
     237              : 
     238              :     while let Some(Work { lsn, gzip }) = work.recv().await {
     239              :         let start = Instant::now();
     240              :         let copy_out_stream = client
     241              :             .basebackup(&BasebackupRequest {
     242              :                 tenant_id: timeline.tenant_id,
     243              :                 timeline_id: timeline.timeline_id,
     244              :                 lsn,
     245              :                 gzip,
     246              :             })
     247              :             .await
     248            0 :             .with_context(|| format!("start basebackup for {timeline}"))
     249              :             .unwrap();
     250              : 
     251              :         use futures::StreamExt;
     252              :         let size = Arc::new(AtomicUsize::new(0));
     253              :         copy_out_stream
     254              :             .for_each({
     255            0 :                 |r| {
     256            0 :                     let size = Arc::clone(&size);
     257            0 :                     async move {
     258            0 :                         let size = Arc::clone(&size);
     259            0 :                         size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
     260            0 :                     }
     261            0 :                 }
     262              :             })
     263              :             .await;
     264              :         info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
     265              :         let elapsed = start.elapsed();
     266              :         live_stats.inc();
     267            0 :         STATS.with(|stats| {
     268            0 :             stats.borrow().lock().unwrap().observe(elapsed).unwrap();
     269            0 :         });
     270              :     }
     271              : 
     272              :     all_work_done_barrier.wait().await;
     273              : }
        

Generated by: LCOV version 2.1-beta