LCOV - code coverage report
Current view: top level - safekeeper/src - send_interpreted_wal.rs (source / functions)
Test: 91bf6c8f32e5e69adde6241313e732fdd6d6e277.info
Test Date: 2025-03-04 12:19:20
Coverage:    Lines: 85.3 % (679 of 796 hit)    Functions: 72.9 % (35 of 48 hit)

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::fmt::Display;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use anyhow::{Context, anyhow};
       7              : use futures::StreamExt;
       8              : use futures::future::Either;
       9              : use pageserver_api::shard::ShardIdentity;
      10              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
      11              : use postgres_ffi::get_current_timestamp;
      12              : use postgres_ffi::waldecoder::{WalDecodeError, WalStreamDecoder};
      13              : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
      14              : use tokio::io::{AsyncRead, AsyncWrite};
      15              : use tokio::sync::mpsc::error::SendError;
      16              : use tokio::task::JoinHandle;
      17              : use tokio::time::MissedTickBehavior;
      18              : use tracing::{Instrument, error, info, info_span};
      19              : use utils::critical;
      20              : use utils::lsn::Lsn;
      21              : use utils::postgres_client::{Compression, InterpretedFormat};
      22              : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
      23              : use wal_decoder::wire_format::ToWireFormat;
      24              : 
      25              : use crate::metrics::WAL_READERS;
      26              : use crate::send_wal::{EndWatchView, WalSenderGuard};
      27              : use crate::timeline::WalResidentTimeline;
      28              : use crate::wal_reader_stream::{StreamingWalReader, WalBytes};
      29              : 
      30              : /// Identifier used to differentiate between senders of the same
      31              : /// shard.
      32              : ///
      33              : /// In the steady state there's only one, but two pageservers may
      34              : /// temporarily have the same shard attached and attempt to ingest
      35              : /// WAL for it. See also [`ShardSenderId`].
      36              : #[derive(Hash, Eq, PartialEq, Copy, Clone)]
      37              : struct SenderId(u8);
      38              : 
      39              : impl SenderId {
      40            5 :     fn first() -> Self {
      41            5 :         SenderId(0)
      42            5 :     }
      43              : 
      44            2 :     fn next(&self) -> Self {
      45            2 :         SenderId(self.0.checked_add(1).expect("few senders"))
      46            2 :     }
      47              : }
      48              : 
      49              : #[derive(Hash, Eq, PartialEq)]
      50              : struct ShardSenderId {
      51              :     shard: ShardIdentity,
      52              :     sender_id: SenderId,
      53              : }
      54              : 
      55              : impl Display for ShardSenderId {
      56            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      57            0 :         write!(f, "{}{}", self.sender_id.0, self.shard.shard_slug())
      58            0 :     }
      59              : }
      60              : 
      61              : impl ShardSenderId {
      62          867 :     fn new(shard: ShardIdentity, sender_id: SenderId) -> Self {
      63          867 :         ShardSenderId { shard, sender_id }
      64          867 :     }
      65              : 
      66            0 :     fn shard(&self) -> ShardIdentity {
      67            0 :         self.shard
      68            0 :     }
      69              : }
      70              : 
      71              : /// Shard-aware fan-out interpreted record reader.
      72              : /// Reads WAL from disk, decodes it, interprets it, and sends
      73              : /// it to any [`InterpretedWalSender`] connected to it.
      74              : /// Each [`InterpretedWalSender`] corresponds to one shard
      75              : /// and gets interpreted records concerning that shard only.
      76              : pub(crate) struct InterpretedWalReader {
      77              :     wal_stream: StreamingWalReader,
      78              :     shard_senders: HashMap<ShardIdentity, smallvec::SmallVec<[ShardSenderState; 1]>>,
      79              :     shard_notification_rx: Option<tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>>,
      80              :     state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
      81              :     pg_version: u32,
      82              : }
      83              : 
      84              : /// A handle for [`InterpretedWalReader`] which allows for interacting with it
      85              : /// when it runs as a separate tokio task.
      86              : #[derive(Debug)]
      87              : pub(crate) struct InterpretedWalReaderHandle {
      88              :     join_handle: JoinHandle<Result<(), InterpretedWalReaderError>>,
      89              :     state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
      90              :     shard_notification_tx: tokio::sync::mpsc::UnboundedSender<AttachShardNotification>,
      91              : }
      92              : 
      93              : struct ShardSenderState {
      94              :     sender_id: SenderId,
      95              :     tx: tokio::sync::mpsc::Sender<Batch>,
      96              :     next_record_lsn: Lsn,
      97              : }
      98              : 
      99              : /// State of [`InterpretedWalReader`] visible outside of the task running it.
     100              : #[derive(Debug)]
     101              : pub(crate) enum InterpretedWalReaderState {
     102              :     Running {
     103              :         current_position: Lsn,
     104              :         /// Tracks the start of the PG WAL LSN from which the current batch of
     105              :         /// interpreted records originated.
     106              :         current_batch_wal_start: Option<Lsn>,
     107              :     },
     108              :     Done,
     109              : }
     110              : 
     111              : pub(crate) struct Batch {
     112              :     wal_end_lsn: Lsn,
     113              :     available_wal_end_lsn: Lsn,
     114              :     records: InterpretedWalRecords,
     115              : }
     116              : 
     117              : #[derive(thiserror::Error, Debug)]
     118              : pub enum InterpretedWalReaderError {
     119              :     /// Handler initiates the end of streaming.
     120              :     #[error("decode error: {0}")]
     121              :     Decode(#[from] WalDecodeError),
     122              :     #[error("read or interpret error: {0}")]
     123              :     ReadOrInterpret(#[from] anyhow::Error),
     124              :     #[error("wal stream closed")]
     125              :     WalStreamClosed,
     126              : }
     127              : 
     128              : enum CurrentPositionUpdate {
     129              :     Reset { from: Lsn, to: Lsn },
     130              :     NotReset(Lsn),
     131              : }
     132              : 
     133              : impl CurrentPositionUpdate {
     134            0 :     fn current_position(&self) -> Lsn {
     135            0 :         match self {
     136            0 :             CurrentPositionUpdate::Reset { from: _, to } => *to,
     137            0 :             CurrentPositionUpdate::NotReset(lsn) => *lsn,
     138              :         }
     139            0 :     }
     140              : 
     141            0 :     fn previous_position(&self) -> Lsn {
     142            0 :         match self {
     143            0 :             CurrentPositionUpdate::Reset { from, to: _ } => *from,
     144            0 :             CurrentPositionUpdate::NotReset(lsn) => *lsn,
     145              :         }
     146            0 :     }
     147              : }
     148              : 
     149              : impl InterpretedWalReaderState {
     150            4 :     fn current_position(&self) -> Option<Lsn> {
     151            4 :         match self {
     152              :             InterpretedWalReaderState::Running {
     153            2 :                 current_position, ..
     154            2 :             } => Some(*current_position),
     155            2 :             InterpretedWalReaderState::Done => None,
     156              :         }
     157            4 :     }
     158              : 
     159              :     #[cfg(test)]
     160         1380 :     fn current_batch_wal_start(&self) -> Option<Lsn> {
     161         1380 :         match self {
     162              :             InterpretedWalReaderState::Running {
     163         1380 :                 current_batch_wal_start,
     164         1380 :                 ..
     165         1380 :             } => *current_batch_wal_start,
     166            0 :             InterpretedWalReaderState::Done => None,
     167              :         }
     168         1380 :     }
     169              : 
     170              :     // Reset the current position of the WAL reader if the requested starting position
     171              :     // of the new shard is smaller than the current value.
     172            4 :     fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
     173            4 :         match self {
     174              :             InterpretedWalReaderState::Running {
     175            4 :                 current_position,
     176            4 :                 current_batch_wal_start,
     177            4 :             } => {
     178            4 :                 if new_shard_start_pos < *current_position {
     179            3 :                     let from = *current_position;
     180            3 :                     *current_position = new_shard_start_pos;
     181            3 :                     *current_batch_wal_start = None;
     182            3 :                     CurrentPositionUpdate::Reset {
     183            3 :                         from,
     184            3 :                         to: *current_position,
     185            3 :                     }
     186              :                 } else {
     187              :                     // Edge case: The new shard is at the same current position as
     188              :                     // the reader. Note that the current position is WAL record aligned,
     189              :                     // so the reader might have done some partial reads and updated the
     190              :                     // batch start. If that's the case, adjust the batch start to match
      191              :                     // starting position of the new shard. This can cause some shards to
      192              :                     // see overlapping batches, but the per-record LSN filtering below
      193              :                     // ensures that no record is delivered to the same shard twice.
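                       :                     //
                       :                     // For example (hypothetical LSNs): the reader's aligned
                       :                     // current position is 0/2000, but a partial read has already
                       :                     // advanced the batch start to 0/2040. A shard attaching at
                       :                     // 0/2000 takes this branch, and the batch start is clamped to
                       :                     // min(0/2040, 0/2000) = 0/2000, so the raw WAL range reported
                       :                     // to the new shard does not begin with a gap.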
     194            1 :                     if let Some(start) = current_batch_wal_start {
     195            0 :                         *start = std::cmp::min(*start, new_shard_start_pos);
     196            1 :                     }
     197            1 :                     CurrentPositionUpdate::NotReset(*current_position)
     198              :                 }
     199              :             }
     200              :             InterpretedWalReaderState::Done => {
     201            0 :                 panic!("maybe_reset called on finished reader")
     202              :             }
     203              :         }
     204            4 :     }
     205              : 
     206           50 :     fn update_current_batch_wal_start(&mut self, lsn: Lsn) {
     207           50 :         match self {
     208              :             InterpretedWalReaderState::Running {
     209           50 :                 current_batch_wal_start,
     210           50 :                 ..
     211           50 :             } => {
     212           50 :                 if current_batch_wal_start.is_none() {
     213           41 :                     *current_batch_wal_start = Some(lsn);
     214           41 :                 }
     215              :             }
     216              :             InterpretedWalReaderState::Done => {
     217            0 :                 panic!("update_current_batch_wal_start called on finished reader")
     218              :             }
     219              :         }
     220           50 :     }
     221              : 
     222           40 :     fn take_current_batch_wal_start(&mut self) -> Lsn {
     223           40 :         match self {
     224              :             InterpretedWalReaderState::Running {
     225           40 :                 current_batch_wal_start,
     226           40 :                 ..
     227           40 :             } => current_batch_wal_start.take().unwrap(),
     228              :             InterpretedWalReaderState::Done => {
     229            0 :                 panic!("take_current_batch_wal_start called on finished reader")
     230              :             }
     231              :         }
     232           40 :     }
     233              : 
     234           40 :     fn update_current_position(&mut self, lsn: Lsn) {
     235           40 :         match self {
     236              :             InterpretedWalReaderState::Running {
     237           40 :                 current_position, ..
     238           40 :             } => {
     239           40 :                 *current_position = lsn;
     240           40 :             }
     241              :             InterpretedWalReaderState::Done => {
     242            0 :                 panic!("update_current_position called on finished reader")
     243              :             }
     244              :         }
     245           40 :     }
     246              : }
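                       : 
                       : // A minimal illustrative sketch (added here for exposition; not part of
                       : // the measured source) of the `maybe_reset` contract: attaching a shard
                       : // below the reader's current position rewinds the reader and clears the
                       : // pending batch start, while attaching at or above it leaves the
                       : // position untouched.
                       : #[cfg(test)]
                       : mod maybe_reset_sketch {
                       :     use utils::lsn::Lsn;
                       : 
                       :     use super::{CurrentPositionUpdate, InterpretedWalReaderState};
                       : 
                       :     #[test]
                       :     fn maybe_reset_contract() {
                       :         let mut state = InterpretedWalReaderState::Running {
                       :             current_position: Lsn(0x2000),
                       :             current_batch_wal_start: Some(Lsn(0x1F00)),
                       :         };
                       : 
                       :         // A shard attaching below the current position forces a reset.
                       :         match state.maybe_reset(Lsn(0x1000)) {
                       :             CurrentPositionUpdate::Reset { from, to } => {
                       :                 assert_eq!(from, Lsn(0x2000));
                       :                 assert_eq!(to, Lsn(0x1000));
                       :             }
                       :             CurrentPositionUpdate::NotReset(_) => panic!("expected a reset"),
                       :         }
                       : 
                       :         // The reset moved the position and cleared the pending batch start.
                       :         assert_eq!(state.current_position(), Some(Lsn(0x1000)));
                       :         assert_eq!(state.current_batch_wal_start(), None);
                       :     }
                       : }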
     247              : 
     248              : pub(crate) struct AttachShardNotification {
     249              :     shard_id: ShardIdentity,
     250              :     sender: tokio::sync::mpsc::Sender<Batch>,
     251              :     start_pos: Lsn,
     252              : }
     253              : 
     254              : impl InterpretedWalReader {
     255              :     /// Spawn the reader in a separate tokio task and return a handle
     256            2 :     pub(crate) fn spawn(
     257            2 :         wal_stream: StreamingWalReader,
     258            2 :         start_pos: Lsn,
     259            2 :         tx: tokio::sync::mpsc::Sender<Batch>,
     260            2 :         shard: ShardIdentity,
     261            2 :         pg_version: u32,
     262            2 :         appname: &Option<String>,
     263            2 :     ) -> InterpretedWalReaderHandle {
     264            2 :         let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
     265            2 :             current_position: start_pos,
     266            2 :             current_batch_wal_start: None,
     267            2 :         }));
     268            2 : 
     269            2 :         let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
     270              : 
     271            2 :         let reader = InterpretedWalReader {
     272            2 :             wal_stream,
     273            2 :             shard_senders: HashMap::from([(
     274            2 :                 shard,
     275            2 :                 smallvec::smallvec![ShardSenderState {
     276            0 :                     sender_id: SenderId::first(),
     277            0 :                     tx,
     278            0 :                     next_record_lsn: start_pos,
     279            0 :                 }],
     280              :             )]),
     281            2 :             shard_notification_rx: Some(shard_notification_rx),
     282            2 :             state: state.clone(),
     283            2 :             pg_version,
     284            2 :         };
     285            2 : 
     286            2 :         let metric = WAL_READERS
     287            2 :             .get_metric_with_label_values(&["task", appname.as_deref().unwrap_or("safekeeper")])
     288            2 :             .unwrap();
     289              : 
     290            2 :         let join_handle = tokio::task::spawn(
     291            2 :             async move {
     292            2 :                 metric.inc();
     293            2 :                 scopeguard::defer! {
     294            2 :                     metric.dec();
     295            2 :                 }
     296            2 : 
     297            2 :                 reader
     298            2 :                     .run_impl(start_pos)
     299            2 :                     .await
     300            0 :                     .inspect_err(|err| match err {
     301              :                         // TODO: we may want to differentiate these errors further.
     302              :                         InterpretedWalReaderError::Decode(_) => {
     303            0 :                             critical!("failed to decode WAL record: {err:?}");
     304              :                         }
     305            0 :                         err => error!("failed to read WAL record: {err}"),
     306            0 :                     })
     307            0 :             }
     308            2 :             .instrument(info_span!("interpreted wal reader")),
     309              :         );
     310              : 
     311            2 :         InterpretedWalReaderHandle {
     312            2 :             join_handle,
     313            2 :             state,
     314            2 :             shard_notification_tx,
     315            2 :         }
     316            2 :     }
     317              : 
     318              :     /// Construct the reader without spawning anything
     319              :     /// Callers should drive the future returned by [`Self::run`].
     320            1 :     pub(crate) fn new(
     321            1 :         wal_stream: StreamingWalReader,
     322            1 :         start_pos: Lsn,
     323            1 :         tx: tokio::sync::mpsc::Sender<Batch>,
     324            1 :         shard: ShardIdentity,
     325            1 :         pg_version: u32,
     326            1 :         shard_notification_rx: Option<
     327            1 :             tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>,
     328            1 :         >,
     329            1 :     ) -> InterpretedWalReader {
     330            1 :         let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
     331            1 :             current_position: start_pos,
     332            1 :             current_batch_wal_start: None,
     333            1 :         }));
     334            1 : 
     335            1 :         InterpretedWalReader {
     336            1 :             wal_stream,
     337            1 :             shard_senders: HashMap::from([(
     338            1 :                 shard,
     339            1 :                 smallvec::smallvec![ShardSenderState {
     340            0 :                     sender_id: SenderId::first(),
     341            0 :                     tx,
     342            0 :                     next_record_lsn: start_pos,
     343            0 :                 }],
     344              :             )]),
     345            1 :             shard_notification_rx,
     346            1 :             state: state.clone(),
     347            1 :             pg_version,
     348            1 :         }
     349            1 :     }
     350              : 
     351              :     /// Entry point for future (polling) based wal reader.
     352            1 :     pub(crate) async fn run(
     353            1 :         self,
     354            1 :         start_pos: Lsn,
     355            1 :         appname: &Option<String>,
     356            1 :     ) -> Result<(), CopyStreamHandlerEnd> {
     357            1 :         let metric = WAL_READERS
     358            1 :             .get_metric_with_label_values(&["future", appname.as_deref().unwrap_or("safekeeper")])
     359            1 :             .unwrap();
     360            1 : 
     361            1 :         metric.inc();
     362            1 :         scopeguard::defer! {
     363            1 :             metric.dec();
     364            1 :         }
     365            1 : 
     366            1 :         match self.run_impl(start_pos).await {
     367            0 :             Err(err @ InterpretedWalReaderError::Decode(_)) => {
     368            0 :                 critical!("failed to decode WAL record: {err:?}");
     369              :             }
     370            0 :             Err(err) => error!("failed to read WAL record: {err}"),
     371            0 :             Ok(()) => info!("interpreted wal reader exiting"),
     372              :         }
     373              : 
     374            0 :         Err(CopyStreamHandlerEnd::Other(anyhow!(
     375            0 :             "interpreted wal reader finished"
     376            0 :         )))
     377            0 :     }
     378              : 
     379              :     /// Send interpreted WAL to one or more [`InterpretedWalSender`]s
     380              :     /// Stops when an error is encountered or when the [`InterpretedWalReaderHandle`]
     381              :     /// goes out of scope.
     382            3 :     async fn run_impl(mut self, start_pos: Lsn) -> Result<(), InterpretedWalReaderError> {
     383            3 :         let defer_state = self.state.clone();
     384            3 :         scopeguard::defer! {
     385            3 :             *defer_state.write().unwrap() = InterpretedWalReaderState::Done;
     386            3 :         }
     387            3 : 
     388            3 :         let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
     389              : 
     390              :         loop {
     391           57 :             tokio::select! {
     392              :                 // Main branch for reading WAL and forwarding it
     393           57 :                 wal_or_reset = self.wal_stream.next() => {
     394           50 :                     let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
     395              :                     let WalBytes {
     396           50 :                         wal,
     397           50 :                         wal_start_lsn,
     398           50 :                         wal_end_lsn,
     399           50 :                         available_wal_end_lsn,
     400           50 :                     } = match wal {
     401           50 :                         Some(some) => some.map_err(InterpretedWalReaderError::ReadOrInterpret)?,
     402              :                         None => {
     403              :                             // [`StreamingWalReader::next`] is an endless stream of WAL.
     404              :                             // It shouldn't ever finish unless it panicked or became internally
     405              :                             // inconsistent.
     406            0 :                             return Result::Err(InterpretedWalReaderError::WalStreamClosed);
     407              :                         }
     408              :                     };
     409              : 
     410           50 :                     self.state.write().unwrap().update_current_batch_wal_start(wal_start_lsn);
     411           50 : 
     412           50 :                     wal_decoder.feed_bytes(&wal);
     413           50 : 
     414           50 :                     // Deserialize and interpret WAL records from this batch of WAL.
     415           50 :                     // Interpreted records for each shard are collected separately.
     416           50 :                     let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>();
     417           50 :                     let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new();
     418           50 :                     let mut max_next_record_lsn = None;
     419          647 :                     while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()?
     420              :                     {
     421          597 :                         assert!(next_record_lsn.is_aligned());
     422          597 :                         max_next_record_lsn = Some(next_record_lsn);
     423              : 
     424          597 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
     425          597 :                             recdata,
     426          597 :                             &shard_ids,
     427          597 :                             next_record_lsn,
     428          597 :                             self.pg_version,
     429          597 :                         )
     430          597 :                         .with_context(|| "Failed to interpret WAL")?;
     431              : 
     432         1394 :                         for (shard, record) in interpreted {
     433          797 :                             if record.is_empty() {
     434          200 :                                 continue;
     435          597 :                             }
     436          597 : 
     437          597 :                             let mut states_iter = self.shard_senders
     438          597 :                                 .get(&shard)
     439          597 :                                 .expect("keys collected above")
     440          597 :                                 .iter()
     441          993 :                                 .filter(|state| record.next_record_lsn > state.next_record_lsn)
     442          597 :                                 .peekable();
     443          987 :                             while let Some(state) = states_iter.next() {
     444          787 :                                 let shard_sender_id = ShardSenderId::new(shard, state.sender_id);
     445          787 : 
      446          787 :                                 // The most common case is one sender per shard. Peek and break to avoid the
     447          787 :                                 // clone in that situation.
     448          787 :                                 if states_iter.peek().is_none() {
     449          397 :                                     records_by_sender.entry(shard_sender_id).or_default().push(record);
     450          397 :                                     break;
     451          390 :                                 } else {
     452          390 :                                     records_by_sender.entry(shard_sender_id).or_default().push(record.clone());
     453          390 :                                 }
     454              :                             }
     455              :                         }
     456              :                     }
     457              : 
     458           50 :                     let max_next_record_lsn = match max_next_record_lsn {
     459           40 :                         Some(lsn) => lsn,
     460              :                         None => {
     461           10 :                             continue;
     462              :                         }
     463              :                     };
     464              : 
     465              :                     // Update the current position such that new receivers can decide
     466              :                     // whether to attach to us or spawn a new WAL reader.
     467           40 :                     let batch_wal_start_lsn = {
     468           40 :                         let mut guard = self.state.write().unwrap();
     469           40 :                         guard.update_current_position(max_next_record_lsn);
     470           40 :                         guard.take_current_batch_wal_start()
     471           40 :                     };
     472           40 : 
     473           40 :                     // Send interpreted records downstream. Anything that has already been seen
     474           40 :                     // by a shard is filtered out.
     475           40 :                     let mut shard_senders_to_remove = Vec::new();
     476           94 :                     for (shard, states) in &mut self.shard_senders {
     477          134 :                         for state in states {
     478           80 :                             let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
     479              : 
     480           80 :                             let batch = if max_next_record_lsn > state.next_record_lsn {
     481              :                                 // This batch contains at least one record that this shard has not
     482              :                                 // seen yet.
     483           66 :                                 let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
     484           66 : 
     485           66 :                                 InterpretedWalRecords {
     486           66 :                                     records,
     487           66 :                                     next_record_lsn: max_next_record_lsn,
     488           66 :                                     raw_wal_start_lsn: Some(batch_wal_start_lsn),
     489           66 :                                 }
     490           14 :                             } else if wal_end_lsn > state.next_record_lsn {
     491              :                                 // All the records in this batch were seen by the shard
     492              :                                 // However, the batch maps to a chunk of WAL that the
     493              :                                 // shard has not yet seen. Notify it of the start LSN
     494              :                                 // of the PG WAL chunk such that it doesn't look like a gap.
     495            0 :                                 InterpretedWalRecords {
     496            0 :                                     records: Vec::default(),
     497            0 :                                     next_record_lsn: state.next_record_lsn,
     498            0 :                                     raw_wal_start_lsn: Some(batch_wal_start_lsn),
     499            0 :                                 }
     500              :                             } else {
     501              :                                 // The shard has seen this chunk of WAL before. Skip it.
     502           14 :                                 continue;
     503              :                             };
     504              : 
     505           66 :                             let res = state.tx.send(Batch {
     506           66 :                                 wal_end_lsn,
     507           66 :                                 available_wal_end_lsn,
     508           66 :                                 records: batch,
     509           66 :                             }).await;
     510              : 
     511           66 :                             if res.is_err() {
     512            0 :                                 shard_senders_to_remove.push(shard_sender_id);
     513           66 :                             } else {
     514           66 :                                 state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
     515           66 :                             }
     516              :                         }
     517              :                     }
     518              : 
     519              :                     // Clean up any shard senders that have dropped out.
     520              :                     // This is inefficient, but such events are rare (connection to PS termination)
      521              :                     // and the number of subscriptions on the same shard is very small (only
      522              :                     // one in the steady state).
     523           40 :                     for to_remove in shard_senders_to_remove {
     524            0 :                         let shard_senders = self.shard_senders.get_mut(&to_remove.shard()).expect("saw it above");
     525            0 :                         if let Some(idx) = shard_senders.iter().position(|s| s.sender_id == to_remove.sender_id) {
     526            0 :                             shard_senders.remove(idx);
     527            0 :                             tracing::info!("Removed shard sender {}", to_remove);
     528            0 :                         }
     529              : 
     530            0 :                         if shard_senders.is_empty() {
     531            0 :                             self.shard_senders.remove(&to_remove.shard());
     532            0 :                         }
     533              :                     }
     534              :                 },
     535              :                 // Listen for new shards that want to attach to this reader.
     536              :                 // If the reader is not running as a task, then this is not supported
     537              :                 // (see the pending branch below).
     538           57 :                 notification = match self.shard_notification_rx.as_mut() {
     539           57 :                         Some(rx) => Either::Left(rx.recv()),
     540            0 :                         None => Either::Right(std::future::pending())
     541              :                     } => {
     542            4 :                     if let Some(n) = notification {
     543            4 :                         let AttachShardNotification { shard_id, sender, start_pos } = n;
     544            4 : 
     545            4 :                         // Update internal and external state, then reset the WAL stream
     546            4 :                         // if required.
     547            4 :                         let senders = self.shard_senders.entry(shard_id).or_default();
     548            4 :                         let new_sender_id = match senders.last() {
     549            2 :                             Some(sender) => sender.sender_id.next(),
     550            2 :                             None => SenderId::first()
     551              :                         };
     552              : 
     553            4 :                         senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
     554            4 : 
      555            4 :                         // If the shard is subscribing below the current position, then we need
      556            4 :                         // to update the cursor that tracks where we are in the WAL
      557            4 :                         // ([`Self::state`]) and reset the WAL stream itself
      558            4 :                         // ([`Self::wal_stream`]). This must be done atomically from the POV of
      559            4 :                         // anything outside the select statement.
     560            4 :                         let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
     561            4 :                         match position_reset {
     562            3 :                             CurrentPositionUpdate::Reset { from: _, to } => {
     563            3 :                                 self.wal_stream.reset(to).await;
     564            3 :                                 wal_decoder = WalStreamDecoder::new(to, self.pg_version);
     565              :                             },
     566            1 :                             CurrentPositionUpdate::NotReset(_) => {}
     567              :                         };
     568              : 
     569            4 :                         tracing::info!(
     570            0 :                             "Added shard sender {} with start_pos={} previous_pos={} current_pos={}",
     571            0 :                             ShardSenderId::new(shard_id, new_sender_id),
     572            0 :                             start_pos,
     573            0 :                             position_reset.previous_position(),
     574            0 :                             position_reset.current_position(),
     575              :                         );
     576            0 :                     }
     577              :                 }
     578              :             }
     579              :         }
     580            0 :     }
     581              : 
     582              :     #[cfg(test)]
     583            1 :     fn state(&self) -> Arc<std::sync::RwLock<InterpretedWalReaderState>> {
     584            1 :         self.state.clone()
     585            1 :     }
     586              : }
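                       : 
                       : // A hedged usage sketch (added for illustration, not part of the
                       : // measured source) of the fan-out flow: one spawned reader decodes the
                       : // WAL once and each shard attaches its own channel. The `wal_stream`,
                       : // shard identities and `start_lsn` are assumed to come from the
                       : // surrounding safekeeper plumbing; see the tests below for a complete,
                       : // runnable setup.
                       : #[cfg(test)]
                       : #[allow(dead_code)]
                       : async fn fanout_usage_sketch(
                       :     wal_stream: StreamingWalReader,
                       :     shard_0: ShardIdentity,
                       :     shard_1: ShardIdentity,
                       :     start_lsn: Lsn,
                       : ) {
                       :     let (shard_0_tx, mut shard_0_rx) = tokio::sync::mpsc::channel::<Batch>(64);
                       :     let (shard_1_tx, _shard_1_rx) = tokio::sync::mpsc::channel::<Batch>(64);
                       : 
                       :     // One tokio task reads, decodes and interprets the WAL ...
                       :     let handle = InterpretedWalReader::spawn(
                       :         wal_stream,
                       :         start_lsn,
                       :         shard_0_tx,
                       :         shard_0,
                       :         17, // pg_version, matching the tests below
                       :         &None,
                       :     );
                       : 
                       :     // ... and further shards fan out from the same task.
                       :     handle.fanout(shard_1, shard_1_tx, start_lsn).unwrap();
                       : 
                       :     // Each receiver sees only records relevant to its shard.
                       :     while let Some(batch) = shard_0_rx.recv().await {
                       :         if batch.wal_end_lsn == batch.available_wal_end_lsn {
                       :             break; // caught up with all available WAL
                       :         }
                       :     }
                       : 
                       :     // Dropping the handle aborts the reader task.
                       :     drop(handle);
                       : }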
     587              : 
     588              : impl InterpretedWalReaderHandle {
     589              :     /// Fan-out the reader by attaching a new shard to it
     590            3 :     pub(crate) fn fanout(
     591            3 :         &self,
     592            3 :         shard_id: ShardIdentity,
     593            3 :         sender: tokio::sync::mpsc::Sender<Batch>,
     594            3 :         start_pos: Lsn,
     595            3 :     ) -> Result<(), SendError<AttachShardNotification>> {
     596            3 :         self.shard_notification_tx.send(AttachShardNotification {
     597            3 :             shard_id,
     598            3 :             sender,
     599            3 :             start_pos,
     600            3 :         })
     601            3 :     }
     602              : 
     603              :     /// Get the current WAL position of the reader
     604            4 :     pub(crate) fn current_position(&self) -> Option<Lsn> {
     605            4 :         self.state.read().unwrap().current_position()
     606            4 :     }
     607              : 
     608            4 :     pub(crate) fn abort(&self) {
     609            4 :         self.join_handle.abort()
     610            4 :     }
     611              : }
     612              : 
     613              : impl Drop for InterpretedWalReaderHandle {
     614            2 :     fn drop(&mut self) {
     615            2 :         tracing::info!("Aborting interpreted wal reader");
     616            2 :         self.abort()
     617            2 :     }
     618              : }
     619              : 
     620              : pub(crate) struct InterpretedWalSender<'a, IO> {
     621              :     pub(crate) format: InterpretedFormat,
     622              :     pub(crate) compression: Option<Compression>,
     623              :     pub(crate) appname: Option<String>,
     624              : 
     625              :     pub(crate) tli: WalResidentTimeline,
     626              :     pub(crate) start_lsn: Lsn,
     627              : 
     628              :     pub(crate) pgb: &'a mut PostgresBackend<IO>,
     629              :     pub(crate) end_watch_view: EndWatchView,
     630              :     pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
     631              :     pub(crate) rx: tokio::sync::mpsc::Receiver<Batch>,
     632              : }
     633              : 
     634              : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
     635              :     /// Send interpreted WAL records over the network.
     636              :     /// Also manages keep-alives if nothing was sent for a while.
     637            0 :     pub(crate) async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
     638            0 :         let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
     639            0 :         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
     640            0 :         keepalive_ticker.reset();
     641            0 : 
     642            0 :         let mut wal_position = self.start_lsn;
     643              : 
     644              :         loop {
     645            0 :             tokio::select! {
     646            0 :                 batch = self.rx.recv() => {
     647            0 :                     let batch = match batch {
     648            0 :                         Some(b) => b,
     649              :                         None => {
     650            0 :                             return Result::Err(
     651            0 :                                 CopyStreamHandlerEnd::Other(anyhow!("Interpreted WAL reader exited early"))
     652            0 :                             );
     653              :                         }
     654              :                     };
     655              : 
     656            0 :                     wal_position = batch.wal_end_lsn;
     657              : 
     658            0 :                     let buf = batch
     659            0 :                         .records
     660            0 :                         .to_wire(self.format, self.compression)
     661            0 :                         .await
     662            0 :                         .with_context(|| "Failed to serialize interpreted WAL")
     663            0 :                         .map_err(CopyStreamHandlerEnd::from)?;
     664              : 
     665              :                     // Reset the keep alive ticker since we are sending something
     666              :                     // over the wire now.
     667            0 :                     keepalive_ticker.reset();
     668            0 : 
     669            0 :                     self.pgb
     670            0 :                         .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
     671            0 :                             streaming_lsn: batch.wal_end_lsn.0,
     672            0 :                             commit_lsn: batch.available_wal_end_lsn.0,
     673            0 :                             data: &buf,
     674            0 :                         })).await?;
     675              :                 }
     676              :                 // Send a periodic keep alive when the connection has been idle for a while.
     677              :                 // Since we've been idle, also check if we can stop streaming.
     678            0 :                 _ = keepalive_ticker.tick() => {
     679            0 :                     if let Some(remote_consistent_lsn) = self.wal_sender_guard
     680            0 :                         .walsenders()
     681            0 :                         .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
     682              :                     {
     683            0 :                         if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     684              :                             // Stop streaming if the receivers are caught up and
     685              :                             // there's no active compute. This causes the loop in
     686              :                             // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
     687              :                             // to exit and terminate the WAL stream.
     688            0 :                             break;
     689            0 :                         }
     690            0 :                     }
     691              : 
     692            0 :                     self.pgb
     693            0 :                         .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     694            0 :                             wal_end: self.end_watch_view.get().0,
     695            0 :                             timestamp: get_current_timestamp(),
     696            0 :                             request_reply: true,
     697            0 :                         }))
     698            0 :                         .await?;
     699              :                 },
     700              :             }
     701              :         }
     702              : 
     703            0 :         Err(CopyStreamHandlerEnd::ServerInitiated(format!(
      704            0 :             "ending streaming to {:?} at {}, receiver is caught up and there are no computes",
     705            0 :             self.appname, wal_position,
     706            0 :         )))
     707            0 :     }
     708              : }
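                       : 
                       : // A self-contained sketch (added for illustration) of the keep-alive
                       : // pattern used by `run` above: a one-second interval that is reset
                       : // whenever real data is written, so keep-alives are only emitted on an
                       : // otherwise idle connection. The unit-typed channel stands in for the
                       : // batch stream and the socket writes are elided.
                       : #[cfg(test)]
                       : #[allow(dead_code)]
                       : async fn keepalive_pattern_sketch(mut rx: tokio::sync::mpsc::Receiver<()>) {
                       :     let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
                       :     keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
                       :     keepalive_ticker.reset();
                       : 
                       :     loop {
                       :         tokio::select! {
                       :             batch = rx.recv() => {
                       :                 let Some(_batch) = batch else { break };
                       :                 // Sending data proves liveness: push the next
                       :                 // keep-alive a full period into the future.
                       :                 keepalive_ticker.reset();
                       :                 // ... serialize and write the batch to the socket here ...
                       :             }
                       :             _ = keepalive_ticker.tick() => {
                       :                 // ... write a keep-alive message to the socket here ...
                       :             }
                       :         }
                       :     }
                       : }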
     709              : #[cfg(test)]
     710              : mod tests {
     711              :     use std::collections::HashMap;
     712              :     use std::str::FromStr;
     713              :     use std::time::Duration;
     714              : 
     715              :     use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
     716              :     use postgres_ffi::MAX_SEND_SIZE;
     717              :     use tokio::sync::mpsc::error::TryRecvError;
     718              :     use utils::id::{NodeId, TenantTimelineId};
     719              :     use utils::lsn::Lsn;
     720              :     use utils::shard::{ShardCount, ShardNumber};
     721              : 
     722              :     use crate::send_interpreted_wal::{AttachShardNotification, Batch, InterpretedWalReader};
     723              :     use crate::test_utils::Env;
     724              :     use crate::wal_reader_stream::StreamingWalReader;
     725              : 
     726              :     #[tokio::test]
     727            1 :     async fn test_interpreted_wal_reader_fanout() {
     728            1 :         let _ = env_logger::builder().is_test(true).try_init();
     729            1 : 
     730            1 :         const SIZE: usize = 8 * 1024;
     731            1 :         const MSG_COUNT: usize = 200;
     732            1 :         const PG_VERSION: u32 = 17;
     733            1 :         const SHARD_COUNT: u8 = 2;
     734            1 : 
     735            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     736            1 :         let env = Env::new(true).unwrap();
     737            1 :         let tli = env
     738            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     739            1 :             .await
     740            1 :             .unwrap();
     741            1 : 
     742            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     743            1 :         let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
     744            1 :             .await
     745            1 :             .unwrap();
     746            1 :         let end_pos = end_watch.get();
     747            1 : 
     748            1 :         tracing::info!("Doing first round of reads ...");
     749            1 : 
     750            1 :         let streaming_wal_reader = StreamingWalReader::new(
     751            1 :             resident_tli,
     752            1 :             None,
     753            1 :             start_lsn,
     754            1 :             end_pos,
     755            1 :             end_watch,
     756            1 :             MAX_SEND_SIZE,
     757            1 :         );
     758            1 : 
     759            1 :         let shard_0 = ShardIdentity::new(
     760            1 :             ShardNumber(0),
     761            1 :             ShardCount(SHARD_COUNT),
     762            1 :             ShardStripeSize::default(),
     763            1 :         )
     764            1 :         .unwrap();
     765            1 : 
     766            1 :         let shard_1 = ShardIdentity::new(
     767            1 :             ShardNumber(1),
     768            1 :             ShardCount(SHARD_COUNT),
     769            1 :             ShardStripeSize::default(),
     770            1 :         )
     771            1 :         .unwrap();
     772            1 : 
     773            1 :         let mut shards = HashMap::new();
     774            1 : 
     775            3 :         for shard_number in 0..SHARD_COUNT {
     776            2 :             let shard_id = ShardIdentity::new(
     777            2 :                 ShardNumber(shard_number),
     778            2 :                 ShardCount(SHARD_COUNT),
     779            2 :                 ShardStripeSize::default(),
     780            2 :             )
     781            2 :             .unwrap();
     782            2 :             let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     783            2 :             shards.insert(shard_id, (Some(tx), Some(rx)));
     784            2 :         }
     785            1 : 
     786            1 :         let shard_0_tx = shards.get_mut(&shard_0).unwrap().0.take().unwrap();
     787            1 :         let mut shard_0_rx = shards.get_mut(&shard_0).unwrap().1.take().unwrap();
     788            1 : 
     789            1 :         let handle = InterpretedWalReader::spawn(
     790            1 :             streaming_wal_reader,
     791            1 :             start_lsn,
     792            1 :             shard_0_tx,
     793            1 :             shard_0,
     794            1 :             PG_VERSION,
     795            1 :             &Some("pageserver".to_string()),
     796            1 :         );
     797            1 : 
     798            1 :         tracing::info!("Reading all WAL with only shard 0 attached ...");
     799            1 : 
     800            1 :         let mut shard_0_interpreted_records = Vec::new();
     801           13 :         while let Some(batch) = shard_0_rx.recv().await {
     802           13 :             shard_0_interpreted_records.push(batch.records);
     803           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     804            1 :                 break;
     805           12 :             }
     806            1 :         }
     807            1 : 
     808            1 :         let shard_1_tx = shards.get_mut(&shard_1).unwrap().0.take().unwrap();
     809            1 :         let mut shard_1_rx = shards.get_mut(&shard_1).unwrap().1.take().unwrap();
     810            1 : 
     811            1 :         tracing::info!("Attaching shard 1 to the reader at start of WAL");
     812            1 :         handle.fanout(shard_1, shard_1_tx, start_lsn).unwrap();
     813            1 : 
     814            1 :         tracing::info!("Reading all WAL with shard 0 and shard 1 attached ...");
     815            1 : 
     816            1 :         let mut shard_1_interpreted_records = Vec::new();
     817           13 :         while let Some(batch) = shard_1_rx.recv().await {
     818           13 :             shard_1_interpreted_records.push(batch.records);
     819           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     820            1 :                 break;
     821           12 :             }
     822            1 :         }
     823            1 : 
     824            1 :         // This test uses logical messages. Those only go to shard 0. Check that the
     825            1 :         // filtering worked and shard 1 did not get any.
     826            1 :         assert!(
     827            1 :             shard_1_interpreted_records
     828            1 :                 .iter()
     829           13 :                 .all(|recs| recs.records.is_empty())
     830            1 :         );
     831            1 : 
     832            1 :         // Shard 0 should not receive anything more since the reader is
      833            1 :         // going through WAL that it has already processed.
     834            1 :         let res = shard_0_rx.try_recv();
     835            1 :         if let Ok(ref ok) = res {
     836            1 :             tracing::error!(
     837            1 :                 "Shard 0 received batch: wal_end_lsn={} available_wal_end_lsn={}",
     838            1 :                 ok.wal_end_lsn,
     839            1 :                 ok.available_wal_end_lsn
     840            1 :             );
     841            1 :         }
     842            1 :         assert!(matches!(res, Err(TryRecvError::Empty)));
     843            1 : 
     844            1 :         // Check that the next records lsns received by the two shards match up.
     845            1 :         let shard_0_next_lsns = shard_0_interpreted_records
     846            1 :             .iter()
     847           13 :             .map(|recs| recs.next_record_lsn)
     848            1 :             .collect::<Vec<_>>();
     849            1 :         let shard_1_next_lsns = shard_1_interpreted_records
     850            1 :             .iter()
     851           13 :             .map(|recs| recs.next_record_lsn)
     852            1 :             .collect::<Vec<_>>();
     853            1 :         assert_eq!(shard_0_next_lsns, shard_1_next_lsns);
     854            1 : 
     855            1 :         handle.abort();
     856            1 :         let mut done = false;
     857            2 :         for _ in 0..5 {
     858            2 :             if handle.current_position().is_none() {
     859            1 :                 done = true;
     860            1 :                 break;
     861            1 :             }
     862            1 :             tokio::time::sleep(Duration::from_millis(1)).await;
     863            1 :         }
     864            1 : 
     865            1 :         assert!(done);
     866            1 :     }
     867              : 
     868              :     #[tokio::test]
     869            1 :     async fn test_interpreted_wal_reader_same_shard_fanout() {
     870            1 :         let _ = env_logger::builder().is_test(true).try_init();
     871            1 : 
     872            1 :         const SIZE: usize = 8 * 1024;
     873            1 :         const MSG_COUNT: usize = 200;
     874            1 :         const PG_VERSION: u32 = 17;
     875            1 :         const SHARD_COUNT: u8 = 2;
     876            1 : 
     877            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     878            1 :         let env = Env::new(true).unwrap();
     879            1 :         let tli = env
     880            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     881            1 :             .await
     882            1 :             .unwrap();
     883            1 : 
     884            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     885            1 :         let mut next_record_lsns = Vec::default();
     886            1 :         let end_watch =
     887            1 :             Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
     888            1 :                 .await
     889            1 :                 .unwrap();
     890            1 :         let end_pos = end_watch.get();
     891            1 : 
     892            1 :         let streaming_wal_reader = StreamingWalReader::new(
     893            1 :             resident_tli,
     894            1 :             None,
     895            1 :             start_lsn,
     896            1 :             end_pos,
     897            1 :             end_watch,
     898            1 :             MAX_SEND_SIZE,
     899            1 :         );
     900            1 : 
     901            1 :         let shard_0 = ShardIdentity::new(
     902            1 :             ShardNumber(0),
     903            1 :             ShardCount(SHARD_COUNT),
     904            1 :             ShardStripeSize::default(),
     905            1 :         )
     906            1 :         .unwrap();
     907            1 : 
     908            1 :         struct Sender {
     909            1 :             tx: Option<tokio::sync::mpsc::Sender<Batch>>,
     910            1 :             rx: tokio::sync::mpsc::Receiver<Batch>,
     911            1 :             shard: ShardIdentity,
     912            1 :             start_lsn: Lsn,
     913            1 :             received_next_record_lsns: Vec<Lsn>,
     914            1 :         }
     915            1 : 
     916            1 :         impl Sender {
     917            3 :             fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
     918            3 :                 let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     919            3 :                 Self {
     920            3 :                     tx: Some(tx),
     921            3 :                     rx,
     922            3 :                     shard,
     923            3 :                     start_lsn,
     924            3 :                     received_next_record_lsns: Vec::default(),
     925            3 :                 }
     926            3 :             }
     927            1 :         }
     928            1 : 
     929            1 :         assert!(next_record_lsns.len() > 7);
     930            1 :         let start_lsns = vec![
     931            1 :             next_record_lsns[5],
     932            1 :             next_record_lsns[1],
     933            1 :             next_record_lsns[3],
     934            1 :         ];
     935            1 :         let mut senders = start_lsns
     936            1 :             .into_iter()
     937            3 :             .map(|lsn| Sender::new(lsn, shard_0))
     938            1 :             .collect::<Vec<_>>();
     939            1 : 
     940            1 :         let first_sender = senders.first_mut().unwrap();
     941            1 :         let handle = InterpretedWalReader::spawn(
     942            1 :             streaming_wal_reader,
     943            1 :             first_sender.start_lsn,
     944            1 :             first_sender.tx.take().unwrap(),
     945            1 :             first_sender.shard,
     946            1 :             PG_VERSION,
     947            1 :             &Some("pageserver".to_string()),
     948            1 :         );
     949            1 : 
     950            2 :         for sender in senders.iter_mut().skip(1) {
     951            2 :             handle
     952            2 :                 .fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
     953            2 :                 .unwrap();
     954            2 :         }
     955            1 : 
     956            3 :         for sender in senders.iter_mut() {
     957            1 :             loop {
     958           39 :                 let batch = sender.rx.recv().await.unwrap();
     959           39 :                 tracing::info!(
     960            1 :                     "Sender with start_lsn={} received batch ending at {} with {} records",
     961            0 :                     sender.start_lsn,
     962            0 :                     batch.wal_end_lsn,
     963            0 :                     batch.records.records.len()
     964            1 :                 );
     965            1 : 
     966          627 :                 for rec in batch.records.records {
     967          588 :                     sender.received_next_record_lsns.push(rec.next_record_lsn);
     968          588 :                 }
     969            1 : 
     970           39 :                 if batch.wal_end_lsn == batch.available_wal_end_lsn {
     971            3 :                     break;
     972           36 :                 }
     973            1 :             }
     974            1 :         }
     975            1 : 
     976            1 :         handle.abort();
     977            1 :         let mut done = false;
     978            2 :         for _ in 0..5 {
     979            2 :             if handle.current_position().is_none() {
     980            1 :                 done = true;
     981            1 :                 break;
     982            1 :             }
     983            1 :             tokio::time::sleep(Duration::from_millis(1)).await;
     984            1 :         }
     985            1 : 
     986            1 :         assert!(done);
     987            1 : 
     988            4 :         for sender in senders {
     989            3 :             tracing::info!(
     990            1 :                 "Validating records received by sender with start_lsn={}",
     991            1 :                 sender.start_lsn
     992            1 :             );
     993            1 : 
     994            3 :             assert!(sender.received_next_record_lsns.is_sorted());
     995            3 :             let expected = next_record_lsns
     996            3 :                 .iter()
     997          600 :                 .filter(|lsn| **lsn > sender.start_lsn)
     998            3 :                 .copied()
     999            3 :                 .collect::<Vec<_>>();
    1000            3 :             assert_eq!(sender.received_next_record_lsns, expected);
    1001            1 :         }
    1002            1 :     }
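// Editor's note: the validation loop above encodes the fanout contract: a
// sender attached at start_lsn receives exactly the records whose
// next_record_lsn is strictly greater than start_lsn, in WAL order, no
// matter when it attached. A sketch of that expectation as a standalone
// function (hypothetical name, derived from the filter used above):
//
//     fn expected_lsns_after(all_next_record_lsns: &[Lsn], start_lsn: Lsn) -> Vec<Lsn> {
//         all_next_record_lsns
//             .iter()
//             .copied()
//             .filter(|lsn| *lsn > start_lsn)
//             .collect()
//     }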
    1003              : 
    1004              :     #[tokio::test]
    1005            1 :     async fn test_batch_start_tracking_on_reset() {
    1006            1 :         // When the WAL stream is reset to an older LSN,
    1007            1 :         // the current batch start LSN should be invalidated.
    1008            1 :         // This test constructs such a scenario:
    1009            1 :         // 1. Shard 0 starts reading ahead of the beginning of the WAL
    1010            1 :         // 2. The reader fetches some WAL but does not yet decode a full record (partial read)
    1011            1 :         // 3. Shard 1 attaches to the reader and resets the stream to an older LSN
    1012            1 :         // 4. Shard 1 should observe a batch WAL start LSN equal to the reset position
    1013            1 :         let _ = env_logger::builder().is_test(true).try_init();
    1014            1 : 
    1015            1 :         const SIZE: usize = 64 * 1024;
    1016            1 :         const MSG_COUNT: usize = 10;
    1017            1 :         const PG_VERSION: u32 = 17;
    1018            1 :         const SHARD_COUNT: u8 = 2;
    1019            1 :         const WAL_READER_BATCH_SIZE: usize = 8192;
    1020            1 : 
    1021            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
    1022            1 :         let env = Env::new(true).unwrap();
    1023            1 :         let mut next_record_lsns = Vec::default();
    1024            1 :         let tli = env
    1025            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
    1026            1 :             .await
    1027            1 :             .unwrap();
    1028            1 : 
    1029            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
    1030            1 :         let end_watch =
    1031            1 :             Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
    1032            1 :                 .await
    1033            1 :                 .unwrap();
    1034            1 : 
    1035            1 :         assert!(next_record_lsns.len() > 3);
    1036            1 :         let shard_0_start_lsn = next_record_lsns[3];
    1037            1 : 
    1038            1 :         let end_pos = end_watch.get();
    1039            1 : 
    1040            1 :         let streaming_wal_reader = StreamingWalReader::new(
    1041            1 :             resident_tli,
    1042            1 :             None,
    1043            1 :             shard_0_start_lsn,
    1044            1 :             end_pos,
    1045            1 :             end_watch,
    1046            1 :             WAL_READER_BATCH_SIZE,
    1047            1 :         );
    1048            1 : 
    1049            1 :         let shard_0 = ShardIdentity::new(
    1050            1 :             ShardNumber(0),
    1051            1 :             ShardCount(SHARD_COUNT),
    1052            1 :             ShardStripeSize::default(),
    1053            1 :         )
    1054            1 :         .unwrap();
    1055            1 : 
    1056            1 :         let shard_1 = ShardIdentity::new(
    1057            1 :             ShardNumber(1),
    1058            1 :             ShardCount(SHARD_COUNT),
    1059            1 :             ShardStripeSize::default(),
    1060            1 :         )
    1061            1 :         .unwrap();
    1062            1 : 
    1063            1 :         let mut shards = HashMap::new();
    1064            1 : 
    1065            3 :         for shard_number in 0..SHARD_COUNT {
    1066            2 :             let shard_id = ShardIdentity::new(
    1067            2 :                 ShardNumber(shard_number),
    1068            2 :                 ShardCount(SHARD_COUNT),
    1069            2 :                 ShardStripeSize::default(),
    1070            2 :             )
    1071            2 :             .unwrap();
    1072            2 :             let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
    1073            2 :             shards.insert(shard_id, (Some(tx), Some(rx)));
    1074            2 :         }
    1075            1 : 
    1076            1 :         let shard_0_tx = shards.get_mut(&shard_0).unwrap().0.take().unwrap();
    1077            1 : 
    1078            1 :         let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
    1079            1 : 
    1080            1 :         let reader = InterpretedWalReader::new(
    1081            1 :             streaming_wal_reader,
    1082            1 :             shard_0_start_lsn,
    1083            1 :             shard_0_tx,
    1084            1 :             shard_0,
    1085            1 :             PG_VERSION,
    1086            1 :             Some(shard_notification_rx),
    1087            1 :         );
    1088            1 : 
    1089            1 :         let reader_state = reader.state();
    1090            1 :         let mut reader_fut = std::pin::pin!(reader.run(shard_0_start_lsn, &None));
    1091            1 :         loop {
    1092         1380 :             let poll = futures::poll!(reader_fut.as_mut());
    1093         1380 :             assert!(poll.is_pending());
    1094            1 : 
    1095         1380 :             let guard = reader_state.read().unwrap();
    1096         1380 :             if guard.current_batch_wal_start().is_some() {
    1097            1 :                 break;
    1098         1379 :             }
    1099            1 :         }
    1100            1 : 
    1101            1 :         shard_notification_tx
    1102            1 :             .send(AttachShardNotification {
    1103            1 :                 shard_id: shard_1,
    1104            1 :                 sender: shards.get_mut(&shard_1).unwrap().0.take().unwrap(),
    1105            1 :                 start_pos: start_lsn,
    1106            1 :             })
    1107            1 :             .unwrap();
    1108            1 : 
    1109            1 :         let mut shard_1_rx = shards.get_mut(&shard_1).unwrap().1.take().unwrap();
    1110            1 :         loop {
    1111           66 :             let poll = futures::poll!(reader_fut.as_mut());
    1112           66 :             assert!(poll.is_pending());
    1113            1 : 
    1114           66 :             let try_recv_res = shard_1_rx.try_recv();
    1115           65 :             match try_recv_res {
    1116            1 :                 Ok(batch) => {
    1117            1 :                     assert_eq!(batch.records.raw_wal_start_lsn.unwrap(), start_lsn);
    1118            1 :                     break;
    1119            1 :                 }
    1120           65 :                 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
    1121            1 :                 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
    1122            1 :                     unreachable!();
    1123            1 :                 }
    1124            1 :             }
    1125            1 :         }
    1126            1 :     }
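// Editor's note: the try_recv loop above pins down the property under test:
// once shard 1 attaches with a start_pos older than the reader's current
// position, the WAL stream is reset, and the first batch shard 1 receives
// must report a raw_wal_start_lsn equal to the reset position rather than
// the stale batch start tracked before the reset. A condensed sketch of that
// check, assuming the Batch shape used above (hypothetical helper name):
//
//     fn assert_first_batch_after_reset(batch: &Batch, reset_pos: Lsn) {
//         // raw_wal_start_lsn marks where the raw WAL backing this batch
//         // begins; after a reset it must equal the reset position.
//         assert_eq!(batch.records.raw_wal_start_lsn.unwrap(), reset_pos);
//     }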
    1127              : }
        

Generated by: LCOV version 2.1-beta