Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
9 :
10 : use super::layer::S3_UPLOAD_LIMIT;
11 : use super::{
12 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
13 : };
14 :
15 : pub(crate) enum SplitWriterResult {
16 : Produced(ResidentLayer),
17 : Discarded(PersistentLayerKey),
18 : }
19 :
20 : #[cfg(test)]
21 : impl SplitWriterResult {
22 120 : fn into_resident_layer(self) -> ResidentLayer {
23 120 : match self {
24 120 : SplitWriterResult::Produced(layer) => layer,
25 0 : SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
26 : }
27 120 : }
28 :
29 0 : fn into_discarded_layer(self) -> PersistentLayerKey {
30 0 : match self {
31 0 : SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
32 0 : SplitWriterResult::Discarded(layer) => layer,
33 0 : }
34 0 : }
35 : }
36 :
37 : /// An image writer that takes images and produces multiple image layers.
38 : ///
39 : /// The interface does not guarantee atomicity (i.e., if the image layer generation
40 : /// fails, there might be leftover files to be cleaned up)
41 : #[must_use]
42 : pub struct SplitImageLayerWriter {
43 : inner: ImageLayerWriter,
44 : target_layer_size: u64,
45 : generated_layers: Vec<SplitWriterResult>,
46 : conf: &'static PageServerConf,
47 : timeline_id: TimelineId,
48 : tenant_shard_id: TenantShardId,
49 : lsn: Lsn,
50 : start_key: Key,
51 : }
52 :
53 : impl SplitImageLayerWriter {
54 96 : pub async fn new(
55 96 : conf: &'static PageServerConf,
56 96 : timeline_id: TimelineId,
57 96 : tenant_shard_id: TenantShardId,
58 96 : start_key: Key,
59 96 : lsn: Lsn,
60 96 : target_layer_size: u64,
61 96 : ctx: &RequestContext,
62 96 : ) -> anyhow::Result<Self> {
63 96 : Ok(Self {
64 96 : target_layer_size,
65 96 : inner: ImageLayerWriter::new(
66 96 : conf,
67 96 : timeline_id,
68 96 : tenant_shard_id,
69 96 : &(start_key..Key::MAX),
70 96 : lsn,
71 96 : ctx,
72 96 : )
73 48 : .await?,
74 96 : generated_layers: Vec::new(),
75 96 : conf,
76 96 : timeline_id,
77 96 : tenant_shard_id,
78 96 : lsn,
79 96 : start_key,
80 : })
81 96 : }
82 :
83 25242 : pub async fn put_image_with_discard_fn<D, F>(
84 25242 : &mut self,
85 25242 : key: Key,
86 25242 : img: Bytes,
87 25242 : tline: &Arc<Timeline>,
88 25242 : ctx: &RequestContext,
89 25242 : discard: D,
90 25242 : ) -> anyhow::Result<()>
91 25242 : where
92 25242 : D: FnOnce(&PersistentLayerKey) -> F,
93 25242 : F: Future<Output = bool>,
94 25242 : {
95 25242 : // The current estimation is an upper bound of the space that the key/image could take
96 25242 : // because we did not consider compression in this estimation. The resulting image layer
97 25242 : // could be smaller than the target size.
98 25242 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
99 25242 : if self.inner.num_keys() >= 1
100 25146 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
101 : {
102 42 : let next_image_writer = ImageLayerWriter::new(
103 42 : self.conf,
104 42 : self.timeline_id,
105 42 : self.tenant_shard_id,
106 42 : &(key..Key::MAX),
107 42 : self.lsn,
108 42 : ctx,
109 42 : )
110 21 : .await?;
111 42 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
112 42 : let layer_key = PersistentLayerKey {
113 42 : key_range: self.start_key..key,
114 42 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
115 42 : is_delta: false,
116 42 : };
117 42 : self.start_key = key;
118 42 :
119 42 : if discard(&layer_key).await {
120 0 : drop(prev_image_writer);
121 0 : self.generated_layers
122 0 : .push(SplitWriterResult::Discarded(layer_key));
123 0 : } else {
124 42 : self.generated_layers.push(SplitWriterResult::Produced(
125 42 : prev_image_writer
126 42 : .finish_with_end_key(tline, key, ctx)
127 90 : .await?,
128 : ));
129 : }
130 25200 : }
131 25620 : self.inner.put_image(key, img, ctx).await
132 25242 : }
133 :
134 : #[cfg(test)]
135 18 : pub async fn put_image(
136 18 : &mut self,
137 18 : key: Key,
138 18 : img: Bytes,
139 18 : tline: &Arc<Timeline>,
140 18 : ctx: &RequestContext,
141 18 : ) -> anyhow::Result<()> {
142 18 : self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
143 33 : .await
144 18 : }
145 :
146 84 : pub(crate) async fn finish_with_discard_fn<D, F>(
147 84 : self,
148 84 : tline: &Arc<Timeline>,
149 84 : ctx: &RequestContext,
150 84 : end_key: Key,
151 84 : discard: D,
152 84 : ) -> anyhow::Result<Vec<SplitWriterResult>>
153 84 : where
154 84 : D: FnOnce(&PersistentLayerKey) -> F,
155 84 : F: Future<Output = bool>,
156 84 : {
157 84 : let Self {
158 84 : mut generated_layers,
159 84 : inner,
160 84 : ..
161 84 : } = self;
162 84 : if inner.num_keys() == 0 {
163 0 : return Ok(generated_layers);
164 84 : }
165 84 : let layer_key = PersistentLayerKey {
166 84 : key_range: self.start_key..end_key,
167 84 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
168 84 : is_delta: false,
169 84 : };
170 84 : if discard(&layer_key).await {
171 24 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
172 24 : } else {
173 60 : generated_layers.push(SplitWriterResult::Produced(
174 120 : inner.finish_with_end_key(tline, end_key, ctx).await?,
175 : ));
176 : }
177 84 : Ok(generated_layers)
178 84 : }
179 :
180 : #[cfg(test)]
181 24 : pub(crate) async fn finish(
182 24 : self,
183 24 : tline: &Arc<Timeline>,
184 24 : ctx: &RequestContext,
185 24 : end_key: Key,
186 24 : ) -> anyhow::Result<Vec<SplitWriterResult>> {
187 24 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
188 48 : .await
189 24 : }
190 :
191 : /// This function will be deprecated with #8841.
192 12 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
193 12 : Ok((self.generated_layers, self.inner))
194 12 : }
195 : }
196 :
197 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
198 : ///
199 : /// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
200 : /// there might be leftover files to be cleaned up).
201 : ///
202 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
203 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
204 : /// will split them into multiple files based on size.
205 : #[must_use]
206 : pub struct SplitDeltaLayerWriter {
207 : inner: Option<(Key, DeltaLayerWriter)>,
208 : target_layer_size: u64,
209 : generated_layers: Vec<SplitWriterResult>,
210 : conf: &'static PageServerConf,
211 : timeline_id: TimelineId,
212 : tenant_shard_id: TenantShardId,
213 : lsn_range: Range<Lsn>,
214 : last_key_written: Key,
215 : }
216 :
217 : impl SplitDeltaLayerWriter {
218 108 : pub async fn new(
219 108 : conf: &'static PageServerConf,
220 108 : timeline_id: TimelineId,
221 108 : tenant_shard_id: TenantShardId,
222 108 : lsn_range: Range<Lsn>,
223 108 : target_layer_size: u64,
224 108 : ) -> anyhow::Result<Self> {
225 108 : Ok(Self {
226 108 : target_layer_size,
227 108 : inner: None,
228 108 : generated_layers: Vec::new(),
229 108 : conf,
230 108 : timeline_id,
231 108 : tenant_shard_id,
232 108 : lsn_range,
233 108 : last_key_written: Key::MIN,
234 108 : })
235 108 : }
236 :
237 : /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
238 36402 : pub async fn put_value_with_discard_fn<D, F>(
239 36402 : &mut self,
240 36402 : key: Key,
241 36402 : lsn: Lsn,
242 36402 : val: Value,
243 36402 : tline: &Arc<Timeline>,
244 36402 : ctx: &RequestContext,
245 36402 : discard: D,
246 36402 : ) -> anyhow::Result<()>
247 36402 : where
248 36402 : D: FnOnce(&PersistentLayerKey) -> F,
249 36402 : F: Future<Output = bool>,
250 36402 : {
251 36402 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
252 36402 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
253 36402 : //
254 36402 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
255 36402 : // strategy. https://github.com/neondatabase/neon/issues/8837
256 36402 :
257 36402 : if self.inner.is_none() {
258 96 : self.inner = Some((
259 96 : key,
260 96 : DeltaLayerWriter::new(
261 96 : self.conf,
262 96 : self.timeline_id,
263 96 : self.tenant_shard_id,
264 96 : key,
265 96 : self.lsn_range.clone(),
266 96 : ctx,
267 96 : )
268 48 : .await?,
269 : ));
270 36306 : }
271 36402 : let (_, inner) = self.inner.as_mut().unwrap();
272 36402 :
273 36402 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
274 36402 : if inner.num_keys() >= 1
275 36306 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
276 : {
277 8988 : if key != self.last_key_written {
278 42 : let next_delta_writer = DeltaLayerWriter::new(
279 42 : self.conf,
280 42 : self.timeline_id,
281 42 : self.tenant_shard_id,
282 42 : key,
283 42 : self.lsn_range.clone(),
284 42 : ctx,
285 42 : )
286 21 : .await?;
287 42 : let (start_key, prev_delta_writer) =
288 42 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
289 42 : let layer_key = PersistentLayerKey {
290 42 : key_range: start_key..key,
291 42 : lsn_range: self.lsn_range.clone(),
292 42 : is_delta: true,
293 42 : };
294 42 : if discard(&layer_key).await {
295 0 : drop(prev_delta_writer);
296 0 : self.generated_layers
297 0 : .push(SplitWriterResult::Discarded(layer_key));
298 0 : } else {
299 111 : let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
300 42 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
301 42 : self.generated_layers
302 42 : .push(SplitWriterResult::Produced(delta_layer));
303 : }
304 8946 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
305 : // We have to produce a very large file b/c a key is updated too often.
306 0 : anyhow::bail!(
307 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
308 0 : key,
309 0 : inner.estimated_size()
310 0 : );
311 8946 : }
312 27414 : }
313 36402 : self.last_key_written = key;
314 36402 : let (_, inner) = self.inner.as_mut().unwrap();
315 36402 : inner.put_value(key, lsn, val, ctx).await
316 36402 : }
317 :
318 12018 : pub async fn put_value(
319 12018 : &mut self,
320 12018 : key: Key,
321 12018 : lsn: Lsn,
322 12018 : val: Value,
323 12018 : tline: &Arc<Timeline>,
324 12018 : ctx: &RequestContext,
325 12018 : ) -> anyhow::Result<()> {
326 12018 : self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
327 786 : .await
328 12018 : }
329 :
330 96 : pub(crate) async fn finish_with_discard_fn<D, F>(
331 96 : self,
332 96 : tline: &Arc<Timeline>,
333 96 : ctx: &RequestContext,
334 96 : discard: D,
335 96 : ) -> anyhow::Result<Vec<SplitWriterResult>>
336 96 : where
337 96 : D: FnOnce(&PersistentLayerKey) -> F,
338 96 : F: Future<Output = bool>,
339 96 : {
340 96 : let Self {
341 96 : mut generated_layers,
342 96 : inner,
343 96 : ..
344 96 : } = self;
345 96 : let Some((start_key, inner)) = inner else {
346 12 : return Ok(generated_layers);
347 : };
348 84 : if inner.num_keys() == 0 {
349 0 : return Ok(generated_layers);
350 84 : }
351 84 : let end_key = self.last_key_written.next();
352 84 : let layer_key = PersistentLayerKey {
353 84 : key_range: start_key..end_key,
354 84 : lsn_range: self.lsn_range.clone(),
355 84 : is_delta: true,
356 84 : };
357 84 : if discard(&layer_key).await {
358 24 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
359 24 : } else {
360 162 : let (desc, path) = inner.finish(end_key, ctx).await?;
361 60 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
362 60 : generated_layers.push(SplitWriterResult::Produced(delta_layer));
363 : }
364 84 : Ok(generated_layers)
365 96 : }
366 :
367 : #[cfg(test)]
368 30 : pub(crate) async fn finish(
369 30 : self,
370 30 : tline: &Arc<Timeline>,
371 30 : ctx: &RequestContext,
372 30 : ) -> anyhow::Result<Vec<SplitWriterResult>> {
373 30 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
374 84 : .await
375 30 : }
376 :
377 : /// This function will be deprecated with #8841.
378 12 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
379 12 : Ok((self.generated_layers, self.inner.map(|x| x.1)))
380 12 : }
381 : }
382 :
383 : #[cfg(test)]
384 : mod tests {
385 : use itertools::Itertools;
386 : use rand::{RngCore, SeedableRng};
387 :
388 : use crate::{
389 : tenant::{
390 : harness::{TenantHarness, TIMELINE_ID},
391 : storage_layer::AsLayerDesc,
392 : },
393 : DEFAULT_PG_VERSION,
394 : };
395 :
396 : use super::*;
397 :
398 60156 : fn get_key(id: u32) -> Key {
399 60156 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
400 60156 : key.field6 = id;
401 60156 : key
402 60156 : }
403 :
404 24 : fn get_img(id: u32) -> Bytes {
405 24 : format!("{id:064}").into()
406 24 : }
407 :
408 60012 : fn get_large_img() -> Bytes {
409 60012 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
410 60012 : let mut data = vec![0; 8192];
411 60012 : rng.fill_bytes(&mut data);
412 60012 : data.into()
413 60012 : }
414 :
415 : #[tokio::test]
416 6 : async fn write_one_image() {
417 6 : let harness = TenantHarness::create("split_writer_write_one_image")
418 6 : .await
419 6 : .unwrap();
420 24 : let (tenant, ctx) = harness.load().await;
421 6 :
422 6 : let tline = tenant
423 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
424 12 : .await
425 6 : .unwrap();
426 6 :
427 6 : let mut image_writer = SplitImageLayerWriter::new(
428 6 : tenant.conf,
429 6 : tline.timeline_id,
430 6 : tenant.tenant_shard_id,
431 6 : get_key(0),
432 6 : Lsn(0x18),
433 6 : 4 * 1024 * 1024,
434 6 : &ctx,
435 6 : )
436 6 : .await
437 6 : .unwrap();
438 6 :
439 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
440 6 : tenant.conf,
441 6 : tline.timeline_id,
442 6 : tenant.tenant_shard_id,
443 6 : Lsn(0x18)..Lsn(0x20),
444 6 : 4 * 1024 * 1024,
445 6 : )
446 6 : .await
447 6 : .unwrap();
448 6 :
449 6 : image_writer
450 6 : .put_image(get_key(0), get_img(0), &tline, &ctx)
451 6 : .await
452 6 : .unwrap();
453 6 : let layers = image_writer
454 6 : .finish(&tline, &ctx, get_key(10))
455 12 : .await
456 6 : .unwrap();
457 6 : assert_eq!(layers.len(), 1);
458 6 :
459 6 : delta_writer
460 6 : .put_value(
461 6 : get_key(0),
462 6 : Lsn(0x18),
463 6 : Value::Image(get_img(0)),
464 6 : &tline,
465 6 : &ctx,
466 6 : )
467 6 : .await
468 6 : .unwrap();
469 15 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
470 6 : assert_eq!(layers.len(), 1);
471 6 : assert_eq!(
472 6 : layers
473 6 : .into_iter()
474 6 : .next()
475 6 : .unwrap()
476 6 : .into_resident_layer()
477 6 : .layer_desc()
478 6 : .key(),
479 6 : PersistentLayerKey {
480 6 : key_range: get_key(0)..get_key(1),
481 6 : lsn_range: Lsn(0x18)..Lsn(0x20),
482 6 : is_delta: true
483 6 : }
484 6 : );
485 6 : }
486 :
487 : #[tokio::test]
488 6 : async fn write_split() {
489 13110 : write_split_helper("split_writer_write_split", false).await;
490 6 : }
491 :
492 : #[tokio::test]
493 6 : async fn write_split_discard() {
494 13110 : write_split_helper("split_writer_write_split_discard", false).await;
495 6 : }
496 :
497 12 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
498 12 : let harness = TenantHarness::create(harness_name).await.unwrap();
499 48 : let (tenant, ctx) = harness.load().await;
500 :
501 12 : let tline = tenant
502 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
503 24 : .await
504 12 : .unwrap();
505 :
506 12 : let mut image_writer = SplitImageLayerWriter::new(
507 12 : tenant.conf,
508 12 : tline.timeline_id,
509 12 : tenant.tenant_shard_id,
510 12 : get_key(0),
511 12 : Lsn(0x18),
512 12 : 4 * 1024 * 1024,
513 12 : &ctx,
514 12 : )
515 6 : .await
516 12 : .unwrap();
517 12 : let mut delta_writer = SplitDeltaLayerWriter::new(
518 12 : tenant.conf,
519 12 : tline.timeline_id,
520 12 : tenant.tenant_shard_id,
521 12 : Lsn(0x18)..Lsn(0x20),
522 12 : 4 * 1024 * 1024,
523 12 : )
524 0 : .await
525 12 : .unwrap();
526 : const N: usize = 2000;
527 24012 : for i in 0..N {
528 24000 : let i = i as u32;
529 24000 : image_writer
530 24000 : .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
531 36 : discard
532 24000 : })
533 24456 : .await
534 24000 : .unwrap();
535 24000 : delta_writer
536 24000 : .put_value_with_discard_fn(
537 24000 : get_key(i),
538 24000 : Lsn(0x20),
539 24000 : Value::Image(get_large_img()),
540 24000 : &tline,
541 24000 : &ctx,
542 24000 : |_| async { discard },
543 24000 : )
544 1632 : .await
545 24000 : .unwrap();
546 : }
547 12 : let image_layers = image_writer
548 12 : .finish(&tline, &ctx, get_key(N as u32))
549 24 : .await
550 12 : .unwrap();
551 30 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
552 12 : if discard {
553 0 : for layer in image_layers {
554 0 : layer.into_discarded_layer();
555 0 : }
556 0 : for layer in delta_layers {
557 0 : layer.into_discarded_layer();
558 0 : }
559 : } else {
560 12 : let image_layers = image_layers
561 12 : .into_iter()
562 48 : .map(|x| x.into_resident_layer())
563 12 : .collect_vec();
564 12 : let delta_layers = delta_layers
565 12 : .into_iter()
566 48 : .map(|x| x.into_resident_layer())
567 12 : .collect_vec();
568 12 : assert_eq!(image_layers.len(), N / 512 + 1);
569 12 : assert_eq!(delta_layers.len(), N / 512 + 1);
570 12 : assert_eq!(
571 12 : delta_layers.first().unwrap().layer_desc().key_range.start,
572 12 : get_key(0)
573 12 : );
574 12 : assert_eq!(
575 12 : delta_layers.last().unwrap().layer_desc().key_range.end,
576 12 : get_key(N as u32)
577 12 : );
578 48 : for idx in 0..image_layers.len() {
579 48 : assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
580 48 : assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
581 48 : assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
582 48 : assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
583 48 : if idx > 0 {
584 36 : assert_eq!(
585 36 : image_layers[idx - 1].layer_desc().key_range.end,
586 36 : image_layers[idx].layer_desc().key_range.start
587 36 : );
588 36 : assert_eq!(
589 36 : delta_layers[idx - 1].layer_desc().key_range.end,
590 36 : delta_layers[idx].layer_desc().key_range.start
591 36 : );
592 12 : }
593 : }
594 : }
595 12 : }
596 :
597 : #[tokio::test]
598 6 : async fn write_large_img() {
599 6 : let harness = TenantHarness::create("split_writer_write_large_img")
600 6 : .await
601 6 : .unwrap();
602 24 : let (tenant, ctx) = harness.load().await;
603 6 :
604 6 : let tline = tenant
605 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
606 12 : .await
607 6 : .unwrap();
608 6 :
609 6 : let mut image_writer = SplitImageLayerWriter::new(
610 6 : tenant.conf,
611 6 : tline.timeline_id,
612 6 : tenant.tenant_shard_id,
613 6 : get_key(0),
614 6 : Lsn(0x18),
615 6 : 4 * 1024,
616 6 : &ctx,
617 6 : )
618 6 : .await
619 6 : .unwrap();
620 6 :
621 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
622 6 : tenant.conf,
623 6 : tline.timeline_id,
624 6 : tenant.tenant_shard_id,
625 6 : Lsn(0x18)..Lsn(0x20),
626 6 : 4 * 1024,
627 6 : )
628 6 : .await
629 6 : .unwrap();
630 6 :
631 6 : image_writer
632 6 : .put_image(get_key(0), get_img(0), &tline, &ctx)
633 6 : .await
634 6 : .unwrap();
635 6 : image_writer
636 6 : .put_image(get_key(1), get_large_img(), &tline, &ctx)
637 21 : .await
638 6 : .unwrap();
639 6 : let layers = image_writer
640 6 : .finish(&tline, &ctx, get_key(10))
641 12 : .await
642 6 : .unwrap();
643 6 : assert_eq!(layers.len(), 2);
644 6 :
645 6 : delta_writer
646 6 : .put_value(
647 6 : get_key(0),
648 6 : Lsn(0x18),
649 6 : Value::Image(get_img(0)),
650 6 : &tline,
651 6 : &ctx,
652 6 : )
653 6 : .await
654 6 : .unwrap();
655 6 : delta_writer
656 6 : .put_value(
657 6 : get_key(1),
658 6 : Lsn(0x1A),
659 6 : Value::Image(get_large_img()),
660 6 : &tline,
661 6 : &ctx,
662 6 : )
663 18 : .await
664 6 : .unwrap();
665 15 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
666 6 : assert_eq!(layers.len(), 2);
667 6 : let mut layers_iter = layers.into_iter();
668 6 : assert_eq!(
669 6 : layers_iter
670 6 : .next()
671 6 : .unwrap()
672 6 : .into_resident_layer()
673 6 : .layer_desc()
674 6 : .key(),
675 6 : PersistentLayerKey {
676 6 : key_range: get_key(0)..get_key(1),
677 6 : lsn_range: Lsn(0x18)..Lsn(0x20),
678 6 : is_delta: true
679 6 : }
680 6 : );
681 6 : assert_eq!(
682 6 : layers_iter
683 6 : .next()
684 6 : .unwrap()
685 6 : .into_resident_layer()
686 6 : .layer_desc()
687 6 : .key(),
688 6 : PersistentLayerKey {
689 6 : key_range: get_key(1)..get_key(2),
690 6 : lsn_range: Lsn(0x18)..Lsn(0x20),
691 6 : is_delta: true
692 6 : }
693 6 : );
694 6 : }
695 :
696 : #[tokio::test]
697 6 : async fn write_split_single_key() {
698 6 : let harness = TenantHarness::create("split_writer_write_split_single_key")
699 6 : .await
700 6 : .unwrap();
701 24 : let (tenant, ctx) = harness.load().await;
702 6 :
703 6 : let tline = tenant
704 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
705 12 : .await
706 6 : .unwrap();
707 6 :
708 6 : const N: usize = 2000;
709 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
710 6 : tenant.conf,
711 6 : tline.timeline_id,
712 6 : tenant.tenant_shard_id,
713 6 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
714 6 : 4 * 1024 * 1024,
715 6 : )
716 6 : .await
717 6 : .unwrap();
718 6 :
719 12006 : for i in 0..N {
720 12000 : let i = i as u32;
721 12000 : delta_writer
722 12000 : .put_value(
723 12000 : get_key(0),
724 12000 : Lsn(i as u64 * 16 + 0x10),
725 12000 : Value::Image(get_large_img()),
726 12000 : &tline,
727 12000 : &ctx,
728 12000 : )
729 762 : .await
730 12000 : .unwrap();
731 6 : }
732 24 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
733 6 : assert_eq!(delta_layers.len(), 1);
734 6 : let delta_layer = delta_layers
735 6 : .into_iter()
736 6 : .next()
737 6 : .unwrap()
738 6 : .into_resident_layer();
739 6 : assert_eq!(
740 6 : delta_layer.layer_desc().key(),
741 6 : PersistentLayerKey {
742 6 : key_range: get_key(0)..get_key(1),
743 6 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
744 6 : is_delta: true
745 6 : }
746 6 : );
747 6 : }
748 : }
|