LCOV - code coverage report
Current view: top level - safekeeper/src - wal_reader_stream.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 93.6 % 157 147
Test Date: 2025-07-16 12:29:03 Functions: 100.0 % 11 11

            Line data    Source code
       1              : use std::pin::Pin;
       2              : use std::task::{Context, Poll};
       3              : 
       4              : use crate::send_wal::EndWatch;
       5              : use crate::timeline::WalResidentTimeline;
       6              : use crate::wal_storage::WalReader;
       7              : use bytes::Bytes;
       8              : use futures::stream::BoxStream;
       9              : use futures::{Stream, StreamExt};
      10              : use safekeeper_api::Term;
      11              : use utils::id::TenantTimelineId;
      12              : use utils::lsn::Lsn;
      13              : 
      14              : #[derive(PartialEq, Eq, Debug)]
      15              : pub(crate) struct WalBytes {
      16              :     /// Raw PG WAL
      17              :     pub(crate) wal: Bytes,
      18              :     /// Start LSN of [`Self::wal`]
      19              :     #[allow(dead_code)]
      20              :     pub(crate) wal_start_lsn: Lsn,
      21              :     /// End LSN of [`Self::wal`]
      22              :     pub(crate) wal_end_lsn: Lsn,
      23              :     /// End LSN of WAL available on the safekeeper.
      24              :     ///
      25              :     /// For pageservers this will be commit LSN,
      26              :     /// while for the compute it will be the flush LSN.
      27              :     pub(crate) available_wal_end_lsn: Lsn,
      28              : }
      29              : 
      30              : struct PositionedWalReader {
      31              :     start: Lsn,
      32              :     end: Lsn,
      33              :     reader: Option<WalReader>,
      34              : }
      35              : 
      36              : /// A streaming WAL reader wrapper which can be reset while running
      37              : pub(crate) struct StreamingWalReader {
      38              :     stream: BoxStream<'static, WalOrReset>,
      39              :     start_changed_tx: tokio::sync::watch::Sender<Lsn>,
      40              :     // HADRON: Added TenantTimelineId for instrumentation purposes.
      41              :     pub(crate) ttid: TenantTimelineId,
      42              : }
      43              : 
      44              : pub(crate) enum WalOrReset {
      45              :     Wal(anyhow::Result<WalBytes>),
      46              :     Reset(Lsn),
      47              : }
      48              : 
      49              : impl WalOrReset {
      50           76 :     pub(crate) fn get_wal(self) -> Option<anyhow::Result<WalBytes>> {
      51           76 :         match self {
      52           76 :             WalOrReset::Wal(wal) => Some(wal),
      53            0 :             WalOrReset::Reset(_) => None,
      54              :         }
      55           76 :     }
      56              : }
      57              : 
      58              : impl StreamingWalReader {
      59            5 :     pub(crate) fn new(
      60            5 :         tli: WalResidentTimeline,
      61            5 :         term: Option<Term>,
      62            5 :         start: Lsn,
      63            5 :         end: Lsn,
      64            5 :         end_watch: EndWatch,
      65            5 :         buffer_size: usize,
      66            5 :     ) -> Self {
      67            5 :         let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
      68            5 :         let ttid = tli.ttid;
      69              : 
      70            5 :         let state = WalReaderStreamState {
      71            5 :             tli,
      72            5 :             wal_reader: PositionedWalReader {
      73            5 :                 start,
      74            5 :                 end,
      75            5 :                 reader: None,
      76            5 :             },
      77            5 :             term,
      78            5 :             end_watch,
      79            5 :             buffer: vec![0; buffer_size],
      80            5 :             buffer_size,
      81            5 :         };
      82              : 
      83              :         // When a change notification is received while polling the internal
      84              :         // reader, stop polling the read future and service the change.
      85            5 :         let stream = futures::stream::unfold(
      86            5 :             (state, start_changed_rx),
      87           84 :             |(mut state, mut rx)| async move {
      88           84 :                 let wal_or_reset = tokio::select! {
      89           84 :                     read_res = state.read() => { WalOrReset::Wal(read_res) },
      90           84 :                     changed_res = rx.changed() => {
      91            4 :                         if changed_res.is_err() {
      92            0 :                             return None;
      93            4 :                         }
      94              : 
      95            4 :                         let new_start_pos = rx.borrow_and_update();
      96            4 :                         WalOrReset::Reset(*new_start_pos)
      97              :                     }
      98              :                 };
      99              : 
     100           80 :                 if let WalOrReset::Reset(lsn) = wal_or_reset {
     101            4 :                     state.wal_reader.start = lsn;
     102            4 :                     state.wal_reader.reader = None;
     103           76 :                 }
     104              : 
     105           80 :                 Some((wal_or_reset, (state, rx)))
     106          164 :             },
     107              :         )
     108            5 :         .boxed();
     109              : 
     110            5 :         Self {
     111            5 :             stream,
     112            5 :             start_changed_tx,
     113            5 :             ttid,
     114            5 :         }
     115            5 :     }
     116              : 
     117              :     /// Reset the stream to a given position.
     118            4 :     pub(crate) async fn reset(&mut self, start: Lsn) {
     119            4 :         self.start_changed_tx.send(start).unwrap();
     120            4 :         while let Some(wal_or_reset) = self.stream.next().await {
     121            4 :             match wal_or_reset {
     122            4 :                 WalOrReset::Reset(at) => {
     123              :                     // Stream confirmed the reset.
     124              :                     // There may only be one ongoing reset at any given time,
     125              :                     // hence the assertion.
     126            4 :                     assert_eq!(at, start);
     127            4 :                     break;
     128              :                 }
     129            0 :                 WalOrReset::Wal(_) => {
     130            0 :                     // Ignore wal generated before reset was handled
     131            0 :                 }
     132              :             }
     133              :         }
     134            4 :     }
     135              : }
     136              : 
     137              : impl Stream for StreamingWalReader {
     138              :     type Item = WalOrReset;
     139              : 
     140         9430 :     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     141         9430 :         Pin::new(&mut self.stream).poll_next(cx)
     142         9430 :     }
     143              : }
     144              : 
     145              : struct WalReaderStreamState {
     146              :     tli: WalResidentTimeline,
     147              :     wal_reader: PositionedWalReader,
     148              :     term: Option<Term>,
     149              :     end_watch: EndWatch,
     150              :     buffer: Vec<u8>,
     151              :     buffer_size: usize,
     152              : }
     153              : 
     154              : impl WalReaderStreamState {
     155           84 :     async fn read(&mut self) -> anyhow::Result<WalBytes> {
     156              :         // Lazily (re)create the reader; a stream reset clears it
     157           84 :         if self.wal_reader.reader.is_none() {
     158            9 :             self.wal_reader.reader = Some(self.tli.get_walreader(self.wal_reader.start).await?);
     159           75 :         }
     160              : 
     161           84 :         let have_something_to_send = self.wal_reader.end > self.wal_reader.start;
     162           84 :         if !have_something_to_send {
     163            5 :             tracing::debug!(
     164            0 :                 "Waiting for wal: start={}, end={}",
     165              :                 self.wal_reader.start,
     166              :                 self.wal_reader.end
     167              :             );
     168            5 :             self.wal_reader.end = self
     169            5 :                 .end_watch
     170            5 :                 .wait_for_lsn(self.wal_reader.start, self.term)
     171            5 :                 .await?;
     172            0 :             tracing::debug!(
     173            0 :                 "Done waiting for wal: start={}, end={}",
     174              :                 self.wal_reader.start,
     175              :                 self.wal_reader.end
     176              :             );
     177           79 :         }
     178              : 
     179           79 :         assert!(
     180           79 :             self.wal_reader.end > self.wal_reader.start,
     181            0 :             "nothing to send after waiting for WAL"
     182              :         );
     183              : 
     184              :         // Calculate chunk size: at most buffer_size bytes, capped at the available end; otherwise trimmed by block_offset()
     185           79 :         let mut chunk_end_pos = self.wal_reader.start + self.buffer_size as u64;
     186           79 :         if chunk_end_pos >= self.wal_reader.end {
     187            6 :             chunk_end_pos = self.wal_reader.end;
     188           73 :         } else {
     189           73 :             chunk_end_pos = chunk_end_pos
     190           73 :                 .checked_sub(chunk_end_pos.block_offset())
     191           73 :                 .unwrap();
     192           73 :         }
     193              : 
     194           79 :         let send_size = (chunk_end_pos.0 - self.wal_reader.start.0) as usize;
     195           79 :         let buffer = &mut self.buffer[..send_size];
     196              : 
     197              :         // Read WAL; when a term is set, its guard is held for the duration of the read
     198           76 :         let send_size = {
     199           79 :             let _term_guard = if let Some(t) = self.term {
     200            0 :                 Some(self.tli.acquire_term(t).await?)
     201              :             } else {
     202           79 :                 None
     203              :             };
     204           79 :             self.wal_reader
     205           79 :                 .reader
     206           79 :                 .as_mut()
     207           79 :                 .unwrap()
     208           79 :                 .read(buffer)
     209           79 :                 .await?
     210              :         };
     211              : 
     212           76 :         let wal = Bytes::copy_from_slice(&buffer[..send_size]);
     213           76 :         let result = WalBytes {
     214           76 :             wal,
     215           76 :             wal_start_lsn: self.wal_reader.start,
     216           76 :             wal_end_lsn: self.wal_reader.start + send_size as u64,
     217           76 :             available_wal_end_lsn: self.wal_reader.end,
     218           76 :         };
     219              : 
     220           76 :         self.wal_reader.start += send_size as u64;
     221              : 
     222           76 :         Ok(result)
     223           76 :     }
     224              : }
     225              : 
     226              : #[cfg(test)]
     227              : mod tests {
     228              :     use std::str::FromStr;
     229              : 
     230              :     use futures::StreamExt;
     231              :     use postgres_ffi::MAX_SEND_SIZE;
     232              :     use utils::id::{NodeId, TenantTimelineId};
     233              :     use utils::lsn::Lsn;
     234              : 
     235              :     use crate::test_utils::Env;
     236              :     use crate::wal_reader_stream::StreamingWalReader;
     237              : 
     238              :     #[tokio::test]
     239            1 :     async fn test_streaming_wal_reader_reset() {
     240            1 :         let _ = env_logger::builder().is_test(true).try_init();
     241              : 
     242              :         const SIZE: usize = 8 * 1024;
     243              :         const MSG_COUNT: usize = 200;
     244              : 
     245            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     246            1 :         let env = Env::new(true).unwrap();
     247            1 :         let tli = env
     248            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     249            1 :             .await
     250            1 :             .unwrap();
     251              : 
     252            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     253            1 :         let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
     254            1 :             .await
     255            1 :             .unwrap();
     256            1 :         let end_pos = end_watch.get();
     257              : 
     258            1 :         tracing::info!("Doing first round of reads ...");
     259              : 
     260            1 :         let mut streaming_wal_reader = StreamingWalReader::new(
     261            1 :             resident_tli,
     262            1 :             None,
     263            1 :             start_lsn,
     264            1 :             end_pos,
     265            1 :             end_watch,
     266              :             MAX_SEND_SIZE,
     267              :         );
     268              : 
     269            1 :         let mut before_reset = Vec::new();
     270           13 :         while let Some(wor) = streaming_wal_reader.next().await {
     271           13 :             let wal = wor.get_wal().unwrap().unwrap();
     272           13 :             let stop = wal.available_wal_end_lsn == wal.wal_end_lsn;
     273           13 :             before_reset.push(wal);
     274              : 
     275           13 :             if stop {
     276            1 :                 break;
     277           12 :             }
     278              :         }
     279              : 
     280            1 :         tracing::info!("Resetting the WAL stream ...");
     281              : 
     282            1 :         streaming_wal_reader.reset(start_lsn).await;
     283              : 
     284            1 :         tracing::info!("Doing second round of reads ...");
     285              : 
     286            1 :         let mut after_reset = Vec::new();
     287           13 :         while let Some(wor) = streaming_wal_reader.next().await {
     288           13 :             let wal = wor.get_wal().unwrap().unwrap();
     289           13 :             let stop = wal.available_wal_end_lsn == wal.wal_end_lsn;
     290           13 :             after_reset.push(wal);
     291            1 : 
     292           13 :             if stop {
     293            1 :                 break;
     294           12 :             }
     295            1 :         }
     296            1 : 
     297            1 :         assert_eq!(before_reset, after_reset);
     298            1 :     }
     299              : }
        

Generated by: LCOV version 2.1-beta