LCOV - differential code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.9 % 158 150 8 150
Current Date: 2023-10-19 02:04:12 Functions: 61.3 % 31 19 12 19
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use serde::{Deserialize, Serialize};
       2                 : 
       3                 : use anyhow::{bail, Context, Result};
       4                 : use tokio::io::AsyncWriteExt;
       5                 : use tracing::info;
       6                 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
       7                 : 
       8                 : use serde_with::{serde_as, DisplayFromStr};
       9                 : 
      10                 : use crate::{
      11                 :     control_file, debug_dump,
      12                 :     http::routes::TimelineStatus,
      13                 :     wal_storage::{self, Storage},
      14                 :     GlobalTimelines,
      15                 : };
      16                 : 
/// Request to pull a timeline onto this safekeeper; the input of [`handle_request`].
///
/// `tenant_id` and `timeline_id` are (de)serialized through their
/// `Display`/`FromStr` implementations (`DisplayFromStr`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
    /// Tenant owning the timeline to pull.
    #[serde_as(as = "DisplayFromStr")]
    pub tenant_id: TenantId,
    /// Timeline to pull.
    #[serde_as(as = "DisplayFromStr")]
    pub timeline_id: TimelineId,
    /// Base HTTP URLs of candidate donor safekeepers. `handle_request` appends
    /// `/v1/tenant/{tenant_id}/timeline/{timeline_id}` to each entry, so every
    /// entry is used as a URL prefix.
    pub http_hosts: Vec<String>,
}
      27                 : 
/// Result of a successful pull, returned by [`handle_request`].
#[derive(Debug, Serialize)]
pub struct Response {
    /// Donor safekeeper host: the entry of `Request::http_hosts` the timeline
    /// was downloaded from.
    pub safekeeper_host: String,
    // TODO: add more fields?
}
      34                 : 
      35                 : /// Find the most advanced safekeeper and pull timeline from it.
      36               1 : pub async fn handle_request(request: Request) -> Result<Response> {
      37               1 :     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
      38               1 :         request.tenant_id,
      39               1 :         request.timeline_id,
      40               1 :     ));
      41               1 :     if existing_tli.is_ok() {
      42 UBC           0 :         bail!("Timeline {} already exists", request.timeline_id);
      43 CBC           1 :     }
      44               1 : 
      45               1 :     let client = reqwest::Client::new();
      46               1 :     let http_hosts = request.http_hosts.clone();
      47                 : 
      48                 :     // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
      49               2 :     let responses = futures::future::join_all(http_hosts.iter().map(|url| {
      50               2 :         let url = format!(
      51               2 :             "{}/v1/tenant/{}/timeline/{}",
      52               2 :             url, request.tenant_id, request.timeline_id
      53               2 :         );
      54               2 :         client.get(url).send()
      55               2 :     }))
      56               4 :     .await;
      57                 : 
      58               1 :     let mut statuses = Vec::new();
      59               2 :     for (i, response) in responses.into_iter().enumerate() {
      60               2 :         let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
      61               2 :         let status: crate::http::routes::TimelineStatus = response.json().await?;
      62               2 :         statuses.push((status, i));
      63                 :     }
      64                 : 
      65                 :     // Find the most advanced safekeeper
      66                 :     // TODO: current logic may be wrong, fix it later
      67               1 :     let (status, i) = statuses
      68               1 :         .into_iter()
      69               2 :         .max_by_key(|(status, _)| {
      70               2 :             (
      71               2 :                 status.acceptor_state.epoch,
      72               2 :                 status.flush_lsn,
      73               2 :                 status.commit_lsn,
      74               2 :             )
      75               2 :         })
      76               1 :         .unwrap();
      77               1 :     let safekeeper_host = http_hosts[i].clone();
      78               1 : 
      79               1 :     assert!(status.tenant_id == request.tenant_id);
      80               1 :     assert!(status.timeline_id == request.timeline_id);
      81                 : 
      82             109 :     pull_timeline(status, safekeeper_host).await
      83               1 : }
      84                 : 
      85               1 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
      86               1 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
      87               1 :     info!(
      88               1 :         "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
      89               1 :         ttid,
      90               1 :         host,
      91               1 :         status.commit_lsn,
      92               1 :         status.flush_lsn,
      93               1 :         status.acceptor_state.term,
      94               1 :         status.acceptor_state.epoch
      95               1 :     );
      96                 : 
      97               1 :     let conf = &GlobalTimelines::get_global_config();
      98               1 : 
      99               1 :     let client = reqwest::Client::new();
     100                 :     // TODO: don't use debug dump, it should be used only in tests.
     101                 :     //      This is a proof of concept, we should figure out a way
     102                 :     //      to use scp without implementing it manually.
     103                 : 
     104                 :     // Implementing our own scp over HTTP.
     105                 :     // At first, we need to fetch list of files from safekeeper.
     106               1 :     let dump: debug_dump::Response = client
     107               1 :         .get(format!(
     108               1 :             "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
     109               1 :             host, status.tenant_id, status.timeline_id
     110               1 :         ))
     111               1 :         .send()
     112               5 :         .await?
     113               1 :         .json()
     114 UBC           0 :         .await?;
     115                 : 
     116 CBC           1 :     if dump.timelines.len() != 1 {
     117 UBC           0 :         bail!(
     118               0 :             "Expected to fetch single timeline, got {} timelines",
     119               0 :             dump.timelines.len()
     120               0 :         );
     121 CBC           1 :     }
     122               1 : 
     123               1 :     let timeline = dump.timelines.into_iter().next().unwrap();
     124               1 :     let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
     125               1 :         "Timeline {} doesn't have disk content",
     126               1 :         ttid
     127               1 :     ))?;
     128                 : 
     129               1 :     let mut filenames = disk_content
     130               1 :         .files
     131               1 :         .iter()
     132               3 :         .map(|file| file.name.clone())
     133               1 :         .collect::<Vec<_>>();
     134               1 : 
     135               1 :     // Sort filenames to make sure we pull files in correct order
     136               1 :     // After sorting, we should have:
     137               1 :     // - 000000010000000000000001
     138               1 :     // - ...
     139               1 :     // - 000000010000000000000002.partial
     140               1 :     // - safekeeper.control
     141               1 :     filenames.sort();
     142                 : 
     143                 :     // safekeeper.control should be the first file, so we need to move it to the beginning
     144               1 :     let control_file_index = filenames
     145               1 :         .iter()
     146               3 :         .position(|name| name == "safekeeper.control")
     147               1 :         .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
     148               1 :     filenames.remove(control_file_index);
     149               1 :     filenames.insert(0, "safekeeper.control".to_string());
     150                 : 
     151               1 :     info!(
     152               1 :         "Downloading {} files from safekeeper {}",
     153               1 :         filenames.len(),
     154               1 :         host
     155               1 :     );
     156                 : 
     157                 :     // Creating temp directory for a new timeline. It needs to be
     158                 :     // located on the same filesystem as the rest of the timelines.
     159                 : 
     160                 :     // conf.workdir is usually /storage/safekeeper/data
     161                 :     // will try to transform it into /storage/safekeeper/tmp
     162               1 :     let temp_base = conf
     163               1 :         .workdir
     164               1 :         .parent()
     165               1 :         .ok_or(anyhow::anyhow!("workdir has no parent"))?
     166               1 :         .join("tmp");
     167               1 : 
     168               1 :     tokio::fs::create_dir_all(&temp_base).await?;
     169                 : 
     170               1 :     let tli_dir = camino_tempfile::Builder::new()
     171               1 :         .suffix("_temptli")
     172               1 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     173               1 :         .tempdir_in(temp_base)?;
     174               1 :     let tli_dir_path = tli_dir.path().to_path_buf();
     175                 : 
     176                 :     // Note: some time happens between fetching list of files and fetching files themselves.
     177                 :     //       It's possible that some files will be removed from safekeeper and we will fail to fetch them.
     178                 :     //       This function will fail in this case, should be retried by the caller.
     179               4 :     for filename in filenames {
     180               3 :         let file_path = tli_dir_path.join(&filename);
     181               3 :         // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
     182               3 :         let http_url = format!(
     183               3 :             "{}/v1/tenant/{}/timeline/{}/file/{}",
     184               3 :             host, status.tenant_id, status.timeline_id, filename
     185               3 :         );
     186                 : 
     187               3 :         let mut file = tokio::fs::File::create(&file_path).await?;
     188               3 :         let mut response = client.get(&http_url).send().await?;
     189              93 :         while let Some(chunk) = response.chunk().await? {
     190              90 :             file.write_all(&chunk).await?;
     191              90 :             file.flush().await?;
     192                 :         }
     193                 :     }
     194                 : 
     195                 :     // TODO: fsync?
     196                 : 
     197                 :     // Let's create timeline from temp directory and verify that it's correct
     198                 : 
     199               1 :     let control_path = tli_dir_path.join("safekeeper.control");
     200                 : 
     201               1 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     202               1 :     if control_store.server.wal_seg_size == 0 {
     203 UBC           0 :         bail!("wal_seg_size is not set");
     204 CBC           1 :     }
     205                 : 
     206               1 :     let wal_store =
     207               1 :         wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?;
     208                 : 
     209               1 :     let commit_lsn = status.commit_lsn;
     210               1 :     let flush_lsn = wal_store.flush_lsn();
     211                 : 
     212               1 :     info!(
     213               1 :         "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     214               1 :         ttid, commit_lsn, flush_lsn
     215               1 :     );
     216               1 :     assert!(status.commit_lsn <= status.flush_lsn);
     217                 : 
     218                 :     // Move timeline dir to the correct location
     219               1 :     let timeline_path = conf.timeline_dir(&ttid);
     220                 : 
     221               1 :     info!(
     222               1 :         "Moving timeline {} from {} to {}",
     223               1 :         ttid, tli_dir_path, timeline_path
     224               1 :     );
     225               1 :     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
     226               1 :     tokio::fs::rename(tli_dir_path, &timeline_path).await?;
     227                 : 
     228               1 :     let tli = GlobalTimelines::load_timeline(ttid)
     229 UBC           0 :         .await
     230 CBC           1 :         .context("Failed to load timeline after copy")?;
     231                 : 
     232               1 :     info!(
     233               1 :         "Loaded timeline {}, flush_lsn={}",
     234               1 :         ttid,
     235               1 :         tli.get_flush_lsn().await
     236               1 :     );
     237                 : 
     238               1 :     Ok(Response {
     239               1 :         safekeeper_host: host,
     240               1 :     })
     241               1 : }
        

Generated by: LCOV version 2.1-beta