LCOV - code coverage report
Current view: top level - safekeeper/src - copy_timeline.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 94.0 % 168 158
Test Date: 2024-02-12 20:26:03 Functions: 87.5 % 8 7

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use anyhow::{bail, Result};
       4              : use camino::Utf8PathBuf;
       5              : 
       6              : use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
       7              : use tokio::{
       8              :     fs::OpenOptions,
       9              :     io::{AsyncSeekExt, AsyncWriteExt},
      10              : };
      11              : use tracing::{info, warn};
      12              : use utils::{id::TenantTimelineId, lsn::Lsn};
      13              : 
      14              : use crate::{
      15              :     control_file::{FileStorage, Storage},
      16              :     pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
      17              :     state::TimelinePersistentState,
      18              :     timeline::{Timeline, TimelineError},
      19              :     wal_backup::copy_s3_segments,
      20              :     wal_storage::{wal_file_paths, WalReader},
      21              :     GlobalTimelines, SafeKeeperConf,
      22              : };
      23              : 
      24              : // Largest allowed gap, in bytes (10 WAL segments), between the source's backup_lsn and the requested LSN; a copy past this limit is refused so that at most ~10 segments have to be kept on disk after the copy.
      25              : const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
      26              : 
      27              : pub struct Request { // parameters of one timeline-copy operation, consumed by handle_request
      28              :     pub source: Arc<Timeline>, // timeline whose state and WAL are copied
      29              :     pub until_lsn: Lsn, // WAL is copied up to this LSN; must lie within [timeline start, flush LSN] of the source
      30              :     pub destination_ttid: TenantTimelineId, // id of the timeline to create; if it already exists the request is a no-op
      31              : }
      32              : 
      33           48 : pub async fn handle_request(request: Request) -> Result<()> {
      34           48 :     // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
      35           48 :     //   if LSN will point to the middle of a WAL record, timeline will be in "broken" state
      36           48 : 
      37           48 :     match GlobalTimelines::get(request.destination_ttid) {
      38              :         // timeline already exists. would be good to check that this timeline is the copy
      39              :         // of the source timeline, but it isn't obvious how to do that
      40            0 :         Ok(_) => return Ok(()),
      41              :         // timeline not found, we are going to create it
      42           48 :         Err(TimelineError::NotFound(_)) => {}
      43              :         // error, probably timeline was deleted
      44            0 :         res => {
      45            0 :             res?;
      46              :         }
      47              :     }
      48              : 
      49           48 :     let conf = &GlobalTimelines::get_global_config();
      50           48 :     let ttid = request.destination_ttid;
      51              : 
      52           48 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
      53              : 
      54           48 :     let (mem_state, state) = request.source.get_state().await;
      55           48 :     let start_lsn = state.timeline_start_lsn;
      56           48 :     if start_lsn == Lsn::INVALID {
      57            0 :         bail!("timeline is not initialized");
      58           48 :     }
      59           48 :     let backup_lsn = mem_state.backup_lsn;
      60           48 : 
      61           48 :     {
      62           48 :         let commit_lsn = mem_state.commit_lsn;
      63           48 :         let flush_lsn = request.source.get_flush_lsn().await;
      64              : 
      65           48 :         info!(
      66           48 :             "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
      67           48 :             start_lsn, backup_lsn, commit_lsn, flush_lsn
      68           48 :         );
      69              : 
      70           48 :         assert!(backup_lsn >= start_lsn);
      71           48 :         assert!(commit_lsn >= start_lsn);
      72           48 :         assert!(flush_lsn >= start_lsn);
      73              : 
      74           48 :         if request.until_lsn > flush_lsn {
      75            0 :             bail!("requested LSN is beyond the end of the timeline");
      76           48 :         }
      77           48 :         if request.until_lsn < start_lsn {
      78            0 :             bail!("requested LSN is before the start of the timeline");
      79           48 :         }
      80           48 : 
      81           48 :         if request.until_lsn > commit_lsn {
      82            0 :             warn!("copy_timeline WAL is not fully committed");
      83           48 :         }
      84              : 
      85           48 :         if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
      86              :             // we have a lot of segments that are not backed up. we can try to wait here until
      87              :             // segments will be backed up to remote storage, but it's not clear how long to wait
      88            0 :             bail!("too many segments are not backed up");
      89           48 :         }
      90           48 :     }
      91           48 : 
      92           48 :     let wal_seg_size = state.server.wal_seg_size as usize;
      93           48 :     if wal_seg_size == 0 {
      94            0 :         bail!("wal_seg_size is not set");
      95           48 :     }
      96           48 : 
      97           48 :     let first_segment = start_lsn.segment_number(wal_seg_size);
      98           48 :     let last_segment = request.until_lsn.segment_number(wal_seg_size);
      99              : 
     100           48 :     let new_backup_lsn = {
     101              :         // we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
     102           48 :         let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
     103           48 : 
     104           48 :         if max_backup_lsn <= start_lsn {
     105              :             // probably we are starting from the first segment, which was not backed up yet.
     106              :             // note that start_lsn can be in the middle of the segment
     107           33 :             start_lsn
     108              :         } else {
     109              :             // we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
     110           15 :             assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
     111           15 :             max_backup_lsn
     112              :         }
     113              :     };
     114              : 
     115              :     // all previous segments will be copied inside S3
     116           48 :     let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
     117           48 :     assert!(first_ondisk_segment <= last_segment);
     118           48 :     assert!(first_ondisk_segment >= first_segment);
     119              : 
     120           48 :     copy_s3_segments(
     121           48 :         wal_seg_size,
     122           48 :         &request.source.ttid,
     123           48 :         &request.destination_ttid,
     124           48 :         first_segment,
     125           48 :         first_ondisk_segment,
     126           48 :     )
     127          239 :     .await?;
     128              : 
     129           48 :     copy_disk_segments(
     130           48 :         conf,
     131           48 :         &state,
     132           48 :         wal_seg_size,
     133           48 :         &request.source.ttid,
     134           48 :         new_backup_lsn,
     135           48 :         request.until_lsn,
     136           48 :         &tli_dir_path,
     137           48 :     )
     138         2128 :     .await?;
     139              : 
     140           48 :     let mut new_state = TimelinePersistentState::new(
     141           48 :         &request.destination_ttid,
     142           48 :         state.server.clone(),
     143           48 :         vec![],
     144           48 :         request.until_lsn,
     145           48 :         start_lsn,
     146           48 :     );
     147           48 :     new_state.timeline_start_lsn = start_lsn;
     148           48 :     new_state.peer_horizon_lsn = request.until_lsn;
     149           48 :     new_state.backup_lsn = new_backup_lsn;
     150              : 
     151           48 :     let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
     152          144 :     file_storage.persist(&new_state).await?;
     153              : 
     154              :     // now we have a ready timeline in a temp directory
     155           48 :     validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
     156           95 :     load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
     157              : 
     158           48 :     Ok(())
     159           48 : }
     160              : 
     161           48 : async fn copy_disk_segments(
     162           48 :     conf: &SafeKeeperConf,
     163           48 :     persisted_state: &TimelinePersistentState,
     164           48 :     wal_seg_size: usize,
     165           48 :     source_ttid: &TenantTimelineId,
     166           48 :     start_lsn: Lsn,
     167           48 :     end_lsn: Lsn,
     168           48 :     tli_dir_path: &Utf8PathBuf,
     169           48 : ) -> Result<()> {
     170           48 :     let mut wal_reader = WalReader::new(
     171           48 :         conf.workdir.clone(),
     172           48 :         conf.timeline_dir(source_ttid),
     173           48 :         persisted_state,
     174           48 :         start_lsn,
     175           48 :         true,
     176           48 :     )?;
     177              : 
     178           48 :     let mut buf = [0u8; MAX_SEND_SIZE];
     179           48 : 
     180           48 :     let first_segment = start_lsn.segment_number(wal_seg_size);
     181           48 :     let last_segment = end_lsn.segment_number(wal_seg_size);
     182              : 
     183           48 :     for segment in first_segment..=last_segment {
     184           48 :         let segment_start = segment * wal_seg_size as u64;
     185           48 :         let segment_end = segment_start + wal_seg_size as u64;
     186           48 : 
     187           48 :         let copy_start = segment_start.max(start_lsn.0);
     188           48 :         let copy_end = segment_end.min(end_lsn.0);
     189           48 : 
     190           48 :         let copy_start = copy_start - segment_start;
     191           48 :         let copy_end = copy_end - segment_start;
     192              : 
     193           48 :         let wal_file_path = {
     194           48 :             let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
     195              : 
     196           48 :             if segment == last_segment {
     197           48 :                 partial
     198              :             } else {
     199            0 :                 normal
     200              :             }
     201              :         };
     202              : 
     203           48 :         write_segment(
     204           48 :             &mut buf,
     205           48 :             &wal_file_path,
     206           48 :             wal_seg_size as u64,
     207           48 :             copy_start,
     208           48 :             copy_end,
     209           48 :             &mut wal_reader,
     210           48 :         )
     211         2128 :         .await?;
     212              :     }
     213              : 
     214           48 :     Ok(())
     215           48 : }
     216              : 
     217           48 : async fn write_segment(
     218           48 :     buf: &mut [u8],
     219           48 :     file_path: &Utf8PathBuf,
     220           48 :     wal_seg_size: u64,
     221           48 :     from: u64,
     222           48 :     to: u64,
     223           48 :     reader: &mut WalReader,
     224           48 : ) -> Result<()> {
     225           48 :     assert!(from <= to);
     226           48 :     assert!(to <= wal_seg_size);
     227              : 
     228           48 :     let mut file = OpenOptions::new()
     229           48 :         .create(true)
     230           48 :         .write(true)
     231           48 :         .open(&file_path)
     232           47 :         .await?;
     233              : 
     234              :     // maybe fill with zeros, as in wal_storage.rs?
     235           48 :     file.set_len(wal_seg_size).await?;
     236           48 :     file.seek(std::io::SeekFrom::Start(from)).await?;
     237              : 
     238           48 :     let mut bytes_left = to - from;
     239         1287 :     while bytes_left > 0 {
     240         1239 :         let len = bytes_left as usize;
     241         1239 :         let len = len.min(buf.len());
     242         1350 :         let len = reader.read(&mut buf[..len]).await?;
     243         1239 :         file.write_all(&buf[..len]).await?;
     244         1239 :         bytes_left -= len as u64;
     245              :     }
     246              : 
     247           48 :     file.flush().await?;
     248           48 :     file.sync_all().await?;
     249           48 :     Ok(())
     250           48 : }
        

Generated by: LCOV version 2.1-beta