LCOV - differential code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 95.8 % 190 182 8 182
Current Date: 2024-01-09 02:06:09 Functions: 65.1 % 43 28 15 28
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::sync::Arc;
       2                 : 
       3                 : use camino::Utf8PathBuf;
       4                 : use camino_tempfile::Utf8TempDir;
       5                 : use chrono::{DateTime, Utc};
       6                 : use serde::{Deserialize, Serialize};
       7                 : 
       8                 : use anyhow::{bail, Context, Result};
       9                 : use tokio::io::AsyncWriteExt;
      10                 : use tracing::info;
      11                 : use utils::{
      12                 :     id::{TenantId, TenantTimelineId, TimelineId},
      13                 :     lsn::Lsn,
      14                 : };
      15                 : 
      16                 : use crate::{
      17                 :     control_file, debug_dump,
      18                 :     http::routes::TimelineStatus,
      19                 :     timeline::{Timeline, TimelineError},
      20                 :     wal_storage::{self, Storage},
      21                 :     GlobalTimelines, SafeKeeperConf,
      22                 : };
      23                 : 
      24                 : /// Info about timeline on safekeeper ready for reporting.
      25 CBC           7 : #[derive(Debug, Serialize, Deserialize)]
      26                 : pub struct Request {
      27                 :     pub tenant_id: TenantId,
      28                 :     pub timeline_id: TimelineId,
      29                 :     pub http_hosts: Vec<String>,
      30                 : }
      31                 : 
      32               1 : #[derive(Debug, Serialize)]
      33                 : pub struct Response {
      34                 :     // Donor safekeeper host
      35                 :     pub safekeeper_host: String,
      36                 :     // TODO: add more fields?
      37                 : }
      38                 : 
      39                 : /// Response for debug dump request.
      40              11 : #[derive(Debug, Serialize, Deserialize)]
      41                 : pub struct DebugDumpResponse {
      42                 :     pub start_time: DateTime<Utc>,
      43                 :     pub finish_time: DateTime<Utc>,
      44                 :     pub timelines: Vec<debug_dump::Timeline>,
      45                 :     pub timelines_count: usize,
      46                 :     pub config: debug_dump::Config,
      47                 : }
      48                 : 
      49                 : /// Find the most advanced safekeeper and pull timeline from it.
      50               1 : pub async fn handle_request(request: Request) -> Result<Response> {
      51               1 :     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
      52               1 :         request.tenant_id,
      53               1 :         request.timeline_id,
      54               1 :     ));
      55               1 :     if existing_tli.is_ok() {
      56 UBC           0 :         bail!("Timeline {} already exists", request.timeline_id);
      57 CBC           1 :     }
      58               1 : 
      59               1 :     let client = reqwest::Client::new();
      60               1 :     let http_hosts = request.http_hosts.clone();
      61                 : 
      62                 :     // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
      63               2 :     let responses = futures::future::join_all(http_hosts.iter().map(|url| {
      64               2 :         let url = format!(
      65               2 :             "{}/v1/tenant/{}/timeline/{}",
      66               2 :             url, request.tenant_id, request.timeline_id
      67               2 :         );
      68               2 :         client.get(url).send()
      69               2 :     }))
      70               6 :     .await;
      71                 : 
      72               1 :     let mut statuses = Vec::new();
      73               2 :     for (i, response) in responses.into_iter().enumerate() {
      74               2 :         let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
      75               2 :         let status: crate::http::routes::TimelineStatus = response.json().await?;
      76               2 :         statuses.push((status, i));
      77                 :     }
      78                 : 
      79                 :     // Find the most advanced safekeeper
      80                 :     // TODO: current logic may be wrong, fix it later
      81               1 :     let (status, i) = statuses
      82               1 :         .into_iter()
      83               2 :         .max_by_key(|(status, _)| {
      84               2 :             (
      85               2 :                 status.acceptor_state.epoch,
      86               2 :                 status.flush_lsn,
      87               2 :                 status.commit_lsn,
      88               2 :             )
      89               2 :         })
      90               1 :         .unwrap();
      91               1 :     let safekeeper_host = http_hosts[i].clone();
      92               1 : 
      93               1 :     assert!(status.tenant_id == request.tenant_id);
      94               1 :     assert!(status.timeline_id == request.timeline_id);
      95                 : 
      96             106 :     pull_timeline(status, safekeeper_host).await
      97               1 : }
      98                 : 
      99               1 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
     100               1 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     101               1 :     info!(
     102               1 :         "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
     103               1 :         ttid,
     104               1 :         host,
     105               1 :         status.commit_lsn,
     106               1 :         status.flush_lsn,
     107               1 :         status.acceptor_state.term,
     108               1 :         status.acceptor_state.epoch
     109               1 :     );
     110                 : 
     111               1 :     let conf = &GlobalTimelines::get_global_config();
     112               1 : 
     113               1 :     let client = reqwest::Client::new();
     114                 :     // TODO: don't use debug dump, it should be used only in tests.
     115                 :     //      This is a proof of concept, we should figure out a way
     116                 :     //      to use scp without implementing it manually.
     117                 : 
     118                 :     // Implementing our own scp over HTTP.
     119                 :     // At first, we need to fetch list of files from safekeeper.
     120               1 :     let dump: DebugDumpResponse = client
     121               1 :         .get(format!(
     122               1 :             "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
     123               1 :             host, status.tenant_id, status.timeline_id
     124               1 :         ))
     125               1 :         .send()
     126               4 :         .await?
     127               1 :         .json()
     128               2 :         .await?;
     129                 : 
     130               1 :     if dump.timelines.len() != 1 {
     131 UBC           0 :         bail!(
     132               0 :             "expected to fetch single timeline, got {} timelines",
     133               0 :             dump.timelines.len()
     134               0 :         );
     135 CBC           1 :     }
     136               1 : 
     137               1 :     let timeline = dump.timelines.into_iter().next().unwrap();
     138               1 :     let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
     139               1 :         "timeline {} doesn't have disk content",
     140               1 :         ttid
     141               1 :     ))?;
     142                 : 
     143               1 :     let mut filenames = disk_content
     144               1 :         .files
     145               1 :         .iter()
     146               3 :         .map(|file| file.name.clone())
     147               1 :         .collect::<Vec<_>>();
     148               1 : 
     149               1 :     // Sort filenames to make sure we pull files in correct order
     150               1 :     // After sorting, we should have:
     151               1 :     // - 000000010000000000000001
     152               1 :     // - ...
     153               1 :     // - 000000010000000000000002.partial
     154               1 :     // - safekeeper.control
     155               1 :     filenames.sort();
     156                 : 
     157                 :     // safekeeper.control should be the first file, so we need to move it to the beginning
     158               1 :     let control_file_index = filenames
     159               1 :         .iter()
     160               3 :         .position(|name| name == "safekeeper.control")
     161               1 :         .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
     162               1 :     filenames.remove(control_file_index);
     163               1 :     filenames.insert(0, "safekeeper.control".to_string());
     164                 : 
     165               1 :     info!(
     166               1 :         "downloading {} files from safekeeper {}",
     167               1 :         filenames.len(),
     168               1 :         host
     169               1 :     );
     170                 : 
     171               1 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
     172                 : 
     173                 :     // Note: some time happens between fetching list of files and fetching files themselves.
     174                 :     //       It's possible that some files will be removed from safekeeper and we will fail to fetch them.
     175                 :     //       This function will fail in this case, should be retried by the caller.
     176               4 :     for filename in filenames {
     177               3 :         let file_path = tli_dir_path.join(&filename);
     178               3 :         // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
     179               3 :         let http_url = format!(
     180               3 :             "{}/v1/tenant/{}/timeline/{}/file/{}",
     181               3 :             host, status.tenant_id, status.timeline_id, filename
     182               3 :         );
     183                 : 
     184               3 :         let mut file = tokio::fs::File::create(&file_path).await?;
     185               3 :         let mut response = client.get(&http_url).send().await?;
     186              91 :         while let Some(chunk) = response.chunk().await? {
     187              88 :             file.write_all(&chunk).await?;
     188              88 :             file.flush().await?;
     189                 :         }
     190                 :     }
     191                 : 
     192                 :     // TODO: fsync?
     193                 : 
     194                 :     // Let's create timeline from temp directory and verify that it's correct
     195               1 :     let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
     196               1 :     info!(
     197               1 :         "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     198               1 :         ttid, commit_lsn, flush_lsn
     199               1 :     );
     200               1 :     assert!(status.commit_lsn <= status.flush_lsn);
     201                 : 
     202                 :     // Finally, load the timeline.
     203               2 :     let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
     204                 : 
     205               1 :     Ok(Response {
     206               1 :         safekeeper_host: host,
     207               1 :     })
     208               1 : }
     209                 : 
     210                 : /// Create temp directory for a new timeline. It needs to be located on the same
     211                 : /// filesystem as the rest of the timelines. It will be automatically deleted when
     212                 : /// Utf8TempDir goes out of scope.
     213              49 : pub async fn create_temp_timeline_dir(
     214              49 :     conf: &SafeKeeperConf,
     215              49 :     ttid: TenantTimelineId,
     216              49 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
     217                 :     // conf.workdir is usually /storage/safekeeper/data
     218                 :     // will try to transform it into /storage/safekeeper/tmp
     219              49 :     let temp_base = conf
     220              49 :         .workdir
     221              49 :         .parent()
     222              49 :         .ok_or(anyhow::anyhow!("workdir has no parent"))?
     223              49 :         .join("tmp");
     224              49 : 
     225              49 :     tokio::fs::create_dir_all(&temp_base).await?;
     226                 : 
     227              49 :     let tli_dir = camino_tempfile::Builder::new()
     228              49 :         .suffix("_temptli")
     229              49 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     230              49 :         .tempdir_in(temp_base)?;
     231                 : 
     232              49 :     let tli_dir_path = tli_dir.path().to_path_buf();
     233              49 : 
     234              49 :     Ok((tli_dir, tli_dir_path))
     235              49 : }
     236                 : 
     237                 : /// Do basic validation of a temp timeline, before moving it to the global map.
     238              49 : pub async fn validate_temp_timeline(
     239              49 :     conf: &SafeKeeperConf,
     240              49 :     ttid: TenantTimelineId,
     241              49 :     path: &Utf8PathBuf,
     242              49 : ) -> Result<(Lsn, Lsn)> {
     243              49 :     let control_path = path.join("safekeeper.control");
     244                 : 
     245              49 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     246              49 :     if control_store.server.wal_seg_size == 0 {
     247 UBC           0 :         bail!("wal_seg_size is not set");
     248 CBC          49 :     }
     249                 : 
     250              49 :     let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
     251                 : 
     252              49 :     let commit_lsn = control_store.commit_lsn;
     253              49 :     let flush_lsn = wal_store.flush_lsn();
     254              49 : 
     255              49 :     Ok((commit_lsn, flush_lsn))
     256              49 : }
     257                 : 
     258                 : /// Move timeline from a temp directory to the main storage, and load it to the global map.
     259                 : /// This operation is done under a lock to prevent bugs if several concurrent requests are
     260                 : /// trying to load the same timeline. Note that it doesn't guard against creating the
     261                 : /// timeline with the same ttid, but no one should be doing this anyway.
     262              49 : pub async fn load_temp_timeline(
     263              49 :     conf: &SafeKeeperConf,
     264              49 :     ttid: TenantTimelineId,
     265              49 :     tmp_path: &Utf8PathBuf,
     266              49 : ) -> Result<Arc<Timeline>> {
     267                 :     // Take a lock to prevent concurrent loadings
     268              49 :     let load_lock = GlobalTimelines::loading_lock().await;
     269              49 :     let guard = load_lock.lock().await;
     270                 : 
     271              49 :     if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
     272 UBC           0 :         bail!("timeline already exists, cannot overwrite it")
     273 CBC          49 :     }
     274              49 : 
     275              49 :     // Move timeline dir to the correct location
     276              49 :     let timeline_path = conf.timeline_dir(&ttid);
     277                 : 
     278              49 :     info!(
     279              49 :         "moving timeline {} from {} to {}",
     280              49 :         ttid, tmp_path, timeline_path
     281              49 :     );
     282              49 :     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
     283              49 :     tokio::fs::rename(tmp_path, &timeline_path).await?;
     284                 : 
     285              49 :     let tli = GlobalTimelines::load_timeline(&guard, ttid)
     286 UBC           0 :         .await
     287 CBC          49 :         .context("Failed to load timeline after copy")?;
     288                 : 
     289              49 :     info!(
     290              49 :         "loaded timeline {}, flush_lsn={}",
     291              49 :         ttid,
     292              49 :         tli.get_flush_lsn().await
     293              49 :     );
     294                 : 
     295              49 :     Ok(tli)
     296              49 : }
        

Generated by: LCOV version 2.1-beta