LCOV - code coverage report
Current view: top level - pageserver/page_api/src - split.rs (source / functions) Coverage Total Hit
Test: a14d6a1f0ccf210374e9eaed9918e97cd6f5d5ba.info Lines: 0.0 % 158 0
Test Date: 2025-08-04 14:37:31 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use bytes::Bytes;
       4              : 
       5              : use crate::model::*;
       6              : use pageserver_api::key::rel_block_to_key;
       7              : use pageserver_api::shard::key_to_shard_number;
       8              : use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
       9              : 
      10              : /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
      11              : /// TODO: add tests for this.
      12              : pub struct GetPageSplitter {
      13              :     /// Split requests by shard index.
      14              :     requests: HashMap<ShardIndex, GetPageRequest>,
      15              :     /// The response being assembled. Preallocated with empty pages, to be filled in.
      16              :     response: GetPageResponse,
      17              :     /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
      18              :     /// to assemble the response pages in the same order as the original request.
      19              :     block_shards: Vec<ShardIndex>,
      20              : }
      21              : 
      22              : impl GetPageSplitter {
      23              :     /// Checks if the given request only touches a single shard, and returns the shard ID. This is
      24              :     /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
      25            0 :     pub fn for_single_shard(
      26            0 :         req: &GetPageRequest,
      27            0 :         count: ShardCount,
      28            0 :         stripe_size: Option<ShardStripeSize>,
      29            0 :     ) -> Result<Option<ShardIndex>, SplitError> {
      30              :         // Fast path: unsharded tenant.
      31            0 :         if count.is_unsharded() {
      32            0 :             return Ok(Some(ShardIndex::unsharded()));
      33            0 :         }
      34              : 
      35            0 :         let Some(stripe_size) = stripe_size else {
      36            0 :             return Err("stripe size must be given for sharded tenants".into());
      37              :         };
      38              : 
      39              :         // Find the first page's shard, for comparison.
      40            0 :         let Some(&first_page) = req.block_numbers.first() else {
      41            0 :             return Err("no block numbers in request".into());
      42              :         };
      43            0 :         let key = rel_block_to_key(req.rel, first_page);
      44            0 :         let shard_number = key_to_shard_number(count, stripe_size, &key);
      45              : 
      46            0 :         Ok(req
      47            0 :             .block_numbers
      48            0 :             .iter()
      49            0 :             .skip(1) // computed above
      50            0 :             .all(|&blkno| {
      51            0 :                 let key = rel_block_to_key(req.rel, blkno);
      52            0 :                 key_to_shard_number(count, stripe_size, &key) == shard_number
      53            0 :             })
      54            0 :             .then_some(ShardIndex::new(shard_number, count)))
      55            0 :     }
      56              : 
      57              :     /// Splits the given request.
      58            0 :     pub fn split(
      59            0 :         req: GetPageRequest,
      60            0 :         count: ShardCount,
      61            0 :         stripe_size: Option<ShardStripeSize>,
      62            0 :     ) -> Result<Self, SplitError> {
      63              :         // The caller should make sure we don't split requests unnecessarily.
      64            0 :         debug_assert!(
      65            0 :             Self::for_single_shard(&req, count, stripe_size)?.is_none(),
      66            0 :             "unnecessary request split"
      67              :         );
      68              : 
      69            0 :         if count.is_unsharded() {
      70            0 :             return Err("unsharded tenant, no point in splitting request".into());
      71            0 :         }
      72            0 :         let Some(stripe_size) = stripe_size else {
      73            0 :             return Err("stripe size must be given for sharded tenants".into());
      74              :         };
      75              : 
      76              :         // Split the requests by shard index.
      77            0 :         let mut requests = HashMap::with_capacity(2); // common case
      78            0 :         let mut block_shards = Vec::with_capacity(req.block_numbers.len());
      79            0 :         for &blkno in &req.block_numbers {
      80            0 :             let key = rel_block_to_key(req.rel, blkno);
      81            0 :             let shard_number = key_to_shard_number(count, stripe_size, &key);
      82            0 :             let shard_id = ShardIndex::new(shard_number, count);
      83              : 
      84            0 :             requests
      85            0 :                 .entry(shard_id)
      86            0 :                 .or_insert_with(|| GetPageRequest {
      87            0 :                     request_id: req.request_id,
      88            0 :                     request_class: req.request_class,
      89            0 :                     rel: req.rel,
      90            0 :                     read_lsn: req.read_lsn,
      91            0 :                     block_numbers: Vec::new(),
      92            0 :                 })
      93              :                 .block_numbers
      94            0 :                 .push(blkno);
      95            0 :             block_shards.push(shard_id);
      96              :         }
      97              : 
      98              :         // Construct a response to be populated by shard responses. Preallocate empty page slots
      99              :         // with the expected block numbers.
     100            0 :         let response = GetPageResponse {
     101            0 :             request_id: req.request_id,
     102            0 :             status_code: GetPageStatusCode::Ok,
     103            0 :             reason: None,
     104            0 :             rel: req.rel,
     105            0 :             pages: req
     106            0 :                 .block_numbers
     107            0 :                 .into_iter()
     108            0 :                 .map(|block_number| {
     109            0 :                     Page {
     110            0 :                         block_number,
     111            0 :                         image: Bytes::new(), // empty page slot to be filled in
     112            0 :                     }
     113            0 :                 })
     114            0 :                 .collect(),
     115              :         };
     116              : 
     117            0 :         Ok(Self {
     118            0 :             requests,
     119            0 :             response,
     120            0 :             block_shards,
     121            0 :         })
     122            0 :     }
     123              : 
     124              :     /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
     125            0 :     pub fn drain_requests(&mut self) -> impl Iterator<Item = (ShardIndex, GetPageRequest)> {
     126            0 :         self.requests.drain()
     127            0 :     }
     128              : 
     129              :     /// Adds a response from the given shard. The response must match the request ID and have an OK
     130              :     /// status code. A response must not already exist for the given shard ID.
     131            0 :     pub fn add_response(
     132            0 :         &mut self,
     133            0 :         shard_id: ShardIndex,
     134            0 :         response: GetPageResponse,
     135            0 :     ) -> Result<(), SplitError> {
     136              :         // The caller should already have converted status codes into tonic::Status.
     137            0 :         if response.status_code != GetPageStatusCode::Ok {
     138            0 :             return Err(SplitError(format!(
     139            0 :                 "unexpected non-OK response for shard {shard_id}: {} {}",
     140            0 :                 response.status_code,
     141            0 :                 response.reason.unwrap_or_default()
     142            0 :             )));
     143            0 :         }
     144              : 
     145            0 :         if response.request_id != self.response.request_id {
     146            0 :             return Err(SplitError(format!(
     147            0 :                 "response ID mismatch for shard {shard_id}: expected {}, got {}",
     148            0 :                 self.response.request_id, response.request_id
     149            0 :             )));
     150            0 :         }
     151              : 
     152            0 :         if response.request_id != self.response.request_id {
     153            0 :             return Err(SplitError(format!(
     154            0 :                 "response ID mismatch for shard {shard_id}: expected {}, got {}",
     155            0 :                 self.response.request_id, response.request_id
     156            0 :             )));
     157            0 :         }
     158              : 
     159              :         // Place the shard response pages into the assembled response, in request order.
     160            0 :         let mut pages = response.pages.into_iter();
     161              : 
     162            0 :         for (i, &s) in self.block_shards.iter().enumerate() {
     163            0 :             if shard_id != s {
     164            0 :                 continue;
     165            0 :             }
     166              : 
     167            0 :             let Some(slot) = self.response.pages.get_mut(i) else {
     168            0 :                 return Err(SplitError(format!(
     169            0 :                     "no block_shards slot {i} for shard {shard_id}"
     170            0 :                 )));
     171              :             };
     172            0 :             let Some(page) = pages.next() else {
     173            0 :                 return Err(SplitError(format!(
     174            0 :                     "missing page {} in shard {shard_id} response",
     175            0 :                     slot.block_number
     176            0 :                 )));
     177              :             };
     178            0 :             if page.block_number != slot.block_number {
     179            0 :                 return Err(SplitError(format!(
     180            0 :                     "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
     181            0 :                     slot.block_number, page.block_number
     182            0 :                 )));
     183            0 :             }
     184            0 :             if !slot.image.is_empty() {
     185            0 :                 return Err(SplitError(format!(
     186            0 :                     "shard {shard_id} returned duplicate page {} at index {i}",
     187            0 :                     slot.block_number
     188            0 :                 )));
     189            0 :             }
     190              : 
     191            0 :             *slot = page;
     192              :         }
     193              : 
     194              :         // Make sure we've consumed all pages from the shard response.
     195            0 :         if let Some(extra_page) = pages.next() {
     196            0 :             return Err(SplitError(format!(
     197            0 :                 "shard {shard_id} returned extra page: {}",
     198            0 :                 extra_page.block_number
     199            0 :             )));
     200            0 :         }
     201              : 
     202            0 :         Ok(())
     203            0 :     }
     204              : 
     205              :     /// Collects the final, assembled response.
     206            0 :     pub fn collect_response(self) -> Result<GetPageResponse, SplitError> {
     207              :         // Check that the response is complete.
     208            0 :         for (i, page) in self.response.pages.iter().enumerate() {
     209            0 :             if page.image.is_empty() {
     210            0 :                 return Err(SplitError(format!(
     211            0 :                     "missing page {} for shard {}",
     212              :                     page.block_number,
     213            0 :                     self.block_shards
     214            0 :                         .get(i)
     215            0 :                         .map(|s| s.to_string())
     216            0 :                         .unwrap_or_else(|| "?".to_string())
     217              :                 )));
     218            0 :             }
     219              :         }
     220              : 
     221            0 :         Ok(self.response)
     222            0 :     }
     223              : }
     224              : 
     225              : /// A GetPageSplitter error.
     226              : #[derive(Debug, thiserror::Error)]
     227              : #[error("{0}")]
     228              : pub struct SplitError(String);
     229              : 
     230              : impl From<&str> for SplitError {
     231            0 :     fn from(err: &str) -> Self {
     232            0 :         SplitError(err.to_string())
     233            0 :     }
     234              : }
     235              : 
     236              : impl From<String> for SplitError {
     237            0 :     fn from(err: String) -> Self {
     238            0 :         SplitError(err)
     239            0 :     }
     240              : }
     241              : 
     242              : impl From<SplitError> for tonic::Status {
     243            0 :     fn from(err: SplitError) -> Self {
     244            0 :         tonic::Status::internal(err.0)
     245            0 :     }
     246              : }
        

Generated by: LCOV version 2.1-beta