Line data Source code
1 : use std::{ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
9 :
10 : use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
11 :
12 : /// An image writer that takes images and produces multiple image layers. The interface does not
13 : /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
14 : /// to be cleaned up)
15 : #[must_use]
16 : pub struct SplitImageLayerWriter {
17 : inner: ImageLayerWriter,
18 : target_layer_size: u64,
19 : generated_layers: Vec<ResidentLayer>,
20 : conf: &'static PageServerConf,
21 : timeline_id: TimelineId,
22 : tenant_shard_id: TenantShardId,
23 : lsn: Lsn,
24 : }
25 :
26 : impl SplitImageLayerWriter {
27 6 : pub async fn new(
28 6 : conf: &'static PageServerConf,
29 6 : timeline_id: TimelineId,
30 6 : tenant_shard_id: TenantShardId,
31 6 : start_key: Key,
32 6 : lsn: Lsn,
33 6 : target_layer_size: u64,
34 6 : ctx: &RequestContext,
35 6 : ) -> anyhow::Result<Self> {
36 6 : Ok(Self {
37 6 : target_layer_size,
38 6 : inner: ImageLayerWriter::new(
39 6 : conf,
40 6 : timeline_id,
41 6 : tenant_shard_id,
42 6 : &(start_key..Key::MAX),
43 6 : lsn,
44 6 : ctx,
45 6 : )
46 3 : .await?,
47 6 : generated_layers: Vec::new(),
48 6 : conf,
49 6 : timeline_id,
50 6 : tenant_shard_id,
51 6 : lsn,
52 : })
53 6 : }
54 :
55 4006 : pub async fn put_image(
56 4006 : &mut self,
57 4006 : key: Key,
58 4006 : img: Bytes,
59 4006 : tline: &Arc<Timeline>,
60 4006 : ctx: &RequestContext,
61 4006 : ) -> anyhow::Result<()> {
62 4006 : // The current estimation is an upper bound of the space that the key/image could take
63 4006 : // because we did not consider compression in this estimation. The resulting image layer
64 4006 : // could be smaller than the target size.
65 4006 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
66 4006 : if self.inner.num_keys() >= 1
67 4000 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
68 : {
69 8 : let next_image_writer = ImageLayerWriter::new(
70 8 : self.conf,
71 8 : self.timeline_id,
72 8 : self.tenant_shard_id,
73 8 : &(key..Key::MAX),
74 8 : self.lsn,
75 8 : ctx,
76 8 : )
77 4 : .await?;
78 8 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
79 8 : self.generated_layers.push(
80 8 : prev_image_writer
81 8 : .finish_with_end_key(tline, key, ctx)
82 17 : .await?,
83 : );
84 3998 : }
85 4066 : self.inner.put_image(key, img, ctx).await
86 4006 : }
87 :
88 6 : pub(crate) async fn finish(
89 6 : self,
90 6 : tline: &Arc<Timeline>,
91 6 : ctx: &RequestContext,
92 6 : end_key: Key,
93 6 : ) -> anyhow::Result<Vec<ResidentLayer>> {
94 6 : let Self {
95 6 : mut generated_layers,
96 6 : inner,
97 6 : ..
98 6 : } = self;
99 12 : generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
100 6 : Ok(generated_layers)
101 6 : }
102 :
103 : /// When split writer fails, the caller should call this function and handle partially generated layers.
104 : #[allow(dead_code)]
105 0 : pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, ImageLayerWriter)> {
106 0 : Ok((self.generated_layers, self.inner))
107 0 : }
108 : }
109 :
110 : /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
111 : /// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
112 : /// to be cleaned up).
113 : #[must_use]
114 : pub struct SplitDeltaLayerWriter {
115 : inner: DeltaLayerWriter,
116 : target_layer_size: u64,
117 : generated_layers: Vec<ResidentLayer>,
118 : conf: &'static PageServerConf,
119 : timeline_id: TimelineId,
120 : tenant_shard_id: TenantShardId,
121 : lsn_range: Range<Lsn>,
122 : }
123 :
124 : impl SplitDeltaLayerWriter {
125 6 : pub async fn new(
126 6 : conf: &'static PageServerConf,
127 6 : timeline_id: TimelineId,
128 6 : tenant_shard_id: TenantShardId,
129 6 : start_key: Key,
130 6 : lsn_range: Range<Lsn>,
131 6 : target_layer_size: u64,
132 6 : ctx: &RequestContext,
133 6 : ) -> anyhow::Result<Self> {
134 6 : Ok(Self {
135 6 : target_layer_size,
136 6 : inner: DeltaLayerWriter::new(
137 6 : conf,
138 6 : timeline_id,
139 6 : tenant_shard_id,
140 6 : start_key,
141 6 : lsn_range.clone(),
142 6 : ctx,
143 6 : )
144 3 : .await?,
145 6 : generated_layers: Vec::new(),
146 6 : conf,
147 6 : timeline_id,
148 6 : tenant_shard_id,
149 6 : lsn_range,
150 : })
151 6 : }
152 :
153 4006 : pub async fn put_value(
154 4006 : &mut self,
155 4006 : key: Key,
156 4006 : lsn: Lsn,
157 4006 : val: Value,
158 4006 : tline: &Arc<Timeline>,
159 4006 : ctx: &RequestContext,
160 4006 : ) -> anyhow::Result<()> {
161 4006 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
162 4006 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
163 4006 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
164 4006 : if self.inner.num_keys() >= 1
165 4000 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
166 : {
167 8 : let next_delta_writer = DeltaLayerWriter::new(
168 8 : self.conf,
169 8 : self.timeline_id,
170 8 : self.tenant_shard_id,
171 8 : key,
172 8 : self.lsn_range.clone(),
173 8 : ctx,
174 8 : )
175 4 : .await?;
176 8 : let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
177 21 : let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
178 8 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
179 8 : self.generated_layers.push(delta_layer);
180 3998 : }
181 4006 : self.inner.put_value(key, lsn, val, ctx).await
182 4006 : }
183 :
184 6 : pub(crate) async fn finish(
185 6 : self,
186 6 : tline: &Arc<Timeline>,
187 6 : ctx: &RequestContext,
188 6 : end_key: Key,
189 6 : ) -> anyhow::Result<Vec<ResidentLayer>> {
190 6 : let Self {
191 6 : mut generated_layers,
192 6 : inner,
193 6 : ..
194 6 : } = self;
195 :
196 15 : let (desc, path) = inner.finish(end_key, ctx).await?;
197 6 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
198 6 : generated_layers.push(delta_layer);
199 6 : Ok(generated_layers)
200 6 : }
201 :
202 : /// When split writer fails, the caller should call this function and handle partially generated layers.
203 : #[allow(dead_code)]
204 0 : pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, DeltaLayerWriter)> {
205 0 : Ok((self.generated_layers, self.inner))
206 0 : }
207 : }
208 :
209 : #[cfg(test)]
210 : mod tests {
211 : use rand::{RngCore, SeedableRng};
212 :
213 : use crate::{
214 : tenant::{
215 : harness::{TenantHarness, TIMELINE_ID},
216 : storage_layer::AsLayerDesc,
217 : },
218 : DEFAULT_PG_VERSION,
219 : };
220 :
221 : use super::*;
222 :
223 8036 : fn get_key(id: u32) -> Key {
224 8036 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
225 8036 : key.field6 = id;
226 8036 : key
227 8036 : }
228 :
229 8 : fn get_img(id: u32) -> Bytes {
230 8 : format!("{id:064}").into()
231 8 : }
232 :
233 8004 : fn get_large_img() -> Bytes {
234 8004 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
235 8004 : let mut data = vec![0; 8192];
236 8004 : rng.fill_bytes(&mut data);
237 8004 : data.into()
238 8004 : }
239 :
240 : #[tokio::test]
241 2 : async fn write_one_image() {
242 2 : let harness = TenantHarness::create("split_writer_write_one_image")
243 2 : .await
244 2 : .unwrap();
245 8 : let (tenant, ctx) = harness.load().await;
246 2 :
247 2 : let tline = tenant
248 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
249 4 : .await
250 2 : .unwrap();
251 2 :
252 2 : let mut image_writer = SplitImageLayerWriter::new(
253 2 : tenant.conf,
254 2 : tline.timeline_id,
255 2 : tenant.tenant_shard_id,
256 2 : get_key(0),
257 2 : Lsn(0x18),
258 2 : 4 * 1024 * 1024,
259 2 : &ctx,
260 2 : )
261 2 : .await
262 2 : .unwrap();
263 2 :
264 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
265 2 : tenant.conf,
266 2 : tline.timeline_id,
267 2 : tenant.tenant_shard_id,
268 2 : get_key(0),
269 2 : Lsn(0x18)..Lsn(0x20),
270 2 : 4 * 1024 * 1024,
271 2 : &ctx,
272 2 : )
273 2 : .await
274 2 : .unwrap();
275 2 :
276 2 : image_writer
277 2 : .put_image(get_key(0), get_img(0), &tline, &ctx)
278 2 : .await
279 2 : .unwrap();
280 2 : let layers = image_writer
281 2 : .finish(&tline, &ctx, get_key(10))
282 4 : .await
283 2 : .unwrap();
284 2 : assert_eq!(layers.len(), 1);
285 2 :
286 2 : delta_writer
287 2 : .put_value(
288 2 : get_key(0),
289 2 : Lsn(0x18),
290 2 : Value::Image(get_img(0)),
291 2 : &tline,
292 2 : &ctx,
293 2 : )
294 2 : .await
295 2 : .unwrap();
296 2 : let layers = delta_writer
297 2 : .finish(&tline, &ctx, get_key(10))
298 5 : .await
299 2 : .unwrap();
300 2 : assert_eq!(layers.len(), 1);
301 2 : }
302 :
303 : #[tokio::test]
304 2 : async fn write_split() {
305 2 : let harness = TenantHarness::create("split_writer_write_split")
306 2 : .await
307 2 : .unwrap();
308 8 : let (tenant, ctx) = harness.load().await;
309 2 :
310 2 : let tline = tenant
311 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
312 4 : .await
313 2 : .unwrap();
314 2 :
315 2 : let mut image_writer = SplitImageLayerWriter::new(
316 2 : tenant.conf,
317 2 : tline.timeline_id,
318 2 : tenant.tenant_shard_id,
319 2 : get_key(0),
320 2 : Lsn(0x18),
321 2 : 4 * 1024 * 1024,
322 2 : &ctx,
323 2 : )
324 2 : .await
325 2 : .unwrap();
326 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
327 2 : tenant.conf,
328 2 : tline.timeline_id,
329 2 : tenant.tenant_shard_id,
330 2 : get_key(0),
331 2 : Lsn(0x18)..Lsn(0x20),
332 2 : 4 * 1024 * 1024,
333 2 : &ctx,
334 2 : )
335 2 : .await
336 2 : .unwrap();
337 2 : const N: usize = 2000;
338 4002 : for i in 0..N {
339 4000 : let i = i as u32;
340 4000 : image_writer
341 4000 : .put_image(get_key(i), get_large_img(), &tline, &ctx)
342 4076 : .await
343 4000 : .unwrap();
344 4000 : delta_writer
345 4000 : .put_value(
346 4000 : get_key(i),
347 4000 : Lsn(0x20),
348 4000 : Value::Image(get_large_img()),
349 4000 : &tline,
350 4000 : &ctx,
351 4000 : )
352 271 : .await
353 4000 : .unwrap();
354 2 : }
355 2 : let image_layers = image_writer
356 2 : .finish(&tline, &ctx, get_key(N as u32))
357 4 : .await
358 2 : .unwrap();
359 2 : let delta_layers = delta_writer
360 2 : .finish(&tline, &ctx, get_key(N as u32))
361 5 : .await
362 2 : .unwrap();
363 2 : assert_eq!(image_layers.len(), N / 512 + 1);
364 2 : assert_eq!(delta_layers.len(), N / 512 + 1);
365 8 : for idx in 0..image_layers.len() {
366 8 : assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
367 8 : assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
368 8 : assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
369 8 : assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
370 8 : if idx > 0 {
371 6 : assert_eq!(
372 6 : image_layers[idx - 1].layer_desc().key_range.end,
373 6 : image_layers[idx].layer_desc().key_range.start
374 6 : );
375 6 : assert_eq!(
376 6 : delta_layers[idx - 1].layer_desc().key_range.end,
377 6 : delta_layers[idx].layer_desc().key_range.start
378 6 : );
379 2 : }
380 2 : }
381 2 : }
382 :
383 : #[tokio::test]
384 2 : async fn write_large_img() {
385 2 : let harness = TenantHarness::create("split_writer_write_large_img")
386 2 : .await
387 2 : .unwrap();
388 8 : let (tenant, ctx) = harness.load().await;
389 2 :
390 2 : let tline = tenant
391 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
392 4 : .await
393 2 : .unwrap();
394 2 :
395 2 : let mut image_writer = SplitImageLayerWriter::new(
396 2 : tenant.conf,
397 2 : tline.timeline_id,
398 2 : tenant.tenant_shard_id,
399 2 : get_key(0),
400 2 : Lsn(0x18),
401 2 : 4 * 1024,
402 2 : &ctx,
403 2 : )
404 2 : .await
405 2 : .unwrap();
406 2 :
407 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
408 2 : tenant.conf,
409 2 : tline.timeline_id,
410 2 : tenant.tenant_shard_id,
411 2 : get_key(0),
412 2 : Lsn(0x18)..Lsn(0x20),
413 2 : 4 * 1024,
414 2 : &ctx,
415 2 : )
416 2 : .await
417 2 : .unwrap();
418 2 :
419 2 : image_writer
420 2 : .put_image(get_key(0), get_img(0), &tline, &ctx)
421 2 : .await
422 2 : .unwrap();
423 2 : image_writer
424 2 : .put_image(get_key(1), get_large_img(), &tline, &ctx)
425 7 : .await
426 2 : .unwrap();
427 2 : let layers = image_writer
428 2 : .finish(&tline, &ctx, get_key(10))
429 4 : .await
430 2 : .unwrap();
431 2 : assert_eq!(layers.len(), 2);
432 2 :
433 2 : delta_writer
434 2 : .put_value(
435 2 : get_key(0),
436 2 : Lsn(0x18),
437 2 : Value::Image(get_img(0)),
438 2 : &tline,
439 2 : &ctx,
440 2 : )
441 2 : .await
442 2 : .unwrap();
443 2 : delta_writer
444 2 : .put_value(
445 2 : get_key(1),
446 2 : Lsn(0x1A),
447 2 : Value::Image(get_large_img()),
448 2 : &tline,
449 2 : &ctx,
450 2 : )
451 6 : .await
452 2 : .unwrap();
453 2 : let layers = delta_writer
454 2 : .finish(&tline, &ctx, get_key(10))
455 5 : .await
456 2 : .unwrap();
457 2 : assert_eq!(layers.len(), 2);
458 2 : }
459 : }
|