Line data Source code
1 : //! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
2 : //! and `flush_lsn` updates.
3 : //!
4 : //! After the partial segment was updated (`flush_lsn` was changed), the segment
5 : //! will be uploaded to S3 within the configured `partial_backup_timeout`.
6 : //!
7 : //! The filename format for partial segments is
8 : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
9 : //! - `Segment` – the segment name, like `000000010000000000000001`
10 : //! - `Term` – current term
11 : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
12 : //! - `Commit` – commit_lsn in the same hex format
13 : //! - `NN` – safekeeper_id, like `1`
14 : //!
15 : //! The full object name example:
16 : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
17 : //!
18 : //! Each safekeeper will keep info about remote partial segments in its control
19 : //! file. Code updates state in the control file before doing any S3 operations.
20 : //! This way control file stores information about all potentially existing
21 : //! remote partial segments and can clean them up after uploading a newer version.
22 : use camino::Utf8PathBuf;
23 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
24 : use remote_storage::RemotePath;
25 : use serde::{Deserialize, Serialize};
26 :
27 : use tokio_util::sync::CancellationToken;
28 : use tracing::{debug, error, info, instrument, warn};
29 : use utils::{id::NodeId, lsn::Lsn};
30 :
31 : use crate::{
32 : metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
33 : rate_limit::{rand_duration, RateLimiter},
34 : safekeeper::Term,
35 : timeline::WalResidentTimeline,
36 : timeline_manager::StateSnapshot,
37 : wal_backup::{self},
38 : SafeKeeperConf,
39 : };
40 :
41 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
42 : pub enum UploadStatus {
43 : /// Upload is in progress. This status should be used only for garbage collection,
44 : /// don't read data from the remote storage with this status.
45 : InProgress,
46 : /// Upload is finished. There is always at most one segment with this status.
47 : /// It means that the segment is actual and can be used.
48 : Uploaded,
49 : /// Deletion is in progress. This status should be used only for garbage collection,
50 : /// don't read data from the remote storage with this status.
51 : Deleting,
52 : }
53 :
54 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55 : pub struct PartialRemoteSegment {
56 : pub status: UploadStatus,
57 : pub name: String,
58 : pub commit_lsn: Lsn,
59 : pub flush_lsn: Lsn,
60 : // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
61 : // remote storage.
62 : //
63 : // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
64 : pub term: Term,
65 : }
66 :
67 : impl PartialRemoteSegment {
68 0 : fn eq_without_status(&self, other: &Self) -> bool {
69 0 : self.name == other.name
70 0 : && self.commit_lsn == other.commit_lsn
71 0 : && self.flush_lsn == other.flush_lsn
72 0 : && self.term == other.term
73 0 : }
74 :
75 0 : pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
76 0 : remote_timeline_path.join(&self.name)
77 0 : }
78 : }
79 :
80 : // NB: these structures are a part of a control_file, you can't change them without
81 : // changing the control file format version.
82 3 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
83 : pub struct State {
84 : pub segments: Vec<PartialRemoteSegment>,
85 : }
86 :
87 : #[derive(Debug)]
88 : pub(crate) struct ReplaceUploadedSegment {
89 : pub(crate) previous: PartialRemoteSegment,
90 : pub(crate) current: PartialRemoteSegment,
91 : }
92 :
93 : impl State {
94 : /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
95 0 : pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
96 0 : self.segments
97 0 : .iter()
98 0 : .find(|seg| seg.status == UploadStatus::Uploaded)
99 0 : .cloned()
100 0 : }
101 :
102 : /// Replace the name of the Uploaded segment (if one exists) in order to match
103 : /// it with `destination` safekeeper. Returns a description of the change or None
104 : /// wrapped in anyhow::Result.
105 0 : pub(crate) fn replace_uploaded_segment(
106 0 : &mut self,
107 0 : source: NodeId,
108 0 : destination: NodeId,
109 0 : ) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
110 0 : let current = self
111 0 : .segments
112 0 : .iter_mut()
113 0 : .find(|seg| seg.status == UploadStatus::Uploaded);
114 :
115 0 : let current = match current {
116 0 : Some(some) => some,
117 : None => {
118 0 : return anyhow::Ok(None);
119 : }
120 : };
121 :
122 : // Sanity check that the partial segment we are replacing is belongs
123 : // to the `source` SK.
124 0 : if !current
125 0 : .name
126 0 : .ends_with(format!("sk{}.partial", source.0).as_str())
127 : {
128 0 : anyhow::bail!(
129 0 : "Partial segment name ({}) doesn't match self node id ({})",
130 0 : current.name,
131 0 : source
132 0 : );
133 0 : }
134 0 :
135 0 : let previous = current.clone();
136 0 :
137 0 : let new_name = current.name.replace(
138 0 : format!("_sk{}", source.0).as_str(),
139 0 : format!("_sk{}", destination.0).as_str(),
140 0 : );
141 0 :
142 0 : current.name = new_name;
143 0 :
144 0 : anyhow::Ok(Some(ReplaceUploadedSegment {
145 0 : previous,
146 0 : current: current.clone(),
147 0 : }))
148 0 : }
149 : }
150 :
151 : pub struct PartialBackup {
152 : wal_seg_size: usize,
153 : tli: WalResidentTimeline,
154 : conf: SafeKeeperConf,
155 : local_prefix: Utf8PathBuf,
156 : remote_timeline_path: RemotePath,
157 :
158 : state: State,
159 : }
160 :
161 : impl PartialBackup {
162 0 : pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup {
163 0 : let (_, persistent_state) = tli.get_state().await;
164 0 : let wal_seg_size = tli.get_wal_seg_size().await;
165 :
166 0 : let local_prefix = tli.get_timeline_dir();
167 0 : let remote_timeline_path = tli.remote_path.clone();
168 0 :
169 0 : PartialBackup {
170 0 : wal_seg_size,
171 0 : tli,
172 0 : state: persistent_state.partial_backup,
173 0 : conf,
174 0 : local_prefix,
175 0 : remote_timeline_path,
176 0 : }
177 0 : }
178 :
179 : // Read-only methods for getting segment names
180 0 : fn segno(&self, lsn: Lsn) -> XLogSegNo {
181 0 : lsn.segment_number(self.wal_seg_size)
182 0 : }
183 :
184 0 : fn segment_name(&self, segno: u64) -> String {
185 0 : XLogFileName(PG_TLI, segno, self.wal_seg_size)
186 0 : }
187 :
188 0 : fn remote_segment_name(
189 0 : &self,
190 0 : segno: u64,
191 0 : term: u64,
192 0 : commit_lsn: Lsn,
193 0 : flush_lsn: Lsn,
194 0 : ) -> String {
195 0 : format!(
196 0 : "{}_{}_{:016X}_{:016X}_sk{}.partial",
197 0 : self.segment_name(segno),
198 0 : term,
199 0 : flush_lsn.0,
200 0 : commit_lsn.0,
201 0 : self.conf.my_id.0,
202 0 : )
203 0 : }
204 :
205 0 : fn local_segment_name(&self, segno: u64) -> String {
206 0 : format!("{}.partial", self.segment_name(segno))
207 0 : }
208 : }
209 :
210 : impl PartialBackup {
211 : /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
212 0 : async fn prepare_upload(&self) -> PartialRemoteSegment {
213 : // this operation takes a lock to get the actual state
214 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
215 0 : let flush_lsn = Lsn(sk_info.flush_lsn);
216 0 : let commit_lsn = Lsn(sk_info.commit_lsn);
217 0 : let last_log_term = sk_info.last_log_term;
218 0 : let segno = self.segno(flush_lsn);
219 0 :
220 0 : let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);
221 0 :
222 0 : PartialRemoteSegment {
223 0 : status: UploadStatus::InProgress,
224 0 : name,
225 0 : commit_lsn,
226 0 : flush_lsn,
227 0 : term: last_log_term,
228 0 : }
229 0 : }
230 :
231 : /// Reads segment from disk and uploads it to the remote storage.
232 0 : async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
233 0 : let flush_lsn = prepared.flush_lsn;
234 0 : let segno = self.segno(flush_lsn);
235 0 :
236 0 : // We're going to backup bytes from the start of the segment up to flush_lsn.
237 0 : let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
238 0 :
239 0 : let local_path = self.local_prefix.join(self.local_segment_name(segno));
240 0 : let remote_path = prepared.remote_path(&self.remote_timeline_path);
241 0 :
242 0 : // Upload first `backup_bytes` bytes of the segment to the remote storage.
243 0 : wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
244 0 : PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
245 :
246 : // We uploaded the segment, now let's verify that the data is still actual.
247 : // If the term changed, we cannot guarantee the validity of the uploaded data.
248 : // If the term is the same, we know the data is not corrupted.
249 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
250 0 : if sk_info.last_log_term != prepared.term {
251 0 : anyhow::bail!("term changed during upload");
252 0 : }
253 0 : assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
254 0 : assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
255 :
256 0 : Ok(())
257 0 : }
258 :
259 : /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
260 0 : async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
261 0 : self.tli
262 0 : .map_control_file(|cf| {
263 0 : if cf.partial_backup != self.state {
264 0 : let memory = self.state.clone();
265 0 : self.state = cf.partial_backup.clone();
266 0 : anyhow::bail!(
267 0 : "partial backup state diverged, memory={:?}, disk={:?}",
268 0 : memory,
269 0 : cf.partial_backup
270 0 : );
271 0 : }
272 0 :
273 0 : cf.partial_backup = new_state.clone();
274 0 : Ok(())
275 0 : })
276 0 : .await?;
277 : // update in-memory state
278 0 : self.state = new_state;
279 0 : Ok(())
280 0 : }
281 :
282 : /// Upload the latest version of the partial segment and garbage collect older versions.
283 0 : #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
284 : async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
285 : let _timer = MISC_OPERATION_SECONDS
286 : .with_label_values(&["partial_do_upload"])
287 : .start_timer();
288 : info!("starting upload {:?}", prepared);
289 :
290 : let state_0 = self.state.clone();
291 : let state_1 = {
292 : let mut state = state_0.clone();
293 : state.segments.push(prepared.clone());
294 : state
295 : };
296 :
297 : // we're going to upload a new segment, let's write it to disk to make GC later
298 : self.commit_state(state_1).await?;
299 :
300 : self.upload_segment(prepared.clone()).await?;
301 :
302 : let state_2 = {
303 : let mut state = state_0.clone();
304 : for seg in state.segments.iter_mut() {
305 : seg.status = UploadStatus::Deleting;
306 : }
307 : let mut actual_remote_segment = prepared.clone();
308 : actual_remote_segment.status = UploadStatus::Uploaded;
309 : state.segments.push(actual_remote_segment);
310 : state
311 : };
312 :
313 : // we've uploaded new segment, it's actual, all other segments should be GCed
314 : self.commit_state(state_2).await?;
315 : self.gc().await?;
316 :
317 : Ok(())
318 : }
319 :
320 : // Prepend to the given segments remote prefix and delete them from the
321 : // remote storage.
322 0 : async fn delete_segments(&self, segments_to_delete: &Vec<String>) -> anyhow::Result<()> {
323 0 : info!("deleting objects: {:?}", segments_to_delete);
324 0 : let mut objects_to_delete = vec![];
325 0 : for seg in segments_to_delete.iter() {
326 0 : let remote_path = self.remote_timeline_path.join(seg);
327 0 : objects_to_delete.push(remote_path);
328 0 : }
329 0 : wal_backup::delete_objects(&objects_to_delete).await
330 0 : }
331 :
332 : /// Delete all non-Uploaded segments from the remote storage. There should be only one
333 : /// Uploaded segment at a time.
334 0 : #[instrument(name = "gc", skip_all)]
335 : async fn gc(&mut self) -> anyhow::Result<()> {
336 : let mut segments_to_delete = vec![];
337 :
338 : let new_segments: Vec<PartialRemoteSegment> = self
339 : .state
340 : .segments
341 : .iter()
342 0 : .filter_map(|seg| {
343 0 : if seg.status == UploadStatus::Uploaded {
344 0 : Some(seg.clone())
345 : } else {
346 0 : segments_to_delete.push(seg.name.clone());
347 0 : None
348 : }
349 0 : })
350 : .collect();
351 :
352 : if new_segments.len() == 1 {
353 : // we have an uploaded segment, it must not be deleted from remote storage
354 0 : segments_to_delete.retain(|name| name != &new_segments[0].name);
355 : } else {
356 : // there should always be zero or one uploaded segment
357 : assert!(
358 : new_segments.is_empty(),
359 : "too many uploaded segments: {:?}",
360 : new_segments
361 : );
362 : }
363 :
364 : // execute the deletion
365 : self.delete_segments(&segments_to_delete).await?;
366 :
367 : // now we can update the state on disk
368 : let new_state = {
369 : let mut state = self.state.clone();
370 : state.segments = new_segments;
371 : state
372 : };
373 : self.commit_state(new_state).await?;
374 :
375 : Ok(())
376 : }
377 :
378 : /// Remove uploaded segment(s) from the state and remote storage. Aimed for
379 : /// manual intervention, not normally needed.
380 : /// Returns list of segments which potentially existed in the remote storage.
381 0 : pub async fn reset(&mut self) -> anyhow::Result<Vec<String>> {
382 0 : let segments_to_delete = self
383 0 : .state
384 0 : .segments
385 0 : .iter()
386 0 : .map(|seg| seg.name.clone())
387 0 : .collect();
388 0 :
389 0 : // First reset cfile state, and only then objects themselves. If the
390 0 : // later fails we might leave some garbage behind; that's ok for this
391 0 : // single time usage.
392 0 : let new_state = State { segments: vec![] };
393 0 : self.commit_state(new_state).await?;
394 :
395 0 : self.delete_segments(&segments_to_delete).await?;
396 0 : Ok(segments_to_delete)
397 0 : }
398 : }
399 :
400 : /// Check if everything is uploaded and partial backup task doesn't need to run.
401 0 : pub(crate) fn needs_uploading(
402 0 : state: &StateSnapshot,
403 0 : uploaded: &Option<PartialRemoteSegment>,
404 0 : ) -> bool {
405 0 : match uploaded {
406 0 : Some(uploaded) => {
407 0 : uploaded.status != UploadStatus::Uploaded
408 0 : || uploaded.flush_lsn != state.flush_lsn
409 0 : || uploaded.commit_lsn != state.commit_lsn
410 0 : || uploaded.term != state.last_log_term
411 : }
412 0 : None => true,
413 : }
414 0 : }
415 :
416 : /// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
417 : /// partial segment to the remote storage. It also does garbage collection of old segments.
418 : ///
419 : /// When there is nothing more to do and the last segment was successfully uploaded, the task
420 : /// returns PartialRemoteSegment, to signal readiness for offloading the timeline.
421 0 : #[instrument(name = "partial_backup", skip_all, fields(ttid = %tli.ttid))]
422 : pub async fn main_task(
423 : tli: WalResidentTimeline,
424 : conf: SafeKeeperConf,
425 : limiter: RateLimiter,
426 : cancel: CancellationToken,
427 : ) -> Option<PartialRemoteSegment> {
428 : debug!("started");
429 : let await_duration = conf.partial_backup_timeout;
430 : let mut first_iteration = true;
431 :
432 : let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
433 : let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
434 :
435 : let mut backup = PartialBackup::new(tli, conf).await;
436 :
437 : debug!("state: {:?}", backup.state);
438 :
439 : // The general idea is that each safekeeper keeps only one partial segment
440 : // both in remote storage and in local state. If this is not true, something
441 : // went wrong.
442 : const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;
443 :
444 : 'outer: loop {
445 : if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
446 : warn!(
447 : "too many segments in control_file state, running gc: {}",
448 : backup.state.segments.len()
449 : );
450 :
451 0 : backup.gc().await.unwrap_or_else(|e| {
452 0 : error!("failed to run gc: {:#}", e);
453 0 : });
454 : }
455 :
456 : // wait until we have something to upload
457 : let uploaded_segment = backup.state.uploaded_segment();
458 : if let Some(seg) = &uploaded_segment {
459 : // check if uploaded segment matches the current state
460 : if flush_lsn_rx.borrow().lsn == seg.flush_lsn
461 : && *commit_lsn_rx.borrow() == seg.commit_lsn
462 : && flush_lsn_rx.borrow().term == seg.term
463 : {
464 : // we have nothing to do, the last segment is already uploaded
465 : debug!(
466 : "exiting, uploaded up to term={} flush_lsn={} commit_lsn={}",
467 : seg.term, seg.flush_lsn, seg.commit_lsn
468 : );
469 : return Some(seg.clone());
470 : }
471 : }
472 :
473 : // if we don't have any data and zero LSNs, wait for something
474 : while flush_lsn_rx.borrow().lsn == Lsn(0) {
475 : tokio::select! {
476 : _ = backup.tli.cancel.cancelled() => {
477 : info!("timeline canceled");
478 : return None;
479 : }
480 : _ = cancel.cancelled() => {
481 : info!("task canceled");
482 : return None;
483 : }
484 : _ = flush_lsn_rx.changed() => {}
485 : }
486 : }
487 :
488 : // smoothing the load after restart, by sleeping for a random time.
489 : // if this is not the first iteration, we will wait for the full await_duration
490 : let await_duration = if first_iteration {
491 : first_iteration = false;
492 : rand_duration(&await_duration)
493 : } else {
494 : await_duration
495 : };
496 :
497 : // fixing the segno and waiting some time to prevent reuploading the same segment too often
498 : let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
499 : let timeout = tokio::time::sleep(await_duration);
500 : tokio::pin!(timeout);
501 : let mut timeout_expired = false;
502 :
503 : // waiting until timeout expires OR segno changes
504 : 'inner: loop {
505 : tokio::select! {
506 : _ = backup.tli.cancel.cancelled() => {
507 : info!("timeline canceled");
508 : return None;
509 : }
510 : _ = cancel.cancelled() => {
511 : info!("task canceled");
512 : return None;
513 : }
514 : _ = commit_lsn_rx.changed() => {}
515 : _ = flush_lsn_rx.changed() => {
516 : let segno = backup.segno(flush_lsn_rx.borrow().lsn);
517 : if segno != pending_segno {
518 : // previous segment is no longer partial, aborting the wait
519 : break 'inner;
520 : }
521 : }
522 : _ = &mut timeout => {
523 : // timeout expired, now we are ready for upload
524 : timeout_expired = true;
525 : break 'inner;
526 : }
527 : }
528 : }
529 :
530 : if !timeout_expired {
531 : // likely segno has changed, let's try again in the next iteration
532 : continue 'outer;
533 : }
534 :
535 : // limit concurrent uploads
536 : let _upload_permit = tokio::select! {
537 : acq = limiter.acquire_partial_backup() => acq,
538 : _ = cancel.cancelled() => {
539 : info!("task canceled");
540 : return None;
541 : }
542 : };
543 :
544 : let prepared = backup.prepare_upload().await;
545 : if let Some(seg) = &uploaded_segment {
546 : if seg.eq_without_status(&prepared) {
547 : // we already uploaded this segment, nothing to do
548 : continue 'outer;
549 : }
550 : }
551 :
552 : match backup.do_upload(&prepared).await {
553 : Ok(()) => {
554 : debug!(
555 : "uploaded {} up to flush_lsn {}",
556 : prepared.name, prepared.flush_lsn
557 : );
558 : PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
559 : }
560 : Err(e) => {
561 : info!("failed to upload {}: {:#}", prepared.name, e);
562 : PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
563 : }
564 : }
565 : }
566 : }
|