Line data Source code
1 : use std::sync::Arc;
2 :
3 : use camino::Utf8PathBuf;
4 : use camino_tempfile::Utf8TempDir;
5 : use chrono::{DateTime, Utc};
6 : use serde::{Deserialize, Serialize};
7 :
8 : use anyhow::{bail, Context, Result};
9 : use tokio::io::AsyncWriteExt;
10 : use tracing::info;
11 : use utils::{
12 : id::{TenantId, TenantTimelineId, TimelineId},
13 : lsn::Lsn,
14 : };
15 :
16 : use crate::{
17 : control_file, debug_dump,
18 : http::routes::TimelineStatus,
19 : timeline::{Timeline, TimelineError},
20 : wal_storage::{self, Storage},
21 : GlobalTimelines, SafeKeeperConf,
22 : };
23 :
24 : /// Info about timeline on safekeeper ready for reporting.
25 7 : #[derive(Debug, Serialize, Deserialize)]
26 : pub struct Request {
27 : pub tenant_id: TenantId,
28 : pub timeline_id: TimelineId,
29 : pub http_hosts: Vec<String>,
30 : }
31 :
32 1 : #[derive(Debug, Serialize)]
33 : pub struct Response {
34 : // Donor safekeeper host
35 : pub safekeeper_host: String,
36 : // TODO: add more fields?
37 : }
38 :
39 : /// Response for debug dump request.
40 11 : #[derive(Debug, Serialize, Deserialize)]
41 : pub struct DebugDumpResponse {
42 : pub start_time: DateTime<Utc>,
43 : pub finish_time: DateTime<Utc>,
44 : pub timelines: Vec<debug_dump::Timeline>,
45 : pub timelines_count: usize,
46 : pub config: debug_dump::Config,
47 : }
48 :
49 : /// Find the most advanced safekeeper and pull timeline from it.
50 1 : pub async fn handle_request(request: Request) -> Result<Response> {
51 1 : let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
52 1 : request.tenant_id,
53 1 : request.timeline_id,
54 1 : ));
55 1 : if existing_tli.is_ok() {
56 0 : bail!("Timeline {} already exists", request.timeline_id);
57 1 : }
58 1 :
59 1 : let client = reqwest::Client::new();
60 1 : let http_hosts = request.http_hosts.clone();
61 :
62 : // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
63 2 : let responses = futures::future::join_all(http_hosts.iter().map(|url| {
64 2 : let url = format!(
65 2 : "{}/v1/tenant/{}/timeline/{}",
66 2 : url, request.tenant_id, request.timeline_id
67 2 : );
68 2 : client.get(url).send()
69 2 : }))
70 5 : .await;
71 :
72 1 : let mut statuses = Vec::new();
73 2 : for (i, response) in responses.into_iter().enumerate() {
74 2 : let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
75 2 : let status: crate::http::routes::TimelineStatus = response.json().await?;
76 2 : statuses.push((status, i));
77 : }
78 :
79 : // Find the most advanced safekeeper
80 : // TODO: current logic may be wrong, fix it later
81 1 : let (status, i) = statuses
82 1 : .into_iter()
83 2 : .max_by_key(|(status, _)| {
84 2 : (
85 2 : status.acceptor_state.epoch,
86 2 : status.flush_lsn,
87 2 : status.commit_lsn,
88 2 : )
89 2 : })
90 1 : .unwrap();
91 1 : let safekeeper_host = http_hosts[i].clone();
92 1 :
93 1 : assert!(status.tenant_id == request.tenant_id);
94 1 : assert!(status.timeline_id == request.timeline_id);
95 :
96 108 : pull_timeline(status, safekeeper_host).await
97 1 : }
98 :
99 1 : async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
100 1 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
101 1 : info!(
102 1 : "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
103 1 : ttid,
104 1 : host,
105 1 : status.commit_lsn,
106 1 : status.flush_lsn,
107 1 : status.acceptor_state.term,
108 1 : status.acceptor_state.epoch
109 1 : );
110 :
111 1 : let conf = &GlobalTimelines::get_global_config();
112 1 :
113 1 : let client = reqwest::Client::new();
114 : // TODO: don't use debug dump, it should be used only in tests.
115 : // This is a proof of concept, we should figure out a way
116 : // to use scp without implementing it manually.
117 :
118 : // Implementing our own scp over HTTP.
119 : // At first, we need to fetch list of files from safekeeper.
120 1 : let dump: DebugDumpResponse = client
121 1 : .get(format!(
122 1 : "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
123 1 : host, status.tenant_id, status.timeline_id
124 1 : ))
125 1 : .send()
126 4 : .await?
127 1 : .json()
128 3 : .await?;
129 :
130 1 : if dump.timelines.len() != 1 {
131 0 : bail!(
132 0 : "expected to fetch single timeline, got {} timelines",
133 0 : dump.timelines.len()
134 0 : );
135 1 : }
136 1 :
137 1 : let timeline = dump.timelines.into_iter().next().unwrap();
138 1 : let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
139 1 : "timeline {} doesn't have disk content",
140 1 : ttid
141 1 : ))?;
142 :
143 1 : let mut filenames = disk_content
144 1 : .files
145 1 : .iter()
146 3 : .map(|file| file.name.clone())
147 1 : .collect::<Vec<_>>();
148 1 :
149 1 : // Sort filenames to make sure we pull files in correct order
150 1 : // After sorting, we should have:
151 1 : // - 000000010000000000000001
152 1 : // - ...
153 1 : // - 000000010000000000000002.partial
154 1 : // - safekeeper.control
155 1 : filenames.sort();
156 :
157 : // safekeeper.control should be the first file, so we need to move it to the beginning
158 1 : let control_file_index = filenames
159 1 : .iter()
160 3 : .position(|name| name == "safekeeper.control")
161 1 : .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
162 1 : filenames.remove(control_file_index);
163 1 : filenames.insert(0, "safekeeper.control".to_string());
164 :
165 1 : info!(
166 1 : "downloading {} files from safekeeper {}",
167 1 : filenames.len(),
168 1 : host
169 1 : );
170 :
171 1 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
172 :
173 : // Note: some time happens between fetching list of files and fetching files themselves.
174 : // It's possible that some files will be removed from safekeeper and we will fail to fetch them.
175 : // This function will fail in this case, should be retried by the caller.
176 4 : for filename in filenames {
177 3 : let file_path = tli_dir_path.join(&filename);
178 3 : // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
179 3 : let http_url = format!(
180 3 : "{}/v1/tenant/{}/timeline/{}/file/{}",
181 3 : host, status.tenant_id, status.timeline_id, filename
182 3 : );
183 :
184 3 : let mut file = tokio::fs::File::create(&file_path).await?;
185 3 : let mut response = client.get(&http_url).send().await?;
186 91 : while let Some(chunk) = response.chunk().await? {
187 88 : file.write_all(&chunk).await?;
188 88 : file.flush().await?;
189 : }
190 : }
191 :
192 : // TODO: fsync?
193 :
194 : // Let's create timeline from temp directory and verify that it's correct
195 1 : let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
196 1 : info!(
197 1 : "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
198 1 : ttid, commit_lsn, flush_lsn
199 1 : );
200 1 : assert!(status.commit_lsn <= status.flush_lsn);
201 :
202 : // Finally, load the timeline.
203 2 : let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
204 :
205 1 : Ok(Response {
206 1 : safekeeper_host: host,
207 1 : })
208 1 : }
209 :
210 : /// Create temp directory for a new timeline. It needs to be located on the same
211 : /// filesystem as the rest of the timelines. It will be automatically deleted when
212 : /// Utf8TempDir goes out of scope.
213 49 : pub async fn create_temp_timeline_dir(
214 49 : conf: &SafeKeeperConf,
215 49 : ttid: TenantTimelineId,
216 49 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
217 : // conf.workdir is usually /storage/safekeeper/data
218 : // will try to transform it into /storage/safekeeper/tmp
219 49 : let temp_base = conf
220 49 : .workdir
221 49 : .parent()
222 49 : .ok_or(anyhow::anyhow!("workdir has no parent"))?
223 49 : .join("tmp");
224 49 :
225 49 : tokio::fs::create_dir_all(&temp_base).await?;
226 :
227 49 : let tli_dir = camino_tempfile::Builder::new()
228 49 : .suffix("_temptli")
229 49 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
230 49 : .tempdir_in(temp_base)?;
231 :
232 49 : let tli_dir_path = tli_dir.path().to_path_buf();
233 49 :
234 49 : Ok((tli_dir, tli_dir_path))
235 49 : }
236 :
237 : /// Do basic validation of a temp timeline, before moving it to the global map.
238 49 : pub async fn validate_temp_timeline(
239 49 : conf: &SafeKeeperConf,
240 49 : ttid: TenantTimelineId,
241 49 : path: &Utf8PathBuf,
242 49 : ) -> Result<(Lsn, Lsn)> {
243 49 : let control_path = path.join("safekeeper.control");
244 :
245 49 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
246 49 : if control_store.server.wal_seg_size == 0 {
247 0 : bail!("wal_seg_size is not set");
248 49 : }
249 :
250 49 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
251 :
252 49 : let commit_lsn = control_store.commit_lsn;
253 49 : let flush_lsn = wal_store.flush_lsn();
254 49 :
255 49 : Ok((commit_lsn, flush_lsn))
256 49 : }
257 :
258 : /// Move timeline from a temp directory to the main storage, and load it to the global map.
259 : /// This operation is done under a lock to prevent bugs if several concurrent requests are
260 : /// trying to load the same timeline. Note that it doesn't guard against creating the
261 : /// timeline with the same ttid, but no one should be doing this anyway.
262 49 : pub async fn load_temp_timeline(
263 49 : conf: &SafeKeeperConf,
264 49 : ttid: TenantTimelineId,
265 49 : tmp_path: &Utf8PathBuf,
266 49 : ) -> Result<Arc<Timeline>> {
267 : // Take a lock to prevent concurrent loadings
268 49 : let load_lock = GlobalTimelines::loading_lock().await;
269 49 : let guard = load_lock.lock().await;
270 :
271 49 : if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
272 0 : bail!("timeline already exists, cannot overwrite it")
273 49 : }
274 49 :
275 49 : // Move timeline dir to the correct location
276 49 : let timeline_path = conf.timeline_dir(&ttid);
277 :
278 49 : info!(
279 49 : "moving timeline {} from {} to {}",
280 49 : ttid, tmp_path, timeline_path
281 49 : );
282 49 : tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
283 49 : tokio::fs::rename(tmp_path, &timeline_path).await?;
284 :
285 49 : let tli = GlobalTimelines::load_timeline(&guard, ttid)
286 0 : .await
287 49 : .context("Failed to load timeline after copy")?;
288 :
289 49 : info!(
290 49 : "loaded timeline {}, flush_lsn={}",
291 49 : ttid,
292 49 : tli.get_flush_lsn().await
293 49 : );
294 :
295 49 : Ok(tli)
296 49 : }
|