use anyhow::{Context, Result};

use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
use utils::id::NodeId;

use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;

use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::*;

use utils::{id::TenantTimelineId, lsn::Lsn};

use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};

use once_cell::sync::OnceCell;

const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
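
// A minimal sketch (not part of the original file) of how these two constants
// combine in WalBackupTask::run below: the retry delay grows as
// UPLOAD_FAILURE_RETRY_MIN_MS << retry_attempt and is capped at
// UPLOAD_FAILURE_RETRY_MAX_MS. The helper and the values here are illustrative only.
#[cfg(test)]
mod retry_backoff_sketch {
    use super::*;

    // Mirrors the backoff computation in WalBackupTask::run.
    fn retry_delay_ms(retry_attempt: u32) -> u64 {
        let mut delay = UPLOAD_FAILURE_RETRY_MAX_MS;
        if let Some(backoff) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) {
            delay = delay.min(backoff);
        }
        delay
    }

    #[test]
    fn delay_doubles_then_caps() {
        assert_eq!(retry_delay_ms(0), 10); // 10 << 0
        assert_eq!(retry_delay_ms(3), 80); // 10 << 3
        assert_eq!(retry_delay_ms(20), 5000); // 10 << 20 exceeds the cap
        assert_eq!(retry_delay_ms(64), 5000); // shift of >= 64 bits -> None -> cap
    }
}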

/// Check whether WAL backup is required for the timeline. If yes, mark that the
/// launcher is aware of the current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
    match GlobalTimelines::get(ttid).ok() {
        Some(tli) => {
            tli.wal_backup_attend().await;
            Some(tli)
        }
        None => None,
    }
}

struct WalBackupTaskHandle {
    shutdown_tx: Sender<()>,
    handle: JoinHandle<()>,
}

struct WalBackupTimelineEntry {
    timeline: Arc<Timeline>,
    handle: Option<WalBackupTaskHandle>,
}

async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
    if let Some(wb_handle) = entry.handle.take() {
        // Tell the task to shut down. An error means the task exited earlier; that's ok.
        let _ = wb_handle.shutdown_tx.send(()).await;
        // Await the task itself. TODO: restart panicked tasks earlier.
        if let Err(e) = wb_handle.handle.await {
            warn!("WAL backup task for {} panicked: {}", ttid, e);
        }
    }
}
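
// A self-contained sketch (hypothetical task body, not part of the original
// file) of the shutdown handshake used above: the launcher sends () on a
// capacity-1 channel, the task's select! observes it and exits, and the
// launcher then awaits the JoinHandle. Assumes tokio's "rt" and "macros"
// features, which the surrounding code already relies on for spawn/select!.
#[cfg(test)]
mod shutdown_handshake_sketch {
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn task_exits_on_shutdown_signal() {
        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
        let handle = tokio::spawn(async move {
            tokio::select! {
                // Stand-in for the real work; never completes on its own.
                _ = futures::future::pending::<()>() => unreachable!(),
                _ = shutdown_rx.recv() => {}
            }
        });
        // An error here would only mean the task already exited, which is fine.
        let _ = shutdown_tx.send(()).await;
        handle.await.expect("sketch task should not panic");
    }
}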

/// The goal is to ensure that normally only one safekeeper offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for a short
/// time several of them PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing a seriously lagging safekeeper is undesirable;
/// so we deterministically choose among the reasonably caught-up candidates.
/// TODO: take into account failed attempts to deal with the hypothetical
/// situation where s3 is unreachable only for some sks.
fn determine_offloader(
    alive_peers: &[PeerInfo],
    wal_backup_lsn: Lsn,
    ttid: TenantTimelineId,
    conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
    // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
    let capable_peers = alive_peers
        .iter()
        .filter(|p| p.local_start_lsn <= wal_backup_lsn);
    match capable_peers.clone().map(|p| p.commit_lsn).max() {
        None => (None, "no connected peers to elect from".to_string()),
        Some(max_commit_lsn) => {
            let threshold = max_commit_lsn
                .checked_sub(conf.max_offloader_lag_bytes)
                .unwrap_or(Lsn(0));
            let mut caughtup_peers = capable_peers
                .clone()
                .filter(|p| p.commit_lsn >= threshold)
                .collect::<Vec<_>>();
            caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));

            // To distribute the load, shift by timeline_id.
            let offloader = caughtup_peers
                [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
                .sk_id;

            let mut capable_peers_dbg = capable_peers
                .map(|p| (p.sk_id, p.commit_lsn))
                .collect::<Vec<_>>();
            capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
            (
                Some(offloader),
                format!(
                    "elected {} among {:?} peers, with {} of them being caught up",
                    offloader,
                    capable_peers_dbg,
                    caughtup_peers.len()
                ),
            )
        }
    }
}
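
// A small illustrative sketch (hypothetical values, not part of the original
// file) of the load-distribution step above: among the caught-up peers sorted
// by safekeeper id, the offloader index is timeline_id modulo the number of
// caught-up peers, so different timelines tend to elect different safekeepers.
#[cfg(test)]
mod offloader_selection_sketch {
    #[test]
    fn index_is_timeline_id_modulo_peer_count() {
        let caughtup_peer_ids: Vec<u64> = vec![1, 4, 7]; // hypothetical sk ids, sorted
        let timeline_id_bits: u128 = 11; // stand-in for u128::from(ttid.timeline_id)
        let idx = (timeline_id_bits % caughtup_peer_ids.len() as u128) as usize;
        assert_eq!(idx, 2); // 11 % 3
        assert_eq!(caughtup_peer_ids[idx], 7);
    }
}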

/// Based on peer information, determine which safekeeper should offload. If it
/// is me, start the per-timeline task unless it is already running; if it is
/// not me and the task is running, stop it.
async fn update_task(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    entry: &mut WalBackupTimelineEntry,
) {
    let alive_peers = entry.timeline.get_peers(conf).await;
    let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
    let (offloader, election_dbg_str) =
        determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
    let elected_me = Some(conf.my_id) == offloader;

    if elected_me != (entry.handle.is_some()) {
        if elected_me {
            info!("elected for backup {}: {}", ttid, election_dbg_str);

            let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
            let timeline_dir = conf.timeline_dir(&ttid);

            let handle = tokio::spawn(
                backup_task_main(
                    ttid,
                    timeline_dir,
                    conf.workdir.clone(),
                    conf.backup_parallel_jobs,
                    shutdown_rx,
                )
                .instrument(info_span!("WAL backup task", ttid = %ttid)),
            );

            entry.handle = Some(WalBackupTaskHandle {
                shutdown_tx,
                handle,
            });
        } else {
            info!("stepping down from backup {}: {}", ttid, election_dbg_str);
            shut_down_task(ttid, entry).await;
        }
    }
}

const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;

/// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
/// tasks. Having this in a separate task simplifies locking, allows reaping
/// panics, and separates elections from the offloading itself.
pub async fn wal_backup_launcher_task_main(
    conf: SafeKeeperConf,
    mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
    info!(
        "WAL backup launcher started, remote config {:?}",
        conf.remote_storage
    );

    let conf_ = conf.clone();
    REMOTE_STORAGE.get_or_init(|| {
        conf_
            .remote_storage
            .as_ref()
            .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
    });

    // Presence in this map means the launcher is aware that s3 offloading is
    // needed for the timeline, but the task is started only if it makes sense
    // to offload from this safekeeper.
    let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();

    let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
    loop {
        tokio::select! {
            ttid = wal_backup_launcher_rx.recv() => {
                // the channel is never expected to get closed
                let ttid = ttid.unwrap();
                if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
                    continue; /* just drain the channel and do nothing */
                }
                let timeline = is_wal_backup_required(ttid).await;
                // do we need to do anything at all?
                if timeline.is_some() != tasks.contains_key(&ttid) {
                    if let Some(timeline) = timeline {
                        // need to start the task
                        let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
                            timeline,
                            handle: None,
                        });
                        update_task(&conf, ttid, entry).await;
                    } else {
                        // need to stop the task
                        info!("stopping WAL backup task for {}", ttid);
                        let mut entry = tasks.remove(&ttid).unwrap();
                        shut_down_task(ttid, &mut entry).await;
                    }
                }
            }
            // For each timeline needing offloading, check if this safekeeper
            // should do the job and start/stop the task accordingly.
            _ = ticker.tick() => {
                for (ttid, entry) in tasks.iter_mut() {
                    update_task(&conf, *ttid, entry).await;
                }
            }
        }
    }
}

struct WalBackupTask {
    timeline: Arc<Timeline>,
    timeline_dir: PathBuf,
    workspace_dir: PathBuf,
    wal_seg_size: usize,
    parallel_jobs: usize,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,
}

/// Offload a single timeline.
async fn backup_task_main(
    ttid: TenantTimelineId,
    timeline_dir: PathBuf,
    workspace_dir: PathBuf,
    parallel_jobs: usize,
    mut shutdown_rx: Receiver<()>,
) {
    info!("started");
    let res = GlobalTimelines::get(ttid);
    if let Err(e) = res {
        error!("backup error for timeline {}: {}", ttid, e);
        return;
    }
    let tli = res.unwrap();

    let mut wb = WalBackupTask {
        wal_seg_size: tli.get_wal_seg_size().await,
        commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
        timeline: tli,
        timeline_dir,
        workspace_dir,
        parallel_jobs,
    };

    // The task is spun up only when wal_seg_size is already initialized.
    assert!(wb.wal_seg_size > 0);

    let mut canceled = false;
    select! {
        _ = wb.run() => {}
        _ = shutdown_rx.recv() => {
            canceled = true;
        }
    }
    info!("task {}", if canceled { "canceled" } else { "terminated" });
}

impl WalBackupTask {
    async fn run(&mut self) {
        let mut backup_lsn = Lsn(0);

        let mut retry_attempt = 0u32;
        // offload loop
        loop {
            if retry_attempt == 0 {
                // wait for new WAL to arrive
                if let Err(e) = self.commit_lsn_watch_rx.changed().await {
                    // should never happen, as we hold an Arc to the timeline
                    error!("commit_lsn watch shut down: {:?}", e);
                    return;
                }
            } else {
                // or just sleep if we errored previously
                let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
                if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
                {
                    retry_delay = min(retry_delay, backoff_delay);
                }
                sleep(Duration::from_millis(retry_delay)).await;
            }

            let commit_lsn = *self.commit_lsn_watch_rx.borrow();

            // Note that backup_lsn can be higher than commit_lsn if we
            // don't have much local WAL and others already uploaded
            // segments we don't even have.
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
            }
            // Perhaps peers advanced the position, check shmem value.
            backup_lsn = self.timeline.get_wal_backup_lsn().await;
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue;
            }

            match backup_lsn_range(
                &self.timeline,
                &mut backup_lsn,
                commit_lsn,
                self.wal_seg_size,
                &self.timeline_dir,
                &self.workspace_dir,
                self.parallel_jobs,
            )
            .await
            {
                Ok(()) => {
                    retry_attempt = 0;
                }
                Err(e) => {
                    error!(
                        "failed while offloading range {}-{}: {:?}",
                        backup_lsn, commit_lsn, e
                    );

                    retry_attempt = retry_attempt.saturating_add(1);
                }
            }
        }
    }
}

pub async fn backup_lsn_range(
    timeline: &Arc<Timeline>,
    backup_lsn: &mut Lsn,
    end_lsn: Lsn,
    wal_seg_size: usize,
    timeline_dir: &Path,
    workspace_dir: &Path,
    parallel_jobs: usize,
) -> Result<()> {
    if parallel_jobs < 1 {
        anyhow::bail!("parallel_jobs must be >= 1");
    }

    let start_lsn = *backup_lsn;
    let segments = get_segments(start_lsn, end_lsn, wal_seg_size);

    // Pool of concurrent upload tasks. We use `FuturesOrdered` to
    // preserve order of uploads, and update `backup_lsn` only after
    // all previous uploads are finished.
    let mut uploads = FuturesOrdered::new();
    let mut iter = segments.iter();

    loop {
        let added_task = match iter.next() {
            Some(s) => {
                uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
                true
            }
            None => false,
        };

        // Wait for an upload to finish if there are no more segments to add,
        // or if we already have the maximum number of concurrent uploads.
        if !added_task || uploads.len() >= parallel_jobs {
            let next = uploads.next().await;
            if let Some(res) = next {
                // next segment uploaded
                let segment = res?;
                let new_backup_lsn = segment.end_lsn;
                timeline
                    .set_wal_backup_lsn(new_backup_lsn)
                    .await
                    .context("setting wal_backup_lsn")?;
                *backup_lsn = new_backup_lsn;
            } else {
                // no more segments to upload
                break;
            }
        }
    }

    info!(
        "offloaded segnos {:?} up to {}, previous backup_lsn {}",
        segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
        end_lsn,
        start_lsn,
    );
    Ok(())
}
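
// A self-contained sketch (hypothetical echo "uploads", not part of the
// original file) of the bounded-parallelism pattern used in backup_lsn_range
// above: at most `parallel_jobs` futures live in a FuturesOrdered, and
// completions are consumed in submission order, which is what lets a watermark
// like backup_lsn be advanced safely. Assumes #[tokio::test] is available via
// tokio's "macros"/"rt" features, which the crate already uses.
#[cfg(test)]
mod ordered_upload_pool_sketch {
    use futures::stream::{FuturesOrdered, StreamExt};

    #[tokio::test]
    async fn completions_arrive_in_submission_order() {
        let parallel_jobs = 2;
        let mut uploads = FuturesOrdered::new();
        let mut results = Vec::new();
        let mut items = 0..5u64;
        loop {
            // Add the next "upload" (an immediately-ready future echoing its
            // id) while any remain.
            let added = match items.next() {
                Some(i) => {
                    uploads.push_back(async move { i });
                    true
                }
                None => false,
            };
            // Drain one completion when nothing was added or the pool is full.
            if !added || uploads.len() >= parallel_jobs {
                match uploads.next().await {
                    Some(i) => results.push(i),
                    None => break,
                }
            }
        }
        // FuturesOrdered yields results in the order futures were pushed.
        assert_eq!(results, vec![0, 1, 2, 3, 4]);
    }
}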

async fn backup_single_segment(
    seg: &Segment,
    timeline_dir: &Path,
    workspace_dir: &Path,
) -> Result<Segment> {
    let segment_file_path = seg.file_path(timeline_dir)?;
    let remote_segment_path = segment_file_path
        .strip_prefix(workspace_dir)
        .context("Failed to strip workspace dir prefix")
        .and_then(RemotePath::new)
        .with_context(|| {
            format!(
                "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
            )
        })?;

    let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
    if res.is_ok() {
        BACKED_UP_SEGMENTS.inc();
    } else {
        BACKUP_ERRORS.inc();
    }
    res?;
    debug!("Backup of {} done", segment_file_path.display());

    Ok(*seg)
}

#[derive(Debug, Copy, Clone)]
pub struct Segment {
    seg_no: XLogSegNo,
    start_lsn: Lsn,
    end_lsn: Lsn,
}

impl Segment {
    pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
        Self {
            seg_no,
            start_lsn,
            end_lsn,
        }
    }

    pub fn object_name(self) -> String {
        XLogFileName(PG_TLI, self.seg_no, self.size())
    }

    pub fn file_path(self, timeline_dir: &Path) -> Result<PathBuf> {
        Ok(timeline_dir.join(self.object_name()))
    }

    pub fn size(self) -> usize {
        (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
    }
}

fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
    let first_seg = start.segment_number(seg_size);
    let last_seg = end.segment_number(seg_size);

    let res: Vec<Segment> = (first_seg..last_seg)
        .map(|s| {
            let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
            let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
            Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
        })
        .collect();
    res
}
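
// A minimal test sketch (assumed 16 MiB segment size; not part of the original
// file) of the helpers above: `get_segments` returns only the fully covered
// segments between the start and end LSNs, and `Segment::size` is simply the
// LSN delta of a segment.
#[cfg(test)]
mod segment_range_sketch {
    use super::*;

    #[test]
    fn full_segments_between_lsns() {
        let seg_size: usize = 16 * 1024 * 1024;
        let start = Lsn(seg_size as u64); // beginning of segment 1
        let end = Lsn(3 * seg_size as u64 + 5); // somewhere inside segment 3
        let segs = get_segments(start, end, seg_size);
        // Segments 1 and 2 are complete; segment 3 is still partial.
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].seg_no, 1);
        assert_eq!(segs[1].seg_no, 2);
        assert_eq!(segs[0].start_lsn, Lsn(seg_size as u64));
        assert_eq!(segs[0].end_lsn, Lsn(2 * seg_size as u64));
        assert_eq!(segs[0].size(), seg_size);
    }
}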

static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();

async fn backup_object(source_file: &Path, target_file: &RemotePath, size: usize) -> Result<()> {
    let storage = REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap();

    let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
        format!(
            "Failed to open file {} for wal backup",
            source_file.display()
        )
    })?);

    storage
        .upload_storage_object(Box::new(file), size, target_file)
        .await
}

pub async fn read_object(
    file_path: &RemotePath,
    offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
    let storage = REMOTE_STORAGE
        .get()
        .context("Failed to get remote storage")?
        .as_ref()
        .context("No remote storage configured")?;

    info!("segment download about to start from remote path {file_path:?} at offset {offset}");

    let download = storage
        .download_storage_object(Some((offset, None)), file_path)
        .await
        .with_context(|| {
            format!("Failed to open WAL segment download stream for remote path {file_path:?}")
        })?;

    Ok(download.download_stream)
}