//! The safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates.
//!
//! After the partial segment is updated (`flush_lsn` has changed), the segment
//! will be uploaded to S3 within the configured `partial_backup_timeout`.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
//! - `Segment` – the segment name, like `000000010000000000000001`
//! - `Term` – the current term
//! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
//! - `Commit` – commit_lsn in the same hex format
//! - `NN` – the safekeeper_id, like `1`
//!
//! A full object name example:
//! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
//!
//! Each safekeeper keeps info about its remote partial segments in its control
//! file. The code updates the state in the control file before doing any S3
//! operations. This way the control file stores information about all potentially
//! existing remote partial segments and can clean them up after uploading a newer
//! version.
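//!
//! Roughly, a single upload cycle moves the control file through the following
//! states (see `do_upload` below):
//! 1. the new segment is recorded with status `InProgress`;
//! 2. after the S3 upload succeeds, it is recorded as `Uploaded`, and all
//!    previously known segments are marked `Deleting`;
//! 3. garbage collection deletes every non-`Uploaded` segment from the remote
//!    storage, leaving at most one `Uploaded` entry.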
use std::sync::Arc;

use camino::Utf8PathBuf;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::{GenericRemoteStorage, RemotePath};
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use utils::id::NodeId;
use utils::lsn::Lsn;

use crate::SafeKeeperConf;
use crate::metrics::{
    MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS,
};
use crate::rate_limit::{RateLimiter, rand_duration};
use crate::timeline::WalResidentTimeline;
use crate::timeline_manager::StateSnapshot;
use crate::wal_backup::{self};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress. This status should be used only for garbage collection;
    /// don't read data from the remote storage with this status.
    InProgress,
    /// Upload is finished. There is always at most one segment with this status.
    /// It means that the segment is up to date and can be used.
    Uploaded,
    /// Deletion is in progress. This status should be used only for garbage collection;
    /// don't read data from the remote storage with this status.
    Deleting,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    pub status: UploadStatus,
    pub name: String,
    pub commit_lsn: Lsn,
    pub flush_lsn: Lsn,
    // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
    // remote storage.
    //
    // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
    pub term: Term,
}

impl PartialRemoteSegment {
    fn eq_without_status(&self, other: &Self) -> bool {
        self.name == other.name
            && self.commit_lsn == other.commit_lsn
            && self.flush_lsn == other.flush_lsn
            && self.term == other.term
    }

    pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
        remote_timeline_path.join(&self.name)
    }
}

// NB: these structures are part of the control file; you can't change them without
// changing the control file format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    pub segments: Vec<PartialRemoteSegment>,
}

#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
    pub(crate) previous: PartialRemoteSegment,
    pub(crate) current: PartialRemoteSegment,
}

impl State {
    /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
    pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
        self.segments
            .iter()
            .find(|seg| seg.status == UploadStatus::Uploaded)
            .cloned()
    }

    /// Replace the name of the Uploaded segment (if one exists) in order to match
    /// it with the `destination` safekeeper. Returns a description of the change, or
    /// None if there is no Uploaded segment, wrapped in anyhow::Result.
    pub(crate) fn replace_uploaded_segment(
        &mut self,
        source: NodeId,
        destination: NodeId,
    ) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
        let current = self
            .segments
            .iter_mut()
            .find(|seg| seg.status == UploadStatus::Uploaded);

        let current = match current {
            Some(some) => some,
            None => {
                return anyhow::Ok(None);
            }
        };

        // Sanity check that the partial segment we are replacing belongs
        // to the `source` SK.
        if !current
            .name
            .ends_with(format!("sk{}.partial", source.0).as_str())
        {
            anyhow::bail!(
                "Partial segment name ({}) doesn't match self node id ({})",
                current.name,
                source
            );
        }

        let previous = current.clone();

        let new_name = current.name.replace(
            format!("_sk{}", source.0).as_str(),
            format!("_sk{}", destination.0).as_str(),
        );

        current.name = new_name;

        anyhow::Ok(Some(ReplaceUploadedSegment {
            previous,
            current: current.clone(),
        }))
    }
}

pub struct PartialBackup {
    wal_seg_size: usize,
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    local_prefix: Utf8PathBuf,
    remote_timeline_path: RemotePath,
    storage: Arc<GenericRemoteStorage>,
    state: State,
}

impl PartialBackup {
    pub async fn new(
        tli: WalResidentTimeline,
        conf: SafeKeeperConf,
        storage: Arc<GenericRemoteStorage>,
    ) -> PartialBackup {
        let (_, persistent_state) = tli.get_state().await;
        let wal_seg_size = tli.get_wal_seg_size().await;

        let local_prefix = tli.get_timeline_dir();
        let remote_timeline_path = tli.remote_path.clone();

        PartialBackup {
            wal_seg_size,
            tli,
            state: persistent_state.partial_backup,
            conf,
            local_prefix,
            remote_timeline_path,
            storage,
        }
    }

    // Read-only methods for getting segment names
    fn segno(&self, lsn: Lsn) -> XLogSegNo {
        lsn.segment_number(self.wal_seg_size)
    }

    fn segment_name(&self, segno: u64) -> String {
        XLogFileName(PG_TLI, segno, self.wal_seg_size)
    }

    fn remote_segment_name(
        &self,
        segno: u64,
        term: u64,
        commit_lsn: Lsn,
        flush_lsn: Lsn,
    ) -> String {
        format!(
            "{}_{}_{:016X}_{:016X}_sk{}.partial",
            self.segment_name(segno),
            term,
            flush_lsn.0,
            commit_lsn.0,
            self.conf.my_id.0,
        )
    }

    fn local_segment_name(&self, segno: u64) -> String {
        format!("{}.partial", self.segment_name(segno))
    }
}

impl PartialBackup {
    /// Takes a lock to read the actual safekeeper state and returns a segment that should be uploaded.
    async fn prepare_upload(&self) -> PartialRemoteSegment {
        // this operation takes a lock to get the actual state
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        let flush_lsn = Lsn(sk_info.flush_lsn);
        let commit_lsn = Lsn(sk_info.commit_lsn);
        let last_log_term = sk_info.last_log_term;
        let segno = self.segno(flush_lsn);

        let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);

        PartialRemoteSegment {
            status: UploadStatus::InProgress,
            name,
            commit_lsn,
            flush_lsn,
            term: last_log_term,
        }
    }

    /// Reads the segment from disk and uploads it to the remote storage.
    async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
        let flush_lsn = prepared.flush_lsn;
        let segno = self.segno(flush_lsn);

        // We're going to back up bytes from the start of the segment up to flush_lsn.
        let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

        let local_path = self.local_prefix.join(self.local_segment_name(segno));
        let remote_path = prepared.remote_path(&self.remote_timeline_path);

        // Upload the first `backup_bytes` bytes of the segment to the remote storage.
        wal_backup::backup_partial_segment(&self.storage, &local_path, &remote_path, backup_bytes)
            .await?;
        PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);

        // We uploaded the segment, now let's verify that the data is still valid.
        // If the term changed, we cannot guarantee the validity of the uploaded data.
        // If the term is the same, we know the data is not corrupted.
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        if sk_info.last_log_term != prepared.term {
            anyhow::bail!("term changed during upload");
        }
        assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
        assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));

        Ok(())
    }

    /// Write the new state to disk. If the in-memory and on-disk states diverged, returns an error.
    async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
        self.tli
            .map_control_file(|cf| {
                if cf.partial_backup != self.state {
                    let memory = self.state.clone();
                    self.state = cf.partial_backup.clone();
                    anyhow::bail!(
                        "partial backup state diverged, memory={:?}, disk={:?}",
                        memory,
                        cf.partial_backup
                    );
                }

                cf.partial_backup = new_state.clone();
                Ok(())
            })
            .await?;
        // update the in-memory state
        self.state = new_state;
        Ok(())
    }

    /// Upload the latest version of the partial segment and garbage collect older versions.
    #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
    async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
        let _timer = MISC_OPERATION_SECONDS
            .with_label_values(&["partial_do_upload"])
            .start_timer();
        info!("starting upload {:?}", prepared);

        let state_0 = self.state.clone();
        let state_1 = {
            let mut state = state_0.clone();
            state.segments.push(prepared.clone());
            state
        };

        // We're going to upload a new segment: persist it to the control file first,
        // so that it can be garbage collected later even if the upload fails midway.
        self.commit_state(state_1).await?;

        self.upload_segment(prepared.clone()).await?;

        let state_2 = {
            let mut state = state_0.clone();
            for seg in state.segments.iter_mut() {
                seg.status = UploadStatus::Deleting;
            }
            let mut actual_remote_segment = prepared.clone();
            actual_remote_segment.status = UploadStatus::Uploaded;
            state.segments.push(actual_remote_segment);
            state
        };

        // We've uploaded the new segment and it is now the current one;
        // all other segments should be GCed.
        self.commit_state(state_2).await?;
        self.gc().await?;

        Ok(())
    }

    // Prepend the remote prefix to the given segment names and delete them from the
    // remote storage.
    async fn delete_segments(&self, segments_to_delete: &Vec<String>) -> anyhow::Result<()> {
        info!("deleting objects: {:?}", segments_to_delete);
        let mut objects_to_delete = vec![];
        for seg in segments_to_delete.iter() {
            let remote_path = self.remote_timeline_path.join(seg);
            objects_to_delete.push(remote_path);
        }
        wal_backup::delete_objects(&self.storage, &objects_to_delete).await
    }

    /// Delete all non-Uploaded segments from the remote storage. There should be only one
    /// Uploaded segment at a time.
    #[instrument(name = "gc", skip_all)]
    async fn gc(&mut self) -> anyhow::Result<()> {
        let mut segments_to_delete = vec![];

        let new_segments: Vec<PartialRemoteSegment> = self
            .state
            .segments
            .iter()
            .filter_map(|seg| {
                if seg.status == UploadStatus::Uploaded {
                    Some(seg.clone())
                } else {
                    segments_to_delete.push(seg.name.clone());
                    None
                }
            })
            .collect();

        if new_segments.len() == 1 {
            // we have an uploaded segment, it must not be deleted from remote storage
            segments_to_delete.retain(|name| name != &new_segments[0].name);
        } else {
            // there should always be zero or one uploaded segment
            assert!(
                new_segments.is_empty(),
                "too many uploaded segments: {:?}",
                new_segments
            );
        }

        // execute the deletion
        self.delete_segments(&segments_to_delete).await?;

        // now we can update the state on disk
        let new_state = {
            let mut state = self.state.clone();
            state.segments = new_segments;
            state
        };
        self.commit_state(new_state).await?;

        Ok(())
    }

    /// Remove uploaded segment(s) from the state and the remote storage. Intended for
    /// manual intervention, not normally needed.
    /// Returns the list of segments which potentially existed in the remote storage.
    pub async fn reset(&mut self) -> anyhow::Result<Vec<String>> {
        let segments_to_delete = self
            .state
            .segments
            .iter()
            .map(|seg| seg.name.clone())
            .collect();

        // First reset the cfile state, and only then the objects themselves. If the
        // latter fails we might leave some garbage behind; that's ok for this
        // one-off usage.
        let new_state = State { segments: vec![] };
        self.commit_state(new_state).await?;

        self.delete_segments(&segments_to_delete).await?;
        Ok(segments_to_delete)
    }
}

/// Check if everything is uploaded and the partial backup task doesn't need to run.
pub(crate) fn needs_uploading(
    state: &StateSnapshot,
    uploaded: &Option<PartialRemoteSegment>,
) -> bool {
    match uploaded {
        Some(uploaded) => {
            uploaded.status != UploadStatus::Uploaded
                || uploaded.flush_lsn != state.flush_lsn
                || uploaded.commit_lsn != state.commit_lsn
                || uploaded.term != state.last_log_term
        }
        None => true,
    }
}

/// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
/// partial segment to the remote storage. It also does garbage collection of old segments.
///
/// When there is nothing more to do and the last segment was successfully uploaded, the task
/// returns a PartialRemoteSegment to signal readiness for offloading the timeline.
#[instrument(name = "partial_backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    limiter: RateLimiter,
    cancel: CancellationToken,
    storage: Arc<GenericRemoteStorage>,
) -> Option<PartialRemoteSegment> {
    debug!("started");
    let await_duration = conf.partial_backup_timeout;
    let mut first_iteration = true;

    let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
    let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();

    let mut backup = PartialBackup::new(tli, conf, storage).await;

    debug!("state: {:?}", backup.state);

    // The general idea is that each safekeeper keeps only one partial segment,
    // both in remote storage and in local state. If this is not true, something
    // went wrong.
    const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;

    'outer: loop {
        if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
            warn!(
                "too many segments in control_file state, running gc: {}",
                backup.state.segments.len()
            );

            backup.gc().await.unwrap_or_else(|e| {
                error!("failed to run gc: {:#}", e);
            });
        }

        // wait until we have something to upload
        let uploaded_segment = backup.state.uploaded_segment();
        if let Some(seg) = &uploaded_segment {
            // check if the uploaded segment matches the current state
            if flush_lsn_rx.borrow().lsn == seg.flush_lsn
                && *commit_lsn_rx.borrow() == seg.commit_lsn
                && flush_lsn_rx.borrow().term == seg.term
            {
                // we have nothing to do, the last segment is already uploaded
                debug!(
                    "exiting, uploaded up to term={} flush_lsn={} commit_lsn={}",
                    seg.term, seg.flush_lsn, seg.commit_lsn
                );
                return Some(seg.clone());
            }
        }

        // if we don't have any data yet (flush_lsn is zero), wait for something to arrive
        while flush_lsn_rx.borrow().lsn == Lsn(0) {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = cancel.cancelled() => {
                    info!("task canceled");
                    return None;
                }
                _ = flush_lsn_rx.changed() => {}
            }
        }

        // Smooth the load after a restart by sleeping for a random time on the first
        // iteration; on subsequent iterations wait for the full await_duration.
        let await_duration = if first_iteration {
            first_iteration = false;
            rand_duration(&await_duration)
        } else {
            await_duration
        };

        // fix the segno and wait some time to prevent re-uploading the same segment too often
        let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
        let timeout = tokio::time::sleep(await_duration);
        tokio::pin!(timeout);
        let mut timeout_expired = false;

        // wait until the timeout expires OR the segno changes
        'inner: loop {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = cancel.cancelled() => {
                    info!("task canceled");
                    return None;
                }
                _ = commit_lsn_rx.changed() => {}
                _ = flush_lsn_rx.changed() => {
                    let segno = backup.segno(flush_lsn_rx.borrow().lsn);
                    if segno != pending_segno {
                        // the previous segment is no longer partial, aborting the wait
                        break 'inner;
                    }
                }
                _ = &mut timeout => {
                    // the timeout expired, now we are ready for upload
                    timeout_expired = true;
                    break 'inner;
                }
            }
        }

        if !timeout_expired {
            // likely the segno has changed, let's try again in the next iteration
            continue 'outer;
        }

        // limit concurrent uploads
        let _upload_permit = tokio::select! {
            acq = limiter.acquire_partial_backup() => acq,
            _ = backup.tli.cancel.cancelled() => {
                info!("timeline canceled");
                return None;
            }
            _ = cancel.cancelled() => {
                info!("task canceled");
                return None;
            }
        };

        let prepared = backup.prepare_upload().await;
        if let Some(seg) = &uploaded_segment {
            if seg.eq_without_status(&prepared) {
                // we already uploaded this segment, nothing to do
                continue 'outer;
            }
        }

        match backup.do_upload(&prepared).await {
            Ok(()) => {
                debug!(
                    "uploaded {} up to flush_lsn {}",
                    prepared.name, prepared.flush_lsn
                );
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
            }
            Err(e) => {
                info!("failed to upload {}: {:#}", prepared.name, e);
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
            }
        }
    }
}
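
// A minimal sanity-check sketch for `State::replace_uploaded_segment`. It assumes the
// `_skNN.partial` naming scheme documented at the top of this file; the LSNs, term and
// node ids below are arbitrary example values chosen for illustration only.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn replace_uploaded_segment_rewrites_node_id() {
        let seg = PartialRemoteSegment {
            status: UploadStatus::Uploaded,
            name: "000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial"
                .to_string(),
            commit_lsn: Lsn(0x2534410),
            flush_lsn: Lsn(0x2534868),
            term: 2,
        };
        let mut state = State {
            segments: vec![seg.clone()],
        };

        // Renaming from sk1 to sk5 should keep everything but the node id suffix.
        let replaced = state
            .replace_uploaded_segment(NodeId(1), NodeId(5))
            .unwrap()
            .expect("an Uploaded segment exists");
        assert_eq!(replaced.previous, seg);
        assert!(replaced.current.name.ends_with("_sk5.partial"));

        // A mismatching `source` node id must be rejected by the sanity check.
        assert!(state.replace_uploaded_segment(NodeId(3), NodeId(9)).is_err());
    }
}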