Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
9 :
10 : use super::layer::S3_UPLOAD_LIMIT;
11 : use super::{
12 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
13 : };
14 :
/// Outcome of finalizing one layer inside a split layer writer.
pub(crate) enum SplitWriterResult {
    /// The layer was written out and is held as a resident layer.
    Produced(ResidentLayer),
    /// The discard callback returned `true` for this layer key, so no layer was produced for it.
    Discarded(PersistentLayerKey),
}
19 :
#[cfg(test)]
impl SplitWriterResult {
    /// Unwraps the `Produced` variant; panics if the layer was discarded.
    fn into_resident_layer(self) -> ResidentLayer {
        let SplitWriterResult::Produced(resident) = self else {
            panic!("unexpected discarded layer");
        };
        resident
    }

    /// Unwraps the `Discarded` variant; panics if the layer was produced.
    fn into_discarded_layer(self) -> PersistentLayerKey {
        if let SplitWriterResult::Discarded(layer_key) = self {
            layer_key
        } else {
            panic!("unexpected produced layer")
        }
    }
}
36 :
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
    /// Writer for the image layer that is currently being filled.
    inner: ImageLayerWriter,
    /// Estimated layer size at which `put_image` rolls over to a new writer.
    target_layer_size: u64,
    /// Writers that have been rolled over, each paired with the key of the layer it will
    /// become. They are only finalized in `finish_with_discard_fn`.
    generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
    /// Page server configuration, passed to each new writer and to `Layer::finish_creating`.
    conf: &'static PageServerConf,
    /// Timeline id passed to each new `ImageLayerWriter`.
    timeline_id: TimelineId,
    /// Tenant shard id passed to each new `ImageLayerWriter`.
    tenant_shard_id: TenantShardId,
    /// LSN passed to each new `ImageLayerWriter` and used for every layer key's LSN range.
    lsn: Lsn,
    /// First key of the layer that `inner` is currently filling.
    start_key: Key,
}
52 :
53 : impl SplitImageLayerWriter {
54 32 : pub async fn new(
55 32 : conf: &'static PageServerConf,
56 32 : timeline_id: TimelineId,
57 32 : tenant_shard_id: TenantShardId,
58 32 : start_key: Key,
59 32 : lsn: Lsn,
60 32 : target_layer_size: u64,
61 32 : ctx: &RequestContext,
62 32 : ) -> anyhow::Result<Self> {
63 32 : Ok(Self {
64 32 : target_layer_size,
65 32 : inner: ImageLayerWriter::new(
66 32 : conf,
67 32 : timeline_id,
68 32 : tenant_shard_id,
69 32 : &(start_key..Key::MAX),
70 32 : lsn,
71 32 : ctx,
72 32 : )
73 16 : .await?,
74 32 : generated_layer_writers: Vec::new(),
75 32 : conf,
76 32 : timeline_id,
77 32 : tenant_shard_id,
78 32 : lsn,
79 32 : start_key,
80 : })
81 32 : }
82 :
83 8414 : pub async fn put_image(
84 8414 : &mut self,
85 8414 : key: Key,
86 8414 : img: Bytes,
87 8414 : ctx: &RequestContext,
88 8414 : ) -> anyhow::Result<()> {
89 8414 : // The current estimation is an upper bound of the space that the key/image could take
90 8414 : // because we did not consider compression in this estimation. The resulting image layer
91 8414 : // could be smaller than the target size.
92 8414 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
93 8414 : if self.inner.num_keys() >= 1
94 8382 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
95 : {
96 14 : let next_image_writer = ImageLayerWriter::new(
97 14 : self.conf,
98 14 : self.timeline_id,
99 14 : self.tenant_shard_id,
100 14 : &(key..Key::MAX),
101 14 : self.lsn,
102 14 : ctx,
103 14 : )
104 7 : .await?;
105 14 : let layer_key = PersistentLayerKey {
106 14 : key_range: self.start_key..key,
107 14 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
108 14 : is_delta: false,
109 14 : };
110 14 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
111 14 : self.start_key = key;
112 14 :
113 14 : self.generated_layer_writers
114 14 : .push((prev_image_writer, layer_key));
115 8400 : }
116 8540 : self.inner.put_image(key, img, ctx).await
117 8414 : }
118 :
119 28 : pub(crate) async fn finish_with_discard_fn<D, F>(
120 28 : self,
121 28 : tline: &Arc<Timeline>,
122 28 : ctx: &RequestContext,
123 28 : end_key: Key,
124 28 : discard_fn: D,
125 28 : ) -> anyhow::Result<Vec<SplitWriterResult>>
126 28 : where
127 28 : D: Fn(&PersistentLayerKey) -> F,
128 28 : F: Future<Output = bool>,
129 28 : {
130 28 : let Self {
131 28 : mut generated_layer_writers,
132 28 : inner,
133 28 : ..
134 28 : } = self;
135 28 : if inner.num_keys() != 0 {
136 28 : let layer_key = PersistentLayerKey {
137 28 : key_range: self.start_key..end_key,
138 28 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
139 28 : is_delta: false,
140 28 : };
141 28 : generated_layer_writers.push((inner, layer_key));
142 28 : }
143 28 : let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
144 0 : for produced_layer in generated_layers {
145 0 : if let SplitWriterResult::Produced(image_layer) = produced_layer {
146 0 : let layer: Layer = image_layer.into();
147 0 : layer.delete_on_drop();
148 0 : }
149 : }
150 0 : };
151 : // BEGIN: catch every error and do the recovery in the below section
152 28 : let mut generated_layers = Vec::new();
153 70 : for (inner, layer_key) in generated_layer_writers {
154 42 : if discard_fn(&layer_key).await {
155 16 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
156 16 : } else {
157 26 : let layer = match inner
158 26 : .finish_with_end_key(layer_key.key_range.end, ctx)
159 53 : .await
160 : {
161 26 : Ok((desc, path)) => {
162 26 : match Layer::finish_creating(self.conf, tline, desc, &path) {
163 26 : Ok(layer) => layer,
164 0 : Err(e) => {
165 0 : tokio::fs::remove_file(&path).await.ok();
166 0 : clean_up_layers(generated_layers);
167 0 : return Err(e);
168 : }
169 : }
170 : }
171 0 : Err(e) => {
172 0 : // ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
173 0 : // so we don't need to remove it by ourselves.
174 0 : clean_up_layers(generated_layers);
175 0 : return Err(e);
176 : }
177 : };
178 26 : generated_layers.push(SplitWriterResult::Produced(layer));
179 : }
180 : }
181 : // END: catch every error and do the recovery in the above section
182 28 : Ok(generated_layers)
183 28 : }
184 :
/// Test-only shorthand for `finish_with_discard_fn` that never discards a layer.
#[cfg(test)]
pub(crate) async fn finish(
    self,
    tline: &Arc<Timeline>,
    ctx: &RequestContext,
    end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
    let never_discard = |_: &PersistentLayerKey| async { false };
    self.finish_with_discard_fn(tline, ctx, end_key, never_discard)
        .await
}
195 : }
196 :
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
    /// Start key and writer of the delta layer currently being filled; `None` until the
    /// first value is put.
    inner: Option<(Key, DeltaLayerWriter)>,
    /// Estimated layer size at which a put may roll over to a new writer.
    target_layer_size: u64,
    /// Results for the layers that have already been finalized (produced or discarded).
    generated_layers: Vec<SplitWriterResult>,
    /// Page server configuration, passed to each new writer and to `Layer::finish_creating`.
    conf: &'static PageServerConf,
    /// Timeline id passed to each new `DeltaLayerWriter`.
    timeline_id: TimelineId,
    /// Tenant shard id passed to each new `DeltaLayerWriter`.
    tenant_shard_id: TenantShardId,
    /// LSN range passed to each new `DeltaLayerWriter` and used in every layer key.
    lsn_range: Range<Lsn>,
    /// Key of the most recently put value; initialized to `Key::MIN`.
    last_key_written: Key,
}
216 :
217 : impl SplitDeltaLayerWriter {
218 36 : pub async fn new(
219 36 : conf: &'static PageServerConf,
220 36 : timeline_id: TimelineId,
221 36 : tenant_shard_id: TenantShardId,
222 36 : lsn_range: Range<Lsn>,
223 36 : target_layer_size: u64,
224 36 : ) -> anyhow::Result<Self> {
225 36 : Ok(Self {
226 36 : target_layer_size,
227 36 : inner: None,
228 36 : generated_layers: Vec::new(),
229 36 : conf,
230 36 : timeline_id,
231 36 : tenant_shard_id,
232 36 : lsn_range,
233 36 : last_key_written: Key::MIN,
234 36 : })
235 36 : }
236 :
237 : /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
238 12134 : pub async fn put_value_with_discard_fn<D, F>(
239 12134 : &mut self,
240 12134 : key: Key,
241 12134 : lsn: Lsn,
242 12134 : val: Value,
243 12134 : tline: &Arc<Timeline>,
244 12134 : ctx: &RequestContext,
245 12134 : discard: D,
246 12134 : ) -> anyhow::Result<()>
247 12134 : where
248 12134 : D: FnOnce(&PersistentLayerKey) -> F,
249 12134 : F: Future<Output = bool>,
250 12134 : {
251 12134 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
252 12134 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
253 12134 : //
254 12134 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
255 12134 : // strategy. https://github.com/neondatabase/neon/issues/8837
256 12134 :
257 12134 : if self.inner.is_none() {
258 32 : self.inner = Some((
259 32 : key,
260 32 : DeltaLayerWriter::new(
261 32 : self.conf,
262 32 : self.timeline_id,
263 32 : self.tenant_shard_id,
264 32 : key,
265 32 : self.lsn_range.clone(),
266 32 : ctx,
267 32 : )
268 16 : .await?,
269 : ));
270 12102 : }
271 12134 : let (_, inner) = self.inner.as_mut().unwrap();
272 12134 :
273 12134 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
274 12134 : if inner.num_keys() >= 1
275 12102 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
276 : {
277 2996 : if key != self.last_key_written {
278 14 : let next_delta_writer = DeltaLayerWriter::new(
279 14 : self.conf,
280 14 : self.timeline_id,
281 14 : self.tenant_shard_id,
282 14 : key,
283 14 : self.lsn_range.clone(),
284 14 : ctx,
285 14 : )
286 7 : .await?;
287 14 : let (start_key, prev_delta_writer) =
288 14 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
289 14 : let layer_key = PersistentLayerKey {
290 14 : key_range: start_key..key,
291 14 : lsn_range: self.lsn_range.clone(),
292 14 : is_delta: true,
293 14 : };
294 14 : if discard(&layer_key).await {
295 6 : drop(prev_delta_writer);
296 6 : self.generated_layers
297 6 : .push(SplitWriterResult::Discarded(layer_key));
298 6 : } else {
299 : // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
300 : // files for `finish_creating`.
301 20 : let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
302 8 : let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
303 8 : Ok(layer) => layer,
304 0 : Err(e) => {
305 0 : tokio::fs::remove_file(&path).await.ok();
306 0 : return Err(e);
307 : }
308 : };
309 8 : self.generated_layers
310 8 : .push(SplitWriterResult::Produced(delta_layer));
311 : }
312 2982 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
313 : // We have to produce a very large file b/c a key is updated too often.
314 0 : anyhow::bail!(
315 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
316 0 : key,
317 0 : inner.estimated_size()
318 0 : );
319 2982 : }
320 9138 : }
321 12134 : self.last_key_written = key;
322 12134 : let (_, inner) = self.inner.as_mut().unwrap();
323 12134 : inner.put_value(key, lsn, val, ctx).await
324 12134 : }
325 :
326 4006 : pub async fn put_value(
327 4006 : &mut self,
328 4006 : key: Key,
329 4006 : lsn: Lsn,
330 4006 : val: Value,
331 4006 : tline: &Arc<Timeline>,
332 4006 : ctx: &RequestContext,
333 4006 : ) -> anyhow::Result<()> {
334 4006 : self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
335 262 : .await
336 4006 : }
337 :
338 32 : pub(crate) async fn finish_with_discard_fn<D, F>(
339 32 : self,
340 32 : tline: &Arc<Timeline>,
341 32 : ctx: &RequestContext,
342 32 : discard: D,
343 32 : ) -> anyhow::Result<Vec<SplitWriterResult>>
344 32 : where
345 32 : D: FnOnce(&PersistentLayerKey) -> F,
346 32 : F: Future<Output = bool>,
347 32 : {
348 32 : let Self {
349 32 : mut generated_layers,
350 32 : inner,
351 32 : ..
352 32 : } = self;
353 32 : let Some((start_key, inner)) = inner else {
354 4 : return Ok(generated_layers);
355 : };
356 28 : if inner.num_keys() == 0 {
357 0 : return Ok(generated_layers);
358 28 : }
359 28 : let end_key = self.last_key_written.next();
360 28 : let layer_key = PersistentLayerKey {
361 28 : key_range: start_key..end_key,
362 28 : lsn_range: self.lsn_range.clone(),
363 28 : is_delta: true,
364 28 : };
365 28 : if discard(&layer_key).await {
366 10 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
367 10 : } else {
368 : // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
369 : // files for `finish_creating`.
370 48 : let (desc, path) = inner.finish(end_key, ctx).await?;
371 18 : let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
372 18 : Ok(layer) => layer,
373 0 : Err(e) => {
374 0 : tokio::fs::remove_file(&path).await.ok();
375 0 : return Err(e);
376 : }
377 : };
378 18 : generated_layers.push(SplitWriterResult::Produced(delta_layer));
379 : }
380 28 : Ok(generated_layers)
381 32 : }
382 :
/// Test-only shorthand for `finish_with_discard_fn` that never discards the last layer.
#[cfg(test)]
pub(crate) async fn finish(
    self,
    tline: &Arc<Timeline>,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<SplitWriterResult>> {
    let never_discard = |_: &PersistentLayerKey| async { false };
    self.finish_with_discard_fn(tline, ctx, never_discard).await
}
392 :
393 : /// This function will be deprecated with #8841.
394 4 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
395 4 : Ok((self.generated_layers, self.inner.map(|x| x.1)))
396 4 : }
397 : }
398 :
399 : #[cfg(test)]
400 : mod tests {
401 : use itertools::Itertools;
402 : use rand::{RngCore, SeedableRng};
403 :
404 : use crate::{
405 : tenant::{
406 : harness::{TenantHarness, TIMELINE_ID},
407 : storage_layer::AsLayerDesc,
408 : },
409 : DEFAULT_PG_VERSION,
410 : };
411 :
412 : use super::*;
413 :
414 20052 : fn get_key(id: u32) -> Key {
415 20052 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
416 20052 : key.field6 = id;
417 20052 : key
418 20052 : }
419 :
420 8 : fn get_img(id: u32) -> Bytes {
421 8 : format!("{id:064}").into()
422 8 : }
423 :
424 20004 : fn get_large_img() -> Bytes {
425 20004 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
426 20004 : let mut data = vec![0; 8192];
427 20004 : rng.fill_bytes(&mut data);
428 20004 : data.into()
429 20004 : }
430 :
431 : #[tokio::test]
432 2 : async fn write_one_image() {
433 2 : let harness = TenantHarness::create("split_writer_write_one_image")
434 2 : .await
435 2 : .unwrap();
436 5 : let (tenant, ctx) = harness.load().await;
437 2 :
438 2 : let tline = tenant
439 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
440 4 : .await
441 2 : .unwrap();
442 2 :
443 2 : let mut image_writer = SplitImageLayerWriter::new(
444 2 : tenant.conf,
445 2 : tline.timeline_id,
446 2 : tenant.tenant_shard_id,
447 2 : get_key(0),
448 2 : Lsn(0x18),
449 2 : 4 * 1024 * 1024,
450 2 : &ctx,
451 2 : )
452 2 : .await
453 2 : .unwrap();
454 2 :
455 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
456 2 : tenant.conf,
457 2 : tline.timeline_id,
458 2 : tenant.tenant_shard_id,
459 2 : Lsn(0x18)..Lsn(0x20),
460 2 : 4 * 1024 * 1024,
461 2 : )
462 2 : .await
463 2 : .unwrap();
464 2 :
465 2 : image_writer
466 2 : .put_image(get_key(0), get_img(0), &ctx)
467 2 : .await
468 2 : .unwrap();
469 2 : let layers = image_writer
470 2 : .finish(&tline, &ctx, get_key(10))
471 4 : .await
472 2 : .unwrap();
473 2 : assert_eq!(layers.len(), 1);
474 2 :
475 2 : delta_writer
476 2 : .put_value(
477 2 : get_key(0),
478 2 : Lsn(0x18),
479 2 : Value::Image(get_img(0)),
480 2 : &tline,
481 2 : &ctx,
482 2 : )
483 2 : .await
484 2 : .unwrap();
485 5 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
486 2 : assert_eq!(layers.len(), 1);
487 2 : assert_eq!(
488 2 : layers
489 2 : .into_iter()
490 2 : .next()
491 2 : .unwrap()
492 2 : .into_resident_layer()
493 2 : .layer_desc()
494 2 : .key(),
495 2 : PersistentLayerKey {
496 2 : key_range: get_key(0)..get_key(1),
497 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
498 2 : is_delta: true
499 2 : }
500 2 : );
501 2 : }
502 :
503 : #[tokio::test]
504 2 : async fn write_split() {
505 2 : // Test the split writer with retaining all the layers we have produced (discard=false)
506 4370 : write_split_helper("split_writer_write_split", false).await;
507 2 : }
508 :
509 : #[tokio::test]
510 2 : async fn write_split_discard() {
511 2 : // Test the split writer with discarding all the layers we have produced (discard=true)
512 4334 : write_split_helper("split_writer_write_split_discard", true).await;
513 2 : }
514 :
515 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
516 : /// set to true, all layers will be discarded.
517 4 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
518 4 : let harness = TenantHarness::create(harness_name).await.unwrap();
519 16 : let (tenant, ctx) = harness.load().await;
520 :
521 4 : let tline = tenant
522 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
523 8 : .await
524 4 : .unwrap();
525 :
526 4 : let mut image_writer = SplitImageLayerWriter::new(
527 4 : tenant.conf,
528 4 : tline.timeline_id,
529 4 : tenant.tenant_shard_id,
530 4 : get_key(0),
531 4 : Lsn(0x18),
532 4 : 4 * 1024 * 1024,
533 4 : &ctx,
534 4 : )
535 2 : .await
536 4 : .unwrap();
537 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
538 4 : tenant.conf,
539 4 : tline.timeline_id,
540 4 : tenant.tenant_shard_id,
541 4 : Lsn(0x18)..Lsn(0x20),
542 4 : 4 * 1024 * 1024,
543 4 : )
544 0 : .await
545 4 : .unwrap();
546 : const N: usize = 2000;
547 8004 : for i in 0..N {
548 8000 : let i = i as u32;
549 8000 : image_writer
550 8000 : .put_image(get_key(i), get_large_img(), &ctx)
551 8130 : .await
552 8000 : .unwrap();
553 8000 : delta_writer
554 8000 : .put_value_with_discard_fn(
555 8000 : get_key(i),
556 8000 : Lsn(0x20),
557 8000 : Value::Image(get_large_img()),
558 8000 : &tline,
559 8000 : &ctx,
560 8000 : |_| async { discard },
561 8000 : )
562 527 : .await
563 8000 : .unwrap();
564 : }
565 4 : let image_layers = image_writer
566 16 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
567 16 : .await
568 4 : .unwrap();
569 4 : let delta_layers = delta_writer
570 4 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
571 5 : .await
572 4 : .unwrap();
573 4 : let image_layers = image_layers
574 4 : .into_iter()
575 16 : .map(|x| {
576 16 : if discard {
577 8 : x.into_discarded_layer()
578 : } else {
579 8 : x.into_resident_layer().layer_desc().key()
580 : }
581 16 : })
582 4 : .collect_vec();
583 4 : let delta_layers = delta_layers
584 4 : .into_iter()
585 16 : .map(|x| {
586 16 : if discard {
587 8 : x.into_discarded_layer()
588 : } else {
589 8 : x.into_resident_layer().layer_desc().key()
590 : }
591 16 : })
592 4 : .collect_vec();
593 4 : assert_eq!(image_layers.len(), N / 512 + 1);
594 4 : assert_eq!(delta_layers.len(), N / 512 + 1);
595 4 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
596 4 : assert_eq!(
597 4 : delta_layers.last().unwrap().key_range.end,
598 4 : get_key(N as u32)
599 4 : );
600 16 : for idx in 0..image_layers.len() {
601 16 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
602 16 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
603 16 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
604 16 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
605 16 : if idx > 0 {
606 12 : assert_eq!(
607 12 : image_layers[idx - 1].key_range.end,
608 12 : image_layers[idx].key_range.start
609 12 : );
610 12 : assert_eq!(
611 12 : delta_layers[idx - 1].key_range.end,
612 12 : delta_layers[idx].key_range.start
613 12 : );
614 4 : }
615 : }
616 4 : }
617 :
618 : #[tokio::test]
619 2 : async fn write_large_img() {
620 2 : let harness = TenantHarness::create("split_writer_write_large_img")
621 2 : .await
622 2 : .unwrap();
623 8 : let (tenant, ctx) = harness.load().await;
624 2 :
625 2 : let tline = tenant
626 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
627 4 : .await
628 2 : .unwrap();
629 2 :
630 2 : let mut image_writer = SplitImageLayerWriter::new(
631 2 : tenant.conf,
632 2 : tline.timeline_id,
633 2 : tenant.tenant_shard_id,
634 2 : get_key(0),
635 2 : Lsn(0x18),
636 2 : 4 * 1024,
637 2 : &ctx,
638 2 : )
639 2 : .await
640 2 : .unwrap();
641 2 :
642 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
643 2 : tenant.conf,
644 2 : tline.timeline_id,
645 2 : tenant.tenant_shard_id,
646 2 : Lsn(0x18)..Lsn(0x20),
647 2 : 4 * 1024,
648 2 : )
649 2 : .await
650 2 : .unwrap();
651 2 :
652 2 : image_writer
653 2 : .put_image(get_key(0), get_img(0), &ctx)
654 2 : .await
655 2 : .unwrap();
656 2 : image_writer
657 2 : .put_image(get_key(1), get_large_img(), &ctx)
658 3 : .await
659 2 : .unwrap();
660 2 : let layers = image_writer
661 2 : .finish(&tline, &ctx, get_key(10))
662 8 : .await
663 2 : .unwrap();
664 2 : assert_eq!(layers.len(), 2);
665 2 :
666 2 : delta_writer
667 2 : .put_value(
668 2 : get_key(0),
669 2 : Lsn(0x18),
670 2 : Value::Image(get_img(0)),
671 2 : &tline,
672 2 : &ctx,
673 2 : )
674 2 : .await
675 2 : .unwrap();
676 2 : delta_writer
677 2 : .put_value(
678 2 : get_key(1),
679 2 : Lsn(0x1A),
680 2 : Value::Image(get_large_img()),
681 2 : &tline,
682 2 : &ctx,
683 2 : )
684 6 : .await
685 2 : .unwrap();
686 5 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
687 2 : assert_eq!(layers.len(), 2);
688 2 : let mut layers_iter = layers.into_iter();
689 2 : assert_eq!(
690 2 : layers_iter
691 2 : .next()
692 2 : .unwrap()
693 2 : .into_resident_layer()
694 2 : .layer_desc()
695 2 : .key(),
696 2 : PersistentLayerKey {
697 2 : key_range: get_key(0)..get_key(1),
698 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
699 2 : is_delta: true
700 2 : }
701 2 : );
702 2 : assert_eq!(
703 2 : layers_iter
704 2 : .next()
705 2 : .unwrap()
706 2 : .into_resident_layer()
707 2 : .layer_desc()
708 2 : .key(),
709 2 : PersistentLayerKey {
710 2 : key_range: get_key(1)..get_key(2),
711 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
712 2 : is_delta: true
713 2 : }
714 2 : );
715 2 : }
716 :
717 : #[tokio::test]
718 2 : async fn write_split_single_key() {
719 2 : let harness = TenantHarness::create("split_writer_write_split_single_key")
720 2 : .await
721 2 : .unwrap();
722 8 : let (tenant, ctx) = harness.load().await;
723 2 :
724 2 : let tline = tenant
725 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
726 4 : .await
727 2 : .unwrap();
728 2 :
729 2 : const N: usize = 2000;
730 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
731 2 : tenant.conf,
732 2 : tline.timeline_id,
733 2 : tenant.tenant_shard_id,
734 2 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
735 2 : 4 * 1024 * 1024,
736 2 : )
737 2 : .await
738 2 : .unwrap();
739 2 :
740 4002 : for i in 0..N {
741 4000 : let i = i as u32;
742 4000 : delta_writer
743 4000 : .put_value(
744 4000 : get_key(0),
745 4000 : Lsn(i as u64 * 16 + 0x10),
746 4000 : Value::Image(get_large_img()),
747 4000 : &tline,
748 4000 : &ctx,
749 4000 : )
750 254 : .await
751 4000 : .unwrap();
752 2 : }
753 8 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
754 2 : assert_eq!(delta_layers.len(), 1);
755 2 : let delta_layer = delta_layers
756 2 : .into_iter()
757 2 : .next()
758 2 : .unwrap()
759 2 : .into_resident_layer();
760 2 : assert_eq!(
761 2 : delta_layer.layer_desc().key(),
762 2 : PersistentLayerKey {
763 2 : key_range: get_key(0)..get_key(1),
764 2 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
765 2 : is_delta: true
766 2 : }
767 2 : );
768 2 : }
769 : }
|