LCOV - code coverage report
Current view: top level - safekeeper/src - wal_reader_stream.rs (source / functions) Coverage Total Hit
Test: 8b13a09a5c233d98abd4a0d3e59157e7db16d6fd.info Lines: 0.0 % 98 0
Test Date: 2024-11-21 10:53:51 Functions: 0.0 % 3 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use async_stream::try_stream;
       4              : use bytes::Bytes;
       5              : use futures::Stream;
       6              : use postgres_backend::CopyStreamHandlerEnd;
       7              : use std::time::Duration;
       8              : use tokio::time::timeout;
       9              : use utils::lsn::Lsn;
      10              : 
      11              : use crate::{
      12              :     safekeeper::Term,
      13              :     send_wal::{EndWatch, WalSenderGuard},
      14              :     timeline::WalResidentTimeline,
      15              : };
      16              : 
      17              : pub(crate) struct WalReaderStreamBuilder {
      18              :     pub(crate) tli: WalResidentTimeline,
      19              :     pub(crate) start_pos: Lsn,
      20              :     pub(crate) end_pos: Lsn,
      21              :     pub(crate) term: Option<Term>,
      22              :     pub(crate) end_watch: EndWatch,
      23              :     pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
      24              : }
      25              : 
      26              : impl WalReaderStreamBuilder {
      27            0 :     pub(crate) fn start_pos(&self) -> Lsn {
      28            0 :         self.start_pos
      29            0 :     }
      30              : }
      31              : 
      32              : pub(crate) struct WalBytes {
      33              :     /// Raw PG WAL
      34              :     pub(crate) wal: Bytes,
      35              :     /// Start LSN of [`Self::wal`]
      36              :     pub(crate) wal_start_lsn: Lsn,
      37              :     /// End LSN of [`Self::wal`]
      38              :     pub(crate) wal_end_lsn: Lsn,
      39              :     /// End LSN of WAL available on the safekeeper
      40              :     pub(crate) commit_lsn: Lsn,
      41              : }
      42              : 
      43              : impl WalReaderStreamBuilder {
      44              :     /// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
      45              :     /// The stream terminates when the receiver (pageserver) is fully caught up
      46              :     /// and there's no active computes.
      47            0 :     pub(crate) async fn build(
      48            0 :         self,
      49            0 :         buffer_size: usize,
      50            0 :     ) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
      51            0 :         // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
      52            0 :         // We can make the raw WAL sender use this stream too and remove the duplication.
      53            0 :         let Self {
      54            0 :             tli,
      55            0 :             mut start_pos,
      56            0 :             mut end_pos,
      57            0 :             term,
      58            0 :             mut end_watch,
      59            0 :             wal_sender_guard,
      60            0 :         } = self;
      61            0 :         let mut wal_reader = tli.get_walreader(start_pos).await?;
      62            0 :         let mut buffer = vec![0; buffer_size];
      63              : 
      64              :         const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
      65              : 
      66            0 :         Ok(try_stream! {
      67            0 :             loop {
      68            0 :                 let have_something_to_send = end_pos > start_pos;
      69            0 : 
      70            0 :                 if !have_something_to_send {
      71            0 :                     // wait for lsn
      72            0 :                     let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
      73            0 :                     match res {
      74            0 :                         Ok(ok) => {
      75            0 :                             end_pos = ok?;
      76            0 :                         },
      77            0 :                         Err(_) => {
      78            0 :                             if let EndWatch::Commit(_) = end_watch {
      79            0 :                                 if let Some(remote_consistent_lsn) = wal_sender_guard
      80            0 :                                     .walsenders()
      81            0 :                                     .get_ws_remote_consistent_lsn(wal_sender_guard.id())
      82            0 :                                 {
      83            0 :                                     if tli.should_walsender_stop(remote_consistent_lsn).await {
      84            0 :                                         // Terminate if there is nothing more to send.
      85            0 :                                         // Note that "ending streaming" part of the string is used by
      86            0 :                                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
      87            0 :                                         // do not change this string without updating pageserver.
      88            0 :                                         return;
      89            0 :                                     }
      90            0 :                                 }
      91            0 :                             }
      92            0 : 
      93            0 :                             continue;
      94            0 :                         }
      95            0 :                     }
      96            0 :                 }
      97            0 : 
      98            0 : 
      99            0 :                 assert!(
     100            0 :                     end_pos > start_pos,
     101            0 :                     "nothing to send after waiting for WAL"
     102            0 :                 );
     103            0 : 
     104            0 :                 // try to send as much as available, capped by the buffer size
     105            0 :                 let mut chunk_end_pos = start_pos + buffer_size as u64;
     106            0 :                 // if we went behind available WAL, back off
     107            0 :                 if chunk_end_pos >= end_pos {
     108            0 :                     chunk_end_pos = end_pos;
     109            0 :                 } else {
     110            0 :                     // If sending not up to end pos, round down to page boundary to
     111            0 :                     // avoid breaking WAL record not at page boundary, as protocol
     112            0 :                     // demands. See walsender.c (XLogSendPhysical).
     113            0 :                     chunk_end_pos = chunk_end_pos
     114            0 :                         .checked_sub(chunk_end_pos.block_offset())
     115            0 :                         .unwrap();
     116            0 :                 }
     117            0 :                 let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
     118            0 :                 let buffer = &mut buffer[..send_size];
     119            0 :                 let send_size: usize;
     120            0 :                 {
     121            0 :                     // If uncommitted part is being pulled, check that the term is
     122            0 :                     // still the expected one.
     123            0 :                     let _term_guard = if let Some(t) = term {
     124            0 :                         Some(tli.acquire_term(t).await?)
     125            0 :                     } else {
     126            0 :                         None
     127            0 :                     };
     128            0 :                     // Read WAL into buffer. send_size can be additionally capped to
     129            0 :                     // segment boundary here.
     130            0 :                     send_size = wal_reader.read(buffer).await?
     131            0 :                 };
     132            0 :                 let wal = Bytes::copy_from_slice(&buffer[..send_size]);
     133            0 : 
     134            0 :                 yield WalBytes {
     135            0 :                     wal,
     136            0 :                     wal_start_lsn: start_pos,
     137            0 :                     wal_end_lsn: start_pos + send_size as u64,
     138            0 :                     commit_lsn: end_pos
     139            0 :                 };
     140            0 : 
     141            0 :                 start_pos += send_size as u64;
     142            0 :             }
     143            0 :         })
     144            0 :     }
     145              : }
        

Generated by: LCOV version 2.1-beta