LCOV - code coverage report
Current view: top level - safekeeper/src - send_interpreted_wal.rs (source / functions) Coverage Total Hit
Test: 45c9170b95180e9ecfad9a53e031030abf2a178c.info Lines: 77.9 % 615 479
Test Date: 2025-02-21 15:51:08 Functions: 62.5 % 40 25

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::fmt::Display;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use anyhow::{anyhow, Context};
       7              : use futures::future::Either;
       8              : use futures::StreamExt;
       9              : use pageserver_api::shard::ShardIdentity;
      10              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
      11              : use postgres_ffi::waldecoder::WalDecodeError;
      12              : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
      13              : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
      14              : use tokio::io::{AsyncRead, AsyncWrite};
      15              : use tokio::sync::mpsc::error::SendError;
      16              : use tokio::task::JoinHandle;
      17              : use tokio::time::MissedTickBehavior;
      18              : use tracing::{error, info, info_span, Instrument};
      19              : use utils::critical;
      20              : use utils::lsn::Lsn;
      21              : use utils::postgres_client::Compression;
      22              : use utils::postgres_client::InterpretedFormat;
      23              : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
      24              : use wal_decoder::wire_format::ToWireFormat;
      25              : 
      26              : use crate::metrics::WAL_READERS;
      27              : use crate::send_wal::{EndWatchView, WalSenderGuard};
      28              : use crate::timeline::WalResidentTimeline;
      29              : use crate::wal_reader_stream::{StreamingWalReader, WalBytes};
      30              : 
      31              : /// Identifier used to differentiate between senders of the same
      32              : /// shard.
      33              : ///
      34              : /// In the steady state there's only one, but two pageservers may
      35              : /// temporarily have the same shard attached and attempt to ingest
      36              : /// WAL for it. See also [`ShardSenderId`].
      37              : #[derive(Hash, Eq, PartialEq, Copy, Clone)]
      38              : struct SenderId(u8);
      39              : 
      40              : impl SenderId {
      41            3 :     fn first() -> Self {
      42            3 :         SenderId(0)
      43            3 :     }
      44              : 
      45            2 :     fn next(&self) -> Self {
      46            2 :         SenderId(self.0.checked_add(1).expect("few senders"))
      47            2 :     }
      48              : }
      49              : 
      50              : #[derive(Hash, Eq, PartialEq)]
      51              : struct ShardSenderId {
      52              :     shard: ShardIdentity,
      53              :     sender_id: SenderId,
      54              : }
      55              : 
      56              : impl Display for ShardSenderId {
      57            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      58            0 :         write!(f, "{}{}", self.sender_id.0, self.shard.shard_slug())
      59            0 :     }
      60              : }
      61              : 
      62              : impl ShardSenderId {
      63          865 :     fn new(shard: ShardIdentity, sender_id: SenderId) -> Self {
      64          865 :         ShardSenderId { shard, sender_id }
      65          865 :     }
      66              : 
      67            0 :     fn shard(&self) -> ShardIdentity {
      68            0 :         self.shard
      69            0 :     }
      70              : }
      71              : 
      72              : /// Shard-aware fan-out interpreted record reader.
      73              : /// Reads WAL from disk, decodes it, interprets it, and sends
      74              : /// it to any [`InterpretedWalSender`] connected to it.
      75              : /// Each [`InterpretedWalSender`] corresponds to one shard
      76              : /// and gets interpreted records concerning that shard only.
      77              : pub(crate) struct InterpretedWalReader {
      78              :     wal_stream: StreamingWalReader,
      79              :     shard_senders: HashMap<ShardIdentity, smallvec::SmallVec<[ShardSenderState; 1]>>,
      80              :     shard_notification_rx: Option<tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>>,
      81              :     state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
      82              :     pg_version: u32,
      83              : }
      84              : 
      85              : /// A handle for [`InterpretedWalReader`] which allows for interacting with it
      86              : /// when it runs as a separate tokio task.
      87              : #[derive(Debug)]
      88              : pub(crate) struct InterpretedWalReaderHandle {
      89              :     join_handle: JoinHandle<Result<(), InterpretedWalReaderError>>,
      90              :     state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
      91              :     shard_notification_tx: tokio::sync::mpsc::UnboundedSender<AttachShardNotification>,
      92              : }
      93              : 
      94              : struct ShardSenderState {
      95              :     sender_id: SenderId,
      96              :     tx: tokio::sync::mpsc::Sender<Batch>,
      97              :     next_record_lsn: Lsn,
      98              : }
      99              : 
     100              : /// State of [`InterpretedWalReader`] visible outside of the task running it.
     101              : #[derive(Debug)]
     102              : pub(crate) enum InterpretedWalReaderState {
     103              :     Running { current_position: Lsn },
     104              :     Done,
     105              : }
     106              : 
     107              : pub(crate) struct Batch {
     108              :     wal_end_lsn: Lsn,
     109              :     available_wal_end_lsn: Lsn,
     110              :     records: InterpretedWalRecords,
     111              : }
     112              : 
     113              : #[derive(thiserror::Error, Debug)]
     114              : pub enum InterpretedWalReaderError {
     115              :     /// Failed to decode a record from the WAL stream.
     116              :     #[error("decode error: {0}")]
     117              :     Decode(#[from] WalDecodeError),
     118              :     #[error("read or interpret error: {0}")]
     119              :     ReadOrInterpret(#[from] anyhow::Error),
     120              :     #[error("wal stream closed")]
     121              :     WalStreamClosed,
     122              : }
     123              : 
     124              : enum CurrentPositionUpdate {
     125              :     Reset(Lsn),
     126              :     NotReset(Lsn),
     127              : }
     128              : 
     129              : impl CurrentPositionUpdate {
     130            0 :     fn current_position(&self) -> Lsn {
     131            0 :         match self {
     132            0 :             CurrentPositionUpdate::Reset(lsn) => *lsn,
     133            0 :             CurrentPositionUpdate::NotReset(lsn) => *lsn,
     134              :         }
     135            0 :     }
     136              : }
     137              : 
     138              : impl InterpretedWalReaderState {
     139            4 :     fn current_position(&self) -> Option<Lsn> {
     140            4 :         match self {
     141              :             InterpretedWalReaderState::Running {
     142            2 :                 current_position, ..
     143            2 :             } => Some(*current_position),
     144            2 :             InterpretedWalReaderState::Done => None,
     145              :         }
     146            4 :     }
     147              : 
     148              :     // Reset the current position of the WAL reader if the requested starting position
     149              :     // of the new shard is smaller than the current value.
     150            3 :     fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
     151            3 :         match self {
     152              :             InterpretedWalReaderState::Running {
     153            3 :                 current_position, ..
     154            3 :             } => {
     155            3 :                 if new_shard_start_pos < *current_position {
     156            2 :                     *current_position = new_shard_start_pos;
     157            2 :                     CurrentPositionUpdate::Reset(*current_position)
     158              :                 } else {
     159            1 :                     CurrentPositionUpdate::NotReset(*current_position)
     160              :                 }
     161              :             }
     162              :             InterpretedWalReaderState::Done => {
     163            0 :                 panic!("maybe_reset called on finished reader")
     164              :             }
     165              :         }
     166            3 :     }
     167              : }
     168              : 
     169              : pub(crate) struct AttachShardNotification {
     170              :     shard_id: ShardIdentity,
     171              :     sender: tokio::sync::mpsc::Sender<Batch>,
     172              :     start_pos: Lsn,
     173              : }
     174              : 
     175              : impl InterpretedWalReader {
     176              :     /// Spawn the reader in a separate tokio task and return a handle
     177            2 :     pub(crate) fn spawn(
     178            2 :         wal_stream: StreamingWalReader,
     179            2 :         start_pos: Lsn,
     180            2 :         tx: tokio::sync::mpsc::Sender<Batch>,
     181            2 :         shard: ShardIdentity,
     182            2 :         pg_version: u32,
     183            2 :         appname: &Option<String>,
     184            2 :     ) -> InterpretedWalReaderHandle {
     185            2 :         let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
     186            2 :             current_position: start_pos,
     187            2 :         }));
     188            2 : 
     189            2 :         let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
     190              : 
     191            2 :         let reader = InterpretedWalReader {
     192            2 :             wal_stream,
     193            2 :             shard_senders: HashMap::from([(
     194            2 :                 shard,
     195            2 :                 smallvec::smallvec![ShardSenderState {
     196            0 :                     sender_id: SenderId::first(),
     197            0 :                     tx,
     198            0 :                     next_record_lsn: start_pos,
     199            0 :                 }],
     200              :             )]),
     201            2 :             shard_notification_rx: Some(shard_notification_rx),
     202            2 :             state: state.clone(),
     203            2 :             pg_version,
     204            2 :         };
     205            2 : 
     206            2 :         let metric = WAL_READERS
     207            2 :             .get_metric_with_label_values(&["task", appname.as_deref().unwrap_or("safekeeper")])
     208            2 :             .unwrap();
     209              : 
     210            2 :         let join_handle = tokio::task::spawn(
     211            2 :             async move {
     212            2 :                 metric.inc();
     213            2 :                 scopeguard::defer! {
     214            2 :                     metric.dec();
     215            2 :                 }
     216            2 : 
     217            2 :                 reader
     218            2 :                     .run_impl(start_pos)
     219            2 :                     .await
     220            0 :                     .inspect_err(|err| critical!("failed to read WAL record: {err:?}"))
     221            0 :             }
     222            2 :             .instrument(info_span!("interpreted wal reader")),
     223              :         );
     224              : 
     225            2 :         InterpretedWalReaderHandle {
     226            2 :             join_handle,
     227            2 :             state,
     228            2 :             shard_notification_tx,
     229            2 :         }
     230            2 :     }
     231              : 
     232              :     /// Construct the reader without spawning anything
     233              :     /// Callers should drive the future returned by [`Self::run`].
     234            0 :     pub(crate) fn new(
     235            0 :         wal_stream: StreamingWalReader,
     236            0 :         start_pos: Lsn,
     237            0 :         tx: tokio::sync::mpsc::Sender<Batch>,
     238            0 :         shard: ShardIdentity,
     239            0 :         pg_version: u32,
     240            0 :     ) -> InterpretedWalReader {
     241            0 :         let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
     242            0 :             current_position: start_pos,
     243            0 :         }));
     244            0 : 
     245            0 :         InterpretedWalReader {
     246            0 :             wal_stream,
     247            0 :             shard_senders: HashMap::from([(
     248            0 :                 shard,
     249            0 :                 smallvec::smallvec![ShardSenderState {
     250            0 :                     sender_id: SenderId::first(),
     251            0 :                     tx,
     252            0 :                     next_record_lsn: start_pos,
     253            0 :                 }],
     254              :             )]),
     255            0 :             shard_notification_rx: None,
     256            0 :             state: state.clone(),
     257            0 :             pg_version,
     258            0 :         }
     259            0 :     }
     260              : 
     261              :     /// Entry point for future (polling) based wal reader.
     262            0 :     pub(crate) async fn run(
     263            0 :         self,
     264            0 :         start_pos: Lsn,
     265            0 :         appname: &Option<String>,
     266            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     267            0 :         let metric = WAL_READERS
     268            0 :             .get_metric_with_label_values(&["future", appname.as_deref().unwrap_or("safekeeper")])
     269            0 :             .unwrap();
     270            0 : 
     271            0 :         metric.inc();
     272            0 :         scopeguard::defer! {
     273            0 :             metric.dec();
     274            0 :         }
     275              : 
     276            0 :         if let Err(err) = self.run_impl(start_pos).await {
     277            0 :             critical!("failed to read WAL record: {err:?}");
     278              :         } else {
     279            0 :             info!("interpreted wal reader exiting");
     280              :         }
     281              : 
     282            0 :         Err(CopyStreamHandlerEnd::Other(anyhow!(
     283            0 :             "interpreted wal reader finished"
     284            0 :         )))
     285            0 :     }
     286              : 
     287              :     /// Send interpreted WAL to one or more [`InterpretedWalSender`]s
     288              :     /// Stops when an error is encountered or when the [`InterpretedWalReaderHandle`]
     289              :     /// goes out of scope.
     290            2 :     async fn run_impl(mut self, start_pos: Lsn) -> Result<(), InterpretedWalReaderError> {
     291            2 :         let defer_state = self.state.clone();
     292            2 :         scopeguard::defer! {
     293            2 :             *defer_state.write().unwrap() = InterpretedWalReaderState::Done;
     294            2 :         }
     295            2 : 
     296            2 :         let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
     297            2 : 
     298            2 :         // Tracks the start of the PG WAL LSN from which the current batch of
     299            2 :         // interpreted records originated.
     300            2 :         let mut current_batch_wal_start_lsn: Option<Lsn> = None;
     301              : 
     302              :         loop {
     303           44 :             tokio::select! {
     304              :                 // Main branch for reading WAL and forwarding it
     305           44 :                 wal_or_reset = self.wal_stream.next() => {
     306           39 :                     let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
     307              :                     let WalBytes {
     308           39 :                         wal,
     309           39 :                         wal_start_lsn,
     310           39 :                         wal_end_lsn,
     311           39 :                         available_wal_end_lsn,
     312           39 :                     } = match wal {
     313           39 :                         Some(some) => some.map_err(InterpretedWalReaderError::ReadOrInterpret)?,
     314              :                         None => {
     315              :                             // [`StreamingWalReader::next`] is an endless stream of WAL.
     316              :                             // It shouldn't ever finish unless it panicked or became internally
     317              :                             // inconsistent.
     318            0 :                             return Result::Err(InterpretedWalReaderError::WalStreamClosed);
     319              :                         }
     320              :                     };
     321              : 
     322              :                     // We will already have a value if the previous chunks of WAL
     323              :                     // did not decode into anything useful.
     324           39 :                     if current_batch_wal_start_lsn.is_none() {
     325           39 :                         current_batch_wal_start_lsn = Some(wal_start_lsn);
     326           39 :                     }
     327              : 
     328           39 :                     wal_decoder.feed_bytes(&wal);
     329           39 : 
     330           39 :                     // Deserialize and interpret WAL records from this batch of WAL.
     331           39 :                     // Interpreted records for each shard are collected separately.
     332           39 :                     let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>();
     333           39 :                     let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new();
     334           39 :                     let mut max_next_record_lsn = None;
     335          635 :                     while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()?
     336              :                     {
     337          596 :                         assert!(next_record_lsn.is_aligned());
     338          596 :                         max_next_record_lsn = Some(next_record_lsn);
     339              : 
     340          596 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
     341          596 :                             recdata,
     342          596 :                             &shard_ids,
     343          596 :                             next_record_lsn,
     344          596 :                             self.pg_version,
     345          596 :                         )
     346          596 :                         .with_context(|| "Failed to interpret WAL")?;
     347              : 
     348         1391 :                         for (shard, record) in interpreted {
     349          795 :                             if record.is_empty() {
     350          199 :                                 continue;
     351          596 :                             }
     352          596 : 
     353          596 :                             let mut states_iter = self.shard_senders
     354          596 :                                 .get(&shard)
     355          596 :                                 .expect("keys collected above")
     356          596 :                                 .iter()
     357          992 :                                 .filter(|state| record.next_record_lsn > state.next_record_lsn)
     358          596 :                                 .peekable();
     359          986 :                             while let Some(state) = states_iter.next() {
     360          787 :                                 let shard_sender_id = ShardSenderId::new(shard, state.sender_id);
     361          787 : 
     362          787 :                                 // The most common case is one sender per shard. Peek and break to avoid the
     363          787 :                                 // clone in that situation.
     364          787 :                                 if states_iter.peek().is_none() {
     365          397 :                                     records_by_sender.entry(shard_sender_id).or_default().push(record);
     366          397 :                                     break;
     367          390 :                                 } else {
     368          390 :                                     records_by_sender.entry(shard_sender_id).or_default().push(record.clone());
     369          390 :                                 }
     370              :                             }
     371              :                         }
     372              :                     }
     373              : 
     374           39 :                     let max_next_record_lsn = match max_next_record_lsn {
     375           39 :                         Some(lsn) => lsn,
     376              :                         None => {
     377            0 :                             continue;
     378              :                         }
     379              :                     };
     380              : 
     381              :                     // Update the current position such that new receivers can decide
     382              :                     // whether to attach to us or spawn a new WAL reader.
     383           39 :                     match &mut *self.state.write().unwrap() {
     384           39 :                         InterpretedWalReaderState::Running { current_position, .. } => {
     385           39 :                             *current_position = max_next_record_lsn;
     386           39 :                         },
     387              :                         InterpretedWalReaderState::Done => {
     388            0 :                             unreachable!()
     389              :                         }
     390              :                     }
     391              : 
     392           39 :                     let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();
     393           39 : 
     394           39 :                     // Send interpreted records downstream. Anything that has already been seen
     395           39 :                     // by a shard is filtered out.
     396           39 :                     let mut shard_senders_to_remove = Vec::new();
     397           91 :                     for (shard, states) in &mut self.shard_senders {
     398          130 :                         for state in states {
     399           78 :                             let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
     400              : 
     401           78 :                             let batch = if max_next_record_lsn > state.next_record_lsn {
     402              :                                 // This batch contains at least one record that this shard has not
     403              :                                 // seen yet.
     404           65 :                                 let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
     405           65 : 
     406           65 :                                 InterpretedWalRecords {
     407           65 :                                     records,
     408           65 :                                     next_record_lsn: max_next_record_lsn,
     409           65 :                                     raw_wal_start_lsn: Some(batch_wal_start_lsn),
     410           65 :                                 }
     411           13 :                             } else if wal_end_lsn > state.next_record_lsn {
     412              :                                 // All the records in this batch were seen by the shard
     413              :                                 // However, the batch maps to a chunk of WAL that the
     414              :                                 // shard has not yet seen. Notify it of the start LSN
     415              :                                 // of the PG WAL chunk such that it doesn't look like a gap.
     416            0 :                                 InterpretedWalRecords {
     417            0 :                                     records: Vec::default(),
     418            0 :                                     next_record_lsn: state.next_record_lsn,
     419            0 :                                     raw_wal_start_lsn: Some(batch_wal_start_lsn),
     420            0 :                                 }
     421              :                             } else {
     422              :                                 // The shard has seen this chunk of WAL before. Skip it.
     423           13 :                                 continue;
     424              :                             };
     425              : 
     426           65 :                             let res = state.tx.send(Batch {
     427           65 :                                 wal_end_lsn,
     428           65 :                                 available_wal_end_lsn,
     429           65 :                                 records: batch,
     430           65 :                             }).await;
     431              : 
     432           65 :                             if res.is_err() {
     433            0 :                                 shard_senders_to_remove.push(shard_sender_id);
     434           65 :                             } else {
     435           65 :                                 state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
     436           65 :                             }
     437              :                         }
     438              :                     }
     439              : 
     440              :                     // Clean up any shard senders that have dropped out.
     441              :                     // This is inefficient, but such events are rare (connection to PS termination)
     442              :                     // and the number of subscriptions on the same shards very small (only one
     443              :                     // for the steady state).
     444           39 :                     for to_remove in shard_senders_to_remove {
     445            0 :                         let shard_senders = self.shard_senders.get_mut(&to_remove.shard()).expect("saw it above");
     446            0 :                         if let Some(idx) = shard_senders.iter().position(|s| s.sender_id == to_remove.sender_id) {
     447            0 :                             shard_senders.remove(idx);
     448            0 :                             tracing::info!("Removed shard sender {}", to_remove);
     449            0 :                         }
     450              : 
     451            0 :                         if shard_senders.is_empty() {
     452            0 :                             self.shard_senders.remove(&to_remove.shard());
     453            0 :                         }
     454              :                     }
     455              :                 },
     456              :                 // Listen for new shards that want to attach to this reader.
     457              :                 // If the reader is not running as a task, then this is not supported
     458              :                 // (see the pending branch below).
     459           44 :                 notification = match self.shard_notification_rx.as_mut() {
     460           44 :                         Some(rx) => Either::Left(rx.recv()),
     461            0 :                         None => Either::Right(std::future::pending())
     462              :                     } => {
     463            3 :                     if let Some(n) = notification {
     464            3 :                         let AttachShardNotification { shard_id, sender, start_pos } = n;
     465            3 : 
     466            3 :                         // Update internal and external state, then reset the WAL stream
     467            3 :                         // if required.
     468            3 :                         let senders = self.shard_senders.entry(shard_id).or_default();
     469            3 :                         let new_sender_id = match senders.last() {
     470            2 :                             Some(sender) => sender.sender_id.next(),
     471            1 :                             None => SenderId::first()
     472              :                         };
     473              : 
     474            3 :                         senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
     475            3 : 
     476            3 :                         // If the shard is subscribing below the current position then we need
     477            3 :                         // to update the cursor that tracks where we are at in the WAL
     478            3 :                         // ([`Self::state`]) and reset the WAL stream itself
     479            3 :                         // ([`Self::wal_stream`]). This must be done atomically from the POV of
     480            3 :                         // anything outside the select statement.
     481            3 :                         let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
     482            3 :                         match position_reset {
     483            2 :                             CurrentPositionUpdate::Reset(to) => {
     484            2 :                                 self.wal_stream.reset(to).await;
     485            2 :                                 wal_decoder = WalStreamDecoder::new(to, self.pg_version);
     486              :                             },
     487            1 :                             CurrentPositionUpdate::NotReset(_) => {}
     488              :                         };
     489              : 
     490            3 :                         tracing::info!(
     491            0 :                             "Added shard sender {} with start_pos={} current_pos={}",
     492            0 :                             ShardSenderId::new(shard_id, new_sender_id), start_pos, position_reset.current_position()
     493              :                         );
     494            0 :                     }
     495              :                 }
     496              :             }
     497              :         }
     498            0 :     }
     499              : }
     500              : 
     501              : impl InterpretedWalReaderHandle {
     502              :     /// Fan-out the reader by attaching a new shard to it
                            ///
                            /// Wraps `shard_id`, `sender` and `start_pos` in an [`AttachShardNotification`]
                            /// and hands it to `shard_notification_tx`. The call is not awaited, so it
                            /// only queues the request; the reader side is what registers `sender` for
                            /// the shard and, if `start_pos` requires it, resets its WAL stream.
                            ///
                            /// Returns the channel's `SendError` (which carries the notification back) when
                            /// the send fails.
                            /// NOTE(review): presumably that means the reader task is gone - confirm
                            /// against the type of `shard_notification_tx`.
     503            3 :     pub(crate) fn fanout(
     504            3 :         &self,
     505            3 :         shard_id: ShardIdentity,
     506            3 :         sender: tokio::sync::mpsc::Sender<Batch>,
     507            3 :         start_pos: Lsn,
     508            3 :     ) -> Result<(), SendError<AttachShardNotification>> {
     509            3 :         self.shard_notification_tx.send(AttachShardNotification {
     510            3 :             shard_id,
     511            3 :             sender,
     512            3 :             start_pos,
     513            3 :         })
     514            3 :     }
     515              : 
     516              :     /// Get the current WAL position of the reader
                            ///
                            /// Takes the read lock on the shared `state` and panics if that lock is
                            /// poisoned. The tests in this file poll this until it returns `None` after
                            /// calling [`Self::abort`].
     517            4 :     pub(crate) fn current_position(&self) -> Option<Lsn> {
     518            4 :         self.state.read().unwrap().current_position()
     519            4 :     }
     520              : 
                            /// Abort the reader task behind `join_handle`.
                            ///
                            /// This is also what the `Drop` implementation of the handle calls.
     521            4 :     pub(crate) fn abort(&self) {
     522            4 :         self.join_handle.abort()
     523            4 :     }
     524              : }
     525              : 
     526              : impl Drop for InterpretedWalReaderHandle {
                            /// Logs at info level and aborts the reader task, so the task does not
                            /// keep running once its handle has been dropped.
     527            2 :     fn drop(&mut self) {
     528            2 :         tracing::info!("Aborting interpreted wal reader");
     529            2 :         self.abort()
     530            2 :     }
     531              : }
     532              : 
     533              : pub(crate) struct InterpretedWalSender<'a, IO> {
                            // State for one interpreted-WAL sending loop (see `run`): batches arrive
                            // on `rx` and are written to the client connection behind `pgb`.

                            /// Wire format passed to `to_wire` when serializing each batch.
     534              :     pub(crate) format: InterpretedFormat,
                            /// Optional compression passed to `to_wire` alongside `format`.
     535              :     pub(crate) compression: Option<Compression>,
                            /// Only used by `run` for the message emitted when streaming is stopped.
     536              :     pub(crate) appname: Option<String>,
     537              : 
                            /// Timeline asked (`should_walsender_stop`) whether streaming may end.
     538              :     pub(crate) tli: WalResidentTimeline,
                            /// Initial value of the position reported in the stop message.
     539              :     pub(crate) start_lsn: Lsn,
     540              : 
                            /// Connection that WAL batches and keep-alives are written to.
     541              :     pub(crate) pgb: &'a mut PostgresBackend<IO>,
                            /// Source of the `wal_end` value placed in keep-alive messages.
     542              :     pub(crate) end_watch_view: EndWatchView,
                            /// Used to look up this walsender's remote consistent LSN on each
                            /// keep-alive tick.
     543              :     pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
                            /// Receiving end of the channel that delivers batches to send.
     544              :     pub(crate) rx: tokio::sync::mpsc::Receiver<Batch>,
     545              : }
     546              : 
     547              : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
     548              :     /// Send interpreted WAL records over the network.
     549              :     /// Also manages keep-alives if nothing was sent for a while.
                            ///
                            /// This function never returns `Ok`; every exit path is an `Err`:
                            /// * `CopyStreamHandlerEnd::Other` when `rx` yields `None`, i.e. the sending
                            ///   half of the batch channel is gone,
                            /// * whatever `?` propagates from serializing a batch or writing to `pgb`,
                            /// * `CopyStreamHandlerEnd::ServerInitiated` when the timeline reports that
                            ///   the walsender should stop.
     550            0 :     pub(crate) async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
                                // One second keep-alive cadence. Missed ticks are skipped, and the
                                // reset restarts the period from now.
     551            0 :         let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
     552            0 :         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
     553            0 :         keepalive_ticker.reset();
     554            0 : 
                                // End LSN of the most recently received batch; it is only read when
                                // building the stop message at the bottom of the function.
     555            0 :         let mut wal_position = self.start_lsn;
     556              : 
     557              :         loop {
     558            0 :             tokio::select! {
     559            0 :                 batch = self.rx.recv() => {
     560            0 :                     let batch = match batch {
     561            0 :                         Some(b) => b,
     562              :                         None => {
     563            0 :                             return Result::Err(
     564            0 :                                 CopyStreamHandlerEnd::Other(anyhow!("Interpreted WAL reader exited early"))
     565            0 :                             );
     566              :                         }
     567              :                     };
     568              : 
     569            0 :                     wal_position = batch.wal_end_lsn;
     570              : 
     571            0 :                     let buf = batch
     572            0 :                         .records
     573            0 :                         .to_wire(self.format, self.compression)
     574            0 :                         .await
     575            0 :                         .with_context(|| "Failed to serialize interpreted WAL")
     576            0 :                         .map_err(CopyStreamHandlerEnd::from)?;
     577              : 
     578              :                     // Reset the keep alive ticker since we are sending something
     579              :                     // over the wire now.
     580            0 :                     keepalive_ticker.reset();
     581            0 : 
                                            // The message reports the batch end as `streaming_lsn` and the
                                            // batch's available WAL end as `commit_lsn`.
     582            0 :                     self.pgb
     583            0 :                         .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
     584            0 :                             streaming_lsn: batch.wal_end_lsn.0,
     585            0 :                             commit_lsn: batch.available_wal_end_lsn.0,
     586            0 :                             data: &buf,
     587            0 :                         })).await?;
     588              :                 }
     589              :                 // Send a periodic keep alive when the connection has been idle for a while.
     590              :                 // Since we've been idle, also check if we can stop streaming.
     591            0 :                 _ = keepalive_ticker.tick() => {
                                        // The stop check only runs when a remote consistent LSN is known
                                        // for this walsender; otherwise we go straight to the keep-alive.
     592            0 :                     if let Some(remote_consistent_lsn) = self.wal_sender_guard
     593            0 :                         .walsenders()
     594            0 :                         .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
     595              :                     {
     596            0 :                         if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     597              :                             // Stop streaming if the receivers are caught up and
     598              :                             // there's no active compute. This causes the loop in
     599              :                             // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
     600              :                             // to exit and terminate the WAL stream.
     601            0 :                             break;
     602            0 :                         }
     603            0 :                     }
     604              : 
     605            0 :                     self.pgb
     606            0 :                         .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     607            0 :                             wal_end: self.end_watch_view.get().0,
     608            0 :                             timestamp: get_current_timestamp(),
     609            0 :                             request_reply: true,
     610            0 :                         }))
     611            0 :                         .await?;
     612              :                 },
     613              :             }
     614              :         }
     615              : 
                                // Only reachable through the `break` in the keep-alive arm above.
     616            0 :         Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     617            0 :             "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     618            0 :             self.appname, wal_position,
     619            0 :         )))
     620            0 :     }
     621              : }
     622              : #[cfg(test)]
     623              : mod tests {
     624              :     use std::{collections::HashMap, str::FromStr, time::Duration};
     625              : 
     626              :     use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
     627              :     use postgres_ffi::MAX_SEND_SIZE;
     628              :     use tokio::sync::mpsc::error::TryRecvError;
     629              :     use utils::{
     630              :         id::{NodeId, TenantTimelineId},
     631              :         lsn::Lsn,
     632              :         shard::{ShardCount, ShardNumber},
     633              :     };
     634              : 
     635              :     use crate::{
     636              :         send_interpreted_wal::{Batch, InterpretedWalReader},
     637              :         test_utils::Env,
     638              :         wal_reader_stream::StreamingWalReader,
     639              :     };
     640              : 
     641              :     #[tokio::test]
                            // Fan-out across two different shards.
                            //
                            // Shard 0 is attached when the reader is spawned and drains all WAL. Shard 1
                            // is then attached through `fanout` at the same start LSN. The test checks
                            // that shard 1 only sees empty batches, that shard 0 receives nothing more
                            // while the WAL is read again, that both shards saw the same sequence of
                            // per-batch `next_record_lsn` values, and that the reader reports no
                            // position shortly after being aborted.
     642            1 :     async fn test_interpreted_wal_reader_fanout() {
     643            1 :         let _ = env_logger::builder().is_test(true).try_init();
     644            1 : 
     645            1 :         const SIZE: usize = 8 * 1024;
     646            1 :         const MSG_COUNT: usize = 200;
     647            1 :         const PG_VERSION: u32 = 17;
     648            1 :         const SHARD_COUNT: u8 = 2;
     649            1 : 
     650            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     651            1 :         let env = Env::new(true).unwrap();
     652            1 :         let tli = env
     653            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     654            1 :             .await
     655            1 :             .unwrap();
     656            1 : 
     657            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
     658            1 :         let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
     659            1 :             .await
     660            1 :             .unwrap();
     661            1 :         let end_pos = end_watch.get();
     662            1 : 
     663            1 :         tracing::info!("Doing first round of reads ...");
     664            1 : 
     665            1 :         let streaming_wal_reader = StreamingWalReader::new(
     666            1 :             resident_tli,
     667            1 :             None,
     668            1 :             start_lsn,
     669            1 :             end_pos,
     670            1 :             end_watch,
     671            1 :             MAX_SEND_SIZE,
     672            1 :         );
     673            1 : 
     674            1 :         let shard_0 = ShardIdentity::new(
     675            1 :             ShardNumber(0),
     676            1 :             ShardCount(SHARD_COUNT),
     677            1 :             ShardStripeSize::default(),
     678            1 :         )
     679            1 :         .unwrap();
     680            1 : 
     681            1 :         let shard_1 = ShardIdentity::new(
     682            1 :             ShardNumber(1),
     683            1 :             ShardCount(SHARD_COUNT),
     684            1 :             ShardStripeSize::default(),
     685            1 :         )
     686            1 :         .unwrap();
     687            1 : 
                                // One (tx, rx) channel pair per shard. Both halves are wrapped in
                                // `Option` so they can be `take()`n when the shard gets attached.
     688            1 :         let mut shards = HashMap::new();
     689            1 : 
     690            3 :         for shard_number in 0..SHARD_COUNT {
     691            2 :             let shard_id = ShardIdentity::new(
     692            2 :                 ShardNumber(shard_number),
     693            2 :                 ShardCount(SHARD_COUNT),
     694            2 :                 ShardStripeSize::default(),
     695            2 :             )
     696            2 :             .unwrap();
     697            2 :             let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     698            2 :             shards.insert(shard_id, (Some(tx), Some(rx)));
     699            2 :         }
     700            1 : 
     701            1 :         let shard_0_tx = shards.get_mut(&shard_0).unwrap().0.take().unwrap();
     702            1 :         let mut shard_0_rx = shards.get_mut(&shard_0).unwrap().1.take().unwrap();
     703            1 : 
     704            1 :         let handle = InterpretedWalReader::spawn(
     705            1 :             streaming_wal_reader,
     706            1 :             start_lsn,
     707            1 :             shard_0_tx,
     708            1 :             shard_0,
     709            1 :             PG_VERSION,
     710            1 :             &Some("pageserver".to_string()),
     711            1 :         );
     712            1 : 
     713            1 :         tracing::info!("Reading all WAL with only shard 0 attached ...");
     714            1 : 
                                // Keep receiving until a batch ends exactly at the available end of WAL.
     715            1 :         let mut shard_0_interpreted_records = Vec::new();
     716           13 :         while let Some(batch) = shard_0_rx.recv().await {
     717           13 :             shard_0_interpreted_records.push(batch.records);
     718           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     719            1 :                 break;
     720           12 :             }
     721            1 :         }
     722            1 : 
     723            1 :         let shard_1_tx = shards.get_mut(&shard_1).unwrap().0.take().unwrap();
     724            1 :         let mut shard_1_rx = shards.get_mut(&shard_1).unwrap().1.take().unwrap();
     725            1 : 
     726            1 :         tracing::info!("Attaching shard 1 to the reader at start of WAL");
     727            1 :         handle.fanout(shard_1, shard_1_tx, start_lsn).unwrap();
     728            1 : 
     729            1 :         tracing::info!("Reading all WAL with shard 0 and shard 1 attached ...");
     730            1 : 
     731            1 :         let mut shard_1_interpreted_records = Vec::new();
     732           13 :         while let Some(batch) = shard_1_rx.recv().await {
     733           13 :             shard_1_interpreted_records.push(batch.records);
     734           13 :             if batch.wal_end_lsn == batch.available_wal_end_lsn {
     735            1 :                 break;
     736           12 :             }
     737            1 :         }
     738            1 : 
     739            1 :         // This test uses logical messages. Those only go to shard 0. Check that the
     740            1 :         // filtering worked and shard 1 did not get any.
     741            1 :         assert!(shard_1_interpreted_records
     742            1 :             .iter()
     743           13 :             .all(|recs| recs.records.is_empty()));
     744            1 : 
     745            1 :         // Shard 0 should not receive anything more since the reader is
     746            1 :         // going through wal that it has already processed.
                                // If a batch did arrive, log its bounds before the assertion below fails.
     747            1 :         let res = shard_0_rx.try_recv();
     748            1 :         if let Ok(ref ok) = res {
     749            1 :             tracing::error!(
     750            1 :                 "Shard 0 received batch: wal_end_lsn={} available_wal_end_lsn={}",
     751            1 :                 ok.wal_end_lsn,
     752            1 :                 ok.available_wal_end_lsn
     753            1 :             );
     754            1 :         }
     755            1 :         assert!(matches!(res, Err(TryRecvError::Empty)));
     756            1 : 
     757            1 :         // Check that the next records lsns received by the two shards match up.
     758            1 :         let shard_0_next_lsns = shard_0_interpreted_records
     759            1 :             .iter()
     760           13 :             .map(|recs| recs.next_record_lsn)
     761            1 :             .collect::<Vec<_>>();
     762            1 :         let shard_1_next_lsns = shard_1_interpreted_records
     763            1 :             .iter()
     764           13 :             .map(|recs| recs.next_record_lsn)
     765            1 :             .collect::<Vec<_>>();
     766            1 :         assert_eq!(shard_0_next_lsns, shard_1_next_lsns);
     767            1 : 
                                // Abort the reader, then poll up to five times (1ms apart) for the
                                // handle to report that there is no current position any more.
     768            1 :         handle.abort();
     769            1 :         let mut done = false;
     770            2 :         for _ in 0..5 {
     771            2 :             if handle.current_position().is_none() {
     772            1 :                 done = true;
     773            1 :                 break;
     774            1 :             }
     775            1 :             tokio::time::sleep(Duration::from_millis(1)).await;
     776            1 :         }
     777            1 : 
     778            1 :         assert!(done);
     779            1 :     }
     780              : 
     781              :     #[tokio::test]
                            // Fan-out of several senders that all belong to the same shard.
                            //
                            // Three senders for shard 0 start at different record boundaries: the one
                            // used to spawn the reader starts at record 5, the two attached through
                            // `fanout` start at records 1 and 3. Each sender must receive, in sorted
                            // order, exactly the records whose `next_record_lsn` is greater than its
                            // own start LSN.
     782            1 :     async fn test_interpreted_wal_reader_same_shard_fanout() {
     783            1 :         let _ = env_logger::builder().is_test(true).try_init();
     784            1 : 
     785            1 :         const SIZE: usize = 8 * 1024;
     786            1 :         const MSG_COUNT: usize = 200;
     787            1 :         const PG_VERSION: u32 = 17;
     788            1 :         const SHARD_COUNT: u8 = 2;
     789            1 : 
     790            1 :         let start_lsn = Lsn::from_str("0/149FD18").unwrap();
     791            1 :         let env = Env::new(true).unwrap();
     792            1 :         let tli = env
     793            1 :             .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
     794            1 :             .await
     795            1 :             .unwrap();
     796            1 : 
     797            1 :         let resident_tli = tli.wal_residence_guard().await.unwrap();
                                // `write_wal` fills this vector; it is the reference the received
                                // records are compared against at the end of the test.
     798            1 :         let mut next_record_lsns = Vec::default();
     799            1 :         let end_watch =
     800            1 :             Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
     801            1 :                 .await
     802            1 :                 .unwrap();
     803            1 :         let end_pos = end_watch.get();
     804            1 : 
     805            1 :         let streaming_wal_reader = StreamingWalReader::new(
     806            1 :             resident_tli,
     807            1 :             None,
     808            1 :             start_lsn,
     809            1 :             end_pos,
     810            1 :             end_watch,
     811            1 :             MAX_SEND_SIZE,
     812            1 :         );
     813            1 : 
     814            1 :         let shard_0 = ShardIdentity::new(
     815            1 :             ShardNumber(0),
     816            1 :             ShardCount(SHARD_COUNT),
     817            1 :             ShardStripeSize::default(),
     818            1 :         )
     819            1 :         .unwrap();
     820            1 : 
                                // Test-local bookkeeping for one attached sender: its channel halves,
                                // where it starts, and the record LSNs it has received so far.
     821            1 :         struct Sender {
     822            1 :             tx: Option<tokio::sync::mpsc::Sender<Batch>>,
     823            1 :             rx: tokio::sync::mpsc::Receiver<Batch>,
     824            1 :             shard: ShardIdentity,
     825            1 :             start_lsn: Lsn,
     826            1 :             received_next_record_lsns: Vec<Lsn>,
     827            1 :         }
     828            1 : 
     829            1 :         impl Sender {
     830            3 :             fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
     831            3 :                 let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
     832            3 :                 Self {
     833            3 :                     tx: Some(tx),
     834            3 :                     rx,
     835            3 :                     shard,
     836            3 :                     start_lsn,
     837            3 :                     received_next_record_lsns: Vec::default(),
     838            3 :                 }
     839            3 :             }
     840            1 :         }
     841            1 : 
                                // The first sender (used for `spawn`) starts above the other two, so
                                // the later `fanout` calls subscribe below its start position.
     842            1 :         assert!(next_record_lsns.len() > 7);
     843            1 :         let start_lsns = vec![
     844            1 :             next_record_lsns[5],
     845            1 :             next_record_lsns[1],
     846            1 :             next_record_lsns[3],
     847            1 :         ];
     848            1 :         let mut senders = start_lsns
     849            1 :             .into_iter()
     850            3 :             .map(|lsn| Sender::new(lsn, shard_0))
     851            1 :             .collect::<Vec<_>>();
     852            1 : 
     853            1 :         let first_sender = senders.first_mut().unwrap();
     854            1 :         let handle = InterpretedWalReader::spawn(
     855            1 :             streaming_wal_reader,
     856            1 :             first_sender.start_lsn,
     857            1 :             first_sender.tx.take().unwrap(),
     858            1 :             first_sender.shard,
     859            1 :             PG_VERSION,
     860            1 :             &Some("pageserver".to_string()),
     861            1 :         );
     862            1 : 
     863            2 :         for sender in senders.iter_mut().skip(1) {
     864            2 :             handle
     865            2 :                 .fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
     866            2 :                 .unwrap();
     867            2 :         }
     868            1 : 
                                // Drain each sender in turn until it receives a batch that ends at the
                                // available end of WAL, recording every record's `next_record_lsn`.
     869            3 :         for sender in senders.iter_mut() {
     870            1 :             loop {
     871           39 :                 let batch = sender.rx.recv().await.unwrap();
     872           39 :                 tracing::info!(
     873            1 :                     "Sender with start_lsn={} received batch ending at {} with {} records",
     874            0 :                     sender.start_lsn,
     875            0 :                     batch.wal_end_lsn,
     876            0 :                     batch.records.records.len()
     877            1 :                 );
     878            1 : 
     879          627 :                 for rec in batch.records.records {
     880          588 :                     sender.received_next_record_lsns.push(rec.next_record_lsn);
     881          588 :                 }
     882            1 : 
     883           39 :                 if batch.wal_end_lsn == batch.available_wal_end_lsn {
     884            3 :                     break;
     885           36 :                 }
     886            1 :             }
     887            1 :         }
     888            1 : 
                                // Abort the reader, then poll up to five times (1ms apart) for the
                                // handle to report that there is no current position any more.
     889            1 :         handle.abort();
     890            1 :         let mut done = false;
     891            2 :         for _ in 0..5 {
     892            2 :             if handle.current_position().is_none() {
     893            1 :                 done = true;
     894            1 :                 break;
     895            1 :             }
     896            1 :             tokio::time::sleep(Duration::from_millis(1)).await;
     897            1 :         }
     898            1 : 
     899            1 :         assert!(done);
     900            1 : 
     901            4 :         for sender in senders {
     902            3 :             tracing::info!(
     903            1 :                 "Validating records received by sender with start_lsn={}",
     904            1 :                 sender.start_lsn
     905            1 :             );
     906            1 : 
                                    // Records must arrive in order, and must be exactly those written
                                    // records whose next-record LSN is strictly above the start LSN.
     907            3 :             assert!(sender.received_next_record_lsns.is_sorted());
     908            3 :             let expected = next_record_lsns
     909            3 :                 .iter()
     910          600 :                 .filter(|lsn| **lsn > sender.start_lsn)
     911            3 :                 .copied()
     912            3 :                 .collect::<Vec<_>>();
     913            3 :             assert_eq!(sender.received_next_record_lsns, expected);
     914            1 :         }
     915            1 :     }
     916              : }
        

Generated by: LCOV version 2.1-beta