LCOV - code coverage report
Current view: top level - safekeeper/src - send_interpreted_wal.rs (source / functions) Coverage Total Hit
Test: 685df7483efdc579d44aa7093bca9796bb9d088e.info Lines: 0.0 % 61 0
Test Date: 2024-11-25 17:08:35 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : 
       3              : use anyhow::Context;
       4              : use futures::StreamExt;
       5              : use pageserver_api::shard::ShardIdentity;
       6              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
       7              : use postgres_ffi::MAX_SEND_SIZE;
       8              : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
       9              : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
      10              : use tokio::io::{AsyncRead, AsyncWrite};
      11              : use tokio::time::MissedTickBehavior;
      12              : use utils::bin_ser::BeSer;
      13              : use utils::lsn::Lsn;
      14              : use wal_decoder::models::InterpretedWalRecord;
      15              : 
      16              : use crate::send_wal::EndWatchView;
      17              : use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
      18              : 
      19              : /// Shard-aware interpreted record sender.
      20              : /// This is used for sending WAL to the pageserver. Said WAL
      21              : /// is pre-interpreted and filtered for the shard.
      22              : pub(crate) struct InterpretedWalSender<'a, IO> {
      23              :     pub(crate) pgb: &'a mut PostgresBackend<IO>,
      24              :     pub(crate) wal_stream_builder: WalReaderStreamBuilder,
      25              :     pub(crate) end_watch_view: EndWatchView,
      26              :     pub(crate) shard: ShardIdentity,
      27              :     pub(crate) pg_version: u32,
      28              :     pub(crate) appname: Option<String>,
      29              : }
      30              : 
      31              : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
      32              :     /// Send interpreted WAL to a receiver.
      33              :     /// Stops when an error occurs or the receiver is caught up and there's no active compute.
      34              :     ///
      35              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
      36              :     /// convenience.
      37            0 :     pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
      38            0 :         let mut wal_position = self.wal_stream_builder.start_pos();
      39            0 :         let mut wal_decoder =
      40            0 :             WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
      41              : 
      42            0 :         let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
      43            0 :         let mut stream = std::pin::pin!(stream);
      44            0 : 
      45            0 :         let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
      46            0 :         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
      47            0 :         keepalive_ticker.reset();
      48              : 
      49              :         loop {
      50            0 :             tokio::select! {
      51              :                 // Get some WAL from the stream and then: decode, interpret and send it
      52            0 :                 wal = stream.next() => {
      53            0 :                     let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
      54            0 :                         Some(some) => some?,
      55            0 :                         None => { break; }
      56              :                     };
      57              : 
      58            0 :                     wal_position = wal_end_lsn;
      59            0 :                     wal_decoder.feed_bytes(&wal);
      60            0 : 
      61            0 :                     let mut records = Vec::new();
      62            0 :                     let mut max_next_record_lsn = None;
      63            0 :                     while let Some((next_record_lsn, recdata)) = wal_decoder
      64            0 :                         .poll_decode()
      65            0 :                         .with_context(|| "Failed to decode WAL")?
      66              :                     {
      67            0 :                         assert!(next_record_lsn.is_aligned());
      68            0 :                         max_next_record_lsn = Some(next_record_lsn);
      69              : 
      70              :                         // Deserialize and interpret WAL record
      71            0 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
      72            0 :                             recdata,
      73            0 :                             &self.shard,
      74            0 :                             next_record_lsn,
      75            0 :                             self.pg_version,
      76            0 :                         )
      77            0 :                         .with_context(|| "Failed to interpret WAL")?;
      78              : 
      79            0 :                         if !interpreted.is_empty() {
      80            0 :                             records.push(interpreted);
      81            0 :                         }
      82              :                     }
      83              : 
      84            0 :                     let mut buf = Vec::new();
      85            0 :                     records
      86            0 :                         .ser_into(&mut buf)
      87            0 :                         .with_context(|| "Failed to serialize interpreted WAL")?;
      88              : 
      89              :                     // Reset the keep alive ticker since we are sending something
      90              :                     // over the wire now.
      91            0 :                     keepalive_ticker.reset();
      92            0 : 
      93            0 :                     self.pgb
      94            0 :                         .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
      95            0 :                             streaming_lsn: wal_end_lsn.0,
      96            0 :                             commit_lsn: available_wal_end_lsn.0,
      97            0 :                             next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
      98            0 :                             data: buf.as_slice(),
      99            0 :                         })).await?;
     100              :                 }
     101              : 
     102              :                 // Send a periodic keep alive when the connection has been idle for a while.
     103            0 :                 _ = keepalive_ticker.tick() => {
     104            0 :                     self.pgb
     105            0 :                         .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     106            0 :                             wal_end: self.end_watch_view.get().0,
     107            0 :                             timestamp: get_current_timestamp(),
     108            0 :                             request_reply: true,
     109            0 :                         }))
     110            0 :                         .await?;
     111              :                 }
     112              :             }
     113              :         }
     114              : 
     115              :         // The loop above ends when the receiver is caught up and there's no more WAL to send.
     116            0 :         Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     117            0 :             "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     118            0 :             self.appname, wal_position,
     119            0 :         )))
     120            0 :     }
     121              : }
        

Generated by: LCOV version 2.1-beta