Line data Source code
1 : use std::cmp::min;
2 : use std::io::{self, ErrorKind};
3 : use std::ops::RangeInclusive;
4 : use std::sync::Arc;
5 :
6 : use anyhow::{Context, Result, anyhow, bail};
7 : use bytes::Bytes;
8 : use camino::Utf8PathBuf;
9 : use chrono::{DateTime, Utc};
10 : use futures::{SinkExt, StreamExt, TryStreamExt};
11 : use http::StatusCode;
12 : use http_utils::error::ApiError;
13 : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
14 : use remote_storage::GenericRemoteStorage;
15 : use reqwest::Certificate;
16 : use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
17 : use safekeeper_api::{Term, membership};
18 : use safekeeper_client::mgmt_api;
19 : use safekeeper_client::mgmt_api::Client;
20 : use serde::Deserialize;
21 : use tokio::fs::OpenOptions;
22 : use tokio::io::AsyncWrite;
23 : use tokio::sync::mpsc;
24 : use tokio::task;
25 : use tokio::time::sleep;
26 : use tokio_tar::{Archive, Builder, Header};
27 : use tokio_util::io::{CopyToBytes, SinkWriter};
28 : use tokio_util::sync::PollSender;
29 : use tracing::{error, info, instrument, warn};
30 : use utils::crashsafe::fsync_async_opt;
31 : use utils::id::{NodeId, TenantTimelineId};
32 : use utils::logging::SecretString;
33 : use utils::lsn::Lsn;
34 : use utils::pausable_failpoint;
35 :
36 : use crate::control_file::CONTROL_FILE_NAME;
37 : use crate::state::{EvictionState, TimelinePersistentState};
38 : use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
39 : use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
40 : use crate::wal_storage::{open_wal_file, wal_file_paths};
41 : use crate::{GlobalTimelines, debug_dump, wal_backup};
42 :
43 : /// Stream a tar archive of the timeline to `tx`.
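/// A minimal usage sketch (assuming the caller is something like the HTTP snapshot
/// handler, which is not shown here):
///
/// ```ignore
/// let (tx, mut rx) = tokio::sync::mpsc::channel::<anyhow::Result<bytes::Bytes>>(1);
/// tokio::spawn(stream_snapshot(tli, source, destination, tx, storage));
/// while let Some(chunk) = rx.recv().await {
///     // Forward each Ok(Bytes) chunk into the response body; an Err ends the stream.
/// }
/// ```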
44 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
45 : pub async fn stream_snapshot(
46 : tli: Arc<Timeline>,
47 : source: NodeId,
48 : destination: NodeId,
49 : tx: mpsc::Sender<Result<Bytes>>,
50 : storage: Option<Arc<GenericRemoteStorage>>,
51 : ) {
52 : match tli.try_wal_residence_guard().await {
53 : Err(e) => {
54 : tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
55 : .await
56 : .ok();
57 : }
58 : Ok(maybe_resident_tli) => {
59 : if let Err(e) = match maybe_resident_tli {
60 : Some(resident_tli) => {
61 : stream_snapshot_resident_guts(
62 : resident_tli,
63 : source,
64 : destination,
65 : tx.clone(),
66 : storage,
67 : )
68 : .await
69 : }
70 : None => {
71 : if let Some(storage) = storage {
72 : stream_snapshot_offloaded_guts(
73 : tli,
74 : source,
75 : destination,
76 : tx.clone(),
77 : &storage,
78 : )
79 : .await
80 : } else {
81 : tx.send(Err(anyhow!("remote storage not configured")))
82 : .await
83 : .ok();
84 : return;
85 : }
86 : }
87 : } {
88 : // Error type/contents don't matter as they can't reach the client
89 : // (hyper likely doesn't do anything with it), but the http stream will be
90 : // prematurely terminated. It would be nice to try to send the error in
91 : // trailers though.
92 : tx.send(Err(anyhow!("snapshot failed"))).await.ok();
93 : error!("snapshot failed: {:#}", e);
94 : }
95 : }
96 : }
97 : }
98 :
99 : /// State needed while streaming the snapshot.
100 : pub struct SnapshotContext {
101 : /// The inclusive range of segment numbers to stream. If None, the timeline hasn't had any writes yet, so only the control file is sent.
102 : pub from_to_segno: Option<RangeInclusive<XLogSegNo>>,
103 : pub term: Term,
104 : pub last_log_term: Term,
105 : pub flush_lsn: Lsn,
106 : pub wal_seg_size: usize,
107 : // Used to release the WAL removal hold in Drop.
108 : pub tli: WalResidentTimeline,
109 : }
110 :
111 : impl Drop for SnapshotContext {
112 0 : fn drop(&mut self) {
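// Drop can't be async, so spawn a task to clear the hold flag set in start_snapshot.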
113 0 : let tli = self.tli.clone();
114 0 : task::spawn(async move {
115 0 : let mut shared_state = tli.write_shared_state().await;
116 0 : shared_state.wal_removal_on_hold = false;
117 0 : });
118 0 : }
119 : }
120 :
121 : /// Build a tokio_tar stream that sends the encoded bytes into a Bytes channel.
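/// The body adapts the channel into an AsyncWrite via PollSender -> sink_map_err ->
/// with -> CopyToBytes -> SinkWriter. A rough, self-contained sketch of that adapter
/// chain (without the tar Builder):
///
/// ```ignore
/// let (tx, _rx) = tokio::sync::mpsc::channel::<anyhow::Result<Bytes>>(1);
/// let sink = PollSender::new(tx)
///     .sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe))
///     .with(|b: Bytes| async { io::Result::Ok(Ok(b)) });
/// let writer = SinkWriter::new(CopyToBytes::new(sink)); // impl AsyncWrite + Unpin
/// ```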
122 0 : fn prepare_tar_stream(
123 0 : tx: mpsc::Sender<Result<Bytes>>,
124 0 : ) -> tokio_tar::Builder<impl AsyncWrite + Unpin + Send> {
125 : // tokio-tar wants a Write implementor, but we have an mpsc tx of Result<Bytes>;
126 : // use SinkWriter as the Write impl. That is,
127 : // - create a Sink from the tx. It returns PollSendError if the channel is closed.
128 0 : let sink = PollSender::new(tx);
129 : // - SinkWriter needs the sink error to be an io::Error, so map it.
130 0 : let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
131 : // - SinkWriter wants the sink item type to be plain Bytes, not Result<Bytes>, so
132 : // map it with with(). Note that with() takes an async function and allows the
133 : // mapping to fail; we need neither of those, hence
134 : // the two Oks.
135 0 : let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
136 : // - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap it
137 : // in CopyToBytes. This incurs a data copy.
138 0 : let copy_to_bytes = CopyToBytes::new(oksink);
139 0 : let writer = SinkWriter::new(copy_to_bytes);
140 0 : let pinned_writer = Box::pin(writer);
141 :
142 : // Note that the tokio_tar append_* functions use tokio::io::copy with an 8KB buffer,
143 : // which is also likely suboptimal.
144 0 : Builder::new_non_terminated(pinned_writer)
145 0 : }
146 :
147 : /// Implementation of snapshot for an offloaded timeline; only reads the control file.
148 0 : pub(crate) async fn stream_snapshot_offloaded_guts(
149 0 : tli: Arc<Timeline>,
150 0 : source: NodeId,
151 0 : destination: NodeId,
152 0 : tx: mpsc::Sender<Result<Bytes>>,
153 0 : storage: &GenericRemoteStorage,
154 0 : ) -> Result<()> {
155 0 : let mut ar = prepare_tar_stream(tx);
156 :
157 0 : tli.snapshot_offloaded(&mut ar, source, destination, storage)
158 0 : .await?;
159 :
160 0 : ar.finish().await?;
161 :
162 0 : Ok(())
163 0 : }
164 :
165 : /// Implementation of snapshot for a timeline which is resident (includes some segment data)
166 0 : pub async fn stream_snapshot_resident_guts(
167 0 : tli: WalResidentTimeline,
168 0 : source: NodeId,
169 0 : destination: NodeId,
170 0 : tx: mpsc::Sender<Result<Bytes>>,
171 0 : storage: Option<Arc<GenericRemoteStorage>>,
172 0 : ) -> Result<()> {
173 0 : let mut ar = prepare_tar_stream(tx);
174 :
175 0 : let bctx = tli
176 0 : .start_snapshot(&mut ar, source, destination, storage)
177 0 : .await?;
178 0 : pausable_failpoint!("sk-snapshot-after-list-pausable");
179 :
180 0 : if let Some(from_to_segno) = &bctx.from_to_segno {
181 0 : let tli_dir = tli.get_timeline_dir();
182 0 : info!(
183 0 : "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
184 0 : from_to_segno.end() - from_to_segno.start() + 1,
185 0 : from_to_segno.start(),
186 0 : from_to_segno.end(),
187 : bctx.term,
188 : bctx.last_log_term,
189 : bctx.flush_lsn,
190 : );
191 0 : for segno in from_to_segno.clone() {
192 0 : let Some((mut sf, is_partial)) =
193 0 : open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?
194 : else {
195 : // File is not found
196 0 : let (wal_file_path, _wal_file_partial_path) =
197 0 : wal_file_paths(&tli_dir, segno, bctx.wal_seg_size);
198 0 : tracing::warn!("couldn't find WAL segment file {wal_file_path}");
199 0 : bail!("couldn't find WAL segment file {wal_file_path}")
200 : };
201 0 : let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
202 0 : if is_partial {
203 0 : wal_file_name.push_str(".partial");
204 0 : }
205 0 : ar.append_file(&wal_file_name, &mut sf).await?;
206 : }
207 : } else {
208 0 : info!("Not including any segments into the snapshot");
209 : }
210 :
211 : // Do the term check before ar.finish() so that the archive is left corrupted (unterminated)
212 : // if the term changed. The client shouldn't ignore an abrupt stream end anyway, but this is an extra safeguard.
213 0 : tli.finish_snapshot(&bctx).await?;
214 :
215 0 : ar.finish().await?;
216 :
217 0 : Ok(())
218 0 : }
219 :
220 : impl Timeline {
221 : /// Simple snapshot for an offloaded timeline: we only upload a renamed partial segment and
222 : /// pass a modified control file into the provided tar stream (no data segments are included, since
223 : /// the timeline is offloaded and there are none on disk).
224 0 : async fn snapshot_offloaded<W: AsyncWrite + Unpin + Send>(
225 0 : self: &Arc<Timeline>,
226 0 : ar: &mut tokio_tar::Builder<W>,
227 0 : source: NodeId,
228 0 : destination: NodeId,
229 0 : storage: &GenericRemoteStorage,
230 0 : ) -> Result<()> {
231 : // Take initial copy of control file, then release state lock
232 0 : let mut control_file = {
233 0 : let shared_state = self.write_shared_state().await;
234 :
235 0 : let control_file = TimelinePersistentState::clone(shared_state.sk.state());
236 :
237 : // Rare race: we got un-evicted between entering the function and reading the control file.
238 : // Error out and let the API caller retry.
239 0 : if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
240 0 : bail!("Timeline was un-evicted during snapshot, please retry");
241 0 : }
242 :
243 0 : control_file
244 : };
245 :
246 : // Modify the partial segment of the in-memory copy for the control file to
247 : // point to the destination safekeeper.
248 0 : let replace = control_file
249 0 : .partial_backup
250 0 : .replace_uploaded_segment(source, destination)?;
251 :
252 0 : let Some(replace) = replace else {
253 : // In Manager::ready_for_eviction, we do not permit eviction unless the timeline
254 : // has a partial segment, so it is unexpected for an offloaded timeline to have none.
255 0 : anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
256 : };
257 :
258 0 : tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");
259 :
260 : // Optimistically try to copy the partial segment to the destination's path: this
261 : // can fail if the timeline was un-evicted and modified in the background.
262 0 : let remote_timeline_path = &self.remote_path;
263 0 : wal_backup::copy_partial_segment(
264 0 : storage,
265 0 : &replace.previous.remote_path(remote_timeline_path),
266 0 : &replace.current.remote_path(remote_timeline_path),
267 0 : )
268 0 : .await?;
269 :
270 : // Since the S3 copy succeeded with the path given in our control file snapshot, and
271 : // we are sending that snapshot in our response, we are giving the caller a consistent
272 : // snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
273 0 : let buf = control_file
274 0 : .write_to_buf()
275 0 : .with_context(|| "failed to serialize control store")?;
276 0 : let mut header = Header::new_gnu();
277 0 : header.set_size(buf.len().try_into().expect("never breaches u64"));
278 0 : ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
279 0 : .await
280 0 : .with_context(|| "failed to append to archive")?;
281 :
282 0 : Ok(())
283 0 : }
284 : }
285 :
286 : impl WalResidentTimeline {
287 : /// Start streaming a tar archive with the timeline:
288 : /// 1) stream control file under lock;
289 : /// 2) hold off WAL removal;
290 : /// 3) collect SnapshotContext to understand which WAL segments should be
291 : /// streamed.
292 : ///
293 : /// Snapshot streams data up to flush_lsn. To make this safe, we must check
294 : /// that term doesn't change during the procedure, or we risk sending mix of
295 : /// WAL from different histories. Term is remembered in the SnapshotContext
296 : /// and checked in finish_snapshot. Note that in the last segment some WAL
297 : /// higher than flush_lsn set here might be streamed; that's fine as long as
298 : /// terms doesn't change.
299 : ///
300 : /// Alternatively we could send only up to commit_lsn to get some valid
301 : /// state which later will be recovered by compute, in this case term check
302 : /// is not needed, but we likely don't want that as there might be no
303 : /// compute which could perform the recovery.
304 : ///
305 : /// When the returned SnapshotContext is dropped, the WAL hold is removed.
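///
/// A rough sketch of the intended call sequence (mirroring
/// stream_snapshot_resident_guts above):
///
/// ```ignore
/// let bctx = tli.start_snapshot(&mut ar, source, destination, storage).await?;
/// // ... append the WAL segments in bctx.from_to_segno to `ar` ...
/// tli.finish_snapshot(&bctx).await?; // errors out if the term changed meanwhile
/// ar.finish().await?;
/// // dropping bctx releases the WAL removal hold
/// ```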
306 0 : async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
307 0 : &self,
308 0 : ar: &mut tokio_tar::Builder<W>,
309 0 : source: NodeId,
310 0 : destination: NodeId,
311 0 : storage: Option<Arc<GenericRemoteStorage>>,
312 0 : ) -> Result<SnapshotContext> {
313 0 : let mut shared_state = self.write_shared_state().await;
314 0 : let wal_seg_size = shared_state.get_wal_seg_size();
315 :
316 0 : let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
317 : // Modify the partial segment of the in-memory copy for the control file to
318 : // point to the destination safekeeper.
319 0 : let replace = control_store
320 0 : .partial_backup
321 0 : .replace_uploaded_segment(source, destination)?;
322 :
323 0 : if let Some(replace) = replace {
324 : // The deserialized control file has an uploaded partial. We upload a copy
325 : // of it to object storage for the destination safekeeper and send an updated
326 : // control file in the snapshot.
327 0 : tracing::info!(
328 0 : "Replacing uploaded partial segment in in-mem control file: {replace:?}"
329 : );
330 :
331 0 : let remote_timeline_path = &self.tli.remote_path;
332 0 : wal_backup::copy_partial_segment(
333 0 : &*storage.context("remote storage not configured")?,
334 0 : &replace.previous.remote_path(remote_timeline_path),
335 0 : &replace.current.remote_path(remote_timeline_path),
336 : )
337 0 : .await?;
338 0 : }
339 :
340 0 : let buf = control_store
341 0 : .write_to_buf()
342 0 : .with_context(|| "failed to serialize control store")?;
343 0 : let mut header = Header::new_gnu();
344 0 : header.set_size(buf.len().try_into().expect("never breaches u64"));
345 0 : ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
346 0 : .await
347 0 : .with_context(|| "failed to append to archive")?;
348 :
349 : // We need to stream starting from the oldest segment that someone (S3 or the pageserver)
350 : // still needs. This duplicates the calc_horizon_lsn logic.
351 : //
352 : // We know that WAL wasn't removed up to this point because it cannot be
353 : // removed further than `backup_lsn`. Since we're holding shared_state
354 : // lock and setting `wal_removal_on_hold` later, it guarantees that WAL
355 : // won't be removed until we're done.
356 0 : let timeline_state = shared_state.sk.state();
357 0 : let from_lsn = min(
358 0 : timeline_state.remote_consistent_lsn,
359 0 : timeline_state.backup_lsn,
360 : );
361 0 : let flush_lsn = shared_state.sk.flush_lsn();
362 0 : let (send_segments, msg) = if from_lsn == Lsn::INVALID {
363 0 : (false, "snapshot is called on uninitialized timeline")
364 : } else {
365 0 : (true, "timeline is initialized")
366 : };
367 0 : tracing::info!(
368 0 : remote_consistent_lsn=%timeline_state.remote_consistent_lsn,
369 0 : backup_lsn=%timeline_state.backup_lsn,
370 : %flush_lsn,
371 0 : "{msg}"
372 : );
373 0 : let from_segno = from_lsn.segment_number(wal_seg_size);
374 0 : let term = shared_state.sk.state().acceptor_state.term;
375 0 : let last_log_term = shared_state.sk.last_log_term();
376 0 : let upto_segno = flush_lsn.segment_number(wal_seg_size);
377 : // Have some limit on the max number of segments as a sanity check.
378 : const MAX_ALLOWED_SEGS: u64 = 1000;
379 0 : let num_segs = upto_segno - from_segno + 1;
380 0 : if num_segs > MAX_ALLOWED_SEGS {
381 0 : bail!(
382 0 : "snapshot is called on timeline with {} segments, but the limit is {}",
383 : num_segs,
384 : MAX_ALLOWED_SEGS
385 : );
386 0 : }
387 :
388 : // Prevent WAL removal while we're streaming data.
389 : //
390 : // Since this is a flag, not a counter, just bail out if it's already set; we
391 : // shouldn't need concurrent snapshotting.
392 0 : if shared_state.wal_removal_on_hold {
393 0 : bail!("wal_removal_on_hold is already true");
394 0 : }
395 0 : shared_state.wal_removal_on_hold = true;
396 :
397 : // Drop shared_state to release the lock, before calling wal_residence_guard().
398 0 : drop(shared_state);
399 :
400 0 : let tli_copy = self.wal_residence_guard().await?;
401 0 : let from_to_segno = send_segments.then_some(from_segno..=upto_segno);
402 0 : let bctx = SnapshotContext {
403 0 : from_to_segno,
404 0 : term,
405 0 : last_log_term,
406 0 : flush_lsn,
407 0 : wal_seg_size,
408 0 : tli: tli_copy,
409 0 : };
410 :
411 0 : Ok(bctx)
412 0 : }
413 :
414 : /// Finish snapshotting: check that the term(s) haven't changed.
415 : ///
416 : /// Note that the WAL GC hold-off is removed in SnapshotContext's Drop impl so that it
417 : /// isn't forgotten if snapshotting fails midway.
418 0 : pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
419 0 : let shared_state = self.read_shared_state().await;
420 0 : let term = shared_state.sk.state().acceptor_state.term;
421 0 : let last_log_term = shared_state.sk.last_log_term();
422 : // There are some cases where this check could be relaxed (e.g. last_log_term might
423 : // change, but as long as the older history is strictly a prefix of the new one that's
424 : // fine), but there is no need to do so.
425 0 : if bctx.term != term || bctx.last_log_term != last_log_term {
426 0 : bail!(
427 0 : "term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
428 : bctx.term,
429 : bctx.last_log_term,
430 : term,
431 : last_log_term
432 : );
433 0 : }
434 0 : Ok(())
435 0 : }
436 : }
437 :
438 : /// Response for debug dump request.
439 0 : #[derive(Debug, Deserialize)]
440 : pub struct DebugDumpResponse {
441 : pub start_time: DateTime<Utc>,
442 : pub finish_time: DateTime<Utc>,
443 : pub timelines: Vec<debug_dump::Timeline>,
444 : pub timelines_count: usize,
445 : pub config: debug_dump::Config,
446 : }
447 :
448 : /// Find the most advanced safekeeper and pull the timeline from it.
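/// "Most advanced" is decided by comparing (epoch, term, flush_lsn, commit_lsn)
/// across the donors that responded; see the max_by_key selection below.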
449 0 : pub async fn handle_request(
450 0 : request: PullTimelineRequest,
451 0 : sk_auth_token: Option<SecretString>,
452 0 : ssl_ca_certs: Vec<Certificate>,
453 0 : global_timelines: Arc<GlobalTimelines>,
454 0 : wait_for_peer_timeline_status: bool,
455 0 : ) -> Result<PullTimelineResponse, ApiError> {
456 0 : if let Some(mconf) = &request.mconf {
457 0 : let sk_id = global_timelines.get_sk_id();
458 0 : if !mconf.contains(sk_id) {
459 0 : return Err(ApiError::BadRequest(anyhow!(
460 0 : "refused to pull timeline with {mconf}, node {sk_id} is not member of it",
461 0 : )));
462 0 : }
463 0 : }
464 :
465 0 : let existing_tli = global_timelines.get(TenantTimelineId::new(
466 0 : request.tenant_id,
467 0 : request.timeline_id,
468 0 : ));
469 0 : if let Ok(timeline) = existing_tli {
470 0 : let cur_generation = timeline
471 0 : .read_shared_state()
472 0 : .await
473 : .sk
474 0 : .state()
475 : .mconf
476 : .generation;
477 :
478 0 : info!(
479 0 : "Timeline {} already exists with generation {cur_generation}",
480 : request.timeline_id,
481 : );
482 :
483 0 : if let Some(mconf) = request.mconf {
484 0 : timeline
485 0 : .membership_switch(mconf)
486 0 : .await
487 0 : .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
488 0 : }
489 :
490 0 : return Ok(PullTimelineResponse {
491 0 : safekeeper_host: None,
492 0 : });
493 0 : }
494 :
495 0 : let mut http_client = reqwest::Client::builder();
496 0 : for ssl_ca_cert in ssl_ca_certs {
497 0 : http_client = http_client.add_root_certificate(ssl_ca_cert);
498 0 : }
499 0 : let http_client = http_client
500 0 : .build()
501 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
502 :
503 0 : let http_hosts = request.http_hosts.clone();
504 :
505 : // Figure out statuses of potential donors.
506 0 : let mut statuses = Vec::new();
507 0 : if !wait_for_peer_timeline_status {
508 0 : let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
509 0 : futures::future::join_all(http_hosts.iter().map(|url| async {
510 0 : let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
511 0 : let resp = cclient
512 0 : .timeline_status(request.tenant_id, request.timeline_id)
513 0 : .await?;
514 0 : let info: TimelineStatus = resp
515 0 : .json()
516 0 : .await
517 0 : .context("Failed to deserialize timeline status")
518 0 : .map_err(|e| mgmt_api::Error::ReceiveErrorBody(e.to_string()))?;
519 0 : Ok(info)
520 0 : }))
521 0 : .await;
522 :
523 0 : for (i, response) in responses.into_iter().enumerate() {
524 0 : match response {
525 0 : Ok(status) => {
526 0 : if let Some(mconf) = &request.mconf {
527 0 : if status.mconf.generation > mconf.generation {
528 : // We probably raced with another timeline membership change with a higher generation.
529 : // Reject this request.
530 0 : return Err(ApiError::Conflict(format!(
531 0 : "cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}",
532 0 : mconf.generation,
533 0 : request.timeline_id,
534 0 : status.mconf.generation,
535 0 : http_hosts[i],
536 0 : )));
537 0 : }
538 0 : }
539 0 : statuses.push((status, i));
540 : }
541 0 : Err(e) => {
542 0 : info!("error fetching status from {}: {e}", http_hosts[i]);
543 : }
544 : }
545 : }
546 :
547 : // Allow missing responses from up to one safekeeper (say, due to downtime),
548 : // e.g. if we created a timeline on SKs A and B, with C being offline. Then B goes
549 : // offline and C comes online. Then we want a pull on C, with A and B as hosts, to work.
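// E.g. with three hosts at least two successful responses are required; with a single host, one.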
550 0 : let min_required_successful = (http_hosts.len() - 1).max(1);
551 0 : if statuses.len() < min_required_successful {
552 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
553 0 : "only got {} successful status responses. required: {min_required_successful}",
554 0 : statuses.len()
555 0 : )));
556 0 : }
557 : } else {
558 0 : let mut retry = true;
559 : // We must get a status from all other peers.
560 : // Otherwise, we may run into a split-brain scenario.
561 0 : while retry {
562 0 : statuses.clear();
563 0 : retry = false;
564 0 : for (i, url) in http_hosts.iter().enumerate() {
565 0 : let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
566 0 : match cclient
567 0 : .timeline_status(request.tenant_id, request.timeline_id)
568 0 : .await
569 : {
570 0 : Ok(resp) => {
571 0 : if resp.status() == StatusCode::NOT_FOUND {
572 0 : warn!(
573 0 : "Timeline {} not found on peer SK {}, no need to pull it",
574 0 : TenantTimelineId::new(request.tenant_id, request.timeline_id),
575 : url
576 : );
577 0 : return Ok(PullTimelineResponse {
578 0 : safekeeper_host: None,
579 0 : });
580 0 : }
581 0 : let info: TimelineStatus = resp
582 0 : .json()
583 0 : .await
584 0 : .context("Failed to deserialize timeline status")
585 0 : .map_err(ApiError::InternalServerError)?;
586 0 : statuses.push((info, i));
587 : }
588 0 : Err(e) => {
589 0 : match e {
590 : // If we get a 404, it means the timeline doesn't exist on this safekeeper.
591 : // We can ignore this error.
592 0 : mgmt_api::Error::ApiError(status, _)
593 0 : if status == StatusCode::NOT_FOUND =>
594 : {
595 0 : warn!(
596 0 : "Timeline {} not found on peer SK {}, no need to pull it",
597 0 : TenantTimelineId::new(request.tenant_id, request.timeline_id),
598 : url
599 : );
600 0 : return Ok(PullTimelineResponse {
601 0 : safekeeper_host: None,
602 0 : });
603 : }
604 0 : _ => {}
605 : }
606 0 : retry = true;
607 0 : error!("Failed to get timeline status from {}: {:#}", url, e);
608 : }
609 : }
610 : }
611 0 : sleep(std::time::Duration::from_millis(100)).await;
612 : }
613 : }
614 :
615 : // Find the most advanced safekeeper
616 0 : let (status, i) = statuses
617 0 : .into_iter()
618 0 : .max_by_key(|(status, _)| {
619 0 : (
620 0 : status.acceptor_state.epoch,
621 0 : /* BEGIN_HADRON */
622 0 : // We need to pull from the SK with the highest term.
623 0 : // This is because another compute may come online and vote the same highest term again on the other two SKs.
624 0 : // Then, there will be 2 computes running on the same term.
625 0 : status.acceptor_state.term,
626 0 : /* END_HADRON */
627 0 : status.flush_lsn,
628 0 : status.commit_lsn,
629 0 : )
630 0 : })
631 0 : .unwrap();
632 0 : let safekeeper_host = http_hosts[i].clone();
633 :
634 0 : assert!(status.tenant_id == request.tenant_id);
635 0 : assert!(status.timeline_id == request.timeline_id);
636 :
637 0 : match pull_timeline(
638 0 : status,
639 0 : safekeeper_host,
640 0 : sk_auth_token,
641 0 : http_client,
642 0 : global_timelines,
643 0 : request.mconf,
644 : )
645 0 : .await
646 : {
647 0 : Ok(resp) => Ok(resp),
648 0 : Err(e) => {
649 0 : match e.downcast_ref::<TimelineError>() {
650 0 : Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
651 0 : safekeeper_host: None,
652 0 : }),
653 0 : Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!(
654 0 : "Timeline {}/{} deleted",
655 0 : request.tenant_id, request.timeline_id
656 0 : ))),
657 : Some(TimelineError::CreationInProgress(_)) => {
658 : // We don't return success here because creation might still fail.
659 0 : Err(ApiError::Conflict("Creation in progress".to_owned()))
660 : }
661 0 : _ => Err(ApiError::InternalServerError(e)),
662 : }
663 : }
664 : }
665 0 : }
666 :
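/// Pull a snapshot of the timeline from `host`, unpack it into a temp directory,
/// validate and load it into the global timelines map, and optionally switch to the
/// provided membership configuration.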
667 0 : async fn pull_timeline(
668 0 : status: TimelineStatus,
669 0 : host: String,
670 0 : sk_auth_token: Option<SecretString>,
671 0 : http_client: reqwest::Client,
672 0 : global_timelines: Arc<GlobalTimelines>,
673 0 : mconf: Option<membership::Configuration>,
674 0 : ) -> Result<PullTimelineResponse> {
675 0 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
676 0 : info!(
677 0 : "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
678 : ttid,
679 : host,
680 : status.commit_lsn,
681 : status.flush_lsn,
682 : status.acceptor_state.term,
683 : status.acceptor_state.epoch
684 : );
685 :
686 0 : let conf = &global_timelines.get_global_config();
687 :
688 0 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
689 0 : let client = Client::new(http_client, host.clone(), sk_auth_token.clone());
690 : // Request stream with basebackup archive.
691 0 : let bb_resp = client
692 0 : .snapshot(status.tenant_id, status.timeline_id, conf.my_id)
693 0 : .await?;
694 :
695 : // Make Stream of Bytes from it...
696 0 : let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
697 : // and turn it into StreamReader implementing AsyncRead.
698 0 : let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
699 :
700 : // Extract it to disk on the fly. We don't use the simple unpack() because we
701 : // want to fsync each file.
702 0 : let mut entries = Archive::new(bb_reader).entries()?;
703 0 : while let Some(base_tar_entry) = entries.next().await {
704 0 : let mut entry = base_tar_entry?;
705 0 : let header = entry.header();
706 0 : let file_path = header.path()?.into_owned();
707 0 : match header.entry_type() {
708 : tokio_tar::EntryType::Regular => {
709 0 : let utf8_file_path =
710 0 : Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
711 0 : let dst_path = tli_dir_path.join(utf8_file_path);
712 0 : let mut f = OpenOptions::new()
713 0 : .create(true)
714 0 : .truncate(true)
715 0 : .write(true)
716 0 : .open(&dst_path)
717 0 : .await?;
718 0 : tokio::io::copy(&mut entry, &mut f).await?;
719 : // fsync the file
720 0 : f.sync_all().await?;
721 : }
722 : _ => {
723 0 : bail!(
724 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
725 0 : file_path.display(),
726 0 : header.entry_type()
727 : );
728 : }
729 : }
730 : }
731 :
732 : // fsync temp timeline directory to remember its contents.
733 0 : fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
734 :
735 0 : let generation = mconf.as_ref().map(|c| c.generation);
736 :
737 : // Let's create timeline from temp directory and verify that it's correct
738 0 : let (commit_lsn, flush_lsn) =
739 0 : validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?;
740 0 : info!(
741 0 : "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
742 : ttid, commit_lsn, flush_lsn
743 : );
744 0 : assert!(status.commit_lsn <= status.flush_lsn);
745 :
746 : // Finally, load the timeline.
747 0 : let timeline = global_timelines
748 0 : .load_temp_timeline(ttid, &tli_dir_path, generation)
749 0 : .await?;
750 :
751 0 : if let Some(mconf) = mconf {
752 : // Switch to the provided mconf to guarantee that the timeline will not
753 : // be deleted by a request with an older generation.
754 : // The generation might already be higher than the one in mconf, e.g.
755 : // if another membership_switch request was executed between `load_temp_timeline`
756 : // and `membership_switch`, but that's totally fine: `membership_switch` will
757 : // ignore a switch to an older generation.
758 0 : timeline.membership_switch(mconf).await?;
759 0 : }
760 :
761 0 : Ok(PullTimelineResponse {
762 0 : safekeeper_host: Some(host),
763 0 : })
764 0 : }
|