LCOV - code coverage report
Current view: top level - safekeeper/src - send_interpreted_wal.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 0.0 % 83 0
Test Date: 2024-11-20 17:59:39 Functions: 0.0 % 14 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : 
       3              : use anyhow::Context;
       4              : use bytes::Bytes;
       5              : use futures::StreamExt;
       6              : use pageserver_api::shard::ShardIdentity;
       7              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
       8              : use postgres_ffi::MAX_SEND_SIZE;
       9              : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
      10              : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
      11              : use tokio::io::{AsyncRead, AsyncWrite};
      12              : use tokio::time::MissedTickBehavior;
      13              : use utils::lsn::Lsn;
      14              : use utils::postgres_client::Compression;
      15              : use utils::postgres_client::InterpretedFormat;
      16              : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
      17              : use wal_decoder::wire_format::ToWireFormat;
      18              : 
      19              : use crate::send_wal::EndWatchView;
      20              : use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
      21              : 
      22              : /// Shard-aware interpreted record sender.
      23              : /// This is used for sending WAL to the pageserver. Said WAL
      24              : /// is pre-interpreted and filtered for the shard.
      25              : pub(crate) struct InterpretedWalSender<'a, IO> {
      26              :     pub(crate) format: InterpretedFormat, // wire format handed to `to_wire` when a batch is serialized
      27              :     pub(crate) compression: Option<Compression>, // optional compression handed to `to_wire`
      28              :     pub(crate) pgb: &'a mut PostgresBackend<IO>, // connection that record batches and keepalives are written to
      29              :     pub(crate) wal_stream_builder: WalReaderStreamBuilder, // provides the start position and builds the WAL stream
      30              :     pub(crate) end_watch_view: EndWatchView, // source of the `wal_end` value reported in keepalives
      31              :     pub(crate) shard: ShardIdentity, // passed to `from_bytes_filtered` when interpreting each record
      32              :     pub(crate) pg_version: u32, // given to both the WAL decoder and the record interpreter
      33              :     pub(crate) appname: Option<String>, // only used in the end-of-stream message (presumably the receiver's name)
      34              : }
      35              : 
      36              : struct Batch { // records interpreted from one WAL chunk, queued for serialization
      37              :     wal_end_lsn: Lsn, // taken from the `WalBytes` chunk the records were decoded from
      38              :     commit_lsn: Lsn, // taken from the same `WalBytes` chunk
      39              :     records: InterpretedWalRecords, // the non-empty interpreted records plus the last decoded `next_record_lsn`
      40              : }
      41              : 
      42              : struct SerializedBatch { // a `Batch` after `to_wire`, ready to be written to the connection
      43              :     wal_end_lsn: Lsn, // sent as `streaming_lsn` in the wire message
      44              :     commit_lsn: Lsn, // sent as `commit_lsn` in the wire message
      45              :     buf: Bytes, // output of `to_wire` for the batch's records
      46              : }
      47              : 
      48              : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
      49              :     /// Send interpreted WAL to a receiver: read raw WAL, decode and interpret it, serialize it, write it to `pgb`.
      50              :     /// Stops when an error occurs or the receiver is caught up and there's no active compute.
      51              :     ///
      52              :     /// `Err(CopyStreamHandlerEnd)` is always returned; `Result` is used only for `?`
      53              :     /// convenience.
      54            0 :     pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
      55            0 :         let mut wal_position = self.wal_stream_builder.start_pos(); // only reported in the final message
      56            0 :         let mut wal_decoder =
      57            0 :             WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
      58              : 
      59            0 :         let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
      60            0 :         let mut stream = std::pin::pin!(stream);
      61            0 : 
      62            0 :         let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
      63            0 :         keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); // no burst of ticks after a stall
      64            0 :         keepalive_ticker.reset(); // first tick fires one full period from now instead of immediately
      65            0 : 
      66            0 :         let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2); // at most 2 interpreted batches wait for serialization
      67            0 :         let batches_stream =
      68            0 :             tokio_stream::wrappers::ReceiverStream::new(rx).then(|batch| async move { // serialize each queued batch as the stream is polled
      69            0 :                 let buf: Result<Bytes, CopyStreamHandlerEnd> = batch
      70            0 :                     .records
      71            0 :                     .to_wire(self.format, self.compression)
      72            0 :                     .await
      73            0 :                     .with_context(|| "Failed to serialize interpreted WAL")
      74            0 :                     .map_err(CopyStreamHandlerEnd::from);
      75            0 : 
      76            0 :                 Result::<_, CopyStreamHandlerEnd>::Ok(SerializedBatch {
      77            0 :                     wal_end_lsn: batch.wal_end_lsn,
      78            0 :                     commit_lsn: batch.commit_lsn,
      79            0 :                     buf: buf?, // a serialization error becomes this stream item's `Err`
      80              :                 })
      81            0 :             });
      82            0 :         let mut batches_stream = std::pin::pin!(batches_stream);
      83              : 
      84              :         loop {
      85            0 :             tokio::select! {
      86              :                 // Get some WAL from the stream and then: decode, interpret and push it down the
      87              :                 // pipeline.
      88            0 :                 wal = stream.next(), if tx.capacity() > 0 => { // disabled while the channel is full, so `tx.send` below should not have to wait
      89            0 :                     let WalBytes { wal, wal_start_lsn, wal_end_lsn, commit_lsn } = match wal {
      90            0 :                         Some(some) => some?,
      91            0 :                         None => { break; } // WAL stream finished
      92              :                     };
      93              : 
      94            0 :                     wal_position = wal_start_lsn; // NOTE(review): start of the chunk just read, so the final message reports where the last chunk began, not where it ended - confirm intended
      95            0 :                     wal_decoder.feed_bytes(&wal);
      96            0 : 
      97            0 :                     let mut records = Vec::new();
      98            0 :                     let mut max_next_record_lsn = None; // stays `None` if this chunk completes no record
      99            0 :                     while let Some((next_record_lsn, recdata)) = wal_decoder
     100            0 :                         .poll_decode()
     101            0 :                         .with_context(|| "Failed to decode WAL")?
     102              :                     {
     103            0 :                         assert!(next_record_lsn.is_aligned());
     104            0 :                         max_next_record_lsn = Some(next_record_lsn); // overwritten each iteration, so ends as the last decoded record's value
     105              : 
     106              :                         // Deserialize and interpret WAL record
     107            0 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
     108            0 :                             recdata,
     109            0 :                             &self.shard,
     110            0 :                             next_record_lsn,
     111            0 :                             self.pg_version,
     112            0 :                         )
     113            0 :                         .with_context(|| "Failed to interpret WAL")?;
     114              : 
     115            0 :                         if !interpreted.is_empty() { // skip records that interpreted to empty (presumably nothing for this shard)
     116            0 :                             records.push(interpreted);
     117            0 :                         }
     118              :                     }
     119              : 
     120            0 :                     let batch = InterpretedWalRecords {
     121            0 :                         records,
     122            0 :                         next_record_lsn: max_next_record_lsn
     123            0 :                     };
     124            0 : 
     125            0 :                     tx.send(Batch {wal_end_lsn, commit_lsn, records: batch}).await.unwrap(); // NOTE(review): panics only if the receiver half is dropped; `rx` is owned by `batches_stream` for the whole function
     126              :                 },
     127              :                 // For a previously interpreted batch, serialize it and push it down the wire.
     128            0 :                 encoded_batch = batches_stream.next() => {
     129            0 :                     let SerializedBatch {wal_end_lsn, commit_lsn, buf } = match encoded_batch {
     130            0 :                         Some(ser_batch) => ser_batch?,
     131            0 :                         None => { break; } // channel closed; presumably unreachable while `tx` is alive in this scope
     132              :                     };
     133              : 
     134              :                     // Reset the keep alive ticker since we are sending something
     135              :                     // over the wire now.
     136            0 :                     keepalive_ticker.reset();
     137            0 : 
     138            0 :                     self.pgb
     139            0 :                         .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
     140            0 :                             streaming_lsn: wal_end_lsn.0,
     141            0 :                             commit_lsn: commit_lsn.0,
     142            0 :                             data: &buf,
     143            0 :                         })).await?;
     144              :                 }
     145              :                 // Send a periodic keep alive when the connection has been idle for a while.
     146            0 :                 _ = keepalive_ticker.tick() => {
     147            0 :                     self.pgb
     148            0 :                         .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     149            0 :                             wal_end: self.end_watch_view.get().0,
     150            0 :                             timestamp: get_current_timestamp(),
     151            0 :                             request_reply: true,
     152            0 :                         }))
     153            0 :                         .await?;
     154              :                 }
     155              :             }
     156              :         }
     157              : 
     158              :         // The loop above ends when the receiver is caught up and there's no more WAL to send.
     159            0 :         Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     160            0 :             "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     161            0 :             self.appname, wal_position,
     162            0 :         )))
     163            0 :     }
     164              : }
        

Generated by: LCOV version 2.1-beta