LCOV - differential code coverage report
Current view: top level - pageserver/pagebench/src/cmd - basebackup.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 151 0 151
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 41 0 41
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use anyhow::Context;
       2                 : use pageserver_client::page_service::BasebackupRequest;
       3                 : 
       4                 : use utils::id::TenantTimelineId;
       5                 : use utils::lsn::Lsn;
       6                 : 
       7                 : use rand::prelude::*;
       8                 : use tokio::sync::Barrier;
       9                 : use tokio::task::JoinSet;
      10                 : use tracing::{debug, info, instrument};
      11                 : 
      12                 : use std::collections::HashMap;
      13                 : use std::num::NonZeroUsize;
      14                 : use std::ops::Range;
      15                 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
      16                 : use std::sync::{Arc, Mutex};
      17                 : use std::time::Instant;
      18                 : 
      19                 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
      20                 : use crate::util::{request_stats, tokio_thread_local_stats};
      21                 : 
      22                 : /// basebackup@LatestLSN
      23 UBC           0 : #[derive(clap::Parser)]
      24                 : pub(crate) struct Args {
      25                 :     #[clap(long, default_value = "http://localhost:9898")]
      26               0 :     mgmt_api_endpoint: String,
      27                 :     #[clap(long, default_value = "localhost:64000")]
      28               0 :     page_service_host_port: String,
      29                 :     #[clap(long)]
      30                 :     pageserver_jwt: Option<String>,
      31                 :     #[clap(long, default_value = "1")]
      32               0 :     num_clients: NonZeroUsize,
      33                 :     #[clap(long, default_value = "1.0")]
      34               0 :     gzip_probability: f64,
      35                 :     #[clap(long)]
      36                 :     runtime: Option<humantime::Duration>,
      37                 :     #[clap(long)]
      38                 :     limit_to_first_n_targets: Option<usize>,
      39               0 :     targets: Option<Vec<TenantTimelineId>>,
      40                 : }
      41                 : 
      42               0 : #[derive(Debug, Default)]
      43                 : struct LiveStats {
      44                 :     completed_requests: AtomicU64,
      45                 : }
      46                 : 
      47                 : impl LiveStats {
      48               0 :     fn inc(&self) {
      49               0 :         self.completed_requests.fetch_add(1, Ordering::Relaxed);
      50               0 :     }
      51                 : }
      52                 : 
      53                 : struct Target {
      54                 :     timeline: TenantTimelineId,
      55                 :     lsn_range: Option<Range<Lsn>>,
      56                 : }
      57                 : 
      58               0 : #[derive(serde::Serialize)]
      59                 : struct Output {
      60                 :     total: request_stats::Output,
      61                 : }
      62                 : 
      63               0 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
      64                 : 
      65               0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      66               0 :     tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
      67               0 :         main_impl(args, thread_local_stats)
      68               0 :     })
      69               0 : }
      70                 : 
      71               0 : async fn main_impl(
      72               0 :     args: Args,
      73               0 :     all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
      74               0 : ) -> anyhow::Result<()> {
      75               0 :     let args: &'static Args = Box::leak(Box::new(args));
      76               0 : 
      77               0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      78               0 :         args.mgmt_api_endpoint.clone(),
      79               0 :         args.pageserver_jwt.as_deref(),
      80               0 :     ));
      81                 : 
      82                 :     // discover targets
      83               0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
      84               0 :         &mgmt_api_client,
      85               0 :         crate::util::cli::targets::Spec {
      86               0 :             limit_to_first_n_targets: args.limit_to_first_n_targets,
      87               0 :             targets: args.targets.clone(),
      88               0 :         },
      89               0 :     )
      90               0 :     .await?;
      91               0 :     let mut js = JoinSet::new();
      92               0 :     for timeline in &timelines {
      93               0 :         js.spawn({
      94               0 :             let timeline = *timeline;
      95                 :             // FIXME: this triggers initial logical size calculation
      96                 :             // https://github.com/neondatabase/neon/issues/6168
      97               0 :             let info = mgmt_api_client
      98               0 :                 .timeline_info(timeline.tenant_id, timeline.timeline_id)
      99               0 :                 .await
     100               0 :                 .unwrap();
     101               0 :             async move {
     102               0 :                 anyhow::Ok(Target {
     103               0 :                     timeline,
     104               0 :                     // TODO: support lsn_range != latest LSN
     105               0 :                     lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
     106               0 :                 })
     107               0 :             }
     108                 :         });
     109                 :     }
     110               0 :     let mut all_targets: Vec<Target> = Vec::new();
     111               0 :     while let Some(res) = js.join_next().await {
     112               0 :         all_targets.push(res.unwrap().unwrap());
     113               0 :     }
     114                 : 
     115               0 :     let live_stats = Arc::new(LiveStats::default());
     116               0 : 
     117               0 :     let num_client_tasks = timelines.len();
     118               0 :     let num_live_stats_dump = 1;
     119               0 :     let num_work_sender_tasks = 1;
     120               0 : 
     121               0 :     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
     122               0 :         num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
     123               0 :     ));
     124               0 :     let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
     125               0 : 
     126               0 :     tokio::spawn({
     127               0 :         let stats = Arc::clone(&live_stats);
     128               0 :         let start_work_barrier = Arc::clone(&start_work_barrier);
     129               0 :         async move {
     130               0 :             start_work_barrier.wait().await;
     131                 :             loop {
     132               0 :                 let start = std::time::Instant::now();
     133               0 :                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     134               0 :                 let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
     135               0 :                 let elapsed = start.elapsed();
     136               0 :                 info!(
     137               0 :                     "RPS: {:.0}",
     138               0 :                     completed_requests as f64 / elapsed.as_secs_f64()
     139               0 :                 );
     140                 :             }
     141               0 :         }
     142               0 :     });
     143               0 : 
     144               0 :     let mut work_senders = HashMap::new();
     145               0 :     let mut tasks = Vec::new();
     146               0 :     for tl in &timelines {
     147               0 :         let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
     148               0 :         work_senders.insert(tl, sender);
     149               0 :         tasks.push(tokio::spawn(client(
     150               0 :             args,
     151               0 :             *tl,
     152               0 :             Arc::clone(&start_work_barrier),
     153               0 :             receiver,
     154               0 :             Arc::clone(&all_work_done_barrier),
     155               0 :             Arc::clone(&live_stats),
     156               0 :         )));
     157               0 :     }
     158                 : 
     159               0 :     let work_sender = async move {
     160               0 :         start_work_barrier.wait().await;
     161                 :         loop {
     162               0 :             let (timeline, work) = {
     163               0 :                 let mut rng = rand::thread_rng();
     164               0 :                 let target = all_targets.choose(&mut rng).unwrap();
     165               0 :                 let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
     166               0 :                 (
     167               0 :                     target.timeline,
     168               0 :                     Work {
     169               0 :                         lsn,
     170               0 :                         gzip: rng.gen_bool(args.gzip_probability),
     171               0 :                     },
     172               0 :                 )
     173               0 :             };
     174               0 :             let sender = work_senders.get(&timeline).unwrap();
     175               0 :             // TODO: what if this blocks?
     176               0 :             sender.send(work).await.ok().unwrap();
     177                 :         }
     178                 :     };
     179                 : 
     180               0 :     if let Some(runtime) = args.runtime {
     181               0 :         match tokio::time::timeout(runtime.into(), work_sender).await {
     182               0 :             Ok(()) => unreachable!("work sender never terminates"),
     183               0 :             Err(_timeout) => {
     184               0 :                 // this implicitly drops the work_senders, making all the clients exit
     185               0 :             }
     186                 :         }
     187                 :     } else {
     188               0 :         work_sender.await;
     189               0 :         unreachable!("work sender never terminates");
     190                 :     }
     191                 : 
     192               0 :     for t in tasks {
     193               0 :         t.await.unwrap();
     194                 :     }
     195                 : 
     196               0 :     let output = Output {
     197                 :         total: {
     198               0 :             let mut agg_stats = request_stats::Stats::new();
     199               0 :             for stats in all_thread_local_stats.lock().unwrap().iter() {
     200               0 :                 let stats = stats.lock().unwrap();
     201               0 :                 agg_stats.add(&stats);
     202               0 :             }
     203               0 :             agg_stats.output()
     204               0 :         },
     205               0 :     };
     206               0 : 
     207               0 :     let output = serde_json::to_string_pretty(&output).unwrap();
     208               0 :     println!("{output}");
     209               0 : 
     210               0 :     anyhow::Ok(())
     211               0 : }
     212                 : 
     213               0 : #[derive(Copy, Clone)]
     214                 : struct Work {
     215                 :     lsn: Option<Lsn>,
     216                 :     gzip: bool,
     217                 : }
     218                 : 
     219               0 : #[instrument(skip_all)]
     220                 : async fn client(
     221                 :     args: &'static Args,
     222                 :     timeline: TenantTimelineId,
     223                 :     start_work_barrier: Arc<Barrier>,
     224                 :     mut work: tokio::sync::mpsc::Receiver<Work>,
     225                 :     all_work_done_barrier: Arc<Barrier>,
     226                 :     live_stats: Arc<LiveStats>,
     227                 : ) {
     228                 :     start_work_barrier.wait().await;
     229                 : 
     230                 :     let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
     231                 :         &args.page_service_host_port,
     232                 :         args.pageserver_jwt.as_deref(),
     233                 :     ))
     234                 :     .await
     235                 :     .unwrap();
     236                 : 
     237                 :     while let Some(Work { lsn, gzip }) = work.recv().await {
     238                 :         let start = Instant::now();
     239                 :         let copy_out_stream = client
     240                 :             .basebackup(&BasebackupRequest {
     241                 :                 tenant_id: timeline.tenant_id,
     242                 :                 timeline_id: timeline.timeline_id,
     243                 :                 lsn,
     244                 :                 gzip,
     245                 :             })
     246                 :             .await
     247               0 :             .with_context(|| format!("start basebackup for {timeline}"))
     248                 :             .unwrap();
     249                 : 
     250                 :         use futures::StreamExt;
     251                 :         let size = Arc::new(AtomicUsize::new(0));
     252                 :         copy_out_stream
     253                 :             .for_each({
     254               0 :                 |r| {
     255               0 :                     let size = Arc::clone(&size);
     256               0 :                     async move {
     257               0 :                         let size = Arc::clone(&size);
     258               0 :                         size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
     259               0 :                     }
     260               0 :                 }
     261                 :             })
     262                 :             .await;
     263               0 :         debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
     264                 :         let elapsed = start.elapsed();
     265                 :         live_stats.inc();
     266               0 :         STATS.with(|stats| {
     267               0 :             stats.borrow().lock().unwrap().observe(elapsed).unwrap();
     268               0 :         });
     269                 :     }
     270                 : 
     271                 :     all_work_done_barrier.wait().await;
     272                 : }
        

Generated by: LCOV version 2.1-beta