Line data Source code
1 : //! Import a PGDATA directory into an empty root timeline.
2 : //!
3 : //! This module is adapted hackathon code by Heikki and Stas.
4 : //! Other code in the parent module was written by Christian as part of a customer PoC.
5 : //!
6 : //! The hackathon code was producing image layer files as a free-standing program.
7 : //!
8 : //! It has been modified to
9 : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
10 : //! - => sharding-awareness: produce image layers with only the data relevant for this shard
11 : //! - => S3 as the source for the PGDATA instead of local filesystem
12 : //!
13 : //! TODOs before productionization:
14 : //! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
15 : //! => produced image layers likely too small.
16 : //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
17 : //! - asserts / unwraps need to be replaced with errors
18 : //! - don't trust remote objects will be small (=prevent OOMs in those cases)
19 : //! - limit all in-memory buffers in size, or download to disk and read from there
20 : //! - limit task concurrency
21 : //! - generally play nice with other tenants in the system
22 : //! - importbucket is different bucket than main pageserver storage, so, should be fine wrt S3 rate limits
23 : //! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
24 : //! - integrate with layer eviction system
25 : //! - audit for Tenant::cancel nor Timeline::cancel responsivity
26 : //! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
27 : //!
28 : //! An incomplete set of TODOs from the Hackathon:
29 : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
30 :
31 : use std::sync::Arc;
32 :
33 : use anyhow::{bail, ensure};
34 : use bytes::Bytes;
35 :
36 : use itertools::Itertools;
37 : use pageserver_api::{
38 : key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY},
39 : reltag::RelTag,
40 : shard::ShardIdentity,
41 : };
42 : use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, BLCKSZ};
43 : use tokio::task::JoinSet;
44 : use tracing::{debug, info_span, instrument, Instrument};
45 :
46 : use crate::{
47 : assert_u64_eq_usize::UsizeIsU64,
48 : pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory},
49 : };
50 : use crate::{
51 : context::{DownloadBehavior, RequestContext},
52 : pgdatadir_mapping::{DbDirectory, RelDirectory},
53 : task_mgr::TaskKind,
54 : tenant::storage_layer::{ImageLayerWriter, Layer},
55 : };
56 :
57 : use pageserver_api::key::Key;
58 : use pageserver_api::key::{
59 : slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, CHECKPOINT_KEY, CONTROLFILE_KEY,
60 : TWOPHASEDIR_KEY,
61 : };
62 : use pageserver_api::keyspace::singleton_range;
63 : use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range};
64 : use pageserver_api::reltag::SlruKind;
65 : use utils::bin_ser::BeSer;
66 : use utils::lsn::Lsn;
67 :
68 : use std::collections::HashSet;
69 : use std::ops::Range;
70 :
71 : use super::{
72 : importbucket_client::{ControlFile, RemoteStorageWrapper},
73 : Timeline,
74 : };
75 :
76 : use remote_storage::RemotePath;
77 :
78 0 : pub async fn run(
79 0 : timeline: Arc<Timeline>,
80 0 : pgdata_lsn: Lsn,
81 0 : control_file: ControlFile,
82 0 : storage: RemoteStorageWrapper,
83 0 : ctx: &RequestContext,
84 0 : ) -> anyhow::Result<()> {
85 0 : Flow {
86 0 : timeline,
87 0 : pgdata_lsn,
88 0 : control_file,
89 0 : tasks: Vec::new(),
90 0 : storage,
91 0 : }
92 0 : .run(ctx)
93 0 : .await
94 0 : }
95 :
96 : struct Flow {
97 : timeline: Arc<Timeline>,
98 : pgdata_lsn: Lsn,
99 : control_file: ControlFile,
100 : tasks: Vec<AnyImportTask>,
101 : storage: RemoteStorageWrapper,
102 : }
103 :
104 : impl Flow {
105 : /// Perform the ingestion into [`Self::timeline`].
106 : /// Assumes the timeline is empty (= no layers).
107 0 : pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
108 0 : let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
109 0 :
110 0 : self.pgdata_lsn = pgdata_lsn;
111 :
112 0 : let datadir = PgDataDir::new(&self.storage).await?;
113 :
114 : // Import dbdir (00:00:00 keyspace)
115 : // This is just constructed here, but will be written to the image layer in the first call to import_db()
116 0 : let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
117 0 : dbdirs: datadir
118 0 : .dbs
119 0 : .iter()
120 0 : .map(|db| ((db.spcnode, db.dboid), true))
121 0 : .collect(),
122 0 : })?);
123 0 : self.tasks
124 0 : .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
125 :
126 : // Import databases (00:spcnode:dbnode keyspace for each db)
127 0 : for db in datadir.dbs {
128 0 : self.import_db(&db).await?;
129 : }
130 :
131 : // Import SLRUs
132 0 : if self.timeline.tenant_shard_id.is_shard_zero() {
133 : // pg_xact (01:00 keyspace)
134 0 : self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
135 0 : .await?;
136 : // pg_multixact/members (01:01 keyspace)
137 0 : self.import_slru(
138 0 : SlruKind::MultiXactMembers,
139 0 : &self.storage.pgdata().join("pg_multixact/members"),
140 0 : )
141 0 : .await?;
142 : // pg_multixact/offsets (01:02 keyspace)
143 0 : self.import_slru(
144 0 : SlruKind::MultiXactOffsets,
145 0 : &self.storage.pgdata().join("pg_multixact/offsets"),
146 0 : )
147 0 : .await?;
148 0 : }
149 :
150 : // Import pg_twophase.
151 : // TODO: as empty
152 0 : let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
153 0 : xids: HashSet::new(),
154 0 : })?;
155 0 : self.tasks
156 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
157 0 : TWOPHASEDIR_KEY,
158 0 : Bytes::from(twophasedir_buf),
159 0 : )));
160 0 :
161 0 : // Controlfile, checkpoint
162 0 : self.tasks
163 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
164 0 : CONTROLFILE_KEY,
165 0 : self.control_file.control_file_buf().clone(),
166 0 : )));
167 :
168 0 : let checkpoint_buf = self
169 0 : .control_file
170 0 : .control_file_data()
171 0 : .checkPointCopy
172 0 : .encode()?;
173 0 : self.tasks
174 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
175 0 : CHECKPOINT_KEY,
176 0 : checkpoint_buf,
177 0 : )));
178 0 :
179 0 : // Assigns parts of key space to later parallel jobs
180 0 : let mut last_end_key = Key::MIN;
181 0 : let mut current_chunk = Vec::new();
182 0 : let mut current_chunk_size: usize = 0;
183 0 : let mut parallel_jobs = Vec::new();
184 0 : for task in std::mem::take(&mut self.tasks).into_iter() {
185 0 : if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
186 0 : let key_range = last_end_key..task.key_range().start;
187 0 : parallel_jobs.push(ChunkProcessingJob::new(
188 0 : key_range.clone(),
189 0 : std::mem::take(&mut current_chunk),
190 0 : &self,
191 0 : ));
192 0 : last_end_key = key_range.end;
193 0 : current_chunk_size = 0;
194 0 : }
195 0 : current_chunk_size += task.total_size();
196 0 : current_chunk.push(task);
197 : }
198 0 : parallel_jobs.push(ChunkProcessingJob::new(
199 0 : last_end_key..Key::MAX,
200 0 : current_chunk,
201 0 : &self,
202 0 : ));
203 0 :
204 0 : // Start all jobs simultaneosly
205 0 : let mut work = JoinSet::new();
206 : // TODO: semaphore?
207 0 : for job in parallel_jobs {
208 0 : let ctx: RequestContext =
209 0 : ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
210 0 : work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
211 : }
212 0 : let mut results = Vec::new();
213 0 : while let Some(result) = work.join_next().await {
214 0 : match result {
215 0 : Ok(res) => {
216 0 : results.push(res);
217 0 : }
218 0 : Err(_joinset_err) => {
219 0 : results.push(Err(anyhow::anyhow!(
220 0 : "parallel job panicked or cancelled, check pageserver logs"
221 0 : )));
222 0 : }
223 : }
224 : }
225 :
226 0 : if results.iter().all(|r| r.is_ok()) {
227 0 : Ok(())
228 : } else {
229 0 : let mut msg = String::new();
230 0 : for result in results {
231 0 : if let Err(err) = result {
232 0 : msg.push_str(&format!("{err:?}\n\n"));
233 0 : }
234 : }
235 0 : bail!("Some parallel jobs failed:\n\n{msg}");
236 : }
237 0 : }
238 :
239 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
240 : async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
241 : debug!("start");
242 : scopeguard::defer! {
243 : debug!("return");
244 : }
245 :
246 : // Import relmap (00:spcnode:dbnode:00:*:00)
247 : let relmap_key = relmap_file_key(db.spcnode, db.dboid);
248 : debug!("Constructing relmap entry, key {relmap_key}");
249 : let relmap_path = db.path.join("pg_filenode.map");
250 : let relmap_buf = self.storage.get(&relmap_path).await?;
251 : self.tasks
252 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
253 : relmap_key, relmap_buf,
254 : )));
255 :
256 : // Import reldir (00:spcnode:dbnode:00:*:01)
257 : let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
258 : debug!("Constructing reldirs entry, key {reldir_key}");
259 : let reldir_buf = RelDirectory::ser(&RelDirectory {
260 : rels: db
261 : .files
262 : .iter()
263 0 : .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
264 : .collect(),
265 : })?;
266 : self.tasks
267 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
268 : reldir_key,
269 : Bytes::from(reldir_buf),
270 : )));
271 :
272 : // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
273 : // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
274 : for file in &db.files {
275 : debug!(%file.path, %file.filesize, "importing file");
276 : let len = file.filesize;
277 : ensure!(len % 8192 == 0);
278 : let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
279 : let start_key = rel_block_to_key(file.rel_tag, start_blk);
280 : let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
281 : self.tasks
282 : .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
283 : *self.timeline.get_shard_identity(),
284 : start_key..end_key,
285 : &file.path,
286 : self.storage.clone(),
287 : )));
288 :
289 : // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
290 : if let Some(nblocks) = file.nblocks {
291 : let size_key = rel_size_to_key(file.rel_tag);
292 : //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
293 : let buf = nblocks.to_le_bytes();
294 : self.tasks
295 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
296 : size_key,
297 : Bytes::from(buf.to_vec()),
298 : )));
299 : }
300 : }
301 :
302 : Ok(())
303 : }
304 :
305 0 : async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
306 0 : assert!(self.timeline.tenant_shard_id.is_shard_zero());
307 :
308 0 : let segments = self.storage.listfilesindir(path).await?;
309 0 : let segments: Vec<(String, u32, usize)> = segments
310 0 : .into_iter()
311 0 : .filter_map(|(path, size)| {
312 0 : let filename = path.object_name()?;
313 0 : let segno = u32::from_str_radix(filename, 16).ok()?;
314 0 : Some((filename.to_string(), segno, size))
315 0 : })
316 0 : .collect();
317 0 :
318 0 : // Write SlruDir
319 0 : let slrudir_key = slru_dir_to_key(kind);
320 0 : let segnos: HashSet<u32> = segments
321 0 : .iter()
322 0 : .map(|(_path, segno, _size)| *segno)
323 0 : .collect();
324 0 : let slrudir = SlruSegmentDirectory { segments: segnos };
325 0 : let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
326 0 : self.tasks
327 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
328 0 : slrudir_key,
329 0 : Bytes::from(slrudir_buf),
330 0 : )));
331 :
332 0 : for (segpath, segno, size) in segments {
333 : // SlruSegBlocks for each segment
334 0 : let p = path.join(&segpath);
335 0 : let file_size = size;
336 0 : ensure!(file_size % 8192 == 0);
337 0 : let nblocks = u32::try_from(file_size / 8192)?;
338 0 : let start_key = slru_block_to_key(kind, segno, 0);
339 0 : let end_key = slru_block_to_key(kind, segno, nblocks);
340 0 : debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
341 0 : self.tasks
342 0 : .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
343 0 : start_key..end_key,
344 0 : &p,
345 0 : self.storage.clone(),
346 0 : )));
347 0 :
348 0 : // Followed by SlruSegSize
349 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
350 0 : let segsize_buf = nblocks.to_le_bytes();
351 0 : self.tasks
352 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
353 0 : segsize_key,
354 0 : Bytes::copy_from_slice(&segsize_buf),
355 0 : )));
356 : }
357 0 : Ok(())
358 0 : }
359 : }
360 :
361 : //
362 : // dbdir iteration tools
363 : //
364 :
365 : struct PgDataDir {
366 : pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
367 : }
368 :
369 : struct PgDataDirDb {
370 : pub spcnode: u32,
371 : pub dboid: u32,
372 : pub path: RemotePath,
373 : pub files: Vec<PgDataDirDbFile>,
374 : }
375 :
376 : struct PgDataDirDbFile {
377 : pub path: RemotePath,
378 : pub rel_tag: RelTag,
379 : pub segno: u32,
380 : pub filesize: usize,
381 : // Cummulative size of the given fork, set only for the last segment of that fork
382 : pub nblocks: Option<usize>,
383 : }
384 :
385 : impl PgDataDir {
386 0 : async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
387 0 : let datadir_path = storage.pgdata();
388 0 : // Import ordinary databases, DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so import them first
389 0 : // Traverse database in increasing oid order
390 0 :
391 0 : let basedir = &datadir_path.join("base");
392 0 : let db_oids: Vec<_> = storage
393 0 : .listdir(basedir)
394 0 : .await?
395 0 : .into_iter()
396 0 : .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
397 0 : .sorted()
398 0 : .collect();
399 0 : debug!(?db_oids, "found databases");
400 0 : let mut databases = Vec::new();
401 0 : for dboid in db_oids {
402 0 : databases.push(
403 0 : PgDataDirDb::new(
404 0 : storage,
405 0 : &basedir.join(dboid.to_string()),
406 0 : pg_constants::DEFAULTTABLESPACE_OID,
407 0 : dboid,
408 0 : &datadir_path,
409 0 : )
410 0 : .await?,
411 : );
412 : }
413 :
414 : // special case for global catalogs
415 0 : databases.push(
416 0 : PgDataDirDb::new(
417 0 : storage,
418 0 : &datadir_path.join("global"),
419 0 : postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
420 0 : 0,
421 0 : &datadir_path,
422 0 : )
423 0 : .await?,
424 : );
425 :
426 0 : databases.sort_by_key(|db| (db.spcnode, db.dboid));
427 0 :
428 0 : Ok(Self { dbs: databases })
429 0 : }
430 : }
431 :
432 : impl PgDataDirDb {
433 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
434 : async fn new(
435 : storage: &RemoteStorageWrapper,
436 : db_path: &RemotePath,
437 : spcnode: u32,
438 : dboid: u32,
439 : datadir_path: &RemotePath,
440 : ) -> anyhow::Result<Self> {
441 : let mut files: Vec<PgDataDirDbFile> = storage
442 : .listfilesindir(db_path)
443 : .await?
444 : .into_iter()
445 0 : .filter_map(|(path, size)| {
446 0 : debug!(%path, %size, "found file in dbdir");
447 0 : path.object_name().and_then(|name| {
448 0 : // returns (relnode, forknum, segno)
449 0 : parse_relfilename(name).ok().map(|x| (size, x))
450 0 : })
451 0 : })
452 0 : .sorted_by_key(|(_, relfilename)| *relfilename)
453 0 : .map(|(filesize, (relnode, forknum, segno))| {
454 0 : let rel_tag = RelTag {
455 0 : spcnode,
456 0 : dbnode: dboid,
457 0 : relnode,
458 0 : forknum,
459 0 : };
460 0 :
461 0 : let path = datadir_path.join(rel_tag.to_segfile_name(segno));
462 0 : assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
463 0 : let nblocks = filesize / BLCKSZ as usize;
464 0 :
465 0 : PgDataDirDbFile {
466 0 : path,
467 0 : filesize,
468 0 : rel_tag,
469 0 : segno,
470 0 : nblocks: Some(nblocks), // first non-cummulative sizes
471 0 : }
472 0 : })
473 : .collect();
474 :
475 : // Set cummulative sizes. Do all of that math here, so that later we could easier
476 : // parallelize over segments and know with which segments we need to write relsize
477 : // entry.
478 : let mut cumulative_nblocks: usize = 0;
479 : let mut prev_rel_tag: Option<RelTag> = None;
480 : for i in 0..files.len() {
481 : if prev_rel_tag == Some(files[i].rel_tag) {
482 : cumulative_nblocks += files[i].nblocks.unwrap();
483 : } else {
484 : cumulative_nblocks = files[i].nblocks.unwrap();
485 : }
486 :
487 : files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
488 : Some(cumulative_nblocks)
489 : } else {
490 : None
491 : };
492 :
493 : prev_rel_tag = Some(files[i].rel_tag);
494 : }
495 :
496 : Ok(PgDataDirDb {
497 : files,
498 : path: db_path.clone(),
499 : spcnode,
500 : dboid,
501 : })
502 : }
503 : }
504 :
505 : trait ImportTask {
506 : fn key_range(&self) -> Range<Key>;
507 :
508 0 : fn total_size(&self) -> usize {
509 0 : // TODO: revisit this
510 0 : if is_contiguous_range(&self.key_range()) {
511 0 : contiguous_range_len(&self.key_range()) as usize * 8192
512 : } else {
513 0 : u32::MAX as usize
514 : }
515 0 : }
516 :
517 : async fn doit(
518 : self,
519 : layer_writer: &mut ImageLayerWriter,
520 : ctx: &RequestContext,
521 : ) -> anyhow::Result<usize>;
522 : }
523 :
524 : struct ImportSingleKeyTask {
525 : key: Key,
526 : buf: Bytes,
527 : }
528 :
529 : impl ImportSingleKeyTask {
530 0 : fn new(key: Key, buf: Bytes) -> Self {
531 0 : ImportSingleKeyTask { key, buf }
532 0 : }
533 : }
534 :
535 : impl ImportTask for ImportSingleKeyTask {
536 0 : fn key_range(&self) -> Range<Key> {
537 0 : singleton_range(self.key)
538 0 : }
539 :
540 0 : async fn doit(
541 0 : self,
542 0 : layer_writer: &mut ImageLayerWriter,
543 0 : ctx: &RequestContext,
544 0 : ) -> anyhow::Result<usize> {
545 0 : layer_writer.put_image(self.key, self.buf, ctx).await?;
546 0 : Ok(1)
547 0 : }
548 : }
549 :
550 : struct ImportRelBlocksTask {
551 : shard_identity: ShardIdentity,
552 : key_range: Range<Key>,
553 : path: RemotePath,
554 : storage: RemoteStorageWrapper,
555 : }
556 :
557 : impl ImportRelBlocksTask {
558 0 : fn new(
559 0 : shard_identity: ShardIdentity,
560 0 : key_range: Range<Key>,
561 0 : path: &RemotePath,
562 0 : storage: RemoteStorageWrapper,
563 0 : ) -> Self {
564 0 : ImportRelBlocksTask {
565 0 : shard_identity,
566 0 : key_range,
567 0 : path: path.clone(),
568 0 : storage,
569 0 : }
570 0 : }
571 : }
572 :
573 : impl ImportTask for ImportRelBlocksTask {
574 0 : fn key_range(&self) -> Range<Key> {
575 0 : self.key_range.clone()
576 0 : }
577 :
578 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
579 : async fn doit(
580 : self,
581 : layer_writer: &mut ImageLayerWriter,
582 : ctx: &RequestContext,
583 : ) -> anyhow::Result<usize> {
584 : debug!("Importing relation file");
585 :
586 : let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
587 : let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
588 : assert_eq!(rel_tag, rel_tag_end);
589 :
590 : let ranges = (start_blk..end_blk)
591 : .enumerate()
592 0 : .filter_map(|(i, blknum)| {
593 0 : let key = rel_block_to_key(rel_tag, blknum);
594 0 : if self.shard_identity.is_key_disposable(&key) {
595 0 : return None;
596 0 : }
597 0 : let file_offset = i.checked_mul(8192).unwrap();
598 0 : Some((
599 0 : vec![key],
600 0 : file_offset,
601 0 : file_offset.checked_add(8192).unwrap(),
602 0 : ))
603 0 : })
604 0 : .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
605 0 : assert_eq!(key.len(), 1);
606 0 : assert!(!acc.is_empty());
607 0 : assert!(acc_end > acc_start);
608 0 : if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
609 0 : acc.push(key.pop().unwrap());
610 0 : Ok((acc, acc_start, end))
611 : } else {
612 0 : Err(((acc, acc_start, acc_end), (key, start, end)))
613 : }
614 0 : });
615 :
616 : let mut nimages = 0;
617 : for (keys, range_start, range_end) in ranges {
618 : let range_buf = self
619 : .storage
620 : .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
621 : .await?;
622 : let mut buf = Bytes::from(range_buf);
623 : // TODO: batched writes
624 : for key in keys {
625 : let image = buf.split_to(8192);
626 : layer_writer.put_image(key, image, ctx).await?;
627 : nimages += 1;
628 : }
629 : }
630 :
631 : Ok(nimages)
632 : }
633 : }
634 :
635 : struct ImportSlruBlocksTask {
636 : key_range: Range<Key>,
637 : path: RemotePath,
638 : storage: RemoteStorageWrapper,
639 : }
640 :
641 : impl ImportSlruBlocksTask {
642 0 : fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
643 0 : ImportSlruBlocksTask {
644 0 : key_range,
645 0 : path: path.clone(),
646 0 : storage,
647 0 : }
648 0 : }
649 : }
650 :
651 : impl ImportTask for ImportSlruBlocksTask {
652 0 : fn key_range(&self) -> Range<Key> {
653 0 : self.key_range.clone()
654 0 : }
655 :
656 0 : async fn doit(
657 0 : self,
658 0 : layer_writer: &mut ImageLayerWriter,
659 0 : ctx: &RequestContext,
660 0 : ) -> anyhow::Result<usize> {
661 0 : debug!("Importing SLRU segment file {}", self.path);
662 0 : let buf = self.storage.get(&self.path).await?;
663 :
664 0 : let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
665 0 : let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
666 0 : let mut blknum = start_blk;
667 0 : let mut nimages = 0;
668 0 : let mut file_offset = 0;
669 0 : while blknum < end_blk {
670 0 : let key = slru_block_to_key(kind, segno, blknum);
671 0 : let buf = &buf[file_offset..(file_offset + 8192)];
672 0 : file_offset += 8192;
673 0 : layer_writer
674 0 : .put_image(key, Bytes::copy_from_slice(buf), ctx)
675 0 : .await?;
676 0 : nimages += 1;
677 0 : blknum += 1;
678 : }
679 0 : Ok(nimages)
680 0 : }
681 : }
682 :
683 : enum AnyImportTask {
684 : SingleKey(ImportSingleKeyTask),
685 : RelBlocks(ImportRelBlocksTask),
686 : SlruBlocks(ImportSlruBlocksTask),
687 : }
688 :
689 : impl ImportTask for AnyImportTask {
690 0 : fn key_range(&self) -> Range<Key> {
691 0 : match self {
692 0 : Self::SingleKey(t) => t.key_range(),
693 0 : Self::RelBlocks(t) => t.key_range(),
694 0 : Self::SlruBlocks(t) => t.key_range(),
695 : }
696 0 : }
697 : /// returns the number of images put into the `layer_writer`
698 0 : async fn doit(
699 0 : self,
700 0 : layer_writer: &mut ImageLayerWriter,
701 0 : ctx: &RequestContext,
702 0 : ) -> anyhow::Result<usize> {
703 0 : match self {
704 0 : Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
705 0 : Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
706 0 : Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
707 : }
708 0 : }
709 : }
710 :
711 : impl From<ImportSingleKeyTask> for AnyImportTask {
712 0 : fn from(t: ImportSingleKeyTask) -> Self {
713 0 : Self::SingleKey(t)
714 0 : }
715 : }
716 :
717 : impl From<ImportRelBlocksTask> for AnyImportTask {
718 0 : fn from(t: ImportRelBlocksTask) -> Self {
719 0 : Self::RelBlocks(t)
720 0 : }
721 : }
722 :
723 : impl From<ImportSlruBlocksTask> for AnyImportTask {
724 0 : fn from(t: ImportSlruBlocksTask) -> Self {
725 0 : Self::SlruBlocks(t)
726 0 : }
727 : }
728 :
729 : struct ChunkProcessingJob {
730 : timeline: Arc<Timeline>,
731 : range: Range<Key>,
732 : tasks: Vec<AnyImportTask>,
733 :
734 : pgdata_lsn: Lsn,
735 : }
736 :
737 : impl ChunkProcessingJob {
738 0 : fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
739 0 : assert!(env.pgdata_lsn.is_valid());
740 0 : Self {
741 0 : timeline: env.timeline.clone(),
742 0 : range,
743 0 : tasks,
744 0 : pgdata_lsn: env.pgdata_lsn,
745 0 : }
746 0 : }
747 :
748 0 : async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
749 0 : let mut writer = ImageLayerWriter::new(
750 0 : self.timeline.conf,
751 0 : self.timeline.timeline_id,
752 0 : self.timeline.tenant_shard_id,
753 0 : &self.range,
754 0 : self.pgdata_lsn,
755 0 : ctx,
756 0 : )
757 0 : .await?;
758 :
759 0 : let mut nimages = 0;
760 0 : for task in self.tasks {
761 0 : nimages += task.doit(&mut writer, ctx).await?;
762 : }
763 :
764 0 : let resident_layer = if nimages > 0 {
765 0 : let (desc, path) = writer.finish(ctx).await?;
766 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
767 : } else {
768 : // dropping the writer cleans up
769 0 : return Ok(());
770 : };
771 :
772 : // this is sharing the same code as create_image_layers
773 0 : let mut guard = self.timeline.layers.write().await;
774 0 : guard
775 0 : .open_mut()?
776 0 : .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
777 0 : crate::tenant::timeline::drop_wlock(guard);
778 0 :
779 0 : // Schedule the layer for upload but don't add barriers such as
780 0 : // wait for completion or index upload, so we don't inhibit upload parallelism.
781 0 : // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
782 0 : // TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
783 0 : self.timeline
784 0 : .remote_client
785 0 : .schedule_layer_file_upload(resident_layer)?;
786 :
787 0 : Ok(())
788 0 : }
789 : }
|