TLA | Line data | Source code
1 : use anyhow::{Context, Result};
2 :
3 : use camino::{Utf8Path, Utf8PathBuf};
4 : use futures::stream::FuturesOrdered;
5 : use futures::StreamExt;
6 : use tokio::task::JoinHandle;
7 : use utils::id::NodeId;
8 :
9 : use std::cmp::min;
10 : use std::collections::HashMap;
11 : use std::pin::Pin;
12 : use std::sync::Arc;
13 : use std::time::Duration;
14 :
15 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
16 : use postgres_ffi::XLogFileName;
17 : use postgres_ffi::{XLogSegNo, PG_TLI};
18 : use remote_storage::{GenericRemoteStorage, RemotePath};
19 : use tokio::fs::File;
20 :
21 : use tokio::select;
22 : use tokio::sync::mpsc::{self, Receiver, Sender};
23 : use tokio::sync::watch;
24 : use tokio::time::sleep;
25 : use tracing::*;
26 :
27 : use utils::{id::TenantTimelineId, lsn::Lsn};
28 :
29 : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
30 : use crate::timeline::{PeerInfo, Timeline};
31 : use crate::{GlobalTimelines, SafeKeeperConf};
32 :
33 : use once_cell::sync::OnceCell;
34 :
35 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
36 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
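// A minimal sketch of how the two constants above combine into an exponential
// backoff (see `WalBackupTask::run` below): the delay starts at
// UPLOAD_FAILURE_RETRY_MIN_MS, doubles with every failed attempt, and is capped
// at UPLOAD_FAILURE_RETRY_MAX_MS. The helper name is hypothetical.
#[allow(dead_code)]
fn upload_retry_delay_ms(retry_attempt: u32) -> u64 {
    let mut delay = UPLOAD_FAILURE_RETRY_MAX_MS;
    // checked_shl is None for very large shift amounts; keep the cap in that case.
    if let Some(backoff) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) {
        delay = delay.min(backoff);
    }
    delay // attempt 0 -> 10 ms, attempt 5 -> 320 ms, attempt 10 -> 5000 ms (capped)
}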
37 :
38 : /// Check whether WAL backup is required for the timeline. If yes, mark that the launcher
39 : /// is aware of the current status and return the timeline.
40 CBC 192 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
41 192 : match GlobalTimelines::get(ttid).ok() {
42 192 : Some(tli) => {
43 192 : tli.wal_backup_attend().await;
44 192 : Some(tli)
45 : }
46 UBC 0 : None => None,
47 : }
48 CBC 192 : }
49 :
50 : struct WalBackupTaskHandle {
51 : shutdown_tx: Sender<()>,
52 : handle: JoinHandle<()>,
53 : }
54 :
55 : struct WalBackupTimelineEntry {
56 : timeline: Arc<Timeline>,
57 : handle: Option<WalBackupTaskHandle>,
58 : }
59 :
60 9 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
61 9 : if let Some(wb_handle) = entry.handle.take() {
62 : // Tell the task to shut down. An error means the task exited earlier, which is ok.
63 9 : let _ = wb_handle.shutdown_tx.send(()).await;
64 : // Await the task itself. TODO: restart panicked tasks earlier.
65 10 : if let Err(e) = wb_handle.handle.await {
66 UBC 0 : warn!("WAL backup task for {} panicked: {}", ttid, e);
67 CBC 9 : }
68 UBC 0 : }
69 CBC 9 : }
70 :
71 : /// The goal is to ensure that normally only one safekeeper offloads. However,
72 : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for a short
73 : /// time several of them PUT the same files. Also,
74 : /// - frequently changing the offloader would be bad;
75 : /// - electing a seriously lagging safekeeper is undesirable;
76 : /// so we deterministically choose among the reasonably caught-up candidates.
77 : /// TODO: take failed attempts into account to deal with the hypothetical situation
78 : /// where s3 is unreachable only for some safekeepers.
79 260 : fn determine_offloader(
80 260 : alive_peers: &[PeerInfo],
81 260 : wal_backup_lsn: Lsn,
82 260 : ttid: TenantTimelineId,
83 260 : conf: &SafeKeeperConf,
84 260 : ) -> (Option<NodeId>, String) {
85 260 : // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
86 260 : let capable_peers = alive_peers
87 260 : .iter()
88 1836 : .filter(|p| p.local_start_lsn <= wal_backup_lsn);
89 612 : match capable_peers.clone().map(|p| p.commit_lsn).max() {
90 37 : None => (None, "no connected peers to elect from".to_string()),
91 223 : Some(max_commit_lsn) => {
92 223 : let threshold = max_commit_lsn
93 223 : .checked_sub(conf.max_offloader_lag_bytes)
94 223 : .unwrap_or(Lsn(0));
95 223 : let mut caughtup_peers = capable_peers
96 223 : .clone()
97 612 : .filter(|p| p.commit_lsn >= threshold)
98 223 : .collect::<Vec<_>>();
99 455 : caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
100 223 :
101 223 : // To distribute the load, shift by timeline_id.
102 223 : let offloader = caughtup_peers
103 223 : [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
104 223 : .sk_id;
105 223 :
106 223 : let mut capable_peers_dbg = capable_peers
107 612 : .map(|p| (p.sk_id, p.commit_lsn))
108 223 : .collect::<Vec<_>>();
109 455 : capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
110 223 : (
111 223 : Some(offloader),
112 223 : format!(
113 223 : "elected {} among {:?} peers, with {} of them being caughtup",
114 223 : offloader,
115 223 : capable_peers_dbg,
116 223 : caughtup_peers.len()
117 223 : ),
118 223 : )
119 : }
120 : }
121 260 : }
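// A self-contained sketch of the election rule above, using plain u64 ids and
// LSNs instead of PeerInfo/NodeId/Lsn; the helper name and tuple layout are
// hypothetical. Peers that started after the current backup position cannot
// offload; among the capable ones, those within the allowed lag of the best
// commit_lsn are sorted by id and indexed by the timeline id.
#[allow(dead_code)]
fn elect_offloader_sketch(
    peers: &[(u64, u64, u64)], // (sk_id, local_start_lsn, commit_lsn)
    wal_backup_lsn: u64,
    max_offloader_lag_bytes: u64,
    timeline_id: u128,
) -> Option<u64> {
    let capable: Vec<_> = peers.iter().filter(|p| p.1 <= wal_backup_lsn).collect();
    let max_commit_lsn = capable.iter().map(|p| p.2).max()?;
    let threshold = max_commit_lsn.saturating_sub(max_offloader_lag_bytes);
    let mut caughtup: Vec<_> = capable.into_iter().filter(|p| p.2 >= threshold).collect();
    caughtup.sort_by_key(|p| p.0);
    // Shift by timeline id so different timelines spread across safekeepers.
    Some(caughtup[(timeline_id % caughtup.len() as u128) as usize].0)
}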
122 :
123 : /// Based on peer information, determine which safekeeper should offload. If it
124 : /// is me, start the per-timeline task if it is not running yet; if it is not me
125 : /// and the task is running, kill it.
126 260 : async fn update_task(
127 260 : conf: &SafeKeeperConf,
128 260 : ttid: TenantTimelineId,
129 260 : entry: &mut WalBackupTimelineEntry,
130 260 : ) {
131 260 : let alive_peers = entry.timeline.get_peers(conf).await;
132 260 : let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
133 260 : let (offloader, election_dbg_str) =
134 260 : determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
135 260 : let elected_me = Some(conf.my_id) == offloader;
136 260 :
137 260 : if elected_me != (entry.handle.is_some()) {
138 34 : if elected_me {
139 25 : info!("elected for backup {}: {}", ttid, election_dbg_str);
140 :
141 25 : let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
142 25 : let timeline_dir = conf.timeline_dir(&ttid);
143 :
144 25 : let handle = tokio::spawn(
145 25 : backup_task_main(
146 25 : ttid,
147 25 : timeline_dir,
148 25 : conf.workdir.clone(),
149 25 : conf.backup_parallel_jobs,
150 25 : shutdown_rx,
151 25 : )
152 25 : .instrument(info_span!("WAL backup task", ttid = %ttid)),
153 : );
154 :
155 25 : entry.handle = Some(WalBackupTaskHandle {
156 25 : shutdown_tx,
157 25 : handle,
158 25 : });
159 : } else {
160 9 : info!("stepping down from backup {}: {}", ttid, election_dbg_str);
161 10 : shut_down_task(ttid, entry).await;
162 : }
163 226 : }
164 260 : }
165 :
166 : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
167 :
168 : /// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
169 : /// tasks. Having this in a separate task simplifies locking, allows reaping
170 : /// panics, and separates elections from the offloading itself.
171 500 : pub async fn wal_backup_launcher_task_main(
172 500 : conf: SafeKeeperConf,
173 500 : mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
174 500 : ) -> anyhow::Result<()> {
175 500 : info!(
176 500 : "WAL backup launcher started, remote config {:?}",
177 500 : conf.remote_storage
178 500 : );
179 :
180 500 : let conf_ = conf.clone();
181 500 : REMOTE_STORAGE.get_or_init(|| {
182 500 : conf_
183 500 : .remote_storage
184 500 : .as_ref()
185 500 : .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
186 500 : });
187 500 :
188 500 : // Presence in this map means the launcher is aware that s3 offloading is needed
189 500 : // for the timeline, but the task is started only if it makes sense to offload
190 500 : // from this safekeeper.
191 500 : let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
192 500 :
193 500 : let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
194 : loop {
195 19305 : tokio::select! {
196 11857 : ttid = wal_backup_launcher_rx.recv() => {
197 : // channel is never expected to get closed
198 : let ttid = ttid.unwrap();
199 : if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
200 : continue; /* just drain the channel and do nothing */
201 : }
202 : let timeline = is_wal_backup_required(ttid).await;
203 : // do we need to do anything at all?
204 : if timeline.is_some() != tasks.contains_key(&ttid) {
205 : if let Some(timeline) = timeline {
206 : // need to start the task
207 : let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
208 : timeline,
209 : handle: None,
210 : });
211 : update_task(&conf, ttid, entry).await;
212 : } else {
213 : // need to stop the task
214 UBC 0 : info!("stopping WAL backup task for {}", ttid);
215 : let mut entry = tasks.remove(&ttid).unwrap();
216 : shut_down_task(ttid, &mut entry).await;
217 : }
218 : }
219 : }
220 : // For each timeline needing offloading, check if this safekeeper
221 : // should do the job and start/stop the task accordingly.
222 : _ = ticker.tick() => {
223 : for (ttid, entry) in tasks.iter_mut() {
224 : update_task(&conf, *ttid, entry).await;
225 : }
226 : }
227 : }
228 : }
229 : }
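// A minimal, generic sketch of the same control-loop shape as the launcher
// above (hypothetical names, simplified payload): react to notifications on an
// mpsc channel and also re-check all known entries on a fixed interval, so
// decisions that depend on changing external state (here, peer status) are
// revisited even without a new notification.
#[allow(dead_code)]
async fn launcher_loop_sketch(mut notify_rx: Receiver<u64>) {
    let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
    loop {
        tokio::select! {
            maybe_id = notify_rx.recv() => {
                match maybe_id {
                    Some(_id) => { /* start or stop the task for this id */ }
                    None => break, // all senders dropped; nothing more to do
                }
            }
            _ = ticker.tick() => {
                // periodically re-evaluate which tasks should be running
            }
        }
    }
}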
230 :
231 : struct WalBackupTask {
232 : timeline: Arc<Timeline>,
233 : timeline_dir: Utf8PathBuf,
234 : workspace_dir: Utf8PathBuf,
235 : wal_seg_size: usize,
236 : parallel_jobs: usize,
237 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
238 : }
239 :
240 : /// Offload a single timeline.
241 CBC 25 : async fn backup_task_main(
242 25 : ttid: TenantTimelineId,
243 25 : timeline_dir: Utf8PathBuf,
244 25 : workspace_dir: Utf8PathBuf,
245 25 : parallel_jobs: usize,
246 25 : mut shutdown_rx: Receiver<()>,
247 25 : ) {
248 25 : info!("started");
249 25 : let res = GlobalTimelines::get(ttid);
250 25 : if let Err(e) = res {
251 UBC 0 : error!("backup error for timeline {}: {}", ttid, e);
252 0 : return;
253 CBC 25 : }
254 25 : let tli = res.unwrap();
255 :
256 25 : let mut wb = WalBackupTask {
257 25 : wal_seg_size: tli.get_wal_seg_size().await,
258 25 : commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
259 25 : timeline: tli,
260 25 : timeline_dir,
261 25 : workspace_dir,
262 25 : parallel_jobs,
263 25 : };
264 25 :
265 25 : // the task is spun up only when wal_seg_size is already initialized
266 25 : assert!(wb.wal_seg_size > 0);
267 :
268 25 : let mut canceled = false;
269 21386 : select! {
270 21386 : _ = wb.run() => {}
271 21386 : _ = shutdown_rx.recv() => {
272 21386 : canceled = true;
273 21386 : }
274 21386 : }
275 9 : info!("task {}", if canceled { "canceled" } else { "terminated" });
276 9 : }
277 :
278 : impl WalBackupTask {
279 25 : async fn run(&mut self) {
280 25 : let mut backup_lsn = Lsn(0);
281 25 :
282 25 : let mut retry_attempt = 0u32;
283 : // offload loop
284 2329 : loop {
285 2329 : if retry_attempt == 0 {
286 : // wait for new WAL to arrive
287 2328 : if let Err(e) = self.commit_lsn_watch_rx.changed().await {
288 : // should never happen, as we hold an Arc to the timeline.
289 UBC 0 : error!("commit_lsn watch shut down: {:?}", e);
290 0 : return;
291 CBC 2304 : }
292 : } else {
293 : // or just sleep if we errored previously
294 1 : let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
295 1 : if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
296 1 : {
297 1 : retry_delay = min(retry_delay, backoff_delay);
298 1 : }
299 1 : sleep(Duration::from_millis(retry_delay)).await;
300 : }
301 :
302 2304 : let commit_lsn = *self.commit_lsn_watch_rx.borrow();
303 2304 :
304 2304 : // Note that backup_lsn can be higher than commit_lsn if we
305 2304 : // don't have much local WAL and other safekeepers have already
306 2304 : // uploaded segments we don't even have locally.
307 2304 : if backup_lsn.segment_number(self.wal_seg_size)
308 2304 : >= commit_lsn.segment_number(self.wal_seg_size)
309 : {
310 2267 : retry_attempt = 0;
311 2267 : continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
312 37 : }
313 37 : // Perhaps peers advanced the position; check the in-memory shared value.
314 37 : backup_lsn = self.timeline.get_wal_backup_lsn().await;
315 37 : if backup_lsn.segment_number(self.wal_seg_size)
316 37 : >= commit_lsn.segment_number(self.wal_seg_size)
317 : {
318 16 : retry_attempt = 0;
319 16 : continue;
320 21 : }
321 21 :
322 21 : match backup_lsn_range(
323 21 : &self.timeline,
324 21 : &mut backup_lsn,
325 21 : commit_lsn,
326 21 : self.wal_seg_size,
327 21 : &self.timeline_dir,
328 21 : &self.workspace_dir,
329 21 : self.parallel_jobs,
330 21 : )
331 19058 : .await
332 : {
333 20 : Ok(()) => {
334 20 : retry_attempt = 0;
335 20 : }
336 1 : Err(e) => {
337 1 : error!(
338 1 : "failed while offloading range {}-{}: {:?}",
339 1 : backup_lsn, commit_lsn, e
340 1 : );
341 :
342 1 : retry_attempt = retry_attempt.saturating_add(1);
343 : }
344 : }
345 : }
346 UBC 0 : }
347 : }
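// A small sketch of the gating check in `run` above, using plain u64 LSNs and a
// hypothetical helper name, and assuming segment_number(lsn) == lsn / wal_seg_size:
// uploading is only worthwhile once commit_lsn has crossed into a later WAL
// segment than backup_lsn, i.e. at least one segment has been fully written
// since the last successfully uploaded position.
#[allow(dead_code)]
fn has_full_segment_to_upload(backup_lsn: u64, commit_lsn: u64, wal_seg_size: u64) -> bool {
    (backup_lsn / wal_seg_size) < (commit_lsn / wal_seg_size)
}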
348 :
349 CBC 21 : pub async fn backup_lsn_range(
350 21 : timeline: &Arc<Timeline>,
351 21 : backup_lsn: &mut Lsn,
352 21 : end_lsn: Lsn,
353 21 : wal_seg_size: usize,
354 21 : timeline_dir: &Utf8Path,
355 21 : workspace_dir: &Utf8Path,
356 21 : parallel_jobs: usize,
357 21 : ) -> Result<()> {
358 21 : if parallel_jobs < 1 {
359 UBC 0 : anyhow::bail!("parallel_jobs must be >= 1");
360 CBC 21 : }
361 21 :
362 21 : let start_lsn = *backup_lsn;
363 21 : let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
364 21 :
365 21 : // Pool of concurrent upload tasks. We use `FuturesOrdered` to
366 21 : // preserve order of uploads, and update `backup_lsn` only after
367 21 : // all previous uploads are finished.
368 21 : let mut uploads = FuturesOrdered::new();
369 21 : let mut iter = segments.iter();
370 :
371 : loop {
372 62 : let added_task = match iter.next() {
373 21 : Some(s) => {
374 21 : uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
375 21 : true
376 : }
377 41 : None => false,
378 : };
379 :
380 : // Wait for the next segment to upload if we don't have any more segments,
381 : // or if we have too many concurrent uploads.
382 62 : if !added_task || uploads.len() >= parallel_jobs {
383 19058 : let next = uploads.next().await;
384 41 : if let Some(res) = next {
385 : // next segment uploaded
386 21 : let segment = res?;
387 20 : let new_backup_lsn = segment.end_lsn;
388 20 : timeline
389 20 : .set_wal_backup_lsn(new_backup_lsn)
390 LBC (6) : .await
391 CBC 20 : .context("setting wal_backup_lsn")?;
392 20 : *backup_lsn = new_backup_lsn;
393 : } else {
394 : // no more segments to upload
395 20 : break;
396 : }
397 21 : }
398 : }
399 :
400 20 : info!(
401 20 : "offloaded segnos {:?} up to {}, previous backup_lsn {}",
402 20 : segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
403 20 : end_lsn,
404 20 : start_lsn,
405 20 : );
406 20 : Ok(())
407 21 : }
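// A generic, self-contained sketch of the bounded-parallelism pattern used in
// backup_lsn_range above (hypothetical names, uploads replaced by a stand-in
// future): keep at most `parallel_jobs` items in flight, and rely on
// FuturesOrdered yielding results in submission order so a watermark like
// backup_lsn can be advanced after every completed item.
#[allow(dead_code)]
async fn bounded_ordered_jobs_sketch(items: Vec<u64>, parallel_jobs: usize) -> Vec<u64> {
    let mut in_flight = FuturesOrdered::new();
    let mut completed = Vec::new();
    let mut iter = items.into_iter();
    loop {
        let added_task = match iter.next() {
            Some(item) => {
                // stand-in for backup_single_segment(...)
                in_flight.push_back(async move { item });
                true
            }
            None => false,
        };
        // Drain one result when the input is exhausted or the pool is full.
        if !added_task || in_flight.len() >= parallel_jobs {
            match in_flight.next().await {
                Some(result) => completed.push(result), // advance the watermark here
                None => break,                          // nothing left in flight
            }
        }
    }
    completed
}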
408 :
409 21 : async fn backup_single_segment(
410 21 : seg: &Segment,
411 21 : timeline_dir: &Utf8Path,
412 21 : workspace_dir: &Utf8Path,
413 21 : ) -> Result<Segment> {
414 21 : let segment_file_path = seg.file_path(timeline_dir)?;
415 21 : let remote_segment_path = segment_file_path
416 21 : .strip_prefix(workspace_dir)
417 21 : .context("Failed to strip workspace dir prefix")
418 21 : .and_then(RemotePath::new)
419 21 : .with_context(|| {
420 UBC 0 : format!(
421 0 : "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
422 0 : )
423 CBC 21 : })?;
424 :
425 16287 : let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
426 21 : if res.is_ok() {
427 20 : BACKED_UP_SEGMENTS.inc();
428 20 : } else {
429 1 : BACKUP_ERRORS.inc();
430 1 : }
431 21 : res?;
432 UBC 0 : debug!("Backup of {} done", segment_file_path);
433 :
434 CBC 20 : Ok(*seg)
435 21 : }
436 :
437 UBC 0 : #[derive(Debug, Copy, Clone)]
438 : pub struct Segment {
439 : seg_no: XLogSegNo,
440 : start_lsn: Lsn,
441 : end_lsn: Lsn,
442 : }
443 :
444 : impl Segment {
445 CBC 21 : pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
446 21 : Self {
447 21 : seg_no,
448 21 : start_lsn,
449 21 : end_lsn,
450 21 : }
451 21 : }
452 :
453 21 : pub fn object_name(self) -> String {
454 21 : XLogFileName(PG_TLI, self.seg_no, self.size())
455 21 : }
456 :
457 21 : pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
458 21 : Ok(timeline_dir.join(self.object_name()))
459 21 : }
460 :
461 42 : pub fn size(self) -> usize {
462 42 : (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
463 42 : }
464 : }
465 :
466 21 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
467 21 : let first_seg = start.segment_number(seg_size);
468 21 : let last_seg = end.segment_number(seg_size);
469 21 :
470 21 : let res: Vec<Segment> = (first_seg..last_seg)
471 21 : .map(|s| {
472 21 : let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
473 21 : let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
474 21 : Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
475 21 : })
476 21 : .collect();
477 21 : res
478 21 : }
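// A worked example of get_segments with illustrative numbers, assuming the
// standard 16 MiB segment size and that Lsn::segment_number divides the LSN by
// the segment size: a start LSN at the beginning of segment 1 and an end LSN
// inside segment 3 yield segments 1 and 2, i.e. only fully written segments
// between the two positions are offloaded.
#[cfg(test)]
#[test]
fn get_segments_sketch() {
    let seg_size: usize = 16 * 1024 * 1024;
    let segs = get_segments(Lsn(0x0100_0000), Lsn(0x0320_0000), seg_size);
    let seg_nos: Vec<_> = segs.iter().map(|s| s.seg_no).collect();
    assert_eq!(seg_nos, vec![1, 2]);
}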
479 :
480 : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
481 :
482 21 : async fn backup_object(
483 21 : source_file: &Utf8Path,
484 21 : target_file: &RemotePath,
485 21 : size: usize,
486 21 : ) -> Result<()> {
487 21 : let storage = REMOTE_STORAGE
488 21 : .get()
489 21 : .expect("failed to get remote storage")
490 21 : .as_ref()
491 21 : .unwrap();
492 :
493 21 : let file = tokio::io::BufReader::new(
494 21 : File::open(&source_file)
495 21 : .await
496 21 : .with_context(|| format!("Failed to open file {} for wal backup", source_file))?,
497 : );
498 :
499 21 : storage
500 21 : .upload_storage_object(Box::new(file), size, target_file)
501 16266 : .await
502 21 : }
503 :
504 6 : pub async fn read_object(
505 6 : file_path: &RemotePath,
506 6 : offset: u64,
507 6 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
508 6 : let storage = REMOTE_STORAGE
509 6 : .get()
510 6 : .context("Failed to get remote storage")?
511 6 : .as_ref()
512 6 : .context("No remote storage configured")?;
513 :
514 6 : info!("segment download about to start from remote path {file_path:?} at offset {offset}");
515 :
516 6 : let download = storage
517 6 : .download_storage_object(Some((offset, None)), file_path)
518 23 : .await
519 6 : .with_context(|| {
520 UBC 0 : format!("Failed to open WAL segment download stream for remote path {file_path:?}")
521 CBC 6 : })?;
522 :
523 6 : Ok(download.download_stream)
524 6 : }