Line data Source code
1 : use std::future::Future;
2 : use std::ops::Range;
3 : use std::sync::Arc;
4 :
5 : use bytes::Bytes;
6 : use pageserver_api::key::{KEY_SIZE, Key};
7 : use pageserver_api::value::Value;
8 : use utils::id::TimelineId;
9 : use utils::lsn::Lsn;
10 : use utils::shard::TenantShardId;
11 :
12 : use super::layer::S3_UPLOAD_LIMIT;
13 : use super::{
14 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
15 : };
16 : use crate::config::PageServerConf;
17 : use crate::context::RequestContext;
18 : use crate::tenant::Timeline;
19 : use crate::tenant::storage_layer::Layer;
20 :
21 : pub(crate) enum BatchWriterResult {
22 : Produced(ResidentLayer),
23 : Discarded(PersistentLayerKey),
24 : }
25 :
26 : #[cfg(test)]
27 : impl BatchWriterResult {
28 48 : fn into_resident_layer(self) -> ResidentLayer {
29 48 : match self {
30 48 : BatchWriterResult::Produced(layer) => layer,
31 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
32 : }
33 48 : }
34 :
35 32 : fn into_discarded_layer(self) -> PersistentLayerKey {
36 32 : match self {
37 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
38 32 : BatchWriterResult::Discarded(layer) => layer,
39 32 : }
40 32 : }
41 : }
42 :
43 : enum LayerWriterWrapper {
44 : Image(ImageLayerWriter),
45 : Delta(DeltaLayerWriter),
46 : }
47 :
48 : /// An layer writer that takes unfinished layers and finish them atomically.
49 : #[must_use]
50 : pub struct BatchLayerWriter {
51 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
52 : conf: &'static PageServerConf,
53 : }
54 :
55 : impl BatchLayerWriter {
56 1372 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
57 1372 : Ok(Self {
58 1372 : generated_layer_writers: Vec::new(),
59 1372 : conf,
60 1372 : })
61 1372 : }
62 :
63 616 : pub fn add_unfinished_image_writer(
64 616 : &mut self,
65 616 : writer: ImageLayerWriter,
66 616 : key_range: Range<Key>,
67 616 : lsn: Lsn,
68 616 : ) {
69 616 : self.generated_layer_writers.push((
70 616 : LayerWriterWrapper::Image(writer),
71 616 : PersistentLayerKey {
72 616 : key_range,
73 616 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
74 616 : is_delta: false,
75 616 : },
76 616 : ));
77 616 : }
78 :
79 120 : pub fn add_unfinished_delta_writer(
80 120 : &mut self,
81 120 : writer: DeltaLayerWriter,
82 120 : key_range: Range<Key>,
83 120 : lsn_range: Range<Lsn>,
84 120 : ) {
85 120 : self.generated_layer_writers.push((
86 120 : LayerWriterWrapper::Delta(writer),
87 120 : PersistentLayerKey {
88 120 : key_range,
89 120 : lsn_range,
90 120 : is_delta: true,
91 120 : },
92 120 : ));
93 120 : }
94 :
95 1148 : pub(crate) async fn finish(
96 1148 : self,
97 1148 : tline: &Arc<Timeline>,
98 1148 : ctx: &RequestContext,
99 1148 : ) -> anyhow::Result<Vec<ResidentLayer>> {
100 1148 : let res = self
101 1148 : .finish_with_discard_fn(tline, ctx, |_| async { false })
102 1148 : .await?;
103 1148 : let mut output = Vec::new();
104 1644 : for r in res {
105 496 : if let BatchWriterResult::Produced(layer) = r {
106 496 : output.push(layer);
107 496 : }
108 : }
109 1148 : Ok(output)
110 1148 : }
111 :
112 1356 : pub(crate) async fn finish_with_discard_fn<D, F>(
113 1356 : self,
114 1356 : tline: &Arc<Timeline>,
115 1356 : ctx: &RequestContext,
116 1356 : discard_fn: D,
117 1356 : ) -> anyhow::Result<Vec<BatchWriterResult>>
118 1356 : where
119 1356 : D: Fn(&PersistentLayerKey) -> F,
120 1356 : F: Future<Output = bool>,
121 1356 : {
122 1356 : let Self {
123 1356 : generated_layer_writers,
124 1356 : ..
125 1356 : } = self;
126 1356 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
127 0 : for produced_layer in generated_layers {
128 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
129 0 : let layer: Layer = resident_layer.into();
130 0 : layer.delete_on_drop();
131 0 : }
132 : }
133 0 : };
134 : // BEGIN: catch every error and do the recovery in the below section
135 1356 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
136 2092 : for (inner, layer_key) in generated_layer_writers {
137 736 : if discard_fn(&layer_key).await {
138 76 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
139 76 : } else {
140 660 : let res = match inner {
141 76 : LayerWriterWrapper::Delta(writer) => {
142 76 : writer.finish(layer_key.key_range.end, ctx).await
143 : }
144 584 : LayerWriterWrapper::Image(writer) => {
145 584 : writer
146 584 : .finish_with_end_key(layer_key.key_range.end, ctx)
147 584 : .await
148 : }
149 : };
150 660 : let layer = match res {
151 660 : Ok((desc, path)) => {
152 660 : match Layer::finish_creating(self.conf, tline, desc, &path) {
153 660 : Ok(layer) => layer,
154 0 : Err(e) => {
155 0 : tokio::fs::remove_file(&path).await.ok();
156 0 : clean_up_layers(generated_layers);
157 0 : return Err(e);
158 : }
159 : }
160 : }
161 0 : Err(e) => {
162 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
163 0 : // so we don't need to remove the layer we just failed to create by ourselves.
164 0 : clean_up_layers(generated_layers);
165 0 : return Err(e);
166 : }
167 : };
168 660 : generated_layers.push(BatchWriterResult::Produced(layer));
169 : }
170 : }
171 : // END: catch every error and do the recovery in the above section
172 1356 : Ok(generated_layers)
173 1356 : }
174 :
175 4 : pub fn pending_layer_num(&self) -> usize {
176 4 : self.generated_layer_writers.len()
177 4 : }
178 : }
179 :
180 : /// An image writer that takes images and produces multiple image layers.
181 : #[must_use]
182 : pub struct SplitImageLayerWriter {
183 : inner: ImageLayerWriter,
184 : target_layer_size: u64,
185 : lsn: Lsn,
186 : conf: &'static PageServerConf,
187 : timeline_id: TimelineId,
188 : tenant_shard_id: TenantShardId,
189 : batches: BatchLayerWriter,
190 : start_key: Key,
191 : }
192 :
193 : impl SplitImageLayerWriter {
194 100 : pub async fn new(
195 100 : conf: &'static PageServerConf,
196 100 : timeline_id: TimelineId,
197 100 : tenant_shard_id: TenantShardId,
198 100 : start_key: Key,
199 100 : lsn: Lsn,
200 100 : target_layer_size: u64,
201 100 : ctx: &RequestContext,
202 100 : ) -> anyhow::Result<Self> {
203 100 : Ok(Self {
204 100 : target_layer_size,
205 100 : inner: ImageLayerWriter::new(
206 100 : conf,
207 100 : timeline_id,
208 100 : tenant_shard_id,
209 100 : &(start_key..Key::MAX),
210 100 : lsn,
211 100 : ctx,
212 100 : )
213 100 : .await?,
214 100 : conf,
215 100 : timeline_id,
216 100 : tenant_shard_id,
217 100 : batches: BatchLayerWriter::new(conf).await?,
218 100 : lsn,
219 100 : start_key,
220 : })
221 100 : }
222 :
223 17180 : pub async fn put_image(
224 17180 : &mut self,
225 17180 : key: Key,
226 17180 : img: Bytes,
227 17180 : ctx: &RequestContext,
228 17180 : ) -> anyhow::Result<()> {
229 17180 : // The current estimation is an upper bound of the space that the key/image could take
230 17180 : // because we did not consider compression in this estimation. The resulting image layer
231 17180 : // could be smaller than the target size.
232 17180 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
233 17180 : if self.inner.num_keys() >= 1
234 17080 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
235 : {
236 28 : let next_image_writer = ImageLayerWriter::new(
237 28 : self.conf,
238 28 : self.timeline_id,
239 28 : self.tenant_shard_id,
240 28 : &(key..Key::MAX),
241 28 : self.lsn,
242 28 : ctx,
243 28 : )
244 28 : .await?;
245 28 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
246 28 : self.batches.add_unfinished_image_writer(
247 28 : prev_image_writer,
248 28 : self.start_key..key,
249 28 : self.lsn,
250 28 : );
251 28 : self.start_key = key;
252 17152 : }
253 17180 : self.inner.put_image(key, img, ctx).await
254 17180 : }
255 :
256 92 : pub(crate) async fn finish_with_discard_fn<D, F>(
257 92 : self,
258 92 : tline: &Arc<Timeline>,
259 92 : ctx: &RequestContext,
260 92 : end_key: Key,
261 92 : discard_fn: D,
262 92 : ) -> anyhow::Result<Vec<BatchWriterResult>>
263 92 : where
264 92 : D: Fn(&PersistentLayerKey) -> F,
265 92 : F: Future<Output = bool>,
266 92 : {
267 92 : let Self {
268 92 : mut batches, inner, ..
269 92 : } = self;
270 92 : if inner.num_keys() != 0 {
271 92 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
272 92 : }
273 92 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
274 92 : }
275 :
276 : #[cfg(test)]
277 8 : pub(crate) async fn finish(
278 8 : self,
279 8 : tline: &Arc<Timeline>,
280 8 : ctx: &RequestContext,
281 8 : end_key: Key,
282 8 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
283 12 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
284 8 : .await
285 8 : }
286 : }
287 :
288 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
289 : ///
290 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
291 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
292 : /// will split them into multiple files based on size.
293 : #[must_use]
294 : pub struct SplitDeltaLayerWriter {
295 : inner: Option<(Key, DeltaLayerWriter)>,
296 : target_layer_size: u64,
297 : conf: &'static PageServerConf,
298 : timeline_id: TimelineId,
299 : tenant_shard_id: TenantShardId,
300 : lsn_range: Range<Lsn>,
301 : last_key_written: Key,
302 : batches: BatchLayerWriter,
303 : }
304 :
305 : impl SplitDeltaLayerWriter {
306 124 : pub async fn new(
307 124 : conf: &'static PageServerConf,
308 124 : timeline_id: TimelineId,
309 124 : tenant_shard_id: TenantShardId,
310 124 : lsn_range: Range<Lsn>,
311 124 : target_layer_size: u64,
312 124 : ) -> anyhow::Result<Self> {
313 124 : Ok(Self {
314 124 : target_layer_size,
315 124 : inner: None,
316 124 : conf,
317 124 : timeline_id,
318 124 : tenant_shard_id,
319 124 : lsn_range,
320 124 : last_key_written: Key::MIN,
321 124 : batches: BatchLayerWriter::new(conf).await?,
322 : })
323 124 : }
324 :
325 24416 : pub async fn put_value(
326 24416 : &mut self,
327 24416 : key: Key,
328 24416 : lsn: Lsn,
329 24416 : val: Value,
330 24416 : ctx: &RequestContext,
331 24416 : ) -> anyhow::Result<()> {
332 24416 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
333 24416 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
334 24416 : //
335 24416 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
336 24416 : // strategy. https://github.com/neondatabase/neon/issues/8837
337 24416 :
338 24416 : if self.inner.is_none() {
339 100 : self.inner = Some((
340 100 : key,
341 100 : DeltaLayerWriter::new(
342 100 : self.conf,
343 100 : self.timeline_id,
344 100 : self.tenant_shard_id,
345 100 : key,
346 100 : self.lsn_range.clone(),
347 100 : ctx,
348 100 : )
349 100 : .await?,
350 : ));
351 24316 : }
352 24416 : let (_, inner) = self.inner.as_mut().unwrap();
353 24416 :
354 24416 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
355 24416 : if inner.num_keys() >= 1
356 24316 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
357 : {
358 5992 : if key != self.last_key_written {
359 28 : let next_delta_writer = DeltaLayerWriter::new(
360 28 : self.conf,
361 28 : self.timeline_id,
362 28 : self.tenant_shard_id,
363 28 : key,
364 28 : self.lsn_range.clone(),
365 28 : ctx,
366 28 : )
367 28 : .await?;
368 28 : let (start_key, prev_delta_writer) =
369 28 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
370 28 : self.batches.add_unfinished_delta_writer(
371 28 : prev_delta_writer,
372 28 : start_key..key,
373 28 : self.lsn_range.clone(),
374 28 : );
375 5964 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
376 : // We have to produce a very large file b/c a key is updated too often.
377 0 : anyhow::bail!(
378 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
379 0 : key,
380 0 : inner.estimated_size()
381 0 : );
382 5964 : }
383 18424 : }
384 24416 : self.last_key_written = key;
385 24416 : let (_, inner) = self.inner.as_mut().unwrap();
386 24416 : inner.put_value(key, lsn, val, ctx).await
387 24416 : }
388 :
389 116 : pub(crate) async fn finish_with_discard_fn<D, F>(
390 116 : self,
391 116 : tline: &Arc<Timeline>,
392 116 : ctx: &RequestContext,
393 116 : discard_fn: D,
394 116 : ) -> anyhow::Result<Vec<BatchWriterResult>>
395 116 : where
396 116 : D: Fn(&PersistentLayerKey) -> F,
397 116 : F: Future<Output = bool>,
398 116 : {
399 116 : let Self {
400 116 : mut batches, inner, ..
401 116 : } = self;
402 116 : if let Some((start_key, writer)) = inner {
403 92 : if writer.num_keys() != 0 {
404 92 : let end_key = self.last_key_written.next();
405 92 : batches.add_unfinished_delta_writer(
406 92 : writer,
407 92 : start_key..end_key,
408 92 : self.lsn_range.clone(),
409 92 : );
410 92 : }
411 24 : }
412 116 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
413 116 : }
414 :
415 : #[cfg(test)]
416 12 : pub(crate) async fn finish(
417 12 : self,
418 12 : tline: &Arc<Timeline>,
419 12 : ctx: &RequestContext,
420 12 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
421 16 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
422 12 : .await
423 12 : }
424 : }
425 :
426 : #[cfg(test)]
427 : mod tests {
428 : use itertools::Itertools;
429 : use rand::{RngCore, SeedableRng};
430 :
431 : use super::*;
432 : use crate::DEFAULT_PG_VERSION;
433 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
434 : use crate::tenant::storage_layer::AsLayerDesc;
435 :
436 40104 : fn get_key(id: u32) -> Key {
437 40104 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
438 40104 : key.field6 = id;
439 40104 : key
440 40104 : }
441 :
442 16 : fn get_img(id: u32) -> Bytes {
443 16 : format!("{id:064}").into()
444 16 : }
445 :
446 40008 : fn get_large_img() -> Bytes {
447 40008 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
448 40008 : let mut data = vec![0; 8192];
449 40008 : rng.fill_bytes(&mut data);
450 40008 : data.into()
451 40008 : }
452 :
453 : #[tokio::test]
454 4 : async fn write_one_image() {
455 4 : let harness = TenantHarness::create("split_writer_write_one_image")
456 4 : .await
457 4 : .unwrap();
458 4 : let (tenant, ctx) = harness.load().await;
459 4 :
460 4 : let tline = tenant
461 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
462 4 : .await
463 4 : .unwrap();
464 4 :
465 4 : let mut image_writer = SplitImageLayerWriter::new(
466 4 : tenant.conf,
467 4 : tline.timeline_id,
468 4 : tenant.tenant_shard_id,
469 4 : get_key(0),
470 4 : Lsn(0x18),
471 4 : 4 * 1024 * 1024,
472 4 : &ctx,
473 4 : )
474 4 : .await
475 4 : .unwrap();
476 4 :
477 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
478 4 : tenant.conf,
479 4 : tline.timeline_id,
480 4 : tenant.tenant_shard_id,
481 4 : Lsn(0x18)..Lsn(0x20),
482 4 : 4 * 1024 * 1024,
483 4 : )
484 4 : .await
485 4 : .unwrap();
486 4 :
487 4 : image_writer
488 4 : .put_image(get_key(0), get_img(0), &ctx)
489 4 : .await
490 4 : .unwrap();
491 4 : let layers = image_writer
492 4 : .finish(&tline, &ctx, get_key(10))
493 4 : .await
494 4 : .unwrap();
495 4 : assert_eq!(layers.len(), 1);
496 4 :
497 4 : delta_writer
498 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
499 4 : .await
500 4 : .unwrap();
501 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
502 4 : assert_eq!(layers.len(), 1);
503 4 : assert_eq!(
504 4 : layers
505 4 : .into_iter()
506 4 : .next()
507 4 : .unwrap()
508 4 : .into_resident_layer()
509 4 : .layer_desc()
510 4 : .key(),
511 4 : PersistentLayerKey {
512 4 : key_range: get_key(0)..get_key(1),
513 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
514 4 : is_delta: true
515 4 : }
516 4 : );
517 4 : }
518 :
519 : #[tokio::test]
520 4 : async fn write_split() {
521 4 : // Test the split writer with retaining all the layers we have produced (discard=false)
522 4 : write_split_helper("split_writer_write_split", false).await;
523 4 : }
524 :
525 : #[tokio::test]
526 4 : async fn write_split_discard() {
527 4 : // Test the split writer with discarding all the layers we have produced (discard=true)
528 4 : write_split_helper("split_writer_write_split_discard", true).await;
529 4 : }
530 :
531 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
532 : /// set to true, all layers will be discarded.
533 8 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
534 8 : let harness = TenantHarness::create(harness_name).await.unwrap();
535 8 : let (tenant, ctx) = harness.load().await;
536 :
537 8 : let tline = tenant
538 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
539 8 : .await
540 8 : .unwrap();
541 :
542 8 : let mut image_writer = SplitImageLayerWriter::new(
543 8 : tenant.conf,
544 8 : tline.timeline_id,
545 8 : tenant.tenant_shard_id,
546 8 : get_key(0),
547 8 : Lsn(0x18),
548 8 : 4 * 1024 * 1024,
549 8 : &ctx,
550 8 : )
551 8 : .await
552 8 : .unwrap();
553 8 : let mut delta_writer = SplitDeltaLayerWriter::new(
554 8 : tenant.conf,
555 8 : tline.timeline_id,
556 8 : tenant.tenant_shard_id,
557 8 : Lsn(0x18)..Lsn(0x20),
558 8 : 4 * 1024 * 1024,
559 8 : )
560 8 : .await
561 8 : .unwrap();
562 : const N: usize = 2000;
563 16008 : for i in 0..N {
564 16000 : let i = i as u32;
565 16000 : image_writer
566 16000 : .put_image(get_key(i), get_large_img(), &ctx)
567 16000 : .await
568 16000 : .unwrap();
569 16000 : delta_writer
570 16000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
571 16000 : .await
572 16000 : .unwrap();
573 : }
574 8 : let image_layers = image_writer
575 32 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
576 8 : .await
577 8 : .unwrap();
578 8 : let delta_layers = delta_writer
579 32 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
580 8 : .await
581 8 : .unwrap();
582 8 : let image_layers = image_layers
583 8 : .into_iter()
584 32 : .map(|x| {
585 32 : if discard {
586 16 : x.into_discarded_layer()
587 : } else {
588 16 : x.into_resident_layer().layer_desc().key()
589 : }
590 32 : })
591 8 : .collect_vec();
592 8 : let delta_layers = delta_layers
593 8 : .into_iter()
594 32 : .map(|x| {
595 32 : if discard {
596 16 : x.into_discarded_layer()
597 : } else {
598 16 : x.into_resident_layer().layer_desc().key()
599 : }
600 32 : })
601 8 : .collect_vec();
602 8 : assert_eq!(image_layers.len(), N / 512 + 1);
603 8 : assert_eq!(delta_layers.len(), N / 512 + 1);
604 8 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
605 8 : assert_eq!(
606 8 : delta_layers.last().unwrap().key_range.end,
607 8 : get_key(N as u32)
608 8 : );
609 32 : for idx in 0..image_layers.len() {
610 32 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
611 32 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
612 32 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
613 32 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
614 32 : if idx > 0 {
615 24 : assert_eq!(
616 24 : image_layers[idx - 1].key_range.end,
617 24 : image_layers[idx].key_range.start
618 24 : );
619 24 : assert_eq!(
620 24 : delta_layers[idx - 1].key_range.end,
621 24 : delta_layers[idx].key_range.start
622 24 : );
623 8 : }
624 : }
625 8 : }
626 :
627 : #[tokio::test]
628 4 : async fn write_large_img() {
629 4 : let harness = TenantHarness::create("split_writer_write_large_img")
630 4 : .await
631 4 : .unwrap();
632 4 : let (tenant, ctx) = harness.load().await;
633 4 :
634 4 : let tline = tenant
635 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
636 4 : .await
637 4 : .unwrap();
638 4 :
639 4 : let mut image_writer = SplitImageLayerWriter::new(
640 4 : tenant.conf,
641 4 : tline.timeline_id,
642 4 : tenant.tenant_shard_id,
643 4 : get_key(0),
644 4 : Lsn(0x18),
645 4 : 4 * 1024,
646 4 : &ctx,
647 4 : )
648 4 : .await
649 4 : .unwrap();
650 4 :
651 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
652 4 : tenant.conf,
653 4 : tline.timeline_id,
654 4 : tenant.tenant_shard_id,
655 4 : Lsn(0x18)..Lsn(0x20),
656 4 : 4 * 1024,
657 4 : )
658 4 : .await
659 4 : .unwrap();
660 4 :
661 4 : image_writer
662 4 : .put_image(get_key(0), get_img(0), &ctx)
663 4 : .await
664 4 : .unwrap();
665 4 : image_writer
666 4 : .put_image(get_key(1), get_large_img(), &ctx)
667 4 : .await
668 4 : .unwrap();
669 4 : let layers = image_writer
670 4 : .finish(&tline, &ctx, get_key(10))
671 4 : .await
672 4 : .unwrap();
673 4 : assert_eq!(layers.len(), 2);
674 4 :
675 4 : delta_writer
676 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
677 4 : .await
678 4 : .unwrap();
679 4 : delta_writer
680 4 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
681 4 : .await
682 4 : .unwrap();
683 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
684 4 : assert_eq!(layers.len(), 2);
685 4 : let mut layers_iter = layers.into_iter();
686 4 : assert_eq!(
687 4 : layers_iter
688 4 : .next()
689 4 : .unwrap()
690 4 : .into_resident_layer()
691 4 : .layer_desc()
692 4 : .key(),
693 4 : PersistentLayerKey {
694 4 : key_range: get_key(0)..get_key(1),
695 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
696 4 : is_delta: true
697 4 : }
698 4 : );
699 4 : assert_eq!(
700 4 : layers_iter
701 4 : .next()
702 4 : .unwrap()
703 4 : .into_resident_layer()
704 4 : .layer_desc()
705 4 : .key(),
706 4 : PersistentLayerKey {
707 4 : key_range: get_key(1)..get_key(2),
708 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
709 4 : is_delta: true
710 4 : }
711 4 : );
712 4 : }
713 :
714 : #[tokio::test]
715 4 : async fn write_split_single_key() {
716 4 : let harness = TenantHarness::create("split_writer_write_split_single_key")
717 4 : .await
718 4 : .unwrap();
719 4 : let (tenant, ctx) = harness.load().await;
720 4 :
721 4 : let tline = tenant
722 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
723 4 : .await
724 4 : .unwrap();
725 4 :
726 4 : const N: usize = 2000;
727 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
728 4 : tenant.conf,
729 4 : tline.timeline_id,
730 4 : tenant.tenant_shard_id,
731 4 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
732 4 : 4 * 1024 * 1024,
733 4 : )
734 4 : .await
735 4 : .unwrap();
736 4 :
737 8004 : for i in 0..N {
738 8000 : let i = i as u32;
739 8000 : delta_writer
740 8000 : .put_value(
741 8000 : get_key(0),
742 8000 : Lsn(i as u64 * 16 + 0x10),
743 8000 : Value::Image(get_large_img()),
744 8000 : &ctx,
745 8000 : )
746 8000 : .await
747 8000 : .unwrap();
748 4 : }
749 4 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
750 4 : assert_eq!(delta_layers.len(), 1);
751 4 : let delta_layer = delta_layers
752 4 : .into_iter()
753 4 : .next()
754 4 : .unwrap()
755 4 : .into_resident_layer();
756 4 : assert_eq!(
757 4 : delta_layer.layer_desc().key(),
758 4 : PersistentLayerKey {
759 4 : key_range: get_key(0)..get_key(1),
760 4 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
761 4 : is_delta: true
762 4 : }
763 4 : );
764 4 : }
765 : }
|