LCOV - code coverage report
Current view: top level - safekeeper/src - wal_reader_stream.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 94.6 % 184 174
Test Date: 2025-03-12 00:01:28 Functions: 100.0 % 11 11

            Line data    Source code
       1              : use std::pin::Pin;
       2              : use std::task::{Context, Poll};
       3              : 
       4              : use bytes::Bytes;
       5              : use futures::stream::BoxStream;
       6              : use futures::{Stream, StreamExt};
       7              : use safekeeper_api::Term;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use crate::send_wal::EndWatch;
      11              : use crate::timeline::WalResidentTimeline;
      12              : use crate::wal_storage::WalReader;
      13              : 
      14              : #[derive(PartialEq, Eq, Debug)]
      15              : pub(crate) struct WalBytes {
      16              :     /// Raw PG WAL
      17              :     pub(crate) wal: Bytes,
      18              :     /// Start LSN of [`Self::wal`]
      19              :     #[allow(dead_code)]
      20              :     pub(crate) wal_start_lsn: Lsn,
      21              :     /// End LSN of [`Self::wal`]
      22              :     pub(crate) wal_end_lsn: Lsn,
      23              :     /// End LSN of WAL available on the safekeeper.
      24              :     ///
      25              :     /// For pagservers this will be commit LSN,
      26              :     /// while for the compute it will be the flush LSN.
      27              :     pub(crate) available_wal_end_lsn: Lsn,
      28              : }
      29              : 
      30              : struct PositionedWalReader {
      31              :     start: Lsn,
      32              :     end: Lsn,
      33              :     reader: Option<WalReader>,
      34              : }
      35              : 
      36              : /// A streaming WAL reader wrapper which can be reset while running
      37              : pub(crate) struct StreamingWalReader {
      38              :     stream: BoxStream<'static, WalOrReset>,
      39              :     start_changed_tx: tokio::sync::watch::Sender<Lsn>,
      40              : }
      41              : 
      42              : pub(crate) enum WalOrReset {
      43              :     Wal(anyhow::Result<WalBytes>),
      44              :     Reset(Lsn),
      45              : }
      46              : 
      47              : impl WalOrReset {
      48           76 :     pub(crate) fn get_wal(self) -> Option<anyhow::Result<WalBytes>> {
      49           76 :         match self {
      50           76 :             WalOrReset::Wal(wal) => Some(wal),
      51            0 :             WalOrReset::Reset(_) => None,
      52              :         }
      53           76 :     }
      54              : }
      55              : 
      56              : impl StreamingWalReader {
      57            5 :     pub(crate) fn new(
      58            5 :         tli: WalResidentTimeline,
      59            5 :         term: Option<Term>,
      60            5 :         start: Lsn,
      61            5 :         end: Lsn,
      62            5 :         end_watch: EndWatch,
      63            5 :         buffer_size: usize,
      64            5 :     ) -> Self {
      65            5 :         let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
      66            5 : 
      67            5 :         let state = WalReaderStreamState {
      68            5 :             tli,
      69            5 :             wal_reader: PositionedWalReader {
      70            5 :                 start,
      71            5 :                 end,
      72            5 :                 reader: None,
      73            5 :             },
      74            5 :             term,
      75            5 :             end_watch,
      76            5 :             buffer: vec![0; buffer_size],
      77            5 :             buffer_size,
      78            5 :         };
      79            5 : 
      80            5 :         // When a change notification is received while polling the internal
      81            5 :         // reader, stop polling the read future and service the change.
      82            5 :         let stream = futures::stream::unfold(
      83            5 :             (state, start_changed_rx),
      84           84 :             |(mut state, mut rx)| async move {
      85           84 :                 let wal_or_reset = tokio::select! {
      86           84 :                     read_res = state.read() => { WalOrReset::Wal(read_res) },
      87           84 :                     changed_res = rx.changed() => {
      88            4 :                         if changed_res.is_err() {
      89            0 :                             return None;
      90            4 :                         }
      91            4 : 
      92            4 :                         let new_start_pos = rx.borrow_and_update();
      93            4 :                         WalOrReset::Reset(*new_start_pos)
      94              :                     }
      95              :                 };
      96              : 
      97           80 :                 if let WalOrReset::Reset(lsn) = wal_or_reset {
      98            4 :                     state.wal_reader.start = lsn;
      99            4 :                     state.wal_reader.reader = None;
     100           76 :                 }
     101              : 
     102           80 :                 Some((wal_or_reset, (state, rx)))
     103          164 :             },
     104            5 :         )
     105            5 :         .boxed();
     106            5 : 
     107            5 :         Self {
     108            5 :             stream,
     109            5 :             start_changed_tx,
     110            5 :         }
     111            5 :     }
     112              : 
     113              :     /// Reset the stream to a given position.
     114            4 :     pub(crate) async fn reset(&mut self, start: Lsn) {
     115            4 :         self.start_changed_tx.send(start).unwrap();
     116            4 :         while let Some(wal_or_reset) = self.stream.next().await {
     117            4 :             match wal_or_reset {
     118            4 :                 WalOrReset::Reset(at) => {
     119            4 :                     // Stream confirmed the reset.
     120            4 :                     // There may only one ongoing reset at any given time,
     121            4 :                     // hence the assertion.
     122            4 :                     assert_eq!(at, start);
     123            4 :                     break;
     124              :                 }
     125            0 :                 WalOrReset::Wal(_) => {
     126            0 :                     // Ignore wal generated before reset was handled
     127            0 :                 }
     128              :             }
     129              :         }
     130            4 :     }
     131              : }
     132              : 
     133              : impl Stream for StreamingWalReader {
     134              :     type Item = WalOrReset;
     135              : 
     136          365 :     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     137          365 :         Pin::new(&mut self.stream).poll_next(cx)
     138          365 :     }
     139              : }
     140              : 
     141              : struct WalReaderStreamState {
     142              :     tli: WalResidentTimeline,
     143              :     wal_reader: PositionedWalReader,
     144              :     term: Option<Term>,
     145              :     end_watch: EndWatch,
     146              :     buffer: Vec<u8>,
     147              :     buffer_size: usize,
     148              : }
     149              : 
     150              : impl WalReaderStreamState {
     151           84 :     async fn read(&mut self) -> anyhow::Result<WalBytes> {
     152           83 :         // Create reader if needed
     153           83 :         if self.wal_reader.reader.is_none() {
     154            9 :             self.wal_reader.reader = Some(self.tli.get_walreader(self.wal_reader.start).await?);
     155           74 :         }
     156              : 
     157           83 :         let have_something_to_send = self.wal_reader.end > self.wal_reader.start;
     158           83 :         if !have_something_to_send {
     159            4 :             tracing::debug!(
     160            0 :                 "Waiting for wal: start={}, end={}",
     161              :                 self.wal_reader.end,
     162              :                 self.wal_reader.start
     163              :             );
     164            4 :             self.wal_reader.end = self
     165            4 :                 .end_watch
     166            4 :                 .wait_for_lsn(self.wal_reader.start, self.term)
     167            4 :                 .await?;
     168            0 :             tracing::debug!(
     169            0 :                 "Done waiting for wal: start={}, end={}",
     170              :                 self.wal_reader.end,
     171              :                 self.wal_reader.start
     172              :             );
     173           79 :         }
     174              : 
     175           79 :         assert!(
     176           79 :             self.wal_reader.end > self.wal_reader.start,
     177            0 :             "nothing to send after waiting for WAL"
     178              :         );
     179              : 
     180              :         // Calculate chunk size
     181           79 :         let mut chunk_end_pos = self.wal_reader.start + self.buffer_size as u64;
     182           79 :         if chunk_end_pos >= self.wal_reader.end {
     183            6 :             chunk_end_pos = self.wal_reader.end;
     184           73 :         } else {
     185           73 :             chunk_end_pos = chunk_end_pos
     186           73 :                 .checked_sub(chunk_end_pos.block_offset())
     187           73 :                 .unwrap();
     188           73 :         }
     189              : 
     190           79 :         let send_size = (chunk_end_pos.0 - self.wal_reader.start.0) as usize;
     191           79 :         let buffer = &mut self.buffer[..send_size];
     192              : 
     193              :         // Read WAL
     194           76 :         let send_size = {
     195           79 :             let _term_guard = if let Some(t) = self.term {
     196            0 :                 Some(self.tli.acquire_term(t).await?)
     197              :             } else {
     198           79 :                 None
     199              :             };
     200           79 :             self.wal_reader
     201           79 :                 .reader
     202           79 :                 .as_mut()
     203           79 :                 .unwrap()
     204           79 :                 .read(buffer)
     205           79 :                 .await?
     206              :         };
     207              : 
     208           76 :         let wal = Bytes::copy_from_slice(&buffer[..send_size]);
     209           76 :         let result = WalBytes {
     210           76 :             wal,
     211           76 :             wal_start_lsn: self.wal_reader.start,
     212           76 :             wal_end_lsn: self.wal_reader.start + send_size as u64,
     213           76 :             available_wal_end_lsn: self.wal_reader.end,
     214           76 :         };
     215           76 : 
     216           76 :         self.wal_reader.start += send_size as u64;
     217           76 : 
     218           76 :         Ok(result)
     219           76 :     }
     220              : }
     221              : 
     222              : #[cfg(test)]
     223              : mod tests {
     224              :     use std::str::FromStr;
     225              : 
     226              :     use futures::StreamExt;
     227              :     use postgres_ffi::MAX_SEND_SIZE;
     228              :     use utils::id::{NodeId, TenantTimelineId};
     229              :     use utils::lsn::Lsn;
     230              : 
     231              :     use crate::test_utils::Env;
     232              :     use crate::wal_reader_stream::StreamingWalReader;
     233              : 
     234              :     #[tokio::test]
     235            1 :     async fn test_streaming_wal_reader_reset() {
     236            1 :         let _ = env_logger::builder().is_test(true).try_init();
     237            1 : 
     238            1 :         const SIZE: usize = 8 * 1024;
     239            1 :         const MSG_COUNT: usize = 200;
     240            1 : 
     241            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     242            1 :         let env = Env::new(true).unwrap();
     243            1 :         let tli = env
     244            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     245            1 :             .await
     246            1 :             .unwrap();
     247            1 : 
     248            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     249            1 :         let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
     250            1 :             .await
     251            1 :             .unwrap();
     252            1 :         let end_pos = end_watch.get();
     253            1 : 
     254            1 :         tracing::info!("Doing first round of reads ...");
     255            1 : 
     256            1 :         let mut streaming_wal_reader = StreamingWalReader::new(
     257            1 :             resident_tli,
     258            1 :             None,
     259            1 :             start_lsn,
     260            1 :             end_pos,
     261            1 :             end_watch,
     262            1 :             MAX_SEND_SIZE,
     263            1 :         );
     264            1 : 
     265            1 :         let mut before_reset = Vec::new();
     266           13 :         while let Some(wor) = streaming_wal_reader.next().await {
     267           13 :             let wal = wor.get_wal().unwrap().unwrap();
     268           13 :             let stop = wal.available_wal_end_lsn == wal.wal_end_lsn;
     269           13 :             before_reset.push(wal);
     270           13 : 
     271           13 :             if stop {
     272            1 :                 break;
     273           12 :             }
     274            1 :         }
     275            1 : 
     276            1 :         tracing::info!("Resetting the WAL stream ...");
     277            1 : 
     278            1 :         streaming_wal_reader.reset(start_lsn).await;
     279            1 : 
     280            1 :         tracing::info!("Doing second round of reads ...");
     281            1 : 
     282            1 :         let mut after_reset = Vec::new();
     283           13 :         while let Some(wor) = streaming_wal_reader.next().await {
     284           13 :             let wal = wor.get_wal().unwrap().unwrap();
     285           13 :             let stop = wal.available_wal_end_lsn == wal.wal_end_lsn;
     286           13 :             after_reset.push(wal);
     287           13 : 
     288           13 :             if stop {
     289            1 :                 break;
     290           12 :             }
     291            1 :         }
     292            1 : 
     293            1 :         assert_eq!(before_reset, after_reset);
     294            1 :     }
     295              : }
        

Generated by: LCOV version 2.1-beta