TLA Line data Source code
1 : use serde::{Deserialize, Serialize};
2 :
3 : use anyhow::{bail, Context, Result};
4 : use tokio::io::AsyncWriteExt;
5 : use tracing::info;
6 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
7 :
8 : use serde_with::{serde_as, DisplayFromStr};
9 :
10 : use crate::{
11 : control_file, debug_dump,
12 : http::routes::TimelineStatus,
13 : wal_storage::{self, Storage},
14 : GlobalTimelines,
15 : };
16 :
17 : /// Info about timeline on safekeeper ready for reporting.
18 : #[serde_as]
19 CBC 9 : #[derive(Debug, Serialize, Deserialize)]
20 : pub struct Request {
21 : #[serde_as(as = "DisplayFromStr")]
22 : pub tenant_id: TenantId,
23 : #[serde_as(as = "DisplayFromStr")]
24 : pub timeline_id: TimelineId,
25 : pub http_hosts: Vec<String>,
26 : }
27 :
28 1 : #[derive(Debug, Serialize)]
29 : pub struct Response {
30 : // Donor safekeeper host
31 : pub safekeeper_host: String,
32 : // TODO: add more fields?
33 : }
34 :
35 : /// Find the most advanced safekeeper and pull timeline from it.
36 1 : pub async fn handle_request(request: Request) -> Result<Response> {
37 1 : let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
38 1 : request.tenant_id,
39 1 : request.timeline_id,
40 1 : ));
41 1 : if existing_tli.is_ok() {
42 UBC 0 : bail!("Timeline {} already exists", request.timeline_id);
43 CBC 1 : }
44 1 :
45 1 : let client = reqwest::Client::new();
46 1 : let http_hosts = request.http_hosts.clone();
47 :
48 : // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
49 2 : let responses = futures::future::join_all(http_hosts.iter().map(|url| {
50 2 : let url = format!(
51 2 : "{}/v1/tenant/{}/timeline/{}",
52 2 : url, request.tenant_id, request.timeline_id
53 2 : );
54 2 : client.get(url).send()
55 2 : }))
56 4 : .await;
57 :
58 1 : let mut statuses = Vec::new();
59 2 : for (i, response) in responses.into_iter().enumerate() {
60 2 : let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
61 2 : let status: crate::http::routes::TimelineStatus = response.json().await?;
62 2 : statuses.push((status, i));
63 : }
64 :
65 : // Find the most advanced safekeeper
66 : // TODO: current logic may be wrong, fix it later
67 1 : let (status, i) = statuses
68 1 : .into_iter()
69 2 : .max_by_key(|(status, _)| {
70 2 : (
71 2 : status.acceptor_state.epoch,
72 2 : status.flush_lsn,
73 2 : status.commit_lsn,
74 2 : )
75 2 : })
76 1 : .unwrap();
77 1 : let safekeeper_host = http_hosts[i].clone();
78 1 :
79 1 : assert!(status.tenant_id == request.tenant_id);
80 1 : assert!(status.timeline_id == request.timeline_id);
81 :
82 109 : pull_timeline(status, safekeeper_host).await
83 1 : }
84 :
85 1 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
86 1 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
87 1 : info!(
88 1 : "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
89 1 : ttid,
90 1 : host,
91 1 : status.commit_lsn,
92 1 : status.flush_lsn,
93 1 : status.acceptor_state.term,
94 1 : status.acceptor_state.epoch
95 1 : );
96 :
97 1 : let conf = &GlobalTimelines::get_global_config();
98 1 :
99 1 : let client = reqwest::Client::new();
100 : // TODO: don't use debug dump, it should be used only in tests.
101 : // This is a proof of concept, we should figure out a way
102 : // to use scp without implementing it manually.
103 :
104 : // Implementing our own scp over HTTP.
105 : // At first, we need to fetch list of files from safekeeper.
106 1 : let dump: debug_dump::Response = client
107 1 : .get(format!(
108 1 : "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
109 1 : host, status.tenant_id, status.timeline_id
110 1 : ))
111 1 : .send()
112 5 : .await?
113 1 : .json()
114 UBC 0 : .await?;
115 :
116 CBC 1 : if dump.timelines.len() != 1 {
117 UBC 0 : bail!(
118 0 : "Expected to fetch single timeline, got {} timelines",
119 0 : dump.timelines.len()
120 0 : );
121 CBC 1 : }
122 1 :
123 1 : let timeline = dump.timelines.into_iter().next().unwrap();
124 1 : let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
125 1 : "Timeline {} doesn't have disk content",
126 1 : ttid
127 1 : ))?;
128 :
129 1 : let mut filenames = disk_content
130 1 : .files
131 1 : .iter()
132 3 : .map(|file| file.name.clone())
133 1 : .collect::<Vec<_>>();
134 1 :
135 1 : // Sort filenames to make sure we pull files in correct order
136 1 : // After sorting, we should have:
137 1 : // - 000000010000000000000001
138 1 : // - ...
139 1 : // - 000000010000000000000002.partial
140 1 : // - safekeeper.control
141 1 : filenames.sort();
142 :
143 : // safekeeper.control should be the first file, so we need to move it to the beginning
144 1 : let control_file_index = filenames
145 1 : .iter()
146 3 : .position(|name| name == "safekeeper.control")
147 1 : .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
148 1 : filenames.remove(control_file_index);
149 1 : filenames.insert(0, "safekeeper.control".to_string());
150 :
151 1 : info!(
152 1 : "Downloading {} files from safekeeper {}",
153 1 : filenames.len(),
154 1 : host
155 1 : );
156 :
157 : // Creating temp directory for a new timeline. It needs to be
158 : // located on the same filesystem as the rest of the timelines.
159 :
160 : // conf.workdir is usually /storage/safekeeper/data
161 : // will try to transform it into /storage/safekeeper/tmp
162 1 : let temp_base = conf
163 1 : .workdir
164 1 : .parent()
165 1 : .ok_or(anyhow::anyhow!("workdir has no parent"))?
166 1 : .join("tmp");
167 1 :
168 1 : tokio::fs::create_dir_all(&temp_base).await?;
169 :
170 1 : let tli_dir = camino_tempfile::Builder::new()
171 1 : .suffix("_temptli")
172 1 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
173 1 : .tempdir_in(temp_base)?;
174 1 : let tli_dir_path = tli_dir.path().to_path_buf();
175 :
176 : // Note: some time happens between fetching list of files and fetching files themselves.
177 : // It's possible that some files will be removed from safekeeper and we will fail to fetch them.
178 : // This function will fail in this case, should be retried by the caller.
179 4 : for filename in filenames {
180 3 : let file_path = tli_dir_path.join(&filename);
181 3 : // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
182 3 : let http_url = format!(
183 3 : "{}/v1/tenant/{}/timeline/{}/file/{}",
184 3 : host, status.tenant_id, status.timeline_id, filename
185 3 : );
186 :
187 3 : let mut file = tokio::fs::File::create(&file_path).await?;
188 3 : let mut response = client.get(&http_url).send().await?;
189 93 : while let Some(chunk) = response.chunk().await? {
190 90 : file.write_all(&chunk).await?;
191 90 : file.flush().await?;
192 : }
193 : }
194 :
195 : // TODO: fsync?
196 :
197 : // Let's create timeline from temp directory and verify that it's correct
198 :
199 1 : let control_path = tli_dir_path.join("safekeeper.control");
200 :
201 1 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
202 1 : if control_store.server.wal_seg_size == 0 {
203 UBC 0 : bail!("wal_seg_size is not set");
204 CBC 1 : }
205 :
206 1 : let wal_store =
207 1 : wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?;
208 :
209 1 : let commit_lsn = status.commit_lsn;
210 1 : let flush_lsn = wal_store.flush_lsn();
211 :
212 1 : info!(
213 1 : "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
214 1 : ttid, commit_lsn, flush_lsn
215 1 : );
216 1 : assert!(status.commit_lsn <= status.flush_lsn);
217 :
218 : // Move timeline dir to the correct location
219 1 : let timeline_path = conf.timeline_dir(&ttid);
220 :
221 1 : info!(
222 1 : "Moving timeline {} from {} to {}",
223 1 : ttid, tli_dir_path, timeline_path
224 1 : );
225 1 : tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
226 1 : tokio::fs::rename(tli_dir_path, &timeline_path).await?;
227 :
228 1 : let tli = GlobalTimelines::load_timeline(ttid)
229 UBC 0 : .await
230 CBC 1 : .context("Failed to load timeline after copy")?;
231 :
232 1 : info!(
233 1 : "Loaded timeline {}, flush_lsn={}",
234 1 : ttid,
235 1 : tli.get_flush_lsn().await
236 1 : );
237 :
238 1 : Ok(Response {
239 1 : safekeeper_host: host,
240 1 : })
241 1 : }
|