LCOV - code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 0.0 % 177 0
Test Date: 2024-05-10 13:18:37 Functions: 0.0 % 30 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use camino::Utf8PathBuf;
       4              : use camino_tempfile::Utf8TempDir;
       5              : use chrono::{DateTime, Utc};
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use anyhow::{bail, Context, Result};
       9              : use tokio::io::AsyncWriteExt;
      10              : use tracing::info;
      11              : use utils::{
      12              :     id::{TenantId, TenantTimelineId, TimelineId},
      13              :     lsn::Lsn,
      14              : };
      15              : 
      16              : use crate::{
      17              :     control_file, debug_dump,
      18              :     http::routes::TimelineStatus,
      19              :     timeline::{Timeline, TimelineError},
      20              :     wal_storage::{self, Storage},
      21              :     GlobalTimelines, SafeKeeperConf,
      22              : };
      23              : 
/// Request to pull a timeline onto this safekeeper.
///
/// Names the timeline to fetch and the candidate donors that
/// `handle_request` queries for their timeline status.
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    /// Base URLs of the candidate donors; `handle_request` prefixes each one
    /// onto `/v1/tenant/{tenant_id}/timeline/{timeline_id}`.
    pub http_hosts: Vec<String>,
}
      31              : 
/// Result of a successful pull, returned by `handle_request`.
#[derive(Debug, Serialize)]
pub struct Response {
    /// Donor safekeeper host: the `Request::http_hosts` entry the timeline
    /// was pulled from.
    pub safekeeper_host: String,
    // TODO: add more fields?
}
      38              : 
/// Response for debug dump request.
///
/// `pull_timeline` deserializes the donor's `/v1/debug_dump` reply into this
/// type and requires `timelines` to hold exactly one entry, whose
/// `disk_content` supplies the list of files to download.
#[derive(Debug, Serialize, Deserialize)]
pub struct DebugDumpResponse {
    pub start_time: DateTime<Utc>,
    pub finish_time: DateTime<Utc>,
    pub timelines: Vec<debug_dump::Timeline>,
    pub timelines_count: usize,
    pub config: debug_dump::Config,
}
      48              : 
      49              : /// Find the most advanced safekeeper and pull timeline from it.
      50            0 : pub async fn handle_request(request: Request) -> Result<Response> {
      51            0 :     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
      52            0 :         request.tenant_id,
      53            0 :         request.timeline_id,
      54            0 :     ));
      55            0 :     if existing_tli.is_ok() {
      56            0 :         bail!("Timeline {} already exists", request.timeline_id);
      57            0 :     }
      58            0 : 
      59            0 :     let client = reqwest::Client::new();
      60            0 :     let http_hosts = request.http_hosts.clone();
      61              : 
      62              :     // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
      63            0 :     let responses = futures::future::join_all(http_hosts.iter().map(|url| {
      64            0 :         let url = format!(
      65            0 :             "{}/v1/tenant/{}/timeline/{}",
      66            0 :             url, request.tenant_id, request.timeline_id
      67            0 :         );
      68            0 :         client.get(url).send()
      69            0 :     }))
      70            0 :     .await;
      71              : 
      72            0 :     let mut statuses = Vec::new();
      73            0 :     for (i, response) in responses.into_iter().enumerate() {
      74            0 :         let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
      75            0 :         let status: crate::http::routes::TimelineStatus = response.json().await?;
      76            0 :         statuses.push((status, i));
      77              :     }
      78              : 
      79              :     // Find the most advanced safekeeper
      80              :     // TODO: current logic may be wrong, fix it later
      81            0 :     let (status, i) = statuses
      82            0 :         .into_iter()
      83            0 :         .max_by_key(|(status, _)| {
      84            0 :             (
      85            0 :                 status.acceptor_state.epoch,
      86            0 :                 status.flush_lsn,
      87            0 :                 status.commit_lsn,
      88            0 :             )
      89            0 :         })
      90            0 :         .unwrap();
      91            0 :     let safekeeper_host = http_hosts[i].clone();
      92            0 : 
      93            0 :     assert!(status.tenant_id == request.tenant_id);
      94            0 :     assert!(status.timeline_id == request.timeline_id);
      95              : 
      96            0 :     pull_timeline(status, safekeeper_host).await
      97            0 : }
      98              : 
      99            0 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
     100            0 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     101            0 :     info!(
     102            0 :         "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
     103              :         ttid,
     104              :         host,
     105              :         status.commit_lsn,
     106              :         status.flush_lsn,
     107              :         status.acceptor_state.term,
     108              :         status.acceptor_state.epoch
     109              :     );
     110              : 
     111            0 :     let conf = &GlobalTimelines::get_global_config();
     112            0 : 
     113            0 :     let client = reqwest::Client::new();
     114              :     // TODO: don't use debug dump, it should be used only in tests.
     115              :     //      This is a proof of concept, we should figure out a way
     116              :     //      to use scp without implementing it manually.
     117              : 
     118              :     // Implementing our own scp over HTTP.
     119              :     // At first, we need to fetch list of files from safekeeper.
     120            0 :     let dump: DebugDumpResponse = client
     121            0 :         .get(format!(
     122            0 :             "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
     123            0 :             host, status.tenant_id, status.timeline_id
     124            0 :         ))
     125            0 :         .send()
     126            0 :         .await?
     127            0 :         .json()
     128            0 :         .await?;
     129              : 
     130            0 :     if dump.timelines.len() != 1 {
     131            0 :         bail!(
     132            0 :             "expected to fetch single timeline, got {} timelines",
     133            0 :             dump.timelines.len()
     134            0 :         );
     135            0 :     }
     136            0 : 
     137            0 :     let timeline = dump.timelines.into_iter().next().unwrap();
     138            0 :     let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
     139            0 :         "timeline {} doesn't have disk content",
     140            0 :         ttid
     141            0 :     ))?;
     142              : 
     143            0 :     let mut filenames = disk_content
     144            0 :         .files
     145            0 :         .iter()
     146            0 :         .map(|file| file.name.clone())
     147            0 :         .collect::<Vec<_>>();
     148            0 : 
     149            0 :     // Sort filenames to make sure we pull files in correct order
     150            0 :     // After sorting, we should have:
     151            0 :     // - 000000010000000000000001
     152            0 :     // - ...
     153            0 :     // - 000000010000000000000002.partial
     154            0 :     // - safekeeper.control
     155            0 :     filenames.sort();
     156              : 
     157              :     // safekeeper.control should be the first file, so we need to move it to the beginning
     158            0 :     let control_file_index = filenames
     159            0 :         .iter()
     160            0 :         .position(|name| name == "safekeeper.control")
     161            0 :         .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
     162            0 :     filenames.remove(control_file_index);
     163            0 :     filenames.insert(0, "safekeeper.control".to_string());
     164            0 : 
     165            0 :     info!(
     166            0 :         "downloading {} files from safekeeper {}",
     167            0 :         filenames.len(),
     168              :         host
     169              :     );
     170              : 
     171            0 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
     172              : 
     173              :     // Note: some time happens between fetching list of files and fetching files themselves.
     174              :     //       It's possible that some files will be removed from safekeeper and we will fail to fetch them.
     175              :     //       This function will fail in this case, should be retried by the caller.
     176            0 :     for filename in filenames {
     177            0 :         let file_path = tli_dir_path.join(&filename);
     178            0 :         // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
     179            0 :         let http_url = format!(
     180            0 :             "{}/v1/tenant/{}/timeline/{}/file/{}",
     181            0 :             host, status.tenant_id, status.timeline_id, filename
     182            0 :         );
     183              : 
     184            0 :         let mut file = tokio::fs::File::create(&file_path).await?;
     185            0 :         let mut response = client.get(&http_url).send().await?;
     186            0 :         while let Some(chunk) = response.chunk().await? {
     187            0 :             file.write_all(&chunk).await?;
     188            0 :             file.flush().await?;
     189              :         }
     190              :     }
     191              : 
     192              :     // TODO: fsync?
     193              : 
     194              :     // Let's create timeline from temp directory and verify that it's correct
     195            0 :     let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
     196            0 :     info!(
     197            0 :         "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     198              :         ttid, commit_lsn, flush_lsn
     199              :     );
     200            0 :     assert!(status.commit_lsn <= status.flush_lsn);
     201              : 
     202              :     // Finally, load the timeline.
     203            0 :     let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
     204              : 
     205            0 :     Ok(Response {
     206            0 :         safekeeper_host: host,
     207            0 :     })
     208            0 : }
     209              : 
     210              : /// Create temp directory for a new timeline. It needs to be located on the same
     211              : /// filesystem as the rest of the timelines. It will be automatically deleted when
     212              : /// Utf8TempDir goes out of scope.
     213            0 : pub async fn create_temp_timeline_dir(
     214            0 :     conf: &SafeKeeperConf,
     215            0 :     ttid: TenantTimelineId,
     216            0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
     217              :     // conf.workdir is usually /storage/safekeeper/data
     218              :     // will try to transform it into /storage/safekeeper/tmp
     219            0 :     let temp_base = conf
     220            0 :         .workdir
     221            0 :         .parent()
     222            0 :         .ok_or(anyhow::anyhow!("workdir has no parent"))?
     223            0 :         .join("tmp");
     224            0 : 
     225            0 :     tokio::fs::create_dir_all(&temp_base).await?;
     226              : 
     227            0 :     let tli_dir = camino_tempfile::Builder::new()
     228            0 :         .suffix("_temptli")
     229            0 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     230            0 :         .tempdir_in(temp_base)?;
     231              : 
     232            0 :     let tli_dir_path = tli_dir.path().to_path_buf();
     233            0 : 
     234            0 :     Ok((tli_dir, tli_dir_path))
     235            0 : }
     236              : 
     237              : /// Do basic validation of a temp timeline, before moving it to the global map.
     238            0 : pub async fn validate_temp_timeline(
     239            0 :     conf: &SafeKeeperConf,
     240            0 :     ttid: TenantTimelineId,
     241            0 :     path: &Utf8PathBuf,
     242            0 : ) -> Result<(Lsn, Lsn)> {
     243            0 :     let control_path = path.join("safekeeper.control");
     244              : 
     245            0 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     246            0 :     if control_store.server.wal_seg_size == 0 {
     247            0 :         bail!("wal_seg_size is not set");
     248            0 :     }
     249              : 
     250            0 :     let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
     251              : 
     252            0 :     let commit_lsn = control_store.commit_lsn;
     253            0 :     let flush_lsn = wal_store.flush_lsn();
     254            0 : 
     255            0 :     Ok((commit_lsn, flush_lsn))
     256            0 : }
     257              : 
     258              : /// Move timeline from a temp directory to the main storage, and load it to the global map.
     259              : /// This operation is done under a lock to prevent bugs if several concurrent requests are
     260              : /// trying to load the same timeline. Note that it doesn't guard against creating the
     261              : /// timeline with the same ttid, but no one should be doing this anyway.
     262            0 : pub async fn load_temp_timeline(
     263            0 :     conf: &SafeKeeperConf,
     264            0 :     ttid: TenantTimelineId,
     265            0 :     tmp_path: &Utf8PathBuf,
     266            0 : ) -> Result<Arc<Timeline>> {
     267              :     // Take a lock to prevent concurrent loadings
     268            0 :     let load_lock = GlobalTimelines::loading_lock().await;
     269            0 :     let guard = load_lock.lock().await;
     270              : 
     271            0 :     if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
     272            0 :         bail!("timeline already exists, cannot overwrite it")
     273            0 :     }
     274            0 : 
     275            0 :     // Move timeline dir to the correct location
     276            0 :     let timeline_path = conf.timeline_dir(&ttid);
     277            0 : 
     278            0 :     info!(
     279            0 :         "moving timeline {} from {} to {}",
     280              :         ttid, tmp_path, timeline_path
     281              :     );
     282            0 :     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
     283            0 :     tokio::fs::rename(tmp_path, &timeline_path).await?;
     284              : 
     285            0 :     let tli = GlobalTimelines::load_timeline(&guard, ttid)
     286            0 :         .await
     287            0 :         .context("Failed to load timeline after copy")?;
     288              : 
     289            0 :     info!(
     290            0 :         "loaded timeline {}, flush_lsn={}",
     291            0 :         ttid,
     292            0 :         tli.get_flush_lsn().await
     293              :     );
     294              : 
     295            0 :     Ok(tli)
     296            0 : }
        

Generated by: LCOV version 2.1-beta