Line data Source code
1 : //! Import a PGDATA directory into an empty root timeline.
2 : //!
3 : //! This module is adapted hackathon code by Heikki and Stas.
4 : //! Other code in the parent module was written by Christian as part of a customer PoC.
5 : //!
6 : //! The hackathon code was producing image layer files as a free-standing program.
7 : //!
8 : //! It has been modified to
9 : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
10 : //! - => sharding-awareness: produce image layers with only the data relevant for this shard
11 : //! - => S3 as the source for the PGDATA instead of local filesystem
12 : //!
13 : //! TODOs before productionization:
14 : //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
15 : //!
16 : //! An incomplete set of TODOs from the Hackathon:
17 : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
18 :
19 : use std::collections::HashSet;
20 : use std::hash::{Hash, Hasher};
21 : use std::num::NonZeroUsize;
22 : use std::ops::Range;
23 : use std::sync::Arc;
24 :
25 : use anyhow::ensure;
26 : use bytes::Bytes;
27 : use futures::stream::FuturesOrdered;
28 : use itertools::Itertools;
29 : use pageserver_api::config::TimelineImportConfig;
30 : use pageserver_api::key::{
31 : CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
32 : rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
33 : slru_segment_size_to_key,
34 : };
35 : use pageserver_api::keyspace::{ShardedRange, singleton_range};
36 : use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
37 : use pageserver_api::reltag::{RelTag, SlruKind};
38 : use pageserver_api::shard::ShardIdentity;
39 : use postgres_ffi::BLCKSZ;
40 : use postgres_ffi::relfile_utils::parse_relfilename;
41 : use remote_storage::RemotePath;
42 : use tokio::sync::Semaphore;
43 : use tokio_stream::StreamExt;
44 : use tracing::{debug, instrument};
45 : use utils::bin_ser::BeSer;
46 : use utils::lsn::Lsn;
47 : use utils::pausable_failpoint;
48 :
49 : use super::Timeline;
50 : use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
51 : use crate::assert_u64_eq_usize::UsizeIsU64;
52 : use crate::context::{DownloadBehavior, RequestContext};
53 : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
54 : use crate::pgdatadir_mapping::{
55 : DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
56 : };
57 : use crate::task_mgr::TaskKind;
58 : use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
59 : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
60 :
61 0 : pub async fn run(
62 0 : timeline: Arc<Timeline>,
63 0 : control_file: ControlFile,
64 0 : storage: RemoteStorageWrapper,
65 0 : import_progress: Option<ShardImportProgress>,
66 0 : ctx: &RequestContext,
67 0 : ) -> anyhow::Result<()> {
68 : // Match how we run the import based on the progress version.
69 : // If there's no import progress, it means that this is a new import
70 : // and we can use whichever version we want.
71 0 : match import_progress {
72 0 : Some(ShardImportProgress::V1(progress)) => {
73 0 : run_v1(timeline, control_file, storage, Some(progress), ctx).await
74 : }
75 0 : None => run_v1(timeline, control_file, storage, None, ctx).await,
76 : }
77 0 : }
78 :
79 0 : async fn run_v1(
80 0 : timeline: Arc<Timeline>,
81 0 : control_file: ControlFile,
82 0 : storage: RemoteStorageWrapper,
83 0 : import_progress: Option<ShardImportProgressV1>,
84 0 : ctx: &RequestContext,
85 0 : ) -> anyhow::Result<()> {
86 0 : let planner = Planner {
87 0 : control_file,
88 0 : storage: storage.clone(),
89 0 : shard: timeline.shard_identity,
90 0 : tasks: Vec::default(),
91 0 : };
92 :
93 : // Use the job size limit encoded in the progress if we are resuming an import.
94 : // This ensures that imports have stable plans even if the pageserver config changes.
95 0 : let import_config = {
96 0 : match &import_progress {
97 0 : Some(progress) => {
98 0 : let base = &timeline.conf.timeline_import_config;
99 0 : TimelineImportConfig {
100 0 : import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit)
101 0 : .unwrap(),
102 0 : import_job_concurrency: base.import_job_concurrency,
103 0 : import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
104 0 : import_job_max_byte_range_size: base.import_job_max_byte_range_size,
105 0 : }
106 : }
107 0 : None => timeline.conf.timeline_import_config.clone(),
108 : }
109 : };
110 :
111 0 : let plan = planner.plan(&import_config).await?;
112 :
113 : // Hash the plan and compare with the hash of the plan we got back from the storage controller.
114 : // If the two match, it means that the planning stage had the same output.
115 : //
116 : // This is not intended to be a cryptographically secure hash.
117 : const SEED: u64 = 42;
118 0 : let mut hasher = twox_hash::XxHash64::with_seed(SEED);
119 0 : plan.hash(&mut hasher);
120 0 : let plan_hash = hasher.finish();
121 :
122 0 : if let Some(progress) = &import_progress {
123 : // Handle collisions on jobs of unequal length
124 0 : if progress.jobs != plan.jobs.len() {
125 0 : anyhow::bail!("Import plan job length does not match storcon metadata")
126 0 : }
127 :
128 0 : if plan_hash != progress.import_plan_hash {
129 0 : anyhow::bail!("Import plan does not match storcon metadata");
130 0 : }
131 0 : }
132 :
133 0 : pausable_failpoint!("import-timeline-pre-execute-pausable");
134 :
135 0 : let jobs_count = import_progress.as_ref().map(|p| p.jobs);
136 0 : let start_from_job_idx = import_progress.map(|progress| progress.completed);
137 :
138 0 : tracing::info!(
139 : start_from_job_idx=?start_from_job_idx,
140 : jobs=?jobs_count,
141 0 : "Executing import plan"
142 : );
143 :
144 0 : plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx)
145 0 : .await
146 0 : }
147 :
/// Collects import tasks while walking the PGDATA in remote storage and
/// groups them into a [`Plan`] (see [`Planner::plan`]).
struct Planner {
    /// Control file of the PGDATA being imported; supplies the checkpoint LSN,
    /// the raw control file bytes and the checkpoint copy.
    control_file: ControlFile,
    /// Handle used to list and read the PGDATA objects.
    storage: RemoteStorageWrapper,
    /// Identity of the shard this plan is generated for.
    shard: ShardIdentity,
    /// Tasks accumulated so far; sorted and chunked into jobs by `plan`.
    tasks: Vec<AnyImportTask>,
}
154 :
/// The result of planning: the jobs to execute, in order.
///
/// The derived `Hash` is used to compare a freshly generated plan against the
/// hash stored in the import progress, so field order and contents matter.
#[derive(Hash)]
struct Plan {
    /// Jobs in execution order.
    jobs: Vec<ChunkProcessingJob>,
    // Included here such that it ends up in the hash for the plan
    shard: ShardIdentity,
}
161 :
162 : impl Planner {
    /// Creates an import plan
    ///
    /// This function is and must remain pure: given the same input, it will generate the same import plan.
    ///
    /// The steps are:
    /// 1. derive the import LSN from the control file checkpoint,
    /// 2. list the PGDATA and schedule one task per key / key range,
    /// 3. sort the tasks by key range,
    /// 4. cut the sorted tasks into jobs whose estimated size stays around
    ///    `import_config.import_job_soft_size_limit`.
    ///
    /// # Errors
    ///
    /// Fails if the checkpoint LSN is invalid, if listing or reading from
    /// remote storage fails, or if serializing a directory value fails.
    async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
        let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
        anyhow::ensure!(pgdata_lsn.is_valid());

        let datadir = PgDataDir::new(&self.storage).await?;

        // Import dbdir (00:00:00 keyspace)
        // This is just constructed here, but will be written to the image layer in the first call to import_db()
        let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
            dbdirs: datadir
                .dbs
                .iter()
                .map(|db| ((db.spcnode, db.dboid), true))
                .collect(),
        })?);
        self.tasks
            .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());

        // Import databases (00:spcnode:dbnode keyspace for each db)
        for db in datadir.dbs {
            self.import_db(&db).await?;
        }

        // Import SLRUs
        // Only scheduled when planning for shard zero.
        if self.shard.is_shard_zero() {
            // pg_xact (01:00 keyspace)
            self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
                .await?;
            // pg_multixact/members (01:01 keyspace)
            self.import_slru(
                SlruKind::MultiXactMembers,
                &self.storage.pgdata().join("pg_multixact/members"),
            )
            .await?;
            // pg_multixact/offsets (01:02 keyspace)
            self.import_slru(
                SlruKind::MultiXactOffsets,
                &self.storage.pgdata().join("pg_multixact/offsets"),
            )
            .await?;
        }

        // Import pg_twophase.
        // TODO: as empty
        let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
            xids: HashSet::new(),
        })?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                TWOPHASEDIR_KEY,
                Bytes::from(twophasedir_buf),
            )));

        // Controlfile, checkpoint
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                CONTROLFILE_KEY,
                self.control_file.control_file_buf().clone(),
            )));

        let checkpoint_buf = self
            .control_file
            .control_file_data()
            .checkPointCopy
            .encode()?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                CHECKPOINT_KEY,
                checkpoint_buf,
            )));

        // Sort the tasks by the key ranges they handle.
        // The plan being generated here needs to be stable across invocations
        // of this method.
        self.tasks.sort_by_key(|task| match task {
            AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
            AnyImportTask::RelBlocks(rel_blocks) => {
                (rel_blocks.key_range.start, rel_blocks.key_range.end)
            }
            AnyImportTask::SlruBlocks(slru_blocks) => {
                (slru_blocks.key_range.start, slru_blocks.key_range.end)
            }
        });

        // Assigns parts of key space to later parallel jobs
        // Note: The image layers produced here may have gaps, meaning,
        // there is not an image for each key in the layer's key range.
        // The read path stops traversal at the first image layer, regardless
        // of whether a base image has been found for a key or not.
        // (Concept of sparse image layers doesn't exist.)
        // This behavior is exactly right for the base image layers we're producing here.
        // But, since no other place in the code currently produces image layers with gaps,
        // it seems noteworthy.
        let mut last_end_key = Key::MIN;
        let mut current_chunk = Vec::new();
        let mut current_chunk_size: usize = 0;
        let mut jobs = Vec::new();
        for task in std::mem::take(&mut self.tasks).into_iter() {
            let task_size = task.total_size(&self.shard);
            let projected_chunk_size = current_chunk_size.saturating_add(task_size);
            // If adding this task would exceed the soft limit, close the current chunk
            // as a job covering [last_end_key, start of this task) and start a new
            // chunk with this task. This also triggers when the current chunk is
            // still empty, in which case the emitted job carries no tasks.
            if projected_chunk_size > import_config.import_job_soft_size_limit.into() {
                let key_range = last_end_key..task.key_range().start;
                jobs.push(ChunkProcessingJob::new(
                    key_range.clone(),
                    std::mem::take(&mut current_chunk),
                    pgdata_lsn,
                ));
                last_end_key = key_range.end;
                current_chunk_size = 0;
            }
            current_chunk_size = current_chunk_size.saturating_add(task_size);
            current_chunk.push(task);
        }
        // The final job takes whatever is left and extends to the end of the key space.
        jobs.push(ChunkProcessingJob::new(
            last_end_key..Key::MAX,
            current_chunk,
            pgdata_lsn,
        ));

        Ok(Plan {
            jobs,
            shard: self.shard,
        })
    }
290 :
291 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
292 : async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
293 : debug!("start");
294 : scopeguard::defer! {
295 : debug!("return");
296 : }
297 :
298 : // Import relmap (00:spcnode:dbnode:00:*:00)
299 : let relmap_key = relmap_file_key(db.spcnode, db.dboid);
300 : debug!("Constructing relmap entry, key {relmap_key}");
301 : let relmap_path = db.path.join("pg_filenode.map");
302 : let relmap_buf = self.storage.get(&relmap_path).await?;
303 : self.tasks
304 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
305 : relmap_key, relmap_buf,
306 : )));
307 :
308 : // Import reldir (00:spcnode:dbnode:00:*:01)
309 : let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
310 : debug!("Constructing reldirs entry, key {reldir_key}");
311 : let reldir_buf = RelDirectory::ser(&RelDirectory {
312 : rels: db
313 : .files
314 : .iter()
315 0 : .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
316 : .collect(),
317 : })?;
318 : self.tasks
319 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
320 : reldir_key,
321 : Bytes::from(reldir_buf),
322 : )));
323 :
324 : // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
325 : // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
326 : for file in &db.files {
327 : debug!(%file.path, %file.filesize, "importing file");
328 : let len = file.filesize;
329 : ensure!(len % 8192 == 0);
330 : let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
331 : let start_key = rel_block_to_key(file.rel_tag, start_blk);
332 : let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
333 : self.tasks
334 : .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
335 : self.shard,
336 : start_key..end_key,
337 : &file.path,
338 : self.storage.clone(),
339 : )));
340 :
341 : // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
342 : if let Some(nblocks) = file.nblocks {
343 : let size_key = rel_size_to_key(file.rel_tag);
344 : //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
345 : let buf = nblocks.to_le_bytes();
346 : self.tasks
347 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
348 : size_key,
349 : Bytes::from(buf.to_vec()),
350 : )));
351 : }
352 : }
353 :
354 : Ok(())
355 : }
356 :
357 0 : async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
358 0 : assert!(self.shard.is_shard_zero());
359 :
360 0 : let segments = self.storage.listfilesindir(path).await?;
361 0 : let segments: Vec<(String, u32, usize)> = segments
362 0 : .into_iter()
363 0 : .filter_map(|(path, size)| {
364 0 : let filename = path.object_name()?;
365 0 : let segno = u32::from_str_radix(filename, 16).ok()?;
366 0 : Some((filename.to_string(), segno, size))
367 0 : })
368 0 : .collect();
369 :
370 : // Write SlruDir
371 0 : let slrudir_key = slru_dir_to_key(kind);
372 0 : let segnos: HashSet<u32> = segments
373 0 : .iter()
374 0 : .map(|(_path, segno, _size)| *segno)
375 0 : .collect();
376 0 : let slrudir = SlruSegmentDirectory { segments: segnos };
377 0 : let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
378 0 : self.tasks
379 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
380 0 : slrudir_key,
381 0 : Bytes::from(slrudir_buf),
382 0 : )));
383 :
384 0 : for (segpath, segno, size) in segments {
385 : // SlruSegBlocks for each segment
386 0 : let p = path.join(&segpath);
387 0 : let file_size = size;
388 0 : ensure!(file_size % 8192 == 0);
389 0 : let nblocks = u32::try_from(file_size / 8192)?;
390 0 : let start_key = slru_block_to_key(kind, segno, 0);
391 0 : let end_key = slru_block_to_key(kind, segno, nblocks);
392 0 : debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
393 0 : self.tasks
394 0 : .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
395 0 : start_key..end_key,
396 0 : &p,
397 0 : self.storage.clone(),
398 0 : )));
399 :
400 : // Followed by SlruSegSize
401 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
402 0 : let segsize_buf = nblocks.to_le_bytes();
403 0 : self.tasks
404 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
405 0 : segsize_key,
406 0 : Bytes::copy_from_slice(&segsize_buf),
407 0 : )));
408 : }
409 0 : Ok(())
410 0 : }
411 : }
412 :
impl Plan {
    /// Executes the jobs of the plan, resuming after `start_after_job_idx` if given.
    ///
    /// Jobs are numbered starting from 1. `start_after_job_idx` is the number of
    /// the last job that is already complete; jobs up to and including it are
    /// skipped. Jobs are spawned as tokio tasks, bounded by
    /// `import_config.import_job_concurrency`, and their results are consumed in
    /// plan order. Every `import_config.import_job_checkpoint_threshold`
    /// completed jobs, pending uploads are waited for and the progress
    /// (including `import_plan_hash`) is reported to the storage controller.
    ///
    /// # Errors
    ///
    /// Returns the first job error, an error if a job task panicked or was
    /// cancelled, or an error if checkpointing fails.
    ///
    /// # Panics
    ///
    /// Panics if jobs complete out of order, which the ordered work queue is
    /// expected to rule out.
    async fn execute(
        self,
        timeline: Arc<Timeline>,
        start_after_job_idx: Option<usize>,
        import_plan_hash: u64,
        import_config: &TimelineImportConfig,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &timeline.cancel);

        // In-flight jobs, yielded in the order in which they were pushed.
        let mut work = FuturesOrdered::new();
        let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));

        let jobs_in_plan = self.jobs.len();

        // Remaining jobs, tagged with their 1-based job number.
        let mut jobs = self
            .jobs
            .into_iter()
            .enumerate()
            .map(|(idx, job)| (idx + 1, job))
            .filter(|(idx, _job)| {
                // Filter out any jobs that have been done already
                if let Some(start_after) = start_after_job_idx {
                    *idx > start_after
                } else {
                    true
                }
            })
            .peekable();

        let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
        let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
        let max_byte_range_size: usize = import_config.import_job_max_byte_range_size.into();

        // Run import jobs concurrently up to the limit specified by the pageserver configuration.
        // Note that we process completed futures in the order of insertion. This will be the
        // building block for resuming imports across pageserver restarts or tenant migrations.
        while last_completed_job_idx < jobs_in_plan {
            tokio::select! {
                // Spawn the next job as soon as a concurrency permit is available.
                permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
                    let permit = permit.expect("never closed");
                    let (job_idx, job) = jobs.next().expect("we peeked");

                    let job_timeline = timeline.clone();
                    let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);

                    work.push_back(tokio::task::spawn(async move {
                        // Hold the permit for the lifetime of the job.
                        let _permit = permit;
                        let res = job.run(job_timeline, max_byte_range_size, &ctx).await;
                        (job_idx, res)
                    }));
                },
                maybe_complete_job_idx = work.next() => {
                    pausable_failpoint!("import-task-complete-pausable");

                    match maybe_complete_job_idx {
                        Some(Ok((job_idx, res))) => {
                            assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);

                            // NOTE(review): on error we return while other spawned jobs may
                            // still be in flight; presumably they keep running detached -
                            // confirm that this is acceptable.
                            res?;
                            last_completed_job_idx = job_idx;

                            if last_completed_job_idx % checkpoint_every == 0 {
                                tracing::info!(last_completed_job_idx, jobs=%jobs_in_plan, "Checkpointing import status");

                                let progress = ShardImportProgressV1 {
                                    jobs: jobs_in_plan,
                                    completed: last_completed_job_idx,
                                    import_plan_hash,
                                    job_soft_size_limit: import_config.import_job_soft_size_limit.into(),
                                };

                                // Make sure the layers produced so far are uploaded and indexed
                                // before reporting them as complete.
                                timeline.remote_client.schedule_index_upload_for_file_changes()?;
                                timeline.remote_client.wait_completion().await?;

                                storcon_client.put_timeline_import_status(
                                    timeline.tenant_shard_id,
                                    timeline.timeline_id,
                                    timeline.generation,
                                    ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress)))
                                )
                                .await
                                .map_err(|_err| {
                                    anyhow::anyhow!("Shut down while putting timeline import status")
                                })?;
                            }
                        },
                        Some(Err(_)) => {
                            anyhow::bail!(
                                "import job panicked or cancelled"
                            );
                        }
                        // Nothing is in flight right now; go around the loop again.
                        None => {}
                    }
                }
            }
        }

        Ok(())
    }
}
515 :
516 : //
517 : // dbdir iteration tools
518 : //
519 :
/// Listing of the databases found in a PGDATA directory in remote storage.
struct PgDataDir {
    /// One entry per database, sorted by (spcnode, dboid).
    pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
}
523 :
/// A single database directory together with the relation files found in it.
struct PgDataDirDb {
    /// Tablespace OID the database is imported under.
    pub spcnode: u32,
    /// Database OID.
    pub dboid: u32,
    /// Remote path of the database directory.
    pub path: RemotePath,
    /// Relation segment files of this database, sorted by parsed file name.
    pub files: Vec<PgDataDirDbFile>,
}
530 :
/// One relation segment file inside a database directory.
struct PgDataDirDbFile {
    /// Remote path the segment is read from.
    pub path: RemotePath,
    /// Relation and fork this segment belongs to.
    pub rel_tag: RelTag,
    /// Segment number within the fork.
    pub segno: u32,
    /// Size of the segment file in bytes.
    pub filesize: usize,
    // Cumulative size of the given fork, set only for the last segment of that fork
    pub nblocks: Option<usize>,
}
539 :
540 : impl PgDataDir {
541 0 : async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
542 0 : let datadir_path = storage.pgdata();
543 : // Import ordinary databases, DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so import them first
544 : // Traverse database in increasing oid order
545 :
546 0 : let basedir = &datadir_path.join("base");
547 0 : let db_oids: Vec<_> = storage
548 0 : .listdir(basedir)
549 0 : .await?
550 0 : .into_iter()
551 0 : .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
552 0 : .sorted()
553 0 : .collect();
554 0 : debug!(?db_oids, "found databases");
555 0 : let mut databases = Vec::new();
556 0 : for dboid in db_oids {
557 0 : databases.push(
558 0 : PgDataDirDb::new(
559 0 : storage,
560 0 : &basedir.join(dboid.to_string()),
561 0 : postgres_ffi_types::constants::DEFAULTTABLESPACE_OID,
562 0 : dboid,
563 0 : &datadir_path,
564 0 : )
565 0 : .await?,
566 : );
567 : }
568 :
569 : // special case for global catalogs
570 0 : databases.push(
571 0 : PgDataDirDb::new(
572 0 : storage,
573 0 : &datadir_path.join("global"),
574 0 : postgres_ffi_types::constants::GLOBALTABLESPACE_OID,
575 0 : 0,
576 0 : &datadir_path,
577 0 : )
578 0 : .await?,
579 : );
580 :
581 0 : databases.sort_by_key(|db| (db.spcnode, db.dboid));
582 :
583 0 : Ok(Self { dbs: databases })
584 0 : }
585 : }
586 :
587 : impl PgDataDirDb {
588 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
589 : async fn new(
590 : storage: &RemoteStorageWrapper,
591 : db_path: &RemotePath,
592 : spcnode: u32,
593 : dboid: u32,
594 : datadir_path: &RemotePath,
595 : ) -> anyhow::Result<Self> {
596 : let mut files: Vec<PgDataDirDbFile> = storage
597 : .listfilesindir(db_path)
598 : .await?
599 : .into_iter()
600 0 : .filter_map(|(path, size)| {
601 0 : debug!(%path, %size, "found file in dbdir");
602 0 : path.object_name().and_then(|name| {
603 : // returns (relnode, forknum, segno)
604 0 : parse_relfilename(name).ok().map(|x| (size, x))
605 0 : })
606 0 : })
607 : .sorted_by_key(|(_, relfilename)| *relfilename)
608 0 : .map(|(filesize, (relnode, forknum, segno))| {
609 0 : let rel_tag = RelTag {
610 0 : spcnode,
611 0 : dbnode: dboid,
612 0 : relnode,
613 0 : forknum,
614 0 : };
615 :
616 0 : let path = datadir_path.join(rel_tag.to_segfile_name(segno));
617 0 : anyhow::ensure!(filesize % BLCKSZ as usize == 0);
618 0 : let nblocks = filesize / BLCKSZ as usize;
619 :
620 0 : Ok(PgDataDirDbFile {
621 0 : path,
622 0 : filesize,
623 0 : rel_tag,
624 0 : segno,
625 0 : nblocks: Some(nblocks), // first non-cummulative sizes
626 0 : })
627 0 : })
628 : .collect::<anyhow::Result<_, _>>()?;
629 :
630 : // Set cummulative sizes. Do all of that math here, so that later we could easier
631 : // parallelize over segments and know with which segments we need to write relsize
632 : // entry.
633 : let mut cumulative_nblocks: usize = 0;
634 : let mut prev_rel_tag: Option<RelTag> = None;
635 : for i in 0..files.len() {
636 : if prev_rel_tag == Some(files[i].rel_tag) {
637 : cumulative_nblocks += files[i].nblocks.unwrap();
638 : } else {
639 : cumulative_nblocks = files[i].nblocks.unwrap();
640 : }
641 :
642 : files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
643 : Some(cumulative_nblocks)
644 : } else {
645 : None
646 : };
647 :
648 : prev_rel_tag = Some(files[i].rel_tag);
649 : }
650 :
651 : Ok(PgDataDirDb {
652 : files,
653 : path: db_path.clone(),
654 : spcnode,
655 : dboid,
656 : })
657 : }
658 : }
659 :
660 : trait ImportTask {
661 : fn key_range(&self) -> Range<Key>;
662 :
663 0 : fn total_size(&self, shard_identity: &ShardIdentity) -> usize {
664 0 : let range = ShardedRange::new(self.key_range(), shard_identity);
665 0 : let page_count = range.page_count();
666 0 : if page_count == u32::MAX {
667 0 : tracing::warn!(
668 0 : "Import task has non contiguous key range: {}..{}",
669 0 : self.key_range().start,
670 0 : self.key_range().end
671 : );
672 :
673 : // Tasks should operate on contiguous ranges. It is unexpected for
674 : // ranges to violate this assumption. Calling code handles this by mapping
675 : // any task on a non contiguous range to its own image layer.
676 0 : usize::MAX
677 : } else {
678 0 : page_count as usize * 8192
679 : }
680 0 : }
681 :
682 : async fn doit(
683 : self,
684 : layer_writer: &mut ImageLayerWriter,
685 : max_byte_range_size: usize,
686 : ctx: &RequestContext,
687 : ) -> anyhow::Result<usize>;
688 : }
689 :
/// Task that writes a single, already materialized value.
struct ImportSingleKeyTask {
    /// Key the value is written under.
    key: Key,
    /// The value (image) to write.
    buf: Bytes,
}
694 :
695 : impl Hash for ImportSingleKeyTask {
696 0 : fn hash<H: Hasher>(&self, state: &mut H) {
697 0 : let ImportSingleKeyTask { key, buf } = self;
698 :
699 0 : key.hash(state);
700 : // The key value might not have a stable binary representation.
701 : // For instance, the db directory uses an unstable hash-map.
702 : // To work around this we are a bit lax here and only hash the
703 : // size of the buffer which must be consistent.
704 0 : buf.len().hash(state);
705 0 : }
706 : }
707 :
708 : impl ImportSingleKeyTask {
709 0 : fn new(key: Key, buf: Bytes) -> Self {
710 0 : ImportSingleKeyTask { key, buf }
711 0 : }
712 : }
713 :
714 : impl ImportTask for ImportSingleKeyTask {
715 0 : fn key_range(&self) -> Range<Key> {
716 0 : singleton_range(self.key)
717 0 : }
718 :
719 0 : async fn doit(
720 0 : self,
721 0 : layer_writer: &mut ImageLayerWriter,
722 0 : _max_byte_range_size: usize,
723 0 : ctx: &RequestContext,
724 0 : ) -> anyhow::Result<usize> {
725 0 : layer_writer.put_image(self.key, self.buf, ctx).await?;
726 0 : Ok(1)
727 0 : }
728 : }
729 :
/// Task that imports the blocks of one relation segment file.
struct ImportRelBlocksTask {
    /// Used to skip blocks whose keys are disposable for this shard.
    shard_identity: ShardIdentity,
    /// Block keys covered by the segment file (end exclusive).
    key_range: Range<Key>,
    /// Remote path of the segment file.
    path: RemotePath,
    /// Handle used to read the file.
    storage: RemoteStorageWrapper,
}
736 :
737 : impl Hash for ImportRelBlocksTask {
738 0 : fn hash<H: Hasher>(&self, state: &mut H) {
739 : let ImportRelBlocksTask {
740 : shard_identity: _,
741 0 : key_range,
742 0 : path,
743 : storage: _,
744 0 : } = self;
745 :
746 0 : key_range.hash(state);
747 0 : path.hash(state);
748 0 : }
749 : }
750 :
751 : impl ImportRelBlocksTask {
752 0 : fn new(
753 0 : shard_identity: ShardIdentity,
754 0 : key_range: Range<Key>,
755 0 : path: &RemotePath,
756 0 : storage: RemoteStorageWrapper,
757 0 : ) -> Self {
758 0 : ImportRelBlocksTask {
759 0 : shard_identity,
760 0 : key_range,
761 0 : path: path.clone(),
762 0 : storage,
763 0 : }
764 0 : }
765 : }
766 :
767 : impl ImportTask for ImportRelBlocksTask {
768 0 : fn key_range(&self) -> Range<Key> {
769 0 : self.key_range.clone()
770 0 : }
771 :
772 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
773 : async fn doit(
774 : self,
775 : layer_writer: &mut ImageLayerWriter,
776 : max_byte_range_size: usize,
777 : ctx: &RequestContext,
778 : ) -> anyhow::Result<usize> {
779 : debug!("Importing relation file");
780 :
781 : let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
782 : let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
783 : assert_eq!(rel_tag, rel_tag_end);
784 :
785 : let ranges = (start_blk..end_blk)
786 : .enumerate()
787 0 : .filter_map(|(i, blknum)| {
788 0 : let key = rel_block_to_key(rel_tag, blknum);
789 0 : if self.shard_identity.is_key_disposable(&key) {
790 0 : return None;
791 0 : }
792 0 : let file_offset = i.checked_mul(8192).unwrap();
793 0 : Some((
794 0 : vec![key],
795 0 : file_offset,
796 0 : file_offset.checked_add(8192).unwrap(),
797 0 : ))
798 0 : })
799 0 : .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
800 0 : assert_eq!(key.len(), 1);
801 0 : assert!(!acc.is_empty());
802 0 : assert!(acc_end > acc_start);
803 0 : if acc_end == start && end - acc_start <= max_byte_range_size {
804 0 : acc.push(key.pop().unwrap());
805 0 : Ok((acc, acc_start, end))
806 : } else {
807 0 : Err(((acc, acc_start, acc_end), (key, start, end)))
808 : }
809 0 : });
810 :
811 : let mut nimages = 0;
812 : for (keys, range_start, range_end) in ranges {
813 : let range_buf = self
814 : .storage
815 : .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
816 : .await?;
817 : let mut buf = Bytes::from(range_buf);
818 : for key in keys {
819 : // The writer buffers writes internally
820 : let image = buf.split_to(8192);
821 : layer_writer.put_image(key, image, ctx).await?;
822 : nimages += 1;
823 : }
824 : }
825 :
826 : Ok(nimages)
827 : }
828 : }
829 :
/// Task that imports the blocks of one SLRU segment file.
struct ImportSlruBlocksTask {
    /// Block keys covered by the segment file (end exclusive).
    key_range: Range<Key>,
    /// Remote path of the segment file.
    path: RemotePath,
    /// Handle used to read the file.
    storage: RemoteStorageWrapper,
}
835 :
836 : impl Hash for ImportSlruBlocksTask {
837 0 : fn hash<H: Hasher>(&self, state: &mut H) {
838 : let ImportSlruBlocksTask {
839 0 : key_range,
840 0 : path,
841 : storage: _,
842 0 : } = self;
843 :
844 0 : key_range.hash(state);
845 0 : path.hash(state);
846 0 : }
847 : }
848 :
849 : impl ImportSlruBlocksTask {
850 0 : fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
851 0 : ImportSlruBlocksTask {
852 0 : key_range,
853 0 : path: path.clone(),
854 0 : storage,
855 0 : }
856 0 : }
857 : }
858 :
859 : impl ImportTask for ImportSlruBlocksTask {
860 0 : fn key_range(&self) -> Range<Key> {
861 0 : self.key_range.clone()
862 0 : }
863 :
864 0 : async fn doit(
865 0 : self,
866 0 : layer_writer: &mut ImageLayerWriter,
867 0 : _max_byte_range_size: usize,
868 0 : ctx: &RequestContext,
869 0 : ) -> anyhow::Result<usize> {
870 0 : debug!("Importing SLRU segment file {}", self.path);
871 0 : let buf = self.storage.get(&self.path).await?;
872 :
873 : // TODO(vlad): Does timestamp to LSN work for imported timelines?
874 : // Probably not since we don't append the `xact_time` to it as in
875 : // [`WalIngest::ingest_xact_record`].
876 0 : let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
877 0 : let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
878 0 : let mut blknum = start_blk;
879 0 : let mut nimages = 0;
880 0 : let mut file_offset = 0;
881 0 : while blknum < end_blk {
882 0 : let key = slru_block_to_key(kind, segno, blknum);
883 0 : let buf = &buf[file_offset..(file_offset + 8192)];
884 0 : file_offset += 8192;
885 0 : layer_writer
886 0 : .put_image(key, Bytes::copy_from_slice(buf), ctx)
887 0 : .await?;
888 0 : nimages += 1;
889 0 : blknum += 1;
890 : }
891 0 : Ok(nimages)
892 0 : }
893 : }
894 :
/// Closed set of all import task kinds.
///
/// The derived `Hash` contributes to the plan hash, so the variants and their
/// order should be kept stable.
#[derive(Hash)]
enum AnyImportTask {
    /// Writes a single, already materialized value.
    SingleKey(ImportSingleKeyTask),
    /// Imports the blocks of a relation segment file.
    RelBlocks(ImportRelBlocksTask),
    /// Imports the blocks of an SLRU segment file.
    SlruBlocks(ImportSlruBlocksTask),
}
901 :
902 : impl ImportTask for AnyImportTask {
903 0 : fn key_range(&self) -> Range<Key> {
904 0 : match self {
905 0 : Self::SingleKey(t) => t.key_range(),
906 0 : Self::RelBlocks(t) => t.key_range(),
907 0 : Self::SlruBlocks(t) => t.key_range(),
908 : }
909 0 : }
910 : /// returns the number of images put into the `layer_writer`
911 0 : async fn doit(
912 0 : self,
913 0 : layer_writer: &mut ImageLayerWriter,
914 0 : max_byte_range_size: usize,
915 0 : ctx: &RequestContext,
916 0 : ) -> anyhow::Result<usize> {
917 0 : match self {
918 0 : Self::SingleKey(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
919 0 : Self::RelBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
920 0 : Self::SlruBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
921 : }
922 0 : }
923 : }
924 :
925 : impl From<ImportSingleKeyTask> for AnyImportTask {
926 0 : fn from(t: ImportSingleKeyTask) -> Self {
927 0 : Self::SingleKey(t)
928 0 : }
929 : }
930 :
931 : impl From<ImportRelBlocksTask> for AnyImportTask {
932 0 : fn from(t: ImportRelBlocksTask) -> Self {
933 0 : Self::RelBlocks(t)
934 0 : }
935 : }
936 :
937 : impl From<ImportSlruBlocksTask> for AnyImportTask {
938 0 : fn from(t: ImportSlruBlocksTask) -> Self {
939 0 : Self::SlruBlocks(t)
940 0 : }
941 : }
942 :
/// A job that writes the images of a group of tasks into a single image layer.
///
/// The derived `Hash` contributes to the plan hash, so field order and
/// contents matter.
#[derive(Hash)]
struct ChunkProcessingJob {
    /// Key range of the image layer produced by this job.
    range: Range<Key>,
    /// Tasks to run, in order, against the layer writer.
    tasks: Vec<AnyImportTask>,

    /// LSN the image layer is written at.
    pgdata_lsn: Lsn,
}
950 :
951 : impl ChunkProcessingJob {
952 0 : fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
953 0 : assert!(pgdata_lsn.is_valid());
954 0 : Self {
955 0 : range,
956 0 : tasks,
957 0 : pgdata_lsn,
958 0 : }
959 0 : }
960 :
    /// Runs all tasks of the job against a fresh image layer writer and, if any
    /// image was written, registers the resulting layer with the timeline and
    /// schedules its upload.
    ///
    /// A job that writes no images produces no layer and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Fails if creating or finishing the layer fails, if any task fails, if a
    /// layer with the same key already exists in the current generation, or
    /// if updating the layer map or scheduling remote operations fails.
    async fn run(
        self,
        timeline: Arc<Timeline>,
        max_byte_range_size: usize,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let mut writer = ImageLayerWriter::new(
            timeline.conf,
            timeline.timeline_id,
            timeline.tenant_shard_id,
            &self.range,
            self.pgdata_lsn,
            &timeline.gate,
            timeline.cancel.clone(),
            ctx,
        )
        .await?;

        // Tasks run sequentially, in plan order, against the same writer.
        let mut nimages = 0;
        for task in self.tasks {
            nimages += task.doit(&mut writer, max_byte_range_size, ctx).await?;
        }

        let resident_layer = if nimages > 0 {
            let (desc, path) = writer.finish(ctx).await?;

            // Refuse to overwrite a layer that was produced in the current generation.
            {
                let guard = timeline
                    .layers
                    .read(LayerManagerLockHolder::ImportPgData)
                    .await;
                let existing_layer = guard.try_get_from_key(&desc.key());
                if let Some(layer) = existing_layer {
                    if layer.metadata().generation == timeline.generation {
                        return Err(anyhow::anyhow!(
                            "Import attempted to rewrite layer file in the same generation: {}",
                            layer.local_path()
                        ));
                    }
                }
            }

            Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
        } else {
            // dropping the writer cleans up
            return Ok(());
        };

        // The same import job might run multiple times since not each job is checkpointed.
        // Hence, we must support the cases where the layer already exists. We cannot be
        // certain that the existing layer is identical to the new one, so in that case
        // we replace the old layer with the one we just generated.

        let mut guard = timeline
            .layers
            .write(LayerManagerLockHolder::ImportPgData)
            .await;

        let existing_layer = guard
            .try_get_from_key(&resident_layer.layer_desc().key())
            .cloned();
        match existing_layer {
            Some(existing) => {
                // Unlink the remote layer from the index without scheduling its deletion.
                // When `existing_layer` drops [`LayerInner::drop`] will schedule its deletion from
                // remote storage, but that assumes that the layer was unlinked from the index first.
                timeline
                    .remote_client
                    .schedule_unlinking_of_layers_from_index_part(std::iter::once(
                        existing.layer_desc().layer_name(),
                    ))?;

                guard.open_mut()?.rewrite_layers(
                    &[(existing.clone(), resident_layer.clone())],
                    &[],
                    &timeline.metrics,
                );
            }
            None => {
                guard
                    .open_mut()?
                    .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
            }
        }

        // Release the layer manager write lock before scheduling the upload.
        crate::tenant::timeline::drop_layer_manager_wlock(guard);

        timeline
            .remote_client
            .schedule_layer_file_upload(resident_layer)?;

        Ok(())
    }
1054 : }
|