Line data Source code
1 : use anyhow::{anyhow, bail, Context, Result};
2 : use bytes::Bytes;
3 : use camino::Utf8PathBuf;
4 : use camino_tempfile::Utf8TempDir;
5 : use chrono::{DateTime, Utc};
6 : use futures::{SinkExt, StreamExt, TryStreamExt};
7 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
8 : use serde::{Deserialize, Serialize};
9 : use std::{
10 : cmp::min,
11 : io::{self, ErrorKind},
12 : sync::Arc,
13 : };
14 : use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
15 : use tokio_tar::{Archive, Builder, Header};
16 : use tokio_util::{
17 : io::{CopyToBytes, SinkWriter},
18 : sync::PollSender,
19 : };
20 : use tracing::{error, info, instrument};
21 :
22 : use crate::{
23 : control_file::{self, CONTROL_FILE_NAME},
24 : debug_dump,
25 : http::{
26 : client::{self, Client},
27 : routes::TimelineStatus,
28 : },
29 : safekeeper::Term,
30 : state::TimelinePersistentState,
31 : timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
32 : wal_backup,
33 : wal_storage::{self, open_wal_file, Storage},
34 : GlobalTimelines, SafeKeeperConf,
35 : };
36 : use utils::{
37 : crashsafe::{durable_rename, fsync_async_opt},
38 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
39 : logging::SecretString,
40 : lsn::Lsn,
41 : pausable_failpoint,
42 : };
43 :
44 : /// Stream a tar archive of the timeline to tx.
45 0 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
46 : pub async fn stream_snapshot(
47 : tli: WalResidentTimeline,
48 : source: NodeId,
49 : destination: NodeId,
50 : tx: mpsc::Sender<Result<Bytes>>,
51 : ) {
52 : if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
53 : // The error type/contents don't matter, as they can't reach the client
54 : // (hyper likely doesn't do anything with them), but the http stream will be
55 : // prematurely terminated. It would be nice to try to send the error in
56 : // trailers though.
57 : tx.send(Err(anyhow!("snapshot failed"))).await.ok();
58 : error!("snapshot failed: {:#}", e);
59 : }
60 : }
61 :
62 : /// State needed while streaming the snapshot.
63 : pub struct SnapshotContext {
64 : pub from_segno: XLogSegNo, // inclusive
65 : pub upto_segno: XLogSegNo, // inclusive
66 : pub term: Term,
67 : pub last_log_term: Term,
68 : pub flush_lsn: Lsn,
69 : pub wal_seg_size: usize,
70 : // Used to release the WAL removal hold-off in Drop.
71 : pub tli: WalResidentTimeline,
72 : }
73 :
74 : impl Drop for SnapshotContext {
75 0 : fn drop(&mut self) {
76 0 : let tli = self.tli.clone();
77 0 : task::spawn(async move {
78 0 : let mut shared_state = tli.write_shared_state().await;
79 0 : shared_state.wal_removal_on_hold = false;
80 0 : });
81 0 : }
82 : }
83 :
84 0 : pub async fn stream_snapshot_guts(
85 0 : tli: WalResidentTimeline,
86 0 : source: NodeId,
87 0 : destination: NodeId,
88 0 : tx: mpsc::Sender<Result<Bytes>>,
89 0 : ) -> Result<()> {
90 0 : // tokio-tar wants a Write implementor, but we have an mpsc tx of Result<Bytes>;
91 0 : // use SinkWriter as the Write impl. That is,
92 0 : // - create Sink from the tx. It returns PollSendError if chan is closed.
93 0 : let sink = PollSender::new(tx);
94 0 : // - SinkWriter needs the sink error to be an io error, so map it.
95 0 : let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
96 0 : // - SinkWriter wants the sink item to be just Bytes, not Result<Bytes>, so
97 0 : // map it with with(). with() accepts an async function and allows the map
98 0 : // to fail, neither of which we need; hence the two Oks. (A standalone
99 0 : // sketch of this adapter chain follows this function.)
100 0 : let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
101 0 : // - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap
102 0 : // it in CopyToBytes. This incurs a data copy.
103 0 : let copy_to_bytes = CopyToBytes::new(oksink);
104 0 : let mut writer = SinkWriter::new(copy_to_bytes);
105 0 : let pinned_writer = std::pin::pin!(writer);
106 0 :
107 0 : // Note that the tokio_tar append_* functions use tokio::io::copy with an
108 0 : // 8KB buffer, which is also likely suboptimal.
109 0 : let mut ar = Builder::new_non_terminated(pinned_writer);
110 :
111 0 : let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
112 0 : pausable_failpoint!("sk-snapshot-after-list-pausable");
113 :
114 0 : let tli_dir = tli.get_timeline_dir();
115 0 : info!(
116 0 : "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
117 0 : bctx.upto_segno - bctx.from_segno + 1,
118 : bctx.from_segno,
119 : bctx.upto_segno,
120 : bctx.term,
121 : bctx.last_log_term,
122 : bctx.flush_lsn,
123 : );
124 0 : for segno in bctx.from_segno..=bctx.upto_segno {
125 0 : let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
126 0 : let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
127 0 : if is_partial {
128 0 : wal_file_name.push_str(".partial");
129 0 : }
130 0 : ar.append_file(&wal_file_name, &mut sf).await?;
131 : }
132 :
133 : // Do the term check before ar.finish so the archive ends up corrupted if
134 : // the term changed. The client shouldn't ignore an abrupt stream end, but just to be sure.
135 0 : tli.finish_snapshot(&bctx).await?;
136 :
137 0 : ar.finish().await?;
138 :
139 0 : Ok(())
140 0 : }
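// The comments in stream_snapshot_guts above describe how an mpsc sender of
// Result<Bytes> is adapted into an AsyncWrite for tokio-tar. The function
// below is a minimal, self-contained sketch of that same adapter chain
// (PollSender -> sink_map_err -> with -> CopyToBytes -> SinkWriter). It is
// illustrative only and not used by this module; the name is made up.
#[allow(dead_code)]
async fn sink_writer_sketch(
    tx: tokio::sync::mpsc::Sender<anyhow::Result<bytes::Bytes>>,
) -> std::io::Result<()> {
    use bytes::Bytes;
    use futures::SinkExt;
    use std::io::{self, ErrorKind};
    use tokio::io::AsyncWriteExt;
    use tokio_util::io::{CopyToBytes, SinkWriter};
    use tokio_util::sync::PollSender;

    // PollSender turns the mpsc sender into a Sink; map its error to io::Error
    // and wrap each Bytes item into Ok(..) so the item type matches the channel.
    let sink = PollSender::new(tx)
        .sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe))
        .with(|b: Bytes| async { io::Result::Ok(anyhow::Ok(b)) });
    // CopyToBytes accepts &[u8] writes (copying them into Bytes), and SinkWriter
    // exposes the whole chain as AsyncWrite. Pin it because the with() future
    // is not Unpin.
    let writer = SinkWriter::new(CopyToBytes::new(sink));
    let mut writer = std::pin::pin!(writer);

    // Everything written here arrives on the mpsc receiver as Ok(Bytes).
    writer.write_all(b"hello").await?;
    writer.shutdown().await
}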
141 :
142 : impl WalResidentTimeline {
143 : /// Start streaming the tar archive of the timeline:
144 : /// 1) stream control file under lock;
145 : /// 2) hold off WAL removal;
146 : /// 3) collect SnapshotContext to understand which WAL segments should be
147 : /// streamed.
148 : ///
149 : /// The snapshot streams data up to flush_lsn. To make this safe, we must
150 : /// check that the term doesn't change during the procedure, or we risk
151 : /// sending a mix of WAL from different histories. The term is remembered in
152 : /// the SnapshotContext and checked in finish_snapshot. Note that in the last
153 : /// segment some WAL higher than the flush_lsn captured here might be
154 : /// streamed; that's fine as long as the term doesn't change.
155 : ///
156 : /// Alternatively, we could send only up to commit_lsn to get some valid
157 : /// state which would later be recovered by compute; in that case the term
158 : /// check is not needed, but we likely don't want that, as there might be
159 : /// no compute to perform the recovery.
160 : ///
161 : /// The WAL hold-off is removed when the returned SnapshotContext is dropped.
162 0 : async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
163 0 : &self,
164 0 : ar: &mut tokio_tar::Builder<W>,
165 0 : source: NodeId,
166 0 : destination: NodeId,
167 0 : ) -> Result<SnapshotContext> {
168 0 : let mut shared_state = self.write_shared_state().await;
169 0 : let wal_seg_size = shared_state.get_wal_seg_size();
170 0 :
171 0 : let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
172 : // Modify the partial segment of the in-memory copy of the control file to
173 : // point to the destination safekeeper.
174 0 : let replace = control_store
175 0 : .partial_backup
176 0 : .replace_uploaded_segment(source, destination)?;
177 :
178 0 : if let Some(replace) = replace {
179 : // The deserialized control file has an uploaded partial. We upload a copy
180 : // of it to object storage for the destination safekeeper and send an updated
181 : // control file in the snapshot.
182 0 : tracing::info!(
183 0 : "Replacing uploaded partial segment in in-mem control file: {replace:?}"
184 : );
185 :
186 0 : let remote_timeline_path = &self.tli.remote_path;
187 0 : wal_backup::copy_partial_segment(
188 0 : &replace.previous.remote_path(remote_timeline_path),
189 0 : &replace.current.remote_path(remote_timeline_path),
190 0 : )
191 0 : .await?;
192 0 : }
193 :
194 0 : let buf = control_store
195 0 : .write_to_buf()
196 0 : .with_context(|| "failed to serialize control store")?;
197 0 : let mut header = Header::new_gnu();
198 0 : header.set_size(buf.len().try_into().expect("never breaches u64"));
199 0 : ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
200 0 : .await
201 0 : .with_context(|| "failed to append to archive")?;
202 :
203 : // We need to stream starting from the oldest segment that someone (s3 or
204 : // the pageserver) still needs. This duplicates the calc_horizon_lsn logic.
205 : //
206 : // We know that WAL hasn't been removed up to this point because it cannot
207 : // be removed past `backup_lsn`. Since we're holding the shared_state lock
208 : // and setting `wal_removal_on_hold` below, this guarantees that WAL won't
209 : // be removed until we're done.
210 0 : let from_lsn = min(
211 0 : shared_state.sk.state().remote_consistent_lsn,
212 0 : shared_state.sk.state().backup_lsn,
213 0 : );
214 0 : if from_lsn == Lsn::INVALID {
215 : // This is possible if the snapshot is requested before the first
216 : // elected message has been handled.
217 0 : bail!("snapshot is called on uninitialized timeline");
218 0 : }
219 0 : let from_segno = from_lsn.segment_number(wal_seg_size);
220 0 : let term = shared_state.sk.state().acceptor_state.term;
221 0 : let last_log_term = shared_state.sk.last_log_term();
222 0 : let flush_lsn = shared_state.sk.flush_lsn();
223 0 : let upto_segno = flush_lsn.segment_number(wal_seg_size);
224 : // Cap the number of segments as a sanity check.
225 : const MAX_ALLOWED_SEGS: u64 = 1000;
226 0 : let num_segs = upto_segno - from_segno + 1;
227 0 : if num_segs > MAX_ALLOWED_SEGS {
228 0 : bail!(
229 0 : "snapshot is called on timeline with {} segments, but the limit is {}",
230 0 : num_segs,
231 0 : MAX_ALLOWED_SEGS
232 0 : );
233 0 : }
234 0 :
235 0 : // Prevent WAL removal while we're streaming data.
236 0 : //
237 0 : // Since this is a flag, not a counter, just bail out if it is already set;
238 0 : // we shouldn't need concurrent snapshotting.
239 0 : if shared_state.wal_removal_on_hold {
240 0 : bail!("wal_removal_on_hold is already true");
241 0 : }
242 0 : shared_state.wal_removal_on_hold = true;
243 0 :
244 0 : // Drop shared_state to release the lock, before calling wal_residence_guard().
245 0 : drop(shared_state);
246 :
247 0 : let tli_copy = self.wal_residence_guard().await?;
248 0 : let bctx = SnapshotContext {
249 0 : from_segno,
250 0 : upto_segno,
251 0 : term,
252 0 : last_log_term,
253 0 : flush_lsn,
254 0 : wal_seg_size,
255 0 : tli: tli_copy,
256 0 : };
257 0 :
258 0 : Ok(bctx)
259 0 : }
260 :
261 : /// Finish snapshotting: check that the term(s) haven't changed.
262 : ///
263 : /// Note that the WAL gc hold-off is removed in Drop of SnapshotContext so
264 : /// that it isn't forgotten if snapshotting fails midway.
265 0 : pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
266 0 : let shared_state = self.read_shared_state().await;
267 0 : let term = shared_state.sk.state().acceptor_state.term;
268 0 : let last_log_term = shared_state.sk.last_log_term();
269 0 : // There are cases where this check could be relaxed (e.g. last_log_term
270 0 : // might change, but as long as the older history is strictly contained in
271 0 : // the new one, that's fine), but there is no need to do so.
272 0 : if bctx.term != term || bctx.last_log_term != last_log_term {
273 0 : bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
274 0 : bctx.term, bctx.last_log_term, term, last_log_term);
275 0 : }
276 0 : Ok(())
277 0 : }
278 : }
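// Illustration (not part of this module) of the segment arithmetic used by
// start_snapshot above, assuming the default 16 MiB wal_seg_size:
//
//   Lsn(0x0500_0000).segment_number(0x0100_0000) == 5
//   XLogFileName(PG_TLI, 5, 0x0100_0000) == "000000010000000000000005"
//
// So with from_lsn = 0/5000000 and flush_lsn = 0/7000000 the snapshot streams
// segments 5..=7, the last of which may carry a ".partial" suffix.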
279 :
280 : /// pull_timeline request body.
281 0 : #[derive(Debug, Deserialize)]
282 : pub struct Request {
283 : pub tenant_id: TenantId,
284 : pub timeline_id: TimelineId,
285 : pub http_hosts: Vec<String>,
286 : }
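// A hypothetical request body, for illustration only (IDs and hosts are made
// up; tenant/timeline IDs serialize as 32-character hex strings):
//
// {
//   "tenant_id": "2df31b3b2c4f5a1d9ce0c35f8f5a4f3e",
//   "timeline_id": "8a1f0d7c9b2e4c6d8e0f1a2b3c4d5e6f",
//   "http_hosts": ["http://safekeeper-1:7676", "http://safekeeper-2:7676"]
// }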
287 :
288 : #[derive(Debug, Serialize)]
289 : pub struct Response {
290 : // Donor safekeeper host
291 : pub safekeeper_host: String,
292 : // TODO: add more fields?
293 : }
294 :
295 : /// Response for debug dump request.
296 0 : #[derive(Debug, Deserialize)]
297 : pub struct DebugDumpResponse {
298 : pub start_time: DateTime<Utc>,
299 : pub finish_time: DateTime<Utc>,
300 : pub timelines: Vec<debug_dump::Timeline>,
301 : pub timelines_count: usize,
302 : pub config: debug_dump::Config,
303 : }
304 :
305 : /// Find the most advanced safekeeper and pull the timeline from it.
306 0 : pub async fn handle_request(
307 0 : request: Request,
308 0 : sk_auth_token: Option<SecretString>,
309 0 : ) -> Result<Response> {
310 0 : let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
311 0 : request.tenant_id,
312 0 : request.timeline_id,
313 0 : ));
314 0 : if existing_tli.is_ok() {
315 0 : bail!("Timeline {} already exists", request.timeline_id);
316 0 : }
317 0 :
318 0 : let http_hosts = request.http_hosts.clone();
319 :
320 : // Figure out statuses of potential donors.
321 0 : let responses: Vec<Result<TimelineStatus, client::Error>> =
322 0 : futures::future::join_all(http_hosts.iter().map(|url| async {
323 0 : let cclient = Client::new(url.clone(), sk_auth_token.clone());
324 0 : let info = cclient
325 0 : .timeline_status(request.tenant_id, request.timeline_id)
326 0 : .await?;
327 0 : Ok(info)
328 0 : }))
329 0 : .await;
330 :
331 0 : let mut statuses = Vec::new();
332 0 : for (i, response) in responses.into_iter().enumerate() {
333 0 : let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
334 0 : statuses.push((status, i));
335 : }
336 :
337 : // Find the most advanced safekeeper, comparing by (epoch, flush_lsn, commit_lsn).
338 0 : let (status, i) = statuses
339 0 : .into_iter()
340 0 : .max_by_key(|(status, _)| {
341 0 : (
342 0 : status.acceptor_state.epoch,
343 0 : status.flush_lsn,
344 0 : status.commit_lsn,
345 0 : )
346 0 : })
347 0 : .unwrap();
348 0 : let safekeeper_host = http_hosts[i].clone();
349 0 :
350 0 : assert!(status.tenant_id == request.tenant_id);
351 0 : assert!(status.timeline_id == request.timeline_id);
352 :
353 0 : pull_timeline(status, safekeeper_host, sk_auth_token).await
354 0 : }
355 :
356 0 : async fn pull_timeline(
357 0 : status: TimelineStatus,
358 0 : host: String,
359 0 : sk_auth_token: Option<SecretString>,
360 0 : ) -> Result<Response> {
361 0 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
362 0 : info!(
363 0 : "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
364 : ttid,
365 : host,
366 : status.commit_lsn,
367 : status.flush_lsn,
368 : status.acceptor_state.term,
369 : status.acceptor_state.epoch
370 : );
371 :
372 0 : let conf = &GlobalTimelines::get_global_config();
373 :
374 0 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
375 :
376 0 : let client = Client::new(host.clone(), sk_auth_token.clone());
377 : // Request stream with basebackup archive.
378 0 : let bb_resp = client
379 0 : .snapshot(status.tenant_id, status.timeline_id, conf.my_id)
380 0 : .await?;
381 :
382 : // Make a Stream of Bytes from it...
383 0 : let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
384 0 : // ...and turn it into a StreamReader implementing AsyncRead.
385 0 : let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
386 :
387 : // Extract it to disk on the fly. We don't use the simple unpack() because
388 : // we need to fsync each file.
389 0 : let mut entries = Archive::new(bb_reader).entries()?;
390 0 : while let Some(base_tar_entry) = entries.next().await {
391 0 : let mut entry = base_tar_entry?;
392 0 : let header = entry.header();
393 0 : let file_path = header.path()?.into_owned();
394 0 : match header.entry_type() {
395 : tokio_tar::EntryType::Regular => {
396 0 : let utf8_file_path =
397 0 : Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
398 0 : let dst_path = tli_dir_path.join(utf8_file_path);
399 0 : let mut f = OpenOptions::new()
400 0 : .create(true)
401 0 : .truncate(true)
402 0 : .write(true)
403 0 : .open(&dst_path)
404 0 : .await?;
405 0 : tokio::io::copy(&mut entry, &mut f).await?;
406 : // fsync the file
407 0 : f.sync_all().await?;
408 : }
409 : _ => {
410 0 : bail!(
411 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
412 0 : file_path.display(),
413 0 : header.entry_type()
414 0 : );
415 : }
416 : }
417 : }
418 :
419 : // fsync the temp timeline directory to durably record its contents.
420 0 : fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
421 :
422 : // Validate the timeline in the temp directory and read its commit/flush LSNs.
423 0 : let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
424 0 : info!(
425 0 : "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
426 : ttid, commit_lsn, flush_lsn
427 : );
428 0 : assert!(status.commit_lsn <= status.flush_lsn);
429 :
430 : // Finally, load the timeline.
431 0 : let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
432 :
433 0 : Ok(Response {
434 0 : safekeeper_host: host,
435 0 : })
436 0 : }
437 :
438 : /// Create a temp directory for a new timeline. It needs to be located on the
439 : /// same filesystem as the rest of the timelines so that it can later be atomically
440 : /// renamed into place. It is automatically deleted when the Utf8TempDir goes out of scope.
441 0 : pub async fn create_temp_timeline_dir(
442 0 : conf: &SafeKeeperConf,
443 0 : ttid: TenantTimelineId,
444 0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
445 : // conf.workdir is usually /storage/safekeeper/data;
446 : // we will try to transform it into /storage/safekeeper/tmp.
447 0 : let temp_base = conf
448 0 : .workdir
449 0 : .parent()
450 0 : .ok_or(anyhow::anyhow!("workdir has no parent"))?
451 0 : .join("tmp");
452 0 :
453 0 : tokio::fs::create_dir_all(&temp_base).await?;
454 :
455 0 : let tli_dir = camino_tempfile::Builder::new()
456 0 : .suffix("_temptli")
457 0 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
458 0 : .tempdir_in(temp_base)?;
459 :
460 0 : let tli_dir_path = tli_dir.path().to_path_buf();
461 0 :
462 0 : Ok((tli_dir, tli_dir_path))
463 0 : }
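// For example (hypothetical IDs, and assuming the usual workdir layout), the
// function above would yield a directory like
//   /storage/safekeeper/tmp/<tenant_hex>_<timeline_hex>_XXXXXX_temptli
// where XXXXXX is the random part inserted by camino_tempfile between the
// prefix and the "_temptli" suffix.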
464 :
465 : /// Do basic validation of a temp timeline, before moving it to the global map.
466 0 : pub async fn validate_temp_timeline(
467 0 : conf: &SafeKeeperConf,
468 0 : ttid: TenantTimelineId,
469 0 : path: &Utf8PathBuf,
470 0 : ) -> Result<(Lsn, Lsn)> {
471 0 : let control_path = path.join("safekeeper.control");
472 :
473 0 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
474 0 : if control_store.server.wal_seg_size == 0 {
475 0 : bail!("wal_seg_size is not set");
476 0 : }
477 :
478 0 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
479 :
480 0 : let commit_lsn = control_store.commit_lsn;
481 0 : let flush_lsn = wal_store.flush_lsn();
482 0 :
483 0 : Ok((commit_lsn, flush_lsn))
484 0 : }
485 :
486 : /// Move timeline from a temp directory to the main storage, and load it to the global map.
487 : ///
488 : /// This operation is done under a lock to prevent bugs if several concurrent requests are
489 : /// trying to load the same timeline. Note that it doesn't guard against creating the
490 : /// timeline with the same ttid, but no one should be doing this anyway.
491 0 : pub async fn load_temp_timeline(
492 0 : conf: &SafeKeeperConf,
493 0 : ttid: TenantTimelineId,
494 0 : tmp_path: &Utf8PathBuf,
495 0 : ) -> Result<Arc<Timeline>> {
496 : // Take a lock to prevent concurrent loading
497 0 : let load_lock = GlobalTimelines::loading_lock().await;
498 0 : let guard = load_lock.lock().await;
499 :
500 0 : if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
501 0 : bail!("timeline already exists, cannot overwrite it")
502 0 : }
503 0 :
504 0 : // Move timeline dir to the correct location
505 0 : let timeline_path = get_timeline_dir(conf, &ttid);
506 0 :
507 0 : info!(
508 0 : "moving timeline {} from {} to {}",
509 : ttid, tmp_path, timeline_path
510 : );
511 0 : tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
512 : // fsync tenant dir creation
513 0 : fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
514 0 : durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
515 :
516 0 : let tli = GlobalTimelines::load_timeline(&guard, ttid)
517 0 : .await
518 0 : .context("Failed to load timeline after copy")?;
519 :
520 0 : info!(
521 0 : "loaded timeline {}, flush_lsn={}",
522 0 : ttid,
523 0 : tli.get_flush_lsn().await
524 : );
525 :
526 0 : Ok(tli)
527 0 : }