LCOV - code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 95.0 % 160 152
Test Date: 2023-09-06 10:18:01 Functions: 61.3 % 31 19

            Line data    Source code
       1              : use serde::{Deserialize, Serialize};
       2              : 
       3              : use anyhow::{bail, Context, Result};
       4              : use tokio::io::AsyncWriteExt;
       5              : use tracing::info;
       6              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
       7              : 
       8              : use serde_with::{serde_as, DisplayFromStr};
       9              : 
      10              : use crate::{
      11              :     control_file, debug_dump,
      12              :     http::routes::TimelineStatus,
      13              :     wal_storage::{self, Storage},
      14              :     GlobalTimelines,
      15              : };
      16              : 
      17              : /// Request to pull a timeline: names the tenant/timeline and lists the HTTP base URLs (`http_hosts`) of the candidate donor safekeepers.
      18              : #[serde_as]
      19            9 : #[derive(Debug, Serialize, Deserialize)]
      20              : pub struct Request {
      21              :     #[serde_as(as = "DisplayFromStr")]
      22              :     pub tenant_id: TenantId,
      23              :     #[serde_as(as = "DisplayFromStr")]
      24              :     pub timeline_id: TimelineId,
      25              :     pub http_hosts: Vec<String>,
      26              : }
      27              : 
      28            1 : #[derive(Debug, Serialize)]
      29              : pub struct Response {
      30              :     // Donor safekeeper host: the `http_hosts` entry the timeline was pulled from.
      31              :     pub safekeeper_host: String,
      32              :     // TODO: add more fields?
      33              : }
      34              : 
      35              : /// Find the most advanced safekeeper among `request.http_hosts` and pull the timeline from it. Returns an error if the timeline is already present in `GlobalTimelines` or if any host fails to return a parseable status; panics if `http_hosts` is empty or if the chosen status is for a different tenant/timeline.
      36            1 : pub async fn handle_request(request: Request) -> Result<Response> {
      37            1 :     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
      38            1 :         request.tenant_id,
      39            1 :         request.timeline_id,
      40            1 :     ));
      41            1 :     if existing_tli.is_ok() {
      42            0 :         bail!("Timeline {} already exists", request.timeline_id);
      43            1 :     }
      44            1 : 
      45            1 :     let client = reqwest::Client::new();
      46            1 :     let http_hosts = request.http_hosts.clone();
      47              : 
      48              :     // Send a GET request to /v1/tenant/:tenant_id/timeline/:timeline_id on every host; join_all awaits all of them together.
      49            2 :     let responses = futures::future::join_all(http_hosts.iter().map(|url| {
      50            2 :         let url = format!(
      51            2 :             "{}/v1/tenant/{}/timeline/{}",
      52            2 :             url, request.tenant_id, request.timeline_id
      53            2 :         );
      54            2 :         client.get(url).send()
      55            2 :     }))
      56            5 :     .await;
      57              : 
      58            1 :     let mut statuses = Vec::new();
      59            2 :     for (i, response) in responses.into_iter().enumerate() {
      60            2 :         let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
      61            2 :         let status: crate::http::routes::TimelineStatus = response.json().await?;
      62            2 :         statuses.push((status, i));
      63              :     }
      64              : 
      65              :     // Find the most advanced safekeeper: the one with the greatest (epoch, flush_lsn, commit_lsn) tuple, compared field by field in that order.
      66              :     // TODO: current logic may be wrong, fix it later
      67            1 :     let (status, i) = statuses
      68            1 :         .into_iter()
      69            2 :         .max_by_key(|(status, _)| {
      70            2 :             (
      71            2 :                 status.acceptor_state.epoch,
      72            2 :                 status.flush_lsn,
      73            2 :                 status.commit_lsn,
      74            2 :             )
      75            2 :         })
      76            1 :         .unwrap(); // panics when http_hosts was empty (max_by_key yields None)
      77            1 :     let safekeeper_host = http_hosts[i].clone();
      78            1 : 
      79            1 :     assert!(status.tenant_id == request.tenant_id);
      80            1 :     assert!(status.timeline_id == request.timeline_id);
      81              : 
      82          102 :     pull_timeline(status, safekeeper_host).await
      83            1 : }
      84              : 
      85            1 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> { // Downloads the timeline files from the donor at `host` into a temp dir, checks them, renames the dir into its final location and loads the timeline; returns the donor host on success.
      86            1 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
      87            1 :     info!(
      88            1 :         "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
      89            1 :         ttid,
      90            1 :         host,
      91            1 :         status.commit_lsn,
      92            1 :         status.flush_lsn,
      93            1 :         status.acceptor_state.term,
      94            1 :         status.acceptor_state.epoch
      95            1 :     );
      96              : 
      97            1 :     let conf = &GlobalTimelines::get_global_config();
      98            1 : 
      99            1 :     let client = reqwest::Client::new();
     100              :     // TODO: do not use debug dump, it should be used only in tests.
     101              :     //      This is a proof of concept, we should figure out a way
     102              :     //      to use scp without implementing it manually.
     103              : 
     104              :     // Implementing our own scp over HTTP.
     105              :     // First, fetch the list of timeline files from the donor safekeeper via its debug_dump endpoint.
     106            1 :     let dump: debug_dump::Response = client
     107            1 :         .get(format!(
     108            1 :             "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
     109            1 :             host, status.tenant_id, status.timeline_id
     110            1 :         ))
     111            1 :         .send()
     112            4 :         .await?
     113            1 :         .json()
     114            0 :         .await?;
     115              : 
     116            1 :     if dump.timelines.len() != 1 {
     117            0 :         bail!(
     118            0 :             "Expected to fetch single timeline, got {} timelines",
     119            0 :             dump.timelines.len()
     120            0 :         );
     121            1 :     }
     122            1 : 
     123            1 :     let timeline = dump.timelines.into_iter().next().unwrap();
     124            1 :     let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
     125            1 :         "Timeline {} doesn't have disk content",
     126            1 :         ttid
     127            1 :     ))?;
     128              : 
     129            1 :     let mut filenames = disk_content
     130            1 :         .files
     131            1 :         .iter()
     132            3 :         .map(|file| file.name.clone())
     133            1 :         .collect::<Vec<_>>();
     134            1 : 
     135            1 :     // Sort filenames to make sure we pull files in the correct order.
     136            1 :     // After sorting, we should have:
     137            1 :     // - 000000010000000000000001
     138            1 :     // - ...
     139            1 :     // - 000000010000000000000002.partial
     140            1 :     // - safekeeper.control
     141            1 :     filenames.sort();
     142              : 
     143              :     // safekeeper.control should be the first file downloaded, so move it from its sorted position to the front of the list.
     144            1 :     let control_file_index = filenames
     145            1 :         .iter()
     146            3 :         .position(|name| name == "safekeeper.control")
     147            1 :         .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
     148            1 :     filenames.remove(control_file_index);
     149            1 :     filenames.insert(0, "safekeeper.control".to_string());
     150              : 
     151            1 :     info!(
     152            1 :         "Downloading {} files from safekeeper {}",
     153            1 :         filenames.len(),
     154            1 :         host
     155            1 :     );
     156              : 
     157              :     // Create a temp directory for the new timeline. It needs to be
     158              :     // located on the same filesystem as the rest of the timelines (presumably so the final rename below can succeed; TODO confirm).
     159              : 
     160              :     // conf.workdir is usually /storage/safekeeper/data
     161              :     // we use the sibling `tmp` directory of workdir, e.g. /storage/safekeeper/tmp
     162            1 :     let temp_base = conf
     163            1 :         .workdir
     164            1 :         .parent()
     165            1 :         .ok_or(anyhow::anyhow!("workdir has no parent"))?
     166            1 :         .join("tmp");
     167            1 : 
     168            1 :     tokio::fs::create_dir_all(&temp_base).await?;
     169              : 
     170            1 :     let tli_dir = tempfile::Builder::new()
     171            1 :         .suffix("_temptli")
     172            1 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     173            1 :         .tempdir_in(temp_base)?;
     174            1 :     let tli_dir_path = tli_dir.path().to_owned();
     175              : 
     176              :     // Note: some time passes between fetching the list of files and fetching the files themselves.
     177              :     //       It is possible that some files will be removed from the safekeeper and we will fail to fetch them.
     178              :     //       This function will fail in that case and should be retried by the caller.
     179            4 :     for filename in filenames {
     180            3 :         let file_path = tli_dir_path.join(&filename);
     181            3 :         // Route: /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
     182            3 :         let http_url = format!(
     183            3 :             "{}/v1/tenant/{}/timeline/{}/file/{}",
     184            3 :             host, status.tenant_id, status.timeline_id, filename
     185            3 :         );
     186              : 
     187            3 :         let mut file = tokio::fs::File::create(&file_path).await?;
     188            3 :         let mut response = client.get(&http_url).send().await?; // NOTE(review): the HTTP status is not checked here, so an error response body would be written into the file - confirm this is acceptable
     189           91 :         while let Some(chunk) = response.chunk().await? {
     190           88 :             file.write_all(&chunk).await?;
     191           88 :             file.flush().await?;
     192              :         }
     193              :     }
     194              : 
     195              :     // TODO: fsync? Each chunk is flushed above, but nothing in this function syncs the files or the directory before the rename.
     196              : 
     197              :     // Create the timeline from the temp directory and verify that it is correct:
     198              : 
     199            1 :     let control_path = tli_dir_path.join("safekeeper.control");
     200              : 
     201            1 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     202            1 :     if control_store.server.wal_seg_size == 0 {
     203            0 :         bail!("wal_seg_size is not set");
     204            1 :     }
     205              : 
     206            1 :     let wal_store =
     207            1 :         wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?;
     208              : 
     209            1 :     let commit_lsn = status.commit_lsn;
     210            1 :     let flush_lsn = wal_store.flush_lsn();
     211              : 
     212            1 :     info!(
     213            1 :         "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     214            1 :         ttid, commit_lsn, flush_lsn
     215            1 :     );
     216            1 :     assert!(status.commit_lsn <= status.flush_lsn); // NOTE(review): this compares the two donor-reported LSNs, not the local flush_lsn computed above - confirm intent
     217              : 
     218              :     // Move the timeline dir from the temp location to its final location under the tenant dir.
     219            1 :     let timeline_path = conf.timeline_dir(&ttid);
     220              : 
     221            1 :     info!(
     222            1 :         "Moving timeline {} from {} to {}",
     223            1 :         ttid,
     224            1 :         tli_dir_path.display(),
     225            1 :         timeline_path.display()
     226            1 :     );
     227            1 :     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
     228            1 :     tokio::fs::rename(tli_dir_path, &timeline_path).await?;
     229              : 
     230            1 :     let tli = GlobalTimelines::load_timeline(ttid)
     231            0 :         .await
     232            1 :         .context("Failed to load timeline after copy")?;
     233              : 
     234            1 :     info!(
     235            1 :         "Loaded timeline {}, flush_lsn={}",
     236            1 :         ttid,
     237            1 :         tli.get_flush_lsn().await
     238            1 :     );
     239              : 
     240            1 :     Ok(Response {
     241            1 :         safekeeper_host: host,
     242            1 :     })
     243            1 : }
        

Generated by: LCOV version 2.1-beta