Line data Source code
1 : use anyhow::{anyhow, bail, Context, Result};
2 : use bytes::Bytes;
3 : use camino::Utf8PathBuf;
4 : use camino_tempfile::Utf8TempDir;
5 : use chrono::{DateTime, Utc};
6 : use futures::{SinkExt, StreamExt, TryStreamExt};
7 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
8 : use serde::{Deserialize, Serialize};
9 : use std::{
10 : cmp::min,
11 : io::{self, ErrorKind},
12 : sync::Arc,
13 : };
14 : use tokio::{
15 : fs::{File, OpenOptions},
16 : io::AsyncWrite,
17 : sync::mpsc,
18 : task,
19 : };
20 : use tokio_tar::{Archive, Builder};
21 : use tokio_util::{
22 : io::{CopyToBytes, SinkWriter},
23 : sync::PollSender,
24 : };
25 : use tracing::{error, info, instrument};
26 :
27 : use crate::{
28 : control_file::{self, CONTROL_FILE_NAME},
29 : debug_dump,
30 : http::{
31 : client::{self, Client},
32 : routes::TimelineStatus,
33 : },
34 : safekeeper::Term,
35 : timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
36 : wal_storage::{self, open_wal_file, Storage},
37 : GlobalTimelines, SafeKeeperConf,
38 : };
39 : use utils::{
40 : crashsafe::{durable_rename, fsync_async_opt},
41 : id::{TenantId, TenantTimelineId, TimelineId},
42 : logging::SecretString,
43 : lsn::Lsn,
44 : pausable_failpoint,
45 : };
46 :
47 : /// Stream tar archive of timeline to tx.
48 0 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
49 : pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
50 : if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
51 : // Error type/contents don't matter as they can't reach the client
52 : // (hyper likely doesn't do anything with it), but the http stream will be
53 : // prematurely terminated. It would be nice to try to send the error in
54 : // trailers though.
55 : tx.send(Err(anyhow!("snapshot failed"))).await.ok();
56 : error!("snapshot failed: {:#}", e);
57 : }
58 : }
59 :
60 : /// State needed while streaming the snapshot.
61 : pub struct SnapshotContext {
62 : pub from_segno: XLogSegNo, // inclusive
63 : pub upto_segno: XLogSegNo, // inclusive
64 : pub term: Term,
65 : pub last_log_term: Term,
66 : pub flush_lsn: Lsn,
67 : pub wal_seg_size: usize,
68 : // Used to remove the WAL hold-off in Drop.
69 : pub tli: WalResidentTimeline,
70 : }
71 :
72 : impl Drop for SnapshotContext {
73 0 : fn drop(&mut self) {
74 0 : let tli = self.tli.clone();
75 0 : task::spawn(async move {
76 0 : let mut shared_state = tli.write_shared_state().await;
77 0 : shared_state.wal_removal_on_hold = false;
78 0 : });
79 0 : }
80 : }
81 :
82 0 : pub async fn stream_snapshot_guts(
83 0 : tli: WalResidentTimeline,
84 0 : tx: mpsc::Sender<Result<Bytes>>,
85 0 : ) -> Result<()> {
86 0 : // tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
87 0 : // use SinkWriter as a Write impl. That is,
88 0 : // - create Sink from the tx. It returns PollSendError if chan is closed.
89 0 : let sink = PollSender::new(tx);
90 0 : // - SinkWriter needs the sink error to be an io::Error, so map it.
91 0 : let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
92 0 : // - SinkWriter wants the sink item type to be just Bytes, not Result<Bytes>,
93 0 : // so map it with with(). Note that with() accepts an async function and
94 0 : // allows the map to fail; we need neither of those capabilities, hence
95 0 : // the two Oks.
96 0 : let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
97 0 : // - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
98 0 : // into CopyToBytes. This is a data copy.
99 0 : let copy_to_bytes = CopyToBytes::new(oksink);
100 0 : let mut writer = SinkWriter::new(copy_to_bytes);
101 0 : let pinned_writer = std::pin::pin!(writer);
102 0 :
103 0 : // Note that tokio_tar's append_* funcs use tokio::io::copy with an 8KB buffer,
104 0 : // which is also likely suboptimal.
105 0 : let mut ar = Builder::new_non_terminated(pinned_writer);
106 :
107 0 : let bctx = tli.start_snapshot(&mut ar).await?;
108 : pausable_failpoint!("sk-snapshot-after-list-pausable");
109 :
110 0 : let tli_dir = tli.get_timeline_dir();
111 0 : info!(
112 0 : "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
113 0 : bctx.upto_segno - bctx.from_segno + 1,
114 : bctx.from_segno,
115 : bctx.upto_segno,
116 : bctx.term,
117 : bctx.last_log_term,
118 : bctx.flush_lsn,
119 : );
120 0 : for segno in bctx.from_segno..=bctx.upto_segno {
121 0 : let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
122 0 : let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
123 0 : if is_partial {
124 0 : wal_file_name.push_str(".partial");
125 0 : }
126 0 : ar.append_file(&wal_file_name, &mut sf).await?;
127 : }
128 :
129 : // Do the term check before ar.finish so that the archive is left corrupted if the
130 : // term changed. The client shouldn't ignore an abrupt stream end, but just to be sure.
131 0 : tli.finish_snapshot(&bctx).await?;
132 :
133 0 : ar.finish().await?;
134 :
135 0 : Ok(())
136 0 : }
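// For context, a hedged sketch of the consuming side (not code from this file;
// the exact handler wiring is an assumption): the rx half of the channel is
// typically wrapped back into an HTTP response body, e.g. with hyper 0.14 and
// tokio-stream:
//
//     let (tx, rx) = tokio::sync::mpsc::channel(1);
//     tokio::spawn(stream_snapshot(resident_tli, tx));
//     let body = hyper::Body::wrap_stream(tokio_stream::wrappers::ReceiverStream::new(rx));
//
// Body::wrap_stream accepts a Stream of Result<impl Into<Bytes>, E>, which matches
// the Result<Bytes> items produced by the sender side above.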
137 :
138 : impl WalResidentTimeline {
139 : /// Start streaming tar archive with timeline:
140 : /// 1) stream control file under lock;
141 : /// 2) hold off WAL removal;
142 : /// 3) collect SnapshotContext to understand which WAL segments should be
143 : /// streamed.
144 : ///
145 : /// Snapshot streams data up to flush_lsn. To make this safe, we must check
146 : /// that the term doesn't change during the procedure, or we risk sending a mix of
147 : /// WAL from different histories. Term is remembered in the SnapshotContext
148 : /// and checked in finish_snapshot. Note that in the last segment some WAL
149 : /// higher than the flush_lsn set here might be streamed; that's fine as long as
150 : /// the term doesn't change.
151 : ///
152 : /// Alternatively we could send only up to commit_lsn to get some valid
153 : /// state which later will be recovered by compute, in this case term check
154 : /// is not needed, but we likely don't want that as there might be no
155 : /// compute which could perform the recovery.
156 : ///
157 : /// When the returned SnapshotContext is dropped, the WAL hold is removed.
158 0 : async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
159 0 : &self,
160 0 : ar: &mut tokio_tar::Builder<W>,
161 0 : ) -> Result<SnapshotContext> {
162 0 : let mut shared_state = self.write_shared_state().await;
163 0 : let wal_seg_size = shared_state.get_wal_seg_size();
164 0 :
165 0 : let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
166 0 : let mut cf = File::open(cf_path).await?;
167 0 : ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
168 :
169 : // We need to stream WAL starting from the oldest segment that someone (s3 or
170 : // pageserver) still needs. This duplicates the calc_horizon_lsn logic.
171 : //
172 : // We know that WAL wasn't removed up to this point because it cannot be
173 : // removed beyond `backup_lsn`. Since we're holding the shared_state lock
174 : // and set `wal_removal_on_hold` below, WAL won't be removed until we're
175 : // done.
176 0 : let from_lsn = min(
177 0 : shared_state.sk.state().remote_consistent_lsn,
178 0 : shared_state.sk.state().backup_lsn,
179 0 : );
180 0 : if from_lsn == Lsn::INVALID {
181 : // this is possible if snapshot is called before handling first
182 : // elected message
183 0 : bail!("snapshot is called on uninitialized timeline");
184 0 : }
185 0 : let from_segno = from_lsn.segment_number(wal_seg_size);
186 0 : let term = shared_state.sk.state().acceptor_state.term;
187 0 : let last_log_term = shared_state.sk.last_log_term();
188 0 : let flush_lsn = shared_state.sk.flush_lsn();
189 0 : let upto_segno = flush_lsn.segment_number(wal_seg_size);
190 0 : // have some limit on max number of segments as a sanity check
191 0 : const MAX_ALLOWED_SEGS: u64 = 1000;
192 0 : let num_segs = upto_segno - from_segno + 1;
193 0 : if num_segs > MAX_ALLOWED_SEGS {
194 0 : bail!(
195 0 : "snapshot is called on timeline with {} segments, but the limit is {}",
196 0 : num_segs,
197 0 : MAX_ALLOWED_SEGS
198 0 : );
199 0 : }
200 0 :
201 0 : // Prevent WAL removal while we're streaming data.
202 0 : //
203 0 : // Since this is a flag, not a counter, just bail out if it's already set; we
204 0 : // shouldn't need concurrent snapshotting.
205 0 : if shared_state.wal_removal_on_hold {
206 0 : bail!("wal_removal_on_hold is already true");
207 0 : }
208 0 : shared_state.wal_removal_on_hold = true;
209 0 :
210 0 : // Drop shared_state to release the lock, before calling wal_residence_guard().
211 0 : drop(shared_state);
212 :
213 0 : let tli_copy = self.wal_residence_guard().await?;
214 0 : let bctx = SnapshotContext {
215 0 : from_segno,
216 0 : upto_segno,
217 0 : term,
218 0 : last_log_term,
219 0 : flush_lsn,
220 0 : wal_seg_size,
221 0 : tli: tli_copy,
222 0 : };
223 0 :
224 0 : Ok(bctx)
225 0 : }
226 :
227 : /// Finish snapshotting: check that the term(s) haven't changed.
228 : ///
229 : /// Note that the WAL gc hold-off is removed in SnapshotContext's Drop so that
230 : /// it isn't forgotten if snapshotting fails midway.
231 0 : pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
232 0 : let shared_state = self.read_shared_state().await;
233 0 : let term = shared_state.sk.state().acceptor_state.term;
234 0 : let last_log_term = shared_state.sk.last_log_term();
235 0 : // There are some cases where this check could be relaxed (e.g. last_log_term
236 0 : // might change, but as long as the older history is strictly part of the new
237 0 : // one that's fine), but there is no need to do it.
238 0 : if bctx.term != term || bctx.last_log_term != last_log_term {
239 0 : bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
240 0 : bctx.term, bctx.last_log_term, term, last_log_term);
241 0 : }
242 0 : Ok(())
243 0 : }
244 : }
245 :
246 : /// pull_timeline request body.
247 0 : #[derive(Debug, Serialize, Deserialize)]
248 : pub struct Request {
249 : pub tenant_id: TenantId,
250 : pub timeline_id: TimelineId,
251 : pub http_hosts: Vec<String>,
252 : }
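// Illustrative only (an assumption about the wire format, not taken from this
// file): with the serde derive above and hex-serialized tenant/timeline IDs, a
// pull_timeline request body would look roughly like
//
//     {
//       "tenant_id": "9e4c8f36063c6c6e93bc20d65a820f3d",
//       "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
//       "http_hosts": ["http://safekeeper-1:7676", "http://safekeeper-2:7676"]
//     }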
253 :
254 : #[derive(Debug, Serialize)]
255 : pub struct Response {
256 : // Donor safekeeper host
257 : pub safekeeper_host: String,
258 : // TODO: add more fields?
259 : }
260 :
261 : /// Response for debug dump request.
262 0 : #[derive(Debug, Serialize, Deserialize)]
263 : pub struct DebugDumpResponse {
264 : pub start_time: DateTime<Utc>,
265 : pub finish_time: DateTime<Utc>,
266 : pub timelines: Vec<debug_dump::Timeline>,
267 : pub timelines_count: usize,
268 : pub config: debug_dump::Config,
269 : }
270 :
271 : /// Find the most advanced safekeeper and pull timeline from it.
272 0 : pub async fn handle_request(
273 0 : request: Request,
274 0 : sk_auth_token: Option<SecretString>,
275 0 : ) -> Result<Response> {
276 0 : let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
277 0 : request.tenant_id,
278 0 : request.timeline_id,
279 0 : ));
280 0 : if existing_tli.is_ok() {
281 0 : bail!("Timeline {} already exists", request.timeline_id);
282 0 : }
283 0 :
284 0 : let http_hosts = request.http_hosts.clone();
285 :
286 : // Figure out statuses of potential donors.
287 0 : let responses: Vec<Result<TimelineStatus, client::Error>> =
288 0 : futures::future::join_all(http_hosts.iter().map(|url| async {
289 0 : let cclient = Client::new(url.clone(), sk_auth_token.clone());
290 0 : let info = cclient
291 0 : .timeline_status(request.tenant_id, request.timeline_id)
292 0 : .await?;
293 0 : Ok(info)
294 0 : }))
295 0 : .await;
296 :
297 0 : let mut statuses = Vec::new();
298 0 : for (i, response) in responses.into_iter().enumerate() {
299 0 : let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
300 0 : statuses.push((status, i));
301 : }
302 :
303 : // Find the most advanced safekeeper
304 0 : let (status, i) = statuses
305 0 : .into_iter()
306 0 : .max_by_key(|(status, _)| {
307 0 : (
308 0 : status.acceptor_state.epoch,
309 0 : status.flush_lsn,
310 0 : status.commit_lsn,
311 0 : )
312 0 : })
313 0 : .unwrap();
314 0 : let safekeeper_host = http_hosts[i].clone();
315 0 :
316 0 : assert!(status.tenant_id == request.tenant_id);
317 0 : assert!(status.timeline_id == request.timeline_id);
318 :
319 0 : pull_timeline(status, safekeeper_host, sk_auth_token).await
320 0 : }
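// A minimal, self-contained illustration of the donor-selection rule above
// (hypothetical helper, not part of this module): candidates are compared by
// the tuple (epoch, flush_lsn, commit_lsn), so epoch wins first, then
// flush_lsn, and commit_lsn breaks the remaining ties.
//
//     fn pick_most_advanced(candidates: &[(u64, u64, u64)]) -> Option<usize> {
//         candidates
//             .iter()
//             .enumerate()
//             .max_by_key(|(_, c)| **c)
//             .map(|(i, _)| i)
//     }
//
//     // pick_most_advanced(&[(2, 500, 400), (3, 100, 100), (3, 200, 150)]) == Some(2)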
321 :
322 0 : async fn pull_timeline(
323 0 : status: TimelineStatus,
324 0 : host: String,
325 0 : sk_auth_token: Option<SecretString>,
326 0 : ) -> Result<Response> {
327 0 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
328 0 : info!(
329 0 : "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
330 : ttid,
331 : host,
332 : status.commit_lsn,
333 : status.flush_lsn,
334 : status.acceptor_state.term,
335 : status.acceptor_state.epoch
336 : );
337 :
338 0 : let conf = &GlobalTimelines::get_global_config();
339 :
340 0 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
341 :
342 0 : let client = Client::new(host.clone(), sk_auth_token.clone());
343 : // Request stream with basebackup archive.
344 0 : let bb_resp = client
345 0 : .snapshot(status.tenant_id, status.timeline_id)
346 0 : .await?;
347 :
348 : // Make Stream of Bytes from it...
349 0 : let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
350 0 : // and turn it into StreamReader implementing AsyncRead.
351 0 : let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
352 :
353 : // Extract it on the fly to disk. We don't use the simple unpack() because we
354 : // want to fsync the files.
355 0 : let mut entries = Archive::new(bb_reader).entries()?;
356 0 : while let Some(base_tar_entry) = entries.next().await {
357 0 : let mut entry = base_tar_entry?;
358 0 : let header = entry.header();
359 0 : let file_path = header.path()?.into_owned();
360 0 : match header.entry_type() {
361 : tokio_tar::EntryType::Regular => {
362 0 : let utf8_file_path =
363 0 : Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
364 0 : let dst_path = tli_dir_path.join(utf8_file_path);
365 0 : let mut f = OpenOptions::new()
366 0 : .create(true)
367 0 : .truncate(true)
368 0 : .write(true)
369 0 : .open(&dst_path)
370 0 : .await?;
371 0 : tokio::io::copy(&mut entry, &mut f).await?;
372 : // fsync the file
373 0 : f.sync_all().await?;
374 : }
375 : _ => {
376 0 : bail!(
377 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
378 0 : file_path.display(),
379 0 : header.entry_type()
380 0 : );
381 : }
382 : }
383 : }
384 :
385 : // fsync temp timeline directory to remember its contents.
386 0 : fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
387 :
388 : // Validate the timeline in the temp directory and verify that it's correct.
389 0 : let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
390 0 : info!(
391 0 : "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
392 : ttid, commit_lsn, flush_lsn
393 : );
394 0 : assert!(status.commit_lsn <= status.flush_lsn);
395 :
396 : // Finally, load the timeline.
397 0 : let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
398 :
399 0 : Ok(Response {
400 0 : safekeeper_host: host,
401 0 : })
402 0 : }
403 :
404 : /// Create temp directory for a new timeline. It needs to be located on the same
405 : /// filesystem as the rest of the timelines. It will be automatically deleted when
406 : /// Utf8TempDir goes out of scope.
407 0 : pub async fn create_temp_timeline_dir(
408 0 : conf: &SafeKeeperConf,
409 0 : ttid: TenantTimelineId,
410 0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
411 : // conf.workdir is usually /storage/safekeeper/data;
412 : // we will transform it into /storage/safekeeper/tmp for the temp dir.
413 0 : let temp_base = conf
414 0 : .workdir
415 0 : .parent()
416 0 : .ok_or(anyhow::anyhow!("workdir has no parent"))?
417 0 : .join("tmp");
418 0 :
419 0 : tokio::fs::create_dir_all(&temp_base).await?;
420 :
421 0 : let tli_dir = camino_tempfile::Builder::new()
422 0 : .suffix("_temptli")
423 0 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
424 0 : .tempdir_in(temp_base)?;
425 :
426 0 : let tli_dir_path = tli_dir.path().to_path_buf();
427 0 :
428 0 : Ok((tli_dir, tli_dir_path))
429 0 : }
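// For illustration (derived from the Builder calls above; the exact random part
// is an assumption): with workdir /storage/safekeeper/data the temp dir ends up
// looking like
//
//     /storage/safekeeper/tmp/<tenant_hex>_<timeline_hex>_<random>_temptli
//
// where <random> is the random component camino_tempfile inserts between the
// prefix and the suffix.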
430 :
431 : /// Do basic validation of a temp timeline, before moving it to the global map.
432 0 : pub async fn validate_temp_timeline(
433 0 : conf: &SafeKeeperConf,
434 0 : ttid: TenantTimelineId,
435 0 : path: &Utf8PathBuf,
436 0 : ) -> Result<(Lsn, Lsn)> {
437 0 : let control_path = path.join("safekeeper.control");
438 :
439 0 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
440 0 : if control_store.server.wal_seg_size == 0 {
441 0 : bail!("wal_seg_size is not set");
442 0 : }
443 :
444 0 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
445 :
446 0 : let commit_lsn = control_store.commit_lsn;
447 0 : let flush_lsn = wal_store.flush_lsn();
448 0 :
449 0 : Ok((commit_lsn, flush_lsn))
450 0 : }
451 :
452 : /// Move timeline from a temp directory to the main storage, and load it to the global map.
453 : /// This operation is done under a lock to prevent bugs if several concurrent requests are
454 : /// trying to load the same timeline. Note that it doesn't guard against creating the
455 : /// timeline with the same ttid, but no one should be doing this anyway.
456 0 : pub async fn load_temp_timeline(
457 0 : conf: &SafeKeeperConf,
458 0 : ttid: TenantTimelineId,
459 0 : tmp_path: &Utf8PathBuf,
460 0 : ) -> Result<Arc<Timeline>> {
461 : // Take a lock to prevent concurrent loadings
462 0 : let load_lock = GlobalTimelines::loading_lock().await;
463 0 : let guard = load_lock.lock().await;
464 :
465 0 : if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
466 0 : bail!("timeline already exists, cannot overwrite it")
467 0 : }
468 0 :
469 0 : // Move timeline dir to the correct location
470 0 : let timeline_path = get_timeline_dir(conf, &ttid);
471 0 :
472 0 : info!(
473 0 : "moving timeline {} from {} to {}",
474 : ttid, tmp_path, timeline_path
475 : );
476 0 : tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
477 : // fsync tenant dir creation
478 0 : fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
479 0 : durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
480 :
481 0 : let tli = GlobalTimelines::load_timeline(&guard, ttid)
482 0 : .await
483 0 : .context("Failed to load timeline after copy")?;
484 :
485 0 : info!(
486 0 : "loaded timeline {}, flush_lsn={}",
487 0 : ttid,
488 0 : tli.get_flush_lsn().await
489 : );
490 :
491 0 : Ok(tli)
492 0 : }
|