Line data Source code
1 : use anyhow::{anyhow, bail, Context, Result};
2 : use bytes::Bytes;
3 : use camino::Utf8PathBuf;
4 : use camino_tempfile::Utf8TempDir;
5 : use chrono::{DateTime, Utc};
6 : use futures::{SinkExt, StreamExt, TryStreamExt};
7 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
8 : use serde::{Deserialize, Serialize};
9 : use std::{
10 : cmp::min,
11 : io::{self, ErrorKind},
12 : sync::Arc,
13 : };
14 : use tokio::{
15 : fs::{File, OpenOptions},
16 : io::AsyncWrite,
17 : sync::mpsc,
18 : task,
19 : };
20 : use tokio_tar::{Archive, Builder};
21 : use tokio_util::{
22 : io::{CopyToBytes, SinkWriter},
23 : sync::PollSender,
24 : };
25 : use tracing::{error, info, instrument};
26 :
27 : use crate::{
28 : control_file::{self, CONTROL_FILE_NAME},
29 : debug_dump,
30 : http::{
31 : client::{self, Client},
32 : routes::TimelineStatus,
33 : },
34 : safekeeper::Term,
35 : timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
36 : wal_storage::{self, open_wal_file, Storage},
37 : GlobalTimelines, SafeKeeperConf,
38 : };
39 : use utils::{
40 : crashsafe::{durable_rename, fsync_async_opt},
41 : id::{TenantId, TenantTimelineId, TimelineId},
42 : logging::SecretString,
43 : lsn::Lsn,
44 : pausable_failpoint,
45 : };
46 :
47 : /// Stream tar archive of timeline to tx.
48 0 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
49 : pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
50 : if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
51 : // Error type/contents don't matter as they won't can't reach the client
52 : // (hyper likely doesn't do anything with it), but http stream will be
53 : // prematurely terminated. It would be nice to try to send the error in
54 : // trailers though.
55 : tx.send(Err(anyhow!("snapshot failed"))).await.ok();
56 : error!("snapshot failed: {:#}", e);
57 : }
58 : }
59 :
/// State needed while streaming the snapshot.
///
/// Built by `FullAccessTimeline::start_snapshot` and checked again by
/// `FullAccessTimeline::finish_snapshot`. Dropping it clears the
/// `wal_removal_on_hold` flag of the timeline (see the `Drop` impl).
pub struct SnapshotContext {
    /// First WAL segment to stream (inclusive).
    pub from_segno: XLogSegNo,
    /// Last WAL segment to stream (inclusive).
    pub upto_segno: XLogSegNo,
    /// Term observed when the snapshot started; compared in `finish_snapshot`.
    pub term: Term,
    /// Last log term observed when the snapshot started; compared in
    /// `finish_snapshot`.
    pub last_log_term: Term,
    /// Flush LSN observed when the snapshot started.
    pub flush_lsn: Lsn,
    /// WAL segment size of the timeline, used to open and name segment files.
    pub wal_seg_size: usize,
    /// Timeline handle, used to remove the WAL removal hold off in Drop.
    pub tli: FullAccessTimeline,
}
71 :
72 : impl Drop for SnapshotContext {
73 0 : fn drop(&mut self) {
74 0 : let tli = self.tli.clone();
75 0 : task::spawn(async move {
76 0 : let mut shared_state = tli.write_shared_state().await;
77 0 : shared_state.wal_removal_on_hold = false;
78 0 : });
79 0 : }
80 : }
81 :
82 0 : pub async fn stream_snapshot_guts(
83 0 : tli: FullAccessTimeline,
84 0 : tx: mpsc::Sender<Result<Bytes>>,
85 0 : ) -> Result<()> {
86 0 : // tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
87 0 : // use SinkWriter as a Write impl. That is,
88 0 : // - create Sink from the tx. It returns PollSendError if chan is closed.
89 0 : let sink = PollSender::new(tx);
90 0 : // - SinkWriter needs sink error to be io one, map it.
91 0 : let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
92 0 : // - SinkWriter wants sink type to be just Bytes, not Result<Bytes>, so map
93 0 : // it with with(). Note that with() accepts async function which we don't
94 0 : // need and allows the map to fail, which we don't need either, but hence
95 0 : // two Oks.
96 0 : let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
97 0 : // - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
98 0 : // into CopyToBytes. This is a data copy.
99 0 : let copy_to_bytes = CopyToBytes::new(oksink);
100 0 : let mut writer = SinkWriter::new(copy_to_bytes);
101 0 : let pinned_writer = std::pin::pin!(writer);
102 0 :
103 0 : // Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
104 0 : // which is also likely suboptimal.
105 0 : let mut ar = Builder::new_non_terminated(pinned_writer);
106 :
107 0 : let bctx = tli.start_snapshot(&mut ar).await?;
108 : pausable_failpoint!("sk-snapshot-after-list-pausable");
109 :
110 0 : let tli_dir = tli.get_timeline_dir();
111 0 : info!(
112 0 : "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
113 0 : bctx.upto_segno - bctx.from_segno + 1,
114 : bctx.from_segno,
115 : bctx.upto_segno,
116 : bctx.term,
117 : bctx.last_log_term,
118 : bctx.flush_lsn,
119 : );
120 0 : for segno in bctx.from_segno..=bctx.upto_segno {
121 0 : let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
122 0 : let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
123 0 : if is_partial {
124 0 : wal_file_name.push_str(".partial");
125 0 : }
126 0 : ar.append_file(&wal_file_name, &mut sf).await?;
127 : }
128 :
129 : // Do the term check before ar.finish to make archive corrupted in case of
130 : // term change. Client shouldn't ignore abrupt stream end, but to be sure.
131 0 : tli.finish_snapshot(&bctx).await?;
132 :
133 0 : ar.finish().await?;
134 :
135 0 : Ok(())
136 0 : }
137 :
138 : impl FullAccessTimeline {
139 : /// Start streaming tar archive with timeline:
140 : /// 1) stream control file under lock;
141 : /// 2) hold off WAL removal;
142 : /// 3) collect SnapshotContext to understand which WAL segments should be
143 : /// streamed.
144 : ///
145 : /// Snapshot streams data up to flush_lsn. To make this safe, we must check
146 : /// that term doesn't change during the procedure, or we risk sending mix of
147 : /// WAL from different histories. Term is remembered in the SnapshotContext
148 : /// and checked in finish_snapshot. Note that in the last segment some WAL
149 : /// higher than flush_lsn set here might be streamed; that's fine as long as
150 : /// terms doesn't change.
151 : ///
152 : /// Alternatively we could send only up to commit_lsn to get some valid
153 : /// state which later will be recovered by compute, in this case term check
154 : /// is not needed, but we likely don't want that as there might be no
155 : /// compute which could perform the recovery.
156 : ///
157 : /// When returned SnapshotContext is dropped WAL hold is removed.
158 0 : async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
159 0 : &self,
160 0 : ar: &mut tokio_tar::Builder<W>,
161 0 : ) -> Result<SnapshotContext> {
162 0 : let mut shared_state = self.write_shared_state().await;
163 :
164 0 : let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
165 0 : let mut cf = File::open(cf_path).await?;
166 0 : ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
167 :
168 : // We need to stream since the oldest segment someone (s3 or pageserver)
169 : // still needs. This duplicates calc_horizon_lsn logic.
170 : //
171 : // We know that WAL wasn't removed up to this point because it cannot be
172 : // removed further than `backup_lsn`. Since we're holding shared_state
173 : // lock and setting `wal_removal_on_hold` later, it guarantees that WAL
174 : // won't be removed until we're done.
175 0 : let from_lsn = min(
176 0 : shared_state.sk.state.remote_consistent_lsn,
177 0 : shared_state.sk.state.backup_lsn,
178 0 : );
179 0 : if from_lsn == Lsn::INVALID {
180 : // this is possible if snapshot is called before handling first
181 : // elected message
182 0 : bail!("snapshot is called on uninitialized timeline");
183 0 : }
184 0 : let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
185 0 : let term = shared_state.sk.get_term();
186 0 : let last_log_term = shared_state.sk.get_last_log_term();
187 0 : let flush_lsn = shared_state.sk.flush_lsn();
188 0 : let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
189 0 : // have some limit on max number of segments as a sanity check
190 0 : const MAX_ALLOWED_SEGS: u64 = 1000;
191 0 : let num_segs = upto_segno - from_segno + 1;
192 0 : if num_segs > MAX_ALLOWED_SEGS {
193 0 : bail!(
194 0 : "snapshot is called on timeline with {} segments, but the limit is {}",
195 0 : num_segs,
196 0 : MAX_ALLOWED_SEGS
197 0 : );
198 0 : }
199 0 :
200 0 : // Prevent WAL removal while we're streaming data.
201 0 : //
202 0 : // Since this a flag, not a counter just bail out if already set; we
203 0 : // shouldn't need concurrent snapshotting.
204 0 : if shared_state.wal_removal_on_hold {
205 0 : bail!("wal_removal_on_hold is already true");
206 0 : }
207 0 : shared_state.wal_removal_on_hold = true;
208 0 :
209 0 : let bctx = SnapshotContext {
210 0 : from_segno,
211 0 : upto_segno,
212 0 : term,
213 0 : last_log_term,
214 0 : flush_lsn,
215 0 : wal_seg_size: shared_state.get_wal_seg_size(),
216 0 : tli: self.clone(),
217 0 : };
218 0 :
219 0 : Ok(bctx)
220 0 : }
221 :
222 : /// Finish snapshotting: check that term(s) hasn't changed.
223 : ///
224 : /// Note that WAL gc hold off is removed in Drop of SnapshotContext to not
225 : /// forget this if snapshotting fails mid the way.
226 0 : pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
227 0 : let shared_state = self.read_shared_state().await;
228 0 : let term = shared_state.sk.get_term();
229 0 : let last_log_term = shared_state.sk.get_last_log_term();
230 0 : // There are some cases to relax this check (e.g. last_log_term might
231 0 : // change, but as long as older history is strictly part of new that's
232 0 : // fine), but there is no need to do it.
233 0 : if bctx.term != term || bctx.last_log_term != last_log_term {
234 0 : bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
235 0 : bctx.term, bctx.last_log_term, term, last_log_term);
236 0 : }
237 0 : Ok(())
238 0 : }
239 : }
240 :
/// pull_timeline request body.
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
    /// Tenant owning the timeline to pull.
    pub tenant_id: TenantId,
    /// Timeline to pull.
    pub timeline_id: TimelineId,
    /// Candidate donor safekeepers; each entry is passed as-is to
    /// `Client::new` to query timeline status.
    pub http_hosts: Vec<String>,
}
248 :
/// pull_timeline response body.
#[derive(Debug, Serialize)]
pub struct Response {
    /// Donor safekeeper host the timeline was pulled from (one of the
    /// `http_hosts` entries of the request).
    pub safekeeper_host: String,
    // TODO: add more fields?
}
255 :
/// Response for debug dump request.
///
/// NOTE(review): field meanings below are inferred from names and types only;
/// this type is not used elsewhere in this part of the file — confirm against
/// the `debug_dump` module.
#[derive(Debug, Serialize, Deserialize)]
pub struct DebugDumpResponse {
    /// Presumably when the dump started (UTC).
    pub start_time: DateTime<Utc>,
    /// Presumably when the dump finished (UTC).
    pub finish_time: DateTime<Utc>,
    /// Dumped timelines.
    pub timelines: Vec<debug_dump::Timeline>,
    /// Presumably the number of timelines — TODO confirm whether this can
    /// differ from `timelines.len()`.
    pub timelines_count: usize,
    /// Dumped configuration.
    pub config: debug_dump::Config,
}
265 :
266 : /// Find the most advanced safekeeper and pull timeline from it.
267 0 : pub async fn handle_request(
268 0 : request: Request,
269 0 : sk_auth_token: Option<SecretString>,
270 0 : ) -> Result<Response> {
271 0 : let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
272 0 : request.tenant_id,
273 0 : request.timeline_id,
274 0 : ));
275 0 : if existing_tli.is_ok() {
276 0 : bail!("Timeline {} already exists", request.timeline_id);
277 0 : }
278 0 :
279 0 : let http_hosts = request.http_hosts.clone();
280 :
281 : // Figure out statuses of potential donors.
282 0 : let responses: Vec<Result<TimelineStatus, client::Error>> =
283 0 : futures::future::join_all(http_hosts.iter().map(|url| async {
284 0 : let cclient = Client::new(url.clone(), sk_auth_token.clone());
285 0 : let info = cclient
286 0 : .timeline_status(request.tenant_id, request.timeline_id)
287 0 : .await?;
288 0 : Ok(info)
289 0 : }))
290 0 : .await;
291 :
292 0 : let mut statuses = Vec::new();
293 0 : for (i, response) in responses.into_iter().enumerate() {
294 0 : let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
295 0 : statuses.push((status, i));
296 : }
297 :
298 : // Find the most advanced safekeeper
299 0 : let (status, i) = statuses
300 0 : .into_iter()
301 0 : .max_by_key(|(status, _)| {
302 0 : (
303 0 : status.acceptor_state.epoch,
304 0 : status.flush_lsn,
305 0 : status.commit_lsn,
306 0 : )
307 0 : })
308 0 : .unwrap();
309 0 : let safekeeper_host = http_hosts[i].clone();
310 0 :
311 0 : assert!(status.tenant_id == request.tenant_id);
312 0 : assert!(status.timeline_id == request.timeline_id);
313 :
314 0 : pull_timeline(status, safekeeper_host, sk_auth_token).await
315 0 : }
316 :
317 0 : async fn pull_timeline(
318 0 : status: TimelineStatus,
319 0 : host: String,
320 0 : sk_auth_token: Option<SecretString>,
321 0 : ) -> Result<Response> {
322 0 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
323 0 : info!(
324 0 : "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
325 : ttid,
326 : host,
327 : status.commit_lsn,
328 : status.flush_lsn,
329 : status.acceptor_state.term,
330 : status.acceptor_state.epoch
331 : );
332 :
333 0 : let conf = &GlobalTimelines::get_global_config();
334 :
335 0 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
336 :
337 0 : let client = Client::new(host.clone(), sk_auth_token.clone());
338 : // Request stream with basebackup archive.
339 0 : let bb_resp = client
340 0 : .snapshot(status.tenant_id, status.timeline_id)
341 0 : .await?;
342 :
343 : // Make Stream of Bytes from it...
344 0 : let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
345 0 : // and turn it into StreamReader implementing AsyncRead.
346 0 : let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
347 :
348 : // Extract it on the fly to the disk. We don't use simple unpack() to fsync
349 : // files.
350 0 : let mut entries = Archive::new(bb_reader).entries()?;
351 0 : while let Some(base_tar_entry) = entries.next().await {
352 0 : let mut entry = base_tar_entry?;
353 0 : let header = entry.header();
354 0 : let file_path = header.path()?.into_owned();
355 0 : match header.entry_type() {
356 : tokio_tar::EntryType::Regular => {
357 0 : let utf8_file_path =
358 0 : Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
359 0 : let dst_path = tli_dir_path.join(utf8_file_path);
360 0 : let mut f = OpenOptions::new()
361 0 : .create(true)
362 0 : .truncate(true)
363 0 : .write(true)
364 0 : .open(&dst_path)
365 0 : .await?;
366 0 : tokio::io::copy(&mut entry, &mut f).await?;
367 : // fsync the file
368 0 : f.sync_all().await?;
369 : }
370 : _ => {
371 0 : bail!(
372 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
373 0 : file_path.display(),
374 0 : header.entry_type()
375 0 : );
376 : }
377 : }
378 : }
379 :
380 : // fsync temp timeline directory to remember its contents.
381 0 : fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
382 :
383 : // Let's create timeline from temp directory and verify that it's correct
384 0 : let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
385 0 : info!(
386 0 : "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
387 : ttid, commit_lsn, flush_lsn
388 : );
389 0 : assert!(status.commit_lsn <= status.flush_lsn);
390 :
391 : // Finally, load the timeline.
392 0 : let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
393 :
394 0 : Ok(Response {
395 0 : safekeeper_host: host,
396 0 : })
397 0 : }
398 :
399 : /// Create temp directory for a new timeline. It needs to be located on the same
400 : /// filesystem as the rest of the timelines. It will be automatically deleted when
401 : /// Utf8TempDir goes out of scope.
402 0 : pub async fn create_temp_timeline_dir(
403 0 : conf: &SafeKeeperConf,
404 0 : ttid: TenantTimelineId,
405 0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
406 : // conf.workdir is usually /storage/safekeeper/data
407 : // will try to transform it into /storage/safekeeper/tmp
408 0 : let temp_base = conf
409 0 : .workdir
410 0 : .parent()
411 0 : .ok_or(anyhow::anyhow!("workdir has no parent"))?
412 0 : .join("tmp");
413 0 :
414 0 : tokio::fs::create_dir_all(&temp_base).await?;
415 :
416 0 : let tli_dir = camino_tempfile::Builder::new()
417 0 : .suffix("_temptli")
418 0 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
419 0 : .tempdir_in(temp_base)?;
420 :
421 0 : let tli_dir_path = tli_dir.path().to_path_buf();
422 0 :
423 0 : Ok((tli_dir, tli_dir_path))
424 0 : }
425 :
426 : /// Do basic validation of a temp timeline, before moving it to the global map.
427 0 : pub async fn validate_temp_timeline(
428 0 : conf: &SafeKeeperConf,
429 0 : ttid: TenantTimelineId,
430 0 : path: &Utf8PathBuf,
431 0 : ) -> Result<(Lsn, Lsn)> {
432 0 : let control_path = path.join("safekeeper.control");
433 :
434 0 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
435 0 : if control_store.server.wal_seg_size == 0 {
436 0 : bail!("wal_seg_size is not set");
437 0 : }
438 :
439 0 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
440 :
441 0 : let commit_lsn = control_store.commit_lsn;
442 0 : let flush_lsn = wal_store.flush_lsn();
443 0 :
444 0 : Ok((commit_lsn, flush_lsn))
445 0 : }
446 :
447 : /// Move timeline from a temp directory to the main storage, and load it to the global map.
448 : /// This operation is done under a lock to prevent bugs if several concurrent requests are
449 : /// trying to load the same timeline. Note that it doesn't guard against creating the
450 : /// timeline with the same ttid, but no one should be doing this anyway.
451 0 : pub async fn load_temp_timeline(
452 0 : conf: &SafeKeeperConf,
453 0 : ttid: TenantTimelineId,
454 0 : tmp_path: &Utf8PathBuf,
455 0 : ) -> Result<Arc<Timeline>> {
456 : // Take a lock to prevent concurrent loadings
457 0 : let load_lock = GlobalTimelines::loading_lock().await;
458 0 : let guard = load_lock.lock().await;
459 :
460 0 : if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
461 0 : bail!("timeline already exists, cannot overwrite it")
462 0 : }
463 0 :
464 0 : // Move timeline dir to the correct location
465 0 : let timeline_path = get_timeline_dir(conf, &ttid);
466 0 :
467 0 : info!(
468 0 : "moving timeline {} from {} to {}",
469 : ttid, tmp_path, timeline_path
470 : );
471 0 : tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
472 : // fsync tenant dir creation
473 0 : fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
474 0 : durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
475 :
476 0 : let tli = GlobalTimelines::load_timeline(&guard, ttid)
477 0 : .await
478 0 : .context("Failed to load timeline after copy")?;
479 :
480 0 : info!(
481 0 : "loaded timeline {}, flush_lsn={}",
482 0 : ttid,
483 0 : tli.get_flush_lsn().await
484 : );
485 :
486 0 : Ok(tli)
487 0 : }
|