Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
9 : use pageserver_api::value::Value;
10 :
11 : use super::layer::S3_UPLOAD_LIMIT;
12 : use super::{
13 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
14 : };
15 :
16 : pub(crate) enum BatchWriterResult {
17 : Produced(ResidentLayer),
18 : Discarded(PersistentLayerKey),
19 : }
20 :
21 : #[cfg(test)]
22 : impl BatchWriterResult {
23 24 : fn into_resident_layer(self) -> ResidentLayer {
24 24 : match self {
25 24 : BatchWriterResult::Produced(layer) => layer,
26 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
27 : }
28 24 : }
29 :
30 16 : fn into_discarded_layer(self) -> PersistentLayerKey {
31 16 : match self {
32 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
33 16 : BatchWriterResult::Discarded(layer) => layer,
34 16 : }
35 16 : }
36 : }
37 :
38 : enum LayerWriterWrapper {
39 : Image(ImageLayerWriter),
40 : Delta(DeltaLayerWriter),
41 : }
42 :
43 : /// An layer writer that takes unfinished layers and finish them atomically.
44 : #[must_use]
45 : pub struct BatchLayerWriter {
46 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
47 : conf: &'static PageServerConf,
48 : }
49 :
50 : impl BatchLayerWriter {
51 96 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
52 96 : Ok(Self {
53 96 : generated_layer_writers: Vec::new(),
54 96 : conf,
55 96 : })
56 96 : }
57 :
58 56 : pub fn add_unfinished_image_writer(
59 56 : &mut self,
60 56 : writer: ImageLayerWriter,
61 56 : key_range: Range<Key>,
62 56 : lsn: Lsn,
63 56 : ) {
64 56 : self.generated_layer_writers.push((
65 56 : LayerWriterWrapper::Image(writer),
66 56 : PersistentLayerKey {
67 56 : key_range,
68 56 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
69 56 : is_delta: false,
70 56 : },
71 56 : ));
72 56 : }
73 :
74 48 : pub fn add_unfinished_delta_writer(
75 48 : &mut self,
76 48 : writer: DeltaLayerWriter,
77 48 : key_range: Range<Key>,
78 48 : lsn_range: Range<Lsn>,
79 48 : ) {
80 48 : self.generated_layer_writers.push((
81 48 : LayerWriterWrapper::Delta(writer),
82 48 : PersistentLayerKey {
83 48 : key_range,
84 48 : lsn_range,
85 48 : is_delta: true,
86 48 : },
87 48 : ));
88 48 : }
89 :
90 88 : pub(crate) async fn finish_with_discard_fn<D, F>(
91 88 : self,
92 88 : tline: &Arc<Timeline>,
93 88 : ctx: &RequestContext,
94 88 : discard_fn: D,
95 88 : ) -> anyhow::Result<Vec<BatchWriterResult>>
96 88 : where
97 88 : D: Fn(&PersistentLayerKey) -> F,
98 88 : F: Future<Output = bool>,
99 88 : {
100 88 : let Self {
101 88 : generated_layer_writers,
102 88 : ..
103 88 : } = self;
104 88 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
105 0 : for produced_layer in generated_layers {
106 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
107 0 : let layer: Layer = resident_layer.into();
108 0 : layer.delete_on_drop();
109 0 : }
110 : }
111 0 : };
112 : // BEGIN: catch every error and do the recovery in the below section
113 88 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
114 192 : for (inner, layer_key) in generated_layer_writers {
115 104 : if discard_fn(&layer_key).await {
116 36 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
117 36 : } else {
118 68 : let res = match inner {
119 28 : LayerWriterWrapper::Delta(writer) => {
120 73 : writer.finish(layer_key.key_range.end, ctx).await
121 : }
122 40 : LayerWriterWrapper::Image(writer) => {
123 40 : writer
124 40 : .finish_with_end_key(layer_key.key_range.end, ctx)
125 80 : .await
126 : }
127 : };
128 68 : let layer = match res {
129 68 : Ok((desc, path)) => {
130 68 : match Layer::finish_creating(self.conf, tline, desc, &path) {
131 68 : Ok(layer) => layer,
132 0 : Err(e) => {
133 0 : tokio::fs::remove_file(&path).await.ok();
134 0 : clean_up_layers(generated_layers);
135 0 : return Err(e);
136 : }
137 : }
138 : }
139 0 : Err(e) => {
140 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
141 0 : // so we don't need to remove the layer we just failed to create by ourselves.
142 0 : clean_up_layers(generated_layers);
143 0 : return Err(e);
144 : }
145 : };
146 68 : generated_layers.push(BatchWriterResult::Produced(layer));
147 : }
148 : }
149 : // END: catch every error and do the recovery in the above section
150 88 : Ok(generated_layers)
151 88 : }
152 : }
153 :
154 : /// An image writer that takes images and produces multiple image layers.
155 : #[must_use]
156 : pub struct SplitImageLayerWriter {
157 : inner: ImageLayerWriter,
158 : target_layer_size: u64,
159 : lsn: Lsn,
160 : conf: &'static PageServerConf,
161 : timeline_id: TimelineId,
162 : tenant_shard_id: TenantShardId,
163 : batches: BatchLayerWriter,
164 : start_key: Key,
165 : }
166 :
167 : impl SplitImageLayerWriter {
168 46 : pub async fn new(
169 46 : conf: &'static PageServerConf,
170 46 : timeline_id: TimelineId,
171 46 : tenant_shard_id: TenantShardId,
172 46 : start_key: Key,
173 46 : lsn: Lsn,
174 46 : target_layer_size: u64,
175 46 : ctx: &RequestContext,
176 46 : ) -> anyhow::Result<Self> {
177 46 : Ok(Self {
178 46 : target_layer_size,
179 46 : inner: ImageLayerWriter::new(
180 46 : conf,
181 46 : timeline_id,
182 46 : tenant_shard_id,
183 46 : &(start_key..Key::MAX),
184 46 : lsn,
185 46 : ctx,
186 46 : )
187 23 : .await?,
188 46 : conf,
189 46 : timeline_id,
190 46 : tenant_shard_id,
191 46 : batches: BatchLayerWriter::new(conf).await?,
192 46 : lsn,
193 46 : start_key,
194 : })
195 46 : }
196 :
197 8522 : pub async fn put_image(
198 8522 : &mut self,
199 8522 : key: Key,
200 8522 : img: Bytes,
201 8522 : ctx: &RequestContext,
202 8522 : ) -> anyhow::Result<()> {
203 8522 : // The current estimation is an upper bound of the space that the key/image could take
204 8522 : // because we did not consider compression in this estimation. The resulting image layer
205 8522 : // could be smaller than the target size.
206 8522 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
207 8522 : if self.inner.num_keys() >= 1
208 8476 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
209 : {
210 14 : let next_image_writer = ImageLayerWriter::new(
211 14 : self.conf,
212 14 : self.timeline_id,
213 14 : self.tenant_shard_id,
214 14 : &(key..Key::MAX),
215 14 : self.lsn,
216 14 : ctx,
217 14 : )
218 7 : .await?;
219 14 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
220 14 : self.batches.add_unfinished_image_writer(
221 14 : prev_image_writer,
222 14 : self.start_key..key,
223 14 : self.lsn,
224 14 : );
225 14 : self.start_key = key;
226 8508 : }
227 8647 : self.inner.put_image(key, img, ctx).await
228 8522 : }
229 :
230 42 : pub(crate) async fn finish_with_discard_fn<D, F>(
231 42 : self,
232 42 : tline: &Arc<Timeline>,
233 42 : ctx: &RequestContext,
234 42 : end_key: Key,
235 42 : discard_fn: D,
236 42 : ) -> anyhow::Result<Vec<BatchWriterResult>>
237 42 : where
238 42 : D: Fn(&PersistentLayerKey) -> F,
239 42 : F: Future<Output = bool>,
240 42 : {
241 42 : let Self {
242 42 : mut batches, inner, ..
243 42 : } = self;
244 42 : if inner.num_keys() != 0 {
245 42 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
246 42 : }
247 80 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
248 42 : }
249 :
250 : #[cfg(test)]
251 4 : pub(crate) async fn finish(
252 4 : self,
253 4 : tline: &Arc<Timeline>,
254 4 : ctx: &RequestContext,
255 4 : end_key: Key,
256 4 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
257 6 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
258 12 : .await
259 4 : }
260 : }
261 :
262 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
263 : ///
264 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
265 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
266 : /// will split them into multiple files based on size.
267 : #[must_use]
268 : pub struct SplitDeltaLayerWriter {
269 : inner: Option<(Key, DeltaLayerWriter)>,
270 : target_layer_size: u64,
271 : conf: &'static PageServerConf,
272 : timeline_id: TimelineId,
273 : tenant_shard_id: TenantShardId,
274 : lsn_range: Range<Lsn>,
275 : last_key_written: Key,
276 : batches: BatchLayerWriter,
277 : }
278 :
279 : impl SplitDeltaLayerWriter {
280 50 : pub async fn new(
281 50 : conf: &'static PageServerConf,
282 50 : timeline_id: TimelineId,
283 50 : tenant_shard_id: TenantShardId,
284 50 : lsn_range: Range<Lsn>,
285 50 : target_layer_size: u64,
286 50 : ) -> anyhow::Result<Self> {
287 50 : Ok(Self {
288 50 : target_layer_size,
289 50 : inner: None,
290 50 : conf,
291 50 : timeline_id,
292 50 : tenant_shard_id,
293 50 : lsn_range,
294 50 : last_key_written: Key::MIN,
295 50 : batches: BatchLayerWriter::new(conf).await?,
296 : })
297 50 : }
298 :
299 12144 : pub async fn put_value(
300 12144 : &mut self,
301 12144 : key: Key,
302 12144 : lsn: Lsn,
303 12144 : val: Value,
304 12144 : ctx: &RequestContext,
305 12144 : ) -> anyhow::Result<()> {
306 12144 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
307 12144 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
308 12144 : //
309 12144 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
310 12144 : // strategy. https://github.com/neondatabase/neon/issues/8837
311 12144 :
312 12144 : if self.inner.is_none() {
313 38 : self.inner = Some((
314 38 : key,
315 38 : DeltaLayerWriter::new(
316 38 : self.conf,
317 38 : self.timeline_id,
318 38 : self.tenant_shard_id,
319 38 : key,
320 38 : self.lsn_range.clone(),
321 38 : ctx,
322 38 : )
323 19 : .await?,
324 : ));
325 12106 : }
326 12144 : let (_, inner) = self.inner.as_mut().unwrap();
327 12144 :
328 12144 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
329 12144 : if inner.num_keys() >= 1
330 12106 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
331 : {
332 2996 : if key != self.last_key_written {
333 14 : let next_delta_writer = DeltaLayerWriter::new(
334 14 : self.conf,
335 14 : self.timeline_id,
336 14 : self.tenant_shard_id,
337 14 : key,
338 14 : self.lsn_range.clone(),
339 14 : ctx,
340 14 : )
341 7 : .await?;
342 14 : let (start_key, prev_delta_writer) =
343 14 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
344 14 : self.batches.add_unfinished_delta_writer(
345 14 : prev_delta_writer,
346 14 : start_key..key,
347 14 : self.lsn_range.clone(),
348 14 : );
349 2982 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
350 : // We have to produce a very large file b/c a key is updated too often.
351 0 : anyhow::bail!(
352 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
353 0 : key,
354 0 : inner.estimated_size()
355 0 : );
356 2982 : }
357 9148 : }
358 12144 : self.last_key_written = key;
359 12144 : let (_, inner) = self.inner.as_mut().unwrap();
360 12144 : inner.put_value(key, lsn, val, ctx).await
361 12144 : }
362 :
363 46 : pub(crate) async fn finish_with_discard_fn<D, F>(
364 46 : self,
365 46 : tline: &Arc<Timeline>,
366 46 : ctx: &RequestContext,
367 46 : discard_fn: D,
368 46 : ) -> anyhow::Result<Vec<BatchWriterResult>>
369 46 : where
370 46 : D: Fn(&PersistentLayerKey) -> F,
371 46 : F: Future<Output = bool>,
372 46 : {
373 46 : let Self {
374 46 : mut batches, inner, ..
375 46 : } = self;
376 46 : if let Some((start_key, writer)) = inner {
377 34 : if writer.num_keys() != 0 {
378 34 : let end_key = self.last_key_written.next();
379 34 : batches.add_unfinished_delta_writer(
380 34 : writer,
381 34 : start_key..end_key,
382 34 : self.lsn_range.clone(),
383 34 : );
384 34 : }
385 12 : }
386 73 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
387 46 : }
388 :
389 : #[cfg(test)]
390 6 : pub(crate) async fn finish(
391 6 : self,
392 6 : tline: &Arc<Timeline>,
393 6 : ctx: &RequestContext,
394 6 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
395 8 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
396 23 : .await
397 6 : }
398 : }
399 :
400 : #[cfg(test)]
401 : mod tests {
402 : use itertools::Itertools;
403 : use rand::{RngCore, SeedableRng};
404 :
405 : use crate::{
406 : tenant::{
407 : harness::{TenantHarness, TIMELINE_ID},
408 : storage_layer::AsLayerDesc,
409 : },
410 : DEFAULT_PG_VERSION,
411 : };
412 :
413 : use super::*;
414 :
415 20052 : fn get_key(id: u32) -> Key {
416 20052 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
417 20052 : key.field6 = id;
418 20052 : key
419 20052 : }
420 :
421 8 : fn get_img(id: u32) -> Bytes {
422 8 : format!("{id:064}").into()
423 8 : }
424 :
425 20004 : fn get_large_img() -> Bytes {
426 20004 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
427 20004 : let mut data = vec![0; 8192];
428 20004 : rng.fill_bytes(&mut data);
429 20004 : data.into()
430 20004 : }
431 :
432 : #[tokio::test]
433 2 : async fn write_one_image() {
434 2 : let harness = TenantHarness::create("split_writer_write_one_image")
435 2 : .await
436 2 : .unwrap();
437 20 : let (tenant, ctx) = harness.load().await;
438 2 :
439 2 : let tline = tenant
440 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
441 6 : .await
442 2 : .unwrap();
443 2 :
444 2 : let mut image_writer = SplitImageLayerWriter::new(
445 2 : tenant.conf,
446 2 : tline.timeline_id,
447 2 : tenant.tenant_shard_id,
448 2 : get_key(0),
449 2 : Lsn(0x18),
450 2 : 4 * 1024 * 1024,
451 2 : &ctx,
452 2 : )
453 2 : .await
454 2 : .unwrap();
455 2 :
456 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
457 2 : tenant.conf,
458 2 : tline.timeline_id,
459 2 : tenant.tenant_shard_id,
460 2 : Lsn(0x18)..Lsn(0x20),
461 2 : 4 * 1024 * 1024,
462 2 : )
463 2 : .await
464 2 : .unwrap();
465 2 :
466 2 : image_writer
467 2 : .put_image(get_key(0), get_img(0), &ctx)
468 2 : .await
469 2 : .unwrap();
470 2 : let layers = image_writer
471 2 : .finish(&tline, &ctx, get_key(10))
472 4 : .await
473 2 : .unwrap();
474 2 : assert_eq!(layers.len(), 1);
475 2 :
476 2 : delta_writer
477 2 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
478 2 : .await
479 2 : .unwrap();
480 5 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
481 2 : assert_eq!(layers.len(), 1);
482 2 : assert_eq!(
483 2 : layers
484 2 : .into_iter()
485 2 : .next()
486 2 : .unwrap()
487 2 : .into_resident_layer()
488 2 : .layer_desc()
489 2 : .key(),
490 2 : PersistentLayerKey {
491 2 : key_range: get_key(0)..get_key(1),
492 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
493 2 : is_delta: true
494 2 : }
495 2 : );
496 2 : }
497 :
498 : #[tokio::test]
499 2 : async fn write_split() {
500 2 : // Test the split writer with retaining all the layers we have produced (discard=false)
501 4384 : write_split_helper("split_writer_write_split", false).await;
502 2 : }
503 :
504 : #[tokio::test]
505 2 : async fn write_split_discard() {
506 2 : // Test the split writer with discarding all the layers we have produced (discard=true)
507 4348 : write_split_helper("split_writer_write_split_discard", true).await;
508 2 : }
509 :
510 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
511 : /// set to true, all layers will be discarded.
512 4 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
513 4 : let harness = TenantHarness::create(harness_name).await.unwrap();
514 40 : let (tenant, ctx) = harness.load().await;
515 :
516 4 : let tline = tenant
517 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
518 12 : .await
519 4 : .unwrap();
520 :
521 4 : let mut image_writer = SplitImageLayerWriter::new(
522 4 : tenant.conf,
523 4 : tline.timeline_id,
524 4 : tenant.tenant_shard_id,
525 4 : get_key(0),
526 4 : Lsn(0x18),
527 4 : 4 * 1024 * 1024,
528 4 : &ctx,
529 4 : )
530 2 : .await
531 4 : .unwrap();
532 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
533 4 : tenant.conf,
534 4 : tline.timeline_id,
535 4 : tenant.tenant_shard_id,
536 4 : Lsn(0x18)..Lsn(0x20),
537 4 : 4 * 1024 * 1024,
538 4 : )
539 0 : .await
540 4 : .unwrap();
541 : const N: usize = 2000;
542 8004 : for i in 0..N {
543 8000 : let i = i as u32;
544 8000 : image_writer
545 8000 : .put_image(get_key(i), get_large_img(), &ctx)
546 8130 : .await
547 8000 : .unwrap();
548 8000 : delta_writer
549 8000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
550 512 : .await
551 8000 : .unwrap();
552 : }
553 4 : let image_layers = image_writer
554 16 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
555 16 : .await
556 4 : .unwrap();
557 4 : let delta_layers = delta_writer
558 16 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
559 20 : .await
560 4 : .unwrap();
561 4 : let image_layers = image_layers
562 4 : .into_iter()
563 16 : .map(|x| {
564 16 : if discard {
565 8 : x.into_discarded_layer()
566 : } else {
567 8 : x.into_resident_layer().layer_desc().key()
568 : }
569 16 : })
570 4 : .collect_vec();
571 4 : let delta_layers = delta_layers
572 4 : .into_iter()
573 16 : .map(|x| {
574 16 : if discard {
575 8 : x.into_discarded_layer()
576 : } else {
577 8 : x.into_resident_layer().layer_desc().key()
578 : }
579 16 : })
580 4 : .collect_vec();
581 4 : assert_eq!(image_layers.len(), N / 512 + 1);
582 4 : assert_eq!(delta_layers.len(), N / 512 + 1);
583 4 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
584 4 : assert_eq!(
585 4 : delta_layers.last().unwrap().key_range.end,
586 4 : get_key(N as u32)
587 4 : );
588 16 : for idx in 0..image_layers.len() {
589 16 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
590 16 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
591 16 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
592 16 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
593 16 : if idx > 0 {
594 12 : assert_eq!(
595 12 : image_layers[idx - 1].key_range.end,
596 12 : image_layers[idx].key_range.start
597 12 : );
598 12 : assert_eq!(
599 12 : delta_layers[idx - 1].key_range.end,
600 12 : delta_layers[idx].key_range.start
601 12 : );
602 4 : }
603 : }
604 4 : }
605 :
606 : #[tokio::test]
607 2 : async fn write_large_img() {
608 2 : let harness = TenantHarness::create("split_writer_write_large_img")
609 2 : .await
610 2 : .unwrap();
611 20 : let (tenant, ctx) = harness.load().await;
612 2 :
613 2 : let tline = tenant
614 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
615 6 : .await
616 2 : .unwrap();
617 2 :
618 2 : let mut image_writer = SplitImageLayerWriter::new(
619 2 : tenant.conf,
620 2 : tline.timeline_id,
621 2 : tenant.tenant_shard_id,
622 2 : get_key(0),
623 2 : Lsn(0x18),
624 2 : 4 * 1024,
625 2 : &ctx,
626 2 : )
627 2 : .await
628 2 : .unwrap();
629 2 :
630 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
631 2 : tenant.conf,
632 2 : tline.timeline_id,
633 2 : tenant.tenant_shard_id,
634 2 : Lsn(0x18)..Lsn(0x20),
635 2 : 4 * 1024,
636 2 : )
637 2 : .await
638 2 : .unwrap();
639 2 :
640 2 : image_writer
641 2 : .put_image(get_key(0), get_img(0), &ctx)
642 2 : .await
643 2 : .unwrap();
644 2 : image_writer
645 2 : .put_image(get_key(1), get_large_img(), &ctx)
646 3 : .await
647 2 : .unwrap();
648 2 : let layers = image_writer
649 2 : .finish(&tline, &ctx, get_key(10))
650 8 : .await
651 2 : .unwrap();
652 2 : assert_eq!(layers.len(), 2);
653 2 :
654 2 : delta_writer
655 2 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
656 2 : .await
657 2 : .unwrap();
658 2 : delta_writer
659 2 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
660 2 : .await
661 2 : .unwrap();
662 10 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
663 2 : assert_eq!(layers.len(), 2);
664 2 : let mut layers_iter = layers.into_iter();
665 2 : assert_eq!(
666 2 : layers_iter
667 2 : .next()
668 2 : .unwrap()
669 2 : .into_resident_layer()
670 2 : .layer_desc()
671 2 : .key(),
672 2 : PersistentLayerKey {
673 2 : key_range: get_key(0)..get_key(1),
674 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
675 2 : is_delta: true
676 2 : }
677 2 : );
678 2 : assert_eq!(
679 2 : layers_iter
680 2 : .next()
681 2 : .unwrap()
682 2 : .into_resident_layer()
683 2 : .layer_desc()
684 2 : .key(),
685 2 : PersistentLayerKey {
686 2 : key_range: get_key(1)..get_key(2),
687 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
688 2 : is_delta: true
689 2 : }
690 2 : );
691 2 : }
692 :
693 : #[tokio::test]
694 2 : async fn write_split_single_key() {
695 2 : let harness = TenantHarness::create("split_writer_write_split_single_key")
696 2 : .await
697 2 : .unwrap();
698 20 : let (tenant, ctx) = harness.load().await;
699 2 :
700 2 : let tline = tenant
701 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
702 6 : .await
703 2 : .unwrap();
704 2 :
705 2 : const N: usize = 2000;
706 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
707 2 : tenant.conf,
708 2 : tline.timeline_id,
709 2 : tenant.tenant_shard_id,
710 2 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
711 2 : 4 * 1024 * 1024,
712 2 : )
713 2 : .await
714 2 : .unwrap();
715 2 :
716 4002 : for i in 0..N {
717 4000 : let i = i as u32;
718 4000 : delta_writer
719 4000 : .put_value(
720 4000 : get_key(0),
721 4000 : Lsn(i as u64 * 16 + 0x10),
722 4000 : Value::Image(get_large_img()),
723 4000 : &ctx,
724 4000 : )
725 254 : .await
726 4000 : .unwrap();
727 2 : }
728 8 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
729 2 : assert_eq!(delta_layers.len(), 1);
730 2 : let delta_layer = delta_layers
731 2 : .into_iter()
732 2 : .next()
733 2 : .unwrap()
734 2 : .into_resident_layer();
735 2 : assert_eq!(
736 2 : delta_layer.layer_desc().key(),
737 2 : PersistentLayerKey {
738 2 : key_range: get_key(0)..get_key(1),
739 2 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
740 2 : is_delta: true
741 2 : }
742 2 : );
743 2 : }
744 : }
|