LCOV - code coverage report
Current view: top level - safekeeper/src - send_interpreted_wal.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 0.0 % 77 0
Test Date: 2025-01-07 20:58:07 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : 
       3              : use anyhow::Context;
       4              : use futures::StreamExt;
       5              : use pageserver_api::shard::ShardIdentity;
       6              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
       7              : use postgres_ffi::MAX_SEND_SIZE;
       8              : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
       9              : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
      10              : use tokio::io::{AsyncRead, AsyncWrite};
      11              : use tokio::time::MissedTickBehavior;
      12              : use utils::lsn::Lsn;
      13              : use utils::postgres_client::Compression;
      14              : use utils::postgres_client::InterpretedFormat;
      15              : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
      16              : use wal_decoder::wire_format::ToWireFormat;
      17              : 
      18              : use crate::send_wal::EndWatchView;
      19              : use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
      20              : 
      21              : /// Shard-aware interpreted record sender.
      22              : /// This is used for sending WAL to the pageserver. Said WAL
      23              : /// is pre-interpreted and filtered for the shard.
      24              : pub(crate) struct InterpretedWalSender<'a, IO> {
      25              :     pub(crate) format: InterpretedFormat,
      26              :     pub(crate) compression: Option<Compression>,
      27              :     pub(crate) pgb: &'a mut PostgresBackend<IO>,
      28              :     pub(crate) wal_stream_builder: WalReaderStreamBuilder,
      29              :     pub(crate) end_watch_view: EndWatchView,
      30              :     pub(crate) shard: ShardIdentity,
      31              :     pub(crate) pg_version: u32,
      32              :     pub(crate) appname: Option<String>,
      33              : }
      34              : 
      35              : struct Batch {
      36              :     wal_end_lsn: Lsn,
      37              :     available_wal_end_lsn: Lsn,
      38              :     records: InterpretedWalRecords,
      39              : }
      40              : 
      41              : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
      42              :     /// Send interpreted WAL to a receiver.
      43              :     /// Stops when an error occurs or the receiver is caught up and there's no active compute.
      44              :     ///
      45              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
      46              :     /// convenience.
      47            0 :     pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
      48            0 :         let mut wal_position = self.wal_stream_builder.start_pos();
      49            0 :         let mut wal_decoder =
      50            0 :             WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
      51              : 
      52            0 :         let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
      53            0 :         let mut stream = std::pin::pin!(stream);
      54            0 : 
      55            0 :         let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
      56            0 :         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
      57            0 :         keepalive_ticker.reset();
      58            0 : 
      59            0 :         let (tx, mut rx) = tokio::sync::mpsc::channel::<Batch>(2);
      60              : 
      61              :         loop {
      62            0 :             tokio::select! {
      63              :                 // Get some WAL from the stream and then: decode, interpret and push it down the
      64              :                 // pipeline.
      65            0 :                 wal = stream.next(), if tx.capacity() > 0 => {
      66            0 :                     let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
      67            0 :                         Some(some) => some?,
      68            0 :                         None => { break; }
      69              :                     };
      70              : 
      71            0 :                     wal_position = wal_end_lsn;
      72            0 :                     wal_decoder.feed_bytes(&wal);
      73            0 : 
      74            0 :                     let mut records = Vec::new();
      75            0 :                     let mut max_next_record_lsn = None;
      76            0 :                     while let Some((next_record_lsn, recdata)) = wal_decoder
      77            0 :                         .poll_decode()
      78            0 :                         .with_context(|| "Failed to decode WAL")?
      79              :                     {
      80            0 :                         assert!(next_record_lsn.is_aligned());
      81            0 :                         max_next_record_lsn = Some(next_record_lsn);
      82              : 
      83              :                         // Deserialize and interpret WAL record
      84            0 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
      85            0 :                             recdata,
      86            0 :                             &self.shard,
      87            0 :                             next_record_lsn,
      88            0 :                             self.pg_version,
      89            0 :                         )
      90            0 :                         .with_context(|| "Failed to interpret WAL")?;
      91              : 
      92            0 :                         if !interpreted.is_empty() {
      93            0 :                             records.push(interpreted);
      94            0 :                         }
      95              :                     }
      96              : 
      97            0 :                     let max_next_record_lsn = match max_next_record_lsn {
      98            0 :                         Some(lsn) => lsn,
      99            0 :                         None => { continue; }
     100              :                     };
     101              : 
     102            0 :                     let batch = InterpretedWalRecords {
     103            0 :                         records,
     104            0 :                         next_record_lsn: Some(max_next_record_lsn),
     105            0 :                     };
     106            0 : 
     107            0 :                     tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
     108              :                 },
     109              :                 // For a previously interpreted batch, serialize it and push it down the wire.
     110            0 :                 batch = rx.recv() => {
     111            0 :                     let batch = match batch {
     112            0 :                         Some(b) => b,
     113            0 :                         None => { break; }
     114              :                     };
     115              : 
     116            0 :                     let buf = batch
     117            0 :                         .records
     118            0 :                         .to_wire(self.format, self.compression)
     119            0 :                         .await
     120            0 :                         .with_context(|| "Failed to serialize interpreted WAL")
     121            0 :                         .map_err(CopyStreamHandlerEnd::from)?;
     122              : 
     123              :                     // Reset the keep alive ticker since we are sending something
     124              :                     // over the wire now.
     125            0 :                     keepalive_ticker.reset();
     126            0 : 
     127            0 :                     self.pgb
     128            0 :                         .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
     129            0 :                             streaming_lsn: batch.wal_end_lsn.0,
     130            0 :                             commit_lsn: batch.available_wal_end_lsn.0,
     131            0 :                             data: &buf,
     132            0 :                         })).await?;
     133              :                 }
     134              :                 // Send a periodic keep alive when the connection has been idle for a while.
     135            0 :                 _ = keepalive_ticker.tick() => {
     136            0 :                     self.pgb
     137            0 :                         .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     138            0 :                             wal_end: self.end_watch_view.get().0,
     139            0 :                             timestamp: get_current_timestamp(),
     140            0 :                             request_reply: true,
     141            0 :                         }))
     142            0 :                         .await?;
     143              :                 }
     144              :             }
     145              :         }
     146              : 
     147              :         // The loop above ends when the receiver is caught up and there's no more WAL to send.
     148            0 :         Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     149            0 :             "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     150            0 :             self.appname, wal_position,
     151            0 :         )))
     152            0 :     }
     153              : }
        

Generated by: LCOV version 2.1-beta