Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use anyhow::anyhow;
4 : use bytes::Bytes;
5 :
6 : use pageserver_api::key::rel_block_to_key;
7 : use pageserver_api::shard::key_to_shard_number;
8 : use pageserver_page_api as page_api;
9 : use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
10 :
11 : /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
12 : /// TODO: add tests for this.
13 : pub struct GetPageSplitter {
14 : /// Split requests by shard index.
15 : requests: HashMap<ShardIndex, page_api::GetPageRequest>,
16 : /// The response being assembled. Preallocated with empty pages, to be filled in.
17 : response: page_api::GetPageResponse,
18 : /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
19 : /// to assemble the response pages in the same order as the original request.
20 : block_shards: Vec<ShardIndex>,
21 : }
22 :
23 : impl GetPageSplitter {
24 : /// Checks if the given request only touches a single shard, and returns the shard ID. This is
25 : /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
26 0 : pub fn for_single_shard(
27 0 : req: &page_api::GetPageRequest,
28 0 : count: ShardCount,
29 0 : stripe_size: Option<ShardStripeSize>,
30 0 : ) -> anyhow::Result<Option<ShardIndex>> {
31 : // Fast path: unsharded tenant.
32 0 : if count.is_unsharded() {
33 0 : return Ok(Some(ShardIndex::unsharded()));
34 0 : }
35 :
36 0 : let Some(stripe_size) = stripe_size else {
37 0 : return Err(anyhow!("stripe size must be given for sharded tenants"));
38 : };
39 :
40 : // Find the first page's shard, for comparison.
41 0 : let Some(&first_page) = req.block_numbers.first() else {
42 0 : return Err(anyhow!("no block numbers in request"));
43 : };
44 0 : let key = rel_block_to_key(req.rel, first_page);
45 0 : let shard_number = key_to_shard_number(count, stripe_size, &key);
46 :
47 0 : Ok(req
48 0 : .block_numbers
49 0 : .iter()
50 0 : .skip(1) // computed above
51 0 : .all(|&blkno| {
52 0 : let key = rel_block_to_key(req.rel, blkno);
53 0 : key_to_shard_number(count, stripe_size, &key) == shard_number
54 0 : })
55 0 : .then_some(ShardIndex::new(shard_number, count)))
56 0 : }
57 :
58 : /// Splits the given request.
59 0 : pub fn split(
60 0 : req: page_api::GetPageRequest,
61 0 : count: ShardCount,
62 0 : stripe_size: Option<ShardStripeSize>,
63 0 : ) -> anyhow::Result<Self> {
64 : // The caller should make sure we don't split requests unnecessarily.
65 0 : debug_assert!(
66 0 : Self::for_single_shard(&req, count, stripe_size)?.is_none(),
67 0 : "unnecessary request split"
68 : );
69 :
70 0 : if count.is_unsharded() {
71 0 : return Err(anyhow!("unsharded tenant, no point in splitting request"));
72 0 : }
73 0 : let Some(stripe_size) = stripe_size else {
74 0 : return Err(anyhow!("stripe size must be given for sharded tenants"));
75 : };
76 :
77 : // Split the requests by shard index.
78 0 : let mut requests = HashMap::with_capacity(2); // common case
79 0 : let mut block_shards = Vec::with_capacity(req.block_numbers.len());
80 0 : for &blkno in &req.block_numbers {
81 0 : let key = rel_block_to_key(req.rel, blkno);
82 0 : let shard_number = key_to_shard_number(count, stripe_size, &key);
83 0 : let shard_id = ShardIndex::new(shard_number, count);
84 :
85 0 : requests
86 0 : .entry(shard_id)
87 0 : .or_insert_with(|| page_api::GetPageRequest {
88 0 : request_id: req.request_id,
89 0 : request_class: req.request_class,
90 0 : rel: req.rel,
91 0 : read_lsn: req.read_lsn,
92 0 : block_numbers: Vec::new(),
93 0 : })
94 : .block_numbers
95 0 : .push(blkno);
96 0 : block_shards.push(shard_id);
97 : }
98 :
99 : // Construct a response to be populated by shard responses. Preallocate empty page slots
100 : // with the expected block numbers.
101 0 : let response = page_api::GetPageResponse {
102 0 : request_id: req.request_id,
103 0 : status_code: page_api::GetPageStatusCode::Ok,
104 0 : reason: None,
105 0 : rel: req.rel,
106 0 : pages: req
107 0 : .block_numbers
108 0 : .into_iter()
109 0 : .map(|block_number| {
110 0 : page_api::Page {
111 0 : block_number,
112 0 : image: Bytes::new(), // empty page slot to be filled in
113 0 : }
114 0 : })
115 0 : .collect(),
116 : };
117 :
118 0 : Ok(Self {
119 0 : requests,
120 0 : response,
121 0 : block_shards,
122 0 : })
123 0 : }
124 :
125 : /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
126 0 : pub fn drain_requests(
127 0 : &mut self,
128 0 : ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
129 0 : self.requests.drain()
130 0 : }
131 :
132 : /// Adds a response from the given shard. The response must match the request ID and have an OK
133 : /// status code. A response must not already exist for the given shard ID.
134 : #[allow(clippy::result_large_err)]
135 0 : pub fn add_response(
136 0 : &mut self,
137 0 : shard_id: ShardIndex,
138 0 : response: page_api::GetPageResponse,
139 0 : ) -> anyhow::Result<()> {
140 : // The caller should already have converted status codes into tonic::Status.
141 0 : if response.status_code != page_api::GetPageStatusCode::Ok {
142 0 : return Err(anyhow!(
143 0 : "unexpected non-OK response for shard {shard_id}: {} {}",
144 0 : response.status_code,
145 0 : response.reason.unwrap_or_default()
146 0 : ));
147 0 : }
148 :
149 0 : if response.request_id != self.response.request_id {
150 0 : return Err(anyhow!(
151 0 : "response ID mismatch for shard {shard_id}: expected {}, got {}",
152 0 : self.response.request_id,
153 0 : response.request_id
154 0 : ));
155 0 : }
156 :
157 0 : if response.request_id != self.response.request_id {
158 0 : return Err(anyhow!(
159 0 : "response ID mismatch for shard {shard_id}: expected {}, got {}",
160 0 : self.response.request_id,
161 0 : response.request_id
162 0 : ));
163 0 : }
164 :
165 : // Place the shard response pages into the assembled response, in request order.
166 0 : let mut pages = response.pages.into_iter();
167 :
168 0 : for (i, &s) in self.block_shards.iter().enumerate() {
169 0 : if shard_id != s {
170 0 : continue;
171 0 : }
172 :
173 0 : let Some(slot) = self.response.pages.get_mut(i) else {
174 0 : return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
175 : };
176 0 : let Some(page) = pages.next() else {
177 0 : return Err(anyhow!(
178 0 : "missing page {} in shard {shard_id} response",
179 0 : slot.block_number
180 0 : ));
181 : };
182 0 : if page.block_number != slot.block_number {
183 0 : return Err(anyhow!(
184 0 : "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
185 0 : slot.block_number,
186 0 : page.block_number
187 0 : ));
188 0 : }
189 0 : if !slot.image.is_empty() {
190 0 : return Err(anyhow!(
191 0 : "shard {shard_id} returned duplicate page {} at index {i}",
192 0 : slot.block_number
193 0 : ));
194 0 : }
195 :
196 0 : *slot = page;
197 : }
198 :
199 : // Make sure we've consumed all pages from the shard response.
200 0 : if let Some(extra_page) = pages.next() {
201 0 : return Err(anyhow!(
202 0 : "shard {shard_id} returned extra page: {}",
203 0 : extra_page.block_number
204 0 : ));
205 0 : }
206 :
207 0 : Ok(())
208 0 : }
209 :
210 : /// Fetches the final, assembled response.
211 : #[allow(clippy::result_large_err)]
212 0 : pub fn get_response(self) -> anyhow::Result<page_api::GetPageResponse> {
213 : // Check that the response is complete.
214 0 : for (i, page) in self.response.pages.iter().enumerate() {
215 0 : if page.image.is_empty() {
216 0 : return Err(anyhow!(
217 0 : "missing page {} for shard {}",
218 : page.block_number,
219 0 : self.block_shards
220 0 : .get(i)
221 0 : .map(|s| s.to_string())
222 0 : .unwrap_or_else(|| "?".to_string())
223 : ));
224 0 : }
225 : }
226 :
227 0 : Ok(self.response)
228 0 : }
229 : }
|