LCOV - code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 95.8 % 190 182
Test Date: 2024-02-12 20:26:03 Functions: 65.1 % 43 28

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use camino::Utf8PathBuf;
       4              : use camino_tempfile::Utf8TempDir;
       5              : use chrono::{DateTime, Utc};
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use anyhow::{bail, Context, Result};
       9              : use tokio::io::AsyncWriteExt;
      10              : use tracing::info;
      11              : use utils::{
      12              :     id::{TenantId, TenantTimelineId, TimelineId},
      13              :     lsn::Lsn,
      14              : };
      15              : 
      16              : use crate::{
      17              :     control_file, debug_dump,
      18              :     http::routes::TimelineStatus,
      19              :     timeline::{Timeline, TimelineError},
      20              :     wal_storage::{self, Storage},
      21              :     GlobalTimelines, SafeKeeperConf,
      22              : };
      23              : 
      24              : /// Info about timeline on safekeeper ready for reporting.
      25            7 : #[derive(Debug, Serialize, Deserialize)]
      26              : pub struct Request {
      27              :     pub tenant_id: TenantId,
      28              :     pub timeline_id: TimelineId,
      29              :     pub http_hosts: Vec<String>,
      30              : }
      31              : 
      32            1 : #[derive(Debug, Serialize)]
      33              : pub struct Response {
      34              :     // Donor safekeeper host
      35              :     pub safekeeper_host: String,
      36              :     // TODO: add more fields?
      37              : }
      38              : 
      39              : /// Response for debug dump request.
      40           11 : #[derive(Debug, Serialize, Deserialize)]
      41              : pub struct DebugDumpResponse {
      42              :     pub start_time: DateTime<Utc>,
      43              :     pub finish_time: DateTime<Utc>,
      44              :     pub timelines: Vec<debug_dump::Timeline>,
      45              :     pub timelines_count: usize,
      46              :     pub config: debug_dump::Config,
      47              : }
      48              : 
      49              : /// Find the most advanced safekeeper and pull timeline from it.
      50            1 : pub async fn handle_request(request: Request) -> Result<Response> {
      51            1 :     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
      52            1 :         request.tenant_id,
      53            1 :         request.timeline_id,
      54            1 :     ));
      55            1 :     if existing_tli.is_ok() {
      56            0 :         bail!("Timeline {} already exists", request.timeline_id);
      57            1 :     }
      58            1 : 
      59            1 :     let client = reqwest::Client::new();
      60            1 :     let http_hosts = request.http_hosts.clone();
      61              : 
      62              :     // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
      63            2 :     let responses = futures::future::join_all(http_hosts.iter().map(|url| {
      64            2 :         let url = format!(
      65            2 :             "{}/v1/tenant/{}/timeline/{}",
      66            2 :             url, request.tenant_id, request.timeline_id
      67            2 :         );
      68            2 :         client.get(url).send()
      69            2 :     }))
      70            5 :     .await;
      71              : 
      72            1 :     let mut statuses = Vec::new();
      73            2 :     for (i, response) in responses.into_iter().enumerate() {
      74            2 :         let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
      75            2 :         let status: crate::http::routes::TimelineStatus = response.json().await?;
      76            2 :         statuses.push((status, i));
      77              :     }
      78              : 
      79              :     // Find the most advanced safekeeper
      80              :     // TODO: current logic may be wrong, fix it later
      81            1 :     let (status, i) = statuses
      82            1 :         .into_iter()
      83            2 :         .max_by_key(|(status, _)| {
      84            2 :             (
      85            2 :                 status.acceptor_state.epoch,
      86            2 :                 status.flush_lsn,
      87            2 :                 status.commit_lsn,
      88            2 :             )
      89            2 :         })
      90            1 :         .unwrap();
      91            1 :     let safekeeper_host = http_hosts[i].clone();
      92            1 : 
      93            1 :     assert!(status.tenant_id == request.tenant_id);
      94            1 :     assert!(status.timeline_id == request.timeline_id);
      95              : 
      96          108 :     pull_timeline(status, safekeeper_host).await
      97            1 : }
      98              : 
      99            1 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
     100            1 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     101            1 :     info!(
     102            1 :         "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
     103            1 :         ttid,
     104            1 :         host,
     105            1 :         status.commit_lsn,
     106            1 :         status.flush_lsn,
     107            1 :         status.acceptor_state.term,
     108            1 :         status.acceptor_state.epoch
     109            1 :     );
     110              : 
     111            1 :     let conf = &GlobalTimelines::get_global_config();
     112            1 : 
     113            1 :     let client = reqwest::Client::new();
     114              :     // TODO: don't use debug dump, it should be used only in tests.
     115              :     //      This is a proof of concept, we should figure out a way
     116              :     //      to use scp without implementing it manually.
     117              : 
     118              :     // Implementing our own scp over HTTP.
     119              :     // At first, we need to fetch list of files from safekeeper.
     120            1 :     let dump: DebugDumpResponse = client
     121            1 :         .get(format!(
     122            1 :             "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
     123            1 :             host, status.tenant_id, status.timeline_id
     124            1 :         ))
     125            1 :         .send()
     126            4 :         .await?
     127            1 :         .json()
     128            3 :         .await?;
     129              : 
     130            1 :     if dump.timelines.len() != 1 {
     131            0 :         bail!(
     132            0 :             "expected to fetch single timeline, got {} timelines",
     133            0 :             dump.timelines.len()
     134            0 :         );
     135            1 :     }
     136            1 : 
     137            1 :     let timeline = dump.timelines.into_iter().next().unwrap();
     138            1 :     let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
     139            1 :         "timeline {} doesn't have disk content",
     140            1 :         ttid
     141            1 :     ))?;
     142              : 
     143            1 :     let mut filenames = disk_content
     144            1 :         .files
     145            1 :         .iter()
     146            3 :         .map(|file| file.name.clone())
     147            1 :         .collect::<Vec<_>>();
     148            1 : 
     149            1 :     // Sort filenames to make sure we pull files in correct order
     150            1 :     // After sorting, we should have:
     151            1 :     // - 000000010000000000000001
     152            1 :     // - ...
     153            1 :     // - 000000010000000000000002.partial
     154            1 :     // - safekeeper.control
     155            1 :     filenames.sort();
     156              : 
     157              :     // safekeeper.control should be the first file, so we need to move it to the beginning
     158            1 :     let control_file_index = filenames
     159            1 :         .iter()
     160            3 :         .position(|name| name == "safekeeper.control")
     161            1 :         .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
     162            1 :     filenames.remove(control_file_index);
     163            1 :     filenames.insert(0, "safekeeper.control".to_string());
     164              : 
     165            1 :     info!(
     166            1 :         "downloading {} files from safekeeper {}",
     167            1 :         filenames.len(),
     168            1 :         host
     169            1 :     );
     170              : 
     171            1 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
     172              : 
     173              :     // Note: some time happens between fetching list of files and fetching files themselves.
     174              :     //       It's possible that some files will be removed from safekeeper and we will fail to fetch them.
     175              :     //       This function will fail in this case, should be retried by the caller.
     176            4 :     for filename in filenames {
     177            3 :         let file_path = tli_dir_path.join(&filename);
     178            3 :         // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
     179            3 :         let http_url = format!(
     180            3 :             "{}/v1/tenant/{}/timeline/{}/file/{}",
     181            3 :             host, status.tenant_id, status.timeline_id, filename
     182            3 :         );
     183              : 
     184            3 :         let mut file = tokio::fs::File::create(&file_path).await?;
     185            3 :         let mut response = client.get(&http_url).send().await?;
     186           91 :         while let Some(chunk) = response.chunk().await? {
     187           88 :             file.write_all(&chunk).await?;
     188           88 :             file.flush().await?;
     189              :         }
     190              :     }
     191              : 
     192              :     // TODO: fsync?
     193              : 
     194              :     // Let's create timeline from temp directory and verify that it's correct
     195            1 :     let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
     196            1 :     info!(
     197            1 :         "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     198            1 :         ttid, commit_lsn, flush_lsn
     199            1 :     );
     200            1 :     assert!(status.commit_lsn <= status.flush_lsn);
     201              : 
     202              :     // Finally, load the timeline.
     203            2 :     let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
     204              : 
     205            1 :     Ok(Response {
     206            1 :         safekeeper_host: host,
     207            1 :     })
     208            1 : }
     209              : 
     210              : /// Create temp directory for a new timeline. It needs to be located on the same
     211              : /// filesystem as the rest of the timelines. It will be automatically deleted when
     212              : /// Utf8TempDir goes out of scope.
     213           49 : pub async fn create_temp_timeline_dir(
     214           49 :     conf: &SafeKeeperConf,
     215           49 :     ttid: TenantTimelineId,
     216           49 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
     217              :     // conf.workdir is usually /storage/safekeeper/data
     218              :     // will try to transform it into /storage/safekeeper/tmp
     219           49 :     let temp_base = conf
     220           49 :         .workdir
     221           49 :         .parent()
     222           49 :         .ok_or(anyhow::anyhow!("workdir has no parent"))?
     223           49 :         .join("tmp");
     224           49 : 
     225           49 :     tokio::fs::create_dir_all(&temp_base).await?;
     226              : 
     227           49 :     let tli_dir = camino_tempfile::Builder::new()
     228           49 :         .suffix("_temptli")
     229           49 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     230           49 :         .tempdir_in(temp_base)?;
     231              : 
     232           49 :     let tli_dir_path = tli_dir.path().to_path_buf();
     233           49 : 
     234           49 :     Ok((tli_dir, tli_dir_path))
     235           49 : }
     236              : 
     237              : /// Do basic validation of a temp timeline, before moving it to the global map.
     238           49 : pub async fn validate_temp_timeline(
     239           49 :     conf: &SafeKeeperConf,
     240           49 :     ttid: TenantTimelineId,
     241           49 :     path: &Utf8PathBuf,
     242           49 : ) -> Result<(Lsn, Lsn)> {
     243           49 :     let control_path = path.join("safekeeper.control");
     244              : 
     245           49 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     246           49 :     if control_store.server.wal_seg_size == 0 {
     247            0 :         bail!("wal_seg_size is not set");
     248           49 :     }
     249              : 
     250           49 :     let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
     251              : 
     252           49 :     let commit_lsn = control_store.commit_lsn;
     253           49 :     let flush_lsn = wal_store.flush_lsn();
     254           49 : 
     255           49 :     Ok((commit_lsn, flush_lsn))
     256           49 : }
     257              : 
     258              : /// Move timeline from a temp directory to the main storage, and load it to the global map.
     259              : /// This operation is done under a lock to prevent bugs if several concurrent requests are
     260              : /// trying to load the same timeline. Note that it doesn't guard against creating the
     261              : /// timeline with the same ttid, but no one should be doing this anyway.
     262           49 : pub async fn load_temp_timeline(
     263           49 :     conf: &SafeKeeperConf,
     264           49 :     ttid: TenantTimelineId,
     265           49 :     tmp_path: &Utf8PathBuf,
     266           49 : ) -> Result<Arc<Timeline>> {
     267              :     // Take a lock to prevent concurrent loadings
     268           49 :     let load_lock = GlobalTimelines::loading_lock().await;
     269           49 :     let guard = load_lock.lock().await;
     270              : 
     271           49 :     if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
     272            0 :         bail!("timeline already exists, cannot overwrite it")
     273           49 :     }
     274           49 : 
     275           49 :     // Move timeline dir to the correct location
     276           49 :     let timeline_path = conf.timeline_dir(&ttid);
     277              : 
     278           49 :     info!(
     279           49 :         "moving timeline {} from {} to {}",
     280           49 :         ttid, tmp_path, timeline_path
     281           49 :     );
     282           49 :     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
     283           49 :     tokio::fs::rename(tmp_path, &timeline_path).await?;
     284              : 
     285           49 :     let tli = GlobalTimelines::load_timeline(&guard, ttid)
     286            0 :         .await
     287           49 :         .context("Failed to load timeline after copy")?;
     288              : 
     289           49 :     info!(
     290           49 :         "loaded timeline {}, flush_lsn={}",
     291           49 :         ttid,
     292           49 :         tli.get_flush_lsn().await
     293           49 :     );
     294              : 
     295           49 :     Ok(tli)
     296           49 : }
        

Generated by: LCOV version 2.1-beta