LCOV - code coverage report
Current view: top level - pageserver/client_grpc/src - split.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 137 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use bytes::Bytes;
       4              : 
       5              : use pageserver_api::key::rel_block_to_key;
       6              : use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
       7              : use pageserver_page_api as page_api;
       8              : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
       9              : 
      10              : /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
      11              : /// TODO: add tests for this.
      12              : pub struct GetPageSplitter {
      13              :     /// Split requests by shard index.
      14              :     requests: HashMap<ShardIndex, page_api::GetPageRequest>,
      15              :     /// The response being assembled. Preallocated with empty pages, to be filled in.
      16              :     response: page_api::GetPageResponse,
      17              :     /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
      18              :     /// to assemble the response pages in the same order as the original request.
      19              :     block_shards: Vec<ShardIndex>,
      20              : }
      21              : 
      22              : impl GetPageSplitter {
      23              :     /// Checks if the given request only touches a single shard, and returns the shard ID. This is
      24              :     /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
      25            0 :     pub fn for_single_shard(
      26            0 :         req: &page_api::GetPageRequest,
      27            0 :         count: ShardCount,
      28            0 :         stripe_size: ShardStripeSize,
      29            0 :     ) -> Option<ShardIndex> {
      30              :         // Fast path: unsharded tenant.
      31            0 :         if count.is_unsharded() {
      32            0 :             return Some(ShardIndex::unsharded());
      33            0 :         }
      34              : 
      35              :         // Find the first page's shard, for comparison. If there are no pages, just return the first
      36              :         // shard (caller likely checked already, otherwise the server will reject it).
      37            0 :         let Some(&first_page) = req.block_numbers.first() else {
      38            0 :             return Some(ShardIndex::new(ShardNumber(0), count));
      39              :         };
      40            0 :         let key = rel_block_to_key(req.rel, first_page);
      41            0 :         let shard_number = key_to_shard_number(count, stripe_size, &key);
      42              : 
      43            0 :         req.block_numbers
      44            0 :             .iter()
      45            0 :             .skip(1) // computed above
      46            0 :             .all(|&blkno| {
      47            0 :                 let key = rel_block_to_key(req.rel, blkno);
      48            0 :                 key_to_shard_number(count, stripe_size, &key) == shard_number
      49            0 :             })
      50            0 :             .then_some(ShardIndex::new(shard_number, count))
      51            0 :     }
      52              : 
      53              :     /// Splits the given request.
      54            0 :     pub fn split(
      55            0 :         req: page_api::GetPageRequest,
      56            0 :         count: ShardCount,
      57            0 :         stripe_size: ShardStripeSize,
      58            0 :     ) -> Self {
      59              :         // The caller should make sure we don't split requests unnecessarily.
      60            0 :         debug_assert!(
      61            0 :             Self::for_single_shard(&req, count, stripe_size).is_none(),
      62            0 :             "unnecessary request split"
      63              :         );
      64              : 
      65              :         // Split the requests by shard index.
      66            0 :         let mut requests = HashMap::with_capacity(2); // common case
      67            0 :         let mut block_shards = Vec::with_capacity(req.block_numbers.len());
      68            0 :         for &blkno in &req.block_numbers {
      69            0 :             let key = rel_block_to_key(req.rel, blkno);
      70            0 :             let shard_number = key_to_shard_number(count, stripe_size, &key);
      71            0 :             let shard_id = ShardIndex::new(shard_number, count);
      72              : 
      73            0 :             requests
      74            0 :                 .entry(shard_id)
      75            0 :                 .or_insert_with(|| page_api::GetPageRequest {
      76            0 :                     request_id: req.request_id,
      77            0 :                     request_class: req.request_class,
      78            0 :                     rel: req.rel,
      79            0 :                     read_lsn: req.read_lsn,
      80            0 :                     block_numbers: Vec::new(),
      81            0 :                 })
      82              :                 .block_numbers
      83            0 :                 .push(blkno);
      84            0 :             block_shards.push(shard_id);
      85              :         }
      86              : 
      87              :         // Construct a response to be populated by shard responses. Preallocate empty page slots
      88              :         // with the expected block numbers.
      89            0 :         let response = page_api::GetPageResponse {
      90            0 :             request_id: req.request_id,
      91            0 :             status_code: page_api::GetPageStatusCode::Ok,
      92            0 :             reason: None,
      93            0 :             rel: req.rel,
      94            0 :             pages: req
      95            0 :                 .block_numbers
      96            0 :                 .into_iter()
      97            0 :                 .map(|block_number| {
      98            0 :                     page_api::Page {
      99            0 :                         block_number,
     100            0 :                         image: Bytes::new(), // empty page slot to be filled in
     101            0 :                     }
     102            0 :                 })
     103            0 :                 .collect(),
     104              :         };
     105              : 
     106            0 :         Self {
     107            0 :             requests,
     108            0 :             response,
     109            0 :             block_shards,
     110            0 :         }
     111            0 :     }
     112              : 
     113              :     /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
     114            0 :     pub fn drain_requests(
     115            0 :         &mut self,
     116            0 :     ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
     117            0 :         self.requests.drain()
     118            0 :     }
     119              : 
     120              :     /// Adds a response from the given shard. The response must match the request ID and have an OK
     121              :     /// status code. A response must not already exist for the given shard ID.
     122              :     #[allow(clippy::result_large_err)]
     123            0 :     pub fn add_response(
     124            0 :         &mut self,
     125            0 :         shard_id: ShardIndex,
     126            0 :         response: page_api::GetPageResponse,
     127            0 :     ) -> tonic::Result<()> {
     128              :         // The caller should already have converted status codes into tonic::Status.
     129            0 :         if response.status_code != page_api::GetPageStatusCode::Ok {
     130            0 :             return Err(tonic::Status::internal(format!(
     131            0 :                 "unexpected non-OK response for shard {shard_id}: {} {}",
     132            0 :                 response.status_code,
     133            0 :                 response.reason.unwrap_or_default()
     134            0 :             )));
     135            0 :         }
     136              : 
     137            0 :         if response.request_id != self.response.request_id {
     138            0 :             return Err(tonic::Status::internal(format!(
     139            0 :                 "response ID mismatch for shard {shard_id}: expected {}, got {}",
     140            0 :                 self.response.request_id, response.request_id
     141            0 :             )));
     142            0 :         }
     143              : 
     144              :         // Place the shard response pages into the assembled response, in request order.
     145            0 :         let mut pages = response.pages.into_iter();
     146              : 
     147            0 :         for (i, &s) in self.block_shards.iter().enumerate() {
     148            0 :             if shard_id != s {
     149            0 :                 continue;
     150            0 :             }
     151              : 
     152            0 :             let Some(slot) = self.response.pages.get_mut(i) else {
     153            0 :                 return Err(tonic::Status::internal(format!(
     154            0 :                     "no block_shards slot {i} for shard {shard_id}"
     155            0 :                 )));
     156              :             };
     157            0 :             let Some(page) = pages.next() else {
     158            0 :                 return Err(tonic::Status::internal(format!(
     159            0 :                     "missing page {} in shard {shard_id} response",
     160            0 :                     slot.block_number
     161            0 :                 )));
     162              :             };
     163            0 :             if page.block_number != slot.block_number {
     164            0 :                 return Err(tonic::Status::internal(format!(
     165            0 :                     "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
     166            0 :                     slot.block_number, page.block_number
     167            0 :                 )));
     168            0 :             }
     169            0 :             if !slot.image.is_empty() {
     170            0 :                 return Err(tonic::Status::internal(format!(
     171            0 :                     "shard {shard_id} returned duplicate page {} at index {i}",
     172            0 :                     slot.block_number
     173            0 :                 )));
     174            0 :             }
     175              : 
     176            0 :             *slot = page;
     177              :         }
     178              : 
     179              :         // Make sure we've consumed all pages from the shard response.
     180            0 :         if let Some(extra_page) = pages.next() {
     181            0 :             return Err(tonic::Status::internal(format!(
     182            0 :                 "shard {shard_id} returned extra page: {}",
     183            0 :                 extra_page.block_number
     184            0 :             )));
     185            0 :         }
     186              : 
     187            0 :         Ok(())
     188            0 :     }
     189              : 
     190              :     /// Fetches the final, assembled response.
     191              :     #[allow(clippy::result_large_err)]
     192            0 :     pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
     193              :         // Check that the response is complete.
     194            0 :         for (i, page) in self.response.pages.iter().enumerate() {
     195            0 :             if page.image.is_empty() {
     196            0 :                 return Err(tonic::Status::internal(format!(
     197            0 :                     "missing page {} for shard {}",
     198              :                     page.block_number,
     199            0 :                     self.block_shards
     200            0 :                         .get(i)
     201            0 :                         .map(|s| s.to_string())
     202            0 :                         .unwrap_or_else(|| "?".to_string())
     203              :                 )));
     204            0 :             }
     205              :         }
     206              : 
     207            0 :         Ok(self.response)
     208            0 :     }
     209              : }
        

Generated by: LCOV version 2.1-beta