LCOV - code coverage report
Current view: top level - safekeeper/src - wal_reader_stream.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 0.0 % 98 0
Test Date: 2025-01-07 20:58:07 Functions: 0.0 % 3 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use async_stream::try_stream;
       4              : use bytes::Bytes;
       5              : use futures::Stream;
       6              : use postgres_backend::CopyStreamHandlerEnd;
       7              : use safekeeper_api::Term;
       8              : use std::time::Duration;
       9              : use tokio::time::timeout;
      10              : use utils::lsn::Lsn;
      11              : 
      12              : use crate::{
      13              :     send_wal::{EndWatch, WalSenderGuard},
      14              :     timeline::WalResidentTimeline,
      15              : };
      16              : 
      17              : pub(crate) struct WalReaderStreamBuilder {
      18              :     pub(crate) tli: WalResidentTimeline,
      19              :     pub(crate) start_pos: Lsn,
      20              :     pub(crate) end_pos: Lsn,
      21              :     pub(crate) term: Option<Term>,
      22              :     pub(crate) end_watch: EndWatch,
      23              :     pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
      24              : }
      25              : 
      26              : impl WalReaderStreamBuilder {
      27            0 :     pub(crate) fn start_pos(&self) -> Lsn {
      28            0 :         self.start_pos
      29            0 :     }
      30              : }
      31              : 
      32              : pub(crate) struct WalBytes {
      33              :     /// Raw PG WAL
      34              :     pub(crate) wal: Bytes,
      35              :     /// Start LSN of [`Self::wal`]
      36              :     #[allow(dead_code)]
      37              :     pub(crate) wal_start_lsn: Lsn,
      38              :     /// End LSN of [`Self::wal`]
      39              :     pub(crate) wal_end_lsn: Lsn,
      40              :     /// End LSN of WAL available on the safekeeper.
      41              :     ///
      42              :     /// For pagservers this will be commit LSN,
      43              :     /// while for the compute it will be the flush LSN.
      44              :     pub(crate) available_wal_end_lsn: Lsn,
      45              : }
      46              : 
      47              : impl WalReaderStreamBuilder {
      48              :     /// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
      49              :     /// The stream terminates when the receiver (pageserver) is fully caught up
      50              :     /// and there's no active computes.
      51            0 :     pub(crate) async fn build(
      52            0 :         self,
      53            0 :         buffer_size: usize,
      54            0 :     ) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
      55            0 :         // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
      56            0 :         // We can make the raw WAL sender use this stream too and remove the duplication.
      57            0 :         let Self {
      58            0 :             tli,
      59            0 :             mut start_pos,
      60            0 :             mut end_pos,
      61            0 :             term,
      62            0 :             mut end_watch,
      63            0 :             wal_sender_guard,
      64            0 :         } = self;
      65            0 :         let mut wal_reader = tli.get_walreader(start_pos).await?;
      66            0 :         let mut buffer = vec![0; buffer_size];
      67              : 
      68              :         const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
      69              : 
      70            0 :         Ok(try_stream! {
      71            0 :             loop {
      72            0 :                 let have_something_to_send = end_pos > start_pos;
      73            0 : 
      74            0 :                 if !have_something_to_send {
      75            0 :                     // wait for lsn
      76            0 :                     let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
      77            0 :                     match res {
      78            0 :                         Ok(ok) => {
      79            0 :                             end_pos = ok?;
      80            0 :                         },
      81            0 :                         Err(_) => {
      82            0 :                             if let EndWatch::Commit(_) = end_watch {
      83            0 :                                 if let Some(remote_consistent_lsn) = wal_sender_guard
      84            0 :                                     .walsenders()
      85            0 :                                     .get_ws_remote_consistent_lsn(wal_sender_guard.id())
      86            0 :                                 {
      87            0 :                                     if tli.should_walsender_stop(remote_consistent_lsn).await {
      88            0 :                                         // Stop streaming if the receivers are caught up and
      89            0 :                                         // there's no active compute. This causes the loop in
      90            0 :                                         // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
      91            0 :                                         // to exit and terminate the WAL stream.
      92            0 :                                         return;
      93            0 :                                     }
      94            0 :                                 }
      95            0 :                             }
      96            0 : 
      97            0 :                             continue;
      98            0 :                         }
      99            0 :                     }
     100            0 :                 }
     101            0 : 
     102            0 : 
     103            0 :                 assert!(
     104            0 :                     end_pos > start_pos,
     105            0 :                     "nothing to send after waiting for WAL"
     106            0 :                 );
     107            0 : 
     108            0 :                 // try to send as much as available, capped by the buffer size
     109            0 :                 let mut chunk_end_pos = start_pos + buffer_size as u64;
     110            0 :                 // if we went behind available WAL, back off
     111            0 :                 if chunk_end_pos >= end_pos {
     112            0 :                     chunk_end_pos = end_pos;
     113            0 :                 } else {
     114            0 :                     // If sending not up to end pos, round down to page boundary to
     115            0 :                     // avoid breaking WAL record not at page boundary, as protocol
     116            0 :                     // demands. See walsender.c (XLogSendPhysical).
     117            0 :                     chunk_end_pos = chunk_end_pos
     118            0 :                         .checked_sub(chunk_end_pos.block_offset())
     119            0 :                         .unwrap();
     120            0 :                 }
     121            0 :                 let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
     122            0 :                 let buffer = &mut buffer[..send_size];
     123            0 :                 let send_size: usize;
     124            0 :                 {
     125            0 :                     // If uncommitted part is being pulled, check that the term is
     126            0 :                     // still the expected one.
     127            0 :                     let _term_guard = if let Some(t) = term {
     128            0 :                         Some(tli.acquire_term(t).await?)
     129            0 :                     } else {
     130            0 :                         None
     131            0 :                     };
     132            0 :                     // Read WAL into buffer. send_size can be additionally capped to
     133            0 :                     // segment boundary here.
     134            0 :                     send_size = wal_reader.read(buffer).await?
     135            0 :                 };
     136            0 :                 let wal = Bytes::copy_from_slice(&buffer[..send_size]);
     137            0 : 
     138            0 :                 yield WalBytes {
     139            0 :                     wal,
     140            0 :                     wal_start_lsn: start_pos,
     141            0 :                     wal_end_lsn: start_pos + send_size as u64,
     142            0 :                     available_wal_end_lsn: end_pos
     143            0 :                 };
     144            0 : 
     145            0 :                 start_pos += send_size as u64;
     146            0 :             }
     147            0 :         })
     148            0 :     }
     149              : }
        

Generated by: LCOV version 2.1-beta