LCOV - code coverage report
Current view: top level - safekeeper/src - send_interpreted_wal.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 72.6 % 529 384
Test Date: 2025-01-30 15:18:43 Functions: 58.8 % 34 20

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::fmt::Display;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use anyhow::{anyhow, Context};
       7              : use futures::future::Either;
       8              : use futures::StreamExt;
       9              : use pageserver_api::shard::ShardIdentity;
      10              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
      11              : use postgres_ffi::waldecoder::WalDecodeError;
      12              : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
      13              : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
      14              : use tokio::io::{AsyncRead, AsyncWrite};
      15              : use tokio::sync::mpsc::error::SendError;
      16              : use tokio::task::JoinHandle;
      17              : use tokio::time::MissedTickBehavior;
      18              : use tracing::{info_span, Instrument};
      19              : use utils::lsn::Lsn;
      20              : use utils::postgres_client::Compression;
      21              : use utils::postgres_client::InterpretedFormat;
      22              : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
      23              : use wal_decoder::wire_format::ToWireFormat;
      24              : 
      25              : use crate::metrics::WAL_READERS;
      26              : use crate::send_wal::{EndWatchView, WalSenderGuard};
      27              : use crate::timeline::WalResidentTimeline;
      28              : use crate::wal_reader_stream::{StreamingWalReader, WalBytes};
      29              : 
      30              : /// Identifier used to differentiate between senders of the same
      31              : /// shard.
      32              : ///
      33              : /// In the steady state there's only one, but two pageservers may
      34              : /// temporarily have the same shard attached and attempt to ingest
      35              : /// WAL for it. See also [`ShardSenderId`].
      36              : #[derive(Hash, Eq, PartialEq, Copy, Clone)]
      37              : struct SenderId(u8);
      38              : 
      39              : impl SenderId {
      40            3 :     fn first() -> Self {
      41            3 :         SenderId(0)
      42            3 :     }
      43              : 
      44            3 :     fn next(&self) -> Self {
      45            3 :         SenderId(self.0.checked_add(1).expect("few senders"))
      46            3 :     }
      47              : }
      48              : 
      49              : #[derive(Hash, Eq, PartialEq)]
      50              : struct ShardSenderId {
      51              :     shard: ShardIdentity,
      52              :     sender_id: SenderId,
      53              : }
      54              : 
      55              : impl Display for ShardSenderId {
      56            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      57            0 :         write!(f, "{}{}", self.sender_id.0, self.shard.shard_slug())
      58            0 :     }
      59              : }
      60              : 
      61              : impl ShardSenderId {
      62           78 :     fn new(shard: ShardIdentity, sender_id: SenderId) -> Self {
      63           78 :         ShardSenderId { shard, sender_id }
      64           78 :     }
      65              : 
      66            0 :     fn shard(&self) -> ShardIdentity {
      67            0 :         self.shard
      68            0 :     }
      69              : }
      70              : 
      71              : /// Shard-aware fan-out interpreted record reader.
      72              : /// Reads WAL from disk, decodes it, interprets it, and sends
      73              : /// it to any [`InterpretedWalSender`] connected to it.
      74              : /// Each [`InterpretedWalSender`] corresponds to one shard
      75              : /// and gets interpreted records concerning that shard only.
      76              : pub(crate) struct InterpretedWalReader {
      77              :     wal_stream: StreamingWalReader,
      78              :     shard_senders: HashMap<ShardIdentity, smallvec::SmallVec<[ShardSenderState; 1]>>,
      79              :     shard_notification_rx: Option<tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>>,
      80              :     state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
      81              :     pg_version: u32,
      82              : }
      83              : 
      84              : /// A handle for [`InterpretedWalReader`] which allows for interacting with it
      85              : /// when it runs as a separate tokio task.
      86              : #[derive(Debug)]
      87              : pub(crate) struct InterpretedWalReaderHandle {
      88              :     join_handle: JoinHandle<Result<(), InterpretedWalReaderError>>,
      89              :     state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
      90              :     shard_notification_tx: tokio::sync::mpsc::UnboundedSender<AttachShardNotification>,
      91              : }
      92              : 
      93              : struct ShardSenderState {
      94              :     sender_id: SenderId,
      95              :     tx: tokio::sync::mpsc::Sender<Batch>,
      96              :     next_record_lsn: Lsn,
      97              : }
      98              : 
      99              : /// State of [`InterpretedWalReader`] visible outside of the task running it.
     100              : #[derive(Debug)]
     101              : pub(crate) enum InterpretedWalReaderState {
     102              :     Running { current_position: Lsn },
     103              :     Done,
     104              : }
     105              : 
     106              : pub(crate) struct Batch {
     107              :     wal_end_lsn: Lsn,
     108              :     available_wal_end_lsn: Lsn,
     109              :     records: InterpretedWalRecords,
     110              : }
     111              : 
     112              : #[derive(thiserror::Error, Debug)]
     113              : pub enum InterpretedWalReaderError {
     114              :     /// The WAL stream decoder failed to decode a record.
     115              :     #[error("decode error: {0}")]
     116              :     Decode(#[from] WalDecodeError),
     117              :     #[error("read or interpret error: {0}")]
     118              :     ReadOrInterpret(#[from] anyhow::Error),
     119              :     #[error("wal stream closed")]
     120              :     WalStreamClosed,
     121              : }
     122              : 
     123              : impl InterpretedWalReaderState {
     124            8 :     fn current_position(&self) -> Option<Lsn> {
     125            8 :         match self {
     126              :             InterpretedWalReaderState::Running {
     127            6 :                 current_position, ..
     128            6 :             } => Some(*current_position),
     129            2 :             InterpretedWalReaderState::Done => None,
     130              :         }
     131            8 :     }
     132              : }
     133              : 
     134              : pub(crate) struct AttachShardNotification {
     135              :     shard_id: ShardIdentity,
     136              :     sender: tokio::sync::mpsc::Sender<Batch>,
     137              :     start_pos: Lsn,
     138              : }
     139              : 
     140              : impl InterpretedWalReader {
     141              :     /// Spawn the reader in a separate tokio task and return a handle
     142            2 :     pub(crate) fn spawn(
     143            2 :         wal_stream: StreamingWalReader,
     144            2 :         start_pos: Lsn,
     145            2 :         tx: tokio::sync::mpsc::Sender<Batch>,
     146            2 :         shard: ShardIdentity,
     147            2 :         pg_version: u32,
     148            2 :         appname: &Option<String>,
     149            2 :     ) -> InterpretedWalReaderHandle {
     150            2 :         let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
     151            2 :             current_position: start_pos,
     152            2 :         }));
     153            2 : 
     154            2 :         let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
     155              : 
     156            2 :         let reader = InterpretedWalReader {
     157            2 :             wal_stream,
     158            2 :             shard_senders: HashMap::from([(
     159            2 :                 shard,
     160            2 :                 smallvec::smallvec![ShardSenderState {
     161            0 :                     sender_id: SenderId::first(),
     162            0 :                     tx,
     163            0 :                     next_record_lsn: start_pos,
     164            0 :                 }],
     165              :             )]),
     166            2 :             shard_notification_rx: Some(shard_notification_rx),
     167            2 :             state: state.clone(),
     168            2 :             pg_version,
     169            2 :         };
     170            2 : 
     171            2 :         let metric = WAL_READERS
     172            2 :             .get_metric_with_label_values(&["task", appname.as_deref().unwrap_or("safekeeper")])
     173            2 :             .unwrap();
     174              : 
     175            2 :         let join_handle = tokio::task::spawn(
     176            2 :             async move {
     177            2 :                 metric.inc();
     178            2 :                 scopeguard::defer! {
     179            2 :                     metric.dec();
     180            2 :                 }
     181              : 
     182            2 :                 let res = reader.run_impl(start_pos).await;
     183            0 :                 if let Err(ref err) = res {
     184            0 :                     tracing::error!("Task finished with error: {err}");
     185            0 :                 }
     186            0 :                 res
     187            0 :             }
     188            2 :             .instrument(info_span!("interpreted wal reader")),
     189              :         );
     190              : 
     191            2 :         InterpretedWalReaderHandle {
     192            2 :             join_handle,
     193            2 :             state,
     194            2 :             shard_notification_tx,
     195            2 :         }
     196            2 :     }
     197              : 
     198              :     /// Construct the reader without spawning anything.
     199              :     /// Callers should drive the future returned by [`Self::run`].
     200            0 :     pub(crate) fn new(
     201            0 :         wal_stream: StreamingWalReader,
     202            0 :         start_pos: Lsn,
     203            0 :         tx: tokio::sync::mpsc::Sender<Batch>,
     204            0 :         shard: ShardIdentity,
     205            0 :         pg_version: u32,
     206            0 :     ) -> InterpretedWalReader {
     207            0 :         let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
     208            0 :             current_position: start_pos,
     209            0 :         }));
     210            0 : 
     211            0 :         InterpretedWalReader {
     212            0 :             wal_stream,
     213            0 :             shard_senders: HashMap::from([(
     214            0 :                 shard,
     215            0 :                 smallvec::smallvec![ShardSenderState {
     216            0 :                     sender_id: SenderId::first(),
     217            0 :                     tx,
     218            0 :                     next_record_lsn: start_pos,
     219            0 :                 }],
     220              :             )]),
     221            0 :             shard_notification_rx: None,
     222            0 :             state: state.clone(),
     223            0 :             pg_version,
     224            0 :         }
     225            0 :     }
     226              : 
     227              :     /// Entry point for future (polling) based wal reader.
     228            0 :     pub(crate) async fn run(
     229            0 :         self,
     230            0 :         start_pos: Lsn,
     231            0 :         appname: &Option<String>,
     232            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     233            0 :         let metric = WAL_READERS
     234            0 :             .get_metric_with_label_values(&["future", appname.as_deref().unwrap_or("safekeeper")])
     235            0 :             .unwrap();
     236            0 : 
     237            0 :         metric.inc();
     238            0 :         scopeguard::defer! {
     239            0 :             metric.dec();
     240            0 :         }
     241              : 
     242            0 :         let res = self.run_impl(start_pos).await;
     243            0 :         if let Err(err) = res {
     244            0 :             tracing::error!("Interpreted wal reader encountered error: {err}");
     245              :         } else {
     246            0 :             tracing::info!("Interpreted wal reader exiting");
     247              :         }
     248              : 
     249            0 :         Err(CopyStreamHandlerEnd::Other(anyhow!(
     250            0 :             "interpreted wal reader finished"
     251            0 :         )))
     252            0 :     }
     253              : 
     254              :     /// Send interpreted WAL to one or more [`InterpretedWalSender`]s.
     255              :     /// Stops when an error is encountered or when the [`InterpretedWalReaderHandle`]
     256              :     /// goes out of scope.
     257            2 :     async fn run_impl(mut self, start_pos: Lsn) -> Result<(), InterpretedWalReaderError> {
     258            2 :         let defer_state = self.state.clone();
     259            2 :         scopeguard::defer! {
     260            2 :             *defer_state.write().unwrap() = InterpretedWalReaderState::Done;
     261            2 :         }
     262            2 : 
     263            2 :         let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
     264              : 
     265              :         loop {
     266           45 :             tokio::select! {
     267              :                 // Main branch for reading WAL and forwarding it
     268           45 :                 wal_or_reset = self.wal_stream.next() => {
     269           39 :                     let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
     270              :                     let WalBytes {
     271           39 :                         wal,
     272           39 :                         wal_start_lsn: _,
     273           39 :                         wal_end_lsn,
     274           39 :                         available_wal_end_lsn,
     275           39 :                     } = match wal {
     276           39 :                         Some(some) => some.map_err(InterpretedWalReaderError::ReadOrInterpret)?,
     277              :                         None => {
     278              :                             // [`StreamingWalReader::next`] is an endless stream of WAL.
     279              :                             // It shouldn't ever finish unless it panicked or became internally
     280              :                             // inconsistent.
     281            0 :                             return Result::Err(InterpretedWalReaderError::WalStreamClosed);
     282              :                         }
     283              :                     };
     284              : 
     285           39 :                     wal_decoder.feed_bytes(&wal);
     286           39 : 
     287           39 :                     // Deserialize and interpret WAL records from this batch of WAL.
     288           39 :                     // Interpreted records for each shard are collected separately.
     289           39 :                     let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>();
     290           39 :                     let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new();
     291           39 :                     let mut max_next_record_lsn = None;
     292          636 :                     while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()?
     293              :                     {
     294          597 :                         assert!(next_record_lsn.is_aligned());
     295          597 :                         max_next_record_lsn = Some(next_record_lsn);
     296              : 
     297          597 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
     298          597 :                             recdata,
     299          597 :                             &shard_ids,
     300          597 :                             next_record_lsn,
     301          597 :                             self.pg_version,
     302          597 :                         )
     303          597 :                         .with_context(|| "Failed to interpret WAL")?;
     304              : 
     305         1393 :                         for (shard, record) in interpreted {
     306          796 :                             if record.is_empty() {
     307          796 :                                 continue;
     308            0 :                             }
     309            0 : 
     310            0 :                             let mut states_iter = self.shard_senders
     311            0 :                                 .get(&shard)
     312            0 :                                 .expect("keys collected above")
     313            0 :                                 .iter()
     314            0 :                                 .filter(|state| record.next_record_lsn > state.next_record_lsn)
     315            0 :                                 .peekable();
     316            0 :                             while let Some(state) = states_iter.next() {
     317            0 :                                 let shard_sender_id = ShardSenderId::new(shard, state.sender_id);
     318            0 : 
     319            0 :                                 // The most common case is one sender per shard. Peek and break to avoid the
     320            0 :                                 // clone in that situation.
     321            0 :                                 if states_iter.peek().is_none() {
     322            0 :                                     records_by_sender.entry(shard_sender_id).or_default().push(record);
     323            0 :                                     break;
     324            0 :                                 } else {
     325            0 :                                     records_by_sender.entry(shard_sender_id).or_default().push(record.clone());
     326            0 :                                 }
     327              :                             }
     328              :                         }
     329              :                     }
     330              : 
     331           39 :                     let max_next_record_lsn = match max_next_record_lsn {
     332           39 :                         Some(lsn) => lsn,
     333            0 :                         None => { continue; }
     334              :                     };
     335              : 
     336              :                     // Update the current position such that new receivers can decide
     337              :                     // whether to attach to us or spawn a new WAL reader.
     338           39 :                     match &mut *self.state.write().unwrap() {
     339           39 :                         InterpretedWalReaderState::Running { current_position, .. } => {
     340           39 :                             *current_position = max_next_record_lsn;
     341           39 :                         },
     342              :                         InterpretedWalReaderState::Done => {
     343            0 :                             unreachable!()
     344              :                         }
     345              :                     }
     346              : 
     347              :                     // Send interpreted records downstream. Anything that has already been seen
     348              :                     // by a shard is filtered out.
     349           39 :                     let mut shard_senders_to_remove = Vec::new();
     350           91 :                     for (shard, states) in &mut self.shard_senders {
     351          143 :                         for state in states {
     352           91 :                             if max_next_record_lsn <= state.next_record_lsn {
     353           13 :                                 continue;
     354           78 :                             }
     355           78 : 
     356           78 :                             let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
     357           78 :                             let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
     358           78 : 
     359           78 :                             let batch = InterpretedWalRecords {
     360           78 :                                 records,
     361           78 :                                 next_record_lsn: Some(max_next_record_lsn),
     362           78 :                             };
     363              : 
     364           78 :                             let res = state.tx.send(Batch {
     365           78 :                                 wal_end_lsn,
     366           78 :                                 available_wal_end_lsn,
     367           78 :                                 records: batch,
     368           78 :                             }).await;
     369              : 
     370           78 :                             if res.is_err() {
     371            0 :                                 shard_senders_to_remove.push(shard_sender_id);
     372           78 :                             } else {
     373           78 :                                 state.next_record_lsn = max_next_record_lsn;
     374           78 :                             }
     375              :                         }
     376              :                     }
     377              : 
     378              :                     // Clean up any shard senders that have dropped out.
     379              :                     // This is inefficient, but such events are rare (connection to PS termination)
     380              :                     // and the number of subscriptions on the same shard is very small (only one
     381              :                     // for the steady state).
     382           39 :                     for to_remove in shard_senders_to_remove {
     383            0 :                         let shard_senders = self.shard_senders.get_mut(&to_remove.shard()).expect("saw it above");
     384            0 :                         if let Some(idx) = shard_senders.iter().position(|s| s.sender_id == to_remove.sender_id) {
     385            0 :                             shard_senders.remove(idx);
     386            0 :                             tracing::info!("Removed shard sender {}", to_remove);
     387            0 :                         }
     388              : 
     389            0 :                         if shard_senders.is_empty() {
     390            0 :                             self.shard_senders.remove(&to_remove.shard());
     391            0 :                         }
     392              :                     }
     393              :                 },
     394              :                 // Listen for new shards that want to attach to this reader.
     395              :                 // If the reader is not running as a task, then this is not supported
     396              :                 // (see the pending branch below).
     397           45 :                 notification = match self.shard_notification_rx.as_mut() {
     398           45 :                         Some(rx) => Either::Left(rx.recv()),
     399            0 :                         None => Either::Right(std::future::pending())
     400              :                     } => {
     401            4 :                     if let Some(n) = notification {
     402            4 :                         let AttachShardNotification { shard_id, sender, start_pos } = n;
     403            4 : 
     404            4 :                         // Update internal and external state, then reset the WAL stream
     405            4 :                         // if required.
     406            4 :                         let senders = self.shard_senders.entry(shard_id).or_default();
     407            4 :                         let new_sender_id = match senders.last() {
     408            3 :                             Some(sender) => sender.sender_id.next(),
     409            1 :                             None => SenderId::first()
     410              :                         };
     411              : 
     412            4 :                         senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
     413            4 :                         let current_pos = self.state.read().unwrap().current_position().unwrap();
     414            4 :                         if start_pos < current_pos {
     415            1 :                             self.wal_stream.reset(start_pos).await;
     416            1 :                             wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
     417            3 :                         }
     418              : 
     419            4 :                         tracing::info!(
     420            0 :                             "Added shard sender {} with start_pos={} current_pos={}",
     421            0 :                             ShardSenderId::new(shard_id, new_sender_id), start_pos, current_pos
     422              :                         );
     423            0 :                     }
     424              :                 }
     425              :             }
     426              :         }
     427            0 :     }
     428              : }
     429              : 
     430              : impl InterpretedWalReaderHandle {
     431              :     /// Fan-out the reader by attaching a new shard to it
     432            4 :     pub(crate) fn fanout(
     433            4 :         &self,
     434            4 :         shard_id: ShardIdentity,
     435            4 :         sender: tokio::sync::mpsc::Sender<Batch>,
     436            4 :         start_pos: Lsn,
     437            4 :     ) -> Result<(), SendError<AttachShardNotification>> {
     438            4 :         self.shard_notification_tx.send(AttachShardNotification {
     439            4 :             shard_id,
     440            4 :             sender,
     441            4 :             start_pos,
     442            4 :         })
     443            4 :     }
     444              : 
     445              :     /// Get the current WAL position of the reader
     446            4 :     pub(crate) fn current_position(&self) -> Option<Lsn> {
     447            4 :         self.state.read().unwrap().current_position()
     448            4 :     }
     449              : 
     450            4 :     pub(crate) fn abort(&self) {
     451            4 :         self.join_handle.abort()
     452            4 :     }
     453              : }
     454              : 
     455              : impl Drop for InterpretedWalReaderHandle {
     456            2 :     fn drop(&mut self) {
     457            2 :         tracing::info!("Aborting interpreted wal reader");
     458            2 :         self.abort()
     459            2 :     }
     460              : }
     461              : 
     462              : pub(crate) struct InterpretedWalSender<'a, IO> {
     463              :     pub(crate) format: InterpretedFormat,
     464              :     pub(crate) compression: Option<Compression>,
     465              :     pub(crate) appname: Option<String>,
     466              : 
     467              :     pub(crate) tli: WalResidentTimeline,
     468              :     pub(crate) start_lsn: Lsn,
     469              : 
     470              :     pub(crate) pgb: &'a mut PostgresBackend<IO>,
     471              :     pub(crate) end_watch_view: EndWatchView,
     472              :     pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
     473              :     pub(crate) rx: tokio::sync::mpsc::Receiver<Batch>,
     474              : }
     475              : 
     476              : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
     477              :     /// Send interpreted WAL records over the network.
     478              :     /// Also manages keep-alives if nothing was sent for a while.
     479            0 :     pub(crate) async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
     480            0 :         let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
     481            0 :         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
     482            0 :         keepalive_ticker.reset();
     483            0 : 
     484            0 :         let mut wal_position = self.start_lsn;
     485              : 
     486              :         loop {
     487            0 :             tokio::select! {
     488            0 :                 batch = self.rx.recv() => {
     489            0 :                     let batch = match batch {
     490            0 :                         Some(b) => b,
     491              :                         None => {
     492            0 :                             return Result::Err(
     493            0 :                                 CopyStreamHandlerEnd::Other(anyhow!("Interpreted WAL reader exited early"))
     494            0 :                             );
     495              :                         }
     496              :                     };
     497              : 
     498            0 :                     wal_position = batch.wal_end_lsn;
     499              : 
     500            0 :                     let buf = batch
     501            0 :                         .records
     502            0 :                         .to_wire(self.format, self.compression)
     503            0 :                         .await
     504            0 :                         .with_context(|| "Failed to serialize interpreted WAL")
     505            0 :                         .map_err(CopyStreamHandlerEnd::from)?;
     506              : 
     507              :                     // Reset the keep alive ticker since we are sending something
     508              :                     // over the wire now.
     509            0 :                     keepalive_ticker.reset();
     510            0 : 
     511            0 :                     self.pgb
     512            0 :                         .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
     513            0 :                             streaming_lsn: batch.wal_end_lsn.0,
     514            0 :                             commit_lsn: batch.available_wal_end_lsn.0,
     515            0 :                             data: &buf,
     516            0 :                         })).await?;
     517              :                 }
     518              :                 // Send a periodic keep alive when the connection has been idle for a while.
     519              :                 // Since we've been idle, also check if we can stop streaming.
     520            0 :                 _ = keepalive_ticker.tick() => {
     521            0 :                     if let Some(remote_consistent_lsn) = self.wal_sender_guard
     522            0 :                         .walsenders()
     523            0 :                         .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
     524              :                     {
     525            0 :                         if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     526              :                             // Stop streaming if the receivers are caught up and
     527              :                             // there's no active compute. This causes the loop in
     528              :                             // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
     529              :                             // to exit and terminate the WAL stream.
     530            0 :                             break;
     531            0 :                         }
     532            0 :                     }
     533              : 
     534            0 :                     self.pgb
     535            0 :                         .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     536            0 :                             wal_end: self.end_watch_view.get().0,
     537            0 :                             timestamp: get_current_timestamp(),
     538            0 :                             request_reply: true,
     539            0 :                         }))
     540            0 :                         .await?;
     541              :                 },
     542              :             }
     543              :         }
     544              : 
     545            0 :         Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     546            0 :             "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     547            0 :             self.appname, wal_position,
     548            0 :         )))
     549            0 :     }
     550              : }
     551              : #[cfg(test)]
     552              : mod tests {
     553              :     use std::{collections::HashMap, str::FromStr, time::Duration};
     554              : 
     555              :     use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
     556              :     use postgres_ffi::MAX_SEND_SIZE;
     557              :     use tokio::sync::mpsc::error::TryRecvError;
     558              :     use utils::{
     559              :         id::{NodeId, TenantTimelineId},
     560              :         lsn::Lsn,
     561              :         shard::{ShardCount, ShardNumber},
     562              :     };
     563              : 
     564              :     use crate::{
     565              :         send_interpreted_wal::{Batch, InterpretedWalReader},
     566              :         test_utils::Env,
     567              :         wal_reader_stream::StreamingWalReader,
     568              :     };
     569              : 
     570              :     #[tokio::test]
     571            1 :     async fn test_interpreted_wal_reader_fanout() {
     572            1 :         let _ = env_logger::builder().is_test(true).try_init();
     573            1 : 
     574            1 :         const SIZE: usize = 8 * 1024;
     575            1 :         const MSG_COUNT: usize = 200;
     576            1 :         const PG_VERSION: u32 = 17;
     577            1 :         const SHARD_COUNT: u8 = 2;
     578            1 : 
     579            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     580            1 :         let env = Env::new(true).unwrap();
     581            1 :         let tli = env
     582            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     583            1 :             .await
     584            1 :             .unwrap();
     585            1 : 
     586            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     587            1 :         let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
     588            1 :             .await
     589            1 :             .unwrap();
     590            1 :         let end_pos = end_watch.get();
     591            1 : 
     592            1 :         tracing::info!("Doing first round of reads ...");
     593            1 : 
     594            1 :         let streaming_wal_reader = StreamingWalReader::new(
     595            1 :             resident_tli,
     596            1 :             None,
     597            1 :             start_lsn,
     598            1 :             end_pos,
     599            1 :             end_watch,
     600            1 :             MAX_SEND_SIZE,
     601            1 :         );
     602            1 : 
     603            1 :         let shard_0 = ShardIdentity::new(
     604            1 :             ShardNumber(0),
     605            1 :             ShardCount(SHARD_COUNT),
     606            1 :             ShardStripeSize::default(),
     607            1 :         )
     608            1 :         .unwrap();
     609            1 : 
     610            1 :         let shard_1 = ShardIdentity::new(
     611            1 :             ShardNumber(1),
     612            1 :             ShardCount(SHARD_COUNT),
     613            1 :             ShardStripeSize::default(),
     614            1 :         )
     615            1 :         .unwrap();
     616            1 : 
     617            1 :         let mut shards = HashMap::new();
     618            1 : 
     619            3 :         for shard_number in 0..SHARD_COUNT {
     620            2 :             let shard_id = ShardIdentity::new(
     621            2 :                 ShardNumber(shard_number),
     622            2 :                 ShardCount(SHARD_COUNT),
     623            2 :                 ShardStripeSize::default(),
     624            2 :             )
     625            2 :             .unwrap();
     626            2 :             let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     627            2 :             shards.insert(shard_id, (Some(tx), Some(rx)));
     628            2 :         }
     629            1 : 
     630            1 :         let shard_0_tx = shards.get_mut(&shard_0).unwrap().0.take().unwrap();
     631            1 :         let mut shard_0_rx = shards.get_mut(&shard_0).unwrap().1.take().unwrap();
     632            1 : 
     633            1 :         let handle = InterpretedWalReader::spawn(
     634            1 :             streaming_wal_reader,
     635            1 :             start_lsn,
     636            1 :             shard_0_tx,
     637            1 :             shard_0,
     638            1 :             PG_VERSION,
     639            1 :             &Some("pageserver".to_string()),
     640            1 :         );
     641            1 : 
     642            1 :         tracing::info!("Reading all WAL with only shard 0 attached ...");
     643            1 : 
     644            1 :         let mut shard_0_interpreted_records = Vec::new();
     645           13 :         while let Some(batch) = shard_0_rx.recv().await {
     646           13 :             shard_0_interpreted_records.push(batch.records);
     647           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     648            1 :                 break;
     649           12 :             }
     650            1 :         }
     651            1 : 
     652            1 :         let shard_1_tx = shards.get_mut(&shard_1).unwrap().0.take().unwrap();
     653            1 :         let mut shard_1_rx = shards.get_mut(&shard_1).unwrap().1.take().unwrap();
     654            1 : 
     655            1 :         tracing::info!("Attaching shard 1 to the reader at start of WAL");
     656            1 :         handle.fanout(shard_1, shard_1_tx, start_lsn).unwrap();
     657            1 : 
     658            1 :         tracing::info!("Reading all WAL with shard 0 and shard 1 attached ...");
     659            1 : 
     660            1 :         let mut shard_1_interpreted_records = Vec::new();
     661           13 :         while let Some(batch) = shard_1_rx.recv().await {
     662           13 :             shard_1_interpreted_records.push(batch.records);
     663           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     664            1 :                 break;
     665           12 :             }
     666            1 :         }
     667            1 : 
     668            1 :         // This test uses logical messages. Those only go to shard 0. Check that the
     669            1 :         // filtering worked and shard 1 did not get any.
     670            1 :         assert!(shard_1_interpreted_records
     671            1 :             .iter()
     672           13 :             .all(|recs| recs.records.is_empty()));
     673            1 : 
     674            1 :         // Shard 0 should not receive anything more since the reader is
     675            1 :         // going through wal that it has already processed.
     676            1 :         let res = shard_0_rx.try_recv();
     677            1 :         if let Ok(ref ok) = res {
     678            1 :             tracing::error!(
     679            1 :                 "Shard 0 received batch: wal_end_lsn={} available_wal_end_lsn={}",
     680            1 :                 ok.wal_end_lsn,
     681            1 :                 ok.available_wal_end_lsn
     682            1 :             );
     683            1 :         }
     684            1 :         assert!(matches!(res, Err(TryRecvError::Empty)));
     685            1 : 
     686            1 :         // Check that the next records lsns received by the two shards match up.
     687            1 :         let shard_0_next_lsns = shard_0_interpreted_records
     688            1 :             .iter()
     689           13 :             .map(|recs| recs.next_record_lsn)
     690            1 :             .collect::<Vec<_>>();
     691            1 :         let shard_1_next_lsns = shard_1_interpreted_records
     692            1 :             .iter()
     693           13 :             .map(|recs| recs.next_record_lsn)
     694            1 :             .collect::<Vec<_>>();
     695            1 :         assert_eq!(shard_0_next_lsns, shard_1_next_lsns);
     696            1 : 
     697            1 :         handle.abort();
     698            1 :         let mut done = false;
     699            2 :         for _ in 0..5 {
     700            2 :             if handle.current_position().is_none() {
     701            1 :                 done = true;
     702            1 :                 break;
     703            1 :             }
     704            1 :             tokio::time::sleep(Duration::from_millis(1)).await;
     705            1 :         }
     706            1 : 
     707            1 :         assert!(done);
     708            1 :     }
     709              : 
     710              :     #[tokio::test]
     711            1 :     async fn test_interpreted_wal_reader_same_shard_fanout() {
     712            1 :         let _ = env_logger::builder().is_test(true).try_init();
     713            1 : 
     714            1 :         const SIZE: usize = 8 * 1024;
     715            1 :         const MSG_COUNT: usize = 200;
     716            1 :         const PG_VERSION: u32 = 17;
     717            1 :         const SHARD_COUNT: u8 = 2;
     718            1 :         const ATTACHED_SHARDS: u8 = 4;
     719            1 : 
     720            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     721            1 :         let env = Env::new(true).unwrap();
     722            1 :         let tli = env
     723            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     724            1 :             .await
     725            1 :             .unwrap();
     726            1 : 
     727            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     728            1 :         let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
     729            1 :             .await
     730            1 :             .unwrap();
     731            1 :         let end_pos = end_watch.get();
     732            1 : 
     733            1 :         let streaming_wal_reader = StreamingWalReader::new(
     734            1 :             resident_tli,
     735            1 :             None,
     736            1 :             start_lsn,
     737            1 :             end_pos,
     738            1 :             end_watch,
     739            1 :             MAX_SEND_SIZE,
     740            1 :         );
     741            1 : 
     742            1 :         let shard_0 = ShardIdentity::new(
     743            1 :             ShardNumber(0),
     744            1 :             ShardCount(SHARD_COUNT),
     745            1 :             ShardStripeSize::default(),
     746            1 :         )
     747            1 :         .unwrap();
     748            1 : 
     749            1 :         let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     750            1 :         let mut batch_receivers = vec![rx];
     751            1 : 
     752            1 :         let handle = InterpretedWalReader::spawn(
     753            1 :             streaming_wal_reader,
     754            1 :             start_lsn,
     755            1 :             tx,
     756            1 :             shard_0,
     757            1 :             PG_VERSION,
     758            1 :             &Some("pageserver".to_string()),
     759            1 :         );
     760            1 : 
     761            3 :         for _ in 0..(ATTACHED_SHARDS - 1) {
     762            3 :             let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     763            3 :             handle.fanout(shard_0, tx, start_lsn).unwrap();
     764            3 :             batch_receivers.push(rx);
     765            3 :         }
     766            1 : 
     767            1 :         loop {
     768           13 :             let batch = batch_receivers.first_mut().unwrap().recv().await.unwrap();
     769           39 :             for rx in batch_receivers.iter_mut().skip(1) {
     770           39 :                 let other_batch = rx.recv().await.unwrap();
     771           39 : 
     772           39 :                 assert_eq!(batch.wal_end_lsn, other_batch.wal_end_lsn);
     773           39 :                 assert_eq!(
     774           39 :                     batch.available_wal_end_lsn,
     775           39 :                     other_batch.available_wal_end_lsn
     776           39 :                 );
     777            1 :             }
     778            1 : 
     779           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     780            1 :                 break;
     781           12 :             }
     782            1 :         }
     783            1 : 
     784            1 :         handle.abort();
     785            1 :         let mut done = false;
     786            2 :         for _ in 0..5 {
     787            2 :             if handle.current_position().is_none() {
     788            1 :                 done = true;
     789            1 :                 break;
     790            1 :             }
     791            1 :             tokio::time::sleep(Duration::from_millis(1)).await;
     792            1 :         }
     793            1 : 
     794            1 :         assert!(done);
     795            1 :     }
     796              : }
        

Generated by: LCOV version 2.1-beta