LCOV - code coverage report
Current view: top level - libs/utils/src - pageserver_feedback.rs (source / functions) Coverage Total Hit
Test: f26987deef05b637be3b9ae5d95c30faa25ab621.info Lines: 87.8 % 148 130
Test Date: 2025-07-31 11:15:47 Functions: 46.2 % 13 6

            Line data    Source code
       1              : use std::time::{Duration, SystemTime};
       2              : 
       3              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       4              : use pq_proto::{PG_EPOCH, read_cstr};
       5              : use serde::{Deserialize, Serialize};
       6              : use tracing::{trace, warn};
       7              : 
       8              : use crate::lsn::Lsn;
       9              : 
      10              : /// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
      11              : ///
      12              : /// Serialized in custom flexible key/value format. In replication protocol, it
      13              : /// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
      14              : /// Standby status update / Hot standby feedback messages.
      15              : ///
      16              : /// serde Serialize is used only for a human-readable dump to JSON (e.g. in
      17              : /// the safekeeper's debug_dump).
      18              : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
      19              : pub struct PageserverFeedback {
      20              :     /// Last known size of the timeline. Used to enforce timeline size limit.
      21              :     pub current_timeline_size: u64,
      22              :     /// LSN last received and ingested by the pageserver. Controls backpressure.
      23              :     pub last_received_lsn: Lsn,
      24              :     /// LSN up to which data is persisted by the pageserver to its local disk.
      25              :     /// Controls backpressure.
      26              :     pub disk_consistent_lsn: Lsn,
      27              :     /// LSN up to which data is persisted by the pageserver on S3; safekeepers
      28              :     /// consider WAL before this LSN safe to remove.
      29              :     pub remote_consistent_lsn: Lsn,
      30              :     // Serialize with RFC3339 format.
      31              :     #[serde(with = "serde_systemtime")]
      32              :     pub replytime: SystemTime,
      33              :     /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
      34              :     pub shard_number: u32,
      35              :     /// If true, the pageserver has detected corruption and the safekeeper and postgres
      36              :     /// should stop sending WAL.
      37              :     pub corruption_detected: bool,
      38              : }
      39              : 
      40              : impl PageserverFeedback {
      41           13 :     pub fn empty() -> PageserverFeedback {
      42           13 :         PageserverFeedback {
      43           13 :             current_timeline_size: 0,
      44           13 :             last_received_lsn: Lsn::INVALID,
      45           13 :             remote_consistent_lsn: Lsn::INVALID,
      46           13 :             disk_consistent_lsn: Lsn::INVALID,
      47           13 :             replytime: *PG_EPOCH,
      48           13 :             shard_number: 0,
      49           13 :             corruption_detected: false,
      50           13 :         }
      51           13 :     }
      52              : 
      53              :     // Serialize PageserverFeedback using custom format
      54              :     // to support protocol extensibility.
      55              :     //
      56              :     // Following layout is used:
      57              :     // char - number of key-value pairs that follow.
      58              :     //
      59              :     // key-value pairs:
      60              :     // null-terminated string - key,
      61              :     // uint32 - value length in bytes
      62              :     // value itself
      63              :     //
      64              :     // TODO: change serialized field names once all computes have migrated to the renamed fields.
      65            3 :     pub fn serialize(&self, buf: &mut BytesMut) {
      66            3 :         let buf_ptr = buf.len();
      67            3 :         buf.put_u8(0); // # of keys, will be filled later
      68            3 :         let mut nkeys = 0;
      69              : 
      70            3 :         nkeys += 1;
      71            3 :         buf.put_slice(b"current_timeline_size\0");
      72            3 :         buf.put_i32(8);
      73            3 :         buf.put_u64(self.current_timeline_size);
      74              : 
      75            3 :         nkeys += 1;
      76            3 :         buf.put_slice(b"ps_writelsn\0");
      77            3 :         buf.put_i32(8);
      78            3 :         buf.put_u64(self.last_received_lsn.0);
      79              : 
      80            3 :         nkeys += 1;
      81            3 :         buf.put_slice(b"ps_flushlsn\0");
      82            3 :         buf.put_i32(8);
      83            3 :         buf.put_u64(self.disk_consistent_lsn.0);
      84              : 
      85            3 :         nkeys += 1;
      86            3 :         buf.put_slice(b"ps_applylsn\0");
      87            3 :         buf.put_i32(8);
      88            3 :         buf.put_u64(self.remote_consistent_lsn.0);
      89              : 
      90            3 :         let timestamp = self
      91            3 :             .replytime
      92            3 :             .duration_since(*PG_EPOCH)
      93            3 :             .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
      94            3 :             .as_micros() as i64;
      95              : 
      96            3 :         nkeys += 1;
      97            3 :         buf.put_slice(b"ps_replytime\0");
      98            3 :         buf.put_i32(8);
      99            3 :         buf.put_i64(timestamp);
     100              : 
     101            3 :         if self.shard_number > 0 {
     102            1 :             nkeys += 1;
     103            1 :             buf.put_slice(b"shard_number\0");
     104            1 :             buf.put_i32(4);
     105            1 :             buf.put_u32(self.shard_number);
     106            2 :         }
     107              : 
     108            3 :         if self.corruption_detected {
     109            1 :             nkeys += 1;
     110            1 :             buf.put_slice(b"corruption_detected\0");
     111            1 :             buf.put_i32(1);
     112            1 :             buf.put_u8(1);
     113            2 :         }
     114              : 
     115            3 :         buf[buf_ptr] = nkeys;
     116            3 :     }
     117              : 
     118              :     // Deserialize PageserverFeedback message
     119              :     // TODO: change serialized field names once all computes have migrated to the renamed fields.
     120            3 :     pub fn parse(mut buf: Bytes) -> PageserverFeedback {
     121            3 :         let mut rf = PageserverFeedback::empty();
     122            3 :         let nfields = buf.get_u8();
     123            3 :         for _ in 0..nfields {
     124           18 :             let key = read_cstr(&mut buf).unwrap();
     125           18 :             match key.as_ref() {
     126           18 :                 b"current_timeline_size" => {
     127            3 :                     let len = buf.get_i32();
     128            3 :                     assert_eq!(len, 8);
     129            3 :                     rf.current_timeline_size = buf.get_u64();
     130              :                 }
     131           15 :                 b"ps_writelsn" => {
     132            3 :                     let len = buf.get_i32();
     133            3 :                     assert_eq!(len, 8);
     134            3 :                     rf.last_received_lsn = Lsn(buf.get_u64());
     135              :                 }
     136              :                 b"ps_flushlsn" => {
     137            3 :                     let len = buf.get_i32();
     138            3 :                     assert_eq!(len, 8);
     139            3 :                     rf.disk_consistent_lsn = Lsn(buf.get_u64());
     140              :                 }
     141              :                 b"ps_applylsn" => {
     142            3 :                     let len = buf.get_i32();
     143            3 :                     assert_eq!(len, 8);
     144            3 :                     rf.remote_consistent_lsn = Lsn(buf.get_u64());
     145              :                 }
     146            6 :                 b"ps_replytime" => {
     147            3 :                     let len = buf.get_i32();
     148            3 :                     assert_eq!(len, 8);
     149            3 :                     let raw_time = buf.get_i64();
     150            3 :                     if raw_time > 0 {
     151            3 :                         rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
     152            3 :                     } else {
     153            0 :                         rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
     154            0 :                     }
     155              :                 }
     156              :                 b"shard_number" => {
     157            1 :                     let len = buf.get_i32();
     158            1 :                     assert_eq!(len, 4);
     159            1 :                     rf.shard_number = buf.get_u32();
     160              :                 }
     161            2 :                 b"corruption_detected" => {
     162            1 :                     let len = buf.get_i32();
     163            1 :                     assert_eq!(len, 1);
     164            1 :                     rf.corruption_detected = buf.get_u8() != 0;
     165              :                 }
     166              :                 _ => {
     167            1 :                     let len = buf.get_i32();
     168            1 :                     warn!(
     169            0 :                         "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
     170            0 :                         String::from_utf8_lossy(key.as_ref())
     171              :                     );
     172            1 :                     buf.advance(len as usize);
     173              :                 }
     174              :             }
     175              :         }
     176            3 :         trace!("PageserverFeedback parsed is {:?}", rf);
     177            3 :         rf
     178            3 :     }
     179              : }
     180              : 
     181              : mod serde_systemtime {
     182              :     use std::time::SystemTime;
     183              : 
     184              :     use chrono::{DateTime, Utc};
     185              :     use serde::{Deserialize, Deserializer, Serializer};
     186              : 
     187            0 :     pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
     188            0 :     where
     189            0 :         S: Serializer,
     190              :     {
     191            0 :         let chrono_dt: DateTime<Utc> = (*ts).into();
     192            0 :         serializer.serialize_str(&chrono_dt.to_rfc3339())
     193            0 :     }
     194              : 
     195            0 :     pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
     196            0 :     where
     197            0 :         D: Deserializer<'de>,
     198              :     {
     199            0 :         let time: String = Deserialize::deserialize(deserializer)?;
     200            0 :         Ok(DateTime::parse_from_rfc3339(&time)
     201            0 :             .map_err(serde::de::Error::custom)?
     202            0 :             .into())
     203            0 :     }
     204              : }
     205              : 
     206              : #[cfg(test)]
     207              : mod tests {
     208              :     use super::*;
     209              : 
     210              :     #[test]
     211            1 :     fn test_replication_feedback_serialization() {
     212            1 :         let mut rf = PageserverFeedback::empty();
     213              :         // Fill rf with some values
     214            1 :         rf.current_timeline_size = 12345678;
     215              :         // Use a whole-second time so it compares equal to the deserialized value,
     216              :         // because the time is truncated to microseconds during serialization.
     217            1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     218            1 :         let mut data = BytesMut::new();
     219            1 :         rf.serialize(&mut data);
     220              : 
     221            1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     222            1 :         assert_eq!(rf, rf_parsed);
     223            1 :     }
     224              : 
     225              :     // Test that Databricks-specific fields added to the PageserverFeedback message are serialized
     226              :     // and deserialized correctly, in addition to the existing fields from upstream.
     227              :     #[test]
     228            1 :     fn test_replication_feedback_databricks_fields() {
     229            1 :         let mut rf = PageserverFeedback::empty();
     230            1 :         rf.current_timeline_size = 12345678;
     231            1 :         rf.last_received_lsn = Lsn(23456789);
     232            1 :         rf.disk_consistent_lsn = Lsn(34567890);
     233            1 :         rf.remote_consistent_lsn = Lsn(45678901);
     234            1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     235            1 :         rf.shard_number = 1;
     236            1 :         rf.corruption_detected = true;
     237              : 
     238            1 :         let mut data = BytesMut::new();
     239            1 :         rf.serialize(&mut data);
     240              : 
     241            1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     242            1 :         assert_eq!(rf, rf_parsed);
     243            1 :     }
     244              : 
     245              :     #[test]
     246            1 :     fn test_replication_feedback_unknown_key() {
     247            1 :         let mut rf = PageserverFeedback::empty();
     248              :         // Fill rf with some values
     249            1 :         rf.current_timeline_size = 12345678;
     250              :         // Use a whole-second time so it compares equal to the deserialized value,
     251              :         // because the time is truncated to microseconds during serialization.
     252            1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     253            1 :         let mut data = BytesMut::new();
     254            1 :         rf.serialize(&mut data);
     255              : 
     256              :         // Add an extra field to the buffer and adjust number of keys
     257            1 :         data[0] += 1;
     258            1 :         data.put_slice(b"new_field_one\0");
     259            1 :         data.put_i32(8);
     260            1 :         data.put_u64(42);
     261              : 
     262              :         // Parse serialized data and check that new field is not parsed
     263            1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     264            1 :         assert_eq!(rf, rf_parsed);
     265            1 :     }
     266              : }
        

Generated by: LCOV version 2.1-beta