Line data Source code
1 : //! Import a PGDATA directory into an empty root timeline.
2 : //!
3 : //! This module is adapted from hackathon code by Heikki and Stas.
4 : //! Other code in the parent module was written by Christian as part of a customer PoC.
5 : //!
6 : //! The hackathon code produced image layer files as a free-standing program.
7 : //!
8 : //! It has been modified to
9 : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
10 : //! - => sharding-awareness: produce image layers with only the data relevant for this shard
11 : //! - => S3 as the source for the PGDATA instead of local filesystem
12 : //!
13 : //! TODOs before productionization:
14 : //! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
15 : //! => the produced image layers are likely too small.
16 : //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
17 : //! - asserts / unwraps need to be replaced with errors
18 : //! - don't trust that remote objects will be small (i.e., prevent OOMs in those cases)
19 : //! - limit all in-memory buffers in size, or download to disk and read from there
20 : //! - limit task concurrency
21 : //! - generally play nice with other tenants in the system
22 : //! - the import bucket is a different bucket than the main pageserver storage, so S3 rate limits should be fine
23 : //! - but concerns like network bandwidth, local disk write bandwidth, and local disk capacity remain
24 : //! - integrate with layer eviction system
25 : //! - audit for responsiveness to Tenant::cancel and Timeline::cancel
26 : //! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
27 : //!
28 : //! An incomplete set of TODOs from the Hackathon:
29 : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
30 :
31 : use std::collections::HashSet;
32 : use std::hash::{Hash, Hasher};
33 : use std::ops::Range;
34 : use std::sync::Arc;
35 :
36 : use anyhow::ensure;
37 : use bytes::Bytes;
38 : use futures::stream::FuturesOrdered;
39 : use itertools::Itertools;
40 : use pageserver_api::config::TimelineImportConfig;
41 : use pageserver_api::key::{
42 : CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
43 : rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
44 : slru_segment_size_to_key,
45 : };
46 : use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
47 : use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
48 : use pageserver_api::reltag::{RelTag, SlruKind};
49 : use pageserver_api::shard::ShardIdentity;
50 : use postgres_ffi::relfile_utils::parse_relfilename;
51 : use postgres_ffi::{BLCKSZ, pg_constants};
52 : use remote_storage::RemotePath;
53 : use tokio::sync::Semaphore;
54 : use tokio_stream::StreamExt;
55 : use tracing::{debug, instrument};
56 : use utils::bin_ser::BeSer;
57 : use utils::lsn::Lsn;
58 : use utils::pausable_failpoint;
59 :
60 : use super::Timeline;
61 : use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
62 : use crate::assert_u64_eq_usize::UsizeIsU64;
63 : use crate::context::{DownloadBehavior, RequestContext};
64 : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
65 : use crate::pgdatadir_mapping::{
66 : DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
67 : };
68 : use crate::task_mgr::TaskKind;
69 : use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
70 :
71 0 : pub async fn run(
72 0 : timeline: Arc<Timeline>,
73 0 : control_file: ControlFile,
74 0 : storage: RemoteStorageWrapper,
75 0 : import_progress: Option<ShardImportProgress>,
76 0 : ctx: &RequestContext,
77 0 : ) -> anyhow::Result<()> {
78 0 : // Match how we run the import based on the progress version.
79 0 : // If there's no import progress, it means that this is a new import
80 0 : // and we can use whichever version we want.
81 0 : match import_progress {
82 0 : Some(ShardImportProgress::V1(progress)) => {
83 0 : run_v1(timeline, control_file, storage, Some(progress), ctx).await
84 : }
85 0 : None => run_v1(timeline, control_file, storage, None, ctx).await,
86 : }
87 0 : }
88 :
89 0 : async fn run_v1(
90 0 : timeline: Arc<Timeline>,
91 0 : control_file: ControlFile,
92 0 : storage: RemoteStorageWrapper,
93 0 : import_progress: Option<ShardImportProgressV1>,
94 0 : ctx: &RequestContext,
95 0 : ) -> anyhow::Result<()> {
96 0 : let planner = Planner {
97 0 : control_file,
98 0 : storage: storage.clone(),
99 0 : shard: timeline.shard_identity,
100 0 : tasks: Vec::default(),
101 0 : };
102 0 :
103 0 : let import_config = &timeline.conf.timeline_import_config;
104 0 : let plan = planner.plan(import_config).await?;
105 :
106 : // Hash the plan and compare with the hash of the plan we got back from the storage controller.
107 : // If the two match, it means that the planning stage had the same output.
108 : //
109 : // This is not intended to be a cryptographically secure hash.
110 : const SEED: u64 = 42;
111 0 : let mut hasher = twox_hash::XxHash64::with_seed(SEED);
112 0 : plan.hash(&mut hasher);
113 0 : let plan_hash = hasher.finish();
114 :
115 0 : if let Some(progress) = &import_progress {
116 0 : if plan_hash != progress.import_plan_hash {
117 0 : anyhow::bail!("Import plan does not match storcon metadata");
118 0 : }
119 0 :
120 0 : // Guard against hash collisions by also checking that the number of jobs matches
121 0 : if progress.jobs != plan.jobs.len() {
122 0 : anyhow::bail!("Import plan job length does not match storcon metadata")
123 0 : }
124 0 : }
125 :
126 0 : pausable_failpoint!("import-timeline-pre-execute-pausable");
127 :
128 0 : let start_from_job_idx = import_progress.map(|progress| progress.completed);
129 0 : plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
130 0 : .await
131 0 : }
132 :
133 : struct Planner {
134 : control_file: ControlFile,
135 : storage: RemoteStorageWrapper,
136 : shard: ShardIdentity,
137 : tasks: Vec<AnyImportTask>,
138 : }
139 :
140 : #[derive(Hash)]
141 : struct Plan {
142 : jobs: Vec<ChunkProcessingJob>,
143 : // Included here such that it ends up in the hash for the plan
144 : shard: ShardIdentity,
145 : }
146 :
147 : impl Planner {
148 : /// Creates an import plan
149 : ///
150 : /// This function is and must remain pure: given the same input, it will generate the same import plan.
151 0 : async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
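// The whole import is stamped at a single LSN: the checkpoint LSN recorded in the
// control file (aligned). Every image layer produced by this plan is written at that LSN.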
152 0 : let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
153 :
154 0 : let datadir = PgDataDir::new(&self.storage).await?;
155 :
156 : // Import dbdir (00:00:00 keyspace)
157 : // It is only constructed here; it gets written to an image layer when the job containing it executes.
158 0 : let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
159 0 : dbdirs: datadir
160 0 : .dbs
161 0 : .iter()
162 0 : .map(|db| ((db.spcnode, db.dboid), true))
163 0 : .collect(),
164 0 : })?);
165 0 : self.tasks
166 0 : .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
167 :
168 : // Import databases (00:spcnode:dbnode keyspace for each db)
169 0 : for db in datadir.dbs {
170 0 : self.import_db(&db).await?;
171 : }
172 :
173 : // Import SLRUs
174 0 : if self.shard.is_shard_zero() {
175 : // pg_xact (01:00 keyspace)
176 0 : self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
177 0 : .await?;
178 : // pg_multixact/members (01:01 keyspace)
179 0 : self.import_slru(
180 0 : SlruKind::MultiXactMembers,
181 0 : &self.storage.pgdata().join("pg_multixact/members"),
182 0 : )
183 0 : .await?;
184 : // pg_multixact/offsets (01:02 keyspace)
185 0 : self.import_slru(
186 0 : SlruKind::MultiXactOffsets,
187 0 : &self.storage.pgdata().join("pg_multixact/offsets"),
188 0 : )
189 0 : .await?;
190 0 : }
191 :
192 : // Import pg_twophase.
193 : // TODO: currently imported as empty; the actual pg_twophase contents are not carried over
194 0 : let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
195 0 : xids: HashSet::new(),
196 0 : })?;
197 0 : self.tasks
198 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
199 0 : TWOPHASEDIR_KEY,
200 0 : Bytes::from(twophasedir_buf),
201 0 : )));
202 0 :
203 0 : // Controlfile, checkpoint
204 0 : self.tasks
205 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
206 0 : CONTROLFILE_KEY,
207 0 : self.control_file.control_file_buf().clone(),
208 0 : )));
209 :
210 0 : let checkpoint_buf = self
211 0 : .control_file
212 0 : .control_file_data()
213 0 : .checkPointCopy
214 0 : .encode()?;
215 0 : self.tasks
216 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
217 0 : CHECKPOINT_KEY,
218 0 : checkpoint_buf,
219 0 : )));
220 0 :
221 0 : // Assign parts of the key space to parallel jobs that are executed later.
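// The tasks above were pushed in increasing key order, so we can greedily pack them
// into chunks up to the soft size limit. The resulting jobs cover disjoint, contiguous
// key ranges that together span Key::MIN..Key::MAX, and each job later writes at most
// one image layer covering its range.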
222 0 : let mut last_end_key = Key::MIN;
223 0 : let mut current_chunk = Vec::new();
224 0 : let mut current_chunk_size: usize = 0;
225 0 : let mut jobs = Vec::new();
226 0 : for task in std::mem::take(&mut self.tasks).into_iter() {
227 0 : if current_chunk_size + task.total_size()
228 0 : > import_config.import_job_soft_size_limit.into()
229 0 : {
230 0 : let key_range = last_end_key..task.key_range().start;
231 0 : jobs.push(ChunkProcessingJob::new(
232 0 : key_range.clone(),
233 0 : std::mem::take(&mut current_chunk),
234 0 : pgdata_lsn,
235 0 : ));
236 0 : last_end_key = key_range.end;
237 0 : current_chunk_size = 0;
238 0 : }
239 0 : current_chunk_size += task.total_size();
240 0 : current_chunk.push(task);
241 : }
242 0 : jobs.push(ChunkProcessingJob::new(
243 0 : last_end_key..Key::MAX,
244 0 : current_chunk,
245 0 : pgdata_lsn,
246 0 : ));
247 0 :
248 0 : Ok(Plan {
249 0 : jobs,
250 0 : shard: self.shard,
251 0 : })
252 0 : }
253 :
254 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
255 : async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
256 : debug!("start");
257 : scopeguard::defer! {
258 : debug!("return");
259 : }
260 :
261 : // Import relmap (00:spcnode:dbnode:00:*:00)
262 : let relmap_key = relmap_file_key(db.spcnode, db.dboid);
263 : debug!("Constructing relmap entry, key {relmap_key}");
264 : let relmap_path = db.path.join("pg_filenode.map");
265 : let relmap_buf = self.storage.get(&relmap_path).await?;
266 : self.tasks
267 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
268 : relmap_key, relmap_buf,
269 : )));
270 :
271 : // Import reldir (00:spcnode:dbnode:00:*:01)
272 : let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
273 : debug!("Constructing reldirs entry, key {reldir_key}");
274 : let reldir_buf = RelDirectory::ser(&RelDirectory {
275 : rels: db
276 : .files
277 : .iter()
278 0 : .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
279 : .collect(),
280 : })?;
281 : self.tasks
282 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
283 : reldir_key,
284 : Bytes::from(reldir_buf),
285 : )));
286 :
287 : // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
288 : // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
289 : for file in &db.files {
290 : debug!(%file.path, %file.filesize, "importing file");
291 : let len = file.filesize;
292 : ensure!(len % 8192 == 0);
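// Relation segments are 1 GiB each; with 8 KiB blocks that is 131072 blocks per
// segment, so segment `segno` starts at block `segno * 131072`.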
293 : let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
294 : let start_key = rel_block_to_key(file.rel_tag, start_blk);
295 : let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
296 : self.tasks
297 : .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
298 : self.shard,
299 : start_key..end_key,
300 : &file.path,
301 : self.storage.clone(),
302 : )));
303 :
304 : // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
305 : if let Some(nblocks) = file.nblocks {
306 : let size_key = rel_size_to_key(file.rel_tag);
307 : //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
308 : let buf = nblocks.to_le_bytes();
309 : self.tasks
310 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
311 : size_key,
312 : Bytes::from(buf.to_vec()),
313 : )));
314 : }
315 : }
316 :
317 : Ok(())
318 : }
319 :
320 0 : async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
321 0 : assert!(self.shard.is_shard_zero());
322 :
323 0 : let segments = self.storage.listfilesindir(path).await?;
324 0 : let segments: Vec<(String, u32, usize)> = segments
325 0 : .into_iter()
326 0 : .filter_map(|(path, size)| {
327 0 : let filename = path.object_name()?;
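// SLRU segment files are named by their segment number in hexadecimal;
// anything that does not parse as such is skipped.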
328 0 : let segno = u32::from_str_radix(filename, 16).ok()?;
329 0 : Some((filename.to_string(), segno, size))
330 0 : })
331 0 : .collect();
332 0 :
333 0 : // Write SlruDir
334 0 : let slrudir_key = slru_dir_to_key(kind);
335 0 : let segnos: HashSet<u32> = segments
336 0 : .iter()
337 0 : .map(|(_path, segno, _size)| *segno)
338 0 : .collect();
339 0 : let slrudir = SlruSegmentDirectory { segments: segnos };
340 0 : let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
341 0 : self.tasks
342 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
343 0 : slrudir_key,
344 0 : Bytes::from(slrudir_buf),
345 0 : )));
346 :
347 0 : for (segpath, segno, size) in segments {
348 : // SlruSegBlocks for each segment
349 0 : let p = path.join(&segpath);
350 0 : let file_size = size;
351 0 : ensure!(file_size % 8192 == 0);
352 0 : let nblocks = u32::try_from(file_size / 8192)?;
353 0 : let start_key = slru_block_to_key(kind, segno, 0);
354 0 : let end_key = slru_block_to_key(kind, segno, nblocks);
355 0 : debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
356 0 : self.tasks
357 0 : .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
358 0 : start_key..end_key,
359 0 : &p,
360 0 : self.storage.clone(),
361 0 : )));
362 0 :
363 0 : // Followed by SlruSegSize
364 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
365 0 : let segsize_buf = nblocks.to_le_bytes();
366 0 : self.tasks
367 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
368 0 : segsize_key,
369 0 : Bytes::copy_from_slice(&segsize_buf),
370 0 : )));
371 : }
372 0 : Ok(())
373 0 : }
374 : }
375 :
376 : impl Plan {
377 0 : async fn execute(
378 0 : self,
379 0 : timeline: Arc<Timeline>,
380 0 : start_after_job_idx: Option<usize>,
381 0 : import_plan_hash: u64,
382 0 : import_config: &TimelineImportConfig,
383 0 : ctx: &RequestContext,
384 0 : ) -> anyhow::Result<()> {
385 0 : let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &timeline.cancel);
386 0 :
387 0 : let mut work = FuturesOrdered::new();
388 0 : let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
389 0 :
390 0 : let jobs_in_plan = self.jobs.len();
391 0 :
392 0 : let mut jobs = self
393 0 : .jobs
394 0 : .into_iter()
395 0 : .enumerate()
396 0 : .map(|(idx, job)| (idx + 1, job))
397 0 : .filter(|(idx, _job)| {
398 : // Filter out any jobs that have been done already
399 0 : if let Some(start_after) = start_after_job_idx {
400 0 : *idx > start_after
401 : } else {
402 0 : true
403 : }
404 0 : })
405 0 : .peekable();
406 0 :
407 0 : let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
408 0 : let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
409 :
410 : // Run import jobs concurrently up to the limit specified by the pageserver configuration.
411 : // Note that we process completed futures in the order of insertion. This will be the
412 : // building block for resuming imports across pageserver restarts or tenant migrations.
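// Because `FuturesOrdered` yields results in the order the jobs were pushed,
// `last_completed_job_idx` advances monotonically and can be reported to the storage
// controller as a simple watermark.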
413 0 : while last_completed_job_idx < jobs_in_plan {
414 0 : tokio::select! {
415 0 : permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
416 0 : let permit = permit.expect("never closed");
417 0 : let (job_idx, job) = jobs.next().expect("we peeked");
418 0 :
419 0 : let job_timeline = timeline.clone();
420 0 : let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
421 0 :
422 0 : work.push_back(tokio::task::spawn(async move {
423 0 : let _permit = permit;
424 0 : let res = job.run(job_timeline, &ctx).await;
425 0 : (job_idx, res)
426 0 : }));
427 0 : },
428 0 : maybe_complete_job_idx = work.next() => {
429 0 : match maybe_complete_job_idx {
430 0 : Some(Ok((job_idx, res))) => {
431 0 : assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
432 :
433 0 : res?;
434 0 : last_completed_job_idx = job_idx;
435 0 :
436 0 : if last_completed_job_idx % checkpoint_every == 0 {
437 0 : let progress = ShardImportProgressV1 {
438 0 : jobs: jobs_in_plan,
439 0 : completed: last_completed_job_idx,
440 0 : import_plan_hash,
441 0 : };
442 0 :
443 0 : storcon_client.put_timeline_import_status(
444 0 : timeline.tenant_shard_id,
445 0 : timeline.timeline_id,
446 0 : timeline.generation,
447 0 : ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress)))
448 0 : )
449 0 : .await
450 0 : .map_err(|_err| {
451 0 : anyhow::anyhow!("Shut down while putting timeline import status")
452 0 : })?;
453 0 : }
454 : },
455 : Some(Err(_)) => {
456 0 : anyhow::bail!(
457 0 : "import job panicked or cancelled"
458 0 : );
459 : }
460 0 : None => {}
461 : }
462 : }
463 : }
464 : }
465 :
466 0 : Ok(())
467 0 : }
468 : }
469 :
470 : //
471 : // dbdir iteration tools
472 : //
473 :
474 : struct PgDataDir {
475 : pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
476 : }
477 :
478 : struct PgDataDirDb {
479 : pub spcnode: u32,
480 : pub dboid: u32,
481 : pub path: RemotePath,
482 : pub files: Vec<PgDataDirDbFile>,
483 : }
484 :
485 : struct PgDataDirDbFile {
486 : pub path: RemotePath,
487 : pub rel_tag: RelTag,
488 : pub segno: u32,
489 : pub filesize: usize,
490 : // Cumulative size of the given fork, set only for the last segment of that fork
491 : pub nblocks: Option<usize>,
492 : }
493 :
494 : impl PgDataDir {
495 0 : async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
496 0 : let datadir_path = storage.pgdata();
497 0 : // Import ordinary databases first; DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID.
498 0 : // Traverse databases in increasing OID order.
499 0 :
500 0 : let basedir = &datadir_path.join("base");
501 0 : let db_oids: Vec<_> = storage
502 0 : .listdir(basedir)
503 0 : .await?
504 0 : .into_iter()
505 0 : .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
506 0 : .sorted()
507 0 : .collect();
508 0 : debug!(?db_oids, "found databases");
509 0 : let mut databases = Vec::new();
510 0 : for dboid in db_oids {
511 0 : databases.push(
512 0 : PgDataDirDb::new(
513 0 : storage,
514 0 : &basedir.join(dboid.to_string()),
515 0 : pg_constants::DEFAULTTABLESPACE_OID,
516 0 : dboid,
517 0 : &datadir_path,
518 0 : )
519 0 : .await?,
520 : );
521 : }
522 :
523 : // special case for global catalogs
524 0 : databases.push(
525 0 : PgDataDirDb::new(
526 0 : storage,
527 0 : &datadir_path.join("global"),
528 0 : postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
529 0 : 0,
530 0 : &datadir_path,
531 0 : )
532 0 : .await?,
533 : );
534 :
535 0 : databases.sort_by_key(|db| (db.spcnode, db.dboid));
536 0 :
537 0 : Ok(Self { dbs: databases })
538 0 : }
539 : }
540 :
541 : impl PgDataDirDb {
542 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
543 : async fn new(
544 : storage: &RemoteStorageWrapper,
545 : db_path: &RemotePath,
546 : spcnode: u32,
547 : dboid: u32,
548 : datadir_path: &RemotePath,
549 : ) -> anyhow::Result<Self> {
550 : let mut files: Vec<PgDataDirDbFile> = storage
551 : .listfilesindir(db_path)
552 : .await?
553 : .into_iter()
554 0 : .filter_map(|(path, size)| {
555 0 : debug!(%path, %size, "found file in dbdir");
556 0 : path.object_name().and_then(|name| {
557 0 : // returns (relnode, forknum, segno)
558 0 : parse_relfilename(name).ok().map(|x| (size, x))
559 0 : })
560 0 : })
561 0 : .sorted_by_key(|(_, relfilename)| *relfilename)
562 0 : .map(|(filesize, (relnode, forknum, segno))| {
563 0 : let rel_tag = RelTag {
564 0 : spcnode,
565 0 : dbnode: dboid,
566 0 : relnode,
567 0 : forknum,
568 0 : };
569 0 :
570 0 : let path = datadir_path.join(rel_tag.to_segfile_name(segno));
571 0 : assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
572 0 : let nblocks = filesize / BLCKSZ as usize;
573 0 :
574 0 : PgDataDirDbFile {
575 0 : path,
576 0 : filesize,
577 0 : rel_tag,
578 0 : segno,
579 0 : nblocks: Some(nblocks), // non-cumulative size at first; fixed up below
580 0 : }
581 0 : })
582 : .collect();
583 :
584 : // Set cumulative sizes. Do all of that math here so that later we can more easily
585 : // parallelize over segments and know for which segments we need to write a relsize
586 : // entry.
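// For example (hypothetical sizes): a fork with segments of 131072, 131072 and 1000
// blocks ends up with nblocks = [None, None, Some(263144)].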
587 : let mut cumulative_nblocks: usize = 0;
588 : let mut prev_rel_tag: Option<RelTag> = None;
589 : for i in 0..files.len() {
590 : if prev_rel_tag == Some(files[i].rel_tag) {
591 : cumulative_nblocks += files[i].nblocks.unwrap();
592 : } else {
593 : cumulative_nblocks = files[i].nblocks.unwrap();
594 : }
595 :
596 : files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
597 : Some(cumulative_nblocks)
598 : } else {
599 : None
600 : };
601 :
602 : prev_rel_tag = Some(files[i].rel_tag);
603 : }
604 :
605 : Ok(PgDataDirDb {
606 : files,
607 : path: db_path.clone(),
608 : spcnode,
609 : dboid,
610 : })
611 : }
612 : }
613 :
614 : trait ImportTask {
615 : fn key_range(&self) -> Range<Key>;
616 :
617 0 : fn total_size(&self) -> usize {
618 0 : // TODO: revisit this
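// Non-contiguous ranges get a pessimistically large size so that, assuming the soft
// size limit is below 4 GiB, the chunking in `Planner::plan` gives such a task a job
// of its own.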
619 0 : if is_contiguous_range(&self.key_range()) {
620 0 : contiguous_range_len(&self.key_range()) as usize * 8192
621 : } else {
622 0 : u32::MAX as usize
623 : }
624 0 : }
625 :
626 : async fn doit(
627 : self,
628 : layer_writer: &mut ImageLayerWriter,
629 : ctx: &RequestContext,
630 : ) -> anyhow::Result<usize>;
631 : }
632 :
633 : struct ImportSingleKeyTask {
634 : key: Key,
635 : buf: Bytes,
636 : }
637 :
638 : impl Hash for ImportSingleKeyTask {
639 0 : fn hash<H: Hasher>(&self, state: &mut H) {
640 0 : let ImportSingleKeyTask { key, buf } = self;
641 0 :
642 0 : key.hash(state);
643 0 : buf.hash(state);
644 0 : }
645 : }
646 :
647 : impl ImportSingleKeyTask {
648 0 : fn new(key: Key, buf: Bytes) -> Self {
649 0 : ImportSingleKeyTask { key, buf }
650 0 : }
651 : }
652 :
653 : impl ImportTask for ImportSingleKeyTask {
654 0 : fn key_range(&self) -> Range<Key> {
655 0 : singleton_range(self.key)
656 0 : }
657 :
658 0 : async fn doit(
659 0 : self,
660 0 : layer_writer: &mut ImageLayerWriter,
661 0 : ctx: &RequestContext,
662 0 : ) -> anyhow::Result<usize> {
663 0 : layer_writer.put_image(self.key, self.buf, ctx).await?;
664 0 : Ok(1)
665 0 : }
666 : }
667 :
668 : struct ImportRelBlocksTask {
669 : shard_identity: ShardIdentity,
670 : key_range: Range<Key>,
671 : path: RemotePath,
672 : storage: RemoteStorageWrapper,
673 : }
674 :
675 : impl Hash for ImportRelBlocksTask {
676 0 : fn hash<H: Hasher>(&self, state: &mut H) {
677 0 : let ImportRelBlocksTask {
678 0 : shard_identity: _,
679 0 : key_range,
680 0 : path,
681 0 : storage: _,
682 0 : } = self;
683 0 :
684 0 : key_range.hash(state);
685 0 : path.hash(state);
686 0 : }
687 : }
688 :
689 : impl ImportRelBlocksTask {
690 0 : fn new(
691 0 : shard_identity: ShardIdentity,
692 0 : key_range: Range<Key>,
693 0 : path: &RemotePath,
694 0 : storage: RemoteStorageWrapper,
695 0 : ) -> Self {
696 0 : ImportRelBlocksTask {
697 0 : shard_identity,
698 0 : key_range,
699 0 : path: path.clone(),
700 0 : storage,
701 0 : }
702 0 : }
703 : }
704 :
705 : impl ImportTask for ImportRelBlocksTask {
706 0 : fn key_range(&self) -> Range<Key> {
707 0 : self.key_range.clone()
708 0 : }
709 :
710 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
711 : async fn doit(
712 : self,
713 : layer_writer: &mut ImageLayerWriter,
714 : ctx: &RequestContext,
715 : ) -> anyhow::Result<usize> {
716 : debug!("Importing relation file");
717 :
718 : let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
719 : let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
720 : assert_eq!(rel_tag, rel_tag_end);
721 :
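// Walk the blocks of this segment, skip those that belong to other shards, and
// coalesce adjacent remaining blocks into contiguous byte ranges so that each range
// can be fetched with a single GET instead of one request per 8 KiB block.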
722 : let ranges = (start_blk..end_blk)
723 : .enumerate()
724 0 : .filter_map(|(i, blknum)| {
725 0 : let key = rel_block_to_key(rel_tag, blknum);
726 0 : if self.shard_identity.is_key_disposable(&key) {
727 0 : return None;
728 0 : }
729 0 : let file_offset = i.checked_mul(8192).unwrap();
730 0 : Some((
731 0 : vec![key],
732 0 : file_offset,
733 0 : file_offset.checked_add(8192).unwrap(),
734 0 : ))
735 0 : })
736 0 : .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
737 0 : assert_eq!(key.len(), 1);
738 0 : assert!(!acc.is_empty());
739 0 : assert!(acc_end > acc_start);
740 0 : if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
741 0 : acc.push(key.pop().unwrap());
742 0 : Ok((acc, acc_start, end))
743 : } else {
744 0 : Err(((acc, acc_start, acc_end), (key, start, end)))
745 : }
746 0 : });
747 :
748 : let mut nimages = 0;
749 : for (keys, range_start, range_end) in ranges {
750 : let range_buf = self
751 : .storage
752 : .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
753 : .await?;
754 : let mut buf = Bytes::from(range_buf);
755 : // TODO: batched writes
756 : for key in keys {
757 : let image = buf.split_to(8192);
758 : layer_writer.put_image(key, image, ctx).await?;
759 : nimages += 1;
760 : }
761 : }
762 :
763 : Ok(nimages)
764 : }
765 : }
766 :
767 : struct ImportSlruBlocksTask {
768 : key_range: Range<Key>,
769 : path: RemotePath,
770 : storage: RemoteStorageWrapper,
771 : }
772 :
773 : impl Hash for ImportSlruBlocksTask {
774 0 : fn hash<H: Hasher>(&self, state: &mut H) {
775 0 : let ImportSlruBlocksTask {
776 0 : key_range,
777 0 : path,
778 0 : storage: _,
779 0 : } = self;
780 0 :
781 0 : key_range.hash(state);
782 0 : path.hash(state);
783 0 : }
784 : }
785 :
786 : impl ImportSlruBlocksTask {
787 0 : fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
788 0 : ImportSlruBlocksTask {
789 0 : key_range,
790 0 : path: path.clone(),
791 0 : storage,
792 0 : }
793 0 : }
794 : }
795 :
796 : impl ImportTask for ImportSlruBlocksTask {
797 0 : fn key_range(&self) -> Range<Key> {
798 0 : self.key_range.clone()
799 0 : }
800 :
801 0 : async fn doit(
802 0 : self,
803 0 : layer_writer: &mut ImageLayerWriter,
804 0 : ctx: &RequestContext,
805 0 : ) -> anyhow::Result<usize> {
806 0 : debug!("Importing SLRU segment file {}", self.path);
807 0 : let buf = self.storage.get(&self.path).await?;
808 :
809 0 : let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
810 0 : let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
811 0 : let mut blknum = start_blk;
812 0 : let mut nimages = 0;
813 0 : let mut file_offset = 0;
814 0 : while blknum < end_blk {
815 0 : let key = slru_block_to_key(kind, segno, blknum);
816 0 : let buf = &buf[file_offset..(file_offset + 8192)];
817 0 : file_offset += 8192;
818 0 : layer_writer
819 0 : .put_image(key, Bytes::copy_from_slice(buf), ctx)
820 0 : .await?;
821 0 : nimages += 1;
822 0 : blknum += 1;
823 : }
824 0 : Ok(nimages)
825 0 : }
826 : }
827 :
828 : #[derive(Hash)]
829 : enum AnyImportTask {
830 : SingleKey(ImportSingleKeyTask),
831 : RelBlocks(ImportRelBlocksTask),
832 : SlruBlocks(ImportSlruBlocksTask),
833 : }
834 :
835 : impl ImportTask for AnyImportTask {
836 0 : fn key_range(&self) -> Range<Key> {
837 0 : match self {
838 0 : Self::SingleKey(t) => t.key_range(),
839 0 : Self::RelBlocks(t) => t.key_range(),
840 0 : Self::SlruBlocks(t) => t.key_range(),
841 : }
842 0 : }
843 : /// returns the number of images put into the `layer_writer`
844 0 : async fn doit(
845 0 : self,
846 0 : layer_writer: &mut ImageLayerWriter,
847 0 : ctx: &RequestContext,
848 0 : ) -> anyhow::Result<usize> {
849 0 : match self {
850 0 : Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
851 0 : Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
852 0 : Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
853 : }
854 0 : }
855 : }
856 :
857 : impl From<ImportSingleKeyTask> for AnyImportTask {
858 0 : fn from(t: ImportSingleKeyTask) -> Self {
859 0 : Self::SingleKey(t)
860 0 : }
861 : }
862 :
863 : impl From<ImportRelBlocksTask> for AnyImportTask {
864 0 : fn from(t: ImportRelBlocksTask) -> Self {
865 0 : Self::RelBlocks(t)
866 0 : }
867 : }
868 :
869 : impl From<ImportSlruBlocksTask> for AnyImportTask {
870 0 : fn from(t: ImportSlruBlocksTask) -> Self {
871 0 : Self::SlruBlocks(t)
872 0 : }
873 : }
874 :
875 : #[derive(Hash)]
876 : struct ChunkProcessingJob {
877 : range: Range<Key>,
878 : tasks: Vec<AnyImportTask>,
879 :
880 : pgdata_lsn: Lsn,
881 : }
882 :
883 : impl ChunkProcessingJob {
884 0 : fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
885 0 : assert!(pgdata_lsn.is_valid());
886 0 : Self {
887 0 : range,
888 0 : tasks,
889 0 : pgdata_lsn,
890 0 : }
891 0 : }
892 :
893 0 : async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
894 0 : let mut writer = ImageLayerWriter::new(
895 0 : timeline.conf,
896 0 : timeline.timeline_id,
897 0 : timeline.tenant_shard_id,
898 0 : &self.range,
899 0 : self.pgdata_lsn,
900 0 : &timeline.gate,
901 0 : timeline.cancel.clone(),
902 0 : ctx,
903 0 : )
904 0 : .await?;
905 :
906 0 : let mut nimages = 0;
907 0 : for task in self.tasks {
908 0 : nimages += task.doit(&mut writer, ctx).await?;
909 : }
910 :
911 0 : let resident_layer = if nimages > 0 {
912 0 : let (desc, path) = writer.finish(ctx).await?;
913 :
914 : {
915 0 : let guard = timeline.layers.read().await;
916 0 : let existing_layer = guard.try_get_from_key(&desc.key());
917 0 : if let Some(layer) = existing_layer {
918 0 : if layer.metadata().generation != timeline.generation {
919 0 : return Err(anyhow::anyhow!(
920 0 : "Import attempted to rewrite layer file in the same generation: {}",
921 0 : layer.local_path()
922 0 : ));
923 0 : }
924 0 : }
925 : }
926 :
927 0 : Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
928 : } else {
929 : // dropping the writer cleans up
930 0 : return Ok(());
931 : };
932 :
933 : // The same import job might run multiple times since not every job is checkpointed.
934 : // Hence, we must support the case where the layer already exists. We cannot be
935 : // certain that the existing layer is identical to the new one, so in that case
936 : // we replace the old layer with the one we just generated.
937 :
938 0 : let mut guard = timeline.layers.write().await;
939 :
940 0 : let existing_layer = guard
941 0 : .try_get_from_key(&resident_layer.layer_desc().key())
942 0 : .cloned();
943 0 : match existing_layer {
944 0 : Some(existing) => {
945 0 : guard.open_mut()?.rewrite_layers(
946 0 : &[(existing.clone(), resident_layer.clone())],
947 0 : &[],
948 0 : &timeline.metrics,
949 0 : );
950 : }
951 : None => {
952 0 : guard
953 0 : .open_mut()?
954 0 : .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
955 : }
956 : }
957 :
958 0 : crate::tenant::timeline::drop_wlock(guard);
959 0 :
960 0 : timeline
961 0 : .remote_client
962 0 : .schedule_layer_file_upload(resident_layer)?;
963 :
964 0 : Ok(())
965 0 : }
966 : }
|