Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use bytes::Bytes;
4 :
5 : use pageserver_api::key::rel_block_to_key;
6 : use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
7 : use pageserver_page_api as page_api;
8 : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
9 :
10 : /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
11 : /// TODO: add tests for this.
12 : pub struct GetPageSplitter {
13 : /// Split requests by shard index.
14 : requests: HashMap<ShardIndex, page_api::GetPageRequest>,
15 : /// The response being assembled. Preallocated with empty pages, to be filled in.
16 : response: page_api::GetPageResponse,
17 : /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
18 : /// to assemble the response pages in the same order as the original request.
19 : block_shards: Vec<ShardIndex>,
20 : }
21 :
22 : impl GetPageSplitter {
23 : /// Checks if the given request only touches a single shard, and returns the shard ID. This is
24 : /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
25 0 : pub fn for_single_shard(
26 0 : req: &page_api::GetPageRequest,
27 0 : count: ShardCount,
28 0 : stripe_size: ShardStripeSize,
29 0 : ) -> Option<ShardIndex> {
30 : // Fast path: unsharded tenant.
31 0 : if count.is_unsharded() {
32 0 : return Some(ShardIndex::unsharded());
33 0 : }
34 :
35 : // Find the first page's shard, for comparison. If there are no pages, just return the first
36 : // shard (caller likely checked already, otherwise the server will reject it).
37 0 : let Some(&first_page) = req.block_numbers.first() else {
38 0 : return Some(ShardIndex::new(ShardNumber(0), count));
39 : };
40 0 : let key = rel_block_to_key(req.rel, first_page);
41 0 : let shard_number = key_to_shard_number(count, stripe_size, &key);
42 :
43 0 : req.block_numbers
44 0 : .iter()
45 0 : .skip(1) // computed above
46 0 : .all(|&blkno| {
47 0 : let key = rel_block_to_key(req.rel, blkno);
48 0 : key_to_shard_number(count, stripe_size, &key) == shard_number
49 0 : })
50 0 : .then_some(ShardIndex::new(shard_number, count))
51 0 : }
52 :
53 : /// Splits the given request.
54 0 : pub fn split(
55 0 : req: page_api::GetPageRequest,
56 0 : count: ShardCount,
57 0 : stripe_size: ShardStripeSize,
58 0 : ) -> Self {
59 : // The caller should make sure we don't split requests unnecessarily.
60 0 : debug_assert!(
61 0 : Self::for_single_shard(&req, count, stripe_size).is_none(),
62 0 : "unnecessary request split"
63 : );
64 :
65 : // Split the requests by shard index.
66 0 : let mut requests = HashMap::with_capacity(2); // common case
67 0 : let mut block_shards = Vec::with_capacity(req.block_numbers.len());
68 0 : for &blkno in &req.block_numbers {
69 0 : let key = rel_block_to_key(req.rel, blkno);
70 0 : let shard_number = key_to_shard_number(count, stripe_size, &key);
71 0 : let shard_id = ShardIndex::new(shard_number, count);
72 :
73 0 : requests
74 0 : .entry(shard_id)
75 0 : .or_insert_with(|| page_api::GetPageRequest {
76 0 : request_id: req.request_id,
77 0 : request_class: req.request_class,
78 0 : rel: req.rel,
79 0 : read_lsn: req.read_lsn,
80 0 : block_numbers: Vec::new(),
81 0 : })
82 : .block_numbers
83 0 : .push(blkno);
84 0 : block_shards.push(shard_id);
85 : }
86 :
87 : // Construct a response to be populated by shard responses. Preallocate empty page slots
88 : // with the expected block numbers.
89 0 : let response = page_api::GetPageResponse {
90 0 : request_id: req.request_id,
91 0 : status_code: page_api::GetPageStatusCode::Ok,
92 0 : reason: None,
93 0 : rel: req.rel,
94 0 : pages: req
95 0 : .block_numbers
96 0 : .into_iter()
97 0 : .map(|block_number| {
98 0 : page_api::Page {
99 0 : block_number,
100 0 : image: Bytes::new(), // empty page slot to be filled in
101 0 : }
102 0 : })
103 0 : .collect(),
104 : };
105 :
106 0 : Self {
107 0 : requests,
108 0 : response,
109 0 : block_shards,
110 0 : }
111 0 : }
112 :
113 : /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
114 0 : pub fn drain_requests(
115 0 : &mut self,
116 0 : ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
117 0 : self.requests.drain()
118 0 : }
119 :
120 : /// Adds a response from the given shard. The response must match the request ID and have an OK
121 : /// status code. A response must not already exist for the given shard ID.
122 : #[allow(clippy::result_large_err)]
123 0 : pub fn add_response(
124 0 : &mut self,
125 0 : shard_id: ShardIndex,
126 0 : response: page_api::GetPageResponse,
127 0 : ) -> tonic::Result<()> {
128 : // The caller should already have converted status codes into tonic::Status.
129 0 : if response.status_code != page_api::GetPageStatusCode::Ok {
130 0 : return Err(tonic::Status::internal(format!(
131 0 : "unexpected non-OK response for shard {shard_id}: {} {}",
132 0 : response.status_code,
133 0 : response.reason.unwrap_or_default()
134 0 : )));
135 0 : }
136 :
137 0 : if response.request_id != self.response.request_id {
138 0 : return Err(tonic::Status::internal(format!(
139 0 : "response ID mismatch for shard {shard_id}: expected {}, got {}",
140 0 : self.response.request_id, response.request_id
141 0 : )));
142 0 : }
143 :
144 : // Place the shard response pages into the assembled response, in request order.
145 0 : let mut pages = response.pages.into_iter();
146 :
147 0 : for (i, &s) in self.block_shards.iter().enumerate() {
148 0 : if shard_id != s {
149 0 : continue;
150 0 : }
151 :
152 0 : let Some(slot) = self.response.pages.get_mut(i) else {
153 0 : return Err(tonic::Status::internal(format!(
154 0 : "no block_shards slot {i} for shard {shard_id}"
155 0 : )));
156 : };
157 0 : let Some(page) = pages.next() else {
158 0 : return Err(tonic::Status::internal(format!(
159 0 : "missing page {} in shard {shard_id} response",
160 0 : slot.block_number
161 0 : )));
162 : };
163 0 : if page.block_number != slot.block_number {
164 0 : return Err(tonic::Status::internal(format!(
165 0 : "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
166 0 : slot.block_number, page.block_number
167 0 : )));
168 0 : }
169 0 : if !slot.image.is_empty() {
170 0 : return Err(tonic::Status::internal(format!(
171 0 : "shard {shard_id} returned duplicate page {} at index {i}",
172 0 : slot.block_number
173 0 : )));
174 0 : }
175 :
176 0 : *slot = page;
177 : }
178 :
179 : // Make sure we've consumed all pages from the shard response.
180 0 : if let Some(extra_page) = pages.next() {
181 0 : return Err(tonic::Status::internal(format!(
182 0 : "shard {shard_id} returned extra page: {}",
183 0 : extra_page.block_number
184 0 : )));
185 0 : }
186 :
187 0 : Ok(())
188 0 : }
189 :
190 : /// Fetches the final, assembled response.
191 : #[allow(clippy::result_large_err)]
192 0 : pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
193 : // Check that the response is complete.
194 0 : for (i, page) in self.response.pages.iter().enumerate() {
195 0 : if page.image.is_empty() {
196 0 : return Err(tonic::Status::internal(format!(
197 0 : "missing page {} for shard {}",
198 : page.block_number,
199 0 : self.block_shards
200 0 : .get(i)
201 0 : .map(|s| s.to_string())
202 0 : .unwrap_or_else(|| "?".to_string())
203 : )));
204 0 : }
205 : }
206 :
207 0 : Ok(self.response)
208 0 : }
209 : }
|