LCOV - code coverage report
Current view: top level - pageserver/client_grpc/src - split.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 152 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use anyhow::anyhow;
       4              : use bytes::Bytes;
       5              : 
       6              : use pageserver_api::key::rel_block_to_key;
       7              : use pageserver_api::shard::key_to_shard_number;
       8              : use pageserver_page_api as page_api;
       9              : use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
      10              : 
      11              : /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
      12              : /// TODO: add tests for this.
      13              : pub struct GetPageSplitter {
      14              :     /// Split requests by shard index.
      15              :     requests: HashMap<ShardIndex, page_api::GetPageRequest>,
      16              :     /// The response being assembled. Preallocated with empty pages, to be filled in.
      17              :     response: page_api::GetPageResponse,
      18              :     /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
      19              :     /// to assemble the response pages in the same order as the original request.
      20              :     block_shards: Vec<ShardIndex>,
      21              : }
      22              : 
      23              : impl GetPageSplitter {
      24              :     /// Checks if the given request only touches a single shard, and returns the shard ID. This is
      25              :     /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
      26            0 :     pub fn for_single_shard(
      27            0 :         req: &page_api::GetPageRequest,
      28            0 :         count: ShardCount,
      29            0 :         stripe_size: Option<ShardStripeSize>,
      30            0 :     ) -> anyhow::Result<Option<ShardIndex>> {
      31              :         // Fast path: unsharded tenant.
      32            0 :         if count.is_unsharded() {
      33            0 :             return Ok(Some(ShardIndex::unsharded()));
      34            0 :         }
      35              : 
      36            0 :         let Some(stripe_size) = stripe_size else {
      37            0 :             return Err(anyhow!("stripe size must be given for sharded tenants"));
      38              :         };
      39              : 
      40              :         // Find the first page's shard, for comparison.
      41            0 :         let Some(&first_page) = req.block_numbers.first() else {
      42            0 :             return Err(anyhow!("no block numbers in request"));
      43              :         };
      44            0 :         let key = rel_block_to_key(req.rel, first_page);
      45            0 :         let shard_number = key_to_shard_number(count, stripe_size, &key);
      46              : 
      47            0 :         Ok(req
      48            0 :             .block_numbers
      49            0 :             .iter()
      50            0 :             .skip(1) // computed above
      51            0 :             .all(|&blkno| {
      52            0 :                 let key = rel_block_to_key(req.rel, blkno);
      53            0 :                 key_to_shard_number(count, stripe_size, &key) == shard_number
      54            0 :             })
      55            0 :             .then_some(ShardIndex::new(shard_number, count)))
      56            0 :     }
      57              : 
      58              :     /// Splits the given request.
      59            0 :     pub fn split(
      60            0 :         req: page_api::GetPageRequest,
      61            0 :         count: ShardCount,
      62            0 :         stripe_size: Option<ShardStripeSize>,
      63            0 :     ) -> anyhow::Result<Self> {
      64              :         // The caller should make sure we don't split requests unnecessarily.
      65            0 :         debug_assert!(
      66            0 :             Self::for_single_shard(&req, count, stripe_size)?.is_none(),
      67            0 :             "unnecessary request split"
      68              :         );
      69              : 
      70            0 :         if count.is_unsharded() {
      71            0 :             return Err(anyhow!("unsharded tenant, no point in splitting request"));
      72            0 :         }
      73            0 :         let Some(stripe_size) = stripe_size else {
      74            0 :             return Err(anyhow!("stripe size must be given for sharded tenants"));
      75              :         };
      76              : 
      77              :         // Split the requests by shard index.
      78            0 :         let mut requests = HashMap::with_capacity(2); // common case
      79            0 :         let mut block_shards = Vec::with_capacity(req.block_numbers.len());
      80            0 :         for &blkno in &req.block_numbers {
      81            0 :             let key = rel_block_to_key(req.rel, blkno);
      82            0 :             let shard_number = key_to_shard_number(count, stripe_size, &key);
      83            0 :             let shard_id = ShardIndex::new(shard_number, count);
      84              : 
      85            0 :             requests
      86            0 :                 .entry(shard_id)
      87            0 :                 .or_insert_with(|| page_api::GetPageRequest {
      88            0 :                     request_id: req.request_id,
      89            0 :                     request_class: req.request_class,
      90            0 :                     rel: req.rel,
      91            0 :                     read_lsn: req.read_lsn,
      92            0 :                     block_numbers: Vec::new(),
      93            0 :                 })
      94              :                 .block_numbers
      95            0 :                 .push(blkno);
      96            0 :             block_shards.push(shard_id);
      97              :         }
      98              : 
      99              :         // Construct a response to be populated by shard responses. Preallocate empty page slots
     100              :         // with the expected block numbers.
     101            0 :         let response = page_api::GetPageResponse {
     102            0 :             request_id: req.request_id,
     103            0 :             status_code: page_api::GetPageStatusCode::Ok,
     104            0 :             reason: None,
     105            0 :             rel: req.rel,
     106            0 :             pages: req
     107            0 :                 .block_numbers
     108            0 :                 .into_iter()
     109            0 :                 .map(|block_number| {
     110            0 :                     page_api::Page {
     111            0 :                         block_number,
     112            0 :                         image: Bytes::new(), // empty page slot to be filled in
     113            0 :                     }
     114            0 :                 })
     115            0 :                 .collect(),
     116              :         };
     117              : 
     118            0 :         Ok(Self {
     119            0 :             requests,
     120            0 :             response,
     121            0 :             block_shards,
     122            0 :         })
     123            0 :     }
     124              : 
     125              :     /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
     126            0 :     pub fn drain_requests(
     127            0 :         &mut self,
     128            0 :     ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
     129            0 :         self.requests.drain()
     130            0 :     }
     131              : 
     132              :     /// Adds a response from the given shard. The response must match the request ID and have an OK
     133              :     /// status code. A response must not already exist for the given shard ID.
     134              :     #[allow(clippy::result_large_err)]
     135            0 :     pub fn add_response(
     136            0 :         &mut self,
     137            0 :         shard_id: ShardIndex,
     138            0 :         response: page_api::GetPageResponse,
     139            0 :     ) -> anyhow::Result<()> {
     140              :         // The caller should already have converted status codes into tonic::Status.
     141            0 :         if response.status_code != page_api::GetPageStatusCode::Ok {
     142            0 :             return Err(anyhow!(
     143            0 :                 "unexpected non-OK response for shard {shard_id}: {} {}",
     144            0 :                 response.status_code,
     145            0 :                 response.reason.unwrap_or_default()
     146            0 :             ));
     147            0 :         }
     148              : 
     149            0 :         if response.request_id != self.response.request_id {
     150            0 :             return Err(anyhow!(
     151            0 :                 "response ID mismatch for shard {shard_id}: expected {}, got {}",
     152            0 :                 self.response.request_id,
     153            0 :                 response.request_id
     154            0 :             ));
     155            0 :         }
     156              : 
     157            0 :         if response.request_id != self.response.request_id {
     158            0 :             return Err(anyhow!(
     159            0 :                 "response ID mismatch for shard {shard_id}: expected {}, got {}",
     160            0 :                 self.response.request_id,
     161            0 :                 response.request_id
     162            0 :             ));
     163            0 :         }
     164              : 
     165              :         // Place the shard response pages into the assembled response, in request order.
     166            0 :         let mut pages = response.pages.into_iter();
     167              : 
     168            0 :         for (i, &s) in self.block_shards.iter().enumerate() {
     169            0 :             if shard_id != s {
     170            0 :                 continue;
     171            0 :             }
     172              : 
     173            0 :             let Some(slot) = self.response.pages.get_mut(i) else {
     174            0 :                 return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
     175              :             };
     176            0 :             let Some(page) = pages.next() else {
     177            0 :                 return Err(anyhow!(
     178            0 :                     "missing page {} in shard {shard_id} response",
     179            0 :                     slot.block_number
     180            0 :                 ));
     181              :             };
     182            0 :             if page.block_number != slot.block_number {
     183            0 :                 return Err(anyhow!(
     184            0 :                     "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
     185            0 :                     slot.block_number,
     186            0 :                     page.block_number
     187            0 :                 ));
     188            0 :             }
     189            0 :             if !slot.image.is_empty() {
     190            0 :                 return Err(anyhow!(
     191            0 :                     "shard {shard_id} returned duplicate page {} at index {i}",
     192            0 :                     slot.block_number
     193            0 :                 ));
     194            0 :             }
     195              : 
     196            0 :             *slot = page;
     197              :         }
     198              : 
     199              :         // Make sure we've consumed all pages from the shard response.
     200            0 :         if let Some(extra_page) = pages.next() {
     201            0 :             return Err(anyhow!(
     202            0 :                 "shard {shard_id} returned extra page: {}",
     203            0 :                 extra_page.block_number
     204            0 :             ));
     205            0 :         }
     206              : 
     207            0 :         Ok(())
     208            0 :     }
     209              : 
     210              :     /// Fetches the final, assembled response.
     211              :     #[allow(clippy::result_large_err)]
     212            0 :     pub fn get_response(self) -> anyhow::Result<page_api::GetPageResponse> {
     213              :         // Check that the response is complete.
     214            0 :         for (i, page) in self.response.pages.iter().enumerate() {
     215            0 :             if page.image.is_empty() {
     216            0 :                 return Err(anyhow!(
     217            0 :                     "missing page {} for shard {}",
     218              :                     page.block_number,
     219            0 :                     self.block_shards
     220            0 :                         .get(i)
     221            0 :                         .map(|s| s.to_string())
     222            0 :                         .unwrap_or_else(|| "?".to_string())
     223              :                 ));
     224            0 :             }
     225              :         }
     226              : 
     227            0 :         Ok(self.response)
     228            0 :     }
     229              : }
        

Generated by: LCOV version 2.1-beta