use std::{future::Future, ops::Range, sync::Arc};

use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};

use crate::tenant::storage_layer::Layer;
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};

use super::layer::S3_UPLOAD_LIMIT;
use super::{
    DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
};

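/// The result of finishing a single layer in a split writer: either a layer that was
/// actually produced, or the key of a layer that the caller's discard function chose
/// to skip.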
pub(crate) enum SplitWriterResult {
    Produced(ResidentLayer),
    Discarded(PersistentLayerKey),
}

#[cfg(test)]
impl SplitWriterResult {
    fn into_resident_layer(self) -> ResidentLayer {
        match self {
            SplitWriterResult::Produced(layer) => layer,
            SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
        }
    }

    fn into_discarded_layer(self) -> PersistentLayerKey {
        match self {
            SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
            SplitWriterResult::Discarded(layer) => layer,
        }
    }
}

/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity: if image layer generation fails,
/// there might be leftover files to clean up.
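///
/// # Example
///
/// A minimal usage sketch (marked `ignore`, so it is not compiled as a doc-test). It
/// assumes `tenant`, `tline`, `ctx`, `start_key`, `lsn`, `end_key`, and an `images`
/// iterator are available from the surrounding compaction code.
///
/// ```ignore
/// let mut image_writer = SplitImageLayerWriter::new(
///     tenant.conf,
///     tline.timeline_id,
///     tenant.tenant_shard_id,
///     start_key,
///     lsn,
///     4 * 1024 * 1024, // target layer size
///     &ctx,
/// )
/// .await?;
/// for (key, img) in images {
///     image_writer.put_image(key, img, &ctx).await?;
/// }
/// // Keep every produced layer; a caller can instead return `true` from the
/// // closure to discard a layer it already has.
/// let layers = image_writer
///     .finish_with_discard_fn(&tline, &ctx, end_key, |_| async { false })
///     .await?;
/// ```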
#[must_use]
pub struct SplitImageLayerWriter {
    inner: ImageLayerWriter,
    target_layer_size: u64,
    generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
    conf: &'static PageServerConf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    lsn: Lsn,
    start_key: Key,
}

impl SplitImageLayerWriter {
    pub async fn new(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        start_key: Key,
        lsn: Lsn,
        target_layer_size: u64,
        ctx: &RequestContext,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            target_layer_size,
            inner: ImageLayerWriter::new(
                conf,
                timeline_id,
                tenant_shard_id,
                &(start_key..Key::MAX),
                lsn,
                ctx,
            )
            .await?,
            generated_layer_writers: Vec::new(),
            conf,
            timeline_id,
            tenant_shard_id,
            lsn,
            start_key,
        })
    }

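    /// Add an image for `key` at the writer's LSN. If the current image layer has
    /// already reached the target size, a new image layer is started at `key` and the
    /// previous writer is queued to be finished later.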
    pub async fn put_image(
        &mut self,
        key: Key,
        img: Bytes,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // The current estimate is an upper bound on the space the key/image could take,
        // because it does not account for compression. The resulting image layer could
        // therefore be smaller than the target size.
        let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
        if self.inner.num_keys() >= 1
            && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
        {
            let next_image_writer = ImageLayerWriter::new(
                self.conf,
                self.timeline_id,
                self.tenant_shard_id,
                &(key..Key::MAX),
                self.lsn,
                ctx,
            )
            .await?;
            let layer_key = PersistentLayerKey {
                key_range: self.start_key..key,
                lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
                is_delta: false,
            };
            let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
            self.start_key = key;

            self.generated_layer_writers
                .push((prev_image_writer, layer_key));
        }
        self.inner.put_image(key, img, ctx).await
    }

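    /// Finish all pending image layers. For each layer key, `discard_fn` decides whether
    /// the layer is actually produced (`false`) or skipped (`true`). If finishing any
    /// layer fails, the layers already produced by this call are scheduled for deletion
    /// before the error is returned.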
    pub(crate) async fn finish_with_discard_fn<D, F>(
        self,
        tline: &Arc<Timeline>,
        ctx: &RequestContext,
        end_key: Key,
        discard_fn: D,
    ) -> anyhow::Result<Vec<SplitWriterResult>>
    where
        D: Fn(&PersistentLayerKey) -> F,
        F: Future<Output = bool>,
    {
        let Self {
            mut generated_layer_writers,
            inner,
            ..
        } = self;
        if inner.num_keys() != 0 {
            let layer_key = PersistentLayerKey {
                key_range: self.start_key..end_key,
                lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
                is_delta: false,
            };
            generated_layer_writers.push((inner, layer_key));
        }
        let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
            for produced_layer in generated_layers {
                if let SplitWriterResult::Produced(image_layer) = produced_layer {
                    let layer: Layer = image_layer.into();
                    layer.delete_on_drop();
                }
            }
        };
        // BEGIN: catch every error and perform recovery in the section below.
        let mut generated_layers = Vec::new();
        for (inner, layer_key) in generated_layer_writers {
            if discard_fn(&layer_key).await {
                generated_layers.push(SplitWriterResult::Discarded(layer_key));
            } else {
                let layer = match inner
                    .finish_with_end_key(layer_key.key_range.end, ctx)
                    .await
                {
                    Ok((desc, path)) => {
                        match Layer::finish_creating(self.conf, tline, desc, &path) {
                            Ok(layer) => layer,
                            Err(e) => {
                                tokio::fs::remove_file(&path).await.ok();
                                clean_up_layers(generated_layers);
                                return Err(e);
                            }
                        }
                    }
                    Err(e) => {
                        // ImageLayerWriter::finish cleans up the temporary layer if anything
                        // goes wrong, so we don't need to remove the failed layer ourselves.
                        clean_up_layers(generated_layers);
                        return Err(e);
                    }
                };
                generated_layers.push(SplitWriterResult::Produced(layer));
            }
        }
        // END: catch every error and perform recovery in the section above.
        Ok(generated_layers)
    }

    #[cfg(test)]
    pub(crate) async fn finish(
        self,
        tline: &Arc<Timeline>,
        ctx: &RequestContext,
        end_key: Key,
    ) -> anyhow::Result<Vec<SplitWriterResult>> {
        self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
            .await
    }
}

/// A delta writer that takes key-LSN-value tuples and produces multiple delta layers.
///
/// The interface does not guarantee atomicity: if delta layer generation fails,
/// there might be leftover files to clean up.
///
/// Note that if the updates of a single key exceed the target size limit, all of those
/// updates are batched into a single file. This behavior might change in the future; for
/// reference, the legacy compaction algorithm splits them into multiple files based on size.
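///
/// # Example
///
/// A minimal usage sketch (marked `ignore`, so it is not compiled as a doc-test). It
/// assumes `tenant`, `tline`, `ctx`, `lsn_range`, and a `deltas` iterator are available
/// from the surrounding compaction code.
///
/// ```ignore
/// let mut delta_writer = SplitDeltaLayerWriter::new(
///     tenant.conf,
///     tline.timeline_id,
///     tenant.tenant_shard_id,
///     lsn_range.clone(),
///     4 * 1024 * 1024, // target layer size
/// )
/// .await?;
/// for (key, lsn, val) in deltas {
///     delta_writer.put_value(key, lsn, val, &ctx).await?;
/// }
/// // Keep every produced layer; return `true` from the closure to discard one.
/// let layers = delta_writer
///     .finish_with_discard_fn(&tline, &ctx, |_| async { false })
///     .await?;
/// ```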
#[must_use]
pub struct SplitDeltaLayerWriter {
    inner: Option<(Key, DeltaLayerWriter)>,
    target_layer_size: u64,
    generated_layer_writers: Vec<(DeltaLayerWriter, PersistentLayerKey)>,
    conf: &'static PageServerConf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    lsn_range: Range<Lsn>,
    last_key_written: Key,
}

impl SplitDeltaLayerWriter {
    pub async fn new(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        lsn_range: Range<Lsn>,
        target_layer_size: u64,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            target_layer_size,
            inner: None,
            generated_layer_writers: Vec::new(),
            conf,
            timeline_id,
            tenant_shard_id,
            lsn_range,
            last_key_written: Key::MIN,
        })
    }

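    /// Add a value for `key` at `lsn`. The first call lazily creates the initial delta
    /// layer; a new layer is started whenever the current one has reached the target
    /// size and the key changes, so all updates of a single key stay in one layer. If
    /// the updates of a single key alone exceed `S3_UPLOAD_LIMIT`, an error is returned.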
    pub async fn put_value(
        &mut self,
        key: Key,
        lsn: Lsn,
        val: Value,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // The current estimate is the key size plus the LSN size plus an estimated value
        // size. It is not accurate, so the final layer size could be a little larger or
        // smaller than the target.
        //
        // Also, keep all updates of a single key in a single file. TODO: split them using
        // the legacy compaction strategy. https://github.com/neondatabase/neon/issues/8837

        if self.inner.is_none() {
            self.inner = Some((
                key,
                DeltaLayerWriter::new(
                    self.conf,
                    self.timeline_id,
                    self.tenant_shard_id,
                    key,
                    self.lsn_range.clone(),
                    ctx,
                )
                .await?,
            ));
        }
        let (_, inner) = self.inner.as_mut().unwrap();

        let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
        if inner.num_keys() >= 1
            && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
        {
            if key != self.last_key_written {
                let next_delta_writer = DeltaLayerWriter::new(
                    self.conf,
                    self.timeline_id,
                    self.tenant_shard_id,
                    key,
                    self.lsn_range.clone(),
                    ctx,
                )
                .await?;
                let (start_key, prev_delta_writer) =
                    std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
                let layer_key = PersistentLayerKey {
                    key_range: start_key..key,
                    lsn_range: self.lsn_range.clone(),
                    is_delta: true,
                };
                self.generated_layer_writers
                    .push((prev_delta_writer, layer_key));
            } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
                // We have to produce a very large file because a key is updated too often.
                anyhow::bail!(
                    "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
                    key,
                    inner.estimated_size()
                );
            }
        }
        self.last_key_written = key;
        let (_, inner) = self.inner.as_mut().unwrap();
        inner.put_value(key, lsn, val, ctx).await
    }

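    /// Finish all pending delta layers. For each layer key, `discard_fn` decides whether
    /// the layer is actually produced (`false`) or skipped (`true`). If finishing any
    /// layer fails, the layers already produced by this call are scheduled for deletion
    /// before the error is returned.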
    pub(crate) async fn finish_with_discard_fn<D, F>(
        self,
        tline: &Arc<Timeline>,
        ctx: &RequestContext,
        discard_fn: D,
    ) -> anyhow::Result<Vec<SplitWriterResult>>
    where
        D: Fn(&PersistentLayerKey) -> F,
        F: Future<Output = bool>,
    {
        let Self {
            mut generated_layer_writers,
            inner,
            ..
        } = self;
        if let Some((start_key, writer)) = inner {
            if writer.num_keys() != 0 {
                let end_key = self.last_key_written.next();
                let layer_key = PersistentLayerKey {
                    key_range: start_key..end_key,
                    lsn_range: self.lsn_range.clone(),
                    is_delta: true,
                };
                generated_layer_writers.push((writer, layer_key));
            }
        }
        let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
            for produced_layer in generated_layers {
                if let SplitWriterResult::Produced(delta_layer) = produced_layer {
                    let layer: Layer = delta_layer.into();
                    layer.delete_on_drop();
                }
            }
        };
        // BEGIN: catch every error and perform recovery in the section below.
        let mut generated_layers = Vec::new();
        for (inner, layer_key) in generated_layer_writers {
            if discard_fn(&layer_key).await {
                generated_layers.push(SplitWriterResult::Discarded(layer_key));
            } else {
                let layer = match inner.finish(layer_key.key_range.end, ctx).await {
                    Ok((desc, path)) => {
                        match Layer::finish_creating(self.conf, tline, desc, &path) {
                            Ok(layer) => layer,
                            Err(e) => {
                                tokio::fs::remove_file(&path).await.ok();
                                clean_up_layers(generated_layers);
                                return Err(e);
                            }
                        }
                    }
                    Err(e) => {
                        // DeltaLayerWriter::finish cleans up the temporary layer if anything
                        // goes wrong, so we don't need to remove the failed layer ourselves.
                        clean_up_layers(generated_layers);
                        return Err(e);
                    }
                };
                generated_layers.push(SplitWriterResult::Produced(layer));
            }
        }
        // END: catch every error and perform recovery in the section above.
        Ok(generated_layers)
    }

    #[cfg(test)]
    pub(crate) async fn finish(
        self,
        tline: &Arc<Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Vec<SplitWriterResult>> {
        self.finish_with_discard_fn(tline, ctx, |_| async { false })
            .await
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rand::{RngCore, SeedableRng};

    use crate::{
        tenant::{
            harness::{TenantHarness, TIMELINE_ID},
            storage_layer::AsLayerDesc,
        },
        DEFAULT_PG_VERSION,
    };

    use super::*;

    fn get_key(id: u32) -> Key {
        let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
        key.field6 = id;
        key
    }

    fn get_img(id: u32) -> Bytes {
        format!("{id:064}").into()
    }

    fn get_large_img() -> Bytes {
        let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
        let mut data = vec![0; 8192];
        rng.fill_bytes(&mut data);
        data.into()
    }

    #[tokio::test]
    async fn write_one_image() {
        let harness = TenantHarness::create("split_writer_write_one_image")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        let mut image_writer = SplitImageLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            get_key(0),
            Lsn(0x18),
            4 * 1024 * 1024,
            &ctx,
        )
        .await
        .unwrap();

        let mut delta_writer = SplitDeltaLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            Lsn(0x18)..Lsn(0x20),
            4 * 1024 * 1024,
        )
        .await
        .unwrap();

        image_writer
            .put_image(get_key(0), get_img(0), &ctx)
            .await
            .unwrap();
        let layers = image_writer
            .finish(&tline, &ctx, get_key(10))
            .await
            .unwrap();
        assert_eq!(layers.len(), 1);

        delta_writer
            .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
            .await
            .unwrap();
        let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
        assert_eq!(layers.len(), 1);
        assert_eq!(
            layers
                .into_iter()
                .next()
                .unwrap()
                .into_resident_layer()
                .layer_desc()
                .key(),
            PersistentLayerKey {
                key_range: get_key(0)..get_key(1),
                lsn_range: Lsn(0x18)..Lsn(0x20),
                is_delta: true
            }
        );
    }

    #[tokio::test]
    async fn write_split() {
        // Test the split writer, retaining all the layers we have produced (discard=false).
        write_split_helper("split_writer_write_split", false).await;
    }

    #[tokio::test]
    async fn write_split_discard() {
        // Test the split writer, discarding all the layers we have produced (discard=true).
        write_split_helper("split_writer_write_split_discard", true).await;
    }

    /// Test the image+delta writer by writing a large number of images and deltas. If discard is
    /// set to true, all layers will be discarded.
    async fn write_split_helper(harness_name: &'static str, discard: bool) {
        let harness = TenantHarness::create(harness_name).await.unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        let mut image_writer = SplitImageLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            get_key(0),
            Lsn(0x18),
            4 * 1024 * 1024,
            &ctx,
        )
        .await
        .unwrap();
        let mut delta_writer = SplitDeltaLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            Lsn(0x18)..Lsn(0x20),
            4 * 1024 * 1024,
        )
        .await
        .unwrap();
        const N: usize = 2000;
        for i in 0..N {
            let i = i as u32;
            image_writer
                .put_image(get_key(i), get_large_img(), &ctx)
                .await
                .unwrap();
            delta_writer
                .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
                .await
                .unwrap();
        }
        let image_layers = image_writer
            .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
            .await
            .unwrap();
        let delta_layers = delta_writer
            .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
            .await
            .unwrap();
        let image_layers = image_layers
            .into_iter()
            .map(|x| {
                if discard {
                    x.into_discarded_layer()
                } else {
                    x.into_resident_layer().layer_desc().key()
                }
            })
            .collect_vec();
        let delta_layers = delta_layers
            .into_iter()
            .map(|x| {
                if discard {
                    x.into_discarded_layer()
                } else {
                    x.into_resident_layer().layer_desc().key()
                }
            })
            .collect_vec();
        assert_eq!(image_layers.len(), N / 512 + 1);
        assert_eq!(delta_layers.len(), N / 512 + 1);
        assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
        assert_eq!(
            delta_layers.last().unwrap().key_range.end,
            get_key(N as u32)
        );
        for idx in 0..image_layers.len() {
            assert_ne!(image_layers[idx].key_range.start, Key::MIN);
            assert_ne!(image_layers[idx].key_range.end, Key::MAX);
            assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
            assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
            if idx > 0 {
                assert_eq!(
                    image_layers[idx - 1].key_range.end,
                    image_layers[idx].key_range.start
                );
                assert_eq!(
                    delta_layers[idx - 1].key_range.end,
                    delta_layers[idx].key_range.start
                );
            }
        }
    }

    #[tokio::test]
    async fn write_large_img() {
        let harness = TenantHarness::create("split_writer_write_large_img")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        let mut image_writer = SplitImageLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            get_key(0),
            Lsn(0x18),
            4 * 1024,
            &ctx,
        )
        .await
        .unwrap();

        let mut delta_writer = SplitDeltaLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            Lsn(0x18)..Lsn(0x20),
            4 * 1024,
        )
        .await
        .unwrap();

        image_writer
            .put_image(get_key(0), get_img(0), &ctx)
            .await
            .unwrap();
        image_writer
            .put_image(get_key(1), get_large_img(), &ctx)
            .await
            .unwrap();
        let layers = image_writer
            .finish(&tline, &ctx, get_key(10))
            .await
            .unwrap();
        assert_eq!(layers.len(), 2);

        delta_writer
            .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
            .await
            .unwrap();
        delta_writer
            .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
            .await
            .unwrap();
        let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
        assert_eq!(layers.len(), 2);
        let mut layers_iter = layers.into_iter();
        assert_eq!(
            layers_iter
                .next()
                .unwrap()
                .into_resident_layer()
                .layer_desc()
                .key(),
            PersistentLayerKey {
                key_range: get_key(0)..get_key(1),
                lsn_range: Lsn(0x18)..Lsn(0x20),
                is_delta: true
            }
        );
        assert_eq!(
            layers_iter
                .next()
                .unwrap()
                .into_resident_layer()
                .layer_desc()
                .key(),
            PersistentLayerKey {
                key_range: get_key(1)..get_key(2),
                lsn_range: Lsn(0x18)..Lsn(0x20),
                is_delta: true
            }
        );
    }

    #[tokio::test]
    async fn write_split_single_key() {
        let harness = TenantHarness::create("split_writer_write_split_single_key")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        const N: usize = 2000;
        let mut delta_writer = SplitDeltaLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
            4 * 1024 * 1024,
        )
        .await
        .unwrap();

        for i in 0..N {
            let i = i as u32;
            delta_writer
                .put_value(
                    get_key(0),
                    Lsn(i as u64 * 16 + 0x10),
                    Value::Image(get_large_img()),
                    &ctx,
                )
                .await
                .unwrap();
        }
        let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
        assert_eq!(delta_layers.len(), 1);
        let delta_layer = delta_layers
            .into_iter()
            .next()
            .unwrap()
            .into_resident_layer();
        assert_eq!(
            delta_layer.layer_desc().key(),
            PersistentLayerKey {
                key_range: get_key(0)..get_key(1),
                lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
                is_delta: true
            }
        );
    }
}