use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use serde::{Deserialize, Serialize};
use std::{
    cmp::min,
    io::{self, ErrorKind},
};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
use tokio_util::{
    io::{CopyToBytes, SinkWriter},
    sync::PollSender,
};
use tracing::{error, info, instrument};

use crate::{
    control_file::CONTROL_FILE_NAME,
    debug_dump,
    http::{
        client::{self, Client},
        routes::TimelineStatus,
    },
    safekeeper::Term,
    state::TimelinePersistentState,
    timeline::WalResidentTimeline,
    timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
    wal_backup,
    wal_storage::open_wal_file,
    GlobalTimelines,
};
use utils::{
    crashsafe::fsync_async_opt,
    id::{NodeId, TenantId, TenantTimelineId, TimelineId},
    logging::SecretString,
    lsn::Lsn,
    pausable_failpoint,
};

/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(
    tli: WalResidentTimeline,
    source: NodeId,
    destination: NodeId,
    tx: mpsc::Sender<Result<Bytes>>,
) {
    if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
        // Error type/contents don't matter as they can't reach the client
        // (hyper likely doesn't do anything with them), but the http stream
        // will be prematurely terminated. It would be nice to try to send the
        // error in trailers though.
        tx.send(Err(anyhow!("snapshot failed"))).await.ok();
        error!("snapshot failed: {:#}", e);
    }
}

/// State needed while streaming the snapshot.
pub struct SnapshotContext {
    pub from_segno: XLogSegNo, // including
    pub upto_segno: XLogSegNo, // including
    pub term: Term,
    pub last_log_term: Term,
    pub flush_lsn: Lsn,
    pub wal_seg_size: usize,
    // used to remove WAL hold off in Drop.
    pub tli: WalResidentTimeline,
}

impl Drop for SnapshotContext {
    fn drop(&mut self) {
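        // Drop cannot be async, so clear the hold-off flag from a spawned
        // task: taking the shared state lock requires `.await`.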
        let tli = self.tli.clone();
        task::spawn(async move {
            let mut shared_state = tli.write_shared_state().await;
            shared_state.wal_removal_on_hold = false;
        });
    }
}

pub async fn stream_snapshot_guts(
    tli: WalResidentTimeline,
    source: NodeId,
    destination: NodeId,
    tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
    // tokio-tar wants a Write implementor, but we have an mpsc tx of Result<Bytes>;
    // use SinkWriter as a Write impl. That is,
    // - create a Sink from the tx. It returns PollSendError if the channel is closed.
    let sink = PollSender::new(tx);
    // - SinkWriter needs the sink error to be an io error, so map it.
    let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
    // - SinkWriter wants the sink type to be just Bytes, not Result<Bytes>, so map
    //   it with with(). Note that with() accepts an async function, which we don't
    //   need, and allows the map to fail, which we don't need either; hence the
    //   two Oks.
    let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
    // - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap it
    //   into CopyToBytes. This is a data copy.
    let copy_to_bytes = CopyToBytes::new(oksink);
    let mut writer = SinkWriter::new(copy_to_bytes);
    let pinned_writer = std::pin::pin!(writer);

    // Note that tokio_tar append_* funcs use tokio::io::copy with an 8KB buffer,
    // which is also likely suboptimal.
    let mut ar = Builder::new_non_terminated(pinned_writer);

    let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
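    // Failpoint used in tests to pause the snapshot right after the segment
    // range has been decided, before WAL segments are streamed.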
    pausable_failpoint!("sk-snapshot-after-list-pausable");

    let tli_dir = tli.get_timeline_dir();
    info!(
        "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
        bctx.upto_segno - bctx.from_segno + 1,
        bctx.from_segno,
        bctx.upto_segno,
        bctx.term,
        bctx.last_log_term,
        bctx.flush_lsn,
    );
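    // Stream the WAL segments themselves; a segment still stored as .partial
    // on disk is sent under its .partial name.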
    for segno in bctx.from_segno..=bctx.upto_segno {
        let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
        let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
        if is_partial {
            wal_file_name.push_str(".partial");
        }
        ar.append_file(&wal_file_name, &mut sf).await?;
    }

    // Do the term check before ar.finish so that the archive ends up corrupted
    // in case of a term change. The client shouldn't ignore an abrupt stream
    // end anyway, but better to be sure.
    tli.finish_snapshot(&bctx).await?;

    ar.finish().await?;

    Ok(())
}

impl WalResidentTimeline {
    /// Start streaming tar archive with timeline:
    /// 1) stream control file under lock;
    /// 2) hold off WAL removal;
    /// 3) collect SnapshotContext to understand which WAL segments should be
    ///    streamed.
    ///
    /// Snapshot streams data up to flush_lsn. To make this safe, we must check
    /// that the term doesn't change during the procedure, or we risk sending a
    /// mix of WAL from different histories. The term is remembered in the
    /// SnapshotContext and checked in finish_snapshot. Note that in the last
    /// segment some WAL higher than the flush_lsn set here might be streamed;
    /// that's fine as long as the term doesn't change.
    ///
    /// Alternatively we could send only up to commit_lsn to get some valid
    /// state which later will be recovered by compute, in which case the term
    /// check is not needed, but we likely don't want that as there might be no
    /// compute which could perform the recovery.
    ///
    /// When the returned SnapshotContext is dropped, the WAL hold-off is removed.
    async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
        &self,
        ar: &mut tokio_tar::Builder<W>,
        source: NodeId,
        destination: NodeId,
    ) -> Result<SnapshotContext> {
        let mut shared_state = self.write_shared_state().await;
        let wal_seg_size = shared_state.get_wal_seg_size();

        let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
        // Modify the partial segment of the in-memory copy of the control file
        // to point to the destination safekeeper.
        let replace = control_store
            .partial_backup
            .replace_uploaded_segment(source, destination)?;

        if let Some(replace) = replace {
            // The deserialized control file has an uploaded partial. We upload a copy
            // of it to object storage for the destination safekeeper and send an updated
            // control file in the snapshot.
            tracing::info!(
                "Replacing uploaded partial segment in in-mem control file: {replace:?}"
            );

            let remote_timeline_path = &self.tli.remote_path;
            wal_backup::copy_partial_segment(
                &replace.previous.remote_path(remote_timeline_path),
                &replace.current.remote_path(remote_timeline_path),
            )
            .await?;
        }

        let buf = control_store
            .write_to_buf()
            .with_context(|| "failed to serialize control store")?;
        let mut header = Header::new_gnu();
        header.set_size(buf.len().try_into().expect("never breaches u64"));
        ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
            .await
            .with_context(|| "failed to append to archive")?;
        // We need to stream starting from the oldest segment that someone (s3
        // or pageserver) still needs. This duplicates calc_horizon_lsn logic.
        //
        // We know that WAL wasn't removed up to this point because it cannot be
        // removed further than `backup_lsn`. Since we're holding the shared_state
        // lock and setting `wal_removal_on_hold` later, it guarantees that WAL
        // won't be removed until we're done.
        let from_lsn = min(
            shared_state.sk.state().remote_consistent_lsn,
            shared_state.sk.state().backup_lsn,
        );
        if from_lsn == Lsn::INVALID {
            // this is possible if snapshot is called before handling the first
            // elected message
            bail!("snapshot is called on uninitialized timeline");
        }
        let from_segno = from_lsn.segment_number(wal_seg_size);
        let term = shared_state.sk.state().acceptor_state.term;
        let last_log_term = shared_state.sk.last_log_term();
        let flush_lsn = shared_state.sk.flush_lsn();
        let upto_segno = flush_lsn.segment_number(wal_seg_size);
        // have some limit on max number of segments as a sanity check
        const MAX_ALLOWED_SEGS: u64 = 1000;
        let num_segs = upto_segno - from_segno + 1;
        if num_segs > MAX_ALLOWED_SEGS {
            bail!(
                "snapshot is called on timeline with {} segments, but the limit is {}",
                num_segs,
                MAX_ALLOWED_SEGS
            );
        }

        // Prevent WAL removal while we're streaming data.
        //
        // Since this is a flag, not a counter, just bail out if it's already
        // set; we shouldn't need concurrent snapshotting.
        if shared_state.wal_removal_on_hold {
            bail!("wal_removal_on_hold is already true");
        }
        shared_state.wal_removal_on_hold = true;

        // Drop shared_state to release the lock, before calling wal_residence_guard().
        drop(shared_state);

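        // Take a separate residence guard for the SnapshotContext so WAL files
        // stay on local disk while the caller streams them.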
        let tli_copy = self.wal_residence_guard().await?;
        let bctx = SnapshotContext {
            from_segno,
            upto_segno,
            term,
            last_log_term,
            flush_lsn,
            wal_seg_size,
            tli: tli_copy,
        };

        Ok(bctx)
    }

    /// Finish snapshotting: check that the term(s) haven't changed.
    ///
    /// Note that the WAL GC hold-off is removed in Drop of SnapshotContext so
    /// that it isn't forgotten if snapshotting fails midway.
    pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
        let shared_state = self.read_shared_state().await;
        let term = shared_state.sk.state().acceptor_state.term;
        let last_log_term = shared_state.sk.last_log_term();
        // There are some cases where this check could be relaxed (e.g.
        // last_log_term might change, but as long as the older history is
        // strictly a part of the new one, that's fine), but there is no need
        // to do it.
        if bctx.term != term || bctx.last_log_term != last_log_term {
            bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
                bctx.term, bctx.last_log_term, term, last_log_term);
        }
        Ok(())
    }
}

/// pull_timeline request body.
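/// `http_hosts` lists the HTTP endpoints of candidate donor safekeepers
/// (illustrative example: "http://safekeeper-1:7676").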
#[derive(Debug, Deserialize)]
pub struct Request {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub http_hosts: Vec<String>,
}

#[derive(Debug, Serialize)]
pub struct Response {
    // Donor safekeeper host
    pub safekeeper_host: String,
    // TODO: add more fields?
}

/// Response for debug dump request.
#[derive(Debug, Deserialize)]
pub struct DebugDumpResponse {
    pub start_time: DateTime<Utc>,
    pub finish_time: DateTime<Utc>,
    pub timelines: Vec<debug_dump::Timeline>,
    pub timelines_count: usize,
    pub config: debug_dump::Config,
}

/// Find the most advanced safekeeper and pull the timeline from it.
pub async fn handle_request(
    request: Request,
    sk_auth_token: Option<SecretString>,
) -> Result<Response> {
    let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
        request.tenant_id,
        request.timeline_id,
    ));
    if existing_tli.is_ok() {
        bail!("Timeline {} already exists", request.timeline_id);
    }

    let http_hosts = request.http_hosts.clone();

    // Figure out statuses of potential donors.
    let responses: Vec<Result<TimelineStatus, client::Error>> =
        futures::future::join_all(http_hosts.iter().map(|url| async {
            let cclient = Client::new(url.clone(), sk_auth_token.clone());
            let info = cclient
                .timeline_status(request.tenant_id, request.timeline_id)
                .await?;
            Ok(info)
        }))
        .await;

    let mut statuses = Vec::new();
    for (i, response) in responses.into_iter().enumerate() {
        let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
        statuses.push((status, i));
    }

    // Find the most advanced safekeeper.
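    // "Most advanced" compares (epoch, flush_lsn, commit_lsn) lexicographically:
    // a higher epoch wins first; ties are broken by the longer WAL.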
    let (status, i) = statuses
        .into_iter()
        .max_by_key(|(status, _)| {
            (
                status.acceptor_state.epoch,
                status.flush_lsn,
                status.commit_lsn,
            )
        })
        .unwrap();
    let safekeeper_host = http_hosts[i].clone();

    assert!(status.tenant_id == request.tenant_id);
    assert!(status.timeline_id == request.timeline_id);

    pull_timeline(status, safekeeper_host, sk_auth_token).await
}

async fn pull_timeline(
    status: TimelineStatus,
    host: String,
    sk_auth_token: Option<SecretString>,
) -> Result<Response> {
    let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
    info!(
        "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
        ttid,
        host,
        status.commit_lsn,
        status.flush_lsn,
        status.acceptor_state.term,
        status.acceptor_state.epoch
    );

    let conf = &GlobalTimelines::get_global_config();

    let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;

    let client = Client::new(host.clone(), sk_auth_token.clone());
    // Request stream with basebackup archive.
    let bb_resp = client
        .snapshot(status.tenant_id, status.timeline_id, conf.my_id)
        .await?;

    // Make a Stream of Bytes from it...
    let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
    // and turn it into a StreamReader implementing AsyncRead.
    let bb_reader = tokio_util::io::StreamReader::new(bb_stream);

    // Extract it to disk on the fly. We don't use the simple unpack() so that
    // we can fsync the files.
    let mut entries = Archive::new(bb_reader).entries()?;
    while let Some(base_tar_entry) = entries.next().await {
        let mut entry = base_tar_entry?;
        let header = entry.header();
        let file_path = header.path()?.into_owned();
        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
                let utf8_file_path =
                    Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
                let dst_path = tli_dir_path.join(utf8_file_path);
                let mut f = OpenOptions::new()
                    .create(true)
                    .truncate(true)
                    .write(true)
                    .open(&dst_path)
                    .await?;
                tokio::io::copy(&mut entry, &mut f).await?;
                // fsync the file
                f.sync_all().await?;
            }
            _ => {
                bail!(
                    "entry {} in backup tar archive is of unexpected type: {:?}",
                    file_path.display(),
                    header.entry_type()
                );
            }
        }
    }

    // fsync the temp timeline directory to remember its contents.
    fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;

    // Validate the timeline in the temp directory before loading it.
    let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
    info!(
        "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
        ttid, commit_lsn, flush_lsn
    );
    assert!(status.commit_lsn <= status.flush_lsn);

    // Finally, load the timeline.
    let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;

    Ok(Response {
        safekeeper_host: host,
    })
}