//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates.
//!
//! After the partial segment is updated (`flush_lsn` changes), the segment
//! will be uploaded to S3 within the configured `partial_backup_timeout`.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
//! - `Segment` – the segment name, like `000000010000000000000001`
//! - `Term` – the last log term
//! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
//! - `Commit` – commit_lsn in the same hex format
//! - `NN` – safekeeper_id, like `1`
//!
//! A full object name example:
//! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
//!
//! Each safekeeper keeps info about its remote partial segments in the control
//! file and updates that state before doing any S3 operations. This way the
//! control file records all potentially existing remote partial segments, so
//! they can be cleaned up after a newer version is uploaded.
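//!
//! For illustration, the example name above can be assembled like this (a
//! sketch of the naming scheme; the actual name is produced by
//! `PartialBackup::remote_segment_name` below):
//!
//! ```
//! // values taken from the example object name above
//! let segment = "000000010000000000000002";
//! let term = 2u64;
//! let flush_lsn = 0x0000000002534868u64;
//! let commit_lsn = 0x0000000002534410u64;
//! let sk_id = 1u64;
//! let name = format!("{segment}_{term}_{flush_lsn:016X}_{commit_lsn:016X}_sk{sk_id}.partial");
//! assert_eq!(
//!     name,
//!     "000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial"
//! );
//! ```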
use std::sync::Arc;

use camino::Utf8PathBuf;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::{GenericRemoteStorage, RemotePath};
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use utils::id::NodeId;
use utils::lsn::Lsn;

use crate::SafeKeeperConf;
use crate::metrics::{
    MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS,
};
use crate::rate_limit::{RateLimiter, rand_duration};
use crate::timeline::WalResidentTimeline;
use crate::timeline_manager::StateSnapshot;
use crate::wal_backup::{self};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress. Segments in this status should only be garbage
    /// collected; don't read their data from the remote storage.
    InProgress,
    /// Upload is finished. There is always at most one segment with this status.
    /// It means that the segment is up to date and can be used.
    Uploaded,
    /// Deletion is in progress. Segments in this status should only be garbage
    /// collected; don't read their data from the remote storage.
    Deleting,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    pub status: UploadStatus,
    pub name: String,
    pub commit_lsn: Lsn,
    pub flush_lsn: Lsn,
    // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
    // remote storage.
    //
    // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
    pub term: Term,
}

impl PartialRemoteSegment {
    fn eq_without_status(&self, other: &Self) -> bool {
        self.name == other.name
            && self.commit_lsn == other.commit_lsn
            && self.flush_lsn == other.flush_lsn
            && self.term == other.term
    }

    pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
        remote_timeline_path.join(&self.name)
    }
}

// NB: these structures are part of the control file, so they can't be changed
// without bumping the control file format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    pub segments: Vec<PartialRemoteSegment>,
}

#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
    pub(crate) previous: PartialRemoteSegment,
    pub(crate) current: PartialRemoteSegment,
}

impl State {
    /// Find the Uploaded segment. There should be at most one Uploaded segment at a time.
    pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
        self.segments
            .iter()
            .find(|seg| seg.status == UploadStatus::Uploaded)
            .cloned()
    }

    /// Replace the name of the Uploaded segment (if one exists) so that it matches
    /// the `destination` safekeeper. Returns a description of the change, or None
    /// if there is no Uploaded segment, wrapped in anyhow::Result.
    pub(crate) fn replace_uploaded_segment(
        &mut self,
        source: NodeId,
        destination: NodeId,
    ) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
        let current = self
            .segments
            .iter_mut()
            .find(|seg| seg.status == UploadStatus::Uploaded);

        let current = match current {
            Some(some) => some,
            None => {
                return anyhow::Ok(None);
            }
        };

        // Sanity check that the partial segment we are replacing belongs
        // to the `source` SK.
        if !current
            .name
            .ends_with(format!("sk{}.partial", source.0).as_str())
        {
            anyhow::bail!(
                "Partial segment name ({}) doesn't match self node id ({})",
                current.name,
                source
            );
        }

        let previous = current.clone();

        let new_name = current.name.replace(
            format!("_sk{}", source.0).as_str(),
            format!("_sk{}", destination.0).as_str(),
        );

        current.name = new_name;

        anyhow::Ok(Some(ReplaceUploadedSegment {
            previous,
            current: current.clone(),
        }))
    }
}
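
// A minimal sketch of how `replace_uploaded_segment` behaves: it rewrites the
// `_skNN` suffix of the single Uploaded segment when handing it over to another
// safekeeper. The segment values below are made up for illustration only.
#[cfg(test)]
mod state_tests {
    use super::*;

    #[test]
    fn replace_uploaded_segment_renames_sk_suffix() {
        let mut state = State {
            segments: vec![PartialRemoteSegment {
                status: UploadStatus::Uploaded,
                name: "000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial"
                    .to_string(),
                commit_lsn: Lsn(0x2534410),
                flush_lsn: Lsn(0x2534868),
                term: 2,
            }],
        };

        // Renaming with a mismatched source id must fail the sanity check.
        assert!(state.replace_uploaded_segment(NodeId(2), NodeId(3)).is_err());

        // Renaming with the correct source id rewrites the suffix in place.
        let replaced = state
            .replace_uploaded_segment(NodeId(1), NodeId(3))
            .expect("source id matches")
            .expect("there is an Uploaded segment");
        assert!(replaced.previous.name.ends_with("_sk1.partial"));
        assert!(replaced.current.name.ends_with("_sk3.partial"));
        assert_eq!(state.uploaded_segment().unwrap().name, replaced.current.name);
    }
}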

pub struct PartialBackup {
    wal_seg_size: usize,
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    local_prefix: Utf8PathBuf,
    remote_timeline_path: RemotePath,
    storage: Arc<GenericRemoteStorage>,
    state: State,
}

impl PartialBackup {
    pub async fn new(
        tli: WalResidentTimeline,
        conf: SafeKeeperConf,
        storage: Arc<GenericRemoteStorage>,
    ) -> PartialBackup {
        let (_, persistent_state) = tli.get_state().await;
        let wal_seg_size = tli.get_wal_seg_size().await;

        let local_prefix = tli.get_timeline_dir();
        let remote_timeline_path = tli.remote_path.clone();

        PartialBackup {
            wal_seg_size,
            tli,
            state: persistent_state.partial_backup,
            conf,
            local_prefix,
            remote_timeline_path,
            storage,
        }
    }

    // Read-only methods for getting segment names
    fn segno(&self, lsn: Lsn) -> XLogSegNo {
        lsn.segment_number(self.wal_seg_size)
    }

    fn segment_name(&self, segno: u64) -> String {
        XLogFileName(PG_TLI, segno, self.wal_seg_size)
    }

    fn remote_segment_name(
        &self,
        segno: u64,
        term: u64,
        commit_lsn: Lsn,
        flush_lsn: Lsn,
    ) -> String {
        format!(
            "{}_{}_{:016X}_{:016X}_sk{}.partial",
            self.segment_name(segno),
            term,
            flush_lsn.0,
            commit_lsn.0,
            self.conf.my_id.0,
        )
    }

    fn local_segment_name(&self, segno: u64) -> String {
        format!("{}.partial", self.segment_name(segno))
    }
}

impl PartialBackup {
    /// Takes a lock to read the current safekeeper state and returns a segment that should be uploaded.
    async fn prepare_upload(&self) -> PartialRemoteSegment {
        // this operation takes a lock to get the current state
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        let flush_lsn = Lsn(sk_info.flush_lsn);
        let commit_lsn = Lsn(sk_info.commit_lsn);
        let last_log_term = sk_info.last_log_term;
        let segno = self.segno(flush_lsn);

        let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);

        PartialRemoteSegment {
            status: UploadStatus::InProgress,
            name,
            commit_lsn,
            flush_lsn,
            term: last_log_term,
        }
    }

    /// Reads the segment from disk and uploads it to the remote storage.
    async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
        let flush_lsn = prepared.flush_lsn;
        let segno = self.segno(flush_lsn);

        // We're going to back up bytes from the start of the segment up to flush_lsn.
        let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

        let local_path = self.local_prefix.join(self.local_segment_name(segno));
        let remote_path = prepared.remote_path(&self.remote_timeline_path);

        // Upload the first `backup_bytes` bytes of the segment to the remote storage.
        wal_backup::backup_partial_segment(&self.storage, &local_path, &remote_path, backup_bytes)
            .await?;
        PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);

        // We uploaded the segment; now verify that the data is still valid.
        // If the term changed, we cannot guarantee the validity of the uploaded data.
        // If the term is the same, we know the data is not corrupted.
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        if sk_info.last_log_term != prepared.term {
            anyhow::bail!("term changed during upload");
        }
        assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
        assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));

        Ok(())
    }

    /// Write the new state to disk. If the in-memory and on-disk states have diverged, returns an error.
    async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
        self.tli
            .map_control_file(|cf| {
                if cf.partial_backup != self.state {
                    let memory = self.state.clone();
                    self.state = cf.partial_backup.clone();
                    anyhow::bail!(
                        "partial backup state diverged, memory={:?}, disk={:?}",
                        memory,
                        cf.partial_backup
                    );
                }

                cf.partial_backup = new_state.clone();
                Ok(())
            })
            .await?;
        // update in-memory state
        self.state = new_state;
        Ok(())
    }

    /// Upload the latest version of the partial segment and garbage collect older versions.
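    ///
    /// The control file changes in three steps: (1) the new segment is recorded
    /// as `InProgress` before any remote traffic happens, (2) after a successful
    /// upload the previously known segments are marked `Deleting` and the new one
    /// `Uploaded`, and (3) `gc` removes the `Deleting` entries from the remote
    /// storage and from the control file.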
    #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
    async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
        let _timer = MISC_OPERATION_SECONDS
            .with_label_values(&["partial_do_upload"])
            .start_timer();
        info!("starting upload {:?}", prepared);

        let state_0 = self.state.clone();
        let state_1 = {
            let mut state = state_0.clone();
            state.segments.push(prepared.clone());
            state
        };

        // We're going to upload a new segment: record it in the control file first,
        // so that it can be garbage collected later even if the upload fails.
        self.commit_state(state_1).await?;

        self.upload_segment(prepared.clone()).await?;

        let state_2 = {
            let mut state = state_0.clone();
            for seg in state.segments.iter_mut() {
                seg.status = UploadStatus::Deleting;
            }
            let mut actual_remote_segment = prepared.clone();
            actual_remote_segment.status = UploadStatus::Uploaded;
            state.segments.push(actual_remote_segment);
            state
        };

        // The new segment is uploaded and is now the actual one; all other segments should be GCed.
        self.commit_state(state_2).await?;
        self.gc().await?;

        Ok(())
    }

    // Prepend the remote prefix to the given segment names and delete them from
    // the remote storage.
    async fn delete_segments(&self, segments_to_delete: &Vec<String>) -> anyhow::Result<()> {
        info!("deleting objects: {:?}", segments_to_delete);
        let mut objects_to_delete = vec![];
        for seg in segments_to_delete.iter() {
            let remote_path = self.remote_timeline_path.join(seg);
            objects_to_delete.push(remote_path);
        }
        wal_backup::delete_objects(&self.storage, &objects_to_delete).await
    }

    /// Delete all non-Uploaded segments from the remote storage. There should be at most one
    /// Uploaded segment at a time.
    #[instrument(name = "gc", skip_all)]
    async fn gc(&mut self) -> anyhow::Result<()> {
        let mut segments_to_delete = vec![];

        let new_segments: Vec<PartialRemoteSegment> = self
            .state
            .segments
            .iter()
            .filter_map(|seg| {
                if seg.status == UploadStatus::Uploaded {
                    Some(seg.clone())
                } else {
                    segments_to_delete.push(seg.name.clone());
                    None
                }
            })
            .collect();

        if new_segments.len() == 1 {
            // we have an uploaded segment, it must not be deleted from remote storage
            segments_to_delete.retain(|name| name != &new_segments[0].name);
        } else {
            // there should always be zero or one uploaded segment
            assert!(
                new_segments.is_empty(),
                "too many uploaded segments: {new_segments:?}"
            );
        }

        // execute the deletion
        self.delete_segments(&segments_to_delete).await?;

        // now we can update the state on disk
        let new_state = {
            let mut state = self.state.clone();
            state.segments = new_segments;
            state
        };
        self.commit_state(new_state).await?;

        Ok(())
    }

    /// Remove uploaded segment(s) from the state and the remote storage. Intended for
    /// manual intervention, not normally needed.
    /// Returns the list of segments which potentially existed in the remote storage.
    pub async fn reset(&mut self) -> anyhow::Result<Vec<String>> {
        let segments_to_delete = self
            .state
            .segments
            .iter()
            .map(|seg| seg.name.clone())
            .collect();

        // First reset the cfile state, and only then the objects themselves. If the
        // latter fails we might leave some garbage behind; that's ok for this
        // single-time usage.
        let new_state = State { segments: vec![] };
        self.commit_state(new_state).await?;

        self.delete_segments(&segments_to_delete).await?;
        Ok(segments_to_delete)
    }
}

/// Returns true if the partial backup task still needs to run, i.e. the current
/// state is not fully uploaded yet.
pub(crate) fn needs_uploading(
    state: &StateSnapshot,
    uploaded: &Option<PartialRemoteSegment>,
) -> bool {
    match uploaded {
        Some(uploaded) => {
            uploaded.status != UploadStatus::Uploaded
                || uploaded.flush_lsn != state.flush_lsn
                || uploaded.commit_lsn != state.commit_lsn
                || uploaded.term != state.last_log_term
        }
        None => true,
    }
}

/// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
/// partial segment to the remote storage. It also garbage collects old segments.
///
/// When there is nothing more to do and the last segment was successfully uploaded, the task
/// returns a PartialRemoteSegment to signal readiness for offloading the timeline.
#[instrument(name = "partial_backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    limiter: RateLimiter,
    cancel: CancellationToken,
    storage: Arc<GenericRemoteStorage>,
) -> Option<PartialRemoteSegment> {
    debug!("started");
    let await_duration = conf.partial_backup_timeout;
    let mut first_iteration = true;

    let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
    let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();

    let mut backup = PartialBackup::new(tli, conf, storage).await;

    debug!("state: {:?}", backup.state);

    // The general idea is that each safekeeper keeps only one partial segment
    // both in remote storage and in local state. If this is not true, something
    // went wrong.
    const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;

    'outer: loop {
        if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
            warn!(
                "too many segments in control_file state, running gc: {}",
                backup.state.segments.len()
            );

            backup.gc().await.unwrap_or_else(|e| {
                error!("failed to run gc: {:#}", e);
            });
        }

        // check whether we have anything left to upload
        let uploaded_segment = backup.state.uploaded_segment();
        if let Some(seg) = &uploaded_segment {
            // check if the uploaded segment matches the current state
            if flush_lsn_rx.borrow().lsn == seg.flush_lsn
                && *commit_lsn_rx.borrow() == seg.commit_lsn
                && flush_lsn_rx.borrow().term == seg.term
            {
                // we have nothing to do, the last segment is already uploaded
                debug!(
                    "exiting, uploaded up to term={} flush_lsn={} commit_lsn={}",
                    seg.term, seg.flush_lsn, seg.commit_lsn
                );
                return Some(seg.clone());
            }
        }

        // if we don't have any data yet (flush_lsn is still zero), wait for something to arrive
        while flush_lsn_rx.borrow().lsn == Lsn(0) {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = cancel.cancelled() => {
                    info!("task canceled");
                    return None;
                }
                _ = flush_lsn_rx.changed() => {}
            }
        }

        // Smooth the load after restart by sleeping for a random time on the first
        // iteration; on subsequent iterations we wait for the full await_duration.
        let await_duration = if first_iteration {
            first_iteration = false;
            rand_duration(&await_duration)
        } else {
            await_duration
        };

        // Fix the segno and wait some time to avoid reuploading the same segment too often.
        let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
        let timeout = tokio::time::sleep(await_duration);
        tokio::pin!(timeout);
        let mut timeout_expired = false;

        // wait until the timeout expires OR the segno changes
        'inner: loop {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = cancel.cancelled() => {
                    info!("task canceled");
                    return None;
                }
                _ = commit_lsn_rx.changed() => {}
                _ = flush_lsn_rx.changed() => {
                    let segno = backup.segno(flush_lsn_rx.borrow().lsn);
                    if segno != pending_segno {
                        // the previous segment is no longer partial, abort the wait
                        break 'inner;
                    }
                }
                _ = &mut timeout => {
                    // the timeout expired, now we are ready for the upload
                    timeout_expired = true;
                    break 'inner;
                }
            }
        }

        if !timeout_expired {
            // the segno has likely changed, let's try again in the next iteration
            continue 'outer;
        }

        // limit concurrent uploads
        let _upload_permit = tokio::select! {
            acq = limiter.acquire_partial_backup() => acq,
            _ = backup.tli.cancel.cancelled() => {
                info!("timeline canceled");
                return None;
            }
            _ = cancel.cancelled() => {
                info!("task canceled");
                return None;
            }
        };

        let prepared = backup.prepare_upload().await;
        if let Some(seg) = &uploaded_segment {
            if seg.eq_without_status(&prepared) {
                // we have already uploaded this segment, nothing to do
                continue 'outer;
            }
        }

        match backup.do_upload(&prepared).await {
            Ok(()) => {
                debug!(
                    "uploaded {} up to flush_lsn {}",
                    prepared.name, prepared.flush_lsn
                );
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
            }
            Err(e) => {
                info!("failed to upload {}: {:#}", prepared.name, e);
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
            }
        }
    }
}