LCOV - differential code coverage report
Current view: top level - safekeeper/src - copy_timeline.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 94.0 % 168 158 10 158
Current Date: 2024-01-09 02:06:09 Functions: 87.5 % 8 7 1 7
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::sync::Arc;
       2                 : 
       3                 : use anyhow::{bail, Result};
       4                 : use camino::Utf8PathBuf;
       5                 : 
       6                 : use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
       7                 : use tokio::{
       8                 :     fs::OpenOptions,
       9                 :     io::{AsyncSeekExt, AsyncWriteExt},
      10                 : };
      11                 : use tracing::{info, warn};
      12                 : use utils::{id::TenantTimelineId, lsn::Lsn};
      13                 : 
      14                 : use crate::{
      15                 :     control_file::{FileStorage, Storage},
      16                 :     pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
      17                 :     safekeeper::SafeKeeperState,
      18                 :     timeline::{Timeline, TimelineError},
      19                 :     wal_backup::copy_s3_segments,
      20                 :     wal_storage::{wal_file_paths, WalReader},
      21                 :     GlobalTimelines, SafeKeeperConf,
      22                 : };
      23                 : 
      24                 : // we don't want to have more than 10 segments on disk after copy, because they take space
      25                 : const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
      26                 : 
      27                 : pub struct Request {
      28                 :     pub source: Arc<Timeline>,
      29                 :     pub until_lsn: Lsn,
      30                 :     pub destination_ttid: TenantTimelineId,
      31                 : }
      32                 : 
      33 CBC          48 : pub async fn handle_request(request: Request) -> Result<()> {
      34              48 :     // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
      35              48 :     //   if LSN will point to the middle of a WAL record, timeline will be in "broken" state
      36              48 : 
      37              48 :     match GlobalTimelines::get(request.destination_ttid) {
      38                 :         // timeline already exists. would be good to check that this timeline is the copy
      39                 :         // of the source timeline, but it isn't obvious how to do that
      40 UBC           0 :         Ok(_) => return Ok(()),
      41                 :         // timeline not found, we are going to create it
      42 CBC          48 :         Err(TimelineError::NotFound(_)) => {}
      43                 :         // error, probably timeline was deleted
      44 UBC           0 :         res => {
      45               0 :             res?;
      46                 :         }
      47                 :     }
      48                 : 
      49 CBC          48 :     let conf = &GlobalTimelines::get_global_config();
      50              48 :     let ttid = request.destination_ttid;
      51                 : 
      52              48 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
      53                 : 
      54              48 :     let (mem_state, state) = request.source.get_state().await;
      55              48 :     let start_lsn = state.timeline_start_lsn;
      56              48 :     if start_lsn == Lsn::INVALID {
      57 UBC           0 :         bail!("timeline is not initialized");
      58 CBC          48 :     }
      59              48 :     let backup_lsn = mem_state.backup_lsn;
      60              48 : 
      61              48 :     {
      62              48 :         let commit_lsn = mem_state.commit_lsn;
      63              48 :         let flush_lsn = request.source.get_flush_lsn().await;
      64                 : 
      65              48 :         info!(
      66              48 :             "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
      67              48 :             start_lsn, backup_lsn, commit_lsn, flush_lsn
      68              48 :         );
      69                 : 
      70              48 :         assert!(backup_lsn >= start_lsn);
      71              48 :         assert!(commit_lsn >= start_lsn);
      72              48 :         assert!(flush_lsn >= start_lsn);
      73                 : 
      74              48 :         if request.until_lsn > flush_lsn {
      75 UBC           0 :             bail!("requested LSN is beyond the end of the timeline");
      76 CBC          48 :         }
      77              48 :         if request.until_lsn < start_lsn {
      78 UBC           0 :             bail!("requested LSN is before the start of the timeline");
      79 CBC          48 :         }
      80              48 : 
      81              48 :         if request.until_lsn > commit_lsn {
      82 UBC           0 :             warn!("copy_timeline WAL is not fully committed");
      83 CBC          48 :         }
      84                 : 
      85              48 :         if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
      86                 :             // we have a lot of segments that are not backed up. we can try to wait here until
      87                 :             // segments will be backed up to remote storage, but it's not clear how long to wait
      88 UBC           0 :             bail!("too many segments are not backed up");
      89 CBC          48 :         }
      90              48 :     }
      91              48 : 
      92              48 :     let wal_seg_size = state.server.wal_seg_size as usize;
      93              48 :     if wal_seg_size == 0 {
      94 UBC           0 :         bail!("wal_seg_size is not set");
      95 CBC          48 :     }
      96              48 : 
      97              48 :     let first_segment = start_lsn.segment_number(wal_seg_size);
      98              48 :     let last_segment = request.until_lsn.segment_number(wal_seg_size);
      99                 : 
     100              48 :     let new_backup_lsn = {
     101                 :         // we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
     102              48 :         let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
     103              48 : 
     104              48 :         if max_backup_lsn <= start_lsn {
     105                 :             // probably we are starting from the first segment, which was not backed up yet.
     106                 :             // note that start_lsn can be in the middle of the segment
     107              33 :             start_lsn
     108                 :         } else {
     109                 :             // we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
     110              15 :             assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
     111              15 :             max_backup_lsn
     112                 :         }
     113                 :     };
     114                 : 
     115                 :     // all previous segments will be copied inside S3
     116              48 :     let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
     117              48 :     assert!(first_ondisk_segment <= last_segment);
     118              48 :     assert!(first_ondisk_segment >= first_segment);
     119                 : 
     120              48 :     copy_s3_segments(
     121              48 :         wal_seg_size,
     122              48 :         &request.source.ttid,
     123              48 :         &request.destination_ttid,
     124              48 :         first_segment,
     125              48 :         first_ondisk_segment,
     126              48 :     )
     127             238 :     .await?;
     128                 : 
     129              48 :     copy_disk_segments(
     130              48 :         conf,
     131              48 :         &state,
     132              48 :         wal_seg_size,
     133              48 :         &request.source.ttid,
     134              48 :         new_backup_lsn,
     135              48 :         request.until_lsn,
     136              48 :         &tli_dir_path,
     137              48 :     )
     138            1914 :     .await?;
     139                 : 
     140              48 :     let mut new_state = SafeKeeperState::new(
     141              48 :         &request.destination_ttid,
     142              48 :         state.server.clone(),
     143              48 :         vec![],
     144              48 :         request.until_lsn,
     145              48 :         start_lsn,
     146              48 :     );
     147              48 :     new_state.timeline_start_lsn = start_lsn;
     148              48 :     new_state.peer_horizon_lsn = request.until_lsn;
     149              48 :     new_state.backup_lsn = new_backup_lsn;
     150                 : 
     151              48 :     let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
     152             141 :     file_storage.persist(&new_state).await?;
     153                 : 
     154                 :     // now we have a ready timeline in a temp directory
     155              48 :     validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
     156              95 :     load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
     157                 : 
     158              48 :     Ok(())
     159              48 : }
     160                 : 
     161              48 : async fn copy_disk_segments(
     162              48 :     conf: &SafeKeeperConf,
     163              48 :     persisted_state: &SafeKeeperState,
     164              48 :     wal_seg_size: usize,
     165              48 :     source_ttid: &TenantTimelineId,
     166              48 :     start_lsn: Lsn,
     167              48 :     end_lsn: Lsn,
     168              48 :     tli_dir_path: &Utf8PathBuf,
     169              48 : ) -> Result<()> {
     170              48 :     let mut wal_reader = WalReader::new(
     171              48 :         conf.workdir.clone(),
     172              48 :         conf.timeline_dir(source_ttid),
     173              48 :         persisted_state,
     174              48 :         start_lsn,
     175              48 :         true,
     176              48 :     )?;
     177                 : 
     178              48 :     let mut buf = [0u8; MAX_SEND_SIZE];
     179              48 : 
     180              48 :     let first_segment = start_lsn.segment_number(wal_seg_size);
     181              48 :     let last_segment = end_lsn.segment_number(wal_seg_size);
     182                 : 
     183              48 :     for segment in first_segment..=last_segment {
     184              48 :         let segment_start = segment * wal_seg_size as u64;
     185              48 :         let segment_end = segment_start + wal_seg_size as u64;
     186              48 : 
     187              48 :         let copy_start = segment_start.max(start_lsn.0);
     188              48 :         let copy_end = segment_end.min(end_lsn.0);
     189              48 : 
     190              48 :         let copy_start = copy_start - segment_start;
     191              48 :         let copy_end = copy_end - segment_start;
     192                 : 
     193              48 :         let wal_file_path = {
     194              48 :             let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
     195                 : 
     196              48 :             if segment == last_segment {
     197              48 :                 partial
     198                 :             } else {
     199 UBC           0 :                 normal
     200                 :             }
     201                 :         };
     202                 : 
     203 CBC          48 :         write_segment(
     204              48 :             &mut buf,
     205              48 :             &wal_file_path,
     206              48 :             wal_seg_size as u64,
     207              48 :             copy_start,
     208              48 :             copy_end,
     209              48 :             &mut wal_reader,
     210              48 :         )
     211            1914 :         .await?;
     212                 :     }
     213                 : 
     214              48 :     Ok(())
     215              48 : }
     216                 : 
     217              48 : async fn write_segment(
     218              48 :     buf: &mut [u8],
     219              48 :     file_path: &Utf8PathBuf,
     220              48 :     wal_seg_size: u64,
     221              48 :     from: u64,
     222              48 :     to: u64,
     223              48 :     reader: &mut WalReader,
     224              48 : ) -> Result<()> {
     225              48 :     assert!(from <= to);
     226              48 :     assert!(to <= wal_seg_size);
     227                 : 
     228              48 :     let mut file = OpenOptions::new()
     229              48 :         .create(true)
     230              48 :         .write(true)
     231              48 :         .open(&file_path)
     232              47 :         .await?;
     233                 : 
     234                 :     // maybe fill with zeros, as in wal_storage.rs?
     235              48 :     file.set_len(wal_seg_size).await?;
     236              48 :     file.seek(std::io::SeekFrom::Start(from)).await?;
     237                 : 
     238              48 :     let mut bytes_left = to - from;
     239            1293 :     while bytes_left > 0 {
     240            1245 :         let len = bytes_left as usize;
     241            1245 :         let len = len.min(buf.len());
     242            1310 :         let len = reader.read(&mut buf[..len]).await?;
     243            1245 :         file.write_all(&buf[..len]).await?;
     244            1245 :         bytes_left -= len as u64;
     245                 :     }
     246                 : 
     247              48 :     file.flush().await?;
     248              48 :     file.sync_all().await?;
     249              48 :     Ok(())
     250              48 : }
        

Generated by: LCOV version 2.1-beta