Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use bytes::Bytes;
4 :
5 : use crate::model::*;
6 : use pageserver_api::key::rel_block_to_key;
7 : use pageserver_api::shard::key_to_shard_number;
8 : use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
9 :
10 : /// Splits a GetPageRequest whose pages belong to more than one shard into one request per shard, and assembles the per-shard responses into a single response in the original page order.
11 : /// TODO: add tests for this.
12 : pub struct GetPageSplitter {
13 : /// The per-shard requests, keyed by shard index. Handed out (and removed) by `drain_requests`.
14 : requests: HashMap<ShardIndex, GetPageRequest>,
15 : /// The response being assembled. Preallocated with one empty page per requested block, to be filled in by `add_response`.
16 : response: GetPageResponse,
17 : /// Maps each offset in the original request's `block_numbers` (and thus in `response.pages`) to the owning shard. Used
18 : /// to assemble the response pages in the same order as the original request.
19 : block_shards: Vec<ShardIndex>,
20 : }
21 :
22 : impl GetPageSplitter {
23 : /// Checks if the given request only touches a single shard, and returns the shard ID. This is
24 : /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
25 0 : pub fn for_single_shard(
26 0 : req: &GetPageRequest,
27 0 : count: ShardCount,
28 0 : stripe_size: Option<ShardStripeSize>,
29 0 : ) -> Result<Option<ShardIndex>, SplitError> {
30 : // Fast path: unsharded tenant.
31 0 : if count.is_unsharded() {
32 0 : return Ok(Some(ShardIndex::unsharded()));
33 0 : }
34 :
35 0 : let Some(stripe_size) = stripe_size else {
36 0 : return Err("stripe size must be given for sharded tenants".into());
37 : };
38 :
39 : // Find the first page's shard, for comparison.
40 0 : let Some(&first_page) = req.block_numbers.first() else {
41 0 : return Err("no block numbers in request".into());
42 : };
43 0 : let key = rel_block_to_key(req.rel, first_page);
44 0 : let shard_number = key_to_shard_number(count, stripe_size, &key);
45 :
46 0 : Ok(req
47 0 : .block_numbers
48 0 : .iter()
49 0 : .skip(1) // computed above
50 0 : .all(|&blkno| {
51 0 : let key = rel_block_to_key(req.rel, blkno);
52 0 : key_to_shard_number(count, stripe_size, &key) == shard_number
53 0 : })
54 0 : .then_some(ShardIndex::new(shard_number, count)))
55 0 : }
56 :
57 : /// Splits the given request.
58 0 : pub fn split(
59 0 : req: GetPageRequest,
60 0 : count: ShardCount,
61 0 : stripe_size: Option<ShardStripeSize>,
62 0 : ) -> Result<Self, SplitError> {
63 : // The caller should make sure we don't split requests unnecessarily.
64 0 : debug_assert!(
65 0 : Self::for_single_shard(&req, count, stripe_size)?.is_none(),
66 0 : "unnecessary request split"
67 : );
68 :
69 0 : if count.is_unsharded() {
70 0 : return Err("unsharded tenant, no point in splitting request".into());
71 0 : }
72 0 : let Some(stripe_size) = stripe_size else {
73 0 : return Err("stripe size must be given for sharded tenants".into());
74 : };
75 :
76 : // Split the requests by shard index.
77 0 : let mut requests = HashMap::with_capacity(2); // common case
78 0 : let mut block_shards = Vec::with_capacity(req.block_numbers.len());
79 0 : for &blkno in &req.block_numbers {
80 0 : let key = rel_block_to_key(req.rel, blkno);
81 0 : let shard_number = key_to_shard_number(count, stripe_size, &key);
82 0 : let shard_id = ShardIndex::new(shard_number, count);
83 :
84 0 : requests
85 0 : .entry(shard_id)
86 0 : .or_insert_with(|| GetPageRequest {
87 0 : request_id: req.request_id,
88 0 : request_class: req.request_class,
89 0 : rel: req.rel,
90 0 : read_lsn: req.read_lsn,
91 0 : block_numbers: Vec::new(),
92 0 : })
93 : .block_numbers
94 0 : .push(blkno);
95 0 : block_shards.push(shard_id);
96 : }
97 :
98 : // Construct a response to be populated by shard responses. Preallocate empty page slots
99 : // with the expected block numbers.
100 0 : let response = GetPageResponse {
101 0 : request_id: req.request_id,
102 0 : status_code: GetPageStatusCode::Ok,
103 0 : reason: None,
104 0 : rel: req.rel,
105 0 : pages: req
106 0 : .block_numbers
107 0 : .into_iter()
108 0 : .map(|block_number| {
109 0 : Page {
110 0 : block_number,
111 0 : image: Bytes::new(), // empty page slot to be filled in
112 0 : }
113 0 : })
114 0 : .collect(),
115 : };
116 :
117 0 : Ok(Self {
118 0 : requests,
119 0 : response,
120 0 : block_shards,
121 0 : })
122 0 : }
123 :
124 : /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
125 0 : pub fn drain_requests(&mut self) -> impl Iterator<Item = (ShardIndex, GetPageRequest)> {
126 0 : self.requests.drain()
127 0 : }
128 :
129 : /// Adds a response from the given shard. The response must match the request ID and have an OK
130 : /// status code. A response must not already exist for the given shard ID.
131 0 : pub fn add_response(
132 0 : &mut self,
133 0 : shard_id: ShardIndex,
134 0 : response: GetPageResponse,
135 0 : ) -> Result<(), SplitError> {
136 : // The caller should already have converted status codes into tonic::Status.
137 0 : if response.status_code != GetPageStatusCode::Ok {
138 0 : return Err(SplitError(format!(
139 0 : "unexpected non-OK response for shard {shard_id}: {} {}",
140 0 : response.status_code,
141 0 : response.reason.unwrap_or_default()
142 0 : )));
143 0 : }
144 :
145 0 : if response.request_id != self.response.request_id {
146 0 : return Err(SplitError(format!(
147 0 : "response ID mismatch for shard {shard_id}: expected {}, got {}",
148 0 : self.response.request_id, response.request_id
149 0 : )));
150 0 : }
151 :
152 0 : if response.request_id != self.response.request_id {
153 0 : return Err(SplitError(format!(
154 0 : "response ID mismatch for shard {shard_id}: expected {}, got {}",
155 0 : self.response.request_id, response.request_id
156 0 : )));
157 0 : }
158 :
159 : // Place the shard response pages into the assembled response, in request order.
160 0 : let mut pages = response.pages.into_iter();
161 :
162 0 : for (i, &s) in self.block_shards.iter().enumerate() {
163 0 : if shard_id != s {
164 0 : continue;
165 0 : }
166 :
167 0 : let Some(slot) = self.response.pages.get_mut(i) else {
168 0 : return Err(SplitError(format!(
169 0 : "no block_shards slot {i} for shard {shard_id}"
170 0 : )));
171 : };
172 0 : let Some(page) = pages.next() else {
173 0 : return Err(SplitError(format!(
174 0 : "missing page {} in shard {shard_id} response",
175 0 : slot.block_number
176 0 : )));
177 : };
178 0 : if page.block_number != slot.block_number {
179 0 : return Err(SplitError(format!(
180 0 : "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
181 0 : slot.block_number, page.block_number
182 0 : )));
183 0 : }
184 0 : if !slot.image.is_empty() {
185 0 : return Err(SplitError(format!(
186 0 : "shard {shard_id} returned duplicate page {} at index {i}",
187 0 : slot.block_number
188 0 : )));
189 0 : }
190 :
191 0 : *slot = page;
192 : }
193 :
194 : // Make sure we've consumed all pages from the shard response.
195 0 : if let Some(extra_page) = pages.next() {
196 0 : return Err(SplitError(format!(
197 0 : "shard {shard_id} returned extra page: {}",
198 0 : extra_page.block_number
199 0 : )));
200 0 : }
201 :
202 0 : Ok(())
203 0 : }
204 :
205 : /// Collects the final, assembled response.
206 0 : pub fn collect_response(self) -> Result<GetPageResponse, SplitError> {
207 : // Check that the response is complete.
208 0 : for (i, page) in self.response.pages.iter().enumerate() {
209 0 : if page.image.is_empty() {
210 0 : return Err(SplitError(format!(
211 0 : "missing page {} for shard {}",
212 : page.block_number,
213 0 : self.block_shards
214 0 : .get(i)
215 0 : .map(|s| s.to_string())
216 0 : .unwrap_or_else(|| "?".to_string())
217 : )));
218 0 : }
219 : }
220 :
221 0 : Ok(self.response)
222 0 : }
223 : }
224 :
225 : /// A GetPageSplitter error. Wraps a human-readable message, which is also its `Display` output.
226 : #[derive(Debug, thiserror::Error)]
227 : #[error("{0}")]
228 : pub struct SplitError(String);
229 :
230 : impl From<&str> for SplitError {
231 0 : fn from(err: &str) -> Self {
232 0 : SplitError(err.to_string())
233 0 : }
234 : }
235 :
236 : impl From<String> for SplitError {
237 0 : fn from(err: String) -> Self {
238 0 : SplitError(err)
239 0 : }
240 : }
241 :
242 : impl From<SplitError> for tonic::Status {
243 0 : fn from(err: SplitError) -> Self {
244 0 : tonic::Status::internal(err.0)
245 0 : }
246 : }
|