LCOV - code coverage report
Current view: top level - safekeeper/src - copy_timeline.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 153 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 6 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use anyhow::{Result, bail};
       4              : use camino::Utf8PathBuf;
       5              : use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
       6              : use remote_storage::GenericRemoteStorage;
       7              : use safekeeper_api::membership::Configuration;
       8              : use tokio::fs::OpenOptions;
       9              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      10              : use tracing::{info, warn};
      11              : use utils::id::TenantTimelineId;
      12              : use utils::lsn::Lsn;
      13              : 
      14              : use crate::GlobalTimelines;
      15              : use crate::control_file::FileStorage;
      16              : use crate::state::TimelinePersistentState;
      17              : use crate::timeline::{TimelineError, WalResidentTimeline};
      18              : use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
      19              : use crate::wal_backup::copy_s3_segments;
      20              : use crate::wal_storage::{WalReader, wal_file_paths};
      21              : 
      22              : // we don't want to have more than 10 segments on disk after copy, because they take space
      23              : const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
      24              : 
      25              : pub struct Request {
      26              :     pub source_ttid: TenantTimelineId,
      27              :     pub until_lsn: Lsn,
      28              :     pub destination_ttid: TenantTimelineId,
      29              : }
      30              : 
      31            0 : pub async fn handle_request(
      32            0 :     request: Request,
      33            0 :     global_timelines: Arc<GlobalTimelines>,
      34            0 :     storage: Arc<GenericRemoteStorage>,
      35            0 : ) -> Result<()> {
      36              :     // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
      37              :     //   if LSN will point to the middle of a WAL record, timeline will be in "broken" state
      38              : 
      39            0 :     match global_timelines.get(request.destination_ttid) {
      40              :         // timeline already exists. would be good to check that this timeline is the copy
      41              :         // of the source timeline, but it isn't obvious how to do that
      42            0 :         Ok(_) => return Ok(()),
      43              :         // timeline not found, we are going to create it
      44            0 :         Err(TimelineError::NotFound(_)) => {}
      45              :         // error, probably timeline was deleted
      46            0 :         res => {
      47            0 :             res?;
      48              :         }
      49              :     }
      50              : 
      51            0 :     let source = global_timelines.get(request.source_ttid)?;
      52            0 :     let source_tli = source.wal_residence_guard().await?;
      53              : 
      54            0 :     let conf = &global_timelines.get_global_config();
      55            0 :     let ttid = request.destination_ttid;
      56              : 
      57            0 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
      58              : 
      59            0 :     let (mem_state, state) = source_tli.get_state().await;
      60            0 :     let start_lsn = state.timeline_start_lsn;
      61            0 :     if start_lsn == Lsn::INVALID {
      62            0 :         bail!("timeline is not initialized");
      63            0 :     }
      64            0 :     let backup_lsn = mem_state.backup_lsn;
      65              : 
      66              :     {
      67            0 :         let commit_lsn = mem_state.commit_lsn;
      68            0 :         let flush_lsn = source_tli.get_flush_lsn().await;
      69              : 
      70            0 :         info!(
      71            0 :             "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
      72              :             start_lsn, backup_lsn, commit_lsn, flush_lsn
      73              :         );
      74              : 
      75            0 :         assert!(backup_lsn >= start_lsn);
      76            0 :         assert!(commit_lsn >= start_lsn);
      77            0 :         assert!(flush_lsn >= start_lsn);
      78              : 
      79            0 :         if request.until_lsn > flush_lsn {
      80            0 :             bail!(format!(
      81            0 :                 "requested LSN {} is beyond the end of the timeline {}",
      82              :                 request.until_lsn, flush_lsn
      83              :             ));
      84            0 :         }
      85            0 :         if request.until_lsn < start_lsn {
      86            0 :             bail!(format!(
      87            0 :                 "requested LSN {} is before the start of the timeline {}",
      88              :                 request.until_lsn, start_lsn
      89              :             ));
      90            0 :         }
      91              : 
      92            0 :         if request.until_lsn > commit_lsn {
      93            0 :             warn!("copy_timeline WAL is not fully committed");
      94            0 :         }
      95              : 
      96            0 :         if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
      97              :             // we have a lot of segments that are not backed up. we can try to wait here until
      98              :             // segments will be backed up to remote storage, but it's not clear how long to wait
      99            0 :             bail!("too many segments are not backed up");
     100            0 :         }
     101              :     }
     102              : 
     103            0 :     let wal_seg_size = state.server.wal_seg_size as usize;
     104            0 :     if wal_seg_size == 0 {
     105            0 :         bail!("wal_seg_size is not set");
     106            0 :     }
     107              : 
     108            0 :     let first_segment = start_lsn.segment_number(wal_seg_size);
     109            0 :     let last_segment = request.until_lsn.segment_number(wal_seg_size);
     110              : 
     111            0 :     let new_backup_lsn = {
     112              :         // we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
     113            0 :         let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
     114              : 
     115            0 :         if max_backup_lsn <= start_lsn {
     116              :             // probably we are starting from the first segment, which was not backed up yet.
     117              :             // note that start_lsn can be in the middle of the segment
     118            0 :             start_lsn
     119              :         } else {
     120              :             // we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
     121            0 :             assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
     122            0 :             max_backup_lsn
     123              :         }
     124              :     };
     125              : 
     126              :     // all previous segments will be copied inside S3
     127            0 :     let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
     128            0 :     assert!(first_ondisk_segment <= last_segment);
     129            0 :     assert!(first_ondisk_segment >= first_segment);
     130              : 
     131            0 :     copy_s3_segments(
     132            0 :         &storage,
     133            0 :         wal_seg_size,
     134            0 :         &request.source_ttid,
     135            0 :         &request.destination_ttid,
     136            0 :         first_segment,
     137            0 :         first_ondisk_segment,
     138            0 :     )
     139            0 :     .await?;
     140              : 
     141            0 :     copy_disk_segments(
     142            0 :         &source_tli,
     143            0 :         wal_seg_size,
     144            0 :         new_backup_lsn,
     145            0 :         request.until_lsn,
     146            0 :         &tli_dir_path,
     147            0 :     )
     148            0 :     .await?;
     149              : 
     150            0 :     let mut new_state = TimelinePersistentState::new(
     151            0 :         &request.destination_ttid,
     152            0 :         Configuration::empty(),
     153            0 :         state.server.clone(),
     154            0 :         start_lsn,
     155            0 :         request.until_lsn,
     156            0 :     )?;
     157            0 :     new_state.timeline_start_lsn = start_lsn;
     158            0 :     new_state.peer_horizon_lsn = request.until_lsn;
     159            0 :     new_state.backup_lsn = new_backup_lsn;
     160              : 
     161            0 :     FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
     162              : 
     163              :     // now we have a ready timeline in a temp directory
     164            0 :     validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
     165            0 :     global_timelines
     166            0 :         .load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
     167            0 :         .await?;
     168              : 
     169            0 :     Ok(())
     170            0 : }
     171              : 
     172            0 : async fn copy_disk_segments(
     173            0 :     tli: &WalResidentTimeline,
     174            0 :     wal_seg_size: usize,
     175            0 :     start_lsn: Lsn,
     176            0 :     end_lsn: Lsn,
     177            0 :     tli_dir_path: &Utf8PathBuf,
     178            0 : ) -> Result<()> {
     179            0 :     let mut wal_reader = tli.get_walreader(start_lsn).await?;
     180              : 
     181            0 :     let mut buf = vec![0u8; MAX_SEND_SIZE];
     182              : 
     183            0 :     let first_segment = start_lsn.segment_number(wal_seg_size);
     184            0 :     let last_segment = end_lsn.segment_number(wal_seg_size);
     185              : 
     186            0 :     for segment in first_segment..=last_segment {
     187            0 :         let segment_start = segment * wal_seg_size as u64;
     188            0 :         let segment_end = segment_start + wal_seg_size as u64;
     189              : 
     190            0 :         let copy_start = segment_start.max(start_lsn.0);
     191            0 :         let copy_end = segment_end.min(end_lsn.0);
     192              : 
     193            0 :         let copy_start = copy_start - segment_start;
     194            0 :         let copy_end = copy_end - segment_start;
     195              : 
     196            0 :         let wal_file_path = {
     197            0 :             let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size);
     198              : 
     199            0 :             if segment == last_segment {
     200            0 :                 partial
     201              :             } else {
     202            0 :                 normal
     203              :             }
     204              :         };
     205              : 
     206            0 :         write_segment(
     207            0 :             &mut buf,
     208            0 :             &wal_file_path,
     209            0 :             wal_seg_size as u64,
     210            0 :             copy_start,
     211            0 :             copy_end,
     212            0 :             &mut wal_reader,
     213            0 :         )
     214            0 :         .await?;
     215              :     }
     216              : 
     217            0 :     Ok(())
     218            0 : }
     219              : 
     220            0 : async fn write_segment(
     221            0 :     buf: &mut [u8],
     222            0 :     file_path: &Utf8PathBuf,
     223            0 :     wal_seg_size: u64,
     224            0 :     from: u64,
     225            0 :     to: u64,
     226            0 :     reader: &mut WalReader,
     227            0 : ) -> Result<()> {
     228            0 :     assert!(from <= to);
     229            0 :     assert!(to <= wal_seg_size);
     230              : 
     231              :     #[allow(clippy::suspicious_open_options)]
     232            0 :     let mut file = OpenOptions::new()
     233            0 :         .create(true)
     234            0 :         .write(true)
     235            0 :         .open(&file_path)
     236            0 :         .await?;
     237              : 
     238              :     // maybe fill with zeros, as in wal_storage.rs?
     239            0 :     file.set_len(wal_seg_size).await?;
     240            0 :     file.seek(std::io::SeekFrom::Start(from)).await?;
     241              : 
     242            0 :     let mut bytes_left = to - from;
     243            0 :     while bytes_left > 0 {
     244            0 :         let len = bytes_left as usize;
     245            0 :         let len = len.min(buf.len());
     246            0 :         let len = reader.read(&mut buf[..len]).await?;
     247            0 :         file.write_all(&buf[..len]).await?;
     248            0 :         bytes_left -= len as u64;
     249              :     }
     250              : 
     251            0 :     file.flush().await?;
     252            0 :     file.sync_all().await?;
     253            0 :     Ok(())
     254            0 : }
        

Generated by: LCOV version 2.1-beta